Commit 0a20234

Merge pull request #147 from MITLibraries/TIMX-496-establish-migrations-and-backfill-migration
TIMX 496 - add migrations folder and run_timestamp migration
2 parents 926d630 + 243c7e1 commit 0a20234

File tree

3 files changed: +282 -0 lines changed

migrations/001_2025_05_30_backfill_run_timestamp_column.py
migrations/README.md
migrations/__init__.py
migrations/001_2025_05_30_backfill_run_timestamp_column.py

Lines changed: 217 additions & 0 deletions

@@ -0,0 +1,217 @@
# ruff: noqa: BLE001, D212, TRY300, TRY400
"""
Date: 2025-05-30

Description:

After the creation of a new run_timestamp column as part of Jira ticket TIMX-496, there
was a need to backfill a run timestamp for all parquet files in the dataset.

This migration performs the following:
1. retrieves all parquet files from the dataset
2. for each parquet file:
    a. if the run_timestamp column already exists, skip
    b. retrieve the file creation date of the parquet file; this becomes the run_timestamp
    c. rewrite the parquet file with a new run_timestamp column

Side effects:

1. Loss of "Last Modified" date in S3

This migration uses the original "Last Modified" date in S3 that was minted when the
parquet file was written. It stores that value in a `run_timestamp` column, where it
will persist, but the parquet file itself will LOSE this "Last Modified" date when it is
recreated.

Usage:

pipenv run python migrations/001_2025_05_30_backfill_run_timestamp_column.py \
<DATASET_LOCATION> \
--dry-run
"""

import argparse
import json
import time
from datetime import UTC, datetime

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyarrow import fs

from timdex_dataset_api.config import configure_dev_logger, configure_logger
from timdex_dataset_api.dataset import TIMDEX_DATASET_SCHEMA, TIMDEXDataset

configure_dev_logger()

logger = configure_logger(__name__)


def backfill_dataset(location: str, *, dry_run: bool = False) -> None:
    """Main entrypoint for backfill script.

    Loop through all parquet files in the dataset and, if the run_timestamp column does
    not exist, create it using the S3 object creation date.
    """
    start_time = time.perf_counter()
    td = TIMDEXDataset(location)
    td.load()

    parquet_files = td.dataset.files  # type: ignore[attr-defined]
    logger.info(f"Found {len(parquet_files)} parquet files in dataset.")

    success_count = 0
    skip_count = 0
    error_count = 0

    for i, parquet_file in enumerate(parquet_files):
        logger.info(
            f"Working on parquet file {i + 1}/{len(parquet_files)}: {parquet_file}"
        )

        success, result = backfill_parquet_file(parquet_file, td.dataset, dry_run=dry_run)

        if success:
            if result and "skipped" in result:
                skip_count += 1
            else:
                success_count += 1
        else:
            error_count += 1

        logger.info(json.dumps(result))

    logger.info(
        f"Backfill complete. Elapsed: {time.perf_counter() - start_time}, "
        f"Success: {success_count}, Skipped: {skip_count}, Errors: {error_count}"
    )


def backfill_parquet_file(
    parquet_filepath: str,
    dataset: ds.Dataset,
    *,
    dry_run: bool = False,
) -> tuple[bool, dict]:
    """Backfill a single parquet file with run_timestamp column.

    Args:
        parquet_filepath: Path to the parquet file
        dataset: PyArrow dataset instance
        dry_run: If True, don't actually write changes

    Returns:
        Tuple of (success: bool, result: dict)
    """
    start_time = time.perf_counter()
    try:
        parquet_file = pq.ParquetFile(parquet_filepath, filesystem=dataset.filesystem)  # type: ignore[attr-defined]

        # Check if run_timestamp column already exists
        if "run_timestamp" in parquet_file.schema.names:
            logger.info(
                f"Parquet already has 'run_timestamp', skipping: {parquet_filepath}"
            )
            return True, {"file_path": parquet_filepath, "skipped": True}

        # Read all rows from the parquet file into a pyarrow Table
        # NOTE: memory intensive for very large parquet files, though suitable for one-time
        # migration work.
        table = parquet_file.read()

        # Get S3 object creation date
        creation_date = get_s3_object_creation_date(parquet_filepath, dataset.filesystem)  # type: ignore[attr-defined]

        # Create run_timestamp column using the exact schema definition
        num_rows = len(table)
        run_timestamp_field = TIMDEX_DATASET_SCHEMA.field("run_timestamp")
        run_timestamp_array = pa.array(
            [creation_date] * num_rows, type=run_timestamp_field.type
        )

        # Add the run_timestamp column to the table
        table_with_timestamp = table.append_column("run_timestamp", run_timestamp_array)

        # Write the updated table back to the same file
        if not dry_run:
            pq.write_table(
                table_with_timestamp,  # type: ignore[attr-defined]
                parquet_filepath,
                filesystem=dataset.filesystem,  # type: ignore[attr-defined]
            )
            logger.info(f"Successfully updated file: {parquet_filepath}")
        else:
            logger.info(f"DRY RUN: Would update file: {parquet_filepath}")

        update_details = {
            "file_path": parquet_filepath,
            "rows_updated": num_rows,
            "run_timestamp_added": creation_date.isoformat(),
            "elapsed": time.perf_counter() - start_time,
            "dry_run": dry_run,
        }

        return True, update_details

    except Exception as e:
        logger.error(f"Error processing parquet file {parquet_filepath}: {e}")
        return False, {
            "file_path": parquet_filepath,
            "error": str(e),
            "elapsed": time.perf_counter() - start_time,
            "dry_run": dry_run,
        }


def get_s3_object_creation_date(file_path: str, filesystem: fs.FileSystem) -> datetime:
    """Get the creation date of an S3 object.

    This function assumes that all datetimes coming back are from the same source and
    are formatted consistently, meaning either all values are timezone-aware or none
    are.

    Args:
        file_path: Path to the S3 object
        filesystem: PyArrow S3 filesystem instance

    Returns:
        datetime: Creation date of the S3 object in UTC
    """
    try:
        # Get creation date of S3 object (its "Last Modified" timestamp)
        file_info = filesystem.get_file_info(file_path)
        creation_date: datetime = file_info.mtime  # type: ignore[assignment]

        # Ensure it's timezone-aware and in UTC
        if creation_date.tzinfo is None:
            creation_date = creation_date.replace(tzinfo=UTC)
        elif creation_date.tzinfo != UTC:
            creation_date = creation_date.astimezone(UTC)

        return creation_date

    except Exception as e:
        logger.error(f"Error getting S3 object creation date for {file_path}: {e}")
        raise


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=(
            "Backfill run_timestamp column in TIMDEX parquet files "
            "using S3 creation dates"
        )
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Scan files and report what would be done without making changes",
    )
    parser.add_argument(
        "dataset_location", help="Path to the dataset (local path or s3://bucket/path)"
    )

    args = parser.parse_args()

    backfill_dataset(args.dataset_location, dry_run=args.dry_run)
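
The script above logs a JSON result per file but does not include a standalone verification step. A minimal sketch of one possible post-migration check follows (not part of this commit; `DATASET_LOCATION` is a placeholder, and an `s3://` location would additionally need a pyarrow filesystem passed to the read calls):

```python
# Sketch of a post-migration check: confirm every parquet file now has a populated
# run_timestamp column. Assumes a local dataset path.
import pyarrow.dataset as ds
import pyarrow.parquet as pq

DATASET_LOCATION = "path/to/timdex-dataset"  # placeholder path

dataset = ds.dataset(DATASET_LOCATION, format="parquet")
for parquet_filepath in dataset.files:
    schema = pq.read_schema(parquet_filepath)
    if "run_timestamp" not in schema.names:
        print(f"MISSING run_timestamp column: {parquet_filepath}")
        continue
    table = pq.read_table(parquet_filepath, columns=["run_timestamp"])
    print(
        f"{parquet_filepath}: rows={table.num_rows}, "
        f"null run_timestamp values={table['run_timestamp'].null_count}"
    )
```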

migrations/README.md

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
# TIMDEX Dataset Migrations

This directory stores data and/or schema modifications that were made to the TIMDEX parquet dataset. Consider them like ["migrations"](https://en.wikipedia.org/wiki/Schema_migration) for a SQL database, but -- at least at the time of this writing -- considerably more informal and ad-hoc.

Unless otherwise noted, it is assumed that these migrations:

* were run manually by a developer, either on a local machine or via some cloud operation
* have already been performed and should not be performed again
* do not contain a way to roll back the changes

## Structure

Each migration is either a single Python file or a dedicated directory that follows this naming convention:

- `###_`: incrementing migration sequence number
- `YYYY_MM_DD_`: approximate date of migration creation and run
- `short_name.py` (file) or `short_name` (directory): short migration name

Examples:

- `001_2025_05_30_backfill_run_timestamp_column.py` --> single file
- `002_2025_06_15_remove_errant_parquet_files` --> directory that contains 1+ files

Files inside a migration directory like `002_2025_06_15_remove_errant_parquet_files` are _not_ expected to follow any particular format (though a `README.md` is encouraged to inform future developers how it was performed!).

The entrypoint for each migration should contain a docstring at the root of the file with a structure like:

```python
"""
Date: YYYY-MM-DD

Description:

Description here about the nature of the migration...

Usage:

Explanation here for how to run it...
"""
```

Example:
```python
"""
Date: 2025-05-30

Description:

After the creation of a new run_timestamp column as part of Jira ticket TIMX-496, there
was a need to backfill a run timestamp for all parquet files in the dataset.

This migration performs the following:
1. retrieves all parquet files from the dataset
2. for each parquet file:
    a. if the run_timestamp column already exists, skip
    b. retrieve the file creation date of the parquet file; this becomes the run_timestamp
    c. rewrite the parquet file with a new run_timestamp column

Usage:
PYTHONPATH=. \
pipenv run python migrations/001_2025_05_30_backfill_run_timestamp_column.py \
<DATASET_LOCATION> \
--dry-run
"""
```

migrations/__init__.py

Whitespace-only changes.

0 commit comments