관리-도구
편집 파일: snapshot_saver.py
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
import secrets
import string
from random import choice  # NOQA  (no longer used in this module; kept so the name stays importable)
from typing import Dict, List, Optional, Tuple  # NOQA

import sqlalchemy.orm.session  # NOQA
from sqlalchemy.orm import sessionmaker

from clcommon.clpwd import ClPwd
from clcommon.cpapi import NotSupported
from clcommon.cpapi.cpapiexceptions import NoDBAccessData
from clcommon.utils import ExternalProgramFailed
from lvestats.core.plugin import LveStatsPlugin
from lvestats.lib.commons import proctitle
from lvestats.lib.commons.func import get_chunks, reboot_lock
from lvestats.lib.commons.htpasswd import HtpasswdFile
from lvestats.lib.commons.litespeed import LiteSpeed, LiteSpeedException, LiteSpeedInvalidCredentials
from lvestats.lib.snapshot import Snapshot
from lvestats.lib.ustate import MySQLOperationalError, SQLSnapshot, get_lveps
from lvestats.orm.incident import incident

DEFAULT_PERIOD_BETWEEN_INCIDENTS = 300  # time separating incidents
DEFAULT_SNAPSHOTS_PER_MINUTE = 2  # number of snapshots per minute
DEFAULT_MAX_SNAPSHOTS_PER_INCIDENT = 10  # the maximum number of snapshots in the incident

APACHE = 'apache'
LITESPEED = 'litespeed'


class LitespeedHelper(object):
    """Track whether LiteSpeed is running and hold the credentials used to query it."""

    def __init__(self):
        self.is_running = False
        self.state_changed = False
        # config option; None if server should be
        # detected automatically,
        # False if we must use apache
        # True if we must use litespeed
        self.force_litespeed = None
        # create random login-password pair
        self.login = 'lve-stats-admin'
        self.password = self.generate_random_password()
        # set to True by dump_passwd() when the htpasswd file cannot be parsed
        self.broken_config = False

    def check_litespeed_state(self):
        """Refresh ``is_running`` and record in ``state_changed`` whether it differs from the previous check"""
        litespeed_running = LiteSpeed.is_litespeed_running()
        self.state_changed = litespeed_running != self.is_running
        self.is_running = litespeed_running

    def dump_passwd(self):
        """
        Store the current login/password pair in the LiteSpeed htpasswd file.

        If ``HtpasswdFile`` raises ValueError for that file, a warning is logged,
        ``broken_config`` is set and nothing is written.
        """
        try:
            passwdfile = HtpasswdFile(LiteSpeed.HTPASSWD_PATH)
        except ValueError:
            self.broken_config = True
            logging.warning("Can't change the password. Please, check the file:\n '%s'", LiteSpeed.HTPASSWD_PATH)
            return
        passwdfile.update(self.login, self.password)
        passwdfile.save()
        logging.debug("Password successfully changed.")

    def get_user_data(self, username):
        # type: (str) -> list
        """
        Get user data proxy method.

        On LiteSpeedInvalidCredentials the password is written to the htpasswd
        file again and the exception is re-raised to the caller.
        """
        litespeed = LiteSpeed(self.login, self.password)
        try:
            return litespeed.get_user_data(username)
        except LiteSpeedInvalidCredentials:
            self.dump_passwd()
            raise

    @staticmethod
    def generate_random_password():
        # type: () -> str
        """Return a random 16-character password made of ASCII letters, digits and ``!@#$%^&*()``"""
        chars = string.ascii_letters + string.digits + '!@#$%^&*()'
        # The value is used as a credential, so draw it from the
        # cryptographically secure generator rather than from ``random``.
        return ''.join(secrets.choice(chars) for _ in range(16))

    def get_use_litespeed(self):
        # type: () -> bool
        """
        Get what we must use: litespeed or apache

        :return: False if the htpasswd config is broken,
                 otherwise the forced config value when it is set,
                 otherwise whether litespeed was running at the last check
        """
        if self.broken_config:
            return False
        if self.force_litespeed is not None:
            return self.force_litespeed
        return self.is_running


class SnapshotHelper(object):
    """Collect the process, SQL query and URL data that goes into a snapshot."""

    # Class-level defaults; invalidate() replaces the first three on the instance.
    username_dbquery_map = None
    ps = {}
    clpwd = None
    # created once, at class definition time, and shared by all instances
    sql_snap = SQLSnapshot()

    def __init__(self):
        # True once a LiteSpeed error has been logged, so the warning is logged only once
        self.litespeed_died = False
        # None - not checked yet, True/False - result of the last SQL snapshot attempt
        self._sql_snapshot_works = None
        self.log = logging.getLogger('SnapshotSaver')

    def get_names(self, lve_id):
        # type: (int) -> List[str]
        """Return the user names ClPwd reports for lve_id, or an empty list if there is no such user"""
        try:
            return self.clpwd.get_names(lve_id)
        except ClPwd.NoSuchUserException:
            return []

    def get_snapshot_data(self, lve_id, litespeed_info):
        # type: (int, LitespeedHelper) -> Tuple[dict, list, list]
        """
        Build the snapshot payload for one LVE from the data loaded by invalidate().

        :return: tuple (processes, queries, urls)
        """
        processes = self.ps.get(lve_id, {}).get('TID', {})
        queries = []
        urls = []
        for username in self.get_names(lve_id):
            queries += self.username_dbquery_map.get(username, [])
            # Re-evaluated for every user on purpose: a failed get_user_data() call
            # may mark the litespeed config as broken, which changes the answer.
            if litespeed_info.get_use_litespeed():
                try:
                    urls += litespeed_info.get_user_data(username)
                except LiteSpeedException as e:
                    # do not save message every time
                    if not self.litespeed_died:
                        logging.warning('Error during getting information from Litespeed: %s', e)
                        self.litespeed_died = True
                    # fall back to process titles when litespeed can not be queried
                    urls += proctitle.Proctitle().get_user_data(username)
            else:
                urls += proctitle.Proctitle().get_user_data(username)
        return processes, queries, urls

    def invalidate(self, lve_id_list):
        # type: (List[int]) -> None
        """
        Reload the user database, the per-user SQL queries and the process list.

        If get_lveps() fails, a warning is logged and the previous process list is kept.
        """
        self.clpwd = ClPwd()
        all_usernames = []
        for lve_id in lve_id_list:
            all_usernames += self.get_names(int(lve_id))
        self.username_dbquery_map = self.retrieve_queries(all_usernames)
        try:
            self.ps = get_lveps()
        except ExternalProgramFailed as e:
            self.log.warning('An error occurred while getting processes list', exc_info=e)

    def retrieve_queries(self, login_names):
        # type: (Optional[List[str]]) -> Dict[str, List[str]]
        """
        Ask the SQL snapshot for the queries of the given users.

        :return: result of the SQL snapshot ``get`` call,
                 or an empty dict when the snapshot could not be taken
        """
        try:
            with self.sql_snap as db_requests:
                result = db_requests.get(login_names)
                if not self._sql_snapshot_works:
                    self._sql_snapshot_works = True
                    self.log.info('SQL snapshot is supported and operational')
                return result
        except MySQLOperationalError as e:
            self.log.warning('An error occurred while getting MySQL process list', exc_info=e)
        except (NotSupported, NoDBAccessData) as e:
            # errors which we can write only once
            # because who needs this message in log each 5 seconds?
            if self._sql_snapshot_works in [None, True]:
                self._sql_snapshot_works = False
                self.log.info('SQL snapshot is not supported by this control panel', exc_info=e)
        return {}


class SnapshotSaver(LveStatsPlugin):
    """
    Plugin that records incidents and saves snapshots for LVEs with faults.

    NOTE(review): ``self.now`` and ``self.engine`` are not assigned in this file;
    presumably they are provided by the plugin framework - confirm.
    """

    server_id = ''
    period_between_incidents = DEFAULT_PERIOD_BETWEEN_INCIDENTS
    max_snapshots_per_incident = DEFAULT_MAX_SNAPSHOTS_PER_INCIDENT
    _period = 60 // DEFAULT_SNAPSHOTS_PER_MINUTE

    def __init__(self):
        # uid -> timestamp of the last run in which the uid was processed
        self.incidents_last_ts = {}
        self.log = logging.getLogger('SnapshotSaver')
        self.session = None  # type: sqlalchemy.orm.session.Session
        # lve_id -> (processes, queries, urls) collected since the last save
        self._snapshots_data = {}
        # lve_id -> dict describing the currently open incident
        self.incidents_cache = {}
        self.first_run = True
        self.last_run = 0
        self.litespeed_info = LitespeedHelper()
        self.snapshots_enabled = True
        self.compresslevel = 1
        self._helper = SnapshotHelper()

    def set_config(self, config):
        # type: (dict) -> None
        """
        Apply plugin settings from config; missing keys keep their current/default values.

        Keys read: server_id, period_between_incidents, max_snapshots_per_incident,
        snapshots_per_minute, litespeed, disable_snapshots, compresslevel.
        """
        self.server_id = config.get('server_id', self.server_id)
        self.period_between_incidents = int(config.get('period_between_incidents', self.period_between_incidents))
        self.max_snapshots_per_incident = int(config.get('max_snapshots_per_incident', self.max_snapshots_per_incident))

        snapshots_per_minute = int(config.get('snapshots_per_minute', DEFAULT_SNAPSHOTS_PER_MINUTE))
        if snapshots_per_minute < 1:
            # 0 would raise ZeroDivisionError below and a negative value
            # would produce a meaningless negative period
            self.log.warning(
                'Invalid snapshots_per_minute value %s; using default %s',
                snapshots_per_minute,
                DEFAULT_SNAPSHOTS_PER_MINUTE,
            )
            snapshots_per_minute = DEFAULT_SNAPSHOTS_PER_MINUTE
        self._period = 60 // snapshots_per_minute

        self.litespeed_info.force_litespeed = self._get_webserver_option(config)
        self.setup_litespeed(force_webserver_message=True)
        self.snapshots_enabled = config.get('disable_snapshots', "false").lower() != "true"
        # keep the compression level inside the 1..9 range
        self.compresslevel = max(min(int(config.get('compresslevel', 1)), 9), 1)

    def _get_webserver_option(self, config):
        # type: (dict) -> Optional[bool]
        """
        Check which webserver we must force to use: Apache or Litespeed
        :return: None if webserver autodetect
                 False if apache should be used
                 True if litespeed should be used
        """
        if 'litespeed' in config:
            option = config['litespeed'].lower()
            if option in ['on', '1']:
                return True
            if option in ['off', '0']:
                return False
        # option is absent or has an unrecognized value
        return None

    def _incomplete_incidents_query(self):
        # type: () -> sqlalchemy.orm.query.Query
        """Generate sqlalchemy Query instance for select incomplete incidents"""
        return self.session.query(incident).filter(
            incident.server_id == self.server_id,
            incident.incident_end_time.is_(None),
        )

    def finalize_incidents(self):
        # type: () -> None
        """Close open incidents whose last dump is older than period_between_incidents, using dump_time as end time"""
        not_finalize_incidents_query = self._incomplete_incidents_query().filter(
            incident.dump_time < self.now - self.period_between_incidents
        )
        finalized_numb = not_finalize_incidents_query.update({incident.incident_end_time: incident.dump_time})
        self.log.debug('%i old incidents period was finalized', finalized_numb)
        self.session.commit()

    def save_old_incidents(self):
        # type: () -> None
        """
        Close incidents of users that had no faults for period_between_incidents.

        The end time is the last timestamp recorded for the user. In-memory
        bookkeeping for those users is dropped only after a successful commit;
        on any error the session is rolled back and the error is logged.
        """
        threshold = self.now - self.period_between_incidents
        old_incidents = {uid: ts for uid, ts in self.incidents_last_ts.items() if ts < threshold}
        if not old_incidents:
            return

        try:
            open_incidents = (
                self.session.query(incident)
                .filter(
                    incident.server_id == self.server_id,
                    incident.incident_end_time.is_(None),
                    incident.uid.in_(list(old_incidents.keys())),
                )
                .all()
            )
            for _inc in open_incidents:
                _inc.incident_end_time = old_incidents[_inc.uid]
            # commit inside the try: database errors usually surface here,
            # and the session must be rolled back to stay usable
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            self.log.error("Unable to save old incidents: (%s)", str(e))
        else:
            for uid in old_incidents:
                self.incidents_last_ts.pop(uid)
                self.incidents_cache.pop(uid, None)

    def get_incident(self, uid):
        # type: (int) -> incident
        """
        Return the open incident for uid, adding a new one to the session if none exists.

        Also records the current time as the last timestamp for uid.
        """
        _inc = self._incomplete_incidents_query().filter(incident.uid == uid).first()
        if not _inc:
            _inc = incident(
                uid=uid,
                incident_start_time=self.now,
                server_id=self.server_id,
                snapshot_count=0,
                incident_end_time=None,
            )
            self.session.add(_inc)
            self.log.debug(
                'New incident-period for uid %s started; incident_start_time=%i',
                uid,
                self.now,
            )
        self.incidents_last_ts[uid] = self.now
        return _inc

    def init_session(self):
        """On the first call create the database session and finalize stale incidents; later calls do nothing"""
        if self.first_run:  # Running for the first time
            self.session = sessionmaker(bind=self.engine)()
            self.finalize_incidents()
            self.first_run = False

    def process_lve(self, lve_id, faults):
        # type: (int, Dict[str, int]) -> None
        """
        Save a snapshot for lve_id unless its incident already has the maximum number of snapshots.

        Any error is logged as a warning and the session is rolled back.
        """
        try:
            lve_id = int(lve_id)
            self.incidents_last_ts[lve_id] = self.now
            _inc = self.incidents_cache[lve_id]
            self.log.debug(
                'Faults %s for uid %s detected; timestamp %i',
                faults,
                lve_id,
                self.now,
            )
            if _inc["snapshot_count"] < self.max_snapshots_per_incident:
                self.save_snapshot(_inc, faults)
                self.log.debug(
                    'Snapshot for uid %s with timestamp %i saved',
                    lve_id,
                    self.now,
                )
        except Exception as e:
            self.session.rollback()
            self.log.warning("Unable to save incident for LVE %s (%s)", lve_id, e)

    def setup_litespeed(self, force_webserver_message=False):
        # type: (bool) -> None
        """
        Check state and configure access to litespeed

        :param force_webserver_message: log which webserver is used
                                        even if the litespeed state did not change
        """
        self.litespeed_info.check_litespeed_state()
        # litespeed has just been started: (re)write our credentials
        if self.litespeed_info.state_changed and self.litespeed_info.is_running:
            self.litespeed_info.dump_passwd()
        if self.litespeed_info.state_changed or force_webserver_message:
            use_litespeed = self.litespeed_info.get_use_litespeed()
            litespeed_running = self.litespeed_info.is_running
            if use_litespeed and not litespeed_running:
                self.log.info("Litespeed is not running properly. Check litespeed license key.")
            webserver = LITESPEED if use_litespeed else APACHE
            msg = f"{webserver} webserver will be used now to obtain data"
            self.log.info(msg)

    def execute(self, lve_data):
        # type: (dict) -> None
        """
        Plugin entry point: collect snapshot data and, once per period, persist it.

        Snapshot data for LVEs listed in lve_data['faults'] is cached on every call.
        When at least ``_period`` seconds have passed since the last save, incidents
        are updated, snapshots are written, and both lve_data['faults'] and the
        cached snapshot data are reset.
        """
        if not self.snapshots_enabled:
            return
        self.init_session()
        self.setup_litespeed()
        lve_ids = list(lve_data.get('lve_usage', {}).keys())
        lve_faults = lve_data.get('faults', {})
        self._helper.invalidate(lve_ids)
        self.cache_snapshots(lve_faults)
        if self.now - self.last_run >= self._period:
            self.last_run = self.now
            with reboot_lock():
                self.save_old_incidents()
                self._insert_new_incidents(lve_faults)
                self._increment_snapshot_count(lve_faults)
                for lve_id, faults in list(lve_faults.items()):
                    self.process_lve(lve_id, faults)
                lve_data["faults"] = {}
                self._snapshots_data = {}

    def _increment_snapshot_count(self, lve_faults):
        # type: (Dict[int, Dict[str, int]]) -> None
        """Increase snapshot_count and set dump_time for open incidents that are below the snapshot limit"""
        lve_id_list = list(lve_faults.keys())
        # update in chunks of 250 ids to keep the IN (...) list short
        for chunk in get_chunks(lve_id_list, 250):
            self._incomplete_incidents_query().filter(
                incident.uid.in_(chunk),
                incident.snapshot_count < self.max_snapshots_per_incident,
            ).update(
                {"snapshot_count": incident.snapshot_count + 1, "dump_time": int(self.now)},
                synchronize_session=False,
            )
        self.session.commit()

    def _insert_new_incidents(self, lve_faults):
        # type: (Dict[int, Dict[str, int]]) -> None
        """
        Create an incident for every LVE in lve_faults that has no cached incident.

        New incidents are put into the cache first, so snapshots are still saved
        for them if the database insert fails; in that case the session is rolled
        back and the error is logged.
        """
        new_incidents = {
            lve_id: {
                'uid': lve_id,
                "incident_start_time": self.now,
                "server_id": self.server_id,
                "snapshot_count": 0,
                "incident_end_time": None,
            }
            for lve_id in lve_faults
            if lve_id not in self.incidents_cache
        }
        if not new_incidents:
            return

        self.incidents_cache.update(new_incidents)
        try:
            # Better to use
            # self.session.bulk_insert_mappings(incident, new_incidents)
            # but it will be available only in SQLAlchemy version > 1.0
            self.session.add_all([incident(**_inc) for _inc in new_incidents.values()])
            # add_all() only registers the objects; database errors surface on
            # commit, so it has to be covered by the same handler
            self.session.commit()
        except Exception as e:
            # leave the session usable for the following queries
            self.session.rollback()
            self.log.error("Unable to save new incidents: %s", str(e))

    def cache_snapshots(self, lve_faults):
        # type: (Dict[int, Dict[str, int]]) -> None
        """Collect snapshot data for every LVE in lve_faults that has none cached yet"""
        for lve_id in lve_faults:
            if lve_id not in self._snapshots_data:
                self._snapshots_data[lve_id] = self._helper.get_snapshot_data(lve_id, self.litespeed_info)

    def save_snapshot(self, _incident, faults):
        # type: (dict, Dict[str, int]) -> None
        """
        Write a snapshot for the given incident and increase its cached snapshot_count.

        An IOError raised while saving is logged; the counter is increased in either case.
        """
        lve_id = _incident["uid"]
        # empty data is used when nothing was cached for this LVE
        processes, queries, urls = self._snapshots_data.get(lve_id, ({}, [], []))
        snapshot_ = Snapshot(_incident, self.compresslevel)
        data = {
            'uid': _incident["uid"],
            'dump_time': int(self.now),
            'server_id': self.server_id,
            'incident_start_time': _incident["incident_start_time"],
            'snap_proc': processes,
            'snap_sql': queries,
            'snap_http': urls,
            'snap_faults': faults,
        }
        try:
            snapshot_.save(data)
        except IOError as e:
            self.log.error(str(e))
        _incident["snapshot_count"] += 1