Editing file: main_loop.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import time

import psutil

from lvestats.eventloop.plugin_executors import (
    DbRecoveredException,
    DbRecoverFailedException,
    PluginExecutionException,
    PluginTimeoutException,
    SameProcessPluginExecutor,
    SeparateProcessPluginExecutor,
)
from lvestats.lib.config import ConfigError

__author__ = 'iseletsk'


class MainLoop(object):
    def __init__(self, config, interval=5, plugins=None, plugin_timeout=5, multiproc=True, profiling_log=None):
        """
        :param config:
        :param interval: update interval in seconds
        :param plugins: list of plugins
        :param plugin_timeout: time in seconds to let a plugin be executed
        :param str profiling_log: if not None -- file to write profiling info
        """
        self.config = config
        self.interval = interval
        self.default_interval = interval
        self.do_exit = False
        if plugins is None:
            self.plugins = []
        else:
            self.plugins = plugins
        self.aggregation_period = config.get('aggregation_period', 60)
        self.default_user_plugin_timeout = plugin_timeout
        self.log = logging.getLogger('main.loop')
        if multiproc:
            self.executor = SeparateProcessPluginExecutor(config, profiling_log)
        else:
            self.executor = SameProcessPluginExecutor(config)
        self.profiling_log = profiling_log

    def exit(self):
        self.log.debug('Want to exit')
        self.do_exit = True

    @staticmethod
    def _plugin_path_name(plugin_class):
        """
        >>> from lvestats.plugins.generic.aggregators import LveUsageAggregator
        >>> plugin_class = LveUsageAggregator
        >>> MainLoop._plugin_path_name(plugin_class)  # doctest: +SKIP

        :param plugin_class:
        :return: filesystem path to the plugin and its name
        """
        import inspect  # pylint: disable=import-outside-toplevel
        path = inspect.getfile(plugin_class)
        path = path.replace('.pyc', '.py')
        name = plugin_class.__name__
        return path, name

    def get_debug_info(self, stat_getter):
        try:
            stat_value = stat_getter()
            return str(stat_value)
        except Exception as e:
            self.log.debug('Error while getting info: %s', str(e))
            return ''

    def run(self, times=None):
        cnt = None
        if times:
            cnt = times
        profiler = None
        if self.profiling_log:
            import cProfile  # pylint: disable=import-outside-toplevel
            profiler = cProfile.Profile()
            if not cnt:
                cnt = 10
            self.log.info('Profiling server for %s plugin cycles', cnt)
            profiler.enable()
        while True:
            now = time.time()
            for plugin in self.plugins:
                if self.do_exit:
                    self.log.debug('Exiting')
                    self.executor.terminate()
                    return
                # User-supplied plugins get a bounded timeout; built-in plugins
                # run without one unless they define their own.
                if getattr(plugin, '__is_user_plugin__', False):
                    timeout = getattr(plugin, 'timeout', self.default_user_plugin_timeout)
                else:
                    timeout = getattr(plugin, 'timeout', None)
                try:
                    if profiler:
                        profiler.runcall(self.executor.execute, plugin, now, timeout)
                    else:
                        self.executor.execute(plugin, now, timeout)
                except DbRecoveredException:
                    self.log.warning("Db was corrupt, recovered")
                    continue
                except DbRecoverFailedException:
                    self.log.exception("Unable to recover database")
                    return
                except PluginTimeoutException:
                    # Snapshot system load to help diagnose why the plugin timed out.
                    load = {
                        'process_count': self.get_debug_info(lambda: len(psutil.pids())),
                        'cpu': self.get_debug_info(psutil.cpu_times_percent),
                        'mem': self.get_debug_info(lambda: psutil.virtual_memory().percent),
                        'io': self.get_debug_info(psutil.disk_io_counters),
                    }
                    path, name = MainLoop._plugin_path_name(plugin)
                    self.log.error("Plugin %s:%s timed out", path, name, extra={'data': load})
                    continue
                except PluginExecutionException:
                    path, name = MainLoop._plugin_path_name(plugin)
                    self.log.exception("Error during execution of %s:%s", path, name)
                    continue
                except ConfigError:
                    path, name = MainLoop._plugin_path_name(plugin)
                    self.log.exception("Unable to init plugin %s:%s", path, name)
            if cnt:
                cnt -= 1
                if cnt == 0:
                    break
            self._sleep(now)
        if profiler:
            self.log.info('Profiling finished')
            import pstats  # pylint: disable=import-outside-toplevel
            with open(self.profiling_log, 'a+', encoding='utf-8') as f:
                f.write('Main loop profile, internal time:\n')
                stats = pstats.Stats(profiler, stream=f)
                stats.sort_stats('time').print_stats(20)
                f.write('Main loop profile, cumulative time:\n')
                stats.sort_stats('cumulative').print_stats(20)
        self.executor.terminate()

    def _sleep(self, now):
        # Adaptive pacing: sleep out whatever remains of the current interval.
        # A cycle that finishes early lets the interval drift back toward the
        # default; a cycle that overruns stretches the interval, capped at
        # aggregation_period.
        time_passed = time.time() - now
        pause = self.interval - time_passed
        if pause > 0:
            if self.interval > self.default_interval:
                self.interval -= 0.5
            time.sleep(pause)
        else:
            if self.interval < self.aggregation_period:
                self.interval += 0.5
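
The least obvious part of the file is the back-off behaviour in _sleep: when a plugin cycle takes longer than the current interval, the interval grows by 0.5 s per cycle (up to aggregation_period), and once cycles fit again it shrinks back toward the configured default. The following is a minimal, self-contained sketch of that adjustment logic only, not lvestats code; next_interval, the constants, and the hard-coded cycle durations are hypothetical and exist purely for illustration (the real loop measures wall-clock time around plugin execution and actually sleeps).

# Illustrative sketch of the adaptive interval adjustment used by MainLoop._sleep.
# All names and values here are made up for demonstration purposes.

DEFAULT_INTERVAL = 5.0      # corresponds to MainLoop.default_interval
AGGREGATION_PERIOD = 60.0   # corresponds to MainLoop.aggregation_period


def next_interval(interval, cycle_duration):
    """Return the interval to use after a cycle that took cycle_duration seconds."""
    pause = interval - cycle_duration
    if pause > 0:
        # Cycle finished early: relax back toward the default interval.
        if interval > DEFAULT_INTERVAL:
            interval -= 0.5
    else:
        # Cycle overran: back off, but never beyond the aggregation period.
        if interval < AGGREGATION_PERIOD:
            interval += 0.5
    return interval


if __name__ == '__main__':
    interval = DEFAULT_INTERVAL
    # Three slow cycles followed by three fast ones (hypothetical durations).
    for duration in (7.0, 7.0, 7.0, 1.0, 1.0, 1.0):
        interval = next_interval(interval, duration)
        print(f'cycle took {duration:.1f}s -> next interval {interval:.1f}s')

Running the sketch prints the interval climbing to 6.5 s while cycles overrun and stepping back down to 5.0 s once they finish early again, which is the behaviour the real _sleep produces between calls to time.sleep().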