관리-도구
편집 파일: dbengine.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
import pathlib

import sqlalchemy.engine
from sqlalchemy import delete, func
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.declarative.clsregistry import _ModuleMarker
from sqlalchemy.orm import RelationshipProperty, sessionmaker

from clcommon import mysql_lib
from clcommon.clexception import FormattedException
from lvestats.orm import Base, history
from lvestats.orm import LVE_STATS_2_TABLENAME_PREFIX
from lvestats.utils.dbmigrator import alembic_migrate

DEFAULT_DB_PATH = '/var/lve/lvestats2.db'
MYSQL, POSTGRESQL, SQLITE = 'mysql', 'postgresql', 'sqlite'
SUPPORTED_DATABASES = [MYSQL, POSTGRESQL, SQLITE]
# Python driver used in the SQLAlchemy URI for each non-sqlite database type.
DB_CLIENT_LIBRARY_MAP = {
    MYSQL: 'pymysql',
    POSTGRESQL: 'psycopg2',
}

__author__ = 'shaman'


def find_not_existing_tables_or_views(engine, names):
    """
    Return the names whose prefixed view is missing from the database.

    Each name is looked up as ``LVE_STATS_2_TABLENAME_PREFIX + name`` among
    the view names reported by the inspector. Only views are inspected here.

    :param sqlalchemy.engine.base.Engine engine:
    :param names: iterable of un-prefixed names to check
    :return tuple: missing names (un-prefixed, de-duplicated, in input order)
    """
    inspector = Inspector.from_engine(engine)
    existing = set(inspector.get_view_names())
    # Return the caller's own names instead of stripping the prefix with
    # str.replace(), which would also mangle a name that merely contains it.
    unique_names = dict.fromkeys(names)
    return tuple(
        name for name in unique_names
        if LVE_STATS_2_TABLENAME_PREFIX + name not in existing
    )


def get_list_of_tables_without_views():
    """
    Used to call engine.create_all() without views that are described as Table()

    As written, the only filtering done here is dropping tables that belong
    to features whose flag file is absent.

    :return list: Table objects from ``Base.metadata`` that should be created
    """
    tables = dict(Base.metadata.tables).values()
    tables_for_disabled_features = []

    burstable_limits_feature_flag = '/opt/cloudlinux/flags/enabled-flags.d/burstable-limits.flag'
    if not pathlib.Path(burstable_limits_feature_flag).exists():
        tables_for_disabled_features.append(LVE_STATS_2_TABLENAME_PREFIX + 'bursting_events')

    return [t for t in tables if t.name not in tables_for_disabled_features]


class MakeDbException(FormattedException):
    """Raised by make_db_engine() when the database connection cannot be configured."""


class DatabaseConfigError(FormattedException):
    """Raised when the database section of the config is missing or invalid."""


def cfg_get_uri(cfg, check_support=True):
    """
    Extract and validate database connection uri from config

    :param dict cfg: config; reads "db_type", "connect_string" and "sqlite_db_path"
    :param bool check_support: reject a "db_type" not in SUPPORTED_DATABASES
    :return str: SQLAlchemy connection URI
    :raises DatabaseConfigError: on a missing or unsupported config value
    """
    # check present "db_type"
    db_type = cfg.get('db_type')
    if db_type is None:
        raise DatabaseConfigError(
            {'message': '"%(config_variable)s" not in config', 'context': {'config_variable': 'db_type'}}
        )

    # check supported databases
    if check_support and db_type not in SUPPORTED_DATABASES:
        raise DatabaseConfigError(
            {
                'message': 'Incorrect database type %(database_type)s. You should only use %(database_types)s. '
                'The correct db_type for mariadb is "mysql"',
                'context': {'database_type': db_type, 'database_types': '", "'.join(SUPPORTED_DATABASES)},
            }
        )

    # check present "connect_string"
    connect_string = cfg.get('connect_string')
    if db_type != SQLITE and connect_string is None:
        raise DatabaseConfigError(
            {'message': '"%(config_variable)s" not in config', 'context': {'config_variable': 'connect_string'}}
        )

    # generate database uri
    if db_type == SQLITE:
        uri = f'{db_type}:///{cfg.get("sqlite_db_path", DEFAULT_DB_PATH)}'
    else:
        client_library = get_db_client_library_name(db_type)
        uri = f'{db_type}+{client_library}://{connect_string}'
        uri = get_fixed_uri(db_type, uri)
    return uri


def make_db_engine(cfg, just_print=False, debug=False, pool_class=None, check_support=True):
    """
    Create sqlalchemy database engine

    :param dict cfg: config passed to cfg_get_uri(); "debug" is read as a fallback
    :param bool just_print: build a mock engine that prints SQL instead of executing it
    :param bool debug: echo SQL statements; when False, cfg["debug"] is used if present
    :param pool_class: passed to create_engine() as ``poolclass``
    :param bool check_support: forwarded to cfg_get_uri()
    :return: sqlalchemy engine
    :raises MakeDbException: when the config does not yield a valid URI
    """
    log = logging.getLogger('init_db_engine')
    try:
        connect_string = cfg_get_uri(cfg, check_support=check_support)
    except DatabaseConfigError as e:
        msg = 'Unable to configure database connection. %(error)s'
        context = {'error': str(e)}
        # a single dict argument lets logging fill the named %(error)s placeholder
        log.error(msg, context)
        raise MakeDbException({'message': msg, 'context': context}) from e
    log.info('Connecting to database: %s', cfg['db_type'])

    if debug is False and 'debug' in cfg:
        debug = cfg['debug']

    if just_print:

        def dump(sql, *multiparams, **params):
            print(sql)

        engine = sqlalchemy.engine.create_engine(connect_string, strategy='mock', executor=dump)
    else:
        engine = sqlalchemy.engine.create_engine(connect_string, echo=debug, poolclass=pool_class)
    return engine


def _collect_schema_errors(engine, base, log):
    """Compare the ORM models registered on ``base`` with the real schema; see validate_database()."""
    result = {"table_error": False, "column_error": False, "missing_tables": [], "orm_tables": []}
    database_inspection = Inspector.from_engine(engine)
    real_tables = database_inspection.get_table_names()

    for _, _class in list(base._decl_class_registry.items()):  # pylint: disable=protected-access
        if isinstance(_class, _ModuleMarker):
            continue
        table = _class.__tablename__
        result['orm_tables'].append(table)
        if table in real_tables:
            real_columns = [c["name"] for c in database_inspection.get_columns(table)]
            mapper = sqlalchemy.inspect(_class)
            for column_prop in mapper.attrs:
                if isinstance(column_prop, RelationshipProperty):
                    # We have no relationships
                    continue
                for column in column_prop.columns:
                    if column.key not in real_columns:
                        log.critical(
                            "Model %s declares column %s which does not exist in database %s",
                            _class,
                            column.key,
                            engine,
                        )
                        result["column_error"] = True
        else:
            log.critical("Model %s declares table %s which does not exist in database %s", _class, table, engine)
            result["table_error"] = True
            result["missing_tables"].append(table)

    if result["table_error"]:
        log.critical("You can run 'lve-create-db --create-missing-tables' command to recreate missing tables.")
    if result["column_error"]:
        log.critical("You can try to recreate malformed table, or ask CloudLinux support for help.")
    return result


def validate_database(engine, hide_logging=False, base=Base):
    """
    Check that every ORM table and column declared on ``base`` exists in the database.

    :param engine: sqlalchemy engine to inspect
    :param bool hide_logging: suppress the critical log records for this call only
    :param base: declarative base whose class registry is validated
    :return dict: keys "table_error", "column_error" (bool) and
        "missing_tables", "orm_tables" (lists of table names)
    """
    log = logging.getLogger('validate_database')
    if not hide_logging:
        return _collect_schema_errors(engine, base, log)

    # The logger is shared process-wide, so silence it only for the duration
    # of this call; otherwise later non-hidden calls would stay muted and a
    # new NullHandler would pile up on every hidden call.
    null_handler = logging.NullHandler()
    previous_propagate = log.propagate
    log.propagate = False
    log.addHandler(null_handler)
    try:
        return _collect_schema_errors(engine, base, log)
    finally:
        log.removeHandler(null_handler)
        log.propagate = previous_propagate


def check_need_create_db(engine):
    """
    Tell whether none of the ORM tables exist in the database.

    :param engine: sqlalchemy engine to inspect
    :return bool: True when the list of missing tables equals the list of ORM tables
    """
    inspect_result = validate_database(engine, hide_logging=True)
    return inspect_result['missing_tables'] == inspect_result['orm_tables']


def create_db(engine):
    """
    Create the tables returned by get_list_of_tables_without_views() and stamp the revision.

    :param engine: sqlalchemy engine
    """
    Base.metadata.create_all(engine, tables=get_list_of_tables_without_views())
    # write revision version to database only after create all tables
    alembic_migrate(engine, stamp=True)


def setup_db(engine, create_missing_tables=False, cfg=None):
    """
    Create the schema when requested or when no ORM table exists; otherwise run migrations.

    :param engine: sqlalchemy engine
    :param bool create_missing_tables: force the create_db() path
    :param cfg: passed to alembic_migrate() as ``lve_stats_cfg``
    """
    if create_missing_tables or check_need_create_db(engine):
        create_db(engine)
    else:
        alembic_migrate(engine, lve_stats_cfg=cfg)


def drop_tables(engine):
    """
    Drop everything registered on ``Base.metadata``.

    :param engine: sqlalchemy engine
    """
    Base.metadata.drop_all(engine)


def clear_rows(engine, server_id=None):
    """
    Delete rows from every table registered on ``Base.metadata`` in one transaction.

    :param engine: sqlalchemy engine
    :param server_id: None to delete all rows, a single id, or a list/tuple
        of ids to delete only the rows with a matching ``server_id`` column
    """
    filter_by_many = isinstance(server_id, (list, tuple))
    if filter_by_many and not server_id:
        # an empty collection of ids matches no rows
        return

    # engine.begin() commits on success, rolls back on error and always
    # releases the connection, so all deletes happen in one transaction.
    with engine.begin() as conn_:
        for table_object in Base.metadata.tables.values():
            delete_query = delete(table_object)
            if filter_by_many:
                # Python's `in` operator cannot build a SQL IN clause; .in_() does
                delete_query = delete_query.where(table_object.c.server_id.in_(server_id))
            elif server_id is not None:
                delete_query = delete_query.where(table_object.c.server_id == server_id)
            conn_.execute(delete_query)


def recreate_db(engine):
    """
    Drop all tables and create the schema again.

    :param engine: sqlalchemy engine
    """
    drop_tables(engine)
    create_db(engine)


def find_lost_keep_alive(session, server_id, from_timestmp=None, to_timestamp=None):
    """
    Find timestamps of history records that have no keepalive row.

    :param session: sqlalchemy session
    :param server_id: server whose history rows are examined
    :param from_timestmp: optional inclusive lower bound on ``created``
    :param to_timestamp: optional inclusive upper bound on ``created``
    :return list: ``created`` values matching the query
    """
    # query for find timestmps with lost keepalive records
    # SELECT created FROM lve_stats2_history WHERE server_id = 'server_id'
    # GROUP BY created HAVING max(id) > 0 AND min(id) > 0
    lost_keepalive = (
        session.query(history.created)
        .filter(history.server_id == server_id)
        .group_by(history.created)
        .having(func.max(history.id) > 0)
        .having(func.min(history.id) > 0)
    )
    if to_timestamp is not None:
        lost_keepalive = lost_keepalive.filter(history.created <= to_timestamp)
    if from_timestmp is not None:
        lost_keepalive = lost_keepalive.filter(history.created >= from_timestmp)
    return [record_[0] for record_ in lost_keepalive]


def fix_lost_keep_alive(session, server_id, from_timestmp=None, to_timestamp=None, log_=None):
    """
    Insert an all-zero keepalive history row for every timestamp that lacks one.

    :param session: sqlalchemy session; committed once after all rows are added
    :param server_id: server whose history rows are repaired
    :param from_timestmp: optional inclusive lower bound on ``created``
    :param to_timestamp: optional inclusive upper bound on ``created``
    :param log_: optional logger used to report what was fixed
    """
    lost_keep_alive = find_lost_keep_alive(session, server_id, from_timestmp, to_timestamp)
    if not lost_keep_alive:
        return

    # add losted keep alive records
    v2_keepalive_row = {col: 0 for col in list(history.__table__.columns.keys())}
    v2_keepalive_row[history.server_id.key] = server_id
    for timestamp in lost_keep_alive:
        history_keepalive = history(**v2_keepalive_row)
        history_keepalive.created = timestamp  # setup timestamp
        session.add(history_keepalive)
    session.commit()

    if log_:
        log_.info('Was fixed %s losted keep alive records: %s', len(lost_keep_alive), lost_keep_alive)


def fix_db(engine, cfg, from_timestmp=None, to_timestamp=None, log_=None):
    """
    Open a session and repair lost keepalive records for ``cfg['server_id']``.

    :param engine: sqlalchemy engine
    :param dict cfg: config; "server_id" is required
    :param from_timestmp: optional inclusive lower bound on ``created``
    :param to_timestamp: optional inclusive upper bound on ``created``
    :param log_: optional logger forwarded to fix_lost_keep_alive()
    """
    # NOTE(review): an earlier comment here mentioned autocommit=True (to clear
    # the mariadb cache between queries), but the session is created with the
    # sessionmaker defaults - confirm whether autocommit is still wanted.
    session = sessionmaker(bind=engine)()
    try:
        fix_lost_keep_alive(
            session, cfg['server_id'], from_timestmp=from_timestmp, to_timestamp=to_timestamp, log_=log_
        )
    finally:
        # release the session even when the repair fails
        session.close()


def get_db_client_library_name(db_type):
    """
    Get database Python client library name

    :param str db_type:
    :return str: driver name from DB_CLIENT_LIBRARY_MAP
    :raises DatabaseConfigError: when no driver is mapped for ``db_type``
    """
    client_library = DB_CLIENT_LIBRARY_MAP.get(db_type)
    if client_library is None:
        raise DatabaseConfigError(
            {
                'message': 'The client library to connect to the "%(database_type)s" ' 'database cannot be detected',
                'context': {
                    'database_type': db_type,
                },
            }
        )
    return client_library


def get_fixed_uri(db_type, uri):
    """
    Get fixed URI for the specified database

    Only MySQL URIs are rewritten (through ``mysql_lib.get_rfc1738_db_uri``);
    any other database type gets its URI back unchanged.

    :param str db_type:
    :param str uri:
    :return str:
    """
    if db_type != MYSQL:
        return uri

    u = make_url(uri)
    return str(mysql_lib.get_rfc1738_db_uri(u))