Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Naming and/or Implementation of various IDs within PZ #126

Open
mdr223 opened this issue Feb 13, 2025 · 0 comments
Open

Improve Naming and/or Implementation of various IDs within PZ #126

mdr223 opened this issue Feb 13, 2025 · 0 comments
Labels
enhancement New feature or request

Comments

@mdr223
Copy link
Collaborator

mdr223 commented Feb 13, 2025

From a review comment on #118:

Great question! The get_id_params(), get_op_params(), and universal_identifier() are not well documented, but they all play an important (and distinct) role in PZ. I'd like to take this opportunity to expand on their purpose(s) as they relate to the Optimizer, and maybe as a group we can come up with a better approach that is slightly less opaque.

At a very high-level, the Optimizer's job is to find the best physical plan for a user program. To do this, we follow a recipe that looks something like the following:

  1. Convert the user program to a (set of) logical plan(s)
  2. Convert those logical plan(s) into a set of physical plans
  3. Cost the physical plans and return the best one

While this sounds simple, there are a lot of implementation details that you need to get right. One such detail is determining whether two physical operators across two different physical plans are actually the same operator.

Why is this important? Currently in PZ, our cost model is built around sampled data. As the number of physical operators grows, it becomes expensive to sample every physical operator (let alone every physical plan) to get sample cost, runtime, and quality information. Thus, in an effort to be economical with our samples, we need to re-use the cost information gathered about a physical operator in one plan and apply it to that operator in other plans. (This of course rests on an assumption that operator performance is independent of its placement within a plan -- which is not always true -- but for now this is an assumption I am willing to make).

Consider the following four plans:

Plan 1: scan --> convert A1 --> filter B1
Plan 2: scan --> convert A2 --> filter B2
Plan 3: scan --> filter B1 --> convert A1
Plan 4: scan --> filter B2 --> convert A1

Let's say that we sample Plans 1 and 2 by processing a few inputs with each plan. Now, without having sampled Plans 3 and 4, we want to produce a cost estimate for them. Let's assume our cost model for Plans 3 and 4 is:

plan_dollar_cost = sum([op.cost for op in plan])
plan_runtime = sum([op.runtime for op in plan])
plan_quality = product([op.quality for op in plan])

(I realize that this^ cost model is not perfect, but let's just assume it is our cost model of choice for now.)

When we sample Plans 1 and 2, we compute the average cost, runtime, and quality of each operator and use this as our estimate for that operator. We store this information in a dictionary:

operator_to_stats = {
  "convert_A1_physical_op_id": {"cost": 1.0, "runtime": 2.0, "quality": 0.5, "selectivity": 1.0},
  "convert_A2_physical_op_id": {"cost": 1.5", "runtime": 4.0, "quality": 0.9, "selectivity": 1.0},
  "filter_B1_physical_op_id": {"cost": 0.75, "runtime": 0.5, "quality": 0.8, "selectivity": 0.3},
  "filter_B2_physical_op_id": {"cost": 2.75, "runtime": 0.7, "quality": 0.9, "selectivity": 0.4},
}

Now, to estimate Plan 3 using our cost model above, we simply need to compute:

plan3_dollar_cost = sum([operator_to_stats[op.get_op_id()]["cost"] for op in plan3])
plan3_runtime = sum([operator_to_stats[op.get_op_id()]["runtime"] for op in plan3])
plan3_quality = product([operator_to_stats[op.get_op_id()]["quality"] for op in plan3])

Here's the key issue: in order for operator_to_stats[op.get_op_id()] to lookup the correct entry in the dictionary, it must be the case that op.get_op_id() is:

  1. Independent of the operator's location in the plan
  2. Dependent on the setting of the operator's parameters

Criteria (1.) implies that op.get_op_id() cannot be a function of:

  1. the op_id of the parent operator
  2. the input_schema of the operator, and
  3. the output_schema of the operator, as sem_add_columns() now computes output_schema = self.schema.union(output_schema) (i.e. it also contains the input schema's fields in order to accurately reflect the schema of a DataRecord after being processed by that operator).

Criteria (2.) implies that we still need to specify which of an operator's parameters make that operator unique -- without depending upon location within the plan. For example, because the target_cache_id is a function of the parent operator's target_cache_id (which violates criteria (1.)) it cannot be included in the set of parameters that make the operator unique.

As a result of this^, the approach I settled on was to have each subclass of PhysicalOperator define the set of parameters make that operator unique, independent of location within the plan. These should not include things like input_schema, output_schema, and target_cache_id, but they should include things like model, filter, agg_func, etc. This is the purpose that get_id_params() serves. Its output is a dictionary containing these unique key-value pairs for each physical operator.

On the one hand, this approach places a burden on the PZ developer because it requires them to implement this dictionary for each new physical operator that they create. One might be tempted to instead enumerate the set of physical operator parameters which should NOT be included (e.g. input_schema, output_schema, target_cache_id, etc.) and compute an operator's get_id_params() as the set of all kwargs which are not in this disallowed list. However, with this^ approach we still need someone (the developer or PR reviewer) to check whether a parameter of the new physical operator needs to be added to the disallow list.

Personally, I opted for the get_id_params() approach (i.e. having the developer explicitly state the unique parameters), because I figured that -- at a minimum -- this would force a new PZ developer to stop and ask "Hey, what should this function return?". In the other approach, I think it's too easy for a new parameter to sneak into the set of unique params without first being added to a disallow list.

Finally, this begs the question:

  1. what does get_op_id() do?
  2. what does get_logical_op_id() (and get_logical_id_params()) do?
  3. what does get_op_params() / get_logical_op_params() do?
  4. and what does universal_identifier() / get_node_uid() do?

The answers are:

  1. get_op_id() calls get_id_params() and converts the output into a string
  2. These are the logical operator equivalents of get_op_id() and get_id_params()
  3. get_op_params() / get_logical_op_params() returns the full set of physical / logical operator parameters for when we need to make a copy of an operator. This should include parameters like input_schema, output_schema, etc.
  4. There are some cases where we do want a universally unique identifier for a Dataset / logical operator (e.g. when computing the target_cache_id, or when disambiguating between two identical field names in two different output schemas. universal_identifier() computes a hash which includes the parent's universal_identifier() (and all Dataset parameters), thus this can be used for both target_cache_id and field disambiguation.

I hope this^ provides more context on some of these opaque functions within PZ. I do not believe that I have found a perfect implementation, but with this context in mind, maybe we can all come up with a cleaner implementation of these stated goals.

FYI: @sivaprasadsudhir @mikecafarella @vitaglianog @Tranway1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

1 participant