관리-도구
편집 파일: model.py
import asyncio
import itertools
import logging
from typing import List, Optional

from peewee import BooleanField, CharField

import defence360agent.subsys.panels.hosting_panel as hp
from defence360agent.contracts.messages import MessageType
from defence360agent.model import Model, instance
from defence360agent.model.simplification import run_in_executor
from defence360agent.utils import execute_iterable_expression
from defence360agent.utils.config import update_config
from imav.malwarelib.model import MalwareHit

logger = logging.getLogger(__name__)


class MyImunify(Model):
    """Secure-site related settings"""

    class Meta:
        database = instance.db
        db_table = "myimunify"

    #: The username of the end-user, or an empty string for the default value
    #: for all new users.
    user = CharField(unique=True)
    #: Is MyImunify protection enabled/disabled to the end-user.
    protection = BooleanField(null=False, default=False)

    @classmethod
    def get_protection(cls, user: Optional[str]) -> bool:
        """Get SecureSite protection by username.

        ``None`` and ``"root"`` are always reported as protected. For any
        other user the row is fetched, or created with protection disabled
        if it does not exist yet.
        """
        if user is None or user == "root":  # root
            return True
        perm, _ = cls.get_or_create(user=user, defaults={"protection": False})
        return perm.protection

    @classmethod
    def update_users_protection(cls, users: List[str], status: bool):
        """Upsert the protection flag for every user in *users*.

        One row per user is inserted with ``protection=status``; when a row
        for that user already exists (conflict on ``user``), its
        ``protection`` column is overwritten with *status*.
        """
        cls.insert_many(
            [{"user": user, "protection": status} for user in users]
        ).on_conflict(
            conflict_target=[cls.user],
            preserve=[],
            update={cls.protection: status},
        ).execute()


async def malware_cleanup(sink, user):
    """Send a malware cleanup task for *user* if there is anything to clean.

    Hits are selected with ``MalwareHit.malicious_select(user=user,
    cleanup=True)``; when the selection is non-empty it is passed to *sink*
    wrapped in a ``MessageType.MalwareCleanupTask`` message.
    """
    hits = MalwareHit.malicious_select(user=user, cleanup=True)
    if hits:
        await sink.process_message(MessageType.MalwareCleanupTask(hits=hits))


async def update_users_protection(sink, users: List[str], status: bool):
    """Store the protection *status* for *users* and apply its side effects.

    Steps:

    1. Persist the flag through ``MyImunify.update_users_protection`` (run
       via ``run_in_executor``).
    2. Update each user's ``PROACTIVE_DEFENCE`` mode: ``"LOG"`` when
       protection is being disabled, ``None`` when it is being enabled
       (NOTE(review): ``None`` presumably resets the per-user override --
       confirm against ``update_config``).
    3. When protection is being enabled, additionally request a malware
       cleanup for each user.

    Steps 2 and 3 are awaited together with ``asyncio.gather``.
    """
    if not users:
        # Nothing to persist or reconfigure; avoid issuing an empty bulk
        # insert.
        return

    await run_in_executor(
        None,
        lambda: MyImunify.update_users_protection(users, status),
    )

    proactive_mode = None if status else "LOG"
    tasks = [
        update_config(
            sink,
            {"PROACTIVE_DEFENCE": {"mode": proactive_mode}},
            user,
        )
        for user in users
    ]
    if status:
        tasks += [malware_cleanup(sink, user) for user in users]
    await asyncio.gather(*tasks)


async def set_protection_status_for_all_users(sink, status: bool):
    """Set protection status for all users reported by the hosting panel."""
    panel_users = await hp.HostingPanel().get_users()
    await update_users_protection(sink, panel_users, status)


async def sync_users(sink, users: List[str]) -> bool:
    """Synchronize existing permissions with myimunify users.

    Rows for users that are no longer in *users* are deleted; users that
    are in *users* but have no row yet are added with protection disabled.
    Users already present in the table are left untouched.

    Returns ``True`` if any user was added or removed, ``False`` otherwise.
    """
    panel_users = set(users)
    known_users = set(
        itertools.chain.from_iterable(
            MyImunify.select(MyImunify.user).tuples()
        )
    )

    users_to_remove = known_users - panel_users
    if users_to_remove:
        logger.info("Remove myimunify users %s", users_to_remove)

        def expression(users_to_remove):
            return MyImunify.delete().where(
                MyImunify.user.in_(users_to_remove)
            )

        execute_iterable_expression(expression, list(users_to_remove))

    users_to_add = panel_users - known_users
    if users_to_add:
        logger.info("Add permissions to users %s", users_to_add)
        # Only the new users get the default (disabled) status. Passing the
        # full panel list here would overwrite the stored protection flag
        # and proactive-defence mode of every already-known user.
        await update_users_protection(sink, sorted(users_to_add), False)

    return bool(users_to_add) or bool(users_to_remove)