diff --git a/Project.toml b/Project.toml index 34476ef..2743470 100644 --- a/Project.toml +++ b/Project.toml @@ -2,7 +2,7 @@ name = "AWSBatch" uuid = "dcae83d4-2881-5875-9d49-e5534165e9c0" license = "MIT" authors = ["Invenia Technical Computing"] -version = "2.0.1" +version = "2.1.0" [deps] AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc" diff --git a/src/AWSBatch.jl b/src/AWSBatch.jl index 08aac51..b0c490c 100644 --- a/src/AWSBatch.jl +++ b/src/AWSBatch.jl @@ -12,16 +12,16 @@ using Mocking export BatchJob, ComputeEnvironment, BatchEnvironmentError, BatchJobError export JobQueue, JobDefinition, JobState, LogEvent -export run_batch, describe, status, status_reason, wait, log_events, isregistered, register, deregister +export run_batch, + describe, status, status_reason, wait, log_events, isregistered, register, deregister export list_job_queues, list_job_definitions, create_compute_environment, create_job_queue - +export terminate_jobs const logger = getlogger(@__MODULE__) # Register the module level logger at runtime so that folks can access the logger via `getlogger(MyModule)` # NOTE: If this line is not included then the precompiled `MyModule.logger` won't be registered at runtime. __init__() = Memento.register(logger) - include("exceptions.jl") include("log_event.jl") include("compute_environment.jl") @@ -29,7 +29,7 @@ include("job_queue.jl") include("job_state.jl") include("job_definition.jl") include("batch_job.jl") - +include("utilities.jl") """ run_batch(; @@ -63,14 +63,14 @@ function run_batch(; name::AbstractString="", queue::AbstractString="", region::AbstractString="", - definition::Union{AbstractString, JobDefinition, Nothing}=nothing, + definition::Union{AbstractString,JobDefinition,Nothing}=nothing, image::AbstractString="", vcpus::Integer=1, memory::Integer=-1, role::AbstractString="", cmd::Cmd=``, num_jobs::Integer=1, - parameters::Dict{String, String}=Dict{String, String}(), + parameters::Dict{String,String}=Dict{String,String}(), allow_job_registration::Bool=true, aws_config::AbstractAWSConfig=global_aws_config(), ) @@ -134,13 +134,15 @@ function run_batch(; # Error if required parameters were not explicitly set and cannot be inferred if isempty(name) || isempty(queue) || memory < 0 - throw(BatchEnvironmentError( - "Unable to perform AWS Batch introspection when not running within " * - "an AWS Batch job. Current job parameters are: " * - "\nname=$name" * - "\nqueue=$queue" * - "\nmemory=$memory" - )) + throw( + BatchEnvironmentError( + "Unable to perform AWS Batch introspection when not running within " * + "an AWS Batch job. Current job parameters are: " * + "\nname=$name" * + "\nqueue=$queue" * + "\nmemory=$memory", + ), + ) end # Reuse a previously registered job definition if available. @@ -151,7 +153,7 @@ function run_batch(; definition = JobDefinition(reusable_job_definition_arn) end elseif definition === nothing - # Use the job name as the definiton name since the definition name was not specified + # Use the job name as the definiton name since the definition name was not specified definition = name end @@ -170,26 +172,26 @@ function run_batch(; aws_config=aws_config, ) else - throw(BatchEnvironmentError(string( - "Attempting to register job definition \"$definition\" but registering ", - "job definitions is disallowed. Current job definition parameters are: ", - "\nimage=$image", - "\nrole=$role", - "\nvcpus=$vcpus", - "\nmemory=$memory", - "\ncmd=$cmd", - "\nparameters=$parameters", - ))) + throw( + BatchEnvironmentError( + string( + "Attempting to register job definition \"$definition\" but registering ", + "job definitions is disallowed. Current job definition parameters are: ", + "\nimage=$image", + "\nrole=$role", + "\nvcpus=$vcpus", + "\nmemory=$memory", + "\ncmd=$cmd", + "\nparameters=$parameters", + ), + ), + ) end end # Parameters that can be overridden are `memory`, `vcpus`, `command`, and `environment` # See https://docs.aws.amazon.com/batch/latest/APIReference/API_ContainerOverrides.html - container_overrides = Dict( - "vcpus" => vcpus, - "memory" => memory, - "command" => cmd.exec, - ) + container_overrides = Dict("vcpus" => vcpus, "memory" => memory, "command" => cmd.exec) return submit( name, diff --git a/src/utilities.jl b/src/utilities.jl new file mode 100644 index 0000000..b3c49d1 --- /dev/null +++ b/src/utilities.jl @@ -0,0 +1,59 @@ +_incomplete_job(job) = !(job["status"] in ["SUCCEEDED", "FAILED"]) +_suffix_asterisk(prefix) = endswith(prefix, "*") ? prefix : string(prefix, "*") + +function _get_job_ids(job_queue, prefix) + # Check if the prefix provided ends w/ '*", if not append it + prefix = _suffix_asterisk(prefix) + + resp = @mock Batch.list_jobs( + Dict( + "jobQueue" => job_queue, + "filters" => [Dict("name" => "JOB_NAME", "values" => [prefix])], + ), + ) + jobs = resp["jobSummaryList"] + + while haskey(resp, "nextToken") + resp = @mock Batch.list_jobs( + Dict("jobQueue" => job_queue, "nextToken" => resp["nextToken"]) + ) + append!(jobs, resp["jobSummaryList"]) + end + + filter!(j -> _incomplete_job(j), jobs) + + return [j["jobId"] for j in jobs] +end + +""" + terminate_jobs() + +Terminate all Batch jobs with a given prefix. + +# Arguments +- `job_queue::JobQueue`: JobQueue where the jobs reside +- `prefix::AbstractString`: Prefix for the jobs + +# Keywords +- `reason::AbstractString=""`: Reason to terminate the jobs + +# Return +- `Array{String}`: Terminated Job Ids +""" +function terminate_jobs( + job_queue::AbstractString, prefix::AbstractString; reason::AbstractString="" +) + job_ids = _get_job_ids(job_queue, prefix) + + for job_id in job_ids + @mock Batch.terminate_job(job_id, reason) + end + + return job_ids +end + +function terminate_jobs( + job_queue::JobQueue, prefix::AbstractString; reason::AbstractString="" +) + return terminate_jobs(job_queue.arn, prefix; reason=reason) +end diff --git a/test/runtests.jl b/test/runtests.jl index 56097c9..8215496 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,5 @@ using AWS +using AWS.AWSExceptions: AWSException using AWSTools.CloudFormation: stack_output using AWSBatch using Dates @@ -8,8 +9,6 @@ using Memento.TestUtils: @test_log, @test_nolog using Mocking using Test -using AWS.AWSExceptions: AWSException - Mocking.activate() # Controls the running of various tests: "local", "batch" @@ -81,6 +80,7 @@ include("mock.jl") include("job_state.jl") include("batch_job.jl") include("run_batch.jl") + include("utilities.jl") end else warn(logger, "Skipping \"local\" tests. Set `ENV[\"TESTS\"] = \"local\"` to run.") diff --git a/test/utilities.jl b/test/utilities.jl new file mode 100644 index 0000000..29ad3c8 --- /dev/null +++ b/test/utilities.jl @@ -0,0 +1,43 @@ +list_jobs_patch = @patch function AWSBatch.Batch.list_jobs(params) + response = Dict{String,Any}( + "jobSummaryList" => [ + Dict("jobId" => "1", "status" => "SUCCEEDED"), + Dict("jobId" => "2", "status" => "FAILED"), + Dict("jobId" => "3", "status" => "RUNNABLE"), + ], + ) + + if !haskey(params, "nextToken") + response["nextToken"] = "nextToken" + end + + return response +end + +terminate_job_patch = @patch AWSBatch.Batch.terminate_job(a...) = Dict() + +@testset "_incomplete_job" begin + @test_throws KeyError AWSBatch._incomplete_job(Dict()) + @test !AWSBatch._incomplete_job(Dict("status" => "SUCCEEDED")) + @test !AWSBatch._incomplete_job(Dict("status" => "FAILED")) + @test AWSBatch._incomplete_job(Dict("status" => "FOOBAR")) +end + +@testset "_suffix_asterisk" begin + @test AWSBatch._suffix_asterisk("foobar") == "foobar*" + @test AWSBatch._suffix_asterisk("foobar*") == "foobar*" +end + +@testset "_get_job_ids" begin + apply(list_jobs_patch) do + response = AWSBatch._get_job_ids("foo", "bar") + @test response == ["3", "3"] + end +end + +@testset "terminate_jobs" begin + apply([list_jobs_patch, terminate_job_patch]) do + response = terminate_jobs("foo", "bar") + @test response == ["3", "3"] + end +end