Skip to content

Commit db31483

Browse files
snapshots: non-blocking
1 parent d2aec3e commit db31483

File tree

4 files changed

+47
-29
lines changed

4 files changed

+47
-29
lines changed

src/discof/restore/utils/fd_snapshot_httpdl.c

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,16 @@ fd_snapshot_httpdl_reset( fd_snapshot_httpdl_t * self ) {
102102
self->hops = FD_SNAPSHOT_HTTPDL_DEFAULT_HOPS;
103103
self->req_deadline = 0L;
104104

105-
self->req_tail = 0UL;
106-
self->req_head = 0UL;
107-
self->resp_tail = 0UL;
108-
self->resp_head = 0UL;
109-
self->dl_total = 0UL;
110-
self->last_dl_total = 0UL;
111-
self->last_nanos = 0UL;
112-
self->write_total = 0UL;
113-
self->content_len = 0UL;
105+
self->req_tail = 0UL;
106+
self->req_head = 0UL;
107+
self->resp_tail = 0UL;
108+
self->file_resp_tail = 0UL;
109+
self->resp_head = 0UL;
110+
self->dl_total = 0UL;
111+
self->last_dl_total = 0UL;
112+
self->last_nanos = 0UL;
113+
self->write_total = 0UL;
114+
self->content_len = 0UL;
114115

115116
fd_memset( self->req_buf, 0, sizeof(self->req_buf) );
116117
fd_memset( self->resp_buf, 0, sizeof(self->resp_buf) );
@@ -300,7 +301,7 @@ fd_snapshot_httpdl_init_full_snapshot_file( fd_snapshot_httpdl_t * self,
300301
fd_memcpy( self->snapshot_filename, self->full_snapshot_entry->filename, PATH_MAX );
301302

302303
/* open full snapshot save file */
303-
self->current_snapshot_fd = open( self->snapshot_filename_temp, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR );
304+
self->current_snapshot_fd = open( self->snapshot_filename_temp, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
304305
if( FD_UNLIKELY( self->current_snapshot_fd<0 ) ) {
305306
FD_LOG_WARNING(( "open(%s) failed (%d-%s)", self->snapshot_filename_temp, errno, fd_io_strerror( errno ) ));
306307
self->state = FD_SNAPSHOT_HTTPDL_STATE_FAIL;
@@ -372,9 +373,10 @@ fd_snapshot_httpdl_init_incremental_snapshot_file( fd_snapshot_httpdl_t * self,
372373
static void
373374
fd_snapshot_httpdl_reset_req( fd_snapshot_httpdl_t * self ) {
374375
self->req_deadline = fd_log_wallclock() + (long)FD_SNAPSHOT_HTTPDL_REQUEST_TIMEOUT;
375-
self->state = FD_SNAPSHOT_HTTPDL_STATE_REQ;
376-
self->resp_tail = 0UL;
377-
self->resp_head = 0UL;
376+
self->state = FD_SNAPSHOT_HTTPDL_STATE_REQ;
377+
self->resp_tail = 0UL;
378+
self->file_resp_tail = 0UL;
379+
self->resp_head = 0UL;
378380
}
379381

380382
static int
@@ -527,7 +529,8 @@ fd_snapshot_httpdl_resp( fd_snapshot_httpdl_t * self ) {
527529
Remember where the leftover tail started so we can later reuse it
528530
during response reading. */
529531

530-
self->resp_tail = (ulong)parse_res;
532+
self->resp_tail = (ulong)parse_res;
533+
self->file_resp_tail = self->resp_tail;
531534

532535
/* Is it a redirect? If so, start over. */
533536

@@ -598,23 +601,25 @@ fd_snapshot_httpdl_resp( fd_snapshot_httpdl_t * self ) {
598601

599602
static int
600603
fd_snapshot_httldl_write_snapshot_file( fd_snapshot_httpdl_t * self,
601-
ulong write_sz ) {
604+
ulong sz,
605+
ulong * written_sz ) {
602606
FD_TEST( self->current_snapshot_fd != -1 );
603607

604608
/* write out to snapshot file */
605609
ulong src_sz;
606610
int err = fd_io_write( self->current_snapshot_fd,
607-
self->resp_buf + self->resp_tail,
608-
write_sz,
609-
write_sz,
611+
self->resp_buf + self->file_resp_tail,
612+
sz,
613+
sz,
610614
&src_sz );
611-
if( FD_UNLIKELY( err!=0 ) ) {
615+
if( FD_UNLIKELY( err!=EAGAIN ) ) {
612616
FD_LOG_WARNING(( "fd_io_write() failed (%d-%s) requested %lu bytes and wrote %lu bytes", err, fd_io_strerror( err ), write_sz, src_sz ));
613617
self->state = FD_SNAPSHOT_HTTPDL_STATE_FAIL;
614618
fd_snapshot_httpdl_cleanup_fds( self );
615619
return err;
616620
}
617621

622+
*written_sz = src_sz;
618623
return 0;
619624
}
620625

@@ -655,21 +660,30 @@ fd_snapshot_httpdl_write( fd_snapshot_httpdl_t * self,
655660

656661
/* write out to in memory buffer */
657662
ulong write_sz = fd_ulong_min( avail_sz, dst_max );
658-
fd_memcpy( dst, self->resp_buf + self->resp_tail, write_sz );
659-
*sz = write_sz;
663+
if( write_sz ) {
664+
fd_memcpy( dst, self->resp_buf + self->resp_tail, write_sz );
665+
*sz = write_sz;
666+
}
667+
668+
ulong avail_file_sz = self->resp_head - self->file_resp_tail;
669+
ulong file_write_sz = fd_ulong_min( avail_file_sz, dst_max );
660670

661671
/* save snapshot contents to file */
662-
int err = fd_snapshot_httldl_write_snapshot_file( self, write_sz );
672+
ulong file_written_sz = 0UL;
673+
int err = fd_snapshot_httldl_write_snapshot_file( self, file_write_sz, &file_written_sz );
663674
if( FD_UNLIKELY( err ) ) {
664675
return err;
665676
}
666677

667678
self->resp_tail += (uint)write_sz;
679+
self->file_resp_tail += (uint)file_written_sz;
668680
self->write_total += write_sz;
681+
self->file_write_total += file_written_sz;
669682
self->metrics.bytes_read = self->dl_total;
670683

671684
/* check if done downloading and writing */
672-
if( self->content_len == self->write_total ) {
685+
if( self->content_len == self->write_total &&
686+
self->content_len == self->file_write_total ) {
673687
FD_LOG_NOTICE(( "Wrote out all %lu MB", self->write_total>>20 ));
674688

675689
self->state = FD_SNAPSHOT_HTTPDL_STATE_DONE;
@@ -696,9 +710,10 @@ fd_snapshot_httpdl_dl( fd_snapshot_httpdl_t * self,
696710
else return 0;
697711
}
698712

699-
if( self->resp_head == self->resp_tail ) {
713+
if( self->resp_head == self->resp_tail &&
714+
self->resp_head == self->file_resp_tail ) {
700715
/* Empty resp buffer means we can recv more bytes */
701-
self->resp_tail = self->resp_head = 0UL;
716+
self->resp_tail = self->file_resp_tail = self->resp_head = 0UL;
702717
long recv_sz = recv( self->socket_fd, self->resp_buf,
703718
fd_ulong_min( self->content_len - self->dl_total, FD_SNAPSHOT_HTTPDL_RESP_BUF_MAX ),
704719
MSG_DONTWAIT );

src/discof/restore/utils/fd_snapshot_httpdl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ struct fd_snapshot_httpdl {
6969

7070
uchar resp_buf[ FD_SNAPSHOT_HTTPDL_RESP_BUF_MAX ];
7171
ulong resp_tail;
72+
ulong file_resp_tail;
7273
ulong resp_head;
7374

7475
/* value from "content-length:" */
@@ -85,6 +86,7 @@ struct fd_snapshot_httpdl {
8586
/* Total written out so far */
8687

8788
ulong write_total;
89+
ulong file_write_total;
8890

8991
/* full snapshot base slot used to verify incremental snapshot */
9092
ulong base_slot;

src/discof/restore/utils/fd_snapshot_reader.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ fd_snapshot_reader_new( void * mem,
6060
void * full_snapshot_file_mem = FD_SCRATCH_ALLOC_APPEND( l,
6161
fd_snapshot_file_align(),
6262
fd_snapshot_file_footprint() );
63-
int full_fd = open( full_snapshot_entry->filename, O_RDONLY|O_CLOEXEC );
63+
int full_fd = open( full_snapshot_entry->filename, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
6464
if( FD_UNLIKELY( full_fd<0 ) ) FD_LOG_ERR(( "open() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
6565

6666
FD_LOG_NOTICE(( "Retrieving full snapshot from %s", full_snapshot_entry->filename ));
@@ -78,7 +78,7 @@ fd_snapshot_reader_new( void * mem,
7878
void * incremental_snapshot_file_mem = FD_SCRATCH_ALLOC_APPEND( l,
7979
fd_snapshot_file_align(),
8080
fd_snapshot_file_footprint() );
81-
int inc_fd = open( incremental_snapshot_entry->inner.filename, O_RDONLY|O_CLOEXEC );
81+
int inc_fd = open( incremental_snapshot_entry->inner.filename, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
8282
if( FD_UNLIKELY( inc_fd<0 ) ) FD_LOG_ERR(( "open() failed (%i-%s)", errno, fd_io_strerror( errno ) ));
8383

8484
FD_LOG_NOTICE(( "Retrieving incremental snapshot from %s", incremental_snapshot_entry->inner.filename ));

src/discof/restore/utils/fd_snapshot_reader.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ fd_snapshot_reader_set_source_incremental( fd_snapshot_reader_t * self ) {
8282
FD_TEST( self->full_src.this );
8383

8484
/* There can be only be three different snapshot source configurations:
85-
- Both full and incremental snapshot files exist on disk locally
86-
- A valid local full snapshot file exists but no local incremental snapshot file exists
85+
- Both full and incremental snapshot files exist on disk locally.
86+
- A valid local full snapshot file exists but no local incremental
87+
snapshot file exists.
8788
- Both the full and incremental snapshots must be downloaded */
8889
if( self->full_src.src_type == SRC_FILE ) {
8990
/* If the full snapshot was read from disk,

0 commit comments

Comments
 (0)