diff --git a/shub_workflow/contrib/__init__.py b/shub_workflow/contrib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/shub_workflow/contrib/sentry.py b/shub_workflow/contrib/sentry.py new file mode 100644 index 0000000..549c14d --- /dev/null +++ b/shub_workflow/contrib/sentry.py @@ -0,0 +1,54 @@ +import logging + +from typing import List + +from spidermon.contrib.actions.sentry import SendSentryMessage + +from shub_workflow.script import BaseScriptProtocol +from shub_workflow.utils import resolve_shub_jobkey +from shub_workflow.utils.monitor import BaseMonitorProtocol + + +LOG = logging.getLogger(__name__) + + +class SentryMixin(BaseScriptProtocol): + """ + A class for adding sentry alert capabilities to a shub_workflow class. + """ + + def __init__(self): + self.sentry_handler = SendSentryMessage( + fake=self.project_settings.get("SPIDERMON_SENTRY_FAKE"), + sentry_dsn=self.project_settings.get("SPIDERMON_SENTRY_DSN"), + sentry_log_level=self.project_settings.get("SPIDERMON_SENTRY_LOG_LEVEL"), + project_name=self.project_settings.get("SPIDERMON_SENTRY_PROJECT_NAME"), + environment=self.project_settings.get("SPIDERMON_SENTRY_ENVIRONMENT_TYPE"), + ) + self.messages: List[str] = [] + + def send_sentry_message(self): + if self.messages: + message = dict() + title = f"{self.sentry_handler.project_name} | {self.sentry_handler.environment} | Monitor notification" + message["title"] = title + message["failure_reasons"] = "/n".join(self.messages) + job_key = resolve_shub_jobkey() + if job_key: + message["job_link"] = f"https://app.zyte.com/p/{job_key}" + if self.sentry_handler.fake: + LOG.info(message) + else: + self.sentry_handler.send_message(message) + + def append_message(self, message: str): + self.messages.append(message) + + +class MonitorSentryMixin(SentryMixin, BaseMonitorProtocol): + """ + Mixin for adding sentry capabilities to shub_workflow monitors. + """ + def close(self): + super().close() # type: ignore + self.send_sentry_message() diff --git a/shub_workflow/utils/__init__.py b/shub_workflow/utils/__init__.py index 1c72899..71b797e 100644 --- a/shub_workflow/utils/__init__.py +++ b/shub_workflow/utils/__init__.py @@ -19,6 +19,10 @@ def hashstr(text: str) -> str: return u.hexdigest() +def resolve_shub_jobkey() -> Optional[str]: + return os.environ.get("SHUB_JOBKEY") + + def resolve_project_id(project_id=None) -> Optional[int]: """ Gets project id from following sources in following order of precedence: @@ -37,8 +41,9 @@ def resolve_project_id(project_id=None) -> Optional[int]: return int(os.environ["PROJECT_ID"]) # for ScrapyCloud jobs: - if os.environ.get("SHUB_JOBKEY"): - return int(os.environ["SHUB_JOBKEY"].split("/")[0]) + jobkey = resolve_shub_jobkey() + if jobkey: + return int(jobkey.split("/")[0]) # read from scrapinghub.yml try: diff --git a/shub_workflow/utils/monitor.py b/shub_workflow/utils/monitor.py index 160eb8e..cc465dc 100644 --- a/shub_workflow/utils/monitor.py +++ b/shub_workflow/utils/monitor.py @@ -1,15 +1,16 @@ import re +import abc import time import logging import inspect -from typing import Dict, Type, Tuple, Optional +from typing import Dict, Type, Tuple, Optional, Protocol from datetime import timedelta, datetime from collections import defaultdict import dateparser from scrapy import Spider -from shub_workflow.script import BaseScript, SpiderName, JobDict +from shub_workflow.script import BaseScript, BaseScriptProtocol, SpiderName, JobDict LOG = logging.getLogger(__name__) @@ -21,7 +22,14 @@ def _get_number(txt: str) -> Optional[int]: return None -class BaseMonitor(BaseScript): +class BaseMonitorProtocol(BaseScriptProtocol, Protocol): + + @abc.abstractmethod + def close(self): + ... + + +class BaseMonitor(BaseScript, BaseMonitorProtocol): # a map from spiders classes to check, to a stats prefix to identify the aggregated stats. target_spider_classes: Dict[Type[Spider], str] = {Spider: ""} @@ -108,6 +116,7 @@ def run(self): self.run_stats_hooks(start_limit, end_limit) self.upload_stats() self.print_stats() + self.close() def run_stats_hooks(self, start_limit, end_limit): for stat, val in self.stats.get_stats().items(): @@ -261,3 +270,6 @@ def check_script_logs(self, start_limit, end_limit): self.stats.inc_value(stat, val) if stat_suffix: self.stats.inc_value(stat + f"/{stat_suffix}", val) + + def close(self): + pass