ConcurrentUtils.jl
Promise
| API | Summary |
|---|---|
Promise | Create a promise of type T; i.e., a memory location that holds a value of type T that can be set once and retrieved asynchronously. |
try_race_fetch | Try to retrieve a value if it is already set. Return Ok(value) on success and Err(NotSetError()) on failure. |
try_race_put_with! | Fetch an existing value or set a computed value (computed = thunk()). The thunk is called at most once for each instance of promise. |
race_put_with! | Fetch an existing value or set value = thunk(). The thunk is called at most once for each instance of promise. |
try_race_put! | Try to set a value in the promise. |
ConcurrentUtils.Promise — TypePromise{T}()
Promise()Create a promise of type T; i.e., a memory location that holds a value of type T that can be set once and retrieved asynchronously.
Extended help
For an explanation on the concept, see: Futures and promises - Wikipedia.
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_fetch(p)
Try.Err: NotSetError()
julia> put!(p, 123);
julia> fetch(p)
123
julia> try_race_put!(p, 456)
Try.Err: 123Memory ordering
An event that retrieves or waits for a value from a promise establishes a happened-before edge from the event that have set the value to the promise. Invocations of the API that includes an event that sets a value to a promise include:
put!(promise, value)that does not throw.try_race_put!(promise, value)that returns anOkresult.try_race_put_with!(thunk, promise)that callsthunk(and hence returns anOk).race_put_with!(thunk, promise)that callsthunk.
(The invocation thunk() is sequenced-before the event that sets the value.)
Invocations of the API that includes an event that retrieves or waits for a value from a promise include:
fetch(promise)wait(promise)try_race_fetch(promise)that returns anOkresult.try_race_put_with!(thunk, promise)that does not callthunk(and hence returns anErr)race_put_with!(thunk, promise)that does not callthunk
Supported operations
A promise::Promise{T} supports the following operations:
put!(promise, value): Set avalue; throws an error if some value has already been set.try_race_put!(promise, value): Try to set avalue.fetch(promise): Obtain a value that thepromiseholds. Wait if necessary.wait(promise): Wait for a value to be set.try_race_fetch(promise): Try to obtain an existing value in thepromise.try_race_put_with!(thunk, promise): Try to get a value or set a value generated bythunk.race_put_with!(thunk, promise): Similar totry_race_put_with!but the returned value does not indicate ifthunkis called or not.
ConcurrentUtils.try_race_fetch — Functiontry_race_fetch(promiselike) -> Ok(value::T) or Err(NotSetError())Try to retrieve a value if it is already set. Return Ok(value) on success and Err(NotSetError()) on failure.
try_race_fetch can be called on a Promise or a tasklet (@tasklet).
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_fetch(p)
Try.Err: NotSetError()
julia> put!(p, 123);
julia> try_race_fetch(p)
Try.Ok: 123ConcurrentUtils.try_race_put_with! — Functiontry_race_put_with!(thunk, promise::Promise{T}) -> Ok(computed::T) or Err(existing::T)Fetch an existing value or set a computed value (computed = thunk()). The thunk is called at most once for each instance of promise.
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_put_with!(p) do
123 + 456
end
Try.Ok: 579
julia> try_race_put_with!(p) do
42
end
Try.Err: 579ConcurrentUtils.race_put_with! — Functionrace_put_with!(thunk, promise::Promise{T}) -> value::TFetch an existing value or set value = thunk(). The thunk is called at most once for each instance of promise.
This is similar to try_race_put_with! but the caller cannot tell if thunk is called or not by the return type.
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> race_put_with!(p) do
println("called")
123 + 456
end
called
579
julia> race_put_with!(p) do
println("called")
42
end
579ConcurrentUtils.try_race_put! — Functiontry_race_put!(promise::Promise{T}, value) -> Ok(value′::T) or Err(existing::T)Try to set a value in the promise.
Since the value is converted to T first, the returned value′ may not be identical to the input value.
Extended help
Examples
julia> using ConcurrentUtils
julia> p = Promise{Int}();
julia> try_race_put!(p, 123)
Try.Ok: 123
julia> try_race_put!(p, 456)
Try.Err: 123Promise-like interfaces
| API | Summary |
|---|---|
@tasklet | Create an object that is a memoized version of () -> code but also acts like a Promise that is not settable. |
Once | A concurrent object for lazily initializing an object of type T. |
ConcurrentUtils.@tasklet — Macro@tasklet codeCreate an object that is a memoized version of () -> code but also acts like a Promise that is not settable.
A t = @tasklet code supports: t(), fetch(t), wait(t), and try_race_fetch.
Extended help
Examples
julia> using ConcurrentUtils
julia> t = @tasklet begin
println("called")
123
end;
julia> try_race_fetch(t)
Try.Err: NotSetError()
julia> t()
called
123
julia> t()
123
julia> fetch(t)
123
julia> wait(t);Memory ordering
An event that retrieves or waits for a value from a tasklet t establishes a happened-before edge from the events in code. Invocations of the API that includes an event that retrieves or waits for a value from a tasklet t include:
fetch(t)wait(t)try_race_fetch(t)that returns anOkresult.
Supported operations
A tasklet t = @tasklet code supports the following operations:
t(): Evaluatecodeif it hasn't been evaluated. Otherwise, equivalent tofetch.fetch(t): Wait for other tasks to invoket()and then return the result.wait(t): Wait for other tasks to invoket().try_race_fetch: Try to retrieve the result oft()if it is already called.
ConcurrentUtils.Once — TypeOnce{T}(f = T)
Once(f)A concurrent object for lazily initializing an object of type T.
Given O = Once{T}(f), invoking O[] evaluates v = f() if f has not been called via O[] and return the value v. Otherwise, O[] returns the value v returned from the first invocation of O[].
Examples
julia> using ConcurrentUtils
julia> O = Once{Vector{Int}}(() -> zeros(Int, 3));
julia> v = O[]
3-element Vector{Int64}:
0
0
0
julia> v === O[]
trueExtended help
When used as in Once{T}(f), the function f must always return a value of type T. As such, T() isa T must hold for type T used as in Once{T}().
When used as in Once(f), the function f must always return a value of concrete consistent type. If Once object is used as a global constant in a package, the type of the value returned from f must not change for different julia processes for each stack of Julia environment. Currently, Once(f) also directly invokes f() to compute the result type but this value is thrown away. This is because Once(f) is assumed to be called at the top-level of a package for lazily initializing a global state and serializing the computed value in the precompile cache is not desired.
Known limitation: If O[] is evaluated for a global O::Once{T} during precompilation, the resulting value is serialized into the precompile cache.
Read-write Lock
| API | Summary |
|---|---|
ReadWriteLock | Create a read-write lock. |
lock_read | lock_read(rwlock) takes reader (shared) lock. It must be released with unlock_read. |
unlock_read | Unlock the reader lock taken with a preceding invocation of lock_read. |
trylock_read | Try to take reader lock. Return true on success. |
ConcurrentUtils.ReadWriteLock — TypeReadWriteLock()Create a read-write lock.
Use lock_read and unlock_read for taking reader (shared) lock. Use lock and unlock for taking writer (exclusive) lock.
See also: ReadWriteGuard
Extended help
Examples
julia> using ConcurrentUtils
julia> rwlock = ReadWriteLock();
julia> lock(rwlock) do
# "mutate" data
end;
julia> lock_read(rwlock) do
# "read" data
end;Supported operations
lock_readtrylock_read(not very efficient but lock-free)unlock_readlocktrylockunlock
ConcurrentUtils.lock_read — Functionlock_read(rwlock)
lock_read(f, rwlock)lock_read(rwlock) takes reader (shared) lock. It must be released with unlock_read.
The second method lock_read(f, rwlock) execute the function f without any arguments after taking the reader lock and release it before returning. lock_read(f, rwlock) returns the result of f().
See also: ReadWriteLock
ConcurrentUtils.unlock_read — Functionunlock_read(rwlock)Unlock the reader lock taken with a preceding invocation of lock_read.
See also: ReadWriteLock
ConcurrentUtils.trylock_read — Functiontrylock_read(rwlock) -> acquired::BoolTry to take reader lock. Return true on success.
This function is lock-free but may not be efficient.
See also: ReadWriteLock
Guards
| API | Summary |
|---|---|
Guard | Guard mutable data. Use guarding to obtain exclusive access to data. |
ReadWriteGuard | Guard mutable data. Use guarding and guarding_read to obtain exclusive ("write") and shared ("read") access to data. |
guarding | Apply f! to the data wrapped in guard while obtaining exclusive access. |
guarding_read | Apply f to the data wrapped in guard while obtaining shared access. |
ConcurrentUtils.Guard — TypeGuard(data)Guard mutable data. Use guarding to obtain exclusive access to data.
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = Guard(Ref(0));
julia> guarding(guard) do ref
ref[] += 1
end
1ConcurrentUtils.ReadWriteGuard — TypeReadWriteGuard(data)Guard mutable data. Use guarding and guarding_read to obtain exclusive ("write") and shared ("read") access to data.
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = ReadWriteGuard(Ref(0));
julia> guarding(guard) do ref
ref[] += 1
end
1
julia> guarding_read(guard) do ref
ref[] # must not mutate anything
end
1ConcurrentUtils.guarding — Functionguarding(f!, guard)Apply f! to the data wrapped in guard while obtaining exclusive access.
See: Guard, ReadWriteGuard, guarding_read
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = Guard(Ref(0));
julia> guarding(guard) do ref
ref[] += 1
end
1ConcurrentUtils.guarding_read — Functionguarding_read(f, guard)Apply f to the data wrapped in guard while obtaining shared access.
See: Guard, ReadWriteGuard, guarding
Extended help
Examples
julia> using ConcurrentUtils
julia> guard = ReadWriteGuard(Ref(0));
julia> guarding_read(guard) do ref
ref[] # must not mutate anything
end
0Low-level interfaces
| API | Summary |
|---|---|
ThreadLocalStorage | Create a thread-local storage of type T created by factory(). |
Backoff | Create a callable backoff where backoff() spin-wait some amount of times. |
spinloop | A hint to the compiler, runtime, and hardware that spinloop() is in the middle of a spin loop. Call this in a spin loop that requires some other worker threads to make forward progress in order for the current thread to make forward progress. |
ConcurrentUtils.ThreadLocalStorage — TypeThreadLocalStorage{T}(factory)
ThreadLocalStorage(factory)Create a thread-local storage of type T created by factory().
An instance tls of ThreadLocalStorage support the operation x = tls[] for obtaining an object x of value T.
Using this API is extremely tricky. Arguably, it is not even well-defined when and how it can be used.
Theoretically, it is safe to use this API if the programmer can ensure that, once a value x = tls[] is obtained, the code does not hit any yield points until there is no more access to x. However, it is not possible to know if a certain operation is yield-free in general.
Thus, this API currently exists primarily for helping migration of code written using nthreads and threadid in an ad-hoc manner.
An object of type T is allocated for each worker thread of the Julia runtime. If T is not given, T = typeof(factory()) is used (i.e., factory is assumed to be type-stable).
Extended help
Examples
julia> using ConcurrentUtils
julia> tls = ThreadLocalStorage(Ref{Int});
julia> tls[] isa Ref{Int}
true
julia> tls[][] = 123;
julia> tls[][]
123ConcurrentUtils.Backoff — TypeBackoff(mindelay, maxdelay) -> backoffCreate a callable backoff where backoff() spin-wait some amount of times.
The number of maximum calls to spinloop starts at mindelay and exponentially increases up to maxdelay. backoff() returns the number of spinloop called.
Backoff uses an internal RNG and it does not consume the default task-local RNG.
Extended help
Examples
If islocked does not cause data races, Backoff can be used to implement a backoff lock.
julia> using ConcurrentUtils
julia> function trylock_with_backoff(lck; nspins = 1000, mindelay = 1, maxdelay = 1000)
backoff = Backoff(mindelay, maxdelay)
n = 0
while true
while islocked(lck)
spinloop()
n += 1
n > nspins && return false
end
trylock(lck) && return true
n += backoff()
end
end;
julia> lck = ReentrantLock();
julia> trylock_with_backoff(lck)
trueConcurrentUtils.spinloop — Functionspinloop()A hint to the compiler, runtime, and hardware that spinloop() is in the middle of a spin loop. Call this in a spin loop that requires some other worker threads to make forward progress in order for the current thread to make forward progress.
Observe that the above sentence specifically mentions worker threads and not Tasks. A Julia programmer should always be alarmed whenever an API talks about threads instead of Tasks. Prefer higher-level APIs such as channels and condition variables.
A proper use of spinloop requires extra cares such as a fallback that waits in the Julia scheduler and/or a mechanism that enables the spin loop code path given that other threads exist and a task that can break the spin loop is running or will be scheduled eventually.
Implementation detail
It calls GC.safepoint() and jl_cpu_pause.