Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminate Jobs w/ Prefix #23

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name = "AWSBatch"
uuid = "dcae83d4-2881-5875-9d49-e5534165e9c0"
license = "MIT"
authors = ["Invenia Technical Computing"]
version = "2.0.1"
version = "2.1.0"

[deps]
AWS = "fbe9abb3-538b-5e4e-ba9e-bc94f4f92ebc"
Expand Down
60 changes: 31 additions & 29 deletions src/AWSBatch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ using Mocking

export BatchJob, ComputeEnvironment, BatchEnvironmentError, BatchJobError
export JobQueue, JobDefinition, JobState, LogEvent
export run_batch, describe, status, status_reason, wait, log_events, isregistered, register, deregister
export run_batch,
describe, status, status_reason, wait, log_events, isregistered, register, deregister
export list_job_queues, list_job_definitions, create_compute_environment, create_job_queue

export terminate_jobs

const logger = getlogger(@__MODULE__)
# Register the module level logger at runtime so that folks can access the logger via `getlogger(MyModule)`
# NOTE: If this line is not included then the precompiled `MyModule.logger` won't be registered at runtime.
__init__() = Memento.register(logger)


include("exceptions.jl")
include("log_event.jl")
include("compute_environment.jl")
include("job_queue.jl")
include("job_state.jl")
include("job_definition.jl")
include("batch_job.jl")

include("utilities.jl")

"""
run_batch(;
Expand Down Expand Up @@ -63,14 +63,14 @@ function run_batch(;
name::AbstractString="",
queue::AbstractString="",
region::AbstractString="",
definition::Union{AbstractString, JobDefinition, Nothing}=nothing,
definition::Union{AbstractString,JobDefinition,Nothing}=nothing,
image::AbstractString="",
vcpus::Integer=1,
memory::Integer=-1,
role::AbstractString="",
cmd::Cmd=``,
num_jobs::Integer=1,
parameters::Dict{String, String}=Dict{String, String}(),
parameters::Dict{String,String}=Dict{String,String}(),
allow_job_registration::Bool=true,
aws_config::AbstractAWSConfig=global_aws_config(),
)
Expand Down Expand Up @@ -134,13 +134,15 @@ function run_batch(;

# Error if required parameters were not explicitly set and cannot be inferred
if isempty(name) || isempty(queue) || memory < 0
throw(BatchEnvironmentError(
"Unable to perform AWS Batch introspection when not running within " *
"an AWS Batch job. Current job parameters are: " *
"\nname=$name" *
"\nqueue=$queue" *
"\nmemory=$memory"
))
throw(
BatchEnvironmentError(
"Unable to perform AWS Batch introspection when not running within " *
"an AWS Batch job. Current job parameters are: " *
"\nname=$name" *
"\nqueue=$queue" *
"\nmemory=$memory",
),
)
end

# Reuse a previously registered job definition if available.
Expand All @@ -151,7 +153,7 @@ function run_batch(;
definition = JobDefinition(reusable_job_definition_arn)
end
elseif definition === nothing
# Use the job name as the definiton name since the definition name was not specified
# Use the job name as the definiton name since the definition name was not specified
definition = name
end

Expand All @@ -170,26 +172,26 @@ function run_batch(;
aws_config=aws_config,
)
else
throw(BatchEnvironmentError(string(
"Attempting to register job definition \"$definition\" but registering ",
"job definitions is disallowed. Current job definition parameters are: ",
"\nimage=$image",
"\nrole=$role",
"\nvcpus=$vcpus",
"\nmemory=$memory",
"\ncmd=$cmd",
"\nparameters=$parameters",
)))
throw(
BatchEnvironmentError(
string(
"Attempting to register job definition \"$definition\" but registering ",
"job definitions is disallowed. Current job definition parameters are: ",
"\nimage=$image",
"\nrole=$role",
"\nvcpus=$vcpus",
"\nmemory=$memory",
"\ncmd=$cmd",
"\nparameters=$parameters",
),
),
)
end
end

# Parameters that can be overridden are `memory`, `vcpus`, `command`, and `environment`
# See https://docs.aws.amazon.com/batch/latest/APIReference/API_ContainerOverrides.html
container_overrides = Dict(
"vcpus" => vcpus,
"memory" => memory,
"command" => cmd.exec,
)
container_overrides = Dict("vcpus" => vcpus, "memory" => memory, "command" => cmd.exec)

return submit(
name,
Expand Down
59 changes: 59 additions & 0 deletions src/utilities.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
_incomplete_job(job) = !(job["status"] in ["SUCCEEDED", "FAILED"])
_suffix_asterisk(prefix) = endswith(prefix, "*") ? prefix : string(prefix, "*")

function _get_job_ids(job_queue, prefix)
# Check if the prefix provided ends w/ '*", if not append it
prefix = _suffix_asterisk(prefix)

resp = @mock Batch.list_jobs(
Dict(
"jobQueue" => job_queue,
"filters" => [Dict("name" => "JOB_NAME", "values" => [prefix])],
),
)
jobs = resp["jobSummaryList"]

while haskey(resp, "nextToken")
resp = @mock Batch.list_jobs(
Dict("jobQueue" => job_queue, "nextToken" => resp["nextToken"])
)
append!(jobs, resp["jobSummaryList"])
end

filter!(j -> _incomplete_job(j), jobs)
mattBrzezinski marked this conversation as resolved.
Show resolved Hide resolved

return [j["jobId"] for j in jobs]
end

"""
terminate_jobs()

Terminate all Batch jobs with a given prefix.

# Arguments
- `job_queue::JobQueue`: JobQueue where the jobs reside
- `prefix::AbstractString`: Prefix for the jobs

# Keywords
- `reason::AbstractString=""`: Reason to terminate the jobs

# Return
- `Array{String}`: Terminated Job Ids
"""
function terminate_jobs(
job_queue::AbstractString, prefix::AbstractString; reason::AbstractString=""
)
job_ids = _get_job_ids(job_queue, prefix)

for job_id in job_ids
@mock Batch.terminate_job(job_id, reason)
end

return job_ids
end

function terminate_jobs(
job_queue::JobQueue, prefix::AbstractString; reason::AbstractString=""
)
return terminate_jobs(job_queue.arn, prefix; reason=reason)
end
4 changes: 2 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using AWS
using AWS.AWSExceptions: AWSException
using AWSTools.CloudFormation: stack_output
using AWSBatch
using Dates
Expand All @@ -8,8 +9,6 @@ using Memento.TestUtils: @test_log, @test_nolog
using Mocking
using Test

using AWS.AWSExceptions: AWSException

Mocking.activate()

# Controls the running of various tests: "local", "batch"
Expand Down Expand Up @@ -81,6 +80,7 @@ include("mock.jl")
include("job_state.jl")
include("batch_job.jl")
include("run_batch.jl")
include("utilities.jl")
end
else
warn(logger, "Skipping \"local\" tests. Set `ENV[\"TESTS\"] = \"local\"` to run.")
Expand Down
43 changes: 43 additions & 0 deletions test/utilities.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
list_jobs_patch = @patch function AWSBatch.Batch.list_jobs(params)
response = Dict{String,Any}(
"jobSummaryList" => [
Dict("jobId" => "1", "status" => "SUCCEEDED"),
Dict("jobId" => "2", "status" => "FAILED"),
Dict("jobId" => "3", "status" => "RUNNABLE"),
],
)

if !haskey(params, "nextToken")
response["nextToken"] = "nextToken"
end

return response
end

terminate_job_patch = @patch AWSBatch.Batch.terminate_job(a...) = Dict()

@testset "_incomplete_job" begin
@test_throws KeyError AWSBatch._incomplete_job(Dict())
@test !AWSBatch._incomplete_job(Dict("status" => "SUCCEEDED"))
@test !AWSBatch._incomplete_job(Dict("status" => "FAILED"))
@test AWSBatch._incomplete_job(Dict("status" => "FOOBAR"))
end

@testset "_suffix_asterisk" begin
@test AWSBatch._suffix_asterisk("foobar") == "foobar*"
@test AWSBatch._suffix_asterisk("foobar*") == "foobar*"
end

@testset "_get_job_ids" begin
apply(list_jobs_patch) do
response = AWSBatch._get_job_ids("foo", "bar")
@test response == ["3", "3"]
end
end

@testset "terminate_jobs" begin
apply([list_jobs_patch, terminate_job_patch]) do
response = terminate_jobs("foo", "bar")
@test response == ["3", "3"]
end
end