Skip to content

Commit

Permalink
Add timeout parameter to wait(::Condition) (JuliaLang#56974)
Browse files Browse the repository at this point in the history
We have a need for this capability. I believe this closes
JuliaLang#36217.

The implementation is straightforward and there are a couple of tests.
  • Loading branch information
kpamnany committed Jan 21, 2025
1 parent d0efc46 commit 9bd4ca2
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 2 deletions.
98 changes: 96 additions & 2 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,106 @@ restarted by an explicit call to [`schedule`](@ref) or [`yieldto`](@ref).
Often `wait` is called within a `while` loop to ensure a waited-for condition is met before
proceeding.
"""
function wait(c::GenericCondition; first::Bool=false)
function wait end

# wait with timeout
#
# The behavior of wait changes if a timeout is specified. There are
# three concurrent entities that can interact:
# 1. Task W: the task that calls wait w/timeout.
# 2. Task T: the task created to handle a timeout.
# 3. Task N: the task that notifies the Condition being waited on.
#
# Typical flow:
# - W enters the Condition's wait queue.
# - W creates T and stops running (calls wait()).
# - T, when scheduled, waits on a Timer.
# - Two common outcomes:
# - N notifies the Condition.
# - W starts running, closes the Timer, sets waiter_left and returns
# the notify'ed value.
# - The closed Timer throws an EOFError to T which simply ends.
# - The Timer expires.
# - T starts running and locks the Condition.
# - T confirms that waiter_left is unset and that W is still in the
# Condition's wait queue; it then removes W from the wait queue,
# sets dosched to true and unlocks the Condition.
# - If dosched is true, T schedules W with the special :timed_out
# value.
# - T ends.
# - W runs and returns :timed_out.
#
# Some possible interleavings:
# - N notifies the Condition but the Timer expires and T starts running
# before W:
# - W closing the expired Timer is benign.
# - T will find that W is no longer in the Condition's wait queue
# (which is protected by a lock) and will not schedule W.
# - N notifies the Condition; W runs and calls wait on the Condition
# again before the Timer expires:
# - W sets waiter_left before leaving. When T runs, it will find that
# waiter_left is set and will not schedule W.
#
# The lock on the Condition's wait queue and waiter_left together
# ensure proper synchronization and behavior of the tasks involved.

"""
wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.
If the keyword `first` is set to `true`, the waiter will be put _first_
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
If `timeout` is specified, cancel the `wait` when it expires and return
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
millisecond.
"""
function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
timeout == 0.0 || timeout 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))

ct = current_task()
_wait2(c, ct, first)
token = unlockall(c.lock)

timer::Union{Timer, Nothing} = nothing
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
if timeout > 0.0
timer = Timer(timeout)
waiter_left = Threads.Atomic{Bool}(false)
# start a task to wait on the timer
t = Task() do
try
wait(timer)
catch e
# if the timer was closed, the waiting task has been scheduled; do nothing
e isa EOFError && return
end
dosched = false
lock(c.lock)
# Confirm that the waiting task is still in the wait queue and remove it. If
# the task is not in the wait queue, it must have been notified already so we
# don't do anything here.
if !waiter_left[] && ct.queue == c.waitq
dosched = true
Base.list_deletefirst!(c.waitq, ct)
end
unlock(c.lock)
# send the waiting task a timeout
dosched && schedule(ct, :timed_out)
end
t.sticky = false
Threads._spawn_set_thrpool(t, :interactive)
schedule(t)
end

try
return wait()
res = wait()
if timer !== nothing
close(timer)
waiter_left[] = true
end
return res
catch
ct.queue === nothing || list_deletefirst!(ct.queue::IntrusiveLinkedList{Task}, ct)
rethrow()
Expand Down
17 changes: 17 additions & 0 deletions test/channels.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Random
using Base.Threads
using Base: Experimental
using Base: n_avail

Expand Down Expand Up @@ -36,6 +37,22 @@ end
@test fetch(t) == "finished"
end

@testset "timed wait on Condition" begin
a = Threads.Condition()
@test_throws ArgumentError @lock a wait(a; timeout=0.0005)
@test @lock a wait(a; timeout=0.1)==:timed_out
lock(a)
@spawn begin
@lock a notify(a)
end
@test try
wait(a; timeout=2)
true
finally
unlock(a)
end
end

@testset "various constructors" begin
c = Channel()
@test eltype(c) == Any
Expand Down

0 comments on commit 9bd4ca2

Please sign in to comment.