Skip to content

Commit e114fca

Browse files
committed
struct parsing for duckdb working!
1 parent 6539830 commit e114fca

File tree

2 files changed

+79
-7
lines changed
  • dataframe-jdbc/src
    • main/kotlin/org/jetbrains/kotlinx/dataframe/io/db
    • test/kotlin/org/jetbrains/kotlinx/dataframe/io/local

2 files changed

+79
-7
lines changed

dataframe-jdbc/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/db/DuckDb.kt

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,21 @@ import org.duckdb.DuckDBColumnType.UUID
4242
import org.duckdb.DuckDBColumnType.VARCHAR
4343
import org.duckdb.DuckDBResultSetMetaData
4444
import org.duckdb.JsonNode
45+
import org.jetbrains.kotlinx.dataframe.AnyFrame
46+
import org.jetbrains.kotlinx.dataframe.AnyRow
47+
import org.jetbrains.kotlinx.dataframe.DataColumn
4548
import org.jetbrains.kotlinx.dataframe.DataFrame
49+
import org.jetbrains.kotlinx.dataframe.DataRow
50+
import org.jetbrains.kotlinx.dataframe.api.Infer
51+
import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
52+
import org.jetbrains.kotlinx.dataframe.api.asDataColumn
53+
import org.jetbrains.kotlinx.dataframe.api.cast
54+
import org.jetbrains.kotlinx.dataframe.api.castToNotNullable
55+
import org.jetbrains.kotlinx.dataframe.api.first
56+
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
57+
import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
58+
import org.jetbrains.kotlinx.dataframe.impl.DataCollector
59+
import org.jetbrains.kotlinx.dataframe.impl.schema.DataFrameSchemaImpl
4660
import org.jetbrains.kotlinx.dataframe.io.DbConnectionConfig
4761
import org.jetbrains.kotlinx.dataframe.io.readAllSqlTables
4862
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
@@ -56,6 +70,7 @@ import java.sql.ResultSet
5670
import java.sql.Struct
5771
import java.util.Properties
5872
import kotlin.collections.toList
73+
import kotlin.reflect.KClass
5974
import kotlin.reflect.KTypeProjection
6075
import kotlin.reflect.full.createType
6176
import kotlin.reflect.full.withNullability
@@ -100,7 +115,7 @@ public object DuckDb : DbType("duckdb") {
100115
*/
101116
internal fun parseDuckDbType(sqlTypeName: String, isNullable: Boolean): AnyTypeInformation =
102117
duckDbTypeCache.getOrPut(Pair(sqlTypeName, isNullable)) {
103-
when (DuckDBResultSetMetaData.TypeNameToType(sqlTypeName)) {
118+
return@getOrPut when (DuckDBResultSetMetaData.TypeNameToType(sqlTypeName)) {
104119
BOOLEAN -> typeInformationForValueColumnOf<Boolean>(isNullable)
105120

106121
TINYINT -> typeInformationForValueColumnOf<Byte>(isNullable)
@@ -182,7 +197,6 @@ public object DuckDb : DbType("duckdb") {
182197
}
183198

184199
LIST, ARRAY -> {
185-
// TODO requires #1266 and #1273 for specific types
186200
val listType = parseListType(sqlTypeName)
187201
val parsedListType =
188202
parseDuckDbType(listType, true).castToAny()
@@ -206,11 +220,46 @@ public object DuckDb : DbType("duckdb") {
206220
}
207221
}
208222

209-
// TODO requires #1266 for specific types
210223
STRUCT -> {
211-
val structTypes = parseStructType(sqlTypeName)
224+
val structEntries = parseStructType(sqlTypeName)
225+
val parsedStructEntries = structEntries.mapValues { (_, type) ->
226+
parseDuckDbType(sqlTypeName = type, isNullable = true)
227+
}
212228

213-
typeInformationForValueColumnOf<Struct>(isNullable)
229+
val targetSchema = ColumnSchema.Group(
230+
schema = DataFrameSchemaImpl(parsedStructEntries.mapValues { it.value.targetSchema }),
231+
contentType = typeOf<Any?>(),
232+
)
233+
234+
typeInformationWithProcessingFor<Struct, Map<String, Any?>, DataRow<*>>(
235+
jdbcSourceType = typeOf<Struct>().withNullability(isNullable),
236+
targetSchema = targetSchema,
237+
valuePreprocessor = { struct, _ ->
238+
// NOTE DataRows cannot be `null` in DataFrame, instead, all its fields become `null`
239+
if (struct == null) {
240+
parsedStructEntries.mapValues { null }
241+
} else {
242+
// read data from the struct
243+
val attrs = struct.getAttributes(
244+
parsedStructEntries.mapValues {
245+
(it.value.jdbcSourceType.classifier!! as KClass<*>).java
246+
},
247+
)
248+
249+
// and potentially, preprocess each value individually
250+
parsedStructEntries.entries.withIndex().associate { (i, entry) ->
251+
entry.key to entry.value.castToAny().preprocess(attrs[i])
252+
}
253+
}
254+
},
255+
columnPostprocessor = { col, _ ->
256+
col.castToNotNullable()
257+
.values()
258+
.toDataFrame()
259+
.asColumnGroup(col.name())
260+
.asDataColumn()
261+
},
262+
)
214263
}
215264

216265
// Cannot handle this in Kotlin
@@ -222,6 +271,25 @@ public object DuckDb : DbType("duckdb") {
222271
}
223272
}
224273

274+
// Overriding buildDataColumn behavior so we can create the column group in post-processing for efficiency
275+
override fun <D : Any> buildDataColumn(
276+
name: String,
277+
values: List<D?>,
278+
typeInformation: TypeInformation<*, D, *>,
279+
inferNullability: Boolean,
280+
): DataColumn<D?> =
281+
when (val schema = typeInformation.targetSchema) {
282+
is ColumnSchema.Group ->
283+
DataColumn.createValueColumn(
284+
name = name,
285+
values = values,
286+
infer = if (inferNullability) Infer.Nulls else Infer.None,
287+
type = schema.type,
288+
)
289+
290+
else -> super.buildDataColumn(name, values, typeInformation, inferNullability)
291+
}
292+
225293
private fun SqlArray.toList(): List<Any?> =
226294
when (val array = this.array) {
227295
is IntArray -> array.toList()

dataframe-jdbc/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/local/duckDbTest.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,13 @@ class DuckDbTest {
255255
}
256256
}
257257

258+
@DataSchema
259+
data class NestedEntry(val i: Int, val j: String)
260+
258261
@DataSchema
259262
data class NestedTypes(
260263
@ColumnName("ijstruct_col")
261-
val ijstructCol: java.sql.Struct, // TODO
264+
val ijstructCol: NestedEntry, // TODO
262265
@ColumnName("intarray_col")
263266
val intarrayCol: List<Int?>,
264267
@ColumnName("intlist_col")
@@ -646,7 +649,8 @@ class DuckDbTest {
646649
1 to mapOf("value1" to "a", "value2" to "b"),
647650
200 to mapOf("value1" to "c", "value2" to "d"),
648651
)
649-
it[{ "ijstruct_col"<java.sql.Struct>() }].attributes shouldBe arrayOf<Any>(42, "answer")
652+
it[{ "ijstruct_col"["i"]<Int>() }] shouldBe 42
653+
it[{ "ijstruct_col"["j"]<String>() }] shouldBe "answer"
650654
it["union_col"] shouldBe 2
651655
}
652656
}

0 commit comments

Comments
 (0)