diff options
| author | frosty <gabriel@bwaaa.monster> | 2026-03-05 06:48:37 +0000 |
|---|---|---|
| committer | frosty <gabriel@bwaaa.monster> | 2026-03-05 06:48:37 +0000 |
| commit | 0a5a3114eccf9b6c2af805f19fda2835ce247abb (patch) | |
| tree | d2cd06ec6aed8a506fb33d21fce3249888d74845 | |
| parent | cd0335d0c468cf55e53b5b3c1fae33d45c7de752 (diff) | |
| download | beaker-0a5a3114eccf9b6c2af805f19fda2835ce247abb.tar.gz | |
multithreading, minor fixes
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | beaker.h | 1 | ||||
| -rw-r--r-- | src/beaker_globals.c | 8 | ||||
| -rw-r--r-- | src/beaker_globals.h | 8 | ||||
| -rw-r--r-- | src/server.c | 177 | ||||
| -rw-r--r-- | src/template.c | 6 |
6 files changed, 172 insertions, 30 deletions
@@ -12,7 +12,7 @@ endif AR := ar CFLAGS := -Wall -fPIC -I. -Isrc -LDFLAGS := -shared +LDFLAGS := -shared -lpthread BUILD_DIR ?= build OBJ_DIR := $(BUILD_DIR)/obj @@ -99,5 +99,6 @@ bool serve_static_file_with_mime(const char *request_path_relative_to_static, co bool serve_data(const char *data, size_t size, const char *mime_type); int beaker_run(const char *ip, int port); +void beaker_run_with_threads(const char *ip, int port, int num_workers); #endif diff --git a/src/beaker_globals.c b/src/beaker_globals.c index 2ea5ab6..29c301e 100644 --- a/src/beaker_globals.c +++ b/src/beaker_globals.c @@ -4,10 +4,10 @@ RouteHandler handlers[MAX_HANDLERS]; int handler_count = 0; -int current_client_socket = -1; +__thread int current_client_socket = -1; -Cookie cookies_to_set[MAX_COOKIES]; +__thread Cookie cookies_to_set[MAX_COOKIES]; -int cookies_to_set_count = 0; +__thread int cookies_to_set_count = 0; -char current_request_buffer[BUFFER_SIZE];
\ No newline at end of file +__thread char current_request_buffer[BUFFER_SIZE];
\ No newline at end of file diff --git a/src/beaker_globals.h b/src/beaker_globals.h index 94aabdf..7b727bc 100644 --- a/src/beaker_globals.h +++ b/src/beaker_globals.h @@ -7,12 +7,12 @@ extern RouteHandler handlers[MAX_HANDLERS]; extern int handler_count; -extern int current_client_socket; +extern __thread int current_client_socket; -extern Cookie cookies_to_set[MAX_COOKIES]; +extern __thread Cookie cookies_to_set[MAX_COOKIES]; -extern int cookies_to_set_count; +extern __thread int cookies_to_set_count; -extern char current_request_buffer[BUFFER_SIZE]; +extern __thread char current_request_buffer[BUFFER_SIZE]; #endif
\ No newline at end of file diff --git a/src/server.c b/src/server.c index 1176a66..b623db7 100644 --- a/src/server.c +++ b/src/server.c @@ -8,6 +8,101 @@ #include <sys/types.h> #include <sys/socket.h> #include <unistd.h> +#include <pthread.h> +#include <sys/sysinfo.h> +#include <stdlib.h> +#include <signal.h> + +#define MAX_PENDING_CONNECTIONS 128 + +static volatile sig_atomic_t g_shutdown_requested = 0; + +static void signal_handler(int sig) { + (void)sig; + g_shutdown_requested = 1; +} + +typedef struct { + int client_sockets[MAX_PENDING_CONNECTIONS]; + int count; + int front; + int rear; + pthread_mutex_t mutex; + pthread_cond_t cond; + int shutdown; +} WorkQueue; + +static WorkQueue g_work_queue; + +static void work_queue_init(WorkQueue *queue) { + queue->count = 0; + queue->front = 0; + queue->rear = 0; + queue->shutdown = 0; + pthread_mutex_init(&queue->mutex, NULL); + pthread_cond_init(&queue->cond, NULL); +} + +static void work_queue_destroy(WorkQueue *queue) { + pthread_mutex_destroy(&queue->mutex); + pthread_cond_destroy(&queue->cond); +} + +static int work_queue_push(WorkQueue *queue, int client_socket) { + pthread_mutex_lock(&queue->mutex); + + if (queue->count >= MAX_PENDING_CONNECTIONS) { + pthread_mutex_unlock(&queue->mutex); + return -1; + } + + queue->client_sockets[queue->rear] = client_socket; + queue->rear = (queue->rear + 1) % MAX_PENDING_CONNECTIONS; + queue->count++; + + pthread_cond_signal(&queue->cond); + pthread_mutex_unlock(&queue->mutex); + return 0; +} + +static int work_queue_pop(WorkQueue *queue) { + pthread_mutex_lock(&queue->mutex); + + while (queue->count == 0 && !queue->shutdown) { + pthread_cond_wait(&queue->cond, &queue->mutex); + } + + if (queue->shutdown && queue->count == 0) { + pthread_mutex_unlock(&queue->mutex); + return -1; + } + + int client_socket = queue->client_sockets[queue->front]; + queue->front = (queue->front + 1) % MAX_PENDING_CONNECTIONS; + queue->count--; + + pthread_mutex_unlock(&queue->mutex); + return client_socket; +} + +static int get_optimal_thread_count(void) { + int cores = get_nprocs(); + return cores * 2; +} + +void handle_client_connection(int new_socket); + +static void *worker_thread(void *arg) { + (void)arg; + while (1) { + int client_socket = work_queue_pop(&g_work_queue); + if (client_socket < 0) { + break; + } + handle_client_connection(client_socket); + } + return NULL; +} static int initialize_server_socket(const char *ip, int port, int *server_fd_out, struct sockaddr_in *address_out) { @@ -57,7 +152,7 @@ static int initialize_server_socket(const char *ip, int port, int *server_fd_out return 0; } -static void handle_client_connection(int new_socket) { +void handle_client_connection(int new_socket) { current_client_socket = new_socket; char buffer[BUFFER_SIZE] = {0}; @@ -164,27 +259,73 @@ static void handle_client_connection(int new_socket) { } int beaker_run(const char *ip, int port) { - int server_fd; - struct sockaddr_in address; - int addrlen = sizeof(address); + beaker_run_with_threads(ip, port, 0); + return 0; +} - if (initialize_server_socket(ip, port, &server_fd, &address) != 0) { - return -1; - } +void beaker_run_with_threads(const char *ip, int port, int num_workers) { + int server_fd; + struct sockaddr_in address; + int addrlen = sizeof(address); - while (true) { - int new_socket; + g_shutdown_requested = 0; - if ((new_socket = accept(server_fd, (struct sockaddr *)&address, - (socklen_t *)&addrlen)) < 0) { - perror("accept failed"); - fprintf(stderr, "[ERROR] beaker_run: Failed to accept connection.\n"); - continue; + struct sigaction sa; + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + sigaction(SIGINT, &sa, NULL); + sigaction(SIGTERM, &sa, NULL); + + if (num_workers <= 0) { + num_workers = get_optimal_thread_count(); } - handle_client_connection(new_socket); - } + if (initialize_server_socket(ip, port, &server_fd, &address) != 0) { + return; + } - close(server_fd); - return 0; + work_queue_init(&g_work_queue); + + pthread_t threads[num_workers]; + for (int i = 0; i < num_workers; i++) { + pthread_create(&threads[i], NULL, worker_thread, NULL); + } + + printf("Beaker server started with %d worker threads\n", num_workers); + + while (!g_shutdown_requested) { + int new_socket; + + if ((new_socket = accept(server_fd, (struct sockaddr *)&address, + (socklen_t *)&addrlen)) < 0) { + if (g_shutdown_requested) { + break; + } + perror("accept failed"); + fprintf(stderr, "[ERROR] beaker_run_with_threads: Failed to accept connection.\n"); + continue; + } + + if (work_queue_push(&g_work_queue, new_socket) < 0) { + fprintf(stderr, "[WARNING] Work queue full, closing connection\n"); + const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n"; + send(new_socket, busy_response, strlen(busy_response), 0); + close(new_socket); + } + } + + printf("Shutting down server...\n"); + + pthread_mutex_lock(&g_work_queue.mutex); + g_work_queue.shutdown = 1; + pthread_mutex_unlock(&g_work_queue.mutex); + pthread_cond_broadcast(&g_work_queue.cond); + + for (int i = 0; i < num_workers; i++) { + pthread_join(threads[i], NULL); + } + + work_queue_destroy(&g_work_queue); + close(server_fd); } diff --git a/src/template.c b/src/template.c index 494b011..3bd583b 100644 --- a/src/template.c +++ b/src/template.c @@ -471,7 +471,7 @@ static void append_to_buffer(char **buffer, size_t *current_len, static char *parse_indexed_tag(const char *tag_content, int *index_val) { *index_val = -1; - char *open_bracket = strchr(tag_content, '['); + const char *open_bracket = strchr(tag_content, '['); if (open_bracket == NULL) { char *key_name = strdup(tag_content); @@ -483,7 +483,7 @@ static char *parse_indexed_tag(const char *tag_content, int *index_val) { return key_name; } - char *close_bracket = strchr(open_bracket, ']'); + const char *close_bracket = strchr(open_bracket, ']'); if (close_bracket == NULL) { fprintf(stderr, "[ERROR] parse_indexed_tag: Unclosed bracket in tag '%s'. " @@ -742,7 +742,7 @@ static char *render_template_segment(const char *template_segment, const char *filename_start = trimmed_tag_content + strlen("include "); if (*filename_start == '"') { filename_start++; - char *filename_end = strchr(filename_start, '"'); + const char *filename_end = strchr(filename_start, '"'); if (filename_end != NULL) { size_t filename_len = filename_end - filename_start; char *included_filename = (char *)malloc(filename_len + 1); |
