tmc::channel#
tmc::channel is a high-performance async MPMC queue. It is unbounded and linearizable / FIFO. Producers may enqueue items using push() or post(). Consumers may dequeue items using pull().
If the queue is empty, the consumer will suspend until an item is enqueued.
It’s recommended to close() and drain() the channel before destruction. However, this is not required; if the channel is destroyed while elements remain inside, they will also be destroyed.
The channel can be configured at compile time by setting the Config template parameter, which provides BlockSize, PackingLevel, and EmbedFirstBlock, or at runtime by calling set_reuse_blocks(), set_consumer_spins(), and set_heavy_load_threshold().
Usage Example#
#include "tmc/task.hpp"
#include "tmc/spawn_tuple.hpp"
#include "tmc/channel.hpp"
#include <optional>

// Pushes 100 values, then closes the channel and waits for it to be drained.
tmc::task<void> producer(tmc::chan_tok<int> chan) {
  for (int i = 0; i < 100; ++i) {
    co_await chan.push(1);
  }
  co_await chan.drain();
}

// Pulls values until the channel is closed and empty.
tmc::task<void> consumer(tmc::chan_tok<int> chan) {
  std::optional<int> data = co_await chan.pull();
  while (data.has_value()) {
    int& i = data.value();
    // do something with i
    data = co_await chan.pull();
  }
}

// Runs one producer and one consumer; each task receives its own copy of the token.
tmc::task<void> chan_quickstart() {
  tmc::chan_tok<int> chan = tmc::make_channel<int>();
  co_await tmc::spawn_tuple(producer(chan), consumer(chan));
}
API Reference#
-
template<typename T, typename Config = tmc::chan_default_config>
inline chan_tok<T, Config> tmc::make_channel() noexcept#
Creates a new channel and returns an access token to it.
-
template<typename T, typename Config>
class chan_tok#
Tokens share ownership of a channel by reference counting. Access to the channel (from multiple tokens) is thread-safe, but access to a single token from multiple threads is not. To access the channel from multiple threads or tasks concurrently, make a copy of the token for each (by using the copy constructor).
Public Functions
-
template<typename U>
inline bool post(U &&Val) noexcept#
If the channel is open, this will always return true, indicating that Val was enqueued.
If the channel is closed, this will return false, and Val will not be enqueued.
Will not suspend or block.
-
template<typename U>
inline chan_t::aw_push push(U &&Val) noexcept#
Returns a bool. If the channel is open, this will always return true, indicating that Val was enqueued.
If the channel is closed, this will return false, and Val will not be enqueued.
May suspend to do producer clustering under high load.
-
inline chan_t::aw_pull pull() noexcept#
Returns a std::optional<T>. If the channel is open, this will always return a value.
If the channel is closed, this will continue to return values until the queue has been fully drained. After that, it will return an empty optional.
May suspend until a value is available, or the queue is closed.
-
inline void close() noexcept#
All future producers will return false. Consumers will continue to read data until the channel is drained, at which point all consumers will return an empty optional.
This function is idempotent and thread-safe. It is not lock-free. It may contend the lock against close() and drain().
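The return values documented for post(), pull(), and close() can be illustrated with a small single-threaded model. This is only a sketch built on the standard library: toy_channel is a hypothetical name, it is not how tmc::channel is implemented, and it does not suspend, so an open but empty toy_channel simply yields an empty optional where a real consumer would wait.

```cpp
#include <deque>
#include <optional>
#include <utility>

// Hypothetical single-threaded model of the documented return values.
// NOT tmc::channel's implementation: no concurrency, no suspension.
template <typename T>
class toy_channel {
  std::deque<T> items_;
  bool closed_ = false;

public:
  // Like the documented post(): true if enqueued, false if the channel is closed.
  template <typename U>
  bool post(U&& val) {
    if (closed_) {
      return false;
    }
    items_.emplace_back(std::forward<U>(val));
    return true;
  }

  // Like the documented pull() on a closed channel: remaining items are
  // returned first, then an empty optional.
  // Assumption of this model: an open, empty channel also yields an empty
  // optional, because the model cannot suspend the caller.
  std::optional<T> pull() {
    if (items_.empty()) {
      return std::nullopt;
    }
    std::optional<T> out(std::move(items_.front()));
    items_.pop_front();
    return out;
  }

  // Idempotent: closing twice has the same effect as closing once.
  void close() { closed_ = true; }
};
```

In this model, items posted before close() remain available to pull(), while a post() made after close() is rejected.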
-
inline tmc::task<void> drain() noexcept#
If the channel is not already closed, it will be closed. Then, waits for consumers to drain all remaining data from the channel. After all data has been consumed from the channel, any waiting consumers will be awakened, and all current and future consumers will immediately return empty results.
This function is idempotent and thread-safe. It is not lock-free. It may contend the lock against close() and drain().
-
inline void drain_wait() noexcept#
If the channel is not already closed, it will be closed. Then, waits for consumers to drain all remaining data from the channel. After all data has been consumed from the channel, any waiting consumers will be awakened, and all current and future consumers will immediately return empty results.
Warning: Avoid calling drain_wait() from a coroutine or function that may run on an executor. It may deadlock with consumers waiting to run on a single-threaded executor. Prefer to co_await drain() instead. drain_wait() is safe to call from an external thread.
This function is idempotent and thread-safe. It is not lock-free. It may contend the lock against close() and drain().
-
inline chan_tok &set_reuse_blocks(bool Reuse) noexcept#
If true, spent blocks will be cleared and moved to the tail of the queue. If false, spent blocks will be deleted. Default: true
If Config::EmbedFirstBlock == true, this will be forced to true.
-
inline chan_tok &set_consumer_spins(size_t SpinCount) noexcept#
If a consumer sees no data is ready at a ticket, it will spin wait this many times. Each spin wait is an asm("pause") and reload. Default: 0
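The "pause and reload" loop described above might look roughly like the following. This is a sketch under assumptions, not the library's code: cpu_relax and spin_until_ready are hypothetical names, a std::atomic<bool> stands in for the per-ticket ready flag, and the non-x86 fallback is a guess at a reasonable substitute.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper: one spin-wait step.
inline void cpu_relax() noexcept {
#if defined(__x86_64__) || defined(__i386__)
  __asm__ __volatile__("pause");
#else
  // Assumption: without a pause instruction, only prevent compiler reordering.
  std::atomic_signal_fence(std::memory_order_seq_cst);
#endif
}

// Checks `ready` once, then up to `spin_count` more times with a pause
// before each reload. Returns true as soon as the flag is observed set.
inline bool spin_until_ready(const std::atomic<bool>& ready,
                             std::size_t spin_count) noexcept {
  if (ready.load(std::memory_order_acquire)) {
    return true;
  }
  for (std::size_t i = 0; i < spin_count; ++i) {
    cpu_relax();
    if (ready.load(std::memory_order_acquire)) {
      return true;
    }
  }
  return false;  // a real consumer would suspend at this point
}
```

With a spin count of 0 (the documented default) the loop body never runs, so a consumer that finds no data gives up immediately.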
-
inline chan_tok &set_heavy_load_threshold(size_t Threshold) noexcept#
If the total number of elements pushed per second to the queue is greater than this threshold, then the queue will attempt to move producers and consumers near each other to optimize sharing efficiency. The default value of 2,000,000 represents an item being pushed every 500ns. This behavior can be disabled entirely by setting this to 0.
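The relationship between the threshold and the average gap between pushes is plain arithmetic, sketched here. push_interval_ns is a hypothetical helper, not part of the library, and it assumes a non-zero rate.

```cpp
#include <cstdint>

constexpr std::uint64_t ns_per_second = 1'000'000'000ULL;

// Average gap between pushes, in nanoseconds, at a given push rate.
// Precondition (assumption of this sketch): items_per_second > 0.
constexpr std::uint64_t push_interval_ns(std::uint64_t items_per_second) {
  return ns_per_second / items_per_second;
}

// The documented default of 2,000,000 items per second is one item every 500 ns.
static_assert(push_interval_ns(2'000'000) == 500, "default threshold is 500 ns per item");
```

A workload that pushes faster than the chosen interval, on average, would count as heavy load under the description above.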
-
inline chan_tok(const chan_tok &Other) noexcept#
Copy Constructor: The new chan_tok will have its own hazard pointer so that it can be used concurrently with the other token.
If the other token is from a different channel, this token will now point to that channel.
-
inline chan_tok &operator=(const chan_tok &Other) noexcept#
Copy Assignment: If the other token is from a different channel, this token will now point to that channel.
-
inline chan_tok(chan_tok &&Other) noexcept#
Move Constructor: The moved-from token will become empty; it will release its channel pointer, and its hazard pointer.
-
inline chan_tok &operator=(chan_tok &&Other) noexcept#
Move Assignment: The moved-from token will become empty; it will release its channel pointer, and its hazard pointer.
If the other token is from a different channel, this token will now point to that channel.
-
inline ~chan_tok()#
Releases the token’s hazard pointer and decrements the channel’s shared reference count. When the last token for a channel is destroyed, the channel will also be destroyed. If the channel was not drained and any data remains in the channel, the destructor will also be called for each remaining data element.
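The ownership rule described here (the channel and any remaining elements are destroyed with the last token) resembles ordinary reference counting. The sketch below uses std::shared_ptr as a stand-in; toy_storage, toy_token, and tracked are hypothetical names, and hazard pointers are not modelled.

```cpp
#include <deque>
#include <memory>

// Hypothetical element type that counts destructor calls.
struct tracked {
  int* destroyed;
  explicit tracked(int* counter) : destroyed(counter) {}
  tracked(const tracked&) = delete;
  tracked& operator=(const tracked&) = delete;
  ~tracked() { ++*destroyed; }
};

// Stand-in for the channel storage and for a reference-counted token.
struct toy_storage {
  std::deque<tracked> items;
};
using toy_token = std::shared_ptr<toy_storage>;

// Leaves `element_count` elements undrained, releases both tokens, and
// returns how many element destructors ran (-1 if any ran too early).
inline int destroyed_after_last_token(int element_count) {
  int destroyed = 0;
  {
    toy_token first = std::make_shared<toy_storage>();
    for (int i = 0; i < element_count; ++i) {
      first->items.emplace_back(&destroyed);
    }
    toy_token second = first;  // copying a token shares ownership
    first.reset();             // one token remains, so the storage stays alive
    if (destroyed != 0) {
      return -1;
    }
  }  // the last token is released here; remaining elements are destroyed
  return destroyed;
}
```

Calling destroyed_after_last_token(3) exercises the case where three elements are left in the storage when the last token goes away.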
-
struct chan_default_config#
Public Static Attributes
-
static constexpr size_t BlockSize = 4096#
The number of elements that can be stored in each block in the channel linked list.
-
static constexpr size_t PackingLevel = 0#
At level 0, queue elements will be padded up to the next increment of 64 bytes. This reduces false sharing between neighboring elements. At level 1, no padding will be applied. At level 2, no padding will be applied, and the flags will be packed into the upper bits of the consumer pointer.
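Padding "up to the next increment of 64 bytes" is a round-up to a multiple of 64. The sketch below shows that calculation; padded_size and padded_int are hypothetical, and exactly what the library counts in the unpadded size (for example the element plus its flags) is an assumption left open here.

```cpp
#include <cstddef>

constexpr std::size_t padding_increment = 64;

// Rounds raw_size up to the next multiple of 64 bytes.
constexpr std::size_t padded_size(std::size_t raw_size) {
  return (raw_size + padding_increment - 1) / padding_increment * padding_increment;
}

static_assert(padded_size(1) == 64, "small elements occupy one 64-byte slot");
static_assert(padded_size(64) == 64, "exact multiples are unchanged");
static_assert(padded_size(65) == 128, "one byte over spills into a second slot");

// The same effect expressed with alignas: the struct's size becomes a
// multiple of its 64-byte alignment.
struct alignas(64) padded_int {
  int value;
};
static_assert(sizeof(padded_int) == 64, "an int padded to 64 bytes");
```

The trade-off is memory for isolation: at level 0 a queue of small elements uses far more space than at level 1, in exchange for less false sharing between neighbors.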
-
static constexpr bool EmbedFirstBlock = false#
If true, the first storage block will be a member of the channel object (instead of dynamically allocated). Subsequent storage blocks are always dynamically allocated. Incompatible with set_reuse_blocks(false).
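A custom configuration could be written as a struct that provides the same three static members. The following is a sketch: small_block_config and its values are hypothetical, and whether the library places further constraints on these values (such as BlockSize being a power of two) is not stated here, so the values are chosen conservatively.

```cpp
#include <cstddef>

// Hypothetical custom configuration mirroring the members of
// tmc::chan_default_config. The values are illustrative only.
struct small_block_config {
  // Fewer elements per block than the default of 4096.
  static constexpr std::size_t BlockSize = 256;
  // Level 1: no padding between elements.
  static constexpr std::size_t PackingLevel = 1;
  // Store the first block inside the channel object.
  // As documented, this is incompatible with set_reuse_blocks(false).
  static constexpr bool EmbedFirstBlock = true;
};

// Intended use (requires tmc/channel.hpp, which is not included here):
//   auto chan = tmc::make_channel<int, small_block_config>();
```

Because the configuration is a template parameter, a channel created with a custom config has a different chan_tok type than one created with the default config.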