channel#

Asynchronous, unbounded MPMC (multi-producer, multi-consumer) queue that uses purely zero-copy operations. It is linearizable and FIFO. It can be configured by overriding the Config template parameter.

The channel is accessed through a coro_util::chan_tok access token, which is created by make_channel(). Tokens share ownership of the channel by reference counting; to access the channel from multiple threads or tasks concurrently, give each one its own copy of the token.

Producer (unrestricted) operations:

  • post()

  • post_bulk()

Consumer (unrestricted) operations:

  • pull()

  • try_pull()

Usage Examples#

pull() suspends until data is available.

#include "coro_util/tmc/channel.hpp"
#include "tmc/task.hpp"
#include "tmc/spawn_tuple.hpp"

tmc::task<void> producer(coro_util::chan_tok<int> chan) {
  for (int i = 0; i < 100; ++i) {
    chan.post(i);
  }
  chan.close();
  // Without a co_await, an explicit co_return is needed to make this a coroutine.
  co_return;
}

tmc::task<void> consumer(coro_util::chan_tok<int> chan) {
  // Loop automatically breaks once the channel drains after close()
  while (auto data = co_await chan.pull()) {
    int& v = data.value();
    // do something with v
  }
}

tmc::task<void> chan_quickstart() {
  coro_util::chan_tok<int> chan = coro_util::make_channel<int>();
  co_await tmc::spawn_tuple(producer(chan), consumer(chan));
}

try_pull() is non-suspending and must be polled. It returns a scoped zero-copy handle whose status() (or operator bool()) indicates the result.

#include <utility> // std::unreachable (C++23)

// Drains all data currently in the channel. Returns true if the channel is empty
// but still open afterward, and false if it has been closed and drained.
tmc::task<bool> consume_all_data(coro_util::chan_tok<int> chan) {
  while (true) {
    auto data = chan.try_pull();
    switch (data.status()) {
      case coro_util::qu_err::OK: {
        int& v = data.value();
        // do something with v
        break;
      }
      case coro_util::qu_err::EMPTY:
        // No data available right now. Try again later.
        co_return true;
      case coro_util::qu_err::CLOSED:
        // The channel has been closed and drained. Do not try again later.
        co_return false;
      default:
        std::unreachable();
    }
  }
}

API Reference#

template<typename ContinuationPolicy, typename T, typename Config = coro_util::chan_default_config>
inline chan_tok<ContinuationPolicy, T, Config> coro_util::impl::make_channel() noexcept#

Creates a new channel and returns an access token to it. This is an internal function: callers should use the policy-bound make_channel wrapper provided by an adapter header (e.g. tmc/channel.hpp).

template<typename ContinuationPolicy, typename T, typename Config>
class chan_tok#

Tokens share ownership of a channel by reference counting. Access to the channel (from multiple tokens) is thread-safe, but access to a single token from multiple threads is not. To access the channel from multiple threads or tasks concurrently, make a copy of the token for each (by using the copy constructor).

Public Functions

template<typename ...Args>
inline bool post(Args&&... ConstructArgs) noexcept#

If the channel is open, this will always return true, indicating that an object of type T was enqueued by in-place construction in the channel using the provided constructor arguments.

If the channel is closed, this will return false, and the object will not be constructed.

Will not suspend or block.
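The contract of post() described above can be modeled with a few lines of standard C++. This is a minimal single-threaded sketch, not the library's implementation: toy_post_chan and post_example are hypothetical names, and a std::deque stands in for the channel storage.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical single-threaded stand-in for a channel; not the library's type.
template <typename T>
class toy_post_chan {
  std::deque<T> items_;
  bool closed_ = false;

public:
  // Mirrors the documented contract: construct in place if the channel is
  // open, otherwise return false without constructing anything.
  template <typename... Args>
  bool post(Args&&... ConstructArgs) {
    if (closed_) {
      return false;
    }
    items_.emplace_back(std::forward<Args>(ConstructArgs)...);
    return true;
  }

  void close() { closed_ = true; }
  std::size_t size() const { return items_.size(); }
  const T& front() const { return items_.front(); }
};

// Passes constructor arguments rather than a finished object.
inline bool post_example() {
  toy_post_chan<std::string> chan;
  bool accepted = chan.post(3, 'x'); // builds std::string(3, 'x') in place
  assert(chan.front() == "xxx");
  chan.close();
  bool rejected = !chan.post("late"); // closed: nothing is constructed
  return accepted && rejected && chan.size() == 1;
}
```

Forwarding the constructor arguments means the element is built directly in its final storage, which is the property the real post() signature is designed to provide.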

inline chan_t::aw_pull pull() noexcept#

Await to dequeue. Returns a chan_zc_scope which provides a scoped zero-copy reference to a value in the channel storage. When the scope is destroyed, the referenced value will be destroyed and the channel slot freed for reuse. Use value(), operator*(), or operator->() to access the underlying value.

The returned scope’s has_value() / operator bool() returns true if a value was dequeued. If the channel is closed, this will continue to return values until the channel has been fully drained, after which it will resume with an empty scope (has_value() returns false).

May suspend until a value is available, or the channel is closed.

WARNING: The chan_zc_scope uses the same hazard pointer as this chan_tok! For correct operation, you MUST release or destroy the returned chan_zc_scope before calling another member function on this chan_tok, and before this chan_tok goes out of scope! The safest way to accomplish this is to tie its scope to the loop: while (auto data = co_await chan.pull()) { process(data.value()); }
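To illustrate why the lifetime of the returned scope matters, the sketch below models a scoped zero-copy handle with plain RAII. It is an assumption-laden toy: toy_scope and scope_example are hypothetical, a std::optional stands in for one slot of channel storage, and no hazard pointer is modeled.

```cpp
#include <cassert>
#include <optional>
#include <utility>

// Hypothetical stand-in for a zero-copy scope: it refers to a value that lives
// in the channel's storage and destroys that value when the scope ends.
template <typename T>
class toy_scope {
  std::optional<T>* slot_; // nullptr models an empty scope

public:
  explicit toy_scope(std::optional<T>* slot) : slot_(slot) {}
  toy_scope(toy_scope&& other) noexcept
      : slot_(std::exchange(other.slot_, nullptr)) {}
  toy_scope(const toy_scope&) = delete;
  toy_scope& operator=(const toy_scope&) = delete;
  toy_scope& operator=(toy_scope&&) = delete;

  ~toy_scope() {
    if (slot_ != nullptr) {
      slot_->reset(); // destroy the value and free the slot for reuse
    }
  }

  bool has_value() const { return slot_ != nullptr; }
  explicit operator bool() const { return has_value(); }
  T& value() { return **slot_; }
  T& operator*() { return **slot_; }
  T* operator->() { return &**slot_; }
};

inline bool scope_example() {
  std::optional<int> slot = 41; // models one occupied slot in channel storage
  {
    toy_scope<int> data(&slot);
    if (data) {
      ++data.value(); // edits the stored value in place; no copy is made
    }
    assert(*slot == 42);
  } // scope destroyed here: the value is destroyed and the slot released
  return !slot.has_value();
}
```

Because the handle only borrows storage, anything that outlives the handle must copy or move the value out before the scope ends.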

inline chan_try_pull_zc_scope<T> try_pull() noexcept#

Attempts to immediately dequeue, returning a chan_try_pull_zc_scope which provides a scoped zero-copy reference to a value in the channel storage. When the scope is destroyed, the referenced value will be destroyed and the channel slot freed for reuse. Use value(), operator*(), or operator->() to access the underlying value.

The returned scope’s status() returns:

  • qu_err::OK - a value was dequeued

  • qu_err::EMPTY - no value is currently available

  • qu_err::CLOSED - the channel has been closed and drained

The returned scope’s has_value() / operator bool() returns true if a value was dequeued, or false if the channel was empty or closed.

WARNING: The chan_try_pull_zc_scope uses the same hazard pointer as this chan_tok! For correct operation, you MUST release or destroy the returned scope before calling another member function on this chan_tok, and before this chan_tok goes out of scope! The safest way to accomplish this is to tie its scope to the loop: while (auto data = chan.try_pull()) { process(data.value()); }
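The three status values can be seen in sequence with a small single-threaded model. This is only a sketch of the documented ordering: toy_status, toy_status_chan and status_example are hypothetical names, and unlike the real zero-copy scope this toy copies the value out.

```cpp
#include <cassert>
#include <deque>
#include <vector>

enum class toy_status { OK, EMPTY, CLOSED };

// Hypothetical single-threaded stand-in; not the library's implementation.
class toy_status_chan {
  std::deque<int> items_;
  bool closed_ = false;

public:
  bool post(int v) {
    if (closed_) {
      return false;
    }
    items_.push_back(v);
    return true;
  }

  void close() { closed_ = true; }

  // Buffered values take priority over the closed flag, so CLOSED is only
  // reported once the channel has been drained.
  toy_status try_pull(int& out) {
    if (!items_.empty()) {
      out = items_.front();
      items_.pop_front();
      return toy_status::OK;
    }
    return closed_ ? toy_status::CLOSED : toy_status::EMPTY;
  }
};

inline std::vector<toy_status> status_example() {
  toy_status_chan chan;
  int v = 0;
  std::vector<toy_status> seen;
  seen.push_back(chan.try_pull(v)); // open and empty
  bool posted = chan.post(1) && chan.post(2);
  assert(posted);
  (void)posted;
  chan.close();
  seen.push_back(chan.try_pull(v)); // value posted before close()
  seen.push_back(chan.try_pull(v)); // second value posted before close()
  seen.push_back(chan.try_pull(v)); // closed and drained
  return seen;
}
```

A polling consumer can treat EMPTY as "retry later" and CLOSED as "stop for good", which is the distinction operator bool() alone cannot express.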

template<typename TIter>
inline bool post_bulk(TIter &&Begin, size_t Count)#

If the channel is open, this will always return true, indicating that Count elements, starting from the Begin iterator, were enqueued.

If the channel is closed, this will return false, and no items will be enqueued.

Each item is moved (not copied) from the iterator into the channel.

The closed check is performed first, then space is pre-allocated, then all Count items are moved into the channel. Thus, there cannot be a partial success - either all or none of the items will be moved.

Will not suspend or block.
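The three-step order described above (closed check, pre-allocation, move) can be sketched as follows. The names toy_bulk_chan and bulk_example are hypothetical, the model is single-threaded, and std::unique_ptr is used as the element type only because a moved-from unique_ptr is guaranteed to be null, which makes the move observable.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in; not the library's implementation.
class toy_bulk_chan {
  std::vector<std::unique_ptr<int>> items_;
  bool closed_ = false;

public:
  template <typename TIter>
  bool post_bulk(TIter&& Begin, std::size_t Count) {
    if (closed_) {
      return false; // step 1: closed check, nothing has been touched yet
    }
    items_.reserve(items_.size() + Count); // step 2: pre-allocate space
    auto it = Begin;
    for (std::size_t i = 0; i < Count; ++i, ++it) {
      items_.push_back(std::move(*it)); // step 3: move, never copy
    }
    return true;
  }

  void close() { closed_ = true; }
  std::size_t size() const { return items_.size(); }
};

inline bool bulk_example() {
  std::vector<std::unique_ptr<int>> src;
  for (int i = 0; i < 3; ++i) {
    src.push_back(std::make_unique<int>(i));
  }

  toy_bulk_chan chan;
  bool accepted = chan.post_bulk(src.begin(), src.size());
  // All three sources were moved from, so they are now null.
  bool all_moved = !src[0] && !src[1] && !src[2];

  chan.close();
  std::vector<std::unique_ptr<int>> late;
  late.push_back(std::make_unique<int>(99));
  bool rejected = !chan.post_bulk(late.begin(), late.size());
  // The rejected batch is untouched: its element still owns its value.
  assert(late[0] != nullptr);

  return accepted && all_moved && rejected && chan.size() == 3;
}
```

Checking the closed flag before touching any source element is what rules out a partially moved batch in this model.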

template<typename TIter>
inline bool post_bulk(TIter &&Begin, TIter &&End)#

Calculates the number of elements via size_t Count = End - Begin;

If the channel is open, this will always return true, indicating that Count elements, starting from the Begin iterator, were enqueued.

If the channel is closed, this will return false, and no items will be enqueued.

Each item is moved (not copied) from the iterator into the channel.

The closed check is performed first, then space is pre-allocated, then all Count items are moved into the channel. Thus, there cannot be a partial success - either all or none of the items will be moved.

Will not suspend or block.
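Taken at face value, the expression End - Begin suggests that TIter needs to support iterator subtraction, as random-access iterators and raw pointers do. The sketch below checks that property at compile time; supports_subtraction, count_between and count_example are hypothetical helpers written for this illustration, not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <type_traits>
#include <utility>
#include <vector>

// Detects whether `It - It` is a valid expression (C++17 detection idiom).
template <typename It, typename = void>
struct supports_subtraction : std::false_type {};

template <typename It>
struct supports_subtraction<
    It, std::void_t<decltype(std::declval<It>() - std::declval<It>())>>
    : std::true_type {};

// Computes the element count the same way the documentation describes.
template <typename TIter>
std::size_t count_between(TIter Begin, TIter End) {
  static_assert(supports_subtraction<TIter>::value,
                "End - Begin must be a valid expression for this iterator");
  return static_cast<std::size_t>(End - Begin);
}

inline std::size_t count_example() {
  std::vector<int> v{10, 20, 30};
  std::size_t n = count_between(v.begin(), v.end());
  assert(n == v.size());
  return n;
}
```

Under this reading, a std::list iterator would not qualify, and the Count overload would be the one to use for such containers.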

template<typename TRange>
inline bool post_bulk(TRange &&Range)#

Calculates the number of elements via size_t Count = Range.end() - Range.begin();

If the channel is open, this will always return true, indicating that Count elements from the beginning of the range were enqueued.

If the channel is closed, this will return false, and no items will be enqueued.

Each item is moved (not copied) from the range into the channel.

The closed check is performed first, then space is pre-allocated, then all Count items are moved into the channel. Thus, there cannot be a partial success - either all or none of the items will be moved.

Will not suspend or block.

inline void close() noexcept#

All future calls to post() will immediately return false. Calls to pull() will continue to read data until all messages have been consumed, at which point all subsequent calls to pull() will immediately return an empty scope. If the channel was already empty, any waiting consumers will be awoken immediately and will resume with an empty scope.

This function is idempotent and thread-safe. It is not lock-free. It may contend for the lock with other close() calls and with block reclamation.

inline chan_tok(const chan_tok &Other) noexcept#

Copy Constructor: The new chan_tok refers to the same channel as the other token, but will have its own hazard pointer so that it can be used concurrently with the other token.

inline chan_tok &operator=(const chan_tok &Other) noexcept#

Copy Assignment: If the other token is from a different channel, this token will now point to that channel.

inline chan_tok new_token() noexcept#

Identical to the token copy constructor, but makes the intent more explicit - that a new token is being created which will independently own a reference count and hazard pointer to the underlying channel.

inline chan_tok(chan_tok &&Other) noexcept#

Move Constructor: The moved-from token will become empty; it will release both its channel pointer and its hazard pointer.

inline chan_tok &operator=(chan_tok &&Other) noexcept#

Move Assignment: The moved-from token will become empty; it will release both its channel pointer and its hazard pointer.

If the other token is from a different channel, this token will now point to that channel.
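The ownership effects of copying, moving, and reassigning tokens can be approximated with std::shared_ptr. This is a rough model under stated assumptions: toy_tok, toy_channel_state, make_toy_channel and token_example are hypothetical, and the per-token hazard pointer is not modeled at all.

```cpp
#include <cassert>
#include <memory>
#include <utility>

struct toy_channel_state {}; // stands in for the shared channel

// Hypothetical token: shares ownership of the channel by reference counting.
class toy_tok {
  std::shared_ptr<toy_channel_state> chan_;

public:
  explicit toy_tok(std::shared_ptr<toy_channel_state> chan)
      : chan_(std::move(chan)) {}
  toy_tok(const toy_tok&) = default;
  toy_tok& operator=(const toy_tok&) = default;
  toy_tok(toy_tok&&) noexcept = default;
  toy_tok& operator=(toy_tok&&) noexcept = default;

  toy_tok new_token() const { return toy_tok(*this); } // explicit copy
  bool empty() const { return chan_ == nullptr; }
  long owners() const { return chan_.use_count(); }
  bool same_channel(const toy_tok& o) const { return chan_ == o.chan_; }
};

inline toy_tok make_toy_channel() {
  return toy_tok(std::make_shared<toy_channel_state>());
}

inline bool token_example() {
  toy_tok a = make_toy_channel();
  toy_tok b = a;             // copy: a second owner of the same channel
  toy_tok c = a.new_token(); // same effect, with the intent spelled out
  assert(a.owners() == 3 && a.same_channel(b) && a.same_channel(c));

  toy_tok d = std::move(c);  // move: c becomes empty, owner count unchanged
  assert(c.empty() && a.owners() == 3);

  toy_tok other = make_toy_channel();
  b = other;                 // assignment: b now points to the other channel
  return a.owners() == 2 && b.same_channel(other) && !b.same_channel(a);
}
```

In the real library each copy would additionally acquire its own hazard pointer, which is why one token per thread or task is recommended.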

inline ~chan_tok()#

Releases the token’s hazard pointer and decrements the channel’s shared reference count. When the last token for a channel is destroyed, the channel will also be destroyed. If any data remains in the channel at that point, the destructor will also be called for each remaining data element.
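The teardown order described above can be observed in a small model where a std::shared_ptr plays the role of the token. The types counted and toy_storage and the function destructor_example are hypothetical and exist only for this sketch.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Element type that records its own destruction in an external counter.
struct counted {
  int* destroyed;
  explicit counted(int* d) : destroyed(d) {}
  counted(const counted&) = delete;
  counted& operator=(const counted&) = delete;
  ~counted() { ++*destroyed; }
};

// Hypothetical stand-in for the channel storage shared by all tokens.
struct toy_storage {
  std::deque<counted> items;
};

inline int destructor_example() {
  int destroyed = 0;
  {
    auto tok1 = std::make_shared<toy_storage>();
    tok1->items.emplace_back(&destroyed);
    tok1->items.emplace_back(&destroyed);
    {
      auto tok2 = tok1; // a second token for the same channel
      assert(tok1.use_count() == 2);
    } // tok2 released: another owner remains, so the storage stays alive
    assert(destroyed == 0);
  } // last token released: storage and both undrained elements are destroyed
  return destroyed;
}
```

Element types that own resources therefore do not leak when a channel is abandoned before it is drained, at least in this model of the documented behavior.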

struct chan_default_config#

Public Static Attributes

static constexpr size_t BlockSize = 256#

The number of elements that can be stored in each block in the channel linked list.

static constexpr bool ElementPadding = true#

If true, queue elements will be padded up to the next increment of 64 bytes. This reduces false sharing between neighboring elements. If false, no padding will be applied.
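As a rough illustration of what the two settings control, the sketch below defines a custom config with the same two members and estimates storage footprints. Everything here is an assumption for illustration: small_unpadded_config, default_like_config and the footprint helpers are hypothetical, and the arithmetic ignores any per-element bookkeeping the real channel may store.

```cpp
#include <cassert>
#include <cstddef>

// Mirrors the documented defaults.
struct default_like_config {
  static constexpr std::size_t BlockSize = 256;
  static constexpr bool ElementPadding = true;
};

// A hypothetical override: smaller blocks and no padding.
struct small_unpadded_config {
  static constexpr std::size_t BlockSize = 64;
  static constexpr bool ElementPadding = false;
};

// Rounds a size up to the next multiple of 64 bytes.
constexpr std::size_t round_up_64(std::size_t n) { return (n + 63) / 64 * 64; }

// Estimated bytes used by one element under a given config.
template <typename T, typename Config>
constexpr std::size_t element_footprint() {
  return Config::ElementPadding ? round_up_64(sizeof(T)) : sizeof(T);
}

// Estimated bytes used by the element storage of one block.
template <typename T, typename Config>
constexpr std::size_t block_footprint() {
  return Config::BlockSize * element_footprint<T, Config>();
}

struct big_payload {
  char bytes[65]; // one byte over a 64-byte boundary
};

inline bool config_example() {
  constexpr std::size_t padded = element_footprint<big_payload, default_like_config>();
  constexpr std::size_t unpadded = element_footprint<big_payload, small_unpadded_config>();
  assert(padded == 128);
  return padded > unpadded;
}
```

The trade-off this makes visible is memory against false sharing: padding small elements multiplies the size of each block, so disabling it may be reasonable when memory matters more than contention between neighboring elements.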