Skip to content

Commit 56fad0e

Browse files
committed
added thread pool on epoll server.
1 parent 44c7c04 commit 56fad0e

File tree

8 files changed

+224
-24
lines changed

8 files changed

+224
-24
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC = clang
2-
CFLAGS = -static -lm -Wall -Wextra -O2
2+
CFLAGS = -lm -Wall -Wextra -O2 -static
33

44
# files
55
SOURCES = simple-server.c epoll-server.c uring-server.c
@@ -13,7 +13,7 @@ simple: simple-server.c
1313
# $(CC) $(CFLAGS) -o $@ $< -DFILE1_DEFINE
1414

1515
epoll: epoll-server.c
16-
$(CC) -o epoll-server epoll-server.c request-handle.c cpu-bound.c $(CFLAGS)
16+
$(CC) -o epoll-server epoll-server.c threadpool.c request-handle.c cpu-bound.c $(CFLAGS) -pthread
1717
# $(CC) $(CFLAGS) -o $@ $< -DFILE2_DEFINE -DADDITIONAL_FLAG
1818

1919
uring: uring-server.c

epoll-server.c

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
#include <sys/epoll.h>
22
#include <sys/socket.h>
33
#include <netinet/in.h>
4+
#include <arpa/inet.h>
45
#include <unistd.h>
56
#include <fcntl.h>
67
#include <stdio.h>
78
#include <stdlib.h>
89
#include <string.h>
910
#include <signal.h>
1011
#include <unistd.h>
12+
#include <errno.h>
1113
#include "request-handle.h"
14+
#include "threadpool.h"
1215

1316
#define PORT 8080
14-
#define MAX_EVENTS 10
15-
#define BUFFER_SIZE 1024
17+
#define MAX_EVENTS 1000
18+
#define WORKERS 3
19+
#define BUFFER_SIZE 2048
1620

1721
int server_fd = -1; // global server socket
1822

@@ -50,9 +54,11 @@ void set_nonblocking(int sock) {
5054
}
5155

5256
int main() {
57+
int opt = 1;
5358
int epoll_fd;
5459
struct sockaddr_in server_addr;
5560
struct epoll_event event, events[MAX_EVENTS];
61+
ThreadPool pool;
5662

5763
setup_signal_handler();
5864

@@ -63,6 +69,13 @@ int main() {
6369
exit(EXIT_FAILURE);
6470
}
6571

72+
// Configurando a opção SO_REUSEADDR
73+
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
74+
perror("Erro ao configurar socket");
75+
close(server_fd);
76+
exit(EXIT_FAILURE);
77+
}
78+
6679
// Configurar endereço e bind
6780
server_addr.sin_family = AF_INET;
6881
server_addr.sin_port = htons(PORT);
@@ -89,7 +102,7 @@ int main() {
89102
}
90103

91104
// Adicionar o socket do servidor ao epoll
92-
event.events = EPOLLIN; // Monitorar para leitura
105+
event.events = EPOLLIN | EPOLLET; // Monitorar para leitura
93106
event.data.fd = server_fd;
94107
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) < 0) {
95108
perror("Erro ao adicionar ao epoll");
@@ -98,6 +111,9 @@ int main() {
98111
exit(EXIT_FAILURE);
99112
}
100113

114+
//iniciar workers thread_pool
115+
thread_pool_init(&pool, WORKERS, MAX_EVENTS);
116+
101117
printf("epoll server listening on port %d\n", PORT);
102118

103119
while (1) {
@@ -106,29 +122,56 @@ int main() {
106122
if (events[i].data.fd == server_fd) {
107123
// Aceitar nova conexão
108124
int client_fd = accept(server_fd, NULL, NULL);
125+
if (client_fd == -1) {
126+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
127+
// Nenhuma conexão pendente
128+
continue;
129+
} else {
130+
perror("Erro ao aceitar conexão");
131+
continue;
132+
}
133+
}
109134
set_nonblocking(client_fd);
110135
event.events = EPOLLIN | EPOLLET;
111136
event.data.fd = client_fd;
112137
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event);
113-
printf("Nova conexão aceita\n");
138+
//printf("Nova conexão aceita\n");
114139
} else {
115140
// process request
116141
char buffer[BUFFER_SIZE];
117142
int bytes_read = read(events[i].data.fd, buffer, BUFFER_SIZE - 1);
118143

119144
if (bytes_read < 0) {
120-
perror("Erro ao ler do cliente");
145+
if (errno == EAGAIN || errno == EWOULDBLOCK) {
146+
continue;
147+
} else {
148+
perror("Erro ao ler do cliente");
149+
close(events[i].data.fd);
150+
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
151+
continue;
152+
}
153+
} else if (bytes_read == 0) {
154+
// Conexão fechada pelo cliente
121155
close(events[i].data.fd);
156+
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
122157
continue;
123158
}
124159

125160
buffer[bytes_read] = '\0';
126161

127-
handle_request(events[i].data.fd, buffer);
162+
TaskArgs *args = malloc(sizeof(TaskArgs));
163+
args->client_fd = events[i].data.fd;
164+
args->buffer = malloc(BUFFER_SIZE * sizeof(char));
165+
strncpy(args->buffer, buffer, BUFFER_SIZE - 1);
166+
args->buffer[BUFFER_SIZE - 1] = '\0';
167+
168+
//handle_request(events[i].data.fd, buffer); //sync
169+
thread_pool_add(&pool, handle_request, args); //async
128170
}
129171
}
130172
}
131173

132174
close(server_fd);
175+
close(epoll_fd);
133176
return 0;
134177
}

request-handle.c

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
#include <sys/socket.h>
66
#include <netinet/in.h>
77
#include <arpa/inet.h>
8-
#include <signal.h>
8+
//#include <signal.h>
99
#include <string.h>
1010
#include <unistd.h>
1111
#include "cpu-bound.h"
1212
#include "request-handle.h"
13+
#include "threadpool.h"
1314

14-
#define BUF_SIZE 1024
15+
#define BUF_SIZE 2048
1516

1617

1718
char* getParamQueryString(char buffer[BUF_SIZE], const char *param_name) {
@@ -51,11 +52,11 @@ char* getParamQueryString(char buffer[BUF_SIZE], const char *param_name) {
5152
return NULL;
5253
}
5354

54-
void handle_request(int client_fd, char *buffer) {
55-
printf("Requisição recebida:\n%s\n", buffer);
55+
void handle_request(TaskArgs* args) {
56+
//printf("Requisição recebida:\n%s\n", args->buffer);
5657

5758
//params
58-
char *timeout = getParamQueryString(buffer, "timeout");
59+
char *timeout = getParamQueryString(args->buffer, "timeout");
5960
if (timeout == NULL) {
6061
timeout = "10"; // ms
6162
}
@@ -68,32 +69,35 @@ void handle_request(int client_fd, char *buffer) {
6869
"{\"message\": \"request completed\"}";
6970

7071
// Routing
71-
if (strstr(buffer, "GET /api/cpu-bound") == buffer) {
72+
if (strstr(args->buffer, "GET /api/cpu-bound") == args->buffer) {
7273

7374
simulateCPU(atoi(timeout));
7475

75-
write(client_fd, response, strlen(response));
76+
write(args->client_fd, response, strlen(response));
7677

77-
} else if (strstr(buffer, "GET /api/io-bound") == buffer) {
78+
} else if (strstr(args->buffer, "GET /api/io-bound") == args->buffer) {
7879

7980
usleep(atoi(timeout) * 1000);
8081

81-
write(client_fd, response, strlen(response));
82+
write(args->client_fd, response, strlen(response));
8283

83-
} else if (strstr(buffer, "GET /health") == buffer) {
84+
} else if (strstr(args->buffer, "GET /health") == args->buffer) {
8485

8586
// response 200
8687
char *resp = "HTTP/1.1 200 OK\r\nContent-Length: 8\r\n\r\nHealthy!";
87-
write(client_fd, resp, strlen(resp));
88+
write(args->client_fd, resp, strlen(resp));
8889

8990
} else {
9091

9192
// response 404
9293
char *resp = "HTTP/1.1 404 Not Found\r\nContent-Length: 16\r\n\r\nRoute not found!";
93-
write(client_fd, resp, strlen(resp));
94+
write(args->client_fd, resp, strlen(resp));
9495
}
9596

9697
//free(timeout);
9798
//free(resp_size);
98-
close(client_fd);
99+
if (strstr(args->buffer, "Connection: keep-alive") == NULL) {
100+
close(args->client_fd);
101+
}
102+
free(args);
99103
}

request-handle.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
#ifndef REQUEST_HANDLE_H
22
#define REQUEST_HANDLE_H
33

4+
#include "threadpool.h"
45

5-
void handle_request(int client_fd, char *buffer);
6+
void handle_request(TaskArgs* args); //(int client_fd, char *buffer);
67

78
#endif

simple-server.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <signal.h>
99
#include <unistd.h>
1010
#include "request-handle.h"
11+
#include "threadpool.h"
1112

1213
#define PORT 8080
1314
#define BACKLOG 10
@@ -102,7 +103,13 @@ int main() {
102103

103104
buffer[bytes_read] = '\0';
104105

105-
handle_request(client_fd, buffer);
106+
TaskArgs *args = malloc(sizeof(TaskArgs));
107+
args->client_fd = client_fd;
108+
args->buffer = malloc(BUF_SIZE * sizeof(char));
109+
strncpy(args->buffer, buffer, BUF_SIZE - 1);
110+
args->buffer[BUF_SIZE - 1] = '\0';
111+
112+
handle_request(args);
106113
}
107114

108115
close(server_fd);

threadpool.c

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#include <pthread.h>
2+
#include <stdlib.h>
3+
#include <stdio.h>
4+
#include <unistd.h>
5+
6+
7+
typedef struct {
8+
int client_fd;
9+
char *buffer;
10+
} TaskArgs;
11+
12+
typedef void (*FunctionPointer)(TaskArgs*);
13+
14+
typedef struct {
15+
FunctionPointer function; // Ponteiro para a função
16+
void* argument; // Argumento da função
17+
} Task;
18+
19+
typedef struct {
20+
Task* tasks; // Fila de tarefas
21+
int task_count; // Número de tarefas na fila
22+
int head, tail; // Ponteiros para início e fim da fila
23+
int queue_size; // Tamanho da fila
24+
25+
pthread_mutex_t lock; // Mutex para sincronização
26+
pthread_cond_t notify; // Condição para notificação
27+
int shutdown; // Flag para finalizar o pool
28+
} ThreadPool;
29+
30+
31+
int thread_pool_add(ThreadPool* pool, FunctionPointer TaskFunc, void* argument) {
32+
pthread_mutex_lock(&pool->lock);
33+
34+
if (pool->task_count == pool->queue_size) {
35+
pthread_mutex_unlock(&pool->lock);
36+
return -1; // Fila cheia
37+
}
38+
39+
Task task = { TaskFunc, argument };
40+
pool->tasks[pool->tail] = task;
41+
pool->tail = (pool->tail + 1) % pool->queue_size;
42+
pool->task_count++;
43+
44+
pthread_cond_signal(&pool->notify);
45+
pthread_mutex_unlock(&pool->lock);
46+
return 0;
47+
}
48+
49+
void* thread_loop(void* arg) {
50+
ThreadPool* pool = (ThreadPool*)arg;
51+
52+
while (1) {
53+
pthread_mutex_lock(&pool->lock);
54+
55+
while (pool->task_count == 0 && !pool->shutdown) {
56+
pthread_cond_wait(&pool->notify, &pool->lock);
57+
}
58+
59+
if (pool->shutdown) {
60+
pthread_mutex_unlock(&pool->lock);
61+
pthread_exit(NULL);
62+
}
63+
64+
Task task = pool->tasks[pool->head];
65+
pool->head = (pool->head + 1) % pool->queue_size;
66+
pool->task_count--;
67+
68+
pthread_mutex_unlock(&pool->lock);
69+
70+
// Executa a tarefa
71+
task.function(task.argument);
72+
}
73+
}
74+
75+
void thread_pool_init(ThreadPool* pool, int thread_count, int queue_size) {
76+
pool->tasks = (Task*)malloc(sizeof(Task) * queue_size);
77+
pool->task_count = 0;
78+
pool->queue_size = queue_size;
79+
pool->head = pool->tail = 0;
80+
pool->shutdown = 0;
81+
82+
pthread_mutex_init(&pool->lock, NULL);
83+
pthread_cond_init(&pool->notify, NULL);
84+
85+
for (int i = 0; i < thread_count; i++) {
86+
pthread_t thread;
87+
pthread_create(&thread, NULL, thread_loop, (void*)pool);
88+
pthread_detach(thread);
89+
}
90+
}
91+
92+
void thread_pool_destroy(ThreadPool* pool) {
93+
pthread_mutex_lock(&pool->lock);
94+
pool->shutdown = 1;
95+
pthread_cond_broadcast(&pool->notify);
96+
pthread_mutex_unlock(&pool->lock);
97+
98+
free(pool->tasks);
99+
pthread_mutex_destroy(&pool->lock);
100+
pthread_cond_destroy(&pool->notify);
101+
}

threadpool.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#ifndef THREAD_POOL_H
2+
#define THREAD_POOL_H
3+
4+
#include <pthread.h>
5+
6+
typedef struct {
7+
int client_fd;
8+
char *buffer;
9+
} TaskArgs;
10+
11+
typedef void (*FunctionPointer)(TaskArgs*);
12+
13+
typedef struct {
14+
FunctionPointer function; // Ponteiro para a função
15+
void* argument; // Argumento da função
16+
} Task;
17+
18+
// Estrutura para o pool de threads
19+
typedef struct {
20+
Task* tasks; // Fila de tarefas
21+
int task_count; // Número de tarefas na fila
22+
int head, tail; // Ponteiros para início e fim da fila
23+
int queue_size; // Tamanho da fila
24+
25+
pthread_mutex_t lock; // Mutex para sincronização
26+
pthread_cond_t notify; // Condição para notificação
27+
int shutdown; // Flag para finalizar o pool
28+
} ThreadPool;
29+
30+
31+
//int thread_pool_add(ThreadPool* pool, void (*function)(void*), void* argument);
32+
int thread_pool_add(ThreadPool* pool, FunctionPointer TaskFunc, void* argument);
33+
void thread_pool_init(ThreadPool* pool, int thread_count, int queue_size);
34+
void thread_pool_destroy(ThreadPool* pool);
35+
36+
#endif

0 commit comments

Comments
 (0)