관리-도구
편집 파일: environments.py
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import absolute_import from __future__ import division import os import sys import pwd import subprocess import shutil import filecmp from datetime import datetime from clselect.clselectctl import get_directory from clselect.utils import check_call, check_output, list_dirs, run_command from .extensions import EXTENSION_PATTERN, ExtensionInfo from .interpreters import Interpreter, interpreters import simplejson as json from simplejson import JSONDecodeError from clselect.clselectexcept import ClSelectExcept DEFAULT_PREFIX = 'virtualenv' BACKUP_PREFIX = '.virtualenv.backup' # only this binary it brought by alt-python-virtualenv package VIRTUALENV_BIN = '/opt/cloudlinux/venv/bin/virtualenv' VERSION_DELIMITER = '#' WRAPPERS_PATH = '/usr/share/l.v.e-manager/utils' PYTHON_WRAPPER = 'python_wrapper' SET_ENV_VARS_SCRIPT = 'set_env_vars.py' class Environment(object): def __init__(self, name, user=None, prefix=None): self.name = name if user: self.user = user else: self.user = pwd.getpwuid(os.getuid()).pw_name self.homepath = pwd.getpwnam(self.user).pw_dir self.pip_logfile = os.path.join(self.homepath, '.pip/pip.log') if prefix is None: self.prefix = DEFAULT_PREFIX else: self.prefix = prefix self.path = os.path.join(_abs_prefix(self.user, self.prefix), name) self.backup_path = os.path.join( _abs_prefix(self.user, BACKUP_PREFIX), self.name) self._requirements = None self._interpreter = None self._pip = None self.interpreter_name = 'python' + name # Create extenstion remap table self._extension_remap = {'MySQLdb': 'MySQL-python'} def __repr__(self): return ("%s.%s(name='%s', user='%s', prefix='%s')" % ( self.__class__.__module__, self.__class__.__name__, self.name, self.user, self.prefix)) def _demote(self): user_pwd = pwd.getpwnam(self.user) def func(): os.setgid(user_pwd.pw_gid) os.setuid(user_pwd.pw_uid) os.environ['USER'] = self.user os.environ['HOME'] = user_pwd.pw_dir return func def as_dict(self, key=None): e = { 'name': self.name, 'interpreter': self.interpreter(), 'extensions': self.extensions(), } if key: del e[key] return {getattr(self, key): e} return e def as_deepdict(self, key=None, with_extensions=True): e = { 'name': self.name, 'interpreter': self.interpreter().as_dict(), } if with_extensions: e.update({ 'extensions': self.extensions(), }) if key: del e[key] return {getattr(self, key): e} return e def create(self, interpreter=None, version=None, wait=None): if not interpreter: interpreter = Interpreter(target_user=self.user) path = self.path if version: path = os.path.join(path, version) prompt = "({}:{})".format( get_directory(os.path.basename(self.prefix)), self.name, ) args = [ VIRTUALENV_BIN, '--prompt', prompt, '--python', interpreter.binary, path, ] kwargs = {"preexec_fn": self._demote(), "cwd": self.homepath, "wait": wait} try: check_call(*args, **kwargs) except ClSelectExcept.ExternalProgramFailed as err: err = str(err) err_trace = None # Change error text and add help if disk quota exceeded if "Disk quota exceeded" in err: err_text = "Disk quota exceeded.\n " \ "Contact system administrator to increase disk quota." elif "Traceback" in err: # Find second ":" character. First is "Traceback :" err_char = err.find(":", err.find(":")+1) # Find last row of trace err_trace_end = err[:err_char].rfind('\n') # pylint: disable=indexing-exception if err_trace_end == -1 or err_char == -1: err_text = err else: # Trace row without error err_trace = err[:err_trace_end] # pylint: disable=indexing-exception # Only error without first trace err_text = err[err_trace_end+1:] # pylint: disable=indexing-exception else: err_text = err raise ClSelectExcept.ExternalProgramFailed( message=err_text, details=err_trace, ) self.configure_environment() def detect_python_binary(self, bin_path): files_to_check = [ 'python', self.interpreter_name.split('.')[0], self.interpreter_name, ] for file in files_to_check: path = os.path.join(bin_path, file) if not os.path.islink(path) or os.readlink(path).startswith('/opt/alt/python'): return path return None def configure_environment(self, auto_restore=False): """ Configures environment: 1. Rename binary to pythonX.Y_bin 2. Makes symlink from python binary to python_wrapper """ bin_path = os.path.join(self.path, 'bin') new_interpreter_path = os.path.join(bin_path, self.interpreter_name) + '_bin' interpreter_path = self.detect_python_binary(bin_path) if interpreter_path is None: return if os.path.exists(new_interpreter_path): os.remove(new_interpreter_path) os.rename(interpreter_path, new_interpreter_path) try: if not os.path.exists(interpreter_path): os.symlink(os.path.join(WRAPPERS_PATH, PYTHON_WRAPPER), interpreter_path) except (IOError, OSError): if auto_restore: os.rename(new_interpreter_path, interpreter_path) raise if not os.path.exists(os.path.join(bin_path, SET_ENV_VARS_SCRIPT)): os.symlink(os.path.join(WRAPPERS_PATH, SET_ENV_VARS_SCRIPT), os.path.join(bin_path, SET_ENV_VARS_SCRIPT)) def destroy(self, version=None): path = self.path if version: path = os.path.join(path, version) if os.path.exists(path): check_call('/bin/rm', '-r', '--interactive=never', path, preexec_fn=self._demote()) def _get_extension_name(self, extension_name): """ Returns extensions name considering extension remap table :param extension_name: Input extension name :return: Result extension name """ if extension_name in self._extension_remap: return self._extension_remap[extension_name] else: return extension_name def _recreate(self, version): """ Recreate python virtual environment with requirements :return: """ # if virtual environment does not exists, just create it # unfortunately, we don't have requirements env_path = os.path.join(self.path, version) if not os.path.exists(self.pip(version=version)): return print('Re-create python virtualenv:', env_path) # pip freeze, save last requirements into the file self._pip_freeze(version) # remember the requirements in the memory requirements_path = self.pip_requirements(version) requirements = [] if os.path.exists(requirements_path): reqs_file = open(requirements_path, 'r') requirements = reqs_file.readlines() reqs_file.close() # destroy python virtual environment self.destroy(version=version) # create python virtual environment self.create(version=version, wait=True) # put remembered requirements into the file reqs_file = open(requirements_path, 'w') reqs_file.writelines(requirements) reqs_file.close() # pip install -r requirements, install requirements check_call( self.pip(version=version), 'install', '-r', self.pip_requirements(version)) def recreate(self): for version in interpreters(key='version').keys(): self._recreate(version) def exists(self): return os.path.exists(self.path) def interpreter(self): if not self._interpreter: self._interpreter = Interpreter(prefix=self.path, target_user=self.user) return self._interpreter def extension_install(self, extension_name): extension_name = self._get_extension_name(extension_name) locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name) t = extension_name.split(VERSION_DELIMITER) extension, version = t[0], t[1:] or '' command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile + ' install ' + extension_name) if version: version = version[0] command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile + ' install ' + extension + '==' + version) if ExtensionInfo.is_extensions_locked(locked_extensions, extension_name, version): raise ValueError("Extension '%s' install is prohibited. System extension" % extension_name) check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath) self._pip_freeze() def extension_install_requirements(self, requirements_path): command = ('/bin/bash', '-l', '-c', self.pip() + ' --log-file=' + self.pip_logfile + ' install -r {}'.format(requirements_path)) check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath) self._pip_freeze() def extension_update(self, extension): check_call(self.pip(), '--log-file='+self.pip_logfile, 'install', '--upgrade', extension, preexec_fn=self._demote(), cwd=self.homepath) self._pip_freeze() def extension_uninstall(self, extension): locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name) t = extension.split(VERSION_DELIMITER) extension, version = t[0], t[1:] or '' if version: version = version[0] if ExtensionInfo.is_extensions_locked(locked_extensions, extension, version): raise ValueError("Extension '%s' removal is prohibited" % extension) p = subprocess.Popen( (self.pip(), '--log-file='+self.pip_logfile, 'uninstall', extension), preexec_fn=self._demote(), stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE, cwd=self.homepath, text=True) stdout, stderr = p.communicate('y') if p.returncode: raise Exception(stderr or stdout) self._pip_freeze() def extensions(self): result = {} locked_extensions = ExtensionInfo.get_locked_extensions(self.interpreter_name) try: output = check_output(self.pip(), 'list', '--log-file='+self.pip_logfile, '--format=json', preexec_fn=self._demote(), cwd=self.homepath) extensions = [(x['name'], x['version']) for x in json.loads(output)] except (JSONDecodeError, KeyError, ValueError, ClSelectExcept.FileProcessError, ClSelectExcept.ExternalProgramFailed): output = check_output(self.pip(), 'list', '--log-file='+self.pip_logfile, preexec_fn=self._demote(), cwd=self.homepath) extensions = EXTENSION_PATTERN.findall(output) docs = (ExtensionInfo().extension_doc(extension) for extension, _ in extensions) for (name, version), doc in zip(extensions, docs): if ExtensionInfo.is_extensions_locked(locked_extensions, name, version): version_diff = list(set([v.strip() for v in version.split(',')]) - set(locked_extensions.get(name))) if version_diff and len(locked_extensions.get(name)) != 0: result[name] = {'doc': doc, 'version': ', '.join(version_diff)} else: result[name] = {'doc': doc, 'version': version} return result def pip(self, version=None): if version is not None: return os.path.join(self.path, version, 'bin', 'pip') if not self._pip: self._pip = os.path.join(self.path, 'bin', 'pip') return self._pip def pip_requirements(self, version=None): if version is not None: return os.path.join(self.path, version, 'requirement.pip') return os.path.join(self.path, 'requirement.pip') def update_python_interpreter(self, backup=False, force=False, verbose=True, _alt_interpreters_dict=None): """ copy binary python from /opt/alt/pythonXY/bin/pythonX.Y to virtualenvdir/bin/pythonX.Y :param backup: make backup old python interpreter :param force: force rewrite python interpreter without check :param verbose: print actions details to stdout :return: True - updating success; False - updating fail """ update_result = False interpreter = self.interpreter() if _alt_interpreters_dict: main_interpreter = _alt_interpreters_dict[interpreter.version] else: main_interpreter = interpreters(key='version')[interpreter.version] # path to original /opt/alt/pythonXY/bin/pythonX.Y updated_list = list() # list updated interpreters if os.path.islink(interpreter.python_bin) and os.readlink(interpreter.python_bin).startswith('/opt/alt/python'): if verbose: print('Nothing to do, binary in your virtual environment is already symlink to global python!') return False # make backup and delete old python binary python_backup = interpreter.python_bin + '.orig_%s' % datetime.now().strftime("%Y-%m-%d_%H-%M") stat_ = os.stat(interpreter.python_bin) shutil.copy(interpreter.python_bin, python_backup) os.chown(python_backup, stat_.st_uid, stat_.st_gid) # preserving owner try: for virtualenv_python_bin in interpreter.binary_list: if filecmp.cmp(main_interpreter.binary, interpreter.python_bin) and not force: update_result = False if verbose: print(" not need updating; skip '%s'" % virtualenv_python_bin) continue if verbose: sys.stdout.write(" copy '%s' -> '%s'..." % (main_interpreter.binary, virtualenv_python_bin)) run_command(cmd=('/bin/cp', '--force', main_interpreter.binary, virtualenv_python_bin)) updated_list.append(virtualenv_python_bin) print("Done") update_result = True except (OSError, IOError) as e: # rollback binaries python if something is wrong print("Fail %s" % str(e)) for updated_python in updated_list: shutil.copyfile(python_backup, updated_python) # safe copy with preserve owner and mode os.unlink(python_backup) if not backup: # delete backup if not need os.unlink(python_backup) return update_result def _pip_freeze(self, version=None): """ Output installed packages in requirements format :return: None """ if not os.path.exists(self.pip(version)): return command = (self.pip(version), 'freeze', '-l') f = open(self.pip_requirements(version), 'w') check_call(args=command, preexec_fn=self._demote(), cwd=self.homepath, output=f) def pip_freeze(self): """ Output installed packages in requirements format :return: None """ for version in interpreters(key='version').keys(): self._pip_freeze(version=version) def _abs_prefix(user=None, prefix=None): if not prefix: prefix = DEFAULT_PREFIX if user: return os.path.join(pwd.getpwnam(user).pw_dir, prefix) else: return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, prefix) def environments(user=None, prefix=None): venv_path = _abs_prefix(user, prefix) try: env_list = list_dirs(venv_path) except OSError: return [] envs = [] for env_name in env_list: envs.append(Environment(env_name, user, prefix)) return envs def environments_dict(key, user=None, prefix=None): return dict(list(e.as_dict(key=key).items()) for e in environments(user, prefix)) def environments_deepdict(key, user=None, prefix=None): return dict(list(e.as_deepdict(key=key).items()) for e in environments(user, prefix))