-
Notifications
You must be signed in to change notification settings - Fork 16
feat: add list type support to ArrowWriter #149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2edee28
9df5acd
71c47f9
41855ca
8cf201e
df1c4dc
d8b2e93
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -146,6 +146,10 @@ public class StructArrayBuilder: ArrowArrayBuilder<StructBufferBuilder, NestedAr | |
| try super.init(ArrowTypeStruct(ArrowType.ArrowStruct, fields: fields)) | ||
| } | ||
|
|
||
| public override func appendAny(_ val: Any?) { | ||
| self.append(val as? [Any?]) | ||
| } | ||
|
|
||
| public override func append(_ values: [Any?]?) { | ||
| self.bufferBuilder.append(values) | ||
| if let anyValues = values { | ||
|
|
@@ -186,6 +190,10 @@ public class ListArrayBuilder: ArrowArrayBuilder<ListBufferBuilder, NestedArray> | |
| try super.init(arrowType) | ||
| } | ||
|
|
||
| public override func appendAny(_ val: Any?) { | ||
| self.append(val as? [Any?]) | ||
| } | ||
|
Comment on lines
+193
to
+195
|
||
|
|
||
| public override func append(_ values: [Any?]?) { | ||
| self.bufferBuilder.append(values) | ||
| if let vals = values { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,6 +84,13 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length | |
| } | ||
|
|
||
| fieldsOffset = fbb.createVector(ofOffsets: offsets) | ||
| } else if let listField = field.type as? ArrowTypeList { | ||
| switch writeField(&fbb, field: listField.elementField) { | ||
| case .success(let offset): | ||
| fieldsOffset = fbb.createVector(ofOffsets: [offset]) | ||
| case .failure(let error): | ||
| return .failure(error) | ||
| } | ||
| } | ||
|
|
||
| let nameOffset = fbb.create(string: field.name) | ||
|
|
@@ -178,16 +185,23 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length | |
| fbb: inout FlatBufferBuilder) { | ||
| for index in (0 ..< fields.count).reversed() { | ||
| let column = columns[index] | ||
| let fieldNode = | ||
| org_apache_arrow_flatbuf_FieldNode(length: Int64(column.length), | ||
| nullCount: Int64(column.nullCount)) | ||
| offsets.append(fbb.create(struct: fieldNode)) | ||
| // FlatBuffer vectors use prepend semantics: last-written element becomes | ||
| // the first when read. Arrow IPC requires depth-first pre-order (parent | ||
| // before children), so children must be written before their parent here. | ||
| if let nestedType = column.type as? ArrowTypeStruct { | ||
| let nestedArray = column.array as? NestedArray | ||
| if let nestedFields = nestedArray?.fields { | ||
| writeFieldNodes(nestedType.fields, columns: nestedFields, offsets: &offsets, fbb: &fbb) | ||
| } | ||
| } else if let listType = column.type as? ArrowTypeList { | ||
| if let nestedArray = column.array as? NestedArray, let valuesHolder = nestedArray.values { | ||
| writeFieldNodes([listType.elementField], columns: [valuesHolder], offsets: &offsets, fbb: &fbb) | ||
| } | ||
| } | ||
| let fieldNode = | ||
| org_apache_arrow_flatbuf_FieldNode(length: Int64(column.length), | ||
| nullCount: Int64(column.nullCount)) | ||
| offsets.append(fbb.create(struct: fieldNode)) | ||
|
Comment on lines
+188
to
+204
|
||
| } | ||
| } | ||
|
|
||
|
|
@@ -204,12 +218,17 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length | |
| let buffer = org_apache_arrow_flatbuf_Buffer(offset: Int64(bufferOffset), length: Int64(bufferDataSize)) | ||
| buffers.append(buffer) | ||
| bufferOffset += bufferDataSize | ||
| if let nestedType = column.type as? ArrowTypeStruct { | ||
| let nestedArray = column.array as? NestedArray | ||
| if let nestedFields = nestedArray?.fields { | ||
| writeBufferInfo(nestedType.fields, columns: nestedFields, | ||
| bufferOffset: &bufferOffset, buffers: &buffers, fbb: &fbb) | ||
| } | ||
| } | ||
| if let nestedType = column.type as? ArrowTypeStruct { | ||
| let nestedArray = column.array as? NestedArray | ||
| if let nestedFields = nestedArray?.fields { | ||
| writeBufferInfo(nestedType.fields, columns: nestedFields, | ||
| bufferOffset: &bufferOffset, buffers: &buffers, fbb: &fbb) | ||
| } | ||
| } else if let listType = column.type as? ArrowTypeList { | ||
| if let nestedArray = column.array as? NestedArray, let valuesHolder = nestedArray.values { | ||
| writeBufferInfo([listType.elementField], columns: [valuesHolder], | ||
| bufferOffset: &bufferOffset, buffers: &buffers, fbb: &fbb) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -263,18 +282,30 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length | |
| for var bufferData in colBufferData { | ||
| addPadForAlignment(&bufferData) | ||
| writer.append(bufferData) | ||
| if let nestedType = column.type as? ArrowTypeStruct { | ||
| guard let nestedArray = column.array as? NestedArray, | ||
| let nestedFields = nestedArray.fields else { | ||
| return .failure(.invalid("Struct type array expected for nested type")) | ||
| } | ||
| } | ||
| if let nestedType = column.type as? ArrowTypeStruct { | ||
| guard let nestedArray = column.array as? NestedArray, | ||
| let nestedFields = nestedArray.fields else { | ||
| return .failure(.invalid("Struct type array expected for nested type")) | ||
| } | ||
|
|
||
| switch writeRecordBatchData(&writer, fields: nestedType.fields, columns: nestedFields) { | ||
| case .success: | ||
| continue | ||
| case .failure(let error): | ||
| return .failure(error) | ||
| } | ||
| switch writeRecordBatchData(&writer, fields: nestedType.fields, columns: nestedFields) { | ||
| case .success: | ||
| continue | ||
| case .failure(let error): | ||
| return .failure(error) | ||
| } | ||
| } else if let listType = column.type as? ArrowTypeList { | ||
| guard let nestedArray = column.array as? NestedArray, | ||
| let valuesHolder = nestedArray.values else { | ||
| return .failure(.invalid("List type array expected with values child")) | ||
| } | ||
|
|
||
| switch writeRecordBatchData(&writer, fields: [listType.elementField], columns: [valuesHolder]) { | ||
| case .success: | ||
| continue | ||
| case .failure(let error): | ||
| return .failure(error) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -341,7 +372,8 @@ public class ArrowWriter { // swiftlint:disable:this type_body_length | |
| public func writeStreaming(_ info: ArrowWriter.Info) -> Result<Data, ArrowError> { | ||
| let writer: any DataWriter = InMemDataWriter() | ||
| switch toMessage(info.schema) { | ||
| case .success(let schemaData): | ||
| case .success(var schemaData): | ||
| addPadForAlignment(&schemaData) | ||
| withUnsafeBytes(of: CONTINUATIONMARKER.littleEndian) {writer.append(Data($0))} | ||
| withUnsafeBytes(of: UInt32(schemaData.count).littleEndian) {writer.append(Data($0))} | ||
| writer.append(schemaData) | ||
|
Comment on lines
372
to
379
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a unit test that exercises `appendAny` on `StructArrayBuilder` and `ListArrayBuilder` (especially `list<struct<...>>`), since this change fixes a correctness issue where `appendAny` previously bypassed child-builder distribution. Existing tests cover `appendAny` for primitive builders but not nested builders, so this could regress without targeted coverage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added `testStructArrayBuilderAppendAny` and `testListOfStructAppendAny`.