diff --git a/.github/workflows/docker_pr_test.yml b/.github/workflows/docker_pr_test.yml index 3109e3f8f..7d8b8db3b 100644 --- a/.github/workflows/docker_pr_test.yml +++ b/.github/workflows/docker_pr_test.yml @@ -12,16 +12,34 @@ jobs: name: Test docker-compose up with build runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Run docker compose up run: docker compose -f docker-compose_build.yml up -d - - name: Wait and check log + - name: Check backend container is running + run: | + sleep 30 + if [ "$(docker ps | grep 4cat_backend)" ]; then + echo "Docker 4cat_backend container is running..." + else + echo -e "Docker 4cat_backend container is not running...\nPrinting 4cat_backend logs:\n\n$(docker container logs 4cat_backend)" + exit 1 + fi + - name: Check frontend container is running + run: | + sleep 10 + if [ "$(docker ps | grep 4cat_frontend)" ]; then + echo "Docker 4cat_frontend container is running..." + else + echo -e "Docker 4cat_frontend container is not running...\nPrinting 4cat_frontend logs:\n\n$(docker container logs 4cat_frontend)" + exit 1 + fi + - name: Check 4CAT backend log for expected INFO message run: | test_case=" INFO at api.py:65: Local API listening for requests at backend:4444" sleep 30 && var=$(docker exec 4cat_backend tail -n 1 logs/backend_4cat.log) echo "::group::Backend test" if [ "$(echo "$var" | tr "|" "\n" | sed -n '2p')" = "$test_case" ]; then - echo "Backend running as expected" + echo "4CAT backend running as expected" else echo "::error::Backend failed to start" echo "Test:$test_case" @@ -32,7 +50,11 @@ jobs: - name: Print log on failure if: failure() run: | - docker cp 4cat_backend:/usr/src/app/logs/backend_4cat.log ./backend_4cat.log - echo "::group::Backend logs" - cat backend_4cat.log - echo "::endgroup::" + if [ "$(docker ps | grep 4cat)" ]; then + docker cp 4cat_backend:/usr/src/app/logs/backend_4cat.log ./backend_4cat.log + echo "::group::Backend logs" + cat backend_4cat.log + echo "::endgroup::" + else + echo "Docker containers not running; check logs in previous steps" + fi diff --git a/README.md b/README.md index 7e4ec7769..9fc84f890 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,10 @@ [](https://www.python.org/) [](https://github.com/digitalmethodsinitiative/4cat/actions/workflows/docker_latest.yml) -
-4CAT has a website at 4cat.nl.
+4CAT has a website at 4cat.nl.
+Follow 4CAT on Bluesky for updates.
4CAT is a research tool that can be used to analyse and process data from online social platforms. Its goal is to make the capture and analysis of data from these platforms accessible to people through a web interface, without diff --git a/backend/workers/cleanup_tempfiles.py b/backend/workers/cleanup_tempfiles.py index 51e96fd57..b0b3a6d21 100644 --- a/backend/workers/cleanup_tempfiles.py +++ b/backend/workers/cleanup_tempfiles.py @@ -3,7 +3,8 @@ """ import shutil import re - +import json +from datetime import datetime from pathlib import Path from common.config_manager import config @@ -27,12 +28,21 @@ class TempFileCleaner(BasicWorker): ensure_job = {"remote_id": "localhost", "interval": 10800} + # Use tracking file to delay deletion of files that may still be in use + tracking_file = config.get('PATH_DATA').joinpath(".temp_file_cleaner") + days_to_keep = 7 + def work(self): """ Go through result files, and for each one check if it should still exist :return: """ + # Load tracking file + if not self.tracking_file.exists(): + tracked_files = {} + else: + tracked_files = json.loads(self.tracking_file.read_text()) result_files = Path(config.get('PATH_DATA')).glob("*") for file in result_files: @@ -41,6 +51,7 @@ def work(self): continue if self.interrupted: + self.tracking_file.write_text(json.dumps(tracked_files)) raise WorkerInterruptedException("Interrupted while cleaning up orphaned result files") # the key of the dataset files belong to can be extracted from the @@ -59,20 +70,28 @@ def work(self): except DataSetException: # the dataset has been deleted since, but the result file still # exists - should be safe to clean up - self.log.info("No matching dataset with key %s for file %s, deleting file" % (key, str(file))) - if file.is_dir(): - try: - shutil.rmtree(file) - except PermissionError: - self.log.info(f"Folder {file} does not belong to a dataset but cannot be deleted (no " - f"permissions), skipping") - - else: - try: - file.unlink() - except FileNotFoundError: - # the file has been deleted since - pass + if file.name not in tracked_files: + self.log.info(f"No matching dataset with key {key} for file {file}; marking for deletion") + tracked_files[file.name] = datetime.now().timestamp() + (self.days_to_keep * 86400) + elif tracked_files[file.name] < datetime.now().timestamp(): + self.log.info(f"File {file} marked for deletion since {datetime.fromtimestamp(tracked_files[file.name]).strftime('%Y-%m-%d %H:%M:%S')}, deleting file") + if file.is_dir(): + try: + shutil.rmtree(file) + except PermissionError: + self.log.info(f"Folder {file} does not belong to a dataset but cannot be deleted (no " + f"permissions), skipping") + + else: + try: + file.unlink() + except FileNotFoundError: + # the file has been deleted since + pass + + # Remove from tracking + del tracked_files[file.name] + continue if file.is_dir() and "-staging" in file.stem and dataset.is_finished(): @@ -84,4 +103,7 @@ def work(self): dataset.key, str(file))) shutil.rmtree(file) + # Update tracked files + self.tracking_file.write_text(json.dumps(tracked_files)) + self.job.finish() \ No newline at end of file diff --git a/common/config_manager.py b/common/config_manager.py index eb6c846d0..1b8d4052f 100644 --- a/common/config_manager.py +++ b/common/config_manager.py @@ -146,16 +146,6 @@ def ensure_database(self): """ self.with_db() - # delete unknown keys - known_keys = tuple([names for names, settings in config.config_definition.items() if settings.get("type") not in UserInput.OPTIONS_COSMETIC]) - unknown_keys = 
self.db.fetchall("SELECT DISTINCT name FROM settings WHERE name NOT IN %s", (known_keys,)) - - if unknown_keys: - self.db.log.info(f"Deleting unknown settings from database: {', '.join([key['name'] for key in unknown_keys])}") - self.db.delete("settings", where={"name": tuple([key["name"] for key in unknown_keys])}, commit=False) - - self.db.commit() - # create global values for known keys with the default known_settings = self.get_all() for setting, parameters in self.config_definition.items(): diff --git a/common/lib/dataset.py b/common/lib/dataset.py index b092d2a4e..b494acbd3 100644 --- a/common/lib/dataset.py +++ b/common/lib/dataset.py @@ -15,7 +15,7 @@ from common.config_manager import config from common.lib.job import Job, JobNotFoundException from common.lib.module_loader import ModuleCollector -from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int +from common.lib.helpers import get_software_commit, NullAwareTextIOWrapper, convert_to_int, get_software_version from common.lib.item_mapping import MappedItem, MissingMappedField, DatasetItem from common.lib.fourcat_module import FourcatModule from common.lib.exceptions import (ProcessorInterruptedException, DataSetException, DataSetNotFoundException, @@ -1586,6 +1586,20 @@ def get_media_type(self): # Default to text return self.parameters.get("media_type", "text") + def get_metadata(self): + """ + Get dataset metadata + + This consists of all the data stored in the database for this dataset, plus the current 4CAT version (appended + as 'current_4CAT_version'). This is useful for exporting datasets, as it can be used by another 4CAT instance to + update its database (and ensure compatibility with the exporting version of 4CAT). + """ + metadata = self.db.fetchone("SELECT * FROM datasets WHERE key = %s", (self.key,)) + + # get 4CAT version (presumably to ensure export is compatible with import) + metadata["current_4CAT_version"] = get_software_version() + return metadata + def get_result_url(self): """ Gets the 4CAT frontend URL of a dataset file. 
diff --git a/datasources/douyin/search_douyin.py b/datasources/douyin/search_douyin.py index 4b5d5b814..12768196c 100644 --- a/datasources/douyin/search_douyin.py +++ b/datasources/douyin/search_douyin.py @@ -218,7 +218,7 @@ def map_item(item): "timestamp": post_timestamp.strftime("%Y-%m-%d %H:%M:%S"), "post_source_domain": urllib.parse.unquote(metadata.get("source_platform_url")), # Adding this as different Douyin pages contain different data - "post_url": f"https://www.douyin.com/video/{item[aweme_id_key]}", + "post_url": f"https://www.douyin.com/video/{item[aweme_id_key]}" if subject == "Post" else f"https://live.douyin.com/{author.get('web_rid')}", "region": item.get("region", ""), "hashtags": ",".join( [tag[hashtag_key] for tag in (item[text_extra_key] if item[text_extra_key] is not None else []) if diff --git a/datasources/fourcat_import/import_4cat.py b/datasources/fourcat_import/import_4cat.py index cd231b445..dc5d079fc 100644 --- a/datasources/fourcat_import/import_4cat.py +++ b/datasources/fourcat_import/import_4cat.py @@ -4,6 +4,7 @@ import requests import json import time +import zipfile from backend.lib.processor import BasicProcessor from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException, @@ -19,8 +20,8 @@ class FourcatImportException(FourcatException): class SearchImportFromFourcat(BasicProcessor): type = "import_4cat-search" # job ID category = "Search" # category - title = "Import from 4CAT" # title displayed in UI - description = "Import a dataset from another 4CAT server" # description displayed in UI + title = "Import 4CAT dataset and analyses" # title displayed in UI + description = "Import a dataset from another 4CAT server or from a zip file (exported from a 4CAT server)" # description displayed in UI is_local = False # Whether this datasource is locally scraped is_static = False # Whether this datasource is still updated @@ -33,29 +34,328 @@ class SearchImportFromFourcat(BasicProcessor): "\n\nTo import a dataset across servers, both servers need to be running the same version of 4CAT. " "You can find the current version in the footer at the bottom of the interface." }, + "method": { + "type": UserInput.OPTION_CHOICE, + "help": "Import Type", + "options": { + "zip": "Zip File", + "url": "4CAT URL", + }, + "default": "url" + }, "url": { "type": UserInput.OPTION_TEXT, "help": "Dataset URL", - "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/." + "tooltip": "URL to the dataset's page, for example https://4cat.example/results/28da332f8918e6dc5aacd1c3b0170f01b80bd95f8ff9964ac646cecd33bfee49/.", + "requires": "method^=url" }, "intro2": { "type": UserInput.OPTION_INFO, "help": "You can create an API key via the 'API Access' item in 4CAT's navigation menu. Note that you need " "an API key from **the server you are importing from**, not the one you are looking at right now. " - "Additionally, you need to have owner access to the dataset you want to import." 
+ "Additionally, you need to have owner access to the dataset you want to import.", + "requires": "method^=url" }, "api-key": { "type": UserInput.OPTION_TEXT, "help": "4CAT API Key", "sensitive": True, "cache": True, - } + "requires": "method^=url" + }, + "data_upload": { + "type": UserInput.OPTION_FILE, + "help": "File", + "tooltip": "Upload a ZIP file containing a dataset exported from a 4CAT server.", + "requires": "method^=zip" + }, + } created_datasets = None base = None + remapped_keys = None + dataset_owner = None def process(self): + """ + Import 4CAT dataset either from another 4CAT server or from the uploaded zip file + """ + self.created_datasets = set() # keys of created datasets - may not be successful! + self.remapped_keys = {} # changed dataset keys + self.dataset_owner = self.dataset.get_owners()[0] # at this point it has 1 owner + try: + if self.parameters.get("method") == "zip": + self.process_zip() + else: + self.process_urls() + except Exception as e: + # Catch all exceptions and finish the job with an error + # Resuming is impossible because this dataset was overwritten with the importing dataset + # halt_and_catch_fire() will clean up and delete the datasets that were created + self.interrupted = True + try: + self.halt_and_catch_fire() + except ProcessorInterruptedException: + pass + # Reraise the original exception for logging + raise e + + def after_create(query, dataset, request): + """ + Hook to execute after the dataset for this source has been created + + In this case, put the file in a temporary location so it can be + processed properly by the related Job later. + + :param dict query: Sanitised query parameters + :param DataSet dataset: Dataset created for this query + :param request: Flask request submitted for its creation + """ + if query.get("method") == "zip": + file = request.files["option-data_upload"] + file.seek(0) + with dataset.get_results_path().with_suffix(".importing").open("wb") as outfile: + while True: + chunk = file.read(1024) + if len(chunk) == 0: + break + outfile.write(chunk) + else: + # nothing to do for URLs + pass + + + def process_zip(self): + """ + Import 4CAT dataset from a ZIP file + """ + self.dataset.update_status(f"Importing datasets and analyses from ZIP file.") + temp_file = self.dataset.get_results_path().with_suffix(".importing") + + imported = [] + processed_files = 1 # take into account the export.log file + failed_imports = [] + with zipfile.ZipFile(temp_file, "r") as zip_ref: + zip_contents = zip_ref.namelist() + + # Get all metadata files and determine primary dataset + metadata_files = [file for file in zip_contents if file.endswith("_metadata.json")] + if not metadata_files: + self.dataset.finish_with_error("No metadata files found in ZIP file; is this a 4CAT export?") + return + + # Get the primary dataset + primary_dataset_keys = set() + datasets = [] + parent_child_mapping = {} + for file in metadata_files: + with zip_ref.open(file) as f: + metadata = json.load(f) + if not metadata.get("key_parent"): + primary_dataset_keys.add(metadata.get("key")) + datasets.append(metadata) + else: + # Store the mapping of parent to child datasets + parent_key = metadata.get("key_parent") + if parent_key not in parent_child_mapping: + parent_child_mapping[parent_key] = [] + parent_child_mapping[parent_key].append(metadata) + + # Primary dataset will overwrite this dataset; we could address this to support multiple primary datasets + if len(primary_dataset_keys) != 1: + self.dataset.finish_with_error("ZIP file contains multiple 
primary datasets; only one is allowed.") + return + + # Import datasets + while datasets: + self.halt_and_catch_fire() + + # Create the datasets + metadata = datasets.pop(0) + dataset_key = metadata.get("key") + processed_metadata = self.process_metadata(metadata) + new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys) + processed_files += 1 + + # TODO: I am now noticing that we do not update the results_file; it is even more unlikely to collide as it is both a random key and label combined... but... + # Copy the log file + self.halt_and_catch_fire() + log_filename = new_dataset.get_log_path().name + if log_filename in zip_contents: + self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}") + with zip_ref.open(log_filename) as f: + with new_dataset.get_log_path().open("wb") as outfile: + outfile.write(f.read()) + processed_files += 1 + else: + self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).") + + # Copy the results + self.halt_and_catch_fire() + results_filename = new_dataset.get_results_path().name + if results_filename in zip_contents: + self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}") + with zip_ref.open(results_filename) as f: + with new_dataset.get_results_path().open("wb") as outfile: + outfile.write(f.read()) + processed_files += 1 + + if not imported: + # first dataset - use num rows as 'overall' + num_rows = metadata["num_rows"] + else: + # TODO: should I just delete the new_dataset here? + self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") + new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).") + failed_imports.append(dataset_key) + continue + + # finally, the kids + self.halt_and_catch_fire() + if dataset_key in parent_child_mapping: + datasets.extend(parent_child_mapping[dataset_key]) + self.dataset.log(f"Adding ({len(parent_child_mapping[dataset_key])}) child datasets to import queue") + + # done - remember that we've imported this one + imported.append(new_dataset) + new_dataset.update_status(metadata["status"]) + + if new_dataset.key != self.dataset.key: + # only finish if this is not the 'main' dataset, or the user + # will think the whole import is done + new_dataset.finish(metadata["num_rows"]) + + # Check that all files were processed + missed_files = [] + if len(zip_contents) != processed_files: + for file in zip_contents: + if file not in processed_files: + missed_files.append(file) + + # todo: this part needs updating if/when we support importing multiple datasets! + if failed_imports: + self.dataset.update_status(f"Dataset import finished, but not all data was imported properly. " + f"{len(failed_imports)} dataset(s) were not successfully imported. Check the " + f"dataset log file for details.", is_final=True) + elif missed_files: + self.dataset.log(f"ZIP file contained {len(missed_files)} files that were not processed: {missed_files}") + self.dataset.update_status(f"Dataset import finished, but not all files were processed. " + f"{len(missed_files)} files were not successfully imported. 
Check the " + f"dataset log file for details.", is_final=True) + else: + self.dataset.update_status(f"{len(imported)} dataset(s) successfully imported.", + is_final=True) + + if not self.dataset.is_finished(): + # now all related datasets are imported, we can finish the 'main' + # dataset, and the user will be alerted that the full import is + # complete + self.dataset.finish(num_rows) + + + @staticmethod + def process_metadata(metadata): + """ + Process metadata for import + """ + # get rid of some keys that are server-specific and don't need to + # be stored (or don't correspond to database columns) + metadata.pop("current_4CAT_version") + metadata.pop("id") + metadata.pop("job") + metadata.pop("is_private") + metadata.pop("is_finished") # we'll finish it ourselves, thank you!!! + + # extra params are stored as JSON... + metadata["parameters"] = json.loads(metadata["parameters"]) + if "copied_from" in metadata["parameters"]: + metadata["parameters"].pop("copied_from") + metadata["parameters"] = json.dumps(metadata["parameters"]) + + return metadata + + def create_dataset(self, metadata, original_key, primary=False): + """ + Create a new dataset + """ + if primary: + self.dataset.update_status(f"Importing primary dataset {original_key}.") + # if this is the first dataset we're importing, make it the + # processor's "own" dataset. the key has already been set to + # the imported dataset's key via ensure_key() (or a new unique + # key if it already existed on this server) + # by making it the "own" dataset, the user initiating the + # import will see the imported dataset as the "result" of their + # import query in the interface, similar to the workflow for + # other data sources + new_dataset = self.dataset + metadata.pop("key") # key already OK (see above) + self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) + + else: + self.dataset.update_status(f"Importing child dataset {original_key}.") + # supernumerary datasets - handle on their own + # these include any children of imported datasets + try: + key_exists = DataSet(key=metadata["key"], db=self.db, modules=self.modules) + + # if we *haven't* thrown a DatasetException now, then the + # key is already in use, so create a "dummy" dataset and + # overwrite it with the metadata we have (except for the + # key). this ensures that a new unique key will be + # generated. + new_dataset = DataSet(parameters={}, type=self.type, db=self.db, modules=self.modules) + metadata.pop("key") + self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) + + except DataSetException: + # this is *good* since it means the key doesn't exist, so + # we can re-use the key of the imported dataset + self.db.insert("datasets", data=metadata) + new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules) + + # make sure the dataset path uses the new key and local dataset + # path settings. this also makes sure the log file is created in + # the right place (since it is derived from the results file path) + extension = metadata["result_file"].split(".")[-1] + new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension) + + new_dataset.update_status("Imported dataset created") + if new_dataset.key != original_key: + # could not use original key because it was already in use + # so update any references to use the new key + self.remapped_keys[original_key] = new_dataset.key + new_dataset.update_status(f"Cannot import with same key - already in use on this server. 
Using key " + f"{new_dataset.key} instead of key {original_key}!") + + # refresh object, make sure it's in sync with the database + self.created_datasets.add(new_dataset.key) + new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules) + if new_dataset.key == self.dataset.key: + # this ensures that the first imported dataset becomes the + # processor's "own" dataset, and that the import logs go to + # that dataset's log file. For later imports, this evaluates to + # False. + self.dataset = new_dataset + + # if the key of the parent dataset was changed, change the + # reference to it that the child dataset has + if new_dataset.key_parent and new_dataset.key_parent in self.remapped_keys: + new_dataset.key_parent = self.remapped_keys[new_dataset.key_parent] + + # update some attributes that should come from the new server, not + # the old + new_dataset.creator = self.dataset_owner + new_dataset.original_timestamp = new_dataset.timestamp + new_dataset.imported = True + new_dataset.timestamp = int(time.time()) + new_dataset.db.commit() + + return new_dataset + + + def process_urls(self): """ Import 4CAT dataset from another 4CAT server @@ -67,12 +367,9 @@ def process(self): keys = SearchImportFromFourcat.get_keys_from_urls(urls) api_key = self.parameters.get("api-key") - self.created_datasets = set() # keys of created datasets - may not be successful! imported = [] # successfully imported datasets failed_imports = [] # keys that failed to import - remapped_keys = {} # changed dataset keys num_rows = 0 # will be used later - dataset_owner = self.dataset.get_owners()[0] # at this point it has 1 owner # we can add support for multiple datasets later by removing # this part! @@ -101,90 +398,10 @@ def process(self): failed_imports.append(dataset_key) continue - # get rid of some keys that are server-specific and don't need to - # be stored (or don't correspond to database columns) - metadata.pop("current_4CAT_version") - metadata.pop("id") - metadata.pop("job") - metadata.pop("is_private") - metadata.pop("is_finished") # we'll finish it ourselves, thank you!!! - - # extra params are stored as JSON... - metadata["parameters"] = json.loads(metadata["parameters"]) - if "copied_from" in metadata["parameters"]: - metadata["parameters"].pop("copied_from") - metadata["parameters"] = json.dumps(metadata["parameters"]) - - if not imported: - # if this is the first dataset we're importing, make it the - # processor's "own" dataset. the key has already been set to - # the imported dataset's key via ensure_key() (or a new unqiue - # key if it already existed on this server) - # by making it the "own" dataset, the user initiating the - # import will see the imported dataset as the "result" of their - # import query in the interface, similar to the workflow for - # other data sources - new_dataset = self.dataset - metadata.pop("key") # key already OK (see above) - self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) + metadata = self.process_metadata(metadata) - else: - # supernumerary datasets - handle on their own - # these include any children of imported datasets - try: - key_exists = DataSet(key=metadata["key"], db=self.db) - - # if we *haven't* thrown a DatasetException now, then the - # key is already in use, so create a "dummy" dataset and - # overwrite it with the metadata we have (except for the - # key). this ensures that a new unique key will be - # generated. 
- new_dataset = DataSet(parameters={}, type=self.type, db=self.db) - metadata.pop("key") - self.db.update("datasets", where={"key": new_dataset.key}, data=metadata) - - except DataSetException: - # this is *good* since it means the key doesn't exist, so - # we can re-use the key of the imported dataset - self.db.insert("datasets", data=metadata) - new_dataset = DataSet(key=metadata["key"], db=self.db) - - # make sure the dataset path uses the new key and local dataset - # path settings. this also makes sure the log file is created in - # the right place (since it is derived from the results file path) - extension = metadata["result_file"].split(".")[-1] - new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension) - - new_dataset.update_status("Imported dataset created") - if new_dataset.key != dataset_key: - # could not use original key because it was already in use - # so update any references to use the new key - remapped_keys[dataset_key] = new_dataset.key - new_dataset.update_status(f"Cannot import with same key - already in use on this server. Using key " - f"{new_dataset.key} instead of key {dataset_key}!") - - # refresh object, make sure it's in sync with the database - self.created_datasets.add(new_dataset.key) - new_dataset = DataSet(key=new_dataset.key, db=self.db) - if new_dataset.key == self.dataset.key: - # this ensures that the first imported dataset becomes the - # processor's "own" dataset, and that the import logs go to - # that dataset's log file. For later imports, this evaluates to - # False. - self.dataset = new_dataset - - # if the key of the parent dataset was changed, change the - # reference to it that the child dataset has - if new_dataset.key_parent and new_dataset.key_parent in remapped_keys: - new_dataset.key_parent = remapped_keys[new_dataset.key_parent] - - # update some attributes that should come from the new server, not - # the old - new_dataset.creator = dataset_owner - new_dataset.original_timestamp = new_dataset.timestamp - new_dataset.imported = True - new_dataset.timestamp = int(time.time()) - new_dataset.db.commit() + # create the new dataset + new_dataset = self.create_dataset(metadata, dataset_key, primary=True if not imported else False) # then, the log self.halt_and_catch_fire() @@ -283,9 +500,9 @@ def halt_and_catch_fire(self): # overwritten by this point deletables = [k for k in self.created_datasets if k != self.dataset.key] for deletable in deletables: - DataSet(key=deletable, db=self.db).delete() + DataSet(key=deletable, db=self.db, modules=self.modules).delete() - self.dataset.finish_with_error(f"Interrupted while importing datasets from {self.base}. Cannot resume - you " + self.dataset.finish_with_error(f"Interrupted while importing datasets{' from '+self.base if self.base else ''}. 
Cannot resume - you " f"will need to initiate the import again.") raise ProcessorInterruptedException() @@ -353,47 +570,72 @@ def validate_query(query, request, user): :param User user: User object of user who has submitted the query :return dict: Safe query parameters """ - urls = query.get("url") - if not urls: - return QueryParametersException("Provide at least one dataset URL.") - - urls = urls.split(",") - bases = set([url.split("/results/")[0].lower() for url in urls]) - keys = SearchImportFromFourcat.get_keys_from_urls(urls) + if query.get("method") == "zip": + filename = "" + if "option-data_upload-entries" in request.form: + # First pass sends list of files in the zip + pass + elif "option-data_upload" in request.files: + # Second pass sends the actual file + file = request.files["option-data_upload"] + if not file: + raise QueryParametersException("No file uploaded.") + + if not file.filename.endswith(".zip"): + raise QueryParametersException("Uploaded file must be a ZIP file.") + + filename = file.filename + else: + raise QueryParametersException("No file was offered for upload.") + + return { + "method": "zip", + "filename": filename + } + elif query.get("method") == "url": + urls = query.get("url") + if not urls: + raise QueryParametersException("Provide at least one dataset URL.") + + urls = urls.split(",") + bases = set([url.split("/results/")[0].lower() for url in urls]) + keys = SearchImportFromFourcat.get_keys_from_urls(urls) + + if len(keys) != 1: + # todo: change this to < 1 if we allow multiple datasets + raise QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.") + + if len(bases) != 1: + raise QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from " + "one 4CAT server at a time.") + + base = urls[0].split("/results/")[0] + try: + # test if API key is valid and server is reachable + test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata") + except FourcatImportException as e: + raise QueryParametersException(str(e)) - if len(keys) != 1: - # todo: change this to < 1 if we allow multiple datasets - return QueryParametersException("You need to provide a single URL to a 4CAT dataset to import.") + try: + # test if we get a response we can parse + metadata = test.json() + except ValueError: + raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.") - if len(bases) != 1: - return QueryParametersException("All URLs need to point to the same 4CAT server. You can only import from " - "one 4CAT server at a time.") + version = get_software_version() - base = urls[0].split("/results/")[0] - try: - # test if API key is valid and server is reachable - test = SearchImportFromFourcat.fetch_from_4cat(base, keys[0], query.get("api-key"), "metadata") - except FourcatImportException as e: - raise QueryParametersException(str(e)) + if metadata.get("current_4CAT_version") != version: + raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than " + f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). 
Make " + "sure both are running the same version of 4CAT and try again.") - try: - # test if we get a response we can parse - metadata = test.json() - except ValueError: - raise QueryParametersException(f"Unexpected response when trying to fetch metadata for dataset {keys[0]}.") - - version = get_software_version() - - if metadata.get("current_4CAT_version") != version: - raise QueryParametersException(f"This 4CAT server is running a different version of 4CAT ({version}) than " - f"the one you are trying to import from ({metadata.get('current_4CAT_version')}). Make " - "sure both are running the same version of 4CAT and try again.") - - # OK, we can import at least one dataset - return { - "url": ",".join(urls), - "api-key": query.get("api-key") - } + # OK, we can import at least one dataset + return { + "url": ",".join(urls), + "api-key": query.get("api-key") + } + else: + raise QueryParametersException("Import method not yet implemented.") @staticmethod def get_keys_from_urls(urls): diff --git a/datasources/linkedin/search_linkedin.py b/datasources/linkedin/search_linkedin.py index f357341ed..a8380b4d8 100644 --- a/datasources/linkedin/search_linkedin.py +++ b/datasources/linkedin/search_linkedin.py @@ -79,7 +79,10 @@ def map_item(item): # or alternatively they are stored here: if not images and item["content"] and item["content"].get("articleComponent") and item["content"]["articleComponent"].get("largeImage"): image = item["content"]["articleComponent"]["largeImage"]["attributes"][0]["detailData"]["vectorImage"] - images.append(image["rootUrl"] + image["artifacts"][0]["fileIdentifyingUrlPathSegment"]) + if not image and item["content"]["articleComponent"]["largeImage"]["attributes"][0]["imageUrl"]: + images.append(item["content"]["articleComponent"]["largeImage"]["attributes"][0]["imageUrl"]["url"]) + elif image and image.get("artifacts"): + images.append(image["rootUrl"] + image["artifacts"][0]["fileIdentifyingUrlPathSegment"]) author = SearchLinkedIn.get_author(item) diff --git a/datasources/telegram/search_telegram.py b/datasources/telegram/search_telegram.py index 55c3a61b7..9e523a247 100644 --- a/datasources/telegram/search_telegram.py +++ b/datasources/telegram/search_telegram.py @@ -6,6 +6,7 @@ import hashlib import asyncio import json +import ural import time import re @@ -24,7 +25,7 @@ FloodWaitError, ApiIdInvalidError, PhoneNumberInvalidError, RPCError from telethon.tl.functions.channels import GetFullChannelRequest from telethon.tl.functions.users import GetFullUserRequest -from telethon.tl.types import User +from telethon.tl.types import User, MessageEntityMention @@ -214,6 +215,14 @@ def get_options(cls, parent_dataset=None, user=None): "tooltip": "Entities need to be references at least this many times to be added to the query. Only " "references discovered below the max crawl depth are taken into account." } + options["crawl-via-links"] = { + "type": UserInput.OPTION_TOGGLE, + "default": False, + "help": "Extract new groups from links", + "tooltip": "Look for references to other groups in message content via t.me links and @references. " + "This is more error-prone than crawling only via forwards, but can be a way to discover " + "links that would otherwise remain undetected." + } return options @@ -234,6 +243,7 @@ def get_items(self, query): self.details_cache = {} self.failures_cache = set() + #TODO: This ought to yield as we're holding everything in memory; async generator? 
execute_queries() also needs to be modified for this results = asyncio.run(self.execute_queries()) if not query.get("save-session"): @@ -326,9 +336,10 @@ async def execute_queries(self): except Exception as e: # catch-all so we can disconnect properly # ...should we? - self.dataset.update_status("Error scraping posts from Telegram") - self.log.error(f"Telegram scraping error: {traceback.format_exc()}") - return [] + self.dataset.update_status("Error scraping posts from Telegram; halting collection.") + self.log.error(f"Telegram scraping error (dataset {self.dataset.key}): {traceback.format_exc()}") + # May as well return what was captured, yes? + return posts finally: await client.disconnect() @@ -356,6 +367,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): crawl_max_depth = self.parameters.get("crawl-depth", 0) crawl_msg_threshold = self.parameters.get("crawl-threshold", 10) + crawl_via_links = self.parameters.get("crawl-via-links", False) self.dataset.log(f"Max crawl depth: {crawl_max_depth}") self.dataset.log(f"Crawl threshold: {crawl_msg_threshold}") @@ -364,12 +376,13 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): # has been mentioned. When crawling is enabled and this exceeds the # given threshold, the entity is added to the query crawl_references = {} - queried_entities = list(queries) - full_query = list(queries) + full_query = set(queries) + num_queries = len(queries) # we may not always know the 'entity username' for an entity ID, so # keep a reference map as we go entity_id_map = {} + query_id_map= {} # Collect queries # Use while instead of for so we can change queries during iteration @@ -383,17 +396,18 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): delay = 10 retries = 0 processed += 1 - self.dataset.update_progress(processed / len(full_query)) + self.dataset.update_progress(processed / num_queries) if no_additional_queries: - # Note that we are note completing this query + # Note that we are not completing this query self.dataset.update_status(f"Rate-limited by Telegram; not executing query {entity_id_map.get(query, query)}") continue while True: self.dataset.update_status(f"Retrieving messages for entity '{entity_id_map.get(query, query)}'") + entity_posts = 0 + discovered = 0 try: - entity_posts = 0 async for message in client.iter_messages(entity=query, offset_date=max_date): entity_posts += 1 total_messages += 1 @@ -413,11 +427,14 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): # the channel a message was forwarded from (but that # needs extra API requests...) 
serialized_message = SearchTelegram.serialize_obj(message) - if "_chat" in serialized_message and query not in entity_id_map and serialized_message["_chat"]["id"] == query: - # once we know what a channel ID resolves to, use the username instead so it is easier to - # understand for the user - entity_id_map[query] = serialized_message["_chat"]["username"] - self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") + if "_chat" in serialized_message: + # Add query ID to check if queries have been crawled previously + full_query.add(serialized_message["_chat"]["id"]) + if query not in entity_id_map and serialized_message["_chat"]["id"] == query: + # once we know what a channel ID resolves to, use the username instead so it is easier to + # understand for the user + entity_id_map[query] = serialized_message["_chat"]["username"] + self.dataset.update_status(f"Fetching messages for entity '{entity_id_map[query]}' (channel ID {query})") if resolve_refs: serialized_message = await self.resolve_groups(client, serialized_message) @@ -427,29 +444,85 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): break # if crawling is enabled, see if we found something to add to the query - if crawl_max_depth and (not crawl_msg_threshold or depth_map.get(query) < crawl_msg_threshold): + linked_entities = set() + if crawl_max_depth and (depth_map.get(query) < crawl_max_depth): message_fwd = serialized_message.get("fwd_from") fwd_from = None - if message_fwd and message_fwd["from_id"] and message_fwd["from_id"].get("_type") == "PeerChannel": - # even if we haven't resolved the ID, we can feed the numeric ID - # to Telethon! this is nice because it means we don't have to - # resolve entities to crawl iteratively - fwd_from = int(message_fwd["from_id"]["channel_id"]) - - if fwd_from and fwd_from not in queried_entities and fwd_from not in queries: - # new entity discovered! - # might be discovered (before collection) multiple times, so retain lowest depth - depth_map[fwd_from] = min(depth_map.get(fwd_from, crawl_max_depth), depth_map[query] + 1) - if depth_map[query] < crawl_max_depth: - if fwd_from not in crawl_references: - crawl_references[fwd_from] = 0 - - crawl_references[fwd_from] += 1 - if crawl_references[fwd_from] >= crawl_msg_threshold and fwd_from not in queries: - queries.append(fwd_from) - full_query.append(fwd_from) - self.dataset.update_status(f"Discovered new entity {entity_id_map.get(fwd_from, fwd_from)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") - + fwd_source_type = None + if message_fwd and message_fwd.get("from_id"): + if message_fwd["from_id"].get("_type") == "PeerChannel": + # Legacy(?) data structure (pre 2024/7/22) + # even if we haven't resolved the ID, we can feed the numeric ID + # to Telethon! this is nice because it means we don't have to + # resolve entities to crawl iteratively + fwd_from = int(message_fwd["from_id"]["channel_id"]) + fwd_source_type = "channel" + elif message_fwd and message_fwd.get("from_id", {}).get('full_chat',{}): + # TODO: do we need a check here to only follow certain types of messages? 
this is similar to resolving, but the types do not appear the same to me + # Note: message_fwd["from_id"]["channel_id"] == message_fwd["from_id"]["full_chat"]["id"] in test cases so far + fwd_from = int(message_fwd["from_id"]["full_chat"]["id"]) + fwd_source_type = "channel" + elif message_fwd and (message_fwd.get("from_id", {}).get('full_user',{}) or message_fwd.get("from_id", {}).get("_type") == "PeerUser"): + # forwards can also come from users + # these can never be followed, so don't add these to the crawl, but do document them + fwd_source_type = "user" + else: + print(json.dumps(message_fwd)) + self.log.warning(f"Telegram (dataset {self.dataset.key}): Unknown fwd_from data structure; unable to crawl") + fwd_source_type = "unknown" + + if fwd_from: + linked_entities.add(fwd_from) + + + if crawl_via_links: + # t.me links + all_links = ural.urls_from_text(serialized_message["message"]) + all_links = [link.split("t.me/")[1] for link in all_links if ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] + for link in all_links: + if link.startswith("+"): + # invite links + continue + + entity_name = link.split("/")[0].split("?")[0].split("#")[0] + linked_entities.add(entity_name) + + # @references + references = [r for t, r in message.get_entities_text() if type(t) is MessageEntityMention] + for reference in references: + if reference.startswith("@"): + reference = reference[1:] + + reference = reference.split("/")[0] + + linked_entities.add(reference) + + # Check if fwd_from or the resolved entity ID is already queued or has been queried + for link in linked_entities: + if link not in full_query and link not in queries and fwd_source_type not in ("user",): + # new entity discovered! + # might be discovered (before collection) multiple times, so retain lowest depth + # print(f"Potentially crawling {link}") + depth_map[link] = min(depth_map.get(link, crawl_max_depth), depth_map[query] + 1) + if link not in crawl_references: + crawl_references[link] = 0 + crawl_references[link] += 1 + + # Add to queries if it has been referenced enough times + if crawl_references[link] >= crawl_msg_threshold: + queries.append(link) + full_query.add(link) + num_queries += 1 + discovered += 1 + self.dataset.update_status(f"Discovered new entity {entity_id_map.get(link, link)} in {entity_id_map.get(query, query)} at crawl depth {depth_map[query]}, adding to query") + + + + serialized_message["4CAT_metadata"] = { + "collected_at": datetime.now().isoformat(), # this is relevant for rather long crawls + "query": query, # possibly redundant, but we are adding non-user defined queries by crawling and may be useful to know exactly what query was used to collect an entity + "query_depth": depth_map.get(query, 0) + } yield serialized_message if entity_posts >= max_items: @@ -502,6 +575,7 @@ async def gather_posts(self, client, queries, max_items, min_date, max_date): delay *= 2 continue + self.dataset.log(f"Completed {entity_id_map.get(query, query)} with {entity_posts} messages (discovered {discovered} new entities)") break async def resolve_groups(self, client, message): @@ -703,6 +777,9 @@ def map_item(message): if from_data and from_data.get("from_name"): forwarded_name = message["fwd_from"]["from_name"] + if from_data and from_data.get("users") and len(from_data["users"]) > 0 and "user" not in from_data: + from_data["user"] = from_data["users"][0] + if from_data and ("user" in from_data or "chats" in from_data): # 'resolve entities' was enabled for this dataset if "user" in from_data: @@ -743,6 
+820,38 @@ def map_item(message): # Failsafe; can be updated to support formatting of new datastructures in the future reactions += f"{reaction}, " + # t.me links + linked_entities = set() + all_links = ural.urls_from_text(message["message"]) + all_links = [link.split("t.me/")[1] for link in all_links if + ural.get_hostname(link) == "t.me" and len(link.split("t.me/")) > 1] + + for link in all_links: + if link.startswith("+"): + # invite links + continue + + entity_name = link.split("/")[0].split("?")[0].split("#")[0] + linked_entities.add(entity_name) + + # @references + # in execute_queries we use MessageEntityMention to get these + # however, after serializing these objects we only have the offsets of + # the mentioned username, and telegram does weird unicode things to its + # offsets meaning we can't just substring the message. So use a regex + # as a 'good enough' solution + all_mentions = set(re.findall(r"@([^\s\W]+)", message["message"])) + + # make this case-insensitive since people may use different casing in + # messages than the 'official' username for example + all_connections = set([v for v in [forwarded_username, *linked_entities, *all_mentions] if v]) + all_ci_connections = set() + seen = set() + for connection in all_connections: + if connection.lower() not in seen: + all_ci_connections.add(connection) + seen.add(connection.lower()) + return MappedItem({ "id": f"{message['_chat']['username']}-{message['id']}", "thread_id": thread, @@ -754,7 +863,7 @@ def map_item(message): "body": message["message"], "reply_to": message.get("reply_to_msg_id", ""), "views": message["views"] if message["views"] else "", - "forwards": message.get("forwards", MissingMappedField(0)), + # "forwards": message.get("forwards", MissingMappedField(0)), "reactions": reactions, "timestamp": datetime.fromtimestamp(message["date"]).strftime("%Y-%m-%d %H:%M:%S"), "unix_timestamp": int(message["date"]), @@ -764,6 +873,9 @@ def map_item(message): "author_forwarded_from_name": forwarded_name, "author_forwarded_from_username": forwarded_username, "author_forwarded_from_id": forwarded_id, + "entities_linked": ",".join(linked_entities), + "entities_mentioned": ",".join(all_mentions), + "all_connections": ",".join(all_ci_connections), "timestamp_forwarded_from": datetime.fromtimestamp(forwarded_timestamp).strftime( "%Y-%m-%d %H:%M:%S") if forwarded_timestamp else "", "unix_timestamp_forwarded_from": forwarded_timestamp, @@ -975,7 +1087,6 @@ def validate_query(query, request, user): return { "items": num_items, "query": ",".join(sanitized_items), - "board": "", # needed for web interface "api_id": query.get("api_id"), "api_hash": query.get("api_hash"), "api_phone": query.get("api_phone"), @@ -984,7 +1095,8 @@ def validate_query(query, request, user): "min_date": min_date, "max_date": max_date, "crawl-depth": query.get("crawl-depth"), - "crawl-threshold": query.get("crawl-threshold") + "crawl-threshold": query.get("crawl-threshold"), + "crawl-via-links": query.get("crawl-via-links") } @staticmethod diff --git a/datasources/threads/DESCRIPTION.md b/datasources/threads/DESCRIPTION.md new file mode 100644 index 000000000..22f95bba8 --- /dev/null +++ b/datasources/threads/DESCRIPTION.md @@ -0,0 +1,9 @@ +The Threads data source can be used to manipulate data collected from [Threads](https://threads.net) - Meta's +microblogging platform - with [Zeeschuimer](https://github.com/digitalmethodsinitiative/zeeschuimer). Data is collected +with the browser extension; 4CAT cannot collect data on its own. 
After collecting data with Zeeschuimer it can be +uploaded to 4CAT for further processing and analysis. See the Zeeschuimer documentation for more information on how to +collect data with it. + +Data is collected as it is formatted internally by Threads' website. Posts are stored as (large) JSON objects; it +will usually be easier to make sense of the data by downloading it as a CSV file from 4CAT instead. The JSON structure +is relatively straightforward and contains some data not included in the CSV exports. \ No newline at end of file diff --git a/datasources/threads/__init__.py b/datasources/threads/__init__.py new file mode 100644 index 000000000..a4f019429 --- /dev/null +++ b/datasources/threads/__init__.py @@ -0,0 +1,12 @@ +""" +Initialize Threads data source +""" + +# An init_datasource function is expected to be available to initialize this +# data source. A default function that does this is available from the +# backend helpers library. +from common.lib.helpers import init_datasource + +# Internal identifier for this data source +DATASOURCE = "threads" +NAME = "Threads" \ No newline at end of file diff --git a/datasources/threads/search_threads.py b/datasources/threads/search_threads.py new file mode 100644 index 000000000..02c8c2de4 --- /dev/null +++ b/datasources/threads/search_threads.py @@ -0,0 +1,78 @@ +""" +Import scraped Threads data + +It's prohibitively difficult to scrape data from Threads within 4CAT itself due +to its aggressive rate limiting. Instead, import data collected elsewhere. +""" +from datetime import datetime +from urllib.parse import urlparse, parse_qs, unquote +import re + +from backend.lib.search import Search +from common.lib.item_mapping import MappedItem + + +class SearchThreads(Search): + """ + Import scraped Threads data + """ + type = "threads-search" # job ID + category = "Search" # category + title = "Import scraped Threads data" # title displayed in UI + description = "Import Threads data collected with an external tool such as Zeeschuimer." 
# description displayed in UI + extension = "ndjson" # extension of result file, used internally and in UI + is_from_zeeschuimer = True + + # not available as a processor for existing datasets + accepts = [None] + references = [ + "[Zeeschuimer browser extension](https://github.com/digitalmethodsinitiative/zeeschuimer)", + "[Worksheet: Capturing TikTok data with Zeeschuimer and 4CAT](https://tinyurl.com/nmrw-zeeschuimer-tiktok)" + ] + + def get_items(self, query): + """ + Run custom search + + Not available for Threads + """ + raise NotImplementedError("Threads datasets can only be created by importing data from elsewhere") + + @staticmethod + def map_item(post): + post_timestamp = datetime.fromtimestamp(post["taken_at"]) + + if post["carousel_media"]: + image_urls = [c["image_versions2"]["candidates"].pop(0)["url"] for c in post["carousel_media"] if c["image_versions2"]] + video_urls = [c["video_versions"].pop(0)["url"] for c in post["carousel_media"] if c["video_versions"]] + else: + image_urls = [post["image_versions2"]["candidates"].pop(0)["url"]] if post["image_versions2"].get("candidates") else [] + video_urls = [post["video_versions"].pop(0)["url"]] if post["video_versions"] else [] + + linked_url = "" + link_thumbnail = "" + if post["text_post_app_info"].get("link_preview_attachment"): + linked_url = post["text_post_app_info"]["link_preview_attachment"]["url"] + linked_url = parse_qs(urlparse(linked_url).query).get("u", "").pop() + link_thumbnail = post["text_post_app_info"]["link_preview_attachment"].get("image_url") + + return MappedItem({ + "id": post["code"], + "url": f"https://www.threads.net/@{post['user']['username']}/post/{post['code']}", + "body": post["caption"]["text"] if post["caption"] else "", + "timestamp": post_timestamp.strftime("%Y-%m-%d %H:%M:%S"), + "author": post["user"]["username"], + "author_is_verified": "yes" if post["user"].get("is_verified") else "no", + "author_avatar": post["user"].get("profile_pic_url"), + "image_url": ",".join(image_urls), + "video_url": ",".join(video_urls), + "link_url": linked_url, + "link_thumbnail_url": link_thumbnail if link_thumbnail else "", + "is_paid_partnership": "yes" if post["is_paid_partnership"] else "no", + "likes": post["like_count"], + "reposts": post["text_post_app_info"]["repost_count"], + "replies": post["text_post_app_info"]["direct_reply_count"], + "quotes": post["text_post_app_info"]["quote_count"], + "hashtags": ",".join(re.findall(r"#([^\s!@#$%ˆ&*()_+{}:\"|<>?\[\];'\,./`~']+)", post["caption"]["text"])) if post["caption"] else "", + "unix_timestamp": int(post_timestamp.timestamp()), + }) diff --git a/docker-compose_build.yml b/docker-compose_build.yml index 7466e8ba8..b81a9fb94 100644 --- a/docker-compose_build.yml +++ b/docker-compose_build.yml @@ -32,6 +32,9 @@ services: - ./data/datasets/:/usr/src/app/data/ - ./data/config/:/usr/src/app/config/ - ./data/logs/:/usr/src/app/logs/ +# - 4cat_data:/usr/src/app/data/ +# - 4cat_config:/usr/src/app/config/ +# - 4cat_logs:/usr/src/app/logs/ entrypoint: docker/docker-entrypoint.sh frontend: @@ -49,6 +52,9 @@ services: - ./data/datasets/:/usr/src/app/data/ - ./data/config/:/usr/src/app/config/ - ./data/logs/:/usr/src/app/logs/ +# - 4cat_data:/usr/src/app/data/ +# - 4cat_config:/usr/src/app/config/ +# - 4cat_logs:/usr/src/app/logs/ command: ["docker/wait-for-backend.sh"] volumes: diff --git a/helper-scripts/first-run.py b/helper-scripts/first-run.py index dea0fd487..a565a591e 100644 --- a/helper-scripts/first-run.py +++ b/helper-scripts/first-run.py @@ -40,7 +40,7 @@ # 
Now check for presence of required NLTK packages import nltk -nltk_downloads = ("wordnet", "punkt", "omw-1.4") +nltk_downloads = ("wordnet", "punkt_tab", "omw-1.4") for package in nltk_downloads: # if it already exists, .download() will just NOP try: diff --git a/helper-scripts/migrate.py b/helper-scripts/migrate.py index 25071afe4..55c26c044 100644 --- a/helper-scripts/migrate.py +++ b/helper-scripts/migrate.py @@ -69,9 +69,9 @@ def check_for_nltk(): # NLTK import nltk try: - nltk.data.find('tokenizers/punkt') + nltk.data.find('tokenizers/punkt_tab') except LookupError: - nltk.download('punkt', quiet=True) + nltk.download('punkt_tab', quiet=True) try: nltk.data.find('corpora/wordnet') except LookupError: diff --git a/processors/conversion/export_datasets.py b/processors/conversion/export_datasets.py new file mode 100644 index 000000000..bd7b81289 --- /dev/null +++ b/processors/conversion/export_datasets.py @@ -0,0 +1,106 @@ +""" +Export a dataset and all its children to a ZIP file +""" +import shutil +import json +import datetime + +from backend.lib.processor import BasicProcessor +from common.lib.dataset import DataSet +from common.lib.exceptions import DataSetException + +__author__ = "Dale Wahl" +__credits__ = ["Dale Wahl"] +__maintainer__ = "Dale Wahl" +__email__ = "4cat@oilab.eu" + + + +class ExportDatasets(BasicProcessor): + """ + Export a dataset and all its children to a ZIP file + """ + type = "export-datasets" # job type ID + category = "Conversion" # category + title = "Export Dataset and All Analyses" # title displayed in UI + description = "Creates a ZIP file containing the dataset and all analyses to be archived and uploaded to a 4CAT instance in the future. Automatically expires after 1 day, after which you must run again." # description displayed in UI + extension = "zip" # extension of result file, used internally and in UI + + @classmethod + def is_compatible_with(cls, module=None, user=None): + """ + Determine if processor is compatible with dataset + + :param module: Module to determine compatibility with + """ + return module.is_top_dataset() and user.can_access_dataset(dataset=module, role="owner") + + def process(self): + """ + Export the dataset and all its finished analyses to a ZIP file + """ + self.dataset.update_status("Collecting dataset and all analyses") + + results_path = self.dataset.get_staging_area() + + exported_datasets = [] + failed_exports = [] # keys that failed to export + keys = [self.dataset.top_parent().key] # get the key of the top parent + while keys: + dataset_key = keys.pop(0) + self.dataset.log(f"Exporting dataset {dataset_key}.") + + try: + dataset = DataSet(key=dataset_key, db=self.db) + # TODO: these two should fail for the primary dataset, but should they fail for the children too? 
+ except DataSetException: + self.dataset.finish_with_error("Dataset not found.") + return + if not dataset.is_finished(): + self.dataset.finish_with_error("You cannot export unfinished datasets.") + return + + # get metadata + metadata = dataset.get_metadata() + if metadata["num_rows"] == 0: + self.dataset.update_status(f"Skipping empty dataset {dataset_key}") + failed_exports.append(dataset_key) + continue + + # get data + data_file = dataset.get_results_path() + if not data_file.exists(): + self.dataset.finish_with_error(f"Dataset {dataset_key} has no data; skipping.") + failed_exports.append(dataset_key) + continue + + # get log + log_file = dataset.get_results_path().with_suffix(".log") + + # All good, add to ZIP + with results_path.joinpath(f"{dataset_key}_metadata.json").open("w", encoding="utf-8") as outfile: + outfile.write(json.dumps(metadata)) + shutil.copy(data_file, results_path.joinpath(data_file.name)) + if log_file.exists(): + shutil.copy(log_file, results_path.joinpath(log_file.name)) + + # add children to queue + # Not using get_all_children() because we want to skip unfinished datasets and only need the keys + children = [d["key"] for d in self.db.fetchall("SELECT key FROM datasets WHERE key_parent = %s AND is_finished = TRUE", (dataset_key,))] + keys.extend(children) + + self.dataset.update_status(f"Exported dataset {dataset_key}.") + exported_datasets.append(dataset_key) + + # Add export log to ZIP + self.dataset.log(f"Exported datasets: {exported_datasets}") + self.dataset.log(f"Failed to export datasets: {failed_exports}") + shutil.copy(self.dataset.get_log_path(), results_path.joinpath("export.log")) + + # set expiration date + # these datasets can be very large and are just copies of the existing datasets, so we don't need to keep them around for long + # TODO: convince people to stop using hyphens in python variables and file names... + self.dataset.__setattr__("expires-after", (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp()) + + # done! 
+ self.write_archive_and_finish(results_path, len(exported_datasets)) \ No newline at end of file diff --git a/processors/machine_learning/annotate_text.py b/processors/machine_learning/annotate_text.py index 954963de4..022e96de5 100644 --- a/processors/machine_learning/annotate_text.py +++ b/processors/machine_learning/annotate_text.py @@ -31,6 +31,13 @@ class TextClassifier(BasicProcessor): "provided categories.") # description displayed in UI extension = "csv" # extension of result file, used internally and in UI + references = [ + "Annotations are made using the [Stormtrooper](https://centre-for-humanities-computing.github.io/stormtrooper/) library", + "Model card: [google/flan-t5-large](https://huggingface.co/google/flan-t5-large)", + "Model card: [tiiuae/falcon-7b-instruct](https://huggingface.co/tiiuae/falcon-7b-instruct)", + "Model card: [meta-llama/Meta-Llama-3.1-8B-Instruct](https://huggingface.co/meta-llama/Meta-Llama-3.1-8B-Instruct)" + ] + config = { "dmi-service-manager.stormtrooper_intro-1": { "type": UserInput.OPTION_INFO, @@ -40,6 +47,11 @@ class TextClassifier(BasicProcessor): "type": UserInput.OPTION_TOGGLE, "default": False, "help": "Enable LLM-powered text classification", + }, + "dmi-service-manager.stormtrooper_models": { + "type": UserInput.OPTION_TEXT, + "default": "google/flan-t5-large,tiiaue/falcon-7b-instruct", + "help": "Comma-separated list of models that can be selected" } } @@ -53,8 +65,6 @@ class TextClassifier(BasicProcessor): "type": UserInput.OPTION_CHOICE, "default": "google/flan-t5-large", "options": { - "google/flan-t5-large": "google/flan-t5-large", - "tiiaue/falcon-7b-instruct": "tiiaue/falcon-7b-instruct" }, "help": "Large Language Model to use" }, @@ -97,6 +107,10 @@ def get_options(cls, parent_dataset=None, user=None): :return dict: Processor options """ options = cls.options + + models = config.get("dmi-service-manager.stormtrooper_models", user=user).split(",") + options["model"]["options"] = {m: m for m in models} + if parent_dataset is None: return options diff --git a/processors/metrics/rank_attribute.py b/processors/metrics/rank_attribute.py index 0e38757c6..9b90b3c7b 100644 --- a/processors/metrics/rank_attribute.py +++ b/processors/metrics/rank_attribute.py @@ -203,7 +203,7 @@ def missing_value_placeholder(data, field_name): # keep track of occurrences of found items per relevant time period for value in values: if to_lowercase: - value = value.lower() + value = str(value).lower() if rank_style == "overall" and value not in overall_top: continue diff --git a/processors/metrics/url_titles.py b/processors/metrics/url_titles.py index e32e3538d..75ebd12d0 100644 --- a/processors/metrics/url_titles.py +++ b/processors/metrics/url_titles.py @@ -145,7 +145,7 @@ def process(self): self.dataset.update_status("Finding URLs in dataset") for item in self.source_dataset.iterate_items(self): # combine column contents that we need to extract URLs from - source_text = " ".join([item[column] for column in columns]) + source_text = " ".join([str(item[column]) for column in columns]) urls = ural.urls_from_text(source_text) for url in urls: diff --git a/processors/text-analysis/tokenise.py b/processors/text-analysis/tokenise.py index a104306f1..17c350c86 100644 --- a/processors/text-analysis/tokenise.py +++ b/processors/text-analysis/tokenise.py @@ -357,7 +357,7 @@ def dummy_function(x, *args, **kwargs): # for russian we use a special purpose splitter with better # performance sentence_method = razdel.sentenize - elif language not in [lang.split('.')[0] for 
lang in os.listdir(nltk.data.find('tokenizers/punkt')) if + elif language not in [lang.split('.')[0] for lang in os.listdir(nltk.data.find('tokenizers/punkt_tab')) if 'pickle' in lang]: self.dataset.update_status( f"Language {language} not available for sentence tokenizer; grouping by item/post instead.") diff --git a/processors/visualisation/download_images.py b/processors/visualisation/download_images.py index c13fd0fca..8b0792e22 100644 --- a/processors/visualisation/download_images.py +++ b/processors/visualisation/download_images.py @@ -574,6 +574,10 @@ def request_get_w_error_handling(self, url, retries=0, **kwargs): else: self.dataset.log("Error: ConnectionError while trying to download image %s: %s" % (url, e)) raise FileNotFoundError() + except requests.exceptions.LocationParseError as e: + # not a valid url, just skip + self.dataset.log("Error: LocationParseError while trying to download image %s: %s" % (url, e)) + raise FileNotFoundError() except requests.exceptions.InvalidSchema: # not an http url, just skip raise FileNotFoundError() diff --git a/processors/visualisation/histwords.py b/processors/visualisation/histwords.py index f6ae05261..7463e9662 100644 --- a/processors/visualisation/histwords.py +++ b/processors/visualisation/histwords.py @@ -243,6 +243,7 @@ def process(self): vectors = tsne.fit_transform(vectors) except ValueError: self.dataset.finish_with_error("Insufficient data to reduce to 2D. The word embeddings model may be too small to visualise properly.") + return elif reduction_method == "TruncatedSVD": # standard sklearn parameters made explicit svd = TruncatedSVD(n_components=2, algorithm="randomized", n_iter=5, random_state=0) diff --git a/processors/visualisation/word-trees.py b/processors/visualisation/word-trees.py index f7783bcc1..0dfe2d408 100644 --- a/processors/visualisation/word-trees.py +++ b/processors/visualisation/word-trees.py @@ -38,71 +38,94 @@ class MakeWordtree(BasicProcessor): "Wattenberg, M., & Viégas, F. B. (2008). The Word Tree, an Interactive Visual Concordance. IEEE Transactions on Visualization and Computer Graphics, 14(6), 1221–1228.