aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/beaker_globals.c8
-rw-r--r--src/beaker_globals.h8
-rw-r--r--src/server.c177
-rw-r--r--src/template.c6
4 files changed, 170 insertions, 29 deletions
diff --git a/src/beaker_globals.c b/src/beaker_globals.c
index 2ea5ab6..29c301e 100644
--- a/src/beaker_globals.c
+++ b/src/beaker_globals.c
@@ -4,10 +4,10 @@ RouteHandler handlers[MAX_HANDLERS];
int handler_count = 0;
-int current_client_socket = -1;
+__thread int current_client_socket = -1;
-Cookie cookies_to_set[MAX_COOKIES];
+__thread Cookie cookies_to_set[MAX_COOKIES];
-int cookies_to_set_count = 0;
+__thread int cookies_to_set_count = 0;
-char current_request_buffer[BUFFER_SIZE]; \ No newline at end of file
+__thread char current_request_buffer[BUFFER_SIZE]; \ No newline at end of file
diff --git a/src/beaker_globals.h b/src/beaker_globals.h
index 94aabdf..7b727bc 100644
--- a/src/beaker_globals.h
+++ b/src/beaker_globals.h
@@ -7,12 +7,12 @@ extern RouteHandler handlers[MAX_HANDLERS];
extern int handler_count;
-extern int current_client_socket;
+extern __thread int current_client_socket;
-extern Cookie cookies_to_set[MAX_COOKIES];
+extern __thread Cookie cookies_to_set[MAX_COOKIES];
-extern int cookies_to_set_count;
+extern __thread int cookies_to_set_count;
-extern char current_request_buffer[BUFFER_SIZE];
+extern __thread char current_request_buffer[BUFFER_SIZE];
#endif \ No newline at end of file
diff --git a/src/server.c b/src/server.c
index 1176a66..b623db7 100644
--- a/src/server.c
+++ b/src/server.c
@@ -8,6 +8,101 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
+#include <pthread.h>
+#include <sys/sysinfo.h>
+#include <stdlib.h>
+#include <signal.h>
+
+#define MAX_PENDING_CONNECTIONS 128
+
+static volatile sig_atomic_t g_shutdown_requested = 0;
+
+static void signal_handler(int sig) {
+ (void)sig;
+ g_shutdown_requested = 1;
+}
+
+typedef struct {
+ int client_sockets[MAX_PENDING_CONNECTIONS];
+ int count;
+ int front;
+ int rear;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int shutdown;
+} WorkQueue;
+
+static WorkQueue g_work_queue;
+
+static void work_queue_init(WorkQueue *queue) {
+ queue->count = 0;
+ queue->front = 0;
+ queue->rear = 0;
+ queue->shutdown = 0;
+ pthread_mutex_init(&queue->mutex, NULL);
+ pthread_cond_init(&queue->cond, NULL);
+}
+
+static void work_queue_destroy(WorkQueue *queue) {
+ pthread_mutex_destroy(&queue->mutex);
+ pthread_cond_destroy(&queue->cond);
+}
+
+static int work_queue_push(WorkQueue *queue, int client_socket) {
+ pthread_mutex_lock(&queue->mutex);
+
+ if (queue->count >= MAX_PENDING_CONNECTIONS) {
+ pthread_mutex_unlock(&queue->mutex);
+ return -1;
+ }
+
+ queue->client_sockets[queue->rear] = client_socket;
+ queue->rear = (queue->rear + 1) % MAX_PENDING_CONNECTIONS;
+ queue->count++;
+
+ pthread_cond_signal(&queue->cond);
+ pthread_mutex_unlock(&queue->mutex);
+ return 0;
+}
+
+static int work_queue_pop(WorkQueue *queue) {
+ pthread_mutex_lock(&queue->mutex);
+
+ while (queue->count == 0 && !queue->shutdown) {
+ pthread_cond_wait(&queue->cond, &queue->mutex);
+ }
+
+ if (queue->shutdown && queue->count == 0) {
+ pthread_mutex_unlock(&queue->mutex);
+ return -1;
+ }
+
+ int client_socket = queue->client_sockets[queue->front];
+ queue->front = (queue->front + 1) % MAX_PENDING_CONNECTIONS;
+ queue->count--;
+
+ pthread_mutex_unlock(&queue->mutex);
+ return client_socket;
+}
+
+static int get_optimal_thread_count(void) {
+ int cores = get_nprocs();
+ return cores * 2;
+}
+
+void handle_client_connection(int new_socket);
+
+static void *worker_thread(void *arg) {
+ (void)arg;
+ while (1) {
+ int client_socket = work_queue_pop(&g_work_queue);
+ if (client_socket < 0) {
+ break;
+ }
+ handle_client_connection(client_socket);
+ }
+ return NULL;
+}
static int initialize_server_socket(const char *ip, int port, int *server_fd_out,
struct sockaddr_in *address_out) {
@@ -57,7 +152,7 @@ static int initialize_server_socket(const char *ip, int port, int *server_fd_out
return 0;
}
-static void handle_client_connection(int new_socket) {
+void handle_client_connection(int new_socket) {
current_client_socket = new_socket;
char buffer[BUFFER_SIZE] = {0};
@@ -164,27 +259,73 @@ static void handle_client_connection(int new_socket) {
}
int beaker_run(const char *ip, int port) {
- int server_fd;
- struct sockaddr_in address;
- int addrlen = sizeof(address);
+ beaker_run_with_threads(ip, port, 0);
+ return 0;
+}
- if (initialize_server_socket(ip, port, &server_fd, &address) != 0) {
- return -1;
- }
+void beaker_run_with_threads(const char *ip, int port, int num_workers) {
+ int server_fd;
+ struct sockaddr_in address;
+ int addrlen = sizeof(address);
- while (true) {
- int new_socket;
+ g_shutdown_requested = 0;
- if ((new_socket = accept(server_fd, (struct sockaddr *)&address,
- (socklen_t *)&addrlen)) < 0) {
- perror("accept failed");
- fprintf(stderr, "[ERROR] beaker_run: Failed to accept connection.\n");
- continue;
+ struct sigaction sa;
+ sa.sa_handler = signal_handler;
+ sigemptyset(&sa.sa_mask);
+ sa.sa_flags = 0;
+ sigaction(SIGINT, &sa, NULL);
+ sigaction(SIGTERM, &sa, NULL);
+
+ if (num_workers <= 0) {
+ num_workers = get_optimal_thread_count();
}
- handle_client_connection(new_socket);
- }
+ if (initialize_server_socket(ip, port, &server_fd, &address) != 0) {
+ return;
+ }
- close(server_fd);
- return 0;
+ work_queue_init(&g_work_queue);
+
+ pthread_t threads[num_workers];
+ for (int i = 0; i < num_workers; i++) {
+ pthread_create(&threads[i], NULL, worker_thread, NULL);
+ }
+
+ printf("Beaker server started with %d worker threads\n", num_workers);
+
+ while (!g_shutdown_requested) {
+ int new_socket;
+
+ if ((new_socket = accept(server_fd, (struct sockaddr *)&address,
+ (socklen_t *)&addrlen)) < 0) {
+ if (g_shutdown_requested) {
+ break;
+ }
+ perror("accept failed");
+ fprintf(stderr, "[ERROR] beaker_run_with_threads: Failed to accept connection.\n");
+ continue;
+ }
+
+ if (work_queue_push(&g_work_queue, new_socket) < 0) {
+ fprintf(stderr, "[WARNING] Work queue full, closing connection\n");
+ const char *busy_response = "HTTP/1.1 503 Service Unavailable\r\nContent-Length: 0\r\n\r\n";
+ send(new_socket, busy_response, strlen(busy_response), 0);
+ close(new_socket);
+ }
+ }
+
+ printf("Shutting down server...\n");
+
+ pthread_mutex_lock(&g_work_queue.mutex);
+ g_work_queue.shutdown = 1;
+ pthread_mutex_unlock(&g_work_queue.mutex);
+ pthread_cond_broadcast(&g_work_queue.cond);
+
+ for (int i = 0; i < num_workers; i++) {
+ pthread_join(threads[i], NULL);
+ }
+
+ work_queue_destroy(&g_work_queue);
+ close(server_fd);
}
diff --git a/src/template.c b/src/template.c
index 494b011..3bd583b 100644
--- a/src/template.c
+++ b/src/template.c
@@ -471,7 +471,7 @@ static void append_to_buffer(char **buffer, size_t *current_len,
static char *parse_indexed_tag(const char *tag_content, int *index_val) {
*index_val = -1;
- char *open_bracket = strchr(tag_content, '[');
+ const char *open_bracket = strchr(tag_content, '[');
if (open_bracket == NULL) {
char *key_name = strdup(tag_content);
@@ -483,7 +483,7 @@ static char *parse_indexed_tag(const char *tag_content, int *index_val) {
return key_name;
}
- char *close_bracket = strchr(open_bracket, ']');
+ const char *close_bracket = strchr(open_bracket, ']');
if (close_bracket == NULL) {
fprintf(stderr,
"[ERROR] parse_indexed_tag: Unclosed bracket in tag '%s'. "
@@ -742,7 +742,7 @@ static char *render_template_segment(const char *template_segment,
const char *filename_start = trimmed_tag_content + strlen("include ");
if (*filename_start == '"') {
filename_start++;
- char *filename_end = strchr(filename_start, '"');
+ const char *filename_end = strchr(filename_start, '"');
if (filename_end != NULL) {
size_t filename_len = filename_end - filename_start;
char *included_filename = (char *)malloc(filename_len + 1);