관리-도구
편집 파일: native.py
from logging import getLogger
from typing import Dict

from defence360agent.feature_management.constants import (
    AV,
    EXTENSION_DEFAULTS,
    FEATURE_EXT_VARIABLES,
    NATIVE_EXTENSION_NAME,
    PROACTIVE,
)
from defence360agent.feature_management.control import (
    is_native_feature_management_enabled,
)
from defence360agent.feature_management.model import FeatureManagementPerms
from defence360agent.feature_management.utils import set_feature
from defence360agent.plugins.event_monitor_message_processor import (
    SettingsChangeBase,
)
from defence360agent.subsys.panels.cpanel import packages

logger = getLogger(__name__)


class NativeFeatureManagementSettingsChange(SettingsChangeBase):
    """Settings-change processor for native feature management.

    Extends ``SettingsChangeBase`` with the bookkeeping of
    ``FeatureManagementPerms`` records and with package-level settings
    lookup through ``packages``.
    """

    # Features whose values are extracted from incoming messages.
    FEATURES_LIST = [PROACTIVE, AV]

    async def _process_account_removed(self, message):
        """Run the base handling, then delete the removed user's records.

        The user name is taken from the ``"user"`` key of *message*,
        falling back to ``"username"``; nothing is deleted when neither
        yields a truthy value.
        """
        await super()._process_account_removed(message)
        user = message.get("user") or message.get("username")
        if user:
            logger.info("Resetting FM settings for %s", user)
            FeatureManagementPerms.delete().where(
                FeatureManagementPerms.user == user
            ).execute()

    async def _process_modify(self, message):
        """Run the base handling, then follow a user rename if present.

        When ``message.data`` carries ``"old_username"``, the records
        stored under the old name are re-pointed to ``message.username``.
        """
        await super()._process_modify(message)
        if "old_username" in message.data:
            # User renamed
            old_username = message.data["old_username"]
            FeatureManagementPerms.update(user=message.username).where(
                FeatureManagementPerms.user == old_username
            ).execute()

    async def _get_settings_from_message(self, message):
        """Return ``{feature: value}`` for every entry of ``FEATURES_LIST``.

        A feature maps to ``None`` when looking up its value in
        ``message.data`` (via ``FEATURE_EXT_VARIABLES``) raises ``KeyError``.
        """
        settings = {}
        for feature in self.FEATURES_LIST:
            try:
                value = message.data[FEATURE_EXT_VARIABLES[feature]]
            except KeyError:
                value = None
            settings[feature] = value
        return settings

    @classmethod
    async def _get_package_settings(
        cls, package_name: str, add_to_package: bool
    ) -> Dict[str, str]:
        """Return the feature settings stored for *package_name*.

        Args:
            package_name: name passed to ``packages.get_package_info``.
            add_to_package: when true and the package info lacks the
                extension fields, register the extension on the package
                with ``EXTENSION_DEFAULTS``.

        Returns:
            A ``{feature: value}`` mapping read from the package info, or
            the result of ``_default_settings()`` when the extension fields
            are missing or the package does not exist.
        """
        logger.info("Getting package settings %s", package_name)
        try:
            # Both package calls live inside this guard so that a
            # PackageNotExistError raised by either of them reaches the
            # handler below instead of escaping to the caller.
            pkg_info = await packages.get_package_info(package_name)
            try:
                # Only the field lookup is guarded against KeyError;
                # a partially collected mapping is discarded on failure.
                return {
                    feature: pkg_info[pe_var]
                    for feature, pe_var in FEATURE_EXT_VARIABLES.items()
                }
            except KeyError:
                logger.info(
                    "No extension's fields in package settings %s", pkg_info
                )
            if add_to_package:
                await packages.add_extension(
                    NATIVE_EXTENSION_NAME, pkg_info, **EXTENSION_DEFAULTS
                )
        except packages.PackageNotExistError:
            logger.warning("Package %s doesn't exist", package_name)
        return cls._default_settings()

    @staticmethod
    def _default_settings() -> Dict[str, str]:
        """Return ``{feature: default}`` built from ``EXTENSION_DEFAULTS``."""
        return {
            feature: EXTENSION_DEFAULTS[pe_var]
            for feature, pe_var in FEATURE_EXT_VARIABLES.items()
        }

    async def on_settings_change(self, user, feature, value):
        """Apply the changed *value* of *feature* for *user* via ``set_feature``."""
        await set_feature(user, feature, value)

    def _message_is_relatable(self, message):
        """Return ``True`` unconditionally: no message is filtered out here."""
        return True

    async def is_enabled(self):
        """Return the result of ``is_native_feature_management_enabled()``."""
        return await is_native_feature_management_enabled()