
Commit

sal-uva committed Jun 14, 2024
2 parents d67cf44 + b10e3bb commit 25fded7
Showing 74 changed files with 1,674 additions and 609 deletions.
2 changes: 1 addition & 1 deletion .zenodo.json
@@ -3,7 +3,7 @@
"license": "MPL-2.0",
"title": "4CAT Capture and Analysis Toolkit",
"upload_type": "software",
"version": "v1.40",
"version": "v1.44",
"keywords": [
"webmining",
"scraping",
7 changes: 4 additions & 3 deletions 4cat-daemon.py
@@ -10,6 +10,7 @@
cli = argparse.ArgumentParser()
cli.add_argument("--interactive", "-i", default=False, help="Run 4CAT in interactive mode (not in the background).",
action="store_true")
cli.add_argument("--log-level", "-l", default="INFO", help="Set log level (\"DEBUG\", \"INFO\", \"WARNING\", \"ERROR\", \"CRITICAL\", \"FATAL\").")
cli.add_argument("--no-version-check", "-n", default=False,
help="Skip version check that may prompt the user to migrate first.", action="store_true")
cli.add_argument("command")
@@ -81,14 +82,14 @@
print("Running backend in interactive mode instead.")
import backend.bootstrap as bootstrap

bootstrap.run(as_daemon=False)
bootstrap.run(as_daemon=False, log_level=args.log_level)
sys.exit(0)

if args.interactive:
print("Running backend in interactive mode.")
import backend.bootstrap as bootstrap

bootstrap.run(as_daemon=False)
bootstrap.run(as_daemon=False, log_level=args.log_level)
sys.exit(0)
else:
# if so, import necessary modules
@@ -129,7 +130,7 @@ def start():
detach_process=True
) as context:
import backend.bootstrap as bootstrap
bootstrap.run(as_daemon=True)
bootstrap.run(as_daemon=True, log_level=args.log_level)

sys.exit(0)

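The new flag feeds the chosen level through to the backend bootstrap (see backend/bootstrap.py below). As an illustration, assuming the usual `start` command and default file locations, the backend could be launched with verbose logging like this (the exact invocation is a sketch, not taken from this diff):

```
# run the backend in the foreground with debug-level logging
python 4cat-daemon.py --interactive --log-level DEBUG start

# or start the daemon with the default INFO level made explicit
python 4cat-daemon.py --log-level INFO start
```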
38 changes: 23 additions & 15 deletions README.md
@@ -26,39 +26,47 @@ such as the generation and visualisation of word embedding models.
platforms that are part of the tool, but you can also [add additional data
sources](https://github.com/digitalmethodsinitiative/4cat/wiki/How-to-make-a-data-source)
using 4CAT's Python API. The following data sources are currently supported
actively and can be used to collect data with 4CAT:
actively and can be used to collect data with 4CAT directly:

* 4chan and 8kun
* BitChute
* Reddit
* Telegram
* Tumblr
* Twitter API v2 (Academic and regular tracks)

The following platforms are supported through other tools, with which you can
collect data to import data into 4CAT for analysis:
The following platforms are supported through
[Zeeschuimer](https://github.com/digitalmethodsinitiative/zeeschuimer), with
which you can collect data to import into 4CAT for analysis:

* Instagram, TikTok, 9gag, Imgur, LinkedIn, Parler, Douyin and Twitter (via
[Zeeschuimer](https://github.com/digitalmethodsinitiative/zeeschuimer))
* Facebook and Instagram (via [CrowdTangle](https://www.crowdtangle.com) exports)
* Instagram (posts)
* TikTok (posts and comments)
* 9gag
* Imgur
* LinkedIn
* Gab
* Douyin
* X/Twitter

It is also possible to upload data collected with other tools as CSV files. The
following tools are explicitly supported but other data can also be uploaded as
long as it is formatted as CSV:

* Facebook and Instagram (via [CrowdTangle](https://www.crowdtangle.com) or [Facepager](https://github.com/strohne/Facepager) exports)
* YouTube videos and comments (via the [YouTube Data Tools](https://ytdt.digitalmethods.net/))
* Weibo (via [Bazhuayu](https://www.bazhuayu.com/))

A number of other platforms have built-in support that is untested, or requires
e.g. special API access. You can view the [data sources in our wiki](https://github.com/digitalmethodsinitiative/4cat/wiki/Available-data-sources) or review [the data
sources' code](https://github.com/digitalmethodsinitiative/4cat/tree/master/datasources)
in the GitHub repository. It is also possible to import your own CSV files into
4CAT for analysis.
in the GitHub repository.

## Installation
You can install 4CAT locally or on a server via Docker or manually. For easiest installation, we reccomend copying our [`docker-compose.yml file`](https://raw.githubusercontent.com/digitalmethodsinitiative/4cat/master/docker-compose.yml), [`.env`](https://raw.githubusercontent.com/digitalmethodsinitiative/4cat/master/.env) file, and running this terminal command in the folder where those files have been saved:
You can install 4CAT locally or on a server via Docker or manually. For easiest installation, we recommend copying our [`docker-compose.yml file`](https://raw.githubusercontent.com/digitalmethodsinitiative/4cat/master/docker-compose.yml), [`.env`](https://raw.githubusercontent.com/digitalmethodsinitiative/4cat/master/.env) file, and running this terminal command in the folder where those files have been saved:

```
docker-compose up -d
```

In depth instructions on both Docker installation and manual installation can be found [in our
wiki](https://github.com/digitalmethodsinitiative/4cat/wiki/Installing-4CAT).

A video walkthrough installing 4CAT via Docker can be found on [YouTube here](https://youtu.be/oWsB7bvNfOY).
wiki](https://github.com/digitalmethodsinitiative/4cat/wiki/Installing-4CAT). A video walkthrough installing 4CAT via Docker can be found on [YouTube here](https://youtu.be/oWsB7bvNfOY).

Currently scraping of 4chan, 8chan, and 8kun require additional steps; please see the wiki.

2 changes: 1 addition & 1 deletion VERSION
@@ -1,4 +1,4 @@
1.41
1.44

This file should not be modified. It is used by 4CAT to determine whether it
needs to run migration scripts to e.g. update the database structure to a more
6 changes: 3 additions & 3 deletions backend/bootstrap.py
@@ -13,7 +13,7 @@

from common.config_manager import config

def run(as_daemon=True):
def run(as_daemon=True, log_level="INFO"):
pidfile = Path(config.get('PATH_ROOT'), config.get('PATH_LOCKFILE'), "4cat.pid")

if as_daemon:
@@ -49,9 +49,9 @@ def run(as_daemon=True):
if config.get("USING_DOCKER"):
as_daemon = True
# Rename log if Docker setup
log = Logger(output=True, filename='backend_4cat.log')
log = Logger(output=True, filename='backend_4cat.log', log_level=log_level)
else:
log = Logger(output=not as_daemon)
log = Logger(output=not as_daemon, filename='4cat.log', log_level=log_level)

log.info("4CAT Backend started, logger initialised")
db = Database(logger=log, appname="main",
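The level arrives as a string such as "DEBUG" or "INFO". The Logger internals are not part of this diff, but a minimal sketch of how such a string is typically resolved to one of Python's logging constants (purely illustrative):

```
import logging

def resolve_log_level(log_level="INFO"):
    # map the CLI string to a logging constant, falling back to INFO for unknown values
    return getattr(logging, log_level.upper(), logging.INFO)

print(resolve_log_level("DEBUG"))     # 10
print(resolve_log_level("nonsense"))  # 20, i.e. logging.INFO
```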
16 changes: 14 additions & 2 deletions backend/lib/manager.py
@@ -123,23 +123,35 @@ def loop(self):
self.looping = False

self.log.info("Telling all workers to stop doing whatever they're doing...")
# request shutdown from all workers except the API
# this allows us to use the API to figure out if a certain worker is
# hanging during shutdown, for example
for jobtype in self.worker_pool:
if jobtype == "api":
continue

for worker in self.worker_pool[jobtype]:
if hasattr(worker, "request_interrupt"):
worker.request_interrupt()
else:
worker.abort()

# wait for all workers to finish
# wait for all workers that we just asked to quit to finish
self.log.info("Waiting for all workers to finish...")
for jobtype in self.worker_pool:
if jobtype == "api":
continue
for worker in self.worker_pool[jobtype]:
self.log.info("Waiting for worker %s..." % jobtype)
worker.join()

time.sleep(3)
# shut down API last
for worker in self.worker_pool.get("api", []):
worker.request_interrupt()
worker.join()

# abort
time.sleep(1)
self.log.info("Bye!")

def validate_datasources(self):
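The reordering means the API worker stays alive while every other worker is interrupted and joined, so it can still be queried to see which worker is hanging; only then is the API itself stopped. A self-contained sketch of that two-phase shutdown, using plain threads and Event flags as stand-ins for 4CAT's worker objects (names and the Event mechanism are illustrative assumptions):

```
import threading
import time

interrupts = {"api": threading.Event(), "search": threading.Event(), "cleanup": threading.Event()}

def worker(name):
    # pretend to do work until asked to stop
    while not interrupts[name].is_set():
        time.sleep(0.1)

threads = {name: threading.Thread(target=worker, args=(name,)) for name in interrupts}
for thread in threads.values():
    thread.start()

# phase 1: stop and join everything except the API, which stays reachable
for name, thread in threads.items():
    if name == "api":
        continue
    interrupts[name].set()
    thread.join()

# phase 2: shut down the API last
interrupts["api"].set()
threads["api"].join()
print("Bye!")
```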
37 changes: 6 additions & 31 deletions backend/lib/search.py
@@ -48,7 +48,8 @@ class Search(BasicProcessor, ABC):
# Mandatory columns: ['thread_id', 'body', 'subject', 'timestamp']
return_cols = ['thread_id', 'body', 'subject', 'timestamp']

import_issues = 0
import_error_count = 0
import_warning_count = 0

def process(self):
"""
@@ -89,36 +90,10 @@ def process(self):
elif items is not None:
self.dataset.update_status("Query finished, no results found.")

# queue predefined processors
if num_items > 0 and query_parameters.get("next", []):
for next in query_parameters.get("next"):
next_parameters = next.get("parameters", {})
next_type = next.get("type", "")
available_processors = self.dataset.get_available_processors(user=self.dataset.creator)

# run it only if the processor is actually available for this query
if next_type in available_processors:
next_analysis = DataSet(parameters=next_parameters, type=next_type, db=self.db,
parent=self.dataset.key,
extension=available_processors[next_type]["extension"])
self.queue.add_job(next_type, remote_id=next_analysis.key)

# see if we need to register the result somewhere
if query_parameters.get("copy_to", None):
# copy the results to an arbitrary place that was passed
if self.dataset.get_results_path().exists():
# but only if we actually have something to copy
shutil.copyfile(str(self.dataset.get_results_path()), query_parameters.get("copy_to"))
else:
# if copy_to was passed, that means it's important that this
# file exists somewhere, so we create it as an empty file
with open(query_parameters.get("copy_to"), "w") as empty_file:
empty_file.write("")

if self.import_issues == 0:
if self.import_warning_count == 0 and self.import_error_count == 0:
self.dataset.finish(num_rows=num_items)
else:
self.dataset.update_status(f"{self.import_issues} item(s) in the dataset had an unexpected format. All data can be downloaded, but only data with the expected format will be available to 4CAT processors and in CSV exports; check the dataset log for details.", is_final=True)
self.dataset.update_status(f"All data imported. {str(self.import_error_count) + ' item(s) had an unexpected format and cannot be used in 4CAT processors. ' if self.import_error_count != 0 else ''}{str(self.import_warning_count) + ' item(s) missing some data fields. ' if self.import_warning_count != 0 else ''}Check the dataset log for details.", is_final=True)
self.dataset.finish(num_rows=num_items)

def search(self, query):
@@ -221,11 +196,11 @@ def import_from_file(self, path):
if warning not in import_warnings:
import_warnings[warning] = 0
import_warnings[warning] += 1
self.import_issues += 1
self.import_warning_count += 1

except MapItemException as e:
# NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
self.import_issues += 1
self.import_error_count += 1
self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e, warn_admins=unmapped_items is False)
unmapped_items = True

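Splitting the single import_issues counter into errors and warnings also changes the final status message. A standalone illustration of what the new message looks like for a dataset with both kinds of issue (the counts are invented):

```
import_error_count, import_warning_count = 2, 5

status = (
    "All data imported. "
    f"{str(import_error_count) + ' item(s) had an unexpected format and cannot be used in 4CAT processors. ' if import_error_count != 0 else ''}"
    f"{str(import_warning_count) + ' item(s) missing some data fields. ' if import_warning_count != 0 else ''}"
    "Check the dataset log for details."
)
print(status)
# All data imported. 2 item(s) had an unexpected format and cannot be used in 4CAT
# processors. 5 item(s) missing some data fields. Check the dataset log for details.
```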
4 changes: 2 additions & 2 deletions backend/workers/check_updates.py
@@ -55,12 +55,12 @@ def work(self):
# update available!
# show a notification for all admins (normal users can't update
# after all)
add_notification(self.db, "!admins",
add_notification(self.db, "!admin",
"A new version of 4CAT is [available](%s). The latest version is %s; you are running version %s." % (
release_url, latest_tag, current_version
), allow_dismiss=True)

else:
# up to date? dismiss any notifications about new versions
self.db.execute("DELETE FROM users_notifications WHERE username = '!admins' "
self.db.execute("DELETE FROM users_notifications WHERE username = '!admin' "
"AND notification LIKE 'A new version of 4CAT%'")
2 changes: 2 additions & 0 deletions backend/workers/cleanup_tempfiles.py
@@ -25,6 +25,8 @@ class TempFileCleaner(BasicWorker):
type = "clean-temp-files"
max_workers = 1

ensure_job = {"remote_id": "localhost", "interval": 10800}

def work(self):
"""
Go through result files, and for each one check if it should still
8 changes: 7 additions & 1 deletion backend/workers/expire_items.py
@@ -8,7 +8,7 @@

from backend.lib.worker import BasicWorker
from common.lib.dataset import DataSet
from common.lib.exceptions import DataSetNotFoundException
from common.lib.exceptions import DataSetNotFoundException, WorkerInterruptedException

from common.lib.user import User

@@ -55,6 +55,9 @@ def expire_datasets(self):
""")

for dataset in datasets:
if self.interrupted:
raise WorkerInterruptedException("Interrupted while expiring datasets")

try:
dataset = DataSet(key=dataset["key"], db=self.db)
if dataset.is_expired():
@@ -81,6 +84,9 @@ def expire_users(self):
now = datetime.datetime.now()

for expiring_user in expiring_users:
if self.interrupted:
raise WorkerInterruptedException("Interrupted while expiring users")

user = User.get_by_name(self.db, expiring_user["name"])
username = user.data["name"]

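These checks are the worker-side half of the shutdown changes in backend/lib/manager.py above: a long loop inspects self.interrupted between items and raises WorkerInterruptedException instead of holding up shutdown. A simplified, self-contained version of the pattern (the class is a stand-in, not the real BasicWorker):

```
class WorkerInterruptedException(Exception):
    pass

class ExampleWorker:
    def __init__(self):
        self.interrupted = False

    def request_interrupt(self):
        # called from the manager during shutdown
        self.interrupted = True

    def work(self, items):
        processed = []
        for item in items:
            if self.interrupted:
                # give up between items rather than blocking shutdown
                raise WorkerInterruptedException("Interrupted while processing items")
            processed.append(item)
        return processed
```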
4 changes: 2 additions & 2 deletions common/lib/config_definition.py
@@ -19,7 +19,7 @@
"datasources.enabled": {
"type": UserInput.OPTION_DATASOURCES,
"default": ["ninegag", "douban", "douyin", "imgur", "upload", "instagram", "linkedin", "parler",
"telegram", "tiktok", "twitter"],
"telegram", "tiktok", "twitter", "tiktok-comments", "truthsocial", "gab"],
"help": "Data Sources",
"tooltip": "A list of enabled data sources that people can choose from when creating a dataset page."
},
@@ -31,7 +31,7 @@
},
"datasources.expiration": {
"type": UserInput.OPTION_TEXT_JSON,
"default": {"fourchan": {"enabled": False, "allow_optout": False, "timeout": 0}, "eightchan": {"enabled": False, "allow_optout": False, "timeout": 0}, "eightkun": {"enabled": False, "allow_optout": False, "timeout": 0}, "ninegag": {"enabled": True, "allow_optout": False, "timeout": 0}, "bitchute": {"enabled": True, "allow_optout": False, "timeout": 0}, "dmi-tcat": {"enabled": False, "allow_optout": False, "timeout": 0}, "dmi-tcatv2": {"enabled": False, "allow_optout": False, "timeout": 0}, "douban": {"enabled": True, "allow_optout": False, "timeout": 0}, "douyin": {"enabled": True, "allow_optout": False, "timeout": 0}, "imgur": {"enabled": True, "allow_optout": False, "timeout": 0}, "upload": {"enabled": True, "allow_optout": False, "timeout": 0}, "instagram": {"enabled": True, "allow_optout": False, "timeout": 0}, "linkedin": {"enabled": True, "allow_optout": False, "timeout": 0}, "parler": {"enabled": True, "allow_optout": False, "timeout": 0}, "reddit": {"enabled": False, "allow_optout": False, "timeout": 0}, "telegram": {"enabled": True, "allow_optout": False, "timeout": 0}, "tiktok": {"enabled": True, "allow_optout": False, "timeout": 0}, "tiktok-urls": {"enabled": False, "allow_optout": False, "timeout": 0}, "tumblr": {"enabled": False, "allow_optout": False, "timeout": 0}, "twitter": {"enabled": True, "allow_optout": False, "timeout": 0}, "twitterv2": {"enabled": False, "allow_optout": False, "timeout": 0}, "usenet": {"enabled": False, "allow_optout": False, "timeout": 0}, "vk": {"enabled": False, "allow_optout": False, "timeout": 0}},
"default": {"fourchan": {"enabled": False, "allow_optout": False, "timeout": 0}, "eightchan": {"enabled": False, "allow_optout": False, "timeout": 0}, "eightkun": {"enabled": False, "allow_optout": False, "timeout": 0}, "ninegag": {"enabled": True, "allow_optout": False, "timeout": 0}, "bitchute": {"enabled": True, "allow_optout": False, "timeout": 0}, "dmi-tcat": {"enabled": False, "allow_optout": False, "timeout": 0}, "dmi-tcatv2": {"enabled": False, "allow_optout": False, "timeout": 0}, "douban": {"enabled": True, "allow_optout": False, "timeout": 0}, "douyin": {"enabled": True, "allow_optout": False, "timeout": 0}, "gab": {"enabled": True, "allow_optout": False, "timeout": 0}, "imgur": {"enabled": True, "allow_optout": False, "timeout": 0}, "upload": {"enabled": True, "allow_optout": False, "timeout": 0}, "instagram": {"enabled": True, "allow_optout": False, "timeout": 0}, "linkedin": {"enabled": True, "allow_optout": False, "timeout": 0}, "parler": {"enabled": True, "allow_optout": False, "timeout": 0}, "reddit": {"enabled": False, "allow_optout": False, "timeout": 0}, "telegram": {"enabled": True, "allow_optout": False, "timeout": 0}, "tiktok": {"enabled": True, "allow_optout": False, "timeout": 0}, "tiktok-urls": {"enabled": False, "allow_optout": False, "timeout": 0}, "truthsocial": {"enabled": True, "allow_optout": False, "timeout": 0}, "tumblr": {"enabled": False, "allow_optout": False, "timeout": 0}, "twitter": {"enabled": True, "allow_optout": False, "timeout": 0}, "twitterv2": {"enabled": False, "allow_optout": False, "timeout": 0}, "usenet": {"enabled": False, "allow_optout": False, "timeout": 0}, "vk": {"enabled": False, "allow_optout": False, "timeout": 0}},
"help": "Data source-specific expiration",
"tooltip": "Allows setting expiration settings per datasource. Configured by proxy via the 'data sources' "
"setting.",
21 changes: 10 additions & 11 deletions common/lib/dataset.py
@@ -334,9 +334,17 @@ def iterate_items(self, processor=None, warn_unmappable=True, map_missing="defau
# Collect item_mapper for use with filter
item_mapper = False
own_processor = self.get_own_processor()
if own_processor.map_item_method_available(dataset=self):
if own_processor and own_processor.map_item_method_available(dataset=self):
item_mapper = True

# missing field strategy can be for all fields at once, or per field
# if it is per field, it is a dictionary with field names and their strategy
# if it is for all fields, it is may be a callback, 'abort', or 'default'
default_strategy = "default"
if type(map_missing) is not dict:
default_strategy = map_missing
map_missing = {}

# Loop through items
for i, item in enumerate(self._iterate_items(processor)):
# Save original to yield
@@ -355,15 +363,6 @@
# check if fields have been marked as 'missing' in the
# underlying data, and treat according to the chosen strategy
if mapped_item.get_missing_fields():
default_strategy = "default"

# strategy can be for all fields at once, or per field
# if it is per field, it is a dictionary with field names and their strategy
# if it is for all fields, it is may be a callback, 'abort', or 'default'
if type(map_missing) is not dict:
default_strategy = map_missing
map_missing = {}

for missing_field in mapped_item.get_missing_fields():
strategy = map_missing.get(missing_field, default_strategy)

@@ -1369,7 +1368,7 @@ def get_available_processors(self, user=None, exclude_hidden=False):
del processors[analysis.type]
continue

if exclude_hidden and not processors[analysis.type].is_hidden:
if exclude_hidden and processors[analysis.type].is_hidden:
del processors[analysis.type]

self.available_processors = processors
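Hoisting the strategy resolution out of the per-item loop does not change behaviour: map_missing may still be one strategy for every missing field (a string such as "default" or "abort", or a callback) or a dict of per-field strategies. A standalone sketch of that resolution step, with invented field names:

```
def resolve_strategies(map_missing, missing_fields):
    # map_missing is either one strategy for all fields, or a dict per field
    default_strategy = "default"
    if type(map_missing) is not dict:
        default_strategy = map_missing
        map_missing = {}
    return {field: map_missing.get(field, default_strategy) for field in missing_fields}

print(resolve_strategies("abort", ["author", "views"]))
# {'author': 'abort', 'views': 'abort'}
print(resolve_strategies({"views": "abort"}, ["author", "views"]))
# {'author': 'default', 'views': 'abort'}
```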
23 changes: 18 additions & 5 deletions common/lib/dmi_service_manager.py
@@ -16,6 +16,8 @@
__maintainer__ = "Dale Wahl"
__email__ = "[email protected]"

from common.lib.helpers import strip_tags


class DmiServiceManagerException(Exception):
"""
@@ -30,6 +32,12 @@ class DsmOutOfMemory(DmiServiceManagerException):
pass


class DsmConnectionError(DmiServiceManagerException):
"""
Raised when there is a problem with the configuration settings.
"""
pass

class DmiServiceManager:
"""
Class to manage interactions with a DMI Service Manager server.
@@ -60,12 +68,17 @@ def check_gpu_memory_available(self, service_endpoint):
api_endpoint = self.server_address + "check_gpu_mem/" + service_endpoint
resp = requests.get(api_endpoint, timeout=30)
if resp.status_code == 200:
return True, resp.json()
elif resp.status_code in [400, 404, 500, 503]:
return False, resp.json()
return resp.json()
elif resp.status_code == 503:
# TODO: retry later (increase delay in dmi_service_manager class and interrupt w/ retry)? DSM could possibly manage jobs in queue
# Processor could run CPU mode, but DSM needs to run different container (container fails if GPU enabled but not available)
raise DsmOutOfMemory("DMI Service Manager server out of GPU memory.")
else:
self.processor.log.warning("Unknown response from DMI Service Manager: %s" % resp.text)
return False, None
try:
reason = resp.json()['reason']
except JSONDecodeError:
reason = strip_tags(resp.text)
raise DsmConnectionError(f"Connection Error {resp.status_code}: {reason}")

def process_files(self, input_file_dir, filenames, output_file_dir, server_file_collection_name, server_results_folder_name):
"""
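With the new exception classes, calling code can tell "the server is out of GPU memory" apart from connection or configuration problems. A hedged usage sketch; how the DmiServiceManager instance is constructed and the service endpoint name are assumptions, not part of this diff:

```
from common.lib.dmi_service_manager import DsmOutOfMemory, DsmConnectionError

# `dsm` is assumed to be an already-configured DmiServiceManager instance
try:
    gpu_info = dsm.check_gpu_memory_available("stable_diffusion")
except DsmOutOfMemory:
    # no free GPU memory on the server: fall back to CPU mode or retry later
    gpu_info = None
except DsmConnectionError as error:
    # configuration or server-side problem: surface the reason in the dataset log
    gpu_info = None
    print(f"DMI Service Manager unavailable: {error}")
```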