High-performance network library using C++20 coroutines

Asynchronous programming is commonly used to implement network interactions efficiently in C++. The essence of the approach is that the results of socket read/write functions are not available immediately but arrive after some time, so the processor can do useful work while waiting for data. The approach has several implementations: callbacks, actors, future/promise, and coroutines. In C++, these are available as third-party libraries or can be implemented independently.

Coroutines are the most challenging to implement because they require platform-dependent code. However, the C++20 language standard introduces support for coroutines at the compiler and standard library levels. Coroutines are functions that can suspend their execution, preserving their state, and later resume from that state. The compiler automatically creates a checkpoint with the coroutine's state.

For a comprehensive understanding of C++20 coroutines, refer to this article. Below, we examine a code example that uses coroutines and describe the important points of the implementation.

To follow this article, the reader needs basic knowledge of sockets and the TCP/IP protocol: how programs interact through sockets, how data is transmitted over TCP/IP, and the key terms of this domain. GeeksforGeeks offers useful tutorials on TCP Server-Client Implementation in C++ and Socket Programming in C/C++ that cover these foundations.

The code of the library discussed below, along with usage examples, can be downloaded from the following link: https://github.com/resetius/coroio.

Contents

  1. Network Interaction: Echo Client and Server Example

  2. Echo Client and Server Code Overview

  3. promise and coroutine_handle

  4. Using final_suspend in Unit Tests

  5. Event Handling Using select

  6. Performance and benchmarks


Network Interaction: Echo Client and Server Example

Let's examine a classic example of network interaction: an echo client and server. The example is small, yet it shows the typical tasks of a network program and exposes the details of the chosen approach to asynchronous programming. Echo client/server examples are often included in the documentation of third-party libraries.

The echo server receives a message from the client and immediately sends it back unchanged as a response. The echo client sends a message to the server and waits for a response. Here's an example of the process.

This is a client session:

$ ./echoclient
test
Received: test

message
Received: message

yet another message
Received: yet another message

This is a server session:

$ ./echoserver
Received: test

Received: message

Received: yet another message

Echo Client and Server Code Overview

Let's examine the Echo client and server code from top to bottom. We'll start
by looking at the overall code structure, then move on to exploring the C++
coroutine API, and finally, we'll descend to the POSIX API level for socket
operations. It's worth noting that no third-party libraries are used in this
implementation. Only the standard C++ library and the POSIX API are employed.

The Echo client reads from standard input, sends the input to the socket,
and waits for a response:

TSimpleTask client(TLoop* loop)
{
    char out[128] = {0};
    char in[128] = {0};
    ssize_t size = 1;

    try {
        TSocket input{TAddress{}, 0 /* stdin */, loop->Poller()};
        TSocket socket{TAddress{"127.0.0.1", 8888}, loop->Poller()};

        co_await socket.Connect();
        while (size && (size = co_await input.ReadSome(out, sizeof(out)))) {
            co_await socket.WriteSome(out, size);
            size = co_await socket.ReadSome(in, sizeof(in));
            std::cout << "Received: " << std::string_view(in, size) << "\n";
        }
    } catch (const std::exception& ex) {
        std::cout << "Exception: " << ex.what() << "\n";
    }
    loop->Stop();
    co_return;
}

int main(int argc, char** argv) {
    TLoop loop;
    client(&loop);
    loop.Loop();
    return 0;
}

The Echo server listens on a port, accepts incoming connections (calls
accept), and for each connection, a separate coroutine is created. This
coroutine reads data from the socket and immediately sends the read data back
to the socket:

TSimpleTask client_handler(TSocket socket, TLoop* loop) {
    char buffer[128] = {0};
    ssize_t size = 0;

    try {
        while ((size = co_await socket.ReadSome(buffer, sizeof(buffer))) > 0) {
            std::cerr << "Received: " << std::string_view(buffer, size) << "\n";
            co_await socket.WriteSome(buffer, size);
        }
    } catch (const std::exception& ex) {
        std::cerr << "Exception: " << ex.what() << "\n";
    }
    co_return;
}

TSimpleTask server(TLoop* loop)
{
    TSocket socket(TAddress{"0.0.0.0", 8888}, loop->Poller());
    socket.Bind();
    socket.Listen();

    while (true) {
        auto client = co_await socket.Accept();
        client_handler(std::move(client), loop);
    }
    co_return;
}

int main(int argc, char** argv) {
    TLoop loop;
    server(&loop);
    loop.Loop();
    return 0;
}

The client and server share a similar structure: the main function starts an
event handling loop (TLoop) and launches a coroutine. The event handling loop
interacts with an object (TPoller), which performs input-output multiplexing.
In this implementation, input-output multiplexing is achieved using the select
mechanism, and changing it to poll, epoll, kqueue, etc., should not pose a
problem. TPoller waits for events on the socket or a timeout, passes these
events to the TLoop loop, which, in turn, wakes up the coroutines subscribed to
these events.

The following diagram illustrates how the program switches the execution flow
between the client coroutine and the TLoop::Loop() function when the coroutine
suspends on each co_await operator call (in reality, it may not suspend every
time; details below).

coroutine suspend/resume

Below is a diagram of the main classes implemented in our library. TPoller has several implementations (select, poll, epoll, kqueue, uring). TLoop contains a TPoller and is responsible for calling its Poll method. TSocket contains a reference to TPoller and adds events to it when ReadSome, WriteSome, Connect, etc., are called.

class diagram

Before moving on to the description of the main components, let's take a look at the coroutine API in C++20.

Objects promise and coroutine_handle

Each coroutine is associated with two entities: promise and coroutine_handle.
coroutine_handle is necessary for managing the coroutine externally; for
example, it can be used to resume the coroutine (resume), destroy it
(destroy), or check whether it has finished (done). With the promise object,
you can define the behavior of the coroutine at the start and end of its
execution and manage the coroutine from within. It's important to note that the
methods of the promise object should not be called directly; instead, the
co_return and co_yield operators are used, and the compiler substitutes the
appropriate methods.

In the example, the following pair of promise/coroutine_handle is used
(TVoidPromise/TSimpleTask):

struct TVoidPromise;

struct TSimpleTask : std::coroutine_handle<TVoidPromise>
{
    using promise_type = TVoidPromise;
};

struct TVoidPromise
{
    TSimpleTask get_return_object() { 
      return { TSimpleTask::from_promise(*this) }; 
    }
    std::suspend_never initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() {}
};

Here, the coroutine doesn't return anything (return_void), starts
immediately (std::suspend_never initial_suspend), and does not suspend upon
completion (std::suspend_never final_suspend), so its frame is destroyed
automatically when it finishes.

In some cases, such as when implementing unit tests, it is convenient to sleep
the coroutine upon completion:

struct TTestPromise;

struct TTestTask : std::coroutine_handle<TTestPromise>
{
    using promise_type = TTestPromise;
};

struct TTestPromise
{
    TTestTask get_return_object() { 
      return { TTestTask::from_promise(*this) }; 
    }
    std::suspend_never initial_suspend() { return {}; }
    std::suspend_always /* the difference here */ final_suspend() noexcept 
    { return {}; }
    void return_void() {}
    void unhandled_exception() {}
};

Please note that in this case, the promise differs only in the return type of
the final_suspend method.

Using final_suspend in Unit Tests

Let's consider a unit test that launches client and server coroutines and then
checks the correctness of the message exchange:

void test_write_after_connect(void**) {
    TLoop loop;
    TSocket socket(TAddress{"127.0.0.1", 8898}, loop.Poller());
    socket.Bind();
    socket.Listen();
    char send_buf[128] = "Hello";
    char rcv_buf[128] = {0};

    TTestTask h1 = [](TLoop* loop, char* buf, int size) -> TTestTask
    {
        TSocket client(TAddress{"127.0.0.1", 8898}, loop->Poller());
        co_await client.Connect();
        co_await client.WriteSome(buf, size);
        co_return;
    }(&loop, send_buf, sizeof(send_buf));

    TTestTask h2 = [](TSocket* socket, char* buf, int size) -> TTestTask
    {
        TSocket clientSocket = co_await socket->Accept();
        co_await clientSocket.ReadSome(buf, size);
        co_return;
    }(&socket, rcv_buf, sizeof(rcv_buf));

    while (!(h1.done() && h2.done())) {
        loop.Step();
    }
    h1.destroy(); h2.destroy();

    assert_true(memcmp(&send_buf, &rcv_buf, sizeof(send_buf))==0);
}

In this case, the event handling loop runs until both coroutines have
completed. The done method checks whether the coroutine has finished.
However, it may only be called on a suspended coroutine whose frame still
exists, which is why the promise with std::suspend_always as the return type
of final_suspend is used here. For the same reason, the test destroys both
coroutines explicitly (destroy) once they are done.

Before moving on to the implementation of the multiplexer and asynchronous
socket operations, let's describe the main classes, structures, and functions
used in the program.

Basic Structures, Classes, and Functions

For convenience, the following using statements are employed for std::chrono
(clock-related operations) and std::coroutine_handle<> objects:

using TClock = std::chrono::steady_clock;
using TTime = TClock::time_point;
using THandle = std::coroutine_handle<>;

The low-level structure for working with the network address sockaddr_in is encapsulated within the following class:

class TAddress
{
public:
    TAddress(const std::string& addr, int port);
    TAddress(sockaddr_in addr);
};

The POSIX API uses the timeval structure to specify a timeout. We need a
function that converts a time interval specified by two time values into the
timeval structure:

timeval GetTimeval(TTime now, TTime deadline);

Implementations of TAddress and GetTimeval are straightforward and are
omitted here. The code is available in the repository linked above.
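
For orientation, one possible shape of GetTimeval is sketched below. It is not the library's implementation: the third parameter maxWait and its default of 100 ms are assumptions added here so that a far-away deadline such as TTime::max() does not produce an oversized timeout.

```cpp
#include <algorithm>
#include <chrono>
#include <sys/time.h>

using TClock = std::chrono::steady_clock;
using TTime = TClock::time_point;

// Sketch only. maxWait is an assumed extra parameter that caps the wait.
timeval GetTimeval(TTime now, TTime deadline,
                   std::chrono::milliseconds maxWait = std::chrono::milliseconds(100))
{
    timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 0;
    if (deadline <= now) {
        return tv; // the deadline has passed: select must not block
    }
    // Clamp first, then subtract, so the interval stays small.
    TTime limit = now + maxWait;
    TTime until = std::min(deadline, limit);
    auto us = std::chrono::duration_cast<std::chrono::microseconds>(until - now);
    tv.tv_sec = static_cast<decltype(tv.tv_sec)>(us.count() / 1000000);
    tv.tv_usec = static_cast<decltype(tv.tv_usec)>(us.count() % 1000000);
    return tv;
}
```

The function splits the remaining interval into whole seconds and leftover microseconds, which is the representation select expects.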

To work with timers and socket events, the TTimer and TEvent structures are used:

struct TTimer {
    TTime Deadline;
    int Fd;
    THandle Handle;
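    // std::priority_queue keeps the largest element on top, so the comparison
    // is reversed: the earliest Deadline comes out first, and for equal
    // Deadline and Fd a timer with an empty Handle precedes a filled one.
    bool operator<(const TTimer& e) const {
        return std::tuple(Deadline, Fd, static_cast<bool>(Handle))
            > std::tuple(e.Deadline, e.Fd, static_cast<bool>(e.Handle));
    }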
};

struct TEvent {
    int Fd;
    enum {
        READ = 1,
        WRITE = 2
    };
    int Type;
    THandle Handle;

    bool Match(const TEvent& other) const {
        return Fd == other.Fd && (Type & other.Type);
    }
};

TTimer will be used to implement timeouts on the socket (e.g., connection
timeout) and for the implementation of the asynchronous Sleep function.
Objects of type TTimer will be stored in a priority queue ordered by
Deadline. When the Deadline arrives, the Handle coroutine is
awakened. Also, note the comparison function—timers with the same
Deadline and Fd, but with an empty Handle field, come before
timers with a filled Handle field. This is needed for timer removal.
Instead of deleting, we will insert a timer with an empty Handle into the
priority queue. During the priority queue processing, we will use this to skip
removed timers.
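
The removal-by-insertion idea can be sketched in isolation. In the example below all names (TDemoTimer, FireExpired, DemoTimers) are hypothetical, deadlines are plain integers, and an integer Id stands in for the coroutine handle, with Id == 0 playing the role of an empty Handle. The comparison is written so that std::priority_queue, which keeps its largest element on top, yields the earliest deadline first.

```cpp
#include <queue>
#include <tuple>
#include <vector>

// Hypothetical simplified timer; Id == 0 marks a "removed" entry.
struct TDemoTimer {
    int Deadline;
    int Fd;
    int Id;
    // Reversed comparison: the smallest (Deadline, Fd, live) tuple ends up
    // on top, so a removal marker precedes the live timer it cancels.
    bool operator<(const TDemoTimer& e) const {
        return std::tuple(Deadline, Fd, Id != 0)
            > std::tuple(e.Deadline, e.Fd, e.Id != 0);
    }
};

// Pops every timer whose deadline has arrived and returns the Ids to wake.
std::vector<int> FireExpired(std::priority_queue<TDemoTimer>& timers, int now) {
    std::vector<int> fired;
    int prevFd = -1;
    while (!timers.empty() && timers.top().Deadline <= now) {
        TDemoTimer t = timers.top();
        timers.pop();
        // Skip markers themselves and the live timer right after a marker.
        if (prevFd != t.Fd && t.Id != 0) {
            fired.push_back(t.Id);
        }
        prevFd = t.Fd;
    }
    return fired;
}

std::vector<int> DemoTimers() {
    std::priority_queue<TDemoTimer> timers;
    timers.push({30, 5, 103});
    timers.push({10, 3, 101});
    timers.push({20, 4, 102});
    timers.push({20, 4, 0});  // "remove" the timer for fd 4
    return FireExpired(timers, 25);
}
```

At time 25 only the timer with Id 101 fires: the timer for fd 4 is cancelled by its marker, and the timer for fd 5 is not due yet.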

TEvent describes an event on the socket. The event type is specified by
the Type field. An event can be either a read event, a write event, or
both. When the event occurs, the Handle coroutine is awakened.

Using these structures, we will implement an event multiplexer based on select.

Event Handling Using select

First, let's describe the class fields. For each socket, we need to store
THandle, which is awakened on read/write. We will store them in the
THandlePair structure, which consists of the pair THandle Read /
THandle Write. We will also store a priority queue for handling timeouts.
This queue stores structures of type TTimer. After select wakes up,
some coroutines need to be awakened, and for some, we need to send them back to
select. For these purposes, we will maintain two vectors: Changes_
and ReadyEvents_. The class also stores a pair of utility fields used in
the select call.

class TSelect {
private:
    int MaxFd_ = 0; // The maximum file descriptor number passed to select
    // Events that have changed (coroutine awakened for them) 
    //  and need to be sent back to select
    std::vector<TEvent> Changes_; 
    // Events for which a coroutine needs to be awakened
    std::vector<TEvent> ReadyEvents_; 
    std::priority_queue<TTimer> Timers_;
    TTime LastTimersProcessTime_; // The time when timers were last processed

    std::vector<THandlePair> InEvents_; // Read/write events sent to select
    std::vector<fd_mask> ReadFds_; // Auxiliary array for select
    std::vector<fd_mask> WriteFds_; // Auxiliary array for select

Now let's look at the method that prepares arguments for the select call,
invokes select, and populates the vector with coroutines to be awakened.

    fd_set* ReadFds() {
        return reinterpret_cast<fd_set*>(&ReadFds_[0]);
    }
    fd_set* WriteFds() {
        return reinterpret_cast<fd_set*>(&WriteFds_[0]);
    }

public:
    void Poll() {
        auto deadline = Timers_.empty() ? TTime::max() : Timers_.top().Deadline;
        auto tv = GetTimeval(TClock::now(), deadline);

        constexpr int bits = sizeof(fd_mask)*8;

        if (InEvents_.size() <= MaxFd_) {
            InEvents_.resize(MaxFd_+1);
        }
        if (MaxFd_ >= ReadFds_.size()*bits) {
            ReadFds_.resize((MaxFd_+bits)/bits);
            WriteFds_.resize((MaxFd_+bits)/bits);
        }

        for (const auto& ch : Changes_) {
            int fd = ch.Fd;
            auto& ev = InEvents_[fd];
            if (ch.Handle) {
                if (ch.Type & TEvent::READ) {
                    FD_SET(fd, ReadFds()); ev.Read = ch.Handle;
                }
                if (ch.Type & TEvent::WRITE) {
                    FD_SET(fd, WriteFds()); ev.Write = ch.Handle;
                }
            } else {
                if (ch.Type & TEvent::READ) {
                    FD_CLR(fd, ReadFds()); ev.Read = {};
                }
                if (ch.Type & TEvent::WRITE) {
                    FD_CLR(fd, WriteFds()); ev.Write = {};
                }
            }
        }

        ReadyEvents_.clear(); Changes_.clear(); MaxFd_ = 0;

        if (select(InEvents_.size(), ReadFds(), WriteFds(), nullptr, &tv) < 0) {
            throw std::system_error(errno, std::generic_category(), "select");
        }

select is told about read/write events on a socket through bits set in the
bit arrays ReadFds_ and WriteFds_. Therefore, we iterate through all the
changed events and set or clear their bits in ReadFds_ and WriteFds_.
Timeouts are handled through the last argument of select: tv is chosen so
that select wakes up no later than the Deadline of the first TTimer in the
priority queue.

After waking up from select, we iterate through the array of events and
check the read/write bit in the bit arrays ReadFds_ and WriteFds_. If the
corresponding bit is set, we place the event in the ReadyEvents_ array. We
will traverse this array later and awaken all the coroutines in it.

        for (int k=0; k < static_cast<int>(InEvents_.size()); ++k) {
            auto ev = InEvents_[k];

            if (FD_ISSET(k, WriteFds())) {
                assert(ev.Write);
                ReadyEvents_.emplace_back(TEvent{k, TEvent::WRITE, ev.Write});
            } else if (ev.Write) {
                // fd was cleared by select, set it
                FD_SET(k, WriteFds());
            }
            if (FD_ISSET(k, ReadFds())) {
                assert(ev.Read);
                ReadyEvents_.emplace_back(TEvent{k, TEvent::READ, ev.Read});
            } else if (ev.Read) {
                // fd was cleared by select, set it
                FD_SET(k, ReadFds());
            }
        }

        ProcessTimers();
    }
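
The select mechanics used in Poll can be tried on a single descriptor. The following sketch is not part of the library; WaitReadable and DemoSelect are hypothetical helpers that wait on a pipe with a short timeout.

```cpp
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
#include <utility>

// Returns true if fd becomes readable within timeoutMs milliseconds.
bool WaitReadable(int fd, int timeoutMs) {
    fd_set readFds;
    FD_ZERO(&readFds);
    FD_SET(fd, &readFds);            // ask select to watch fd for reading
    timeval tv;
    tv.tv_sec = timeoutMs / 1000;
    tv.tv_usec = (timeoutMs % 1000) * 1000;
    // The first argument is the highest watched descriptor plus one.
    int ready = select(fd + 1, &readFds, nullptr, nullptr, &tv);
    // select clears the bits of descriptors that are not ready.
    return ready > 0 && FD_ISSET(fd, &readFds);
}

// Checks a pipe before and after one byte is written into it.
std::pair<bool, bool> DemoSelect() {
    int fds[2];
    if (pipe(fds) != 0) {
        return {false, false};
    }
    bool before = WaitReadable(fds[0], 10); // nothing written: times out
    char byte = 'x';
    ssize_t written = write(fds[1], &byte, 1);
    bool after = written == 1 && WaitReadable(fds[0], 10);
    close(fds[0]);
    close(fds[1]);
    return {before, after};
}
```

Because select overwrites the sets it receives, a caller that reuses them, as Poll does, has to set the bits again for descriptors that did not fire.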

Now let's consider the processing of timers:

    void ProcessTimers() {
        auto now = TClock::now();
        int prevFd = -1;
        while (!Timers_.empty() && Timers_.top().Deadline <= now) {
            TTimer timer = Timers_.top();

            if ((prevFd == -1 || prevFd != timer.Fd) && timer.Handle) {
                ReadyEvents_.emplace_back(TEvent{-1, 0, timer.Handle});
            }

            prevFd = timer.Fd;
            Timers_.pop();
        }
        LastTimersProcessTime_ = now;
    }

We iterate through the priority queue, and if the Deadline has arrived, we
place the event in ReadyEvents_. It's important to consider that the timer
might have been removed; in this case, we need to skip it and not place it in
ReadyEvents_.

Adding read/write events and timers is done trivially:

    void AddTimer(int fd, TTime deadline, THandle h) {
        Timers_.emplace(TTimer{deadline, fd, h});
    }

    void AddRead(int fd, THandle h) {
        MaxFd_ = std::max(MaxFd_, fd);
        Changes_.emplace_back(TEvent{fd, TEvent::READ, h});
    }

    void AddWrite(int fd, THandle h) {
        MaxFd_ = std::max(MaxFd_, fd);
        Changes_.emplace_back(TEvent{fd, TEvent::WRITE, h});
    }

Removing an event is accomplished by inserting an empty event into Changes_:

    void RemoveEvent(int fd) {
        MaxFd_ = std::max(MaxFd_, fd);
        Changes_.emplace_back(TEvent{fd, TEvent::READ|TEvent::WRITE, {}});
    }

Removing a timer is done by inserting an empty timer into Timers_:

    bool RemoveTimer(int fd, TTime deadline) {
        bool fired = deadline < LastTimersProcessTime_;
        if (!fired) {
            Timers_.emplace(TTimer{deadline, fd, {}});
        }
        return fired;
    }

Now let's consider the awakening of coroutines:

    void Wakeup(TEvent&& change) {
        change.Handle.resume();
        if (Changes_.empty() || !Changes_.back().Match(change)) {
            if (change.Fd >= 0) {
                change.Handle = {};
                Changes_.emplace_back(std::move(change));
            }
        }
    }

    void WakeupReadyHandles() {
        for (auto&& ev : ReadyEvents_) {
            Wakeup(std::move(ev));
        }
    }
};

The Wakeup method awakens a coroutine. After awakening and execution, the
coroutine might fall asleep again, waiting for the same descriptor on which we
previously woke up, or it might fall asleep on another descriptor. If the
coroutine falls asleep on a different descriptor, the one that triggered the
awakening must be removed from select. This check and removal are performed
in the condition if (Changes_.empty() ...). The WakeupReadyHandles method
traverses all events in ReadyEvents_ and awakens coroutines.

That concludes the description of the TSelect class. The Poll and
WakeupReadyHandles methods will be used in the event handling loop, and the
other methods will be used by the socket class in read/write/connection
methods.

The event handling loop is implemented trivially:

template<typename TPoller>
class TLoop {
public:
    void Loop() {
        while (Running_) {
            Step();
        }
    }

    void Stop() {
        Running_ = false;
    }

    void Step() {
        Poller_.Poll();
        Poller_.WakeupReadyHandles();
    }

    TPoller& Poller() {
        return Poller_;
    }

private:
    TPoller Poller_;
    bool Running_ = true;
};

This is a template class parameterized by the type TPoller, which can be
TSelect. This is done to allow for swapping out the implementation of the
multiplexer (using epoll instead of select, etc.). Also, for convenience
and future code extension, let's place all methods and fields needed by the
socket for operation in the base class TPollerBase and inherit TSelect
from TPollerBase. Thus, TPollerBase will contain methods AddTimer,
RemoveTimer, AddRead, AddWrite, RemoveEvent. The code for this
enhancement is trivial. Later, the socket class will work with the base class.

Socket

As fields, the socket has a descriptor Fd_, a pointer to the base class
poller Poller_, and an address Addr_. Constructors and destructors are
trivial. In the constructor, a socket is created and set to non-blocking mode
using fcntl. In the destructor, the descriptor is closed with close, and
events associated with the socket are removed from Poller_.

class TSocket {
    int Fd_ = -1;
    TPollerBase* Poller_ = nullptr;
    TAddress Addr_;

public:
    // Obvious things: Constructors, assignment operators, destructors
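
The switch to non-blocking mode mentioned above could look roughly like this. The helper names SetNonBlocking and ReadEmptyPipeErrno are hypothetical, and a pipe is used instead of a socket to keep the sketch short.

```cpp
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Adds O_NONBLOCK to the descriptor's flags; returns false on failure.
bool SetNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        return false;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Reads from an empty non-blocking pipe and returns the resulting errno
// (0 if the read unexpectedly succeeded, -1 if the setup failed).
int ReadEmptyPipeErrno() {
    int fds[2];
    if (pipe(fds) != 0) {
        return -1;
    }
    int result = -1;
    if (SetNonBlocking(fds[0])) {
        char c;
        ssize_t n = read(fds[0], &c, 1); // returns at once instead of blocking
        result = (n < 0) ? errno : 0;
    }
    close(fds[0]);
    close(fds[1]);
    return result;
}
```

The read fails immediately with EAGAIN (or EWOULDBLOCK) instead of blocking. This is the situation in which the socket methods below suspend the coroutine and hand the descriptor to the poller.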

Let's present the code for the main socket methods: establishing a
connection, accepting a connection, reading, and writing. Each of these
methods returns a structure called TAwaitable that contains the methods
await_ready, await_suspend, and await_resume. The compiler inserts calls to
these methods at the co_await expression. First, await_ready is called; if it
returns true, the coroutine is not suspended, and await_resume is called
immediately to get the result. If await_ready returns false, await_suspend is
called with the coroutine handle as an argument. The handle can be saved and
later used to awaken the coroutine. Upon awakening, await_resume is always
called, and its return value becomes the result of co_await.

Let's examine the Connect method, which establishes a connection. Here,
in await_ready, we attempt to establish a connection in non-blocking mode,
and if successful, we return true. If it's not possible to do it
immediately, we return false and go to sleep. We wake up when select
indicates that the connection is established and operations can be performed
with the socket (in this case, write operations). Note that here, with the
help of a timer (AddTimer) and the deadline parameter, you can set the
maximum connection waiting time.

auto Connect(TTime deadline = TTime::max()) {
    struct TAwaitable {
        bool await_ready() {
            int ret = connect(fd, (struct sockaddr*) &addr, sizeof(addr));
            if (ret < 0 
                && !(errno == EINTR||errno==EAGAIN||errno==EINPROGRESS)) {
                throw std::system_error(errno, 
                                        std::generic_category(), "connect");
            }
            return ret >= 0;
        }

        void await_suspend(std::coroutine_handle<> h) {
            poller->AddWrite(fd, h);
            if (deadline != TTime::max()) {
                poller->AddTimer(fd, deadline, h);
            }
        }

        void await_resume() {
            if (deadline != TTime::max() 
                && poller->RemoveTimer(fd, deadline)) {
                throw std::system_error(
                  std::make_error_code(std::errc::timed_out));
            }
        }

        TPollerBase* poller;
        int fd;
        sockaddr_in addr;
        TTime deadline;
    };
    return TAwaitable{Poller_, Fd_, Addr_.Addr(), deadline};
}

The read method works similarly, but in this case, we try to read data using
read. If it fails, we put the coroutine to sleep, and the descriptor is
sent to wait in select.

auto ReadSome(char* buf, size_t size) {
    struct TAwaitable  {
        bool await_ready() {
            Run();
            return ready = (ret >= 0);
        }

        ssize_t await_resume() {
            // After a wakeup the descriptor is readable: retry the read
            if (!ready) { Run(); }
            return ret;
        }

        void Run() {
            ret = read(fd, b, s);
            if (ret < 0
                && !(errno == EINTR||errno==EAGAIN||errno==EINPROGRESS)) {
                throw std::system_error(errno,
                                        std::generic_category(), "read");
            }
        }

        void await_suspend(std::coroutine_handle<> h) {
            poller->AddRead(fd, h);
        }

        TPollerBase* poller;
        int fd;
        char* b; size_t s;
        ssize_t ret = -1;
        bool ready = false;
    };
    return TAwaitable{Poller_,Fd_,buf,size};
}

Let's add another method, ReadSomeYield, which will always put the coroutine
to sleep immediately and send the descriptor to wait in select. This method
is useful for tests and benchmarks.

    auto ReadSomeYield(char* buf, size_t size) {
        struct TAwaitable  {
            bool await_ready() {
                return (ready = false);
            }
            // ... the remaining code is the same as in ReadSome ...
        };
        return TAwaitable{Poller_,Fd_,buf,size};
    }

The write methods WriteSome and WriteSomeYield are implemented similarly,
but instead of calling read, write is used everywhere. In case of
sleeping, an event of type Write is added to the poller using AddWrite.

    auto WriteSome(char* buf, size_t size) {
        // Same as ReadSome
    }

    auto WriteSomeYield(char* buf, size_t size) {
        // Same as ReadSomeYield
    }

The method for accepting connections, Accept, used by the server, is
implemented as follows.

auto Accept() {
    struct TAwaitable {
        bool await_ready() const { return false; }
        void await_suspend(std::coroutine_handle<> h) {
            poller->AddRead(fd, h);
        }
        TSocket await_resume() {
            sockaddr_in clientaddr;
            socklen_t len = sizeof(clientaddr);

            int clientfd = accept(fd, (sockaddr*)&clientaddr, &len);
            if (clientfd < 0) {
                throw std::system_error(errno, 
                                        std::generic_category(), "accept");
            }

            return TSocket{clientaddr, clientfd, *poller};
        }

        TPollerBase* poller;
        int fd;
    };

    return TAwaitable{Poller_, Fd_};
}

await_ready always returns false, so the coroutine suspends immediately and
waits for select to report that the server socket is readable (AddRead).
Readability means that a new client has arrived. In await_resume, we obtain
the client socket using the accept call, wrap it in TSocket, and return the
result.

There are also the Bind and Listen methods, which are trivial wrappers
around the bind and listen calls. The first method binds the socket to an
address, and the second method puts the socket in passive mode, allowing the
accept method to be called to accept connections.

    /* assign the socket the 
       required address to which clients will send requests */
    void Bind() { /* trivial */ }

    void Listen() { /* trivial */ }
};
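
For readers who want to see what such wrappers might contain, here is a sketch written as free functions over a raw descriptor. It is an assumption about their shape, not the library's code: BindFd, ListenFd, DemoBindListen, the SO_REUSEADDR option, and the backlog of 128 are all choices made for this example.

```cpp
#include <arpa/inet.h>
#include <cerrno>
#include <netinet/in.h>
#include <sys/socket.h>
#include <system_error>
#include <unistd.h>

// Binds fd to the given IPv4 address; throws on failure.
void BindFd(int fd, const sockaddr_in& addr) {
    int on = 1;
    // Assumed convenience: allow quick restarts on the same port.
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    if (bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) < 0) {
        throw std::system_error(errno, std::generic_category(), "bind");
    }
}

// Puts fd into passive mode so that accept can be called on it.
void ListenFd(int fd, int backlog = 128) {
    if (listen(fd, backlog) < 0) {
        throw std::system_error(errno, std::generic_category(), "listen");
    }
}

// Binds to 0.0.0.0 with port 0 and returns the port chosen by the kernel,
// or -1 if any step fails.
int DemoBindListen() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        return -1;
    }
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(0); // let the kernel pick a free port
    int port = -1;
    try {
        BindFd(fd, addr);
        ListenFd(fd);
        socklen_t len = sizeof(addr);
        if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
            port = ntohs(addr.sin_port);
        }
    } catch (const std::system_error&) {
        port = -1;
    }
    close(fd);
    return port;
}
```

Errors are reported through std::system_error, in the same style as the Connect and Accept listings above.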

Performance

The full code of the examples discussed above can be downloaded from https://github.com/resetius/coroio.
The complete version includes support not only for select but also for poll, epoll, kqueue, and even uring.

The client and server code above shows that the library is convenient to
use. Now let's look at the performance of the library and compare it with
libevent, a comparable solution built on callbacks.

We'll apply the methodology from libevent. Create N pipes that pass
data to each other as follows:

struct Stat {
    int writes = 0;
    int fired = 0;
    int count = 0;
    int failures = 0;
    int out = 0;
};

template<typename TSocket>
TTestTask pipe_reader(TSocket& r, TSocket& w, Stat& s) {
    ssize_t size;
    char buf[1] = {0};

    try {
        while ((size = co_await r.ReadSomeYield(buf, 1)) != 0) {
            s.count += size;
            if (s.writes) {
                s.writes--;
                if (co_await w.WriteSome(buf, 1) != 1) {
                    s.failures ++;
                }
                s.fired++;
            }
        }
    } catch (const std::exception& ex) {
        s.failures ++;
    }
    s.out ++;
    co_return;
}

We read data from one pipe and write it to the next one in the chain. We
collect statistics about the number of bytes read, the number of write calls,
the number of remaining write calls, and so on in the Stat structure.
A similar benchmark included in libevent also collects similar statistics. To
ensure that the coroutine benchmark operates exactly like the callback-based
libevent benchmark, I compared the counter values of the statistics after
running the benchmark. The counter values precisely matched the values
provided by the libevent benchmark.

We write data into M pipes as follows:

template<typename TSocket>
TTestTask write_one(TSocket& w, Stat& s) {
    char buf[1] = {'e'};
    co_await w.WriteSome(buf, 1);
    s.fired ++;
    co_return;
}

Let's run two benchmarks. In the first one, we initially write data into a
single pipe (M=1), so one pipe is active while the others are passive, and
observe how the time needed to process all events changes as the total number
of pipes grows. In the second benchmark, we set M=100.

Below are the results of the two benchmarks for the i7-12800H and i5-11400F
processors, as well as the Apple M1, with libevent results provided for
comparison. The developed solution performs as well as libevent, and with the
select and poll backends it even performs better.

CPU i7-12800H, Ubuntu 23.04, clang 16
CPU i5-11400F, Ubuntu 23.04, WSL2, kernel 6.1.21.1-microsoft-standard-WSL2+
CPU Apple M1, MacBook Air M1 16G, MacOS 12.6.3

Conclusion

We have developed a network library that performs on par with libevent while remaining easy to use and extend. With the uring backend, the library supports asynchronous I/O for both sockets and files. The code snippets are straightforward and mirror the structure of typical synchronous code, which flattens the learning curve for new users. This combination of performance, simplicity, and extendability makes the library a practical tool for building efficient, scalable network applications with modern C++ coroutines.

Links

  1. Callbacks

  2. Actors

  3. Future/Promise

  4. Coroutines

  5. Coroutines in C++

  6. Libevent

  7. resetius/coroio
