Commit f429b2c

Merge pull request #150 from MITLibraries/TIMX-506-dataset-metadata-class-client
TIMX 506 - new dataset metadata client
2 parents a479435 + ef8a8b8 commit f429b2c

File tree

6 files changed: +645 additions, −141 deletions

Pipfile.lock

Lines changed: 137 additions & 137 deletions
Some generated files are not rendered by default.

README.md

Lines changed: 116 additions & 2 deletions
````diff
@@ -1,5 +1,7 @@
+from timdex_dataset_api import TIMDEXDataset
+
 # timdex-dataset-api
-Python library for interacting with a TIMDEX parquet dataset located remotely or in S3.
+Python library for interacting with a TIMDEX parquet dataset located remotely or in S3. This library is often abbreviated as "TDA".
 
 ## Development
 
@@ -9,6 +11,13 @@ Python library for interacting with a TIMDEX parquet dataset located remotely or
 - To run unit tests: `make test`
 - To lint the repo: `make lint`
 
+The library version number is set in [`timdex_dataset_api/__init__.py`](timdex_dataset_api/__init__.py), e.g.:
+```python
+__version__ = "2.1.0"
+```
+
+Updating the version number when making changes to the library prompts applications that install it to pick up the new version the next time _their_ dependencies are updated.
+
 ## Installation
 
 This library is designed to be utilized by other projects, and can therefore be added as a dependency directly from the Github repository.
@@ -30,11 +39,116 @@ timdex_dataset_api = {git = "https://github.com/MITLibraries/timdex-dataset-api.
 
 ### Required
 
+None at this time.
+
 ### Optional
 ```shell
 TDA_LOG_LEVEL=# log level for timdex-dataset-api, accepts [DEBUG, INFO, WARNING, ERROR], default INFO
+WARNING_ONLY_LOGGERS=# comma-separated list of logger names to set as WARNING only, e.g. 'botocore,charset_normalizer,smart_open'
 ```
 
 ## Usage
 
-_TODO..._
+Currently, the most common use cases are:
+* **Transmogrifier**: uses TDA to **write** to the parquet dataset
+* **TIMDEX-Index-Manager (TIM)**: uses TDA to **read** from the parquet dataset
+
+Beyond those two ETL run use cases, others are emerging where this library proves helpful:
+
+* yielding only the current version of all records in the dataset, useful for quickly re-indexing to Opensearch
+* high throughput (time) + memory safe (space) access to the dataset for analysis
+
+For both reading and writing, the following env vars are recommended:
+```shell
+TDA_LOG_LEVEL=INFO
+WARNING_ONLY_LOGGERS=asyncio,botocore,urllib3,s3transfer,boto3
+```
+
+### Reading Data
+
+First, import the library:
+```python
+from timdex_dataset_api import TIMDEXDataset
+```
+
+Load a dataset instance:
+```python
+# dataset in S3
+timdex_dataset = TIMDEXDataset("s3://my-bucket/path/to/dataset")
+
+# or, local dataset (e.g. testing or development)
+timdex_dataset = TIMDEXDataset("/path/to/dataset")
+
+# load the dataset, which discovers all parquet files
+timdex_dataset.load()
+
+# or, load the dataset but ensure that only current records are ever yielded
+timdex_dataset.load(current_records=True)
+```
+
+All read methods for `TIMDEXDataset` accept the same group of filters, defined in `timdex_dataset_api.dataset.DatasetFilters`. Examples are shown below.
+
+```python
+# read a single row, no filtering
+single_record_dict = next(timdex_dataset.read_dicts_iter())
+
+
+# get batches of records, filtering to a particular run
+for batch in timdex_dataset.read_batches_iter(
+    source="alma",
+    run_date="2025-06-01",
+    run_id="abc123",
+):
+    ...  # do something with pyarrow batch
+
+
+# use convenience method to yield only transformed records
+# NOTE: this is what TIM uses for indexing to Opensearch for a given ETL run
+for transformed_record in timdex_dataset.read_transformed_records_iter(
+    source="aspace",
+    run_date="2025-06-01",
+    run_id="ghi789",
+):
+    ...  # do something with transformed record dictionary
+
+
+# load all records for a given run into a pandas dataframe
+# NOTE: this can be potentially expensive memory-wise if the run is large
+run_df = timdex_dataset.read_dataframe(
+    source="dspace",
+    run_date="2025-06-01",
+    run_id="def456",
+)
+```
+
+### Writing Data
+
+At this time, the only application that writes to the ETL parquet dataset is Transmogrifier.
+
+To write records to the dataset, you must prepare an iterator of `timdex_dataset_api.record.DatasetRecord`. Here is some pseudocode for how a dataset write can work:
+
+```python
+from collections.abc import Iterator
+
+from timdex_dataset_api import DatasetRecord, TIMDEXDataset
+
+# different ways to achieve this; you just need some kind of iterator
+# (e.g. list, generator, etc.) of DatasetRecords for writing
+def records_to_write_iter() -> Iterator[DatasetRecord]:
+    records = [...]
+    for record in records:
+        yield DatasetRecord(
+            timdex_record_id=...,
+            source_record=...,
+            transformed_record=...,
+            source=...,
+            run_date=...,
+            run_type=...,
+            run_timestamp=...,
+            action=...,
+            run_record_offset=...,
+        )
+records_iter = records_to_write_iter()
+
+# finally, perform the write, relying on the library to handle efficient batching
+timdex_dataset = TIMDEXDataset("/path/to/dataset")
+timdex_dataset.write(records_iter=records_iter)
+```
````

tests/conftest.py

Lines changed: 6 additions & 1 deletion
```diff
@@ -10,7 +10,7 @@
     generate_sample_records,
     generate_sample_records_with_simulated_partitions,
 )
-from timdex_dataset_api import TIMDEXDataset
+from timdex_dataset_api import TIMDEXDataset, TIMDEXDatasetMetadata
 from timdex_dataset_api.dataset import TIMDEXDatasetConfig
 
 
@@ -208,3 +208,8 @@ def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
     timdex_dataset.load()
 
     return timdex_dataset
+
+
+@pytest.fixture
+def timdex_dataset_metadata(dataset_with_same_day_runs):
+    return TIMDEXDatasetMetadata(timdex_dataset=dataset_with_same_day_runs)
```

tests/test_metadata.py

Lines changed: 89 additions & 0 deletions
```diff
@@ -0,0 +1,89 @@
+# ruff: noqa: PLR2004
+
+import duckdb
+
+from timdex_dataset_api import TIMDEXDataset, TIMDEXDatasetMetadata
+
+
+def test_tdm_init_from_timdex_dataset_instance_success(dataset_with_same_day_runs):
+    tdm = TIMDEXDatasetMetadata(timdex_dataset=dataset_with_same_day_runs)
+    assert isinstance(tdm.timdex_dataset, TIMDEXDataset)
+
+
+def test_tdm_init_from_timdex_dataset_path_success(dataset_with_runs_location):
+    tdm = TIMDEXDatasetMetadata.from_dataset_location(dataset_with_runs_location)
+    assert isinstance(tdm.timdex_dataset, TIMDEXDataset)
+
+
+def test_tdm_default_database_location_in_memory(timdex_dataset_metadata):
+    assert timdex_dataset_metadata.db_path == ":memory:"
+    result = timdex_dataset_metadata.conn.query("PRAGMA database_list;").fetchone()
+    assert result[1] == "memory"  # name of database
+    assert result[2] is None  # file associated with database, where None is memory
+
+
+def test_tdm_explicit_database_in_file(tmp_path, dataset_with_runs_location):
+    db_path = str(tmp_path / "tda.duckdb")
+    tdm = TIMDEXDatasetMetadata.from_dataset_location(
+        dataset_with_runs_location,
+        db_path=db_path,
+    )
+    assert tdm.db_path == db_path
+    result = tdm.conn.query("PRAGMA database_list;").fetchone()
+    assert result[1] == "tda"  # name of database
+    assert result[2] == db_path  # filepath passed during init
+
+
+def test_tdm_get_duckdb_connection(timdex_dataset_metadata):
+    conn = timdex_dataset_metadata.get_connection()
+    assert isinstance(conn, duckdb.DuckDBPyConnection)
+
+
+def test_tdm_set_threads(timdex_dataset_metadata):
+    # set to 64
+    timdex_dataset_metadata.set_database_thread_usage(64)
+    thread_count = timdex_dataset_metadata.conn.query(
+        """SELECT current_setting('threads');"""
+    ).fetchone()[0]
+    assert thread_count == 64
+
+    # set to 12
+    timdex_dataset_metadata.set_database_thread_usage(12)
+    thread_count = timdex_dataset_metadata.conn.query(
+        """SELECT current_setting('threads');"""
+    ).fetchone()[0]
+    assert thread_count == 12
+
+
+def test_tdm_init_sets_up_database(timdex_dataset_metadata):
+    df = timdex_dataset_metadata.conn.query("show tables;").to_df()
+    assert set(df.name) == {"current_records", "records"}
+
+
+def test_tdm_get_current_parquet_files(timdex_dataset_metadata):
+    parquet_files = timdex_dataset_metadata.get_current_parquet_files()
+    # assert 5 total parquet files in dataset
+    # but only 3 contain current records
+    assert len(timdex_dataset_metadata.timdex_dataset.dataset.files) == 5
+    assert len(parquet_files) == 3
+
+
+def test_tdm_get_record_to_run_mapping(timdex_dataset_metadata):
+    record_map = timdex_dataset_metadata.get_current_record_to_run_map()
+
+    assert len(record_map) == 75
+    assert record_map["alma:0"] == "run-5"
+    assert record_map["alma:5"] == "run-4"
+    assert record_map["alma:19"] == "run-4"
+    assert "run-3" not in record_map.values()
+    assert record_map["alma:20"] == "run-2"
+
+
+def test_tdm_current_records_subset_of_all_records(timdex_dataset_metadata):
+    records_df = timdex_dataset_metadata.conn.query("select * from records;").to_df()
+    current_records_df = timdex_dataset_metadata.conn.query(
+        "select * from current_records;"
+    ).to_df()
+    assert set(current_records_df.timdex_record_id).issubset(
+        set(records_df.timdex_record_id)
+    )
```
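The `current_records` behavior exercised by the tests above amounts to keeping, for each `timdex_record_id`, only the row from the most recent run. A stdlib sketch of that reduction (the rows here are invented for illustration; the library itself performs this in DuckDB, not in Python):

```python
from operator import itemgetter

# hypothetical sample rows; field names mirror the dataset columns,
# values are invented for illustration
rows = [
    {"timdex_record_id": "alma:0", "run_id": "run-4", "run_timestamp": "2025-06-01T01:00:00"},
    {"timdex_record_id": "alma:0", "run_id": "run-5", "run_timestamp": "2025-06-01T02:00:00"},
    {"timdex_record_id": "alma:5", "run_id": "run-4", "run_timestamp": "2025-06-01T01:00:00"},
]

# sort oldest-to-newest so later runs overwrite earlier ones per record id
current: dict[str, dict] = {}
for row in sorted(rows, key=itemgetter("run_timestamp")):
    current[row["timdex_record_id"]] = row

record_to_run_map = {rid: row["run_id"] for rid, row in current.items()}
# record_to_run_map == {"alma:0": "run-5", "alma:5": "run-4"}
```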

timdex_dataset_api/__init__.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -1,11 +1,13 @@
 """timdex_dataset_api/__init__.py"""
 
 from timdex_dataset_api.dataset import TIMDEXDataset
+from timdex_dataset_api.metadata import TIMDEXDatasetMetadata
 from timdex_dataset_api.record import DatasetRecord
 
-__version__ = "2.1.0"
+__version__ = "2.2.0"
 
 __all__ = [
     "DatasetRecord",
     "TIMDEXDataset",
+    "TIMDEXDatasetMetadata",
 ]
```
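The version bump above (2.1.0 → 2.2.0) signals a backwards-compatible addition (the new `TIMDEXDatasetMetadata` export) under semantic versioning. For a consumer that needs to compare such version strings, a minimal tuple-parse sketch (assumes plain `MAJOR.MINOR.PATCH` strings with no pre-release tags; `parse_version` is an illustrative helper, not part of this library):

```python
def parse_version(version: str) -> tuple[int, ...]:
    # "2.2.0" -> (2, 2, 0); tuples compare element-wise, so ordering works
    return tuple(int(part) for part in version.split("."))

assert parse_version("2.2.0") > parse_version("2.1.0")
assert parse_version("2.10.0") > parse_version("2.2.0")  # numeric, not lexicographic
```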
