15
15
// specific language governing permissions and limitations
16
16
// under the License.
17
17
18
+ use std:: collections:: HashMap ;
18
19
use std:: sync:: Arc ;
19
20
20
- use arrow_array:: builder:: { MapBuilder , PrimitiveBuilder , StringBuilder } ;
21
- use arrow_array:: types:: { Int64Type , TimestampMillisecondType } ;
21
+ use arrow_array:: builder:: { MapBuilder , MapFieldNames , PrimitiveBuilder , StringBuilder } ;
22
+ use arrow_array:: types:: { Int64Type , TimestampMicrosecondType } ;
22
23
use arrow_array:: RecordBatch ;
23
- use arrow_schema:: { DataType , Field , Schema , TimeUnit } ;
24
+ use arrow_schema:: { DataType , Field } ;
24
25
use futures:: { stream, StreamExt } ;
26
+ use parquet:: arrow:: PARQUET_FIELD_ID_META_KEY ;
25
27
28
+ use crate :: arrow:: { schema_to_arrow_schema, DEFAULT_MAP_FIELD_NAME } ;
26
29
use crate :: scan:: ArrowRecordBatchStream ;
30
+ use crate :: spec:: {
31
+ MapType , NestedField , PrimitiveType , Type , MAP_KEY_FIELD_NAME , MAP_VALUE_FIELD_NAME ,
32
+ } ;
27
33
use crate :: table:: Table ;
28
34
use crate :: Result ;
29
35
@@ -38,51 +44,72 @@ impl<'a> SnapshotsTable<'a> {
38
44
Self { table }
39
45
}
40
46
41
- /// Returns the schema of the snapshots table.
42
- pub fn schema ( & self ) -> Schema {
43
- Schema :: new ( vec ! [
44
- Field :: new(
47
+ /// Returns the iceberg schema of the snapshots table.
48
+ pub fn schema ( & self ) -> crate :: spec:: Schema {
49
+ let fields = vec ! [
50
+ NestedField :: required(
51
+ 1 ,
45
52
"committed_at" ,
46
- DataType :: Timestamp ( TimeUnit :: Millisecond , Some ( "+00:00" . into( ) ) ) ,
47
- false ,
53
+ Type :: Primitive ( PrimitiveType :: Timestamptz ) ,
48
54
) ,
49
- Field :: new( "snapshot_id" , DataType :: Int64 , false ) ,
50
- Field :: new( "parent_id" , DataType :: Int64 , true ) ,
51
- Field :: new( "operation" , DataType :: Utf8 , false ) ,
52
- Field :: new( "manifest_list" , DataType :: Utf8 , false ) ,
53
- Field :: new(
55
+ NestedField :: required( 2 , "snapshot_id" , Type :: Primitive ( PrimitiveType :: Long ) ) ,
56
+ NestedField :: optional( 3 , "parent_id" , Type :: Primitive ( PrimitiveType :: Long ) ) ,
57
+ NestedField :: optional( 4 , "operation" , Type :: Primitive ( PrimitiveType :: String ) ) ,
58
+ NestedField :: optional( 5 , "manifest_list" , Type :: Primitive ( PrimitiveType :: String ) ) ,
59
+ NestedField :: optional(
60
+ 6 ,
54
61
"summary" ,
55
- DataType :: Map (
56
- Arc :: new( Field :: new(
57
- "entries" ,
58
- DataType :: Struct (
59
- vec![
60
- Field :: new( "keys" , DataType :: Utf8 , false ) ,
61
- Field :: new( "values" , DataType :: Utf8 , true ) ,
62
- ]
63
- . into( ) ,
64
- ) ,
62
+ Type :: Map ( MapType {
63
+ key_field: Arc :: new( NestedField :: map_key_element(
64
+ 7 ,
65
+ Type :: Primitive ( PrimitiveType :: String ) ,
66
+ ) ) ,
67
+ value_field: Arc :: new( NestedField :: map_value_element(
68
+ 8 ,
69
+ Type :: Primitive ( PrimitiveType :: String ) ,
65
70
false ,
66
71
) ) ,
67
- false ,
68
- ) ,
69
- false ,
72
+ } ) ,
70
73
) ,
71
- ] )
74
+ ] ;
75
+ crate :: spec:: Schema :: builder ( )
76
+ . with_fields ( fields. into_iter ( ) . map ( |f| f. into ( ) ) )
77
+ . build ( )
78
+ . unwrap ( )
72
79
}
73
80
74
81
/// Scans the snapshots table.
75
82
pub async fn scan ( & self ) -> Result < ArrowRecordBatchStream > {
83
+ let schema = schema_to_arrow_schema ( & self . schema ( ) ) ?;
84
+
76
85
let mut committed_at =
77
- PrimitiveBuilder :: < TimestampMillisecondType > :: new ( ) . with_timezone ( "+00:00" ) ;
86
+ PrimitiveBuilder :: < TimestampMicrosecondType > :: new ( ) . with_timezone ( "+00:00" ) ;
78
87
let mut snapshot_id = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
79
88
let mut parent_id = PrimitiveBuilder :: < Int64Type > :: new ( ) ;
80
89
let mut operation = StringBuilder :: new ( ) ;
81
90
let mut manifest_list = StringBuilder :: new ( ) ;
82
- let mut summary = MapBuilder :: new ( None , StringBuilder :: new ( ) , StringBuilder :: new ( ) ) ;
83
-
91
+ let mut summary = MapBuilder :: new (
92
+ Some ( MapFieldNames {
93
+ entry : DEFAULT_MAP_FIELD_NAME . to_string ( ) ,
94
+ key : MAP_KEY_FIELD_NAME . to_string ( ) ,
95
+ value : MAP_VALUE_FIELD_NAME . to_string ( ) ,
96
+ } ) ,
97
+ StringBuilder :: new ( ) ,
98
+ StringBuilder :: new ( ) ,
99
+ )
100
+ . with_keys_field ( Arc :: new (
101
+ Field :: new ( MAP_KEY_FIELD_NAME , DataType :: Utf8 , false ) . with_metadata ( HashMap :: from ( [ (
102
+ PARQUET_FIELD_ID_META_KEY . to_string ( ) ,
103
+ "7" . to_string ( ) ,
104
+ ) ] ) ) ,
105
+ ) )
106
+ . with_values_field ( Arc :: new (
107
+ Field :: new ( MAP_VALUE_FIELD_NAME , DataType :: Utf8 , true ) . with_metadata ( HashMap :: from ( [
108
+ ( PARQUET_FIELD_ID_META_KEY . to_string ( ) , "8" . to_string ( ) ) ,
109
+ ] ) ) ,
110
+ ) ) ;
84
111
for snapshot in self . table . metadata ( ) . snapshots ( ) {
85
- committed_at. append_value ( snapshot. timestamp_ms ( ) ) ;
112
+ committed_at. append_value ( snapshot. timestamp_ms ( ) * 1000 ) ;
86
113
snapshot_id. append_value ( snapshot. snapshot_id ( ) ) ;
87
114
parent_id. append_option ( snapshot. parent_snapshot_id ( ) ) ;
88
115
manifest_list. append_value ( snapshot. manifest_list ( ) ) ;
@@ -94,7 +121,7 @@ impl<'a> SnapshotsTable<'a> {
94
121
summary. append ( true ) ?;
95
122
}
96
123
97
- let batch = RecordBatch :: try_new ( Arc :: new ( self . schema ( ) ) , vec ! [
124
+ let batch = RecordBatch :: try_new ( Arc :: new ( schema) , vec ! [
98
125
Arc :: new( committed_at. finish( ) ) ,
99
126
Arc :: new( snapshot_id. finish( ) ) ,
100
127
Arc :: new( parent_id. finish( ) ) ,
@@ -123,14 +150,14 @@ mod tests {
123
150
check_record_batches (
124
151
batch_stream,
125
152
expect ! [ [ r#"
126
- Field { name: "committed_at", data_type: Timestamp(Millisecond , Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
127
- Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
128
- Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
129
- Field { name: "operation", data_type: Utf8, nullable: false , dict_id: 0, dict_is_ordered: false, metadata: {} },
130
- Field { name: "manifest_list", data_type: Utf8, nullable: false , dict_id: 0, dict_is_ordered: false, metadata: {} },
131
- Field { name: "summary", data_type: Map(Field { name: "entries ", data_type: Struct([Field { name: "keys ", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "values ", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: false , dict_id: 0, dict_is_ordered: false, metadata: {} }"# ] ] ,
153
+ Field { name: "committed_at", data_type: Timestamp(Microsecond , Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1" } },
154
+ Field { name: "snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2" } },
155
+ Field { name: "parent_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3" } },
156
+ Field { name: "operation", data_type: Utf8, nullable: true , dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4" } },
157
+ Field { name: "manifest_list", data_type: Utf8, nullable: true , dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5" } },
158
+ Field { name: "summary", data_type: Map(Field { name: "key_value ", data_type: Struct([Field { name: "key ", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7" } }, Field { name: "value ", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8" } }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true , dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6" } }"# ] ] ,
132
159
expect ! [ [ r#"
133
- committed_at: PrimitiveArray<Timestamp(Millisecond , Some("+00:00"))>
160
+ committed_at: PrimitiveArray<Timestamp(Microsecond , Some("+00:00"))>
134
161
[
135
162
2018-01-04T21:22:35.770+00:00,
136
163
2019-04-12T20:29:15.770+00:00,
@@ -158,11 +185,11 @@ mod tests {
158
185
[
159
186
]
160
187
[
161
- -- child 0: "keys " (Utf8)
188
+ -- child 0: "key " (Utf8)
162
189
StringArray
163
190
[
164
191
]
165
- -- child 1: "values " (Utf8)
192
+ -- child 1: "value " (Utf8)
166
193
StringArray
167
194
[
168
195
]
@@ -172,11 +199,11 @@ mod tests {
172
199
[
173
200
]
174
201
[
175
- -- child 0: "keys " (Utf8)
202
+ -- child 0: "key " (Utf8)
176
203
StringArray
177
204
[
178
205
]
179
- -- child 1: "values " (Utf8)
206
+ -- child 1: "value " (Utf8)
180
207
StringArray
181
208
[
182
209
]
0 commit comments