Default partition manager: architecture
This page describes the design of Mofka’s default partition manager: how events are laid out on disk, how a producer’s batch flows from the wire through to the chunk files, how consumers are fed back from the same files, and which configuration parameters govern each piece. It is a companion to the field reference in Advanced deployments, which lists the configuration keys and their default values.
Overview
The default partition manager stores events in append-only chunked log
files on a local filesystem, using ABT-IO for asynchronous,
Argobots-friendly I/O. Unlike the legacy partition manager, which stores
metadata in a Yokan database and data blobs in a Warabi target, the
default partition manager needs no external Mochi services. Its only
required dependency is an io_controller (an ABT-IO instance running
in the same Bedrock process).
Trade-off: legacy gives you whatever durability, indexing, and remote storage your Yokan/Warabi backends provide; default trades that flexibility for a much simpler, dependency-free deployment and a small, predictable on-disk format.
On-disk layout
A partition is a directory:
<path>/<topic>-<uuid>/
where <path> is the configured path, <topic> is the
topic name, and <uuid> is the partition UUID. Inside that directory,
events live in chunks. Every chunk is four sibling files sharing the
prefix chunk-NNNNNN. (six-digit zero-padded chunk id):
| Extension | Content |
|---|---|
| .meta | Concatenated event metadata, packed back-to-back (variable length). |
| .data | Concatenated event data, packed back-to-back (variable length). |
| .desc | Concatenated serialized descriptors, one per event. |
| .idx | Fixed-size IndexRecords, one per event. |
The .idx file is what makes the format self-describing: every event
in the chunk has exactly one fixed-width record pointing into the variable-
length sibling files. That same property drives crash recovery (below).
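The naming scheme above can be sketched in a few lines of Python. This is an illustration only: the helper name `chunk_files` and the example path, topic, and UUID are hypothetical, and the four extensions are taken from the write-path description on this page.

```python
from pathlib import Path

# Extensions named in the write path on this page: .meta, .data, .desc, .idx.
CHUNK_EXTENSIONS = ("meta", "data", "desc", "idx")

def chunk_files(path: str, topic: str, uuid: str, chunk_id: int) -> dict[str, Path]:
    """Return the four sibling file paths of one chunk (hypothetical helper)."""
    partition_dir = Path(path) / f"{topic}-{uuid}"
    prefix = f"chunk-{chunk_id:06d}."  # six-digit zero-padded chunk id
    return {ext: partition_dir / (prefix + ext) for ext in CHUNK_EXTENSIONS}

# Example values are made up for illustration.
files = chunk_files("/tmp/mofka", "mytopic", "0000-abcd", 3)
```

Here `files["idx"]` would point at `/tmp/mofka/mytopic-0000-abcd/chunk-000003.idx`.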
Chunk rotation
The manager opens a current chunk (id 0 at first) and appends to it until either of the following triggers fires:
- the chunk has reached max_events_per_chunk events, or
- the combined size of its .meta and .data files has reached max_chunk_size bytes.

It then closes the four file descriptors and opens a new chunk with the id incremented by one.
Trade-off. Smaller chunk-size limits give more files: more open and
fdatasync overhead per partition over time, but finer-grained crash
recovery and more opportunities for parallel reads. Larger limits give
fewer, larger files.
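As a minimal sketch, the two rotation triggers reduce to a single predicate. The function name and argument names below are hypothetical; only max_events_per_chunk and max_chunk_size come from the configuration described here.

```python
def should_rotate(events_in_chunk, meta_bytes, data_bytes,
                  max_events_per_chunk, max_chunk_size):
    """Return True once either rotation trigger has been reached."""
    too_many_events = events_in_chunk >= max_events_per_chunk
    # Only the .meta and .data sizes count toward the byte limit.
    too_many_bytes = (meta_bytes + data_bytes) >= max_chunk_size
    return too_many_events or too_many_bytes
```

For example, with max_events_per_chunk=1000 and max_chunk_size=1 MiB, a chunk holding 10 events but 1 MiB of combined metadata and data would rotate on the size trigger.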
Crash recovery and restart
When a DefaultPartitionManager is created with a UUID matching an
existing partition directory, it scans chunk-*.idx files in numeric
order and rebuilds its in-memory state — the index, the per-event
chunk-id table, the running write offsets, the total event count, and the
current chunk id. New events are then appended to the last chunk
(triggering a rotation if it is already full).
This means restart is the same code path as fresh start: pass the same
<path> and the same partition UUID, and the partition resumes
exactly where it left off.
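The recovery scan can be illustrated with the sketch below. It is not the actual implementation: the IndexRecord layout is not given on this page, so the six-field record used here is purely hypothetical, as is the choice to ignore a partial trailing record.

```python
import re
import struct
import tempfile
from pathlib import Path

# HYPOTHETICAL record layout: six little-endian uint64 fields
# (meta_offset, meta_size, data_offset, data_size, desc_offset, desc_size).
RECORD = struct.Struct("<6Q")
IDX_NAME = re.compile(r"chunk-(\d{6})\.idx$")

def recover(partition_dir: Path) -> dict:
    """Rebuild in-memory state by scanning chunk-*.idx in numeric order."""
    chunk_ids = sorted(
        int(m.group(1))
        for p in partition_dir.glob("chunk-*.idx")
        if (m := IDX_NAME.search(p.name))
    )
    index, chunk_of_event = [], []
    offsets = {"meta": 0, "data": 0, "desc": 0}
    for cid in chunk_ids:
        raw = (partition_dir / f"chunk-{cid:06d}.idx").read_bytes()
        # ASSUMPTION: a partial trailing record is ignored.
        usable = len(raw) - len(raw) % RECORD.size
        offsets = {"meta": 0, "data": 0, "desc": 0}  # offsets restart per chunk
        for rec in RECORD.iter_unpack(raw[:usable]):
            index.append(rec)
            chunk_of_event.append(cid)
            # The next write lands right after the last record's regions.
            offsets = {"meta": rec[0] + rec[1],
                       "data": rec[2] + rec[3],
                       "desc": rec[4] + rec[5]}
    return {
        "index": index,
        "chunk_of_event": chunk_of_event,
        "write_offsets": offsets,
        "total_events": len(index),
        "current_chunk": chunk_ids[-1] if chunk_ids else 0,
    }

with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    (d / "chunk-000000.idx").write_bytes(
        RECORD.pack(0, 4, 0, 10, 0, 8) + RECORD.pack(4, 6, 10, 20, 8, 8))
    (d / "chunk-000001.idx").write_bytes(RECORD.pack(0, 5, 0, 7, 0, 8))
    state = recover(d)
```

Because every record has the same width, the scan needs no framing information beyond the file length, which is the property the on-disk layout section attributes to the .idx file.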
Write path: producer to disk
A producer’s mofka_producer_send_batch RPC is handled by
receiveBatch. The work is split across two ULTs to keep the RPC
handler free for the next request while the heavy lifting (RDMA pull +
disk writes + optional fsync) happens in the background.
1. The handler ULT allocates a PushOperation describing the batch, takes the write-queue lock, assigns the batch’s first event id (incrementing m_assigned_events by the batch size, which linearizes id assignment across concurrent senders), pushes the operation onto the write queue, signals the write-loop ULT, and starts the RDMA pulls.
2. The RDMA pulls acquire one buffer from each of the two producer-side buffer pools, sized to fit the batch’s metadata sizes array + concatenated metadata content (and likewise for data). Both pulls run concurrently and overlap with whatever the write-loop ULT is doing for an earlier batch.
3. The single per-partition write-loop ULT picks the next operation off the queue, waits for its two RDMA pulls to complete, then writes to disk in this order:
   1. Build IndexRecords and a serialized descriptor blob in memory (one descriptor per event; these are what consumers will receive in the descriptor stream).
   2. Issue four abt_io_pwrite calls (one to .meta, one to .data, one to .desc, one to .idx) at the current per-file offsets.
   3. If sync=true, issue four abt_io_fdatasync calls. Producers are not acknowledged before this step.
   4. Bump the file offsets, append the new index records to the in-memory index, and bump m_total_events (which wakes any consumer ULTs blocked on m_events_cv).
   5. If the rotation triggers fire, close the four FDs and open a new chunk.
   6. Send the producer’s response.
Because step 1 assigns ids under the queue lock and step 3 drains the
queue serially, batches are stored in submission order — the in-memory
index, the on-disk .idx records, and the consumer-visible event
ids all agree.
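The ordering argument can be seen in a small sketch that uses Python threads in place of ULTs. The class and method names are hypothetical, and RDMA and disk I/O are omitted. The sketch shows only that assigning ids and enqueueing under one lock, then draining serially, yields ids in storage order.

```python
import threading
from collections import deque

class PartitionSketch:
    def __init__(self):
        self._lock = threading.Lock()  # stands in for the write-queue lock
        self._queue = deque()          # pending push operations
        self.assigned_events = 0       # mirrors m_assigned_events
        self.stored = []               # (event_id, payload) in storage order

    def receive_batch(self, events):
        """Handler side: assign the first id and enqueue under one lock."""
        with self._lock:
            first_id = self.assigned_events
            self.assigned_events += len(events)
            self._queue.append((first_id, list(events)))
        return first_id

    def drain(self):
        """Write-loop side: pop operations one at a time, in queue order."""
        while True:
            with self._lock:
                if not self._queue:
                    return
                first_id, events = self._queue.popleft()
            for i, event in enumerate(events):
                self.stored.append((first_id + i, event))

part = PartitionSketch()
senders = [
    threading.Thread(target=part.receive_batch,
                     args=([f"t{t}-{i}" for i in range(3)],))
    for t in range(4)
]
for s in senders:
    s.start()
for s in senders:
    s.join()
part.drain()
ids = [event_id for event_id, _ in part.stored]
```

Whichever sender wins the lock first gets the lower ids and the earlier queue slot, so `ids` is always the contiguous sequence 0 to 11 regardless of thread scheduling.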
Parameters that affect this path.
producers.metadata_buffer_pool.* and producers.data_buffer_pool.*
shape the incoming RDMA buffer pools (see Caches and buffer pools
below). sync flips the per-batch fdatasync on or off.
max_chunk_size and max_events_per_chunk decide when the
write loop rotates to a new chunk.
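To make the effect of sync concrete, here is a hedged sketch of one batch append using the POSIX calls exposed by Python's os module (Linux assumed) instead of ABT-IO. The function name and the blob contents are hypothetical.

```python
import os
import tempfile

def append_batch(fds, offsets, blobs, sync):
    """Write one blob per chunk file at its current offset, then advance."""
    for ext, blob in blobs.items():
        os.pwrite(fds[ext], blob, offsets[ext])
    if sync:
        for fd in fds.values():
            os.fdatasync(fd)  # data is flushed before any acknowledgement
    for ext, blob in blobs.items():
        offsets[ext] += len(blob)

with tempfile.TemporaryDirectory() as tmp:
    exts = ("meta", "data", "desc", "idx")
    fds = {
        e: os.open(os.path.join(tmp, f"chunk-000000.{e}"),
                   os.O_CREAT | os.O_WRONLY, 0o644)
        for e in exts
    }
    offsets = dict.fromkeys(exts, 0)
    append_batch(fds, offsets,
                 {"meta": b"m1", "data": b"dddd", "desc": b"x", "idx": b"ii"},
                 sync=True)
    append_batch(fds, offsets,
                 {"meta": b"m2", "data": b"ee", "desc": b"y", "idx": b"jj"},
                 sync=False)
    for fd in fds.values():
        os.close(fd)
    with open(os.path.join(tmp, "chunk-000000.data"), "rb") as f:
        data_contents = f.read()
```

With sync=False the second batch is still written at the right offsets; it simply relies on the kernel to flush it later.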
Read path: feeding a consumer
A mofka_consumer_request_events RPC drives the per-consumer feed
loop in feedConsumer. The loop pipelines disk reads with RDMA
pushes so the network and the disk stay busy in parallel:
1. Wait for events. Block on m_events_cv until the index has advanced past the consumer’s cursor (or the consumer is asked to stop).
2. Pick the next batch. Read the upcoming min(batch_size, available) index records to compute total metadata and total descriptor bytes.
3. Allocate two outgoing RDMA buffers, one from consumer_metadata_buffer_pool and one from consumer_desc_buffer_pool, each sized to fit the per-event sizes array plus the concatenated content for that batch.
4. Issue parallel disk reads. For each event in the batch, look up the chunk id and pread the metadata and descriptor regions out of the chunk file. Reads are grouped per chunk and the read-only file descriptor for each chunk comes from an LRU FD cache (see below). Reads use the non-blocking abt_io_pread_nb so all reads in the batch are in flight together.
5. Wait for disk reads to complete, then wait for the previous batch’s RDMA push to drain, then start the next push. The pipeline is one batch deep: the buffers from the previous push are recycled at the moment they become safe.
When the consumer is stopped with no events left, the loop sends a final
empty feed(0, NoMoreEvents, ...).
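The depth-one pipelining in the last step can be sketched with a single background worker standing in for the RDMA push. All names here are hypothetical, and the read and push callables are placeholders for the disk reads and the bulk transfer.

```python
from concurrent.futures import ThreadPoolExecutor

def feed(batches, read_batch, push_batch):
    """Overlap the read of batch n+1 with the push of batch n (depth one)."""
    with ThreadPoolExecutor(max_workers=1) as pusher:
        in_flight = None
        for batch in batches:
            payload = read_batch(batch)   # disk reads for this batch
            if in_flight is not None:
                in_flight.result()        # previous push must drain first
            in_flight = pusher.submit(push_batch, payload)
        if in_flight is not None:
            in_flight.result()            # drain the final push

pushed = []
feed(range(3),
     read_batch=lambda b: f"payload-{b}",
     push_batch=pushed.append)
```

At most one push is outstanding at any time, so two sets of buffers are enough: one being pushed and one being filled by the next read.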
Parameters that affect this path.
consumers.metadata_buffer_pool.* and
consumers.desc_buffer_pool.* size the outgoing RDMA buffers. If a
batch needs a buffer larger than the largest tier currently has, the pool
grows on demand — correctness-preserving, but at the cost of allocating
and registering a new buffer mid-flight. fd_cache_capacity decides
how many distinct chunk files can be hot in the read cache simultaneously.
Read path: random access (getData)
When a consumer asks for the data bytes for events whose descriptors it
already holds, the request lands in getData (driven by the
mofka_consumer_request_data RPC):
1. Decode each DataDescriptor to recover its {chunk_id, offset, size} triple.
2. Allocate a flat buffer sized to the sum of all requested sizes.
3. For each chunk referenced by the request, open the chunk’s .data file with abt_io_open(O_RDONLY), issue blocking abt_io_preads for the events from that chunk, then close the file before moving to the next chunk.
4. Build a segment list that respects the descriptor-level flatten() layout (so non-contiguous selections are honored), expose those segments as a single read-only bulk handle, and push to the consumer.
This path does not currently use the FD cache or a buffer pool — it
opens and closes each chunk per call and allocates a fresh
std::vector<char> for the response. For workloads that randomly
fetch data across many chunks, that per-call open cost can dominate;
caching here is an obvious future improvement.
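The per-chunk grouping can be sketched as follows, using ordinary Python file I/O instead of ABT-IO and already-decoded triples instead of DataDescriptor objects. The function name is hypothetical, and the flatten() segment handling and the RDMA push are left out.

```python
import tempfile
from collections import defaultdict
from pathlib import Path

def get_data(partition_dir, descriptors):
    """descriptors: list of (chunk_id, offset, size) triples, already decoded."""
    total = sum(size for _, _, size in descriptors)
    flat = bytearray(total)          # one flat response buffer
    by_chunk = defaultdict(list)     # chunk_id -> [(dest, offset, size)]
    dest = 0
    for chunk_id, offset, size in descriptors:
        by_chunk[chunk_id].append((dest, offset, size))
        dest += size
    for chunk_id, reads in by_chunk.items():
        name = Path(partition_dir) / f"chunk-{chunk_id:06d}.data"
        # Open once per chunk, read every requested region, then close.
        with open(name, "rb") as f:
            for pos, offset, size in reads:
                f.seek(offset)
                flat[pos:pos + size] = f.read(size)
    return bytes(flat)

with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "chunk-000000.data").write_bytes(b"aaaabbbb")
    Path(tmp, "chunk-000001.data").write_bytes(b"ccccdddd")
    result = get_data(tmp, [(1, 4, 4), (0, 0, 4), (1, 0, 2)])
```

The response preserves the order of the request even though the reads are issued chunk by chunk, because each read is written to its precomputed position in the flat buffer.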
Caches and buffer pools
There are five distinct data structures whose sizes are driven by configuration. The table summarises which parameter sizes each one and which data-flow path uses it:
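| Structure | Sized by | Used in |
|---|---|---|
| FD cache (LRU read-only file descriptors) | fd_cache_capacity | Read path (feedConsumer) |
| Producer metadata buffer pool (write_only) | producers.metadata_buffer_pool.* | Write path (receiveBatch) |
| Producer data buffer pool (write_only) | producers.data_buffer_pool.* | Write path (receiveBatch) |
| Consumer metadata buffer pool (read_only) | consumers.metadata_buffer_pool.* | Read path (feedConsumer) |
| Consumer descriptor buffer pool (read_only) | consumers.desc_buffer_pool.* | Read path (feedConsumer) |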
The FD cache is a simple LRU keyed by chunk-file path. A miss costs an
abt_io_open; a hit reuses an already-open read-only descriptor.
Eviction skips entries that are still referenced by an in-flight read,
so a small cache won’t break correctness — but it will trigger more
opens.
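A minimal sketch of such a cache is shown below, assuming a simple reference count per entry to mark in-flight reads. The class, its methods, and the fake open and close callables are all hypothetical.

```python
from collections import OrderedDict

class FdCache:
    def __init__(self, capacity, opener, closer):
        self.capacity = capacity
        self._open, self._close = opener, closer
        self._entries = OrderedDict()  # path -> [fd, refcount], oldest first

    def acquire(self, path):
        entry = self._entries.get(path)
        if entry is None:                    # miss: costs one open
            entry = self._entries[path] = [self._open(path), 0]
        self._entries.move_to_end(path)      # mark as most recently used
        entry[1] += 1                        # pin while a read is in flight
        self._evict()
        return entry[0]

    def release(self, path):
        self._entries[path][1] -= 1
        self._evict()

    def _evict(self):
        # Walk from least to most recently used, skipping pinned entries.
        for path in list(self._entries):
            if len(self._entries) <= self.capacity:
                break
            fd, refs = self._entries[path]
            if refs == 0:
                self._close(fd)
                del self._entries[path]

opened, closed = [], []

def fake_open(path):
    opened.append(path)
    return len(opened)  # fake descriptor number

cache = FdCache(1, fake_open, closed.append)
cache.acquire("chunk-000000.meta")
cache.acquire("chunk-000001.meta")  # over capacity, but both are in use
cache.release("chunk-000000.meta")  # now evictable, so it is closed
cache.acquire("chunk-000001.meta")  # hit: no new open
```

In this sketch the cache may temporarily exceed its capacity while every entry is pinned, which is one way to keep a small cache correct at the cost of extra opens later.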
The four buffer pools are thallium::bulk_buffer_pool instances.
Each pool is tiered: it holds num_tiers tiers of buffers, and
the k-th tier holds buffers of size
first_size * size_multiple^k. Each tier preallocates
num_buffers buffers (0 means “start empty, grow on
demand”). Buffers are pre-registered with Mercury for RDMA, so reusing
a pool buffer avoids the registration cost that an ad-hoc allocation
would pay. A request for size S is served from the smallest tier
whose buffers are >= S; if no such buffer is free, the pool
grows.
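The tier arithmetic can be worked through in a short sketch. The helper names are hypothetical and the parameter values are illustrative, not defaults.

```python
def tier_sizes(first_size, size_multiple, num_tiers):
    """Buffer size of each tier: first_size * size_multiple**k."""
    return [first_size * size_multiple ** k for k in range(num_tiers)]

def pick_tier(size, tiers):
    """Index of the smallest tier whose buffers hold `size` bytes, or None."""
    for k, tier_size in enumerate(tiers):
        if tier_size >= size:
            return k
    return None  # larger than every existing tier: the pool has to grow

# Illustrative values only.
tiers = tier_sizes(first_size=4096, size_multiple=4, num_tiers=3)
```

With these values the tiers hold buffers of 4096, 16384, and 65536 bytes, so a 5000-byte request is served from tier 1 and a 100000-byte request fits no existing tier.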
The producer-side pools are write_only (the partition manager
receives into them). The consumer-side pools are read_only
(the partition manager sends from them).
Tuning notes
- `max_chunk_size` / `max_events_per_chunk`. Larger values amortize per-chunk overhead. Smaller values give finer-grained crash recovery (less to replay) and let consumer reads parallelize across more files.
- `sync`. true calls fdatasync on all four chunk files after every batch, so a server crash loses at most the in-flight batch. false lets the kernel flush lazily, which is faster but exposes a wider crash window.
- `fd_cache_capacity`. Bump it if consumers regularly read across many old chunks. Each miss is an abt_io_open; each hit is free.
- Producer buffer pools. Set first_size close to the typical batch payload size, and consider preallocating a few buffers (num_buffers > 0) for the smallest tier if you expect a steady stream of similarly-sized batches.
- Consumer buffer pools. Set first_size close to batch_size * average event metadata (or descriptor) size. Undersizing isn’t a correctness bug but causes runtime growth; gross oversizing wastes pinned memory.
The full list of parameters and their defaults lives in Advanced deployments under “More configuration”.