Skip to content

Commit 8bb233a

Browse files
Ivan Gavryliukazurecoder
Ivan Gavryliuk
authored andcommitted
57 alltypes plain writer (tensorflow#65)
* started working on boolean writer * - add plain writer for float, int64 and double - don't read definitions for required columns * bitpacker for boolean * writing strings * test for reading/writing bools * finish reading basic types * update readme
1 parent 04903d0 commit 8bb233a

23 files changed

+487
-77
lines changed

PULL_REQUEST_TEMPLATE.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
Fixes #
1+
### Fixes
2+
3+
Issue #
24

35
### Description
46

7+
description goes here
58

69
### Todos
710
- [ ] unit/integration tests

README.md

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ This project is aimed to fix this problem.
1212

1313
## Roadmap
1414

15-
We have just started to work on this library, contributors are welcome.
15+
We have just started to work on this library, [contributors are welcome](CONTRIBUTING.md).
1616

1717
|Phase|Description|State|
1818
|-----|-----------|-----|
@@ -21,13 +21,15 @@ We have just started to work on this library, contributors are welcome.
2121
|3|Support GZIP and SNAPPY decompression/compression|planning|
2222
|4|Integrate with popular products like Azure Data Lakes|planning|
2323

24+
You can track the number of features we have [implemented so far](doc/features.md).
25+
2426
## Getting started
2527

2628
**parquet-dotnet** is redistributed as a [NuGet package](https://www.nuget.org/packages/Parquet.Net) for `.NET 4.5.1` and `.NET Standard 1.6`. All code is managed and doesn't have any native dependencies, therefore you are ready to go after referencing the package.
2729

2830
### Reading files
2931

30-
In order to read a parquet file you need to open a stream first. Due to the fact that Parquet utilises file seeking extensively, the input stream must be *readable and seekable*.
32+
In order to read a parquet file you need to open a stream first. Due to the fact that Parquet utilises file seeking extensively, the input stream must be *readable and seekable*. This somewhat limits the amount of streaming you can do: for instance, you can't read a parquet file from a network stream because the reader needs to jump around it, so you have to download it to local disk first and then open it.
3133

3234
For instance, to read a file `c:\test.parquet` you would normally write the following code
3335

@@ -46,6 +48,33 @@ using(Stream fs = File.OpenRead("c:\\test.parquet"))
4648

4749
This will read the entire file into memory as a set of columns inside the `ParquetDataSet` class.
4850

51+
### Writing files
52+
53+
Parquet.Net operates on streams, therefore you need to create one first. The following example shows how to create a file on disk with two columns - `id` and `city`.
54+
55+
```csharp
56+
using System.IO;
57+
using Parquet;
58+
59+
var idColumn = new ParquetColumn<int>("id");
60+
idColumn.Add(1, 2);
61+
62+
var cityColumn = new ParquetColumn<string>("city");
63+
cityColumn.Add("London", "Derby");
64+
65+
var dataSet = new ParquetDataSet(idColumn, cityColumn);
66+
67+
using(Stream fileStream = File.OpenWrite("c:\\test.parquet"))
68+
{
69+
using(var writer = new ParquetWriter(fileStream))
70+
{
71+
writer.Write(dataSet);
72+
}
73+
}
74+
75+
```
76+
77+
4978
## Tools
5079

5180
### parq
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
using Parquet.File.Values;
2+
using Parquet.Thrift;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.IO;
6+
using System.Linq;
7+
using System.Text;
8+
using Xunit;
9+
10+
namespace Parquet.Test.File.Values
11+
{
12+
public class PlainValuesReaderWriterTest
13+
{
14+
private ParquetOptions _options;
15+
private IValuesReader _reader;
16+
private IValuesWriter _writer;
17+
private BinaryWriter _bw;
18+
private BinaryReader _br;
19+
private MemoryStream _ms;
20+
21+
public PlainValuesReaderWriterTest()
22+
{
23+
_options = new ParquetOptions();
24+
25+
_reader = new PlainValuesReader(_options);
26+
_writer = new PlainValuesWriter(_options);
27+
28+
_ms = new MemoryStream();
29+
_br = new BinaryReader(_ms);
30+
_bw = new BinaryWriter(_ms);
31+
}
32+
33+
[Fact]
34+
public void Booleans()
35+
{
36+
var bools = new List<bool>();
37+
bools.AddRange(new[] { true, false, true });
38+
39+
var schema = new SchemaElement();
40+
schema.Type = Thrift.Type.BOOLEAN;
41+
42+
_writer.Write(_bw, schema, bools);
43+
44+
_ms.Position = 0;
45+
46+
var boolsRead = new List<bool?>();
47+
_reader.Read(_br, schema, boolsRead, 3);
48+
49+
Assert.Equal(bools, boolsRead.Cast<bool>().ToList());
50+
}
51+
}
52+
}

src/Parquet.Test/Parquet.Test.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,8 @@
106106
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
107107
</ItemGroup>
108108

109+
<ItemGroup>
110+
<Folder Include="Writer\" />
111+
</ItemGroup>
112+
109113
</Project>

src/Parquet.Test/ParquetReaderOnTestFilesTest.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace Parquet.Test
77
{
8-
using File = System.IO.File;
8+
using F = System.IO.File;
99

1010
/// <summary>
1111
/// Tests a set of predefined test files that they read back correct
@@ -15,7 +15,7 @@ public class ParquetReaderOnTestFilesTest
1515
[Fact]
1616
public void FixedLenByteArray_dictionary()
1717
{
18-
using (Stream s = File.OpenRead(GetDataFilePath("fixedlenbytearray.parquet")))
18+
using (Stream s = F.OpenRead(GetDataFilePath("fixedlenbytearray.parquet")))
1919
{
2020
using (var r = new ParquetReader(s))
2121
{
@@ -27,7 +27,7 @@ public void FixedLenByteArray_dictionary()
2727
[Fact]
2828
public void Datetypes_all()
2929
{
30-
using (Stream s = File.OpenRead(GetDataFilePath("dates.parquet")))
30+
using (Stream s = F.OpenRead(GetDataFilePath("dates.parquet")))
3131
{
3232
using (var r = new ParquetReader(s))
3333
{

src/Parquet.Test/ParquetWriterTest.cs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,40 @@ namespace Parquet.Test
66
{
77
public class ParquetWriterTest
88
{
9-
//[Fact]
9+
[Fact]
1010
public void Write_simple_int32_and_int_reads_back()
1111
{
12-
const string path = "c:\\tmp\\first.parquet";
13-
if (F.Exists(path)) F.Delete(path);
12+
var ms = new MemoryStream();
1413

15-
using (var ms = F.OpenWrite(path))
14+
using (var writer = new ParquetWriter(ms))
1615
{
17-
using (var writer = new ParquetWriter(ms))
18-
{
19-
var ds = new ParquetDataSet();
16+
var ds = new ParquetDataSet();
17+
18+
//8 values for each column
19+
20+
var idCol = new ParquetColumn<int>("id");
21+
idCol.Add(4, 5, 6, 7, 2, 3, 0, 1);
22+
23+
var bool_col = new ParquetColumn<bool>("bool_col");
24+
bool_col.Add(true, false, true, false, true, false, true, false);
2025

21-
var intCol = new ParquetColumn<int>("id");
22-
intCol.Add(1, 2, 3, 4, 5);
26+
var string_col = new ParquetColumn<string>("string_col");
27+
string_col.Add("0", "1", "0", "1", "0", "1", "0", "1");
2328

24-
writer.Write(new ParquetDataSet(intCol));
25-
}
29+
writer.Write(new ParquetDataSet(idCol, bool_col, string_col));
2630
}
2731

28-
using (var ms = F.OpenRead(path))
32+
ms.Position = 0;
33+
using (var reader = new ParquetReader(ms))
2934
{
30-
using (var reader = new ParquetReader(ms))
31-
{
32-
ParquetDataSet ds = reader.Read();
33-
}
35+
ParquetDataSet ds = reader.Read();
3436
}
37+
38+
#if DEBUG
39+
const string path = "c:\\tmp\\first.parquet";
40+
F.WriteAllBytes(path, ms.ToArray());
41+
#endif
42+
3543
}
3644
}
3745
}

src/Parquet.Test/Reader/ParquetCsvComparison.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,6 @@ private void Compare(ParquetDataSet parquet, Dictionary<string, List<string>> cs
4242
int i = 0;
4343
foreach(ParquetColumn pc in parquet.Columns)
4444
{
45-
/*if (pc.Name == "Parish")
46-
{
47-
i += 1;
48-
continue;
49-
}*/
50-
5145
List<string> cc = csv[pc.Name];
5246
Type expectedColumnType = columnTypes[i];
5347

src/Parquet.sln

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
2323
..\appveyor.ps1 = ..\appveyor.ps1
2424
..\build.ps1 = ..\build.ps1
2525
..\CONTRIBUTING.md = ..\CONTRIBUTING.md
26+
..\ISSUE_TEMPLATE.md = ..\ISSUE_TEMPLATE.md
2627
PreCommit.ps1 = PreCommit.ps1
28+
..\PULL_REQUEST_TEMPLATE.md = ..\PULL_REQUEST_TEMPLATE.md
2729
..\README.md = ..\README.md
2830
EndProjectSection
2931
EndProject

src/Parquet/CompressionMethod.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace Parquet
2+
{
3+
/// <summary>
4+
/// Parquet compression method
5+
/// </summary>
6+
public enum CompressionMethod
7+
{
8+
/// <summary>
9+
/// No compression
10+
/// </summary>
11+
None
12+
}
13+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.IO.Compression;
3+
4+
namespace Parquet.File.Data
5+
{
6+
class GzipDataWriter : IDataWriter
7+
{
8+
public void Write(byte[] buffer)
9+
{
10+
}
11+
}
12+
}
Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Text;
1+
using System.IO;
42

53
namespace Parquet.File.Data
64
{
75
class UncompressedDataWriter : IDataWriter
86
{
7+
private readonly Stream _stream;
8+
9+
public UncompressedDataWriter(Stream stream)
10+
{
11+
_stream = stream;
12+
}
13+
914
public void Write(byte[] buffer)
1015
{
11-
throw new NotImplementedException();
16+
_stream.Write(buffer, 0, buffer.Length);
1217
}
1318
}
1419
}

src/Parquet/File/ListFactory.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using Type = System.Type;
5+
using TType = Parquet.Thrift.Type;
6+
using System.Collections;
7+
using Parquet.Thrift;
8+
9+
namespace Parquet.File
10+
{
11+
static class ListFactory
12+
{
13+
struct TypeTag
14+
{
15+
public TType PType;
16+
17+
public Func<IList> Create;
18+
19+
public ConvertedType? ConvertedType;
20+
21+
public TypeTag(TType ptype, Func<IList> create, ConvertedType? convertedType)
22+
{
23+
PType = ptype;
24+
Create = create;
25+
ConvertedType = convertedType;
26+
}
27+
}
28+
29+
private static readonly Dictionary<Type, TypeTag> TypeToTag = new Dictionary<Type, TypeTag>
30+
{
31+
{ typeof(int), new TypeTag(TType.INT32, () => new List<int>(), null) },
32+
{ typeof(bool), new TypeTag(TType.BOOLEAN, () => new List<bool>(), null) },
33+
{ typeof(string), new TypeTag(TType.BYTE_ARRAY, () => new List<string>(), ConvertedType.UTF8) }
34+
};
35+
36+
public static IList Create(Type systemType, SchemaElement schema)
37+
{
38+
if (!TypeToTag.TryGetValue(systemType, out TypeTag tag))
39+
throw new NotSupportedException($"system type {systemType} is not supported");
40+
41+
schema.Type = tag.PType;
42+
if(tag.ConvertedType != null)
43+
schema.Converted_type = tag.ConvertedType.Value;
44+
return tag.Create();
45+
}
46+
}
47+
}

src/Parquet/File/MetaBuilder.cs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using Parquet.Thrift;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
7+
namespace Parquet.File
8+
{
9+
class MetaBuilder
10+
{
11+
private readonly FileMetaData _meta;
12+
13+
public MetaBuilder()
14+
{
15+
_meta = new FileMetaData();
16+
17+
_meta.Created_by = "parquet-dotnet";
18+
_meta.Version = 1;
19+
_meta.Row_groups = new List<RowGroup>();
20+
}
21+
22+
public FileMetaData ThriftMeta => _meta;
23+
24+
public void AddSchema(ParquetDataSet dataSet)
25+
{
26+
long totalCount = dataSet.Count;
27+
28+
_meta.Schema = new List<SchemaElement> { new SchemaElement("schema") { Num_children = dataSet.Columns.Count } };
29+
_meta.Schema.AddRange(dataSet.Columns.Select(c => c.Schema));
30+
_meta.Num_rows = totalCount;
31+
}
32+
33+
public RowGroup AddRowGroup()
34+
{
35+
var rg = new RowGroup();
36+
_meta.Row_groups.Add(rg);
37+
return rg;
38+
}
39+
}
40+
}

src/Parquet/File/PColumn.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,11 @@ private IList ReadDictionaryPage(PageHeader ph)
140140
{
141141
//todo: read repetition levels (only relevant for nested columns)
142142

143-
List<int> definitions = ReadDefinitionLevels(reader, (int)maxValues);
143+
//check if there are definitions at all
144+
bool hasDefinitions = _schemaElement.Repetition_type == FieldRepetitionType.OPTIONAL;
145+
List<int> definitions = hasDefinitions
146+
? ReadDefinitionLevels(reader, (int)maxValues)
147+
: null;
144148

145149
// these are pointers back to the Values table - lookup on values
146150
List<int> indexes = ReadColumnValues(reader, ph.Data_page_header.Encoding, destination, maxValues);

0 commit comments

Comments
 (0)