Commit 6bcd82d

feat: Implement parquet datasource with schema conversion
fix: some schema propagation bugs
feat: add a main java app to test on nyc trips dataset
1 parent 56254c2 commit 6bcd82d

10 files changed: +396 additions, −16 deletions


.gitignore

Lines changed: 4 additions & 1 deletion
@@ -95,4 +95,7 @@ TODO.md
 TODO.rst
 TODO
 
-# End of https://www.toptal.com/developers/gitignore/api/java,maven,visualstudiocode,gradle
+### Datasets ###
+datasets/
+
+# End of https://www.toptal.com/developers/gitignore/api/java,maven,visualstudiocode,gradle

.vscode/launch.json

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,13 @@
     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
     "version": "0.2.0",
     "configurations": [
+        {
+            "type": "java",
+            "name": "NycTripsBenchmark",
+            "request": "launch",
+            "mainClass": "co.clflushopt.glint.NycTripsBenchmark",
+            "projectName": "glint"
+        },
         {
             "type": "java",
             "name": "Current File",

glint/pom.xml

Lines changed: 36 additions & 0 deletions
@@ -61,6 +61,42 @@
             <artifactId>mockito-core</artifactId>
             <version>5.15.2</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-common</artifactId>
+            <version>1.15.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-encoding</artifactId>
+            <version>1.15.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-column</artifactId>
+            <version>1.15.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>1.15.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>3.3.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>3.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>3.3.0</version>
+        </dependency>
     </dependencies>
 
     <build>
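
The Hadoop artifacts are needed because the Parquet scan added later in this commit opens files through parquet-hadoop, which works in terms of hadoop-common's Configuration and Path types. A minimal sketch of that entry point, mirroring the ParquetScan constructor further down (the file path is only illustrative):

    // ParquetFileReader comes from parquet-hadoop; Path and Configuration from hadoop-common.
    ParquetFileReader reader = ParquetFileReader
            .open(HadoopInputFile.fromPath(new Path("./datasets/example.parquet"), new Configuration()));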
glint/src/main/java/co/clflushopt/glint/App.java

Lines changed: 67 additions & 1 deletion
@@ -1,11 +1,77 @@
 package co.clflushopt.glint;
 
+import java.io.FileNotFoundException;
+import java.util.Iterator;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+import co.clflushopt.glint.core.ExecutionContext;
+import co.clflushopt.glint.dataframe.DataFrame;
+import co.clflushopt.glint.query.logical.expr.AggregateExpr;
+import co.clflushopt.glint.query.logical.expr.CastExpr;
+import co.clflushopt.glint.query.logical.expr.ColumnExpr;
+import co.clflushopt.glint.query.logical.expr.LogicalExpr;
+import co.clflushopt.glint.query.logical.plan.LogicalPlan;
+import co.clflushopt.glint.query.optimizer.QueryOptimizer;
+import co.clflushopt.glint.types.RecordBatch;
+
 /**
  * Hello world!
  *
  */
 public class App {
     public static void main(String[] args) {
         System.out.println("Welcome to the Glint query compiler");
+        try {
+            nycTripsBenchmark(args);
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public static void nycTripsBenchmark(String[] args) throws FileNotFoundException {
+        // Create execution context
+        ExecutionContext ctx = ExecutionContext.create().build();
+
+        long startTime = System.currentTimeMillis();
+        try {
+            // Create DataFrame and apply transformations
+            DataFrame df = ctx.readParquet("./datasets/yellow_tripdata_2019-01.parquet", null);
+
+            System.out.println("Logical Plan:\t" + LogicalPlan.format(df.getLogicalPlan()));
+            System.out.println("Schema:\t" + df.getSchema());
+
+            // Optimize and execute the plan
+            LogicalPlan optimizedPlan = QueryOptimizer.optimize(df.getLogicalPlan());
+            System.out.println("Optimized Plan:\t" + LogicalPlan.format(optimizedPlan));
+
+            // Execute and print results
+            Iterator<RecordBatch> results = ctx.execute(optimizedPlan);
+
+            while (results.hasNext()) {
+                RecordBatch batch = results.next();
+                System.out.println(batch.getSchema());
+                System.out.println(batch.toCsv());
+            }
+        } finally {
+            long endTime = System.currentTimeMillis();
+            System.out.println("Query took " + (endTime - startTime) + " ms");
+        }
+    }
+
+    // Helper methods for creating expressions
+    private static LogicalExpr col(String name) {
+        return new ColumnExpr(name);
+    }
+
+    private static LogicalExpr cast(LogicalExpr expr, ArrowType targetType) {
+        return new CastExpr(expr, targetType);
+    }
+
+    private static AggregateExpr max(LogicalExpr expr) {
+        return new AggregateExpr.Max(expr);
     }
-}
+}
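
The col / cast / max helpers at the bottom of App.java are not exercised by the benchmark yet. A hedged sketch of the kind of expression they appear intended to build; the column name and Arrow type are illustrative and not taken from the trips schema (FloatingPointPrecision is org.apache.arrow.vector.types.FloatingPointPrecision):

    // Hypothetical expression: MAX(CAST(fare_amount AS DOUBLE)).
    AggregateExpr maxFare = max(
            cast(col("fare_amount"), new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)));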

glint/src/main/java/co/clflushopt/glint/core/ExecutionContext.java

Lines changed: 39 additions & 0 deletions
@@ -3,12 +3,18 @@
 import java.io.FileNotFoundException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Optional;
 
 import co.clflushopt.glint.dataframe.DataFrame;
 import co.clflushopt.glint.dataframe.DataFrameImpl;
 import co.clflushopt.glint.datasource.CsvDataSource;
+import co.clflushopt.glint.datasource.ParquetDataSource;
+import co.clflushopt.glint.query.logical.plan.LogicalPlan;
 import co.clflushopt.glint.query.logical.plan.Scan;
+import co.clflushopt.glint.query.optimizer.QueryOptimizer;
+import co.clflushopt.glint.query.planner.QueryPlanner;
+import co.clflushopt.glint.types.RecordBatch;
 import co.clflushopt.glint.types.Schema;
 
 public class ExecutionContext {
@@ -22,6 +28,13 @@ private ExecutionContext(HashMap<String, Object> context, Configuration configur
         this.config = configuration;
     }
 
+    public Iterator<RecordBatch> execute(LogicalPlan plan) {
+        // Optimize the logical plan, lower it to a physical plan, and execute it.
+        var optimizedPlan = QueryOptimizer.optimize(plan);
+        var physicalPlan = QueryPlanner.createPhysicalPlan(optimizedPlan);
+        return physicalPlan.execute();
+    }
+
     /**
      * Configuration class that encapsulates all execution settings. Uses builder
      * pattern for a clean configuration API.
@@ -74,6 +87,32 @@ public DataFrame readCsv(String path, Optional<Schema> schema, CsvReaderOptions
         return new DataFrameImpl(new Scan(options.getTableName(), source, Collections.emptyList()));
     }
 
+    /**
+     * Creates a DataFrame from a CSV file with an explicit, non-optional schema.
+     *
+     * @param path    path to the CSV file
+     * @param schema  schema to use for the file
+     * @param options CSV reader options
+     */
+    public DataFrame readCsv(String path, Schema schema, CsvReaderOptions options)
+            throws FileNotFoundException {
+        var source = new CsvDataSource(path, Optional.of(schema), options.hasHeader(),
+                defaultBatchSize);
+        return new DataFrameImpl(new Scan(options.getTableName(), source, Collections.emptyList()));
+    }
+
+    /**
+     * Creates a DataFrame from a Parquet file.
+     *
+     * @param path   path to the Parquet file
+     * @param schema optional schema; currently unused, the schema is read from
+     *               the Parquet file footer
+     */
+    public DataFrame readParquet(String path, Optional<Schema> schema) {
+        var source = new ParquetDataSource(path);
+        return new DataFrameImpl(new Scan("parquet_scan", source, Collections.emptyList()));
+    }
+
     /**
      * Creates a temporary table from a DataFrame for use in subsequent queries.
     */
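
Taken together with the benchmark in App.java, the intended call path is readParquet → DataFrame → execute. A minimal usage sketch (the dataset path is the one the benchmark uses and is only illustrative; since execute() already optimizes and lowers the plan internally, the logical plan can be passed to it directly):

    // Sketch: read a Parquet file and stream the result batches as CSV text.
    ExecutionContext ctx = ExecutionContext.create().build();
    DataFrame df = ctx.readParquet("./datasets/yellow_tripdata_2019-01.parquet", null);
    Iterator<RecordBatch> results = ctx.execute(df.getLogicalPlan());
    while (results.hasNext()) {
        System.out.println(results.next().toCsv());
    }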
glint/src/main/java/co/clflushopt/glint/datasource/ParquetDataSource.java

Lines changed: 158 additions & 0 deletions
@@ -0,0 +1,158 @@
+package co.clflushopt.glint.datasource;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import co.clflushopt.glint.types.ArrowFieldVector;
+import co.clflushopt.glint.types.RecordBatch;
+import co.clflushopt.glint.types.Schema;
+import co.clflushopt.glint.types.SchemaConverter;
+
+public class ParquetDataSource implements DataSource {
+    private final String filename;
+
+    public ParquetDataSource(String filename) {
+        this.filename = filename;
+    }
+
+    @Override
+    public Schema getSchema() {
+        try (ParquetScan scan = new ParquetScan(filename, Collections.emptyList())) {
+            org.apache.arrow.vector.types.pojo.Schema arrowSchema = SchemaConverter
+                    .fromParquet(scan.getSchema()).toArrow();
+            return SchemaConverter.fromArrow(arrowSchema);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to read schema from Parquet file", e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    @Override
+    public Iterable<RecordBatch> scan(List<String> projection) {
+        // Return an Iterable that creates a new ParquetScan each time iterator() is
+        // called
+        return () -> {
+            try {
+                return new ParquetScan(filename, projection).iterator();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to create ParquetScan", e);
+            }
+        };
+    }
+}
+
+class ParquetScan implements AutoCloseable {
+    private final ParquetFileReader reader;
+    private final List<String> columns;
+    private final org.apache.parquet.schema.MessageType schema;
+
+    public ParquetScan(String filename, List<String> columns) throws IOException {
+        this.columns = columns;
+        this.reader = ParquetFileReader
+                .open(HadoopInputFile.fromPath(new Path(filename), new Configuration()));
+        this.schema = reader.getFooter().getFileMetaData().getSchema();
+    }
+
+    public Iterator<RecordBatch> iterator() {
+        return new ParquetIterator(reader, columns);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    public org.apache.parquet.schema.MessageType getSchema() {
+        return schema;
+    }
+}
+
+class ParquetIterator implements Iterator<RecordBatch> {
+    private final ParquetFileReader reader;
+    private final List<String> projectedColumns;
+    private final org.apache.parquet.schema.MessageType schema;
+    private final org.apache.arrow.vector.types.pojo.Schema arrowSchema;
+    private final org.apache.arrow.vector.types.pojo.Schema projectedArrowSchema;
+    private RecordBatch batch;
+
+    public ParquetIterator(ParquetFileReader reader, List<String> projectedColumns) {
+        this.reader = reader;
+        this.projectedColumns = projectedColumns;
+        this.schema = reader.getFooter().getFileMetaData().getSchema();
+        this.arrowSchema = SchemaConverter.fromParquet(schema).toArrow();
+
+        if (projectedColumns.isEmpty()) {
+            // Project all columns
+            this.projectedArrowSchema = arrowSchema;
+        } else {
+            // Create projected schema
+            List<org.apache.arrow.vector.types.pojo.Field> projectedFields = projectedColumns
+                    .stream().map(
+                            name -> arrowSchema.getFields().stream()
+                                    .filter(f -> f.getName().equals(name)).findFirst()
+                                    .orElseThrow(() -> new IllegalArgumentException(
+                                            "Column not found: " + name)))
+                    .collect(Collectors.toList());
+
+            this.projectedArrowSchema = new org.apache.arrow.vector.types.pojo.Schema(
+                    projectedFields);
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        batch = nextBatch();
+        return batch != null;
+    }
+
+    @Override
+    public RecordBatch next() {
+        if (batch == null) {
+            throw new NoSuchElementException();
+        }
+        RecordBatch result = batch;
+        batch = null;
+        return result;
+    }
+
+    private RecordBatch nextBatch() {
+        try (PageReadStore pages = reader.readNextRowGroup()) {
+            if (pages == null) {
+                return null;
+            }
+
+            if (pages.getRowCount() > Integer.MAX_VALUE) {
+                throw new IllegalStateException("Row count exceeds maximum integer value");
+            }
+
+            int rows = (int) pages.getRowCount();
+
+            VectorSchemaRoot root = VectorSchemaRoot.create(projectedArrowSchema,
+                    new RootAllocator(Long.MAX_VALUE));
+            root.allocateNew();
+            root.setRowCount(rows);
+
+            Schema convertedSchema = SchemaConverter.fromArrow(projectedArrowSchema);
+
+            return new RecordBatch(convertedSchema, root.getFieldVectors().stream()
+                    .map(ArrowFieldVector::new).collect(Collectors.toList()));
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        return batch;
+    }
+}
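
As committed, nextBatch() sizes Arrow vectors to the row group (allocateNew / setRowCount) but never copies values out of the PageReadStore, so the returned batches carry only the schema and the row count. A hedged sketch of how the values could be materialized with parquet-mr's low-level column readers; the ColumnReadStoreImpl / GroupRecordConverter usage and signatures are assumptions about a possible follow-up, not APIs this commit uses:

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.ColumnReadStore;
    import org.apache.parquet.column.ColumnReader;
    import org.apache.parquet.column.impl.ColumnReadStoreImpl;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    import org.apache.parquet.schema.MessageType;

    class RowGroupMaterializer {
        // Walk one row group column by column and hand each non-null value to a
        // per-type copy step (e.g. Float8Vector.setSafe / BigIntVector.setSafe).
        static void readRowGroup(PageReadStore pages, MessageType schema) {
            ColumnReadStore store = new ColumnReadStoreImpl(pages,
                    new GroupRecordConverter(schema).getRootConverter(), schema, "glint");
            for (ColumnDescriptor column : schema.getColumns()) {
                ColumnReader reader = store.getColumnReader(column);
                for (long i = 0, n = reader.getTotalValueCount(); i < n; i++) {
                    if (reader.getCurrentDefinitionLevel() == column.getMaxDefinitionLevel()) {
                        // Non-null value: dispatch on the column's primitive type
                        // and copy it into the matching Arrow vector.
                    }
                    reader.consume();
                }
            }
        }
    }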

glint/src/main/java/co/clflushopt/glint/query/logical/plan/Scan.java

Lines changed: 3 additions & 0 deletions
@@ -48,6 +48,9 @@ public List<LogicalPlan> getChildren() {
 
     private Schema infer() {
        var schema = this.dataSource.getSchema();
+        assert schema != null;
+        assert schema.getFields().size() > 0;
+
        if (projections.isEmpty()) {
            return schema;
        }
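
Note that these assert statements only run when the JVM is started with assertions enabled; under a default java invocation they are no-ops. For example (classpath illustrative):

    java -ea -cp glint/target/classes co.clflushopt.glint.App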

0 commit comments
