Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job-runner): async job functionality #6427

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

Conversation

xurui-c
Copy link
Member

@xurui-c xurui-c commented Oct 16, 2024

https://getsentry.atlassian.net/jira/software/c/projects/SNS/boards/147?selectedIssue=SNS-2871

At times we will want to execute long-running ClickHouse jobs that don’t fit neatly into the migrations system; either they only run in a single environment, or we don’t want to block later migrations on them finishing. This PR supports users to create job types that can run asynchronously.

@xurui-c xurui-c force-pushed the rachel/async2 branch 6 times, most recently from 159cde0 to 61142e8 Compare October 17, 2024 18:37
statuses = _get_job_status_multi(
[_build_job_status_key(job_id) for job_id in job_ids]
)
for i in range(len(job_ids)):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this for loop defeats the purpose of mget used in _get_job_status_multi, so I'm also not opposed to write _get_is_job_async_multi and _get_job_types_multi

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the implementations of _get_is_job_async_multi and _get_job_types_multi look like?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We got rid of [get_is_job_async](https://github.com/getsentry/snuba/pull/6427#discussion_r1809149842), so I won't be implementing _get_is_job_async_multi. The implementation of _get_job_types_multi will just mirror _get_job_types_multi and _get_job_status_multi

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ will revisit this in subsequent PR

snuba/manual_jobs/runner.py Outdated Show resolved Hide resolved
@getsentry getsentry deleted a comment from sentry-io bot Oct 17, 2024
@xurui-c xurui-c marked this pull request as ready for review October 21, 2024 18:05
@xurui-c xurui-c requested a review from a team as a code owner October 21, 2024 18:05
@xurui-c xurui-c requested a review from onewland October 21, 2024 18:05
@xurui-c xurui-c requested a review from onkar October 21, 2024 22:01
Copy link
Contributor

@onewland onewland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the overall logic is fine but we need a test that triggers the transition of async job from ASYNC_RUNNING_BACKGROUND to FINISHED.

Copy link

codecov bot commented Oct 21, 2024

❌ 1 Tests Failed:

Tests completed Failed Passed Skipped
903 1 902 3
View the top 1 failed tests by shortest run time
tests.datasets.test_errors_replacer.TestReplacer test_process_offset_twice
Stack Traces | 0.245s run time
Traceback (most recent call last):
  File ".../tests/datasets/test_errors_replacer.py", line 370, in test_process_offset_twice
    assert self.replacer.process_message(message) is None
AssertionError: assert (ReplacementMessageMetadata(partition_index=1, offset=42, consumer_group='consumer_group'), UnmergeGroupsReplacement(state_name=<ReplacerState.ERRORS: 'errors'>, timestamp=datetime.datetime(2024, 10, 22, 0, 19, 58, 304099), hashes=['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], all_columns=[FlattenedColumn(None, 'project_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'event_id', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'platform', schemas.String(modifiers=None)), FlattenedColumn(None, 'environment', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'release', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'dist', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v4', schemas.IPv4(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v6', schemas.IPv6(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user', schemas.String(modifiers=None)), FlattenedColumn(None, 'user_id', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_email', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_method', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_referer', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn('tags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('tags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'transaction_name', schemas.String(modifiers=None)), FlattenedColumn(None, 'span_id', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'trace_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'partition', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'offset', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'message_timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'retention_days', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'deleted', schemas.UInt(8, modifiers=None)), FlattenedColumn(None, 'group_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'primary_hash', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'hierarchical_hashes', schemas.Array(schemas.UUID(modifiers=None), modifiers=None)), FlattenedColumn(None, 'received', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'message', schemas.String(modifiers=None)), FlattenedColumn(None, 'title', schemas.String(modifiers=None)), FlattenedColumn(None, 'culprit', schemas.String(modifiers=None)), FlattenedColumn(None, 'level', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'location', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'type', schemas.String(modifiers=None)), FlattenedColumn('exception_stacks', 'type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'value', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_handled', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'abs_path', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'colno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'filename', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'function', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'lineno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'in_app', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'package', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'module', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'stack_level', schemas.Array(schemas.UInt(16, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn(None, 'exception_main_thread', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_integrations', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'name', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'version', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'trace_sampled', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'num_processing_errors', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'replay_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False)))], project_id=1, previous_group_id=1, new_group_id=2)) is None
 +  where (ReplacementMessageMetadata(partition_index=1, offset=42, consumer_group='consumer_group'), UnmergeGroupsReplacement(state_name=<ReplacerState.ERRORS: 'errors'>, timestamp=datetime.datetime(2024, 10, 22, 0, 19, 58, 304099), hashes=['aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'], all_columns=[FlattenedColumn(None, 'project_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'event_id', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'platform', schemas.String(modifiers=None)), FlattenedColumn(None, 'environment', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'release', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'dist', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v4', schemas.IPv4(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'ip_address_v6', schemas.IPv6(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user', schemas.String(modifiers=None)), FlattenedColumn(None, 'user_id', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'user_email', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_name', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_method', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'http_referer', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn('tags', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('tags', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'key', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('contexts', 'value', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'transaction_name', schemas.String(modifiers=None)), FlattenedColumn(None, 'span_id', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'trace_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'partition', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'offset', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'message_timestamp', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'retention_days', schemas.UInt(16, modifiers=None)), FlattenedColumn(None, 'deleted', schemas.UInt(8, modifiers=None)), FlattenedColumn(None, 'group_id', schemas.UInt(64, modifiers=None)), FlattenedColumn(None, 'primary_hash', schemas.UUID(modifiers=None)), FlattenedColumn(None, 'hierarchical_hashes', schemas.Array(schemas.UUID(modifiers=None), modifiers=None)), FlattenedColumn(None, 'received', schemas.DateTime(modifiers=None)), FlattenedColumn(None, 'message', schemas.String(modifiers=None)), FlattenedColumn(None, 'title', schemas.String(modifiers=None)), FlattenedColumn(None, 'culprit', schemas.String(modifiers=None)), FlattenedColumn(None, 'level', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'location', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'version', schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'type', schemas.String(modifiers=None)), FlattenedColumn('exception_stacks', 'type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'value', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_type', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_stacks', 'mechanism_handled', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'abs_path', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'colno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'filename', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'function', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'lineno', schemas.Array(schemas.UInt(32, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'in_app', schemas.Array(schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'package', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'module', schemas.Array(schemas.String(modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn('exception_frames', 'stack_level', schemas.Array(schemas.UInt(16, modifiers=SchemaModifiers(nullable=True, readonly=False)), modifiers=None)), FlattenedColumn(None, 'exception_main_thread', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'sdk_integrations', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'name', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn('modules', 'version', schemas.Array(schemas.String(modifiers=None), modifiers=None)), FlattenedColumn(None, 'trace_sampled', schemas.UInt(8, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'num_processing_errors', schemas.UInt(64, modifiers=SchemaModifiers(nullable=True, readonly=False))), FlattenedColumn(None, 'replay_id', schemas.UUID(modifiers=SchemaModifiers(nullable=True, readonly=False)))], project_id=1, previous_group_id=1, new_group_id=2)) = <bound method ReplacerWorker.process_message of <snuba.replacer.ReplacerWorker object at 0x7f5f135e4210>>(Message({Partition(topic=Topic(name='replacements'), index=1): 43}))
 +    where <bound method ReplacerWorker.process_message of <snuba.replacer.ReplacerWorker object at 0x7f5f135e4210>> = <snuba.replacer.ReplacerWorker object at 0x7f5f135e4210>.process_message
 +      where <snuba.replacer.ReplacerWorker object at 0x7f5f135e4210> = <tests.datasets.test_errors_replacer.TestReplacer object at 0x7f5f30b55a50>.replacer

To view individual test run time comparison to the main branch, go to the Test Analytics Dashboard

Comment on lines 33 to 38
def test_job_status_changes_from_async_running_background_to_finished() -> None:
assert get_job_status(JOB_ID) == JobStatus.NOT_STARTED
run_job(async_job_spec)
with patch.object(AsyncJob, "async_finished_check", return_value=True):
assert get_job_status(JOB_ID) == JobStatus.FINISHED
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry to be annoying, but could we add a third assertion here to verify this job passes through ASYNC_RUNNING_BACKGROUND before reaching FINISHED?

Copy link
Contributor

@onewland onewland left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine if the test faillure is just a flake

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants