Skip to content

Commit 1a5dde2

Browse files
authored
ParquetSerializer support for append operations (tensorflow#280)
1 parent 35d1758 commit 1a5dde2

File tree

8 files changed

+85
-4
lines changed

8 files changed

+85
-4
lines changed

.github/workflows/full.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: 'Full Workflow'
22

33
env:
44
VERSION: 4.6.1
5-
ASM_VERSION: 4.6.0
5+
ASM_VERSION: 4.0.0
66

77
on:
88
push:

docs/serialisation.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,31 @@ Similar to JSON [supported collection types](https://learn.microsoft.com/en-us/d
293293
`*` Technically impossible.
294294
`**` Technically possible, but not implemented yet.
295295

296+
## Appending to Files
297+
298+
`ParquetSerializer` supports appending data to an existing Parquet file. This can be useful when you have multiple batches of data that need to be written to the same file.
299+
300+
To use this feature, you need to set the `Append` flag to `true` in the `ParquetSerializerOptions` object that you pass to the `SerializeAsync` method. This will tell the library to append the data batch to the end of the file stream instead of overwriting it. For example:
301+
302+
```csharp
303+
await ParquetSerializer.SerializeAsync(dataBatch, ms, new ParquetSerializerOptions { Append = true });
304+
```
305+
306+
However, there is one caveat: you should not set the `Append` flag to `true` for the first batch of data written to a new file. A Parquet file has a header and a footer containing metadata about the schema and statistics of the data, so attempting to append to an empty file stream throws an `IOException` because there is no header or footer to read from. Therefore, always set the `Append` flag to `false` for the first batch (or pass no options at all, which makes it `false` by default), then switch it to `true` for subsequent batches. For example:
307+
308+
```csharp
309+
// First batch
310+
await ParquetSerializer.SerializeAsync(dataBatch1, ms, new ParquetSerializerOptions { Append = false });
311+
312+
// Second batch
313+
await ParquetSerializer.SerializeAsync(dataBatch2, ms, new ParquetSerializerOptions { Append = true });
314+
315+
// Third batch
316+
await ParquetSerializer.SerializeAsync(dataBatch3, ms, new ParquetSerializerOptions { Append = true });
317+
```
318+
319+
By following this pattern, you can easily append data to a Parquet file using `ParquetSerializer`.
320+
296321
## FAQ
297322

298323
**Q.** Can I specify a schema for serialisation/deserialisation?

src/Parquet.Test/Serialisation/ParquetSerializerTest.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,5 +202,41 @@ public async Task Map_Simple_Serde() {
202202
XAssert.JsonEquivalent(data, data2);
203203

204204
}
205+
206+
[Fact]
207+
public async Task Append_reads_all_data() {
208+
var data = Enumerable.Range(0, 1_000).Select(i => new Record {
209+
Timestamp = DateTime.UtcNow.AddSeconds(i),
210+
EventName = i % 2 == 0 ? "on" : "off",
211+
MeterValue = i
212+
}).ToList();
213+
214+
using var ms = new MemoryStream();
215+
216+
const int batchSize = 100;
217+
for(int i = 0; i < data.Count; i += batchSize) {
218+
List<Record> dataBatch = data.Skip(i).Take(batchSize).ToList();
219+
220+
ms.Position = 0;
221+
await ParquetSerializer.SerializeAsync(dataBatch, ms, new ParquetSerializerOptions { Append = i > 0 });
222+
}
223+
224+
ms.Position = 0;
225+
IList<Record> data2 = await ParquetSerializer.DeserializeAsync<Record>(ms);
226+
227+
Assert.Equivalent(data2, data);
228+
}
229+
230+
[Fact]
231+
public async Task Append_to_new_file_fails() {
232+
var data = Enumerable.Range(0, 10).Select(i => new Record {
233+
Timestamp = DateTime.UtcNow.AddSeconds(i),
234+
EventName = i % 2 == 0 ? "on" : "off",
235+
MeterValue = i
236+
}).ToList();
237+
238+
using var ms = new MemoryStream();
239+
await Assert.ThrowsAsync<IOException>(async () => await ParquetSerializer.SerializeAsync(data, ms, new ParquetSerializerOptions { Append = true }));
240+
}
205241
}
206242
}

src/Parquet/Encodings/SchemaEncoder.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ static bool TryBuildList(List<Thrift.SchemaElement> schema,
206206
//as we are skipping elements set path hint
207207
Thrift.SchemaElement tseRepeatedGroup = schema[index + 1];
208208
field.Path = new FieldPath(tseList.Name, tseRepeatedGroup.Name);
209+
field.ThriftSchemaElement = se;
209210
field.GroupSchemaElement = tseRepeatedGroup;
210211
index += 2; //skip this element and child container
211212
ownedChildren = 1; //we should get this element assigned back
@@ -237,7 +238,8 @@ static bool TryBuildMap(List<Thrift.SchemaElement> schema,
237238
var map = new MapField(root.Name) {
238239
Path = new FieldPath(root.Name, tseContainer.Name),
239240
IsNullable = root.Repetition_type != FieldRepetitionType.REQUIRED,
240-
GroupSchemaElement = tseContainer
241+
GroupSchemaElement = tseContainer,
242+
ThriftSchemaElement = root
241243
};
242244

243245
index += 1;
@@ -261,6 +263,7 @@ static bool TryBuildStruct(List<Thrift.SchemaElement> schema,
261263
ownedChildren = container.Num_children; //make then owned to receive in .Assign()
262264
field = StructField.CreateWithNoElements(container.Name);
263265
field.IsNullable = container.Repetition_type != FieldRepetitionType.REQUIRED;
266+
field.ThriftSchemaElement = container;
264267
return true;
265268
}
266269

@@ -304,6 +307,7 @@ static bool TryBuildStruct(List<Thrift.SchemaElement> schema,
304307

305308
df.IsNullable = isNullable;
306309
df.IsArray = isArray;
310+
df.ThriftSchemaElement = se;
307311
f = df;
308312

309313
index++;

src/Parquet/ParquetWriter.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ private async Task PrepareFileAsync(bool append, CancellationToken cancellationT
9393
if(!Stream.CanSeek)
9494
throw new IOException("destination stream must be seekable for append operations.");
9595

96+
if(Stream.Length == 0)
97+
throw new IOException($"you can only append to existing streams, but current stream is empty.");
98+
9699
await ValidateFileAsync();
97100

98101
Thrift.FileMetaData fileMeta = await ReadMetadataAsync(cancellationToken);

src/Parquet/Schema/Field.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ internal List<string> GetNaturalChildPath(List<string> path) {
6060
/// </summary>
6161
internal string? ClrPropName { get; set; }
6262

63+
/// <summary>
64+
/// Low-level thrift schema element corresponding to this high-level schema element.
65+
/// Only set when reading files.
66+
/// </summary>
67+
public Thrift.SchemaElement? ThriftSchemaElement { get; internal set; }
68+
6369
internal virtual FieldPath? PathPrefix { set { } }
6470

6571
/// <summary>

src/Parquet/Serialization/ParquetSerializer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public static async Task<ParquetSchema> SerializeAsync<T>(IEnumerable<T> objectI
3333

3434
Striper<T> striper = new Striper<T>(typeof(T).GetParquetSchema(false));
3535

36-
using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, false, cancellationToken)) {
36+
bool append = options != null && options.Append;
37+
using(ParquetWriter writer = await ParquetWriter.CreateAsync(striper.Schema, destination, null, append, cancellationToken)) {
3738

3839
if(options != null) {
3940
writer.CompressionMethod = options.CompressionMethod;

src/Parquet/Serialization/ParquetSerializerOptions.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ namespace Parquet.Serialization {
55
/// Parquet serializer options
66
/// </summary>
77
public class ParquetSerializerOptions {
8+
9+
/// <summary>
10+
/// When set to true, appends to file by creating a new row group.
11+
/// </summary>
12+
public bool Append { get; set; } = false;
13+
814
/// <summary>
915
/// Page compression method
1016
/// </summary>
@@ -15,6 +21,6 @@ public class ParquetSerializerOptions {
1521
/// Page compression level
1622
/// </summary>
1723

18-
public CompressionLevel CompressionLevel = CompressionLevel.Optimal;
24+
public CompressionLevel CompressionLevel { get; set; } = CompressionLevel.Optimal;
1925
}
2026
}

0 commit comments

Comments
 (0)