Skip to content

Commit 58cbc72

Browse files
committed
Add timeout support for reading fd'd in reactor.
Exposes news reactor_add_reader_with_timeout() function. Provides the needs for OpenSIPS#1838
1 parent f8e7ccc commit 58cbc72

File tree

3 files changed

+76
-24
lines changed

3 files changed

+76
-24
lines changed

io_wait.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ struct fd_map {
111111
* internal usage */
112112
int app_flags; /* flags to be used by upper layer apps, not by
113113
* the reactor */
114+
unsigned int timeout;
114115
};
115116

116117

@@ -198,6 +199,7 @@ static inline struct fd_map* hash_fd_map( io_wait_h* h,
198199
fd_type type,
199200
void* data,
200201
int flags,
202+
unsigned int timeout,
201203
int *already)
202204
{
203205
if (h->fd_hash[fd].fd <= 0) {
@@ -212,6 +214,8 @@ static inline struct fd_map* hash_fd_map( io_wait_h* h,
212214

213215
h->fd_hash[fd].flags|=flags;
214216

217+
h->fd_hash[fd].timeout = timeout;
218+
215219
return &h->fd_hash[fd];
216220
}
217221

@@ -255,6 +259,7 @@ static inline int kq_ev_change(io_wait_h* h, int fd, int filter, int flag,
255259
#define IO_WATCH_READ (1<<0)
256260
#define IO_WATCH_WRITE (1<<1)
257261
#define IO_WATCH_ERROR (1<<2)
262+
#define IO_WATCH_TIMEOUT (1<<3)
258263
/* reserved, do not attempt to use */
259264
#define IO_WATCH_PRV_CHECKED (1<<29)
260265
#define IO_WATCH_PRV_TRIG_READ (1<<30)
@@ -341,6 +346,7 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto]
341346
fd_type type,
342347
void* data,
343348
int prio,
349+
unsigned int timeout,
344350
int flags)
345351
{
346352

@@ -440,7 +446,10 @@ inline static int io_watch_add( io_wait_h* h, // lgtm [cpp/use-of-goto]
440446
return 0;
441447
}
442448

443-
if ((e=hash_fd_map(h, fd, type, data,flags,&already))==0){
449+
if (timeout)
450+
timeout+=get_ticks();
451+
452+
if ((e=hash_fd_map(h, fd, type, data,flags, timeout, &already))==0){
444453
LM_ERR("[%s] failed to hash the fd %d\n",h->name, fd);
445454
goto error0;
446455
}

io_wait_loop.h

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
7070
{
7171
int n, r;
7272
int ret;
73+
struct fd_map *e;
74+
unsigned int curr_time;
75+
7376
again:
7477
ret=n=poll(h->fd_array, h->fd_no, t*1000);
7578
if (n==-1){
@@ -79,9 +82,11 @@ inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
7982
goto error;
8083
}
8184
}
82-
for (r=h->fd_no-1; (r>=0) && n; r--){
85+
86+
curr_time = get_ticks();
87+
88+
for (r=h->fd_no-1; (r>=0) ; r--){
8389
if (h->fd_array[r].revents & POLLOUT) {
84-
n--;
8590
/* sanity checks */
8691
if ((h->fd_array[r].fd >= h->max_fd_no)||
8792
(h->fd_array[r].fd < 0)){
@@ -93,7 +98,6 @@ inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
9398
}
9499
handle_io(get_fd_map(h, h->fd_array[r].fd),r,IO_WATCH_WRITE);
95100
} else if (h->fd_array[r].revents & (POLLIN|POLLERR|POLLHUP)){
96-
n--;
97101
/* sanity checks */
98102
if ((h->fd_array[r].fd >= h->max_fd_no)||
99103
(h->fd_array[r].fd < 0)){
@@ -104,8 +108,13 @@ inline static int io_wait_loop_poll(io_wait_h* h, int t, int repeat)
104108
continue;
105109
}
106110

107-
while((handle_io(get_fd_map(h, h->fd_array[r].fd), r,IO_WATCH_READ) > 0)
111+
while((handle_io(get_fd_map(h, h->fd_array[r].fd), r,
112+
IO_WATCH_READ) > 0)
108113
&& repeat);
114+
} else if ( (e=get_fd_map(h, h->fd_array[r].fd))!=NULL &&
115+
e->timeout!=0 && e->timeout<=curr_time ) {
116+
e->timeout = 0;
117+
handle_io( e, r, IO_WATCH_TIMEOUT);
109118
}
110119
}
111120
error:
@@ -122,6 +131,8 @@ inline static int io_wait_loop_select(io_wait_h* h, int t, int repeat)
122131
int n, ret;
123132
struct timeval timeout;
124133
int r;
134+
struct fd_map *e;
135+
unsigned int curr_time;
125136

126137
again:
127138
sel_set=h->master_set;
@@ -134,12 +145,18 @@ inline static int io_wait_loop_select(io_wait_h* h, int t, int repeat)
134145
n=0;
135146
/* continue */
136147
}
148+
149+
curr_time = get_ticks();
150+
137151
/* use poll fd array */
138-
for(r=h->fd_no-1; (r>=0) && n; r--){
152+
for(r=h->fd_no-1; (r>=0) ; r--){
139153
if (FD_ISSET(h->fd_array[r].fd, &sel_set)){
140-
while((handle_io(get_fd_map(h, h->fd_array[r].fd), r,IO_WATCH_READ)>0)
141-
&& repeat);
142-
n--;
154+
while( (handle_io( get_fd_map(h, h->fd_array[r].fd), r,
155+
IO_WATCH_READ)>0) && repeat );
156+
} else if ( (e=get_fd_map(h, h->fd_array[r].fd))!=NULL &&
157+
e->timeout!=0 && e->timeout<=curr_time ) {
158+
e->timeout = 0;
159+
handle_io( e, r, IO_WATCH_TIMEOUT);
143160
}
144161
};
145162
return ret;
@@ -155,6 +172,7 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
155172
struct fd_map *e;
156173
struct epoll_event ep_event;
157174
int fd;
175+
unsigned int curr_time;
158176

159177
again:
160178
ret=n=epoll_wait(h->epfd, h->ep_array, h->fd_no, t*1000);
@@ -171,22 +189,29 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
171189
goto error;
172190
}
173191
}
192+
193+
curr_time = get_ticks();
194+
174195
for (r=0; r<n; r++) {
175196
#if 0
176197
LM_NOTICE("[%s] triggering fd %d, events %d, flags %d\n",
177198
h->name, ((struct fd_map*)h->ep_array[r].data.ptr)->fd,
178-
h->ep_array[r].events, ((struct fd_map*)h->ep_array[r].data.ptr)->flags);
199+
h->ep_array[r].events,
200+
((struct fd_map*)h->ep_array[r].data.ptr)->flags);
179201
#endif
180202
/* do some sanity check over the triggered fd */
181203
e = ((struct fd_map*)h->ep_array[r].data.ptr);
182-
if (e->type==0 || e->fd<=0 || (e->flags&(IO_WATCH_READ|IO_WATCH_WRITE))==0 ) {
204+
if (e->type==0 || e->fd<=0 ||
205+
(e->flags&(IO_WATCH_READ|IO_WATCH_WRITE))==0 ) {
183206
fd = e - h->fd_hash;
184-
LM_ERR("[%s] unset/bogus map (idx=%d) triggered for %d by epoll "
185-
"(fd=%d,type=%d,flags=%x,data=%p) -> removing from epoll\n", h->name,
207+
LM_ERR("[%s] unset/bogus map (idx=%d) triggered for %d by "
208+
"epoll (fd=%d,type=%d,flags=%x,data=%p) -> removing "
209+
"from epoll\n", h->name,
186210
fd, h->ep_array[r].events,
187211
e->fd, e->type, e->flags, e->data);
188212
/* as the triggering fd has no corresponding in fd_map, better
189-
remove it from poll, to avoid un-managed reporting on this fd */
213+
* remove it from poll, to avoid un-managed reporting
214+
* on this fd */
190215
if (epoll_ctl(h->epfd, EPOLL_CTL_DEL, fd, &ep_event)<0) {
191216
LM_ERR("failed to remove from epoll %s(%d)\n",
192217
strerror(errno), errno);
@@ -249,7 +274,7 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
249274
}
250275
}
251276
/* now do the actual running of IO handlers */
252-
for(r=h->fd_no-1; (r>=0) && n ; r--) {
277+
for(r=h->fd_no-1; (r>=0) ; r--) {
253278
e = get_fd_map(h, h->fd_array[r].fd);
254279
/* test the sanity of the fd_map */
255280
if (e->flags & (IO_WATCH_PRV_TRIG_READ|IO_WATCH_PRV_TRIG_WRITE)) {
@@ -278,11 +303,12 @@ inline static int io_wait_loop_epoll(io_wait_h* h, int t, int repeat)
278303
if ( e->flags & IO_WATCH_PRV_TRIG_READ ) {
279304
e->flags &= ~IO_WATCH_PRV_TRIG_READ;
280305
while((handle_io( e, r, IO_WATCH_READ)>0) && repeat);
281-
n--;
282306
} else if ( e->flags & IO_WATCH_PRV_TRIG_WRITE ){
283307
e->flags &= ~IO_WATCH_PRV_TRIG_WRITE;
284308
handle_io( e, r, IO_WATCH_WRITE);
285-
n--;
309+
} else if ( e->timeout!=0 && e->timeout<=curr_time ) {
310+
e->timeout = 0;
311+
handle_io( e, r, IO_WATCH_TIMEOUT);
286312
}
287313
}
288314

@@ -299,6 +325,7 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
299325
int ret, n, r;
300326
struct timespec tspec;
301327
struct fd_map *e;
328+
unsigned int curr_time;
302329

303330
tspec.tv_sec=t;
304331
tspec.tv_nsec=0;
@@ -308,15 +335,20 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
308335
if (n==-1){
309336
if (errno==EINTR) goto again; /* signal, ignore it */
310337
else{
311-
LM_ERR("[%s] kevent: %s [%d]\n", h->name, strerror(errno), errno);
338+
LM_ERR("[%s] kevent: %s [%d]\n", h->name,
339+
strerror(errno), errno);
312340
goto error;
313341
}
314342
}
343+
344+
curr_time = get_ticks();
345+
315346
h->kq_nchanges=0; /* reset changes array */
316347
for (r=0; r<n; r++){
317348
#ifdef EXTRA_DEBUG
318349
LM_DBG("[%s] event %d/%d: fd=%d, udata=%lx, flags=0x%x\n",
319-
h->name, r, n, h->kq_array[r].ident, (long)h->kq_array[r].udata,
350+
h->name, r, n, h->kq_array[r].ident,
351+
(long)h->kq_array[r].udata,
320352
h->kq_array[r].flags);
321353
#endif
322354
if (h->kq_array[r].flags & EV_ERROR){
@@ -339,6 +371,9 @@ inline static int io_wait_loop_kqueue(io_wait_h* h, int t, int repeat)
339371
e->flags &= ~IO_WATCH_PRV_TRIG_READ;
340372
while((handle_io( e, r, IO_WATCH_READ)>0) && repeat);
341373
n--;
374+
} else if ( e->timeout!=0 && e->timeout<=curr_time ) {
375+
e->timeout = 0;
376+
handle_io( e, r, IO_WATCH_TIMEOUT);
342377
}
343378
}
344379

@@ -361,7 +396,6 @@ inline static int io_wait_loop_sigio_rt(io_wait_h* h, int t)
361396
int sigio_fd;
362397
struct fd_map* fm;
363398

364-
365399
ret=1; /* 1 event per call normally */
366400
ts.tv_sec=t;
367401
ts.tv_nsec=0;
@@ -469,6 +503,7 @@ inline static int io_wait_loop_devpoll(io_wait_h* h, int t, int repeat)
469503
int ret;
470504
struct dvpoll dpoll;
471505
struct fd_map *e;
506+
unsigned int curr_time;
472507

473508
dpoll.dp_timeout=t*1000;
474509
dpoll.dp_nfds=h->fd_no;
@@ -482,6 +517,9 @@ inline static int io_wait_loop_devpoll(io_wait_h* h, int t, int repeat)
482517
goto error;
483518
}
484519
}
520+
521+
curr_time = get_ticks();
522+
485523
for (r=0; r< n; r++){
486524
if (h->dp_changes[r].revents & (POLLNVAL|POLLERR)){
487525
LM_ERR("[%s] pollinval returned for fd %d, revents=%x\n",
@@ -492,12 +530,14 @@ inline static int io_wait_loop_devpoll(io_wait_h* h, int t, int repeat)
492530
IO_WATCH_PRV_TRIG_READ;
493531
}
494532
/* now do the actual running of IO handlers */
495-
for(r=h->fd_no-1; (r>=0) && n ; r--) {
533+
for(r=h->fd_no-1; (r>=0) ; r--) {
496534
e = get_fd_map(h, h->fd_array[r].fd);
497535
if ( e->flags & IO_WATCH_PRV_TRIG_READ ) {
498536
e->flags &= ~IO_WATCH_PRV_TRIG_READ;
499537
while((handle_io( e, r, IO_WATCH_READ)>0) && repeat);
500-
n--;
538+
} else if ( e->timeout!=0 && e->timeout<=curr_time ) {
539+
e->timeout = 0;
540+
handle_io( e, r, IO_WATCH_TIMEOUT);
501541
}
502542
}
503543

reactor_defs.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,13 @@ int init_reactor_size(void);
7878
init_io_wait(&_worker_io, _name, reactor_size, io_poll_method, _prio_max)
7979

8080
#define reactor_add_reader( _fd, _type, _prio, _data) \
81-
io_watch_add(&_worker_io, _fd, _type, _data, _prio, IO_WATCH_READ)
81+
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_READ)
82+
83+
#define reactor_add_reader_with_timeout( _fd, _type, _prio, _t, _data) \
84+
io_watch_add(&_worker_io, _fd, _type, _data, _prio, _t, IO_WATCH_READ)
8285

8386
#define reactor_add_writer( _fd, _type, _prio, _data) \
84-
io_watch_add(&_worker_io, _fd, _type, _data, _prio, IO_WATCH_WRITE)
87+
io_watch_add(&_worker_io, _fd, _type, _data, _prio, 0, IO_WATCH_WRITE)
8588

8689
#define reactor_del_reader( _fd, _idx, _io_flags) \
8790
io_watch_del(&_worker_io, _fd, _idx, _io_flags, IO_WATCH_READ)

0 commit comments

Comments
 (0)