
Commit e7e907d

Data Column (tensorflow#294)
* DataColumn exposes definition and repetition levels. Much stricter check for schema.
* bug fixed (tensorflow#282): FSC understands nulls on non-atomic level
* updating docs
* oops!
1 parent fe69a55 commit e7e907d

31 files changed: +473 -389 lines changed

.github/workflows/full.yml

+1 -1

@@ -1,7 +1,7 @@
 name: 'Full Workflow'
 
 env:
-  VERSION: 4.6.2
+  VERSION: 4.7.0
   ASM_VERSION: 4.0.0
 
 on:

docs/README.md

+3 -3

@@ -119,15 +119,15 @@ Then, data columns need to be prepared for writing. As parquet is column-based f
 
 ```csharp
 var column1 = new DataColumn(
-    (DataField)schema[0],
+    schema.DataFields[0],
     Enumerable.Range(0, 1_000_000).Select(i => DateTime.UtcNow.AddSeconds(i)).ToArray());
 
 var column2 = new DataColumn(
-    (DataField)schema[1],
+    schema.DataFields[1],
     Enumerable.Range(0, 1_000_000).Select(i => i % 2 == 0 ? "on" : "off").ToArray());
 
 var column3 = new DataColumn(
-    (DataField)schema[2],
+    schema.DataFields[2],
     Enumerable.Range(0, 1_000_000).Select(i => (double)i).ToArray());
 ```

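The `schema` these snippets index into is defined earlier in the README. Purely for context, here is a minimal sketch of a schema shape that would fit the three columns above; the field names are illustrative, not the README's actual ones:

```csharp
using Parquet.Schema;

// illustrative field names only - the README defines its own schema earlier on
var schema = new ParquetSchema(
    new DataField<DateTime>("timestamp"),  // column1 holds DateTime values
    new DataField<string>("state"),        // column2 holds "on"/"off" strings
    new DataField<double>("value"));       // column3 holds doubles

// schema.DataFields[0], [1] and [2] are then the DataField instances
// passed to each DataColumn constructor in the snippet above.
```
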
docs/column.md

+36

@@ -0,0 +1,36 @@
+# DataColumn
+
+`DataColumn` is an essential part of low-level serialization. It represents a column that has actual data.
+
+For simple records that contain atomic types (int, string etc.) the schema only consists of DataColumns.
+
+Here is a sample logical representation of the `DataColumn` class:
+
+```mermaid
+classDiagram
+    class DataColumn {
+        +Field Field
+        +Array DefinedData
+        +Array Data
+        +int[]? DefinitionLevels;
+        +int[]? RepetitionLevels;
+        +DataColumn(DataField field, Array definedData, int[]? definitionLevels, int[]? repetitionLevels)
+        +DataColumn(DataField field, Array data, int[]? repetitionLevels = null)
+    }
+
+```
+
+`Field` is a schema field that defines this column. You can obtain this field from a schema you define.
+
+`DefinedData` is raw data that is defined by `Field`'s type. If your field is nullable, `DefinedData` represents non-nullable values. On the other hand, `Data` represents data as-is, including nulls. If you are reading a `DataColumn` and need to access the data, `Data` is your field. If you need to access data as it is stored in the parquet file, use `DefinedData`. The names are chosen mostly for backward compatibility reasons.
+
+Going further, if you need to access *repetition and definition levels* as they are stored in the parquet file, you can use the corresponding `DefinitionLevels` and `RepetitionLevels` fields.
+
+## Creating DataColumn
+
+There are two public constructors available (see the diagram above). For convenience and backward compatibility, the second constructor accepts a `DataField` and two parameters:
+
+1. `data` is the data to write, including nulls if the field is nullable. DataColumn will decompose the data array into `DefinitionLevels` and `DefinedData` on construction.
+2. `repetitionLevels` are only required if the field is part of a nested type.
+
+The first constructor is more granular and allows you to specify all three parts when constructing a column.

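To make the `Data` / `DefinedData` split described above concrete, here is a small sketch using the convenience constructor from the diagram. The field name and values are made up, and the commented results assume a flat optional (nullable) field where the maximum definition level is 1:

```csharp
using Parquet.Data;
using Parquet.Schema;

var rating = new DataField<int?>("rating");  // nullable, flat field (illustrative)

// convenience constructor: pass the data as-is, nulls included
var column = new DataColumn(rating, new int?[] { 5, null, 3 });

// column.Data             -> { 5, null, 3 }   original values, nulls preserved
// column.DefinedData      -> { 5, 3 }         only the defined values, as stored in the file
// column.DefinitionLevels -> { 1, 0, 1 }      0 marks where the null was
```
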
docs/writing.md

+25 -20
@@ -10,20 +10,24 @@ Writing files is a multi stage process, giving you the full flexibility on what
 4. When required, repeat from step (2) to create more row groups. A row group is like a physical data partition that should fit in memory for processing. It's a guessing game how much data should be in a single row group, but a number of at least 5 thousand rows per column is great. Remember that parquet format works best on large chunks of data.
 
 ```csharp
+// create file schema
+var schema = new ParquetSchema(
+    new DataField<int>("id"),
+    new DataField<string>("city"));
+
 //create data columns with schema metadata and the data you need
 var idColumn = new DataColumn(
-    new DataField<int>("id"),
+    schema.DataFields[0],
     new int[] { 1, 2 });
 
 var cityColumn = new DataColumn(
-    new DataField<string>("city"),
+    schema.DataFields[1],
     new string[] { "London", "Derby" });
 
-// create file schema
-var schema = new ParquetSchema(idColumn.Field, cityColumn.Field);
-
 using(Stream fileStream = System.IO.File.OpenWrite("c:\\test.parquet")) {
     using(ParquetWriter parquetWriter = await ParquetWriter.CreateAsync(schema, fileStream)) {
+        parquetWriter.CompressionMethod = CompressionMethod.Gzip;
+        parquetWriter.CompressionLevel = System.IO.Compression.CompressionLevel.Optimal;
         // create a new row group in the file
         using(ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup()) {
             await groupWriter.WriteColumnAsync(idColumn);
@@ -33,7 +37,9 @@ using(Stream fileStream = System.IO.File.OpenWrite("c:\\test.parquet")) {
 }
 ```
 
-# Specifying Compression Method and Level
+To read more about DataColumn, see [this page](column.md).
+
+### Specifying Compression Method and Level
 
 After constructing `ParquetWriter` you can optionally set the compression method ([`CompressionMethod`](../src/Parquet/CompressionMethod.cs)) and/or compression level ([`CompressionLevel`](https://learn.microsoft.com/en-us/dotnet/api/system.io.compression.compressionlevel?view=net-7.0)), which defaults to `Snappy`. Unless you have specific needs to override compression, the defaults are very reasonable.
 
@@ -48,28 +54,28 @@ using(ParquetWriter parquetWriter = await ParquetWriter.CreateAsync(schema, file
 ```
 
 
-# Appending to Files
+### Appending to Files
 
 This lib supports pseudo appending to files, however it's worth keeping in mind that *row groups are immutable* by design, therefore the only way to append is to create a new row group at the end of the file. It's worth mentioning that small row groups make data compression and reading extremely inefficient, therefore the larger your row group the better.
 
 The following code snippet illustrates this:
 
 ```csharp
 //write a file with a single row group
-var id = new DataField<int>("id");
+var schema = new ParquetSchema(new DataField<int>("id"));
 var ms = new MemoryStream();
 
-using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms)) {
+using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) {
     using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-        await rg.WriteColumnAsync(new DataColumn(id, new int[] { 1, 2 }));
+        await rg.WriteColumnAsync(new DataColumn(schema.DataFields[0], new int[] { 1, 2 }));
     }
 }
 
 //append to this file. Note that you cannot append to existing row group, therefore create a new one
 ms.Position = 0; // this is to rewind our memory stream, no need to do it in real code.
-using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms, append: true)) {
+using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms, append: true)) {
     using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-        await rg.WriteColumnAsync(new DataColumn(id, new int[] { 3, 4 }));
+        await rg.WriteColumnAsync(new DataColumn(schema.DataFields[0], new int[] { 3, 4 }));
     }
 }
 
@@ -80,39 +86,38 @@ using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) {
 
     using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
         Assert.Equal(2, rg.RowCount);
-        Assert.Equal(new int[] { 1, 2 }, (await rg.ReadColumnAsync(id)).Data);
+        Assert.Equal(new int[] { 1, 2 }, (await rg.ReadColumnAsync(schema.DataFields[0])).Data);
     }
 
     using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(1)) {
         Assert.Equal(2, rg.RowCount);
-        Assert.Equal(new int[] { 3, 4 }, (await rg.ReadColumnAsync(id)).Data);
+        Assert.Equal(new int[] { 3, 4 }, (await rg.ReadColumnAsync(schema.DataFields[0])).Data);
     }
 
 }
-
 ```
 
 Note that you have to open `ParquetWriter` in **append** mode explicitly in its constructor - `ParquetWriter.CreateAsync(schema, ms, append: true)`. Doing so makes parquet.net open the file, find the file footer and delete it, rewinding the current stream position to the end of actual data. Then, creating more row groups simply writes data to the file as usual, and `.Dispose()` on `ParquetWriter` generates a new file footer, writes it to the file and closes down the stream.
 
 Please keep in mind that row groups are designed to hold a large amount of data (50'000 rows on average), therefore try to find a large enough batch to append to the file. Do not treat a parquet file as a row stream by creating a row group and placing 1-2 rows in it, because this will both increase file size massively and cause a huge performance degradation for a client reading such a file.
 
-# Custom Metadata
+### Custom Metadata
 
 To read and write custom file metadata, you can use the `CustomMetadata` property on `ParquetReader` and `ParquetWriter`, i.e.
 
 ```csharp
 var ms = new MemoryStream();
-var id = new DataField<int>("id");
+var schema = new ParquetSchema(new DataField<int>("id"));
 
 //write
-using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms)) {
+using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) {
     writer.CustomMetadata = new Dictionary<string, string> {
         ["key1"] = "value1",
         ["key2"] = "value2"
     };
 
     using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-        await rg.WriteColumnAsync(new DataColumn(id, new[] { 1, 2, 3, 4 }));
+        await rg.WriteColumnAsync(new DataColumn(schema.DataFields[0], new[] { 1, 2, 3, 4 }));
     }
 }
 
@@ -123,6 +128,6 @@ using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) {
 }
 ```
 
-# Complex Types
+### Complex Types
 
 To write complex types (arrays, lists, maps, structs) read [this guide](nested_types.md).

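The page above is write-focused. As a counterpart, here is a short sketch of reading the intro file back with the reader APIs that already appear in the appending example, assuming the reader exposes the file schema via its `Schema` property:

```csharp
using System.IO;
using Parquet;
using Parquet.Data;

using(Stream fileStream = System.IO.File.OpenRead("c:\\test.parquet")) {
    using(ParquetReader reader = await ParquetReader.CreateAsync(fileStream)) {
        using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
            // read columns back using the data fields from the file's schema
            DataColumn idColumn = await rg.ReadColumnAsync(reader.Schema.DataFields[0]);
            DataColumn cityColumn = await rg.ReadColumnAsync(reader.Schema.DataFields[1]);

            // idColumn.Data   -> { 1, 2 }
            // cityColumn.Data -> { "London", "Derby" }
        }
    }
}
```
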
perf/pyarrow/1.parquet

792 Bytes
Binary file not shown.

perf/pyarrow/run.py

+1

@@ -10,6 +10,7 @@
 
 df = pd.DataFrame({
     "ints": [1, 2, 3],
+    "tags": [[1, 2], [3, 4], [5, 6]]
 }, index=list("abc"))
 
 table = pa.Table.from_pandas(df, preserve_index=False)

src/Parquet.Test/DocRef.cs

+17 -15

@@ -125,18 +125,20 @@ public async Task ReadIntro() {
     }
 
     public async Task WriteIntro() {
+        // create file schema
+        var schema = new ParquetSchema(
+            new DataField<int>("id"),
+            new DataField<string>("city"));
+
         //create data columns with schema metadata and the data you need
         var idColumn = new DataColumn(
-            new DataField<int>("id"),
+            schema.DataFields[0],
             new int[] { 1, 2 });
 
         var cityColumn = new DataColumn(
-            new DataField<string>("city"),
+            schema.DataFields[1],
             new string[] { "London", "Derby" });
 
-        // create file schema
-        var schema = new ParquetSchema(idColumn.Field, cityColumn.Field);
-
         using(Stream fileStream = System.IO.File.OpenWrite("c:\\test.parquet")) {
             using(ParquetWriter parquetWriter = await ParquetWriter.CreateAsync(schema, fileStream)) {
                 parquetWriter.CompressionMethod = CompressionMethod.Gzip;
@@ -152,20 +154,20 @@ public async Task WriteIntro() {
 
     public async Task AppendDemo() {
         //write a file with a single row group
-        var id = new DataField<int>("id");
+        var schema = new ParquetSchema(new DataField<int>("id"));
         var ms = new MemoryStream();
 
-        using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms)) {
+        using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) {
             using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-                await rg.WriteColumnAsync(new DataColumn(id, new int[] { 1, 2 }));
+                await rg.WriteColumnAsync(new DataColumn(schema.DataFields[0], new int[] { 1, 2 }));
             }
         }
 
        //append to this file. Note that you cannot append to existing row group, therefore create a new one
        ms.Position = 0; // this is to rewind our memory stream, no need to do it in real code.
-        using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms, append: true)) {
+        using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms, append: true)) {
            using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-                await rg.WriteColumnAsync(new DataColumn(id, new int[] { 3, 4 }));
+                await rg.WriteColumnAsync(new DataColumn(schema.DataFields[0], new int[] { 3, 4 }));
            }
        }
 
@@ -176,30 +178,30 @@ public async Task AppendDemo() {
 
        using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
            Assert.Equal(2, rg.RowCount);
-            Assert.Equal(new int[] { 1, 2 }, (await rg.ReadColumnAsync(id)).Data);
+            Assert.Equal(new int[] { 1, 2 }, (await rg.ReadColumnAsync(schema.DataFields[0])).Data);
        }
 
        using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(1)) {
            Assert.Equal(2, rg.RowCount);
-            Assert.Equal(new int[] { 3, 4 }, (await rg.ReadColumnAsync(id)).Data);
+            Assert.Equal(new int[] { 3, 4 }, (await rg.ReadColumnAsync(schema.DataFields[0])).Data);
        }
 
        }
    }
 
    public async Task CustomMetadata() {
        var ms = new MemoryStream();
-        var id = new DataField<int>("id");
+        var schema = new ParquetSchema(new DataField<int>("id"));
 
        //write
-        using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms)) {
+        using(ParquetWriter writer = await ParquetWriter.CreateAsync(schema, ms)) {
            writer.CustomMetadata = new Dictionary<string, string> {
                ["key1"] = "value1",
                ["key2"] = "value2"
            };
 
            using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-                await rg.WriteColumnAsync(new DataColumn(id, new[] { 1, 2, 3, 4 }));
+                await rg.WriteColumnAsync(new DataColumn(schema.DataFields[0], new[] { 1, 2, 3, 4 }));
            }
        }

src/Parquet.Test/DumbStreamTest.cs

+1 -1

@@ -19,7 +19,7 @@ public async Task Read_from_stream_that_only_retuns_one_byte_at_the_time() {
             DateTime[] data = new DateTime[10000];
             for(int i = 0; i < 10000; i++)
                 data[i] = DateTime.UtcNow.AddMilliseconds(i);
-            await rowGroupWriter.WriteColumnAsync(new DataColumn(field, data, 0, 10000));
+            await rowGroupWriter.WriteColumnAsync(new DataColumn(field, data));
         }
 
         int fileSize = (int)memoryFileStream.Length;

src/Parquet.Test/ParquetWriterTest.cs

+4 -51

@@ -165,13 +165,10 @@ public async Task Append_to_file_reads_all_data() {
         }
     }
 
-    public readonly static IEnumerable<object[]> NullableColumnContentCases = new List<object[]>()
-    {
-        new object[] { new int?[] { 1, 2 } },
-        new object[] { new int?[] { null } },
-        new object[] { new int?[] { 1, null, 2 } },
-        new object[] { new int[] { 1, 2 } },
-    };
+    public readonly static IEnumerable<object[]> NullableColumnContentCases = new List<object[]>(){
+        new object[] { new int?[] { 1, 2 } },
+        new object[] { new int?[] { null } },
+        new object[] { new int?[] { 1, null, 2 } } };
 
     [Theory]
     [MemberData(nameof(NullableColumnContentCases))]
@@ -196,50 +193,6 @@ public async Task Write_read_nullable_column(Array input) {
         }
     }
 
-    [Fact]
-    public async Task Writes_only_beginning_of_array() {
-        var ms = new MemoryStream();
-        var id = new DataField<int>("id");
-
-        //write
-        using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms)) {
-            using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-                await rg.WriteColumnAsync(new DataColumn(id, new[] { 1, 2, 3, 4 }, 0, 3));
-            }
-        }
-
-        //read back
-        using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) {
-            Assert.Equal(3, reader.ThriftMetadata!.Num_rows);
-
-            using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
-                Assert.Equal(new int[] { 1, 2, 3 }, (await rg.ReadColumnAsync(id)).Data);
-            }
-        }
-    }
-
-    [Fact]
-    public async Task Writes_only_end_of_array() {
-        var ms = new MemoryStream();
-        var id = new DataField<int>("id");
-
-        //write
-        using(ParquetWriter writer = await ParquetWriter.CreateAsync(new ParquetSchema(id), ms)) {
-            using(ParquetRowGroupWriter rg = writer.CreateRowGroup()) {
-                await rg.WriteColumnAsync(new DataColumn(id, new[] { 1, 2, 3, 4 }, 1, 3));
-            }
-        }
-
-        //read back
-        using(ParquetReader reader = await ParquetReader.CreateAsync(ms)) {
-            Assert.Equal(3, reader.ThriftMetadata!.Num_rows);
-
-            using(ParquetRowGroupReader rg = reader.OpenRowGroupReader(0)) {
-                Assert.Equal(new int[] { 2, 3, 4 }, (await rg.ReadColumnAsync(id)).Data);
-            }
-        }
-    }
-
     [Fact]
     public async Task FileMetadata_sets_num_rows_on_file_and_row_group() {
         var ms = new MemoryStream();

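The two deleted tests exercised the old `DataColumn(field, data, offset, count)` overload, which no longer appears among the documented constructors (compare docs/column.md and the DumbStreamTest.cs change above). If code relied on it, a sketch of the equivalent after this commit is to slice the array before constructing the column:

```csharp
using Parquet.Data;
using Parquet.Schema;

var id = new DataField<int>("id");
int[] source = { 1, 2, 3, 4 };

// previously: new DataColumn(id, source, 0, 3) and new DataColumn(id, source, 1, 3)
var beginning = new DataColumn(id, source[..3]);  // writes { 1, 2, 3 }
var end       = new DataColumn(id, source[1..]);  // writes { 2, 3, 4 }
```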