Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: redis/redis-py
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v4.1.3
Choose a base ref
...
head repository: redis/redis-py
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 4.1
Choose a head ref
  • 1 commit
  • 7 files changed
  • 1 contributor

Commits on Feb 16, 2022

  1. Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    0ed0660 View commit details
Showing with 443 additions and 45 deletions.
  1. +2 −0 .github/workflows/integration.yaml
  2. +31 −21 redis/commands/graph/commands.py
  3. +208 −0 redis/commands/graph/execution_plan.py
  4. +35 −7 redis/commands/search/commands.py
  5. +1 −1 setup.py
  6. +112 −15 tests/test_graph.py
  7. +54 −1 tests/test_search.py
2 changes: 2 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
@@ -8,9 +8,11 @@ on:
- '**/*.md'
branches:
- master
- '[0-9].[0-9]'
pull_request:
branches:
- master
- '[0-9].[0-9]'

jobs:

52 changes: 31 additions & 21 deletions redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
from redis.exceptions import ResponseError

from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
from .query_result import QueryResult


@@ -118,27 +119,6 @@ def flush(self):
self.nodes = {}
self.edges = []

def explain(self, query, params=None):
"""
Get the execution plan for given query,
Returns an array of operations.
For more information see `GRAPH.EXPLAIN <https://oss.redis.com/redisgraph/master/commands/#graphexplain>`_. # noqa
Args:
query:
The query that will be executed.
params: dict
Query parameters.
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)

def bulk(self, **kwargs):
"""Internal only. Not supported."""
raise NotImplementedError(
@@ -200,3 +180,33 @@ def list_keys(self):
For more information see `GRAPH.LIST <https://oss.redis.com/redisgraph/master/commands/#graphlist>`_. # noqa
"""
return self.execute_command("GRAPH.LIST")

def execution_plan(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns an array of operations.
Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return "\n".join(plan)

def explain(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns ExecutionPlan object.
Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return ExecutionPlan(plan)
208 changes: 208 additions & 0 deletions redis/commands/graph/execution_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import re


class ProfileStats:
"""
ProfileStats, runtime execution statistics of operation.
"""

def __init__(self, records_produced, execution_time):
self.records_produced = records_produced
self.execution_time = execution_time


class Operation:
"""
Operation, single operation within execution plan.
"""

def __init__(self, name, args=None, profile_stats=None):
"""
Create a new operation.
Args:
name: string that represents the name of the operation
args: operation arguments
profile_stats: profile statistics
"""
self.name = name
self.args = args
self.profile_stats = profile_stats
self.children = []

def append_child(self, child):
if not isinstance(child, Operation) or self is child:
raise Exception("child must be Operation")

self.children.append(child)
return self

def child_count(self):
return len(self.children)

def __eq__(self, o: object) -> bool:
if not isinstance(o, Operation):
return False

return self.name == o.name and self.args == o.args

def __str__(self) -> str:
args_str = "" if self.args is None else " | " + self.args
return f"{self.name}{args_str}"


class ExecutionPlan:
"""
ExecutionPlan, collection of operations.
"""

def __init__(self, plan):
"""
Create a new execution plan.
Args:
plan: array of strings that represents the collection operations
the output from GRAPH.EXPLAIN
"""
if not isinstance(plan, list):
raise Exception("plan must be an array")

self.plan = plan
self.structured_plan = self._operation_tree()

def _compare_operations(self, root_a, root_b):
"""
Compare execution plan operation tree
Return: True if operation trees are equal, False otherwise
"""

# compare current root
if root_a != root_b:
return False

# make sure root have the same number of children
if root_a.child_count() != root_b.child_count():
return False

# recursively compare children
for i in range(root_a.child_count()):
if not self._compare_operations(root_a.children[i], root_b.children[i]):
return False

return True

def __str__(self) -> str:
def aggraget_str(str_children):
return "\n".join(
[
" " + line
for str_child in str_children
for line in str_child.splitlines()
]
)

def combine_str(x, y):
return f"{x}\n{y}"

return self._operation_traverse(
self.structured_plan, str, aggraget_str, combine_str
)

def __eq__(self, o: object) -> bool:
"""Compares two execution plans
Return: True if the two plans are equal False otherwise
"""
# make sure 'o' is an execution-plan
if not isinstance(o, ExecutionPlan):
return False

# get root for both plans
root_a = self.structured_plan
root_b = o.structured_plan

# compare execution trees
return self._compare_operations(root_a, root_b)

def _operation_traverse(self, op, op_f, aggregate_f, combine_f):
"""
Traverse operation tree recursively applying functions
Args:
op: operation to traverse
op_f: function applied for each operation
aggregate_f: aggregation function applied for all children of a single operation
combine_f: combine function applied for the operation result and the children result
""" # noqa
# apply op_f for each operation
op_res = op_f(op)
if len(op.children) == 0:
return op_res # no children return
else:
# apply _operation_traverse recursively
children = [
self._operation_traverse(child, op_f, aggregate_f, combine_f)
for child in op.children
]
# combine the operation result with the children aggregated result
return combine_f(op_res, aggregate_f(children))

def _operation_tree(self):
"""Build the operation tree from the string representation"""

# initial state
i = 0
level = 0
stack = []
current = None

def _create_operation(args):
profile_stats = None
name = args[0].strip()
args.pop(0)
if len(args) > 0 and "Records produced" in args[-1]:
records_produced = int(
re.search("Records produced: (\\d+)", args[-1]).group(1)
)
execution_time = float(
re.search("Execution time: (\\d+.\\d+) ms", args[-1]).group(1)
)
profile_stats = ProfileStats(records_produced, execution_time)
args.pop(-1)
return Operation(
name, None if len(args) == 0 else args[0].strip(), profile_stats
)

# iterate plan operations
while i < len(self.plan):
current_op = self.plan[i]
op_level = current_op.count(" ")
if op_level == level:
# if the operation level equal to the current level
# set the current operation and move next
child = _create_operation(current_op.split("|"))
if current:
current = stack.pop()
current.append_child(child)
current = child
i += 1
elif op_level == level + 1:
# if the operation is child of the current operation
# add it as child and set as current operation
child = _create_operation(current_op.split("|"))
current.append_child(child)
stack.append(current)
current = child
level += 1
i += 1
elif op_level < level:
# if the operation is not child of current operation
# go back to it's parent operation
levels_back = level - op_level + 1
for _ in range(levels_back):
current = stack.pop()
level -= levels_back
else:
raise Exception("corrupted plan")
return stack[0]
42 changes: 35 additions & 7 deletions redis/commands/search/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import time
from typing import Dict, Union

from ..helpers import parse_to_dict
from ._util import to_string
@@ -377,7 +378,17 @@ def info(self):
it = map(to_string, res)
return dict(zip(it, it))

def _mk_query_args(self, query):
def get_params_args(self, query_params: Dict[str, Union[str, int, float]]):
args = []
if len(query_params) > 0:
args.append("params")
args.append(len(query_params) * 2)
for key, value in query_params.items():
args.append(key)
args.append(value)
return args

def _mk_query_args(self, query, query_params: Dict[str, Union[str, int, float]]):
args = [self.index_name]

if isinstance(query, str):
@@ -387,9 +398,16 @@ def _mk_query_args(self, query):
raise ValueError(f"Bad query type {type(query)}")

args += query.get_args()
if query_params is not None:
args += self.get_params_args(query_params)

return args, query

def search(self, query):
def search(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Search the index for a given query, and return a result of documents
@@ -401,7 +419,7 @@ def search(self, query):
For more information: https://oss.redis.com/redisearch/Commands/#ftsearch
""" # noqa
args, query = self._mk_query_args(query)
args, query = self._mk_query_args(query, query_params=query_params)
st = time.time()
res = self.execute_command(SEARCH_CMD, *args)

@@ -413,18 +431,26 @@ def search(self, query):
with_scores=query._with_scores,
)

def explain(self, query):
def explain(
self,
query=Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""Returns the execution plan for a complex query.
For more information: https://oss.redis.com/redisearch/Commands/#ftexplain
""" # noqa
args, query_text = self._mk_query_args(query)
args, query_text = self._mk_query_args(query, query_params=query_params)
return self.execute_command(EXPLAIN_CMD, *args)

def explain_cli(self, query): # noqa
def explain_cli(self, query: Union[str, Query]): # noqa
raise NotImplementedError("EXPLAINCLI will not be implemented.")

def aggregate(self, query):
def aggregate(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Issue an aggregation query.
@@ -445,6 +471,8 @@ def aggregate(self, query):
cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args()
else:
raise ValueError("Bad query", query)
if query_params is not None:
cmd += self.get_params_args(query_params)

raw = self.execute_command(*cmd)
return self._get_AggregateResult(raw, query, has_cursor)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
long_description_content_type="text/markdown",
keywords=["Redis", "key-value store", "database"],
license="MIT",
version="4.1.3",
version="4.1.4",
packages=find_packages(
include=[
"redis",
Loading