관리-도구
편집 파일: lves_tracker.py
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import logging import typing import pprint from dataclasses import dataclass from functools import cached_property from datetime import timedelta from types import MappingProxyType from typing import Callable, Mapping from collections.abc import Set from ._logs import logger from .notify import Emitter, Signal from .common import ( BurstingMultipliers, LveId, Timestamp, LveLimits, ApplyLveSettings, SerializedLveId, get_deserialized_lve_id, LveStats, LveUsage, LveState, empty_usage, ) from .history import IntervalType, LveHistory from .lve_sm import LveStateManager, LveStateSummary @dataclass(frozen=True) class LveStateManagerFactory: _lve_to_history: dict[LveId, LveHistory] _apply_lve_settings: ApplyLveSettings _quota: timedelta _quota_window: timedelta _bursting_multipliers: BurstingMultipliers _fail_fast: bool = True def __post_init__(self): lves_with_broken_history = { str(lve_id) for lve_id, history in self._lve_to_history.items() if history.ongoing_interval_type == IntervalType.OVERUSING } if lves_with_broken_history: raise ValueError( 'LVEs ' + ', '.join(lves_with_broken_history) + ' are marked as "overusing" ' ' in initial history loaded from persistent storage!' ) def __call__( self, lve_id: LveId, now: Timestamp, normal_limits: LveLimits, stats: LveStats, usage: LveUsage, ) -> LveStateManager: try: history = self._lve_to_history.pop(lve_id) except KeyError: history = LveHistory() else: cutoff = typing.cast(Timestamp, now - self._quota_window.total_seconds()) history = history.trim(cutoff) return LveStateManager( now=now, lve_id=lve_id, initial_history=history, bursting_multipliers=self._bursting_multipliers, initial_normal_limits=normal_limits, initial_stats=stats, initial_usage=usage, quota=self._quota, quota_window=self._quota_window, apply_lve_settings=self._apply_lve_settings, fail_fast=self._fail_fast, ) class LvesTracker: def __init__( self, create_lve_manager: LveStateManagerFactory, fail_fast: bool = True, deserialize_lve_id: Callable[[SerializedLveId], LveId] = get_deserialized_lve_id, ) -> None: self._create_lve_manager = create_lve_manager self._fail_fast = fail_fast self._deserialize_lve_id = deserialize_lve_id self._serialized_id_to_manager = dict[SerializedLveId, LveStateManager]() # FIXME(vlebedev): Remove these state-sets and replace them with signals and external handlers, # which are interested in state switches. self._state_sets = ( self._bursted, self._unbursted, self._overusing, self._exceeded, ) = ( set[LveStateManager](), set[LveStateManager](), set[LveStateManager](), set[LveStateManager](), ) self._on_manager_added = Emitter() @cached_property def serialized_id_to_manager(self) -> Mapping[SerializedLveId, LveStateManager]: return MappingProxyType(self._serialized_id_to_manager) @property def bursted(self) -> Set[LveStateManager]: return self._bursted @property def unbursted(self) -> Set[LveStateManager]: return self._unbursted @property def overusing(self) -> Set[LveStateManager]: return self._overusing @property def quota_exceeded(self) -> Set[LveStateManager]: return self._exceeded @property def on_manager_added(self) -> Signal: return self._on_manager_added def update( self, now: Timestamp, normal_limits_by_id: Mapping[LveId, LveLimits], stats_by_id: Mapping[SerializedLveId, LveStats], usages_by_id: Mapping[SerializedLveId, LveUsage], ) -> None: # TODO(vlebedev): Filter out users belonging to resellers (LVEStat contains reseller_id field) currently_existing_ids = stats_by_id.keys() newly_appeared_raw_ids = currently_existing_ids - self._serialized_id_to_manager.keys() for serialized_lve_id in newly_appeared_raw_ids: if serialized_lve_id == 0: continue # NOTE(vlebedev): Users under resellers are not supported for now. if stats_by_id[serialized_lve_id].reseller_id != 0: continue lve_id: LveId = self._deserialize_lve_id(serialized_lve_id) # TODO(vlebedev): Can it be that there are no normal limits and/or stats are available? # What to do in this case? errors = [] try: normal_limits = normal_limits_by_id[lve_id] except KeyError: errors.append('normal limits') try: stats = stats_by_id[serialized_lve_id] except KeyError: errors.append('stats') if errors: # TODO(vlebedev): Raise exception when `fail_fast` is set. logger.warning( 'LVE "%s": some "get_initial_readings" listeners failed: %s readings are absent!', lve_id, ' and '.join(f'"{e}"' for e in errors), ) continue manager = self._create_lve_manager( now=now, lve_id=lve_id, normal_limits=normal_limits, stats=stats, usage=usages_by_id.get(serialized_lve_id, empty_usage), ) self._serialized_id_to_manager[serialized_lve_id] = manager logger.debug('LVE "%s": unknown LVE appeared - created manager for it', lve_id) if logger.isEnabledFor(logging.DEBUG): logger.debug('LVEs known to adjuster: \n%s', pprint.pformat({ slid: str(LveStateSummary.for_lve(m)) for slid, m in self._serialized_id_to_manager.items() }, width=-1)) disappeared_exc, to_delete = LveStateManager.Disappered(now=now), set() for serialized_lve_id, manager in self._serialized_id_to_manager.items(): lve_id = manager.lve_id manager.trim_history(now) manager.step(LveStateManager.UpdateReadings( now=now, normal_limits=normal_limits_by_id.get(lve_id), stats=stats_by_id.get(serialized_lve_id), usage=usages_by_id.get(serialized_lve_id, empty_usage), )) if serialized_lve_id not in currently_existing_ids and manager.state != LveState.EXISTED: manager.step(disappeared_exc) for state_set in self._state_sets: state_set.discard(manager) if not manager.history_contains_overusing: to_delete.add(serialized_lve_id) continue lve_quota_exceeded = manager.check_quota_exceeded(now) (self._overusing.add if manager.is_overusing else self._overusing.discard)(manager) (self._bursted.add if manager.is_bursted else self._bursted.discard)(manager) (self._unbursted.add if manager.is_unbursted else self._unbursted.discard)(manager) (self._exceeded.add if lve_quota_exceeded else self._exceeded.discard)(manager) if self._fail_fast: if self._bursted.intersection(self._unbursted) != set(): raise AssertionError('LVEs can`t be both bursted and unbursted!') if self._overusing.intersection(self._unbursted) != set(): raise AssertionError('LVEs can`t be both overusing and unbursted!') for serialized_lve_id in to_delete: manager = self._serialized_id_to_manager.pop(serialized_lve_id) logger.debug( 'LVE "%s": LVE is no longer active and has empty history - forgetting corresponding manager', manager.lve_id, ) # NOTE(vlebedev): Trigger signal listeners only after internal state is finished to be updated. for serialized_id in newly_appeared_raw_ids: try: manager = self._serialized_id_to_manager[serialized_id] except KeyError: continue try: self._on_manager_added(manager) except Exception: if self._fail_fast: raise logger.exception('LVE "%s": some "on_manager_created" listeners failed!', manager.lve_id)