Commit 926d630

Merge pull request #146 from MITLibraries/TIMX-496-add-same-day-run-timestamp
TIMX 496 - Add run_timestamp column to dataset
2 parents 8da7a7d + de7dc90 commit 926d630

6 files changed, +165 -14 lines

Pipfile

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ pytest = "*"
 ruff = "*"
 setuptools = "*"
 pip-audit = "*"
+pytest-freezegun = "*"

 [requires]
 python_version = "3.12"
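
The new pytest-freezegun dev dependency provides the freeze_time marker used in tests/test_dataset.py below. A minimal sketch of how it pins the clock, written as a hypothetical test that is not part of this commit:

from datetime import UTC, datetime

import pytest


@pytest.mark.freeze_time("2025-05-22 01:23:45")
def test_now_is_frozen():
    # freezegun patches datetime.now() for the duration of the test,
    # so timestamps minted during a write become deterministic
    assert datetime.now(UTC) == datetime(2025, 5, 22, 1, 23, 45, tzinfo=UTC)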

Pipfile.lock

Lines changed: 17 additions & 1 deletion
Some generated files are not rendered by default.

tests/conftest.py

Lines changed: 54 additions & 0 deletions
@@ -153,3 +153,57 @@ def dataset_with_runs_location(tmp_path) -> str:
 @pytest.fixture
 def local_dataset_with_runs(dataset_with_runs_location) -> TIMDEXDataset:
     return TIMDEXDataset(dataset_with_runs_location)
+
+
+@pytest.fixture
+def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
+    """Dataset fixture where a single source had multiple runs on the same day.
+
+    After these runs, we'd expect 70 records in OpenSearch:
+        - most recent full run "run-2" established a 75 record base
+        - runs "run-3" and "run-4" just modified records; no record count change
+        - run "run-5" deleted 5 records
+
+    If the order of full runs 1 & 2 is not handled correctly, we'd see an incorrect
+    baseline of 100 records.
+
+    If the order of daily runs 4 & 5 is not handled correctly, we'd see 75 records
+    because the deletes would be applied before run-4 re-indexed those records.
+    """
+    location = str(tmp_path / "dataset_with_same_day_runs")
+    os.mkdir(location)
+
+    timdex_dataset = TIMDEXDataset(location)
+
+    run_params = []
+
+    # Simulate two "full" runs where "run-2" should establish the baseline, and
+    # daily runs (sometimes multiple per day) where deletes from "run-5" should
+    # be represented.
+    run_params.extend(
+        [
+            (100, "alma", "2025-01-01", "full", "index", "run-1"),
+            (75, "alma", "2025-01-01", "full", "index", "run-2"),
+            (10, "alma", "2025-01-01", "daily", "index", "run-3"),
+            (20, "alma", "2025-01-02", "daily", "index", "run-4"),
+            (5, "alma", "2025-01-02", "daily", "delete", "run-5"),
+        ]
+    )
+
+    for params in run_params:
+        num_records, source, run_date, run_type, action, run_id = params
+        records = generate_sample_records(
+            num_records,
+            timdex_record_id_prefix=source,
+            source=source,
+            run_date=run_date,
+            run_type=run_type,
+            action=action,
+            run_id=run_id,
+        )
+        timdex_dataset.write(records)
+
+    # reload after writes
+    timdex_dataset.load()
+
+    return timdex_dataset
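
For reference, the 70-record expectation in the fixture docstring follows directly from the run parameters above; a quick arithmetic check in plain Python (not part of the fixture):

# most recent full run ("run-2") establishes the baseline record count
baseline = 75

# daily runs "run-3" and "run-4" only re-index existing records (no count change);
# daily run "run-5" deletes records
deleted = 5

assert baseline - deleted == 70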

tests/test_dataset.py

Lines changed: 68 additions & 2 deletions
@@ -1,13 +1,14 @@
-# ruff: noqa: D205, S105, S106, SLF001, PD901, PLR2004
+# ruff: noqa: D205, D209, S105, S106, SLF001, PD901, PLR2004

 import os
-from datetime import date
+from datetime import UTC, date, datetime
 from unittest.mock import MagicMock, patch

 import pyarrow as pa
 import pytest
 from pyarrow import fs

+from tests.utils import generate_sample_records
 from timdex_dataset_api.dataset import (
     DatasetNotLoadedError,
     TIMDEXDataset,
@@ -463,3 +464,68 @@ def test_dataset_current_records_index_filtering_accurate_records_yielded(
         "alma:23",
         "alma:24",
     ]
+
+
+@pytest.mark.freeze_time("2025-05-22 01:23:45.567890")
+def test_dataset_write_includes_minted_run_timestamp(tmp_path):
+    # create dataset
+    location = str(tmp_path / "one_run_at_frozen_time")
+    os.mkdir(location)
+    timdex_dataset = TIMDEXDataset(location)
+
+    run_id = "abc123"
+
+    # perform a single ETL run that should pick up the frozen time for run_timestamp
+    records = generate_sample_records(
+        10,
+        timdex_record_id_prefix="alma",
+        source="alma",
+        run_date="2025-05-22",
+        run_type="full",
+        action="index",
+        run_id=run_id,
+    )
+    timdex_dataset.write(records)
+    timdex_dataset.load()
+
+    # assert TIMDEXDataset.write() applies the current time as run_timestamp
+    run_row_dict = next(timdex_dataset.read_dicts_iter())
+    assert "run_timestamp" in run_row_dict
+    assert run_row_dict["run_timestamp"] == datetime(
+        2025,
+        5,
+        22,
+        1,
+        23,
+        45,
+        567890,
+        tzinfo=UTC,
+    )
+
+    # assert the same run_timestamp is applied to all rows in the run
+    df = timdex_dataset.read_dataframe(run_id=run_id)
+    assert len(list(df.run_timestamp.unique())) == 1
+
+
+def test_dataset_load_current_records_gets_correct_same_day_full_run(
+    dataset_with_same_day_runs,
+):
+    """Two full runs were performed on the same day, but 'run-2' was performed most
+    recently. current_records=True should discover the more recent of the two,
+    'run-2', not 'run-1'."""
+    dataset_with_same_day_runs.load(current_records=True, run_type="full")
+    df = dataset_with_same_day_runs.read_dataframe()
+
+    assert list(df.run_id.unique()) == ["run-2"]
+
+
+def test_dataset_load_current_records_gets_correct_same_day_daily_runs_ordering(
+    dataset_with_same_day_runs,
+):
+    """Two runs were performed on 2025-01-02, but the most recent records should be
+    from run 'run-5' with action='delete', not 'run-4' with action='index'."""
+    dataset_with_same_day_runs.load(current_records=True, run_type="daily")
+    first_record = next(dataset_with_same_day_runs.read_dicts_iter())
+
+    assert first_record["run_id"] == "run-5"
+    assert first_record["action"] == "delete"

timdex_dataset_api/dataset.py

Lines changed: 20 additions & 9 deletions
@@ -41,6 +41,7 @@
         pa.field("year", pa.string()),
         pa.field("month", pa.string()),
         pa.field("day", pa.string()),
+        pa.field("run_timestamp", pa.timestamp("us", tz="UTC")),
     )
 )
@@ -62,6 +63,7 @@ class DatasetFilters(TypedDict, total=False):
     year: str | None
     month: str | None
     day: str | None
+    run_timestamp: str | datetime | None
@@ -112,15 +114,19 @@ def __init__(
             location (str | list[str]): Local filesystem path or an S3 URI to
                 a parquet dataset. For partitioned datasets, set to the base directory.
         """
-        self.location = location
         self.config = config or TIMDEXDatasetConfig()
+        self.location = location

+        # pyarrow dataset
         self.filesystem, self.paths = self.parse_location(self.location)
         self.dataset: ds.Dataset = None  # type: ignore[assignment]
         self.schema = TIMDEX_DATASET_SCHEMA
         self.partition_columns = TIMDEX_DATASET_PARTITION_COLUMNS
+
+        # writing
         self._written_files: list[ds.WrittenFile] = None  # type: ignore[assignment]
+
+        # reading
         self._current_records: bool = False
         self._current_records_dataset: ds.Dataset = None  # type: ignore[assignment]
@@ -405,26 +411,31 @@ def write(
         return self._written_files  # type: ignore[return-value]

     def create_record_batches(
-        self,
-        records_iter: Iterator["DatasetRecord"],
+        self, records_iter: Iterator["DatasetRecord"]
     ) -> Iterator[pa.RecordBatch]:
         """Yield pyarrow.RecordBatches for writing.

         This method expects an iterator of DatasetRecord instances.

-        Each DatasetRecord is validated and serialized to a dictionary before added to a
-        pyarrow.RecordBatch for writing.
+        Each DatasetRecord is serialized to a dictionary, any column data shared by all
+        rows is added to the record, and then added to a pyarrow.RecordBatch for writing.

         Args:
             - records_iter: Iterator of DatasetRecord instances
         """
+        run_timestamp = datetime.now(UTC)
         for i, record_batch in enumerate(
             itertools.batched(records_iter, self.config.write_batch_size)
         ):
-            batch = pa.RecordBatch.from_pylist(
-                [record.to_dict() for record in record_batch]
-            )
-            logger.debug(f"Yielding batch {i+1} for dataset writing.")
+            record_dicts = [
+                {
+                    **record.to_dict(),
+                    "run_timestamp": run_timestamp,
+                }
+                for record in record_batch
+            ]
+            batch = pa.RecordBatch.from_pylist(record_dicts)
+            logger.debug(f"Yielding batch {i + 1} for dataset writing.")
             yield batch

     def log_write_statistics(self, start_time: float) -> None:
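
Because run_timestamp is also added to DatasetFilters, reads should be able to filter on it the same way the tests above filter on run_id. A hedged sketch of that usage, assuming read_dataframe forwards keyword filters to DatasetFilters as it does for run_id (the dataset location here is hypothetical):

from datetime import UTC, datetime

from timdex_dataset_api.dataset import TIMDEXDataset

timdex_dataset = TIMDEXDataset("path/to/dataset")  # hypothetical local dataset
timdex_dataset.load()

# every row written by a single TIMDEXDataset.write() call shares the run_timestamp
# minted in create_record_batches(), so filtering on it isolates one write
df = timdex_dataset.read_dataframe(
    run_timestamp=datetime(2025, 5, 22, 1, 23, 45, 567890, tzinfo=UTC)
)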

timdex_dataset_api/run.py

Lines changed: 5 additions & 2 deletions
@@ -52,6 +52,7 @@ def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
                 "source": "first",
                 "run_date": "first",
                 "run_type": "first",
+                "run_timestamp": "first",
                 "num_rows": "sum",
                 "filename": list,
             }
@@ -65,9 +66,9 @@ def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
             lambda x: len(x)
         )

-        # sort by run date and source
+        # sort by run_timestamp (more granularity than run_date) and source
         grouped_runs_df = grouped_runs_df.sort_values(
-            ["run_date", "source"], ascending=False
+            ["run_timestamp", "source"], ascending=False
         )

         # cache the result
@@ -185,12 +186,14 @@ def _parse_run_metadata_from_parquet_file(self, parquet_filepath: str) -> dict:
         run_date = columns_meta[4]["statistics"]["max"]
         run_type = columns_meta[5]["statistics"]["max"]
         run_id = columns_meta[7]["statistics"]["max"]
+        run_timestamp = columns_meta[9]["statistics"]["max"]

         return {
             "source": source,
             "run_date": run_date,
             "run_type": run_type,
             "run_id": run_id,
+            "run_timestamp": run_timestamp,
             "num_rows": num_rows,
             "filename": parquet_filepath,
         }
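
The switch from run_date to run_timestamp in the sort matters because run_date ties for same-day runs and cannot order them. A standalone pandas sketch, using illustrative data only (not taken from this repository), showing the difference:

import pandas as pd

runs_df = pd.DataFrame(
    {
        "source": ["alma", "alma"],
        "run_id": ["run-1", "run-2"],
        "run_date": ["2025-01-01", "2025-01-01"],
        "run_timestamp": pd.to_datetime(
            ["2025-01-01 08:00:00+00:00", "2025-01-01 14:30:00+00:00"]
        ),
    }
)

# run_date is identical for both runs, so sorting on it cannot tell them apart;
# run_timestamp orders them by when they were actually written
latest_first = runs_df.sort_values(["run_timestamp", "source"], ascending=False)
assert list(latest_first.run_id) == ["run-2", "run-1"]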
