Note

Reagents.jl is still work-in-progress.

Locks

We can quite easily implement various locks based on the Blocking containers we have implemented.

include("cancellablecontainers.jl")
using Reagents: PostCommit, dissolve

using ArgCheck: @argcheck

Simple lock

A simple (non-reentrant) lock can be created as a simple wrapper of a blocking container (e.g., a stack):

struct SimpleLock <: Base.AbstractLock
    access::typeof(Blocking(TreiberStack{Nothing}()))
end

A lock is acquired by emptying the container so that subsequent attempts to acquire the lock will block until the element is put back into the container:

acquiring(l::SimpleLock) = taking(l.access)
realeasing(l::SimpleLock) = Return(nothing) ⨟ putting(l.access)

Base.lock(l::SimpleLock) = acquiring(l)()
Base.unlock(l::SimpleLock) = realeasing(l)()

Thus, when creating a lock with an empty container, it is in the locked state. We need to unlock it (i.e., put one element in the container) before start using the lock:

function SimpleLock()
    l = SimpleLock(Blocking(TreiberStack{Nothing}()))
    unlock(l)  # start with unlocked state
    return l
end

Here's how it works:

function test_simplelock()
    l = SimpleLock()
    send, receive = Reagents.channel()
    @sync begin
        child_locked = Threads.Atomic{Bool}(false)
        child_unlocked = Threads.Atomic{Bool}(false)
        lock(l) do

While the lock is acquired, the child task cannot lock it:

            Threads.@spawn begin
                send(:child_started)
                lock(l) do
                    child_locked[] = true
                    send(:child_locked)
                end
                child_unlocked[] = true
                send(:child_unlocked)
            end
            @test receive() === :child_started
            @test !child_locked[]
        end  # lock(l) do

After unlocking the lock in the parent task, the child task can acquire the lock:

        @test receive() === :child_locked
        @test child_locked[]
        @test receive() === :child_unlocked
        @test child_unlocked[]
    end
end

Locking with a timeout

Since SimpleLock exposes the reagent API, it can be composed with other reagents. For example, it is straightforward to add timeout to the lock:

function timeout(seconds::Real)
    send, receive = Reagents.channel(Nothing)
    @async begin
        sleep(seconds)
        send(nothing)
    end
    return receive
end

function try_with_timeout(reagent, seconds::Real)
    reagent = (reagent ⨟ Map(Some)) | timeout(seconds)
    return reagent()
end

Calling try_with_timeout(reagent, seconds) execute reagent with the timeout seconds. It returns nothing on timeout. If reagent completes its reaction with the output value, try_with_timeout returns Some(value). It can be used with arbitrary reagent, including acquiring(::SimpleLock):

function test_simplelock_timeout()
    l = SimpleLock()

If the lock is not acquired already, adding timeout does nothing:

    a1 = try_with_timeout(acquiring(l), 0.1)
    @test a1 isa Some  # successfully acquire

If the lock is already acquired, try_with_timeout will fail after the timeout:

    a2 = fetch(@async try_with_timeout(acquiring(l), 0.1))
    @test a2 === nothing  # failed to acquire
end

Trying to acquire multiple locks

SimpleLock can also be composed with itself. For example, we can use the choice combinator | to acquire an available lock:

function test_simplelock_multiple()
    l1 = SimpleLock()
    l2 = SimpleLock()

Let us lock l1 first, so that subsequent lock cannot acquire it:

    lock(l1) do
        local ans
        @sync begin
            Threads.@spawn begin

Since we need to change the action depending on which lock is acquired (importantly, which one to unlock), we use Reagents.Return to associate different returned value for each branch of the | combinator:

                ans = (
                    (acquiring(l1) ⨟ Return(1)) |  # try lock l1; will fail
                    (acquiring(l2) ⨟ Return(2))    # try lock l2; will succeeds
                )()

Since l1 is already acquired, we should have ans == 2 here (checked below). But first, let's unlock the corresponding lock:

                if ans == 1  # unreachable, but demonstrating the generic usage
                    unlock(l1)
                elseif ans == 2
                    unlock(l2)
                end
            end
        end

As mentioned above, we expect that l2 was acquired in the child task:

        @test ans == 2
    end
end

Remembering which lock to unlock is rather cumbersome. Let us wrap it in an interface that can be used with the do-block syntax:

function lockany(f, pairs...)
    acquired, value = mapfoldl(lv -> acquiring(first(lv)) ⨟ Return(lv), |, pairs)()
    try
        f(value)
    finally
        unlock(acquired)
    end
end

The above code can now be expressed more succinctly:

function test_simplelock_lockany()
    l1 = SimpleLock()
    l2 = SimpleLock()
    local ans
    lock(l1) do
        @sync begin
            Threads.@spawn begin
                lockany(l1 => 1, l2 => 2) do x
                    ans = x
                end
            end
        end
    end
    @test ans == 2
end

Note: SimpleLock and SimpleSemaphore are inspired by Turon & Russo (2011).

Semaphore

SimpleLock can be extended to a semaphore by just initially fillying more than one elements:

struct SimpleSemaphore
    accesses::typeof(Blocking(TreiberStack{Nothing}()))
end

function SimpleSemaphore(n::Integer)
    @argcheck n > 0
    accesses = Blocking(TreiberStack{Nothing}())
    for _ in 1:n
        put!(accesses, nothing)
    end
    return SimpleSemaphore(accesses)
end

acquiring(l::SimpleSemaphore) = taking(l.accesses)
realeasing(l::SimpleSemaphore) = Return(nothing) ⨟ putting(l.accesses)

Base.acquire(l::SimpleSemaphore) = acquiring(l)()
Base.release(l::SimpleSemaphore) = realeasing(l)()

Unlike SimpleLock, we can acquire SimpleSemaphore(n) n times before blocked:

function test_simplesemaphore()
    sem = SimpleSemaphore(2)
    Base.acquire(sem)
    Base.acquire(sem)
    t = @task Base.acquire(sem)
    yield(t)
    @test !istaskdone(t)
    Base.release(sem)
    wait(t)
end

Reader-writer lock

This example is from Turon & Russo (2011) “Scalable Join Patterns.” As mentioned in Turon (2012), the join pattern can be expressed with catalysts.

Their reader-writer lock is acquired and released by sending messages to a channel. Let us define a simple wrapper type to express this:

struct ChLock <: Base.AbstractLock
    acq::typeof(Reagents.channel(Nothing)[1])
    rel::typeof(Reagents.channel(Nothing)[1])
end

acquiring(l::ChLock) = l.acq
realeasing(l::ChLock) = l.rel

Base.lock(l::ChLock) = acquiring(l)()
Base.unlock(l::ChLock) = realeasing(l)()

To create two kinds of locks, we create 2 * 2 = 4 channels. The state of the lock is mantained by two blocking data structures (Note: We only need to store at most one element. So, a stack is an overkill. But that's the most cheap data structure we have implemented so far in the tutorial):

function reader_writer_lock()
    idle = Blocking(TreiberStack{Nothing}())
    shared = Blocking(TreiberStack{Int}())
    acqr = Reagents.channel(Nothing)
    acqw = Reagents.channel(Nothing)
    relr = Reagents.channel(Nothing)
    relw = Reagents.channel(Nothing)
    dissolve(acqr[2] ⨟ taking(idle) ⨟ PostCommit(_ -> put!(shared, 1)))
    dissolve(acqr[2] ⨟ taking(shared) ⨟ PostCommit(n -> put!(shared, n + 1)))
    dissolve(relr[2] ⨟ taking(shared) ⨟ PostCommit() do n
        if n == 1
            put!(idle, nothing)
        else
            put!(shared, n - 1)
        end
    end)
    dissolve(acqw[2] ⨟ taking(idle))
    dissolve(relw[2] ⨟ PostCommit(_ -> put!(idle, nothing)))
    put!(idle, nothing)
    return ChLock(acqr[1], relr[1]), ChLock(acqw[1], relw[1])
end

Observe that how the states of the lock are implemented:

  • When the reader-writer lock is not acquired, idle has a single element. shared is empty.
  • When at least one of the reader (shared) lock is acquired, the number of acquired locks are stored in the shared container. The idle container is empty.
  • When the writer (exclusive) lock is acquired, both the shared and idle container is empty.

As discussed in catalysts, Reagents.dissolve is used for expressing the rules that expressing allowed transitions between these states.

Here's how it works:

function test_reader_writer_lock()
    s1, r1 = Reagents.channel()
    s2, r2 = Reagents.channel()
    rlock, wlock = reader_writer_lock()
    @sync begin

Reader lock can be acquired multiple times:

        Threads.@spawn begin
            lock(rlock) do
                s1(1)
                s2(:done)
            end
        end
        Threads.@spawn begin
            lock(rlock) do
                s1(2)
                s2(:done)
            end
        end
        @test sort!([r1(), r1()]) == [1, 2]

While the reader lock is aquired, the writer lock cannot be acquired:

        wlocked = Threads.Atomic{Bool}(false)
        Threads.@spawn begin
            lock(wlock) do
                wlocked[] = true
                s1(3)
                s2(:done)
            end
        end
        for _ in 1:3
            sleep(0.1)
            @test !wlocked[]
        end
        @test r2() === r2() === :done  # releaseing `rlock`
        @test r1() == 3
        @test wlocked[]

While the writer lock is aquired, the reader lock cannot be acquired:

        r4locked = Threads.Atomic{Bool}(false)
        r5locked = Threads.Atomic{Bool}(false)
        Threads.@spawn begin
            lock(rlock) do
                r4locked[] = true
                s1(4)
                s2(:done)
            end
        end
        Threads.@spawn begin
            lock(rlock) do
                r5locked[] = true
                s1(5)
                s2(:done)
            end
        end
        for _ in 1:3
            sleep(0.1)
            @test r4locked[] == r5locked[] == false
        end
        @test r2() === :done  # releaseing `wlock`
        @test sort!([r1(), r1()]) == [4, 5]
        @test r4locked[] == r5locked[] == true
        @test r2() === r2() === :done  # releaseing `rlock`
    end
end

References

  • Turon, Aaron. “Reagents: Expressing and Composing Fine-Grained Concurrency.” In Proceedings of the 33rd ACM SIGPLAN Conference on Programming Language Design and Implementation, 157–168. PLDI ’12. New York, NY, USA: Association for Computing Machinery, 2012. https://doi.org/10.1145/2254064.2254084.

  • Turon, Aaron J., and Claudio V. Russo. “Scalable Join Patterns.” In Proceedings of the 2011 ACM International Conference on Object Oriented Programming Systems Languages and Applications, 575–594. OOPSLA ’11. New York, NY, USA: Association for Computing Machinery, 2011. https://doi.org/10.1145/2048066.2048111.


This page was generated using Literate.jl.