Skip to content

Commit ad6da8f

Browse files
committed
Add annotations
Signed-off-by: Konstantin Slavnov <[email protected]>
1 parent 5dc543b commit ad6da8f

File tree

11 files changed

+604
-86
lines changed

11 files changed

+604
-86
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
"""
2+
Annotation tool.
3+
4+
Inspired by https://uima.apache.org/d/uimafit-current/api/
5+
"""
6+
7+
# TODO(zurk) move annotation module and tests to lookout-sdk-ml
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
from typing import Dict, Iterable, Iterator, Optional, Sequence, Tuple, Union, Any, Type # noqa F401
2+
3+
from lookout.sdk.service_data_pb2 import File
4+
from sortedcontainers import SortedDict
5+
6+
from lookout.style.format.annotations.annotations import Annotation, LanguageAnnotation, \
7+
PathAnnotation, UASTAnnotation
8+
9+
10+
class NoIntersection(Exception):
    """Raised by AnnotatedData.find_intersect() if there is no intersection."""
12+
13+
14+
class AnnotationsSlice(dict):
    """
    Dictionary of annotations that is bound to one specific range.

    It behaves as a regular dict and additionally remembers the `start` and `stop` offsets \
    (and the `(start, stop)` pair as `range`) it was created for.
    """

    def __init__(self, start, stop, *args, **kwargs):
        """
        Create a new slice.

        :param start: Start offset of the range.
        :param stop: Stop offset of the range.
        :param args: Positional arguments forwarded to the dict constructor.
        :param kwargs: Keyword arguments forwarded to the dict constructor.
        """
        super().__init__(*args, **kwargs)
        self._start, self._stop = start, stop
        self._range = (start, stop)

    @property
    def start(self):
        """Start offset given at construction time."""
        return self._start

    @property
    def stop(self):
        """Stop offset given at construction time."""
        return self._stop

    @property
    def range(self):
        """Tuple `(start, stop)` given at construction time."""
        return self._range
31+
32+
33+
class AnnotatedData:
    """
    Data annotation tool that allows annotating any sequenced data.

    All special utilities to work with annotations should be implemented in this class.
    The list of methods that can be implemented can be found here:
    https://uima.apache.org/d/uimafit-current/api/org/apache/uima/fit/util/JCasUtil.html
    """

    def __init__(self, content: str):
        """
        Return new AnnotatedData instance.

        :param content: Data to annotate. It is expected to be string but can be any type with \
                        __getitem__() defined for int and slice input arguments.
        """
        self._content = content

        # Dictionary to store annotations that cover the whole content (aka `global`
        # annotations). At most one annotation per type is kept here.
        self._global_annotations = {}  # type: Dict[Type[Annotation], Annotation]

        # _range_to_annotations dict is created for optimization purpose only.
        # The most common use-case we have in style-analyzer is iterating through Token annotations
        # in the sorted order. To iterate fast ordered Dict is used.
        self._range_to_annotations = SortedDict()  # type: SortedDict[(int, int), Dict[Type[Annotation], Annotation]]  # noqa E501
        self._type_to_annotations = {}  # type: Dict[Type[Annotation], SortedDict[(int, int), Annotation]]  # noqa E501

    content = property(lambda self: self._content)

    def __len__(self):
        """Return length of AnnotatedData instance. It is the same as its content length."""
        return len(self._content)

    def __getitem__(self, item: Union[int, slice, Tuple[int, int]]) -> Any:
        """
        Get part of content for a specific index range.

        :param item: index or index range. A tuple is unpacked into `slice()` arguments.
        :return: Corresponding part of content.
        :raises KeyError: if a slice with a step is requested.
        """
        if isinstance(item, tuple):
            item = slice(*item)
        if isinstance(item, slice) and item.step is not None:
            raise KeyError("slice.step is not supported.")
        return self._content[item]

    def count(self, annotation_type: Type[Annotation]):
        """
        Count number of annotations of specific type.

        Only annotations stored per range are counted; global annotations are kept separately.

        :param annotation_type: Annotation type to count.
        :return: Number of stored annotations of this type.
        :raises KeyError: if no annotation of this type was added with a non-global range.
        """
        return len(self._type_to_annotations[annotation_type])

    def add(self, annotation: Annotation) -> None:
        """
        Add annotation. One type annotations can not overlap with each other.

        An annotation that spans from 0 to the content length is stored as a global one. \
        Any other annotation is indexed both by its range and by its type.

        :param annotation: Annotation to add.
        :raises ValueError: if a global annotation of the same type already exists.
        """
        annotation_id = type(annotation)
        if annotation.start == 0 and annotation.stop == len(self):
            if annotation_id in self._global_annotations:
                raise ValueError("Global annotation %s already exists" % annotation)
            self._global_annotations[annotation_id] = annotation
        else:
            # TODO(zurk): Add a check that there is no overlapping annotations of one type.
            if annotation.range not in self._range_to_annotations:
                self._range_to_annotations[annotation.range] = {}
            if annotation_id not in self._type_to_annotations:
                self._type_to_annotations[annotation_id] = SortedDict()
            self._range_to_annotations[annotation.range][annotation_id] = annotation
            self._type_to_annotations[annotation_id][annotation.range] = annotation

    def update(self, annotations: Iterable[Annotation]) -> None:
        """
        Add multiple annotations.

        :param annotations: Annotations to add one by one with `add()`.
        """
        for annotation in annotations:
            self.add(annotation)

    def get(self, annotation_type: Type[Annotation], range: Optional[Tuple[int, int]] = None,
            ) -> Annotation:
        """
        Return a specific annotation for a given range.

        Looking for exact match only. If range is None it returns annotations that cover all
        content (aka global annotation).

        :param annotation_type: Annotation type to look for.
        :param range: Exact `(start, stop)` range of the annotation or None for a global one.
        :return: requested Annotation.
        :raises KeyError: if there is no such annotation.
        """
        if range is None:
            return self._global_annotations[annotation_type]
        return self._type_to_annotations[annotation_type][range]

    def iter_annotation(self, name: Type[Annotation], start_offset: Optional[int] = None,
                        stop_offset: Optional[int] = None) -> Iterator[Annotation]:
        """
        Iterate through specific type of annotation.

        Annotations are yielded in the sorted order of their ranges.

        :param name: Annotation type to iterate through.
        :param start_offset: If set, iteration starts from the first annotation whose range \
                             is not less than `(start_offset, start_offset)`.
        :param stop_offset: Not supported yet, must be None.
        :return: An annotations iterator.
        :raises NotImplementedError: if `stop_offset` is set.
        :raises KeyError: if no annotation of this type was added with a non-global range.
        """
        if stop_offset is not None:
            raise NotImplementedError()
        annotation_layer = self._type_to_annotations[name]
        if start_offset is None:
            yield from annotation_layer.values()
        else:
            search_from = annotation_layer.bisect_left((start_offset, start_offset))
            yield from annotation_layer.values()[search_from:]

    def iter_annotations(self, types: Sequence[Type[Annotation]],
                         start_offset: Optional[int] = None, stop_offset: Optional[int] = None,
                         ) -> Iterator[AnnotationsSlice]:
        """
        Iterate through annotations with specified type.

        The first type in `types` drives the iteration: one slice is yielded per annotation \
        of that type. For every other requested type the annotation with exactly the same \
        range is taken if it exists, otherwise an intersecting one is searched with \
        `find_intersect()`. Types without any match are absent from the slice.

        :param types: Annotation types to collect. Nothing is yielded if it is empty.
        :param start_offset: Not supported yet, must be None.
        :param stop_offset: Not supported yet, must be None.
        :return: Requested annotations slices iterator.
        :raises NotImplementedError: if `start_offset` or `stop_offset` is set.
        """
        if start_offset is not None or stop_offset is not None:
            raise NotImplementedError()
        if not types:
            # There is no leading type to iterate over, so there is nothing to yield.
            return

        types_set = frozenset(types)
        for annotation0 in self.iter_annotation(types[0]):
            # Annotations with the same range
            same_range_annotations = self._range_to_annotations[annotation0.range]
            annotations = {}
            for annotation_type in types_set:
                if annotation_type in same_range_annotations:
                    annotations[annotation_type] = same_range_annotations[annotation_type]
                    continue
                try:
                    annotations[annotation_type] = self.find_intersect(
                        annotation_type, *annotation0.range)
                except NoIntersection:
                    # The requested type has nothing at this place, so it is just skipped.
                    pass
            yield AnnotationsSlice(*annotation0.range, annotations)

    def iter_items(self, types: Sequence[Type[Annotation]], start_offset: Optional[int] = None,
                   stop_offset: Optional[int] = None,
                   ) -> Iterator[Tuple[str, AnnotationsSlice]]:
        """
        Iterate through annotations with specified type.

        :param types: Annotation types to collect, see `iter_annotations()`.
        :param start_offset: Passed to `iter_annotations()`.
        :param stop_offset: Passed to `iter_annotations()`.
        :return: Annotated data slice with requested annotations.
        """
        for annotations in self.iter_annotations(types, start_offset, stop_offset):
            yield self[annotations.range], annotations

    def find_intersect(self, name: Type[Annotation], start: int, stop: int) -> Annotation:
        """
        Find an annotation of given type that intersects the interval [start, stop).

        raises NoIntersection exception if there is no such annotation.

        :param name: Annotation type.
        :param start: start of interval.
        :param stop: end of interval.
        :return: requested Annotation.
        """
        try:
            annotation_layer = self._type_to_annotations[name]
        except KeyError:
            raise NoIntersection("There is no annotation layer %s" % name) from None
        # Step one position back: the annotation that starts before `start` can still cover it.
        search_start = max(0, annotation_layer.bisect_left((start, start)) - 1)
        search_stop = annotation_layer.bisect_right((stop, stop))
        for candidate_range in annotation_layer.islice(search_start, search_stop):
            if self._check_interval_crossing(start, stop, *candidate_range):
                # assuming that there is only one such annotation
                return annotation_layer[candidate_range]
        raise NoIntersection("There is no annotation %s from %d to %d" % (name, start, stop))

    @classmethod
    def _check_interval_crossing(cls, start1: int, stop1: int, start2: int, stop2: int) -> bool:
        """
        Check whether the intervals [start1, stop1) and [start2, stop2) cross.

        Empty intervals [x, x) are treated as a position x:
        * two empty intervals cross only if they are at the same position;
        * an empty interval crosses a non-empty one only if its position is strictly inside \
          of it, so a position on a border does not count;
        * two non-empty intervals cross if one of them contains a start of another one or \
          the end of the second interval is strictly inside the first one.

        :param start1: start of the first interval.
        :param stop1: end of the first interval.
        :param start2: start of the second interval.
        :param stop2: end of the second interval.
        :return: True if the intervals cross, False otherwise.
        """
        first_is_empty = start1 == stop1
        second_is_empty = start2 == stop2
        if first_is_empty and second_is_empty:
            return start1 == start2
        if first_is_empty:
            return start2 < start1 < stop2
        if second_is_empty:
            return start1 < start2 < stop1
        return (start1 <= start2 < stop1 or
                start1 < stop2 < stop1 or
                start2 <= start1 < stop2)

    @classmethod
    def from_file(cls, file: File) -> "AnnotatedData":
        """
        Create AnnotatedData instance from File.

        :param file: file.content will be used as data to be annotated with \
                     file.path, file.language and file.uast.
        :return: new AnnotatedData instance.
        """
        # Bytes that are not valid UTF-8 are replaced instead of raising an error.
        raw_data = file.content.decode("utf-8", "replace")
        # Use `cls` so that subclasses get an instance of their own type.
        annotated_data = cls(raw_data)
        annotated_data.add(PathAnnotation(0, len(raw_data), file.path))
        annotated_data.add(UASTAnnotation(0, len(raw_data), file.uast))
        annotated_data.add(LanguageAnnotation(0, len(raw_data), file.language))
        return annotated_data

0 commit comments

Comments
 (0)