Advanced deployments
To understand how Mofka works, we should understand its architecture, shown in the figure below, which represents the software stack in one server process.
Figure: Mofka’s architecture.
Mofka is built using the Mochi collection of components, in particular Argobots, Mercury, Margo, Thallium, Yokan, Flock, ABT-IO, plus custom components (partition managers) that Mofka defines.
We have seen in the previous section that, to deploy a Mofka server, we need to run
bedrock. Bedrock is our bootstrapping component: it takes a JSON configuration file
and instantiates the rest of the components according to this configuration file.
Each part of this stack can be configured and finely tuned. This section will explain how,
and along the way we will make our deployment fully distributed and persistent.
The final, more complete configuration file will be shown at the end.
Let’s start with the configuration from the previous section.
{
"libraries": [
"libflock-bedrock-module.so",
"libyokan-bedrock-module.so",
"libwarabi-bedrock-module.so",
"libabt-io-bedrock-module.so",
"libmofka-bedrock-module.so"
],
"providers": [
{
"name" : "group_manager",
"type" : "flock",
"provider_id" : 1,
"config": {
"bootstrap": "self",
"file": "mofka.json",
"group": {
"type": "static"
}
}
},
{
"name": "master_database",
"provider_id": 2,
"type": "yokan",
"tags" : [ "mofka:master" ],
"config" : {
"database" : {
"type": "map"
}
}
},
{
"name" : "io_controller",
"type" : "abt_io",
"provider_id" : 3,
"config" : {},
"dependencies": {
"pool": "__primary__"
}
}
]
}
This configuration first lists the libraries (lib*-bedrock-module.so) that bedrock
needs to load in order to instantiate the required components. It then defines three
providers.
The “group_manager” is a component that handles group membership. It uses the Flock Mochi library and currently uses the “self” bootstrap method. This method is appropriate for a single-process service.
The “master_database” is a component that stores information about all the available topics. It relies on the Yokan Mochi library and currently uses a “map” database, which is implemented as a C++ std::map and is therefore in memory.
The “io_controller” is a component that will manage the I/O operations of our partitions. It uses the ABT-IO Mochi library and is currently configured to rely on the same thread pool (__primary__) as the rest of the components.
Making the master database persistent
Our first change is to make the master database persistent.
This requires a mochi-yokan package built with a persistent backend.
For example, within a Spack environment, we can request Yokan to be built
with the RocksDB backend as follows.
# you may uninstall the previous version of yokan first
spack uninstall --dependents -y mochi-yokan
# then add the new one to your environment
spack add mochi-yokan+rocksdb
# finally, reconcretize and rebuild it
spack concretize -f
spack install
Once this is done, we can change the “database” part of our master database configuration as follows.
"database" : {
"type": "rocksdb",
"config": {
"create_if_missing": true,
"path": "/tmp/mofka/master"
}
}
We are now indicating that we want a RocksDB database, that we want it created if it does not exist, and we provide the path at which this database should live.
Note
Some backends (such as RocksDB) won’t create the directories for their database
if they don’t exist, so go ahead and mkdir /tmp/mofka.
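If you prefer to script this edit instead of making it by hand, the following minimal Python sketch uses only the standard library. The helper names make_master_persistent and patch_config_file are hypothetical. The sketch assumes that the master database is the provider tagged mofka:master, as in the configuration above, and it also creates the parent directory that RocksDB will not create on its own.

```python
import json
import os


def make_master_persistent(config: dict, db_path: str) -> dict:
    """Switch the provider tagged "mofka:master" to a RocksDB database.

    Hypothetical helper mirroring the manual edit described in the text.
    """
    for provider in config.get("providers", []):
        if "mofka:master" in provider.get("tags", []):
            provider.setdefault("config", {})["database"] = {
                "type": "rocksdb",
                "config": {"create_if_missing": True, "path": db_path},
            }
    # RocksDB creates the database itself, but not its missing parent directories.
    parent = os.path.dirname(db_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    return config


def patch_config_file(path: str, db_path: str = "/tmp/mofka/master") -> None:
    """Load a Bedrock JSON file, apply the change, and write it back in place."""
    with open(path) as f:
        config = json.load(f)
    make_master_persistent(config, db_path)
    with open(path, "w") as f:
        json.dump(config, f, indent=4)
```

For example, patch_config_file("config.json") rewrites config.json in place and creates /tmp/mofka if it is missing.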
You may now start Mofka as follows.
bedrock na+sm -c config.json -v trace
Check that bedrock is running and that it has created the RocksDB database in /tmp/mofka/master.
Running multiple processes
Next, we will distribute Mofka across multiple processes, first on a single machine, then
on multiple machines. We will start Mofka as an MPI program using mpirun, but we
first need to make some modifications to its configuration.
The first component we need to change is the group manager (flock), which is currently
set up to use the “self” bootstrap method. If we leave it like this, each process will
build its own single-member group. Instead, we want the processes to rely on MPI to form the group.
Go ahead and change "bootstrap": "self" into "bootstrap": "mpi".
Next, only one process should host the master database. We can ensure this by adding
an "__if__" condition to our master database provider as follows.
{
"__if__": "$MPI_COMM_WORLD.rank == 0",
"name": "master",
"provider_id": 2,
"type": "yokan",
...
This "__if__" condition will be evaluated at bootstrap and, if false, disable
the object it appears in. This way, only the process with rank 0 will have a master database.
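To make the effect of "__if__" concrete, here is a toy Python model of which providers each rank ends up with. It is not Bedrock's implementation: Bedrock accepts more general expressions, whereas this hypothetical providers_for_rank function only understands the "$MPI_COMM_WORLD.rank == <integer>" form used above.

```python
import re

# Toy evaluator: only understands "$MPI_COMM_WORLD.rank == <integer>".
_RANK_EQUALS = re.compile(r"^\s*\$MPI_COMM_WORLD\.rank\s*==\s*(\d+)\s*$")


def providers_for_rank(providers: list, rank: int) -> list:
    """Return the providers that a process of the given rank would instantiate."""
    kept = []
    for provider in providers:
        condition = provider.get("__if__")
        if condition is None:
            kept.append(provider)  # no condition: every rank keeps this provider
            continue
        match = _RANK_EQUALS.match(condition)
        if match is None:
            raise ValueError(f"unsupported condition: {condition!r}")
        if rank == int(match.group(1)):
            kept.append(provider)
    return kept
```

With the group manager, the conditional master database, and the io_controller, rank 0 keeps all three providers while every other rank keeps only the group manager and the io_controller.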
You may now start Mofka with mpirun, for example with four processes:
mpirun -np 4 bedrock na+sm -c config.json -v trace
Note that na+sm is a shared-memory transport and will only work on a single machine.
To run on a cluster, use an appropriate transport, such as “tcp”, “verbs”, “cxi”, etc.
Important
Please refer to the Mochi documentation to find which protocols are available on your cluster.
The margo-info executable, installed in your Spack environment as part of the Mochi stack,
can also help with this. If you need help, do not hesitate to contact the Mochi developers
on Slack.
Creating persistent partitions
Before continuing, make sure Mofka is running and note where the mofka.json file was created (this file now contains the addresses of all the Mofka servers).
Export the DIASPORA_CTL_DRIVER_OPTIONS environment variable as follows so that
diaspora-ctl can find this group file.
export DIASPORA_CTL_DRIVER_OPTIONS="--driver mofka --driver.group_file /path/to/mofka.json"
In the quickstart tutorial, we have created a topic with a single in-memory partition as follows.
diaspora-ctl topic create --name my_topic --topic.num_partitions 1
The more complete version of this command is the following.
diaspora-ctl topic create --name my_topic \
--topic.num_partitions 1 \
--topic.config.type memory \
--topic.dependencies.pool __primary__
We will discuss --topic.dependencies.pool later. For now, note that not specifying
a partition type gives us an in-memory partition by default. Instead, we would like
a partition that is stored on disk. The following command does that.
diaspora-ctl topic create --name my_topic \
--topic.config.type default \
--topic.num_partitions 1 \
--topic.config.partition.path /tmp/mofka
This command should succeed, and a new folder my_topic-<uuid>, where <uuid> is a randomly generated UUID, should appear in /tmp/mofka.
Note that a more complete version of the above command would be the following.
diaspora-ctl topic create --name my_topic \
--topic.num_partitions 1 \
--topic.config.type default \
--topic.config.partition.path /tmp/mofka \
--topic.dependencies.io_controller io_controller \
--topic.dependencies.pool __primary__
If --topic.dependencies.io_controller is not specified, the first listed ABT-IO
component from the server will be used. If the pool is not specified, the __primary__
pool is used.
The command above, however, lets us better understand how configuration is passed to the
partitions. The --topic.<X> arguments are translated verbatim into a JSON
options object that mirrors the resulting Bedrock provider configuration.
The above command converts its arguments into the following.
{
"config": {
"type": "default",
"partition": {
"path": "/tmp/mofka"
}
},
"dependencies": {
"io_controller": "io_controller",
"pool": "__primary__"
}
}
That is: --topic.config.type selects the partition manager,
--topic.config.partition.<field> sets a partition-manager-specific field
(here, path), and --topic.dependencies.<name> resolves a Bedrock
dependency. You can confirm the full effective configuration with
bedrock-query, where config/type, config/partition/*, and
dependencies/* will appear in the same shape as the arguments above.
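The translation can be modeled in a few lines of Python. This is a sketch of the behavior described above, not diaspora-ctl's code. The function name is hypothetical, values are kept as strings (the real tool may parse numbers and booleans), and --topic.num_partitions is set aside on the assumption that it is a topic-level setting, since it does not appear in the JSON object above.

```python
# Assumption: topic-level keys that do not end up in the partition options object.
TOPIC_LEVEL_KEYS = {"num_partitions"}


def dotted_args_to_options(args: list) -> dict:
    """Turn ["--topic.a.b", "v", ...] into {"a": {"b": "v"}}."""
    if len(args) % 2:
        raise ValueError("expected flag/value pairs")
    options: dict = {}
    for flag, value in zip(args[::2], args[1::2]):
        if not flag.startswith("--topic."):
            continue  # e.g. --name is not part of the options object
        *parents, leaf = flag[len("--topic."):].split(".")
        if not parents and leaf in TOPIC_LEVEL_KEYS:
            continue
        node = options
        for key in parents:
            node = node.setdefault(key, {})  # create nested objects on demand
        node[leaf] = value
    return options
```

Passing it the arguments of the complete command above (everything after diaspora-ctl topic create) yields the same nested object as the JSON shown earlier.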
Important
If you start Mofka (i.e. Bedrock) on multiple servers, “num_partitions” represents the total number of partitions to create, and these partitions will be assigned to servers in a round-robin manner.
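As an illustration of the round-robin rule, the hypothetical assign_partitions function below maps partition indices to servers. It assumes that the servers are taken in the order they appear in mofka.json and that assignment starts with the first one; Mofka may order servers differently.

```python
def assign_partitions(num_partitions: int, servers: list) -> dict:
    """Round-robin: partition i goes to servers[i % len(servers)]."""
    if not servers:
        raise ValueError("at least one server is required")
    assignment = {server: [] for server in servers}
    for index in range(num_partitions):
        assignment[servers[index % len(servers)]].append(index)
    return assignment
```

For instance, five partitions over three servers a, b and c give {"a": [0, 3], "b": [1, 4], "c": [2]}: with a number of partitions that is not a multiple of the number of servers, some servers host one more partition than others.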
More configuration
If we run the following (replacing tcp with the protocol your servers were started with):
bedrock-query tcp -f mofka.json -p
We can see that the returned configuration is much larger than the one we provided. In particular, the “config/partition” object of the Mofka provider holding our partition has many more fields than just “path”. The table below explains each of these fields.
| Field | Default | Description |
|---|---|---|
| path | (required) | Base directory in which the partition’s chunk files are written. The partition manager creates a subdirectory … |
| max_chunk_size | … | Maximum size in bytes that a chunk file (data, metadata, descriptors, or index) is allowed to reach before the partition rotates to a new chunk. |
| max_events_per_chunk | … | Maximum number of events written to a single chunk before the partition rotates to a new one. |
| … | … | If … |
| fd_cache_capacity | … | Capacity of the LRU cache of read-only file descriptors used when serving consumer requests. Larger values reduce the cost of opening chunk files repeatedly when consumers read across many chunks. |
| … | … | Number of size tiers in the buffer pool used to receive event metadata from producers via RDMA. Each tier holds buffers of the same size; sizes grow geometrically across tiers (see …). |
| … | … | Number of pre-allocated buffers per tier. |
| … | … | Size in bytes of the buffers in the first (smallest) tier. |
| … | … | Geometric ratio between successive tiers. With … |
| … | … | Number of size tiers in the buffer pool used to receive event data from producers. |
| … | … | Pre-allocated buffers per tier in the producer data pool. |
| … | … | First-tier buffer size for incoming producer data. |
| … | … | Geometric ratio between tiers in the producer data pool. |
| … | … | Number of tiers in the buffer pool used to send event metadata back to consumers via RDMA. |
| … | … | Pre-allocated buffers per tier in the consumer metadata pool. |
| … | … | First-tier buffer size for outgoing consumer metadata. |
| … | … | Geometric ratio between tiers in the consumer metadata pool. |
| … | … | Number of tiers in the buffer pool used to send event data descriptors (small, fixed-size records pointing at chunk locations) back to consumers via RDMA. |
| … | … | Pre-allocated buffers per tier in the consumer descriptor pool. |
| … | … | First-tier buffer size for outgoing data descriptors. |
| … | … | Geometric ratio between tiers in the consumer descriptor pool. |
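Because tier sizes grow geometrically, the memory that a pool pre-allocates is easy to estimate. The sketch below assumes that tier i holds buffers of first_size * ratio**i bytes and that each incoming request is served from the smallest tier whose buffers are large enough. The parameter names are hypothetical, and the numbers in the usage note are arbitrary, not Mofka's defaults.

```python
from typing import List, Optional


def tier_sizes(num_tiers: int, first_size: int, ratio: int) -> List[int]:
    """Buffer size in bytes of each tier: first_size * ratio**i."""
    return [first_size * ratio**i for i in range(num_tiers)]


def pool_footprint(num_tiers: int, buffers_per_tier: int, first_size: int, ratio: int) -> int:
    """Total bytes pre-allocated by one tiered buffer pool."""
    return buffers_per_tier * sum(tier_sizes(num_tiers, first_size, ratio))


def tier_for(request_size: int, sizes: List[int]) -> Optional[int]:
    """Index of the smallest tier whose buffers fit the request, or None if none does."""
    for index, size in enumerate(sizes):
        if request_size <= size:
            return index
    return None
```

For example, with 4 tiers, 8 buffers per tier, a 4096-byte first tier and a ratio of 2, tier_sizes returns [4096, 8192, 16384, 32768] and the pool pre-allocates 491520 bytes (480 KiB). Since there is one such pool per direction and per kind of payload, these per-pool figures add up.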
These fields can be provided as command-line arguments as we did with “path” before, but it is much easier to aggregate them in a “topic-config.json” configuration file as follows.
{
"fd_cache_capacity": 64,
"max_chunk_size": 67108864,
"max_events_per_chunk": 1000000,
"consumers": {
...
},
...
}
We can then pass this configuration file to diaspora-ctl topic create as follows.
diaspora-ctl topic create --name my_topic \
--topic.config.type default \
--topic.num_partitions 1 \
--topic.config.partition.path /tmp/mofka \
--topic-config topic-config.json
Note
Any argument passed using the --topic.config. prefix overrides the corresponding value
from the configuration file passed to --topic-config. In the above scenario,
if the topic-config.json file had a “path” entry, it would be overridden with
“/tmp/mofka” as a consequence of passing
--topic.config.partition.path /tmp/mofka.
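The precedence rule in the note can be sketched as a recursive merge in which command-line values win. This is an assumption about how the two sources are combined: it supposes, as the note implies, that the file's top-level keys correspond to config.partition, and whether nested objects such as "consumers" are merged key by key or replaced as a whole is not specified here. The name deep_merge is hypothetical.

```python
import copy


def deep_merge(base: dict, overrides: dict) -> dict:
    """Return a copy of base updated recursively with overrides (overrides win)."""
    merged = copy.deepcopy(base)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)  # merge nested objects key by key
        else:
            merged[key] = copy.deepcopy(value)  # scalars and new keys: override wins
    return merged
```

Here base would be the contents of topic-config.json and overrides the partition fields given with --topic.config.partition., for example {"path": "/tmp/mofka"}; the file itself is left untouched.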
Setting up multithreading
In all the configurations we have used so far, every Mofka provider, the
network progress loop, and every RPC handler share the same default
Argobots execution stream (__primary__). That means RPCs serialize
behind progress and behind one another — a bottleneck that becomes visible
as soon as multiple producers or consumers hit a single server, or when
chunk writes are slow enough to stall the handler that’s currently running.
The simplest way to relieve this pressure is to ask Margo to spawn extra
Argobots resources for you. Two shortcut fields in the margo
section of the Bedrock configuration do exactly that:
"margo": {
"use_progress_thread": true,
"rpc_thread_count": 4
}
use_progress_thread moves Mercury’s progress loop to its own dedicated execution stream, so polling no longer competes with RPC handlers.
rpc_thread_count creates an __rpc__ pool and N execution streams (ES) pulling from it, where N is the value of rpc_thread_count; every RPC handler is then dispatched into that pool. With four ES, up to four RPCs can run truly concurrently.
Note
In our case, it is not necessary to set “use_progress_thread” to true:
by default, the progress loop uses the __primary__ execution
stream, which, if we specify a non-zero rpc_thread_count, is already
separated from the execution streams servicing RPCs.
After restarting Bedrock, bedrock-query will show the expanded
margo.argobots section with the additional pool and the additional
xstreams that Margo generated for you.
For finer control — CPU pinning, choosing the Argobots scheduler, or
declaring multiple custom pools — you can write the long-form
argobots.pools and argobots.xstreams arrays directly in
the margo section, and reference them by name from
progress_pool, rpc_pool, or any provider’s pool
dependency. The full schema is documented in
Margo’s JSON configuration reference. The next
section uses that long form to dedicate execution streams to ABT-IO.
Improving I/O performance
By default, the io_controller (an ABT-IO provider) depends on
__primary__. Every read and write a partition issues runs there,
contending with whatever else lives in that pool. Two complementary
techniques help: dedicate execution streams to ABT-IO, and/or switch to
the io_uring backend.
Dedicating execution streams to ABT-IO
Declare a new pool and two xstreams pulling from it in the margo
section:
"argobots": {
"pools": [
{ "name": "__primary__", "kind": "fifo_wait", "access": "mpmc" },
{ "name": "io_pool", "kind": "fifo_wait", "access": "mpmc" }
],
"xstreams": [
{ "name": "__primary__",
"scheduler": { "type": "basic_wait", "pools": ["__primary__"] } },
{ "name": "io_es_0",
"scheduler": { "type": "basic_wait", "pools": ["io_pool"] } },
{ "name": "io_es_1",
"scheduler": { "type": "basic_wait", "pools": ["io_pool"] } }
]
}
Then point the io_controller provider at the new pool by changing
its pool dependency:
{
"name": "io_controller",
"type": "abt_io",
"provider_id": 3,
"config": {},
"dependencies": { "pool": "io_pool" }
}
Two execution streams sharing one mpmc (multi-producer, multi-consumer) pool means
ABT-IO ULTs can run on whichever ES is free, and they no longer compete with RPC
handling for CPU time. If your workload is read-heavy and consumers
fan out across many partitions, raising the xstream count further can
help.
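A mistyped pool name is an easy mistake to make when editing these sections by hand. The following hypothetical Python checker, a sketch rather than part of Bedrock, verifies that every pool scheduled by an xstream and every provider "pool" dependency refers to a declared pool, and that each pool a provider depends on is served by at least one xstream. It only looks at pools declared explicitly in margo.argobots, so pools that Margo creates implicitly (such as the __rpc__ pool from rpc_thread_count) would be reported as undeclared.

```python
def check_pool_references(config: dict) -> list:
    """Return human-readable problems with pool references in a Bedrock config."""
    argobots = config.get("margo", {}).get("argobots", {})
    declared = {pool["name"] for pool in argobots.get("pools", [])}
    served = set()
    problems = []
    for xstream in argobots.get("xstreams", []):
        for pool in xstream.get("scheduler", {}).get("pools", []):
            served.add(pool)
            if pool not in declared:
                problems.append(f"xstream {xstream['name']!r} schedules undeclared pool {pool!r}")
    for provider in config.get("providers", []):
        pool = provider.get("dependencies", {}).get("pool")
        if pool is None:
            continue  # provider does not depend on a pool
        if pool not in declared:
            problems.append(f"provider {provider['name']!r} depends on undeclared pool {pool!r}")
        elif pool not in served:
            problems.append(f"pool {pool!r} used by {provider['name']!r} is not served by any xstream")
    return problems
```

Applied to the full configuration at the end of this page, loaded with json.load, it should return an empty list.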
Switching to io_uring
ABT-IO can also use Linux’s io_uring kernel interface as its
backend. With io_uring, the kernel asynchronously processes submitted
I/Os and reports completions through a ring buffer; the ABT-IO ULT only
submits and then waits for completion. Because the kernel does the
actual work, dedicating extra Argobots execution streams to this
backend would mostly waste cores — __primary__ is the right
target pool.
Add a second ABT-IO provider alongside the first one:
{
"name": "io_uring_controller",
"type": "abt_io",
"provider_id": 4,
"config": {
"num_urings": 1,
"liburing_flags": ["IOSQE_ASYNC"]
},
"dependencies": { "pool": "__primary__" }
}
A partition can then be told which ABT-IO instance to use by naming it in the partition’s dependencies:
diaspora-ctl topic create --name fast_topic \
--topic.config.type default \
--topic.num_partitions 1 \
--topic.config.partition.path /tmp/mofka \
--topic.dependencies.io_controller io_uring_controller
Note
io_uring requires a recent enough Linux kernel (io_uring was introduced in
Linux 5.1) and an abt-io build with liburing support (in Spack, the
mochi-abt-io +liburing variant). If either is missing, fall back to the
thread-based backend with a dedicated pool as shown above.
Full configuration
With all the modifications above applied, here is the final configuration.
{
"libraries": [
"libflock-bedrock-module.so",
"libyokan-bedrock-module.so",
"libabt-io-bedrock-module.so",
"libmofka-bedrock-module.so"
],
"margo": {
"use_progress_thread": true,
"rpc_thread_count": 4,
"argobots": {
"pools": [
{ "name": "__primary__", "kind": "fifo_wait", "access": "mpmc" },
{ "name": "io_pool", "kind": "fifo_wait", "access": "mpmc" }
],
"xstreams": [
{ "name": "__primary__",
"scheduler": { "type": "basic_wait", "pools": ["__primary__"] } },
{ "name": "io_es_0",
"scheduler": { "type": "basic_wait", "pools": ["io_pool"] } },
{ "name": "io_es_1",
"scheduler": { "type": "basic_wait", "pools": ["io_pool"] } }
]
}
},
"providers": [
{
"name" : "group_manager",
"type" : "flock",
"provider_id" : 1,
"config": {
"bootstrap": "mpi",
"file": "mofka.json",
"group": {
"type": "static"
}
}
},
{
"__if__": "$MPI_COMM_WORLD.rank == 0",
"name": "master",
"provider_id": 2,
"type": "yokan",
"tags" : [ "mofka:master" ],
"config" : {
"database" : {
"type": "rocksdb",
"config": {
"create_if_missing": true,
"path": "/tmp/mofka/master"
}
}
}
},
{
"name" : "io_controller",
"type" : "abt_io",
"provider_id" : 3,
"config" : {},
"dependencies": {
"pool": "io_pool"
}
},
{
"name" : "io_uring_controller",
"type" : "abt_io",
"provider_id" : 4,
"config" : {
"num_urings": 1,
"liburing_flags": ["IOSQE_ASYNC"]
},
"dependencies": {
"pool": "__primary__"
}
}
]
}
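When reserving cores for each server (for example through mpirun's binding options), it helps to know how many execution streams a configuration starts. The sketch below counts them under the behavior described in the multithreading section: one ES per explicitly declared xstream, the primary ES if it is not declared explicitly, one extra ES for use_progress_thread, and rpc_thread_count extra ES for the __rpc__ pool. The function name is hypothetical, and treating negative rpc_thread_count values as zero is a simplification.

```python
def count_execution_streams(margo: dict) -> int:
    """Estimate how many Argobots execution streams a "margo" section yields."""
    explicit = margo.get("argobots", {}).get("xstreams", [])
    names = {xstream["name"] for xstream in explicit}
    count = len(explicit)
    if "__primary__" not in names:
        count += 1  # the primary ES always exists, declared or not
    if margo.get("use_progress_thread", False):
        count += 1  # dedicated ES for Mercury's progress loop
    # ES pulling from the __rpc__ pool; negative values treated as zero (simplification).
    count += max(margo.get("rpc_thread_count", 0), 0)
    return count
```

For the "margo" section of the configuration above, it returns 8: the three declared xstreams, one progress ES, and four RPC ES.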