|
| 1 | +from typing import Dict, Iterable, Iterator, Optional, Sequence, Tuple, Union, Any, Type # noqa F401 |
| 2 | + |
| 3 | +from lookout.sdk.service_data_pb2 import File |
| 4 | +from sortedcontainers import SortedDict |
| 5 | + |
| 6 | +from lookout.style.format.annotations.annotations import Annotation, LanguageAnnotation, \ |
| 7 | + PathAnnotation, UASTAnnotation |
| 8 | + |
| 9 | + |
class NoIntersection(Exception):
    """Raised by AnnotatedData.find_intersect() when no intersecting annotation is found."""
| 12 | + |
| 13 | + |
class AnnotationsSlice(dict):
    """
    Dictionary of annotations that belong to one specific range.

    Behaves like a regular dict and additionally remembers the `start` and `stop` offsets \
    of the range it was built for.
    """

    def __init__(self, start, stop, *args, **kwargs):
        """
        Init.

        :param start: Start offset of the range.
        :param stop: Stop offset of the range.
        :param args: Positional arguments forwarded to dict().
        :param kwargs: Keyword arguments forwarded to dict().
        """
        super().__init__(*args, **kwargs)
        self._start, self._stop = start, stop
        self._range = (self._start, self._stop)

    @property
    def start(self):
        """Start offset of the range (read-only)."""
        return self._start

    @property
    def stop(self):
        """Stop offset of the range (read-only)."""
        return self._stop

    @property
    def range(self):
        """(start, stop) tuple of the range (read-only)."""
        return self._range
| 31 | + |
| 32 | + |
class AnnotatedData:
    """
    Class that couples annotations and data together.

    Annotations are stored in two indexes: by range (`_range_to_annotations`) and by \
    annotation type (`_type_to_annotations`); `add()` keeps both in sync.

    All special utilities to work with annotations should be implemented in this class.
    List of methods that should be implemented can be found here:
    https://uima.apache.org/d/uimafit-current/api/org/apache/uima/fit/util/JCasUtil.html
    """

    def __init__(self, content: str):
        """
        Init.

        :param content: Data to annotate. It is expected to be string but actually can be any \
                        type with __getitem__() defined for int and slice input arguments.
        """
        self._content = content

        # range -> {annotation type -> annotation}, sorted by range.
        self._range_to_annotations = SortedDict()  # type: SortedDict[(int, int), Dict[Type[Annotation], Annotation]] # noqa E501
        # annotation type -> {range -> annotation}, each layer sorted by range.
        self._type_to_annotations = {}  # type: Dict[Type[Annotation], SortedDict[(int, int), Annotation]] # noqa E501

    @property
    def content(self):
        """Annotated data as it was passed to the constructor (read-only)."""
        return self._content

    def __getitem__(self, item: Union[int, slice, Tuple[int, int]]) -> Any:
        """
        Return the part of the content addressed by `item`.

        :param item: Index, slice or tuple. A tuple is unpacked into slice() arguments.
        :return: Result of indexing the underlying content.
        :raises KeyError: if the resulting slice has a step.
        """
        if isinstance(item, tuple):
            item = slice(*item)
        if isinstance(item, slice) and item.step is not None:
            raise KeyError("slice.step is not supported.")
        return self._content[item]

    def add(self, annotation: Annotation) -> None:
        """
        Add annotation.

        An annotation of the same type and the same range that was added before is replaced.

        :param annotation: Annotation to add. It is indexed by its type and its `range`.
        """
        annotation_type = type(annotation)
        span = annotation.range
        # TODO(zurk): Add a check that there is no overlapping annotations of one type.
        if span not in self._range_to_annotations:
            self._range_to_annotations[span] = {}
        if annotation_type not in self._type_to_annotations:
            self._type_to_annotations[annotation_type] = SortedDict()
        self._range_to_annotations[span][annotation_type] = annotation
        self._type_to_annotations[annotation_type][span] = annotation

    def update(self, annotations: Iterable[Annotation]) -> None:
        """
        Update with annotations.

        :param annotations: Annotations to add one by one with `add()`.
        """
        for annotation in annotations:
            self.add(annotation)

    def iget(self, annotation_type: Type[Annotation], index: int) -> Annotation:
        """
        Return the annotation of the given type at the given position.

        :param annotation_type: Type of the annotation to return.
        :param index: Position of the annotation in the range-sorted layer of that type.
        :return: requested Annotation.
        """
        return self._type_to_annotations[annotation_type].peekitem(index)[1]

    def iter_annotation(self, name: Type[Annotation], start_offset: Optional[int] = None,
                        stop_offset: Optional[int] = None) -> Iterator[Annotation]:
        """
        Iterate through annotations of one specific type in the order of their ranges.

        :param name: Annotation type to iterate through.
        :param start_offset: Not supported yet, must be None.
        :param stop_offset: Not supported yet, must be None.
        :return: Iterator through annotations of the requested type.
        :raises NotImplementedError: if `start_offset` or `stop_offset` is specified.
        :raises KeyError: if there are no annotations of the requested type.
        """
        if start_offset is not None or stop_offset is not None:
            raise NotImplementedError()

        yield from self._type_to_annotations[name].values()

    def iter_annotations(self, types: Sequence[Type[Annotation]],
                         start_offset: Optional[int] = None, stop_offset: Optional[int] = None,
                         ) -> Iterator[AnnotationsSlice]:
        """
        Iterate through annotations with specified type.

        The first type in `types` drives the iteration: one slice is yielded per its \
        annotation. The rest of the types are taken from the annotations with exactly the \
        same range or, if there are none, from an annotation that intersects this range. \
        Types without such annotation are omitted from the slice.

        :param types: Annotation types to collect. Nothing is yielded if it is empty.
        :param start_offset: Not supported yet, must be None.
        :param stop_offset: Not supported yet, must be None.
        :return: Requested annotations slice.
        :raises NotImplementedError: if `start_offset` or `stop_offset` is specified.
        """
        if start_offset is not None or stop_offset is not None:
            raise NotImplementedError()
        if not types:
            # There is no leading type to iterate over, so there is nothing to yield.
            return

        requested_types = frozenset(types)
        for leading in self.iter_annotation(types[0]):
            # Annotations with the same range
            same_range_annotations = self._range_to_annotations[leading.range]
            same_range_types = set(same_range_annotations.keys())
            collected = {}
            for annotation_type in requested_types - same_range_types:
                try:
                    collected[annotation_type] = self.find_intersect(
                        annotation_type, *leading.range)
                except NoIntersection:
                    # Best effort: a type without an intersecting annotation is left out.
                    pass
            for annotation_type in requested_types & same_range_types:
                collected[annotation_type] = same_range_annotations[annotation_type]
            yield AnnotationsSlice(*leading.range, collected)

    def iter_items(self, types: Sequence[Type[Annotation]], start_offset: Optional[int] = None,
                   stop_offset: Optional[int] = None,
                   ) -> Iterator[Tuple[str, AnnotationsSlice]]:
        """
        Iterate through annotations with specified type together with the annotated data.

        :param types: Annotation types to collect, see `iter_annotations()`.
        :param start_offset: Passed to `iter_annotations()`.
        :param stop_offset: Passed to `iter_annotations()`.
        :return: Annotated data slice with requested annotations.
        """
        for annotations in self.iter_annotations(types, start_offset, stop_offset):
            yield self[annotations.range], annotations

    def find_intersect(self, name: Type[Annotation], start: int, stop: int) -> Annotation:
        """
        Find an annotation of given type that intersects the interval [start, stop).

        raises NoIntersection exception if there is no such annotation.

        :param name: Annotation type.
        :param start: start of interval.
        :param stop: end of interval.
        :return: requested Annotation.
        """
        try:
            annotation_layer = self._type_to_annotations[name]
        except KeyError:
            # The KeyError is an implementation detail, do not chain it to the new exception.
            raise NoIntersection("There is no annotation layer %s" % name) from None
        # Step one position back: the annotation that begins before `start` can still cover it.
        search_start = max(0, annotation_layer.bisect_left((start, start)) - 1)
        search_stop = annotation_layer.bisect_right((stop, stop))
        for candidate in annotation_layer.islice(search_start, search_stop):
            if self._check_interval_crossing(start, stop, *candidate):
                # assuming that there is only one such annotation
                return annotation_layer[candidate]
        raise NoIntersection("There is no annotation %s from %d to %d" % (name, start, stop))

    @classmethod
    def _check_interval_crossing(cls, start1: int, stop1: int, start2: int, stop2: int) -> bool:
        """
        Check if the intervals [start1, stop1) and [start2, stop2) intersect.

        An empty interval [x, x) is treated as a point: two empty intervals intersect only \
        if they are equal, and an empty interval intersects a non-empty one only if it lies \
        strictly inside it.

        :return: True if the intervals intersect, False otherwise.
        """
        if start1 == stop1:
            if start2 == stop2:
                return start1 == start2
            return start2 < start1 < stop2
        if start2 == stop2:
            return start1 < start2 < stop1
        return (start1 <= start2 < stop1 or
                start1 < stop2 < stop1 or
                start2 <= start1 < stop2)

    def subiter_annotation(self, name: str, covering_annotation: Annotation):
        """TODO: not implemented yet."""
        raise NotImplementedError()

    def sub_iter_annotations(self, names: Sequence[str], covering_annotation: Annotation):
        """TODO: not implemented yet."""
        raise NotImplementedError()

    @classmethod
    def from_file(cls, file: File) -> "AnnotatedData":
        """
        Create AnnotatedData instance from File.

        :param file: file.content will be used as data to be annotated with \
                     file.path, file.language and file.uast.
        :return: new AnnotatedData instance.
        """
        raw_data = file.content.decode("utf-8", "replace")
        size = len(raw_data)
        # Use cls instead of the hard-coded class name so that subclasses get their own type.
        annotated_data = cls(raw_data)
        annotated_data.add(PathAnnotation(0, size, file.path))
        annotated_data.add(UASTAnnotation(0, size, file.uast))
        annotated_data.add(LanguageAnnotation(0, size, file.language))
        return annotated_data
0 commit comments