ES|QL: Fix Fork field reference tracking #131723

Merged
38 commits
9f24a18
Initial
svilen-mihaylov-elastic Jul 22, 2025
594d76a
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 22, 2025
69fff05
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 23, 2025
b64ec91
Update docs/changelog/131723.yaml
svilen-mihaylov-elastic Jul 24, 2025
666382c
Update docs/changelog/131723.yaml
svilen-mihaylov-elastic Jul 24, 2025
15480b6
Handle fork references correctly
svilen-mihaylov-elastic Jul 24, 2025
96951c7
Merge branch 'svilen/127208' of https://github.com/svilen-mihaylov-el…
svilen-mihaylov-elastic Jul 24, 2025
8a98b45
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 24, 2025
bb7082d
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 24, 2025
a20fb5b
Remove _fork
svilen-mihaylov-elastic Jul 24, 2025
b159bd7
[CI] Auto commit changes from spotless
Jul 24, 2025
3b7f546
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 25, 2025
f9f26d4
Fix merge
svilen-mihaylov-elastic Jul 25, 2025
917b273
Merge branch 'svilen/127208' of https://github.com/svilen-mihaylov-el…
svilen-mihaylov-elastic Jul 25, 2025
cb412e6
Return all fields
svilen-mihaylov-elastic Jul 25, 2025
c5564f0
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 28, 2025
7d67986
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 29, 2025
2479bfa
Fix
svilen-mihaylov-elastic Jul 29, 2025
3ebe919
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 30, 2025
482fabe
tweak
svilen-mihaylov-elastic Jul 30, 2025
300f169
Merge branch 'svilen/127208' of https://github.com/svilen-mihaylov-el…
svilen-mihaylov-elastic Jul 30, 2025
6378f76
Add tests
svilen-mihaylov-elastic Jul 30, 2025
f337b02
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 30, 2025
82f189d
Add test
svilen-mihaylov-elastic Jul 30, 2025
199392f
Add more tests
svilen-mihaylov-elastic Jul 31, 2025
b7a7854
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 31, 2025
5dd7462
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Jul 31, 2025
98aa887
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Aug 1, 2025
7bc4a87
Address feedback
svilen-mihaylov-elastic Aug 4, 2025
b017321
not
svilen-mihaylov-elastic Aug 4, 2025
d14bf24
Update x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/…
svilen-mihaylov-elastic Aug 6, 2025
2931d83
Update x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/…
svilen-mihaylov-elastic Aug 6, 2025
82672be
Update x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/…
svilen-mihaylov-elastic Aug 6, 2025
94da34b
Update
svilen-mihaylov-elastic Aug 6, 2025
9606c04
[CI] Auto commit changes from spotless
Aug 6, 2025
3f55419
Separate implementation
svilen-mihaylov-elastic Aug 7, 2025
00a885a
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Aug 8, 2025
4313222
Merge branch 'main' into svilen/127208
svilen-mihaylov-elastic Aug 11, 2025
5 changes: 5 additions & 0 deletions docs/changelog/131723.yaml
@@ -0,0 +1,5 @@
pr: 131723
summary: Tests for FORK's evaluation of field names used in `field_caps` resolve calls
area: Search
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -8,12 +8,14 @@

import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.util.Holder;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -74,6 +76,33 @@ public void forEachDown(Consumer<? super T> action) {
}
}

/**
* Same as forEachDown, but can end the traverse early, by setting the boolean argument in the action.
*/
public boolean forEachDownMayReturnEarly(BiConsumer<? super T, Holder<Boolean>> action) {
var breakEarly = new Holder<>(false);
forEachDownMayReturnEarly(action, breakEarly);
return breakEarly.get();
}

@SuppressWarnings("unchecked")
void forEachDownMayReturnEarly(BiConsumer<? super T, Holder<Boolean>> action, Holder<Boolean> breakEarly) {
action.accept((T) this, breakEarly);
Contributor Author

@svilen-mihaylov-elastic Aug 4, 2025

I'm quoting @astefan's previous comment (#131723 (comment)) here, since it became outdated.

Ok, I've looked in depth at the proposed code change, and these are my arguments for not approving it as it is now:

This code (the method in Node and the needed changes in resolveFieldNames) is harder to grasp for anyone reading it for the first time with the goal of understanding it. I've tried to refactor it to see if there is a better structure that makes it more readable (isolating the true branch of forEachDownMayReturnEarly to what fork needs), but I couldn't find one.

The entire bulk of code here (which existed before the need for the forEachDownMayReturnEarly method) suddenly needs to be aware of the early exit from forEachDown even though it doesn't need to know this; only the fork code must be aware of the early exit logic. This is the main reason why a different exit logic, isolated to fork as much as possible, is better. The logic in resolveFieldNames is complex enough, and many other people who looked at it and contributed to it mentioned that the code is complex and that code comments are essential. Let's aim to make it less complex (with code comments for the next person looking at the code) or at least keep it at the same complexity.

Contributor Author

@svilen-mihaylov-elastic Aug 4, 2025

Thanks for your feedback. To my understanding, you are unhappy with the API I originally had, in which a lambda returned true/false to indicate early exit. As you pointed out, existing code needs to be aware of the correct default value (true) to return if it does not wish to end the traversal early. This is fair criticism.

I've made a small tweak below. Essentially, instead of returning true/false, I pass a boolean holder. This holder needs to be set to true only if the lambda needs the traversal to end early; by default no action is needed. Thus only the fork code is aware of the need to exit early, and the non-fork code stays as is.

I think this should satisfy the criteria you mentioned. I'd be happy to iterate further as needed.
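
For readers following the thread, the holder-based early exit described above can be sketched in isolation. This is only an illustration under stated assumptions: the `Node` and `Holder` classes below are simplified, hypothetical stand-ins (not the real `Node<T>` or `org.elasticsearch.xpack.esql.core.util.Holder`), and the tree shape is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class EarlyExitTraversalSketch {

    /** Minimal mutable box; a stand-in for the Holder class referenced in the diff. */
    static final class Holder<T> {
        private T value;
        Holder(T value) { this.value = value; }
        T get() { return value; }
        void set(T value) { this.value = value; }
    }

    /** Hypothetical tree node, used only for this illustration. */
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = List.of(children);
        }

        /** Pre-order traversal; returns true if the action asked to stop early. */
        boolean forEachDownMayReturnEarly(BiConsumer<Node, Holder<Boolean>> action) {
            Holder<Boolean> breakEarly = new Holder<>(false);
            visit(action, breakEarly);
            return breakEarly.get();
        }

        private void visit(BiConsumer<Node, Holder<Boolean>> action, Holder<Boolean> breakEarly) {
            action.accept(this, breakEarly);
            if (breakEarly.get()) {
                return; // the action requested an early exit
            }
            for (int i = 0, size = children.size(); i < size; i++) {
                children.get(i).visit(action, breakEarly);
                if (breakEarly.get()) {
                    return; // propagate the early exit up the recursion
                }
            }
        }
    }

    public static void main(String[] args) {
        Node root = new Node("a", new Node("b", new Node("c")), new Node("d"));
        List<String> visited = new ArrayList<>();
        boolean stopped = root.forEachDownMayReturnEarly((node, breakEarly) -> {
            visited.add(node.name);
            // Only the code that cares about early exit touches the holder.
            if (node.name.equals("b")) {
                breakEarly.set(true);
            }
        });
        System.out.println(visited + " stopped=" + stopped); // prints [a, b] stopped=true
    }
}
```

Actions that never touch the holder behave like a plain forEachDown, which is the property the comment above argues for.
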

if (breakEarly.get()) {
// Early return.
return;
}
// please do not refactor it to a for-each loop to avoid
// allocating iterator that performs concurrent modification checks and extra stack frames
for (int c = 0, size = children.size(); c < size; c++) {
children.get(c).forEachDownMayReturnEarly(action, breakEarly);
if (breakEarly.get()) {
// Early return.
return;
}
}
}

@SuppressWarnings("unchecked")
public <E extends T> void forEachDown(Class<E> typeToken, Consumer<? super E> action) {
forEachDown(t -> {
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
@@ -76,11 +77,6 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
}

// TODO: Improve field resolution for FORK - right now we request all fields
if (parsed.anyMatch(p -> p instanceof Fork)) {
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
}

Holder<Boolean> projectAll = new Holder<>(false);
parsed.forEachExpressionDown(UnresolvedStar.class, us -> {// explicit "*" fields selection
if (projectAll.get()) {
@@ -93,7 +89,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
}

var referencesBuilder = AttributeSet.builder();
var referencesBuilder = new Holder<>(AttributeSet.builder());
// "keep" and "drop" attributes are special whenever a wildcard is used in their name, as the wildcard can cover some
// attributes ("lookup join" generated columns among others); steps like removal of Aliases should ignore fields matching the
// wildcards.
@@ -110,19 +106,49 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
// lookup indices where we request "*" because we may require all their fields
Set<String> wildcardJoinIndices = new java.util.HashSet<>();

boolean[] canRemoveAliases = new boolean[] { true };
var canRemoveAliases = new Holder<>(true);

var forEachDownProcessor = new Holder<BiConsumer<LogicalPlan, Holder<Boolean>>>();
forEachDownProcessor.set((LogicalPlan p, Holder<Boolean> breakEarly) -> {// go over each plan top-down
if (p instanceof Fork fork) {
// Early return from forEachDown. We will iterate over the children manually and end the recursion via forEachDown early.
var forkRefsResult = AttributeSet.builder();
forkRefsResult.addAll(referencesBuilder.get());

for (var forkBranch : fork.children()) {
referencesBuilder.set(AttributeSet.builder());
var isNestedFork = forkBranch.forEachDownMayReturnEarly(forEachDownProcessor.get());
// This assert is just for good measure. FORKs within FORKs is yet not supported.
assert isNestedFork == false : "Nested FORKs are not yet supported";
// This is a safety measure for fork where the list of fields returned is empty.
// It can be empty for a branch that does need all the fields. For example "fork (where true) (where a is not null)"
// but it can also be empty for queries where NO fields are needed from ES,
// for example "fork (eval x = 1 | keep x) (eval y = 1 | keep y)" but we cannot establish this yet.
if (referencesBuilder.get().isEmpty()) {
Contributor

What is the logic behind this check? Why would an empty referencesBuilder here terminate early (and mark the query as needing * all fields)?

Contributor Author

@svilen-mihaylov-elastic Aug 4, 2025

See below on line 232:

if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty())

This has the comment

 // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index

Contributor

I am not sure that's correct. The logic at line 232 is about queries like this: from test | eval x = 123 | keep x. Meaning, the query actually doesn't need any fields from _field_caps. The logic for fork says completely the opposite.

Contributor Author

I am not sure that's correct. The logic at line 232 is about queries like this: from test | eval x = 123 | keep x. Meaning, the query actually doesn't need any fields from _field_caps. The logic for fork says completely the opposite.

I agree that the logic seems pretty confusing, but at the same time I believe the statement you make regarding the fields is only partially true. Consider the following query with match.

 FROM employees
            | FORK
               ( STATS x = count(*))
               ( WHERE emp_no == "2" )

Consider the "stats" stage which looks like this

Eval[[fork1[KEYWORD] AS _fork#3]]
\_Aggregate[[],[?count[*] AS x#2]]
  \_UnresolvedRelation[employees]

It has no referenced columns (p.references() is empty). Thus, by the logic of the comment I mentioned above, we should retain all columns. This information needs to be propagated back to the caller (the Fork). Thus, if any of the Fork's children needs all the fields, then we can break out early and just retain all fields.

I can try to make this a bit clearer in the following way. The lambda can return a pair <bool, ColumnSet>. The bool can indicate whether we need to retain all columns. How does this sound?

Contributor

It's not about the lambda here, but about the logic of the field name resolution for fork.


I have tested this query

FROM employees | fork (eval x = 1 | keep x) (eval y = 2 | keep y) (eval z = 3 | keep z)

With the current logic (projectAll.set(true) when referencesBuilder is empty), we ask ES for * even though we don't need all the fields; in fact we don't need any field except _index, as I mentioned previously.
I've tested the same query without projectAll.set(true) in the empty-referencesBuilder branch: we ask for _index only, and the query returns the same columns and rows as in the previous test.


Thank you for bringing up the stats query as well, that's a good example.
stats x = count(*) is similar to eval y = ... | keep y in that it "resets" the columns list to something that's not found in the ES indices (aka a FieldAttribute), and this branch of fork doesn't need anything from ES. Actually, running from employees | stats x = count(*) will not bypass the field resolution and shortcut it to * (all fields), but will run into the _index branch. I don't see why fork should behave differently here; after all, fork runs a bunch of queries that are missing the from part.

Contributor Author

@svilen-mihaylov-elastic Aug 6, 2025

So I have added the query from your comment as a test, and it returns *. I'm not following whether there is another change you are proposing, or whether you simply have a question/observation about the existing (pre-fork) code.

Contributor Author

Actually, running from employees | stats x = count(*) will not bypass the field resolution and shortcut it to * (all fields), but will run into the _index branch. I don't see why fork should behave differently here; after all, fork runs a bunch of queries that are missing the from part.

I'm not following this part. I'm not sure I understand how fork behaves differently here... If we end up with empty set for resolved fields, we will convert to _index, no? In that regard this behavior is the same as it should be, but maybe I'm missing something in your explanation.
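
To make the two positions in this thread concrete, here is a small sketch that models them side by side. It is not the code in this PR: field references are reduced to plain strings, the `resolve` method and its `emptyBranchRequestsAll` flag are hypothetical, and the constants `"*"`, `"_fork"` and `"_index"` are assumptions taken from the discussion above rather than from `IndexResolver` or `Fork`.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public final class ForkFieldNamesSketch {

    // Assumed values, taken from the discussion rather than from the code base.
    static final Set<String> ALL_FIELDS = Set.of("*");
    static final String FORK_FIELD = "_fork";
    static final String INDEX_FIELD = "_index";

    /**
     * Merges the field names referenced by each fork branch.
     *
     * @param branchRefs             field names referenced by each branch (hypothetical input shape)
     * @param emptyBranchRequestsAll true models the check in the diff (an empty branch requests all fields);
     *                               false models the reviewer's suggestion (fall back to _index only)
     */
    static Set<String> resolve(List<Set<String>> branchRefs, boolean emptyBranchRequestsAll) {
        Set<String> merged = new TreeSet<>();
        for (Set<String> refs : branchRefs) {
            if (refs.isEmpty() && emptyBranchRequestsAll) {
                return ALL_FIELDS; // one branch with no references short-circuits to "*"
            }
            merged.addAll(refs);
        }
        merged.remove(FORK_FIELD); // the fork discriminator is generated, not an index field
        return merged.isEmpty() ? Set.of(INDEX_FIELD) : merged;
    }

    public static void main(String[] args) {
        // fork (eval x = 1 | keep x) (eval y = 2 | keep y): neither branch references an index field
        List<Set<String>> noFields = List.of(Set.of(), Set.of());
        System.out.println(resolve(noFields, true));  // prints [*]
        System.out.println(resolve(noFields, false)); // prints [_index]
    }
}
```

Under these assumptions, the disagreement comes down to the single `emptyBranchRequestsAll` decision: whether an empty reference set for a branch means "needs everything" or "needs nothing from ES".
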

projectAll.set(true);
// Return early, we'll be returning all references no matter what the remainder of the query is.
breakEarly.set(true);
return;
}
forkRefsResult.addAll(referencesBuilder.get());
}

forkRefsResult.removeIf(attr -> attr.name().equals(Fork.FORK_FIELD));
referencesBuilder.set(forkRefsResult);

parsed.forEachDown(p -> {// go over each plan top-down
if (p instanceof RegexExtract re) { // for Grok and Dissect
// Return early, we've already explored all fork branches.
breakEarly.set(true);
return;
} else if (p instanceof RegexExtract re) { // for Grok and Dissect
// keep the inputs needed by Grok/Dissect
referencesBuilder.addAll(re.input().references());
referencesBuilder.get().addAll(re.input().references());
} else if (p instanceof Enrich enrich) {
AttributeSet enrichFieldRefs = Expressions.references(enrich.enrichFields());
AttributeSet.Builder enrichRefs = enrichFieldRefs.combine(enrich.matchField().references()).asBuilder();
// Enrich adds an EmptyAttribute if no match field is specified
// The exact name of the field will be added later as part of enrichPolicyMatchFields Set
enrichRefs.removeIf(attr -> attr instanceof EmptyAttribute);
referencesBuilder.addAll(enrichRefs);
referencesBuilder.get().addAll(enrichRefs);
} else if (p instanceof LookupJoin join) {
if (join.config().type() instanceof JoinTypes.UsingJoinType usingJoinType) {
joinRefs.addAll(usingJoinType.columns());
@@ -135,15 +161,15 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
joinRefs.addAll(keepRefs);
}
} else {
referencesBuilder.addAll(p.references());
referencesBuilder.get().addAll(p.references());
if (p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES) {
// METRICS aggs generally rely on @timestamp without the user having to mention it.
referencesBuilder.add(new UnresolvedAttribute(ur.source(), MetadataAttribute.TIMESTAMP_FIELD));
referencesBuilder.get().add(new UnresolvedAttribute(ur.source(), MetadataAttribute.TIMESTAMP_FIELD));
}
// special handling for UnresolvedPattern (which is not an UnresolvedAttribute)
p.forEachExpression(UnresolvedNamePattern.class, up -> {
var ua = new UnresolvedAttribute(up.source(), up.name());
referencesBuilder.add(ua);
referencesBuilder.get().add(ua);
if (p instanceof Keep) {
keepRefs.add(ua);
} else if (p instanceof Drop) {
@@ -168,10 +194,10 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
//
// and ips_policy enriches the results with the same name ip field),
// these aliases should be kept in the list of fields.
if (canRemoveAliases[0] && p.anyMatch(FieldNameUtils::couldOverrideAliases)) {
canRemoveAliases[0] = false;
if (canRemoveAliases.get() && p.anyMatch(FieldNameUtils::couldOverrideAliases)) {
canRemoveAliases.set(false);
}
if (canRemoveAliases[0]) {
if (canRemoveAliases.get()) {
// remove any already discovered UnresolvedAttributes that are in fact aliases defined later down in the tree
// for example "from test | eval x = salary | stats max = max(x) by gender"
// remove the UnresolvedAttribute "x", since that is an Alias defined in "eval"
@@ -187,21 +213,25 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
if (fieldNames.contains(ne.name())) {
return;
}
referencesBuilder.removeIf(
attr -> matchByName(attr, ne.name(), keepRefs.contains(attr) || dropWildcardRefs.contains(attr))
);
referencesBuilder.get()
.removeIf(attr -> matchByName(attr, ne.name(), keepRefs.contains(attr) || dropWildcardRefs.contains(attr)));
});
}
});
parsed.forEachDownMayReturnEarly(forEachDownProcessor.get());

if (projectAll.get()) {
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
}

// Add JOIN ON column references afterward to avoid Alias removal
referencesBuilder.addAll(joinRefs);
referencesBuilder.get().addAll(joinRefs);
// If any JOIN commands need wildcard field-caps calls, persist the index names

// remove valid metadata attributes because they will be filtered out by the IndexResolver anyway
// otherwise, in some edge cases, we will fail to ask for "*" (all fields) instead
referencesBuilder.removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
Set<String> fieldNames = referencesBuilder.build().names();
referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
Set<String> fieldNames = referencesBuilder.get().build().names();

if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
// there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index