Skip to content

Commit

Permalink
its alive
Browse files Browse the repository at this point in the history
  • Loading branch information
jakevc committed Jan 28, 2025
1 parent a17d9b6 commit b914564
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 8 deletions.
11 changes: 6 additions & 5 deletions snakemake_executor_plugin_aws_batch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __post_init__(self):
# snakemake/snakemake:latest container image
self.container_image = self.workflow.remote_execution_settings.container_image

# set the rate limit for status checks
self.next_seconds_between_status_checks = 5

# access executor specific settings
self.settings = self.workflow.executor_settings
self.logger.debug(f"ExecutorSettings: {pformat(self.settings, indent=2)}")
Expand Down Expand Up @@ -151,7 +154,6 @@ def run_job(self, job: JobExecutorInterface):
except Exception as e:
raise WorkflowError(e)

self.logger.debug(f"Job info: {pformat(job_info, indent=2)}")
self.report_job_submission(
SubmittedJobInfo(
job=job, external_jobid=job_info["jobId"], aux=dict(job_info)
Expand Down Expand Up @@ -185,11 +187,12 @@ async def check_active_jobs(
self.logger.debug(f"Monitoring {len(active_jobs)} active Batch jobs")
for job in active_jobs:
async with self.status_rate_limiter:
status_code = self._get_job_status(job)
status_code, msg = self._get_job_status(job)
if status_code == 0:
self.report_job_success(job)
elif status_code is not None:
self.report_job_error(job)
message = f"AWS Batch job failed. Code: {status_code}, Msg: {msg}."
self.report_job_error(job, msg=message)
else:
yield job

Expand All @@ -210,12 +213,10 @@ def _get_job_status(self, job: SubmittedJobInfo) -> tuple[int, Optional[str]]:
exit_code = None
log_stream_name = None
job_desc = self._describer.describe(self.batch_client, job.external_jobid, 1)
self.logger.debug(f"JOB DESCRIPTION: {job_desc}")
job_status = job_desc["status"]

# set log stream name if not none
log_details = {"status": job_status, "jobId": job.external_jobid}
self.logger.debug(f"LOG DETAILS: {log_details}")

if "container" in job_desc and "logStreamName" in job_desc["container"]:
log_stream_name = job_desc["container"]["logStreamName"]
Expand Down
18 changes: 18 additions & 0 deletions snakemake_executor_plugin_aws_batch/batch_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,21 @@ def describe_jobs(self, **kwargs):
:return: The response from the describe_jobs method.
"""
return self.client.describe_jobs(**kwargs)

def deregister_job_definition(self, **kwargs):
    """
    Forward a job-definition deregistration request to the AWS Batch client.

    :param kwargs: Keyword arguments handed unchanged to the client's
        deregister_job_definition method.
    :return: Whatever the client's deregister_job_definition call returns.
    """
    response = self.client.deregister_job_definition(**kwargs)
    return response

def terminate_job(self, **kwargs):
    """
    Forward a job termination request to the AWS Batch client.

    :param kwargs: Keyword arguments handed unchanged to the client's
        terminate_job method.
    :return: Whatever the client's terminate_job call returns.
    """
    response = self.client.terminate_job(**kwargs)
    return response
4 changes: 2 additions & 2 deletions snakemake_executor_plugin_aws_batch/batch_job_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ def __init__(
self.batch_client = batch_client
self.created_job_defs = []

def _make_container_command(remote_command: str) -> List[str]:
def _make_container_command(self, remote_command: str) -> List[str]:
"""
Return docker CMD form of the command
"""
return [shlex.quote(part) for part in shlex.split(str)]
return [shlex.quote(part) for part in shlex.split(remote_command)]

def build_job_definition(self):
job_uuid = str(uuid.uuid4())
Expand Down
2 changes: 1 addition & 1 deletion terraform/vars.tf
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ variable "instance_types" {
# Upper bound on vCPUs for the compute environment.
# Only one `default` may be set per variable; this keeps the newer value (16).
variable "max_vcpus" {
  description = "The maximum number of vCPUs for the compute environment"
  type        = number
  default     = 16
}

variable "min_vcpus" {
Expand Down

0 comments on commit b914564

Please sign in to comment.