Skip to content

Commit 77eead6

Browse files
feat: add option in csv readers to clean and null empty strings (#64)
1 parent c8eead4 commit 77eead6

File tree

4 files changed

+159
-2
lines changed

4 files changed

+159
-2
lines changed

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def __init__(
4646
field_check: bool = False,
4747
field_check_error_code: Optional[str] = "ExpectedVsActualFieldMismatch",
4848
field_check_error_message: Optional[str] = "The submitted header is missing fields",
49+
null_empty_strings: bool = False,
4950
**_,
5051
):
5152
self.header = header
@@ -55,6 +56,7 @@ def __init__(
5556
self.field_check = field_check
5657
self.field_check_error_code = field_check_error_code
5758
self.field_check_error_message = field_check_error_message
59+
self.null_empty_strings = null_empty_strings
5860

5961
super().__init__()
6062

@@ -109,7 +111,16 @@ def read_to_relation( # pylint: disable=unused-argument
109111
}
110112

111113
reader_options["columns"] = ddb_schema
112-
return read_csv(resource, **reader_options)
114+
rel = read_csv(resource, **reader_options)
115+
116+
if self.null_empty_strings:
117+
cleaned_cols = ",".join([
118+
f"NULLIF(TRIM({c}), '') as {c}"
119+
for c in reader_options["columns"].keys()
120+
])
121+
rel = rel.select(cleaned_cols)
122+
123+
return rel
113124

114125

115126
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
@@ -147,6 +158,9 @@ def read_to_relation( # pylint: disable=unused-argument
147158
# redundant
148159
df = pl.scan_csv(resource, **reader_options).select(list(polars_types.keys())) # type: ignore # pylint: disable=W0612
149160

161+
if self.null_empty_strings:
162+
df = df.select([pl.col(c).str.strip_chars().replace("", None) for c in df.columns])
163+
150164
return ddb.sql("SELECT * FROM df")
151165

152166

src/dve/core_engine/backends/implementations/spark/readers/csv.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from collections.abc import Iterator
44
from typing import Any, Optional
55

6+
import pyspark.sql.functions as psf
67
from pydantic import BaseModel
78
from pyspark.sql import DataFrame, SparkSession
89
from pyspark.sql.types import StructType
@@ -30,6 +31,7 @@ def __init__(
3031
header: bool = True,
3132
multi_line: bool = False,
3233
encoding: str = "utf-8-sig",
34+
null_empty_strings: bool = False,
3335
spark_session: Optional[SparkSession] = None,
3436
**_,
3537
) -> None:
@@ -40,6 +42,7 @@ def __init__(
4042
self.quote_char = quote_char
4143
self.header = header
4244
self.multi_line = multi_line
45+
self.null_empty_strings = null_empty_strings
4346
self.spark_session = spark_session if spark_session else SparkSession.builder.getOrCreate() # type: ignore # pylint: disable=C0301
4447

4548
super().__init__()
@@ -70,8 +73,16 @@ def read_to_dataframe(
7073
"multiLine": self.multi_line,
7174
}
7275

73-
return (
76+
df = (
7477
self.spark_session.read.format("csv")
7578
.options(**kwargs) # type: ignore
7679
.load(resource, schema=spark_schema)
7780
)
81+
82+
if self.null_empty_strings:
83+
df = df.select(*[
84+
psf.trim(psf.col(c.name)).alias(c.name)
85+
for c in spark_schema.fields
86+
]).replace("", None)
87+
88+
return df

tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from pathlib import Path
33
from tempfile import TemporaryDirectory
44

5+
import polars as pl
56
import pytest
67
from duckdb import DuckDBPyRelation, default_connection
78
from pydantic import BaseModel
@@ -33,6 +34,10 @@ class SimpleHeaderModel(BaseModel):
3334
header_2: str
3435

3536

37+
class VerySimpleModel(BaseModel):
    """Single string-column schema used by the empty-string nulling tests."""

    test_col: str
40+
3641
@pytest.fixture
3742
def temp_dir():
3843
with TemporaryDirectory(prefix="ddb_test_csv_reader") as temp_dir:
@@ -157,3 +162,74 @@ def test_ddb_csv_repeating_header_reader_with_more_than_one_set_of_distinct_valu
157162

158163
with pytest.raises(MessageBearingError):
159164
reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel)
165+
166+
167+
def test_DuckDBCSVReader_with_null_empty_strings(temp_dir):
    """Whitespace-only values are trimmed to NULL when null_empty_strings=True."""
    source = pl.DataFrame({"test_col": ["fine", " ", " "]})
    file_uri = temp_dir.joinpath("test_empty_string1.csv").as_posix()
    source.write_csv(file_uri, include_header=True, quote_style="always")

    reader = DuckDBCSVReader(
        header=True,
        delim=",",
        quotechar='"',
        connection=default_connection,
        null_empty_strings=True,
    )

    relation = reader.read_to_relation(file_uri, "test", VerySimpleModel)

    # All three rows survive; the two whitespace-only ones become NULL.
    assert relation.shape[0] == 3
    assert relation.filter("test_col IS NULL").shape[0] == 2
189+
190+
def test_DuckDBCSVRepeatingHeaderReader_with_null_empty_strings(temp_dir):
    """The repeating-header reader also nulls whitespace-only values."""
    source = pl.DataFrame({"header_1": ["fine"], "header_2": [" "]})
    file_uri = temp_dir.joinpath("test_empty_string2.csv").as_posix()
    source.write_csv(file_uri, include_header=True, quote_style="always")

    reader = DuckDBCSVRepeatingHeaderReader(
        header=True,
        delim=",",
        quotechar='"',
        connection=default_connection,
        null_empty_strings=True,
    )

    relation = reader.read_to_relation(file_uri, "test", SimpleHeaderModel)

    # Single data row; its whitespace-only second column becomes NULL.
    assert relation.shape[0] == 1
    assert relation.filter("header_2 IS NULL").shape[0] == 1
214+
215+
def test_PolarsToDuckDBCSVReader_with_null_empty_strings(temp_dir):
    """The polars-backed reader nulls whitespace-only values too."""
    source = pl.DataFrame({"test_col": ["fine", " ", " "]})
    file_uri = temp_dir.joinpath("test_empty_string3.csv").as_posix()
    source.write_csv(file_uri, include_header=True, quote_style="always")

    reader = PolarsToDuckDBCSVReader(
        header=True,
        delim=",",
        quotechar='"',
        connection=default_connection,
        null_empty_strings=True,
    )

    relation = reader.read_to_relation(file_uri, "test", VerySimpleModel)

    # All three rows survive; the two whitespace-only ones become NULL.
    assert relation.shape[0] == 3
    assert relation.filter("test_col IS NULL").shape[0] == 2
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""Test Spark readers"""
2+
3+
# pylint: disable=W0621
4+
# pylint: disable=C0116
5+
# pylint: disable=C0103
6+
# pylint: disable=C0115
7+
8+
import tempfile
9+
from pathlib import Path
10+
11+
import polars as pl
12+
import pytest
13+
from pydantic import BaseModel
14+
from pyspark.sql import DataFrame, Row, SparkSession
15+
from pyspark.sql.types import StringType, StructField, StructType
16+
17+
from dve.core_engine.backends.implementations.spark.readers.csv import SparkCSVReader
18+
19+
20+
class SparkCSVTestModel(BaseModel):
    """Single string-column schema for the Spark CSV reader tests."""

    test_col: str
22+
23+
24+
@pytest.fixture
def spark_null_csv_resource():
    """Write a one-column CSV containing whitespace-only values; yield its path."""
    frame = pl.DataFrame({"test_col": ["fine", " ", " "]})

    with tempfile.TemporaryDirectory() as tdir:
        resource_uri = Path(tdir, "test_spark_csv_reader.csv").as_posix()
        frame.write_csv(resource_uri, include_header=True, quote_style="always")
        yield resource_uri
33+
34+
35+
def test_SparkCSVReader_clean_empty_strings(spark: SparkSession, spark_null_csv_resource):
    """null_empty_strings=True trims values and converts empty strings to NULL.

    Reads a CSV whose column contains one real value and two whitespace-only
    values, and checks the reader returns NULL for the whitespace-only rows.
    """
    resource_uri = spark_null_csv_resource
    # FIX: the expected schema previously named the field "test_field", which
    # did not match the "test_col" column the reader produces from the model.
    expected_df = spark.createDataFrame(
        [
            Row(test_col="fine"),
            Row(test_col=None),
            Row(test_col=None),
        ],
        StructType([StructField("test_col", StringType())]),
    )

    reader = SparkCSVReader(null_empty_strings=True, spark_session=spark)

    result_df: DataFrame = reader.read_to_dataframe(
        resource=resource_uri, entity_name="test", schema=SparkCSVTestModel
    )

    # exceptAll is one-directional: a result missing rows would still give an
    # empty difference. Check counts and both directions for full equality.
    assert result_df.count() == expected_df.count()
    assert result_df.exceptAll(expected_df).count() == 0
    assert expected_df.exceptAll(result_df).count() == 0

0 commit comments

Comments (0)