관리-도구
편집 파일: statsnotifier.py
#!/opt/cloudlinux/venv/bin/python3 -bb # -*- coding: utf-8 -*- # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # # Author: Igor Seletskiy <iseletsk@cloudlinux.com> # Alexander Grynchuk <agrynchuk@cloudlinux.com> # Illarion Kovalchuk <ikovalchuk@cloudlinux.com> # # pylint: disable=too-many-lines import itertools import json import locale import logging import os import pwd from datetime import datetime, timedelta from email import message_from_string from email.header import Header from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from functools import reduce from itertools import groupby from typing import Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple # NOQA from prettytable import ALL, PrettyTable from clcommon import clemail, clproc, cpapi from clcommon.clproc import LIMIT_LVP_ID from clcommon.cpapi import admins, cpinfo, get_admin_locale, get_user_login_url from clcommon.cpapi.plugins.universal import get_admin_email as system_admin_email from clcommon.mail_helper import MailHelper from clconfig.lve_stats2_lib import get_notification as get_admin_notification from lveapi import NameMap from lvestats.core.plugin import LveStatsPlugin from lvestats.lib.commons import dateutil from lvestats.lib.commons.func import ( deserialize_lve_id, gcd, get_hostname, merge_dicts, serialize_lve_id, user_should_be_notified, ) from lvestats.lib.commons.sizeutil import mempages_to_bytes from lvestats.lib.lveinfolib import FIELD_AVERAGE, FIELD_FAULT, FIELD_LIMIT, HistoryShowUnion, get_lve_version from lvestats.lib.notifications_helper import NotificationsHelper try: # Just for backward compatibility with not released lve-utils from clconfig.lve_stats2_reseller_lib import get_notification # pylint: disable=ungrouped-imports except ImportError: # Will results in a fallback to admin's settings: def get_notification(reseller): return None DEFAULT_LOCALE = 'en_US' ADMIN_TEMPL = 'admin_notify.txt' ADMIN_TEMPL_HTML = 'admin_notify.html' USER_TEMPL = 'user_notify.txt' USER_TEMPL_HTML = 'user_notify.html' RESELLER_FAULTS_TEMPL = 'reseller_faults_notify.txt' RESELLER_FAULTS_TEMPL_HTML = 'reseller_faults_notify.html' RESELLER_TEMPL = 'reseller_notify.txt' RESELLER_TEMPL_HTML = 'reseller_notify.html' DEFAULT_ADMIN_MAIL = system_admin_email() DEFAULT_SUBJECT = 'Hosting account resources exceeded' FIELDS_ALL = ['ID'] + FIELD_FAULT + ['anyF'] + FIELD_AVERAGE + FIELD_LIMIT NotifySettings = NamedTuple( 'NotifySettings', [ ('NOTIFY_MIN_FAULTS', int), ('NOTIFY_INTERVAL', int), ], ) NotifyFaultsOptions = NamedTuple( 'NotifyFaultsOptions', [ ('NOTIFY_CPU', bool), ('NOTIFY_IO', bool), ('NOTIFY_IOPS', bool), ('NOTIFY_MEMORY', bool), ('NOTIFY_EP', bool), ('NOTIFY_NPROC', bool), ], ) AdminSettings = NamedTuple( 'AdminSettings', [ ('NOTIFY_ADMIN', Optional[bool]), ('NOTIFY_CUSTOMERS_ON_FAULTS', Optional[bool]), ('NOTIFY_RESELLER_CUSTOMERS', Optional[bool]), ('NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS', Optional[bool]), ('NOTIFY_FAULTS_TYPES', NotifyFaultsOptions), ('NOTIFY_OPTIONS_SELF', NotifySettings), ('NOTIFY_OPTIONS_CUSTOMER', NotifySettings), ('NOTIFY_FROM_EMAIL', Optional[str]), ('REPORT_ADMIN_EMAIL', Optional[str]), ('NOTIFY_SUBJECT', str), ('NOTIFY_CHARSET_EMAIL', Optional[str]), ], ) ResellerSettings = NamedTuple( 'ResellerSettings', [ ('NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS', bool), ('NOTIFY_CUSTOMERS_ON_FAULTS', bool), ('NOTIFY_RESELLER_ON_TOTAL_FAULTS', bool), ('NOTIFY_FAULTS_TYPES', NotifyFaultsOptions), ('NOTIFY_OPTIONS_SELF', NotifySettings), ('NOTIFY_OPTIONS_CUSTOMER', NotifySettings), ], ) class StatsNotifierTemplateError(clemail.jinja2.exceptions.TemplateError): pass class StatsNotifier(LveStatsPlugin): DEFAULT_PERIOD = 12 * 60 * 60 # 12 hours MIN_PERIOD = 1 * 60 * 60 # 1 hour TEMPLATE_DIR = '/usr/share/lve/emails/' TEMPLATE_CUSTOM_DIR = '/etc/cl.emails.d/' ADMIN_LVP_ID = 0 LOCALE_DEFINES_FILE = 'locale_defines.json' def __init__(self): self.server_id = None self.db_engine = None self.lve_version = 6 self.log = logging.getLogger('statsnotifier') self._load_plugin_settings() self.hostname = get_hostname() self._mail_helper = MailHelper() self._reseller_settings_cache = {} self._proc_lve = clproc.ProcLve() self._name_map = NameMap() try: self.resellers = set(cpapi.resellers()) except (cpapi.NotSupported, AttributeError): self.resellers = set() self._notifications = NotificationsHelper() def _load_plugin_settings(self): """[Re]load all plugin settings""" self._admin_settings = self.get_admin_settings() self._default_reseller_settings = self.get_default_reseller_settings() # let's wake up at least every hour (minimal period for resellers) # then we will iter over all resellers and do stuff notify_interval_admin = self._admin_settings.NOTIFY_OPTIONS_SELF.NOTIFY_INTERVAL notify_interval_users = self._admin_settings.NOTIFY_OPTIONS_CUSTOMER.NOTIFY_INTERVAL self.period = reduce(gcd, [notify_interval_admin, notify_interval_users, StatsNotifier.MIN_PERIOD]) # Set default email headers self._prepare_default_mail_headers() # Log plugin settings self.log.info("Config: NOTIFY_ADMIN=%s", self._admin_settings.NOTIFY_ADMIN) self.log.info("Config: NOTIFY_CUSTOMERS_ON_FAULTS=%s", self._admin_settings.NOTIFY_CUSTOMERS_ON_FAULTS) self.log.info("Config: NOTIFY_RESELLER_CUSTOMERS=%s", self._admin_settings.NOTIFY_RESELLER_CUSTOMERS) self.log.info( "Config: NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS=%s", self._admin_settings.NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS ) self.log.info("Config: NOTIFY_CPU=%s", self._admin_settings.NOTIFY_FAULTS_TYPES.NOTIFY_CPU) self.log.info("Config: NOTIFY_IO=%s", self._admin_settings.NOTIFY_FAULTS_TYPES.NOTIFY_IO) self.log.info("Config: NOTIFY_IOPS=%s", self._admin_settings.NOTIFY_FAULTS_TYPES.NOTIFY_IOPS) self.log.info("Config: NOTIFY_MEMORY=%s", self._admin_settings.NOTIFY_FAULTS_TYPES.NOTIFY_MEMORY) self.log.info("Config: NOTIFY_EP=%s", self._admin_settings.NOTIFY_FAULTS_TYPES.NOTIFY_EP) self.log.info("Config: NOTIFY_NPROC=%s", self._admin_settings.NOTIFY_FAULTS_TYPES.NOTIFY_NPROC) self.log.info( "Config: NOTIFY_INTERVAL_ADMIN=%s seconds", self._admin_settings.NOTIFY_OPTIONS_SELF.NOTIFY_INTERVAL ) self.log.info("Config: NOTIFY_MIN_FAULTS_ADMIN=%s", self._admin_settings.NOTIFY_OPTIONS_SELF.NOTIFY_MIN_FAULTS) self.log.info( "Config: NOTIFY_INTERVAL_USER=%s seconds", self._admin_settings.NOTIFY_OPTIONS_CUSTOMER.NOTIFY_INTERVAL ) self.log.info( "Config: NOTIFY_MIN_FAULTS_USER=%s", self._admin_settings.NOTIFY_OPTIONS_CUSTOMER.NOTIFY_MIN_FAULTS ) self.log.info("Config: NOTIFY_FROM_EMAIL=%s", self._admin_settings.NOTIFY_FROM_EMAIL) self.log.info("Config: REPORT_ADMIN_EMAIL=%s", self._admin_settings.REPORT_ADMIN_EMAIL) self.log.info("Config: NOTIFY_SUBJECT=%s", self._admin_settings.NOTIFY_SUBJECT) self.log.info("Config: NOTIFY_CHARSET_EMAIL=%s", self._admin_settings.NOTIFY_CHARSET_EMAIL) self.log.info("Config: Email subject used if locale_defines.json absent: %s", self.mail_headers['Subject']) def _get_resellers_settings(self, with_admin=False): # type: (bool) -> Tuple[int, ResellerSettings] # get notifications settings for each reseller that is present in proc/lve/list if self._proc_lve.resellers_supported(): for reseller_id in self._proc_lve.lvp_id_list(): reseller_name = self._name_map.get_name(reseller_id) if reseller_name is None: self.log.warning( "Unable to obtain notify settings: reseller %s exists " "in /proc/lve/resellers, but is absent in ve.cfg.", reseller_id, ) continue reseller_settings = self.get_reseller_settings(reseller_name) yield reseller_id, reseller_settings or self._default_reseller_settings if with_admin: yield self.ADMIN_LVP_ID, self._admin_settings def _get_customers_options_filtered(self): # type: () -> filter """ We do not need resellers with disabled NOTIFY_CUSTOMERS_ON_FAULTS notifications, also we do not need those resellers, whose customers were notified recently. Let's remove such resellers from check-list. """ resellers_settings = self._get_resellers_settings(with_admin=True) # we do not need resellers that disables their customers notification # also we do not need resellers that were notified recently def _filter_by_enabled_notifications(reseller_notify_settings): # type: (Tuple[int, ResellerSettings]) -> bool (reseller_id, notify_options) = reseller_notify_settings return notify_options.NOTIFY_CUSTOMERS_ON_FAULTS and self._notifications.users_need_notification( reseller_id, notify_options.NOTIFY_OPTIONS_CUSTOMER.NOTIFY_INTERVAL ) return filter(_filter_by_enabled_notifications, resellers_settings) def _get_customers_options_grouped(self): # type: () -> groupby """ If some resellers have same settings for notify we can send notifications to their users using only one database request. For that, we must group them by customers settings """ def _group_by_customers_options(customers_notify_settings): # type: (Tuple[str, Optional[ResellerSettings]]) -> Tuple[NotifySettings, NotifyFaultsOptions] (_, notify_options) = customers_notify_settings return notify_options.NOTIFY_OPTIONS_CUSTOMER, notify_options.NOTIFY_FAULTS_TYPES return groupby(self._get_customers_options_filtered(), _group_by_customers_options) def _get_resellers_options_filtered(self): # type: () -> filter """ We do not need resellers with disabled notifications at all also we do not need resellers that were notified recently. Let's remove such resellers from check-list. """ resellers_settings = self._get_resellers_settings() def _filter_by_enabled_notifications(reseller_notify_options): # type: (Tuple[int, ResellerSettings]) -> bool (reseller_id, notify_options) = reseller_notify_options return notify_options.NOTIFY_RESELLER_ON_TOTAL_FAULTS and self._notifications.reseller_need_notification( reseller_id, notify_options.NOTIFY_OPTIONS_SELF.NOTIFY_INTERVAL ) return filter(_filter_by_enabled_notifications, resellers_settings) def _get_resellers_options_grouped(self): # type: () -> groupby """ If some resellers have same settings for notifywe can send them notifications using only one database request. For that, we must group them by settings """ resellers_settings = self._get_resellers_options_filtered() def _group_by_self_options(reseller_options_grouped): # type: (Tuple[int, Optional[ResellerSettings]]) -> Tuple[NotifySettings, NotifyFaultsOptions] (_, notify_options) = reseller_options_grouped return notify_options.NOTIFY_OPTIONS_SELF, notify_options.NOTIFY_FAULTS_TYPES return groupby(resellers_settings, _group_by_self_options) def _get_reseller_customers_faults_options_filtered(self): # type: () -> filter """Get resellers with enabled NOTIFY_ON_CUSTOMERS_FAULTS""" resellers_settings = self._get_resellers_settings(with_admin=True) def _filter_by_enabled_notifications(resellers_settings): # type: (Tuple[int, ResellerSettings]) -> bool (reseller_id, notify_options) = resellers_settings return ( notify_options.NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS and self._notifications.reseller_need_notification( reseller_id, notify_options.NOTIFY_OPTIONS_SELF.NOTIFY_INTERVAL ) ) return filter(_filter_by_enabled_notifications, resellers_settings) def _get_resellers_customers_faults_options_grouped(self): # type: () -> groupby resellers_settings = self._get_reseller_customers_faults_options_filtered() def _group_by_self_options(reseller_customers_settings): # type: (Tuple[int, ResellerSettings]) -> Tuple[NotifySettings, NotifyFaultsOptions] (_, notify_options) = reseller_customers_settings return notify_options.NOTIFY_OPTIONS_SELF, notify_options.NOTIFY_FAULTS_TYPES return groupby(resellers_settings, _group_by_self_options) def _prepare_default_mail_headers(self): # type: () -> None self.mail_headers = { # pylint: disable=attribute-defined-outside-init 'Subject': self._admin_settings.NOTIFY_SUBJECT or DEFAULT_SUBJECT } def set_config(self, config): self.server_id = config.get('server_id', 'localhost') def set_db_engine(self, engine): self.db_engine = engine self.lve_version = get_lve_version(engine, self.server_id) @staticmethod def _get_panel_login_url(domain): # type: (str) -> str try: return get_user_login_url(domain) except cpapi.NotSupported: return f'http://{domain}/' @staticmethod def get_locales(localename): # type: (str) -> str return locale.normalize(localename).split('.')[0] def get_users_fault(self, period, notify_min_faults, notify_faults_types, uids=None): # type: (int, int, NotifyFaultsOptions, List[int]) -> List[Dict] """ Get statistics for given period; """ period_to = datetime.utcnow() period_from = period_to - timedelta(seconds=period) by_fault_arg = self._get_faults_keys(notify_faults_types) if not by_fault_arg: return [] history_show = HistoryShowUnion( self.db_engine, period_from, period_to, server_id=self.server_id, show_columns=FIELDS_ALL, order_by='anyF', by_fault=by_fault_arg, threshold=notify_min_faults, uid=uids, ) users_fault_data = history_show.proceed_dict() users_stats = [] for user_stat in users_fault_data: user_stat['PERIOD'] = period # convert mempages to kbytes user_stat.update( { key: mempages_to_bytes(val) // 1024 for key, val in user_stat.items() if key in ('aVMem', 'aPMem', 'lVMem', 'lPMem') } ) # convert bytes/s to kbytes/s user_stat.update({key: val // 1024 for key, val in user_stat.items() if key in ('aIO', 'lIO')}) users_stats.append(user_stat) return users_stats @staticmethod def _get_faults_keys(notify_types_options): # type: (NotifyFaultsOptions) -> List[str] """ Get list of keys for 'by_faults'; If reseller_settings is not None, use admin settings; Else - use reseller's settings; """ by_fault_arg = itertools.compress( ['CPUf', 'VMemF', 'PMemF', 'EPf', 'NprocF', 'IOf', 'IOPSf'], [ notify_types_options.NOTIFY_CPU, notify_types_options.NOTIFY_MEMORY, # deprecated notify_types_options.NOTIFY_MEMORY, notify_types_options.NOTIFY_EP, notify_types_options.NOTIFY_NPROC, notify_types_options.NOTIFY_IO, notify_types_options.NOTIFY_IOPS, ], ) return list(by_fault_arg) def _get_resellers_data(self, faults_data): # type: (Iterable[Dict]) -> Iterator[Dict] """ Get information about reseller's accounts; """ for reseller_data in faults_data: reseller_id, _ = deserialize_lve_id(reseller_data['ID']) reseller_data['LOGIN'] = self._name_map.get_name(reseller_id) try: cp_userinfo = cpapi.cpinfo(reseller_data['LOGIN'], keyls=('mail', 'locale'), search_sys_users=False)[0] except (cpapi.cpapiexceptions.NotSupported, IndexError): continue reseller_data['TOMAIL'] = cp_userinfo[0] reseller_data['LOCALE'] = self.get_locales(cp_userinfo[1] or 'en') yield reseller_data def get_users_data(self, fault_users_data): # type: (Iterable[Dict]) -> List[Dict] """ Get information about end user's accounts; """ users_data = [] for user_data in fault_users_data: user_id, _ = deserialize_lve_id(int(user_data['ID'])) try: user_pwd_data = pwd.getpwuid(user_id) user_data['LOGIN'] = user_pwd_data.pw_name user_data['TONAME'] = user_pwd_data.pw_gecos.split(',')[0] except KeyError: continue try: cp_userinfo = cpapi.cpinfo(user_data['LOGIN'], keyls=('mail', 'dns', 'locale', 'reseller'))[0] except (IndexError, cpapi.cpapiexceptions.NotSupported): continue user_data['TOMAIL'] = cp_userinfo[0] # user's email user_data['DOMAIN'] = cp_userinfo[1] # user's domain user_data['LOCALE'] = self.get_locales(cp_userinfo[2] or 'en') # Old key presents for backward compatibility user_data['RESELLER'] = cp_userinfo[3] # reseller name # Fixed reseller key without misspeling # Now it works as it should be according to docs user_data['RESELLER'] = cp_userinfo[3] # reseller name user_data['HOSTNAME'] = self.hostname # server hostname user_data['LOGIN_URL'] = self._get_panel_login_url(cp_userinfo[1]) user_data['FROMMAIL'] = self._get_notify_from_mail(user_data['RESELLER']) users_data.append(user_data) return users_data def _get_notify_from_mail(self, reseller): # type: (str) -> str """ If reseller is admin -> send email from admin's mail If reseller is reseller -> send email from reseller's mail """ from_mail = self._admin_settings.NOTIFY_FROM_EMAIL # probably we should replace condition with `not is_admin(reseller)` if reseller in self.resellers: try: # TODO: add NOTIFY_FROM_EMAIL option for reseller from_mail = cpapi.cpinfo(reseller, keyls=('mail',), search_sys_users=False)[0][0] except (IndexError, cpapi.cpapiexceptions.NotSupported): self.log.exception("can't obtain notify_from_mail for %s from cpapi", reseller) return from_mail def _detect_admin_email(self): # type: () -> str """ Determine server admin email; Priority: 1. report_admin_email from config 2. cpapi.get_admin_email() 3. DEFAULT_ADMIN_MAIL """ try: email = self._admin_settings.REPORT_ADMIN_EMAIL or cpapi.get_admin_email() except cpapi.NotSupported: email = None if not email: email = DEFAULT_ADMIN_MAIL self.log.warning( "Can't obtain admin email from control panel. System admin email will be used '%s'", email, ) return email def _detect_path_for_file(self, locale_name: str, templ_filename: str) -> Tuple[str, str]: """ Detects path for specified file. Checks directories in order: 1. self.TEMPLATE_CUSTOM_DIR = '/etc/cl.emails.d/<locale_name>' 2. self.TEMPLATE_DIR = '/usr/share/lve/emails/<locale_name>' :param locale_name: locale name to check :param templ_filename: template filename (without path) :return: Tuple: (file_dir, full_path_to_file) """ # 1. check self.TEMPLATE_CUSTOM_DIR templates_dir = os.path.join(self.TEMPLATE_CUSTOM_DIR, locale_name) full_filename = os.path.join(templates_dir, templ_filename) if os.path.exists(full_filename): return templates_dir, full_filename # 2. check self.TEMPLATE_DIR templates_dir = os.path.join(self.TEMPLATE_DIR, locale_name) return templates_dir, os.path.join(templates_dir, templ_filename) def generate_msg_body(self, templ_data: dict, text_templ_name: str, html_templ_name: str = None): locale_name = templ_data.get('LOCALE', DEFAULT_LOCALE) templates_dir, text_templ_path = self._detect_path_for_file(locale_name, text_templ_name) if not os.path.exists(templates_dir) or not os.path.exists(text_templ_path): logging.info( "Unable to find templates for locale '%s': file '%s' does not exist. " "Statsnotifier will use default templates with locale %s. See " "https://docs.cloudlinux.com/cloudlinux_os_components/#customize-lve-stats2-notifications" " in order to find how to create localized template and hide this warning.", locale_name, text_templ_path, DEFAULT_LOCALE, ) templates_dir, text_templ_path = self._detect_path_for_file(DEFAULT_LOCALE, text_templ_name) # html part is optional, just print warning when it is absent if html_templ_name: html_templ_path = os.path.join(templates_dir, html_templ_name) if not os.path.exists(html_templ_path): logging.info( "Unable to find optional HTML message template '%s'. " "Sending email with only TEXT part." "You can safely ignore this message if you " "do not want to use HTML email templates." "See https://docs.cloudlinux.com/cloudlinux_os_components/#customize-lve-stats2-notifications", html_templ_path, ) html_templ_path = None else: html_templ_path = None _templ_name = text_templ_name # Load data from /usr/share/lve/emails/<locale_name>/locale_defines.json file # locale_defines example: # {'NOTIFY_FROM_SUBJECT': 'Ваш сервер болен', # 'PERIOD': {'days': 'дней', 'hours': 'часов', 'minutes': 'минут', 'seconds': 'секунд'}, # 'TONAME': {'admin': 'Админ', 'reseller': 'Reseller', 'customer': 'Server user'} # } locale_defines = self._load_locales_data(templates_dir) # Fill localized data according to self._locale_defines try: templ_data['TONAME'] = locale_defines['TONAME'][templ_data['user_type']] except KeyError: pass try: number, english_name = dateutil.seconds_to_human_view(templ_data['PERIOD']) try: localized_period_name = locale_defines['PERIOD'][english_name] except KeyError: localized_period_name = english_name templ_data['PERIOD'] = f'{number} {localized_period_name}' except KeyError: pass html_body = None try: subject, text_body = clemail.ClEmail.generate_mail_jinja2(text_templ_path, templ_data=templ_data) if html_templ_path: _templ_name = html_templ_name _, html_body = clemail.ClEmail.generate_mail_jinja2(html_templ_path, templ_data=templ_data) except (clemail.jinja2.exceptions.TemplateError, IOError) as e: raise StatsNotifierTemplateError( f'Can not generate message for user "{templ_data.get("LOGIN")}"; ' f'template "{os.path.join(locale_name, _templ_name)}". Jinja2: {e}' ) from e # Put subject from locales s_subject = locale_defines.get('NOTIFY_FROM_SUBJECT', subject) return s_subject, text_body, html_body def generate_msg(self, templ_data: dict, text_templ_name: str, html_templ_name: str = None) -> MIMEMultipart: subject, text_body, html_body = self.generate_msg_body(templ_data, text_templ_name, html_templ_name) if html_body: # generate multipart message text + html # Clear "text" and "html" data from unreadable symbols text_body = text_body.encode(self._admin_settings.NOTIFY_CHARSET_EMAIL, 'xmlcharrefreplace').decode( self._admin_settings.NOTIFY_CHARSET_EMAIL ) html_body = html_body.encode(self._admin_settings.NOTIFY_CHARSET_EMAIL, 'xmlcharrefreplace').decode( self._admin_settings.NOTIFY_CHARSET_EMAIL ) # Attach parts into message container. # According to RFC 2046, the last part of a multipart message, # in this case # the HTML message, is best and preferred. msg = MIMEMultipart('alternative') msg.attach(MIMEText(text_body, 'plain', self._admin_settings.NOTIFY_CHARSET_EMAIL)) msg.attach(MIMEText(html_body, 'html', self._admin_settings.NOTIFY_CHARSET_EMAIL)) else: # generate simple message (text only) text_body = text_body.encode(self._admin_settings.NOTIFY_CHARSET_EMAIL, 'xmlcharrefreplace').decode( self._admin_settings.NOTIFY_CHARSET_EMAIL ) msg = message_from_string(text_body) msg.add_header('Content-Type', f'text/plain; charset="{self._admin_settings.NOTIFY_CHARSET_EMAIL}"') msg.add_header('Content-Transfer-Encoding', '8bit') # configure message headers msg['Subject'] = Header(subject or self.mail_headers['Subject'], 'utf-8').encode() msg['From'] = templ_data.get('FROMMAIL') or self._admin_settings.NOTIFY_FROM_EMAIL msg['To'] = templ_data['TOMAIL'] return msg def send_msg(self, msg): to_addrs = [_.strip() for _ in msg['To'].split(',')] self._mail_helper.sendmail(msg['From'], to_addrs, msg, encoding_name=self._admin_settings.NOTIFY_CHARSET_EMAIL) def send_notification(self, templ_data: dict, text_templ_name: str, html_templ_name: str = None): try: msg = self.generate_msg(templ_data, text_templ_name, html_templ_name) self.send_msg(msg) except StatsNotifierTemplateError as e: self.log.warning(str(e)) def _has_fault(self, column_name, faults): return any(user[column_name] > 0 for user in faults) def _generate_table_data(self, users_data, notify_types_options): # type: (List[Dict], NotifyFaultsOptions) -> Tuple[Optional[str], Optional[str]] """Generate summary table with info about users""" table_columns = [('Username', 'LOGIN'), ('Domain', 'DOMAIN'), ('Name', 'FROMNAME')] additional_columns = itertools.compress( [ [('CPU Limit, %', 'lCPU'), ('CPU Faults', 'CPUf')], [('VMem Limit, KB', 'lVMem'), ('VMem Faults', 'VMemF')], [('PMem Limit, KB', 'lPMem'), ('PMem Faults', 'PMemF')], [('EP Limit', 'lEP'), ('EP Faults', 'EPf')], [('NPROC Limit', 'lNproc'), ('NPROC Faults', 'NprocF')], [('IO Limit, KB/s', 'lIO'), ('IO Faults', 'IOf')], ], [ notify_types_options.NOTIFY_CPU and self._has_fault('CPUf', users_data), notify_types_options.NOTIFY_MEMORY and self._has_fault('VMemF', users_data), # deprecated notify_types_options.NOTIFY_MEMORY and self._has_fault('PMemF', users_data), notify_types_options.NOTIFY_EP and self._has_fault('EPf', users_data), notify_types_options.NOTIFY_NPROC and self._has_fault('NprocF', users_data), notify_types_options.NOTIFY_IO and self._has_fault('IOf', users_data), ], ) table_columns.extend(sum(additional_columns, [])) if self.lve_version > 6: if notify_types_options.NOTIFY_IOPS and self._has_fault('IOPSf', users_data): table_columns.extend([('IOPS Limit', 'lIOPS'), ('IOPS Faults', 'IOPSf')]) columns = [header for header, _ in table_columns] if len(users_data) == 0: return None, None # Generates admin/reseller table from users_data and table_columns def gen_table_body(): table_b = [] for data in users_data: table_line = [] for _, user_data_key in table_columns: cell = str(data.get(user_data_key, '---')) table_line.append(cell) table_b.append(table_line) return table_b table = PrettyTable(columns) table.horizontal_char = '=' table.junction_char = "=" list(map(table.add_row, gen_table_body())) s_table = table.get_string() s_html_table = table.get_html_string(format=True, border=True, hrules=ALL, vrules=ALL) return s_table, s_html_table def _prepare_template_data(self, user_data): # type: (Dict) -> Dict """ Extend keys in new mode register ('aCPU' => 'acpu'), correct 'aVMem', 'aPMem', 'lVMem', 'lPMem' for backward compatibility with old templates """ templ_data_dict = merge_dicts(user_data, {k.lower(): v for k, v in user_data.items()}) # old templates used camelCase-like variables # and there, info about memory must be in bytes templ_data_dict.update( {key: var * 1024 for key, var in templ_data_dict.items() if key in ('aVMem', 'aPMem', 'lVMem', 'lPMem')} ) return templ_data_dict def _notify_users(self, users_data): """ Send email to each user in 'users_data', with given template path in 'templ_name' :type users_data: collections.Iterable[dict] :return: Nothing """ for templ_data in users_data: templ_data['user_type'] = 'customer' if not templ_data.get('TOMAIL'): logging.debug('User %s has not set email, skip notification') continue user_template_data = self._prepare_template_data(templ_data) self.send_notification( templ_data=user_template_data, text_templ_name=USER_TEMPL, html_templ_name=USER_TEMPL_HTML ) def _check_admin(self): if not (self._admin_settings.NOTIFY_ADMIN and any(self._admin_settings.NOTIFY_FAULTS_TYPES)): return if not self._notifications.admin_need_notification(self._admin_settings.NOTIFY_OPTIONS_SELF.NOTIFY_INTERVAL): return self._notifications.mark_admin_notified() period = self._admin_settings.NOTIFY_OPTIONS_SELF.NOTIFY_INTERVAL users_fault_data = self.get_users_fault( period=period, notify_min_faults=self._admin_settings.NOTIFY_OPTIONS_SELF.NOTIFY_MIN_FAULTS, notify_faults_types=self._admin_settings.NOTIFY_FAULTS_TYPES, uids=list(self._proc_lve.lve_id_list(0)), ) # TODO: should we show admin info about reseller's end users? users_data = self.get_users_data(fault_users_data=users_fault_data) if not users_data: return s_text_table, s_html_table = self._generate_table_data(users_data, self._admin_settings.NOTIFY_FAULTS_TYPES) # do nothing, if no table generated if not s_text_table: self.log.debug("During admin check, no users faults found. Skipping...") return admin_template_data = { 'STATS': s_text_table, 'STATS_HTML': s_html_table, 'FROMMAIL': self._admin_settings.NOTIFY_FROM_EMAIL, 'LOCALE': self.get_locales(get_admin_locale()), 'TOMAIL': self._detect_admin_email(), 'PERIOD': period, 'TONAME': 'Administrator', 'user_type': 'admin', 'HOSTNAME': self.hostname, } self.send_notification( templ_data=admin_template_data, text_templ_name=ADMIN_TEMPL, html_templ_name=ADMIN_TEMPL_HTML ) return True def get_panel_reseller_data(self, reseller_login): try: panel_user_data = cpapi.cpinfo(reseller_login, keyls=('mail', 'locale'), search_sys_users=False)[0] except ( IOError, IndexError, cpapi.cpapiexceptions.NotSupported, ): return {} return { 'TOMAIL': panel_user_data[0], 'LOCALE': self.get_locales(panel_user_data[1] or 'en'), 'RESELLER_USERNAME': reseller_login, } def prepare_resellers_data(self, users_data): resellers_data = {} for user_data in users_data: reseller_login = user_data.get('RESELLER') if not reseller_login or reseller_login not in self.resellers: continue if reseller_login not in resellers_data: reseller_data = self.get_panel_reseller_data(reseller_login) if not reseller_data.get('TOMAIL'): continue resellers_data[reseller_login] = merge_dicts( {'USERS': [user_data['LOGIN']], 'PERIOD': user_data['PERIOD']}, reseller_data, ) else: resellers_data[reseller_login]['USERS'].append(user_data['LOGIN']) result = [] # Cycle by resellers for reseller_login, data in resellers_data.items(): data['LOGIN'] = reseller_login reseller_settings = self.get_reseller_settings(reseller_login) or self._default_reseller_settings s_text_table, s_html_table = self._generate_table_data( [u for u in users_data if u['LOGIN'] in data['USERS']], reseller_settings.NOTIFY_FAULTS_TYPES ) if not s_text_table: continue data['STATS'] = s_text_table data['STATS_HTML'] = s_html_table data['HOSTNAME'] = self.hostname result.append(data) return result def _prepare_resellers_summary_data(self, resellers_data): """ With enabled second-level of resellers's limits we have information about reseller's faults; Here we prepare information about such resellers; :type resellers_data: collections.Iterable[dict] :return: dict """ for reseller_data in resellers_data: # TODO: looks like never happens reseller_login = reseller_data.get('LOGIN') if not reseller_login: continue panel_data = self.get_panel_reseller_data(reseller_login) if not panel_data.get('TOMAIL'): continue yield merge_dicts({'HOSTNAME': self.hostname}, reseller_data, panel_data) def _load_locales_data(self, templates_dir: str) -> Dict: """ Load locales data from specified directory (file /usr/share/lve/emails/<locale_name>/locale_defines.json) :param templates_dir: Directory name to search json file :return: Dictionary loaded from JSON file """ # Try to get locale defines locale_defines_file = os.path.join(templates_dir, self.LOCALE_DEFINES_FILE) try: with open(locale_defines_file, encoding='utf-8') as json_file: return json.load(json_file) except Exception as e: # Can't read /usr/share/lve/emails/<locale_name>/locale_defines.json file self.log.warning("Can't read/parse email localization file %s: %s", locale_defines_file, str(e)) return {} @staticmethod def add_default_data(data_list, key, val): for data in data_list: data[key] = data.get(key) or val yield data def get_admin_settings(self): """ Read admin settings config (using shared code in lve-utils) and map it into our namedtuple structure :rtype: AdminSettings """ raw_data = get_admin_notification() notification_data = raw_data['faultsNotification'] faults_to_include = notification_data['faultsToInclude'] min_to_notify = notification_data['minimumNumberOfFaultsToNotify'] periods = notification_data['notify'] try: notify_interval_admin = dateutil.time_dict_to_seconds(periods['admin']) notify_interval_user = dateutil.time_dict_to_seconds(periods['user']) except (ValueError, TypeError) as e: notify_interval_user = self.DEFAULT_PERIOD notify_interval_admin = self.DEFAULT_PERIOD self.log.warning("Can't set interval to admin. Period value is incorrect: %s", str(e)) email_settings = notification_data['email'] settings = AdminSettings( NOTIFY_ADMIN=notification_data['notifyAdmin'], NOTIFY_CUSTOMERS_ON_FAULTS=notification_data['notifyCustomers'], NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS=notification_data['notifyResellers'], NOTIFY_RESELLER_CUSTOMERS=notification_data['notifyResellerCustomers'], NOTIFY_FAULTS_TYPES=NotifyFaultsOptions( NOTIFY_CPU=faults_to_include['cpu'], NOTIFY_IO=faults_to_include['io'], NOTIFY_IOPS=faults_to_include['iops'], NOTIFY_MEMORY=faults_to_include['mem'], NOTIFY_EP=faults_to_include['concurrentConnections'], NOTIFY_NPROC=faults_to_include['nproc'], ), NOTIFY_OPTIONS_SELF=NotifySettings( NOTIFY_MIN_FAULTS=min_to_notify['admin'], NOTIFY_INTERVAL=notify_interval_admin ), NOTIFY_OPTIONS_CUSTOMER=NotifySettings( NOTIFY_MIN_FAULTS=min_to_notify['user'], NOTIFY_INTERVAL=notify_interval_user ), NOTIFY_FROM_EMAIL=email_settings.get('notifyFromEmail') or DEFAULT_ADMIN_MAIL, REPORT_ADMIN_EMAIL=email_settings.get('reportAdminMail') or '', NOTIFY_SUBJECT=email_settings.get('notifySubject') or '', NOTIFY_CHARSET_EMAIL=email_settings.get('notifyCharset') or 'us-ascii', ) return settings def get_default_reseller_settings(self): # type: () -> ResellerSettings return ResellerSettings( NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS=self._admin_settings.NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS, NOTIFY_CUSTOMERS_ON_FAULTS=self._admin_settings.NOTIFY_RESELLER_CUSTOMERS, NOTIFY_RESELLER_ON_TOTAL_FAULTS=self._admin_settings.NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS, NOTIFY_FAULTS_TYPES=self._admin_settings.NOTIFY_FAULTS_TYPES, NOTIFY_OPTIONS_SELF=self._admin_settings.NOTIFY_OPTIONS_SELF, NOTIFY_OPTIONS_CUSTOMER=self._admin_settings.NOTIFY_OPTIONS_CUSTOMER, ) def get_reseller_settings(self, reseller, use_cache=True): # type: (str, Optional[bool]) -> Optional[ResellerSettings] if use_cache and reseller in self._reseller_settings_cache: return self._reseller_settings_cache[reseller] raw_data = get_notification(reseller) if not raw_data or 'faultsNotification' not in raw_data: if use_cache: self._reseller_settings_cache[reseller] = None return notification_data = raw_data['faultsNotification'] faults_to_include = notification_data['faultsToInclude'] min_to_notify = notification_data['minimumNumberOfFaultsToNotify'] periods = notification_data['notify'] try: notify_interval_reseller = dateutil.time_dict_to_seconds(periods['reseller']) notify_interval_customer = dateutil.time_dict_to_seconds(periods['customer']) except (ValueError, TypeError) as e: notify_interval_customer = self.DEFAULT_PERIOD notify_interval_reseller = self.DEFAULT_PERIOD self.log.warning("Can't set interval to reseller %s. Period value is incorrect: %s", reseller, str(e)) res = ResellerSettings( # Admin's global setting analogue is NOTIFY_RESELLER NOTIFY_RESELLER_ON_CUSTOMERS_FAULTS=notification_data['notifyResellerOnCustomers'], # Admin's global setting analogue is NOTIFY_CUSTOMER NOTIFY_CUSTOMERS_ON_FAULTS=notification_data['notifyCustomers'], NOTIFY_RESELLER_ON_TOTAL_FAULTS=notification_data['notifyReseller'], NOTIFY_FAULTS_TYPES=NotifyFaultsOptions( NOTIFY_CPU=faults_to_include['cpu'], NOTIFY_IO=faults_to_include['io'], NOTIFY_IOPS=faults_to_include['iops'], NOTIFY_MEMORY=faults_to_include['mem'], NOTIFY_EP=faults_to_include['concurrentConnections'], NOTIFY_NPROC=faults_to_include['nproc'], ), NOTIFY_OPTIONS_SELF=NotifySettings( NOTIFY_MIN_FAULTS=min_to_notify['reseller'], NOTIFY_INTERVAL=notify_interval_reseller ), NOTIFY_OPTIONS_CUSTOMER=NotifySettings( NOTIFY_MIN_FAULTS=min_to_notify['customer'], NOTIFY_INTERVAL=notify_interval_customer ), ) if use_cache: self._reseller_settings_cache[reseller] = res return res def _process_resellers_mails(self, users_data_reseller): """ Decide and send mails to resellers as resellers (doesn't include notification to reseller about it's personal account as a customer) :return: Nothing """ resellers_data = self.prepare_resellers_data(users_data_reseller) if not resellers_data: return resellers_data = self.add_default_data(resellers_data, 'TONAME', 'Reseller') for reseller_info in resellers_data: reseller_info['user_type'] = 'reseller' if not reseller_info.get('TOMAIL'): continue self.send_notification( templ_data=reseller_info, text_templ_name=RESELLER_TEMPL, html_templ_name=RESELLER_TEMPL_HTML ) def _process_resellers_summary_info_mails(self, resellers_data): # type: (Iterable[Dict]) -> None """ When second level of reseller limits is enabled we collect summary info about reseller; When total load makes fault, we notify reseller about it; """ resellers_faults_data = self._prepare_resellers_summary_data(resellers_data) resellers_faults_data = self.add_default_data(resellers_faults_data, 'TONAME', 'Reseller') for reseller_info in resellers_faults_data: reseller_info['user_type'] = 'reseller' self._notify_reseller_summary_faults(reseller_info) def _notify_reseller_summary_faults(self, reseller_info): # type: (Dict) -> None """Prepare data for reseller_faults_notify template""" reseller_info = self._prepare_template_data(reseller_info) self.send_notification( templ_data=reseller_info, text_templ_name=RESELLER_FAULTS_TEMPL, html_templ_name=RESELLER_FAULTS_TEMPL_HTML ) def _process_users_mails(self, users_data): """ Decide and send mails to customers(including resellers personal account notification as a customer) :return: Nothing """ for user_info in users_data[:]: account_owner = user_info['RESELLER'] user_notification_enabled = user_should_be_notified(user_info['LOGIN']) # reseller's will override do_notify = self._admin_settings.NOTIFY_CUSTOMERS_ON_FAULTS and user_notification_enabled # is user belongs to some reseller? if account_owner in self.resellers: reseller_settings = self.get_reseller_settings(account_owner) or self._default_reseller_settings do_notify = reseller_settings.NOTIFY_CUSTOMERS_ON_FAULTS and user_notification_enabled if not do_notify: # Exclude only in users' data copy to prevent affecting # resellers notifications(summary tables) users_data.remove(user_info) continue self._notify_users(self.add_default_data(users_data, 'TONAME', 'Customer')) def _iter_customers_faults_data(self): # type: () -> Iterator[Dict] """ Find all customers that might cause faults from /proc/lve/list, get additional data from control panel and return it """ # iter over resellers and prepare data for (notify_settings, notify_faults_types), resellers in self._get_customers_options_grouped(): # mark all resellers as those, whose users were notified notified_resellers = [reseller_id for reseller_id, _ in resellers] self._notifications.mark_users_notified(notified_resellers) # list users that may cause faults during period users_to_check = sum( (list(self._proc_lve.lve_id_list(reseller_id)) for reseller_id in notified_resellers), [] ) yield self.__get_customers_faults_data(users_to_check, notify_faults_types, notify_settings) def __get_customers_faults_data(self, users_to_check, notify_faults_types, notify_settings): # type: (list[int], NotifyFaultsOptions, NotifySettings) -> list[dict] # search statistics in database for given users users_fault_data = self.get_users_fault( period=notify_settings.NOTIFY_INTERVAL, notify_min_faults=notify_settings.NOTIFY_MIN_FAULTS, notify_faults_types=notify_faults_types, uids=users_to_check, ) # Receive users info from cpapi # TODO rewrite with generators because modify or copy users_data[:] # both may lead to problems users_data = self.get_users_data(fault_users_data=users_fault_data) return users_data def _check_users(self): for users_data in self._iter_customers_faults_data(): self._process_users_mails(users_data) def _iter_resellers_faults_data(self): # type: () -> Tuple[List[Dict], List[int]] """ Find all resellers that might cause faults from /proc/lve/resellers, get additional data from control panel and return it """ # get all resellers that may be notified for (notify_settings, notify_faults_types), resellers in self._get_resellers_options_grouped(): resellers_uids = [reseller_id for reseller_id, _ in resellers] # in our database resellers have serialized id's resellers_to_check = [serialize_lve_id(LIMIT_LVP_ID, reseller_id) for reseller_id in resellers_uids] # get data from database for those resellers that might cause faults faults_data = self.get_users_fault( period=notify_settings.NOTIFY_INTERVAL, notify_min_faults=notify_settings.NOTIFY_MIN_FAULTS, notify_faults_types=notify_faults_types, uids=resellers_to_check, ) # receive resellers info from cpapi resellers_data = self._get_resellers_data(faults_data=faults_data) yield resellers_data, resellers_uids def _iter_reseller_users_list_faults_data(self): # type: () -> Tuple[List[Dict], List[int]] for (notify_settings, notify_faults_types), resellers in self._get_resellers_customers_faults_options_grouped(): # mark all resellers as those, whose users were notified notified_resellers = [reseller_id for reseller_id, _ in resellers] # list of users that may cause faults during period users_to_check = [] for reseller_id in notified_resellers: users_to_check.extend(self._proc_lve.lve_id_list(reseller_id)) customers_faults = self.__get_customers_faults_data(users_to_check, notify_faults_types, notify_settings) yield customers_faults, notified_resellers def _check_resellers(self): notified_resellers = set() for resellers_data, reseller_ids in self._iter_resellers_faults_data(): # send reseller his own total faults during PERIOD # works only with enabled second level of reseller limits self._process_resellers_summary_info_mails(resellers_data) notified_resellers.update(reseller_ids) for users_data, reseller_ids in self._iter_reseller_users_list_faults_data(): # send reseller table with his users faults during PERIOD # works with and without second level of reseller limits self._process_resellers_mails(users_data) notified_resellers.update(reseller_ids) self._notifications.mark_resellers_notified(notified_resellers) def execute(self, lve_data): # type: (Dict) -> None # Reset this cache each time because resellers should have ability to # change their notification preferences without restarting # StatsNotifier plugin self._reseller_settings_cache = {} self._name_map.link_xml_node(use_cache=False) self._check_users() # check if any users need to be notified self._check_resellers() # check if some resellers must be notified on their faults self._check_admin() # check if we must notify admin himself self._notifications.save_to_persistent_storage() # save timestamps to file def get_stats_notifier_parameters(user): """ Gets parameter for user """ default_reseller = 'root' reseller = cpinfo(user, keyls=('reseller',)) if reseller: default_reseller = reseller[0][0] stat_notifier = StatsNotifier() if default_reseller not in admins(): settings = ( stat_notifier.get_reseller_settings(default_reseller) or stat_notifier.get_default_reseller_settings() ) else: settings = stat_notifier.get_admin_settings() return settings.NOTIFY_CUSTOMERS_ON_FAULTS