Skip to content

add method to get the edges from a proof read annotation #1350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions webknossos/webknossos/_nml/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class Volume(NamedTuple):
segments: list[Segment]
format: str | None = None
largest_segment_id: int | None = None
edited_mapping_edges_location: str | None = None
edited_mapping_base_mapping_name: str | None = None

def _dump(self, xf: XmlWriter) -> None:
xf.startTag(
Expand Down Expand Up @@ -56,4 +58,10 @@ def _parse(cls, nml_volume: Element) -> "Volume":
largest_segment_id=None
if largest_segment_id_str is None
else int(largest_segment_id_str),
edited_mapping_edges_location=nml_volume.get(
"editedMappingEdgesLocation", default=None
),
edited_mapping_base_mapping_name=nml_volume.get(
"editedMappingBaseMappingName", default=None
),
)
107 changes: 94 additions & 13 deletions webknossos/webknossos/annotation/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
from zlib import Z_BEST_SPEED

import attr
import numpy as np
import tensorstore as ts
from cluster_tools.executor_protocol import Executor
from upath import UPath
from zipp import Path as ZipPath
Expand Down Expand Up @@ -112,6 +114,8 @@ class _VolumeLayer:
zip: ZipPath | None
segments: dict[int, SegmentInformation]
largest_segment_id: int | None
edited_mapping_zip_path: ZipPath | None
edited_mapping_base_mapping_name: str | None

def _default_zip_name(self) -> str:
return f"data_{self.id}_{self.name}.zip"
Expand Down Expand Up @@ -602,13 +606,35 @@ def _load_from_nml(
annotation._volume_layers = cls._parse_volumes(nml, possible_volume_paths)
return annotation

@staticmethod
def _get_volume_path(
location_path: str | None, possible_paths: list[ZipPath] | None
) -> ZipPath | None:
"""
Get zip that matches the location path.
"""
if location_path is None or possible_paths is None:
return None

fitting_volume_paths = [i for i in possible_paths if str(i.at) == location_path]
if len(fitting_volume_paths) == 1:
with fitting_volume_paths[0].open(mode="rb") as f:
with ZipFile(f) as volume_layer_zipfile:
if len(volume_layer_zipfile.filelist) == 0:
return None
else:
return fitting_volume_paths[0]
else:
return None

@staticmethod
def _parse_volumes(
nml: wknml.Nml, possible_paths: list[ZipPath] | None
) -> list[_VolumeLayer]:
volume_layers = []
layers_with_not_found_location = []
layers_without_location = []
edited_mapping_zip_path = None
for volume in nml.volumes:
if possible_paths is None: # when parsing NML files
volume_path = None
Expand All @@ -617,23 +643,20 @@ def _parse_volumes(
# is unpacked and loaded directly.
layers_with_not_found_location.append(volume)
volume_path = None
elif volume.location is None:
elif (
volume.location is None and volume.edited_mapping_edges_location is None
):
volume_path = None
layers_without_location.append(volume)
else:
fitting_volume_paths = [
i for i in possible_paths if str(i.at) == volume.location
]
if len(fitting_volume_paths) == 1:
with fitting_volume_paths[0].open(mode="rb") as f:
with ZipFile(f) as volume_layer_zipfile:
if len(volume_layer_zipfile.filelist) == 0:
volume_path = None
else:
volume_path = fitting_volume_paths[0]
else:
volume_path = Annotation._get_volume_path(
volume.location, possible_paths
)
edited_mapping_zip_path = Annotation._get_volume_path(
volume.edited_mapping_edges_location, possible_paths
)
if volume_path is None:
layers_with_not_found_location.append(volume)
volume_path = None

segments = {}
if volume.segments is not None:
Expand All @@ -655,6 +678,8 @@ def _parse_volumes(
else DataFormat.WKW
),
zip=volume_path,
edited_mapping_zip_path=edited_mapping_zip_path,
edited_mapping_base_mapping_name=volume.edited_mapping_base_mapping_name,
segments=segments,
largest_segment_id=volume.largest_segment_id,
)
Expand Down Expand Up @@ -1095,6 +1120,8 @@ def add_volume_layer(
zip=None,
segments={},
largest_segment_id=None,
edited_mapping_zip_path=None,
edited_mapping_base_mapping_name=None,
)
)

Expand Down Expand Up @@ -1158,6 +1185,60 @@ def _get_volume_layer(
+ "Please specify which layer should be used via volume_layer_name or volume_layer_id."
)

def get_edited_mapping_edges(
self, volume_layer_name: str | None = None
) -> tuple[np.ndarray, np.ndarray, str]:
"""Returns the edited mapping edges for a volume layer.

Returns a tuple of the edited mapping edges, edge_is_addition, and the base mapping name.

Args:
volume_layer_name: Name of the layer to get edited mapping edges for.

Returns:
EditedMappingEdges: Edited mapping edges and base mapping name.
"""
volume_layer = self._get_volume_layer(volume_layer_name)
assert volume_layer.edited_mapping_base_mapping_name is not None, (
"edited_mapping_base_mapping_name must not be None"
)
if volume_layer.edited_mapping_zip_path is None:
raise ValueError("The volume layer does not have edited mapping edges.")
with volume_layer.edited_mapping_zip_path.open(mode="rb") as f:
edited_mapping_zip = ZipFile(f)
with TemporaryDirectory() as tmp_dir:
edited_mapping_zip.extractall(tmp_dir)
edges = np.array(
ts.open(
{
"driver": "zarr3",
"kvstore": {
"driver": "file",
"path": str(UPath(tmp_dir).resolve() / "edges"),
},
},
open=True,
create=False,
).result()[:]
)
edge_is_addition = np.array(
ts.open(
{
"driver": "zarr3",
"kvstore": {
"driver": "file",
"path": str(
UPath(tmp_dir).resolve() / "edgeIsAddition"
),
},
},
open=True,
create=False,
).result()[:]
)

return edges, edge_is_addition, volume_layer.edited_mapping_base_mapping_name

def delete_volume_layer(
self,
volume_layer_name: str | None = None,
Expand Down
Loading