17
17
18
18
package org .apache .baremaps .storage .flatgeobuf ;
19
19
20
- import com .google .flatbuffers .FlatBufferBuilder ;
21
20
import java .io .IOException ;
22
21
import java .nio .ByteBuffer ;
23
22
import java .nio .ByteOrder ;
26
25
import java .nio .file .StandardOpenOption ;
27
26
import java .util .Iterator ;
28
27
import java .util .NoSuchElementException ;
28
+ import java .util .Objects ;
29
29
import org .apache .baremaps .data .collection .DataCollection ;
30
- import org .apache .baremaps .data .storage .DataRow ;
31
- import org .apache .baremaps .data .storage .DataSchema ;
32
- import org .apache .baremaps .data .storage .DataStoreException ;
33
- import org .apache .baremaps .data .storage .DataTable ;
34
- import org .locationtech .jts .geom .*;
35
- import org .wololo .flatgeobuf .Constants ;
36
- import org .wololo .flatgeobuf .GeometryConversions ;
37
- import org .wololo .flatgeobuf .HeaderMeta ;
38
- import org .wololo .flatgeobuf .PackedRTree ;
39
- import org .wololo .flatgeobuf .generated .Feature ;
40
- import org .wololo .flatgeobuf .generated .GeometryType ;
30
+ import org .apache .baremaps .data .storage .*;
31
+ import org .apache .baremaps .data .storage .DataColumn .Type ;
32
+ import org .apache .baremaps .flatgeobuf .FlatGeoBuf ;
33
+ import org .apache .baremaps .flatgeobuf .FlatGeoBufReader ;
34
+ import org .apache .baremaps .flatgeobuf .FlatGeoBufWriter ;
35
+ import org .apache .baremaps .flatgeobuf .PackedRTree ;
41
36
42
37
/**
43
38
* A {@link DataTable} that stores rows in a flatgeobuf file.
@@ -54,8 +49,18 @@ public class FlatGeoBufDataTable implements DataTable {
54
49
* @param file the path to the flatgeobuf file
55
50
*/
56
51
  /**
   * Constructs a table from a flatgeobuf file, deriving the schema from the file's header.
   *
   * <p>NOTE(review): {@code readSchema} returns {@code null} when the file cannot be read, so
   * {@code schema} may be null for a file that does not exist yet — confirm callers tolerate this.
   *
   * @param file the path to the flatgeobuf file
   */
  public FlatGeoBufDataTable(Path file) {
    // Delegate to the two-argument constructor with the schema read from the file header.
    this(file, readSchema(file));
  }

  /**
   * Constructs a table from a flatgeobuf file and a schema (used for writing).
   *
   * @param file the path to the flatgeobuf file
   * @param schema the schema of the table
   */
  public FlatGeoBufDataTable(Path file, DataSchema schema) {
    this.file = file;
    this.schema = schema;
  }
60
65
61
66
/**
@@ -65,27 +70,15 @@ public FlatGeoBufDataTable(Path file) {
65
70
* @return the schema of the table
66
71
*/
67
72
private static DataSchema readSchema (Path file ) {
68
- try (var channel = FileChannel .open (file , StandardOpenOption .READ )) {
73
+ try (var reader = new FlatGeoBufReader ( FileChannel .open (file , StandardOpenOption .READ ) )) {
69
74
// try to read the schema from the file
70
- var buffer = ByteBuffer .allocate (1 << 20 ).order (ByteOrder .LITTLE_ENDIAN );
71
- HeaderMeta headerMeta = readHeaderMeta (channel , buffer );
72
- return FlatGeoBufTypeConversion .asSchema (headerMeta );
75
+ var header = reader .readHeader ();
76
+ return FlatGeoBufTypeConversion .asSchema (header );
73
77
} catch (IOException e ) {
74
78
return null ;
75
79
}
76
80
}
77
81
78
- /**
79
- * Constructs a table from a flatgeobuf file and a schema (used for writing).
80
- *
81
- * @param file the path to the flatgeobuf file
82
- * @param schema the schema of the table
83
- */
84
- public FlatGeoBufDataTable (Path file , DataSchema schema ) {
85
- this .file = file ;
86
- this .schema = schema ;
87
- }
88
-
89
82
/**
90
83
* {@inheritDoc}
91
84
*/
@@ -100,22 +93,13 @@ public DataSchema schema() {
100
93
@ Override
101
94
public Iterator <DataRow > iterator () {
102
95
try {
103
- var channel = FileChannel .open (file , StandardOpenOption .READ );
96
+ var reader = new FlatGeoBufReader ( FileChannel .open (file , StandardOpenOption .READ ) );
104
97
105
- var buffer = ByteBuffer .allocate (1 << 20 ).order (ByteOrder .LITTLE_ENDIAN );
106
- HeaderMeta headerMeta = readHeaderMeta (channel , buffer );
107
- channel .position (headerMeta .offset );
108
-
109
- // skip the index
110
- long indexOffset = headerMeta .offset ;
111
- long indexSize =
112
- PackedRTree .calcSize ((int ) headerMeta .featuresCount , headerMeta .indexNodeSize );
113
- channel .position (indexOffset + indexSize );
114
-
115
- buffer .clear ();
98
+ var header = reader .readHeader ();
99
+ reader .skipIndex ();
116
100
117
101
// create the feature stream
118
- return new RowIterator (channel , headerMeta , schema , buffer );
102
+ return new RowIterator (reader , header , schema );
119
103
} catch (IOException e ) {
120
104
throw new DataStoreException (e );
121
105
}
@@ -134,30 +118,14 @@ public void clear() {
134
118
*/
135
119
@ Override
136
120
public long size () {
137
- try (var channel = FileChannel .open (file , StandardOpenOption .READ )) {
138
- var buffer = ByteBuffer .allocate (1 << 20 ).order (ByteOrder .LITTLE_ENDIAN );
139
- HeaderMeta headerMeta = readHeaderMeta (channel , buffer );
140
- return headerMeta .featuresCount ;
121
+ try (var reader = new FlatGeoBufReader (FileChannel .open (file , StandardOpenOption .READ ))) {
122
+ FlatGeoBuf .Header header = reader .readHeader ();
123
+ return header .featuresCount ();
141
124
} catch (IOException e ) {
142
125
throw new DataStoreException (e );
143
126
}
144
127
}
145
128
146
- /**
147
- * Reads the header meta from a channel.
148
- *
149
- * @param channel the channel to read from
150
- * @param buffer the buffer to use
151
- * @return the header meta
152
- * @throws IOException if an error occurs while reading the header meta
153
- */
154
- private static HeaderMeta readHeaderMeta (SeekableByteChannel channel , ByteBuffer buffer )
155
- throws IOException {
156
- channel .read (buffer );
157
- buffer .flip ();
158
- return HeaderMeta .read (buffer );
159
- }
160
-
161
129
/**
162
130
* Writes a collection of rows to a flatgeobuf file.
163
131
*
@@ -166,68 +134,45 @@ private static HeaderMeta readHeaderMeta(SeekableByteChannel channel, ByteBuffer
166
134
*/
167
135
public void write (DataCollection <DataRow > features ) throws IOException {
168
136
try (
169
- var channel = FileChannel .open (file , StandardOpenOption .CREATE , StandardOpenOption .WRITE );
170
- var outputStream = Channels .newOutputStream (channel )) {
171
- outputStream .write (Constants .MAGIC_BYTES );
172
-
173
- var bufferBuilder = new FlatBufferBuilder ();
174
-
175
- var headerMeta = new HeaderMeta ();
176
- headerMeta .geometryType = GeometryType .Unknown ;
177
- headerMeta .indexNodeSize = 16 ;
178
- headerMeta .srid = 3857 ;
179
- headerMeta .featuresCount = features .size ();
180
- headerMeta .name = schema .name ();
181
- headerMeta .columns = FlatGeoBufTypeConversion .asColumns (schema .columns ());
182
- HeaderMeta .write (headerMeta , outputStream , bufferBuilder );
137
+ var writer = new FlatGeoBufWriter (
138
+ FileChannel .open (file , StandardOpenOption .CREATE , StandardOpenOption .WRITE ))) {
139
+
140
+ schema .columns ().stream ()
141
+ .filter (c -> c .cardinality () == DataColumn .Cardinality .REQUIRED )
142
+ .forEach (c -> {
143
+ if (Objects .requireNonNull (c .type ()) == Type .BINARY ) {
144
+ throw new UnsupportedOperationException ();
145
+ }
146
+ });
147
+
148
+ var header = new FlatGeoBuf .Header (
149
+ schema .name (),
150
+ null ,
151
+ FlatGeoBuf .GeometryType .UNKNOWN ,
152
+ false ,
153
+ false ,
154
+ false ,
155
+ false ,
156
+ FlatGeoBufTypeConversion .asColumns (schema .columns ()),
157
+ features .size (),
158
+ 2 ,
159
+ null ,
160
+ null ,
161
+ null ,
162
+ null );
163
+
164
+ writer .writeHeader (header );
183
165
184
166
var indexSize =
185
- (int ) PackedRTree .calcSize ((int ) headerMeta .featuresCount , headerMeta .indexNodeSize );
167
+ (int ) PackedRTree .calcSize ((int ) header .featuresCount (), header .indexNodeSize () );
186
168
187
- for (int i = 0 ; i < indexSize ; i ++) {
188
- outputStream .write (0 );
189
- }
169
+ writer .writeIndexBuffer (ByteBuffer .allocate (indexSize ).order (ByteOrder .LITTLE_ENDIAN ));
190
170
191
171
var iterator = features .iterator ();
192
172
while (iterator .hasNext ()) {
193
- var featureBuilder = new FlatBufferBuilder (4096 );
194
-
195
173
var row = iterator .next ();
196
-
197
- var propertiesBuffer = ByteBuffer .allocate (1 << 20 ).order (ByteOrder .LITTLE_ENDIAN );
198
- var properties = row .values ().stream ()
199
- .filter (v -> !(v instanceof Geometry ))
200
- .toList ();
201
- for (int i = 0 ; i < properties .size (); i ++) {
202
- var column = headerMeta .columns .get (i );
203
- var value = properties .get (i );
204
- propertiesBuffer .putShort ((short ) i );
205
- FlatGeoBufTypeConversion .writeValue (propertiesBuffer , column , value );
206
- }
207
- if (propertiesBuffer .position () > 0 ) {
208
- propertiesBuffer .flip ();
209
- }
210
- var propertiesOffset = org .wololo .flatgeobuf .generated .Feature
211
- .createPropertiesVector (featureBuilder , propertiesBuffer );
212
-
213
- var geometry = row .values ().stream ()
214
- .filter (v -> v instanceof Geometry )
215
- .map (Geometry .class ::cast )
216
- .findFirst ();
217
-
218
- var geometryOffset = geometry .isPresent ()
219
- ? GeometryConversions .serialize (featureBuilder , geometry .get (), headerMeta .geometryType )
220
- : 0 ;
221
-
222
- var featureOffset =
223
- org .wololo .flatgeobuf .generated .Feature .createFeature (featureBuilder , geometryOffset ,
224
- propertiesOffset , 0 );
225
- featureBuilder .finishSizePrefixed (featureOffset );
226
-
227
- ByteBuffer data = featureBuilder .dataBuffer ();
228
- while (data .hasRemaining ()) {
229
- channel .write (data );
230
- }
174
+ var feature = FlatGeoBufTypeConversion .asFeature (row );
175
+ writer .writeFeature (feature );
231
176
}
232
177
}
233
178
}
@@ -237,38 +182,36 @@ public void write(DataCollection<DataRow> features) throws IOException {
237
182
*/
238
183
public static class RowIterator implements Iterator <DataRow > {
239
184
240
    // Header of the flatgeobuf file; provides the total feature count.
    private final FlatGeoBuf.Header header;

    // Schema used to convert flatgeobuf features into data rows.
    private final DataSchema schema;

    // Reader positioned at the start of the feature section.
    private final FlatGeoBufReader reader;

    // Number of features read so far.
    private long cursor = 0;
249
192
250
193
/**
251
194
* Constructs a row iterator.
252
195
*
253
- * @param channel the channel to read from
254
- * @param headerMeta the header meta
196
+ * @param reader the channel to read from
197
+ * @param header the header of the file
255
198
* @param schema the schema of the table
256
- * @param buffer the buffer to use
257
199
*/
258
- public RowIterator (SeekableByteChannel channel , HeaderMeta headerMeta ,
259
- DataSchema schema , ByteBuffer buffer ) {
260
- this .channel = channel ;
261
- this .headerMeta = headerMeta ;
200
+ public RowIterator (
201
+ FlatGeoBufReader reader ,
202
+ FlatGeoBuf .Header header ,
203
+ DataSchema schema ) {
204
+ this .reader = reader ;
205
+ this .header = header ;
262
206
this .schema = schema ;
263
- this .buffer = buffer ;
264
207
}
265
208
266
209
/**
267
210
* {@inheritDoc}
268
211
*/
269
212
@ Override
270
213
public boolean hasNext () {
271
- return cursor < headerMeta .featuresCount ;
214
+ return cursor < header .featuresCount () ;
272
215
}
273
216
274
217
/**
@@ -277,19 +220,9 @@ public boolean hasNext() {
277
220
@ Override
278
221
public DataRow next () {
279
222
try {
280
- channel .read (buffer );
281
- buffer .flip ();
282
-
283
- var featureSize = buffer .getInt ();
284
- var row =
285
- FlatGeoBufTypeConversion .asRow (headerMeta , schema , Feature .getRootAsFeature (buffer ));
286
-
287
- buffer .position (Integer .BYTES + featureSize );
288
- buffer .compact ();
289
-
223
+ var feature = reader .readFeature ();
290
224
cursor ++;
291
-
292
- return row ;
225
+ return FlatGeoBufTypeConversion .asRow (schema , feature );
293
226
} catch (IOException e ) {
294
227
throw new NoSuchElementException (e );
295
228
}
0 commit comments