54
54
55
55
from sonLib .nxnewick import NXNewick
56
56
from sonLib .bioio import getTempDirectory , getTempFile , catFiles
57
-
57
+
58
+ rgfa_sample_name = '_rGFA_'
59
+
58
60
def main ():
59
61
parser = ArgumentParser ()
60
62
Job .Runner .addToilOptions (parser )
@@ -112,7 +114,7 @@ def graphmap_join_options(parser):
112
114
113
115
parser .add_argument ("--gfa" , nargs = '*' , default = None , help = "Produce a GFA for given graph type(s) if specified. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default]" )
114
116
115
- parser .add_argument ("--rgfa" , nargs = '*' , default = None , help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. [--gfa clip assumed by default] " )
117
+ parser .add_argument ("--rgfa" , nargs = '*' , default = None , help = "Produce a rGFA cover for given graph type(s) if specified. The rGFA cover will be output as a separate .rgfa.gz file, as well as included in the .gbz is reference path segements. This cover will be based on the (first) reference sample only. Valid types are 'full', 'clip', and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space." )
116
118
117
119
parser .add_argument ("--gbz" , nargs = '*' , default = None , help = "Generate GBZ/snarls indexes for the given graph type(s) if specified. Valid types are 'full', 'clip' and 'filter'. If no type specified 'clip' will be used ('full' used if clipping disabled). Multiple types can be provided separated by a space. --giraffe will also produce these (and other) indexes" )
118
120
@@ -176,9 +178,9 @@ def graphmap_join_validate_options(options):
176
178
if gfa == 'filter' and not options .filter :
177
179
raise RuntimeError ('--gfa cannot be set to filter since filtering is disabled' )
178
180
179
- if not options .rgfa :
181
+ if options .rgfa == [] :
180
182
options .rgfa = ['clip' ] if options .clip else ['full' ]
181
- options .rgfa = list (set (options .rgfa ))
183
+ options .rgfa = list (set (options .rgfa )) if options . rgfa else []
182
184
for rgfa in options .rgfa :
183
185
if rgfa not in ['clip' , 'filter' , 'full' ]:
184
186
raise RuntimeError ('Unrecognized value for --gfa: {}. Must be one of {clip, filter, full}' .format (gfa ))
@@ -383,18 +385,29 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
383
385
full_vg_ids = [join_job .rv (i ) for i in range (len (vg_ids ))]
384
386
prev_job = join_job
385
387
388
+ # by default, we don't leave rGFA path fragments in the chromosome graphs, they will only be in the .gbz and .rgfa.gz output
389
+ drop_rgfa_fragments = options .rgfa and not getOptionalAttrib (findRequiredNode (config .xmlRoot , "graphmap_join" ), "keepRGFAFragmentsInChromGraphs" , typeFn = bool , default = False )
390
+
386
391
# take out the _MINIGRAPH_ paths
387
392
if 'full' in options .chrom_vg :
388
393
output_full_vg_ids = []
389
394
for vg_path , vg_id , full_vg_id in zip (options .vg , vg_ids , full_vg_ids ):
390
395
drop_graph_event_job = join_job .addFollowOnJobFn (drop_graph_event , config , vg_path , full_vg_id ,
391
396
disk = vg_id .size * 3 , memory = vg_id .size * 6 )
392
- output_full_vg_ids .append (drop_graph_event_job .rv ())
397
+ if drop_rgfa_fragments :
398
+ # take out the rGFA path fragments
399
+ drop_rgfa_job = drop_graph_event_job .addFollowOnJobFn (drop_rgfa_path_fragments , vg_path , drop_graph_event_job .rv (),
400
+ disk = vg_id .size * 3 , memory = vg_id .size & 6 )
401
+ output_full_vg_ids .append (drop_rgfa_job .rv ())
402
+ else :
403
+ output_full_vg_ids .append (drop_graph_event_job .rv ())
393
404
else :
394
405
output_full_vg_ids = full_vg_ids
406
+
395
407
396
408
# run the "clip" phase to do the clip-vg clipping
397
409
clip_vg_ids = []
410
+ output_clip_vg_ids = []
398
411
clipped_stats = None
399
412
if options .clip or options .filter :
400
413
clip_root_job = Job ()
@@ -411,13 +424,19 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
411
424
clip_og_job = clip_job .addFollowOnJobFn (vg_to_og , options , config , vg_path , clip_job .rv (0 ),
412
425
disk = input_vg_id .size * 16 , memory = max (og_min_size , input_vg_id .size * 32 ))
413
426
og_chrom_ids ['clip' ]['og' ].append (clip_og_job .rv ())
427
+ if drop_rgfa_fragments :
428
+ drop_rgfa_job = clip_job .addFollowOnJobFn (drop_rgfa_path_fragments , vg_path , clip_job .rv (0 ),
429
+ disk = input_vg_id .size * 3 , memory = input_vg_id .size & 6 )
430
+ output_clip_vg_ids .append (drop_rgfa_job .rv ())
431
+
414
432
415
433
# join the stats
416
434
clipped_stats = clip_root_job .addFollowOnJobFn (cat_stats , clip_vg_stats ).rv ()
417
435
prev_job = clip_root_job
418
436
419
437
# run the "filter" phase to do the vg clip clipping
420
438
filter_vg_ids = []
439
+ output_filter_vg_ids = []
421
440
if options .filter :
422
441
filter_root_job = Job ()
423
442
prev_job .addFollowOn (filter_root_job )
@@ -429,8 +448,11 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
429
448
if 'filter' in options .odgi + options .chrom_og + options .viz + options .draw :
430
449
filter_og_job = filter_job .addFollowOnJobFn (vg_to_og , options , config , vg_path , filter_job .rv (),
431
450
disk = input_vg_id .size * 16 , memory = max (og_min_size , input_vg_id .size * 64 ))
432
- og_chrom_ids ['filter' ]['og' ].append (filter_og_job .rv ())
433
-
451
+ og_chrom_ids ['filter' ]['og' ].append (filter_og_job .rv ())
452
+ if drop_rgfa_fragments :
453
+ drop_rgfa_job = filter_job .addFollowOnJobFn (drop_rgfa_path_fragments , vg_path , filter_job .rv (),
454
+ disk = input_vg_id .size * 3 , memory = input_vg_id .size & 6 )
455
+ output_filter_vg_ids .append (drop_rgfa_job .rv ())
434
456
435
457
prev_job = filter_root_job
436
458
@@ -475,6 +497,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
475
497
rgfa_ids = []
476
498
current_out_dict = None
477
499
do_gbz = workflow_phase in options .gbz + options .vcf + options .giraffe
500
+ do_rgfa = workflow_phase in options .rgfa
478
501
if do_gbz or workflow_phase in options .gfa :
479
502
assert len (options .vg ) == len (phase_vg_ids ) == len (vg_ids )
480
503
for vg_path , vg_id , input_vg_id in zip (options .vg , phase_vg_ids , vg_ids ):
@@ -485,9 +508,9 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
485
508
486
509
gfa_merge_job = gfa_root_job .addFollowOnJobFn (make_vg_indexes , options , config , gfa_ids ,
487
510
tag = workflow_phase + '.' ,
488
- do_gbz = do_gbz ,
511
+ do_gbz = do_gbz , do_rgfa = do_rgfa ,
489
512
cores = options .indexCores ,
490
- disk = sum (f .size for f in vg_ids ) * 6 ,
513
+ disk = sum (f .size for f in vg_ids ) * 16 ,
491
514
memory = index_mem )
492
515
out_dicts .append (gfa_merge_job .rv ())
493
516
prev_job = gfa_merge_job
@@ -519,6 +542,16 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
519
542
memory = index_mem )
520
543
out_dicts .append (deconstruct_job .rv ())
521
544
545
+ if do_rgfa and vcf_ref == options .vcfReference [0 ]:
546
+ rgfa_deconstruct_job = prev_job .addFollowOnJobFn (make_vcf , config , options .outName , vcf_ref , current_out_dict ,
547
+ max_ref_allele = options .vcfbub ,
548
+ tag = vcftag + '.' , ref_tag = workflow_phase + '.' ,
549
+ do_rgfa = True ,
550
+ cores = options .indexCores ,
551
+ disk = sum (f .size for f in vg_ids ) * 6 ,
552
+ memory = index_mem )
553
+ out_dicts .append (rgfa_deconstruct_job .rv ())
554
+
522
555
# optional giraffe
523
556
if workflow_phase in options .giraffe :
524
557
giraffe_job = gfa_merge_job .addFollowOnJobFn (make_giraffe_indexes , options , config , current_out_dict ,
@@ -555,7 +588,7 @@ def graphmap_join_workflow(job, options, config, vg_ids, hal_ids):
555
588
if do_draw :
556
589
og_chrom_ids [workflow_phase ]['draw' ].append (viz_job .rv (1 ) if viz_job else None )
557
590
558
- return output_full_vg_ids , clip_vg_ids , clipped_stats , filter_vg_ids , out_dicts , og_chrom_ids
591
+ return output_full_vg_ids , output_clip_vg_ids , clipped_stats , output_filter_vg_ids , out_dicts , og_chrom_ids
559
592
560
593
def clip_vg (job , options , config , vg_path , vg_id , phase ):
561
594
""" run clip-vg
@@ -755,6 +788,16 @@ def drop_graph_event(job, config, vg_path, full_vg_id):
755
788
756
789
cactus_call (parameters = ['vg' , 'paths' , '-d' , '-S' , graph_event , '-x' , full_vg_path ], outfile = out_path )
757
790
return job .fileStore .writeGlobalFile (out_path )
791
+
792
+ def drop_rgfa_path_fragments (job , vg_path , vg_id ):
793
+ """ drop out rgfa path fragments """
794
+ work_dir = job .fileStore .getLocalTempDir ()
795
+ vg_path = os .path .join (work_dir , os .path .splitext (os .path .basename (vg_path ))[0 ]) + '.vg'
796
+ out_vg_path = os .path .join (work_dir , os .path .splitext (os .path .basename (vg_path ))[0 ]) + 'norgfa.vg'
797
+ job .fileStore .readGlobalFile (vg_id , vg_path )
798
+
799
+ cactus_call (parameters = ['vg' , 'paths' , '-d' , '-S' , rgfa_sample_name , '-v' , vg_path ], outfile = out_vg_path )
800
+ return job .fileStore .writeGlobalFile (out_vg_path )
758
801
759
802
def vg_to_gfa (job , options , config , vg_path , vg_id , rgfa = False ):
760
803
""" run gfa conversion """
@@ -809,7 +852,7 @@ def merge_rgfa(job, options, config, rgfa_ids, tag=''):
809
852
810
853
return { '{}rgfa.gz' .format (tag ) : job .fileStore .writeGlobalFile (merge_rgfa_path + '.gz' ) }
811
854
812
- def make_vg_indexes (job , options , config , gfa_ids , tag = "" , do_gbz = False ):
855
+ def make_vg_indexes (job , options , config , gfa_ids , tag = "" , do_gbz = False , do_rgfa = False ):
813
856
""" merge of the gfas, then make gbz / snarls / trans
814
857
"""
815
858
work_dir = job .fileStore .getLocalTempDir ()
@@ -825,8 +868,8 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
825
868
job .fileStore .readGlobalFile (gfa_id , gfa_path , mutable = True )
826
869
if i == 0 :
827
870
# make sure every --reference sample is in the GFA header RS tag (which won't be the case if one sample
828
- # is completely missing from the first graph, as vg may filter it out apparently)
829
- cmd = [['head' , '-1' , gfa_path ], ['sed' , '-e' , '1s/{} //' .format (graph_event )]]
871
+ # is completely missing from the first graph, as vg may filter it out apparently)
872
+ cmd = [['head' , '-1' , gfa_path ], ['sed' , '-e' , '1s/ {}//' . format ( graph_event ), '-e' , '1s/{} //' .format (graph_event )]]
830
873
gfa_header = cactus_call (parameters = cmd , check_output = True ).strip ().split ('\t ' )
831
874
for i in range (len (gfa_header )):
832
875
if gfa_header [i ].startswith ('RS:Z:' ):
@@ -841,44 +884,59 @@ def make_vg_indexes(job, options, config, gfa_ids, tag="", do_gbz=False):
841
884
cactus_call (parameters = cmd , outfile = merge_gfa_path , outappend = True , job_memory = job .memory )
842
885
job .fileStore .deleteGlobalFile (gfa_id )
843
886
844
- # make the gbz
887
+ out_dict = {}
888
+
889
+ if do_rgfa :
890
+ if do_gbz :
891
+ # make a GBZ that includes the rGFA fragments
892
+ rgfa_gbz_path = os .path .join (work_dir , '{}merged.rgfa.gbz' .format (tag ))
893
+ cactus_call (parameters = ['vg' , 'gbwt' , '-G' , merge_gfa_path , '--gbz-format' , '-g' , rgfa_gbz_path ], job_memory = job .memory )
894
+ out_dict ['{}rgfa.gbz' .format (tag )] = job .fileStore .writeGlobalFile (rgfa_gbz_path )
895
+
896
+ # remove rGFA paths from the GFA going forward
897
+ cactus_call (parameters = [['grep' , '-v' , '^W\t {}' .format (rgfa_sample_name ), merge_gfa_path ],
898
+ ['sed' , '-e' , '1s/ {}//g' .format (rgfa_sample_name ), '-e' , '1s/{} //g' .format (rgfa_sample_name )]],
899
+ outfile = merge_gfa_path + '.clean' )
900
+ cactus_call (parameters = ['mv' , merge_gfa_path + '.clean' , merge_gfa_path ])
901
+
902
+ # make the gbz without the rGFA fragments
845
903
if do_gbz :
846
904
gbz_path = os .path .join (work_dir , '{}merged.gbz' .format (tag ))
847
905
cactus_call (parameters = ['vg' , 'gbwt' , '-G' , merge_gfa_path , '--gbz-format' , '-g' , gbz_path ], job_memory = job .memory )
848
-
849
- # zip the gfa and remove any rgfa paths (they will be kept in the GBZ and .rgfa
850
- # but not here as it would be just too confusing)
851
- rgfa_sample_name = '_rGFA_'
852
- gfa_path = merge_gfa_path + '.gz'
853
- cactus_call (parameters = [['grep' , '-v' , '^W\t {}' .format (rgfa_sample_name ), merge_gfa_path ],
854
- ['bgzip' , '--threads' , str (job .cores )]],
855
- outfile = gfa_path )
856
- os .remove (merge_gfa_path )
857
-
906
+ out_dict ['{}gbz' .format (tag )] = job .fileStore .writeGlobalFile (gbz_path )
907
+
908
+ # zip the gfa
909
+ cactus_call (parameters = ['bgzip' , '--threads' , str (job .cores ), merge_gfa_path ])
910
+ out_dict ['{}gfa.gz' .format (tag )] = job .fileStore .writeGlobalFile (merge_gfa_path + '.gz' )
858
911
859
912
# make the snarls
860
913
if do_gbz :
861
914
snarls_path = os .path .join (work_dir , '{}merged.snarls' .format (tag ))
862
915
cactus_call (parameters = ['vg' , 'snarls' , gbz_path , '-T' , '-t' , str (job .cores )], outfile = snarls_path , job_memory = job .memory )
916
+ out_dict ['{}snarls' .format (tag )] = job .fileStore .writeGlobalFile (snarls_path )
863
917
864
- out_dict = { '{}gfa.gz' .format (tag ) : job .fileStore .writeGlobalFile (gfa_path ) }
865
918
if do_gbz :
866
919
out_dict ['{}gbz' .format (tag )] = job .fileStore .writeGlobalFile (gbz_path )
867
- out_dict [ '{}snarls' . format ( tag )] = job . fileStore . writeGlobalFile ( snarls_path )
920
+
868
921
return out_dict
869
922
870
- def make_vcf (job , config , out_name , vcf_ref , index_dict , tag = '' , ref_tag = '' , max_ref_allele = None ):
923
+ def make_vcf (job , config , out_name , vcf_ref , index_dict , tag = '' , ref_tag = '' , max_ref_allele = None , do_rgfa = False ):
871
924
""" make the vcf
872
925
"""
873
926
work_dir = job .fileStore .getLocalTempDir ()
874
927
gbz_path = os .path .join (work_dir , ref_tag + os .path .basename (out_name ) + '.gbz' )
875
928
snarls_path = os .path .join (work_dir , ref_tag + os .path .basename (out_name ) + '.snarls' )
876
- job .fileStore .readGlobalFile (index_dict ['{}gbz' .format (ref_tag )], gbz_path )
929
+ job .fileStore .readGlobalFile (index_dict ['{}{} gbz' .format (ref_tag , 'rgfa.' if do_rgfa else '' )], gbz_path )
877
930
job .fileStore .readGlobalFile (index_dict ['{}snarls' .format (ref_tag )], snarls_path )
878
931
932
+ if do_rgfa :
933
+ tag = 'rgfa.{}' .format (tag )
934
+
879
935
# make the vcf
880
936
vcf_path = os .path .join (work_dir , '{}merged.vcf.gz' .format (tag ))
881
937
decon_cmd = ['vg' , 'deconstruct' , gbz_path , '-P' , vcf_ref , '-C' , '-a' , '-r' , snarls_path , '-t' , str (job .cores )]
938
+ if do_rgfa :
939
+ decon_cmd += ['-P' , rgfa_sample_name ]
882
940
if getOptionalAttrib (findRequiredNode (config .xmlRoot , "graphmap_join" ), "GFANodeIDsInVCF" , typeFn = bool , default = True ):
883
941
decon_cmd .append ('-O' )
884
942
cactus_call (parameters = [decon_cmd , ['bgzip' , '--threads' , str (job .cores )]], outfile = vcf_path , job_memory = job .memory )
@@ -890,12 +948,15 @@ def make_vcf(job, config, out_name, vcf_ref, index_dict, tag='', ref_tag='', max
890
948
RealtimeLogger .warning ('WARNING: tabix failed on VCF with this error {}' .format (str (e )))
891
949
tbi_path = None
892
950
893
- output_dict = { '{}raw.vcf.gz' .format (tag ) : job .fileStore .writeGlobalFile (vcf_path ) }
951
+ # dont do vcfbub when doing rGFA
952
+ out_tag = '{}raw.' .format (tag ) if not do_rgfa else tag
953
+
954
+ output_dict = { '{}vcf.gz' .format (out_tag ) : job .fileStore .writeGlobalFile (vcf_path ) }
894
955
if tbi_path :
895
- output_dict ['{}raw. vcf.gz.tbi' .format (tag )] = job .fileStore .writeGlobalFile (tbi_path )
956
+ output_dict ['{}vcf.gz.tbi' .format (out_tag )] = job .fileStore .writeGlobalFile (tbi_path )
896
957
897
958
# make the filtered vcf
898
- if max_ref_allele :
959
+ if max_ref_allele and not do_rgfa :
899
960
vcfbub_path = os .path .join (work_dir , 'merged.filtered.vcf.gz' )
900
961
cactus_call (parameters = [['vcfbub' , '--input' , vcf_path , '--max-ref-length' , str (max_ref_allele ), '--max-level' , '0' ],
901
962
['bgzip' , '--threads' , str (job .cores )]],
0 commit comments