diff --git a/.JuliaFormatter.toml b/.JuliaFormatter.toml new file mode 100644 index 0000000..f83e5c5 --- /dev/null +++ b/.JuliaFormatter.toml @@ -0,0 +1,26 @@ +# Configuration file for JuliaFormatter.jl +# For more information, see: https://domluna.github.io/JuliaFormatter.jl/stable/config/ + +indent = 4 +margin = 120 +always_for_in = true +align_assignment = true +align_struct_field = true +whitespace_typedefs = true +whitespace_ops_in_indices = false +remove_extra_newlines = true +import_to_using = false +pipe_to_function_call = false +short_to_long_function_def = false +long_to_short_function_def = false +whitespace_in_kwargs = true +annotate_untyped_fields_with_any = true +format_docstrings = true +conditional_to_if = false +normalize_line_endings = "auto" +trailing_comma = true +join_lines_based_on_source = true +indent_submodule = false +separate_kwargs_with_semicolon = true +surround_whereop_typeparameters = true +always_use_return = true \ No newline at end of file diff --git a/.github/workflows/format_check.yml b/.github/workflows/format_check.yml new file mode 100644 index 0000000..e4dcced --- /dev/null +++ b/.github/workflows/format_check.yml @@ -0,0 +1,35 @@ +name: format-check +on: + push: + branches: + - master + - release-* + pull_request: + types: [opened, synchronize, reopened] +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: julia-actions/setup-julia@latest + with: + version: '1' + - uses: actions/checkout@v1 + - name: Format check + shell: julia --color=yes {0} + run: | + using Pkg + # If you update the version, also update the style guide docs. + Pkg.add(PackageSpec(name="JuliaFormatter", version="1")) + using JuliaFormatter + format("src", verbose=true) + format("test", verbose=true) + format("docs", verbose=true) + out = String(read(Cmd(`git diff`))) + if isempty(out) + exit(0) + end + @error "Some files have not been formatted !!!" + write(stdout, out) + exit(1) + +# Workflow from https://github.com/jump-dev/JuMP.jl/blob/master/.github/workflows/format_check.yml \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..983a9e3 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,48 @@ +name: Test +on: + push: + branches: [master] + pull_request: + types: [opened, synchronize, reopened] +jobs: + test: + name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + include: + - version: '1' + os: windows-latest + arch: x64 + - version: '1.6' + os: windows-latest + arch: x64 + - version: '1' + os: ubuntu-latest + arch: x64 + - version: '1.6' + os: ubuntu-latest + arch: x64 + steps: + - uses: actions/checkout@v2 + - uses: julia-actions/setup-julia@v1 + with: + version: ${{ matrix.version }} + arch: ${{ matrix.arch }} + - uses: actions/cache@v1 + env: + cache-name: cache-artifacts + with: + path: ~/.julia/artifacts + key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} + restore-keys: | + ${{ runner.os }}-test-${{ env.cache-name }}- + ${{ runner.os }}-test- + ${{ runner.os }}- + - uses: julia-actions/julia-buildpkg@v1 + - uses: julia-actions/julia-runtest@v1 + - uses: julia-actions/julia-processcoverage@v1 + - uses: codecov/codecov-action@v1 + with: + file: lcov.info \ No newline at end of file diff --git a/mpiexecjl.ps1 b/mpiexecjl.ps1 new file mode 100644 index 0000000..7bd987d --- /dev/null +++ b/mpiexecjl.ps1 @@ -0,0 +1,78 @@ +# Copyright (C) 2023 Guilherme Bodin +# License is MIT "Expat" +# +# Commentary: +# +# Command line utility to call the `mpiexec` binary used by the `MPI.jl` version +# in the given Julia project. It has the same syntax as the `mpiexec` binary +# that would be called, with the additional `--project=...` flag to select a +# different Julia project. +# +# Examples of usage (the MPI flags available depend on the MPI implementation +# called): +# +# $ mpiexecjl -n 40 julia mpi-script.jl +# $ mpiexecjl --project=my_experiment -n 80 --oversubscribe julia mpi-script.jl +# To call the application in parallel in the non-compiled mode powershell.exe -ExecutionPolicy ByPass -File .\mpiexecjl.ps1 -n 3 --project=. julia .\main.jl + +function usage { + Write-Host "Usage: $([System.IO.Path]::GetFileNameWithoutExtension($MyInvocation.MyCommand.Name)) [--project=...] MPIEXEC_ARGUMENTS..." + Write-Host "Call the mpiexec binary in the Julia environment specified by the --project option." + Write-Host "If no project is specified, the MPI associated with the global Julia environment will be used." + Write-Host "All other arguments are forwarded to mpiexec." +} + +$julia_args = @() +$PROJECT_ARG = "" + +foreach ($arg in $args) { + if ($arg -match "^--project(=.*)?$") { + $PROJECT_ARG = $arg + } + elseif ($arg -eq "-h" -or $arg -eq "--help") { + $helpRequested = $true + usage + Write-Host "Below is the help of the current mpiexec." + Write-Host + exit 0 + } + else { + $julia_args += $arg + } +} + +if (-not $julia_args) { + Write-Error "ERROR: no arguments specified." + usage + exit 1 +} + +if ($env:JULIA_BINDIR) { + $JULIA_CMD = Join-Path $env:JULIA_BINDIR "julia" +} else { + $JULIA_CMD = "julia" +} + +$SCRIPT = @' +using MPI +ENV[\"JULIA_PROJECT\"] = dirname(Base.active_project()) +mpiexec(exe -> run(`$exe $ARGS`)) +'@ + +$PRECOMPILATION_SCRIPT = @' +println(\"precompiling current project before running MPI\") + +import Pkg +Pkg.activate(dirname(Base.active_project())) +Pkg.instantiate() + +println(\"precompilation finished\") +'@ + +& $JULIA_CMD $PROJECT_ARG -e $PRECOMPILATION_SCRIPT + +if ($PROJECT_ARG) { + & $JULIA_CMD $PROJECT_ARG --color=yes --startup-file=no --compile=min -O0 -e $SCRIPT -- $julia_args +} else { + & $JULIA_CMD --color=yes --startup-file=no --compile=min -O0 -e $SCRIPT -- $julia_args +} \ No newline at end of file diff --git a/src/JobQueueMPI.jl b/src/JobQueueMPI.jl index 5d52e25..c19cc9b 100644 --- a/src/JobQueueMPI.jl +++ b/src/JobQueueMPI.jl @@ -2,7 +2,10 @@ module JobQueueMPI using MPI +export Controller, JobQueueMPI, Job, JobAnswer, Worker, TerminationMessage + include("mpi_utils.jl") +include("job.jl") include("worker.jl") include("controller.jl") diff --git a/src/controller.jl b/src/controller.jl index bfe12ab..b03b269 100644 --- a/src/controller.jl +++ b/src/controller.jl @@ -4,24 +4,20 @@ mutable struct Controller n_workers::Int debug_mode::Bool worker_status::Vector{WorkerStatus} - job_queue::Vector{Any} - messages_sent::Int - messages_received::Int - function Controller(n_workers::Int; debug_mode::Bool=false) - return new( - n_workers, - debug_mode, - fill(WORKER_AVAILABLE, n_workers), - Any[], - 0, - 0, - ) + job_queue::Vector{Job} + jobs_sent::Int + jobs_received::Int + function Controller(n_workers::Int; debug_mode::Bool = false) + return new(n_workers, debug_mode, fill(WORKER_AVAILABLE, n_workers), Any[], 0, 0) end end struct TerminationMessage end -function _pick_message_to_send!(controller::Controller) +_is_worker_available(controller::Controller, worker::Int) = + controller.worker_status[worker] == WORKER_AVAILABLE + +function _pick_job_to_send!(controller::Controller) return popfirst!(controller.job_queue) end @@ -31,7 +27,7 @@ function _pick_available_worker(controller::Controller) return i end end - error("No available workers. You should check with any_available_workers() first.") + return error("No available workers. You should check with any_available_workers() first.") end function _any_available_workers(controller::Controller) @@ -43,41 +39,39 @@ function _any_available_workers(controller::Controller) return false end -function _has_message_to_send(controller::Controller) - return !isempty(controller.job_queue) -end - -function add_message_to_queue!(controller::Controller, message::Any) - push!(controller.job_queue, message) +function add_job_to_queue!(controller::Controller, message::Any, f::Function) + return push!(controller.job_queue, Job(message, f)) end -function send_message_to_any_available_worker_worker(controller::Controller) - if _has_message_to_send(controller) && any_available_workers(controller) +function send_job_to_any_available_worker(controller::Controller) + if _any_available_workers(controller) worker = _pick_available_worker(controller) - message = _pick_message_to_send!(controller) - controller.messages_sent += 1 + job = _pick_job_to_send!(controller) + controller.jobs_sent += 1 controller.worker_status[worker] = WORKER_BUSY - MPI.isend(message, _mpi_comm(); dest=worker, tag=worker + 32) + MPI.isend(job, _mpi_comm(); dest = worker, tag = worker + 32) + return true + else + return false end end function send_termination_message(controller::Controller) for worker in 1:controller.n_workers - MPI.isend(TerminationMessage(), _mpi_comm(); dest=worker, tag=worker + 32) + MPI.isend(TerminationMessage(), _mpi_comm(); dest = worker, tag = worker + 32) controller.worker_status[worker] = WORKER_TERMINATED end end -function check_for_workers_message(controller::Controller) - message = nothing +function check_for_workers_job(controller::Controller) for worker in 1:controller.n_workers - has_message = MPI.Iprobe(comm(); source = worker, tag = worker + 32) - if has_message - message = MPI.recv(comm(); source = worker, tag = worker + 32) - controller.messages_received += 1 + has_job = MPI.Iprobe(_mpi_comm(); source = worker, tag = worker + 32) + if has_job + job = MPI.recv(_mpi_comm(); source = worker, tag = worker + 32) + controller.jobs_received += 1 controller.worker_status[worker] = WORKER_AVAILABLE - break + return job end end - return message -end \ No newline at end of file + return nothing +end diff --git a/src/job.jl b/src/job.jl new file mode 100644 index 0000000..20ccf0a --- /dev/null +++ b/src/job.jl @@ -0,0 +1,13 @@ +abstract type AbstractJob end + +mutable struct Job <: AbstractJob + message::Any + f::Function +end + +mutable struct JobAnswer <: AbstractJob + message::Any +end + +get_message(job::AbstractJob) = job.message +get_task(job::Job) = job.f diff --git a/src/mpi_utils.jl b/src/mpi_utils.jl index 1e6adf4..ab6a5c2 100644 --- a/src/mpi_utils.jl +++ b/src/mpi_utils.jl @@ -22,4 +22,4 @@ controller_rank() = 0 is_worker_process() = !is_controller_process() -num_workers() = world_size() - 1 \ No newline at end of file +num_workers() = world_size() - 1 diff --git a/src/worker.jl b/src/worker.jl index 2a32951..448ca5d 100644 --- a/src/worker.jl +++ b/src/worker.jl @@ -6,25 +6,19 @@ end mutable struct Worker rank::Int + Worker() = new(my_rank()) end -function workers_loop(f, args...) - if is_worker_process() - worker = Worker(my_rank()) - while true - message = MPI.recv(comm(); source = controller_rank(), tag = worker.rank + 32) - if message == TerminationMessage() - break - end - message = f(args...) - send_message_to_controller(worker, message) - end - else - error("This function should only be called by worker processes.") - end - return 0 +function reset_request(worker::Worker) + return worker.request = nothing end -function send_message_to_controller(worker::Worker, message::Any) - return MPI.send(message, _mpi_comm(); dest=controller_rank(), tag=worker.rank + 32) -end \ No newline at end of file +has_job(worker::Worker) = + MPI.Iprobe(_mpi_comm(); source = controller_rank(), tag = worker.rank + 32) == true + +function send_job_to_controller(worker::Worker, job::Any) + return MPI.isend(job, _mpi_comm(); dest = controller_rank(), tag = worker.rank + 32) +end + +receive_job(worker::Worker) = + MPI.recv(_mpi_comm(); source = controller_rank(), tag = worker.rank + 32) diff --git a/test/Project.toml b/test/Project.toml new file mode 100644 index 0000000..c60df76 --- /dev/null +++ b/test/Project.toml @@ -0,0 +1,3 @@ +[deps] +MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/test/runtests.jl b/test/runtests.jl index 6043679..d47a6c7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,11 @@ using Test +using JobQueueMPI +using MPI +ENV["JULIA_PROJECT"] = dirname(Base.active_project()) -@testset "JobQueueMPI" begin - @test true -end \ No newline at end of file +JQM = JobQueueMPI + +@testset verbose = true "JobQueueMPI Tests" begin + mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project ..\\test\\test1.jl`)) + mpiexec(exe -> run(`$exe -n 3 $(Base.julia_cmd()) --project ..\\test\\test2.jl`)) +end diff --git a/test/test1.jl b/test/test1.jl new file mode 100644 index 0000000..77813db --- /dev/null +++ b/test/test1.jl @@ -0,0 +1,80 @@ +using JobQueueMPI +using Test +JQM = JobQueueMPI + +mutable struct Message + value::Int + vector_idx::Int +end + +function sum_100(message::Message) + message.value += 100 + return JobAnswer(message) +end + +function update_data(new_data, message::Message) + idx = message.vector_idx + value = message.value + return new_data[idx] = value +end + +function job_queue(data) + JQM.mpi_init() + JQM.mpi_barrier() + + T = eltype(data) + N = length(data) + + if JQM.is_controller_process() # I am root + new_data = Array{T}(undef, N) + sent_messages = 0 + delivered_messages = 0 + + controller = Controller(JQM.num_workers()) + + for i in eachindex(data) + message = Message(data[i], i) + JQM.add_job_to_queue!(controller, message, sum_100) + end + + while sent_messages < N || delivered_messages < N + if sent_messages < N + if JQM.send_job_to_any_available_worker(controller) + sent_messages += 1 + end + end + if delivered_messages < N + job_answer = JQM.check_for_workers_job(controller) + if !isnothing(job_answer) + message = JQM.get_message(job_answer) + update_data(new_data, message) + delivered_messages += 1 + end + end + end + + JQM.send_termination_message(controller) + + return new_data + else # If rank == worker + worker = Worker() + while true + job = JQM.receive_job(worker) + if job == TerminationMessage() + break + end + message = JQM.get_message(job) + job_task = JQM.get_task(job) + return_job = job_task(message) + JQM.send_job_to_controller(worker, return_job) + end + exit(0) + end + JQM.mpi_barrier() + return JQM.mpi_finalize() +end + +@testset "Sum 100" begin + data = collect(1:10) + @test job_queue(data) == [101, 102, 103, 104, 105, 106, 107, 108, 109, 110] +end diff --git a/test/test2.jl b/test/test2.jl new file mode 100644 index 0000000..e666611 --- /dev/null +++ b/test/test2.jl @@ -0,0 +1,102 @@ +using JobQueueMPI +using Test +JQM = JobQueueMPI + +mutable struct ControllerMessage + value::Int + vector_idx::Int +end + +mutable struct WorkerMessage + divisors::Array{Int} + vector_idx::Int +end + +function get_divisors(message::ControllerMessage) + number = message.value + divisors = [] + + for i in 1:number + if number % i == 0 + push!(divisors, i) + end + end + + return JQM.JobAnswer(WorkerMessage(divisors, message.vector_idx)) +end + +function update_data(new_data, message::WorkerMessage) + idx = message.vector_idx + value = message.divisors + return new_data[idx] = value +end + +function divisors(data) + JQM.mpi_init() + JQM.mpi_barrier() + + N = length(data) + + if JQM.is_controller_process() # I am root + new_data = Array{Array{Int}}(undef, N) + sent_messages = 0 + delivered_messages = 0 + + controller = Controller(JQM.num_workers()) + + for i in eachindex(data) + message = ControllerMessage(data[i], i) + JQM.add_job_to_queue!(controller, message, get_divisors) + end + + while sent_messages < N || delivered_messages < N + if sent_messages < N + if JQM.send_job_to_any_available_worker(controller) + sent_messages += 1 + end + end + if delivered_messages < N + job_answer = JQM.check_for_workers_job(controller) + if !isnothing(job_answer) + message = JQM.get_message(job_answer) + update_data(new_data, message) + delivered_messages += 1 + end + end + end + + JQM.send_termination_message(controller) + + return new_data + else # If rank == worker + worker = Worker() + while true + job = JQM.receive_job(worker) + if job == TerminationMessage() + break + end + message = JQM.get_message(job) + job_task = JQM.get_task(job) + return_job = job_task(message) + JQM.send_job_to_controller(worker, return_job) + end + exit(0) + end + JQM.mpi_barrier() + return JQM.mpi_finalize() +end + +@testset "Divisors" begin + data = [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] + values = divisors(data) + @test values[1] == [1, 2] + @test values[2] == [1, 2, 4] + @test values[3] == [1, 2, 3, 6] + @test values[4] == [1, 2, 4, 8] + @test values[5] == [1, 2, 5, 10] + @test values[6] == [1, 2, 3, 4, 6, 12] + @test values[7] == [1, 2, 7, 14] + @test values[8] == [1, 2, 4, 8, 16] + @test values[9] == [1, 2, 3, 6, 9, 18] + @test values[10] == [1, 2, 4, 5, 10, 20] +end