관리-도구
편집 파일: malware_response.py
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ from collections import defaultdict from logging import getLogger from typing import List from defence360agent.contracts.config import Malware as Config from defence360agent.contracts.license import LicenseError from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from imav.malwarelib.model import MalwareHit from imav.malwarelib.utils import malware_response from imav.malwarelib.utils.malware_response import ( ClientError, FileTooLargeError, ) from defence360agent.utils import get_results_iterable_expression logger = getLogger(__name__) class MalwareResponsePlugin(MessageSink, MessageSource): async def create_source(self, loop, sink): self._loop = loop self._sink = sink async def create_sink(self, loop): self._loop = loop @expect(MessageType.MalwareResponse) async def process_message_malware_response(self, message): hashes = self._collect_hashes(message.files) unknown_hashes = hashes[MessageType.MalwareResponse.UNKNOWN.lower()] if unknown_hashes: await self._process_unknown_hash(unknown_hashes) async def _process_unknown_hash(self, hashes: set): """ If there is no such file with given hash in database, Imunify360 Client should upload file using API endpoint /api/v1/upload. """ if not Config.SEND_FILES: logger.info("Sending unknown files to MRS is disabled.") return orig_files = self._get_orig_files(hashes) for orig_file in orig_files: logger.info("Unknown file %s. Uploading to MRS.", orig_file) try: await malware_response.upload_with_retries(orig_file) except LicenseError as e: logger.warning( "Cannot upload the following files to MRS %r: %s", orig_files, e, ) break except ClientError as e: logger.error("File %s uploading failed. %s.", orig_file, e) except FileNotFoundError: logger.warning( "File %s not found, skipping upload.", orig_file ) except FileTooLargeError as e: logger.warning("File %s uploading failed. %s.", orig_file, e) @staticmethod def _collect_hashes(files): type_to_hash = defaultdict(set) for file_hash, hash_type in files.items(): type_to_hash[hash_type.lower()].add(file_hash) return type_to_hash @staticmethod def _get_orig_files(hashes: set) -> List[str]: result = [] def expression(hashes): return MalwareHit.select().where(MalwareHit.hash.in_(hashes)) for hit in get_results_iterable_expression(expression, hashes): try: result.append(hit.orig_file) except FileNotFoundError: logger.warning("File %s not found, skipping.", hit.orig_file) return result