-
Notifications
You must be signed in to change notification settings - Fork 141
SNOW-2203826: Loosen flattening rules for sort and filter #4026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| def _retrieve_aggregation_function_list(self) -> None: | ||
| """Retrieve the list of aggregation functions which will later be used in sql simplifier.""" | ||
| if ( | ||
| not context._is_snowpark_connect_compatible_mode | ||
| or context._aggregation_function_set | ||
| ): | ||
| return | ||
|
|
||
| retrieved_set = set() | ||
|
|
||
| for sql in [ | ||
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | ||
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | ||
| ]: | ||
| try: | ||
| retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()}) | ||
| except BaseException as e: | ||
| _logger.debug( | ||
| "Unable to get aggregation functions from the database: %s", | ||
| e, | ||
| ) | ||
| # we raise error here as a pessimistic tactics | ||
| # the reason is that if we fail to retrieve the aggregation function list, we have empty set | ||
| # the simplifier will flatten the query which contains aggregation functions leading to incorrect results | ||
| raise | ||
|
|
||
| with context._aggregation_function_set_lock: | ||
| context._aggregation_function_set.update(retrieved_set) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Race condition in _retrieve_aggregation_function_list(). The method checks if context._aggregation_function_set is populated at line 4935 without holding the lock, but only acquires the lock at line 4957 before updating. This creates a check-then-act race condition.
Issue: Multiple threads calling filter() concurrently could all see an empty set at line 4935, bypass the early return, and all execute the database queries simultaneously, leading to:
- Multiple redundant database queries
- Potential thread contention when updating the set
- Wasted resources
Fix:
def _retrieve_aggregation_function_list(self) -> None:
if not context._is_snowpark_connect_compatible_mode:
return
with context._aggregation_function_set_lock:
# Re-check inside the lock
if context._aggregation_function_set:
return
retrieved_set = set()
for sql in [...]:
# ... query logic ...
context._aggregation_function_set.update(retrieved_set)| def _retrieve_aggregation_function_list(self) -> None: | |
| """Retrieve the list of aggregation functions which will later be used in sql simplifier.""" | |
| if ( | |
| not context._is_snowpark_connect_compatible_mode | |
| or context._aggregation_function_set | |
| ): | |
| return | |
| retrieved_set = set() | |
| for sql in [ | |
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | |
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | |
| ]: | |
| try: | |
| retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()}) | |
| except BaseException as e: | |
| _logger.debug( | |
| "Unable to get aggregation functions from the database: %s", | |
| e, | |
| ) | |
| # we raise error here as a pessimistic tactics | |
| # the reason is that if we fail to retrieve the aggregation function list, we have empty set | |
| # the simplifier will flatten the query which contains aggregation functions leading to incorrect results | |
| raise | |
| with context._aggregation_function_set_lock: | |
| context._aggregation_function_set.update(retrieved_set) | |
| def _retrieve_aggregation_function_list(self) -> None: | |
| """Retrieve the list of aggregation functions which will later be used in sql simplifier.""" | |
| if not context._is_snowpark_connect_compatible_mode: | |
| return | |
| # First check without lock for performance | |
| if context._aggregation_function_set: | |
| return | |
| with context._aggregation_function_set_lock: | |
| # Re-check inside the lock | |
| if context._aggregation_function_set: | |
| return | |
| retrieved_set = set() | |
| for sql in [ | |
| """select function_name from information_schema.functions where is_aggregate = 'YES'""", | |
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | |
| ]: | |
| try: | |
| retrieved_set.update({r[0].lower() for r in self.sql(sql).collect()}) | |
| except BaseException as e: | |
| _logger.debug( | |
| "Unable to get aggregation functions from the database: %s", | |
| e, | |
| ) | |
| # we raise error here as a pessimistic tactics | |
| # the reason is that if we fail to retrieve the aggregation function list, we have empty set | |
| # the simplifier will flatten the query which contains aggregation functions leading to incorrect results | |
| raise | |
| context._aggregation_function_set.update(retrieved_set) | |
Spotted by Graphite Agent
Is this helpful? React 👍 or 👎 to let us know.
Some old commits in the main branch were changed so #3941 can't be merged. This PR is a recreation of #3941 with the same code change.
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-2203826
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
Please write a short description of how your code change solves the related issue.