Julio.jl is still work-in-progress.
Select
using Julio: Julio, Events
using Test
Julio.select
can be used for executing exactly one ready event from a set of events.
Selecting a queue
function test_simple_select()
To demonstrate how Julio.select
works, suppose that we have multiple queues and are waiting for an element from any of them.
qin1, qout1 = Julio.queue() # input/output endpoints for the first queue
qin2, qout2 = Julio.queue() # input/output endpoints for the second queue
Suppose there is an element in the second queue:
put!(qin2, 222)
Then, waiting on the take!
events of an empty queue and a nonempty queue does not block. It executes the take!
event on the nonempty queue:
selected = nothing
Julio.select(
(take!, qout1) => item -> begin
selected = item
end,
(take!, qout2) => item -> begin
selected = item
end,
)
@test selected == 222
end
Selecting an arbitrary event
Julio.select
can also be used with various events.
function test_mixed_select()
It supports, for example, an unbuffered channel:
send_endpoint, _ = Julio.channel()
...and read/write on IO
objects such as a pipe:
Julio.open(`echo "hello"`) do output
...and acquiring locks:
lck = Julio.Lock()
When the Base
API $f
has keyword arguments, you can use Julio.Events.$f
to create an event. Note that ($f, args...)
is equivalent to Julio.Events.$f(args...)
.
selected = nothing
Julio.select(
Events.readline(output; keep = true) => item -> begin
selected = item
end,
Events.lock(lck) => _ -> begin
unlock(lck)
selected = :lock
end,
(put!, send_endpoint, 1) => _ -> begin
selected = :put_1
end,
)
In the above example, since the output
pipe and the lock
are both ready, Julio.select
can select any one of them. However, since there is no other task take!
ing the element from the channel send_endpoint
, the put!
event cannot be selected.
@test selected in ("hello\n", :lock)
If Events.lock(lck)
was selected, the output
is not consumed:
if selected === :lock
@test readline(output) == "hello"
end
end
end
Example: bounding search results
Suppose we need to move at least minitems
items from one channel to another while filtering them using a predicate function f
. Furthermore, we don't want to lose any items. That is to say, once an item is taken from the input channel, it must be put into the output channel (unless f
evaluates to false). Note that we cannot use the cancel scope (naively) due to the last requirement; i.e., it is not correct to cancel the task when it's blocked while putting the item to the output channel. While we can still use the cancel scope by surrounding post-take!
code in a Julio.shield
block, the following code demonstrates a more straightforward approach based on Julio.select
.
To set up cancellation specific to one event (take!
), we can use an explicit "cancellation token" and combine it with the original event.
using Julio: maybetake!, tryput!
"""
    channel_filter!(f, output, input, minitems; ntasks = 4 * Threads.nthreads())

Move items from `input` to `output`, keeping only those for which `f` returns
`true`, until at least `minitems` items have been forwarded or no further item
can be taken from `input`.

The work is shared by `ntasks` tasks spawned in a single task group. Only the
"take" step is cancellable: once an item has been taken and passes `f`, it is
always put into `output` before the task checks whether it should stop.
"""
function channel_filter!(f, output, input, minitems; ntasks = 4 * Threads.nthreads())
    forwarded = Threads.Atomic{Int}(0)
    stop = Julio.Promise{Nothing}()  # cancellation token shared by all workers

    # One worker: repeatedly take an item (unless stopped), filter, and forward it.
    function worker()
        while true
            taken = Julio.select(
                (fetch, stop) => Returns(nothing),  # `nothing` once `stop` is fulfilled
                (maybetake!, input),                # `Some(x)` if `x` was taken
            )
            # `nothing` means "stop requested" or "nothing more to take".
            taken === nothing && break
            item = something(taken)
            f(item) || continue
            put!(output, item)
            # `atomic_add!` returns the previous value, hence the `+ 1`.
            Threads.atomic_add!(forwarded, 1) + 1 >= minitems || continue
            # Enough items have been forwarded; tell the other workers to finish.
            tryput!(stop, nothing)
            break
        end
    end

    Julio.withtaskgroup() do group
        for _ in 1:ntasks
            Julio.spawn!(worker, group)
        end
    end
end
In the above example, we use (Julio.maybetake!, input)
event instead of (take!, input)
event. This is for handling the case where input
is closed. That is to say, m === nothing
if input
is closed or tryput!(done, nothing)
has been executed.
# End-to-end check of `channel_filter!`: a producer feeds 1:15 into a source
# channel, a filter task forwards odd numbers into a sink channel, and the
# main task inspects what ended up where.
function test_channel_filter()
    Julio.withtaskgroup() do group
        source_tx, source_rx = Julio.channel()
        sink_tx, sink_rx = Julio.channel()

        # Producer: push 1..15, then close the sending side no matter what.
        Julio.spawn!(group) do
            try
                foreach(i -> put!(source_tx, i), 1:15)
            finally
                close(source_tx)
            end
        end

        # Filter: forward at least 3 odd numbers using 2 worker tasks.
        Julio.spawn!(group) do
            try
                channel_filter!(isodd, sink_tx, source_rx, 3; ntasks = 2)
            finally
                close(sink_tx)
            end
        end

        try
            # Drain the filtered output first, then whatever the filter left behind.
            forwarded = collect(sink_rx)
            leftover = collect(source_rx)
            sort!(forwarded)
            sort!(leftover)
            @test length(forwarded) >= 3 # `channel_filter!` produced at least 3 elements
            # The forwarded items are exactly the first odd numbers: 1, 3, 5, ...
            @test forwarded == 2 .* (1:length(forwarded)) .- 1
            # The leftover items form a contiguous run of integers.
            @test leftover == leftover[1]:leftover[end]
        finally
            close(source_rx)
            close(sink_rx)
        end
    end
end
This page was generated using Literate.jl.