-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtaskthread.c
116 lines (94 loc) · 2.55 KB
/
taskthread.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include "taskthread.h"
#include "common.h"
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <base/logfile.h>
#include <base/zzmalloc.h>
static void*
taskthread_run(void *arg)
{
TaskThread *tt = (TaskThread*)arg;
DINFO("task thread:%lu\n", (unsigned long)tt->tid);
return NULL;
}
TaskThread*
taskthread_create()
{
int ret;
TaskThread *tt = zz_malloc(sizeof(TaskThread));
memset(tt, 0, sizeof(TaskThread));
ret = pthread_create(&tt->tid, NULL, taskthread_run, tt);
if (ret != 0) {
char errbuf[1024];
strerror_r(errno, errbuf, 1024);
DERROR("pthread_create error: %s\n", errbuf);
MEMLINK_EXIT;
}
ret = pthread_mutex_init(&tt->locker, NULL);
if (ret != 0) {
char errbuf[1024];
strerror_r(errno, errbuf, 1024);
DERROR("pthread_mutex_init error: %s\n", errbuf);
MEMLINK_EXIT;
}
ret = pthread_cond_init(&tt->cond, NULL);
if (ret != 0) {
char errbuf[1024];
strerror_r(errno, errbuf, 1024);
DERROR("pthread_cond_init error: %s\n", errbuf);
MEMLINK_EXIT;
}
ret = pthread_detach(tt->tid);
if (ret != 0) {
char errbuf[1024];
strerror_r(errno, errbuf, 1024);
DERROR("pthread_detach error: %s\n", errbuf);
MEMLINK_EXIT;
}
return tt;
}
int
taskthread_add_task(TaskThread *tt, Task *task)
{
int retcode = MEMLINK_OK;
pthread_mutex_lock(&tt->locker);
if ((tt->task_addi == tt->task_readi) && tt->tasks[tt->task_addi] != NULL) {
// full
retcode = MEMLINK_ERR_FULL;
goto add_task_over;
}
tt->tasks[tt->task_addi] = task;
tt->task_addi++;
tt->task_addi = tt->task_addi % MAX_TASK;
pthread_cond_signal(&tt->cond);
pthread_mutex_unlock(&tt->locker);
add_task_over:
return retcode;
}
Task*
taskthread_get_task(TaskThread *tt, int timeout)
{
Task *task = NULL;
struct timespec ts;
int ret;
pthread_mutex_lock(&tt->locker);
while ((tt->task_addi == tt->task_readi) && tt->tasks[tt->task_addi] == NULL) {
// empty
if (timeout <= 0) {
goto get_task_over;
}
ts.tv_sec = time(NULL) + timeout;
ts.tv_nsec = 0;
ret = pthread_cond_timedwait(&tt->cond, &tt->locker, &ts);
if (ret == ETIMEDOUT)
return NULL;
}
task = tt->tasks[tt->task_readi];
tt->task_readi++;
tt->task_readi = tt->task_readi % MAX_TASK;
pthread_mutex_unlock(&tt->locker);
get_task_over:
return task;
}