From 0a5a3114eccf9b6c2af805f19fda2835ce247abb Mon Sep 17 00:00:00 2001 From: frosty Date: Thu, 5 Mar 2026 06:48:37 +0000 Subject: multithreading, minor fixes --- src/server.c | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 159 insertions(+), 18 deletions(-) (limited to 'src/server.c') diff --git a/src/server.c b/src/server.c index 1176a66..b623db7 100644 --- a/src/server.c +++ b/src/server.c @@ -8,6 +8,101 @@ #include #include #include +#include +#include +#include +#include + +#define MAX_PENDING_CONNECTIONS 128 + +static volatile sig_atomic_t g_shutdown_requested = 0; + +static void signal_handler(int sig) { + (void)sig; + g_shutdown_requested = 1; +} + +typedef struct { + int client_sockets[MAX_PENDING_CONNECTIONS]; + int count; + int front; + int rear; + pthread_mutex_t mutex; + pthread_cond_t cond; + int shutdown; +} WorkQueue; + +static WorkQueue g_work_queue; + +static void work_queue_init(WorkQueue *queue) { + queue->count = 0; + queue->front = 0; + queue->rear = 0; + queue->shutdown = 0; + pthread_mutex_init(&queue->mutex, NULL); + pthread_cond_init(&queue->cond, NULL); +} + +static void work_queue_destroy(WorkQueue *queue) { + pthread_mutex_destroy(&queue->mutex); + pthread_cond_destroy(&queue->cond); +} + +static int work_queue_push(WorkQueue *queue, int client_socket) { + pthread_mutex_lock(&queue->mutex); + + if (queue->count >= MAX_PENDING_CONNECTIONS) { + pthread_mutex_unlock(&queue->mutex); + return -1; + } + + queue->client_sockets[queue->rear] = client_socket; + queue->rear = (queue->rear + 1) % MAX_PENDING_CONNECTIONS; + queue->count++; + + pthread_cond_signal(&queue->cond); + pthread_mutex_unlock(&queue->mutex); + return 0; +} + +static int work_queue_pop(WorkQueue *queue) { + pthread_mutex_lock(&queue->mutex); + + while (queue->count == 0 && !queue->shutdown) { + pthread_cond_wait(&queue->cond, &queue->mutex); + } + + if (queue->shutdown && queue->count == 0) { + pthread_mutex_unlock(&queue->mutex); + return -1; + } + + int client_socket = queue->client_sockets[queue->front]; + queue->front = (queue->front + 1) % MAX_PENDING_CONNECTIONS; + queue->count--; + + pthread_mutex_unlock(&queue->mutex); + return client_socket; +} + +static int get_optimal_thread_count(void) { + int cores = get_nprocs(); + return cores * 2; +} + +void handle_client_connection(int new_socket); + +static void *worker_thread(void *arg) { + (void)arg; + while (1) { + int client_socket = work_queue_pop(&g_work_queue); + if (client_socket < 0) { + break; + } + handle_client_connection(client_socket); + } + return NULL; +} static int initialize_server_socket(const char *ip, int port, int *server_fd_out, struct sockaddr_in *address_out) { @@ -57,7 +152,7 @@ static int initialize_server_socket(const char *ip, int port, int *server_fd_out return 0; } -static void handle_client_connection(int new_socket) { +void handle_client_connection(int new_socket) { current_client_socket = new_socket; char buffer[BUFFER_SIZE] = {0}; @@ -164,27 +259,73 @@ static void handle_client_connection(int new_socket) { } int beaker_run(const char *ip, int port) { - int server_fd; - struct sockaddr_in address; - int addrlen = sizeof(address); + beaker_run_with_threads(ip, port, 0); + return 0; +} - if (initialize_server_socket(ip, port, &server_fd, &address) != 0) { - return -1; - } +void beaker_run_with_threads(const char *ip, int port, int num_workers) { + int server_fd; + struct sockaddr_in address; + int addrlen = sizeof(address); - while (true) { - int new_socket; + g_shutdown_requested = 0; - if ((new_socket = accept(server_fd, (struct sockaddr *)&address, - (socklen_t *)&addrlen)) < 0) { - perror("accept failed"); - fprintf(stderr, "[ERROR] beaker_run: Failed to accept connection.\n"); - continue; + struct sigaction sa; + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + + if (num_workers <= 0) { + num_workers = get_optimal_thread_count(); } - handle_client_connection(new_socket); - } + if (initialize_server_socket(ip, port, &server_fd, &address) != 0) { + return; + } - close(server_fd); - return 0; + work_queue_init(&g_work_queue); + + pthread_t threads[num_workers]; + for (int i = 0; i < num_workers; i++) { + pthread_create(&threads[i], NULL, worker_thread, NULL); + } + + printf("Beaker server started with %d worker threads\n", num_workers); + + while (!g_shutdown_requested) { + int new_socket; + + if ((new_socket = accept(server_fd, (struct sockaddr *)&address, + (socklen_t *)&addrlen)) < 0) { + if (g_shutdown_requested) { + break; + } + perror("accept failed"); + fprintf(stderr, "[ERROR] beaker_run_with_threads: Failed to accept connection.\n"); + continue; + } + + if (work_queue_push(&g_work_queue, new_socket) < 0) { + fprintf(stderr, "[WARNING] Work queue full, closing connection\n"); + const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n"; + send(new_socket, busy_response, strlen(busy_response), 0); + close(new_socket); + } + } + + printf("Shutting down server...\n"); + + pthread_mutex_lock(&g_work_queue.mutex); + g_work_queue.shutdown = 1; + pthread_mutex_unlock(&g_work_queue.mutex); + pthread_cond_broadcast(&g_work_queue.cond); + + for (int i = 0; i < num_workers; i++) { + pthread_join(threads[i], NULL); + } + + work_queue_destroy(&g_work_queue); + close(server_fd); } -- cgit v1.2.3