Note

Reagents.jl is still work-in-progress.

Negative acknowledgement (NACK)

using Reagents
using Reagents: WithNack, Return
using Test

Reagents.jl provides so-called negative acknowledgement (NACK) reagent Reagents.WithNack which is taken from Concurrent ML. This is useful for writing "client-server" style of code where the client can abort the request.

Note

Concurrent ML provides composable synchronizable operations called events which are similar to Reagent. Turon (2012) discussed influences of Concurrent ML on reagents.

See also:

How it works

Let us set up a demo.

function nack_demo()

Reagents.WithNack has non-trivial effect only when used inside the choice combinator which possibly blocking branches. Thus, to selectively trigger two branches in the choice combinator, we create two channels:

    s1, r1 = Reagents.channel()
    s2, r2 = Reagents.channel()

To receive the negative acknowledgement, we craete one more channel:

    send_gotnack, receive_gotnack = Reagents.channel()

The first branch br1 (below) uses Reagents.WithNack. It passes the negative acknowledgement reagent nack to the user-defined function (the do block; it returns a reagent). The reagent nack blocks until this branch br1 is cancelled (i.e., another branch of | is chosen).

    br1 = WithNack() do nack
        @async (nack ∘ send_gotnack)(:gotnack)
        return r1
    end

We just use a channel endpoint for another branch:

    br2 = r2

These two reagents are composed with the choice combinator |:

    choice = br1 | br2

Returning the reagents so that they can be invoked differently for trying differnt scenarios:

    return (; choice, s1, s2, receive_gotnack)
end

Scenario 1: nack is triggered

function test_nack_demo_1()
    (; choice, s2, receive_gotnack) = nack_demo()
    @sync begin

Let us choose the second branch br2 which does not include WithNack:

        @async s2(222)
        @test choice() == 222
    end

Since the branch br1 with WithNack is not chosen, we get the negative acknowledgement:

    @test receive_gotnack() == :gotnack
end

Scenario 2: nack is not triggered

function test_nack_demo_2()
    (; choice, s1, receive_gotnack) = nack_demo()
    @sync begin

This time, we choose the first branch br1 which includes WithNack:

        @async s1(111)
        @test choice() == 111
    end

Since we chose the WithNack's branch, nack is not triggered this time:

    @test Reagents.trysync!(receive_gotnack) === nothing
end

Client-server pattern

WithNack is useful for writing "client-server" pattern. As an example, we'll create an in-process "server" that issues unique IDs. That is to say, we'd like to have the following API:

function test_unique_id_provider_api()
    with_unique_id_provider() do unique_id
        @test unique_id() == 0
        @test unique_id() == 1
    end
end

Here, unique_id is a reagent for communicating with a server created in with_unique_id_provider.

unique_id_provider!

Let us start from the event loop of the server. The server listens to ID requests from request_receive and a shutdown request from shutdown_receive.

function unique_id_provider!(request_receive, shutdown_receive)

It keeps the current available ID as its local variable:

    id = 0
    while true

First, the server listens to both request_receive and shutdown_receive. The latter returns nothing upon shutdown request.

        receive_request_or_shutdown = request_receive | shutdown_receive

When the shutdown_receive reagent is chosen (i.e., the reaction result is nothing), the short-circuting @something evaluates the break statement so that the server exits the loop:

        (; reply, abort) = @something(receive_request_or_shutdown(), break)

The client (see below) sends reply and abort channel endpoints. The server tries to send the ID with Return(id) ⨟ reply while also listening to the abort (NACK) and shutdown requests:

        try_reply = (
            (Return(id) ⨟ reply ⨟ Return(true)) |  # try sending the id
            (abort ⨟ Return(false)) |              # or wait for the abort (NACK)
            shutdown_receive                       # or wait for shutdown
        )

The server only increments the ID when the client received the ID.

        if @something(try_reply(), break)
            id += 1
        end
    end  # while true
end  # function unique_id_provider!

(For an ID server, this property is probably not required. But consider, e.g., a lock server, where it is important to know that the client received the reply.)

with_unique_id_provider

The channels connecting the server and client are set up in the function below. The client API can be invoked inside the function f passed as the argument:

function with_unique_id_provider(f)
    request_send, request_receive = Reagents.channel()
    shutdown_send, shutdown_receive = Reagents.channel(Nothing)

For each request, the client creates the channel (reply) for receiving the ID and also the negative acknowledgement reaagent abort for communicating that the request is aborted:

    unique_id = WithNack() do abort
        reply, receive = Reagents.channel(Int, Nothing)
        request_send((; reply, abort))
        return receive
    end

Finally, we start the server in a task and execute the client's code f:

    @sync begin
        @async unique_id_provider!(request_receive, shutdown_receive)
        try
            f(unique_id)
        finally
            shutdown_send()
        end
    end
end

Testing the ID server

function test_unique_id_provider()
    with_unique_id_provider() do unique_id

When used alone, unique_id simply sends a request and wait for a reply from the ID server:

        @test unique_id() == 0
        @test unique_id() == 1

Demonstrating the behavior of aborting the request is a bit more involved. First, we create a task that tries to send the "cancellation" request via a channel:

        send, receive = Reagents.channel(Nothing)
        canceller = @task send()
        yield(canceller)

Since we don't know when send() will be invoked, we'll try it in a loop. The variable prev keeps track of the last id issued by the server:

        prev = unique_id()
        while true

Then invoke unique_id and receive together. If this reaction takes choose the branch of receive ("cancellation"), it returns a nothing:

            ans = (unique_id | receive)()
            if ans === nothing

Here, we have attempted to invoke the unique_id reagent but it was aborted by another reagent receive. Since this triggers the nack reagent abort, this reaction did not update the server's state (the variable id). So, the next call to unique_id should increment the ID only by one:

                @test unique_id() == prev + 1
                break

If receive was not tirggered, we keep the id ans so that it can be used in the next iteration:

            else
                prev = ans::Int
            end
        end
    end
end

This page was generated using Literate.jl.