@@ -32,6 +32,23 @@ def _dataset_label(dataset: str) -> str:
     return dataset[:39]


+def _input_dataset(
+    dataset_id: str,
+    version: str,
+    file_type: recipes.DatasetType,
+    table_name: str | None,
+) -> InputDataset:
+    table_name = table_name or f"{dataset_id}_{version}"
+    return InputDataset(
+        id=dataset_id,
+        version=version,
+        source="edm.recipes.datasets",
+        file_type=file_type,
+        custom={"file_type": file_type},
+        import_as=table_name,
+    )
+
+
 class Converter(SortedSerializedBase, YamlWriter):
     _exclude_falsey_values: bool = False
     _exclude_none: bool = False
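The added _input_dataset helper factors out the InputDataset construction that both load paths previously duplicated, defaulting the import table name to "{dataset_id}_{version}" when none is supplied. A minimal usage sketch (the dataset id, version, and parquet enum member below are hypothetical, chosen only to show the default naming):

    # hypothetical values; assumes recipes.DatasetType has a parquet member
    ds = _input_dataset("dcp_pluto", "24v2", recipes.DatasetType.parquet, None)
    assert ds.import_as == "dcp_pluto_24v2"  # table name defaults to id + version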
@@ -131,34 +148,18 @@ def ingest(
         version=version,
         staging_dir=dataset_staging_dir,
         push=False,
-    )[0]  # TODO - hack
+    )
+    assert len(config) == 1, (
+        "Validate ingest not set up to compare one-to-many definitions"
+    )

     ## copy so that it's in the "library" file system for easy import
-    output_path = dataset_staging_dir / dataset / config.version / f"{dataset}.parquet"
+    output_path = dataset_staging_dir / dataset / f"{dataset}.parquet"
     ingest_path = LIBRARY_PATH / dataset / "ingest" / f"{dataset}.parquet"
     ingest_path.parent.mkdir(exist_ok=True, parents=True)
     shutil.copy(output_path, ingest_path)


-def _import_dataset(
-    dataset_id, version, file_type, dest_table_name, dest_dir: Path | None = None
-):
-    pg_client = postgres.PostgresClient(schema=SCHEMA, database=DATABASE)
-    data_loader.import_dataset(
-        InputDataset(
-            id=dataset_id,
-            version=version,
-            source="edm.recipes.datasets",
-            file_type=file_type,
-            custom={"file_type": file_type},
-            import_as=dest_table_name,
-        ),
-        stage="builds.qa",
-        pg_client=pg_client,
-        dest_dir_override=dest_dir,
-    )
-
-
 def load_recipe(
     dataset: str,
     version: Literal["library", "ingest"],
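For context on the hunk above: the old ")[0]" silently took the first config whenever ingest returned more than one, while the new code keeps the full list and asserts the single-config assumption, so a one-to-many dataset definition now fails loudly instead of being half-validated. In outline (run_ingest is a stand-in name, not the real call):

    # before: first config taken silently
    config = run_ingest(...)[0]  # TODO - hack
    # after: fail loudly if more than one definition comes back
    config = run_ingest(...)
    assert len(config) == 1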
@@ -176,7 +177,11 @@ def load_recipe(
     client.drop_table(dataset)
     client.drop_table(target_table)

-    _import_dataset(dataset, version, file_type=file_type, dest_table_name=target_table)
+    data_loader.load_dataset_into_pg(
+        _input_dataset(dataset, version, file_type, target_table),
+        client,
+        LIBRARY_PATH / dataset / version / f"{dataset}.{file_type.to_extension()}",
+    )


 def load_recipe_from_s3(
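In load_recipe, the file location is now passed to data_loader.load_dataset_into_pg explicitly instead of being resolved inside the loader. Assuming the library layout implied by the diff and that to_extension() returns "parquet" for parquet datasets (both assumptions, not confirmed by this commit), a dataset dcp_pluto loaded at version "ingest" would resolve to:

    # hypothetical values, illustrating the assumed path layout
    LIBRARY_PATH / "dcp_pluto" / "ingest" / "dcp_pluto.parquet"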
@@ -203,12 +208,11 @@ def load_recipe_from_s3(
     # often use case would be archiving to dev bucket multiple times
     # just ensure that local copy is not re-used
     with TemporaryDirectory() as _dir:
-        _import_dataset(
-            ds_id,
-            s3_version,
-            file_type=file_type,
-            dest_table_name=target_table,
-            dest_dir=Path(_dir),
+        data_loader.import_dataset(
+            _input_dataset(ds_id, s3_version, file_type, target_table),
+            pg_client=client,
+            stage="builds.qa",
+            dest_dir_override=Path(_dir),
         )

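The S3 path still calls data_loader.import_dataset with stage="builds.qa" but now builds its InputDataset through the shared helper. Downloading into a TemporaryDirectory is what backs the comment above: the dev bucket may be re-archived under the same version string, so each load lands in a throwaway directory and a stale local copy can never be re-used. A self-contained sketch of the pattern (ds_id, s3_version, file_type, target_table, and client are assumed in scope as in the diff):

    from pathlib import Path
    from tempfile import TemporaryDirectory

    with TemporaryDirectory() as _dir:
        # fresh download every call; the directory is deleted on exit
        data_loader.import_dataset(
            _input_dataset(ds_id, s3_version, file_type, target_table),
            pg_client=client,
            stage="builds.qa",
            dest_dir_override=Path(_dir),
        )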