Julio.jl is still work-in-progress.
Selecting on custom events
Julio.jl is built on top of Reagents.jl, a framework for writing programs with complex nonblocking algorithms and synchronizations. In fact, Julia.select is a thin wrapper on top of the choice combinator | defined in Reagents.jl. Therefore, it is possible to define custom synchronization events using Reagents.jl API.
using Reagents
using Reagents: CAS, Computed, Return, Read, PostCommit
using Julio
using Julio: Events
using TestLet us define a "leaky" "broadcasting" channel; i.e., "broadcasting" in the sense that a put! can be take!n by multiple tasks and "leaky" in the sense that the items will be lost if there are no receiver tasks executing take! while the sender is executing put!.
struct BroadcastChannel{T,Receivers}
eltype::Val{T}
lck::Julio.Lock
receivers::Receivers
end
function BroadcastChannel{T}() where {T}
receivers = typeof(Julio.Promise{T}())[]
lck = Julio.Lock()
return BroadcastChannel(Val(T), lck, receivers)
end(We use a lock-based implementation to keep the example simple. It should also be possible to use some nonblocking algorithms.)
The receiver requests an item simply by posting a Julio.Promise:
(TODO: make it work without touching Julio.Internal)
Base.take!(bc::BroadcastChannel{T}) where {T} = Julio.Internal.apply(take!, bc)::T
function Julio.Internal.event(::typeof(take!), bc::BroadcastChannel{T}) where {T}
(; lck, receivers) = bc
p = Julio.Promise{T}()
lock(lck) do
filter!(isopen, receivers)
push!(receivers, p)
end
return Reagents.WithNack() do nack
# Cleanup in case this event is not selected:
Reagents.dissolve(nack ⨟ PostCommit(_ -> close(p)); once = true)
Return(nothing)
end ⨟ Events.fetch(p)
endAn item is sent to all the receivers registered at the time put! is called:
function Base.put!(bc::BroadcastChannel{T}, x) where {T}
(; lck, receivers) = bc
x = convert(T, x)
lock(lck) do
for p in receivers
Julio.tryput!(p, x) # tryput! instead of put! to ignore closed promises
end
empty!(receivers)
end
endDemo:
function test_broadcastchannel(; ntasks = 4)
bc = BroadcastChannel{Int}()
done = Julio.Promise{Nothing}()
Julio.withtaskgroup() do tg
tasks = map(1:ntasks) do _
Julio.spawn!(tg) do
local items = Int[]
while true
Julio.select(
(fetch, done) => Returns(true),
(take!, bc) => x -> begin
push!(items, x)
false
end,
) && break
end
return items
end
end
try
for x in 1:2^10
put!(bc, x)
end
finally
done[] = nothing
end
for t in tasks
items = fetch(t)
@test all(>(0), diff(items))
end
end
endThis page was generated using Literate.jl.