Skip to content

Commit 675e6aa

Browse files
authored
feat: add row index (#57)
* feat: add row index at file transform, if not using file transform add during dataa contract or business rules (but likely non-deterministic) * feat: integrate record index into error report * style: sort formating, linting and static typing * build: revert pyproject.toml and poetry.lock to v0.6.2
1 parent 18a84d0 commit 675e6aa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+333
-202
lines changed

src/dve/core_engine/backends/base/backend.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def apply(
163163
return entities, get_parent(processing_errors_uri), successful
164164

165165
for entity_name, entity in entities.items():
166-
entities[entity_name] = self.step_implementations.add_row_id(entity)
166+
entities[entity_name] = self.step_implementations.add_record_index(entity)
167167

168168
# TODO: Handle entity manager creation errors.
169169
entity_manager = EntityManager(entities, reference_data)
@@ -172,9 +172,6 @@ def apply(
172172
# TODO: and return uri to errors
173173
_ = self.step_implementations.apply_rules(working_dir, entity_manager, rule_metadata)
174174

175-
for entity_name, entity in entity_manager.entities.items():
176-
entity_manager.entities[entity_name] = self.step_implementations.drop_row_id(entity)
177-
178175
return entity_manager.entities, get_parent(dc_feedback_errors_uri), True
179176

180177
def process(

src/dve/core_engine/backends/base/contract.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,9 @@ def read_raw_entities(
337337
successful = True
338338
for entity_name, resource in entity_locations.items():
339339
reader_metadata = contract_metadata.reader_metadata[entity_name]
340-
extension = "." + (
341-
get_file_suffix(resource) or ""
342-
).lower() # Already checked that extension supported.
340+
extension = (
341+
"." + (get_file_suffix(resource) or "").lower()
342+
) # Already checked that extension supported.
343343

344344
reader_config = reader_metadata[extension]
345345
reader_type = get_reader(reader_config.reader)
@@ -369,6 +369,14 @@ def read_raw_entities(
369369

370370
return entities, dedup_messages(messages), successful
371371

372+
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
373+
"""Add a record index to the entity"""
374+
raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
375+
376+
def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
377+
"""Drop a record index from the entity"""
378+
raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
379+
372380
@abstractmethod
373381
def apply_data_contract(
374382
self,

src/dve/core_engine/backends/base/reader.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,14 @@ def read_to_entity_type(
127127

128128
return reader_func(self, resource, entity_name, schema)
129129

130+
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
131+
"""Add a record index to the entity"""
132+
raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
133+
134+
def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
135+
"""Drop a record index to the entity"""
136+
raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
137+
130138
def write_parquet(
131139
self,
132140
entity: EntityType,

src/dve/core_engine/backends/base/rules.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,13 @@ def register_udfs(cls, **kwargs):
135135
"""Method to register all custom dve functions for use during business rules application"""
136136
raise NotImplementedError()
137137

138-
@staticmethod
139-
def add_row_id(entity: EntityType) -> EntityType:
140-
"""Add a unique row id field to an entity"""
141-
raise NotImplementedError()
138+
def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
139+
"""Add a record index to the entity"""
140+
raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")
142141

143-
@staticmethod
144-
def drop_row_id(entity: EntityType) -> EntityType:
145-
"""Add a unique row id field to an entity"""
146-
raise NotImplementedError()
142+
def drop_record_index(self, entity: EntityType) -> EntityType:
143+
"""Drop a unique row id field to an entity"""
144+
raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")
147145

148146
@classmethod
149147
def _raise_notimplemented_error(

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
)
3030
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
3131
duckdb_read_parquet,
32+
duckdb_record_index,
3233
duckdb_write_parquet,
3334
get_duckdb_type_from_annotation,
3435
relation_is_empty,
@@ -37,6 +38,7 @@
3738
from dve.core_engine.backends.metadata.contract import DataContractMetadata
3839
from dve.core_engine.backends.types import StageSuccessful
3940
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
41+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
4042
from dve.core_engine.message import FeedbackMessage
4143
from dve.core_engine.type_hints import URI, EntityLocations
4244
from dve.core_engine.validation import RowValidator, apply_row_validator_helper
@@ -54,6 +56,7 @@ def __call__(self, row: pd.Series):
5456
return row # no op
5557

5658

59+
@duckdb_record_index
5760
@duckdb_write_parquet
5861
@duckdb_read_parquet
5962
class DuckDBDataContract(BaseDataContract[DuckDBPyRelation]):
@@ -144,10 +147,12 @@ def apply_data_contract(
144147
fld.name: get_duckdb_type_from_annotation(fld.annotation)
145148
for fld in entity_fields.values()
146149
}
150+
ddb_schema[RECORD_INDEX_COLUMN_NAME] = get_duckdb_type_from_annotation(int)
147151
polars_schema: dict[str, PolarsType] = {
148152
fld.name: get_polars_type_from_annotation(fld.annotation)
149153
for fld in entity_fields.values()
150154
}
155+
polars_schema[RECORD_INDEX_COLUMN_NAME] = get_polars_type_from_annotation(int)
151156
if relation_is_empty(relation):
152157
self.logger.warning(f"+ Empty relation for {entity_name}")
153158
empty_df = pl.DataFrame([], schema=polars_schema) # type: ignore # pylint: disable=W0612
@@ -170,6 +175,9 @@ def apply_data_contract(
170175

171176
self.logger.info(f"Data contract found {msg_count} issues in {entity_name}")
172177

178+
if not RECORD_INDEX_COLUMN_NAME in relation.columns:
179+
relation = self.add_record_index(relation)
180+
173181
casting_statements = [
174182
(
175183
self.generate_ddb_cast_statement(column, dtype)

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212

1313
import duckdb.typing as ddbtyp
1414
import numpy as np
15-
from duckdb import DuckDBPyConnection, DuckDBPyRelation
15+
from duckdb import DuckDBPyConnection, DuckDBPyRelation, StarExpression
1616
from duckdb.typing import DuckDBPyType
1717
from pandas import DataFrame
1818
from pydantic import BaseModel
1919
from typing_extensions import Annotated, get_args, get_origin, get_type_hints
2020

2121
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
22+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
2223
from dve.core_engine.type_hints import URI
2324
from dve.parser.file_handling.service import LocalFilesystemImplementation, _get_implementation
2425

@@ -286,3 +287,29 @@ def duckdb_rel_to_dictionaries(
286287
cols: tuple[str] = tuple(entity.columns) # type: ignore
287288
while rows := entity.fetchmany(batch_size):
288289
yield from (dict(zip(cols, rw)) for rw in rows)
290+
291+
292+
def _add_duckdb_record_index(
293+
self, entity: DuckDBPyRelation # pylint: disable=W0613
294+
) -> DuckDBPyRelation:
295+
"""Add record index to duckdb relation"""
296+
if RECORD_INDEX_COLUMN_NAME in entity.columns:
297+
return entity
298+
299+
return entity.select(f"*, row_number() OVER () as {RECORD_INDEX_COLUMN_NAME}")
300+
301+
302+
def _drop_duckdb_record_index(
303+
self, entity: DuckDBPyRelation # pylint: disable=W0613
304+
) -> DuckDBPyRelation:
305+
"""Drop record index from duckdb relation"""
306+
if RECORD_INDEX_COLUMN_NAME not in entity.columns:
307+
return entity
308+
return entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME]))
309+
310+
311+
def duckdb_record_index(cls):
312+
"""Class decorator to add record index methods for duckdb implementations"""
313+
setattr(cls, "add_record_index", _add_duckdb_record_index)
314+
setattr(cls, "drop_record_index", _drop_duckdb_record_index)
315+
return cls

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,32 @@
66

77
import duckdb as ddb
88
import polars as pl
9-
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv
9+
from duckdb import (
10+
DuckDBPyConnection,
11+
DuckDBPyRelation,
12+
StarExpression,
13+
default_connection,
14+
read_csv,
15+
)
1016
from pydantic import BaseModel
1117

1218
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
1319
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
1420
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
21+
duckdb_record_index,
1522
duckdb_write_parquet,
1623
get_duckdb_type_from_annotation,
1724
)
1825
from dve.core_engine.backends.implementations.duckdb.types import SQLType
1926
from dve.core_engine.backends.readers.utilities import check_csv_header_expected
20-
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
27+
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, polars_record_index
28+
from dve.core_engine.constants import RECORD_INDEX_COLUMN_NAME
2129
from dve.core_engine.message import FeedbackMessage
2230
from dve.core_engine.type_hints import URI, EntityName
2331
from dve.parser.file_handling import get_content_length
2432

2533

34+
@duckdb_record_index
2635
@duckdb_write_parquet
2736
class DuckDBCSVReader(BaseFileReader):
2837
"""A reader for CSV files including the ability to compare the passed model
@@ -111,18 +120,19 @@ def read_to_relation( # pylint: disable=unused-argument
111120
}
112121

113122
reader_options["columns"] = ddb_schema
114-
rel = read_csv(resource, **reader_options)
123+
124+
rel = self.add_record_index(read_csv(resource, **reader_options, parallel=False))
115125

116126
if self.null_empty_strings:
117-
cleaned_cols = ",".join([
118-
f"NULLIF(TRIM({c}), '') as {c}"
119-
for c in reader_options["columns"].keys()
120-
])
127+
cleaned_cols = ",".join(
128+
[f"NULLIF(TRIM({c}), '') as {c}" for c in reader_options["columns"].keys()]
129+
)
121130
rel = rel.select(cleaned_cols)
122131

123132
return rel
124133

125134

135+
@polars_record_index
126136
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
127137
"""
128138
Utilises the polars lazy csv reader which is then converted into a DuckDBPyRelation object.
@@ -156,10 +166,19 @@ def read_to_relation( # pylint: disable=unused-argument
156166

157167
# there is a raise_if_empty arg for 0.18+. Future reference when upgrading. Makes L85
158168
# redundant
159-
df = pl.scan_csv(resource, **reader_options).select(list(polars_types.keys())) # type: ignore # pylint: disable=W0612
169+
df = self.add_record_index( # pylint: disable=W0612
170+
pl.scan_csv(resource, **reader_options).select( # type: ignore
171+
list(polars_types.keys())
172+
)
173+
)
160174

161175
if self.null_empty_strings:
162-
df = df.select([pl.col(c).str.strip_chars().replace("", None) for c in df.columns])
176+
pl_exprs = [
177+
pl.col(c).str.strip_chars().replace("", None)
178+
for c in df.columns
179+
if not c == RECORD_INDEX_COLUMN_NAME
180+
] + [pl.col(RECORD_INDEX_COLUMN_NAME)]
181+
df = df.select(pl_exprs)
163182

164183
return ddb.sql("SELECT * FROM df")
165184

@@ -203,8 +222,10 @@ def __init__(
203222
def read_to_relation( # pylint: disable=unused-argument
204223
self, resource: URI, entity_name: EntityName, schema: type[BaseModel]
205224
) -> DuckDBPyRelation:
206-
entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
207-
entity = entity.distinct()
225+
entity: DuckDBPyRelation = super().read_to_relation(
226+
resource=resource, entity_name=entity_name, schema=schema
227+
)
228+
entity = entity.select(StarExpression(exclude=[RECORD_INDEX_COLUMN_NAME])).distinct()
208229
no_records = entity.shape[0]
209230

210231
if no_records != 1:
@@ -233,4 +254,4 @@ def read_to_relation( # pylint: disable=unused-argument
233254
],
234255
)
235256

236-
return entity
257+
return entity.select(f"*, 1 as {RECORD_INDEX_COLUMN_NAME}")

src/dve/core_engine/backends/implementations/duckdb/readers/json.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99

1010
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
1111
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
12+
duckdb_record_index,
1213
duckdb_write_parquet,
1314
get_duckdb_type_from_annotation,
1415
)
1516
from dve.core_engine.backends.implementations.duckdb.types import SQLType
1617
from dve.core_engine.type_hints import URI, EntityName
1718

1819

20+
@duckdb_record_index
1921
@duckdb_write_parquet
2022
class DuckDBJSONReader(BaseFileReader):
2123
"""A reader for JSON files"""
@@ -47,4 +49,6 @@ def read_to_relation( # pylint: disable=unused-argument
4749
for fld in schema.__fields__.values()
4850
}
4951

50-
return read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
52+
return self.add_record_index(
53+
read_json(resource, columns=ddb_schema, format=self._json_format) # type: ignore
54+
)

src/dve/core_engine/backends/implementations/duckdb/readers/xml.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,15 @@
1111
from dve.core_engine.backends.exceptions import MessageBearingError
1212
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
1313
from dve.core_engine.backends.readers.xml import XMLStreamReader
14-
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
14+
from dve.core_engine.backends.utilities import (
15+
get_polars_type_from_annotation,
16+
polars_record_index,
17+
stringify_model,
18+
)
1519
from dve.core_engine.type_hints import URI
1620

1721

22+
@polars_record_index
1823
@duckdb_write_parquet
1924
class DuckDBXMLStreamReader(XMLStreamReader):
2025
"""A reader for XML files"""
@@ -39,7 +44,9 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
3944
for fld in stringify_model(schema).__fields__.values()
4045
}
4146

42-
_lazy_frame = pl.LazyFrame(
43-
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
47+
_lazy_frame = self.add_record_index(
48+
pl.LazyFrame(
49+
data=self.read_to_py_iterator(resource, entity_name, schema), schema=polars_schema
50+
)
4451
)
4552
return self.ddb_connection.sql("select * from _lazy_frame")

src/dve/core_engine/backends/implementations/duckdb/rules.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
2424
DDBStruct,
2525
duckdb_read_parquet,
26+
duckdb_record_index,
2627
duckdb_rel_to_dictionaries,
2728
duckdb_write_parquet,
2829
get_all_registered_udfs,
@@ -51,13 +52,13 @@
5152
SemiJoin,
5253
TableUnion,
5354
)
54-
from dve.core_engine.constants import ROWID_COLUMN_NAME
5555
from dve.core_engine.functions import implementations as functions
5656
from dve.core_engine.message import FeedbackMessage
5757
from dve.core_engine.templating import template_object
5858
from dve.core_engine.type_hints import Messages
5959

6060

61+
@duckdb_record_index
6162
@duckdb_write_parquet
6263
@duckdb_read_parquet
6364
class DuckDBStepImplementations(BaseStepImplementations[DuckDBPyRelation]):
@@ -106,20 +107,6 @@ def register_udfs( # type: ignore
106107
connection.sql(_sql)
107108
return cls(connection=connection, **kwargs)
108109

109-
@staticmethod
110-
def add_row_id(entity: DuckDBPyRelation) -> DuckDBPyRelation:
111-
"""Adds a row identifier to the Relation"""
112-
if ROWID_COLUMN_NAME not in entity.columns:
113-
entity = entity.project(f"*, ROW_NUMBER() OVER () as {ROWID_COLUMN_NAME}")
114-
return entity
115-
116-
@staticmethod
117-
def drop_row_id(entity: DuckDBPyRelation) -> DuckDBPyRelation:
118-
"""Drops the row identiifer from a Relation"""
119-
if ROWID_COLUMN_NAME in entity.columns:
120-
entity = entity.select(StarExpression(exclude=[ROWID_COLUMN_NAME]))
121-
return entity
122-
123110
def add(self, entities: DuckDBEntities, *, config: ColumnAddition) -> Messages:
124111
"""A transformation step which adds a column to an entity."""
125112
entity: DuckDBPyRelation = entities[config.entity_name]

0 commit comments

Comments
 (0)