Skip to content

snapshots: non-blocking #5629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 42 additions & 26 deletions src/discof/restore/utils/fd_snapshot_httpdl.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ fd_snapshot_httpdl_reset( fd_snapshot_httpdl_t * self ) {
self->hops = FD_SNAPSHOT_HTTPDL_DEFAULT_HOPS;
self->req_deadline = 0L;

self->req_tail = 0UL;
self->req_head = 0UL;
self->resp_tail = 0UL;
self->resp_head = 0UL;
self->dl_total = 0UL;
self->last_dl_total = 0UL;
self->last_nanos = 0UL;
self->write_total = 0UL;
self->content_len = 0UL;
self->req_tail = 0UL;
self->req_head = 0UL;
self->resp_tail = 0UL;
self->file_resp_tail = 0UL;
self->resp_head = 0UL;
self->dl_total = 0UL;
self->last_dl_total = 0UL;
self->last_nanos = 0UL;
self->write_total = 0UL;
self->file_write_total = 0UL;
self->content_len = 0UL;

fd_memset( self->req_buf, 0, sizeof(self->req_buf) );
fd_memset( self->resp_buf, 0, sizeof(self->resp_buf) );
Expand Down Expand Up @@ -300,7 +302,7 @@ fd_snapshot_httpdl_init_full_snapshot_file( fd_snapshot_httpdl_t * self,
fd_memcpy( self->snapshot_filename, self->full_snapshot_entry->filename, PATH_MAX );

/* open full snapshot save file */
self->current_snapshot_fd = open( self->snapshot_filename_temp, O_WRONLY|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR );
self->current_snapshot_fd = open( self->snapshot_filename_temp, O_WRONLY|O_CREAT|O_TRUNC|O_NONBLOCK, S_IRUSR|S_IWUSR );
if( FD_UNLIKELY( self->current_snapshot_fd<0 ) ) {
FD_LOG_WARNING(( "open(%s) failed (%d-%s)", self->snapshot_filename_temp, errno, fd_io_strerror( errno ) ));
self->state = FD_SNAPSHOT_HTTPDL_STATE_FAIL;
Expand Down Expand Up @@ -372,9 +374,10 @@ fd_snapshot_httpdl_init_incremental_snapshot_file( fd_snapshot_httpdl_t * self,
static void
fd_snapshot_httpdl_reset_req( fd_snapshot_httpdl_t * self ) {
self->req_deadline = fd_log_wallclock() + (long)FD_SNAPSHOT_HTTPDL_REQUEST_TIMEOUT;
self->state = FD_SNAPSHOT_HTTPDL_STATE_REQ;
self->resp_tail = 0UL;
self->resp_head = 0UL;
self->state = FD_SNAPSHOT_HTTPDL_STATE_REQ;
self->resp_tail = 0UL;
self->file_resp_tail = 0UL;
self->resp_head = 0UL;
}

static int
Expand Down Expand Up @@ -527,7 +530,8 @@ fd_snapshot_httpdl_resp( fd_snapshot_httpdl_t * self ) {
Remember where the leftover tail started so we can later reuse it
during response reading. */

self->resp_tail = (ulong)parse_res;
self->resp_tail = (ulong)parse_res;
self->file_resp_tail = self->resp_tail;

/* Is it a redirect? If so, start over. */

Expand Down Expand Up @@ -598,23 +602,25 @@ fd_snapshot_httpdl_resp( fd_snapshot_httpdl_t * self ) {

static int
fd_snapshot_httldl_write_snapshot_file( fd_snapshot_httpdl_t * self,
ulong write_sz ) {
ulong sz,
ulong * written_sz ) {
FD_TEST( self->current_snapshot_fd != -1 );

/* write out to snapshot file */
ulong src_sz;
int err = fd_io_write( self->current_snapshot_fd,
self->resp_buf + self->resp_tail,
write_sz,
write_sz,
self->resp_buf + self->file_resp_tail,
sz,
sz,
&src_sz );
if( FD_UNLIKELY( err!=0 ) ) {
FD_LOG_WARNING(( "fd_io_write() failed (%d-%s) requested %lu bytes and wrote %lu bytes", err, fd_io_strerror( err ), write_sz, src_sz ));
if( FD_UNLIKELY( err!=EAGAIN && err!=0 ) ) {
FD_LOG_WARNING(( "fd_io_write() failed (%d-%s) requested %lu bytes and wrote %lu bytes", err, fd_io_strerror( err ), sz, src_sz ));
self->state = FD_SNAPSHOT_HTTPDL_STATE_FAIL;
fd_snapshot_httpdl_cleanup_fds( self );
return err;
}

*written_sz = src_sz;
return 0;
}

Expand Down Expand Up @@ -655,21 +661,30 @@ fd_snapshot_httpdl_write( fd_snapshot_httpdl_t * self,

/* write out to in memory buffer */
ulong write_sz = fd_ulong_min( avail_sz, dst_max );
fd_memcpy( dst, self->resp_buf + self->resp_tail, write_sz );
*sz = write_sz;
if( write_sz ) {
fd_memcpy( dst, self->resp_buf + self->resp_tail, write_sz );
*sz = write_sz;
}

ulong avail_file_sz = self->resp_head - self->file_resp_tail;
ulong file_write_sz = fd_ulong_min( avail_file_sz, dst_max );

/* save snapshot contents to file */
int err = fd_snapshot_httldl_write_snapshot_file( self, write_sz );
ulong file_written_sz = 0UL;
int err = fd_snapshot_httldl_write_snapshot_file( self, file_write_sz, &file_written_sz );
if( FD_UNLIKELY( err ) ) {
return err;
}

self->resp_tail += (uint)write_sz;
self->file_resp_tail += (uint)file_written_sz;
self->write_total += write_sz;
self->file_write_total += file_written_sz;
self->metrics.bytes_read = self->dl_total;

/* check if done downloading and writing */
if( self->content_len == self->write_total ) {
if( self->content_len == self->write_total &&
self->content_len == self->file_write_total ) {
FD_LOG_NOTICE(( "Wrote out all %lu MB", self->write_total>>20 ));

self->state = FD_SNAPSHOT_HTTPDL_STATE_DONE;
Expand All @@ -696,9 +711,10 @@ fd_snapshot_httpdl_dl( fd_snapshot_httpdl_t * self,
else return 0;
}

if( self->resp_head == self->resp_tail ) {
if( self->resp_head == self->resp_tail &&
self->resp_head == self->file_resp_tail ) {
/* Empty resp buffer means we can recv more bytes */
self->resp_tail = self->resp_head = 0UL;
self->resp_tail = self->file_resp_tail = self->resp_head = 0UL;
long recv_sz = recv( self->socket_fd, self->resp_buf,
fd_ulong_min( self->content_len - self->dl_total, FD_SNAPSHOT_HTTPDL_RESP_BUF_MAX ),
MSG_DONTWAIT );
Expand Down
2 changes: 2 additions & 0 deletions src/discof/restore/utils/fd_snapshot_httpdl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ struct fd_snapshot_httpdl {

uchar resp_buf[ FD_SNAPSHOT_HTTPDL_RESP_BUF_MAX ];
ulong resp_tail;
ulong file_resp_tail;
ulong resp_head;

/* value from "content-length:" */
Expand All @@ -85,6 +86,7 @@ struct fd_snapshot_httpdl {
/* Total written out so far */

ulong write_total;
ulong file_write_total;

/* full snapshot base slot used to verify incremental snapshot */
ulong base_slot;
Expand Down
4 changes: 2 additions & 2 deletions src/discof/restore/utils/fd_snapshot_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fd_snapshot_reader_new( void * mem,
void * full_snapshot_file_mem = FD_SCRATCH_ALLOC_APPEND( l,
fd_snapshot_file_align(),
fd_snapshot_file_footprint() );
int full_fd = open( full_snapshot_entry->filename, O_RDONLY|O_CLOEXEC );
int full_fd = open( full_snapshot_entry->filename, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
if( FD_UNLIKELY( full_fd<0 ) ) FD_LOG_ERR(( "open() failed (%i-%s)", errno, fd_io_strerror( errno ) ));

FD_LOG_NOTICE(( "Retrieving full snapshot from %s", full_snapshot_entry->filename ));
Expand All @@ -78,7 +78,7 @@ fd_snapshot_reader_new( void * mem,
void * incremental_snapshot_file_mem = FD_SCRATCH_ALLOC_APPEND( l,
fd_snapshot_file_align(),
fd_snapshot_file_footprint() );
int inc_fd = open( incremental_snapshot_entry->inner.filename, O_RDONLY|O_CLOEXEC );
int inc_fd = open( incremental_snapshot_entry->inner.filename, O_RDONLY|O_CLOEXEC|O_NONBLOCK );
if( FD_UNLIKELY( inc_fd<0 ) ) FD_LOG_ERR(( "open() failed (%i-%s)", errno, fd_io_strerror( errno ) ));

FD_LOG_NOTICE(( "Retrieving incremental snapshot from %s", incremental_snapshot_entry->inner.filename ));
Expand Down
5 changes: 3 additions & 2 deletions src/discof/restore/utils/fd_snapshot_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ fd_snapshot_reader_set_source_incremental( fd_snapshot_reader_t * self ) {
FD_TEST( self->full_src.this );

/* There can be only be three different snapshot source configurations:
- Both full and incremental snapshot files exist on disk locally
- A valid local full snapshot file exists but no local incremental snapshot file exists
- Both full and incremental snapshot files exist on disk locally.
- A valid local full snapshot file exists but no local incremental
snapshot file exists.
- Both the full and incremental snapshots must be downloaded */
if( self->full_src.src_type == SRC_FILE ) {
/* If the full snapshot was read from disk,
Expand Down
Loading