Julio.jl is still work-in-progress.
Select
using Julio: Julio, Events
using TestJulio.select can be used for executing exactly one ready event from a set of events.
Selecting a queue
function test_simple_select()To demonstrate how Julio.select works, suppose that we have multiple queues and waiting for an element from them.
qin1, qout1 = Julio.queue() # input/output endpoints for the first queue
qin2, qout2 = Julio.queue() # input/output endpoints for the second queueSuppose there is an element in the second queue:
put!(qin2, 222)Then waiting on take! event of an empty queue and a nonempty queue does not block. It executes the take! event on the nonempty queue:
selected = nothing
Julio.select(
(take!, qout1) => item -> begin
selected = item
end,
(take!, qout2) => item -> begin
selected = item
end,
)
@test selected == 222endSelecting an arbitrary event
Julio.select can also be used with various events.
function test_mixed_select()It supports, for example, unbuffered channel:
send_endpoint, _ = Julio.channel()...and read/write on IO objects such as a pipe:
Julio.open(`echo "hello"`) do output...and acquiring locks:
lck = Julio.Lock()When the Base API $f has keyword arguments, you can use Julio.Events.$f to create an event. Note that ($f, args...) is equivalent to Julio.Events.$f(args...).
selected = nothing
Julio.select(
Events.readline(output; keep = true) => item -> begin
selected = item
end,
Events.lock(lck) => _ -> begin
unlock(lck)
selected = :lock
end,
(put!, send_endpoint, 1) => _ -> begin
selected = :put_1
end,
)In the above example, since the output pipe and the lock are both ready, Julio.select can select any one of them. However, since there is no other task take!ing the element from the channel cho, the put! event can not be selected.
@test selected in ("hello\n", :lock)If Events.lock(lck) was selected, the output is not consumed:
if selected === :lock
@test readline(output) == "hello"
end end
endExample: bounding search results
Suppose we need to move at least minitems items from one channel to another while filtering them using a predicate function f. Furthermore, we don't want to loose any items. That is to say, once an item is taken from the input channel, it must be put into the output channel (unless f evaluates to false). Note that the we cannot use the cancel scope (naively) due to the last requirement; i.e., it is not correct to cancel the task when it's blocked while putting the item to the output channel. While we can still use the cancel scope by surrounding post-take! code in a Julio.shield block, the folowing code demonstrates more straightforward approach based on Julio.select.
To setup cancellation specific to one event (take!), we can use explicit "cancellation token" and combine it with the original event.
using Julio: maybetake!, tryput!
function channel_filter!(f, output, input, minitems; ntasks = 4 * Threads.nthreads())
nitems = Threads.Atomic{Int}(0)
done = Julio.Promise{Nothing}() # cancellation token
Julio.withtaskgroup() do tg
for _ in 1:ntasks
Julio.spawn!(tg) do
while true
m = Julio.select(
(fetch, done) => Returns(nothing), # return nothing when done
(maybetake!, input), # return Some(x) if we took x
)
x = @something(m, break) # break if done
if f(x)
put!(output, x)
# If enough items have been sent, signal other tasks to finish.
if Threads.atomic_add!(nitems, 1) + 1 >= minitems
tryput!(done, nothing)
break
end
end
end
end
end
end
endIn the above example, we use (Julio.maybetake!, input) event instead of (take!, input) event. This is for handling the case input is closed. That is to say, m === nothing if input is closed or tryput!(done, nothing) has been executed.
function test_channel_filter()
Julio.withtaskgroup() do tg
send_endpoint1, receive_endpoint1 = Julio.channel()
send_endpoint2, receive_endpoint2 = Julio.channel()
Julio.spawn!(tg) do
try
for i in 1:15
put!(send_endpoint1, i)
end
finally
close(send_endpoint1)
end
end
Julio.spawn!(tg) do
try
channel_filter!(isodd, send_endpoint2, receive_endpoint1, 3; ntasks = 2)
finally
close(send_endpoint2)
end
end
try
out2 = collect(receive_endpoint2)
out1 = collect(receive_endpoint1)
sort!(out2)
sort!(out1)
@test length(out2) >= 3 # `channel_filter!` produced at least 3 elements
@test out2 == (1:length(out2)) .* 2 .- 1
@test out1 == out1[1]:out1[end]
finally
close(receive_endpoint1)
close(receive_endpoint2)
end
end
endThis page was generated using Literate.jl.