Note

Julio.jl is still a work in progress.

Selecting on custom events

Julio.jl is built on top of Reagents.jl, a framework for writing programs with complex nonblocking algorithms and synchronizations. In fact, Julio.select is a thin wrapper on top of the choice combinator | defined in Reagents.jl. Therefore, it is possible to define custom synchronization events using the Reagents.jl API.

using Reagents
using Reagents: CAS, Computed, Return, Read, PostCommit
using Julio
using Julio: Events
using Test

Let us define a "leaky" "broadcasting" channel: "broadcasting" in the sense that an item sent by a single put! can be received by take! in multiple tasks, and "leaky" in the sense that an item is lost if no receiver task is executing take! while the sender is executing put!.

# A "leaky" broadcasting channel: an item passed to `put!` is delivered to the
# receivers that are registered at that moment and is not stored otherwise.
struct BroadcastChannel{T,Receivers}
    # Type-level tag for the element type `T`; it lets the default constructor
    # infer `T` from a `Val(T)` argument.
    eltype::Val{T}
    # Lock held by `take!`/`put!` while `receivers` is read or modified.
    lck::Julio.Lock
    # Collection of pending receiver promises. The `BroadcastChannel{T}()`
    # constructor in this file uses a vector of `Julio.Promise{T}` values.
    receivers::Receivers
end

# Create an empty broadcast channel whose items are of type `T`.
function BroadcastChannel{T}() where {T}
    # Instantiate a throwaway promise to obtain the concrete promise type used
    # as the element type of the receiver list.
    PromiseType = typeof(Julio.Promise{T}())
    return BroadcastChannel(Val(T), Julio.Lock(), PromiseType[])
end

(We use a lock-based implementation to keep the example simple. It should also be possible to use some nonblocking algorithms.)

The receiver requests an item simply by posting a Julio.Promise:

(TODO: make it work without touching Julio.Internal)

# Receive one item from `bc` by applying the `take!` event through
# `Julio.Internal.apply`; the result is asserted to be of the element type `T`.
function Base.take!(bc::BroadcastChannel{T}) where {T}
    return Julio.Internal.apply(take!, bc)::T
end
# Build the event (a reagent) that represents `take!(bc)`.
# NOTE(review): presumably this is what `Julio.Internal.apply(take!, bc)` and
# `Julio.select` with a `(take!, bc)` entry dispatch to — confirm against Julio.jl.
#
# The receiver's promise is registered in `bc.receivers` when this function is
# called, i.e., when the event is constructed, before the returned reagent runs.
function Julio.Internal.event(::typeof(take!), bc::BroadcastChannel{T}) where {T}
    (; lck, receivers) = bc
    # A fresh promise acts as this receiver's request for the next item.
    p = Julio.Promise{T}()
    lock(lck) do
        # Prune promises that are no longer open before registering the new one.
        filter!(isopen, receivers)
        push!(receivers, p)
    end
    return Reagents.WithNack() do nack
        # Cleanup in case this event is not selected:
        # NOTE(review): presumably `nack` fires when another branch wins, and closing
        # `p` then makes `put!` skip it (via `tryput!`) — confirm against Reagents.jl.
        Reagents.dissolve(nack ⨟ PostCommit(_ -> close(p)); once = true)
        Return(nothing)
    end ⨟ Events.fetch(p)  # the event's result is the value fetched from `p`
end

An item is sent to all the receivers registered at the time put! is called:

# Deliver `x` (converted to `T`) to every receiver promise currently registered
# on `bc`, then clear the registration list. The value of the `lock` block
# (the emptied receiver collection) is returned.
function Base.put!(bc::BroadcastChannel{T}, x) where {T}
    item = convert(T, x)
    return lock(bc.lck) do
        pending = bc.receivers
        foreach(pending) do promise
            # tryput! instead of put! to ignore closed promises
            Julio.tryput!(promise, item)
        end
        empty!(pending)
    end
end

Demo:

# Demo/test: spawn `ntasks` receiver tasks that collect items from a
# `BroadcastChannel{Int}` until a shared `stop` promise is fulfilled, send
# `1:2^10` through the channel, and check that each task observed a strictly
# increasing sequence of items.
function test_broadcastchannel(; ntasks = 4)
    channel = BroadcastChannel{Int}()
    stop = Julio.Promise{Nothing}()
    Julio.withtaskgroup() do group
        workers = map(1:ntasks) do _
            Julio.spawn!(group) do
                received = Int[]
                # Handler for a received item: record it and keep looping.
                record = function (item)
                    push!(received, item)
                    return false
                end
                while true
                    # `select` yields the value of whichever handler ran:
                    # `true` once `stop` is set, `false` after taking an item.
                    stop_requested = Julio.select(
                        (fetch, stop) => Returns(true),
                        (take!, channel) => record,
                    )
                    stop_requested && break
                end
                return received
            end
        end
        try
            foreach(x -> put!(channel, x), 1:2^10)
        finally
            # Always release the receivers, even if sending failed.
            stop[] = nothing
        end
        for worker in workers
            received = fetch(worker)
            @test all(>(0), diff(received))
        end
    end
end

This page was generated using Literate.jl.