Skip to content

Commit 3b11750

Browse files
committed
askrene: actually run children in parallel.
Changelog-Changed: Plugins: `askrene` now runs routing in parallel. Signed-off-by: Rusty Russell <[email protected]>
1 parent 8eaa4a9 commit 3b11750

File tree

2 files changed

+159
-50
lines changed

2 files changed

+159
-50
lines changed

plugins/askrene/askrene.c

Lines changed: 157 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,29 @@
2828
#include <plugins/askrene/layer.h>
2929
#include <plugins/askrene/reserve.h>
3030
#include <sys/wait.h>
31+
#include <wire/wire_io.h>
3132
#include <wire/wire_sync.h>
3233

34+
struct router_child {
35+
/* Inside askrene->children */
36+
struct list_node list;
37+
struct command *cmd;
38+
struct timemono start;
39+
int pid;
40+
struct io_conn *log_conn;
41+
struct io_conn *reply_conn;
42+
43+
/* A whole msg read in for logging */
44+
u8 *log_msg;
45+
46+
/* How much we've read so far */
47+
char *reply_buf;
48+
size_t reply_bytes;
49+
50+
/* How much we just read (populated by io_read_partial) */
51+
size_t this_reply_len;
52+
};
53+
3354
static bool have_layer(const char **layers, const char *name)
3455
{
3556
for (size_t i = 0; i < tal_count(layers); i++) {
@@ -372,30 +393,134 @@ static const struct layer **apply_layers(const tal_t *ctx,
372393
return layers;
373394
}
374395

375-
static void process_child_logs(struct command *cmd,
376-
int log_fd)
396+
static struct command_result *reap_child(struct router_child *child)
397+
{
398+
int child_status;
399+
struct timerel time_delta;
400+
const char *err;
401+
402+
waitpid(child->pid, &child_status, 0);
403+
time_delta = timemono_between(time_mono(), child->start);
404+
405+
/* log the time of computation */
406+
cmd_log(tmpctx, child->cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms",
407+
WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in",
408+
time_to_msec(time_delta));
409+
410+
if (WIFSIGNALED(child_status)) {
411+
err = tal_fmt(tmpctx, "child died with signal %u",
412+
WTERMSIG(child_status));
413+
goto fail_broken;
414+
}
415+
416+
/* This is how it indicates an error message */
417+
if (WEXITSTATUS(child_status) != 0 && child->reply_bytes) {
418+
err = tal_strndup(child, child->reply_buf, child->reply_bytes);
419+
goto fail;
420+
}
421+
if (child->reply_bytes == 0) {
422+
err = tal_fmt(child, "child produced no output (exited %i)?",
423+
WEXITSTATUS(child_status));
424+
goto fail_broken;
425+
}
426+
427+
/* Frees child, since it's a child of cmd */
428+
return command_finish_rawstr(child->cmd,
429+
child->reply_buf, child->reply_bytes);
430+
431+
fail_broken:
432+
plugin_log(child->cmd->plugin, LOG_BROKEN, "%s", err);
433+
fail:
434+
assert(err);
435+
/* Frees child, since it's a child of cmd */
436+
return command_fail(child->cmd, PAY_ROUTE_NOT_FOUND, "%s", err);
437+
}
438+
439+
/* Last one out finalizes */
440+
static void log_closed(struct io_conn *conn, struct router_child *child)
441+
{
442+
child->log_conn = NULL;
443+
if (child->reply_conn == NULL)
444+
reap_child(child);
445+
}
446+
447+
static void reply_closed(struct io_conn *conn, struct router_child *child)
448+
{
449+
child->reply_conn = NULL;
450+
if (child->log_conn == NULL)
451+
reap_child(child);
452+
}
453+
454+
static struct io_plan *log_msg_in(struct io_conn *conn,
455+
struct router_child *child)
377456
{
378-
u8 *msg;
379-
while ((msg = wire_sync_read(tmpctx, log_fd)) != NULL) {
380-
enum log_level level;
381-
char *entry;
382-
struct node_id *peer;
383-
if (fromwire_status_log(tmpctx, msg, &level, &peer, &entry))
384-
cmd_log(tmpctx, cmd, level, "%s", entry);
457+
enum log_level level;
458+
char *entry;
459+
struct node_id *peer;
460+
461+
if (fromwire_status_log(tmpctx, child->log_msg, &level, &peer, &entry))
462+
cmd_log(tmpctx, child->cmd, level, "%s", entry);
463+
else {
464+
cmd_log(tmpctx, child->cmd, LOG_BROKEN,
465+
"unexpected non-log message %s",
466+
tal_hex(tmpctx, child->log_msg));
385467
}
468+
return io_read_wire(conn, child, &child->log_msg, log_msg_in, child);
469+
}
470+
471+
static struct io_plan *child_log_init(struct io_conn *conn,
472+
struct router_child *child)
473+
{
474+
io_set_finish(conn, log_closed, child);
475+
return io_read_wire(conn, child, &child->log_msg, log_msg_in, child);
476+
}
477+
478+
static size_t remaining_read_len(const struct router_child *child)
479+
{
480+
return tal_bytelen(child->reply_buf) - child->reply_bytes;
481+
}
482+
483+
static struct io_plan *child_reply_in(struct io_conn *conn,
484+
struct router_child *child)
485+
{
486+
child->reply_bytes += child->this_reply_len;
487+
if (remaining_read_len(child) < 64)
488+
tal_resize(&child->reply_buf, tal_bytelen(child->reply_buf) * 2);
489+
return io_read_partial(conn,
490+
child->reply_buf + child->reply_bytes,
491+
remaining_read_len(child),
492+
&child->this_reply_len,
493+
child_reply_in, child);
494+
}
495+
496+
static struct io_plan *child_reply_init(struct io_conn *conn,
497+
struct router_child *child)
498+
{
499+
io_set_finish(conn, reply_closed, child);
500+
child->reply_buf = tal_arr(child, char, 64);
501+
child->reply_bytes = 0;
502+
child->this_reply_len = 0;
503+
return child_reply_in(conn, child);
504+
}
505+
506+
static void destroy_router_child(struct router_child *child)
507+
{
508+
list_del(&child->list);
386509
}
387510

388511
static struct command_result *do_getroutes(struct command *cmd,
389512
struct gossmap_localmods *localmods,
390513
struct getroutes_info *info)
391514
{
392515
struct askrene *askrene = get_askrene(cmd->plugin);
393-
const char *err, *json;
394-
struct timemono time_start, deadline;
395-
int child_fd, log_fd, child_pid, child_status;
516+
const char *err;
517+
struct timemono deadline;
518+
int child_fd, log_fd;
519+
struct router_child *child;
396520
const struct layer **layers;
397521
s8 *biases;
398522
fp16_t *capacities;
523+
int ecode;
399524

400525
/* update the gossmap */
401526
if (gossmap_refresh(askrene->gossmap)) {
@@ -463,8 +588,9 @@ static struct command_result *do_getroutes(struct command *cmd,
463588
"maxparts == 1: switching to a single path algorithm.");
464589
}
465590

466-
time_start = time_mono();
467-
deadline = timemono_add(time_start,
591+
child = tal(cmd, struct router_child);
592+
child->start = time_mono();
593+
deadline = timemono_add(child->start,
468594
time_from_sec(askrene->route_seconds));
469595
child_fd = fork_router_child(askrene->gossmap,
470596
layers,
@@ -475,47 +601,27 @@ static struct command_result *do_getroutes(struct command *cmd,
475601
info->dev_algo == ALGO_SINGLE_PATH,
476602
deadline, srcnode, dstnode, info->amount,
477603
info->maxfee, info->finalcltv, info->maxdelay,
478-
info->maxparts, cmd->id, cmd->filter, &log_fd, &child_pid);
479-
if (child_fd == -1) {
480-
err = tal_fmt(tmpctx, "failed to fork: %s", strerror(errno));
481-
goto fail_broken;
482-
}
483-
484-
/* FIXME: Go async! */
485-
process_child_logs(cmd, log_fd);
486-
close(log_fd);
487-
json = grab_fd_str(cmd, child_fd);
488-
close(child_fd);
489-
waitpid(child_pid, &child_status, 0);
490-
491-
struct timerel time_delta = timemono_between(time_mono(), time_start);
492-
493-
/* log the time of computation */
494-
cmd_log(tmpctx, cmd, LOG_DBG, "get_routes %s %" PRIu64 " ms",
495-
WEXITSTATUS(child_status) != 0 ? "failed after" : "completed in",
496-
time_to_msec(time_delta));
604+
info->maxparts, cmd->id, cmd->filter, &log_fd, &child->pid);
605+
/* Save this, as remove_localmods won't preserve it. */
606+
ecode = errno;
607+
/* We don't need this any more. */
608+
gossmap_remove_localmods(askrene->gossmap, localmods);
497609

498-
if (WIFSIGNALED(child_status)) {
499-
err = tal_fmt(tmpctx, "child died with signal %u",
500-
WTERMSIG(child_status));
501-
goto fail_broken;
502-
}
503-
/* This is how it indicates an error message */
504-
if (WEXITSTATUS(child_status) != 0 && json) {
505-
err = json;
506-
goto fail;
507-
}
508-
if (!json) {
509-
err = tal_fmt(tmpctx, "child produced no output (exited %i)?",
510-
WEXITSTATUS(child_status));
610+
if (child_fd == -1) {
611+
err = tal_fmt(tmpctx, "failed to fork: %s", strerror(ecode));
612+
gossmap_remove_localmods(askrene->gossmap, localmods);
511613
goto fail_broken;
512614
}
513615

514-
/* At last we remove the localmods from the gossmap. */
515-
gossmap_remove_localmods(askrene->gossmap, localmods);
616+
child->reply_conn = io_new_conn(child, child_fd,
617+
child_reply_init, child);
618+
child->log_conn = io_new_conn(child, log_fd, child_log_init, child);
619+
child->cmd = cmd;
516620

517-
/* Child already created this fully formed. We just paste it */
518-
return command_finish_rawstr(cmd, json, strlen(json));
621+
/* FIXME: limit parallelism! */
622+
list_add_tail(&askrene->children, &child->list);
623+
tal_add_destructor(child, destroy_router_child);
624+
return command_still_pending(cmd);
519625

520626
fail_broken:
521627
plugin_log(cmd->plugin, LOG_BROKEN, "%s", err);
@@ -1216,6 +1322,7 @@ static const char *init(struct command *init_cmd,
12161322

12171323
askrene->plugin = plugin;
12181324
list_head_init(&askrene->layers);
1325+
list_head_init(&askrene->children);
12191326
askrene->reserved = new_reserve_htable(askrene);
12201327
askrene->gossmap = gossmap_load(askrene, GOSSIP_STORE_FILENAME,
12211328
plugin_gossmap_logcb, plugin);

plugins/askrene/askrene.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,8 @@ struct askrene {
2828
struct command *layer_cmd;
2929
/* How long before we abort trying to find a route? */
3030
u32 route_seconds;
31+
/* Routing children currently in flight. */
32+
struct list_head children;
3133
};
3234

3335
/* Useful plugin->askrene mapping */

0 commit comments

Comments
 (0)