Exploring a possible implementation of non-blocking IO by writing a server on pure syscalls

Original author: vda19999

How do people usually write a server when they don't really care about performance? The program starts accepting incoming connections from clients and launches a new thread for each client, which then services that client. If you use a framework such as Spring, Flask, or Poco, it does something similar internally - the only difference is that the threads can be reused, that is, taken from a pool. This is all quite convenient, but not very efficient (and Spring is also bad). Most likely, the threads serving your clients do not live long and spend most of their time waiting either to receive data from the client or to send data to it - that is, waiting for some system call to return. Creating an OS thread is quite an expensive operation, as is context switching between OS threads. If you want to serve a lot of clients efficiently, you need to come up with something else. One option is callbacks, but they are pretty inconvenient (though opinions differ on this).

Another option is to use non-blocking I/O in combination with some kind of implementation of user-space threads (fibers). In this article I will show you how to write all this with your own hands.

All the code is available in my repository. It has several branches: good-old-one-thread-impl contains the original implementation, hand-context is a version with manual context switching and an implementation of fiber-local variables, and the other two branches contain an attempt to make this implementation work across several OS threads. All of the code is provided only as a proof of concept and contains errors.

Some introductory explanations

What are user-space threads? These are threads whose switching the OS does not take part in and about which it knows nothing. They can all run in one OS thread or in several (like goroutines in Go or virtual threads in Java 19). These threads implement the idea of cooperative multitasking: a thread can only be switched out if it asks for it (to be exact, Go isn't really cooperative anymore, but this article isn't about that). In our case, a thread will be taken off execution while it waits for some input or output, and other threads will run in the meantime.

What does the non-blocking I/O interface in Linux look like? Usually, the read and write system calls are used for input and output. There are others, but we will consider these, since they are the most common: you can use them to work with files, the network, and pipes, and you can even read signals with them (see signalfd). By default, these system calls block until the data has been read or written, but you can configure a file descriptor to be non-blocking. Then read and write will not block, and if the data cannot be read or written right away, they fail with the EAGAIN error. What remains is to learn which file descriptors are ready for I/O, so as not to try them blindly.
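To make the EAGAIN behaviour concrete, here is a minimal sketch. The helper names make_nonblocking and try_read are made up for this illustration and are not part of the repository; the sketch only assumes an ordinary pipe or socket descriptor.

```cpp
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper: switches a descriptor to non-blocking mode
// while preserving its other status flags. Returns true on success.
bool make_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        return false;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Hypothetical helper: performs a single read and reports through
// would_block whether it failed only because no data was available yet.
ssize_t try_read(int fd, char* buf, size_t len, bool* would_block) {
    ssize_t n = read(fd, buf, len);
    *would_block = (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));
    return n;
}
```

On an empty non-blocking pipe, try_read returns -1 and sets would_block to true instead of putting the caller to sleep; once something has been written to the other end, the same call returns the number of bytes read.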

There are several such mechanisms in Linux: select(2), poll(2), epoll(7). Each of them lets us tell the OS kernel which file descriptors we are interested in, and the kernel tells us which of them are ready for I/O. select(2) is outdated, so I will use poll(2). epoll(7) is a more efficient solution, but it is too late to switch now. In any case, poll(2) is good enough for a demonstration.
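As a small illustration of how poll(2) is asked about readiness, here is a sketch for a single descriptor. The function name is_readable is an assumption made for this example; the server in this article polls many descriptors at once.

```cpp
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: asks the kernel whether fd can be read without blocking.
// timeout_ms = 0 means "answer immediately, do not wait".
bool is_readable(int fd, int timeout_ms) {
    pollfd request{};
    request.fd = fd;
    request.events = POLLIN;  // we are interested in "data available to read"
    int ready = poll(&request, 1, timeout_ms);
    // poll returns the number of descriptors with non-zero revents,
    // 0 on timeout and -1 on error
    return ready > 0 && (request.revents & POLLIN) != 0;
}
```

For an empty pipe, is_readable(fd, 0) returns false; after a write to the other end it returns true, and only then is it worth calling read.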

Implementing fibers

At first, we will use boost::context to implement fibers, but then we will get rid of it. The boost::context library allows us to switch contexts. It remains for us to write some wrapper over the context and write some simple thread scheduler.

The execution context (represented by boost::context) is the state of a certain execution, that is, the values of registers and the stack.
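To get a feel for what switching contexts means, here is a minimal sketch that uses the POSIX ucontext functions (getcontext, makecontext, swapcontext) instead of boost::context. This is not what the code in this article uses; the names fiber_body, run_demo, and trace are made up for the illustration.

```cpp
#include <string>
#include <ucontext.h>

static ucontext_t main_ctx;   // context of the code that drives the fiber
static ucontext_t fiber_ctx;  // context of the fiber itself
static std::string trace;     // records the order in which code runs
static char fiber_stack[64 * 1024];

static void fiber_body() {
    trace += "fiber:1 ";
    swapcontext(&fiber_ctx, &main_ctx);  // yield: save our state, resume the caller
    trace += "fiber:2 ";
    // returning from the function resumes uc_link, that is, main_ctx
}

std::string run_demo() {
    trace.clear();
    getcontext(&fiber_ctx);
    fiber_ctx.uc_stack.ss_sp = fiber_stack;  // the fiber gets its own stack
    fiber_ctx.uc_stack.ss_size = sizeof(fiber_stack);
    fiber_ctx.uc_link = &main_ctx;
    makecontext(&fiber_ctx, fiber_body, 0);

    trace += "main:1 ";
    swapcontext(&main_ctx, &fiber_ctx);  // start the fiber
    trace += "main:2 ";
    swapcontext(&main_ctx, &fiber_ctx);  // resume it after its yield
    trace += "main:3";
    return trace;
}
```

Calling run_demo() returns "main:1 fiber:1 main:2 fiber:2 main:3": the two flows of execution interleave in a single OS thread, and each switch saves one set of registers and a stack pointer and restores another.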

The implementation of a fiber will be a class that stores the context itself, the function that the fiber executes, and flags indicating whether execution has started, whether it has completed, and whether the fiber is ready for execution.

FiberImpl
class FiberImpl: public std::enable_shared_from_this<FiberImpl> {
// everything inside is private, because for convenience I will write a wrapper over
// this implementation
private:
    explicit FiberImpl(const std::function<void()>& func);
    // ideally, there should also be a constructor accepting function&&
    void join();
    bool isFinished() const;
    void start();
    bool isReady() const;
    friend class Fiber;
    friend class FiberManager;
    friend class CondVar;
    friend void sched_execution();
    void continue_executing();
    void suspend();
    std::function<void()> func;
    // context that describes the state of the fiber at the moment it was last suspended
    continuation this_context;
    // the context that was executed before the execution of the fiber
    continuation previous_context;
    // condition variable, that can be waited on to join the thread
    CondVar finish_cv;
    bool launched = false;
    bool finished = false;
    bool is_ready = false;

};

The most interesting thing is how a fiber should be started. When the fiber is started, the callcc function from boost::context should be called. It takes a lambda function and returns the context in which the passed function is being executed (callcc returns when the lambda function wants to switch the context). The lambda function must accept the context from which it was called - with its help, it can switch back to it.

When a fiber needs to be run again, you just call the resume method of its context: the fiber continues executing, and when it wants to switch back, resume returns the new state of this context.

starting a fiber
void FiberImpl::continue_executing() {
    if (!launched) {
        launched = true;
        this_context = callcc([&](auto sink) {
            cerr << "starting func in new fiber\n";
            previous_context = std::move(sink);
            func();
            finished = true;
            finish_cv.notify_all();
            return std::move(previous_context);
        });
    } else {
        this_context = this_context.resume();
    }
}

The implementation of other FiberImpl methods is mostly trivial.

trivial implementation
extern FiberManager fiberManager;

extern std::shared_ptr<FiberImpl> current_fiber;

FiberImpl::FiberImpl(const std::function<void()> &func) {
    this->func = func;
}

void FiberImpl::join() {
    while (!finished) {
        finish_cv.wait();
    }
}

void FiberImpl::start() {
    fiberManager.registerFiber(this->shared_from_this());
    is_ready = true;
}

void FiberImpl::suspend() {
    previous_context = previous_context.resume();
}

void sched_execution() {
    current_fiber->suspend();
}

void startFiberManager() {
    fiberManager.work();
}

The fiber manager should be able to store a list of fibers ready for execution and be engaged in launching these fibers.

fiber manager
using std::shared_ptr;

void startFiberManager();

class FiberManager {
    friend class Fiber;
    friend class FiberImpl;
    friend class CondVar;
    friend void sched_execution();
    void work();
    void registerFiber(const shared_ptr<FiberImpl>& fiber_ptr);

    list<shared_ptr<FiberImpl>> ready_fibers;

    friend void startFiberManager();
};

// note that the manager stores only ready-to-execute
// fibers. the rest should be stored somewhere else
FiberManager fiberManager;

std::shared_ptr<FiberImpl> current_fiber;

void FiberManager::work() {
    while (!ready_fibers.empty()) {
        auto iterator = this->ready_fibers.begin();
        while (iterator != ready_fibers.end()) {
            current_fiber = *iterator;
            if (current_fiber->isReady()) {
                current_fiber->continue_executing();
            }
            if (current_fiber->isFinished()) {
                iterator = ready_fibers.erase(iterator);
            } else {
                iterator++;
            }
        }
    }
}

void FiberManager::registerFiber(const shared_ptr<FiberImpl>& fiber_ptr) {
    ready_fibers.push_back(fiber_ptr);
}

The FiberImpl class is not very convenient to use: you need to create a shared_ptr, the function must be of type void(void), and you also need to call .start(). Let's write a simple wrapper over it.

wrapper
class Fiber {
public:
    template<typename Callable, typename... Args>
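    explicit Fiber(const Callable& function, const Args&... args) {
        // capture by value: the fiber runs after this constructor has returned,
        // so references to the caller's arguments would dangle
        fiber_ptr = shared_ptr<FiberImpl>(new FiberImpl([function, args...] () {
            function(args...);
        }));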
        fiber_ptr->start();
    }

    void join() {
        fiber_ptr->join();
    }

    void detach() {
        fiberManager.work();
    }

    bool isFinished() {
        return fiber_ptr->isFinished();
    }

    bool isReady() {
        return fiber_ptr->isReady();
    }

private:
    shared_ptr<FiberImpl> fiber_ptr;
};

It remains to write some synchronization primitives.

condition variable
class FiberImpl;

class CondVar {
public:
    void wait();
    void notify_one();
    void notify_all();

private:
    std::vector<shared_ptr<FiberImpl>> waiters;
};

The interface is trivial. Inside we store a list of fibers that are waiting on this condition variable.

extern FiberManager fiberManager;
extern std::shared_ptr<FiberImpl> current_fiber;

void CondVar::wait() {
    waiters.push_back(current_fiber);
    current_fiber->is_ready = false;
    current_fiber->suspend();
}

void CondVar::notify_one() {
    if (!waiters.empty()) {
        auto fiber_ptr = *waiters.rbegin();
        fiber_ptr->is_ready = true;
        waiters.pop_back();
    }
}

void CondVar::notify_all() {
    for (auto& fiber_ptr : waiters) {
        fiber_ptr->is_ready = true;
    }
    waiters.clear();
}

When a fiber wants to wait, it is added to the list, marked as not ready for execution and suspended. When someone wants to wake up one or many fibers, they are marked as ready for execution and removed from the list.

Note that so far all our fibers run in the same OS thread and switch only at moments we control, so there is no need to think about any synchronization inside the condition variable.

The second thing worth noting is that normally you wait on a condition variable while holding a mutex: the condition variable releases it and then reacquires it. Under cooperative multitasking, however, we do not need a mutex at all. Moreover, the futex system call, on top of which mutexes are implemented, is itself a kind of condition variable. Also, unlike std::condition_variable, ours cannot have spurious wakeups.

Non-blocking IO

Now we need to write an implementation of waiting for file descriptors to be ready for I/O. There will be some separate fiber that will execute the poll system call to get information that the file descriptor is ready.

A fiber that wants to wait for a file descriptor to become ready will create a condition variable, save its request to wait for I/O, and fall asleep on this condition variable.

implementation
struct FdRequest {
    CondVar cv;
    int fd;
    short events;
};

class Waiter {
public:
    Waiter();

    static int wait(int fd, short events);

    static void stop();

    static void loop();

private:

    CondVar cv;
    unordered_map<int, FdRequest*> map;
    bool stopped = false;
};
Waiter waiter;

int Waiter::wait(int fd, short events) {
    FdRequest fdRequest{.cv = CondVar(), .fd = fd, .events = events};
    waiter.map[fd] = &fdRequest;
    waiter.cv.notify_one();
    fdRequest.cv.wait();
    return fdRequest.events;
}

Waiter::Waiter() {
    std::cout << "waiter initialising" << std::endl;
}

[[maybe_unused]] Fiber fiber(Waiter::loop);

void Waiter::stop() {
    waiter.stopped = true;
}

void Waiter::loop() {
    while (!waiter.stopped) {
        while (waiter.map.empty()) {
            waiter.cv.wait();
        }
        std::vector<pollfd> request;
        request.reserve(waiter.map.size());
        for (auto& elem : waiter.map) {
            request.push_back(pollfd{.fd = elem.first, 
              .events = elem.second->events, 
              .revents = 0});
        }
        int ret = poll(request.data(), request.size(), 100);
        if (ret < 0) {
            printf("poll returned with error %s\n", strerror(errno));
            continue;
        }
        for (auto& elem : request) {
            // revents is a bit mask, so test it for any set bit
            if (elem.revents != 0) {
                waiter.map[elem.fd]->cv.notify_one();
                waiter.map.erase(elem.fd);
            }
        }
        }
        sched_execution();
    }
}

Writing a server

Now we can write a server. It will work like this: at startup, a fiber is launched that creates a socket, binds it to a port, and then starts accepting client connections on it. Before calling accept, the fiber should use Waiter to wait for an incoming connection.

When a new connection appears, a new fiber will be started, which will wait for input from the client, and then write back to the socket exactly what was read.

implementation
void worker(int fd) {
    printf("work called with fd: %d\n", fd);
    char buf[1024];
    while (true) {
        Waiter::wait(fd, POLLIN);
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n == 0) {
            printf("client finished, leaving\n");
            return;
        }
        if (n < 0) {
            printf("in worker error: %s\n", strerror(errno));
            return ;
        }
        int wrote = 0;
        while (wrote < n) {
            Waiter::wait(fd, POLLOUT);
            ssize_t m = write(fd, buf + wrote, n - wrote);
            if (m < 0) {
                printf("in worker error: %s\n", strerror(errno));
                return ;
            }
            wrote += m;
        }
    }
}

int main() {
    Fiber main_fiber([] () {
        std::cout << "main entered" << endl;
        Fiber global_fiber([]() {
            int socket_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if (socket_fd < 0) {
                printf("socket error: %s\n", strerror(errno));
                exit(0);
            }
            int ret = fcntl(socket_fd, F_SETFL, O_NONBLOCK);
            if (ret == -1) {
                printf("fcntl error: %s\n", strerror(errno));
                exit(0);
            }
            sockaddr_in sin{};
            sin.sin_family = AF_INET;
            sin.sin_port = htons(8001);
            sin.sin_addr = in_addr{0};
            if (bind(socket_fd, reinterpret_cast<const sockaddr *>(&sin), sizeof(sin)) < 0) {
                printf("bind error: %s\n", strerror(errno));
                exit(0);
            }
            if (listen(socket_fd, 10) < 0) {
                printf("listen error: %s\n", strerror(errno));
                exit(0);
            }
            while (true) {
                printf("accepting\n");
                Waiter::wait(socket_fd, POLLIN);
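                int client_fd = accept4(socket_fd, nullptr, nullptr, SOCK_NONBLOCK);
                if (client_fd < 0) {
                    // do not start a worker on an invalid descriptor
                    printf("accept error: %s\n", strerror(errno));
                    continue;
                }
                Fiber thread(worker, client_fd);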
            }
        });
    });
    startFiberManager();
    main_fiber.join();
}

We launch it - it works! There is only one OS thread, but we serve many clients concurrently while writing code as if each client had its own thread.

Multithreading

In fact, the approach in which all user-space threads run in one OS thread, firstly, does not scale and, secondly, delays all clients whenever servicing one of them requires significant processor time.

I tried to overcome this limitation by running the fiber manager in several OS threads and preparing all classes accordingly.

On the one hand, I didn't have to write anything fundamentally new for this, and on the other hand, I didn't manage to fully debug it, so we won't talk about it in more detail.

Getting rid of boost::context

Now it's time to learn how to do everything with our own hands and get rid of boost::context. To do this, we need to create a structure in which we will store the context, write code to create it and switch between contexts.

code
class Context {
public:
    static const ssize_t STACK_SIZE = 4096 * 2;

    Context() = default;
    Context(const Context&) = delete;
    Context& operator = (const Context&) = delete;
    Context(Context&& other);
    Context& operator = (Context&& other) noexcept;
    static Context create_context();
    ~Context();

    void setRip(unsigned long rip);

private:
    unsigned long rbx;
    unsigned long rsp;
    unsigned long rbp;
    unsigned long r12;
    unsigned long r13;
    unsigned long r14;
    unsigned long r15;
    unsigned long rip;
};

extern "C" {
/*
 * saves current context into old_context_dest and loads new_context
 */
extern void switch_context(Context* old_context_dest, Context* new_context, unsigned long first_arg = 0);
}

The switch_context function gets two context pointers. The current context is saved using the first pointer, after which the context from the second argument is executed, but before that the third argument (first_arg) is written to the %rdi register - this is necessary so that when a function is run for the first time in a context, an argument can be passed to it.

This function can only be implemented in assembly.

switch_context:
// saving current context
    mov     %rbx, (%rdi)
    mov     %rsp, 8(%rdi)
    mov     %rbp, 16(%rdi)
    mov     %r12, 24(%rdi)
    mov     %r13, 32(%rdi)
    mov     %r14, 40(%rdi)
    mov     %r15, 48(%rdi)
// the address from which this function was called is written in the
// rip field, so that when switching to this context, it starts executing 
// code after executing switch_context
    mov     (%rsp), %rax
    mov     %rax, 56(%rdi)

    mov     %rsi, %rdi
// restoring other context
    mov     (%rdi), %rbx
    mov     8(%rdi), %rsp
    mov     16(%rdi), %rbp
    mov     24(%rdi), %r12
    mov     32(%rdi), %r13
    mov     40(%rdi), %r14
    mov     48(%rdi), %r15
// the value of the rip field is written to the top of the stack so that ret returns
// us exactly there
    mov     56(%rdi), %rax
    mov     %rax, (%rsp)
// first_arg becomes the first argument of the function being entered
    mov     %rdx, %rdi
    ret

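Creating a new context is trivial - you need to allocate memory for the stack and write the address of its top to the rsp field (on x86-64 the stack grows downward, so rsp must point near the end of the allocated block, not its beginning).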
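The stack setup can be sketched roughly as follows. The FiberStack struct and the allocate_fiber_stack function are assumptions made for this illustration, not the code from the repository. The sketch keeps the initial rsp 16-byte aligned and leaves room for the 8-byte return address that switch_context writes to (%rsp) before executing ret.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdlib>

// Hypothetical description of a freshly allocated fiber stack.
struct FiberStack {
    void* base;          // what malloc returned; needed later for free()
    std::uintptr_t top;  // initial value for the rsp field of the context
};

FiberStack allocate_fiber_stack(std::size_t size) {
    void* base = std::malloc(size);
    if (base == nullptr) {
        return {nullptr, 0};
    }
    std::uintptr_t end = reinterpret_cast<std::uintptr_t>(base) + size;
    // Step back from the end so that the slot at `top` lies inside the block,
    // then round down to a 16-byte boundary.
    std::uintptr_t top = (end - 16) & ~static_cast<std::uintptr_t>(15);
    return {base, top};
}
```

With this choice, after ret pops the entry address, rsp equals top + 8, which is the alignment the System V x86-64 calling convention expects on entry to a function. The function started this way must never return normally, since there is no valid return address above it on the stack.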

Fiber local variables

Initially, the server code could only work under Linux. When we got rid of boost, we limited ourselves to the x86-64 architecture. Now we will limit ourselves even more - the code will require a processor that supports the fsgsbase instructions and a fairly recent version of Linux.

I have not yet thoroughly understood the work of thread local variables, and I apologize for that.

How do thread local variables work? I have already written about this in a little more detail in another article which also contains code for adding support for thread local variables to an educational OS. Now I'll just say that by default, the compiler accesses thread local variables by some offset from the value of the segment register %fs. Accordingly, in order for them to work, the code that starts a thread must allocate some memory for thread local storage, initialize it and write the address of this memory to %fs.
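A small sketch of the behaviour being described: every OS thread gets its own copy of a thread_local variable. The names counter, bump, and bump_on_new_thread are made up for this example.

```cpp
#include <thread>

thread_local int counter = 0;  // one independent copy per OS thread

// Increments the calling thread's copy `times` times and returns its value.
int bump(int times) {
    for (int i = 0; i < times; ++i) {
        ++counter;
    }
    return counter;
}

// Runs bump() on a fresh OS thread and returns what that thread observed.
int bump_on_new_thread(int times) {
    int result = 0;
    std::thread worker([&result, times] { result = bump(times); });
    worker.join();
    return result;
}
```

If the main thread calls bump(3) and then bump_on_new_thread(5), the new thread starts from zero and sees 5, while the main thread's copy stays at 3. Fibers that share one OS thread also share one such copy, which is exactly why fiber-local variables need extra work.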

But in the first approximation, nothing prevents us from doing the same when starting a fiber in order for all variables declared as thread_local to work as fiber local.

Actually, in the past, user programs could not write to the %fs base directly - only the kernel could do this, so the user program had to make a system call. Then the rdfsbase, wrfsbase, rdgsbase and wrgsbase instructions appeared in processors, but for them to work the OS kernel must explicitly enable them, so a fairly modern kernel is needed to use them.

So, we need:

  1. add the %fs value to the Context class

  2. save and restore %fs when switching context in the switch_context function

  3. read the ELF file of the executable to find out the TLS size and what to initialise it with

  4. when creating a context, allocate memory for TLS, initialise it, and write the address of this memory to %fs

It should be noted, however, that thread_local variables are used not only by our code but also by libstdc, so we need to understand in detail how to support fiber local variables in such a way that thread_local variables also keep working in the libraries our binary is linked with. Alternatively, it would be nice if our fiber local variables were declared not as thread_local but as fiber_local, and the compiler used %gs instead of %fs for them - because variables from libstdc may well need to be thread local, not fiber local.

If you carefully follow the steps described above (or run the code from my repository after building it correctly) fiber local variables will work. Isn't it a miracle?
