-
Notifications
You must be signed in to change notification settings - Fork 20
Add annotations #654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add annotations #654
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| """ | ||
| Annotation tool. | ||
|
|
||
| Inspired by https://uima.apache.org/d/uimafit-current/api/ | ||
| """ | ||
|
|
||
| # TODO(zurk) move annotation module and tests to lookout-sdk-ml |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,230 @@ | ||||||
| from typing import Dict, Iterable, Iterator, Optional, Sequence, Tuple, Union, Any, Type # noqa F401 | ||||||
|
|
||||||
| from lookout.sdk.service_data_pb2 import File | ||||||
| from sortedcontainers import SortedDict | ||||||
|
|
||||||
| from lookout.style.format.annotations.annotations import Annotation, LanguageAnnotation, \ | ||||||
| PathAnnotation, UASTAnnotation | ||||||
|
|
||||||
|
|
||||||
class NoIntersection(Exception):
    """Raised by AnnotatedData.find_intersect() when no intersecting annotation is found."""
|
|
||||||
|
|
||||||
class AnnotationsSlice(dict):
    """
    Annotations collection for a specific range.

    It is a regular dict that additionally remembers the `(start, stop)` range it was \
    created for. The range is exposed through read-only properties.
    """

    def __init__(self, start, stop, *args, **kwargs):
        """
        Initialize a new AnnotationsSlice instance.

        :param start: Start of the range the annotations belong to.
        :param stop: Stop of the range the annotations belong to.
        :param args: Positional arguments forwarded to the `dict` constructor.
        :param kwargs: Keyword arguments forwarded to the `dict` constructor.
        """
        super().__init__(*args, **kwargs)
        self._start = start
        self._stop = stop
        self._range = (start, stop)

    @property
    def start(self):
        """Return the start of the range."""
        return self._start

    @property
    def stop(self):
        """Return the stop of the range."""
        return self._stop

    @property
    def range(self):
        """Return the range as a `(start, stop)` tuple."""
        return self._range
zurk marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
|
||||||
|
|
||||||
class AnnotatedData:
    """
    Data annotation tool that allows annotating any sequenced data.

    All special utilities to work with annotations should be implemented in this class.
    The list of methods that can be implemented can be found here:
    https://uima.apache.org/d/uimafit-current/api/org/apache/uima/fit/util/JCasUtil.html

    Annotations that span the whole content (`start == 0` and `stop == len(content)`) are
    called global and are stored separately from the ranged ones, see `add()`.
    """

    def __init__(self, content: str):
        """
        Initialize a new AnnotatedData instance.

        :param content: Data to annotate. It is expected to be a string but can be any type \
                        with `__len__()` and `__getitem__()` defined for int and slice input \
                        arguments.
        """
        self._content = content

        # Annotations that cover the whole content (aka `global` annotations), one per type.
        self._global_annotations = {}  # type: Dict[Type[Annotation], Annotation]

        # The two indexes below store the same ranged (non-global) annotations.
        # _range_to_annotations is created for optimization purposes only: the most common
        # use-case in style-analyzer is iterating through Token annotations in sorted order,
        # so sorted dictionaries are used to iterate fast.
        self._range_to_annotations = SortedDict()  # type: SortedDict[Tuple[int, int], Dict[Type[Annotation], Annotation]]  # noqa E501
        self._type_to_annotations = {}  # type: Dict[Type[Annotation], SortedDict[Tuple[int, int], Annotation]]  # noqa E501

    @property
    def content(self):
        """Return the annotated content."""
        return self._content

    def __len__(self):
        """Return the length of the AnnotatedData instance, i.e. the length of its content."""
        return len(self._content)

    def __getitem__(self, item: Union[int, slice, Tuple[int, int]]) -> Any:
        """
        Get a part of the content for a specific index or index range.

        :param item: Index, slice or `(start, stop)` tuple. A tuple is converted to a slice.
        :return: Corresponding part of the content.
        :raises KeyError: if the resulting slice has a step.
        """
        if isinstance(item, tuple):
            item = slice(*item)
        if isinstance(item, slice) and item.step is not None:
            raise KeyError("slice.step is not supported.")
        return self._content[item]

    def count(self, annotation_type: Type[Annotation]):
        """
        Count the number of ranged annotations of a specific type.

        Global annotations are stored separately and are not counted.

        :param annotation_type: Annotation type to count.
        :return: Number of ranged annotations of this type.
        :raises KeyError: if no ranged annotation of this type has been added.
        """
        return len(self._type_to_annotations[annotation_type])

    def add(self, annotation: Annotation) -> None:
        """
        Add an annotation.

        An annotation that covers the whole content is stored as a global one; only one \
        global annotation per type is allowed. Any other annotation is indexed by its range \
        and by its type. Adding a ranged annotation with the same type and range as an \
        existing one replaces it.

        :param annotation: Annotation to add.
        :raises ValueError: if a global annotation of the same type already exists.
        """
        annotation_id = type(annotation)
        if annotation.start == 0 and annotation.stop == len(self):
            if annotation_id in self._global_annotations:
                raise ValueError("Global annotation %s already exists" % annotation)
            self._global_annotations[annotation_id] = annotation
            return

        # TODO(zurk): Add a check that there is no overlapping annotations of one type.
        # NOTE(review): overlapping annotations of one type are not validated here and
        # find_intersect() assumes they do not exist. Reviewers pointed out that overlapping
        # may be needed (e.g. for UASTAnnotation); it is not supported yet.
        annotation_range = annotation.range
        if annotation_range not in self._range_to_annotations:
            self._range_to_annotations[annotation_range] = {}
        if annotation_id not in self._type_to_annotations:
            self._type_to_annotations[annotation_id] = SortedDict()
        self._range_to_annotations[annotation_range][annotation_id] = annotation
        self._type_to_annotations[annotation_id][annotation_range] = annotation

    def update(self, annotations: Iterable[Annotation]) -> None:
        """
        Add multiple annotations one by one, see `add()`.

        :param annotations: Annotations to add.
        """
        for annotation in annotations:
            self.add(annotation)

    def get(self, annotation_type: Type[Annotation], range: Optional[Tuple[int, int]] = None,
            ) -> Annotation:
        """
        Return a specific annotation for a given range.

        Looks for an exact range match only. If `range` is None, the annotation that covers \
        the whole content (aka global annotation) is returned.

        :param annotation_type: Type of the annotation to get.
        :param range: `(start, stop)` range of the annotation or None for the global one.
        :return: Requested annotation.
        :raises KeyError: if there is no such annotation.
        """
        if range is None:
            return self._global_annotations[annotation_type]
        return self._type_to_annotations[annotation_type][range]

    def iter_annotation(self, name: Type[Annotation], start_offset: Optional[int] = None,
                        stop_offset: Optional[int] = None) -> Iterator[Annotation]:
        """
        Iterate through ranged annotations of a specific type in sorted range order.

        :param name: Annotation type to iterate through.
        :param start_offset: If specified, iteration starts from the first annotation whose \
                             range is not less than `(start_offset, start_offset)`.
        :param stop_offset: Not supported yet, must be None.
        :return: Annotations iterator.
        :raises NotImplementedError: if `stop_offset` is specified.
        :raises KeyError: if no ranged annotation of this type has been added.
        """
        if stop_offset is not None:
            raise NotImplementedError()
        annotation_layer = self._type_to_annotations[name]
        values = annotation_layer.values()
        if start_offset is not None:
            search_from = annotation_layer.bisect_left((start_offset, start_offset))
            values = values[search_from:]
        yield from values

    def iter_annotations(self, types: Sequence[Type[Annotation]],
                         start_offset: Optional[int] = None, stop_offset: Optional[int] = None,
                         ) -> Iterator[AnnotationsSlice]:
        """
        Iterate through annotations of the specified types.

        The first type in `types` defines the ranges to iterate through. For each of its \
        annotations the other requested types are collected: the annotation with exactly the \
        same range if it exists, otherwise the one found by `find_intersect()`. Types without \
        an intersecting annotation are omitted from the slice.

        :param types: Annotation types to collect. The first one drives the iteration.
        :param start_offset: Not supported yet, must be None.
        :param stop_offset: Not supported yet, must be None.
        :return: Requested annotations slices iterator.
        :raises NotImplementedError: if `start_offset` or `stop_offset` is specified.
        """
        if start_offset is not None or stop_offset is not None:
            raise NotImplementedError()

        types_set = frozenset(types)
        for annotation0 in self.iter_annotation(types[0]):
            # Annotations with the same range
            same_range_annotations = self._range_to_annotations[annotation0.range]
            same_range_names = set(same_range_annotations.keys())
            common = types_set & same_range_names
            missing = types_set - same_range_names
            annotations = {}
            for missing_type in missing:
                try:
                    annotations[missing_type] = self.find_intersect(
                        missing_type, *annotation0.range)
                except NoIntersection:
                    # There is simply no annotation of this type for the range.
                    pass
            annotations.update(
                {common_type: same_range_annotations[common_type] for common_type in common})
            yield AnnotationsSlice(*annotation0.range, annotations)

    def iter_items(self, types: Sequence[Type[Annotation]], start_offset: Optional[int] = None,
                   stop_offset: Optional[int] = None,
                   ) -> Iterator[Tuple[str, AnnotationsSlice]]:
        """
        Iterate through annotations of the specified types together with the annotated data.

        :param types: Annotation types to collect, see `iter_annotations()`.
        :param start_offset: Passed to `iter_annotations()`.
        :param stop_offset: Passed to `iter_annotations()`.
        :return: Iterator through pairs of the content part and its annotations slice.
        """
        for annotations in self.iter_annotations(types, start_offset, stop_offset):
            yield self[annotations.range], annotations

    def find_intersect(self, name: Type[Annotation], start: int, stop: int) -> Annotation:
        """
        Find an annotation of the given type that intersects the interval [start, stop).

        Only ranged annotations are searched. It is assumed that annotations of one type do \
        not overlap, so the first intersecting annotation is returned.

        :param name: Annotation type.
        :param start: Start of the interval.
        :param stop: End of the interval.
        :return: Requested annotation.
        :raises NoIntersection: if there is no such annotation.
        """
        try:
            annotation_layer = self._type_to_annotations[name]
        except KeyError:
            # The internal KeyError is an implementation detail, do not chain it.
            raise NoIntersection("There is no annotation layer %s" % name) from None
        # One position is stepped back because the annotation that starts before `start`
        # can still cross the interval.
        search_start = max(0, annotation_layer.bisect_left((start, start)) - 1)
        search_stop = annotation_layer.bisect_right((stop, stop))
        for candidate in annotation_layer.islice(search_start, search_stop):
            if self._check_interval_crossing(start, stop, *candidate):
                # assuming that there is only one such annotation
                return annotation_layer[candidate]
        raise NoIntersection("There is no annotation %s from %d to %d" % (name, start, stop))

    @classmethod
    def _check_interval_crossing(cls, start1: int, stop1: int, start2: int, stop2: int) -> bool:
        """
        Check if the intervals [start1, stop1) and [start2, stop2) cross each other.

        An empty interval [x, x) is treated as the point x: two points cross only if they are \
        equal and a point crosses a non-empty interval only if it lies strictly inside it, so \
        a point on the interval border does not count.
        """
        if start1 == stop1:
            if start2 == stop2:
                return start1 == start2
            return start2 < start1 < stop2
        if start2 == stop2:
            return start1 < start2 < stop1
        return (start1 <= start2 < stop1 or
                start1 < stop2 < stop1 or
                start2 <= start1 < stop2)

    @classmethod
    def from_file(cls, file: File) -> "AnnotatedData":
        """
        Create an AnnotatedData instance from a File.

        The file content is decoded as UTF-8 with undecodable bytes replaced. The path, UAST \
        and language are added as annotations that cover the whole content.

        :param file: file.content will be used as data to be annotated with \
                     file.path, file.language and file.uast.
        :return: new instance of the class the method is called on.
        """
        raw_data = file.content.decode("utf-8", "replace")
        # `cls` is used instead of the hard-coded class name to support subclasses.
        annotated_data = cls(raw_data)
        annotated_data.add(PathAnnotation(0, len(raw_data), file.path))
        annotated_data.add(UASTAnnotation(0, len(raw_data), file.uast))
        annotated_data.add(LanguageAnnotation(0, len(raw_data), file.language))
        return annotated_data
Uh oh!
There was an error while loading. Please reload this page.