aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorfrosty <gabriel@bwaaa.monster>2026-03-31 00:44:53 +0300
committerfrosty <gabriel@bwaaa.monster>2026-03-31 00:44:53 +0300
commit7ee1b45af11535ae7e4081d5fabc8eec31715a52 (patch)
tree73c37f5cdd24b2fddc9c6fd71d47c3f24e27efb3 /src
parentab4810b05b0fa5c6d7cd7b8e32afbc39c9ad98cc (diff)
downloadbeaker-7ee1b45af11535ae7e4081d5fabc8eec31715a52.tar.gz
perf: replace blocking accept and mutex queue with poll + lock-free hybrid
Diffstat (limited to 'src')
-rw-r--r--src/beaker_globals.h5
-rw-r--r--src/server.c143
2 files changed, 90 insertions, 58 deletions
diff --git a/src/beaker_globals.h b/src/beaker_globals.h
index 7b727bc..0682533 100644
--- a/src/beaker_globals.h
+++ b/src/beaker_globals.h
@@ -1,7 +1,8 @@
-#ifndef BEAKER_GLOBALS_H
+#ifndef BEAKER_GLOBALS_H
#define BEAKER_GLOBALS_H
-#include "../beaker.h"
+#include "../beaker.h"
+#include <stdatomic.h>
extern RouteHandler handlers[MAX_HANDLERS];
diff --git a/src/server.c b/src/server.c
index caeb4bf..270be70 100644
--- a/src/server.c
+++ b/src/server.c
@@ -12,6 +12,9 @@
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
+#include <fcntl.h>
+#include <poll.h>
+#include <errno.h>
#define MAX_PENDING_CONNECTIONS 128
@@ -23,22 +26,28 @@ static void signal_handler(int sig) {
}
typedef struct {
- int client_sockets[MAX_PENDING_CONNECTIONS];
- int count;
- int front;
- int rear;
+ _Atomic(size_t) sequence;
+ int socket;
+} WorkSlot;
+
+typedef struct {
+ _Atomic(size_t) head;
+ _Atomic(size_t) tail;
+ _Atomic(int) shutdown;
+ WorkSlot slots[MAX_PENDING_CONNECTIONS];
pthread_mutex_t mutex;
pthread_cond_t cond;
- int shutdown;
} WorkQueue;
static WorkQueue g_work_queue;
static void work_queue_init(WorkQueue *queue) {
- queue->count = 0;
- queue->front = 0;
- queue->rear = 0;
- queue->shutdown = 0;
+ atomic_store(&queue->head, 0);
+ atomic_store(&queue->tail, 0);
+ atomic_store(&queue->shutdown, 0);
+ for (int i = 0; i < MAX_PENDING_CONNECTIONS; i++) {
+ atomic_store(&queue->slots[i].sequence, (size_t)i);
+ }
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->cond, NULL);
}
@@ -49,40 +58,55 @@ static void work_queue_destroy(WorkQueue *queue) {
}
static int work_queue_push(WorkQueue *queue, int client_socket) {
- pthread_mutex_lock(&queue->mutex);
-
- if (queue->count >= MAX_PENDING_CONNECTIONS) {
- pthread_mutex_unlock(&queue->mutex);
- return -1;
+ size_t tail = atomic_load(&queue->tail);
+
+ for (;;) {
+ WorkSlot *slot = &queue->slots[tail % MAX_PENDING_CONNECTIONS];
+ size_t seq = atomic_load(&slot->sequence);
+ intptr_t diff = (intptr_t)seq - (intptr_t)tail;
+
+ if (diff == 0) {
+ if (atomic_compare_exchange_weak(&queue->tail, &tail, tail + 1)) {
+ slot->socket = client_socket;
+ atomic_store(&slot->sequence, tail + 1);
+ pthread_cond_signal(&queue->cond);
+ return 0;
+ }
+ } else if (diff < 0) {
+ return -1;
+ } else {
+ tail = atomic_load(&queue->tail);
+ }
}
-
- queue->client_sockets[queue->rear] = client_socket;
- queue->rear = (queue->rear + 1) % MAX_PENDING_CONNECTIONS;
- queue->count++;
-
- pthread_cond_signal(&queue->cond);
- pthread_mutex_unlock(&queue->mutex);
- return 0;
}
static int work_queue_pop(WorkQueue *queue) {
- pthread_mutex_lock(&queue->mutex);
-
- while (queue->count == 0 && !queue->shutdown) {
- pthread_cond_wait(&queue->cond, &queue->mutex);
- }
-
- if (queue->shutdown && queue->count == 0) {
- pthread_mutex_unlock(&queue->mutex);
- return -1;
+ size_t head = atomic_load(&queue->head);
+
+ for (;;) {
+ if (atomic_load(&queue->shutdown)) return -1;
+
+ WorkSlot *slot = &queue->slots[head % MAX_PENDING_CONNECTIONS];
+ size_t seq = atomic_load(&slot->sequence);
+ intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1);
+
+ if (diff == 0) {
+ if (atomic_compare_exchange_weak(&queue->head, &head, head + 1)) {
+ int fd = slot->socket;
+ atomic_store(&slot->sequence, head + MAX_PENDING_CONNECTIONS);
+ return fd;
+ }
+ } else if (diff < 0) {
+ pthread_mutex_lock(&queue->mutex);
+ if (!atomic_load(&queue->shutdown) && atomic_load(&queue->head) == head) {
+ pthread_cond_wait(&queue->cond, &queue->mutex);
+ }
+ pthread_mutex_unlock(&queue->mutex);
+ head = atomic_load(&queue->head);
+ } else {
+ head = atomic_load(&queue->head);
+ }
}
-
- int client_socket = queue->client_sockets[queue->front];
- queue->front = (queue->front + 1) % MAX_PENDING_CONNECTIONS;
- queue->count--;
-
- pthread_mutex_unlock(&queue->mutex);
- return client_socket;
}
static int get_optimal_thread_count(void) {
@@ -149,6 +173,13 @@ static int initialize_server_socket(const char *ip, int port, int *server_fd_out
return -1;
}
+ int flags = fcntl(*server_fd_out, F_GETFL, 0);
+ if (flags < 0 || fcntl(*server_fd_out, F_SETFL, flags | O_NONBLOCK) < 0) {
+ perror("fcntl O_NONBLOCK failed");
+ close(*server_fd_out);
+ return -1;
+ }
+
printf("Beaker server listening on %s:%d\n", ip, port);
return 0;
}
@@ -295,32 +326,32 @@ void beaker_run_with_threads(const char *ip, int port, int num_workers) {
printf("Beaker server started with %d worker threads\n", num_workers);
- while (!g_shutdown_requested) {
- int new_socket;
+ struct pollfd pfd = { .fd = server_fd, .events = POLLIN };
- if ((new_socket = accept(server_fd, (struct sockaddr *)&address,
- (socklen_t *)&addrlen)) < 0) {
- if (g_shutdown_requested) {
- break;
- }
- perror("accept failed");
- fprintf(stderr, "[ERROR] beaker_run_with_threads: Failed to accept connection.\n");
- continue;
+ while (!g_shutdown_requested) {
+ int ret = poll(&pfd, 1, 1000);
+ if (ret < 0) {
+ if (errno == EINTR) continue;
+ perror("poll failed");
+ break;
}
+ if (ret == 0) continue;
- if (work_queue_push(&g_work_queue, new_socket) < 0) {
- fprintf(stderr, "[WARNING] Work queue full, closing connection\n");
- const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n";
- send(new_socket, busy_response, strlen(busy_response), 0);
- close(new_socket);
+ int new_socket;
+ while ((new_socket = accept(server_fd, (struct sockaddr *)&address,
+ (socklen_t *)&addrlen)) >= 0) {
+ if (work_queue_push(&g_work_queue, new_socket) < 0) {
+ fprintf(stderr, "[WARNING] Work queue full, closing connection\n");
+ const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n";
+ send(new_socket, busy_response, strlen(busy_response), 0);
+ close(new_socket);
+ }
}
}
printf("Shutting down server...\n");
- pthread_mutex_lock(&g_work_queue.mutex);
- g_work_queue.shutdown = 1;
- pthread_mutex_unlock(&g_work_queue.mutex);
+ atomic_store(&g_work_queue.shutdown, 1);
pthread_cond_broadcast(&g_work_queue.cond);
for (int i = 0; i < num_workers; i++) {