Skip to content

Commit

Permalink
Merge branch 'master' into kf/globalminworld
Browse files Browse the repository at this point in the history
  • Loading branch information
DilumAluthge authored Jan 27, 2025
2 parents dfa20f4 + c19a6bb commit 7f6d503
Show file tree
Hide file tree
Showing 15 changed files with 191 additions and 114 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ New language features
- actual running time for the task (`Base.Experimental.task_running_time_ns`), and
- wall-time for the task (`Base.Experimental.task_wall_time_ns`).
- Support for Unicode 16 ([#56925]).
- `Threads.@spawn` now takes a `:samepool` argument to specify the same threadpool as the caller.
`Threads.@spawn :samepool foo()` which is shorthand for `Threads.@spawn Threads.threadpool() foo()` ([#57109])

Language changes
----------------
Expand Down
90 changes: 3 additions & 87 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -125,104 +125,20 @@ proceeding.
"""
function wait end

# wait with timeout
#
# The behavior of wait changes if a timeout is specified. There are
# three concurrent entities that can interact:
# 1. Task W: the task that calls wait w/timeout.
# 2. Task T: the task created to handle a timeout.
# 3. Task N: the task that notifies the Condition being waited on.
#
# Typical flow:
# - W enters the Condition's wait queue.
# - W creates T and stops running (calls wait()).
# - T, when scheduled, waits on a Timer.
# - Two common outcomes:
# - N notifies the Condition.
# - W starts running, closes the Timer, sets waiter_left and returns
# the notify'ed value.
# - The closed Timer throws an EOFError to T which simply ends.
# - The Timer expires.
# - T starts running and locks the Condition.
# - T confirms that waiter_left is unset and that W is still in the
# Condition's wait queue; it then removes W from the wait queue,
# sets dosched to true and unlocks the Condition.
# - If dosched is true, T schedules W with the special :timed_out
# value.
# - T ends.
# - W runs and returns :timed_out.
#
# Some possible interleavings:
# - N notifies the Condition but the Timer expires and T starts running
# before W:
# - W closing the expired Timer is benign.
# - T will find that W is no longer in the Condition's wait queue
# (which is protected by a lock) and will not schedule W.
# - N notifies the Condition; W runs and calls wait on the Condition
# again before the Timer expires:
# - W sets waiter_left before leaving. When T runs, it will find that
# waiter_left is set and will not schedule W.
#
# The lock on the Condition's wait queue and waiter_left together
# ensure proper synchronization and behavior of the tasks involved.

"""
wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
wait(c::GenericCondition; first::Bool=false)
Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.
If the keyword `first` is set to `true`, the waiter will be put _first_
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
If `timeout` is specified, cancel the `wait` when it expires and return
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
millisecond.
"""
function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
timeout == 0.0 || timeout 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))

function wait(c::GenericCondition; first::Bool=false)
ct = current_task()
_wait2(c, ct, first)
token = unlockall(c.lock)

timer::Union{Timer, Nothing} = nothing
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
if timeout > 0.0
timer = Timer(timeout)
waiter_left = Threads.Atomic{Bool}(false)
# start a task to wait on the timer
t = Task() do
try
wait(timer)
catch e
# if the timer was closed, the waiting task has been scheduled; do nothing
e isa EOFError && return
end
dosched = false
lock(c.lock)
# Confirm that the waiting task is still in the wait queue and remove it. If
# the task is not in the wait queue, it must have been notified already so we
# don't do anything here.
if !waiter_left[] && ct.queue == c.waitq
dosched = true
Base.list_deletefirst!(c.waitq, ct)
end
unlock(c.lock)
# send the waiting task a timeout
dosched && schedule(ct, :timed_out)
end
t.sticky = false
Threads._spawn_set_thrpool(t, :interactive)
schedule(t)
end

try
res = wait()
if timer !== nothing
close(timer)
waiter_left[] = true
end
return res
return wait()
catch
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
Expand Down
109 changes: 109 additions & 0 deletions base/experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
module Experimental

using Base: Threads, sync_varname, is_function_def, @propagate_inbounds
using Base: GenericCondition
using Base.Meta

"""
Expand Down Expand Up @@ -577,4 +578,112 @@ function task_wall_time_ns(t::Task=current_task())
return end_at - start_at
end

# wait_with_timeout
#
# A version of `wait(c::Condition)` that additionally allows the
# specification of a timeout. This is experimental as it will likely
# be dropped when a cancellation framework is added.
#
# The parallel behavior of wait_with_timeout is specified here. There
# are three concurrent entities that can interact:
# 1. Task W: the task that calls wait_with_timeout.
# 2. Task T: the task created to handle a timeout.
# 3. Task N: the task that notifies the Condition being waited on.
#
# Typical flow:
# - W enters the Condition's wait queue.
# - W creates T and stops running (calls wait()).
# - T, when scheduled, waits on a Timer.
# - Two common outcomes:
# - N notifies the Condition.
# - W starts running, closes the Timer, sets waiter_left and returns
# the notify'ed value.
# - The closed Timer throws an EOFError to T which simply ends.
# - The Timer expires.
# - T starts running and locks the Condition.
# - T confirms that waiter_left is unset and that W is still in the
# Condition's wait queue; it then removes W from the wait queue,
# sets dosched to true and unlocks the Condition.
# - If dosched is true, T schedules W with the special :timed_out
# value.
# - T ends.
# - W runs and returns :timed_out.
#
# Some possible interleavings:
# - N notifies the Condition but the Timer expires and T starts running
# before W:
# - W closing the expired Timer is benign.
# - T will find that W is no longer in the Condition's wait queue
# (which is protected by a lock) and will not schedule W.
# - N notifies the Condition; W runs and calls wait on the Condition
# again before the Timer expires:
# - W sets waiter_left before leaving. When T runs, it will find that
# waiter_left is set and will not schedule W.
#
# The lock on the Condition's wait queue and waiter_left together
# ensure proper synchronization and behavior of the tasks involved.

"""
wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.
If the keyword `first` is set to `true`, the waiter will be put _first_
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
If `timeout` is specified, cancel the `wait` when it expires and return
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
millisecond.
"""
function wait_with_timeout(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
ct = current_task()
Base._wait2(c, ct, first)
token = Base.unlockall(c.lock)

timer::Union{Timer, Nothing} = nothing
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
if timeout > 0.0
timer = Timer(timeout)
waiter_left = Threads.Atomic{Bool}(false)
# start a task to wait on the timer
t = Task() do
try
wait(timer)
catch e
# if the timer was closed, the waiting task has been scheduled; do nothing
e isa EOFError && return
end
dosched = false
lock(c.lock)
# Confirm that the waiting task is still in the wait queue and remove it. If
# the task is not in the wait queue, it must have been notified already so we
# don't do anything here.
if !waiter_left[] && ct.queue == c.waitq
dosched = true
Base.list_deletefirst!(c.waitq, ct)
end
unlock(c.lock)
# send the waiting task a timeout
dosched && schedule(ct, :timed_out)
end
t.sticky = false
Threads._spawn_set_thrpool(t, :interactive)
schedule(t)
end

try
res = wait()
if timer !== nothing
close(timer)
waiter_left[] = true
end
return res
catch
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
rethrow()
finally
Base.relockall(c.lock, token)
end
end

end # module
16 changes: 12 additions & 4 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -440,10 +440,11 @@ function _spawn_set_thrpool(t::Task, tp::Symbol)
end

"""
Threads.@spawn [:default|:interactive] expr
Threads.@spawn [:default|:interactive|:samepool] expr
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
thread in the specified threadpool (`:default` if unspecified). The task is
thread in the specified threadpool: `:default`, `:interactive`, or `:samepool`
to use the same as the caller. `:default` is used if unspecified. The task is
allocated to a thread once one becomes available. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call
[`fetch`](@ref) to wait and then obtain its return value.
Expand All @@ -468,6 +469,9 @@ the variable's value in the current task.
!!! compat "Julia 1.9"
A threadpool may be specified as of Julia 1.9.
!!! compat "Julia 1.12"
The same threadpool may be specified as of Julia 1.12.
# Examples
```julia-repl
julia> t() = println("Hello from ", Threads.threadid());
Expand All @@ -486,7 +490,7 @@ macro spawn(args...)
ttype, ex = args
if ttype isa QuoteNode
ttype = ttype.value
if ttype !== :interactive && ttype !== :default
if !in(ttype, (:interactive, :default, :samepool))
throw(ArgumentError(LazyString("unsupported threadpool in @spawn: ", ttype)))
end
tp = QuoteNode(ttype)
Expand All @@ -507,7 +511,11 @@ macro spawn(args...)
let $(letargs...)
local task = Task($thunk)
task.sticky = false
_spawn_set_thrpool(task, $(esc(tp)))
local tp = $(esc(tp))
if tp == :samepool
tp = Threads.threadpool()
end
_spawn_set_thrpool(task, tp)
if $(Expr(:islocal, var))
put!($var, task)
end
Expand Down
2 changes: 1 addition & 1 deletion contrib/refresh_checksums.mk
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ CLANG_TRIPLETS=$(filter %-darwin %-freebsd,$(TRIPLETS))
NON_CLANG_TRIPLETS=$(filter-out %-darwin %-freebsd,$(TRIPLETS))

# These are the projects currently using BinaryBuilder; both GCC-expanded and non-GCC-expanded:
BB_PROJECTS=openssl libssh2 nghttp2 mpfr curl libgit2 pcre libuv unwind llvmunwind dsfmt objconv p7zip zlib libsuitesparse openlibm blastrampoline libtracyclient
BB_PROJECTS=openssl libssh2 nghttp2 mpfr curl libgit2 pcre libuv unwind llvmunwind dsfmt objconv p7zip zlib libsuitesparse openlibm blastrampoline libtracyclient mmtk_julia
BB_GCC_EXPANDED_PROJECTS=openblas csl
BB_CXX_EXPANDED_PROJECTS=gmp llvm clang llvm-tools lld
# These are non-BB source-only deps
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
7405afe10033da0431c8fd920a8cbbbf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ad3498cfee95bcd088e47c15eb2707f47ced9493881ec356cbeb22f66207406d23a3e3b27e70a00be7c2c755c6651f54f5378ef42bf4d1312c84d589010aab7b

This file was deleted.

This file was deleted.

4 changes: 4 additions & 0 deletions deps/checksums/mmtk_julia
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ mmtk_julia-f07d66aafc86af84ea988b35335acc9bbc770fa1.tar.gz/md5/38afb5db6d8c55413
mmtk_julia-f07d66aafc86af84ea988b35335acc9bbc770fa1.tar.gz/sha512/78525582a46a6baf8d33df7b622e55cf244439afcd7192ba55489c1bc18393d1237d2903d517c610484bf9e2a7338ad31435a9cbf70889d6bcf87c40cec829e5
mmtk_julia.v0.30.3+1.x86_64-linux-gnu.tar.gz/md5/631b204574da7062802dac501a4b711f
mmtk_julia.v0.30.3+1.x86_64-linux-gnu.tar.gz/sha512/daaed59d08fc49621479ed638dea0aac0cba123986e486571447e8e21e9a098776ce2e87fbd92ddea276782fc44621f23d40fa213296b28e1d4480553c7de4f7
mmtk_julia-c9e046baf3a0d52fe75d6c8b28f6afd69b045d95.tar.gz/md5/73a8fbea71edce30a39a30f31969dd8e
mmtk_julia-c9e046baf3a0d52fe75d6c8b28f6afd69b045d95.tar.gz/sha512/374848b7696b565dea66daa208830581f92c1fcb0138e7a7ab88564402e94bc79c54b6ed370ec68473e31e2bd411bf82c97793796c31d39aafbbfffea9c05588
mmtk_julia.v0.30.4+0.x86_64-linux-gnu.tar.gz/md5/8cdeb14fd69945f64308be49f6912f9c
mmtk_julia.v0.30.4+0.x86_64-linux-gnu.tar.gz/sha512/3692502f65dec8c0971b56b9bf8178641892b390d520cbcd69880d75b7500e6341534d87882246e68998f590f824ec54c18f4b8fb4aa09b8f313de065c48450e
6 changes: 3 additions & 3 deletions deps/mmtk_julia.version
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MMTK_JULIA_BRANCH = master
MMTK_JULIA_SHA1 = f07d66aafc86af84ea988b35335acc9bbc770fa1
MMTK_JULIA_SHA1 = c9e046baf3a0d52fe75d6c8b28f6afd69b045d95
MMTK_JULIA_GIT_URL := https://github.com/mmtk/mmtk-julia.git
MMTK_JULIA_TAR_URL = https://github.com/mmtk/mmtk-julia/archive/refs/tags/v0.30.3.tar.gz
MMTK_JULIA_JLL_VER := 0.30.3+1
MMTK_JULIA_TAR_URL = https://github.com/mmtk/mmtk-julia/archive/refs/tags/v0.30.4.tar.gz
MMTK_JULIA_JLL_VER := 0.30.4+0
MMTK_JULIA_JLL_NAME := mmtk_julia
Loading

0 comments on commit 7f6d503

Please sign in to comment.