61
61
62
62
63
63
#include "server.h"
64
+ #include "connection.h"
64
65
#include "bio.h"
65
66
#include <stdatomic.h>
67
+ #include <pthread.h>
68
+ #include <signal.h>
69
+ #include <errno.h>
70
+ #include <stdio.h>
66
71
67
72
/* Titles of the background I/O workers, one string per worker.
 * BIO_WORKER_NUM below is derived from the number of entries in this
 * table, so adding a title here also adds a worker slot.
 * NOTE(review): presumably the position of each title must match the worker
 * index used in bio_job_to_worker[] -- confirm against that table. */
static char * bio_worker_title [] = {
68
73
"bio_close_file" ,
69
74
"bio_aof" ,
70
75
"bio_lazy_free" ,
76
+ "bio_save_to_disk" ,
71
77
};
72
78
73
79
/* Number of workers, computed as the element count of bio_worker_title. */
#define BIO_WORKER_NUM (sizeof(bio_worker_title) / sizeof(*bio_worker_title))
@@ -77,6 +83,7 @@ static unsigned int bio_job_to_worker[] = {
77
83
[BIO_AOF_FSYNC ] = 1 ,
78
84
[BIO_CLOSE_AOF ] = 1 ,
79
85
[BIO_LAZY_FREE ] = 2 ,
86
+ [BIO_SAVE_TO_DISK ] = 3 ,
80
87
};
81
88
82
89
/* One pthread handle slot per worker (array length is BIO_WORKER_NUM). */
static pthread_t bio_threads [BIO_WORKER_NUM ];
@@ -108,6 +115,12 @@ typedef union bio_job {
108
115
lazy_free_fn * free_fn ; /* Function that will free the provided arguments */
109
116
void * free_args []; /* List of arguments to be passed to the free function */
110
117
} free_args ;
118
+ struct {
119
+ int type ;
120
+ connection * conn ; /* Connection to download the rdb from */
121
+ int dest_fd ;
122
+ int sync_io_timeout ;
123
+ } save_to_disk_args ;
111
124
} bio_job ;
112
125
113
126
/* Forward declaration of the worker routine. Its definition casts `arg` to
 * an unsigned long and uses it as the worker number. */
void * bioProcessBackgroundJobs (void * arg );
@@ -203,6 +216,18 @@ void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache) {
203
216
bioSubmitJob (BIO_AOF_FSYNC , job );
204
217
}
205
218
219
+ void bioCreateSaveRDBToDiskJob (connection * conn , int dest_fd , int sync_io_timeout ) {
220
+ bio_job * job = zmalloc (sizeof (* job ));
221
+ job -> save_to_disk_args .conn = conn ;
222
+ job -> save_to_disk_args .sync_io_timeout = sync_io_timeout ;
223
+ job -> save_to_disk_args .dest_fd = dest_fd ;
224
+ bioSubmitJob (BIO_SAVE_TO_DISK , job );
225
+ }
226
+
227
+ int shouldAbortSave (void ) {
228
+ return atomic_load_explicit (& server .replica_bio_abort_save , memory_order_relaxed );
229
+ }
230
+
206
231
void * bioProcessBackgroundJobs (void * arg ) {
207
232
bio_job * job ;
208
233
unsigned long worker = (unsigned long )arg ;
@@ -278,6 +303,169 @@ void *bioProcessBackgroundJobs(void *arg) {
278
303
if (job_type == BIO_CLOSE_AOF ) close (job -> fd_args .fd );
279
304
} else if (job_type == BIO_LAZY_FREE ) {
280
305
job -> free_args .free_fn (job -> free_args .free_args );
306
+ } else if (job_type == BIO_SAVE_TO_DISK ) {
307
+ ssize_t nread , readlen , nwritten ;
308
+ off_t left , repl_transfer_last_fsync_off = 0 ;
309
+ int usemark ;
310
+ off_t repl_transfer_size = -1 , repl_transfer_read = 0 ;
311
+ char eofmark [RDB_EOF_MARK_SIZE ];
312
+ char lastbytes [RDB_EOF_MARK_SIZE ];
313
+ char buf [PROTO_IOBUF_LEN ];
314
+ int sync_io_timeout = job -> save_to_disk_args .sync_io_timeout ;
315
+ connection * conn = job -> save_to_disk_args .conn ;
316
+ int dest_fd = job -> save_to_disk_args .dest_fd ;
317
+ int received_ping = 0 ;
318
+ long long stat_net_repl_input_bytes = 0 ;
319
+ int error = 0 , main_thread_abort = 0 ;
320
+
321
+ /* Put the socket in blocking mode to simplify RDB transfer.
322
+ * We'll restore it when the RDB is received. */
323
+ connBlock (conn );
324
+ connRecvTimeout (conn , sync_io_timeout );
325
+ do {
326
+ received_ping = 0 ;
327
+ if (shouldAbortSave ()) {
328
+ main_thread_abort = 1 ;
329
+ goto done ;
330
+ }
331
+ nread = connSyncReadLine (conn , buf , 1024 , sync_io_timeout );
332
+ if (nread == -1 ) {
333
+ replicaBioSaveServerLog (LL_WARNING , "I/O error reading bulk count from PRIMARY: %s" , connGetLastError (conn ));
334
+ error = 1 ;
335
+ goto done ;
336
+ } else {
337
+ /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and
338
+ * converts "\r\n" to '\0', so 1 byte is lost. */
339
+ stat_net_repl_input_bytes += nread + 1 ;
340
+ }
341
+ int ret = inspectBulkPayloadHeaderForErrors (buf );
342
+ if (ret == INSPECT_BULK_PAYLOAD_PRIMARY_ABORT ) {
343
+ replicaBioSaveServerLog (LL_WARNING , "PRIMARY aborted replication with an error: %s" , buf + 1 );
344
+ error = 1 ;
345
+ goto done ;
346
+ } else if (ret == INSPECT_BULK_PAYLOAD_PRIMARY_PING ) {
347
+ atomic_store_explicit (& server .repl_transfer_lastio , atomic_load_explicit (& server .unixtime , memory_order_relaxed ), memory_order_relaxed );
348
+ memset (buf , 0 , PROTO_IOBUF_LEN );
349
+ received_ping = 1 ;
350
+ } else if (ret == INSPECT_BULK_PAYLOAD_PRIMARY_BAD_PROTO ) {
351
+ replicaBioSaveServerLog (LL_WARNING ,
352
+ "Bad protocol from PRIMARY, the first byte is not '$' (we received '%s'), are you sure the host "
353
+ "and port are right?" ,
354
+ buf );
355
+ error = 1 ;
356
+ goto done ;
357
+ }
358
+ } while (received_ping );
359
+
360
+ usemark = inspectBulkPayloadHeaderForEOF (buf , eofmark , lastbytes );
361
+ if (usemark ) {
362
+ repl_transfer_size = 0 ;
363
+ replicaBioSaveServerLog (LL_NOTICE , "PRIMARY <-> REPLICA sync: receiving streamed RDB from primary with EOF to disk" );
364
+ } else {
365
+ repl_transfer_size = strtol (buf + 1 , NULL , 10 );
366
+ replicaBioSaveServerLog (LL_NOTICE , "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary to disk" ,
367
+ (long long )repl_transfer_size );
368
+ }
369
+
370
+ while (1 ) {
371
+ if (shouldAbortSave ()) {
372
+ replicaBioSaveServerLog (3 , "BIO THREAD ABORTING DUE TO MAIN THREAD REQUEST" );
373
+ main_thread_abort = 1 ;
374
+ goto done ;
375
+ }
376
+ if (usemark ) {
377
+ readlen = sizeof (buf );
378
+ } else {
379
+ left = repl_transfer_size - repl_transfer_read ;
380
+ readlen = (left < (signed )sizeof (buf )) ? left : (signed )sizeof (buf );
381
+ }
382
+ nread = connRead (conn , buf , readlen );
383
+ if (nread <= 0 ) {
384
+ if (connGetState (conn ) == CONN_STATE_CONNECTED ) {
385
+ /* equivalent to EAGAIN */
386
+ memset (buf , 0 , PROTO_IOBUF_LEN );
387
+ continue ;
388
+ }
389
+ replicaBioSaveServerLog (LL_WARNING , "I/O error trying to sync with PRIMARY: %s" ,
390
+ (nread == -1 ) ? connGetLastError (conn ) : "connection lost" );
391
+ error = 1 ;
392
+ goto done ;
393
+ }
394
+ stat_net_repl_input_bytes += nread ;
395
+
396
+ /* When a mark is used, we want to detect EOF asap in order to avoid
397
+ * writing the EOF mark into the file... */
398
+ int eof_reached = 0 ;
399
+ if (usemark ) eof_reached = inspectBulkPayloadForEOF (buf , nread , eofmark , lastbytes );
400
+ /* Update the last I/O time for the replication transfer (used in
401
+ * order to detect timeouts during replication), and write what we
402
+ * got from the socket to the dump file on disk. */
403
+ atomic_store_explicit (& server .repl_transfer_lastio , atomic_load_explicit (& server .unixtime , memory_order_relaxed ), memory_order_relaxed );
404
+ if ((nwritten = write (dest_fd , buf , nread )) != nread ) {
405
+ replicaBioSaveServerLog (LL_WARNING ,
406
+ "Write error or short write writing to the DB dump file "
407
+ "needed for PRIMARY <-> REPLICA synchronization: %s" ,
408
+ (nwritten == -1 ) ? strerror (errno ) : "short write" );
409
+ error = 1 ;
410
+ goto done ;
411
+ }
412
+ repl_transfer_read += nread ;
413
+
414
+ /* Delete the last 40 bytes from the file if we reached EOF. */
415
+ if (usemark && eof_reached ) {
416
+ if (ftruncate (dest_fd , repl_transfer_read - RDB_EOF_MARK_SIZE ) == -1 ) {
417
+ replicaBioSaveServerLog (LL_WARNING ,
418
+ "Error truncating the RDB file received from the primary "
419
+ "for SYNC: %s" ,
420
+ strerror (errno ));
421
+ error = 1 ;
422
+ goto done ;
423
+ }
424
+ }
425
+
426
+ /* Sync data on disk from time to time, otherwise at the end of the
427
+ * transfer we may suffer a big delay as the memory buffers are copied
428
+ * into the actual disk. */
429
+ if (repl_transfer_read >= repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC ) {
430
+ off_t sync_size = repl_transfer_read - repl_transfer_last_fsync_off ;
431
+ rdb_fsync_range (dest_fd , repl_transfer_last_fsync_off , sync_size );
432
+ repl_transfer_last_fsync_off += sync_size ;
433
+ }
434
+
435
+ /* Check if the transfer is now complete */
436
+ if (!usemark ) {
437
+ if (repl_transfer_read == repl_transfer_size ) eof_reached = 1 ;
438
+ }
439
+
440
+ /* If the transfer is not yet complete, we need to read more, so
441
+ * clear the buffer and go around the read loop again. */
442
+ if (!eof_reached ) {
443
+ memset (buf , 0 , PROTO_IOBUF_LEN );
444
+ continue ;
445
+ }
446
+
447
+ break ;
448
+ }
449
+
450
+ done :
451
+ /* Restore the socket to the original state to continue
452
+ * with the normal replication. */
453
+ connNonBlock (conn );
454
+ connRecvTimeout (conn , 0 );
455
+ if (main_thread_abort ) {
456
+ // Do nothing for now, main thread already handles cancelReplHandshake
457
+ replicaBioSaveServerLog (LL_WARNING , "Replica main thread aborted RDB save" );
458
+ } else if (error ) {
459
+ replicaBioSaveServerLog (LL_WARNING , "Error downloading RDB" );
460
+ atomic_store_explicit (& server .replica_bio_disk_save_state , REPL_BIO_DISK_SAVE_STATE_FAIL , memory_order_release );
461
+ } else {
462
+ replicaBioSaveServerLog (LL_NOTICE , "Done downloading RDB" );
463
+ server .bio_stat_net_repl_input_bytes = stat_net_repl_input_bytes ;
464
+ server .bio_repl_transfer_size = repl_transfer_size ;
465
+ server .bio_repl_transfer_read = repl_transfer_read ;
466
+ server .bio_conn = conn ;
467
+ atomic_store_explicit (& server .replica_bio_disk_save_state , REPL_BIO_DISK_SAVE_STATE_FINISHED , memory_order_release );
468
+ }
281
469
} else {
282
470
serverPanic ("Wrong job type in bioProcessBackgroundJobs()." );
283
471
}
0 commit comments