Map items to objects instead of dicts (#409)
* Consistent parameter name for map_item()

* Wrap mapped items in MappedItem() object

* Keep track of import warnings in search.py

* Add warning when mapping a tweet with missing metric data

* Add new iterate_mapped_objects method

* Log mapping warnings when merging datasets

* Pass object instead of dict

* Clarify Twitter warning

* Documenting MappedItem

* Explain things to myself

* Remove unused methods from Dataset class

* Woops, accidentally committed this

* Allow passing missing fields to MappedItem

* Add MappedItemIncompleteException

* Add map_missing argument to iterate_mapped_items/objects

* Fix bug

* MissingMappedField things

* Always warn when importing missing fields

* Consistent defaults

* Fix missing metrics warning
stijn-uva authored Feb 20, 2024
1 parent 42ffa55 commit 32b8790
Showing 19 changed files with 536 additions and 373 deletions.
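The heart of this commit is that map_item() implementations now return a MappedItem object instead of a plain dictionary, so mapping warnings and missing fields travel with the item. Below is a minimal, hypothetical sketch of a data source's map_item() under the new convention; MappedItem and MissingMappedField are defined in common/lib/item_mapping.py (one of the changed files not loaded on this page), so their exact constructor signatures are an assumption inferred from how they are used in the diffs below.

from common.lib.item_mapping import MappedItem, MissingMappedField

class ExampleDataSource:
    """Hypothetical data source illustrating the new map_item() convention."""

    @staticmethod
    def map_item(item):
        metrics = item.get("public_metrics")
        if metrics is None:
            # mark the field as missing rather than silently substituting a value;
            # iterate_mapped_items() decides later what to fill in (see dataset.py below)
            return MappedItem({
                "id": item["id"],
                "body": item["text"],
                "retweets": MissingMappedField(0),  # 0 as the fallback default (assumed signature)
            }, message="Item is missing engagement metrics")  # message= keyword is assumed

        return MappedItem({
            "id": item["id"],
            "body": item["text"],
            "retweets": metrics["retweet_count"],
        })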
17 changes: 13 additions & 4 deletions backend/lib/processor.py
@@ -713,11 +713,18 @@ def create_standalone(self):
@classmethod
def map_item_method_available(cls, dataset):
"""
Checks if map_item method exists and is compatible with dataset. If dataset does not have an extension,
returns False
Check if this processor can use map_item
:param BasicProcessor processor: The BasicProcessor subclass object with which to use map_item
:param DataSet dataset: The DataSet object with which to use map_item
Checks if map_item method exists and is compatible with dataset. If
dataset has a different extension than the default for this processor,
or if the dataset has no extension, this means we cannot be sure the
data is in the right format to be mapped, so `False` is returned in
that case even if a map_item() method is available.
:param BasicProcessor processor: The BasicProcessor subclass object
with which to use map_item
:param DataSet dataset: The DataSet object with which to
use map_item
"""
# only run item mapper if extension of processor == extension of
# data file, for the scenario where a csv file was uploaded and
@@ -745,8 +752,10 @@ def get_mapped_item(cls, item):
mapped_item = cls.map_item(item)
except (KeyError, IndexError) as e:
raise MapItemException(f"Unable to map item: {type(e).__name__}-{e}")

if not mapped_item:
raise MapItemException("Unable to map item!")

return mapped_item

@classmethod
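For callers, the practical consequence of the hunk above is that get_mapped_item() now returns a MappedItem wrapper and converts KeyError/IndexError into MapItemException. A hedged sketch of how downstream code might unwrap it, using the get_item_data() and get_message() accessors that the rest of this commit relies on:

from common.lib.exceptions import MapItemException

def safe_map(processor, raw_item):
    """Return the mapped data dict for raw_item, or None if it cannot be mapped."""
    try:
        mapped = processor.get_mapped_item(raw_item)  # a MappedItem now, not a dict
    except MapItemException:
        return None  # structure too different to coerce into the expected format
    if mapped.get_message():
        # the item mapped, but with a caveat worth surfacing to the user
        print(f"Warning: {mapped.get_message()}")
    return mapped.get_item_data()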
37 changes: 32 additions & 5 deletions backend/lib/search.py
@@ -48,7 +48,7 @@ class Search(BasicProcessor, ABC):
# Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
return_cols = ['thread_id', 'body', 'subject', 'timestamp']

flawless = 0
import_issues = 0

def process(self):
"""
@@ -114,10 +114,11 @@ def process(self):
# file exists somewhere, so we create it as an empty file
with open(query_parameters.get("copy_to"), "w") as empty_file:
empty_file.write("")
if self.flawless == 0:

if self.import_issues == 0:
self.dataset.finish(num_rows=num_items)
else:
self.dataset.update_status(f"Unexpected data format for {self.flawless} items. All data can be downloaded, but only data with expected format will be available to 4CAT processors; check logs for details", is_final=True)
self.dataset.update_status(f"{self.import_issues} item(s) in the dataset had an unexpected format. All data can be downloaded, but only data with the expected format will be available to 4CAT processors and in CSV exports; check the dataset log for details.", is_final=True)
self.dataset.finish(num_rows=num_items)

def search(self, query):
@@ -179,6 +180,8 @@ def import_from_file(self, path):
if not path.exists():
return []

import_warnings = {}

# Check if processor and dataset can use map_item
check_map_item = self.map_item_method_available(dataset=self.dataset)
if not check_map_item:
@@ -199,18 +202,42 @@
**item["data"],
"__import_meta": {k: v for k, v in item.items() if k != "data"}
}

# Check map item here!
if check_map_item:
try:
self.get_mapped_item(new_item)
mapped_item = self.get_mapped_item(new_item)

# keep track of items that raised a warning
# this means the item could be mapped, but there is
# some information the user should take note of
warning = mapped_item.get_message()
if not warning and mapped_item.get_missing_fields():
# usually this would have an explicit warning, but
# if not it's still useful to know
warning = f"The following fields are missing for this item and will be replaced with a default value: {', '.join(mapped_item.get_missing_fields())}"

if warning:
if warning not in import_warnings:
import_warnings[warning] = 0
import_warnings[warning] += 1
self.import_issues += 1

except MapItemException as e:
# NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
self.flawless += 1
self.import_issues += 1
self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e, warn_admins=unmapped_items is False)
unmapped_items = True

yield new_item

# warnings were raised about some items
# log these, with the number of items each warning applied to
if sum(import_warnings.values()) > 0:
self.dataset.log("While importing, the following issues were raised:")
for warning, num_items in import_warnings.items():
self.dataset.log(f" {warning} (for {num_items:,} item(s))")

path.unlink()
self.dataset.delete_parameter("file")

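The import path above no longer logs a warning per problematic item: identical warnings are tallied per message and written to the dataset log once, with a count. A small self-contained illustration of that aggregation pattern, using made-up warning strings and print() in place of self.dataset.log():

import_warnings = {}
for warning in [
    "Item is missing engagement metrics",
    "Item is missing engagement metrics",
    "The following fields are missing for this item and will be replaced with a default value: retweets",
]:
    # count occurrences per unique warning message
    if warning not in import_warnings:
        import_warnings[warning] = 0
    import_warnings[warning] += 1

if sum(import_warnings.values()) > 0:
    print("While importing, the following issues were raised:")
    for warning, num_items in import_warnings.items():
        print(f"  {warning} (for {num_items:,} item(s))")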
165 changes: 89 additions & 76 deletions common/lib/dataset.py
@@ -16,9 +16,10 @@
from common.config_manager import config
from common.lib.job import Job, JobNotFoundException
from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int
from common.lib.item_mapping import MappedItem, MissingMappedField
from common.lib.fourcat_module import FourcatModule
from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException,
MapItemException)
MapItemException, MappedItemIncompleteException)


class DataSet(FourcatModule):
@@ -219,17 +220,6 @@ def clear_log(self):
with log_path.open("w") as outfile:
pass

def has_log_file(self):
"""
Check if a log file exists for this dataset
This should be the case, but datasets created before status logging was
added may not have one, so we need to be able to check this.
:return bool: Does a log file exist?
"""
return self.get_log_path().exists()

def log(self, log):
"""
Write log message to file
@@ -245,24 +235,6 @@ def log(self, log):
with log_path.open("a", encoding="utf-8") as outfile:
outfile.write("%s: %s\n" % (datetime.datetime.now().strftime("%c"), log))

def get_log_iterator(self):
"""
Return an iterator with a (time, message) tuple per line in the log
Just a convenience function!
:return iterator: (time, message) per log message
"""
log_path = self.get_log_path()
if not log_path.exists():
return

with log_path.open(encoding="utf-8") as infile:
for line in infile:
logtime = line.split(":")[0]
logmsg = ":".join(line.split(":")[1:])
yield (logtime, logmsg)

def iterate_items(self, processor=None, bypass_map_item=False, warn_unmappable=True):
"""
A generator that iterates through a CSV or NDJSON file
@@ -317,7 +289,7 @@ def iterate_items(self, processor=None, bypass_map_item=False, warn_unmappable=T

if item_mapper:
try:
item = own_processor.get_mapped_item(item)
item = own_processor.get_mapped_item(item).get_item_data()
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
@@ -337,7 +309,7 @@ def iterate_items(self, processor=None, bypass_map_item=False, warn_unmappable=T
item = json.loads(line)
if item_mapper:
try:
item = own_processor.get_mapped_item(item)
item = own_processor.get_mapped_item(item).get_item_data()
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
@@ -349,15 +321,41 @@ def iterate_items(self, processor=None, bypass_map_item=False, warn_unmappable=T
else:
raise NotImplementedError("Cannot iterate through %s file" % path.suffix)

def iterate_mapped_items(self, processor=None, warn_unmappable=True):
def iterate_mapped_objects(self, processor=None, warn_unmappable=True, map_missing="default"):
"""
Wrapper for iterate_items that returns both the original item and the mapped item (or else the same identical item).
No extension check is performed here as the point is to be able to handle the original object and save as an appropriate
filetype.
Generate mapped dataset items
Wrapper for iterate_items that returns both the original item and the
mapped item (or else the same identical item). No extension check is
performed here as the point is to be able to handle the original
object and save as an appropriate filetype.
Note the two parameters warn_unmappable and map_missing. Items can be
unmappable in that their structure is too different to coerce into a
neat dictionary of the structure the data source expects. This makes it
'unmappable' and warn_unmappable determines what happens in this case.
It can also be of the right structure, but with some fields missing or
incomplete. map_missing determines what happens in that case. The
latter is for example possible when importing data via Zeeschuimer,
which produces unstably-structured data captured from social media
sites.
:param BasicProcessor processor: A reference to the processor
iterating the dataset.
:return generator: A generator that yields a tuple with the unmapped item followed by the mapped item
:param bool warn_unmappable: If an item is not mappable, skip the item
and log a warning
:param map_missing: Indicates what to do with mapped items for which
some fields could not be mapped. Defaults to 'default'. Must be one of:
- 'default': fill missing fields with the default passed by map_item
- 'abort': raise a MappedItemIncompleteException if a field is missing
- a dictionary with a key per missing field: apply the strategy given
for that key ('default', 'abort', or a callback) to that field
- a callback: replace missing field with the return value of the
callback. The mapped item's data is passed to the callback as the
first argument and the name of the missing field as the second.
:return generator: A generator that yields a tuple with the unmapped
item followed by the mapped item
"""
unmapped_items = False
# Collect item_mapper for use with filter
@@ -380,12 +378,65 @@ def iterate_mapped_items(self, processor=None, warn_unmappable=True):
self.warn_unmappable_item(i, processor, e, warn_admins=unmapped_items is False)
unmapped_items = True
continue

# check if fields have been marked as 'missing' in the
# underlying data, and treat according to the chosen strategy
if mapped_item.get_missing_fields():
default_strategy = "default"

# strategy can be for all fields at once, or per field
# in the former case it's a string, in the latter a dict
# here we make sure it's always a dict to not complicate
# the following code
if type(map_missing) is str:
default_strategy = map_missing
map_missing = {}

for missing_field in mapped_item.get_missing_fields():
strategy = map_missing.get(missing_field, default_strategy)

if callable(strategy):
# delegate handling to a callback
mapped_item.data[missing_field] = strategy(mapped_item.data, missing_field)
elif strategy == "abort":
# raise an exception to be handled at the processor level
raise MappedItemIncompleteException(f"Cannot process item, field {missing_field} missing in source data.")
elif strategy == "default":
# use whatever was passed to the object constructor
mapped_item.data[missing_field] = mapped_item.data[missing_field].value
else:
raise ValueError("map_missing must be 'abort', 'default', or a callback.")

else:
mapped_item = original_item

# Yield the two items
yield original_item, mapped_item

def iterate_mapped_items(self, processor=None, warn_unmappable=True, map_missing="default"):
"""
Generate mapped dataset dictionaries
Identical to iterate_mapped_objects, but yields the mapped item's data
(i.e. a dictionary) rather than the MappedItem object.
:param BasicProcessor processor: A reference to the processor
iterating the dataset.
:param bool warn_unmappable: If an item is not mappable, skip the item
and log a warning
:param map_missing: Indicates what to do with mapped items for which
some fields could not be mapped. Defaults to 'default' (see
`iterate_mapped_objects()`).
:return generator: A generator that yields a tuple with the unmapped
item followed by the mapped item
"""
for original_item, mapped_item in self.iterate_mapped_objects(processor, warn_unmappable, map_missing):
if type(mapped_item) is MappedItem:
yield original_item, mapped_item.get_item_data()
else:
yield original_item, mapped_item

def get_item_keys(self, processor=None):
"""
Get item attribute names
@@ -450,18 +501,6 @@ def remove_staging_areas(self):
if staging_area.is_dir():
shutil.rmtree(staging_area)

def get_results_dir(self):
"""
Get path to results directory
Always returns a path, that will at some point contain the dataset
data, but may not do so yet. Use this to get the location to write
generated results to.
:return str: A path to the results directory
"""
return self.folder

def finish(self, num_rows=0):
"""
Declare the dataset finished
@@ -474,32 +513,6 @@ def finish(self, num_rows=0):
self.data["is_finished"] = True
self.data["num_rows"] = num_rows

def unfinish(self):
"""
Declare unfinished, and reset status, so that it may be executed again.
"""
if not self.is_finished():
raise RuntimeError("Cannot unfinish an unfinished dataset")

try:
self.get_results_path().unlink()
except FileNotFoundError:
pass

self.data["timestamp"] = int(time.time())
self.data["is_finished"] = False
self.data["num_rows"] = 0
self.data["status"] = "Dataset is queued."
self.data["progress"] = 0

self.db.update("datasets", data={
"timestamp": self.data["timestamp"],
"is_finished": self.data["is_finished"],
"num_rows": self.data["num_rows"],
"status": self.data["status"],
"progress": 0
}, where={"key": self.key})

def copy(self, shallow=True):
"""
Copies the dataset, making a new version with a unique key
@@ -1377,7 +1390,6 @@ def get_own_processor(self):
processor_type = self.parameters.get("type", self.data.get("type"))
return backend.all_modules.processors.get(processor_type)


def get_available_processors(self, user=None):
"""
Get list of processors that may be run for this dataset
Expand Down Expand Up @@ -1587,6 +1599,7 @@ def warn_unmappable_item(self, item_count, processor=None, error_message=None, w
if processor is not None:
processor.log.warning(f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}.")
elif hasattr(self.db, "log"):
# borrow the database's log handler
self.db.log.warning(f"Unable to map item all items for dataset {closest_dataset.key}.")
else:
# No other log available
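The map_missing strategies documented in iterate_mapped_objects() can also be combined per field. A hedged usage sketch, assuming a hypothetical 'retweets' field; note that the implementation above passes the mapped item's data dict (not the MappedItem itself) to a callback, so the sketch follows the implementation:

from common.lib.exceptions import MappedItemIncompleteException

def count_retweets(dataset, processor):
    """Sum retweets, treating items whose metrics are missing as zero."""
    patch = {"retweets": lambda data, field: 0}  # per-field callback strategy
    return sum(int(mapped["retweets"]) for _, mapped in
               dataset.iterate_mapped_items(processor, map_missing=patch))

def strict_bodies(dataset, processor):
    """Collect item bodies, giving up if any item lacks one in the source data."""
    try:
        return [mapped["body"] for _, mapped in
                dataset.iterate_mapped_items(processor, map_missing="abort")]
    except MappedItemIncompleteException:
        return None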

