Skip to content

Commit

Permalink
Merge pull request #1 from psrenergy/pr/job-requests
Browse files Browse the repository at this point in the history
Send several jobs at once
  • Loading branch information
pedroripper authored Mar 26, 2024
2 parents 5473272 + cb23526 commit fdaefc5
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 86 deletions.
61 changes: 41 additions & 20 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,27 +1,48 @@
name: test
name: CI
on:
push:
branches: [master, develop]
branches: [master]
pull_request:
types: [opened, synchronize, reopened]
concurrency:
# Skip intermediate builds: always.
# Cancel intermediate builds: only if it is a pull request build.
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }}
jobs:
test-windows:
name: Test Windows
runs-on: self-hosted
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
include:
- version: '1'
os: windows-latest
arch: x64
- version: '1.6'
os: windows-latest
arch: x64
- version: '1'
os: ubuntu-latest
arch: x64
- version: '1.6'
os: ubuntu-latest
arch: x64
steps:
- name: Initialize instance
run: |
Remove-Item -Path '${{ github.workspace }}\*' -Force -Recurse
- uses: actions/checkout@v3
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@v1
with:
fetch-depth: 0

- name: Test
run: |
.\test\test.bat
version: ${{ matrix.version }}
arch: ${{ matrix.arch }}
- uses: actions/cache@v1
env:
cache-name: cache-artifacts
with:
path: ~/.julia/artifacts
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
restore-keys: |
${{ runner.os }}-test-${{ env.cache-name }}-
${{ runner.os }}-test-
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
file: lcov.info
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,35 @@
# JobQueueMPI.jl
# JobQueueMPI.jl

[build-img]: https://github.com/psrenergy/JobQueueMPI.jl/actions/workflows/test.yml/badge.svg?branch=master
[build-url]: https://github.com/psrenergy/JobQueueMPI.jl/actions?query=workflow%3ACI

[codecov-img]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl/coverage.svg?branch=master
[codecov-url]: https://codecov.io/gh/psrenergy/JobQueueMPI.jl?branch=master

| **Build Status** | **Coverage** |
|:-----------------:|:-----------------:|
| [![Build Status][build-img]][build-url] | [![Codecov branch][codecov-img]][codecov-url] |[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://psrenergy.github.io/PSRClassesInterface.jl/dev/)


JobQueueMPI.jl is a Julia package that provides a simplified interface for running multiple jobs in parallel using [MPI.jl](https://github.com/JuliaParallel/MPI.jl).

It uses the Job Queue concept to manage the jobs and the MPI processes. The user can add jobs to the queue and the package will take care of sending them to the available MPI processes.

## Installation

You can install JobQueueMPI.jl using the Julia package manager. From the Julia REPL, type `]` to enter the Pkg REPL mode and run:

```julia
pkg> add JobQueueMPI
```

## How it works

First, when running a program using MPI, the user has to set the number of processes that will parallelize the computation. One of these processes will be the controller, and the others will be the workers.

We can easily delimit the areas of the code that will be executed only by the controller or the worker.

JobQueueMPI.jl has the following components:

- `Controller`: The controller is responsible for managing the jobs and the workers. It keeps track of the jobs that have been sent and received and sends the jobs to the available workers.
- `Worker`: The worker is responsible for executing the jobs. It receives the jobs from the controller, executes them, and sends the results back to the controller.
77 changes: 43 additions & 34 deletions src/controller.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ mutable struct Controller
debug_mode::Bool
worker_status::Vector{WorkerStatus}
job_queue::Vector{Job}
jobs_sent::Int
jobs_received::Int
pending_jobs::Vector{JobRequest}
function Controller(n_workers::Int; debug_mode::Bool = false)
return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Any[], 0, 0)
return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Vector{Job}(), Vector{JobRequest}())
end
end

Expand All @@ -17,59 +16,69 @@ struct TerminationMessage end
_is_worker_available(controller::Controller, worker::Int) =
controller.worker_status[worker] == WORKER_AVAILABLE

function _pick_job_to_send!(controller::Controller)
return popfirst!(controller.job_queue)
end
is_job_queue_empty(controller::Controller) = isempty(controller.job_queue)
any_pending_jobs(controller::Controller) = !isempty(controller.pending_jobs)

function _pick_available_worker(controller::Controller)
for i in 1:controller.n_workers
if _is_worker_available(controller, i)
return i
end
function _pick_job_to_send!(controller::Controller)
if !is_job_queue_empty(controller)
return popfirst!(controller.job_queue)
else
error("Controller does not have any jobs to send.")
end
return error("No available workers. You should check with any_available_workers() first.")
end

function _any_available_workers(controller::Controller)
function _pick_available_workers(controller::Controller)
available_workers = []
for i in 1:controller.n_workers
if _is_worker_available(controller, i)
return true
push!(available_workers, i)
end
end
return false
return available_workers
end

function add_job_to_queue!(controller::Controller, message::Any, f::Function)
return push!(controller.job_queue, Job(message, f))
function add_job_to_queue!(controller::Controller, message::Any)
return push!(controller.job_queue, Job(message))
end

function send_job_to_any_available_worker(controller::Controller)
if _any_available_workers(controller)
worker = _pick_available_worker(controller)
job = _pick_job_to_send!(controller)
controller.jobs_sent += 1
controller.worker_status[worker] = WORKER_BUSY
MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32)
return true
else
return false
function send_jobs_to_any_available_workers(controller::Controller)
available_workers = _pick_available_workers(controller)
for worker in available_workers
if !is_job_queue_empty(controller)
job = _pick_job_to_send!(controller)
controller.worker_status[worker] = WORKER_BUSY
request = MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32)
push!(controller.pending_jobs, JobRequest(worker, request))
end
end
return nothing
end

function send_termination_message(controller::Controller)
requests = Vector{JobRequest}()
for worker in 1:controller.n_workers
MPI.isend(TerminationMessage(), _mpi_comm(); dest = worker, tag = worker + 32)
controller.worker_status[worker] = WORKER_TERMINATED
request = MPI.isend(TerminationMessage(), _mpi_comm(); dest = worker, tag = worker + 32)
controller.worker_status[worker] = WORKER_AVAILABLE
push!(requests, JobRequest(worker, request))
end
return _wait_all(requests)
end

function check_for_workers_job(controller::Controller)
for worker in 1:controller.n_workers
has_job = MPI.Iprobe(_mpi_comm(); source = worker, tag = worker + 32)
for j_i in eachindex(controller.pending_jobs)
has_job = MPI.Iprobe(
_mpi_comm();
source = controller.pending_jobs[j_i].worker,
tag = controller.pending_jobs[j_i].worker + 32,
)
if has_job
job = MPI.recv(_mpi_comm(); source = worker, tag = worker + 32)
controller.jobs_received += 1
controller.worker_status[worker] = WORKER_AVAILABLE
job = MPI.recv(
_mpi_comm();
source = controller.pending_jobs[j_i].worker,
tag = controller.pending_jobs[j_i].worker + 32,
)
controller.worker_status[controller.pending_jobs[j_i].worker] = WORKER_AVAILABLE
deleteat!(controller.pending_jobs, j_i)
return job
end
end
Expand Down
20 changes: 18 additions & 2 deletions src/job.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@ abstract type AbstractJob end

mutable struct Job <: AbstractJob
message::Any
f::Function
end

mutable struct JobAnswer <: AbstractJob
message::Any
end

mutable struct JobInterrupt <: AbstractJob
message::Any
end

mutable struct JobTerminate <: AbstractJob
message::Any
end

mutable struct JobRequest
worker::Int
request::MPI.Request
end

get_message(job::AbstractJob) = job.message
get_task(job::Job) = job.f

function _wait_all(job_requests::Vector{JobRequest})
requests = [job_request.request for job_request in job_requests]
return MPI.Waitall(requests)
end
1 change: 0 additions & 1 deletion src/worker.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
@enum WorkerStatus begin
WORKER_BUSY = 0
WORKER_AVAILABLE = 1
WORKER_TERMINATED = 2
end

mutable struct Worker
Expand Down
7 changes: 5 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ ENV["JULIA_PROJECT"] = dirname(Base.active_project())

JQM = JobQueueMPI

test_1_path = joinpath(".", "test1.jl")
test_2_path = joinpath(".", "test2.jl")

@testset verbose = true "JobQueueMPI Tests" begin
mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project ..\\test\\test1.jl`))
mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project ..\\test\\test2.jl`))
mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project $(test_1_path)`))
mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project $(test_2_path)`))
end
23 changes: 10 additions & 13 deletions test/test1.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mutable struct Message
vector_idx::Int
end

all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

function sum_100(message::Message)
message.value += 100
return JobAnswer(message)
Expand All @@ -27,8 +29,7 @@ function workers_loop()
break
end
message = JQM.get_message(job)
job_task = JQM.get_task(job)
return_job = job_task(message)
return_job = sum_100(message)
JQM.send_job_to_controller(worker, return_job)
end
exit(0)
Expand All @@ -44,28 +45,23 @@ function job_queue(data)

if JQM.is_controller_process() # I am root
new_data = Array{T}(undef, N)
sent_messages = 0
delivered_messages = 0

controller = Controller(JQM.num_workers())

for i in eachindex(data)
message = Message(data[i], i)
JQM.add_job_to_queue!(controller, message, sum_100)
JQM.add_job_to_queue!(controller, message)
end

while sent_messages < N || delivered_messages < N
if sent_messages < N
if JQM.send_job_to_any_available_worker(controller)
sent_messages += 1
end
while !all_jobs_done(controller)
if !JQM.is_job_queue_empty(controller)
JQM.send_jobs_to_any_available_workers(controller)
end
if delivered_messages < N
if JQM.any_pending_jobs(controller)
job_answer = JQM.check_for_workers_job(controller)
if !isnothing(job_answer)
message = JQM.get_message(job_answer)
update_data(new_data, message)
delivered_messages += 1
end
end
end
Expand All @@ -76,7 +72,8 @@ function job_queue(data)
end
workers_loop()
JQM.mpi_barrier()
return JQM.mpi_finalize()
JQM.mpi_finalize()
return nothing
end

@testset "Sum 100" begin
Expand Down
23 changes: 10 additions & 13 deletions test/test2.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ mutable struct WorkerMessage
vector_idx::Int
end

all_jobs_done(controller) = JQM.is_job_queue_empty(controller) && !JQM.any_pending_jobs(controller)

function get_divisors(message::ControllerMessage)
number = message.value
divisors = []
Expand Down Expand Up @@ -40,8 +42,7 @@ function workers_loop()
break
end
message = JQM.get_message(job)
job_task = JQM.get_task(job)
return_job = job_task(message)
return_job = get_divisors(message)
JQM.send_job_to_controller(worker, return_job)
end
exit(0)
Expand All @@ -56,28 +57,23 @@ function divisors(data)

if JQM.is_controller_process() # I am root
new_data = Array{Array{Int}}(undef, N)
sent_messages = 0
delivered_messages = 0

controller = Controller(JQM.num_workers())

for i in eachindex(data)
message = ControllerMessage(data[i], i)
JQM.add_job_to_queue!(controller, message, get_divisors)
JQM.add_job_to_queue!(controller, message)
end

while sent_messages < N || delivered_messages < N
if sent_messages < N
if JQM.send_job_to_any_available_worker(controller)
sent_messages += 1
end
while !all_jobs_done(controller)
if !JQM.is_job_queue_empty(controller)
JQM.send_jobs_to_any_available_workers(controller)
end
if delivered_messages < N
if JQM.any_pending_jobs(controller)
job_answer = JQM.check_for_workers_job(controller)
if !isnothing(job_answer)
message = JQM.get_message(job_answer)
update_data(new_data, message)
delivered_messages += 1
end
end
end
Expand All @@ -88,7 +84,8 @@ function divisors(data)
end
workers_loop()
JQM.mpi_barrier()
return JQM.mpi_finalize()
JQM.mpi_finalize()
return nothing
end

@testset "Divisors" begin
Expand Down

0 comments on commit fdaefc5

Please sign in to comment.