Skip to content

Commit 06e39c0

Browse files
authored
ESQL: Disallow remote ENRICH after LOOKUP JOIN (#131426)
Fix #129372. Due to how remote ENRICH is [planned](https://github.com/elastic/elasticsearch/blob/32e50d0d94e27ee559d24bf9d5463ba6e64d1788/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java#L93), it interacts in special ways with pipeline breakers, in particular LIMIT and TopN; when these are encountered upstream from a remote ENRICH, these nodes are copied and executed a second time after the remote ENRICH. We'd like to allow remote ENRICH after LOOKUP JOIN, but that forces the lookup to be remote as well; this has its own interactions with pipeline breakers: in particular, LIMITs and TopNs cannot just be duplicated after LOOKUP JOIN, as LOOKUP JOIN may add new rows. For now, let's just forbid any usage of remote ENRICH after LOOKUP JOINs; remote ENRICH is mostly relevant for CCS, and LOOKUP JOIN doesn't support that in 9.1/8.19, anyway. There is separate work that enables LOOKUP JOINs on remote clusters and adds the correct validations; we can later build support for remote ENRICH + LOOKUP JOIN on top of that. (Cf. my comment on #129372 and my draft #131286 for enabling this.)
1 parent 732bab0 commit 06e39c0

File tree

8 files changed

+407
-41
lines changed

8 files changed

+407
-41
lines changed

docs/changelog/131426.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 131426
2+
summary: Disallow remote enrich after lu join
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 129372

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,10 @@ public MultiClusterSpecIT(
126126
"NullifiedJoinKeyToPurgeTheJoin",
127127
"SortBeforeAndAfterJoin",
128128
"SortEvalBeforeLookup",
129-
"SortBeforeAndAfterMultipleJoinAndMvExpand"
129+
"SortBeforeAndAfterMultipleJoinAndMvExpand",
130+
"LookupJoinAfterTopNAndRemoteEnrich",
131+
// Lookup join after LIMIT is not supported in CCS yet
132+
"LookupJoinAfterLimitAndRemoteEnrich"
130133
);
131134

132135
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,3 +661,104 @@ from *
661661
author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
662662
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
663663
;
664+
665+
666+
statsAfterRemoteEnrich
667+
required_capability: enrich_load
668+
669+
FROM sample_data
670+
| KEEP message
671+
| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2")
672+
| EVAL language_code = "1"
673+
| ENRICH _remote:languages_policy ON language_code
674+
| STATS messages = count_distinct(message) BY language_name
675+
;
676+
677+
messages:long | language_name:keyword
678+
2 | English
679+
;
680+
681+
682+
enrichAfterRemoteEnrich
683+
required_capability: enrich_load
684+
685+
FROM sample_data
686+
| KEEP message
687+
| WHERE message IN ("Connected to 10.1.0.1")
688+
| EVAL language_code = "1"
689+
| ENRICH _remote:languages_policy ON language_code
690+
| RENAME language_name AS first_language_name
691+
| ENRICH languages_policy ON language_code
692+
;
693+
694+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
695+
Connected to 10.1.0.1 | 1 | English | English
696+
;
697+
698+
699+
coordinatorEnrichAfterRemoteEnrich
700+
required_capability: enrich_load
701+
702+
FROM sample_data
703+
| KEEP message
704+
| WHERE message IN ("Connected to 10.1.0.1")
705+
| EVAL language_code = "1"
706+
| ENRICH _remote:languages_policy ON language_code
707+
| RENAME language_name AS first_language_name
708+
| ENRICH _coordinator:languages_policy ON language_code
709+
;
710+
711+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
712+
Connected to 10.1.0.1 | 1 | English | English
713+
;
714+
715+
716+
doubleRemoteEnrich
717+
required_capability: enrich_load
718+
719+
FROM sample_data
720+
| KEEP message
721+
| WHERE message IN ("Connected to 10.1.0.1")
722+
| EVAL language_code = "1"
723+
| ENRICH _remote:languages_policy ON language_code
724+
| RENAME language_name AS first_language_name
725+
| ENRICH _remote:languages_policy ON language_code
726+
;
727+
728+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
729+
Connected to 10.1.0.1 | 1 | English | English
730+
;
731+
732+
733+
enrichAfterCoordinatorEnrich
734+
required_capability: enrich_load
735+
736+
FROM sample_data
737+
| KEEP message
738+
| WHERE message IN ("Connected to 10.1.0.1")
739+
| EVAL language_code = "1"
740+
| ENRICH _coordinator:languages_policy ON language_code
741+
| RENAME language_name AS first_language_name
742+
| ENRICH languages_policy ON language_code
743+
;
744+
745+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
746+
Connected to 10.1.0.1 | 1 | English | English
747+
;
748+
749+
750+
doubleCoordinatorEnrich
751+
required_capability: enrich_load
752+
753+
FROM sample_data
754+
| KEEP message
755+
| WHERE message IN ("Connected to 10.1.0.1")
756+
| EVAL language_code = "1"
757+
| ENRICH _coordinator:languages_policy ON language_code
758+
| RENAME language_name AS first_language_name
759+
| ENRICH _coordinator:languages_policy ON language_code
760+
;
761+
762+
message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
763+
Connected to 10.1.0.1 | 1 | English | English
764+
;

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4773,3 +4773,101 @@ FROM sample_data_ts_nanos
47734773
2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
47744774
2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
47754775
;
4776+
4777+
###############################################
4778+
# LOOKUP JOIN and ENRICH
4779+
###############################################
4780+
4781+
enrichAfterLookupJoin
4782+
required_capability: join_lookup_v12
4783+
4784+
FROM sample_data
4785+
| KEEP message
4786+
| WHERE message == "Connected to 10.1.0.1"
4787+
| EVAL language_code = "1"
4788+
| LOOKUP JOIN message_types_lookup ON message
4789+
| ENRICH languages_policy ON language_code
4790+
;
4791+
4792+
message:keyword | language_code:keyword | type:keyword | language_name:keyword
4793+
Connected to 10.1.0.1 | 1 | Success | English
4794+
;
4795+
4796+
4797+
lookupJoinAfterEnrich
4798+
required_capability: join_lookup_v12
4799+
4800+
FROM sample_data
4801+
| KEEP message
4802+
| WHERE message == "Connected to 10.1.0.1"
4803+
| EVAL language_code = "1"
4804+
| ENRICH languages_policy ON language_code
4805+
| LOOKUP JOIN message_types_lookup ON message
4806+
;
4807+
4808+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
4809+
Connected to 10.1.0.1 | 1 | English | Success
4810+
;
4811+
4812+
4813+
lookupJoinAfterRemoteEnrich
4814+
required_capability: join_lookup_v12
4815+
4816+
FROM sample_data
4817+
| KEEP message
4818+
| WHERE message == "Connected to 10.1.0.1"
4819+
| EVAL language_code = "1"
4820+
| ENRICH _remote:languages_policy ON language_code
4821+
| LOOKUP JOIN message_types_lookup ON message
4822+
;
4823+
4824+
message:keyword | language_code:keyword | language_name:keyword | type:keyword
4825+
Connected to 10.1.0.1 | 1 | English | Success
4826+
;
4827+
4828+
4829+
lookupJoinAfterLimitAndRemoteEnrich
4830+
required_capability: join_lookup_v12
4831+
4832+
FROM sample_data
4833+
| KEEP message
4834+
| WHERE message == "Connected to 10.1.0.1"
4835+
| EVAL language_code = "1"
4836+
| LIMIT 1
4837+
| ENRICH _remote:languages_policy ON language_code
4838+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
4839+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
4840+
| KEEP message, enrich_language_name, language_name, country.keyword
4841+
| SORT language_name, country.keyword
4842+
;
4843+
4844+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
4845+
Connected to 10.1.0.1 | English | English | Canada
4846+
Connected to 10.1.0.1 | English | English | United States of America
4847+
Connected to 10.1.0.1 | English | English | null
4848+
Connected to 10.1.0.1 | English | null | United Kingdom
4849+
;
4850+
4851+
4852+
lookupJoinAfterTopNAndRemoteEnrich
4853+
required_capability: join_lookup_v12
4854+
4855+
FROM sample_data
4856+
| KEEP message
4857+
| WHERE message == "Connected to 10.1.0.1"
4858+
| EVAL language_code = "1"
4859+
| SORT message
4860+
| LIMIT 1
4861+
| ENRICH _remote:languages_policy ON language_code
4862+
| EVAL enrich_language_name = language_name, language_code = language_code::integer
4863+
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
4864+
| KEEP message, enrich_language_name, language_name, country.keyword
4865+
| SORT language_name, country.keyword
4866+
;
4867+
4868+
message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
4869+
Connected to 10.1.0.1 | English | English | Canada
4870+
Connected to 10.1.0.1 | English | English | United States of America
4871+
Connected to 10.1.0.1 | English | English | null
4872+
Connected to 10.1.0.1 | English | null | United Kingdom
4873+
;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Enrich.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.xpack.esql.index.EsIndex;
3636
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
3737
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
38+
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
3839

3940
import java.io.IOException;
4041
import java.util.ArrayList;
@@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
295296
* retaining the originating cluster and restructuring pages for routing, which might be complicated.
296297
*/
297298
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
298-
boolean[] agg = { false };
299-
boolean[] enrichCoord = { false };
299+
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
300+
// in separate FORK branches which are valid by themselves.
301+
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
302+
}
303+
304+
/**
305+
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
306+
*/
307+
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
308+
if (enrich.mode != Mode.REMOTE) {
309+
return;
310+
}
311+
312+
// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
313+
// https://github.com/elastic/elasticsearch/issues/131445
314+
boolean[] aggregate = { false };
315+
boolean[] coordinatorOnlyEnrich = { false };
316+
boolean[] lookupJoin = { false };
300317

301-
plan.forEachUp(UnaryPlan.class, u -> {
318+
enrich.forEachUp(LogicalPlan.class, u -> {
302319
if (u instanceof Aggregate) {
303-
agg[0] = true;
304-
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
305-
enrichCoord[0] = true;
306-
}
307-
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
308-
if (agg[0]) {
309-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
310-
}
311-
if (enrichCoord[0]) {
312-
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
313-
}
320+
aggregate[0] = true;
321+
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
322+
coordinatorOnlyEnrich[0] = true;
323+
} else if (u instanceof LookupJoin) {
324+
lookupJoin[0] = true;
314325
}
315326
});
327+
328+
if (aggregate[0]) {
329+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
330+
}
331+
if (coordinatorOnlyEnrich[0]) {
332+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
333+
}
334+
if (lookupJoin[0]) {
335+
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
336+
}
316337
}
317338
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
8787
PhysicalPlan mappedChild = map(unary.child());
8888

8989
//
90-
// TODO - this is hard to follow and needs reworking
90+
// TODO - this is hard to follow, causes bugs and needs reworking
9191
// https://github.com/elastic/elasticsearch/issues/115897
9292
//
9393
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@
3636
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
3737
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
3838
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
39+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
3940
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
4041
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
41-
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;
4242

4343
public final class AnalyzerTestUtils {
4444

@@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
6161
}
6262

6363
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
64-
return new Analyzer(
65-
new AnalyzerContext(
66-
EsqlTestUtils.TEST_CFG,
67-
new EsqlFunctionRegistry(),
68-
indexResolution,
69-
defaultLookupResolution(),
70-
defaultEnrichResolution(),
71-
emptyInferenceResolution()
72-
),
73-
verifier
74-
);
64+
return analyzer(indexResolution, defaultLookupResolution(), verifier);
7565
}
7666

7767
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
68+
return analyzer(indexResolution, lookupResolution, defaultEnrichResolution(), verifier);
69+
}
70+
71+
public static Analyzer analyzer(
72+
IndexResolution indexResolution,
73+
Map<String, IndexResolution> lookupResolution,
74+
EnrichResolution enrichResolution,
75+
Verifier verifier
76+
) {
77+
return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, TEST_CFG);
78+
}
79+
80+
public static Analyzer analyzer(
81+
IndexResolution indexResolution,
82+
Map<String, IndexResolution> lookupResolution,
83+
EnrichResolution enrichResolution,
84+
Verifier verifier,
85+
Configuration config
86+
) {
7887
return new Analyzer(
7988
new AnalyzerContext(
80-
EsqlTestUtils.TEST_CFG,
89+
config,
8190
new EsqlFunctionRegistry(),
8291
indexResolution,
8392
lookupResolution,
84-
defaultEnrichResolution(),
93+
enrichResolution,
8594
defaultInferenceResolution()
8695
),
8796
verifier
8897
);
8998
}
9099

91100
public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
92-
return new Analyzer(
93-
new AnalyzerContext(
94-
config,
95-
new EsqlFunctionRegistry(),
96-
indexResolution,
97-
defaultLookupResolution(),
98-
defaultEnrichResolution(),
99-
defaultInferenceResolution()
100-
),
101-
verifier
102-
);
101+
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
103102
}
104103

105104
public static Analyzer analyzer(Verifier verifier) {

0 commit comments

Comments
 (0)