ConcurrentUtils.jl
Promise
API | Summary |
---|---|
Promise | Create a promise of type T ; i.e., a memory location that holds a value of type T that can be set once and retrieved asynchronously. |
try_race_fetch | Try to retrieve a value if it is already set. Return Ok(value) on success and Err(NotSetError()) on failure. |
try_race_put_with! | Fetch an existing value or set a computed value (computed = thunk() ). The thunk is called at most once for each instance of promise . |
race_put_with! | Fetch an existing value or set value = thunk() . The thunk is called at most once for each instance of promise . |
try_race_put! | Try to set a value in the promise . |
ConcurrentUtils.Promise — Type
Promise{T}()
Promise()
Create a promise of type T; i.e., a memory location that holds a value of type T that can be set once and retrieved asynchronously.
Extended help
For an explanation on the concept, see: Futures and promises - Wikipedia.
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_fetch(p)
Try.Err: NotSetError()
julia> put!(p, 123);
julia> fetch(p)
123
julia> try_race_put!(p, 456)
Try.Err: 123
Memory ordering
An event that retrieves or waits for a value from a promise establishes a happened-before edge from the event that has set the value to the promise. Invocations of the API that include an event that sets a value to a promise are:
put!(promise, value) that does not throw.
try_race_put!(promise, value) that returns an Ok result.
try_race_put_with!(thunk, promise) that calls thunk (and hence returns an Ok).
race_put_with!(thunk, promise) that calls thunk.
(The invocation thunk() is sequenced-before the event that sets the value.)
Invocations of the API that include an event that retrieves or waits for a value from a promise are:
fetch(promise)
wait(promise)
try_race_fetch(promise) that returns an Ok result.
try_race_put_with!(thunk, promise) that does not call thunk (and hence returns an Err).
race_put_with!(thunk, promise) that does not call thunk.
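The ordering guarantee above is what makes it safe to publish ordinary (non-atomic) data through a promise. The following is a minimal sketch, not taken from the package itself: `publish_and_read`, `data`, and `ready` are hypothetical names used only for illustration.

```julia
using ConcurrentUtils

# Hypothetical helper: a writer task fills `data`, then signals through a promise.
function publish_and_read()
    data = zeros(Int, 4)              # plain memory, no atomics
    ready = Promise{Int}()
    writer = Threads.@spawn begin
        data .= 1:4                   # ordinary writes, sequenced-before the put!
        put!(ready, length(data))     # the event that sets the value
    end
    n = fetch(ready)                  # retrieves the value: happened-before edge from put!
    total = sum(data[1:n])            # hence the writes to `data` are visible here
    wait(writer)
    return total
end

total = publish_and_read()
```

Without the `fetch` (or an equivalent retrieving call from the list above), reading `data` while the writer task may still be running would be a data race.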
Supported operations
A promise::Promise{T} supports the following operations:
put!(promise, value): Set a value; throws an error if some value has already been set.
try_race_put!(promise, value): Try to set a value.
fetch(promise): Obtain a value that the promise holds. Wait if necessary.
wait(promise): Wait for a value to be set.
try_race_fetch(promise): Try to obtain an existing value in the promise.
try_race_put_with!(thunk, promise): Try to get a value or set a value generated by thunk.
race_put_with!(thunk, promise): Similar to try_race_put_with! but the returned value does not indicate if thunk is called or not.
ConcurrentUtils.try_race_fetch — Function
try_race_fetch(promiselike) -> Ok(value::T) or Err(NotSetError())
Try to retrieve a value if it is already set. Return Ok(value) on success and Err(NotSetError()) on failure.
try_race_fetch can be called on a Promise or a tasklet (@tasklet).
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_fetch(p)
Try.Err: NotSetError()
julia> put!(p, 123);
julia> try_race_fetch(p)
Try.Ok: 123
ConcurrentUtils.try_race_put_with! — Function
try_race_put_with!(thunk, promise::Promise{T}) -> Ok(computed::T) or Err(existing::T)
Fetch an existing value or set a computed value (computed = thunk()). The thunk is called at most once for each instance of promise.
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_put_with!(p) do
           123 + 456
       end
Try.Ok: 579
julia> try_race_put_with!(p) do
           42
       end
Try.Err: 579
ConcurrentUtils.race_put_with! — Function
race_put_with!(thunk, promise::Promise{T}) -> value::T
Fetch an existing value or set value = thunk(). The thunk is called at most once for each instance of promise.
This is similar to try_race_put_with! but the caller cannot tell if thunk is called or not by the return type.
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> race_put_with!(p) do
           println("called")
           123 + 456
       end
called
579
julia> race_put_with!(p) do
           println("called")
           42
       end
579
ConcurrentUtils.try_race_put! — Function
try_race_put!(promise::Promise{T}, value) -> Ok(value′::T) or Err(existing::T)
Try to set a value in the promise.
Since the value is converted to T first, the returned value′ may not be identical to the input value.
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_put!(p, 123)
Try.Ok: 123
julia> try_race_put!(p, 456)
Try.Err: 123
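The conversion to T mentioned in the description can be observed by storing a value of a different type. This is a small sketch under the assumption that the conversion follows Julia's usual `convert` rules; the variable names are illustrative.

```julia
using ConcurrentUtils

p = Promise{Int}()
try_race_put!(p, 1.0)     # 1.0::Float64 is converted to Int before being stored
stored = fetch(p)
stored isa Int            # the promise holds an Int, not the Float64 that was passed
```

A value that cannot be converted to T exactly (for example `1.5` for `Promise{Int}`) would be expected to throw from the conversion rather than return an `Err`, since `Err` is documented as carrying an existing value.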
Promise-like interfaces
API | Summary |
---|---|
@tasklet | Create an object that is a memoized version of () -> code but also acts like a Promise that is not settable. |
Once | A concurrent object for lazily initializing an object of type T . |
ConcurrentUtils.@tasklet — Macro
@tasklet code
Create an object that is a memoized version of () -> code but also acts like a Promise that is not settable.
A t = @tasklet code supports: t(), fetch(t), wait(t), and try_race_fetch.
Extended help
Examples
julia> using ConcurrentUtils
julia> t = @tasklet begin
           println("called")
           123
       end;
julia> try_race_fetch(t)
Try.Err: NotSetError()
julia> t()
called
123
julia> t()
123
julia> fetch(t)
123
julia> wait(t);
Memory ordering
An event that retrieves or waits for a value from a tasklet t establishes a happened-before edge from the events in code. Invocations of the API that include an event that retrieves or waits for a value from a tasklet t are:
fetch(t)
wait(t)
try_race_fetch(t) that returns an Ok result.
Supported operations
A tasklet t = @tasklet code supports the following operations:
t(): Evaluate code if it hasn't been evaluated. Otherwise, equivalent to fetch.
fetch(t): Wait for other tasks to invoke t() and then return the result.
wait(t): Wait for other tasks to invoke t().
try_race_fetch(t): Try to retrieve the result of t() if it is already called.
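To illustrate how these operations interact across tasks, here is a short sketch in which one task waits with `fetch(t)` while another evaluates the tasklet with `t()`. The names `waiter` and `result` are illustrative only.

```julia
using ConcurrentUtils

t = @tasklet begin
    sum(1:10)                       # evaluated at most once, by whichever task calls t() first
end

waiter = Threads.@spawn fetch(t)    # does not evaluate the code; waits for some task to invoke t()
result = t()                        # evaluates the code (or fetches, if already evaluated)
same = fetch(waiter) == result      # both tasks observe the same memoized value
```

Note that `fetch(t)` alone never triggers evaluation, so some task must eventually call `t()` for the waiter to make progress.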
ConcurrentUtils.Once — Type
Once{T}(f = T)
Once(f)
A concurrent object for lazily initializing an object of type T.
Given O = Once{T}(f), invoking O[] evaluates v = f() if f has not been called via O[] and returns the value v. Otherwise, O[] returns the value v returned from the first invocation of O[].
Examples
julia> using ConcurrentUtils
julia> O = Once{Vector{Int}}(() -> zeros(Int, 3));
julia> v = O[]
3-element Vector{Int64}:
 0
 0
 0
julia> v === O[]
true
Extended help
When used as in Once{T}(f), the function f must always return a value of type T. As such, T() isa T must hold for type T used as in Once{T}().
When used as in Once(f), the function f must always return a value of a consistent concrete type. If a Once object is used as a global constant in a package, the type of the value returned from f must not change for different julia processes for each stack of Julia environment. Currently, Once(f) also directly invokes f() to compute the result type but this value is thrown away. This is because Once(f) is assumed to be called at the top-level of a package for lazily initializing a global state and serializing the computed value in the precompile cache is not desired.
Known limitation: If O[] is evaluated for a global O::Once{T} during precompilation, the resulting value is serialized into the precompile cache.
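As a rough sketch of the intended use as a lazily initialized global, the following creates a `Once{T}` and reads it from several tasks. `INIT_CALLS` and `TABLE` are hypothetical names; the counter exists only to make the number of calls to the initializer observable.

```julia
using ConcurrentUtils

const INIT_CALLS = Threads.Atomic{Int}(0)    # hypothetical counter, for illustration only

# Hypothetical lazily initialized table; the do-block is passed as `f` in Once{T}(f).
const TABLE = Once{Vector{Int}}() do
    Threads.atomic_add!(INIT_CALLS, 1)
    collect(1:3)
end

# Several tasks request the value concurrently.
tasks = map(_ -> Threads.@spawn(TABLE[]), 1:8)
results = fetch.(tasks)

all_same = all(r -> r === results[1], results)   # every task sees the same object
ncalls = INIT_CALLS[]                            # how many times the initializer ran
```

The explicit `Once{T}(f)` form is used here because, as noted above, `Once(f)` currently also calls `f()` once at construction time to infer the result type.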
Read-write Lock
API | Summary |
---|---|
ReadWriteLock | Create a read-write lock. |
lock_read | lock_read(rwlock) takes reader (shared) lock. It must be released with unlock_read . |
unlock_read | Unlock the reader lock taken with a preceding invocation of lock_read . |
trylock_read | Try to take reader lock. Return true on success. |
ConcurrentUtils.ReadWriteLock — Type
ReadWriteLock()
Create a read-write lock.
Use lock_read and unlock_read for taking reader (shared) lock. Use lock and unlock for taking writer (exclusive) lock.
See also: ReadWriteGuard
Extended help
Examples
julia> using ConcurrentUtils
julia> rwlock = ReadWriteLock();
julia> lock(rwlock) do
           # "mutate" data
       end;
julia> lock_read(rwlock) do
           # "read" data
       end;
Supported operations
lock_read
trylock_read (not very efficient but lock-free)
unlock_read
lock
trylock
unlock
ConcurrentUtils.lock_read — Function
lock_read(rwlock)
lock_read(f, rwlock)
lock_read(rwlock) takes reader (shared) lock. It must be released with unlock_read.
The second method lock_read(f, rwlock) executes the function f without any arguments after taking the reader lock and releases the lock before returning. lock_read(f, rwlock) returns the result of f().
See also: ReadWriteLock
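The two methods of lock_read can be compared side by side. This is a sketch only: `shared` is a hypothetical piece of data protected by the lock, and the use of `try`/`finally` in the manual form is a conventional pattern rather than something the documentation above prescribes.

```julia
using ConcurrentUtils

rwlock = ReadWriteLock()
shared = Ref(42)            # hypothetical data protected by `rwlock`

# Manual pairing: every lock_read must be matched by an unlock_read.
lock_read(rwlock)
manual = try
    shared[]
finally
    unlock_read(rwlock)     # released even if the read throws
end

# Do-block form: the reader lock is released before lock_read returns.
scoped = lock_read(rwlock) do
    shared[]
end
```

The do-block form is usually preferable because the release cannot be forgotten.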
ConcurrentUtils.unlock_read — Function
unlock_read(rwlock)
Unlock the reader lock taken with a preceding invocation of lock_read.
See also: ReadWriteLock
ConcurrentUtils.trylock_read — Function
trylock_read(rwlock) -> acquired::Bool
Try to take reader lock. Return true on success.
This function is lock-free but may not be efficient.
See also: ReadWriteLock
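A non-blocking read can be sketched as follows. `read_or_default` is a hypothetical helper, and the sketch assumes that a reader lock acquired through a successful trylock_read is released with unlock_read, in the same way as one taken with lock_read.

```julia
using ConcurrentUtils

# Hypothetical helper: read `ref` under the reader lock if it can be taken
# without blocking; otherwise return `default`.
function read_or_default(rwlock, ref, default)
    if trylock_read(rwlock)
        try
            return ref[]
        finally
            unlock_read(rwlock)   # assumption: pairs with a successful trylock_read
        end
    else
        return default
    end
end

rwlock = ReadWriteLock()
value = Ref(7)
result = read_or_default(rwlock, value, -1)
```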
Guards
API | Summary |
---|---|
Guard | Guard mutable data . Use guarding to obtain exclusive access to data . |
ReadWriteGuard | Guard mutable data . Use guarding and guarding_read to obtain exclusive ("write") and shared ("read") access to data . |
guarding | Apply f! to the data wrapped in guard while obtaining exclusive access. |
guarding_read | Apply f to the data wrapped in guard while obtaining shared access. |
ConcurrentUtils.Guard — Type
Guard(data)
Guard mutable data. Use guarding to obtain exclusive access to data.
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = Guard(Ref(0));
julia> guarding(guard) do ref
           ref[] += 1
       end
1
ConcurrentUtils.ReadWriteGuard — Type
ReadWriteGuard(data)
Guard mutable data. Use guarding and guarding_read to obtain exclusive ("write") and shared ("read") access to data.
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = ReadWriteGuard(Ref(0));
julia> guarding(guard) do ref
           ref[] += 1
       end
1
julia> guarding_read(guard) do ref
           ref[]  # must not mutate anything
       end
1
ConcurrentUtils.guarding — Function
guarding(f!, guard)
Apply f! to the data wrapped in guard while obtaining exclusive access.
See: Guard, ReadWriteGuard, guarding_read
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = Guard(Ref(0));
julia> guarding(guard) do ref
           ref[] += 1
       end
1
ConcurrentUtils.guarding_read — Function
guarding_read(f, guard)
Apply f to the data wrapped in guard while obtaining shared access.
See: Guard, ReadWriteGuard, guarding
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = ReadWriteGuard(Ref(0));
julia> guarding_read(guard) do ref
           ref[]  # must not mutate anything
       end
0
Low-level interfaces
API | Summary |
---|---|
ThreadLocalStorage | Create a thread-local storage of type T created by factory() . |
Backoff | Create a callable backoff where backoff() spin-waits some number of times. |
spinloop | A hint to the compiler, runtime, and hardware that spinloop() is in the middle of a spin loop. Call this in a spin loop that requires some other worker threads to make forward progress in order for the current thread to make forward progress. |
ConcurrentUtils.ThreadLocalStorage — Type
ThreadLocalStorage{T}(factory)
ThreadLocalStorage(factory)
Create a thread-local storage of type T created by factory().
An instance tls of ThreadLocalStorage supports the operation x = tls[] for obtaining an object x of type T.
Using this API is extremely tricky. Arguably, it is not even well-defined when and how it can be used.
Theoretically, it is safe to use this API if the programmer can ensure that, once a value x = tls[] is obtained, the code does not hit any yield points until there is no more access to x. However, it is not possible to know if a certain operation is yield-free in general.
Thus, this API currently exists primarily for helping migration of code written using nthreads and threadid in an ad-hoc manner.
An object of type T is allocated for each worker thread of the Julia runtime. If T is not given, T = typeof(factory()) is used (i.e., factory is assumed to be type-stable).
Extended help
Examples
julia> using ConcurrentUtils
julia> tls = ThreadLocalStorage(Ref{Int});
julia> tls[] isa Ref{Int}
true
julia> tls[][] = 123;
julia> tls[][]
123
ConcurrentUtils.Backoff — Type
Backoff(mindelay, maxdelay) -> backoff
Create a callable backoff where backoff() spin-waits some number of times.
The maximum number of calls to spinloop starts at mindelay and exponentially increases up to maxdelay. backoff() returns the number of times spinloop was called.
Backoff uses an internal RNG and it does not consume the default task-local RNG.
Extended help
Examples
If islocked does not cause data races, Backoff can be used to implement a backoff lock.
julia> using ConcurrentUtils
julia> function trylock_with_backoff(lck; nspins = 1000, mindelay = 1, maxdelay = 1000)
           backoff = Backoff(mindelay, maxdelay)
           n = 0
           while true
               while islocked(lck)
                   spinloop()
                   n += 1
                   n > nspins && return false
               end
               trylock(lck) && return true
               n += backoff()
           end
       end;
julia> lck = ReentrantLock();
julia> trylock_with_backoff(lck)
true
ConcurrentUtils.spinloop — Function
spinloop()
A hint to the compiler, runtime, and hardware that spinloop() is in the middle of a spin loop. Call this in a spin loop that requires some other worker threads to make forward progress in order for the current thread to make forward progress.
Observe that the above sentence specifically mentions worker threads and not Tasks. A Julia programmer should always be alarmed whenever an API talks about threads instead of Tasks. Prefer higher-level APIs such as channels and condition variables.
A proper use of spinloop requires extra care, such as a fallback that waits in the Julia scheduler and/or a mechanism that enables the spin loop code path given that other threads exist and a task that can break the spin loop is running or will be scheduled eventually.
Implementation detail
It calls GC.safepoint() and jl_cpu_pause.
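One possible shape of the "spin briefly, then fall back to the scheduler" pattern described above is sketched here. `wait_until` is a hypothetical helper, not part of the package, and the fallback uses `yield()` as a simple stand-in for a real scheduler-based wait such as a condition variable.

```julia
using ConcurrentUtils

# Hypothetical helper: spin a bounded number of times, then yield to the scheduler.
function wait_until(flag::Threads.Atomic{Bool}; nspins = 1000)
    for _ in 1:nspins
        flag[] && return :spun
        spinloop()               # hint that this is a spin loop; does not yield to other tasks
    end
    while !flag[]
        yield()                  # fallback: give other tasks a chance to run
    end
    return :yielded
end

flag = Threads.Atomic{Bool}(false)
setter = Threads.@spawn begin
    flag[] = true
end
outcome = wait_until(flag)
wait(setter)
```

The bounded spin matters: with a single worker thread, the task that sets the flag cannot run while the current task is spinning, so an unbounded spin loop would never terminate.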