관리-도구
편집 파일: whm.py
import json
import logging
import subprocess
import signal
from urllib.parse import quote

from defence360agent.utils import check_run, CheckRunError
from defence360agent.subsys.panels.base import PanelException

logger = logging.getLogger(__name__)

# use complete path as recommended by cPanel docs
# https://documentation.cpanel.net/display/DD/WHM+API+1+Functions+-+modsec_is_installed
WHMAPI1_CMD = "/usr/sbin/whmapi1"
WHMAPI_CERT_ERROR_LIST = [
    "no certificate",
    "no key with the id",
    "cannot read license file",
    "invalid license file",
    "license file expired",
]


class WHMAPIException(PanelException):
    """Got broken output or other problem during WHMAPI call"""

    pass


class WHMAPILicenseError(WHMAPIException):
    """Raised when whmapi1 reports that it cannot read the license file"""

    pass


async def whmapi1(function, sudo=False, **kwargs):
    """Call a WHM API 1 function asynchronously and return its ``data``.

    :param function: name of the WHM API 1 function to call
    :param sudo: when true, the command is prefixed with ``sudo``
    :param kwargs: function arguments; each is passed as ``key=value``
        with the value URL-quoted by :func:`urllib.parse.quote`
    :return: the ``data`` member of the decoded JSON response
    :raise WHMAPILicenseError: the response has no usable metadata and
        carries ``statusmsg == "Cannot Read License File"``
    :raise WHMAPIException: the command was terminated by SIGTERM, the
        output is not valid JSON, the response lacks the expected
        structure, or ``metadata.result`` is falsy
    :raise CheckRunError: the command failed for any other reason
    """
    cmd = ["sudo"] if sudo else []
    cmd.extend([WHMAPI1_CMD, "--output=json", function])
    params = ["{}={}".format(k, quote(v)) for k, v in kwargs.items()]
    try:
        raw_output = (await check_run(cmd + params)).decode()
    except CheckRunError as e:
        if e.returncode == -signal.SIGTERM:
            logger.warning(e)
            raise WHMAPIException(e) from e
        # any other failure is propagated unchanged
        raise

    try:
        output = json.loads(raw_output)
    except json.JSONDecodeError as e:
        raise WHMAPIException(
            f"Broken output from whmapi1: {raw_output!r}, reason: {e}"
        ) from e

    try:
        if output["metadata"]["result"]:
            return output["data"]
        raise WHMAPIException(
            "whmapi {} command failed: {}".format(
                output["metadata"]["command"], output["metadata"]["reason"]
            )
        )
    except (KeyError, TypeError) as e:
        # TypeError: the JSON was valid but not shaped as expected
        # (e.g. top-level list/null, or "metadata" is not an object).
        if (
            isinstance(output, dict)
            and output.get("statusmsg") == "Cannot Read License File"
        ):
            logger.warning("Cannot Read CPanel License File")
            raise WHMAPILicenseError from e
        raise WHMAPIException(
            "Broken output from whmapi1 (KeyError: {}): {!r}".format(e, output)
        ) from e


def run_whmapi(args, *path_list):
    """Run whmapi1 synchronously and extract values from its JSON output.

    :param args: iterable of command-line arguments placed after the
        whmapi1 executable (``--output=json`` is appended automatically)
    :param path_list: zero or more key paths; each path is a sequence of
        keys followed one by one into the decoded JSON document
    :return: ``None`` when no path is given (the output is not parsed);
        the single extracted value when exactly one path is given;
        otherwise a list with one entry per path, where paths after the
        first that cannot be resolved yield ``None``
    :raise WHMAPIException: the command exits with a non-zero status,
        its output is not valid JSON, or the first path cannot be
        resolved
    """
    # FIXME: this script partly copypaste 'whmapi1' function
    cmd = [WHMAPI1_CMD, *args, "--output=json"]
    try:
        logger.debug("subprocess.run(%r)", cmd)
        res = subprocess.run(cmd, check=True, stdout=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise WHMAPIException("Failed to run whmapi1: %s" % e) from e

    if not path_list:
        return None

    raw_output = res.stdout.decode()
    try:
        decoded_output = json.loads(raw_output)
    except json.JSONDecodeError as e:
        # keep the failure mode consistent with whmapi1()
        raise WHMAPIException(
            f"Broken output from whmapi1: {raw_output!r}, reason: {e}"
        ) from e

    result = []
    for i, element_path in enumerate(path_list):
        try:
            item = decoded_output
            for key in element_path:
                item = item[key]
        except (KeyError, IndexError, TypeError) as e:
            # IndexError/TypeError: an intermediate node is a list, a
            # scalar or null instead of the expected mapping.
            if i == 0:
                # we guarantee to *always* return first element from
                # path_list
                raise WHMAPIException("Could not parse whmapi1 output") from e
            # and have no guarantee for the rest of path_list
            result.append(None)
        else:
            result.append(item)

    if len(result) == 1:
        return result[0]
    return result


def run_whmapi_result(args):
    """Run whmapi1 with *args* and return ``metadata.result`` of its output."""
    return run_whmapi(args, ["metadata", "result"])


def run_whmapi_result_and_reason(args):
    """Run whmapi1 with *args* and return ``(result, reason)``.

    Both values come from the ``metadata`` object of the output;
    ``reason`` is ``None`` when it is absent.
    """
    result, reason = run_whmapi(
        args, ["metadata", "result"], ["metadata", "reason"]
    )
    # explicit is better than implicit!
    return result, reason