Internal Architecture of libzmq

Introduction

It seems that the number of people willing to peek at ØMQ internals has been growing lately, and while the codebase is heavily commented, what's missing is an overview of the architecture: a document that would allow newcomers to dive into the code without much trouble.

This text is an attempt to provide such a document. It will grow gradually to cover the whole codebase; however, it is not intended to go into too much detail. Details tend to change over time and the document would quickly get out of sync. For the details you should check the source code instead.

The first thing to do is to warn the reader that the codebase is complex. It's not complex in terms of number of lines (currently it has ~10k lines) or by being spaghetti code (it's not). Rather, it's complex because of the sheer number of different combinations it takes into account. Consider that it runs on over ten operating systems, each of them having several versions. It runs on many different microarchitectures, ranging from ARM to Itanium. It can be compiled by different compilers, starting with gcc and ending with MSVC and SunStudio. It has to interact nicely with 20+ different language bindings. It allows the use of underlying transports as different as in-process message passing and reliable multicast. It supports different messaging patterns: remote procedure call, data distribution, parallel pipelining and more. Each socket can either connect to the peer or allow the peer to connect to it; it can even do both. A dialogue between two nodes can either survive breakages of the underlying connection or be transient. Etc. etc. All these options are mutually orthogonal, giving literally thousands of possible combinations to take into account.

The moral of the above is that the code is complex even though it may look simple, and it is easy to break. Till now, approximately ten man-years have been invested into the project, amounting to roughly 2 hours spent on each single line of code, including lines like "i++;". Thus: be careful and try to understand what the code does and why it does it in that particular way before changing it.

Global state

Using global variables in a library is a pretty sure way to shoot yourself in the foot. Everything works well until the library gets linked into an executable twice (see the picture). At that point you'll start getting bizarre errors and crashes.

arch1.png

To prevent this problem libzmq has no global variables. Instead, the user of the library is responsible for creating the global state explicitly. The object containing the global state is called the 'context'. While from the user's perspective the context looks more or less like a pool of I/O threads to be used with your sockets, from libzmq's perspective it's just an object to store any global state that we happen to need. For example, the list of available inproc endpoints is stored in the context. The list of sockets that have been closed but still linger in memory because of unsent messages is held by the context. Etc.

Context is implemented as class ctx_t.

Concurrency model

ØMQ's concurrency model may be a bit confusing at first. The reason is that we eat our own dogfood and use message passing to achieve concurrency and internal scalability. Thus, even though ØMQ is a multithreaded application, you won't find mutexes, condition variables or semaphores meant to orchestrate the parallel processing. Instead, each object lives in its own thread and no other thread ever touches it (that's why mutexes are not needed). Other threads communicate with the object by sending it messages (called 'commands' to distinguish them from user-level ØMQ messages). In the same way the object can speak to other objects — potentially running in different threads — by sending them 'commands'.

From the user's point of view, passing commands between objects is easy. Just derive your object from the base class 'object_t' and that's it. You can send commands and define handlers for incoming commands. Have a look at the command.hpp file. It defines all available commands. Say, there is a 'term' command with a single argument called 'linger'. To send the 'term' command with a linger argument of 100 to object 'p', do this:

send_term (p, 100);

On the other hand, if you want to define a handler for the 'term' command, do this:

void my_object_t::process_term (int linger)
{
    //  Implement your action here.
}

However, be aware that the above only works if you are derived from 'object_t' class!

For most commands, there's a guarantee that the destination object won't disappear while the command is in flight. (To understand the guarantee, check the section explaining the object tree model used to tie asynchronous objects into well-defined hierarchies.) However, for a couple of commands (basically those that are sent across the object tree rather than along its edges) the guarantee doesn't apply. For these commands the sender calls the inc_seqnum function on the destination object, which synchronously increments a counter stored in the destination object (sent_seqnum), before sending the command itself. When the destination object processes the command, it increments another counter (processed_seqnum). When the object is shutting down, it knows that it can't finish while processed_seqnum is less than sent_seqnum, i.e. while there are still commands in flight aimed at it. The whole process is handled transparently in the object_t and own_t classes. Command senders and command receivers can just send and receive commands and don't have to care about the sequence numbers.
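To make the bookkeeping concrete, here is a minimal sketch of the two counters. The counter names follow the text; the class and helper names (tracked_object_t, command_processed, has_pending_commands) are made up for illustration, and the real logic in object_t/own_t uses libzmq's own atomic counter class and differs in detail:

//  Illustrative sketch only; see object_t/own_t for the real code.

#include <atomic>
#include <cstdint>

class tracked_object_t
{
public:
    //  Called synchronously by the sender, before the command is sent.
    void inc_seqnum () { sent_seqnum.fetch_add (1); }

    //  Called by the owning thread once a command has been processed.
    void command_processed () { ++processed_seqnum; }

    //  Used during shutdown: true while commands are still in flight.
    bool has_pending_commands () const
    {
        return processed_seqnum < sent_seqnum.load ();
    }

private:
    std::atomic<uint64_t> sent_seqnum {0};  //  incremented by any sender thread
    uint64_t processed_seqnum {0};          //  touched only by the owning thread
};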

Remark: Actually, some pieces of data are enclosed in critical sections. Two rules apply when choosing where to use a critical section (a minimal sketch of the idea follows the list):

  1. There's a need for the data to be accessible from any thread at any time (say the list of existing inproc endpoints).
  2. The data guarded by the critical section should never be touched on the critical path (message passing per se).
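For illustration only, here is a sketch of such a piece of global state: a hypothetical endpoint_registry_t guarded by a plain mutex. The real list of inproc endpoints lives in ctx_t and uses libzmq's own synchronisation primitives. Note how it obeys both rules above: it is reachable from any thread, but it is touched only while binding or connecting, never on the message-passing critical path.

#include <map>
#include <mutex>
#include <string>

class endpoint_registry_t
{
public:
    //  Returns false if the endpoint name is already taken.
    bool register_endpoint (const std::string &name_, void *socket_)
    {
        std::lock_guard<std::mutex> lock (sync);
        return endpoints.insert (std::make_pair (name_, socket_)).second;
    }

    //  Returns NULL if no socket is bound to the given name.
    void *find_endpoint (const std::string &name_)
    {
        std::lock_guard<std::mutex> lock (sync);
        std::map<std::string, void *>::iterator it = endpoints.find (name_);
        return it == endpoints.end () ? NULL : it->second;
    }

private:
    std::mutex sync;
    std::map<std::string, void *> endpoints;
};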

Threading Model

As for threads, there are only two kinds of them in ØMQ. Each thread is either an 'application thread', i.e. a thread created outside of ØMQ and used to access the API, or an I/O thread — created inside ØMQ and used to send and receive messages in the background. 'thread_t' is a simple portability class to create threads in an OS-agnostic manner.

It should be understood that while the above discussion is correct from the OS's point of view, ØMQ has a somewhat different notion of threads. From ØMQ's perspective, a thread is any object that has a 'mailbox'. A mailbox is basically a queue that stores commands sent to the objects living in that thread. The thread retrieves the commands from the mailbox in the order they were sent and processes them one by one. The mailbox is implemented in the 'mailbox_t' class.
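In sketch form, a mailbox boils down to a thread-safe FIFO of commands. The simple_mailbox_t below is hypothetical and heavily simplified: the real mailbox_t avoids this mutex/condition-variable pair on the hot path and also exposes a file descriptor so that I/O threads can include it in their polling set, but the idea is the same.

#include <condition_variable>
#include <deque>
#include <mutex>

template <typename command_t>
class simple_mailbox_t
{
public:
    //  Any thread can post a command to the mailbox.
    void send (const command_t &cmd_)
    {
        std::lock_guard<std::mutex> lock (sync);
        commands.push_back (cmd_);
        ready.notify_one ();
    }

    //  Only the owning ØMQ thread retrieves commands, in FIFO order.
    command_t recv ()
    {
        std::unique_lock<std::mutex> lock (sync);
        while (commands.empty ())
            ready.wait (lock);
        command_t cmd = commands.front ();
        commands.pop_front ();
        return cmd;
    }

private:
    std::mutex sync;
    std::condition_variable ready;
    std::deque<command_t> commands;
};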

There are two different kinds of threads as far as ØMQ is concerned: I/O threads and sockets.

I/O threads are easy. In this case OS threads and ØMQ threads correspond one to one. A particular I/O thread runs in its own OS thread and has a single mailbox for incoming commands.

Sockets are somewhat more complex. In short, each ØMQ socket has its own mailbox for incoming commands and is thus treated by ØMQ as a separate thread. In reality, a single application thread can create multiple sockets, meaning that in this case multiple ØMQ threads map to a single OS thread. To make it even more complex, ØMQ sockets can be migrated between OS threads. For example, a Java binding may use a ØMQ socket from a single thread; however, after the work is done, it passes the socket to the garbage collection thread to be destroyed. In such a case the association between the ØMQ thread and the OS thread changes — the socket is said to be migrated to a different OS thread.

I/O threads

I/O threads are background threads used by ØMQ to handle network traffic in an asynchronous way. The implementation is pretty minimal. The io_thread_t class is derived from thread_t, which is a simple compatibility wrapper on top of the OS-specific threading API. It is also derived from object_t, which makes it capable of sending and receiving commands (such as the stop command which is sent to the I/O thread when the library is being terminated).

In addition to that, each I/O thread owns a poller object. The poller object (poller_t) is an abstraction over the different polling mechanisms provided by different OSes. It is a typedef for the preferred polling mechanism class, such as select_t, poll_t, epoll_t etc.

There's a simple helper class called io_object_t from which all the objects living in I/O threads are derived. Thanks to that, they are able to perform the following functions. They can register a file descriptor (add_fd); from that point on, a callback is invoked when something happens with the file descriptor (in_event, out_event). Once the file descriptor is no longer needed, the object can unregister it using the rm_fd function. The object can also register a timer using the add_timer function, which causes timer_event to be invoked when the timer expires. The timer can be canceled using the cancel_timer function.

io1.png
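For illustration, here is a sketch of what an object living in an I/O thread typically looks like. It is not taken from the codebase and the signatures are simplified; in the real code, for example, add_fd returns a handle that is later passed back to rm_fd and to the event callbacks.

//  Illustrative sketch only; signatures simplified.
class my_engine_t : public io_object_t
{
public:
    void plug (fd_t fd_)
    {
        add_fd (fd_);      //  start getting in_event/out_event for this fd
        add_timer (1000);  //  ask for a timer_event in about one second
    }

    void unplug (fd_t fd_)
    {
        cancel_timer ();
        rm_fd (fd_);
    }

    void in_event ()    { /*  fd is readable: read and process the data  */ }
    void out_event ()   { /*  fd is writable: flush any pending data     */ }
    void timer_event () { /*  the timer has expired                      */ }
};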

It is worth noting that io_thread_t itself registers a file descriptor with the poller it owns: the file descriptor associated with its mailbox (recall that any thread, whether an I/O thread or an application thread, has an associated mailbox). That fires in_event on the io_thread_t when a new command arrives. io_thread_t then dispatches the command to its destination object.
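In sketch form, the dispatching step looks roughly like this. The try_recv call stands in for the real non-blocking mailbox receive, and the actual io_thread_t also handles the stop command and interrupted receives.

//  Illustrative sketch of the I/O thread's reaction to its mailbox fd firing.
void io_thread_sketch::in_event ()
{
    command_t cmd;
    //  Drain all the commands that have accumulated in the mailbox.
    while (mailbox.try_recv (&cmd))
        //  Hand each command over to the object it is destined for.
        cmd.destination->process_command (cmd);
}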

Object trees

The internal objects created within the ØMQ library are, for the most part, organised into tree hierarchies. The root of the hierarchy is always a socket:

objtree1.png

Each object in the tree can live in a different thread. It is in no way bound to live in the same thread as its parent. The root of the tree (socket) lives in an application thread while the remaining objects live in I/O threads:

objtree2.png

The main raison d'être of the object trees is to provide a deterministic shutdown mechanism. The rule of thumb is that an object asked to shut down sends a shutdown request to all its children and waits for their confirmations before shutting down itself.

Note that the exchange of shutdown request and confirmation — both of which are commands — effectively flushes all the commands currently in flight between the two objects. That's why most commands (those that are passed along the edges of the object tree) don't need to use command sequence counters (see above) to guarantee that the object won't be shut down while there are still commands in flight aimed at it.

The shutdown process gets more complex when an object decides to shut itself down without being asked to do so by its parent — such as when a session object shuts down after a TCP disconnection. We have to account for parent-initiated termination, self-initiated termination and even the case when the two accidentally happen at the same time.

It turns out that all the cases can be solved by the self-terminating object asking its parent to shut it down. The diagrams below are sequence diagrams for all the scenarios. Note that the parent asks the child to shut down by sending it the term command. The child confirms its termination by sending term_ack back to the parent. Additionally, when the child wants to self-destruct, it asks the parent to shut it down by sending it the term_req command.

objtree3.png

Note that in the last case, the term_req command is simply ignored and dropped by the parent. The parent knows it has already sent a termination request (term) to the child, so there's no point in sending it anew. If it did send a second termination request, it would arrive at the child after the child has been deallocated and trash the process by causing a segmentation fault or by overwriting memory.
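A sketch of that parent-side rule follows. The class name and the per-child bookkeeping (a set of children we have already sent term to) are hypothetical; own_t tracks this differently, but the rule it implements is the one described above.

//  Illustrative sketch only: drop term_req if this child has already
//  been asked to terminate.
void parent_sketch_t::process_term_req (own_t *child_)
{
    //  A second 'term' would reach the child after its deallocation and
    //  corrupt memory, so simply ignore the request.
    if (term_sent.count (child_))
        return;

    //  Otherwise proceed exactly as with parent-initiated termination.
    term_sent.insert (child_);
    send_term (child_, linger);  //  'linger' as configured for this object
}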

The object tree mechanism is implemented in the own_t class. Note that own_t is derived from object_t and thus every object in the object tree can send and receive commands (it needs to do so during the termination sequence). However, the opposite is not true. There are objects that can send and receive commands but are not part of an object tree (e.g. pipe endpoints).

The reaper thread

There's one specific problem with the shutdown mechanism as described above. Shutting down any particular object (including a socket) can take an arbitrary amount of time. However, we would like zmq_close to have POSIX-like behaviour: you can close a TCP socket and the call returns immediately, even if there's pending outbound data to be sent to the peer later on.

So, a call to zmq_close from an application thread should initiate the socket shutdown; however, we cannot rely on that thread to do all the handshaking with the child objects. The thread may already be involved in doing something completely different and it may never invoke the ØMQ library again. Thus, the socket has to be migrated to a worker thread that handles all the handshaking in the application thread's stead.

The logical choice would be to migrate the socket to one of the I/O threads; however, ØMQ can be initialised with zero I/O threads (to be used exclusively for in-process communication). Thus, we need a dedicated thread for the task. The reaper thread is exactly that.

It's implemented as the reaper_t class. The socket sends the reap command to the reaper thread, which, upon receiving the command, takes care of the socket and shuts it down cleanly.

Messages

Before we can progress further, we have to know how ØMQ messages work.

The requirements for messages are rather complex. The main reason for complexity is that the implementation should be very efficient for both very small and very large messages. The requirements are as follows:

  • For very small messages it's cheaper to copy the message than to keep the shared data on the heap. These messages thus have no associated buffer and the data are stored directly in the zmq_msg_t structure — presumably on the stack. This has a huge impact on performance as it almost entirely avoids the need for memory allocations/deallocations.
  • When using the inproc transport, the message should never be copied. Thus, the buffer sent in one thread should be received in the other thread and deallocated there.
  • Messages should support reference counting. Thus, if a message is published to many different TCP connections, all the sending I/O threads access the same buffer rather than copying the buffer for each I/O thread or TCP connection.
  • The same trick should be accessible to the user, so that the same physical buffer can be sent to multiple ØMQ sockets without the need to copy the content.
  • The user should be able to send a buffer that was allocated by an application-specific allocation mechanism without the need to copy the data. This is especially important for legacy applications which allocate large amounts of data.

To achieve these goals messages in ØMQ look like this:

For very small messages (VSMs) the buffer is part of the zmq_msg_t structure itself. This way the buffer is allocated on the stack and there's no need to use an allocation function, such as malloc, which tends to be the performance bottleneck for small message transfers. The content field contains the ZMQ_VSM constant. The message data are stored in the vsm_data byte array. vsm_size specifies the length of the message:

msg1.png

Note that the maximal size of a message that fits into the vsm_data buffer is specified by the ZMQ_MAX_VSM_SIZE constant. By default it is set to 30, but you can change the value when building the library.
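Put together, the structure looks roughly like this. This is a sketch based on the description above; the exact definition in zmq.h, including the additional flags byte and the field types, may differ between versions.

//  Sketch of the zmq_msg_t layout for very small messages.
#define ZMQ_MAX_VSM_SIZE 30

typedef struct
{
    void *content;                              //  ZMQ_VSM for small messages,
                                                //  otherwise a pointer to the shared content
    unsigned char flags;
    unsigned char vsm_size;                     //  length of the in-place message data
    unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];  //  the message data itself
} zmq_msg_t;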

For messages that won't fit into the vsm_data buffer we assume that allocating the buffer on the heap is cheaper than copying the message data around all the time. We allocate the buffer on the heap and make the zmq_msg_t structure point to it.

The structure allocated on the heap is called msg_content_t and it contains all the metadata relevant to the allocated chunk: its address (data), its size (size), a function pointer to use to deallocate it (ffn) and a hint value to pass to the deallocation function (hint).

The buffer, along with its metadata, can be shared between several zmq_msg_t instances. In such a case we have to keep the buffer's reference count (refcnt) so that we can deallocate it when there are no more zmq_msg_t structures pointing to it:

msg2.png

As can be seen on the diagram, to minimise the number of allocations, buffer for message data and the metadata can be allocated in a single memory chunk.
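The metadata structure itself looks roughly like this. Field names follow the text; the concrete types of the deallocation function pointer and of the reference counter are libzmq's own (zmq_free_fn and an atomic counter class) and may differ in detail.

//  Sketch of msg_content_t.
struct msg_content_t
{
    void *data;               //  pointer to the message body
    size_t size;              //  size of the message body in bytes
    zmq_free_fn *ffn;         //  deallocation function, called when refcnt drops to zero
    void *hint;               //  opaque value passed to ffn
    atomic_counter_t refcnt;  //  number of zmq_msg_t structures pointing here
};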

Note that the reference counting mechanism is accessible to the user. Calling zmq_msg_copy doesn't physically copy the buffer; instead, it creates a new zmq_msg_t structure that points to the same buffer as the original message.

Finally, if the message buffer is supplied by the user, we can't store the buffer metadata in the same memory chunk and thus we have to allocate a separate chunk for the metadata:

msg3.png
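For illustration, here is a short usage sketch of the two user-visible features just described: sending a user-allocated buffer without copying (zmq_msg_init_data) and reference-counted copying (zmq_msg_copy). Error handling is omitted for brevity.

#include <stdlib.h>
#include <string.h>
#include <zmq.h>

//  Called by ØMQ when the last message referencing the buffer is closed.
static void my_free (void *data, void *hint)
{
    (void) hint;
    free (data);
}

static void zero_copy_example (void)
{
    //  A buffer allocated by the application; ØMQ takes ownership of it.
    size_t size = 1000000;
    void *buffer = malloc (size);
    memset (buffer, 0, size);

    zmq_msg_t msg;
    zmq_msg_init_data (&msg, buffer, size, my_free, NULL);

    //  'Copying' only bumps the reference count; both messages point to
    //  the same physical buffer and could be sent on different sockets.
    zmq_msg_t copy;
    zmq_msg_init (&copy);
    zmq_msg_copy (&copy, &msg);

    //  The buffer is deallocated (via my_free) only when the last
    //  reference disappears.
    zmq_msg_close (&msg);
    zmq_msg_close (&copy);
}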

Pipes

Message scheduling

There are different scheduling algorithms used in ØMQ; however, all of them work on a single data structure, namely a flat array of pipes. Some pipes are active, meaning that messages can be sent to or received from them; some are passive, meaning that a message cannot be sent to the pipe because the high watermark has been reached, or that a message cannot be read from the pipe because there is none.

If the individual pipes in the array were just marked with an active/passive flag, scheduling would be inefficient. For example, if there were 10,000 inbound pipes, all of them passive except a single one, the fair-queueing algorithm would have to check 9,999 pipes before receiving each message.

To solve this problem, all the active pipes are located at the beginning of the array, while the passive pipes are at the end. There's a single variable (active) that determines how many pipes at the beginning of the array are active. The rest are passive.

Thus, when performing a scheduling algorithm such as load-balancing or fair-queueing, we only have to consider the initial 'active' pipes, which are guaranteed to be active, and can completely ignore the rest. That kind of approach leads to O(1) scheduling.

A slightly more complex part is how to activate and deactivate individual pipes. It turns out that it's not so complex after all and that it can be done in O(1) time. The diagram below shows the deactivation of pipe X. Note that all that's needed is swapping two elements in the array and decrementing the active variable:

sched1.png
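A minimal sketch of the trick follows. The class name is made up for illustration; the real container (array_t) additionally stores each element's index inside the element itself so that it can be located in O(1), whereas here the index is simply passed in by the caller.

#include <cstddef>
#include <utility>
#include <vector>

template <typename pipe_t>
class pipe_array_sketch_t
{
public:
    void add (pipe_t *pipe_)
    {
        //  New pipes start active: append and swap into the active region.
        pipes.push_back (pipe_);
        std::swap (pipes [active], pipes [pipes.size () - 1]);
        ++active;
    }

    //  Deactivate (e.g. high watermark reached, or nothing to read):
    //  swap with the last active pipe and shrink the active region.
    void deactivate (std::size_t index_)
    {
        --active;
        std::swap (pipes [index_], pipes [active]);
    }

    //  Activate again (e.g. messages became available): swap with the
    //  first passive pipe and grow the active region.
    void activate (std::size_t index_)
    {
        std::swap (pipes [index_], pipes [active]);
        ++active;
    }

    //  Schedulers only ever iterate over pipes [0 .. active_count () - 1].
    std::size_t active_count () const { return active; }
    pipe_t *operator [] (std::size_t index_) { return pipes [index_]; }

private:
    std::vector<pipe_t *> pipes;
    std::size_t active = 0;
};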
