관리-도구
편집 파일: lveinfolib_gov.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from math import ceil

from sqlalchemy import Integer, desc
from sqlalchemy import and_, select, func, case, cast
from sqlalchemy import or_

from lvestats.lib import uidconverter
from lvestats.lib.commons.dateutil import (
    gm_datetime_to_unixtimestamp,
    gm_to_local,
    unixtimestamp_to_gm_datetime,
    local_to_gm,
    round_1m,
)
from lvestats.lib.lveinfolib import OutputFormatter
from lvestats.lib.lveinfolib import convert_to_list, enumerate_duplicate_columns, add_labes_to_column
from lvestats.lib.lveinfolib import dyn_time_unit_groups
from lvestats.orm import history_gov


def get_bitwise_restrict(cause_of_restrict):
    """
    Combine the restriction flags of all rows in a group into one bit mask.

    Each of the bits 4, 2 and 1 is extracted with a bitwise AND and
    aggregated with MAX, so the sum has a bit set if at least one row of
    the group had it set.

    :param sqlalchemy.orm.attributes.InstrumentedAttribute cause_of_restrict:
    :return sqlalchemy.sql.elements.BinaryExpression:
    """
    return (
        func.max(cause_of_restrict.op('&')(4))
        + func.max(cause_of_restrict.op('&')(2))
        + func.max(cause_of_restrict.op('&')(1))
    )


def get_cause_of_restrict(cause_of_restrict):
    """
    Translate a numeric restriction mask into its letter code.

    Bit 1 maps to 'C', bit 2 to 'R' and bit 4 to 'W'; any other value
    (including 0) maps to an empty string.

    :param sqlalchemy.sql.elements.BinaryExpression cause_of_restrict:
    :return sqlalchemy.sql.elements.Case:
    """
    return case(
        [
            (cause_of_restrict == 1, 'C'),
            (cause_of_restrict == 2, 'R'),
            (cause_of_restrict == 3, 'CR'),
            (cause_of_restrict == 4, 'W'),
            (cause_of_restrict == 5, 'CW'),
            (cause_of_restrict == 6, 'RW'),
            (cause_of_restrict == 7, 'CRW'),
        ],
        else_='',
    )


class HistoryShowDBGov(object):
    """Build and run statistics queries against the ``history_gov`` table."""

    # column used for ordering when the caller gives none (or the deprecated 'con')
    DEFAULT_ORDER = 'ts'

    def __init__(
        self,
        dbengine,
        period_from,
        period_to,
        uid=None,
        server_id='localhost',
        show_columns=None,
        order_by=None,
        limit=None,
        cfg=None,
        time_unit=None,
        reverse=False,
        by_usage=None,
        by_usage_percentage=0.9,
    ):
        """
        :param sqlalchemy.engine.base.Engine dbengine:
        :param datetime.datetime period_from:
        :param datetime.datetime period_to:
        :param int|list|tuple|None uid: a single user id, a collection of
            user ids, or None for all users
        :param str server_id:
        :param str|list|tuple|None show_columns:
        :param str|None order_by:
        :param int|None limit: 0 and None both mean "no limit"
        :param dict|None cfg: defaults to ``{'server_id': 'localhost'}``
        :param int|None time_unit: -1 is stored as None
        :param bool reverse:
        :param list[str]|str|None by_usage:
        :param float by_usage_percentage: value in range [0..1]
        """
        self.by_usage_percentage = by_usage_percentage
        self.by_usage = by_usage
        self.dbengine = dbengine
        self.period_from = period_from
        self.period_to = period_to
        self.uid = uid
        self.server_id = server_id
        self.requested_columns = show_columns
        self.order_by = order_by
        self.limit = limit
        self.cfg = cfg or {'server_id': 'localhost'}
        # -1 is accepted as an alias for "no time unit"
        self.time_unit = None if time_unit == -1 else time_unit
        self.reverse = reverse

    def history_dbgov_show_dynamic(self):
        """
        Show dbgov statistics with dynamic time-unit.

        The requested period is clipped to the first/last record stored for
        the user, split into groups by ``dyn_time_unit_groups`` and queried
        group by group. Note that ``self.period_from``, ``self.period_to``
        and ``self.time_unit`` are overwritten while doing so.

        :rtype: list
        """
        if self.uid is None:
            return []
        # get first and last record time
        max_ts, min_ts = self._get_min_max_ts(self.uid)
        if min_ts is None or max_ts is None:
            # MIN()/MAX() over zero rows yield NULL: the user has no records,
            # so there is nothing to show and no timestamp to convert
            return []
        period_from = max(gm_to_local(unixtimestamp_to_gm_datetime(min_ts)), gm_to_local(self.period_from))
        period_to = min(gm_to_local(unixtimestamp_to_gm_datetime(max_ts)), gm_to_local(self.period_to))
        # remove seconds from time
        period_from = round_1m(period_from)
        period_to = round_1m(period_to)
        rows = []
        for t_from, t_to, time_unit in reversed(dyn_time_unit_groups(period_from, period_to)):
            self.period_from = local_to_gm(t_from)
            self.period_to = local_to_gm(t_to)
            self.time_unit = time_unit
            rows += self.history_dbgov_show()
        return rows

    def history_dbgov_show(self):
        """
        Show statistics from history_gov table

        For an ``int`` uid the rows are grouped by rounded time (and by
        cause of restrict when no time unit is set); otherwise one row per
        (server, user) is produced, optionally filtered by a uid collection.

        :rtype: list
        """
        # check and return some data without run sql query
        if self.uid == tuple() or self.uid == []:
            return []
        if self.uid is not None and not isinstance(self.uid, (list, tuple)) and self.uid <= 0:
            return []

        t_from = gm_datetime_to_unixtimestamp(self.period_from)
        t_to = gm_datetime_to_unixtimestamp(self.period_to)
        # convert 0 to None
        limit = self.limit or None

        # get information how to aggregate info in different columns
        # e.g. 'ts': sum(history_gov.ts) or 'cpu': avg(history_gov.cpu)
        table_aggregate_info = self._get_table_aggregate_info(t_from, t_to)
        # get list of columns to select from database
        columns_to_select = self._get_columns_for_select(table_aggregate_info)
        # order_by query
        order_by_query = self._get_order_by_query(table_aggregate_info)

        where_query = and_(history_gov.ts.between(t_from, t_to), history_gov.server_id == self.server_id)
        # query to group
        group_by_time_query = self.round_time(t_from)

        if isinstance(self.uid, int):
            query = (
                select(enumerate_duplicate_columns(columns_to_select))
                .where(and_(where_query, history_gov.uid == self.uid))
                .order_by(order_by_query)
            )
            if self.time_unit is not None:
                query = query.group_by(group_by_time_query, history_gov.uid).limit(limit)
            else:
                query = query.group_by(group_by_time_query, history_gov.uid, history_gov.cause_of_restrict).limit(limit)
        else:
            query = select(enumerate_duplicate_columns(columns_to_select)).where(where_query).order_by(order_by_query)
            # filter by user list if need
            if isinstance(self.uid, (list, tuple)):
                query = query.where(history_gov.uid.in_(list(self.uid)))
            # group and limit
            query = query.group_by(
                history_gov.server_id,
                history_gov.uid,
            ).limit(limit)

        if self.by_usage:
            query = query.having(self._get_by_usage_query(table_aggregate_info))

        rows = self.dbengine.execute(query)
        try:
            # replace uid with username and normalise types
            results = OutputFormatter(
                fields=self.requested_columns,
                rows=rows.fetchall(),
                orders=[
                    (
                        ['USER'],
                        lambda uid_: uidconverter.uid_to_username(
                            int(uid_), self.cfg['server_id'], self.server_id, self.dbengine
                        ),
                    ),
                    (['CPU', 'READ', 'WRITE'], lambda x: x if x is None else round(float(x), 7)),
                    (['TS', 'FROM', 'TO', 'lCPU', 'lREAD', 'lWRITE', 'CON'], lambda x: x if x is None else int(x)),
                ],
            )
        finally:
            # release the result even if fetching or formatter setup fails
            rows.close()
        return results.get_corrected_list()

    def _get_columns_for_select(self, table_aggregate_info):
        """
        Return list of columns that should be selected from table.

        :param table_aggregate_info: dict from _get_table_aggregate_info
        :return: query for each column
        :rtype: list
        """
        return [
            table_aggregate_info[column_name.lower()]
            for column_name in convert_to_list(self.requested_columns)
        ]

    def _get_table_aggregate_info(self, t_from, t_to):
        """
        Returns dictionary: { column_name: aggregate_query }

        For "all users" (``uid is None``) and for a collection of uids the
        whole period ``t_to - t_from`` is used as aggregation interval;
        for a single uid t_from and t_to are ignored and ``self.time_unit``
        decides.

        :type t_from: int
        :type t_to: int
        :rtype: dict
        """
        # tuples must be treated like lists here: history_dbgov_show groups
        # both by (server_id, uid), which needs the aggregated column set
        if self.uid is None or isinstance(self.uid, (list, tuple)):
            return self.get_history_gov_all_users_func(t_to - t_from)
        return self.get_history_gov_all_users_func()

    def _get_min_max_ts(self, uid):
        """
        Get the last and the first record timestamp for given user id.

        NOTE: despite the method name the result order is (max_ts, min_ts).
        Both values are None when the user has no records.

        :rtype: tuple(int|None, int|None)
        """
        query = select([func.min(history_gov.ts), func.max(history_gov.ts)]).where(history_gov.uid == uid)
        min_ts, max_ts = self.dbengine.execute(query).fetchall()[0]
        return max_ts, min_ts

    def _get_order_by_query(self, history_gov_aggregate_func):
        """
        Create order_by query

        Side effect: ``self.order_by`` is replaced by the effective column
        name when a default is applied.

        :param history_gov_aggregate_func: dict from _get_table_aggregate_info
        :return: sqlalchemy query
        """
        # generate order by
        if self.order_by is None or self.order_by == 'con':  # con - deprecated; use default
            self.order_by = self.DEFAULT_ORDER
        if self.order_by == 'ts' and self.uid is None:
            self.order_by = 'id'
        order_by_query = history_gov_aggregate_func[self.order_by.lower()]
        if self.order_by not in ('id', 'con', 'ts') and self.reverse:
            order_by_query = desc(order_by_query)
        return order_by_query

    def _get_by_usage_query(self, table_aggregate_info):
        """
        Get by_usage query.

        Builds one "usage / limit > by_usage_percentage" condition per
        requested resource and joins them with OR.

        :param table_aggregate_info: dict from _get_table_aggregate_info
        :return: sqlalchemy expression for HAVING
        :raises NotImplementedError: for a resource other than 'cpu' or 'io'
        """
        filter_list = []
        for by_usage_item in convert_to_list(self.by_usage):
            resource = by_usage_item.lower()
            if resource == 'cpu':
                filter_list.append(
                    (table_aggregate_info['cpu'] / table_aggregate_info['lcpu']) > self.by_usage_percentage
                )
            elif resource == 'io':
                filter_list.append(
                    (table_aggregate_info['io'] / table_aggregate_info['lio']) > self.by_usage_percentage
                )
            else:
                raise NotImplementedError()
        return or_(*filter_list)

    def get_history_gov_all_users_func(self, time_period=None):
        """
        Build the labelled column expressions for a history_gov query.

        The aggregation interval is ``time_period`` or, when that is falsy,
        ``self.time_unit``. Without an interval the per-record column set is
        returned; otherwise the sums are divided by the interval length in
        minutes (rounded up).

        :type time_period: int|None
        :rtype: dict
        """
        t_from = gm_datetime_to_unixtimestamp(self.period_from)
        # round time depending on time_unit
        normalized_from = self.round_time(t_from, offset=0, ts_query=func.min(history_gov.ts))
        normalized_to = self.round_time(t_from, offset=1, ts_query=func.min(history_gov.ts))

        time_interval = time_period or self.time_unit
        if time_interval is None:
            column_labels = add_labes_to_column(
                {
                    'from': normalized_from,
                    'to': normalized_to,
                    'ts': history_gov.ts,
                    'id': history_gov.uid,
                    'user': history_gov.uid,
                    'cpu': func.max(history_gov.sum_cpu),
                    'read': func.sum(history_gov.sum_read),
                    'write': func.sum(history_gov.sum_write),
                    'io': func.avg(history_gov.sum_write + history_gov.sum_read),
                    'lio': func.max(history_gov.limit_write_on_period_end + history_gov.limit_read_on_period_end),
                    'con': 0,  # 'CON' deprecated every time 0
                    'lcpu': func.max(history_gov.limit_cpu_on_period_end),
                    'lread': func.max(history_gov.limit_read_on_period_end),
                    'lwrite': func.max(history_gov.limit_write_on_period_end),
                    'restrict': get_cause_of_restrict(history_gov.cause_of_restrict),
                }
            )
        else:
            # interval length in whole minutes, rounded up
            minutes_passed = int(ceil(time_interval / 60.0))
            column_labels = add_labes_to_column(
                {
                    'from': normalized_from,
                    'to': normalized_to,
                    'ts': func.max(history_gov.ts),
                    'id': history_gov.uid,
                    'user': history_gov.uid,
                    'cpu': func.sum(history_gov.sum_cpu) / minutes_passed,
                    'read': func.sum(history_gov.sum_read) / minutes_passed,
                    'write': func.sum(history_gov.sum_write) / minutes_passed,
                    'con': 0,  # 'CON' deprecated every time 0
                    'io': func.sum(history_gov.sum_write + history_gov.sum_read) / minutes_passed,
                    'lio': func.max(history_gov.limit_write_on_period_end + history_gov.limit_read_on_period_end),
                    'lcpu': func.max(history_gov.limit_cpu_on_period_end),
                    'lread': func.max(history_gov.limit_read_on_period_end),
                    'lwrite': func.max(history_gov.limit_write_on_period_end),
                    'restrict': get_cause_of_restrict(get_bitwise_restrict(history_gov.cause_of_restrict)),
                }
            )
        return column_labels

    def round_time(self, time_from, offset=0, ts_query=None):
        """
        Round time depending on time_unit.
        Returns query, that returns nearest (N * time_unit + time_from) for current ts.
        When no time unit is set the timestamp expression is returned as is.

        :param ts_query: custom column; defaults to ``history_gov.ts``
        :type time_from: int
        :type offset: int
        :return: sqlalchemy expression
        """
        if ts_query is None:
            ts_query = history_gov.ts
        if not self.time_unit:
            return ts_query
        if self.dbengine.url.drivername == 'sqlite':
            # cast(..., Integer) using for compatibility with lve-stats-2.1-8 database; 'created' saved as float
            return cast(((ts_query - time_from) / self.time_unit + offset), Integer) * self.time_unit + time_from
        return func.floor((ts_query - time_from) / self.time_unit + offset) * self.time_unit + time_from