관리-도구
편집 파일: api.py
"""Gather information from Plesk via DB querries. """ import logging import re from collections import defaultdict from typing import Dict, List, Sequence from defence360agent.utils import CheckRunError, check_run, retry_on from defence360agent.subsys.panels.base import PanelException logger = logging.getLogger(__name__) async def raise_panel_exception(*args, **kwargs): raise PanelException(*args, **kwargs) @retry_on(CheckRunError, max_tries=3, on_error=raise_panel_exception) async def _run_query(query): return (await check_run(["plesk", "db", "-N", "-e", query])).decode() async def get_user_to_domain() -> Dict[str, List[str]]: """Return mapping: user -> user's domains.""" result = ( await _run_query( "select login, name from domains " " left join hosting on dom_id = domains.id " " right join sys_users on hosting.sys_user_id = sys_users.id" ) ).split() result_mapping = defaultdict(list) for user, domain in zip(result[0::2], result[1::2]): if domain != "NULL": result_mapping[user].append(domain) return result_mapping async def get_domain_to_user() -> Dict[str, List[str]]: """Return mapping: domain -> user.""" result = ( await _run_query( "select name, login " "from domains " " left join hosting on dom_id = domains.id " " left join sys_users on hosting.sys_user_id = sys_users.id" ) ).split() return {domain: [user] for domain, user in zip(result[0::2], result[1::2])} async def get_user_details() -> Dict[str, Dict[str, str]]: """ Returns dict with user to email and locale pairs Not used, because MyImunify implemented for cPanel only yet """ user_details = {} results = await _run_query( "SELECT CONCAT(login, ';',email, ';',locale) FROM clients;" ) for record in results.split("\n"): if not record: continue user, email, locale = record.split(";") user_details[user] = { "email": email, "locale": locale.replace("-", "_"), } return user_details async def get_domains() -> List[str]: """Return: list of domains""" return (await _run_query("select name from domains")).split() async def get_users() -> List[str]: """Return: users that created by Plesk""" return ( await _run_query( "SELECT sys_users.login FROM sys_users JOIN hosting ON" " hosting.sys_user_id=sys_users.id JOIN domains ON" " hosting.dom_id=domains.id AND domains.webspace_id=0" ) ).split() async def count_customers_with_subscriptions() -> int: # pragma: no cover """Return: count active customers with at least one (any) domain""" return int( await _run_query( "select count(distinct cl_id) from clients c join domains d on " "d.cl_id = c.id where c.status = 0" ) ) async def get_admin_emails() -> list: emails = await _run_query("SELECT email FROM clients WHERE type='admin';") return [email for email in emails.split("\n") if email] async def list_docroots() -> Sequence[str]: query = "SELECT DISTINCT hosting.www_root FROM hosting;" return (await _run_query(query)).split() async def list_docroots_domains_users(): sql = ( "select hosting.www_root, domains.name, sys_users.login" " from hosting" " inner join domains on hosting.dom_id = domains.id" " inner join sys_users on hosting.sys_user_id=sys_users.id" ) data = await _run_query(sql) retval = [string.split() for string in data.split("\n") if string] return retval