
Reagents.jl is still work-in-progress.

Promises and Futures

Blocking containers tutorial demonstrated how to wait on the arrival (put!) of a new element. However, put! is not the only interesting event on a concurrent data structure. By defining Promise and Future, the following code demonstrates a strategy for signaling and responding to additional events such as a call to close.

using Reagents
using Reagents: Block, CAS, Computed, Map, PostCommit, Read, Return


Let us implement Promise with the following API:

function test_promise_fetches()

We can create a promise, possibly with a specific element type:

    p = Promise{Int}()

Calling fetch(p::Promise) will wait for p to be fullfiled:

    task = @task fetch(p)
    @test !istaskdone(task)  # the task is suspended

We can set the value of the promise with the Ref-like interface:

    p[] = 111

Once the value is set, all the calls to fetch are unblocked:

    @test fetch(task) == 111  # Note: `task` is calling `fetch(p)`

fetch(p::Promise) can be called multiple times and it does not block after the value is set:

    @test fetch(p) == 111

We also implement close(::Promise), which unblock fetch but with exception.

function test_promise_close_before_fetches()

Suppose we created a promise and there is a task waiting for it:

    p = Promise{Int}()
    t = @task fetch(p)

... but the promise is closed before setting the value


Then, previously blocked fetch(::Promise) rasies an exception:

    err = try
    catch err
    @test err isa TaskFailedException
    @test occursin("promise is closed", sprint(showerror, err))

Subsequent call to fetch(::Promise) also throws an exception:

    @test_throws ErrorException("promise is closed") fetch(p)

Implementing Promise

We store the state of Promise in a single Ref by using the Union type.

struct Closed end

const PromiseRef{T} = Reagents.Ref{Union{
    Nothing,  # indicates the value is not set and the promise is not closed
    Some{T},  # indicates that the value of type `T` is set
    Closed,   # indicates that the promise is closed

The Promise type also contains a channel for sending and receiving signals on the state change:

struct Promise{T,Ref<:PromiseRef{T}}

Promise() = Promise{Any}()
function Promise{T}() where {T}
    send, receive =
    return Promise(PromiseRef{T}(nothing), send, receive)

Since setting value and closing the channel are similar, we define an internal function that tries to set p.value::Reagents.Ref if it's not already set and then, upon success, notify all the waiters:

tryputting_internal(p::Promise) =
    Computed() do x
        CAS(p.value, nothing, x)
    end ⨟ PostCommit() do _
        while Reagents.trysync!(p.send) !== nothing

Then, we can define a reagent for setting a value and a reagent for closing the promise as simple wrappers:

tryputting(p::Promise{T}) where {T} = Map(Some{T}) ⨟ tryputting_internal(p)
closing(p::Promise) = Return(Closed()) ⨟ tryputting_internal(p)

The reagent for fetching the promise needs to first listen to the putting and closing events (to avoid missing the notification) and then check if the value is set:

fetching(p::Promise{T}) where {T} =
    (p.receive ⨟ Read(p.value) ⨟ Map(something)) |
    (Read(p.value) ⨟ Map(x -> x === nothing ? Block() : something(x)))

We check the returned value of fetching outside reagent. If it is the Closed sentinel value, the exception is thrown:

function check_promise_closed(@nospecialize(value))
    if value isa Closed
        error("promise is closed")
    return value

It is now straightforward to define the API mentioned above:

Base.fetch(p::Promise) = check_promise_closed(fetching(p)())

Base.close(p::Promise) = closing(p)()
Base.isopen(p::Promise) = !(p.value[] isa Closed)

function Base.setindex!(p::Promise{T}, x) where {T}
    x = convert(T, x)
    if Reagents.trysync!(tryputting(p), x) === nothing
        error("promise already has a value")

Since we defined underlying synchronization mechanisms as reagents, we can compose them. For example, to wait for two promises to be ready, we can use the combinator &:

function test_promise_fetch_all()
    p1 = Promise{Int}()
    p2 = Promise{Int}()
    t = @task (fetching(p1) & fetching(p2))()
    p1[] = 222
    @test !istaskdone(t)
    p2[] = 333
    @test fetch(t) == (222, 333)

Or to wait for the first available promise, use |

function test_promise_fetch_any()
    p1 = Promise{Int}()
    p2 = Promise{Int}()
    t = @task (fetching(p1) | fetching(p2))()
    p1[] = 444
    @test fetch(t) == 444


Let us define a Future as a Promise and a thunk that generates the value to be stored in the Promise. That is to say, we'd like to have the following API:

function test_future_fetch_calls_thunk()
    thunk() = 111 + 222
    f = Future{Int}(thunk)
    @test fetch(f) == 333

Importantly, Future(thunk) calls thunk at most once.

function test_future_thunk_is_called_once()

To define this behavior, consider that we have a thunk that has a side-effect (which is not an intended use-case but useful for describing the behavior):

    ncalled = Ref(0)
    function thunk()
        ncalled[] += 1
        return 111 + 222
    f = Future{Int}(thunk)

The first fetch will call the thunk:

    ncalled[] = 0
    @test fetch(f) == 333
    @test ncalled[] == 1

The Subsequent call to fetch does not call the thunk and uses the value internally stored:

    ncalled[] = 0
    @test fetch(f) == 333
    @test ncalled[] == 0

Like Promise, fetching closed Future throws an exception:

function test_future_close()
    ncalled = Ref(0)
    function thunk()
        ncalled[] += 1
        return 111 + 222
    f = Future{Int}(thunk)
    @test_throws ErrorException("promise is closed") fetch(f)
    @test_throws ErrorException("promise is closed") fetch(f)

Furthermore, the thunk is not called when the future is closed before the first fetch call:

    @test ncalled[] == 0

Implementing Future

A Future{T} wraps a Promise{T} and a thunk that produces a value of type T. We also have an auxiliary state started tracking if the call to thunk is already started or not.

struct Future{T,F,Value<:Promise{T}}

Future(f) = Future{Any}(f)
Future{T}(f) where {T} = Future(f, Promise{T}(), Threads.Atomic{Bool}(false))

We also use this example to demonstrate that not all states have to be expressed through Reagents.jl API. Here, we use a simple Threads.Atomic{Bool} flag for the started state.

The core functionality of Future is the ability to run the thunk (at most) once. Let us define an internal function that assures this invariance. The following function tryrun!(f::Future) returns nothing when the thunk is already called. Otherwise, it calls the thunk and then set its value. However, if there is a call to close before this function returns (i.e., setting the value to the promise failed), it also returns nothing. If it successfully sets the value to the promise, it returns Some(nothing).

function tryrun!(f::Future{T}) where {T}
    if Threads.atomic_cas!(f.started, false, true) === false
        y = f.thunk()
        y = convert(T, y)
        # Set the value, if it hasn't been set:
        if Reagents.trysync!(tryputting(f.value), y) === nothing
            return nothing  # already closed
        # Successfully stored the value:
        return Some(y)
        # Lost the race:
        return nothing

We can then wrap this in a reagent. If the call to thunk is successfully, the computed value is returned as-is (Return(something(y))). Otherwise,

fetching(f::Future) =
    Computed() do _
        # Optimization: if already closed, not need to call the thunk:
        isopen(f.value) || return fetching(f.value)
        # If still open, try to compute the value:
        y = tryrun!(f)
        if y === nothing

The future can be closed by simply closing the underlying promise:

closing(f::Future) = closing(f.value)

Finally, we can wrap these reagents into the blocking API mentioned above:

Base.fetch(f::Future) = check_promise_closed(fetching(f)())
Base.close(f::Future) = closing(f)()

