Skip to content

Commit 01612cb

Browse files
committed
add files
1 parent f6868c4 commit 01612cb

File tree

7 files changed

+315
-0
lines changed

7 files changed

+315
-0
lines changed

Readme.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Timelark Data Pipeline
2+
3+
This very basic data pipeline built in Python is part of the Timelark project. It reads unstructured text from text files, extracts named entities using spaCy, queries the Aleph API to enrich these entities, and saves the enriched data to an SQLite database. From here it can be visualized.
4+
5+
## Table of Contents
6+
7+
- [Prerequisites](#prerequisites)
8+
- [Installation](#installation)
9+
- [Configuration](#configuration)
10+
- [Running the Pipeline](#running-the-pipeline)
11+
12+
## Prerequisites
13+
14+
- Python 3.x
15+
- [spaCy](https://spacy.io/) and spaCy model (e.g., en_core_web_lg)
16+
- [Dataset](https://dataset.readthedocs.io/en/latest/index.html) (sqlite wrapper)
17+
- Aleph API access and API key (for example [OCCRP's Aleph](https://aleph.occrp.org/))
18+
- [Confection](https://github.com/explosion/confection) (for configuration management)
19+
20+
## Installation
21+
22+
Clone this repository:
23+
24+
```bash
25+
git clone https://github.com/jlstro/timelark-pipeline.git
26+
cd timelark-pipeline
27+
```
28+
29+
Create a virtual environment and install the required Python packages:
30+
31+
```bash
32+
python3 -m venv venv
33+
source venv/bin/activate
34+
# On Windows: venv\Scripts\activate
35+
python3 -m pip install spacy confection dataset
36+
```
37+
38+
Download and install the spaCy model (e.g., "en_core_web_lg"):
39+
40+
``` bash
41+
python3 -m spacy download en_core_web_lg
42+
```
43+
44+
## Configuration
45+
46+
1. Create a configuration file named `config.cfg` in the root directory of the repository. Define the paths to your database, text files, and other configuration values as needed. Refer to the [confection documentation](https://github.com/explosion/confection) for more information on writing the configuration.
47+
48+
Example `config.cfg`:
49+
50+
```ini
51+
[paths]
52+
db = "./db/data.db"
53+
files = "./text_files"
54+
55+
[aleph]
56+
host = "https://aleph.occrp.org"
57+
collections = 25, 55, 90
58+
```
59+
60+
Make sure you set your Aleph API key as an environment variable named ALEPH_API_KEY.
61+
62+
## Running the Pipeline
63+
64+
Run the main script to start the pipeline:
65+
66+
```bash
67+
python3 main.py
68+
```
69+
70+
The pipeline will read text files from the specified directory, extract entities, enrich them using the API, and save the enriched data to the SQLite database.

alephutil.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import json
import logging
from urllib.parse import quote

import requests
4+
5+
# Set up logging for the module
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def build_aleph_url(host, collection_ids, schema, limit, query):
    """Build an Aleph entity-search URL.

    :param host: Base URL of the Aleph instance (a trailing slash is tolerated).
    :param collection_ids: Iterable of collection IDs to restrict the search to.
    :param schema: FollowTheMoney schema name to filter on (e.g. 'Person').
    :param limit: Maximum number of results to request.
    :param query: Free-text search term; percent-encoded here so names with
        spaces or special characters still produce a valid URL.
    :return: The fully assembled query URL as a string.
    """
    # Strip a trailing slash so a host like "https://aleph.occrp.org/"
    # does not produce a double slash in the path.
    base_url = f"{host.rstrip('/')}/api/2/entities"

    # One percent-encoded "filter:collection_id" parameter per collection.
    collection_filters = "&".join(
        f"filter%3Acollection_id={collection_id}" for collection_id in collection_ids
    )

    return (
        f"{base_url}?{collection_filters}"
        f"&filter%3Aschemata={schema}&limit={limit}&q={quote(str(query))}"
    )
18+
19+
def query_aleph(entity, config):
    """
    Queries the Aleph API to retrieve information based on an entity type and name.

    :param entity: A dict carrying the entity's 'name' and FtM 'schema'
    :param config: Configuration dictionary containing API key, Aleph host, and collection IDs
    :return: Decoded JSON response as a dict, or None when the request fails
    """
    host = config['host']
    api_key = config['api_key']
    collections = config['collections']
    schema = entity['schema']
    name = entity['name']

    headers = {
        "Authorization": api_key,
        "Content-Type": 'application/json',
        'Accept-Encoding': 'gzip, deflate, br',
    }
    url = build_aleph_url(host, collections, schema, 50, name)

    try:
        # Bounded timeout so a stalled Aleph instance cannot hang the pipeline.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as e:
        logger.error(f"API call failed: {url} ({e})")
        return None

    if response.status_code == 200:  # Check if the response is successful
        logger.info(f'Aleph API call successful: {entity}')
        return response.json()

    logger.error(f"API call failed: {url}")
    return None
44+
45+
def parse_schema(schema):
    """Map a spaCy entity label to the matching FollowTheMoney schema name.

    Labels without a dedicated mapping fall back to the generic 'LegalEntity'.
    """
    label_to_schema = {
        'ORG': 'Company',
        'PERSON': 'Person',
        'EVENT': 'Event',
    }
    return label_to_schema.get(schema, 'LegalEntity')
54+
55+
def enrich_entities(entity, config):
    """
    Enriches entity information using the Aleph API data.

    :param entity: a single spaCy-extracted entity dict with 'text' and 'category' keys
    :param config: Configuration dictionary containing API key, Aleph host, and collection IDs
    :return: Enriched entity dict holding 'name', 'schema' and any Aleph properties
    """
    schema = parse_schema(entity['category'])
    name = entity['text']
    enriched_entity = {'name': name, 'schema': schema}

    try:
        data = query_aleph(enriched_entity, config)
        if data:
            for result in data['results']:
                for key, values in result['properties'].items():
                    existing = enriched_entity.get(key)
                    if existing is None or existing == values:
                        enriched_entity[key] = values
                    else:
                        # Merge conflicting values into one flat list. The
                        # previous code wrapped the stored value in a list on
                        # every conflict, so a key merged from two results
                        # ended up as nested lists; normalise exactly once.
                        merged = existing if isinstance(existing, list) else [existing]
                        enriched_entity[key] = merged + values
        else:
            logger.warning(f"No data received from Aleph API for entity: {entity['text']}")
    except Exception as e:
        logger.error(f"Error during API query for entity {entity['text']}: {e}")

    return enriched_entity

config.cfg

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[paths]
2+
examples = "./example_data.jsonl"
3+
db = "./db/timelark.db"
4+
files = "./text"
5+
6+
[aleph]
7+
host = "https://aleph.occrp.org/"
8+
collections = 6102, 4558, 119, 845

dbmanager.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import dataset
2+
import logging
3+
import uuid
4+
5+
# Set up logging for the module
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def save_to_db(data, db_path):
    """Persist enriched entities into per-schema SQLite tables.

    :param data: iterable of entity dicts; each should carry a 'schema' key,
        which selects the target table (e.g. "Person" -> "Person_entities").
        Entities without a schema are skipped with a warning.
    :param db_path: filesystem path of the SQLite database file.
    """
    try:
        db = dataset.connect(f"sqlite:///{db_path}")

        for entity in data:
            schema = entity.get("schema")
            if schema:
                table_name = f"{schema}_entities"
                primary_key = str(uuid.uuid4())  # Generate a UUID as the primary key

                # NOTE(review): dataset's Table supports len(), so an empty or
                # brand-new table is falsy and falls through to create_table()
                # with "uuid" as primary id — confirm this reliance on Table
                # truthiness is intentional.
                table = db.get_table(table_name) or db.create_table(table_name, primary_id="uuid")
                entity["uuid"] = primary_key  # Add the UUID to the entity dictionary
                # NOTE(review): the UUID is freshly generated each call, so this
                # upsert can never match an existing row — reruns of the same
                # input will accumulate duplicate rows.
                table.upsert(entity, keys=["uuid"])  # Use the UUID as the primary key
                logger.info(f"Inserted '{schema}' into the database")
            else:
                logger.warning(f"Entity has no schema specified, skipping insertion: {entity}")
    except Exception as e:
        # Best-effort persistence: failures are logged, never raised to the caller.
        logger.error(f"Error while saving to the database: {e}")

extractor.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import spacy
2+
import logging
3+
4+
# Set up logging for the module
5+
logger = logging.getLogger(__name__)
6+
7+
# Load the large English spaCy pipeline once at module import; extract_entities()
# reuses this shared model on every call (en_core_web_md or _sm also work,
# trading accuracy for download size and memory).
nlp = spacy.load("en_core_web_lg")  # or en_core_web_md or sm
9+
10+
def extract_entities(text):
    """
    Extracts entities from the given text and filters for specific categories.

    Args:
        text (str): Input text to extract entities from.

    Returns:
        list: List of extracted entities, each a dict with 'text' and
            'category' (the spaCy label) keys.

    Raises:
        RuntimeError: If the spaCy pipeline fails on the given text.
    """
    try:
        logger.info("Extracting entities from text")
        doc = nlp(text)
        # Only these label types are useful to the downstream Aleph enrichment.
        target_labels = {"LOC", "PERSON", "ORG", "GPE", "DATE", "EVENT"}
        entities = [
            {"text": ent.text, "category": ent.label_}
            for ent in doc.ents
            if ent.label_ in target_labels
        ]
        logger.info(f"Extracted {len(entities)} entities from text")
        return entities
    except Exception as e:
        logger.error(f"Error extracting entities: {e}")
        # Chain the original exception so the root cause is not lost.
        raise RuntimeError(f"Error extracting entities: {e}") from e

file_reader.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import logging

# Set up logging for the module
logger = logging.getLogger(__name__)

def read_text_from_files(file_path):
    """
    Reads text from a file.

    Args:
        file_path (str): Path to the text file.

    Returns:
        str: Contents of the text file, decoded as UTF-8.

    Raises:
        RuntimeError: If the file cannot be opened, read, or decoded.
    """
    try:
        logger.info("Reading text from file: %s", file_path)
        with open(file_path, "r", encoding="utf-8") as file:
            return file.read()
    except (OSError, UnicodeDecodeError) as e:
        logger.error("Error reading file %s: %s", file_path, e)
        # Chain the original exception so the underlying OS error is preserved.
        raise RuntimeError(f"Error reading file {file_path}: {e}") from e

main.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import os
import logging
from glob import glob

from confection import registry, Config

from file_reader import read_text_from_files
from extractor import extract_entities
from alephutil import enrich_entities
from dbmanager import save_to_db

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Load configuration from disk
config_file = Config().from_disk("./config.cfg")
config = registry.resolve(config_file)

# Access configuration values
db_path = config['paths']['db']
files_path = config['paths']['files']
aleph_host = config['aleph']['host']
# BUG FIX: the original wrote list[...] (a subscription, which builds a
# typing GenericAlias, not a list), so the collection IDs were unusable.
# Calling list(...) actually materialises them.
collections = list(config['aleph']['collections'])

# Read API key from environment variable
api_key = os.environ.get("ALEPH_API_KEY")

# Bundle the Aleph settings for easier handling downstream
aleph_config = {"host": aleph_host, "api_key": api_key, "collections": collections}

# Check for None and log warnings if needed
missing_values = []
if db_path is None:
    missing_values.append("db_path")
if files_path is None:
    missing_values.append("files_path")
if aleph_host is None:
    missing_values.append("aleph_host")
if api_key is None:
    missing_values.append("api_key")

if missing_values:
    logging.warning("The following configuration values are missing or None: %s", ", ".join(missing_values))


if __name__ == "__main__":
    logging.info("Starting the data pipeline")

    # Use glob to get a list of all text files in the specified folder;
    # os.path.join avoids hand-building the path with string concatenation.
    text_files = glob(os.path.join(files_path, "*.txt"))

    logging.info(f"Found {len(text_files)} text files to process")

    for txt in text_files:
        logging.info(f"Processing text file: {txt}")

        # Read text from the file
        text = read_text_from_files(txt)

        # Extract entities from the text
        entities = extract_entities(text)

        enriched_entities = []
        if entities:
            # Enrich entities using Aleph API
            for entity in entities:
                enriched_entity = enrich_entities(entity, aleph_config)
                enriched_entities.append(enriched_entity)

        # Save enriched entities to the database (an empty list is passed
        # through unchanged, matching the original flow).
        save_to_db(enriched_entities, db_path)

        logging.info(f"Processed and saved data from {txt}")

    logging.info("Data pipeline completed")

0 commit comments

Comments
 (0)