21
21
import torchx .specs as specs
22
22
from torchx .cli .argparse_util import ArgOnceAction , torchxconfig_run
23
23
from torchx .cli .cmd_base import SubCommand
24
+ from torchx .cli .cmd_log import get_logs
24
25
from torchx .runner import config , get_runner , Runner
25
26
from torchx .runner .config import load_sections
26
27
from torchx .schedulers import get_default_scheduler_name , get_scheduler_factories
@@ -186,6 +187,12 @@ def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
186
187
help = "optional parent run ID that this run belongs to."
187
188
" It can be used to group runs for experiment tracking purposes" ,
188
189
)
190
+ subparser .add_argument (
191
+ "--tee_logs" ,
192
+ action = "store_true" ,
193
+ default = False ,
194
+ help = "Add additional prefix to log lines to indicate which replica is printing the log" ,
195
+ )
189
196
subparser .add_argument (
190
197
"component_name_and_args" ,
191
198
nargs = argparse .REMAINDER ,
@@ -237,14 +244,18 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
237
244
print (app_handle )
238
245
239
246
if args .scheduler .startswith ("local" ):
240
- self ._wait_and_exit (runner , app_handle , log = True )
247
+ self ._wait_and_exit (
248
+ runner , app_handle , log = True , tee_logs = args .tee_logs
249
+ )
241
250
else :
242
251
logger .info (f"Launched app: { app_handle } " )
243
252
app_status = runner .status (app_handle )
244
253
if app_status :
245
254
logger .info (app_status .format ())
246
255
if args .wait or args .log :
247
- self ._wait_and_exit (runner , app_handle , log = args .log )
256
+ self ._wait_and_exit (
257
+ runner , app_handle , log = args .log , tee_logs = args .tee_logs
258
+ )
248
259
249
260
except (ComponentValidationException , ComponentNotFoundException ) as e :
250
261
error_msg = f"\n Failed to run component `{ component } ` got errors: \n { e } "
@@ -267,10 +278,16 @@ def run(self, args: argparse.Namespace) -> None:
267
278
with get_runner (component_defaults = component_defaults ) as runner :
268
279
self ._run (runner , args )
269
280
270
- def _wait_and_exit (self , runner : Runner , app_handle : str , log : bool ) -> None :
281
+ def _wait_and_exit (
282
+ self , runner : Runner , app_handle : str , log : bool , tee_logs : bool = False
283
+ ) -> None :
271
284
logger .info ("Waiting for the app to finish..." )
272
285
273
- log_thread = self ._start_log_thread (runner , app_handle ) if log else None
286
+ log_thread = (
287
+ self ._start_log_thread (runner , app_handle , tee_logs_enabled = tee_logs )
288
+ if log
289
+ else None
290
+ )
274
291
275
292
status = runner .wait (app_handle , wait_interval = 1 )
276
293
if not status :
@@ -287,15 +304,30 @@ def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:
287
304
else :
288
305
logger .debug (status )
289
306
290
- def _start_log_thread (self , runner : Runner , app_handle : str ) -> threading .Thread :
291
- thread = tee_logs (
292
- dst = sys .stderr ,
293
- app_handle = app_handle ,
294
- regex = None ,
295
- runner = runner ,
296
- should_tail = True ,
297
- streams = None ,
298
- colorize = not sys .stderr .closed and sys .stderr .isatty (),
299
- )
307
def _start_log_thread(
    self, runner: Runner, app_handle: str, tee_logs_enabled: bool = False
) -> threading.Thread:
    """Start a background thread that streams the app's logs to ``sys.stderr``.

    Args:
        runner: runner used to fetch the logs of the app.
        app_handle: handle of the app whose logs are streamed.
        tee_logs_enabled: selects the log source. When ``True`` the thread
            is obtained from ``tee_logs`` (colorized only if stderr is an
            open TTY); when ``False`` a plain thread running ``get_logs``
            is created here.

    Returns:
        The log thread, already started.
    """
    if not tee_logs_enabled:
        # Plain log streaming: run ``get_logs`` on a thread built here.
        log_kwargs = {
            "file": sys.stderr,
            "runner": runner,
            "identifier": app_handle,
            "regex": None,
            "should_tail": True,
        }
        log_thread = threading.Thread(target=get_logs, kwargs=log_kwargs)
        # Daemon thread: tailing logs must not keep the interpreter alive.
        log_thread.daemon = True
    else:
        # ``tee_logs`` hands back the thread to start; its daemon flag is
        # left exactly as ``tee_logs`` configured it.
        log_thread = tee_logs(
            dst=sys.stderr,
            app_handle=app_handle,
            regex=None,
            runner=runner,
            should_tail=True,
            streams=None,
            # Only emit color codes when stderr is an open, interactive TTY.
            colorize=not sys.stderr.closed and sys.stderr.isatty(),
        )

    log_thread.start()
    return log_thread
0 commit comments