Async Queues

Bounded Queues

Unbounded Queues

coro_util::qu_spsc_unbounded

Single-producer, single-consumer (SPSC) queue.

coro_util::qu_mpsc_unbounded

Multi-producer, single-consumer (MPSC) queue.

coro_util::channel

Multi-producer, multi-consumer (MPMC) queue. Accessed through a chan_tok, which acts as both a hazard pointer and a shared-ownership handle.

Adapter Usage

The library provides base templates in the impl:: namespace. A base template cannot be used on its own: it must first be bound to a continuation policy, which completes the integration with your coroutine library of choice. Pre-bound adapter types, which combine an impl:: base and a policy into a complete, usable type, are provided for several well-known coroutine libraries.
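The base-plus-policy split described above can be illustrated with a small, self-contained sketch. Everything in it is hypothetical and is not the library's implementation: the `sketch` namespace, `queue_base`, `park()`, `take()`, and both policies are invented for illustration, and continuations are modeled as `std::function<void()>` rather than coroutine handles so that the example compiles without any coroutine library.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

namespace sketch {
namespace impl {
// Hypothetical stand-in for an impl:: base template. It owns the storage
// and the parked consumer, but delegates "how to resume" to Policy.
template <typename T, typename Policy>
class queue_base {
  std::deque<T> items_;
  std::function<void()> waiter_;  // at most one parked consumer

public:
  // Park a consumer continuation until the next post().
  void park(std::function<void()> k) { waiter_ = std::move(k); }

  void post(T v) {
    items_.push_back(std::move(v));
    if (waiter_) {
      // The policy decides where and when the consumer continues.
      Policy::resume(std::exchange(waiter_, nullptr));
    }
  }

  // Non-blocking removal of the front item, if any.
  std::optional<T> take() {
    if (items_.empty()) return std::nullopt;
    T v = std::move(items_.front());
    items_.pop_front();
    return v;
  }
};
}  // namespace impl

// Policy 1: resume the consumer inline, on the producer's call stack.
struct inline_policy {
  static void resume(std::function<void()> k) { k(); }
};

// Policy 2: hand the continuation to a toy executor's run queue.
struct deferred_policy {
  static std::vector<std::function<void()>>& run_queue() {
    static std::vector<std::function<void()>> q;
    return q;
  }
  static void resume(std::function<void()> k) {
    run_queue().push_back(std::move(k));
  }
  // Run everything that was scheduled so far.
  static void drain() {
    auto pending = std::move(run_queue());
    run_queue().clear();
    for (auto& k : pending) k();
  }
};

// "Pre-bound adapter" aliases: base template + policy = usable type.
template <typename T>
using inline_queue = impl::queue_base<T, inline_policy>;
template <typename T>
using deferred_queue = impl::queue_base<T, deferred_policy>;
}  // namespace sketch
```

With `sketch::inline_queue<int>`, a parked consumer runs inside `post()`. With `sketch::deferred_queue<int>`, it runs only when `deferred_policy::drain()` is called. The queue logic is identical in both cases; only the policy differs, which is the reason a single base can serve several coroutine libraries.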

Every supported coroutine library has its own adapter folder under coro_util/<library>/. Switching libraries only requires changing the include path. The examples below show the same producer/consumer pair for each kind of integration: a producer that post()s a single value and a consumer that drains the queue with pull().

There are two types of integrations:

  • Directly awaitable: TooManyCooks, YACLib, Boost.Cobalt, concurrencpp, cppcoro, libcoro

  • Wrapper required: Asio, Boost.Capy, libfork

// Replace with the adapter header for your library
#include "coro_util/tmc/qu_spsc_unbounded.hpp"
#include "tmc/task.hpp"

tmc::task<void> producer(coro_util::qu_spsc_unbounded<int>& q) {
  q.post(42);
  co_return;
}

tmc::task<void> consumer(coro_util::qu_spsc_unbounded<int>& q) {
  while (auto data = co_await q.pull()) {
    process(*data);
  }
}

// For wrapper-required libraries, both op.hpp and the queue hpp are needed
// Replace with the adapter header for your library
#include "coro_util/asio/op.hpp"
#include "coro_util/asio/qu_spsc_unbounded.hpp"
#include "asio/awaitable.hpp"

asio::awaitable<void> producer(coro_util::qu_spsc_unbounded<int>& q) {
  // only awaitable operations need to be wrapped
  q.post(42);
  co_return;
}

asio::awaitable<void> consumer(coro_util::qu_spsc_unbounded<int>& q) {
  // asio_wrap() bridges the queue awaitable through Asio's closed await_transform
  while (auto data = co_await coro_util::asio_wrap(q.pull())) {
    process(*data);
  }
}

Common Types

All of the queues and the channel report the outcome of try_pull() using a single shared status enum.

enum class coro_util::qu_err

Status code returned by try_pull().status()

Values:

enumerator OK
enumerator EMPTY
enumerator CLOSED
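A caller would typically branch on all three values. The sketch below shows one way to do that against a mock, so that it compiles without the library. The `sketch` namespace, `mock_queue`, `pull_result`, its `value()` accessor, `close()`, and `drain_ready()` are all hypothetical; only the enum values and the `try_pull().status()` shape come from this page. The mock also assumes that CLOSED is reported only after any remaining items have been drained, which may not match the real queues.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <utility>

namespace sketch {
// Mirrors the documented enumerators.
enum class qu_err { OK, EMPTY, CLOSED };

// Hypothetical result type: a status plus a value when status is OK.
template <typename T>
class pull_result {
  qu_err status_;
  std::optional<T> value_;

public:
  explicit pull_result(qu_err s, std::optional<T> v = std::nullopt)
      : status_(s), value_(std::move(v)) {}
  qu_err status() const { return status_; }
  const T& value() const { return *value_; }  // valid only when OK
};

// Hypothetical single-threaded mock of a queue with try_pull().
template <typename T>
class mock_queue {
  std::deque<T> items_;
  bool closed_ = false;

public:
  void post(T v) { items_.push_back(std::move(v)); }
  void close() { closed_ = true; }

  pull_result<T> try_pull() {
    if (!items_.empty()) {
      T v = std::move(items_.front());
      items_.pop_front();
      return pull_result<T>(qu_err::OK, std::move(v));
    }
    // Assumption: CLOSED is only visible once the queue is empty.
    return pull_result<T>(closed_ ? qu_err::CLOSED : qu_err::EMPTY);
  }
};

// Consume everything that is ready without waiting. Returns the status
// that ended the loop: EMPTY (try again later) or CLOSED (stop for good).
template <typename T, typename F>
qu_err drain_ready(mock_queue<T>& q, F&& on_item) {
  for (;;) {
    auto r = q.try_pull();
    switch (r.status()) {
      case qu_err::OK:
        on_item(r.value());
        break;
      case qu_err::EMPTY:
        return qu_err::EMPTY;
      case qu_err::CLOSED:
        return qu_err::CLOSED;
    }
  }
}
}  // namespace sketch
```

Returning the terminating status lets the caller distinguish "nothing available right now" from "no more data will ever arrive", which is the distinction the EMPTY and CLOSED enumerators exist to express.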