Skip to content

Commit cd08c69

Browse files
committed
Add support for re-serializing (sharding and reassembling) CRAM containers to a new stream.
1 parent f461401 commit cd08c69

File tree

6 files changed

+366
-5
lines changed

6 files changed

+366
-5
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package htsjdk.samtools;
2+
3+
import htsjdk.samtools.cram.build.CramIO;
4+
import htsjdk.samtools.cram.structure.Container;
5+
import htsjdk.samtools.cram.structure.CramHeader;
6+
import htsjdk.samtools.util.RuntimeIOException;
7+
8+
import java.io.IOException;
9+
import java.io.OutputStream;
10+
11+
/**
12+
* Rewrite a series of containers to a new stream. The CRAM header and SAMFileHeader containers are automatically
13+
* written to the stream when this class is instantiated. An EOF container is automatically written when
14+
* {@link #finish()} is called.
15+
*/
16+
public class CRAMContainerStreamRewriter {
17+
private final OutputStream outputStream;
18+
private final String outputStreamIdentifier;
19+
private final CramHeader cramHeader;
20+
private final SAMFileHeader samFileHeader;
21+
private final CRAMIndexer cramIndexer;
22+
23+
private long streamOffset = 0L;
24+
private long recordCounter = 0L;
25+
26+
/**
27+
* Create a CRAMContainerStreamWriter for writing SAM records into a series of CRAM
28+
* containers on an output stream, with an optional output index.
29+
*
30+
* @param outputStream where to write the CRAM stream.
31+
* @param samFileHeader {@link SAMFileHeader} to be used. Sort order is determined by the sortOrder property of this arg.
32+
* @param outputStreamIdentifier used for display in error message display
33+
* @param indexer CRAM indexer. Can be null if no index is required.
34+
*/
35+
public CRAMContainerStreamRewriter(
36+
final OutputStream outputStream,
37+
final CramHeader cramHeader,
38+
final SAMFileHeader samFileHeader,
39+
final String outputStreamIdentifier,
40+
final CRAMIndexer indexer) {
41+
this.outputStream = outputStream;
42+
this.cramHeader = cramHeader;
43+
this.samFileHeader = samFileHeader;
44+
this.outputStreamIdentifier = outputStreamIdentifier;
45+
this.cramIndexer = indexer;
46+
47+
//TODO: update the SAMFileHeader with a program group to leave a paper trail?
48+
streamOffset = CramIO.writeCramHeader(cramHeader, outputStream);
49+
streamOffset += Container.writeSAMFileHeaderContainer(cramHeader.getCRAMVersion(), samFileHeader, outputStream);
50+
}
51+
52+
/**
53+
* Writes a container to a stream, updating the (stream-relative) global record counter and byte offsets.
54+
*
55+
* Since this method mutates the values in the container, the container is no longer valid in the context
56+
* of the stream from which it originated.
57+
*
58+
* @param container the container to emit to the stream. the container must conform to the version and sort
59+
* order specified in the CRAM header and SAM header provided to the constructor
60+
* {@link #CRAMContainerStreamRewriter(OutputStream, CramHeader, SAMFileHeader, String, CRAMIndexer)}.
61+
* All the containers serialized to a single stream using this method must have originated from the
62+
* same original context(/stream), obtained via {@link htsjdk.samtools.cram.build.CramContainerIterator}.
63+
*/
64+
public void rewriteContainer(final Container container) {
65+
// update the container and slices with the correct global record counter and byte offsets
66+
// (required for indexing)
67+
container.relocateContainer(recordCounter, streamOffset);
68+
69+
// re-serialize the entire container and slice(s), block by block
70+
streamOffset += container.write(cramHeader.getCRAMVersion(), outputStream);
71+
recordCounter += container.getContainerHeader().getNumberOfRecords();
72+
73+
if (cramIndexer != null) {
74+
cramIndexer.processContainer(container, ValidationStringency.SILENT);
75+
}
76+
}
77+
78+
/**
79+
* Finish writing to the stream. Flushes the record cache and optionally emits an EOF container.
80+
*/
81+
public void finish() {
82+
try {
83+
CramIO.writeCramEOF(cramHeader.getCRAMVersion(), outputStream);
84+
outputStream.flush();
85+
if (cramIndexer != null) {
86+
cramIndexer.finish();
87+
}
88+
} catch (final IOException e) {
89+
throw new RuntimeIOException(String.format("IOException closing stream for %s", outputStreamIdentifier));
90+
}
91+
}
92+
93+
}

src/main/java/htsjdk/samtools/CRAMIndexer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package htsjdk.samtools;
22

3-
import htsjdk.samtools.cram.structure.CompressorCache;
43
import htsjdk.samtools.cram.structure.Container;
54

65
/**

src/main/java/htsjdk/samtools/cram/structure/Container.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class Container {
4949
private final List<Slice> slices;
5050

5151
// container's byte offset from the start of the containing stream, used for indexing
52-
private final long containerByteOffset;
52+
private long containerByteOffset;
5353

5454
/**
5555
* Create a Container with a {@link ReferenceContext} derived from its {@link Slice}s.
@@ -190,6 +190,7 @@ public int write(final CRAMVersion cramVersion, final OutputStream outputStream)
190190
// landmark 0 = byte length of the compression header
191191
// landmarks after 0 = byte length of the compression header plus all slices before this one
192192
landmarks.add(tempOutputStream.size());
193+
slice.byteOffsetOfContainer = containerByteOffset;
193194
slice.write(cramVersion, tempOutputStream);
194195
}
195196
getContainerHeader().setLandmarks(landmarks);
@@ -335,6 +336,28 @@ public List<SAMRecord> getSAMRecords(
335336
public CompressionHeader getCompressionHeader() { return compressionHeader; }
336337
public AlignmentContext getAlignmentContext() { return containerHeader.getAlignmentContext(); }
337338
public long getContainerByteOffset() { return containerByteOffset; }
339+
340+
/**
341+
* Update the stream-relative values (global record counter and stream byte offset) for this
342+
* container. For use when re-serializing a container that has been read from an existing stream
343+
* into a new stream. This method mutates the container and it's slices - the container is no
344+
* longer valid in the context of it's original stream.
345+
*
346+
* @param containerRecordCounter the new global record counter for this container
347+
* @param streamByteOffset the new stream byte offset counter for this container
348+
* @return the updated global record counter
349+
*/
350+
public long relocateContainer(final long containerRecordCounter, final long streamByteOffset) {
351+
this.containerByteOffset = streamByteOffset;
352+
this.getContainerHeader().setGlobalRecordCounter(containerRecordCounter);
353+
354+
long sliceRecordCounter = containerRecordCounter;
355+
for (final Slice slice : getSlices()) {
356+
sliceRecordCounter = slice.relocateSlice(sliceRecordCounter, streamByteOffset);
357+
}
358+
return sliceRecordCounter;
359+
}
360+
338361
public List<Slice> getSlices() { return slices; }
339362
public boolean isEOF() {
340363
return containerHeader.isEOF() && (getSlices() == null || getSlices().size() == 0);

src/main/java/htsjdk/samtools/cram/structure/ContainerHeader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class ContainerHeader {
4242
// total length of all blocks in this container (total length of this container, minus the Container Header).
4343
private final AlignmentContext alignmentContext;
4444
private final int recordCount;
45-
private final long globalRecordCounter;
45+
private long globalRecordCounter;
4646
private final long baseCount;
4747
private final int blockCount;
4848

@@ -249,4 +249,8 @@ public boolean isEOF() {
249249
return v3 || v2;
250250
}
251251

252+
void setGlobalRecordCounter(final long recordCounter) {
253+
this.globalRecordCounter = recordCount;
254+
}
255+
252256
}

src/main/java/htsjdk/samtools/cram/structure/Slice.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class Slice {
6767
// Slice header components as defined in the spec
6868
private final AlignmentContext alignmentContext; // ref sequence, alignment start and span
6969
private final int nRecords;
70-
private final long globalRecordCounter;
70+
private long globalRecordCounter;
7171
private final int nSliceBlocks; // includes the core block and external blocks, but not the header block
7272
private List<Integer> contentIDs;
7373
private int embeddedReferenceBlockContentID = EMBEDDED_REFERENCE_ABSENT_CONTENT_ID;
@@ -78,7 +78,7 @@ public class Slice {
7878

7979
private final CompressionHeader compressionHeader;
8080
private final SliceBlocks sliceBlocks;
81-
private final long byteOffsetOfContainer;
81+
public long byteOffsetOfContainer;
8282

8383
private Block sliceHeaderBlock;
8484

@@ -518,6 +518,22 @@ public void normalizeCRAMRecords(final List<CRAMCompressionRecord> cramCompressi
518518
}
519519
}
520520

521+
/**
522+
* Update the stream-relative values (global record counter and container stream byte offset) for
523+
* this slice. For use when re-serializing a container that has been read from an existing stream
524+
* into a new stream. This method mutates the container and it's slices - the container is no
525+
* longer valid in the context of it's original stream.
526+
*
527+
* @param sliceRecordCounter the new global record counter for this slice
528+
* @param containerByteOffset the new stream byte offset counter for this slice's enclosing container
529+
* @return the updated global record counter
530+
*/
531+
long relocateSlice(final long sliceRecordCounter, final long containerByteOffset) {
532+
this.byteOffsetOfContainer = containerByteOffset;
533+
this.globalRecordCounter = sliceRecordCounter;
534+
return sliceRecordCounter + getNumberOfRecords();
535+
}
536+
521537
private int getReferenceOffset(final boolean hasEmbeddedReference) {
522538
final ReferenceContext sliceReferenceContext = getAlignmentContext().getReferenceContext();
523539
return sliceReferenceContext.isMappedSingleRef() && hasEmbeddedReference ?

0 commit comments

Comments
 (0)