Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimizing batch sizing based on cost #1416

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion evadb/executor/storage_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
elif self.node.table.table_type == TableType.STRUCTURED_DATA:
return storage_engine.read(self.node.table, self.node.batch_mem_size)
elif self.node.table.table_type == TableType.NATIVE_DATA:
return storage_engine.read(self.node.table)
return storage_engine.read(self.node.table, self.node.batch_mem_size)
elif self.node.table.table_type == TableType.PDF_DATA:
return storage_engine.read(self.node.table)
else:
Expand Down
68 changes: 53 additions & 15 deletions evadb/optimizer/cost_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
from functools import singledispatch
from typing import List

from evadb.optimizer.group_expression import GroupExpression
from evadb.plan_nodes.abstract_plan import AbstractPlan
Expand All @@ -21,6 +23,16 @@
from evadb.plan_nodes.hash_join_probe_plan import HashJoinProbePlan
from evadb.plan_nodes.nested_loop_join_plan import NestedLoopJoinPlan
from evadb.plan_nodes.seq_scan_plan import SeqScanPlan
from evadb.plan_nodes.storage_plan import StoragePlan


@dataclasses.dataclass
class CostEntry:
    """Cost estimate produced by the cost model for a plan expression.

    Entries are ordered by their total cost, ``plan_cost * num_calls``.
    Equality is the dataclass-generated field-wise comparison, so two entries
    with the same total but different fields compare as ``<=`` and ``>=`` each
    other without being ``==``.
    """

    # NOTE(review): presumably the cost of one invocation of the plan -- confirm.
    plan_cost: float = 1.0
    # NOTE(review): presumably how many times the plan is invoked -- confirm.
    num_calls: float = 1.0

    @property
    def total_cost(self) -> float:
        """Return ``plan_cost * num_calls``, the value used for ordering."""
        return self.plan_cost * self.num_calls

    @staticmethod
    def _total_of(entry):
        """Return ``plan_cost * num_calls`` of *entry*, or None if it lacks them."""
        try:
            return entry.plan_cost * entry.num_calls
        except AttributeError:
            # Not a cost-like object; callers translate this to NotImplemented
            # so Python can try the reflected operation or raise TypeError.
            return None

    def __gt__(self, other):
        other_total = self._total_of(other)
        if other_total is None:
            return NotImplemented
        return self.total_cost > other_total

    def __lt__(self, other):
        other_total = self._total_of(other)
        if other_total is None:
            return NotImplemented
        return self.total_cost < other_total

    def __ge__(self, other):
        other_total = self._total_of(other)
        if other_total is None:
            return NotImplemented
        return self.total_cost >= other_total

    def __le__(self, other):
        other_total = self._total_of(other)
        if other_total is None:
            return NotImplemented
        return self.total_cost <= other_total


class CostModel:
Expand All @@ -31,35 +43,61 @@ class CostModel:
def __init__(self):
pass

def calculate_cost(self, gexpr: GroupExpression):
@property
def zero_cost(self) -> CostEntry:
return CostEntry(plan_cost=0)

def calculate_cost(
self, gexpr: GroupExpression, children: List[CostEntry]
) -> CostEntry:
"""
Return the cost of the group expression.
"""

@singledispatch
def cost(opr: AbstractPlan):
return 1.0
def cost(opr: AbstractPlan, children: List[CostEntry]):
if len(children) > 1:
return dataclasses.replace(children[0])
else:
return CostEntry()

@cost.register(NestedLoopJoinPlan)
def cost_nested_loop_join_build_plan(opr: NestedLoopJoinPlan):
return 1.0
def cost_nested_loop_join_build_plan(
opr: NestedLoopJoinPlan, children: List[CostEntry]
):
new_plan_cost = children[0].plan_cost + 1.0
return dataclasses.replace(children[0], plan_cost=new_plan_cost)

@cost.register(HashJoinBuildPlan)
def cost_hash_join_build_plan(opr: HashJoinBuildPlan):
return 1.0
def cost_hash_join_build_plan(
opr: HashJoinBuildPlan, children: List[CostEntry]
):
new_plan_cost = children[0].plan_cost + 1.0
return dataclasses.replace(children[0], plan_cost=new_plan_cost)

@cost.register(HashJoinProbePlan)
def cost_hash_join_probe_plan(opr: HashJoinProbePlan):
return 1.0
def cost_hash_join_probe_plan(
opr: HashJoinProbePlan, children: List[CostEntry]
):
new_plan_cost = children[0].plan_cost + 1.0
return dataclasses.replace(children[0], plan_cost=new_plan_cost)

@cost.register(SeqScanPlan)
def cost_seq_scan(opr: SeqScanPlan):
return 1.0
def cost_seq_scan(opr: SeqScanPlan, children: List[CostEntry]):
new_plan_cost = children[0].plan_cost + 1.0
return dataclasses.replace(children[0], plan_cost=new_plan_cost)

@cost.register(StoragePlan)
def cost_storage_plan(opr: StoragePlan, children: List[CostEntry]):
return CostEntry(
plan_cost=opr.batch_mem_size, num_calls=1.0 / opr.batch_mem_size
)

@cost.register(ApplyAndMergePlan)
def cost_apply_and_merge(opr: ApplyAndMergePlan):
def cost_apply_and_merge(opr: ApplyAndMergePlan, children: List[CostEntry]):
if opr.func_expr.has_cache():
return 0
return 1
return dataclasses.replace(children[0])
new_plan_cost = children[0].plan_cost + 1.0
return dataclasses.replace(children[0], plan_cost=new_plan_cost)

return cost(gexpr.opr)
return cost(gexpr.opr, children)
3 changes: 3 additions & 0 deletions evadb/optimizer/group_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ def __eq__(self, other: "GroupExpression"):
and self.children == other.children
)

def __repr__(self) -> str:
return self.__str__()

def __str__(self) -> str:
return "%s(%s)" % (
type(self).__name__,
Expand Down
10 changes: 6 additions & 4 deletions evadb/optimizer/optimizer_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,14 @@ def __init__(self, root_expr: GroupExpression, optimizer_context: OptimizerConte
super().__init__(optimizer_context, OptimizerTaskType.OPTIMIZE_INPUTS)

def execute(self):
cost = 0
child_cost = []
memo = self.optimizer_context.memo
grp = memo.get_group_by_id(self.root_expr.group_id)
for child_id in self.root_expr.children:
child_grp = memo.get_group_by_id(child_id)
if child_grp.get_best_expr(PropertyType.DEFAULT):
# Note: May never get hit when using EvaDB on Ray
cost += child_grp.get_best_expr_cost(PropertyType.DEFAULT)
child_cost.append(child_grp.get_best_expr_cost(PropertyType.DEFAULT))
else:
self.optimizer_context.task_stack.push(
OptimizeInputs(self.root_expr, self.optimizer_context)
Expand All @@ -304,8 +304,10 @@ def execute(self):
)
return

cost += self.optimizer_context.cost_model.calculate_cost(self.root_expr)
grp.add_expr_cost(self.root_expr, PropertyType.DEFAULT, cost)
root_cost = self.optimizer_context.cost_model.calculate_cost(
self.root_expr, child_cost
)
grp.add_expr_cost(self.root_expr, PropertyType.DEFAULT, root_cost)


# class ExploreGroup(OptimizerTask):
Expand Down
31 changes: 15 additions & 16 deletions evadb/optimizer/rules/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,25 +932,24 @@ def check(self, before: Operator, context: OptimizerContext):
return True

def apply(self, before: LogicalGet, context: OptimizerContext):
# Configure the batch_mem_size. It decides the number of rows
# read in a batch from storage engine.
# Todo: Experiment heuristics.
after = SeqScanPlan(None, before.target_list, before.alias)
batch_mem_size = context.db.catalog().get_configuration_catalog_value(
max_batch_mem_size = context.db.catalog().get_configuration_catalog_value(
"batch_mem_size"
)
after.append_child(
StoragePlan(
before.table_obj,
before.video,
predicate=before.predicate,
sampling_rate=before.sampling_rate,
sampling_type=before.sampling_type,
chunk_params=before.chunk_params,
batch_mem_size=batch_mem_size,
# sweep the min and max batch size.
for batch_mem_size in [1, max_batch_mem_size]:
after = SeqScanPlan(None, before.target_list, before.alias)
after.append_child(
StoragePlan(
before.table_obj,
before.video,
predicate=before.predicate,
sampling_rate=before.sampling_rate,
sampling_type=before.sampling_type,
chunk_params=before.chunk_params,
batch_mem_size=batch_mem_size,
)
)
)
yield after
yield after


class LogicalDerivedGetToPhysical(Rule):
Expand Down
2 changes: 1 addition & 1 deletion evadb/plan_nodes/vector_index_scan_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def search_query_expr(self):
def __str__(self):
return "VectorIndexScan(index_name={}, vector_store_type={}, limit_count={}, search_query_expr={})".format(
self._index.name,
self._index.vector_store_type,
self._index.type,
self._limit_count,
self._search_query_expr,
)
Expand Down