Reduce pipeline processes #298

Open
wants to merge 68 commits into base: maint_0.4

Changes from all commits (68 commits)
5d54c40
add package tools and clean up requirements
christian-monch Jul 29, 2022
6568591
update release-pypi goal in Makefile
christian-monch Jul 29, 2022
15b1168
use dataclasses for annex-status info
christian-monch Jul 29, 2022
e362521
add --file-info parameter
christian-monch Jul 30, 2022
b389caf
clean up traverser and use --file-info
christian-monch Aug 1, 2022
2e505e4
more cleanup
christian-monch Aug 3, 2022
492804b
distinguish between GitRepo and AnnexRepo
christian-monch Aug 3, 2022
1e6760a
fix dataset path returned by traverser
christian-monch Aug 3, 2022
6585ff3
handle id-less datasets
christian-monch Aug 3, 2022
af9b45a
update required versions
christian-monch Oct 21, 2022
f07616e
fix some typos and pep-8 warnings
christian-monch Nov 1, 2022
e9e8353
add probe pipeline elements
christian-monch Nov 1, 2022
5aa694d
improve probe elements
christian-monch Nov 2, 2022
b68c1f1
add an invocation count to processor probe
christian-monch Nov 2, 2022
0acee9d
backport a fix for traversal root path handling
christian-monch Nov 2, 2022
e40c26d
send stop pipeline-data to consumer
christian-monch Nov 3, 2022
6fe8e74
remove an unused import
christian-monch Dec 8, 2022
e581244
fix a bug in relative item path determination
christian-monch Jan 12, 2023
254ba86
add a test for pipeline extract adapter
christian-monch Jan 16, 2023
7f928d0
add nose to requirements
christian-monch Jan 24, 2023
ff73673
fix parameter names
christian-monch Jan 25, 2023
a601e85
require dataset IDs, rebase on maint_0.4
christian-monch Jan 26, 2023
d2cb104
fix dataset reporting
christian-monch Jan 26, 2023
8dee26d
update requirements-files
christian-monch Feb 25, 2023
909bba5
[TEMP] fix traversal code
christian-monch Feb 27, 2023
90609b3
copy code from datalad to prevent pulling in nose
christian-monch Feb 27, 2023
46ffe7c
fix use of `to_json()` instead of `to_dict()`
christian-monch Feb 27, 2023
8a29f5b
finalize rebase on maint
christian-monch Mar 21, 2023
da6a7c6
implement a faster dataset traverser
christian-monch Mar 23, 2023
66d9901
add a known flag
christian-monch Mar 23, 2023
2dbc0a9
fix git ls-file interpretation
christian-monch Mar 24, 2023
95d38fe
cache dataset version
christian-monch Mar 24, 2023
de02bc5
yield a relative dataset path and simplify code
christian-monch Mar 25, 2023
ff45963
[TEMP]
christian-monch Mar 26, 2023
940e4a0
[TEMP2]
christian-monch Mar 26, 2023
0ff6598
use annex find to add annex info to FileInfo
christian-monch Mar 27, 2023
46bd69e
[temp3] add a stdin-based adding consumer
christian-monch Mar 29, 2023
55c6ad3
[temp4] use --json-lines
christian-monch Mar 30, 2023
0eb804f
provide proper JSON lines to stdin-adder
christian-monch Apr 3, 2023
b2b91b6
log metadata sent to stdin-adder on debug level
christian-monch Apr 3, 2023
1ec78ad
add size information for non-annex files
christian-monch Apr 5, 2023
a5f03e6
fix a bug where file-size was returned as string
christian-monch Apr 17, 2023
20349b5
[temp] consolidate listing code
christian-monch Apr 24, 2023
43409cc
handle directory attributes properly
christian-monch Apr 26, 2023
f3916a9
add `key` to status dict returned by ls_struct
christian-monch Apr 26, 2023
7e2b5ab
adapt conduct test to new traverser
christian-monch Apr 26, 2023
ee6f9c5
add a demo of a possible dataset iterator
christian-monch Jun 8, 2023
f994320
add a first example with parallel execution
christian-monch Jun 9, 2023
75b8088
use xargs instead of subprocess
christian-monch Jun 14, 2023
3dcfd91
working parallel example
christian-monch Jun 14, 2023
7e4b62b
add an example for adding from parallel
christian-monch Jun 14, 2023
db341c7
use ls-tree options supported by older git
christian-monch Jun 14, 2023
dfb0b53
fix a faulty format spec in git-annex-find
christian-monch Jun 14, 2023
388cfa2
compare ls_struct for empty or None
christian-monch Jun 16, 2023
cce26a2
improve git output parsing
christian-monch Jun 16, 2023
5e1246c
improve extract adaptor
christian-monch Jun 16, 2023
820d719
add DatasetInfo creation from file-info parameter
christian-monch Jun 19, 2023
b7dc8dd
add individual adaptors for file and dataset
christian-monch Jun 19, 2023
499da79
add a metadata iterator
christian-monch Jun 22, 2023
018515c
add a trivial traversal pipeline
christian-monch Jul 4, 2023
b587f9b
add location info to traverser result
christian-monch Jul 4, 2023
50f42bf
add root dataset info to output
christian-monch Jul 13, 2023
c2ffeeb
add an adaptor to emit files for tabby-file TSVs
christian-monch Jul 28, 2023
cf75265
fix creation of AnnexedFileInfo in get_path_info
christian-monch Aug 15, 2023
a34b05a
improve report of unrecognized extractor classes
christian-monch Aug 15, 2023
48eacde
remove unused code
christian-monch Aug 15, 2023
b80f94c
improve reporting of unrecognized extractor class
christian-monch Aug 15, 2023
c76dfeb
ignore classes from datalad-deprecated
christian-monch Aug 15, 2023
7 changes: 4 additions & 3 deletions Makefile
@@ -43,6 +43,7 @@ code-analysis:
release-pypi: clean
# better safe than sorry
test ! -e dist
python setup.py sdist
python setup.py bdist_wheel --universal
twine upload dist/*
python3 -m pip install --upgrade build
python3 -m pip install --upgrade twine
python3 -m build
python3 -m twine upload dist/*
6 changes: 2 additions & 4 deletions datalad_metalad/__init__.py
Expand Up @@ -2,6 +2,8 @@
import os
import hashlib

from ._version import get_versions


__docformat__ = 'restructuredtext'

@@ -105,9 +107,5 @@ def get_agent_id(name, email):
).encode('utf-8')).hexdigest()


from datalad import setup_package
from datalad import teardown_package

from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
4 changes: 2 additions & 2 deletions datalad_metalad/add.py
@@ -672,7 +672,7 @@ def _get_top_nodes(realm: Path,
# with an un-versioned path. In both cases the internal dataset-tree
# path is "". If set, the un-versioned path is stored in the prefix
# path element in the version list (which confusingly is also called
# "path".
# "path").
assert ap.dataset_path in (top_level_dataset_tree_path, None)

# We leave the creation of the respective nodes to auto_create
@@ -738,7 +738,7 @@ def get_tvl_uuid_mrr_metadata_file_tree(
Read tree version list, uuid set, metadata root record, dataset-level
metadata, and filetree from the metadata store, for the given root
dataset id, root dataset version, dataset id, dataset version, dataset path,
and unversioned path.
and un-versioned path.

This function caches results in order to avoid costly persist operations.

10 changes: 5 additions & 5 deletions datalad_metalad/aggregate.py
@@ -43,7 +43,7 @@
sds-path = path of sds-pd-version in rds-version
add metadata_root_record to uuid-set(rds).sds-pd-version, sds-path
else:
Error("Cannot find path of sds-uuid@sds-pd-version in any rds@version)
Error("Cannot find path of sds-uuid@sds-pd-version in any rds@version")
Error("What can you do? Not much besides re-aggregating")
Error("What can we do? Add a structure that allows for 'detached' metadata")

@@ -128,7 +128,7 @@ class Aggregate(Interface):
i.e. the directory that contains the ".datalad"-entry, to the top-level
directory of the respective sub-dataset.

Aggregate works on existing metadata, it will not extract meta data from
Aggregate works on existing metadata, it will not extract metadata from
data file. To create metadata, use the meta-extract command.

As a result of the aggregation, the metadata of all specified sub-datasets
@@ -305,7 +305,7 @@ def copy_uuid_set(destination_metadata_store: str,

# If the destination does not contain a version list for the
# source UUID, we add a copy of the source version list with
# a the specified path prefix
# the specified path prefix
if uuid not in destination_uuid_set.uuids():

lgr.debug(
@@ -349,7 +349,7 @@ def copy_uuid_set(destination_metadata_store: str,
element=element.deepcopy(
new_destination=destination_metadata_store))

# Unget the versioned element
# un-get the versioned element
lgr.debug(
f"persisting copied metadata element for pd version "
f"{pd_version} of UUID: {uuid}")
@@ -368,7 +368,7 @@ def copy_uuid_set(destination_metadata_store: str,
primary_data_version=pd_version,
prefix_path=old_path)

# Unget the version list in the destination, that should persist it
# un-get the version list in the destination, that should persist it
lgr.debug(f"persisting copied version list for UUID: {uuid}")
destination_uuid_set.unget_version_list(uuid)

19 changes: 13 additions & 6 deletions datalad_metalad/conduct.py
@@ -101,15 +101,15 @@ class Conduct(Interface):
- A list of processors. A processor reads data,
either from the previous processor or the provider and performs
computations on the data and return a result that is processed by
the next processor. The computation may have side-effect,
the next processor. The computation may have side effects,
e.g. store metadata.

The provider is usually executed in the current processes' main
thread. Processors are usually executed in concurrent processes,
i.e. workers. The maximum number of workers is given by the
parameter `max_workers`.

Which provider and which processors are used is defined in an
Which provider and which processors are used is defined in a
"configuration", which is given as JSON-serialized dictionary.
"""

@@ -303,7 +303,7 @@ def process_parallel(executor,
status="ok",
path=str(path),
logger=lgr,
pipeline_data=pipeline_data.to_json())
pipeline_data=pipeline_data.to_dict())
continue

lgr.debug(f"Starting new instance of {processor_specs[0]} on {pipeline_data}")
@@ -343,7 +343,7 @@ def process_parallel(executor,
status="ok",
path=str(path),
logger=lgr,
pipeline_data=pipeline_data.to_json())
pipeline_data=pipeline_data.to_dict())
else:
lgr.debug(
f"Starting processor[{next_index}]"
@@ -396,7 +396,7 @@ def process_parallel(executor,
status="ok",
path=str(path),
logger=lgr,
pipeline_data=pipeline_data.to_json())
pipeline_data=pipeline_data.to_dict())
else:
lgr.debug(
f"Handing pipeline data {pipeline_data} to"
@@ -418,6 +418,10 @@ def process_parallel(executor,
status="error",
logger=lgr,
message=traceback.format_exc())

if consumer_instance:
consumer_instance.consume(PipelineData(state=PipelineDataState.STOP))

return


@@ -435,6 +439,9 @@ def process_sequential(provider_instance: Provider,
evaluated_constructor_args=evaluated_constructor_args,
consumer_instance=consumer_instance)

if consumer_instance:
consumer_instance.consume(PipelineData(state=PipelineDataState.STOP))


def process_downstream(pipeline_data: PipelineData,
processor_specs: list[dict],
@@ -493,7 +500,7 @@ def process_downstream(pipeline_data: PipelineData,
status="ok",
path=str(path),
logger=lgr,
pipeline_data=pipeline_data.to_json())
pipeline_data=pipeline_data.to_dict())

lgr.debug(
f"Pipeline finished, returning datalad result {datalad_result}")
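The conduct.py additions above make both process_parallel and process_sequential hand a PipelineData object in the STOP state to the consumer once all results have been processed, so the consumer knows that no further data will arrive. The following is a generic sketch of that stop-sentinel pattern using stand-in names (STOP, queue, consumer); it is not the PipelineData or consumer implementation from this repository.

import queue
import threading

# Generic sketch of the stop-sentinel pattern used above: the producer
# signals end-of-stream with a dedicated STOP object so the consumer can
# finish cleanly. Names are stand-ins, not datalad-metalad classes.
STOP = object()

def consumer(q: queue.Queue) -> None:
    while True:
        item = q.get()
        if item is STOP:   # corresponds to PipelineDataState.STOP above
            print("consumer: stop sentinel received, shutting down")
            break
        print(f"consumer: consuming {item!r}")

q = queue.Queue()
worker = threading.Thread(target=consumer, args=(q,))
worker.start()

for result in ("result-1", "result-2"):
    q.put(result)
q.put(STOP)                # tell the consumer there is nothing more to come
worker.join()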
10 changes: 9 additions & 1 deletion datalad_metalad/exceptions.py
@@ -1,9 +1,17 @@
from typing import List, Optional

from datalad.support.exceptions import InsufficientArgumentsError
from datalad.support.exceptions import (
InsufficientArgumentsError,
NoDatasetArgumentFound,
)
from datalad.utils import ensure_unicode


class NoDatasetIdFound(NoDatasetArgumentFound):
"""Raised whenever a dataset ID cannot be found in a dataset."""
pass


class MetadataKeyException(RuntimeError):
def __init__(self,
message: str = "",
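The exceptions.py hunk above introduces NoDatasetIdFound, a subclass of NoDatasetArgumentFound, so that datasets without a dataset ID (see the "handle id-less datasets" and "require dataset IDs" commits in the list above) can be reported distinctly. Below is a minimal sketch of how calling code might use such an exception; the read_dataset_id helper and the local stand-in base class are assumptions for illustration only, not code from this repository.

from pathlib import Path

# Minimal sketch of how callers might use the new exception.
# `NoDatasetArgumentFound` is stood in locally and `read_dataset_id`
# is an illustrative helper, not code from datalad-metalad.
class NoDatasetArgumentFound(ValueError):
    """Stand-in for datalad.support.exceptions.NoDatasetArgumentFound."""

class NoDatasetIdFound(NoDatasetArgumentFound):
    """Raised whenever a dataset ID cannot be found in a dataset."""

def read_dataset_id(dataset_path: Path) -> str:
    # Pretend this dataset has a .datalad/config without a datalad.dataset.id
    raise NoDatasetIdFound(f"no dataset ID found in {dataset_path}")

try:
    dataset_id = read_dataset_id(Path("/tmp/example-dataset"))
except NoDatasetIdFound as error:
    # Callers can skip id-less datasets instead of aborting the whole run.
    print(f"skipping dataset: {error}")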