Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include support for runtime fields #1611

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions elasticsearch_dsl/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ def __init__(self, **kwargs):
self._query_proxy = QueryProxy(self, "query")
self._post_filter_proxy = QueryProxy(self, "post_filter")

self._fields = []
self._runtime_mappings = {}

def filter(self, *args, **kwargs):
    """
    Add a clause in filter context.

    The positional and keyword arguments are handed to ``Q`` to build the
    clause, which is then placed in the ``filter`` list of a ``Bool``
    query and passed to ``self.query``.

    :return: whatever ``self.query`` returns for the wrapped clause
    """
    clause = Q(*args, **kwargs)
    wrapped = Bool(filter=[clause])
    return self.query(wrapped)

Expand Down Expand Up @@ -411,6 +414,9 @@ def _clone(self):
s._highlight_opts = self._highlight_opts.copy()
s._suggest = self._suggest.copy()
s._script_fields = self._script_fields.copy()
s._fields = self._fields
s._runtime_mappings = self._runtime_mappings.copy()

for x in ("query", "post_filter"):
getattr(s, x)._proxied = getattr(self, x)._proxied

Expand Down Expand Up @@ -459,6 +465,8 @@ def update_from_dict(self, d):
s.setdefault("text", text)
if "script_fields" in d:
self._script_fields = d.pop("script_fields")
if "runtime_mappings" in d:
self._runtime_mappings = d.pop("runtime_mappings")
self._extra.update(d)
return self

Expand Down Expand Up @@ -490,6 +498,48 @@ def script_fields(self, **kwargs):
s._script_fields.update(kwargs)
return s

def runtime_mappings(self, **kwargs):
    """
    Define runtime fields to be calculated at query time. See
    https://www.elastic.co/guide/en/elasticsearch/reference/current/runtime.html
    for more details.

    Every runtime field known to the search is also listed in the
    ``fields`` clause of the request so it is returned with each hit.
    Names that are already listed are not added a second time.

    Example::

        s = Search()
        s = s.runtime_mappings(
            client_ip={
                'type': 'ip',
                'script': '''
                    String clientip=grok('%{COMMONAPACHELOG}').extract(doc["message"].value)?.clientip;
                    if (clientip != null) emit(clientip);
                '''
            }
        )

    :arg kwargs: runtime field definitions, keyed by field name
    :return: a modified copy of this search; ``self`` is left untouched
    """
    s = self._clone()
    s._runtime_mappings.update(kwargs)

    # Only request names that are not listed yet, otherwise every call
    # would append the full set of runtime fields again.
    missing = [name for name in s._runtime_mappings if name not in s._fields]

    # Rebind instead of extending in place: the clone may still share the
    # list object with ``self``, and in-place mutation would leak into it.
    s._fields = s._fields + missing
    return s

def fields(self, *args):
    """
    Request additional fields to be returned with each hit.

    Runtime fields are not indexed or stored, so they will not appear in
    the _source block if you run a query, but can easily be added to the
    response by adding the 'fields' clause to the body of the query.

    Example::

        s = Search()
        s = s.fields("client_ip")

    :arg args: entries to append to the ``fields`` clause
    :return: a modified copy of this search; ``self`` is left untouched
    """
    s = self._clone()
    # Build a new list rather than calling ``extend`` so that a list shared
    # between the clone and ``self`` is never mutated.
    s._fields = self._fields + list(args)
    return s

def source(self, fields=None, **kwargs):
"""
Selectively control how the _source field is returned.
Expand Down Expand Up @@ -678,6 +728,12 @@ def to_dict(self, count=False, **kwargs):
if self._script_fields:
d["script_fields"] = self._script_fields

if self._runtime_mappings:
d["runtime_mappings"] = self._runtime_mappings

if self._fields:
d["fields"] = self._fields

d.update(recursive_to_dict(kwargs))
return d

Expand Down
34 changes: 34 additions & 0 deletions tests/test_integration/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.


from datetime import datetime

from elasticsearch import TransportError
from pytest import raises

Expand Down Expand Up @@ -167,3 +169,35 @@ def test_raw_subfield_can_be_used_in_aggs(data_client):
authors = r.aggregations.authors
assert 1 == len(authors)
assert {"key": "Honza Král", "doc_count": 52} == authors[0]


def test_runtime_field(data_client):
    """
    A runtime field defined on the search is returned with every hit and
    carries the value computed by its script.
    """
    # Truncate once so the timestamp handed to the script and the one used
    # for the expected value below are the exact same instant.
    current_date = datetime.now().replace(microsecond=0)

    # Painless source, assembled line by line to keep the text sent to the
    # server unchanged while the Python code stays normally indented.
    script_source = (
        "\n"
        "String currentDateStr = params.get('current_date');\n"
        "\n"
        "DateTimeFormatter dtf = DateTimeFormatter.ISO_LOCAL_DATE_TIME;\n"
        "LocalDateTime committedDate = doc['committed_date'].value.toLocalDateTime();\n"
        "LocalDateTime currentDate = LocalDateTime.parse(currentDateStr, dtf);\n"
        "\n"
        "emit(Duration.between(committedDate, currentDate).toDays());\n"
    )

    s = Search(index="git").filter(Q("exists", field="committed_date"))
    s = s.runtime_mappings(
        days_since_commit={
            "type": "long",
            "script": {
                "source": script_source,
                "params": {"current_date": current_date.isoformat()},
            },
        }
    )
    response = s.execute()

    for commit in response.hits:
        assert "days_since_commit" in commit

        committed_date = datetime.fromisoformat(commit.committed_date)
        # Values requested through ``fields`` come back as lists.
        assert commit.days_since_commit[0] == (current_date - committed_date).days
13 changes: 13 additions & 0 deletions tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ def test_complex_example():
.filter(Q("term", category="meetup") | Q("term", category="conference"))
.post_filter("terms", tags=["prague", "czech"])
.script_fields(more_attendees="doc['attendees'].value + 42")
.runtime_mappings(
http={
"type": "composite",
"script": 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))',
}
)
)

s.aggs.bucket("per_country", "terms", field="country").metric(
Expand Down Expand Up @@ -342,11 +348,18 @@ def test_complex_example():
"aggs": {"avg_attendees": {"avg": {"field": "attendees"}}},
}
},
"fields": ["http"],
"highlight": {
"order": "score",
"fields": {"title": {"fragment_size": 50}, "body": {"fragment_size": 50}},
},
"script_fields": {"more_attendees": {"script": "doc['attendees'].value + 42"}},
"runtime_mappings": {
"http": {
"type": "composite",
"script": 'emit(grok("%{COMMONAPACHELOG}").extract(doc["message"].value))',
}
},
} == s.to_dict()


Expand Down