관리-도구
편집 파일: dbgov_saver.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import logging import os import re from sqlalchemy.exc import SQLAlchemyError from clcommon.clpwd import ClPwd from lvestats.core.plugin import LveStatsPlugin from lvestats.lib.commons.func import reboot_lock from lvestats.lib.commons.sizeutil import dbgov_io_bytes_value from lvestats.orm.history_gov import history_gov MAX_FILES_PER_TRANSACTION = 1000 class DBGovSaver(LveStatsPlugin): DBSTAT_DIR = "/var/lve/dbgovernor/" FILE_PATTERN = re.compile(r"governor\.[0-9]+$", re.IGNORECASE) _history_gov_col = list(history_gov.__table__.columns.keys()) def __init__(self): self.logger = logging.getLogger(__name__) self.server_id = 'localhost' self.engine = None self.headers = ( ('username', str), None, # max_simultaneous_requests not support ('sum_cpu', float), ('sum_write', float), ('sum_read', float), None, # max_cpu not support None, # max_write not support None, # max_read not support ('number_of_restricts', int), ('limit_cpu_on_period_end', int), ('limit_read_on_period_end', int), ('limit_write_on_period_end', int), ('cause_of_restrict', int), ('uid', int), ) self._headers_len = len(self.headers) self.cl_pwd = ClPwd() self.min_uid = self.cl_pwd.get_sys_min_uid(500) def set_config(self, config): self.server_id = config.get('server_id', self.server_id) def get_user_id(self, username): try: return self.cl_pwd.get_uid(username) except self.cl_pwd.NoSuchUserException as e: self.logger.debug('Can not obtain user id for "%s"; %s', username, e) return -1 def scan_dir(self): """ Scans directory generated by db governer and prepares statistics for insertion into database. :return: list of tuples [(file name, [lines]), (file name, [lines])...] """ if os.path.exists(self.DBSTAT_DIR): flist = filter(self.FILE_PATTERN.search, os.listdir(self.DBSTAT_DIR)) for f in flist: try: file_name = os.path.join(self.DBSTAT_DIR, f) with open(file_name, 'r', encoding='utf-8') as f_stats: f_stats_lines = f_stats.readlines() yield file_name, f_stats_lines except IOError: self.logger.warning("No file statistic") except UnicodeDecodeError: with open(file_name, 'r', errors='surrogateescape', encoding='utf-8') as file: f_source = file.read() self.logger.error( 'Error while decoding the file %s', f, exc_info=True, extra={f: f_source}, ) yield file_name, [] def write_to_db(self, conn, scanned): """ :type scanned: generator :type conn: sqlalchemy.engine.base.Connection :rtype: list(dict(str, int|str)) """ values_list = [] unlink_list = [] for n_, (file_name, lines) in enumerate(scanned): if MAX_FILES_PER_TRANSACTION < n_: break for line in lines: try: self.logger.debug("write: %s", line) line_splited = line.strip().split(';') file_timestamp = int(file_name.split('.')[-1]) values = {'server_id': self.server_id, 'ts': file_timestamp} values.update(dict([(h_[0], h_[1](v_)) for h_, v_ in zip(self.headers, line_splited) if h_])) if not values.get('uid'): # for backward compatibility with governor-mysql < 1.2-1 values['uid'] = self.get_user_id(values['username']) # extend dict by user id if values['uid'] >= self.min_uid: # ignoring system users and when we can't extract user id values_list.append(values) except (IndexError, ValueError): self.logger.warning('Can not parse file %s; data from file not be writen to database', file_name) unlink_list.append(file_name) # Data to transfer to CM plugin data_for_cm = {} # insert all data per one commit with reboot_lock(): if values_list: try: # filter for insert only supported columns values_list_filtered = [ {k: v for k, v in list(d.items()) if k in self._history_gov_col} for d in values_list ] # form data for CM plugin for dbgov_data in values_list_filtered: uid = dbgov_data['uid'] data_for_cm[uid] = { 'cpu_limit': dbgov_data['limit_cpu_on_period_end'], 'io_limit': dbgov_io_bytes_value( dbgov_data['limit_read_on_period_end'], dbgov_data['limit_write_on_period_end'] ), 'cpu_usage': round(dbgov_data['sum_cpu'], 1), 'io_usage': dbgov_io_bytes_value(dbgov_data['sum_read'], dbgov_data['sum_write']), } conn.execute(history_gov.__table__.insert(), values_list_filtered) except (SQLAlchemyError, KeyError) as e: self.logger.warning(str(e)) try: list(map(os.unlink, unlink_list)) except OSError: pass return data_for_cm def execute(self, lve_data): """ :type lve_data: dict """ if "dbgov_data" not in lve_data: lve_data["dbgov_data"] = [] conn = self.engine.connect() try: scanned = self.scan_dir() dbgov_data_for_cm = self.write_to_db(conn, scanned) if dbgov_data_for_cm: lve_data["dbgov_data_for_cm"] = dbgov_data_for_cm finally: conn.close()