Skip to content

Commit 7090635

Browse files
committedOct 17, 2024·
Pass notifies from serve processes to xfrd
Directly without main or backup-main proxying for it.
1 parent 740902a commit 7090635

File tree

7 files changed

+170
-17
lines changed

7 files changed

+170
-17
lines changed
 

‎ipc.c

+44
Original file line numberDiff line numberDiff line change
@@ -777,3 +777,47 @@ xfrd_handle_ipc_read(struct event* handler, xfrd_state_type* xfrd)
777777
buffer_clear(xfrd->ipc_conn->packet);
778778
}
779779
}
780+
781+
void
782+
xfrd_handle_notify(int fd, short event, void* arg)
783+
{
784+
xfrd_state_type* xfrd = (xfrd_state_type*)arg;
785+
ssize_t r;
786+
uint32_t acl_num;
787+
int32_t acl_xfr;
788+
789+
if (!(event & EV_READ))
790+
return;
791+
792+
/* xfrd->notify_message is really local to the scope of this function,
793+
* but allocated beforehand anyway to prevent claiming more than
794+
* QIOBUFSZ on the stack. There is only a single xfrd event loop,
795+
* therefore it is safe to use this semi global variable.
796+
*/
797+
buffer_clear(xfrd->notify_message);
798+
r = recv(fd, buffer_current( xfrd->notify_message)
799+
, buffer_capacity(xfrd->notify_message), MSG_DONTWAIT);
800+
if(r == -1) {
801+
if(errno != EAGAIN && errno != EINTR && errno != EMSGSIZE) {
802+
log_msg( LOG_ERR
803+
, "xfrd_handle_notify receive failed: %s"
804+
, strerror(errno));
805+
}
806+
return;
807+
} else if(r == 0) {
808+
log_msg(LOG_ERR, "xfrd_handle_notify remote closed connection");
809+
return;
810+
} else if(r < (ssize_t)(sizeof(acl_xfr) + sizeof(acl_num))) {
811+
log_msg(LOG_ERR, "xfrd_handle_notify invalid message size");
812+
return;
813+
}
814+
/* acl_num and acl_xfr are appended to the NOTIFY message */
815+
acl_num = buffer_read_u32_at(xfrd->notify_message,
816+
r - sizeof(acl_xfr) - sizeof(acl_num));
817+
acl_xfr = (int32_t)buffer_read_u32_at(xfrd->notify_message,
818+
r - sizeof(acl_xfr));
819+
buffer_skip(xfrd->notify_message, r - sizeof(acl_xfr) - sizeof(acl_num));
820+
buffer_flip(xfrd->notify_message);
821+
xfrd_handle_passed_packet(xfrd->notify_message, acl_num, acl_xfr);
822+
}
823+

‎ipc.h

+3
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ void child_handle_parent_command(int fd, short event, void* arg);
8484
*/
8585
void xfrd_handle_ipc(int fd, short event, void* arg);
8686

87+
/* receive incoming notifies received by and from the serve processes */
88+
void xfrd_handle_notify(int fd, short event, void* arg);
89+
8790
/* check if all children have exited in an orderly fashion and set mode */
8891
void parent_check_all_children_exited(struct nsd* nsd);
8992

‎nsd.h

+9
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,15 @@ struct nsd
354354
* simultaneous with new serve childs. */
355355
int *dt_collector_fd_swap;
356356
#endif /* USE_DNSTAP */
357+
/* the pipes from the serve processes to xfrd, for passing through
358+
* NOTIFY messages, arrays of size child_count * 2.
359+
* Kept open for (re-)forks. */
360+
int *serve2xfrd_fd_send, *serve2xfrd_fd_recv;
361+
/* the pipes from the serve processes to the xfrd. Initially
362+
* these point halfway into serve2xfrd_fd_send, but during reload
363+
* the pointer is swapped with serve2xfrd_fd_send so that only one
364+
* serve child will write to the same fd simultaneously. */
365+
int *serve2xfrd_fd_swap;
357366
/* ratelimit for errors, time value */
358367
time_t err_limit_time;
359368
/* ratelimit for errors, packet count */

‎query.c

+29-16
Original file line numberDiff line numberDiff line change
@@ -466,8 +466,7 @@ answer_notify(struct nsd* nsd, struct query *query)
466466
if((acl_num = acl_check_incoming(zone_opt->pattern->allow_notify, query,
467467
&why)) != -1)
468468
{
469-
sig_atomic_t mode = NSD_PASS_TO_XFRD;
470-
int s = nsd->this_child->parent_fd;
469+
ssize_t r;
471470
uint16_t sz;
472471
uint32_t acl_send = htonl(acl_num);
473472
uint32_t acl_xfr;
@@ -488,30 +487,44 @@ answer_notify(struct nsd* nsd, struct query *query)
488487
sz = buffer_limit(query->packet);
489488
if(buffer_limit(query->packet) > MAX_PACKET_SIZE)
490489
return query_error(query, NSD_RC_SERVFAIL);
491-
/* forward to xfrd for processing
492-
Note. Blocking IPC I/O, but acl is OK. */
493-
sz = htons(sz);
494-
if(!write_socket(s, &mode, sizeof(mode)) ||
495-
!write_socket(s, &sz, sizeof(sz)) ||
496-
!write_socket(s, buffer_begin(query->packet),
497-
buffer_limit(query->packet)) ||
498-
!write_socket(s, &acl_send, sizeof(acl_send)) ||
499-
!write_socket(s, &acl_xfr, sizeof(acl_xfr))) {
500-
log_msg(LOG_ERR, "error in IPC notify server2main, %s",
490+
/* Temporary append acl_send and acl_xfr after the NOTIFY
491+
* message in the buffer, for sending along to the xfrd */
492+
assert(buffer_capacity(query->packet) >
493+
sz + sizeof(acl_send) + sizeof(acl_xfr));
494+
buffer_set_limit( query->packet
495+
, sz + sizeof(acl_send) + sizeof(acl_xfr));
496+
buffer_write_u32_at(query->packet, sz, acl_send);
497+
buffer_write_u32_at(query->packet, sz + sizeof(acl_send)
498+
, acl_xfr);
499+
r = send( nsd->serve2xfrd_fd_send[nsd->this_child->child_num]
500+
, buffer_begin(query->packet)
501+
, buffer_limit(query->packet)
502+
, MSG_DONTWAIT | MSG_NOSIGNAL
503+
);
504+
/* Restore query->packet buffer by removing the earlier
505+
* appended acl_send and acl_xfr again*/
506+
buffer_set_limit(query->packet, sz);
507+
if(r < 0) {
508+
log_msg(LOG_ERR, "error in IPC notify serve2xfrd, %s",
501509
strerror(errno));
502510
return query_error(query, NSD_RC_SERVFAIL);
511+
} else if(r == 0) {
512+
log_msg(LOG_ERR, "error in IPC notify serve2xfrd, %s",
513+
"xfrd closed the channel");
514+
return query_error(query, NSD_RC_SERVFAIL);
503515
}
504516
if(verbosity >= 1) {
505517
uint32_t serial = 0;
506518
char address[128];
507519
addr2str(&query->client_addr, address, sizeof(address));
508520
if(packet_find_notify_serial(query->packet, &serial))
509-
VERBOSITY(1, (LOG_INFO, "notify for %s from %s serial %u",
521+
VERBOSITY(1, (LOG_INFO, "notify for %s from %s serial %u size %u",
510522
dname_to_string(query->qname, NULL), address,
511-
(unsigned)serial));
523+
(unsigned)serial, (unsigned)sz));
512524
else
513-
VERBOSITY(1, (LOG_INFO, "notify for %s from %s",
514-
dname_to_string(query->qname, NULL), address));
525+
VERBOSITY(1, (LOG_INFO, "notify for %s from %s size %u",
526+
dname_to_string(query->qname, NULL), address,
527+
(unsigned)sz));
515528
}
516529

517530
/* create notify reply - keep same query contents */

‎server.c

+64-1
Original file line numberDiff line numberDiff line change
@@ -1640,6 +1640,7 @@ void
16401640
server_prepare_xfrd(struct nsd* nsd)
16411641
{
16421642
char tmpfile[256];
1643+
size_t i;
16431644
/* create task mmaps */
16441645
nsd->mytask = 0;
16451646
snprintf(tmpfile, sizeof(tmpfile), "%snsd-xfr-%d/nsd.%u.task.0",
@@ -1683,6 +1684,47 @@ server_prepare_xfrd(struct nsd* nsd)
16831684
nsd;
16841685
((struct ipc_handler_conn_data*)nsd->xfrd_listener->user_data)->conn =
16851686
xfrd_tcp_create(nsd->region, QIOBUFSZ);
1687+
/* setup sockets to pass NOTIFY messages from the serve processes */
1688+
nsd->serve2xfrd_fd_send = region_alloc_array(
1689+
nsd->region, 2 * nsd->child_count, sizeof(int));
1690+
nsd->serve2xfrd_fd_recv= region_alloc_array(
1691+
nsd->region, 2 * nsd->child_count, sizeof(int));
1692+
for(i=0; i < 2 * nsd->child_count; i++) {
1693+
int sv[2];
1694+
int bufsz = QIOBUFSZ;
1695+
sv[0] = -1; /* For receiving by parent (xfrd) */
1696+
sv[1] = -1; /* For sending by child (server childs) */
1697+
if(socketpair(AF_UNIX, SOCK_DGRAM
1698+
#ifdef SOCK_NONBLOCK
1699+
| SOCK_NONBLOCK
1700+
#endif
1701+
, 0, sv) < 0) {
1702+
log_msg(LOG_ERR, "fatal error: cannot create NOTIFY "
1703+
"communication channel: %s", strerror(errno));
1704+
exit(1);
1705+
}
1706+
#ifndef SOCK_NONBLOCK
1707+
if (fcntl(sv[0], F_SETFL, O_NONBLOCK) == -1) {
1708+
log_msg(LOG_ERR, "serve2xfrd receive fd fcntl "
1709+
"failed: %s", strerror(errno));
1710+
}
1711+
if (fcntl(sv[1], F_SETFL, O_NONBLOCK) == -1) {
1712+
log_msg(LOG_ERR, "serve2xfrd send fd fcntl "
1713+
"failed: %s", strerror(errno));
1714+
}
1715+
#endif
1716+
if(setsockopt(sv[0], SOL_SOCKET, SO_RCVBUF, &bufsz, sizeof(bufsz))) {
1717+
log_msg(LOG_ERR, "setting serve2xfrd "
1718+
"receive buffer size failed: %s", strerror(errno));
1719+
}
1720+
if(setsockopt(sv[1], SOL_SOCKET, SO_SNDBUF, &bufsz, sizeof(bufsz))) {
1721+
log_msg(LOG_ERR, "setting serve2xfrd "
1722+
"send buffer size failed: %s", strerror(errno));
1723+
}
1724+
nsd->serve2xfrd_fd_recv[i] = sv[0];
1725+
nsd->serve2xfrd_fd_send[i] = sv[1];
1726+
}
1727+
nsd->serve2xfrd_fd_swap = nsd->serve2xfrd_fd_send + nsd->child_count;
16861728
}
16871729

16881730

@@ -1692,6 +1734,7 @@ server_start_xfrd(struct nsd *nsd, int del_db, int reload_active)
16921734
pid_t pid;
16931735
int sockets[2] = {0,0};
16941736
struct ipc_handler_conn_data *data;
1737+
size_t i;
16951738

16961739
if(nsd->xfrd_listener->fd != -1)
16971740
close(nsd->xfrd_listener->fd);
@@ -1728,6 +1771,14 @@ server_start_xfrd(struct nsd *nsd, int del_db, int reload_active)
17281771
* restarted, the reload is using nsd->mytask */
17291772
nsd->mytask = 1 - nsd->mytask;
17301773

1774+
/* close the send site of the serve2xfrd fds */
1775+
assert(nsd->serve2xfrd_fd_send < nsd->serve2xfrd_fd_swap);
1776+
for(i = 0; i < 2 * nsd->child_count; i++) {
1777+
if(nsd->serve2xfrd_fd_send[i] != -1) {
1778+
close(nsd->serve2xfrd_fd_send[i]);
1779+
nsd->serve2xfrd_fd_send[i] = -1;
1780+
}
1781+
}
17311782
#ifdef HAVE_SETPROCTITLE
17321783
setproctitle("xfrd");
17331784
#endif
@@ -1750,6 +1801,13 @@ server_start_xfrd(struct nsd *nsd, int del_db, int reload_active)
17501801
log_msg(LOG_ERR, "cannot fcntl pipe: %s", strerror(errno));
17511802
}
17521803
nsd->xfrd_listener->fd = sockets[0];
1804+
/* close the receive site of the serve2xfrd fds */
1805+
for(i = 0; i < 2 * nsd->child_count; i++) {
1806+
if(nsd->serve2xfrd_fd_recv[i] != -1) {
1807+
close(nsd->serve2xfrd_fd_recv[i]);
1808+
nsd->serve2xfrd_fd_recv[i] = -1;
1809+
}
1810+
}
17531811
#ifdef HAVE_SETPROCTITLE
17541812
setproctitle("main");
17551813
#endif
@@ -2404,6 +2462,9 @@ server_reload(struct nsd *nsd, region_type* server_region, netio_type* netio,
24042462
struct quit_sync_event_data cb_data;
24052463
struct event signal_event, cmd_event;
24062464
struct timeval reload_sync_timeout;
2465+
/* For swapping filedescriptors from the serve childs to the xfrd
2466+
* and/or the dnstap collector */
2467+
int *swap_fd_send;
24072468

24082469
/* ignore SIGCHLD from the previous server_main that used this pid */
24092470
memset(&ign_sigchld, 0, sizeof(ign_sigchld));
@@ -2491,7 +2552,6 @@ server_reload(struct nsd *nsd, region_type* server_region, netio_type* netio,
24912552
sigaction(SIGCHLD, &old_sigchld, NULL);
24922553
#ifdef USE_DNSTAP
24932554
if (nsd->dt_collector) {
2494-
int *swap_fd_send;
24952555
DEBUG(DEBUG_IPC,1, (LOG_INFO, "reload: swap dnstap collector pipes"));
24962556
/* Swap fd_send with fd_swap so old serve child and new serve
24972557
* childs will not write to the same pipe ends simultaneously */
@@ -2501,6 +2561,9 @@ server_reload(struct nsd *nsd, region_type* server_region, netio_type* netio,
25012561

25022562
}
25032563
#endif
2564+
swap_fd_send = nsd->serve2xfrd_fd_send;
2565+
nsd->serve2xfrd_fd_send = nsd->serve2xfrd_fd_swap;
2566+
nsd->serve2xfrd_fd_swap = swap_fd_send;
25042567
/* Start new child processes */
25052568
if (server_start_children(nsd, server_region, netio, &nsd->
25062569
xfrd_listener->fd) != 0) {

‎xfrd.c

+15
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ xfrd_init(int socket, struct nsd* nsd, int shortsoa, int reload_active,
131131
pid_t nsd_pid)
132132
{
133133
region_type* region;
134+
size_t i;
134135

135136
assert(xfrd == 0);
136137
/* to setup signalhandling */
@@ -190,6 +191,20 @@ xfrd_init(int socket, struct nsd* nsd, int shortsoa, int reload_active,
190191
xfrd->need_to_send_shutdown = 0;
191192
xfrd->need_to_send_stats = 0;
192193

194+
xfrd->serve2xfrd = (struct event *) region_alloc_array_zero(
195+
xfrd->region, nsd->child_count * 2, sizeof(struct event));
196+
for(i = 0; i < 2 * nsd->child_count; i++) {
197+
memset(&xfrd->serve2xfrd[i], 0, sizeof(struct event));
198+
event_set(&xfrd->serve2xfrd[i], nsd->serve2xfrd_fd_recv[i],
199+
EV_PERSIST|EV_READ, xfrd_handle_notify, xfrd);
200+
if(event_base_set(xfrd->event_base, &xfrd->serve2xfrd[i]) != 0)
201+
log_msg( LOG_ERR
202+
, "xfrd serve2xfrd: event_base_set failed");
203+
if(event_add(&xfrd->serve2xfrd[i], NULL) != 0)
204+
log_msg(LOG_ERR, "xfrd serve2xfrd: event_add failed");
205+
}
206+
xfrd->notify_message = buffer_create(xfrd->region, QIOBUFSZ);
207+
193208
xfrd->write_zonefile_needed = 0;
194209
if(nsd->options->zonefiles_write)
195210
xfrd_write_timer_set();

‎xfrd.h

+6
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ struct xfrd_state {
9999
int ipc_handler_flags;
100100
struct xfrd_tcp *ipc_conn;
101101
struct buffer* ipc_pass;
102+
103+
/* 2 * nsd->child_count communication channels with the serve childs */
104+
struct event* serve2xfrd;
105+
/* the message passed down from the serve process */
106+
struct buffer* notify_message;
107+
102108
/* sending ipc to server_main */
103109
uint8_t need_to_send_shutdown;
104110
uint8_t need_to_send_reload;

0 commit comments

Comments
 (0)
Please sign in to comment.