Reagents.jl is still work-in-progress.
How to create a cancellable blocking API
using Reagents: Block, CAS, Computed, Map, Read, Reagents, Return
As demonstrated in the examples Treiber stack and Michael and Scott queue, reagents can be used for defining nonblocking data structures. However, reagents can also be used for constructing complex synchronization APIs.
Blocking containers
When a value is not available in a nonblocking container, it is often useful to wait (block) until the value becomes available (as in Base.Channel). Using Reagents.channel (which we use here like an unbuffered Base.Channel), we can mechanically transform a nonblocking concurrent data container into a waitable data structure.
To this end, let us define a simple wrapper type that wraps an underlying nonblocking data collection (.data) and the channel endpoints (.send and .receive):
struct Blocking{T,Data,Send,Receive}
    eltype::Val{T}
    data::Data  # holds value of type T
    send::Send  # swaps value::T -> nothing
    receive::Receive  # swaps nothing -> value::T
end

function Blocking(data)
    send, receive = Reagents.channel(eltype(data), Nothing)
    return Blocking(Val(eltype(data)), data, send, receive)
end

Base.eltype(::Type{<:Blocking{T}}) where {T} = T
The idea is to try sending or receiving the item via the channel and "then" try to manipulate the data collection. We can do this atomically by using the choice reagent |. Note that the order of the operands is important. For example, when invoking putting(b), it tries b.send first, and this "attempt" remains active even during putting(b.data). Since this attempt on b.send is atomically withdrawn when the reaction commits, the item is added to the data collection if and only if no other task is invoking taking(b).
putting(b::Blocking) = b.send | putting(b.data)
taking(b::Blocking) = b.receive | taking(b.data)
Base.put!(b::Blocking, x) = putting(b)(convert(eltype(b), x))
Base.take!(b::Blocking) = taking(b)()
This Blocking wrapper can be used to extend existing nonblocking data structures, such as the Treiber stack and the Michael and Scott queue that we have already defined.
include("treiberstack.jl")
include("msqueue.jl")
To this end, we need to transform trypopping and trypoppingfirst into reagents that block when the item is not ready. This can be done with a simple helper reagent that blocks when the input is nothing:
blocknothing() = Map(x -> x === nothing ? Block() : something(x))
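The helper relies on Base's something to unwrap the popped value. The following Base-only sketch shows that behavior in isolation. It assumes (this is not stated above) that trypopping and trypoppingfirst return nothing for an empty container and a Some-wrapped item otherwise; unwrap_or_signal and the :block symbol are hypothetical stand-ins for the function passed to Map and for Block(), respectively.

```julia
# Hypothetical stand-in for the function passed to `Map` in `blocknothing`.
# `:block` plays the role of `Block()`; no Reagents API is used here.
unwrap_or_signal(x) = x === nothing ? :block : something(x)

unwrap_or_signal(nothing)        # empty container: signal "block"
unwrap_or_signal(Some(111))      # wrapped item: unwrapped to 111
unwrap_or_signal(Some(nothing))  # a stored `nothing` stays distinguishable from "empty"
```

Wrapping results in Some is what lets a container that stores nothing as a legitimate value still report "empty" unambiguously.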
Then, it is straightforward to define the taking and putting functions required by the Blocking wrapper:
putting(c::TreiberStack) = pushing(c)
taking(c::TreiberStack) = trypopping(c) ⨟ blocknothing()
putting(c::MSQueue) = pushing(c)
taking(c::MSQueue) = trypoppingfirst(c) ⨟ blocknothing()
Test blocking containers
using Test
function test_put_take_queue()
When there are enough items in the data container, Blocking(MSQueue{Int}()) behaves like MSQueue{Int}():
    items = Blocking(MSQueue{Int}())
    put!(items, 111)
    put!(items, 222)
    @test take!(items) == 111
    @test take!(items) == 222
However, when take! is invoked on an empty collection (which is enforced by the "unfair scheduling" yield(t)), it blocks until the corresponding put! is invoked:
    t = @task take!(items)
    yield(t)
    put!(items, 333)
    @test fetch(t) === 333
end
It works with TreiberStack, too:
function test_put_take_stack()
    items = Blocking(TreiberStack{Int}())
    put!(items, 111)
    put!(items, 222)
    @test take!(items) == 222
    @test take!(items) == 111
    t = @task take!(items)
    yield(t)
    put!(items, 333)
    @test fetch(t) === 333
end
Generic cancellable operations
It is often useful to cancel a blocking operation safely. This can be expressed quite naturally using reagents: we will create a CancellationToken that can be composed with any blocking operation. The pattern is similar to the Go idiom of listening to a Done channel as one of the blocking operations in a select statement.
First, let us define a singleton sentinel value indicating that a given reaction is cancelled:
struct Cancelled end
To illustrate the idea, let us again use a Blocking(MSQueue{Int}()):
function test_cancellation_idea()
    items = Blocking(MSQueue{Int}())
We use an additional channel for sending the cancellation signal:
    send, receive = Reagents.channel(Cancelled, Nothing)
The idea is to "listen to" the cancellation signal and then try to invoke a blocking reaction. If there is no cancellation signal, it behaves like the reagent without the cancellation:
    t = @task (receive | taking(items))()
    yield(t)
    put!(items, 111)
    @test fetch(t) == 111
If the cancellation signal is fired before the corresponding put!, the result of the reaction is the sentinel Cancelled().
    t = @task (receive | taking(items))()
    yield(t)
    send(Cancelled())
    @test fetch(t) isa Cancelled
end
Note that the above idea is still hard to use directly, since send(Cancelled()) only triggers the reactions that are happening simultaneously. We can introduce a Reagents.Ref{Bool} to make the cancellation permanent.
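The difference between a one-shot signal and a permanent flag can be seen with plain Base primitives. The sketch below is only an analogy and uses no Reagents API: an unbuffered Base.Channel stands in for Reagents.channel, and a Threads.Atomic{Bool} stands in for Reagents.Ref{Bool}. The names signal, iscancelled, and listener are chosen here for illustration.

```julia
# Base-only analogy: the signal reaches only a task that is already waiting,
# while the flag stays set for anyone who checks later.
signal = Channel{Symbol}(0)               # unbuffered: put! hands off to one waiting take!
iscancelled = Threads.Atomic{Bool}(false) # sticky cancellation state

listener = @async take!(signal)           # a task that is waiting when the signal fires
yield()                                   # give the listener a chance to start waiting

iscancelled[] = true                      # remember the cancellation first
put!(signal, :cancelled)                  # rendezvous with the waiting listener

fetch(listener)                           # the early listener received the signal
isready(signal)                           # nothing is stored for late listeners
iscancelled[]                             # late arrivals can still detect cancellation
```

A task that starts listening after put! has returned would wait forever on signal alone; checking the flag first is what makes the cancellation visible to it.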
Let us wrap this idea in a single object:
struct CancellationToken
    iscancelled::typeof(Reagents.Ref{Bool}())
    send::typeof(Reagents.channel(Cancelled, Nothing)[1])
    receive::typeof(Reagents.channel(Cancelled, Nothing)[2])
end

function CancellationToken()
    iscancelled = Reagents.Ref{Bool}(false)
    send, receive = Reagents.channel(Cancelled, Nothing)
    return CancellationToken(iscancelled, send, receive)
end
We can then transform an arbitrary reagent into a reagent that can be cancelled via a "signal" sent through a CancellationToken (see cancel! defined below). The resulting reagent is the composition of three components: listener, checker, and the original reagent:
function cancellable(reagent::Reagents.Reagent, token::CancellationToken)
    listener = Return(nothing) ⨟ token.receive
    checker = Read(token.iscancelled) ⨟ Map(x -> x ? Cancelled() : Block())
    return listener | checker | reagent
end
The listener reagent is essentially equivalent to the idea demonstrated above. It is prefixed with the Return(nothing) reagent to make sure we always invoke the token.receive swap point with the valid input nothing.
The checker reagent checks token.iscancelled; if it is already true, it ends the reaction with the value Cancelled(). Otherwise, it indicates that the next reagent should be tried by returning the Block failure value.
Finally, if both listener and checker are blocked, the original reagent is invoked. When this reagent is also blocked, whichever of listener and reagent is awakened first determines the result value of this reaction.
We can then use the cancellable combinator to define a cancellable_take! function:
cancellable_take!(b::Blocking, token::CancellationToken) = cancellable(taking(b), token)()
To cancel! a token, we first set iscancelled[] to true. This way, every future cancellable_take! call returns Cancelled() due to the checker reagent defined above. We then clear out any existing peers listening on the token.receive swap endpoint.
function cancel!(token::CancellationToken)
    token.iscancelled[] = true
    while Reagents.trysync!(token.send, Cancelled()) !== nothing
    end
end
Test generic cancellable operations
function test_cancellation_token()
    items = Blocking(MSQueue{Int}())
    token = CancellationToken()
Before cancellation, cancellable_take! works like a normal take!:
    t = @task cancellable_take!(items, token)
    yield(t)
    put!(items, 111)
    @test fetch(t) == 111
Calling cancel!(token) cancels all cancellable_take!(items, token) calls that are already in progress (waiting for an item) as well as any calls made after the cancellation.
    t = @task cancellable_take!(items, token)
    yield(t)
    cancel!(token)
    @test fetch(t) isa Cancelled
    @test cancellable_take!(items, token) isa Cancelled
Note that the cancellation mechanism is introduced outside the Blocking container. It is different from, e.g., cancelling put!(::Base.Channel) by closing the Base.Channel. Thus, the container itself still works:
    put!(items, 222)
    @test take!(items) == 222
end
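For comparison, the behavior exercised by the test above (a waiting take is released by cancellation, and later takes return immediately) can be mimicked with a lock and a condition variable from Base. This is a minimal sketch and not how Reagents.jl works internally; SimpleQueue, SimpleToken, Stopped, put_item!, take_or_stop!, and stop! are all hypothetical names introduced here.

```julia
# Lock-based analogue of a cancellable blocking take, using only Base.
struct Stopped end                      # sentinel, analogous to `Cancelled`

mutable struct SimpleToken
    stopped::Bool                       # only read or written while holding the queue lock
end

struct SimpleQueue{T}
    items::Vector{T}
    cond::Threads.Condition             # guards `items` and the token's flag
end
SimpleQueue{T}() where {T} = SimpleQueue{T}(T[], Threads.Condition())

function put_item!(q::SimpleQueue, x)
    lock(q.cond) do
        push!(q.items, x)
        notify(q.cond)                  # wake waiting takers
    end
    return q
end

function take_or_stop!(q::SimpleQueue, token::SimpleToken)
    lock(q.cond) do
        while true
            token.stopped && return Stopped()               # like `checker`
            isempty(q.items) || return popfirst!(q.items)   # like the original reagent
            wait(q.cond)                # sleep until an item arrives or stop! is called
        end
    end
end

function stop!(q::SimpleQueue, token::SimpleToken)
    lock(q.cond) do
        token.stopped = true            # make the cancellation permanent
        notify(q.cond)                  # release takers that are already waiting
    end
end

q = SimpleQueue{Int}()
token = SimpleToken(false)
t1 = @async take_or_stop!(q, token)
yield()
put_item!(q, 111)
r1 = fetch(t1)                          # the waiting take receives the item
t2 = @async take_or_stop!(q, token)
yield()
stop!(q, token)
r2 = fetch(t2)                          # the waiting take is released with the sentinel
r3 = take_or_stop!(q, token)            # later takes return the sentinel immediately
```

In this sketch the token only works because it shares the queue's condition variable, so cancellation has to be built into each container. The reagent-based cancellable combinator avoids that coupling by composing the token with any blocking reagent from the outside.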
This page was generated using Literate.jl.