-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathschema.py
498 lines (433 loc) · 16.9 KB
/
schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
import inspect
from collections import OrderedDict, deque
from datetime import datetime
from decimal import Decimal
from django.conf import settings
from django.db import models
from django.db.models import FieldDoesNotExist, ManyToManyRel, ManyToOneRel
from django.db.models.fields.related import ForeignObjectRel
from django.utils.timezone import get_current_timezone
from .ast import Comparison, Const, List, Logical, Name, Node
from .compat import text_type
from .exceptions import DjangoQLSchemaError
class DjangoQLField(object):
"""
Abstract searchable field
"""
model = None
name = None
nullable = False
suggest_options = False
type = 'unknown'
value_types = []
value_types_description = ''
def __init__(self, model=None, name=None, nullable=None,
suggest_options=None, suggestions_limit=None):
if model is not None:
self.model = model
if name is not None:
self.name = name
if nullable is not None:
self.nullable = nullable
if suggest_options is not None:
self.suggest_options = suggest_options
if suggestions_limit is not None:
self.suggestions_limit = suggestions_limit
def _get_options_queryset(self):
return self.model.objects.order_by(self.name)
def as_dict(self):
return {
'type': self.type,
'nullable': self.nullable,
}
def _field_choices(self):
if self.model:
try:
return self.model._meta.get_field(self.name).choices
except FieldDoesNotExist:
pass
return []
def get_options(self):
"""
DEPRECATED: field value suggestions are now using get_sugestions() method
"""
choices = self._field_choices()
if choices:
return [c[1] for c in choices]
else:
return self._get_options_queryset().values_list(self.name, flat=True)
def get_sugestions(self, text):
"""
Override this method to provide custom suggestion options
"""
choices = self._field_choices()
if choices:
return [c[1] for c in choices]
kwargs = {'{}__icontains'.format(self.name): text}
return self._get_options_queryset()\
.filter(**kwargs)[:self.suggestions_limit]\
.values_list(self.name, flat=True)
def get_lookup_name(self):
"""
Override this method to provide custom lookup name
"""
return self.name
def get_lookup_value(self, value):
"""
Override this method to convert displayed values to lookup values
"""
choices = self._field_choices()
if choices:
if isinstance(value, list):
return [c[0] for c in choices if c[1] in value]
else:
for c in choices:
if c[1] == value:
return c[0]
return value
def get_operator(self, operator):
"""
Get a comparison suffix to be used in Django ORM & inversion flag for it
:param operator: string, DjangoQL comparison operator
:return: (suffix, invert) - a tuple with 2 values:
suffix - suffix to be used in ORM query, for example '__gt' for '>'
invert - boolean, True if this comparison needs to be inverted
"""
op = {
'=': '',
'>': '__gt',
'>=': '__gte',
'<': '__lt',
'<=': '__lte',
'~': '__icontains',
'in': '__in',
}.get(operator)
if op is not None:
return op, False
op = {
'!=': '',
'!~': '__icontains',
'not in': '__in',
}[operator]
return op, True
def get_lookup(self, path, operator, value):
"""
Performs a lookup for this field with given path, operator and value.
Override this if you'd like to implement a fully custom lookup. It
should support all comparison operators compatible with the field type.
:param path: a list of names preceding current lookup. For example,
if expression looks like 'author.groups.name = "Foo"' path would
be ['author', 'groups']. 'name' is not included, because it's the
current field instance itself.
:param operator: a string with comparison operator. It could be one of
the following: '=', '!=', '>', '>=', '<', '<=', '~', '!~', 'in',
'not in'. Depending on the field type, some operators may be
excluded. '~' and '!~' can be applied to StrField only and aren't
allowed for any other fields. BoolField can't be used with less or
greater operators, '>', '>=', '<' and '<=' are excluded for it.
:param value: value passed for comparison
:return: Q-object
"""
search = '__'.join(path + [self.get_lookup_name()])
op, invert = self.get_operator(operator)
q = models.Q(**{'%s%s' % (search, op): self.get_lookup_value(value)})
return ~q if invert else q
def validate(self, value):
if not self.nullable and value is None:
raise DjangoQLSchemaError(
'Field %s is not nullable, '
'can\'t compare it to None' % self.name
)
if value is not None and type(value) not in self.value_types:
if self.nullable:
msg = (
'Field "{field}" has "nullable {field_type}" type. '
'It can be compared to {possible_values} or None, '
'but not to {value}'
)
else:
msg = (
'Field "{field}" has "{field_type}" type. It can '
'be compared to {possible_values}, '
'but not to {value}'
)
raise DjangoQLSchemaError(msg.format(
field=self.name,
field_type=self.type,
possible_values=self.value_types_description,
value=repr(value),
))
class IntField(DjangoQLField):
type = 'int'
value_types = [int]
value_types_description = 'integer numbers'
class FloatField(DjangoQLField):
type = 'float'
value_types = [int, float, Decimal]
value_types_description = 'floating point numbers'
class StrField(DjangoQLField):
type = 'str'
value_types = [text_type]
value_types_description = 'strings'
class BoolField(DjangoQLField):
type = 'bool'
value_types = [bool]
value_types_description = 'True or False'
class DateField(DjangoQLField):
type = 'date'
value_types = [text_type]
value_types_description = 'dates in "YYYY-MM-DD" format'
def validate(self, value):
super(DateField, self).validate(value)
try:
self.get_lookup_value(value)
except ValueError:
raise DjangoQLSchemaError(
'Field "%s" can be compared to dates in '
'"YYYY-MM-DD" format, but not to %s' % (
self.name,
repr(value),
)
)
def get_lookup_value(self, value):
if not value:
return None
return datetime.strptime(value, '%Y-%m-%d').date()
class DateTimeField(DjangoQLField):
type = 'datetime'
value_types = [text_type]
value_types_description = 'timestamps in "YYYY-MM-DD HH:MM" format'
def validate(self, value):
super(DateTimeField, self).validate(value)
try:
self.get_lookup_value(value)
except ValueError:
raise DjangoQLSchemaError(
'Field "%s" can be compared to timestamps in '
'"YYYY-MM-DD HH:MM" format, but not to %s' % (
self.name,
repr(value),
)
)
def get_lookup_value(self, value):
if not value:
return None
mask = '%Y-%m-%d'
if len(value) > 10:
mask += ' %H:%M'
if len(value) > 16:
mask += ':%S'
dt = datetime.strptime(value, mask)
if settings.USE_TZ:
dt = dt.replace(tzinfo=get_current_timezone())
return dt
def get_lookup(self, path, operator, value):
search = '__'.join(path + [self.get_lookup_name()])
op, invert = self.get_operator(operator)
# Add LIKE operator support for datetime fields. For LIKE comparisons
# we don't want to convert source value to datetime instance, because
# it would effectively kill the idea. What we want is expressions like
# 'created ~ "2017-01-30'
# to be translated to
# 'created LIKE %2017-01-30%',
# but it would work only if we pass a string as a parameter. If we pass
# a datetime instance, it would add time part in a form of 00:00:00,
# and resulting comparison would look like
# 'created LIKE %2017-01-30 00:00:00%'
# which is not what we want for this case.
val = value if operator in ('~', '!~') else self.get_lookup_value(value)
q = models.Q(**{'%s%s' % (search, op): val})
return ~q if invert else q
class RelationField(DjangoQLField):
type = 'relation'
def __init__(self, model, name, related_model, nullable=False,
suggest_options=False, suggestions_limit=None):
super(RelationField, self).__init__(
model=model,
name=name,
nullable=nullable,
suggest_options=suggest_options,
suggestions_limit=suggestions_limit,
)
self.related_model = related_model
@property
def relation(self):
return DjangoQLSchema.model_label(self.related_model)
def as_dict(self):
dikt = super(RelationField, self).as_dict()
dikt['relation'] = self.relation
return dikt
class DjangoQLSchema(object):
include = () # models to include into introspection
exclude = () # models to exclude from introspection
suggest_options = None
suggestions_limit = 50
def __init__(self, model):
if not inspect.isclass(model) or not issubclass(model, models.Model):
raise DjangoQLSchemaError(
'Schema must be initialized with a subclass of Django model'
)
if self.include and self.exclude:
raise DjangoQLSchemaError(
'Either include or exclude can be specified, but not both'
)
if self.excluded(model):
raise DjangoQLSchemaError(
"%s can't be used with %s because it's excluded from it" % (
model,
self.__class__,
)
)
self.current_model = model
self._models = None
if self.suggest_options is None:
self.suggest_options = {}
def excluded(self, model):
return model in self.exclude or \
(self.include and model not in self.include)
@property
def models(self):
if not self._models:
self._models = self.introspect(
model=self.current_model,
exclude=tuple(self.model_label(m) for m in self.exclude),
)
return self._models
@classmethod
def model_label(self, model):
return text_type(model._meta)
def introspect(self, model, exclude=()):
"""
Start with given model and recursively walk through its relationships.
Returns a dict with all model labels and their fields found.
"""
result = {}
open_set = deque([model])
closed_set = list(exclude)
while open_set:
model = open_set.popleft()
model_label = self.model_label(model)
if model_label in closed_set:
continue
model_fields = OrderedDict()
for field in self.get_fields(model):
if not isinstance(field, DjangoQLField):
field = self.get_field_instance(model, field)
if not field:
continue
if isinstance(field, RelationField):
if field.relation not in closed_set:
model_fields[field.name] = field
open_set.append(field.related_model)
else:
model_fields[field.name] = field
result[model_label] = model_fields
closed_set.append(model_label)
return result
def get_fields(self, model):
"""
By default, returns all field names of a given model.
Override this method to limit field options. You can either return a
plain list of field names from it, like ['id', 'name'], or call
.super() and exclude unwanted fields from its result.
"""
return sorted(
[f.name for f in model._meta.get_fields() if f.name != 'password']
)
def get_field_instance(self, model, field_name):
field = model._meta.get_field(field_name)
field_kwargs = {'model': model, 'name': field.name}
if field.is_relation:
if not field.related_model:
# GenericForeignKey
return
if self.excluded(field.related_model):
return
field_cls = RelationField
field_kwargs['related_model'] = field.related_model
else:
field_cls = self.get_field_cls(field)
if isinstance(field, (ManyToOneRel, ManyToManyRel, ForeignObjectRel)):
# Django 1.8 doesn't have .null attribute for these fields
field_kwargs['nullable'] = True
else:
field_kwargs['nullable'] = field.null
field_kwargs['suggest_options'] = (
field.name in self.suggest_options.get(model, [])
)
field_kwargs['suggestions_limit'] = self.suggestions_limit
field_instance = field_cls(**field_kwargs)
# Check if suggested options conflict with field type
if field_cls != StrField and field_instance.suggest_options:
for option in field_instance.get_options():
if isinstance(option, text_type):
# Convert to StrField
field_instance = StrField(**field_kwargs)
return field_instance
def get_field_cls(self, field):
str_fields = (models.CharField, models.TextField, models.UUIDField)
if isinstance(field, str_fields):
return StrField
elif isinstance(field, (models.AutoField, models.IntegerField)):
return IntField
elif isinstance(field, (models.BooleanField, models.NullBooleanField)):
return BoolField
elif isinstance(field, (models.DecimalField, models.FloatField)):
return FloatField
elif isinstance(field, models.DateTimeField):
return DateTimeField
elif isinstance(field, models.DateField):
return DateField
return DjangoQLField
def as_dict(self):
models = {}
for model_label, fields in self.models.items():
models[model_label] = OrderedDict(
[(name, field.as_dict()) for name, field in fields.items()]
)
return {
'current_model': self.model_label(self.current_model),
'models': models,
}
def resolve_name(self, name):
assert isinstance(name, Name)
model = self.model_label(self.current_model)
field = None
for name_part in name.parts:
field = self.models[model].get(name_part)
if not field:
raise DjangoQLSchemaError(
'Unknown field: %s. Possible choices are: %s' % (
name_part,
', '.join(sorted(self.models[model].keys())),
)
)
if field.type == 'relation':
model = field.relation
field = None
return field
def validate(self, node):
"""
Validate DjangoQL AST tree vs. current schema
"""
assert isinstance(node, Node)
if isinstance(node.operator, Logical):
self.validate(node.left)
self.validate(node.right)
return
assert isinstance(node.left, Name)
assert isinstance(node.operator, Comparison)
assert isinstance(node.right, (Const, List))
# Check that field and value types are compatible
field = self.resolve_name(node.left)
value = node.right.value
if field is None:
if value is not None:
raise DjangoQLSchemaError(
'Related model %s can be compared to None only, but not to '
'%s' % (node.left.value, type(value).__name__)
)
else:
values = value if isinstance(node.right, List) else [value]
for v in values:
field.validate(v)