관리-도구
편집 파일: backup.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import time

from defence360agent.contracts.config import BackupConfig
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.cln import CLNError
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import CommonEndpoints, bind, wraps
from defence360agent.subsys.backup_systems import (
    Acronis,
    BackupException,
    CloudLinux,
    CloudLinuxBase,
    CloudLinuxOnPremise,
    R1Soft,
    get_available_backends_names,
    get_backend,
)
from defence360agent.utils import Scope, Singleton

logger = logging.getLogger(__name__)

# Message reported to the caller whenever a CLN request fails.
CLN_RESPONSE_ERROR = """
Error with one of the next reasons:
* Not found linked backup for the server
* Not found linked cloudlinux user for the server
* IP of server is different in last time
"""

# Option name -> validation message used when the option is missing/empty.
PLAIN_AUTH_FIELDS = {
    "username": "Username should be provided",
    "password": "Password should be provided",
}

# Backend -> options that must be supplied to `init` for that backend.
# Backends that are not listed here have no mandatory options.
REQUIRED_FIELDS = {
    Acronis: PLAIN_AUTH_FIELDS,
    R1Soft: {
        "encryption_key": "Encryption key should be provided",
        "ip": "IP should be provided",
        **PLAIN_AUTH_FIELDS,
    },
    CloudLinuxOnPremise: PLAIN_AUTH_FIELDS,
}


def validate_backend_args(f):
    """Decorate an async endpoint so backend options are validated first.

    The wrapper reads the ``backend`` keyword argument, looks up its
    mandatory options in ``REQUIRED_FIELDS`` and raises ``ValidationError``
    (one message per line) when any of them is missing or falsy.
    Otherwise the wrapped coroutine is awaited with unchanged arguments.
    """

    @wraps(f)
    async def wrapper(*args, **kwargs):
        required_options = REQUIRED_FIELDS.get(kwargs["backend"], {})
        errors = [
            error_msg
            for field, error_msg in required_options.items()
            if not kwargs.get(field)
        ]
        if errors:
            raise ValidationError("\n".join(errors))
        return await f(*args, **kwargs)

    return wrapper


class BackupEndpoints(CommonEndpoints, metaclass=Singleton):
    """RPC endpoints of the ``backup-systems`` command group.

    The instance tracks a small state machine
    (``NOT_RUNNING`` -> ``INIT`` -> ``BACKUP`` -> ``DONE``) for the backend
    being initialized, together with the last error message.
    """

    SCOPE = Scope.IM360

    NOT_RUNNING, INIT, BACKUP, DONE = "not_running", "init", "backup", "done"

    # Seconds after the initial backup was started during which missing
    # progress / "No backup for host" errors are tolerated.
    _BACKUP_PENDING_TIMEOUT = 60 * 5

    def __init__(self, sink):
        super().__init__(sink)
        self._status = self.NOT_RUNNING
        self._current_backend = self._error = None
        self._init_task = self._backup_task = None
        self._backup_started_time = 0

    @property
    def _backup_pending(self):
        """True while the initial backup was started less than 5 minutes ago."""
        elapsed = time.time() - self._backup_started_time
        return elapsed < self._BACKUP_PENDING_TIMEOUT

    @validate_backend_args
    @bind("backup-systems", "init")
    async def init(self, backend, **kwargs):
        """Start initialization of *backend* in a background task.

        Raises ``ValidationError`` in the demo version, or when an
        initialization/backup is already running without an error.
        Returns a human readable progress message.
        """
        if LicenseCLN.is_demo():
            raise ValidationError("This action is not allowed in demo version")

        if self._current_backend and not self._error:
            if self._status == self.INIT:
                raise ValidationError(
                    "Backup initialization is already in progress"
                )
            if self._status == self.BACKUP:
                raise ValidationError("Backup process is already in progress")

        logger.info("Starting init task")
        # flush error from previous sessions
        self._error = None
        loop = asyncio.get_event_loop()
        # `_init_task` is the attribute declared in __init__; `_task` is kept
        # as an alias because that is the name this method used to assign.
        self._init_task = self._task = loop.create_task(
            backend.init(**kwargs)
        )
        self._init_task.add_done_callback(self._init_task_done)
        self._status = self.INIT
        self._current_backend = backend
        return "Backup initialization process is in progress"

    async def _set_current_backend(self, conf):
        """Pick the backend named in *conf* if none is currently tracked.

        When the state is still ``NOT_RUNNING`` the backend's
        ``check_state()`` decides between ``DONE`` and a "No backups found!"
        error.
        """
        # `conf` may be an empty dict (no BACKUP_SYSTEM section), hence .get()
        backup_system = conf.get("backup_system")
        if not self._current_backend and backup_system:
            self._current_backend = get_backend(backup_system)
            if self._status == self.NOT_RUNNING:
                # if backup exists, assuming all done
                if await self._current_backend.check_state():
                    self._status = self.DONE
                else:
                    self._error = "No backups found!"

    async def _include_init_stages_info(self, status):
        """Extend *status* with backend details and backup progress.

        Mutates and returns *status*. May flip the internal state to
        ``DONE`` for ``CloudLinuxBase`` backends.
        """
        if status["state"] != self.INIT:
            advanced_data = await self._current_backend.show()
            status.update(advanced_data)

        if (
            isinstance(self._current_backend, CloudLinuxBase)
            and self._status == self.BACKUP
        ):
            if self._error is not None:
                if await self._current_backend.check_state():
                    # error should already be fixed,
                    # because there is some backups
                    # NOTE(review): status["error"] still carries the old
                    # message for this one response - confirm it is intended.
                    self._error = None
                    status["state"] = self._status = self.DONE
            else:
                progress = await self._current_backend.get_backup_progress()
                if progress is None:
                    if self._backup_pending:
                        status["progress"] = 0
                    else:
                        # if there is no progress after a while
                        # let's admit backup is completed
                        status["state"] = self._status = self.DONE
                else:
                    status["progress"] = progress
        return status

    async def _get_advanced_status(self, status):
        """Fill *status* with state, error, log path and backend details.

        A failed initialization is reported once and then the endpoint is
        reset to ``NOT_RUNNING`` so that ``init`` can be called again.
        """
        await self._set_current_backend(status)
        status["state"] = self._status
        status["error"] = self._error
        status["log_path"] = getattr(self._current_backend, "log_path", None)
        status["backup_system"] = getattr(self._current_backend, "name", None)

        if self._error is not None and self._status == self.INIT:
            # the error has been copied into `status` above; reset the state
            self._error = self._current_backend = None
            self._status = self.NOT_RUNNING

        if self._current_backend:
            try:
                status = await self._include_init_stages_info(status)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # Ignoring "No backup for host" errors for 5 minutes
                if "No backup for host" in str(e) and self._backup_pending:
                    logger.info(
                        "Error %s will be ignored for 5 minutes "
                        "after init state has been finished",
                        e,
                    )
                # want to show human friendly error if backup is unpaid
                elif self._current_backend == CloudLinux:
                    resp = await self._current_backend.check()
                    if resp.get("status") == CloudLinux.UNPAID:
                        status["error"] = (
                            "Backup is unpaid! Please, check "
                            "it out in your CLN account."
                        )
                else:
                    # bare raise keeps the original traceback intact
                    raise
        return status

    @bind("backup-systems", "extended-status")
    async def extended_status(self):
        """Return the backup config section enriched with runtime state."""
        status = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        status = await self._get_advanced_status(status)
        return {"items": status}

    @bind("backup-systems", "status")
    async def status(self, user=None):
        """Return the raw ``BACKUP_SYSTEM`` config section (*user* is unused)."""
        status = BackupConfig().config_to_dict().get("BACKUP_SYSTEM", {})
        return {"items": status}

    @bind("backup-systems", "list")
    async def list(self):
        """Return the names of the available backup backends."""
        return get_available_backends_names()

    @bind("backup-systems", "disable")
    async def disable(self, backend, **kwargs):
        """Reset the tracked state and disable *backend*."""
        self._current_backend = self._error = None
        self._status = self.NOT_RUNNING
        await backend.disable(**kwargs)

    @bind("backup-systems", "check")
    async def check(self, backend):
        """Return ``backend.check()`` wrapped in ``{"items": ...}``.

        ``CLNError``, ``LookupError`` and ``RuntimeError`` are converted to
        ``ValidationError`` (chained to the original exception).
        """
        try:
            return {"items": await backend.check()}
        except CLNError as e:
            raise ValidationError(CLN_RESPONSE_ERROR) from e
        except (LookupError, RuntimeError) as e:
            raise ValidationError(str(e)) from e

    def _init_task_done(self, future):
        """Done-callback of the init task: record error or start the backup."""
        logger.info("In init done callback")
        if future.cancelled():
            # future.exception() raises CancelledError for a cancelled
            # future, which would leave the state stuck in INIT.
            logger.warning("Backup init task was cancelled")
            self._error = "Backup init task was cancelled"
            return

        e = future.exception()
        if e is not None:
            if isinstance(e, CLNError):
                self._error = CLN_RESPONSE_ERROR
            else:
                logger.exception("Backup init task failed", exc_info=e)
                self._error = str(e)
            return

        logger.info("Starting initial backup task")
        self._status = self.BACKUP
        self._backup_started_time = time.time()
        self._backup_task = asyncio.get_event_loop().create_task(
            self._current_backend.make_backup()
        )
        self._backup_task.add_done_callback(self._backup_task_done)

    def _backup_task_done(self, future):
        """Done-callback of the initial backup task: record error or DONE."""
        logger.info("In backup done callback")
        if future.cancelled():
            # see _init_task_done: exception() must not be called here
            logger.warning("Initial backup task was cancelled")
            self._error = "Initial backup task was cancelled"
            return

        e = future.exception()
        if e is None:
            self._status = self.DONE
            return

        logger.exception("Initial backup task failed", exc_info=e)
        if isinstance(e, BackupException):
            self._error = str(e)
        else:
            self._error = "Unknown error: " + str(e)