Commit 6183a2c

[TEMP] fix traversal code
christian-monch committed Feb 27, 2023
1 parent fbd3629 commit 6183a2c
Showing 2 changed files with 61 additions and 169 deletions.
datalad_metalad/pipeline/processor/extract.py (2 changes: 1 addition & 1 deletion)
@@ -93,7 +93,7 @@ def process(self, pipeline_data: PipelineData) -> PipelineData:
         intra_dataset_path = (
             ""
             if object_type == "dataset"
-            else dataset_traverse_record.element_info.path.intra_dataset_path)
+            else dataset_traverse_record.element_info.intra_dataset_path)
 
         if object_type == "file":
             kwargs = dict(
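The fix above tracks a change in the shape of the traversal records: `intra_dataset_path` now lives directly on `element_info` instead of on a nested `path` object. A minimal sketch of the corrected access pattern, using a hypothetical stand-in for the record type (the real classes live in datalad-metalad and may differ):

    from dataclasses import dataclass

    # Hypothetical stand-in for the element_info carried by a
    # dataset-traversal record; field names follow the diff above.
    @dataclass
    class ElementInfo:
        type: str
        intra_dataset_path: str

    element_info = ElementInfo(type="file", intra_dataset_path="docs/readme.md")

    # Before the fix: element_info.path.intra_dataset_path (no such attribute).
    # After the fix, mirroring extract.py:
    intra_dataset_path = (
        ""
        if element_info.type == "dataset"
        else element_info.intra_dataset_path)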
datalad_metalad/pipeline/provider/datasettraverse.py (228 changes: 60 additions & 168 deletions)
@@ -8,6 +8,7 @@
 import logging
 import re
 from dataclasses import dataclass
+from enum import Enum
 from pathlib import Path
 from typing import (
     Generator,
@@ -64,6 +65,11 @@
 )
 
 
+class TraversalType(Enum):
+    DATASET = "dataset"
+    FILE = "file"
+
+
 @dataclass
 class DatasetTraverseResult(PipelineResult):
     fs_base_path: Path
@@ -102,9 +108,9 @@ def optional_dict(name, attribute):
 class DatasetTraverser(Provider):
 
     name_to_item_set = {
-        "file": {"file"},
-        "dataset": {"dataset"},
-        "both": {"file", "dataset"}
+        "file": {TraversalType.FILE},
+        "dataset": {TraversalType.DATASET},
+        "both": {TraversalType.FILE, TraversalType.DATASET}
     }
 
     interface_documentation = DocumentedInterface(
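Replacing the bare strings with `TraversalType` members makes the membership tests in the traversal code typo-safe, while `.value` still recovers the string where one is needed. A standalone sketch of the intended lookup (re-created here for illustration, not imported from the module):

    from enum import Enum

    class TraversalType(Enum):
        DATASET = "dataset"
        FILE = "file"

    # Mirrors DatasetTraverser.name_to_item_set from the hunk above.
    name_to_item_set = {
        "file": {TraversalType.FILE},
        "dataset": {TraversalType.DATASET},
        "both": {TraversalType.FILE, TraversalType.DATASET},
    }

    # A user-facing item-type name selects a set of traversal types,
    # which the traverser then tests with `in`, as _traverse_dataset does.
    item_set = name_to_item_set["both"]
    assert TraversalType.FILE in item_set
    assert TraversalType.DATASET.value == "dataset"  # .value recovers the string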
@@ -197,7 +203,6 @@ def _get_dataset_result_part(self, dataset: Dataset):
         )
     }
 
-    x = """
     def get_annex_file_info(self,
                             annex_repo: AnnexRepo,
                             dataset_path: Path
@@ -206,68 +211,68 @@ def get_annex_file_info(self,
         return [
             AnnexedFileInfo.from_annex_status(status, path, str(Path(path).relative_to(dataset_path)))
             if len(status) == 13
-            else FileInfo.from_annex_status(status, path, str(Path(path).relative_to(dataset_path)))
-            for path, status in annex_status
+            else FileInfo.from_dict({**status, "dataset_path": str(dataset_path), "path": str(path), "intra_dataset_path": str(Path(path).relative_to(dataset_path)), "bytesize": status["bytesize"]})
+            for path, status in annex_status.items()
+            if status["state"] != "untracked"
         ]
 
-    def _traverse_dataset(self, dataset_path: Path) -> Iterable:
-        dataset = require_dataset(dataset_path, purpose="dataset_traversal")
-        element_path = resolve_path("", dataset)
+    def _traverse_dataset(self, dataset_path: Path) -> Generator:
+        """Traverse all elements of dataset, and potentially its subdatasets.
+
+        This method will traverse the dataset `dataset` and yield traversal
+        results for each `file` or installed `dataset`, depending on the
+        selected item types.
 
-        if isinstance(dataset.repo, AnnexRepo):
-            info = self.get_annex_file_info(dataset.repo)
+        :param Path dataset_path: the root of all traversals
+        :return: a generator, yielding `DatasetTraverseResult`-records
+        :rtype: Generator[PipelineData]
+        """
 
-        if self.dataset_mask in self.item_set:
+        dataset = require_dataset(dataset_path, purpose="dataset_traversal")
+        element_path = resolve_path("", dataset)
+
+        if TraversalType.DATASET in self.item_set:
             if self._already_visited(dataset, Path("")):
                 return
+            traverse_result = self._generate_result(
+                dataset=dataset,
+                dataset_path=str(dataset.pathobj),
+                element_path=element_path,
+                element_info={
+                    "type": TraversalType.DATASET.value,
+                    "state": "",
+                    "gitshasum": "",
+                    "prev_gitshasum": ""
+                }
+            )
             yield PipelineData((
                 ("path", element_path),
-                (
-                    "dataset-traversal-record",
-                    [
-                        DatasetTraverseResult(**{
-                            "state": ResultState.SUCCESS,
-                            "fs_base_path": self.fs_base_path,
-                            "type": "dataset",
-                            "path": element_path,
-                            **self._get_dataset_result_part(dataset)
-                        })
-                    ]
-                )))
-        if self.file_mask in self.item_set:
-            repo = dataset.repo
-            for relative_element_path in repo.get_files():
-                element_path = resolve_path(relative_element_path, dataset)
-                if any([
-                        re.match(pattern, path_part)
-                        for path_part in element_path.parts
-                        for pattern in _standard_exclude]):
-                    lgr.debug(f"Ignoring excluded element {element_path}")
-                    continue
+                ("dataset-traversal-record", [traverse_result])
+            ))
 
-                if not element_path.is_dir():
+        if TraversalType.FILE in self.item_set:
+            if isinstance(dataset.repo, AnnexRepo):
+                status = get_annexstatus
+            else:
+                status = GitRepo.status
 
-                    if self._already_visited(dataset, relative_element_path):
-                        continue
+            for path_str, element_info in status(dataset.repo).items():
+                if element_info["state"] == "untracked":
+                    continue
+                if element_info["type"] == "file":
+                    element_path = Path(path_str)
+                    if self.is_excluded(element_path):
+                        lgr.debug(f"Ignoring excluded path {element_path}")
+                        continue
+                    traverse_result = self._generate_result(
+                        dataset=dataset,
+                        dataset_path=str(dataset.pathobj),
+                        element_path=element_path,
+                        element_info=element_info
+                    )
                     yield PipelineData((
                         ("path", element_path),
-                        (
-                            "dataset-traversal-record",
-                            [
-                                DatasetTraverseResult(**{
-                                    "state": ResultState.SUCCESS,
-                                    "fs_base_path": self.fs_base_path,
-                                    "type": "file",
-                                    "path": element_path,
-                                    **self._get_dataset_result_part(dataset)
-                                })
-                            ]
-                        )
+                        ("dataset-traversal-record", [traverse_result])
                     ))
 
         if self.traverse_sub_datasets:
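The new file branch above selects a status callable once per dataset, `get_annexstatus` for annex repositories and the plain `GitRepo.status` otherwise, and then iterates over one common result shape. A sketch of that dispatch pattern in isolation (the repo classes and status payloads here are simplified stand-ins for the DataLad types):

    from typing import Callable, Dict

    class GitRepo:  # stand-in for DataLad's GitRepo
        def status(self) -> Dict[str, dict]:
            return {"a.txt": {"type": "file", "state": "clean"}}

    class AnnexRepo(GitRepo):  # stand-in for DataLad's AnnexRepo
        pass

    def get_annexstatus(repo: GitRepo) -> Dict[str, dict]:
        # Placeholder for DataLad's annex-aware status helper.
        return {"a.txt": {"type": "file", "state": "clean", "key": "MD5E-s0--0"}}

    repo = AnnexRepo()

    # GitRepo.status is referenced unbound on the class, so both branches
    # yield a callable that is invoked the same way: status(repo).
    status: Callable[[GitRepo], Dict[str, dict]] = (
        get_annexstatus if isinstance(repo, AnnexRepo) else GitRepo.status)

    for path_str, element_info in status(repo).items():
        if element_info["state"] == "untracked":
            continue  # the rewritten traversal skips untracked entries
        print(path_str, element_info["type"])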
@@ -279,9 +284,9 @@ def _traverse_dataset(self, dataset_path: Path) -> Iterable:
                     yield from self._traverse_dataset(submodule_info["path"])
                 else:
                     lgr.debug(
-                        f"ignoring un-installed dataset at {submodule_path}")
+                        f"ignoring not installed sub-dataset at "
+                        f"{submodule_path}")
         return
-    """
 
     def is_excluded(self, path: Path) -> bool:
         """Check whether any of the path parts matches an exclude-pattern."""
@@ -341,118 +346,5 @@ def _generate_result(self,
             **self._get_dataset_result_part(dataset)
         })
 
-    def _traverse_single_dataset(self,
-                                 root: Path,
-                                 dataset: Dataset
-                                 ) -> Generator[DatasetTraverseResult, None, None]:
-        """Traverse all elements of dataset, do not recurse into subdatasets.
-
-        This method will traverse the dataset `dataset` and yield traversal
-        results for each `file` or installed `dataset`, depending on the
-        selected item types.
-
-        :param Path root: the root of all traversals
-        :param Dataset dataset: the dataset to traverse in this method
-        :return: a generator, yielding `DatasetTraversalResult`-records
-        :rtype: Generator[DatasetTraverseResult, None, None]
-        """
-
-        if not self.is_installed(dataset):
-            lgr.warning(f"ignoring un-installed dataset at {dataset.path}")
-            return
-
-        if "dataset" in self.item_set:
-            if self._already_visited(dataset, Path("")):
-                return
-            element_path = resolve_path("", dataset)
-            traverse_result = self._generate_result(
-                dataset=dataset,
-                dataset_path=str(dataset.pathobj),
-                element_path=element_path,
-                element_info={
-                    "type": "dataset",
-                    "state": "",
-                    "gitshasum": "",
-                    "prev_gitshasum": ""
-                }
-            )
-            yield PipelineData((
-                ("path", element_path),
-                ("dataset-traversal-record", [traverse_result])
-            ))
-
-        if "file" in self.item_set:
-            if isinstance(dataset.repo, AnnexRepo):
-                status = get_annexstatus
-            else:
-                status = GitRepo.status
-
-            for path_str, element_info in status(dataset.repo).items():
-                if element_info["type"] == "file":
-                    element_path = Path(path_str)
-                    if self.is_excluded(element_path):
-                        lgr.debug(f"Ignoring excluded path {element_path}")
-                        continue
-                    traverse_result = self._generate_result(
-                        dataset=dataset,
-                        dataset_path=str(dataset.pathobj),
-                        element_path=element_path,
-                        element_info=element_info
-                    )
-                    yield PipelineData((
-                        ("path", element_path),
-                        ("dataset-traversal-record", [traverse_result])
-                    ))
-
-        if self.traverse_sub_datasets:
-            self._traverse_subdatasets(dataset)
-
-    def _traverse_subdatasets(self,
-                              root_dataset: Dataset
-                              ) -> Generator[DatasetTraverseResult, None, None]:
-        """Traverse all sub datasets of `root_dataset`
-
-        Traverse all sub datasets of the root dataset and yield pipeline
-        results, that can be added to pipeline data. All `dataset_path`
-        instances are relative to the path of `root_dataset`.
-
-        :param root_dataset: Dataset:
-        :return: Generator
-        """
-        for sub_dataset in root_dataset.subdatasets(recursive=True, **std_args):
-            yield from self._traverse_single_dataset(
-                root=root_dataset.pathobj,
-                dataset=Dataset(sub_dataset["path"])
-            )
-
-    def _traverse_datasets(self, top_level_dir: Path):
-        """Traverse the dataset at top_level_dir and optionally its subdatasets
-
-        The
-        :param top_level_dir:
-        :return:
-        """
-
-        # Yield annex status information from the root dataset.
-        dataset = Dataset(top_level_dir)
-        yield from self._traverse_single_dataset(
-            root=top_level_dir,
-            dataset=dataset
-        )
-
-        # If subdataset recursion is requested, yield annex status information
-        # from all subdatasets.
-        if self.traverse_sub_datasets:
-            yield from self._traverse_subdatasets(dataset)
-
     def next_object(self) -> Iterable:
-        yield from self._traverse_datasets(self.fs_base_path)
-
-    doc = """
-    We need to collect information that was sent to legacy extractors.
-    - inter-dataset path
-    - intra-dataset path
-    - file infos *
-    """
+        yield from self._traverse_dataset(self.fs_base_path)
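After this commit, `next_object` is the single traversal entry point and delegates to the rewritten, recursive `_traverse_dataset`. Roughly, a consumer would drive it as below; the constructor arguments are assumptions for illustration, not the documented interface:

    # Hypothetical wiring; the real provider is configured by the metalad
    # pipeline machinery.
    traverser = DatasetTraverser(
        top_level_dir="/data/my-dataset",  # assumed parameter name
        item_type="both",                  # selects {FILE, DATASET} via name_to_item_set
        traverse_sub_datasets=True,        # assumed parameter name
    )
    for pipeline_data in traverser.next_object():
        # Each PipelineData pairs the element "path" with a one-element list
        # of DatasetTraverseResult records under "dataset-traversal-record".
        ...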
