feat(crons): Record historic check-in volume counts #79448

Open · wants to merge 1 commit into master
36 changes: 34 additions & 2 deletions src/sentry/monitors/clock_dispatch.py
@@ -22,6 +22,12 @@
# This key is used to store the hashmap of Mapping[PartitionKey, Timestamp]
MONITOR_TASKS_PARTITION_CLOCKS = "sentry.monitors.partition_clocks"

# This key is used to record historical data about the volume of check-ins.
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"

# We record 30 days worth of historical data for each minute of check-ins.
MONITOR_VOLUME_RETENTION = timedelta(days=30)
Member:
Since we keep keys every minute, this will be 43k keys. Could be a good reason to have our own cluster.
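(For reference: 30 days × 1,440 minutes per day = 43,200 per-minute keys.)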


CLOCK_TICK_CODEC: Codec[ClockTick] = get_topic_codec(Topic.MONITORS_CLOCK_TICK)


@@ -70,6 +76,33 @@ def _dispatch_tick(ts: datetime):
_clock_tick_producer.produce(ArroyoTopic(topic), payload)


def _make_reference_ts(ts: datetime):
"""
Produce a timestamp number with the seconds and microsecond removed
"""
return int(ts.replace(second=0, microsecond=0).timestamp())


def update_check_in_volume(ts: datetime):
"""
Increment a counter for this particular timestamp trimmed down to the
minute.

This counter will be used as historical data to help indicate if we may
have had some data-loss (due to an incident) and would want to tick our
clock in a mode where misses and time-outs are created as "unknown".
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
Member:
This is using rc-default-cache in US, I wonder if we should try and migrate to our own cluster so that we're more isolated


reference_ts = _make_reference_ts(ts)
key = MONITOR_VOLUME_HISTORY.format(reference_ts)

pipeline = redis_client.pipeline()
pipeline.incr(key, amount=1)
pipeline.expire(key, MONITOR_VOLUME_RETENTION)
Member, commenting on lines +101 to +102:

We could just probabilistically set the expire 10% of the time to nearly halve the number of writes. Probably no need if we do the batching option though
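A rough sketch of that idea (not part of this PR), reusing the key constants and helper from above; the 10% probability is just the figure from the comment, not a tuned value:

import random

def update_check_in_volume_probabilistic_expire(ts: datetime) -> None:
    # Sketch only: same as update_check_in_volume above, but the EXPIRE is
    # only sent on ~10% of writes to cut the per-check-in write volume.
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
    key = MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts))

    pipeline = redis_client.pipeline()
    pipeline.incr(key, amount=1)
    if random.random() < 0.1:
        # With any reasonable per-minute check-in volume the key is still
        # very likely to receive a TTL shortly after it is first created.
        pipeline.expire(key, MONITOR_VOLUME_RETENTION)
    pipeline.execute()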

pipeline.execute()


def try_monitor_clock_tick(ts: datetime, partition: int):
"""
Handles triggering the monitor tasks when we've rolled over the minute.
@@ -84,8 +117,7 @@ def try_monitor_clock_tick(ts: datetime, partition: int):

# Trim the timestamp seconds off, these tasks are run once per minute and
# should have their timestamp clamped to the minute.
reference_datetime = ts.replace(second=0, microsecond=0)
reference_ts = int(reference_datetime.timestamp())
reference_ts = _make_reference_ts(ts)

# Store the current clock value for this partition.
redis_client.zadd(
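For context on how this history might eventually be read back (not part of this PR), a minimal sketch assuming the same cluster, key format, and helper as above; get_check_in_volumes is a hypothetical name:

def get_check_in_volumes(timestamps: list[datetime]) -> dict[int, int]:
    """
    Hypothetical helper: fetch the recorded check-in count for each of the
    given minutes. Keys that are missing (no check-ins, or older than the
    30 day retention) come back as 0.
    """
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
    reference_timestamps = [_make_reference_ts(ts) for ts in timestamps]

    pipeline = redis_client.pipeline()
    for reference_ts in reference_timestamps:
        pipeline.get(MONITOR_VOLUME_HISTORY.format(reference_ts))
    values = pipeline.execute()

    return {
        reference_ts: int(value) if value is not None else 0
        for reference_ts, value in zip(reference_timestamps, values)
    }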
6 changes: 5 additions & 1 deletion src/sentry/monitors/consumers/monitor_consumer.py
@@ -28,7 +28,7 @@
from sentry.db.postgres.transactions import in_test_hide_transaction_boundary
from sentry.killswitches import killswitch_matches_context
from sentry.models.project import Project
from sentry.monitors.clock_dispatch import try_monitor_clock_tick
from sentry.monitors.clock_dispatch import try_monitor_clock_tick, update_check_in_volume
from sentry.monitors.constants import PermitCheckInStatus
from sentry.monitors.logic.mark_failed import mark_failed
from sentry.monitors.logic.mark_ok import mark_ok
@@ -937,6 +937,7 @@ def process_checkin_group(items: list[CheckinItem]):
completely serially.
"""
for item in items:
update_check_in_volume(item.ts)
Member:
Since we're doing this here, I think it'd likely be better to just pass all the timestamps to the function in bulk. The single process method can just pass a list of one item.

Actually, it'd be even better to just do this in process_batch. Just store all the timestamps after we decode the messages, and then call bulk_update_check_in_volume with all of them

Member Author:

ah yeah good call
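A sketch of what the suggested bulk helper could look like in clock_dispatch.py, assuming the key scheme above (the name bulk_update_check_in_volume is taken from the comment, not from this PR):

from collections import Counter

def bulk_update_check_in_volume(ts_list: list[datetime]) -> None:
    # Sketch only: collapse a batch of check-in timestamps into one INCRBY
    # per distinct minute, so the whole batch costs a single pipeline round trip.
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
    counts = Counter(_make_reference_ts(ts) for ts in ts_list)

    pipeline = redis_client.pipeline()
    for reference_ts, count in counts.items():
        key = MONITOR_VOLUME_HISTORY.format(reference_ts)
        pipeline.incr(key, amount=count)
        pipeline.expire(key, MONITOR_VOLUME_RETENTION)
    pipeline.execute()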

process_checkin(item)


@@ -1009,6 +1010,9 @@ def process_single(message: Message[KafkaPayload | FilteredPayload]):
ts = message.value.timestamp
partition = message.value.partition.index

if wrapper["message_type"] != "clock_pulse":
update_check_in_volume(ts)

try:
try_monitor_clock_tick(ts, partition)
except Exception:
27 changes: 27 additions & 0 deletions tests/sentry/monitors/consumers/test_monitor_consumer.py
@@ -1016,6 +1016,13 @@ def test_organization_killswitch(self):

assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()

@mock.patch("sentry.monitors.consumers.monitor_consumer.update_check_in_volume")
def test_monitor_update_check_in_volume(self, update_check_in_volume):
monitor = self._create_monitor(slug="my-monitor")

self.send_checkin(monitor.slug)
assert update_check_in_volume.call_count == 1

@mock.patch("sentry.monitors.consumers.monitor_consumer.try_monitor_clock_tick")
def test_monitor_tasks_trigger(self, try_monitor_clock_tick):
monitor = self._create_monitor(slug="my-monitor")
@@ -1038,6 +1045,26 @@ def test_monitor_tasks_trigger(self, try_monitor_clock_tick):
logger.exception.assert_called_with("Failed to trigger monitor tasks")
try_monitor_clock_tick.side_effect = None

@mock.patch("sentry.monitors.consumers.monitor_consumer.update_check_in_volume")
def test_parallel_monitor_update_check_in_volume(self, update_check_in_volume):
factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
commit = mock.Mock()
consumer = factory.create_with_partitions(commit, {self.partition: 0})

monitor = self._create_monitor(slug="my-monitor")

# Send enough check-ins to process the batch
self.send_checkin(monitor.slug, consumer=consumer)
self.send_checkin(monitor.slug, consumer=consumer)
self.send_checkin(monitor.slug, consumer=consumer)
self.send_checkin(monitor.slug, consumer=consumer)

# One final check-in will trigger the batch to process (but will not
# yet be processed itself)
self.send_checkin(monitor.slug, consumer=consumer)

assert update_check_in_volume.call_count == 4

@mock.patch("sentry.monitors.consumers.monitor_consumer.try_monitor_clock_tick")
def test_parallel_monitor_task_triggers(self, try_monitor_clock_tick):
factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)