From 7ee1b45af11535ae7e4081d5fabc8eec31715a52 Mon Sep 17 00:00:00 2001 From: frosty Date: Tue, 31 Mar 2026 00:44:53 +0300 Subject: perf: replace blocking accept and mutex queue with poll + lock-free hybrid --- src/beaker_globals.h | 5 +- src/server.c | 143 +++++++++++++++++++++++++++++++-------------------- 2 files changed, 90 insertions(+), 58 deletions(-) diff --git a/src/beaker_globals.h b/src/beaker_globals.h index 7b727bc..0682533 100644 --- a/src/beaker_globals.h +++ b/src/beaker_globals.h @@ -1,7 +1,8 @@ -#ifndef BEAKER_GLOBALS_H +#ifndef BEAKER_GLOBALS_H #define BEAKER_GLOBALS_H -#include "../beaker.h" +#include "../beaker.h" +#include extern RouteHandler handlers[MAX_HANDLERS]; diff --git a/src/server.c b/src/server.c index caeb4bf..270be70 100644 --- a/src/server.c +++ b/src/server.c @@ -12,6 +12,9 @@ #include #include #include +#include +#include +#include #define MAX_PENDING_CONNECTIONS 128 @@ -23,22 +26,28 @@ static void signal_handler(int sig) { } typedef struct { - int client_sockets[MAX_PENDING_CONNECTIONS]; - int count; - int front; - int rear; + _Atomic(size_t) sequence; + int socket; +} WorkSlot; + +typedef struct { + _Atomic(size_t) head; + _Atomic(size_t) tail; + _Atomic(int) shutdown; + WorkSlot slots[MAX_PENDING_CONNECTIONS]; pthread_mutex_t mutex; pthread_cond_t cond; - int shutdown; } WorkQueue; static WorkQueue g_work_queue; static void work_queue_init(WorkQueue *queue) { - queue->count = 0; - queue->front = 0; - queue->rear = 0; - queue->shutdown = 0; + atomic_store(&queue->head, 0); + atomic_store(&queue->tail, 0); + atomic_store(&queue->shutdown, 0); + for (int i = 0; i < MAX_PENDING_CONNECTIONS; i++) { + atomic_store(&queue->slots[i].sequence, (size_t)i); + } pthread_mutex_init(&queue->mutex, NULL); pthread_cond_init(&queue->cond, NULL); } @@ -49,40 +58,55 @@ static void work_queue_destroy(WorkQueue *queue) { } static int work_queue_push(WorkQueue *queue, int client_socket) { - pthread_mutex_lock(&queue->mutex); - - if (queue->count >= MAX_PENDING_CONNECTIONS) { - pthread_mutex_unlock(&queue->mutex); - return -1; + size_t tail = atomic_load(&queue->tail); + + for (;;) { + WorkSlot *slot = &queue->slots[tail % MAX_PENDING_CONNECTIONS]; + size_t seq = atomic_load(&slot->sequence); + intptr_t diff = (intptr_t)seq - (intptr_t)tail; + + if (diff == 0) { + if (atomic_compare_exchange_weak(&queue->tail, &tail, tail + 1)) { + slot->socket = client_socket; + atomic_store(&slot->sequence, tail + 1); + pthread_cond_signal(&queue->cond); + return 0; + } + } else if (diff < 0) { + return -1; + } else { + tail = atomic_load(&queue->tail); + } } - - queue->client_sockets[queue->rear] = client_socket; - queue->rear = (queue->rear + 1) % MAX_PENDING_CONNECTIONS; - queue->count++; - - pthread_cond_signal(&queue->cond); - pthread_mutex_unlock(&queue->mutex); - return 0; } static int work_queue_pop(WorkQueue *queue) { - pthread_mutex_lock(&queue->mutex); - - while (queue->count == 0 && !queue->shutdown) { - pthread_cond_wait(&queue->cond, &queue->mutex); - } - - if (queue->shutdown && queue->count == 0) { - pthread_mutex_unlock(&queue->mutex); - return -1; + size_t head = atomic_load(&queue->head); + + for (;;) { + if (atomic_load(&queue->shutdown)) return -1; + + WorkSlot *slot = &queue->slots[head % MAX_PENDING_CONNECTIONS]; + size_t seq = atomic_load(&slot->sequence); + intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1); + + if (diff == 0) { + if (atomic_compare_exchange_weak(&queue->head, &head, head + 1)) { + int fd = slot->socket; + atomic_store(&slot->sequence, head + MAX_PENDING_CONNECTIONS); + return fd; + } + } else if (diff < 0) { + pthread_mutex_lock(&queue->mutex); + if (!atomic_load(&queue->shutdown) && atomic_load(&queue->head) == head) { + pthread_cond_wait(&queue->cond, &queue->mutex); + } + pthread_mutex_unlock(&queue->mutex); + head = atomic_load(&queue->head); + } else { + head = atomic_load(&queue->head); + } } - - int client_socket = queue->client_sockets[queue->front]; - queue->front = (queue->front + 1) % MAX_PENDING_CONNECTIONS; - queue->count--; - - pthread_mutex_unlock(&queue->mutex); - return client_socket; } static int get_optimal_thread_count(void) { @@ -149,6 +173,13 @@ static int initialize_server_socket(const char *ip, int port, int *server_fd_out return -1; } + int flags = fcntl(*server_fd_out, F_GETFL, 0); + if (flags < 0 || fcntl(*server_fd_out, F_SETFL, flags | O_NONBLOCK) < 0) { + perror("fcntl O_NONBLOCK failed"); + close(*server_fd_out); + return -1; + } + printf("Beaker server listening on %s:%d\n", ip, port); return 0; } @@ -295,32 +326,32 @@ void beaker_run_with_threads(const char *ip, int port, int num_workers) { printf("Beaker server started with %d worker threads\n", num_workers); - while (!g_shutdown_requested) { - int new_socket; + struct pollfd pfd = { .fd = server_fd, .events = POLLIN }; - if ((new_socket = accept(server_fd, (struct sockaddr *)&address, - (socklen_t *)&addrlen)) < 0) { - if (g_shutdown_requested) { - break; - } - perror("accept failed"); - fprintf(stderr, "[ERROR] beaker_run_with_threads: Failed to accept connection.\n"); - continue; + while (!g_shutdown_requested) { + int ret = poll(&pfd, 1, 1000); + if (ret < 0) { + if (errno == EINTR) continue; + perror("poll failed"); + break; } + if (ret == 0) continue; - if (work_queue_push(&g_work_queue, new_socket) < 0) { - fprintf(stderr, "[WARNING] Work queue full, closing connection\n"); - const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n"; - send(new_socket, busy_response, strlen(busy_response), 0); - close(new_socket); + int new_socket; + while ((new_socket = accept(server_fd, (struct sockaddr *)&address, + (socklen_t *)&addrlen)) >= 0) { + if (work_queue_push(&g_work_queue, new_socket) < 0) { + fprintf(stderr, "[WARNING] Work queue full, closing connection\n"); + const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n"; + send(new_socket, busy_response, strlen(busy_response), 0); + close(new_socket); + } } } printf("Shutting down server...\n"); - pthread_mutex_lock(&g_work_queue.mutex); - g_work_queue.shutdown = 1; - pthread_mutex_unlock(&g_work_queue.mutex); + atomic_store(&g_work_queue.shutdown, 1); pthread_cond_broadcast(&g_work_queue.cond); for (int i = 0; i < num_workers; i++) { -- cgit v1.2.3