관리-도구
편집 파일: panel.py
"""DirectAdmin hosting panel integration.

Provides helpers to read the DirectAdmin domain/owner mapping, detect the
DirectAdmin version, manage the sudo group/hooks used by the Imunify360 UI
plugin, and discover document roots of hosted domains.
"""
import asyncio
import base64
import http.client
import json
import logging
import os
import pwd
import re
import shlex
import socket
import urllib
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict
from typing import Dict, List, Set

from packaging.version import Version

from defence360agent.application.determine_hosting_panel import (
    DA_FILE,
    is_directadmin_installed,
)
from defence360agent.contracts.config import Core
from defence360agent.utils import run, timeit

from .. import base
from ..base import PanelException

logger = logging.getLogger(__name__)

BASE_DIR = "/home"
CMD = "/usr/bin/imunify360-command-wrapper"
HOOKS_DIR = "/usr/local/directadmin/scripts/custom"
SUDO_GROUP = "imunify360-sudousers"
SUDO_LINE = "%{0} ALL=NOPASSWD: {1}".format(SUDO_GROUP, CMD)
SUDO_TTY_LINE = "Defaults!/usr/bin/imunify360-command-wrapper !requiretty"
_VIRTUAL_DOMAINOWNERS = "/etc/virtual/domainowners"
TCP_PORTS_DA = base.TCP_PORTS_COMMON + ["2222", "35000-35999"]


class DirectAdminException(base.PanelException):
    """Panel exception type exposed by the DirectAdmin panel class."""

    pass


def get_user_domains(path=_VIRTUAL_DOMAINOWNERS) -> Dict[str, str]:
    """Return a mapping from domain name to user name owning this domain.

    The file is read as bytes and decoded line by line, so a single
    undecodable line is logged and skipped instead of failing the whole
    read. Lines without a ``:`` separator are ignored.

    :param path: file with ``domain: user`` lines
    :return: dict: domain name -> owner user name
    """
    domains = {}
    with open(path, "rb") as f:
        for bline in f:
            try:
                line = bline.decode()
            except UnicodeDecodeError as e:
                logger.warning("Broken line in %s: %r (%s)", path, bline, e)
                continue
            domain, sep, owner = line.partition(":")
            if sep:
                domains[domain.strip()] = owner.strip()
    return domains


async def get_directadmin_version() -> Version:
    """Run ``directadmin v`` and parse the reported version.

    :return: parsed version of the DirectAdmin binary
    :raises PanelException: if the output does not contain a parsable
        version
    """
    cmd = ["/usr/local/directadmin/directadmin", "v"]
    retcode, stdout, stderr = await run(cmd)
    try:
        version_pattern = rb"^(Version: )?DirectAdmin (v.)?([\d.]+)"
        result = re.search(version_pattern, stdout, flags=re.MULTILINE)
        # `result` is None when nothing matched -> AttributeError below
        return Version(result.group(3).decode())
    except (ValueError, AttributeError) as e:
        raise PanelException(
            "Failed to parse directadmin version."
            f" {retcode=}, {stdout=}, {stderr=}"
        ) from e


class DirectAdmin(base.AbstractPanel):
    """DirectAdmin implementation of the hosting panel interface."""

    NAME = "DirectAdmin"
    DA_BINARY = DA_FILE
    OPEN_PORTS = {
        "tcp": {
            "in": ["465"] + TCP_PORTS_DA,
            "out": ["113"] + TCP_PORTS_DA,
        },
        "udp": {
            "in": ["20", "21", "53", "443", "35000-35999", "80"],
            "out": ["20", "21", "53", "113", "123", "35000-35999"],
        },
    }
    exception = DirectAdminException

    @classmethod
    def is_installed(cls):
        """
        :return: result of ``is_directadmin_installed()``
        """
        return is_directadmin_installed()

    @classmethod
    async def version(cls):
        """
        :return: str: third whitespace-separated token of ``<binary> v``
            output
        """
        # example output 'Version: DirectAdmin v.1.53.0'
        # NOTE(review): output without the 'Version: ' prefix would yield a
        # different token (or IndexError) here - confirm against callers.
        _, data, _ = await run([cls.DA_BINARY, "v"])
        return data.decode().split()[2]

    @base.ensure_valid_panel()
    async def add_sudouser(self, user):
        """Add *user* to the sudo group.

        The user is added when it is listed as a DirectAdmin admin or when
        the ``usertype`` environment variable equals ``admin``.

        :param user: system user name
        """
        if user in self._get_admins() or os.environ.get("usertype") == "admin":
            # quote the name: it is interpolated into a shell command line
            os.system(
                "gpasswd -a {0} {1}".format(shlex.quote(user), SUDO_GROUP)
            )

    @base.ensure_valid_panel()
    async def delete_sudouser(self, user):
        """Remove *user* from the sudo group if it is a DirectAdmin admin.

        :param user: system user name
        """
        if user in self._get_admins():
            # quote the name: it is interpolated into a shell command line
            os.system(
                "gpasswd -d {0} {1}".format(shlex.quote(user), SUDO_GROUP)
            )

    @staticmethod
    def _add_line(path, content):
        """Append *content* as a new line to *path* unless already present.

        A line counts as present when it equals *content* regardless of
        whether it is terminated by a newline. If the file does not end
        with a newline, one is written first so that *content* never gets
        glued to the previous line.

        :param path: existing file to update
        :param content: line to add (without trailing newline)
        """
        with open(path, "r+") as f:
            lines = f.readlines()
            if any(line.rstrip("\n") == content for line in lines):
                return
            # after readlines() the position is at EOF, so writes append
            if lines and not lines[-1].endswith("\n"):
                f.write("\n")
            f.write(content + "\n")

    @staticmethod
    def _remove_line(path, content):
        """Drop every line of *path* that contains *content*.

        :param path: existing file to update
        :param content: text to look for (substring match on the line with
            surrounding whitespace stripped)
        """
        with open(path, "r+") as f:
            data = "".join(line for line in f if content not in line.strip())
            f.seek(0)
            f.truncate(0)
            f.write(data)

    def _get_admins(self):
        """
        :return: list: whitespace-separated entries of DirectAdmin's
            admin.list file
        """
        with open("/usr/local/directadmin/data/admin/admin.list", "r") as f:
            admin_list = f.read().split()
        return admin_list

    def _create_hook(self, hook, content):
        """Make sure hook script *hook* exists and contains *content*.

        A missing script is created with a ``#!/bin/sh`` shebang, owned by
        the ``diradmin`` user and its primary group, with mode 0700.

        :param hook: file name of the script inside HOOKS_DIR
        :param content: command line to add to the script
        """
        path = os.path.join(HOOKS_DIR, hook)
        if not os.path.exists(path):
            with open(path, "w"):
                pass
            self._add_line(path, "#!/bin/sh")
            diradmin = pwd.getpwnam("diradmin")
            # group id must come from pw_gid, not pw_uid
            os.chown(path, diradmin.pw_uid, diradmin.pw_gid)
            os.chmod(path, 0o700)
        self._add_line(path, content)

    def _delete_hook(self, hook, content):
        """Remove lines containing *content* from hook script *hook*.

        Does nothing if the script does not exist.

        :param hook: file name of the script inside HOOKS_DIR
        :param content: command line to remove from the script
        """
        path = os.path.join(HOOKS_DIR, hook)
        if os.path.exists(path):
            self._remove_line(path, content)

    @base.ensure_valid_panel()
    async def enable_imunify360_plugin(self, name=None):
        """Set up the sudo group, sudoers entries and user hooks.

        :param name: not used by this implementation
        """
        os.system("/usr/sbin/groupadd -f {}".format(SUDO_GROUP))
        self._add_line("/etc/sudoers", SUDO_LINE)
        self._add_line("/etc/sudoers", SUDO_TTY_LINE)
        for user in self._get_admins():
            await self.add_sudouser(user)
        self._create_hook(
            "user_create_post.sh",
            '/usr/bin/imunify360-agent add-sudouser --user "$username"',
        )
        self._create_hook(
            "user_destroy_pre.sh",
            '/usr/bin/imunify360-agent delete-sudouser --user "$username"',
        )
        self._create_hook(
            "user_restore_post.sh",
            '/usr/bin/imunify360-agent add-sudouser --user "$username"',
        )

    @base.ensure_valid_panel()
    async def disable_imunify360_plugin(self, plugin_name=None):
        """Undo what ``enable_imunify360_plugin`` has set up.

        :param plugin_name: not used by this implementation
        """
        self._remove_line("/etc/sudoers", SUDO_LINE)
        self._remove_line("/etc/sudoers", SUDO_TTY_LINE)
        for user in self._get_admins():
            await self.delete_sudouser(user)
        os.system("/usr/sbin/groupdel {}".format(SUDO_GROUP))
        self._delete_hook(
            "user_create_post.sh",
            '/usr/bin/imunify360-agent add-sudouser --user "$username"',
        )
        self._delete_hook(
            "user_destroy_pre.sh",
            '/usr/bin/imunify360-agent delete-sudouser --user "$username"',
        )
        self._delete_hook(
            "user_restore_post.sh",
            '/usr/bin/imunify360-agent add-sudouser --user "$username"',
        )

    async def get_users(self) -> List[str]:
        """
        :return: list: list of directadmin users
        """
        return list(set(get_user_domains().values()))

    async def get_user_domains(self):
        """
        :return: list: domains hosted on server via directadmin
        """
        return list(get_user_domains().keys())

    async def get_domain_to_owner(self):
        """
        :return: domain to list of users pairs
        """
        return {domain: [user] for domain, user in get_user_domains().items()}

    async def get_domains_per_user(self):
        """
        :return: user to list of domains pairs
        """
        user_to_domains = defaultdict(list)
        for domain, user in get_user_domains().items():
            user_to_domains[user].append(domain)
        return user_to_domains

    def basedirs(self) -> Set[str]:
        """
        :return: set containing BASE_DIR
        """
        return {BASE_DIR}

    async def docroots_info(self) -> Dict:
        """Return raw document root data as reported by DirectAdmin.

        Uses the HTTP API for DirectAdmin >= 1.62.8 and the
        ``--DocumentRoot`` binary option otherwise.

        :raises PanelException: if the data cannot be obtained
        """
        if await get_directadmin_version() >= Version("1.62.8"):
            return await self.docroots_info_new()
        return await self.docroots_info_legacy()

    async def docroots_info_new(self) -> Dict:
        """Obtain document roots through the DirectAdmin HTTP API.

        The URL printed by ``directadmin --root-auth-url`` carries the
        credentials in its netloc part; they are moved to a Basic
        ``Authorization`` header and the blocking HTTP request is run in
        the default executor.

        :raises PanelException: if the binary exits with non-zero code or
            the request fails
        """
        cmd = ["/usr/local/directadmin/directadmin", "--root-auth-url"]
        with timeit("Call DA binary to obtain auth URL", logger):
            retcode, stdout, stderr = await run(cmd)
        if retcode != 0:
            raise PanelException(
                f"Failed to obtain auth URL. Unexpected return code {retcode}."
                f" stdout={stdout!r}, stderr={stderr!r}"
            )
        parsed_url = urllib.parse.urlparse(stdout.decode().strip())
        # split on the last "@": the credentials part may contain "@" itself
        basic_auth, domain = parsed_url.netloc.rsplit("@", 1)
        basic_auth = base64.standard_b64encode(basic_auth.encode()).decode()
        document_roots_url = "/".join(
            [
                parsed_url._replace(netloc=domain).geturl(),
                "CMD_API_DOMAIN?json=yes&action=document_root_all",
            ]
        )
        loop = asyncio.get_event_loop()
        request = urllib.request.Request(
            document_roots_url,
            headers={"Authorization": f"Basic {basic_auth}"},
            method="GET",
        )
        return await loop.run_in_executor(None, self._do_request, request)

    async def docroots_info_legacy(self) -> Dict:
        """Obtain document roots through ``directadmin --DocumentRoot``.

        :raises PanelException: if the binary exits with non-zero code
        """
        cmd = [
            "/usr/local/directadmin/directadmin",
            "--DocumentRoot",
        ]
        with timeit("Call DA binary to obtain all docroots", logger):
            ret, out, err = await run(cmd)
        # zero exit code means success, same as in docroots_info_new()
        if ret != 0:
            raise PanelException(
                "Failed to obtain document roots. Unexpected return code {}."
                " stdout={!r}, stderr={!r}".format(ret, out, err)
            )
        output = json.loads(out.decode())
        return output

    @staticmethod
    def parse_document_root_output(output) -> Dict:
        """Flatten document root data into a docroot -> domain mapping.

        Document roots of subdomains are mapped to the name of the parent
        domain. Entries with a missing or empty ``public_html`` are
        skipped.

        :param output: dict with ``users`` -> ``domains`` ->
            (``public_html``, optional ``subdomains``) structure
        :return: dict: document root path -> domain name
        """
        ret = dict()
        for userdata in output["users"].values():
            for domainname, domaindata in userdata["domains"].items():
                if domaindata.get("public_html"):
                    ret[domaindata["public_html"]] = domainname
                for sub_data in domaindata.get("subdomains", {}).values():
                    if sub_data.get("public_html"):
                        ret[sub_data["public_html"]] = domainname
        return ret

    async def list_docroots(self) -> Dict[str, str]:
        """
        :return: dict: document root path -> domain name
        """
        info = await self.docroots_info()
        return self.parse_document_root_output(info)

    def _do_request(self, request: urllib.request.Request) -> Dict:
        """Perform a blocking HTTP request and return the decoded JSON body.

        :param request: prepared request object
        :return: parsed JSON response
        :raises PanelException: on non-200 status, network/HTTP errors,
            timeout, or a response that is not valid UTF-8 JSON
        """
        try:
            with urllib.request.urlopen(
                request, timeout=Core.DEFAULT_SOCKET_TIMEOUT
            ) as response:
                if response.status != 200:
                    raise PanelException(
                        "status code is {}".format(response.status)
                    )
                return json.loads(response.read().decode())
        except (
            UnicodeDecodeError,
            http.client.HTTPException,
            json.JSONDecodeError,
            socket.timeout,
            urllib.error.URLError,
        ) as e:
            # keep the cause in the message so that it shows up in logs
            raise PanelException(
                "DirectAdmin API request failed: {}".format(e)
            ) from e