netfs: Fix wait/wake to be consistent about the waitqueue used

Fix further inconsistencies in the use of waitqueues
(clear_and_wake_up_bit() vs private waitqueue).

Move some of this wait/wake logic from the read and write sides into common
code so that it can be done in fewer places.

To make this work, async I/O needs to set NETFS_RREQ_OFFLOAD_COLLECTION to
indicate that a workqueue will do the collecting, and places that call the
wait function need to deal with it returning the amount transferred.
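
As a rough sketch (condensed from the direct_write.c hunk below, with
unrelated detail elided), the caller side of an unbuffered write then
looks like this:

	if (async)
		__set_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &wreq->flags);
	...
	if (!async) {
		/* The wait helper now returns the number of bytes
		 * transferred or a negative error code.
		 */
		ret = netfs_wait_for_write(wreq);
		if (ret > 0)
			iocb->ki_pos += ret;
	} else {
		ret = -EIOCBQUEUED;
	}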

Fixes: e2d46f2 ("netfs: Change the read result collector to only use one work item")
Signed-off-by: David Howells <[email protected]>
cc: Marc Dionne <[email protected]>
cc: Steve French <[email protected]>
cc: Ihor Solodrai <[email protected]>
cc: Eric Van Hensbergen <[email protected]>
cc: Latchesar Ionkov <[email protected]>
cc: Dominique Martinet <[email protected]>
cc: Christian Schoenebeck <[email protected]>
cc: Paulo Alcantara <[email protected]>
cc: Jeff Layton <[email protected]>
cc: [email protected]
cc: [email protected]
cc: [email protected]
cc: [email protected]
Signed-off-by: Steve French <[email protected]>
dhowells authored and Steve French committed Feb 20, 2025
1 parent c90a1a2 commit ea87aa9
Showing 11 changed files with 281 additions and 222 deletions.
2 changes: 1 addition & 1 deletion fs/netfs/buffered_read.c
@@ -312,7 +312,7 @@ static void netfs_read_to_pagecache(struct netfs_io_request *rreq)
 	if (unlikely(size > 0)) {
 		smp_wmb(); /* Write lists before ALL_QUEUED. */
 		set_bit(NETFS_RREQ_ALL_QUEUED, &rreq->flags);
-		netfs_wake_read_collector(rreq);
+		netfs_wake_collector(rreq);
 	}
 
 	/* Defer error return as we may need to wait for outstanding I/O. */
2 changes: 1 addition & 1 deletion fs/netfs/buffered_write.c
@@ -386,7 +386,7 @@ ssize_t netfs_perform_write(struct kiocb *iocb, struct iov_iter *iter,
 		wbc_detach_inode(&wbc);
 		if (ret2 == -EIOCBQUEUED)
 			return ret2;
-		if (ret == 0)
+		if (ret == 0 && ret2 < 0)
 			ret = ret2;
 	}
 
4 changes: 2 additions & 2 deletions fs/netfs/direct_read.c
@@ -103,7 +103,7 @@ static int netfs_dispatch_unbuffered_reads(struct netfs_io_request *rreq)
 		rreq->netfs_ops->issue_read(subreq);
 
 		if (test_bit(NETFS_RREQ_PAUSE, &rreq->flags))
-			netfs_wait_for_pause(rreq);
+			netfs_wait_for_paused_read(rreq);
 		if (test_bit(NETFS_RREQ_FAILED, &rreq->flags))
			break;
 		if (test_bit(NETFS_RREQ_BLOCKED, &rreq->flags) &&
@@ -115,7 +115,7 @@ static int netfs_dispatch_unbuffered_reads(struct netfs_io_request *rreq)
 	if (unlikely(size > 0)) {
 		smp_wmb(); /* Write lists before ALL_QUEUED. */
 		set_bit(NETFS_RREQ_ALL_QUEUED, &rreq->flags);
-		netfs_wake_read_collector(rreq);
+		netfs_wake_collector(rreq);
 	}
 
 	return ret;
10 changes: 4 additions & 6 deletions fs/netfs/direct_write.c
@@ -87,6 +87,8 @@ ssize_t netfs_unbuffered_write_iter_locked(struct kiocb *iocb, struct iov_iter *
 	}
 
 	__set_bit(NETFS_RREQ_USE_IO_ITER, &wreq->flags);
+	if (async)
+		__set_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &wreq->flags);
 
 	/* Copy the data into the bounce buffer and encrypt it. */
 	// TODO
@@ -105,13 +107,9 @@ ssize_t netfs_unbuffered_write_iter_locked(struct kiocb *iocb, struct iov_iter *
 
 	if (!async) {
 		trace_netfs_rreq(wreq, netfs_rreq_trace_wait_ip);
-		wait_on_bit(&wreq->flags, NETFS_RREQ_IN_PROGRESS,
-			    TASK_UNINTERRUPTIBLE);
-		ret = wreq->error;
-		if (ret == 0) {
-			ret = wreq->transferred;
+		ret = netfs_wait_for_write(wreq);
+		if (ret > 0)
 			iocb->ki_pos += ret;
-		}
 	} else {
 		ret = -EIOCBQUEUED;
 	}
33 changes: 27 additions & 6 deletions fs/netfs/internal.h
@@ -62,6 +62,14 @@ static inline void netfs_proc_del_rreq(struct netfs_io_request *rreq) {}
 struct folio_queue *netfs_buffer_make_space(struct netfs_io_request *rreq,
 					    enum netfs_folioq_trace trace);
 void netfs_reset_iter(struct netfs_io_subrequest *subreq);
+void netfs_wake_collector(struct netfs_io_request *rreq);
+void netfs_subreq_clear_in_progress(struct netfs_io_subrequest *subreq);
+void netfs_wait_for_in_progress_stream(struct netfs_io_request *rreq,
+				       struct netfs_io_stream *stream);
+ssize_t netfs_wait_for_read(struct netfs_io_request *rreq);
+ssize_t netfs_wait_for_write(struct netfs_io_request *rreq);
+void netfs_wait_for_paused_read(struct netfs_io_request *rreq);
+void netfs_wait_for_paused_write(struct netfs_io_request *rreq);
 
 /*
  * objects.c
@@ -91,11 +99,9 @@ static inline void netfs_see_subrequest(struct netfs_io_subrequest *subreq,
 /*
  * read_collect.c
  */
+bool netfs_read_collection(struct netfs_io_request *rreq);
 void netfs_read_collection_worker(struct work_struct *work);
-void netfs_wake_read_collector(struct netfs_io_request *rreq);
 void netfs_cache_read_terminated(void *priv, ssize_t transferred_or_error);
-ssize_t netfs_wait_for_read(struct netfs_io_request *rreq);
-void netfs_wait_for_pause(struct netfs_io_request *rreq);
 
 /*
  * read_pgpriv2.c
@@ -175,8 +181,8 @@ static inline void netfs_stat_d(atomic_t *stat)
  * write_collect.c
  */
 int netfs_folio_written_back(struct folio *folio);
+bool netfs_write_collection(struct netfs_io_request *wreq);
 void netfs_write_collection_worker(struct work_struct *work);
-void netfs_wake_write_collector(struct netfs_io_request *wreq);
 
 /*
  * write_issue.c
@@ -197,8 +203,8 @@ struct netfs_io_request *netfs_begin_writethrough(struct kiocb *iocb, size_t len
 int netfs_advance_writethrough(struct netfs_io_request *wreq, struct writeback_control *wbc,
 			       struct folio *folio, size_t copied, bool to_page_end,
 			       struct folio **writethrough_cache);
-int netfs_end_writethrough(struct netfs_io_request *wreq, struct writeback_control *wbc,
-			   struct folio *writethrough_cache);
+ssize_t netfs_end_writethrough(struct netfs_io_request *wreq, struct writeback_control *wbc,
+			       struct folio *writethrough_cache);
 int netfs_unbuffered_write(struct netfs_io_request *wreq, bool may_wait, size_t len);
 
 /*
@@ -253,6 +259,21 @@ static inline void netfs_put_group_many(struct netfs_group *netfs_group, int nr)
 		netfs_group->free(netfs_group);
 }
 
+/*
+ * Clear and wake up a NETFS_RREQ_* flag bit on a request.
+ */
+static inline void netfs_wake_rreq_flag(struct netfs_io_request *rreq,
+					unsigned int rreq_flag,
+					enum netfs_rreq_trace trace)
+{
+	if (test_bit(rreq_flag, &rreq->flags)) {
+		trace_netfs_rreq(rreq, trace);
+		clear_bit_unlock(rreq_flag, &rreq->flags);
+		smp_mb__after_atomic(); /* Set flag before task state */
+		wake_up(&rreq->waitq);
+	}
+}
+
 /*
  * fscache-cache.c
  */
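
A note on the netfs_wake_rreq_flag() helper added above: its call sites
are in the read/write collection code, which is not among the hunks shown
on this page.  Purely as an illustration (the trace label below is an
assumption, not something taken from this diff), a collector clearing the
pause state would use it along these lines:

	/* Sketch only: clear NETFS_RREQ_PAUSE if it is set, emit a trace
	 * line and wake anything sleeping on rreq->waitq.  The trace enum
	 * value here is assumed for the example.
	 */
	netfs_wake_rreq_flag(rreq, NETFS_RREQ_PAUSE, netfs_rreq_trace_unpause);
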
217 changes: 217 additions & 0 deletions fs/netfs/misc.c
@@ -313,3 +313,220 @@ bool netfs_release_folio(struct folio *folio, gfp_t gfp)
 	return true;
 }
 EXPORT_SYMBOL(netfs_release_folio);
+
+/*
+ * Wake the collection work item.
+ */
+void netfs_wake_collector(struct netfs_io_request *rreq)
+{
+	if (test_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &rreq->flags) &&
+	    !test_bit(NETFS_RREQ_RETRYING, &rreq->flags)) {
+		queue_work(system_unbound_wq, &rreq->work);
+	} else {
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wake_queue);
+		wake_up(&rreq->waitq);
+	}
+}
+
+/*
+ * Mark a subrequest as no longer being in progress and, if need be, wake the
+ * collector.
+ */
+void netfs_subreq_clear_in_progress(struct netfs_io_subrequest *subreq)
+{
+	struct netfs_io_request *rreq = subreq->rreq;
+	struct netfs_io_stream *stream = &rreq->io_streams[subreq->stream_nr];
+
+	clear_bit_unlock(NETFS_SREQ_IN_PROGRESS, &subreq->flags);
+	smp_mb__after_atomic(); /* Clear IN_PROGRESS before task state */
+
+	/* If we are at the head of the queue, wake up the collector. */
+	if (list_is_first(&subreq->rreq_link, &stream->subrequests) ||
+	    test_bit(NETFS_RREQ_RETRYING, &rreq->flags))
+		netfs_wake_collector(rreq);
+}
+
+/*
+ * Wait for all outstanding I/O in a stream to quiesce.
+ */
+void netfs_wait_for_in_progress_stream(struct netfs_io_request *rreq,
+				       struct netfs_io_stream *stream)
+{
+	struct netfs_io_subrequest *subreq;
+	DEFINE_WAIT(myself);
+
+	list_for_each_entry(subreq, &stream->subrequests, rreq_link) {
+		if (!test_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags))
+			continue;
+
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wait_queue);
+		for (;;) {
+			prepare_to_wait(&rreq->waitq, &myself, TASK_UNINTERRUPTIBLE);
+
+			if (!test_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags))
+				break;
+
+			trace_netfs_sreq(subreq, netfs_sreq_trace_wait_for);
+			schedule();
+			trace_netfs_rreq(rreq, netfs_rreq_trace_woke_queue);
+		}
+	}
+
+	finish_wait(&rreq->waitq, &myself);
+}
+
+/*
+ * Perform collection in app thread if not offloaded to workqueue.
+ */
+static int netfs_collect_in_app(struct netfs_io_request *rreq,
+				bool (*collector)(struct netfs_io_request *rreq))
+{
+	bool need_collect = false, inactive = true;
+
+	for (int i = 0; i < NR_IO_STREAMS; i++) {
+		struct netfs_io_subrequest *subreq;
+		struct netfs_io_stream *stream = &rreq->io_streams[i];
+
+		if (!stream->active)
+			continue;
+		inactive = false;
+		trace_netfs_collect_stream(rreq, stream);
+		subreq = list_first_entry_or_null(&stream->subrequests,
+						  struct netfs_io_subrequest,
+						  rreq_link);
+		if (subreq &&
+		    (!test_bit(NETFS_SREQ_IN_PROGRESS, &subreq->flags) ||
+		     test_bit(NETFS_SREQ_MADE_PROGRESS, &subreq->flags))) {
+			need_collect = true;
+			break;
+		}
+	}
+
+	if (!need_collect && !inactive)
+		return 0; /* Sleep */
+
+	__set_current_state(TASK_RUNNING);
+	if (collector(rreq)) {
+		/* Drop the ref from the NETFS_RREQ_IN_PROGRESS flag. */
+		netfs_put_request(rreq, netfs_rreq_trace_put_work_ip);
+		return 1; /* Done */
+	}
+
+	if (inactive) {
+		WARN(true, "Failed to collect inactive req R=%08x\n",
+		     rreq->debug_id);
+		cond_resched();
+	}
+	return 2; /* Again */
+}
+
+/*
+ * Wait for a request to complete, successfully or otherwise.
+ */
+static ssize_t netfs_wait_for_request(struct netfs_io_request *rreq,
+				      bool (*collector)(struct netfs_io_request *rreq))
+{
+	DEFINE_WAIT(myself);
+	ssize_t ret;
+
+	for (;;) {
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wait_queue);
+		prepare_to_wait(&rreq->waitq, &myself, TASK_UNINTERRUPTIBLE);
+
+		if (!test_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &rreq->flags)) {
+			switch (netfs_collect_in_app(rreq, collector)) {
+			case 0:
+				break;
+			case 1:
+				goto all_collected;
+			case 2:
+				continue;
+			}
+		}
+
+		if (!test_bit(NETFS_RREQ_IN_PROGRESS, &rreq->flags))
+			break;
+
+		schedule();
+		trace_netfs_rreq(rreq, netfs_rreq_trace_woke_queue);
+	}
+
+all_collected:
+	finish_wait(&rreq->waitq, &myself);
+
+	ret = rreq->error;
+	if (ret == 0) {
+		ret = rreq->transferred;
+		switch (rreq->origin) {
+		case NETFS_DIO_READ:
+		case NETFS_DIO_WRITE:
+		case NETFS_READ_SINGLE:
+			break;
+		default:
+			if (rreq->submitted < rreq->len) {
+				trace_netfs_failure(rreq, NULL, ret, netfs_fail_short_read);
+				ret = -EIO;
+			}
+			break;
+		}
+	}
+
+	return ret;
+}
+
+ssize_t netfs_wait_for_read(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_request(rreq, netfs_read_collection);
+}
+
+ssize_t netfs_wait_for_write(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_request(rreq, netfs_write_collection);
+}
+
+/*
+ * Wait for a paused operation to unpause or complete in some manner.
+ */
+static void netfs_wait_for_pause(struct netfs_io_request *rreq,
+				 bool (*collector)(struct netfs_io_request *rreq))
+{
+	DEFINE_WAIT(myself);
+
+	trace_netfs_rreq(rreq, netfs_rreq_trace_wait_pause);
+
+	for (;;) {
+		trace_netfs_rreq(rreq, netfs_rreq_trace_wait_queue);
+		prepare_to_wait(&rreq->waitq, &myself, TASK_UNINTERRUPTIBLE);
+
+		if (!test_bit(NETFS_RREQ_OFFLOAD_COLLECTION, &rreq->flags)) {
+			switch (netfs_collect_in_app(rreq, collector)) {
+			case 0:
+				break;
+			case 1:
+				goto all_collected;
+			case 2:
+				continue;
+			}
+		}
+
+		if (!test_bit(NETFS_RREQ_IN_PROGRESS, &rreq->flags) ||
+		    !test_bit(NETFS_RREQ_PAUSE, &rreq->flags))
+			break;
+
+		schedule();
+		trace_netfs_rreq(rreq, netfs_rreq_trace_woke_queue);
+	}
+
+all_collected:
+	finish_wait(&rreq->waitq, &myself);
+}
+
+void netfs_wait_for_paused_read(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_pause(rreq, netfs_read_collection);
+}
+
+void netfs_wait_for_paused_write(struct netfs_io_request *rreq)
+{
+	return netfs_wait_for_pause(rreq, netfs_write_collection);
+}