관리-도구
편집 파일: ustate.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
from typing import Dict, List, Optional  # NOQA

from clcommon import mysql_lib
from clcommon.cpapi import cpusers, db_access, dblogin_cplogin_pairs
from clcommon.utils import run_command

log = logging.getLogger('ustate')


class MySQLOperationalError(Exception):
    """Raised by SQLSnapshot.connect() when the MySQL connection fails."""


def _parse_lveps_output(lveps_output):
    """
    Parse the text printed by ``/usr/sbin/lveps -c 1 -p -d -n -x``.

    The first line of ``lveps_output`` is treated as the column header.
    A following line whose first field is a number starts a new LVE (user)
    entry; every other non-empty line is treated as a thread/process row
    belonging to the most recently seen LVE id.

    :param lveps_output: whole lveps output as a single string
    :return: dict keyed by integer LVE id; each value holds the per-LVE
             columns plus a ``'TID'`` dict keyed by integer thread id

    Example of returned data::

        {504: {'CPU': '26%', 'IO': '0', 'MEM': '1', 'EP': '0',
               'IOPS': 'N/A', 'PNO': '3', 'TNO': '3',
               'TID': {4400: {'CPU': '26%', 'IO': 'N/A', 'MEM': '1',
                              'CMD': 'md5sum', 'IOPS': 'N/A'},
                       4381: {'CPU': '0%', 'IO': 'N/A', 'MEM': '1',
                              'CMD': 'su', 'IOPS': 'N/A'},
                       4382: {'CPU': '0%', 'IO': 'N/A', 'MEM': '1',
                              'CMD': 'bash', 'IOPS': 'N/A'}}},
         500: {'CPU': '13%', 'IO': '0', 'MEM': '1', 'EP': '0',
               'IOPS': 'N/A', 'PNO': '3', 'TNO': '3',
               'TID': {4266: {'CPU': '0%', 'IO': 'N/A', 'MEM': '1',
                              'CMD': 'su', 'IOPS': 'N/A'},
                       4299: {'CPU': '13%', 'IO': 'N/A', 'MEM': '1',
                              'CMD': 'cat', 'IOPS': 'N/A'},
                       4267: {'CPU': '0%', 'IO': 'N/A', 'MEM': '1',
                              'CMD': 'bash', 'IOPS': 'N/A'}}}}

    Example of data manipulation::

        lveps_data = _parse_lveps_output(_get_lveps_output())
        # list of user ids
        user_id_list = lveps_data.keys()
        # list of thread ids of user 504
        user_tid_list = lveps_data[504]['TID'].keys()
        # CPU load of user 504
        user_cpu = lveps_data[504]['CPU']
        # data of one specific thread
        lveps_data[504]['TID'][4400]
    """
    lveps_lines = lveps_output.split('\n')
    header_line = lveps_lines.pop(0)
    columns_name = header_line.split()
    # rename columns to the standard names used by the callers
    replace_col = {'SPEED': 'CPU', 'COM': 'CMD'}
    columns_name = [replace_col.get(col_name, col_name) for col_name in columns_name]
    columns_count = len(columns_name)

    lveps_data = {}
    user_id = 0
    for lveps_line_index, lveps_line in enumerate(lveps_lines):
        if not lveps_line:
            continue
        # the last column (command) may contain spaces, so limit the split
        lveps_line_splited = lveps_line.split(None, columns_count - 1)
        # if the first one is number - use it as lve_id
        has_id = (len(lveps_line_splited) == columns_count) and lveps_line_splited[0].isdigit()
        if not has_id:
            # row without an id: re-split with one field less and put an
            # empty placeholder in the id position
            lveps_line_splited = lveps_line.split(None, columns_count - 2)
            lveps_line_splited.insert(0, '')

        if len(lveps_line_splited) != columns_count:
            log.error(
                "lveps output was incorrect: %s",
                lveps_line,
                extra={"data": {"lveps_lines": lveps_lines[max(0, lveps_line_index - 5):lveps_line_index + 15]}},
            )
            # stop parsing; whatever was collected so far is returned
            break

        lveps_dict_line = dict(zip(columns_name, lveps_line_splited))
        if has_id:
            # FIXME: need to add a filter to a minimum uid
            user_id = int(lveps_dict_line.pop('ID'))
            lveps_dict_line.pop('CMD', None)
            lveps_data[user_id] = lveps_dict_line
            # the per-LVE 'TID' column is replaced by the thread container
            lveps_data[user_id]['TID'] = {}
            lveps_data[user_id].pop('PID', None)
        else:
            # drop the columns that only make sense on the per-LVE row
            lveps_dict_line.pop('EP', None)
            lveps_dict_line.pop('PNO', None)
            lveps_dict_line.pop('TNO', None)
            lveps_dict_line.pop('ID', None)
            try:
                lveps_data[user_id]['TID'][int(lveps_dict_line.pop('TID'))] = lveps_dict_line
            except (ValueError, KeyError) as e:
                # non-numeric/missing TID, or a thread row seen before any LVE row
                log.error("Can't parse lveps output: %s", str(e))
    return lveps_data


def get_lveps():
    """
    Run lveps and return its parsed output.

    :return: dict in the format described in _parse_lveps_output
    """
    lveps_output = _get_lveps_output()
    return _parse_lveps_output(lveps_output)


def _get_lveps_output():
    """
    Run /usr/sbin/lveps and return its output decoded as text.

    :return: lveps output as str; bytes that are not valid utf-8 are replaced
    """
    lveps_output = run_command(
        [
            '/usr/sbin/lveps', '-c', '1', '-p', '-d', '-n', '-o', '-x',
            # tells lveps to put at least one space between columns for correct parsing
            'id:10,ep:10,pno:10,pid:15,tno:5,tid:15,cpu:7,mem:15,com:256',
        ],
        convert_to_str=False,
    )
    # ignore any non-utf8 symbols here
    # we use this information only to show user
    return lveps_output.decode('utf-8', 'replace')


class SQLSnapshot(object):
    """
    Snapshot of the MySQL process list grouped by control panel user.

    Can be used as a context manager: the connection is opened on enter
    and closed on exit.
    """

    def __init__(self):
        self._mysql_conn = None
        self._dblogin_cplogin_map = {}
        self._db_users = set()  # attribute to detect new database users

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def connect(self):
        """
        Obtain access data and connect to mysql database

        :raises MySQLOperationalError: if the connector reports a MySQLError
        """
        access = db_access()
        mysql_login = access['login']
        mysql_pass = access['pass']
        mysql_host = access.get('host', 'localhost')
        connector = mysql_lib.MySQLConnector(
            host=mysql_host,
            user=mysql_login,
            passwd=mysql_pass,
            use_unicode=True,
            charset="utf8mb4",
        )
        try:
            self._mysql_conn = connector.connect()
        except mysql_lib.MySQLError as e:
            raise MySQLOperationalError(str(e)) from e

    def _refresh_map(self):
        """
        Refresh <database user>:<system user> map
        """
        self._dblogin_cplogin_map = dict(dblogin_cplogin_pairs())

    def close(self):
        """
        Close Mysql connection

        Does nothing when no connection was ever established, so it is safe
        to call after a failed or skipped connect().
        """
        if self._mysql_conn is not None:
            self._mysql_conn.close()

    def _raw_processlist(self):
        """
        Execute `SHOW FULL PROCESSLIST` and return the fetched rows.

        :return: rows as returned by the cursor; an empty tuple if a
                 UnicodeDecodeError or MySQLError occurred (it is logged)
        """
        result = tuple()
        try:
            cursor = self._mysql_conn.cursor()
            cursor.execute('SHOW FULL PROCESSLIST')
            result = cursor.fetchall()
        except (UnicodeDecodeError, mysql_lib.MySQLError) as e:
            log.warning('Error occurred during executing the `SHOW FULL PROCESSLIST` command', exc_info=e)
        return result

    def _get_sql_process_list(self):
        """
        Group processlist by database user name

        Rows whose command is 'Sleep' are skipped.

        :return: {db_username: [[command, time, query], ...]}
        :rtype: dict
        """
        process_snapshot = {}
        for sql_tuple in self._raw_processlist():
            db_username = sql_tuple[1]
            sql_cmd = sql_tuple[4]  # (CMD) type/state of the request, in this case we are only interested 'Query'
            sql_time = sql_tuple[5]  # (Time) Time
            sql_query = sql_tuple[7] or ''  # (SQL-query) sql query, None if no sql query
            if sql_cmd == 'Sleep':  # filter 'Sleep' state
                continue
            # group by database user name
            process_snapshot.setdefault(db_username, []).append([sql_cmd, sql_time, sql_query])
        return process_snapshot

    def get(self, cplogin_lst=None):
        # type: (Optional[List[str]]) -> Dict[str, List]
        """
        :param cplogin_lst: a list of users to retrieve data;
                            None if the data is returned for all users
                            registered in the control panel
        :return: sql queries for each user
        """
        process_snapshot = self._get_sql_process_list()

        # refresh map if new database users detect
        new_db_users = set(process_snapshot) - self._db_users
        if new_db_users:
            self._refresh_map()
            log.debug(
                'New database user(s) %s detected; database users map refreshed',
                str(list(new_db_users))[1:-1],
            )
            # remember them so the map is not refreshed again for the same users
            self._db_users |= new_db_users

        # group and filter by control panel users
        # (a set is built once so each membership test below is O(1))
        allowed_cplogins = set(cplogin_lst or cpusers() or [])
        sql_snapshot = {}
        for db_username, sql_snap in process_snapshot.items():
            cp_username = self._dblogin_cplogin_map.get(db_username)
            if cp_username is not None and cp_username in allowed_cplogins:
                # group sql snapshots by control panel user
                sql_snapshot.setdefault(cp_username, []).extend(sql_snap)
        return sql_snapshot