관리-도구
편집 파일: report.py
#!/opt/cloudlinux/venv/bin/python3
"""Build summary, error and duration reports from stored scrape results."""
import html
import re
from dataclasses import dataclass, asdict

from sqlalchemy import func
from sqlalchemy.sql.expression import literal_column

from wmt.common import cfg
from wmt.db import ScrapeResult, session_scope

# Anchored so that only a leading scheme is removed; an unanchored pattern
# would also strip scheme-like text embedded in a path or query string.
_SCHEME_RE = re.compile(r'^https?://')


@dataclass
class SummaryReport:
    """Totals for one reporting period.

    ``average_time`` holds the averaged response time multiplied by 1000
    (see ``generate_report``); ``to_template`` scales it back down.
    """
    count_all: int
    count_successful: int
    count_failed: int
    count_undone: int
    average_time: float

    def to_template(self, *args):
        """Return the summary as a flat row for the report template.

        Extra positional arguments are accepted and ignored, so this can be
        called the same way as the other reports' ``to_template``.
        """
        return [self.count_all,
                self.count_successful,
                self.count_failed,
                self.count_undone,
                # emails need time in ms
                int(self.average_time / 10**3)]


@dataclass
class ErrorReport:
    """Error statistics for one website.

    ``code`` is the concatenation of the distinct non-200 response codes
    seen for the website (e.g. ``'451,500'``); ``count_errors`` is the total
    number of such responses.
    """
    code: str
    count_errors: int
    url: str

    def to_template(self, alternative):
        """Return ``[url, count_errors, code]`` for the report template.

        When ``alternative`` is ``'html'`` the url is rendered as a link.
        """
        return [_render_url(self.url, alternative),
                self.count_errors,
                self.code]


@dataclass
class DurationReport:
    """Average response time for one website.

    ``average_time`` holds the averaged response time multiplied by 1000
    (see ``generate_report``); ``to_template`` scales it back down.
    """
    url: str
    average_time: float

    def to_template(self, alternative):
        """Return ``[url, average_time]`` for the report template.

        When ``alternative`` is ``'html'`` the url is rendered as a link.
        """
        return [_render_url(self.url, alternative),
                # emails need time in ms
                int(self.average_time / 10**3)]


def _render_url(url, alternative):
    """Return the display form of ``url`` for the given email alternative.

    For ``'html'`` the result is an anchor element whose attribute value and
    link text are HTML-escaped, so characters such as ``"``, ``&`` or ``<``
    in a stored URL cannot break or inject markup. Any other alternative
    gets the plain scheme-less URL.
    """
    domain = url_to_domain(url)
    if alternative == 'html':
        href = html.escape(url, quote=True)
        return f'<a href="{href}">{html.escape(domain)}</a>'
    return domain


def url_to_domain(url):
    """Strip a leading ``http://`` or ``https://`` from ``url``.

    Only the scheme at the very start of the string is removed; anything
    after it (including a path or query string) is returned unchanged.
    """
    return _SCHEME_RE.sub('', url)


def generate_report(engine, start_date, end_date):
    """Collect scrape statistics for ``start_date <= create_date <= end_date``.

    Websites for which ``cfg.is_domain_ignored`` returns true are excluded
    from every part of the report, including the undone counter.

    Returns a dict with:
      * ``'summary_report'``: a ``SummaryReport``;
      * ``'error_report'``: a list of ``ErrorReport``, one per website that
        had finished non-200 responses;
      * ``'duration_report'``: a list of ``DurationReport``, one per website
        that had 200 responses, ordered by average time, slowest first.
    """
    in_period = (ScrapeResult.create_date >= start_date,
                 ScrapeResult.create_date <= end_date)

    with session_scope(engine) as session:
        # gets counter per status code per website
        # -> group key: (website, status_code) pair
        # e.g (test.com 404 3), (test.com 500 2)
        errors_by_code = (
            session.query(ScrapeResult.response_code,
                          ScrapeResult.website,
                          func.count().label('err_count'))
            .filter(*in_period,
                    ScrapeResult.response_code != 200,
                    # `==` is required here: it builds the SQL expression
                    ScrapeResult.is_finished == True)  # noqa: E712
            .group_by(ScrapeResult.response_code, ScrapeResult.website)
            .subquery()
        )

        # group previous subquery by website
        #  code       count  website
        # [('451,500', 3, 'http://www.flightradar24.com'),
        #  ('404', 2, 'http://broken.com')]
        error_stats = (
            session.query(func.group_concat(errors_by_code.c.response_code),
                          func.sum(errors_by_code.c.err_count),
                          errors_by_code.c.website)
            .group_by(errors_by_code.c.website)
            .all()
        )

        #  website                         avg ms             count
        # [('http://www.stackoverflow.com', 538.0816599732262, 2241),
        #  ('http://www.suser.com', 66.53859883980365, 2241)]
        success_stats = (
            session.query(ScrapeResult.website,
                          func.avg(ScrapeResult.response_time_ms)
                          .label('average_time'),
                          func.count())
            .filter(*in_period,
                    ScrapeResult.response_code == 200)
            .group_by(ScrapeResult.website)
            .order_by(literal_column('average_time').desc())
            .all()
        )

        # Unfinished requests, counted per website so that ignored domains
        # can be filtered out exactly like in the other statistics.
        undone_stats = (
            session.query(ScrapeResult.website, func.count())
            .filter(*in_period,
                    ScrapeResult.is_finished == False)  # noqa: E712
            .group_by(ScrapeResult.website)
            .all()
        )

    # The rows above are plain column tuples, so they remain usable after
    # the session scope has been left.
    success_stats = [(url, average_time, count)
                     for url, average_time, count in success_stats
                     if not cfg.is_domain_ignored(url)]
    error_stats = [(code, count, url)
                   for code, count, url in error_stats
                   if not cfg.is_domain_ignored(url)]
    undone_requests_count = sum(count for url, count in undone_stats
                                if not cfg.is_domain_ignored(url))

    error_report = [ErrorReport(code=code, count_errors=count_errors, url=url)
                    for code, count_errors, url in error_stats]
    # Averages are stored multiplied by 1000 and rounded to an integer;
    # the to_template() methods divide by 10**3 again.
    duration_report = [DurationReport(url=url,
                                      average_time=int(round(average_time * 1000)))
                       for url, average_time, _ in success_stats]

    successful_requests_count = sum(count for _, _, count in success_stats)
    error_requests_count = sum(count for _, count, _ in error_stats)

    # NOTE: this is the unweighted mean of the per-website averages, not the
    # mean over all individual requests.
    averages = [average_time for _, average_time, _ in success_stats]
    if averages:
        overall_average = int(round(1000 * sum(averages) / len(averages)))
    else:
        overall_average = 0

    summary_report = SummaryReport(
        count_all=(successful_requests_count
                   + error_requests_count
                   + undone_requests_count),
        count_successful=successful_requests_count,
        count_failed=error_requests_count,
        count_undone=undone_requests_count,
        average_time=overall_average)

    return {
        'summary_report': summary_report,
        'error_report': error_report,
        'duration_report': duration_report
    }


def report_dict(report):
    """Convert the result of ``generate_report`` into plain dicts and lists.

    Each dataclass instance is replaced by its ``dataclasses.asdict`` form;
    the three top-level keys are kept.
    """
    return {
        'summary_report': asdict(report['summary_report']),
        'error_report': [asdict(item) for item in report['error_report']],
        'duration_report': [asdict(item) for item in report['duration_report']]
    }