Skip to content

Commit

Permalink
Cannot merge non json/csv datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
stijn-uva committed Jan 11, 2024
1 parent f66b8c0 commit c5fbe02
Showing 1 changed file with 37 additions and 33 deletions.
70 changes: 37 additions & 33 deletions processors/conversion/merge_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,39 +130,43 @@ def process(self):
seen_ids = set()
with self.dataset.get_results_path().open("w", encoding="utf-8", newline="") as outfile:
for dataset in source_datasets:
for original_item, mapped_item in dataset.iterate_mapped_items():
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while mapping duplicates")

if not canonical_fieldnames:
canonical_fieldnames = set(mapped_item.keys())
sorted_canonical_fieldnames = list(mapped_item.keys())
else:
item_fieldnames = set(mapped_item.keys())
if item_fieldnames != canonical_fieldnames:
return self.dataset.finish_with_error("Cannot merge datasets - not the same set of "
"attributes per item (are they not the same type or "
"has one been altered by a processor?)")

processed += 1
if self.parameters["merge"] != "keep" and mapped_item.get("id") in seen_ids:
duplicates += 1
continue

seen_ids.add(mapped_item.get("id"))
merged += 1

if dataset.get_extension() == "csv":
if not writer:
writer = csv.DictWriter(outfile, fieldnames=sorted_canonical_fieldnames)
writer.writeheader()

writer.writerow(original_item)

elif dataset.get_extension() == "ndjson":
outfile.write(json.dumps(original_item) + "\n")

self.update_progress(processed, total_items)
try:
for original_item, mapped_item in dataset.iterate_mapped_items():
if self.interrupted:
raise ProcessorInterruptedException("Interrupted while mapping duplicates")

if not canonical_fieldnames:
canonical_fieldnames = set(mapped_item.keys())
sorted_canonical_fieldnames = list(mapped_item.keys())
else:
item_fieldnames = set(mapped_item.keys())
if item_fieldnames != canonical_fieldnames:
return self.dataset.finish_with_error("Cannot merge datasets - not the same set of "
"attributes per item (are they not the same type or "
"has one been altered by a processor?)")

processed += 1
if self.parameters["merge"] != "keep" and mapped_item.get("id") in seen_ids:
duplicates += 1
continue

seen_ids.add(mapped_item.get("id"))
merged += 1

if dataset.get_extension() == "csv":
if not writer:
writer = csv.DictWriter(outfile, fieldnames=sorted_canonical_fieldnames)
writer.writeheader()

writer.writerow(original_item)

elif dataset.get_extension() == "ndjson":
outfile.write(json.dumps(original_item) + "\n")

self.update_progress(processed, total_items)

except NotImplementedError:
self.dataset.finish_with_error(f"Datasets comprising {dataset.get_extension()} files cannot be merged. You can only merge NDJSON or CSV datasets.")

# phew, finally done
self.dataset.update_status(f"Merged {processed:,} items ({merged:,} merged, {duplicates:,} skipped)",
Expand Down

0 comments on commit c5fbe02

Please sign in to comment.