Skip to content

Commit

Permalink
Working
Browse files Browse the repository at this point in the history
  • Loading branch information
pedroripper committed Mar 25, 2024
1 parent a5ccaee commit 0a00738
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 57 deletions.
26 changes: 26 additions & 0 deletions .JuliaFormatter.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Configuration file for JuliaFormatter.jl
# For more information, see: https://domluna.github.io/JuliaFormatter.jl/stable/config/

indent = 4
margin = 120
always_for_in = true
align_assignment = true
align_struct_field = true
whitespace_typedefs = true
whitespace_ops_in_indices = false
remove_extra_newlines = true
import_to_using = false
pipe_to_function_call = false
short_to_long_function_def = false
long_to_short_function_def = false
whitespace_in_kwargs = true
annotate_untyped_fields_with_any = true
format_docstrings = true
conditional_to_if = false
normalize_line_endings = "auto"
trailing_comma = true
join_lines_based_on_source = true
indent_submodule = false
separate_kwargs_with_semicolon = true
surround_whereop_typeparameters = true
always_use_return = true
35 changes: 35 additions & 0 deletions .github/workflows/format_check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: format-check
on:
push:
branches:
- master
- release-*
pull_request:
types: [opened, synchronize, reopened]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: julia-actions/setup-julia@latest
with:
version: '1'
- uses: actions/checkout@v1
- name: Format check
shell: julia --color=yes {0}
run: |
using Pkg
# If you update the version, also update the style guide docs.
Pkg.add(PackageSpec(name="JuliaFormatter", version="1"))
using JuliaFormatter
format("src", verbose=true)
format("test", verbose=true)
format("docs", verbose=true)
out = String(read(Cmd(`git diff`)))
if isempty(out)
exit(0)
end
@error "Some files have not been formatted !!!"
write(stdout, out)
exit(1)
# Workflow from https://github.com/jump-dev/JuMP.jl/blob/master/.github/workflows/format_check.yml
48 changes: 48 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
name: Test
on:
push:
branches: [master]
pull_request:
types: [opened, synchronize, reopened]
jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
include:
- version: '1'
os: windows-latest
arch: x64
- version: '1.6'
os: windows-latest
arch: x64
- version: '1'
os: ubuntu-latest
arch: x64
- version: '1.6'
os: ubuntu-latest
arch: x64
steps:
- uses: actions/checkout@v2
- uses: julia-actions/setup-julia@v1
with:
version: ${{ matrix.version }}
arch: ${{ matrix.arch }}
- uses: actions/cache@v1
env:
cache-name: cache-artifacts
with:
path: ~/.julia/artifacts
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
restore-keys: |
${{ runner.os }}-test-${{ env.cache-name }}-
${{ runner.os }}-test-
${{ runner.os }}-
- uses: julia-actions/julia-buildpkg@v1
- uses: julia-actions/julia-runtest@v1
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v1
with:
file: lcov.info
78 changes: 78 additions & 0 deletions mpiexecjl.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright (C) 2023 Guilherme Bodin
# License is MIT "Expat"
#
# Commentary:
#
# Command line utility to call the `mpiexec` binary used by the `MPI.jl` version
# in the given Julia project. It has the same syntax as the `mpiexec` binary
# that would be called, with the additional `--project=...` flag to select a
# different Julia project.
#
# Examples of usage (the MPI flags available depend on the MPI implementation
# called):
#
# $ mpiexecjl -n 40 julia mpi-script.jl
# $ mpiexecjl --project=my_experiment -n 80 --oversubscribe julia mpi-script.jl
# To call the application in parallel in the non-compiled mode powershell.exe -ExecutionPolicy ByPass -File .\mpiexecjl.ps1 -n 3 --project=. julia .\main.jl

function usage {
Write-Host "Usage: $([System.IO.Path]::GetFileNameWithoutExtension($MyInvocation.MyCommand.Name)) [--project=...] MPIEXEC_ARGUMENTS..."
Write-Host "Call the mpiexec binary in the Julia environment specified by the --project option."
Write-Host "If no project is specified, the MPI associated with the global Julia environment will be used."
Write-Host "All other arguments are forwarded to mpiexec."
}

$julia_args = @()
$PROJECT_ARG = ""

foreach ($arg in $args) {
if ($arg -match "^--project(=.*)?$") {
$PROJECT_ARG = $arg
}
elseif ($arg -eq "-h" -or $arg -eq "--help") {
$helpRequested = $true
usage
Write-Host "Below is the help of the current mpiexec."
Write-Host
exit 0
}
else {
$julia_args += $arg
}
}

if (-not $julia_args) {
Write-Error "ERROR: no arguments specified."
usage
exit 1
}

if ($env:JULIA_BINDIR) {
$JULIA_CMD = Join-Path $env:JULIA_BINDIR "julia"
} else {
$JULIA_CMD = "julia"
}

$SCRIPT = @'
using MPI
ENV[\"JULIA_PROJECT\"] = dirname(Base.active_project())
mpiexec(exe -> run(`$exe $ARGS`))
'@

$PRECOMPILATION_SCRIPT = @'
println(\"precompiling current project before running MPI\")
import Pkg
Pkg.activate(dirname(Base.active_project()))
Pkg.instantiate()
println(\"precompilation finished\")
'@

& $JULIA_CMD $PROJECT_ARG -e $PRECOMPILATION_SCRIPT

if ($PROJECT_ARG) {
& $JULIA_CMD $PROJECT_ARG --color=yes --startup-file=no --compile=min -O0 -e $SCRIPT -- $julia_args
} else {
& $JULIA_CMD --color=yes --startup-file=no --compile=min -O0 -e $SCRIPT -- $julia_args
}
3 changes: 3 additions & 0 deletions src/JobQueueMPI.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ module JobQueueMPI

using MPI

export Controller, JobQueueMPI, Job, JobAnswer, Worker, TerminationMessage

include("mpi_utils.jl")
include("job.jl")
include("worker.jl")
include("controller.jl")

Expand Down
64 changes: 29 additions & 35 deletions src/controller.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,20 @@ mutable struct Controller
n_workers::Int
debug_mode::Bool
worker_status::Vector{WorkerStatus}
job_queue::Vector{Any}
messages_sent::Int
messages_received::Int
function Controller(n_workers::Int; debug_mode::Bool=false)
return new(
n_workers,
debug_mode,
fill(WORKER_AVAILABLE, n_workers),
Any[],
0,
0,
)
job_queue::Vector{Job}
jobs_sent::Int
jobs_received::Int
function Controller(n_workers::Int; debug_mode::Bool = false)
return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Any[], 0, 0)
end
end

struct TerminationMessage end

function _pick_message_to_send!(controller::Controller)
_is_worker_available(controller::Controller, worker::Int) =
controller.worker_status[worker] == WORKER_AVAILABLE

function _pick_job_to_send!(controller::Controller)
return popfirst!(controller.job_queue)
end

Expand All @@ -31,7 +27,7 @@ function _pick_available_worker(controller::Controller)
return i
end
end
error("No available workers. You should check with any_available_workers() first.")
return error("No available workers. You should check with any_available_workers() first.")
end

function _any_available_workers(controller::Controller)
Expand All @@ -43,41 +39,39 @@ function _any_available_workers(controller::Controller)
return false
end

function _has_message_to_send(controller::Controller)
return !isempty(controller.job_queue)
end

function add_message_to_queue!(controller::Controller, message::Any)
push!(controller.job_queue, message)
function add_job_to_queue!(controller::Controller, message::Any, f::Function)
return push!(controller.job_queue, Job(message, f))
end

function send_message_to_any_available_worker_worker(controller::Controller)
if _has_message_to_send(controller) && any_available_workers(controller)
function send_job_to_any_available_worker(controller::Controller)
if _any_available_workers(controller)
worker = _pick_available_worker(controller)
message = _pick_message_to_send!(controller)
controller.messages_sent += 1
job = _pick_job_to_send!(controller)
controller.jobs_sent += 1
controller.worker_status[worker] = WORKER_BUSY
MPI.isend(message, _mpi_comm(); dest=worker, tag=worker + 32)
MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32)
return true
else
return false
end
end

function send_termination_message(controller::Controller)
for worker in 1:controller.n_workers
MPI.isend(TerminationMessage(), _mpi_comm(); dest=worker, tag=worker + 32)
MPI.isend(TerminationMessage(), _mpi_comm(); dest = worker, tag = worker + 32)
controller.worker_status[worker] = WORKER_TERMINATED
end
end

function check_for_workers_message(controller::Controller)
message = nothing
function check_for_workers_job(controller::Controller)
for worker in 1:controller.n_workers
has_message = MPI.Iprobe(comm(); source = worker, tag = worker + 32)
if has_message
message = MPI.recv(comm(); source = worker, tag = worker + 32)
controller.messages_received += 1
has_job = MPI.Iprobe(_mpi_comm(); source = worker, tag = worker + 32)
if has_job
job = MPI.recv(_mpi_comm(); source = worker, tag = worker + 32)
controller.jobs_received += 1
controller.worker_status[worker] = WORKER_AVAILABLE
break
return job
end
end
return message
end
return nothing
end
13 changes: 13 additions & 0 deletions src/job.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
abstract type AbstractJob end

mutable struct Job <: AbstractJob
message::Any
f::Function
end

mutable struct JobAnswer <: AbstractJob
message::Any
end

get_message(job::AbstractJob) = job.message
get_task(job::Job) = job.f
2 changes: 1 addition & 1 deletion src/mpi_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ controller_rank() = 0

is_worker_process() = !is_controller_process()

num_workers() = world_size() - 1
num_workers() = world_size() - 1
30 changes: 12 additions & 18 deletions src/worker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,19 @@ end

mutable struct Worker
rank::Int
Worker() = new(my_rank())
end

function workers_loop(f, args...)
if is_worker_process()
worker = Worker(my_rank())
while true
message = MPI.recv(comm(); source = controller_rank(), tag = worker.rank + 32)
if message == TerminationMessage()
break
end
message = f(args...)
send_message_to_controller(worker, message)
end
else
error("This function should only be called by worker processes.")
end
return 0
function reset_request(worker::Worker)
return worker.request = nothing
end

function send_message_to_controller(worker::Worker, message::Any)
return MPI.send(message, _mpi_comm(); dest=controller_rank(), tag=worker.rank + 32)
end
has_job(worker::Worker) =
MPI.Iprobe(_mpi_comm(); source = controller_rank(), tag = worker.rank + 32) == true

function send_job_to_controller(worker::Worker, job::Any)
return MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32)
end

receive_job(worker::Worker) =
MPI.recv(_mpi_comm(); source = controller_rank(), tag = worker.rank + 32)
3 changes: 3 additions & 0 deletions test/Project.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[deps]
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
12 changes: 9 additions & 3 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
using Test
using JobQueueMPI
using MPI
ENV["JULIA_PROJECT"] = dirname(Base.active_project())

@testset "JobQueueMPI" begin
@test true
end
JQM = JobQueueMPI

@testset verbose = true "JobQueueMPI Tests" begin
mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project ..\\test\\test1.jl`))
mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project ..\\test\\test2.jl`))
end
Loading

0 comments on commit 0a00738

Please sign in to comment.