
Commit 5b235db

Merge branch 'dev' into resource-allocation-page

2 parents 3220b7f + 913a31f

9 files changed: +129 additions, -47 deletions

.github/workflows/qiita-ci.yml

Lines changed: 1 addition & 3 deletions
@@ -104,9 +104,7 @@ jobs:
       - name: Install plugins
         shell: bash -l {0}
         run: |
-          wget https://data.qiime2.org/distro/core/qiime2-2022.11-py38-linux-conda.yml
-          conda env create --quiet -n qtp-biom --file qiime2-2022.11-py38-linux-conda.yml
-          rm qiime2-2022.11-py38-linux-conda.yml
+          conda env create -n qtp-biom --file https://data.qiime2.org/distro/amplicon/qiime2-amplicon-2024.5-py39-linux-conda.yml
           export QIITA_ROOTCA_CERT=`pwd`/qiita_core/support_files/ci_rootca.crt
           export QIITA_CONFIG_FP=`pwd`/qiita_core/support_files/config_test.cfg
           export REDBIOM_HOST="http://localhost:7379"
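
The updated step relies on conda env create accepting a remote --file URL, which removes the wget/rm bookkeeping. A minimal sketch of the same call driven from Python, assuming conda is on PATH (the subprocess invocation is an illustration, not part of the commit):

import subprocess

ENV_YML = ('https://data.qiime2.org/distro/amplicon/'
           'qiime2-amplicon-2024.5-py39-linux-conda.yml')

# one call replaces wget + conda env create + rm; check=True makes the
# CI step fail if environment creation fails
subprocess.run(['conda', 'env', 'create', '-n', 'qtp-biom',
                '--file', ENV_YML], check=True)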

qiita_db/artifact.py

Lines changed: 28 additions & 17 deletions
@@ -1342,23 +1342,6 @@ def _helper(sql_edges, edges, nodes):
                     # If the job is in success we don't need to do anything
                     # else since it would've been added by the code above
                     if jstatus != 'success':
-                        # Connect the job with his input artifacts, the
-                        # input artifacts may or may not exist yet, so we
-                        # need to check both the input_artifacts and the
-                        # pending properties
-                        for in_art in n_obj.input_artifacts:
-                            iid = in_art.id
-                            if iid not in nodes and iid in extra_nodes:
-                                nodes[iid] = extra_nodes[iid]
-                            _add_edge(edges, nodes[iid], nodes[n_obj.id])
-
-                        pending = n_obj.pending
-                        for pred_id in pending:
-                            for pname in pending[pred_id]:
-                                in_node_id = '%s:%s' % (
-                                    pred_id, pending[pred_id][pname])
-                                _add_edge(edges, nodes[in_node_id],
-                                          nodes[n_obj.id])
 
                         if jstatus != 'error':
                             # If the job is not errored, we can add the
@@ -1380,6 +1363,34 @@ def _helper(sql_edges, edges, nodes):
                                     queue.append(cjob.id)
                                     if cjob.id not in nodes:
                                         nodes[cjob.id] = ('job', cjob)
+
+                                        # including the outputs
+                                        for o_name, o_type in cjob.command.outputs:
+                                            node_id = '%s:%s' % (cjob.id, o_name)
+                                            node = TypeNode(
+                                                id=node_id, job_id=cjob.id,
+                                                name=o_name, type=o_type)
+                                            if node_id not in nodes:
+                                                nodes[node_id] = ('type', node)
+
+                        # Connect the job with his input artifacts, the
+                        # input artifacts may or may not exist yet, so we
+                        # need to check both the input_artifacts and the
+                        # pending properties
+                        for in_art in n_obj.input_artifacts:
+                            iid = in_art.id
+                            if iid not in nodes and iid in extra_nodes:
+                                nodes[iid] = extra_nodes[iid]
+                            _add_edge(edges, nodes[iid], nodes[n_obj.id])
+
+                        pending = n_obj.pending
+                        for pred_id in pending:
+                            for pname in pending[pred_id]:
+                                in_node_id = '%s:%s' % (
+                                    pred_id, pending[pred_id][pname])
+                                _add_edge(edges, nodes[in_node_id],
+                                          nodes[n_obj.id])
+
                 elif n_type == 'type':
                     # Connect this 'future artifact' with the job that will
                     # generate it
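
For orientation, a minimal sketch of the input-wiring block that the diff moves below the child-job loop, using hypothetical simplified containers (`nodes` maps ids to graph nodes, `edges` is a plain list, `extra_nodes` holds artifacts discovered elsewhere in the traversal); the real logic lives in _helper above:

def connect_inputs(job, nodes, edges, extra_nodes):
    # inputs that already exist as artifacts; promote them from
    # extra_nodes if the traversal has not added them yet
    for in_art in job.input_artifacts:
        iid = in_art.id
        if iid not in nodes and iid in extra_nodes:
            nodes[iid] = extra_nodes[iid]
        edges.append((nodes[iid], nodes[job.id]))
    # inputs that are still pending: future artifacts addressed as
    # 'predecessor_job_id:output_name', matching the 'type' nodes the
    # new block above creates for every child job's declared outputs
    for pred_id, outs in job.pending.items():
        for pname in outs:
            in_node_id = '%s:%s' % (pred_id, outs[pname])
            edges.append((nodes[in_node_id], nodes[job.id]))

Moving this block after the output registration is what lets a pending input resolve to a 'type' node that now exists by the time the edge is drawn.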

qiita_db/metadata_template/prep_template.py

Lines changed: 5 additions & 0 deletions
@@ -815,6 +815,9 @@ def _get_predecessors(workflow, node):
                     pred.append(data)
             return pred
 
+        # this is only helpful for when there are no _get_predecessors
+        return pred
+
         # Note: we are going to use the final BIOMs to figure out which
         # processing is missing from the back/end to the front, as this
         # will prevent generating unnecessary steps (AKA already provided
@@ -937,6 +940,8 @@ def _get_predecessors(workflow, node):
                     if set(merging_schemes[info]) >= set(cxns):
                         init_artifacts = merging_schemes[info]
                         break
+            if not predecessors:
+                pnode = node
             if init_artifacts is None:
                 pdp = pnode.default_parameter
                 pdp_cmd = pdp.command
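
A heavily simplified sketch of the case these two fixes guard against (hypothetical helper and node names; the real _get_predecessors returns richer data tuples): a node with no predecessors must still yield an empty list, and the caller must then fall back to the node itself.

import networkx as nx

def get_predecessors(graph, node):
    pred = []
    for pnode in graph.predecessors(node):
        pred.extend(get_predecessors(graph, pnode))
        pred.append(pnode)
    # reached only when the node has no predecessors at all
    return pred

g = nx.DiGraph()
g.add_node('lonely-command')      # a workflow with a single step
predecessors = get_predecessors(g, 'lonely-command')
# with nothing preceding it, processing starts at the node itself
pnode = predecessors[0] if predecessors else 'lonely-command'
assert pnode == 'lonely-command'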

qiita_db/software.py

Lines changed: 11 additions & 0 deletions
@@ -1995,9 +1995,20 @@ def graph(self):
             qdb.sql_connection.TRN.add(sql, [self.id])
             db_edges = qdb.sql_connection.TRN.execute_fetchindex()
 
+            # let's track what nodes are actually being used so if they do not
+            # have an edge we still return them as part of the graph
+            used_nodes = nodes.copy()
             for edge_id, p_id, c_id in db_edges:
                 e = DefaultWorkflowEdge(edge_id)
                 g.add_edge(nodes[p_id], nodes[c_id], connections=e)
+                if p_id in used_nodes:
+                    del used_nodes[p_id]
+                if c_id in used_nodes:
+                    del used_nodes[c_id]
+            # adding the missing nodes
+            for ms in used_nodes:
+                g.add_node(nodes[ms])
+
         return g
 
     @property
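
A minimal sketch of the same pattern with networkx (already a qiita dependency); the node objects here are placeholder strings rather than DefaultWorkflowNode instances:

import networkx as nx

def build_graph(nodes, db_edges):
    # nodes: {node_id: node_object}; db_edges: (edge_id, parent, child)
    g = nx.DiGraph()
    used_nodes = nodes.copy()
    for edge_id, p_id, c_id in db_edges:
        g.add_edge(nodes[p_id], nodes[c_id])
        used_nodes.pop(p_id, None)
        used_nodes.pop(c_id, None)
    # anything left never appeared in an edge: add it explicitly so
    # commands without connections are not silently dropped
    for node in used_nodes.values():
        g.add_node(node)
    return g

g = build_graph({1: 'cmd-a', 2: 'cmd-b', 3: 'cmd-alone'}, [(10, 1, 2)])
assert 'cmd-alone' in g.nodes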

qiita_db/test/test_artifact.py

Lines changed: 41 additions & 21 deletions
@@ -404,9 +404,8 @@ def test_descendants_with_jobs(self):
                     '"phred_offset": "auto"}')
         params = qdb.software.Parameters.load(qdb.software.Command(1),
                                               json_str=json_str)
-        user = qdb.user.User('[email protected]')
         wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
-            user, params, name='Test WF')
+            qdb.user.User('[email protected]'), params, name='Test WF')
         parent = list(wf.graph.nodes())[0]
         wf.add(qdb.software.DefaultParameters(10),
                connections={parent: {'demultiplexed': 'input_data'}})
@@ -699,6 +698,8 @@ def setUp(self):
 
         self._clean_up_files.extend([self.fwd, self.rev])
 
+        self.user = qdb.user.User('[email protected]')
+
     def tearDown(self):
         for f in self._clean_up_files:
             if exists(f):
@@ -1039,7 +1040,7 @@ def test_delete_in_construction_job(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         uploads_fp = join(qdb.util.get_mountpoint("uploads")[0][1],
@@ -1064,7 +1065,7 @@ def test_delete_error_running_job(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         job._set_status('running')
@@ -1147,7 +1148,7 @@ def test_delete_with_jobs(self):
             '"min_per_read_length_fraction": 0.75, "sequence_max_n": 0, '
             '"phred_offset": ""}' % test.id)
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'),
+            self.user,
             qdb.software.Parameters.load(qdb.software.Command(1),
                                          json_str=json_str))
         job._set_status('success')
@@ -1177,8 +1178,7 @@ def test_being_deleted_by(self):
         cmd = qiita_plugin.get_command('delete_artifact')
         params = qdb.software.Parameters.load(
             cmd, values_dict={'artifact': test.id})
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job._set_status('running')
 
         # verifying that there is a job and is the same than above
@@ -1189,8 +1189,7 @@ def test_being_deleted_by(self):
         self.assertIsNone(test.being_deleted_by)
 
         # now, let's actually remove
-        job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params, True)
+        job = qdb.processing_job.ProcessingJob.create(self.user, params, True)
         job.submit()
         # let's wait for job
         wait_for_processing_job(job.id)
@@ -1207,7 +1206,7 @@ def test_delete_as_output_job(self):
         data = {'OTU table': {'filepaths': [(fp, 'biom')],
                               'artifact_type': 'BIOM'}}
         job = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'),
+            self.user,
             qdb.software.Parameters.load(
                 qdb.software.Command.get_validator('BIOM'),
                 values_dict={'files': dumps({'biom': [fp]}),
@@ -1448,29 +1447,50 @@ def test_descendants_with_jobs(self):
                                      data_type="16S")
         self.assertEqual(len(a.analysis.artifacts), 3)
         # 3. add jobs conencting the new artifact to the other root
+        # - currently:
         # a -> job -> b
         # c
-        # job1 connects b & c
-        # job2 connects a & c
+        # - expected:
+        # a --> job -> b
+        #   |-> job2 -> out
+        #   ^
+        #   |-----|---> job1 -> out
+        # c ------------|
         cmd = qdb.software.Command.create(
             qdb.software.Software(1),
             "CommandWithMultipleInputs", "", {
-                'input_b': ['artifact:["BIOM"]', None],
-                'input_c': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
-        params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.children[0].id, 'input_c': c.id})
-        job1 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params)
+                'input_x': ['artifact:["BIOM"]', None],
+                'input_y': ['artifact:["BIOM"]', None]}, {'out': 'BIOM'})
         params = qdb.software.Parameters.load(
-            cmd, values_dict={'input_b': a.id, 'input_c': c.id})
-        job2 = qdb.processing_job.ProcessingJob.create(
-            qdb.user.User('[email protected]'), params)
+            cmd, values_dict={'input_x': a.children[0].id, 'input_y': c.id})
+        wf = qdb.processing_job.ProcessingWorkflow.from_scratch(
+            self.user, params, name='Test WF')
+        job1 = list(wf.graph.nodes())[0]
 
+        cmd_dp = qdb.software.DefaultParameters.create("", cmd)
+        wf.add(cmd_dp, req_params={'input_x': a.id, 'input_y': c.id})
+        job2 = list(wf.graph.nodes())[1]
         jobs = [j[1] for e in a.descendants_with_jobs.edges
                 for j in e if j[0] == 'job']
         self.assertIn(job1, jobs)
         self.assertIn(job2, jobs)
 
+        # 4. add job3 connecting job2 output with c as inputs
+        # - expected:
+        # a --> job -> b
+        #   |-> job2 -> out -> job3 -> out
+        #   ^                    ^
+        #   |                    |
+        #   |                    |
+        #   |-----|---> job1 -> out
+        # c ------------|
+        wf.add(cmd_dp, connections={
+            job1: {'out': 'input_x'}, job2: {'out': 'input_y'}})
+        job3 = list(wf.graph.nodes())[2]
+        jobs = [j[1] for e in a.descendants_with_jobs.edges
                for j in e if j[0] == 'job']
+        self.assertIn(job3, jobs)
+
 
 @qiita_test_checker()
 class ArtifactArchiveTests(TestCase):

qiita_db/util.py

Lines changed: 2 additions & 1 deletion
@@ -2803,7 +2803,7 @@ def update_resource_allocation_table(weeks=1, test=None):
     sacct = [
         'sacct', '-p',
         '--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,End,CPUTimeRAW,'
-        'ReqMem,AllocCPUs,AveVMSize', '--starttime',
+        'ReqMem,AllocCPUs,AveVMSize,MaxVMSizeNode', '--starttime',
         dates[0].strftime('%Y-%m-%d'), '--endtime',
         dates[1].strftime('%Y-%m-%d'), '--user', 'qiita', '--state', 'CD']

@@ -2922,6 +2922,7 @@ def merge_rows(rows):
     df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x)))
     df['ElapsedRawTime'] = df.ElapsedRaw.apply(
         lambda x: timedelta(seconds=float(x)))
+    df.replace({np.nan: None}, inplace=True)
 
     for index, row in df.iterrows():
         with qdb.sql_connection.TRN:
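
A small sketch of why the NaN normalization matters, assuming (as the code above does) a DataFrame parsed from sacct output; psycopg2 renders None as SQL NULL, while a numpy NaN would reach the database as an invalid value for non-float columns:

from datetime import timedelta

import numpy as np
import pandas as pd

df = pd.DataFrame({'ElapsedRaw': ['120', '3600'],
                   'MaxRSS': ['1024K', np.nan]})
df['ElapsedRawTime'] = df.ElapsedRaw.apply(
    lambda x: timedelta(seconds=float(x)))
# normalize NaN to None before the row-by-row SQL insertion
df.replace({np.nan: None}, inplace=True)
assert df.MaxRSS[1] is None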

qiita_pet/handlers/software.py

Lines changed: 40 additions & 4 deletions
@@ -61,6 +61,7 @@ def _default_parameters_parsing(node):
         # getting the main default parameters
         nodes = []
         edges = []
+        at = w.artifact_type
 
         # first get edges as this will give us the main connected commands
         # and their order
@@ -72,18 +73,22 @@ def _default_parameters_parsing(node):
         #        output_type: output_node_name}, ...}
         # for easy look up and merge of output_names
         main_nodes = dict()
+        not_used_nodes = {n.id: n for n in graph.nodes}
         for i, (x, y) in enumerate(graph.edges):
+            if x.id in not_used_nodes:
+                del not_used_nodes[x.id]
+            if y.id in not_used_nodes:
+                del not_used_nodes[y.id]
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            vals_y, input_y, output_y = _default_parameters_parsing(y)
+
             connections = []
             for a, _, c in graph[x][y]['connections'].connections:
                 connections.append("%s | %s" % (a, c))
 
-            vals_x, input_x, output_x = _default_parameters_parsing(x)
-            vals_y, input_y, output_y = _default_parameters_parsing(y)
-
             if i == 0:
                 # we are in the first element so we can specifically select
                 # the type we are looking for
-                at = w.artifact_type
                 if at in input_x[0][1]:
                     input_x[0][1] = at
                 else:
@@ -144,6 +149,37 @@ def _default_parameters_parsing(node):
 
         wparams = w.parameters
 
+        # adding nodes without edges
+        # as a first step if not_used_nodes is not empty we'll confirm that
+        # nodes/edges are empty; in theory we should never hit this
+        if not_used_nodes and (nodes or edges):
+            raise ValueError(
+                'Error, please check your workflow configuration')
+
+        # note that this block is similar but not identical to adding connected
+        # nodes
+        for i, (_, x) in enumerate(not_used_nodes.items()):
+            vals_x, input_x, output_x = _default_parameters_parsing(x)
+            if at in input_x[0][1]:
+                input_x[0][1] = at
+            else:
+                input_x[0][1] = '** WARNING, NOT DEFINED **'
+
+            name_x = vals_x[0]
+            if vals_x not in (nodes):
+                nodes.append(vals_x)
+            for a, b in input_x:
+                if b in inputs:
+                    name = inputs[b]
+                else:
+                    name = 'input_%s_%s' % (name_x, b)
+                nodes.append([name, a, b])
+                edges.append([name, vals_x[0]])
+            for a, b in output_x:
+                name = 'output_%s_%s' % (name_x, b)
+                nodes.append([name, a, b])
+                edges.append([name_x, name])
+
         workflows.append(
             {'name': w.name, 'id': w.id, 'data_types': w.data_type,
              'description': w.description, 'active': w.active,
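
To make the new branch concrete, a sketch with hypothetical values of what it emits for a workflow whose graph holds a single command and therefore no edges (the shapes of vals_x/input_x/output_x are assumed from the code above, not documented API): the command node plus synthetic input and output nodes wired to it, so the front end can still draw the workflow.

nodes, edges, inputs = [], [], {}
at = 'FASTQ'                                   # w.artifact_type
# what _default_parameters_parsing might return for the lone command
vals_x = ['params_1', 'Split libraries FASTQ']
input_x = [['artifact', at]]
output_x = [['artifact', 'demultiplexed']]

name_x = vals_x[0]
nodes.append(vals_x)
for a, b in input_x:
    name = inputs.get(b, 'input_%s_%s' % (name_x, b))
    nodes.append([name, a, b])
    edges.append([name, name_x])
for a, b in output_x:
    name = 'output_%s_%s' % (name_x, b)
    nodes.append([name, a, b])
    edges.append([name_x, name])
# edges now link input -> command -> output even without graph edges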

setup.py

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@
     install_requires=['psycopg2', 'click', 'bcrypt', 'pandas<2.0',
                       'biom-format', 'tornado<6.0', 'toredis', 'redis',
                       'scp', 'pyparsing', 'h5py', 'natsort', 'nose', 'pep8',
-                      'networkx', 'humanize', 'wtforms<3.0.0', 'nltk',
+                      'networkx', 'humanize', 'wtforms<3.0.0', 'nltk<=3.8.1',
                       'openpyxl', 'sphinx-bootstrap-theme', 'Sphinx<3.0',
                       'gitpython', 'redbiom', 'pyzmq', 'sphinx_rtd_theme',
                       'paramiko', 'seaborn', 'matplotlib', 'scipy<=1.10.1',
