diff --git a/src/saturn_engine/worker_manager/services/sync.py b/src/saturn_engine/worker_manager/services/sync.py index 7c5b8f96..1a52344a 100644 --- a/src/saturn_engine/worker_manager/services/sync.py +++ b/src/saturn_engine/worker_manager/services/sync.py @@ -26,21 +26,24 @@ def sync_jobs() -> None: job_definition_name=job_definition.name, ) - if ( - not last_job - or croniter( + if last_job: + # If a job already exists, check it has completed and + # the interval has elapsed to start a new one. + if not last_job.completed_at: + continue + + scheduled_at = croniter( job_definition.minimal_interval, last_job.started_at, ).get_next(ret_type=datetime) - < utcnow() - ): - job_name: str = f"{job_definition.name}-{int(time.time())}" - queue = queues_store.create_queue( - session=session, name=job_name - ) - jobs_store.create_job( - name=job_name, - session=session, - queue_name=queue.name, - job_definition_name=job_definition.name, - ) + if scheduled_at > utcnow(): + continue + + job_name: str = f"{job_definition.name}-{int(time.time())}" + queue = queues_store.create_queue(session=session, name=job_name) + jobs_store.create_job( + name=job_name, + session=session, + queue_name=queue.name, + job_definition_name=job_definition.name, + ) diff --git a/tests/worker_manager/api/test_jobs.py b/tests/worker_manager/api/test_jobs.py index a2bcca95..cd4d645d 100644 --- a/tests/worker_manager/api/test_jobs.py +++ b/tests/worker_manager/api/test_jobs.py @@ -240,6 +240,7 @@ def test_jobs_sync( session=session, queue_name=queue.name, job_definition_name="running", + started_at=utcnow() - timedelta(days=8), ) jobs_store.create_job( name="due", @@ -266,7 +267,7 @@ def test_jobs_sync( "items": [ { "completed_at": None, - "started_at": "2018-01-02T00:00:00+00:00", + "started_at": "2017-12-25T00:00:00+00:00", "cursor": None, "name": "running", }, @@ -298,7 +299,7 @@ def test_jobs_sync( "completed_at": None, "cursor": None, "name": "running", - "started_at": "2018-01-02T00:00:00+00:00", + "started_at": "2017-12-25T00:00:00+00:00", }, { # The old due job, untouched