-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdatabus_example.py
More file actions
226 lines (194 loc) · 10.4 KB
/
databus_example.py
File metadata and controls
226 lines (194 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import datetime
import glob
import json
import os
import psycopg2
from dotenv import load_dotenv
from tqdm import tqdm
import DataBUS.neotomaHelpers as nh
import DataBUS.neotomaValidator as nv
from DataBUS.neotomaHelpers.logging_dict import logging_response
"""Example script demonstrating the use of DataBUS functions.
This script serves as an example of how to use the DataBUS library to validate and upload data to a Neotoma database.
It includes steps for hashing files, checking validation logs, and validating various data types such as sites, geopolitical units, collection units, etc.
The script reads data from specified CSV files, validates the data against the Neotoma database, and logs the validation process.
It also includes error handling to ensure that any issues during validation are properly logged and that database transactions
are rolled back if necessary.
The script should be run twice, once with the --upload flag set to False to perform validation and generate logs, and a second time with the --upload flag set to True to upload the validated data to the database.
Run with uv
Example usage:
uv run databus_example.py --data data/ --template template.yml --logs data/logs/ --upload False
uv run databus_example.py --data data/ --template template.yml --logs data/logs/ --upload True
"""
args = nh.parse_arguments()
# Load environment variables from .env file.
# Look at .env_example for expected format.
# This should be renamed to .env and updated with the appropriate database connection
# information for your environment.
load_dotenv()
connection = json.loads(os.getenv('PGDB_TANK'))
# Load YAML template and CSV files
filenames = glob.glob(args['data'] + "*.csv")
yml_dict = nh.template_to_dict(temp_file=args['template'])
# Connect to the PostgreSQL database using psycopg2
conn = psycopg2.connect(**connection, connect_timeout=5)
cur = conn.cursor()
start_time = datetime.now()
print(f"Start uploading at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
for filename in tqdm(filenames, desc="Files", unit="file"):
conn.rollback()
logfile = []
databus = {}
csv_file = nh.read_csv(filename)
hashcheck = nh.hash_file(filename)
filecheck = nh.check_file(filename, validation_files="data/")
logfile = logfile + hashcheck['message'] + filecheck['message']
logfile.append(f"\nNew Upload started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
if hashcheck['pass'] is False and filecheck['pass'] is False:
logfile.append("File must be properly validated before it can be uploaded.")
hashcheck = False
else:
hashcheck = True
try:
step_bar = tqdm(total=16, desc=os.path.basename(filename), leave=False, unit="step")
# Not all steps are required for every upload.
# This is only an example of how to run DataBUS.
# Modify the steps as needed for your specific use case.
logfile.append("=== Sites ===")
result = nh.safe_step("sites", lambda csv_file=csv_file: nv.valid_site(
cur=cur, yml_dict=yml_dict, csv_file=csv_file), logfile, conn)
if result is not None:
databus['sites'] = result
logfile = logging_response(databus['sites'], logfile)
step_bar.update(1)
logfile.append("=== GPUs ===")
result = nh.safe_step("gpus", lambda csv_file=csv_file, databus=databus: nv.valid_geopolitical_units(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['gpuid'] = result
logfile = logging_response(databus['gpuid'], logfile)
step_bar.update(1)
logfile.append("=== CUs ===")
result = nh.safe_step("collunits", lambda csv_file=csv_file, databus=databus: nv.valid_collunit(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['collunits'] = result
logfile = logging_response(databus['collunits'], logfile)
step_bar.update(1)
logfile.append("=== Analysis Units ===")
result = nh.safe_step("analysisunits", lambda csv_file=csv_file, databus=databus: nv.valid_analysisunit(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['analysisunits'] = result
logfile = logging_response(databus['analysisunits'], logfile)
step_bar.update(1)
logfile.append("=== Datasets ===")
result = nh.safe_step("datasets", lambda csv_file=csv_file, databus=databus: nv.valid_dataset(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['datasets'] = result
logfile = logging_response(databus['datasets'], logfile)
step_bar.update(1)
logfile.append("=== Geochron Datasets ===")
result = nh.safe_step("geodataset", lambda csv_file=csv_file, databus=databus: nv.valid_geochron_dataset(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['geodataset'] = result
logfile = logging_response(databus['geodataset'], logfile)
step_bar.update(1)
logfile.append("=== Chronologies ===")
result = nh.safe_step("chronologies", lambda csv_file=csv_file, databus=databus: nv.valid_chronologies(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['chronologies'] = result
logfile = logging_response(databus['chronologies'], logfile)
step_bar.update(1)
logfile.append("=== Chron Controls ===")
result = nh.safe_step("chron_controls", lambda csv_file=csv_file, databus=databus: nv.valid_chroncontrols(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['chron_controls'] = result
logfile = logging_response(databus['chron_controls'], logfile)
step_bar.update(1)
logfile.append("=== Geochron ===")
result = nh.safe_step("geochron", lambda csv_file=csv_file, databus=databus: nv.valid_geochron(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['geochron'] = result
logfile = logging_response(databus['geochron'], logfile)
step_bar.update(1)
logfile.append("=== Geochron Control ===")
result = nh.safe_step("geochroncontrol", lambda databus=databus: nv.valid_geochroncontrol(
cur=cur, databus=databus), logfile, conn)
if result is not None:
databus['geochroncontrol'] = result
step_bar.update(1)
logfile.append("=== Contacts ===")
result = nh.safe_step("contacts", lambda csv_file=csv_file, databus=databus: nv.valid_contact(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['contacts'] = result
logfile = logging_response(databus['contacts'], logfile)
step_bar.update(1)
logfile.append("=== Database ===")
result = nh.safe_step("database", lambda databus=databus: nv.valid_dataset_database(
cur=cur, yml_dict=yml_dict, databus=databus), logfile, conn)
if result is not None:
databus['database'] = result
logfile = logging_response(databus['database'], logfile)
step_bar.update(1)
logfile.append("=== Samples ===")
result = nh.safe_step("samples", lambda csv_file=csv_file, databus=databus: nv.valid_sample(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['samples'] = result
logfile = logging_response(databus['samples'], logfile)
step_bar.update(1)
logfile.append("=== Sample Ages ===")
result = nh.safe_step("sample_age", lambda csv_file=csv_file, databus=databus: nv.valid_sample_age(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['sample_age'] = result
logfile = logging_response(databus['sample_age'], logfile)
step_bar.update(1)
logfile.append("=== Data ===")
result = nh.safe_step("data", lambda csv_file=csv_file, databus=databus: nv.valid_data(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['data'] = result
logfile = logging_response(databus['data'], logfile)
step_bar.update(1)
logfile.append("=== Publications ===")
result = nh.safe_step("publications", lambda csv_file=csv_file, databus=databus: nv.valid_publication(
cur=cur, yml_dict=yml_dict, csv_file=csv_file, databus=databus), logfile, conn)
if result is not None:
databus['publications'] = result
logfile = logging_response(databus['publications'], logfile)
step_bar.update(1)
step_bar.close()
all_true = all(databus[key].validAll for key in databus)
all_true = all_true and hashcheck
if args.upload:
if all_true:
# Special command for finalizing the upload and inserting into the datasetsubmissions table.
databus['finalize'] = nv.insert_final(cur,
databus = databus)
conn.commit()
logfile.append("Data has been successfully uploaded to the database.")
else:
conn.rollback()
logfile.append("Data must be fully validated before it can be uploaded to the database.")
else:
if all_true:
conn.rollback()
logfile.append("Data has been fully validated and is ready for upload.")
else:
conn.rollback()
logfile.append("Data has not passed validation. Please review the log messages for details.")
except Exception as e:
conn.rollback()
logfile.append(f"An error occurred during validation: {str(e)}")
with open(filename + '.valid.log', 'w', encoding="utf-8") as writer:
for i in logfile:
writer.write(i)
writer.write('\n')