관리-도구
편집 파일: php_manager.py
# coding: utf-8 # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import json import os from pwd import getpwuid from future.utils import iteritems import cldetectlib as detect from clcommon.clconfig import get_param, replace_param from clcommon.ui_config import UIConfig from clselect.clselectctlphp import format_summary, parse_extensions, API_1 from clselect import ClSelect from clselect import ClUserSelect, ClExtSelect, ClUserExtSelect, ClUserOptSelect from clselect.baseclselect import BaseSelectorError from clselect.clselectexcept import ClSelectExcept as ClSelectExceptions from clcommon.cpapi import get_main_username_by_uid class PhpManager(object): """Responsible for actual PhpSelector selector high-level API""" interpreter = 'php' # DirectAdmin parameters DA_PATH = "/usr/local/directadmin/plugins/phpselector/plugin.conf" DA_PARAM = "active" def __init__(self, cfg=None, pkg=None): euid = os.geteuid() self.is_root_user = euid == 0 self.user_name = get_main_username_by_uid(euid) if self.is_root_user: self.cl_select_lib = ClSelect(self.interpreter) else: self.cl_select_lib = ClUserSelect(self.interpreter) @property def selector_enabled(self): """ Get current status for panel :return: bool """ if detect.is_da(): # DirectAdmin return get_param(self.DA_PATH, self.DA_PARAM) == "yes" else: return not UIConfig().get_param('hidePhpApp', 'uiSettings') @selector_enabled.setter def selector_enabled(self, status): """ Enable/disable selector for each panel :param status: bool :return: """ if detect.is_da(): try: replace_param(self.DA_PATH, self.DA_PARAM, "yes" if status else "no") except (OSError, IOError) as e: raise BaseSelectorError("Can not process config file %s with reason: %s" % ( self.DA_PATH, str(e))) UIConfig().set_config({'uiSettings': {'hidePhpApp': not status}}) else: UIConfig().set_config({'uiSettings': {'hidePhpApp': not status}}) @property def extensions_is_hidden(self): """ Extensions was hidden by admin :return: bool """ return UIConfig().get_param('hidePHPextensions', 'uiSettings') is True @property def domains_tab_is_hidden(self): """ Domains tab was hidden by admin :return: bool """ return UIConfig().get_param('hideDomainsTab', 'uiSettings') is True def switch_current_version(self, version): """ Set current version for user :param version: str :return: None (succes) """ if self.is_root_user: raise BaseSelectorError("Not supported as root user") # cloudlinux-selector utility automatically drops privileges # so there is no way to know here that we are executed from root # keep it this way until someone asks to change it elif message := self._get_version_selection_disabled_msg(): raise ClSelectExceptions.VersionModificationBlocked(message) c = ClUserSelect(self.interpreter) c.set_version(self.user_name, str(version)) c.clean_crui_images([self.user_name]) def reset_extensions(self, version): """ Reset extension for selected version :param version: str :return: List of enabled extensions after reset """ if self.is_root_user: raise BaseSelectorError("Not supported as root user") ClSelect.check_multiphp_system_default_version() c = ClUserExtSelect(self.interpreter) extensions = c.reset_extensions(self.user_name, str(version)) c.clean_crui_images([self.user_name]) return {"extensions": extensions} def set_extensions(self, version, extensions): """ Set extensions for php version for admin and user :param version: str :param extensions: list :return: response for user. Error or warning list or no error """ to_enable = [] for k, v in iteritems(extensions): if v == "enabled": to_enable.append(k) elif v != "disabled": return {"status": "ERROR: %s is not a valid state of an extension" % v} if self.is_root_user: ClExtSelect(self.interpreter).replace_extensions(str(version), to_enable) return {} else: if self.extensions_is_hidden: raise BaseSelectorError("Extensions was disabled by admin") c = ClUserExtSelect(self.interpreter) extensions, resolved_dependencies, conflict_dependencies = c\ .bulk_replace_extensions(self.user_name, str(version), to_enable) c.clean_crui_images([self.user_name]) if resolved_dependencies: resolved_dependencies = ['{} enabled as dependency ({})'.format(*ext) for ext in resolved_dependencies] # ToDo: add ability to show several warnings and split messages warnings = (', '.join(resolved_dependencies)) if resolved_dependencies else '' if conflict_dependencies: if warnings: warnings = warnings + '. ' warnings = warnings + '{} skipped as conflicting.'.format(', '.join(conflict_dependencies)) if warnings: return {'warning': warnings} return {} def get_summary(self): """ Return all information of php selector All extensions(user, admin) and options(for user), selected version (for user) :return: dict """ summary_data = self._fetch_summary_data() summary_json = self._parse_summary_data_to_json(summary_data) summary_json.update(self._get_common_summary_data()) if not self.is_root_user: summary_json.update(self._get_user_specific_summary_data(summary_data)) if 'available_versions' not in summary_json: if 'message' in summary_json: raise BaseSelectorError(summary_json['message']) return {'status': 'ERROR', 'data': summary_json} self._process_versions(summary_json['available_versions']) return summary_json def _fetch_summary_data(self): """ Fetch summary data for user or root :return: tuple containing the summary data """ if self.is_root_user: return self.cl_select_lib.get_summary(False) try: return self.cl_select_lib.get_summary(self.user_name, True) except ClSelectExceptions.NotCageFSUser as e: raise BaseSelectorError({ 'message': 'User %(user)s not in CageFs', 'context': {'user': self.user_name} }) from e def _parse_summary_data_to_json(self, summary_data): """ Parse summary data to JSON format :param summary_data: tuple :return: dict """ json_data = format_summary(summary_data, format='json', api_version=API_1) return json.loads(json_data) def _get_common_summary_data(self): """ Get summary data applicable for both root and non-root users :return: dict containing common summary data """ return { 'selector_enabled': self.selector_enabled, 'extensions_is_hidden': self.extensions_is_hidden, 'domains_tab_is_hidden': self.domains_tab_is_hidden, } def _get_user_specific_summary_data(self, summary_data): """ Get summary data specific to non-root users :param summary_data: tuple containing the summary data :return: dict with user-specific summary data """ selected_version = self._find_selected_version(summary_data) version_selection_disabled_msg = self._get_version_selection_disabled_msg() data = {} if selected_version: data['selected_version'] = selected_version if version_selection_disabled_msg: data['version_selection_disabled_msg'] = version_selection_disabled_msg return data def _find_selected_version(self, summary_data): """ Determine the PHP version selected by the user based on the summary data. :param summary_data: tuple A tuple containing version information :return: str or None The selected PHP version as a string if found, otherwise None """ for version_info in summary_data: version, (_, _, is_selected) = version_info if is_selected: return version return None def _get_version_selection_disabled_msg(self): """ Retrieve the message displayed when PHP version selection is disabled. This method is only applicable for non-root users :return: str - The message indicating why PHP version selection is disabled for the user """ return self.cl_select_lib.get_version_selection_disabled_msg(self.user_name) def _process_versions(self, versions_list): """ Process versions list and add extensions and options to it :param versions_list: list of versions """ for item in versions_list: # get extensions for users only and exclude native version if not self.is_root_user and item['version'].startswith('native'): continue try: extensions_json = self._get_extensions(item['version']) item.update(extensions_json) except ClSelectExceptions.UnableToGetExtensions as e: raise BaseSelectorError({ 'message': str(e), 'details': e.details, }) from e # get options for every version for users exclude native if not self.is_root_user and not item['version'].startswith('native'): try: options_json = ClUserOptSelect(self.interpreter).get_options( self.user_name, item['version'], ) item['options'] = options_json except ClSelectExceptions.UnableToGetExtensions as e: raise BaseSelectorError({ 'message': str(e), 'details': e.details, }) from e def set_options(self, version, options): """ Save php options :param version: version which save options for :type version: str :param options: dict of options for save :type options: dict :return: None (success) """ if self.is_root_user: raise BaseSelectorError("Not supported as root user") ClSelect.check_multiphp_system_default_version() c = ClUserOptSelect(self.interpreter) c.bulk_insert_options( self.user_name, str(version), options) c.clean_crui_images([self.user_name]) def switch_default_version(self, version): ClSelect(self.interpreter).set_version(str(version)) def get_current_version(self, user_names = None): """ Return current versions for list of users or all users :param user_names: array of users :return: information with users and versions """ user_map = ClUserSelect(self.interpreter).get_version_user_map(user_names) result = {} for version, users in user_map.items(): result.update({user: {'version': version} for user in users}) return {'users': result} def _get_extensions(self, version): """ return list of exnensions for selected version (for user and admin) :param version: version get extensions for :type version: str :return: """ if self.is_root_user: # Recreate ClExtSelect for noncache ext_list = ClExtSelect(self.interpreter).list_extensions(version) else: ext_list = ClUserExtSelect(self.interpreter).list_all_extensions(self.user_name, version) return parse_extensions(ext_list, version, fmt='json')