Reagents.jl is still work-in-progress.
Negative acknowledgement (NACK)
using Reagents
using Reagents: WithNack, Return
using TestReagents.jl provides so-called negative acknowledgement (NACK) reagent Reagents.WithNack which is taken from Concurrent ML. This is useful for writing "client-server" style of code where the client can abort the request.
Concurrent ML provides composable synchronizable operations called events which are similar to Reagent. Turon (2012) discussed influences of Concurrent ML on reagents.
See also:
How it works
Let us set up a demo.
function nack_demo()Reagents.WithNack has non-trivial effect only when used inside the choice combinator which possibly blocking branches. Thus, to selectively trigger two branches in the choice combinator, we create two channels:
s1, r1 = Reagents.channel()
s2, r2 = Reagents.channel()To receive the negative acknowledgement, we craete one more channel:
send_gotnack, receive_gotnack = Reagents.channel()The first branch br1 (below) uses Reagents.WithNack. It passes the negative acknowledgement reagent nack to the user-defined function (the do block; it returns a reagent). The reagent nack blocks until this branch br1 is cancelled (i.e., another branch of | is chosen).
br1 = WithNack() do nack
@async (nack ∘ send_gotnack)(:gotnack)
return r1
endWe just use a channel endpoint for another branch:
br2 = r2These two reagents are composed with the choice combinator |:
choice = br1 | br2Returning the reagents so that they can be invoked differently for trying differnt scenarios:
return (; choice, s1, s2, receive_gotnack)
endScenario 1: nack is triggered
function test_nack_demo_1()
(; choice, s2, receive_gotnack) = nack_demo()
@sync beginLet us choose the second branch br2 which does not include WithNack:
@async s2(222)
@test choice() == 222
endSince the branch br1 with WithNack is not chosen, we get the negative acknowledgement:
@test receive_gotnack() == :gotnack
endScenario 2: nack is not triggered
function test_nack_demo_2()
(; choice, s1, receive_gotnack) = nack_demo()
@sync beginThis time, we choose the first branch br1 which includes WithNack:
@async s1(111)
@test choice() == 111
endSince we chose the WithNack's branch, nack is not triggered this time:
@test Reagents.trysync!(receive_gotnack) === nothing
endClient-server pattern
WithNack is useful for writing "client-server" pattern. As an example, we'll create an in-process "server" that issues unique IDs. That is to say, we'd like to have the following API:
function test_unique_id_provider_api()
with_unique_id_provider() do unique_id
@test unique_id() == 0
@test unique_id() == 1
end
endHere, unique_id is a reagent for communicating with a server created in with_unique_id_provider.
unique_id_provider!
Let us start from the event loop of the server. The server listens to ID requests from request_receive and a shutdown request from shutdown_receive.
function unique_id_provider!(request_receive, shutdown_receive)It keeps the current available ID as its local variable:
id = 0
while trueFirst, the server listens to both request_receive and shutdown_receive. The latter returns nothing upon shutdown request.
receive_request_or_shutdown = request_receive | shutdown_receiveWhen the shutdown_receive reagent is chosen (i.e., the reaction result is nothing), the short-circuting @something evaluates the break statement so that the server exits the loop:
(; reply, abort) = @something(receive_request_or_shutdown(), break)The client (see below) sends reply and abort channel endpoints. The server tries to send the ID with Return(id) ⨟ reply while also listening to the abort (NACK) and shutdown requests:
try_reply = (
(Return(id) ⨟ reply ⨟ Return(true)) | # try sending the id
(abort ⨟ Return(false)) | # or wait for the abort (NACK)
shutdown_receive # or wait for shutdown
)The server only increments the ID when the client received the ID.
if @something(try_reply(), break)
id += 1
end
end # while true
end # function unique_id_provider!(For an ID server, this property is probably not required. But consider, e.g., a lock server, where it is important to know that the client received the reply.)
with_unique_id_provider
The channels connecting the server and client are set up in the function below. The client API can be invoked inside the function f passed as the argument:
function with_unique_id_provider(f)
request_send, request_receive = Reagents.channel()
shutdown_send, shutdown_receive = Reagents.channel(Nothing)For each request, the client creates the channel (reply) for receiving the ID and also the negative acknowledgement reaagent abort for communicating that the request is aborted:
unique_id = WithNack() do abort
reply, receive = Reagents.channel(Int, Nothing)
request_send((; reply, abort))
return receive
endFinally, we start the server in a task and execute the client's code f:
@sync begin
@async unique_id_provider!(request_receive, shutdown_receive)
try
f(unique_id)
finally
shutdown_send()
end
end
endTesting the ID server
function test_unique_id_provider()
with_unique_id_provider() do unique_idWhen used alone, unique_id simply sends a request and wait for a reply from the ID server:
@test unique_id() == 0
@test unique_id() == 1Demonstrating the behavior of aborting the request is a bit more involved. First, we create a task that tries to send the "cancellation" request via a channel:
send, receive = Reagents.channel(Nothing)
canceller = @task send()
yield(canceller)Since we don't know when send() will be invoked, we'll try it in a loop. The variable prev keeps track of the last id issued by the server:
prev = unique_id()
while trueThen invoke unique_id and receive together. If this reaction takes choose the branch of receive ("cancellation"), it returns a nothing:
ans = (unique_id | receive)()
if ans === nothingHere, we have attempted to invoke the unique_id reagent but it was aborted by another reagent receive. Since this triggers the nack reagent abort, this reaction did not update the server's state (the variable id). So, the next call to unique_id should increment the ID only by one:
@test unique_id() == prev + 1
breakIf receive was not tirggered, we keep the id ans so that it can be used in the next iteration:
else
prev = ans::Int
end
end
end
endThis page was generated using Literate.jl.