2828#include <plugins/askrene/layer.h>
2929#include <plugins/askrene/reserve.h>
3030#include <sys/wait.h>
31+ #include <wire/wire_io.h>
3132#include <wire/wire_sync.h>
3233
34+ struct router_child {
35+ /* Inside askrene->children */
36+ struct list_node list ;
37+ struct command * cmd ;
38+ struct timemono start ;
39+ int pid ;
40+ struct io_conn * log_conn ;
41+ struct io_conn * reply_conn ;
42+
43+ /* A whole msg read in for logging */
44+ u8 * log_msg ;
45+
46+ /* How much we've read so far */
47+ char * reply_buf ;
48+ size_t reply_bytes ;
49+
50+ /* How much we just read (populated by io_read_partial) */
51+ size_t this_reply_len ;
52+ };
53+
3354static bool have_layer (const char * * layers , const char * name )
3455{
3556 for (size_t i = 0 ; i < tal_count (layers ); i ++ ) {
@@ -372,30 +393,134 @@ static const struct layer **apply_layers(const tal_t *ctx,
372393 return layers ;
373394}
374395
375- static void process_child_logs (struct command * cmd ,
376- int log_fd )
396+ static struct command_result * reap_child (struct router_child * child )
397+ {
398+ int child_status ;
399+ struct timerel time_delta ;
400+ const char * err ;
401+
402+ waitpid (child -> pid , & child_status , 0 );
403+ time_delta = timemono_between (time_mono (), child -> start );
404+
405+ /* log the time of computation */
406+ cmd_log (tmpctx , child -> cmd , LOG_DBG , "get_routes %s %" PRIu64 " ms" ,
407+ WEXITSTATUS (child_status ) != 0 ? "failed after" : "completed in" ,
408+ time_to_msec (time_delta ));
409+
410+ if (WIFSIGNALED (child_status )) {
411+ err = tal_fmt (tmpctx , "child died with signal %u" ,
412+ WTERMSIG (child_status ));
413+ goto fail_broken ;
414+ }
415+
416+ /* This is how it indicates an error message */
417+ if (WEXITSTATUS (child_status ) != 0 && child -> reply_bytes ) {
418+ err = tal_strndup (child , child -> reply_buf , child -> reply_bytes );
419+ goto fail ;
420+ }
421+ if (child -> reply_bytes == 0 ) {
422+ err = tal_fmt (child , "child produced no output (exited %i)?" ,
423+ WEXITSTATUS (child_status ));
424+ goto fail_broken ;
425+ }
426+
427+ /* Frees child, since it's a child of cmd */
428+ return command_finish_rawstr (child -> cmd ,
429+ child -> reply_buf , child -> reply_bytes );
430+
431+ fail_broken :
432+ plugin_log (child -> cmd -> plugin , LOG_BROKEN , "%s" , err );
433+ fail :
434+ assert (err );
435+ /* Frees child, since it's a child of cmd */
436+ return command_fail (child -> cmd , PAY_ROUTE_NOT_FOUND , "%s" , err );
437+ }
438+
439+ /* Last one out finalizes */
440+ static void log_closed (struct io_conn * conn , struct router_child * child )
441+ {
442+ child -> log_conn = NULL ;
443+ if (child -> reply_conn == NULL )
444+ reap_child (child );
445+ }
446+
447+ static void reply_closed (struct io_conn * conn , struct router_child * child )
448+ {
449+ child -> reply_conn = NULL ;
450+ if (child -> log_conn == NULL )
451+ reap_child (child );
452+ }
453+
454+ static struct io_plan * log_msg_in (struct io_conn * conn ,
455+ struct router_child * child )
377456{
378- u8 * msg ;
379- while ((msg = wire_sync_read (tmpctx , log_fd )) != NULL ) {
380- enum log_level level ;
381- char * entry ;
382- struct node_id * peer ;
383- if (fromwire_status_log (tmpctx , msg , & level , & peer , & entry ))
384- cmd_log (tmpctx , cmd , level , "%s" , entry );
457+ enum log_level level ;
458+ char * entry ;
459+ struct node_id * peer ;
460+
461+ if (fromwire_status_log (tmpctx , child -> log_msg , & level , & peer , & entry ))
462+ cmd_log (tmpctx , child -> cmd , level , "%s" , entry );
463+ else {
464+ cmd_log (tmpctx , child -> cmd , LOG_BROKEN ,
465+ "unexpected non-log message %s" ,
466+ tal_hex (tmpctx , child -> log_msg ));
385467 }
468+ return io_read_wire (conn , child , & child -> log_msg , log_msg_in , child );
469+ }
470+
471+ static struct io_plan * child_log_init (struct io_conn * conn ,
472+ struct router_child * child )
473+ {
474+ io_set_finish (conn , log_closed , child );
475+ return io_read_wire (conn , child , & child -> log_msg , log_msg_in , child );
476+ }
477+
478+ static size_t remaining_read_len (const struct router_child * child )
479+ {
480+ return tal_bytelen (child -> reply_buf ) - child -> reply_bytes ;
481+ }
482+
483+ static struct io_plan * child_reply_in (struct io_conn * conn ,
484+ struct router_child * child )
485+ {
486+ child -> reply_bytes += child -> this_reply_len ;
487+ if (remaining_read_len (child ) < 64 )
488+ tal_resize (& child -> reply_buf , tal_bytelen (child -> reply_buf ) * 2 );
489+ return io_read_partial (conn ,
490+ child -> reply_buf + child -> reply_bytes ,
491+ remaining_read_len (child ),
492+ & child -> this_reply_len ,
493+ child_reply_in , child );
494+ }
495+
496+ static struct io_plan * child_reply_init (struct io_conn * conn ,
497+ struct router_child * child )
498+ {
499+ io_set_finish (conn , reply_closed , child );
500+ child -> reply_buf = tal_arr (child , char , 64 );
501+ child -> reply_bytes = 0 ;
502+ child -> this_reply_len = 0 ;
503+ return child_reply_in (conn , child );
504+ }
505+
506+ static void destroy_router_child (struct router_child * child )
507+ {
508+ list_del (& child -> list );
386509}
387510
388511static struct command_result * do_getroutes (struct command * cmd ,
389512 struct gossmap_localmods * localmods ,
390513 struct getroutes_info * info )
391514{
392515 struct askrene * askrene = get_askrene (cmd -> plugin );
393- const char * err , * json ;
394- struct timemono time_start , deadline ;
395- int child_fd , log_fd , child_pid , child_status ;
516+ const char * err ;
517+ struct timemono deadline ;
518+ int child_fd , log_fd ;
519+ struct router_child * child ;
396520 const struct layer * * layers ;
397521 s8 * biases ;
398522 fp16_t * capacities ;
523+ int ecode ;
399524
400525 /* update the gossmap */
401526 if (gossmap_refresh (askrene -> gossmap )) {
@@ -463,8 +588,9 @@ static struct command_result *do_getroutes(struct command *cmd,
463588 "maxparts == 1: switching to a single path algorithm." );
464589 }
465590
466- time_start = time_mono ();
467- deadline = timemono_add (time_start ,
591+ child = tal (cmd , struct router_child );
592+ child -> start = time_mono ();
593+ deadline = timemono_add (child -> start ,
468594 time_from_sec (askrene -> route_seconds ));
469595 child_fd = fork_router_child (askrene -> gossmap ,
470596 layers ,
@@ -475,47 +601,27 @@ static struct command_result *do_getroutes(struct command *cmd,
475601 info -> dev_algo == ALGO_SINGLE_PATH ,
476602 deadline , srcnode , dstnode , info -> amount ,
477603 info -> maxfee , info -> finalcltv , info -> maxdelay ,
478- info -> maxparts , cmd -> id , cmd -> filter , & log_fd , & child_pid );
479- if (child_fd == -1 ) {
480- err = tal_fmt (tmpctx , "failed to fork: %s" , strerror (errno ));
481- goto fail_broken ;
482- }
483-
484- /* FIXME: Go async! */
485- process_child_logs (cmd , log_fd );
486- close (log_fd );
487- json = grab_fd_str (cmd , child_fd );
488- close (child_fd );
489- waitpid (child_pid , & child_status , 0 );
490-
491- struct timerel time_delta = timemono_between (time_mono (), time_start );
492-
493- /* log the time of computation */
494- cmd_log (tmpctx , cmd , LOG_DBG , "get_routes %s %" PRIu64 " ms" ,
495- WEXITSTATUS (child_status ) != 0 ? "failed after" : "completed in" ,
496- time_to_msec (time_delta ));
604+ info -> maxparts , cmd -> id , cmd -> filter , & log_fd , & child -> pid );
605+ /* Save this, as remove_localmods won't preserve it. */
606+ ecode = errno ;
607+ /* We don't need this any more. */
608+ gossmap_remove_localmods (askrene -> gossmap , localmods );
497609
498- if (WIFSIGNALED (child_status )) {
499- err = tal_fmt (tmpctx , "child died with signal %u" ,
500- WTERMSIG (child_status ));
501- goto fail_broken ;
502- }
503- /* This is how it indicates an error message */
504- if (WEXITSTATUS (child_status ) != 0 && json ) {
505- err = json ;
506- goto fail ;
507- }
508- if (!json ) {
509- err = tal_fmt (tmpctx , "child produced no output (exited %i)?" ,
510- WEXITSTATUS (child_status ));
610+ if (child_fd == -1 ) {
611+ err = tal_fmt (tmpctx , "failed to fork: %s" , strerror (ecode ));
612+ gossmap_remove_localmods (askrene -> gossmap , localmods );
511613 goto fail_broken ;
512614 }
513615
514- /* At last we remove the localmods from the gossmap. */
515- gossmap_remove_localmods (askrene -> gossmap , localmods );
616+ child -> reply_conn = io_new_conn (child , child_fd ,
617+ child_reply_init , child );
618+ child -> log_conn = io_new_conn (child , log_fd , child_log_init , child );
619+ child -> cmd = cmd ;
516620
517- /* Child already created this fully formed. We just paste it */
518- return command_finish_rawstr (cmd , json , strlen (json ));
621+ /* FIXME: limit parallelism! */
622+ list_add_tail (& askrene -> children , & child -> list );
623+ tal_add_destructor (child , destroy_router_child );
624+ return command_still_pending (cmd );
519625
520626fail_broken :
521627 plugin_log (cmd -> plugin , LOG_BROKEN , "%s" , err );
@@ -1216,6 +1322,7 @@ static const char *init(struct command *init_cmd,
12161322
12171323 askrene -> plugin = plugin ;
12181324 list_head_init (& askrene -> layers );
1325+ list_head_init (& askrene -> children );
12191326 askrene -> reserved = new_reserve_htable (askrene );
12201327 askrene -> gossmap = gossmap_load (askrene , GOSSIP_STORE_FILENAME ,
12211328 plugin_gossmap_logcb , plugin );
0 commit comments