From f5759f0913c1178488a99feafc447c22833f6a7f Mon Sep 17 00:00:00 2001
From: Evan Purkhiser
Date: Mon, 21 Oct 2024 15:55:41 -0400
Subject: [PATCH] feat(crons): Record historic check-in volume counts

Part of GH-79328
---
 src/sentry/monitors/clock_dispatch.py      | 36 +++++++++++++++++--
 .../monitors/consumers/monitor_consumer.py |  6 +++-
 .../consumers/test_monitor_consumer.py     | 27 ++++++++++++++
 3 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/src/sentry/monitors/clock_dispatch.py b/src/sentry/monitors/clock_dispatch.py
index ae63161ccc43aa..c3dd39f7c67ccc 100644
--- a/src/sentry/monitors/clock_dispatch.py
+++ b/src/sentry/monitors/clock_dispatch.py
@@ -22,6 +22,12 @@
 # This key is used to store the hashmap of Mapping[PartitionKey, Timestamp]
 MONITOR_TASKS_PARTITION_CLOCKS = "sentry.monitors.partition_clocks"

+# This key is used to record historical data about the volume of check-ins.
+MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"
+
+# We record 30 days' worth of historical data for each minute of check-ins.
+MONITOR_VOLUME_RETENTION = timedelta(days=30)
+
 CLOCK_TICK_CODEC: Codec[ClockTick] = get_topic_codec(Topic.MONITORS_CLOCK_TICK)


@@ -70,6 +76,33 @@ def _dispatch_tick(ts: datetime):
         _clock_tick_producer.produce(ArroyoTopic(topic), payload)


+def _make_reference_ts(ts: datetime):
+    """
+    Produce a timestamp number with the seconds and microsecond removed
+    """
+    return int(ts.replace(second=0, microsecond=0).timestamp())
+
+
+def update_check_in_volume(ts: datetime):
+    """
+    Increment a counter for this particular timestamp trimmed down to the
+    minute.
+
+    This counter will be used as historical data to help indicate if we may
+    have had some data-loss (due to an incident) and would want to tick our
+    clock in a mode where misses and time-outs are created as "unknown".
+    """
+    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
+
+    reference_ts = _make_reference_ts(ts)
+    key = MONITOR_VOLUME_HISTORY.format(reference_ts)
+
+    pipeline = redis_client.pipeline()
+    pipeline.incr(key, amount=1)
+    pipeline.expire(key, MONITOR_VOLUME_RETENTION)
+    pipeline.execute()
+
+
 def try_monitor_clock_tick(ts: datetime, partition: int):
     """
     Handles triggering the monitor tasks when we've rolled over the minute.
@@ -84,8 +117,7 @@ def try_monitor_clock_tick(ts: datetime, partition: int):

     # Trim the timestamp seconds off, these tasks are run once per minute and
     # should have their timestamp clamped to the minute.
-    reference_datetime = ts.replace(second=0, microsecond=0)
-    reference_ts = int(reference_datetime.timestamp())
+    reference_ts = _make_reference_ts(ts)

     # Store the current clock value for this partition.
     redis_client.zadd(
diff --git a/src/sentry/monitors/consumers/monitor_consumer.py b/src/sentry/monitors/consumers/monitor_consumer.py
index f2a7e618ae7533..75753e07c064cd 100644
--- a/src/sentry/monitors/consumers/monitor_consumer.py
+++ b/src/sentry/monitors/consumers/monitor_consumer.py
@@ -28,7 +28,7 @@
 from sentry.db.postgres.transactions import in_test_hide_transaction_boundary
 from sentry.killswitches import killswitch_matches_context
 from sentry.models.project import Project
-from sentry.monitors.clock_dispatch import try_monitor_clock_tick
+from sentry.monitors.clock_dispatch import try_monitor_clock_tick, update_check_in_volume
 from sentry.monitors.constants import PermitCheckInStatus
 from sentry.monitors.logic.mark_failed import mark_failed
 from sentry.monitors.logic.mark_ok import mark_ok
@@ -937,6 +937,7 @@ def process_checkin_group(items: list[CheckinItem]):
     completely serially.
     """
     for item in items:
+        update_check_in_volume(item.ts)
         process_checkin(item)


@@ -1009,6 +1010,9 @@ def process_single(message: Message[KafkaPayload | FilteredPayload]):
     ts = message.value.timestamp
     partition = message.value.partition.index

+    if wrapper["message_type"] != "clock_pulse":
+        update_check_in_volume(ts)
+
     try:
         try_monitor_clock_tick(ts, partition)
     except Exception:
diff --git a/tests/sentry/monitors/consumers/test_monitor_consumer.py b/tests/sentry/monitors/consumers/test_monitor_consumer.py
index 1226553508294e..98ef34106e6f12 100644
--- a/tests/sentry/monitors/consumers/test_monitor_consumer.py
+++ b/tests/sentry/monitors/consumers/test_monitor_consumer.py
@@ -1016,6 +1016,13 @@ def test_organization_killswitch(self):

         assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()

+    @mock.patch("sentry.monitors.consumers.monitor_consumer.update_check_in_volume")
+    def test_monitor_update_check_in_volume(self, update_check_in_volume):
+        monitor = self._create_monitor(slug="my-monitor")
+
+        self.send_checkin(monitor.slug)
+        assert update_check_in_volume.call_count == 1
+
     @mock.patch("sentry.monitors.consumers.monitor_consumer.try_monitor_clock_tick")
     def test_monitor_tasks_trigger(self, try_monitor_clock_tick):
         monitor = self._create_monitor(slug="my-monitor")
@@ -1038,6 +1045,26 @@ def test_monitor_tasks_trigger(self, try_monitor_clock_tick):
             logger.exception.assert_called_with("Failed to trigger monitor tasks")
             try_monitor_clock_tick.side_effect = None

+    @mock.patch("sentry.monitors.consumers.monitor_consumer.update_check_in_volume")
+    def test_parallel_monitor_update_check_in_volume(self, update_check_in_volume):
+        factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
+        commit = mock.Mock()
+        consumer = factory.create_with_partitions(commit, {self.partition: 0})
+
+        monitor = self._create_monitor(slug="my-monitor")
+
+        # Send enough check-ins to process the batch
+        self.send_checkin(monitor.slug, consumer=consumer)
+        self.send_checkin(monitor.slug, consumer=consumer)
+        self.send_checkin(monitor.slug, consumer=consumer)
+        self.send_checkin(monitor.slug, consumer=consumer)
+
+        # One final check-in will trigger the batch to process (but will not
+        # yet be processed itself)
+        self.send_checkin(monitor.slug, consumer=consumer)
+
+        assert update_check_in_volume.call_count == 4
+
     @mock.patch("sentry.monitors.consumers.monitor_consumer.try_monitor_clock_tick")
     def test_parallel_monitor_task_triggers(self, try_monitor_clock_tick):
         factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
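
For context on how these per-minute counters might later be consumed (the read side is not part of this patch), the sketch below shows one possible way to compare a minute against its history. The helper name get_historic_volume_average and the strategy of sampling the same minute on prior days are illustrative assumptions, not code from this change; it only reuses the constants and helpers the patch adds to sentry.monitors.clock_dispatch.

# Illustrative sketch only -- not part of the patch above. Assumes the
# MONITOR_VOLUME_HISTORY / MONITOR_VOLUME_RETENTION constants and the
# _make_reference_ts helper added in this change, plus the same Redis
# cluster setting the dispatcher already uses.
from datetime import datetime, timedelta

from django.conf import settings

from sentry.monitors.clock_dispatch import (
    MONITOR_VOLUME_HISTORY,
    MONITOR_VOLUME_RETENTION,
    _make_reference_ts,
)
from sentry.utils import redis


def get_historic_volume_average(ts: datetime) -> float | None:
    """
    Hypothetical read side: average check-in volume recorded at this same
    minute on prior days, usable as a baseline when deciding whether the
    current minute's volume looks abnormally low (i.e. possible data loss).
    """
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    counts: list[int] = []
    for days_ago in range(1, MONITOR_VOLUME_RETENTION.days + 1):
        # Same minute of the day, `days_ago` days in the past.
        key = MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts - timedelta(days=days_ago)))
        value = redis_client.get(key)
        if value is not None:
            counts.append(int(value))

    if not counts:
        return None
    return sum(counts) / len(counts)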