Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/131426.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 131426
summary: Disallow remote enrich after LOOKUP JOIN
area: ES|QL
type: bug
issues:
- 129372
101 changes: 101 additions & 0 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,104 @@ from *
author.keyword:keyword|book_no:keyword|scalerank:integer|street:keyword|bytes_in:ul|@timestamp:unsupported|abbrev:keyword|city_location:geo_point|distance:double|description:unsupported|birth_date:date|language_code:integer|intersects:boolean|client_ip:unsupported|event_duration:long|version:version|language_name:keyword
Fyodor Dostoevsky |1211 |null |null |null |null |null |null |null |null |null |null |null |null |null |null |null
;


# STATS placed after a remote ENRICH: the two selected messages are both enriched via
# language_code "1" and then counted (distinct) per enriched language_name.
statsAfterRemoteEnrich
required_capability: enrich_load

FROM sample_data
| KEEP message
| WHERE message IN ("Connected to 10.1.0.1", "Connected to 10.1.0.2")
| EVAL language_code = "1"
| ENRICH _remote:languages_policy ON language_code
| STATS messages = count_distinct(message) BY language_name
;

messages:long | language_name:keyword
2 | English
;


# A remote ENRICH followed by a second ENRICH on the same policy with no mode prefix.
# The first language_name is renamed so that both enriched values show up in the output.
enrichAfterRemoteEnrich
required_capability: enrich_load

FROM sample_data
| KEEP message
| WHERE message IN ("Connected to 10.1.0.1")
| EVAL language_code = "1"
| ENRICH _remote:languages_policy ON language_code
| RENAME language_name AS first_language_name
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | English | English
;


# A remote ENRICH followed by a coordinator ENRICH on the same policy.
# The first language_name is renamed so that both enriched values show up in the output.
coordinatorEnrichAfterRemoteEnrich
required_capability: enrich_load

FROM sample_data
| KEEP message
| WHERE message IN ("Connected to 10.1.0.1")
| EVAL language_code = "1"
| ENRICH _remote:languages_policy ON language_code
| RENAME language_name AS first_language_name
| ENRICH _coordinator:languages_policy ON language_code
;

message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | English | English
;


# Two consecutive remote ENRICH commands on the same policy and key.
# The first language_name is renamed so that both enriched values show up in the output.
doubleRemoteEnrich
required_capability: enrich_load

FROM sample_data
| KEEP message
| WHERE message IN ("Connected to 10.1.0.1")
| EVAL language_code = "1"
| ENRICH _remote:languages_policy ON language_code
| RENAME language_name AS first_language_name
| ENRICH _remote:languages_policy ON language_code
;

message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | English | English
;


# A coordinator ENRICH followed by a second ENRICH on the same policy with no mode prefix.
# The first language_name is renamed so that both enriched values show up in the output.
enrichAfterCoordinatorEnrich
required_capability: enrich_load

FROM sample_data
| KEEP message
| WHERE message IN ("Connected to 10.1.0.1")
| EVAL language_code = "1"
| ENRICH _coordinator:languages_policy ON language_code
| RENAME language_name AS first_language_name
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | English | English
;


# Two consecutive coordinator ENRICH commands on the same policy and key.
# The first language_name is renamed so that both enriched values show up in the output.
doubleCoordinatorEnrich
required_capability: enrich_load

FROM sample_data
| KEEP message
| WHERE message IN ("Connected to 10.1.0.1")
| EVAL language_code = "1"
| ENRICH _coordinator:languages_policy ON language_code
| RENAME language_name AS first_language_name
| ENRICH _coordinator:languages_policy ON language_code
;

message:keyword | language_code:keyword | first_language_name:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | English | English
;
Original file line number Diff line number Diff line change
Expand Up @@ -4667,3 +4667,101 @@ FROM sample_data_ts_nanos
2023-10-23T12:27:28.948123456Z | 172.21.2.113 | 2764889 | Connected to 10.1.0.2
2023-10-23T12:15:03.360123456Z | 172.21.2.162 | 3450233 | Connected to 10.1.0.3
;

###############################################
# LOOKUP JOIN and ENRICH
###############################################

# ENRICH with no mode prefix applied after a LOOKUP JOIN. The output carries both the
# joined `type` column and the enriched `language_name` column.
enrichAfterLookupJoin
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LOOKUP JOIN message_types_lookup ON message
| ENRICH languages_policy ON language_code
;

message:keyword | language_code:keyword | type:keyword | language_name:keyword
Connected to 10.1.0.1 | 1 | Success | English
;


# LOOKUP JOIN applied after an ENRICH with no mode prefix, i.e. the reverse command order
# of enrichAfterLookupJoin; the column order in the output follows the command order.
lookupJoinAfterEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| ENRICH languages_policy ON language_code
| LOOKUP JOIN message_types_lookup ON message
;

message:keyword | language_code:keyword | language_name:keyword | type:keyword
Connected to 10.1.0.1 | 1 | English | Success
;


# LOOKUP JOIN applied after a remote ENRICH. The output carries the enriched
# `language_name` column followed by the joined `type` column.
lookupJoinAfterRemoteEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| ENRICH _remote:languages_policy ON language_code
| LOOKUP JOIN message_types_lookup ON message
;

message:keyword | language_code:keyword | language_name:keyword | type:keyword
Connected to 10.1.0.1 | 1 | English | Success
;


# LIMIT, then a remote ENRICH, then a LOOKUP JOIN. The enriched language_name is copied to
# enrich_language_name before the join, because the join output also has a language_name
# column. language_code is cast to integer before being used as the join key (presumably
# to match the key type of the lookup index - confirm against its mapping).
# The single input row expands to four joined rows; the final SORT fixes their order.
lookupJoinAfterLimitAndRemoteEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| LIMIT 1
| ENRICH _remote:languages_policy ON language_code
| EVAL enrich_language_name = language_name, language_code = language_code::integer
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| KEEP message, enrich_language_name, language_name, country.keyword
| SORT language_name, country.keyword
;

message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
Connected to 10.1.0.1 | English | English | Canada
Connected to 10.1.0.1 | English | English | United States of America
Connected to 10.1.0.1 | English | English | null
Connected to 10.1.0.1 | English | null | United Kingdom
;


# SORT + LIMIT, then a remote ENRICH, then a LOOKUP JOIN. The enriched language_name is
# copied to enrich_language_name before the join, because the join output also has a
# language_name column. language_code is cast to integer before being used as the join key
# (presumably to match the key type of the lookup index - confirm against its mapping).
# The single input row expands to four joined rows; the final SORT fixes their order.
lookupJoinAfterTopNAndRemoteEnrich
required_capability: join_lookup_v12

FROM sample_data
| KEEP message
| WHERE message == "Connected to 10.1.0.1"
| EVAL language_code = "1"
| SORT message
| LIMIT 1
| ENRICH _remote:languages_policy ON language_code
| EVAL enrich_language_name = language_name, language_code = language_code::integer
| LOOKUP JOIN languages_lookup_non_unique_key ON language_code
| KEEP message, enrich_language_name, language_name, country.keyword
| SORT language_name, country.keyword
;

message:keyword | enrich_language_name:keyword | language_name:keyword | country.keyword:keyword
Connected to 10.1.0.1 | English | English | Canada
Connected to 10.1.0.1 | English | English | United States of America
Connected to 10.1.0.1 | English | English | null
Connected to 10.1.0.1 | English | null | United Kingdom
;
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.GeneratingPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -295,23 +296,43 @@ public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
* retaining the originating cluster and restructuring pages for routing, which might be complicated.
*/
private static void checkRemoteEnrich(LogicalPlan plan, Failures failures) {
boolean[] agg = { false };
boolean[] enrichCoord = { false };
// First look for remote ENRICH, and then look at its children. Going over the whole plan once is trickier as remote ENRICHs can be
// in separate FORK branches which are valid by themselves.
plan.forEachUp(Enrich.class, enrich -> checkForPlansForbiddenBeforeRemoteEnrich(enrich, failures));
}

/**
* For a given remote {@link Enrich}, check if there are any forbidden plans upstream.
*/
private static void checkForPlansForbiddenBeforeRemoteEnrich(Enrich enrich, Failures failures) {
if (enrich.mode != Mode.REMOTE) {
return;
}

// TODO: shouldn't we also include FORK? Everything downstream from FORK should be coordinator-only.
// https://github.com/elastic/elasticsearch/issues/131445
boolean[] aggregate = { false };
boolean[] coordinatorOnlyEnrich = { false };
boolean[] lookupJoin = { false };

plan.forEachUp(UnaryPlan.class, u -> {
enrich.forEachUp(LogicalPlan.class, u -> {
if (u instanceof Aggregate) {
agg[0] = true;
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
enrichCoord[0] = true;
}
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
if (agg[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
if (enrichCoord[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}
aggregate[0] = true;
} else if (u instanceof Enrich upstreamEnrich && upstreamEnrich.mode() == Enrich.Mode.COORDINATOR) {
coordinatorOnlyEnrich[0] = true;
} else if (u instanceof LookupJoin) {
lookupJoin[0] = true;
}
});

if (aggregate[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
if (coordinatorOnlyEnrich[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy"));
}
if (lookupJoin[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LOOKUP JOIN"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private PhysicalPlan mapUnary(UnaryPlan unary) {
PhysicalPlan mappedChild = map(unary.child());

//
// TODO - this is hard to follow and needs reworking
// TODO - this is hard to follow, causes bugs and needs reworking
// https://github.com/elastic/elasticsearch/issues/115897
//
if (unary instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.MATCH_TYPE;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.RANGE_TYPE;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution;

public final class AnalyzerTestUtils {

Expand All @@ -61,45 +61,44 @@ public static Analyzer analyzer(IndexResolution indexResolution, Map<String, Ind
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
emptyInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), verifier);
}

/**
 * Builds an {@link Analyzer} for the given index and lookup resolutions, filling in the
 * enrich resolution from {@code defaultEnrichResolution()}.
 */
public static Analyzer analyzer(IndexResolution indexResolution, Map<String, IndexResolution> lookupResolution, Verifier verifier) {
    EnrichResolution enrichResolution = defaultEnrichResolution();
    return analyzer(indexResolution, lookupResolution, enrichResolution, verifier);
}

/**
 * Builds an {@link Analyzer} from explicit index, lookup and enrich resolutions, passing
 * {@code TEST_CFG} as the configuration.
 */
public static Analyzer analyzer(
    IndexResolution indexResolution,
    Map<String, IndexResolution> lookupResolution,
    EnrichResolution enrichResolution,
    Verifier verifier
) {
    Configuration testConfig = TEST_CFG;
    return analyzer(indexResolution, lookupResolution, enrichResolution, verifier, testConfig);
}

public static Analyzer analyzer(
IndexResolution indexResolution,
Map<String, IndexResolution> lookupResolution,
EnrichResolution enrichResolution,
Verifier verifier,
Configuration config
) {
return new Analyzer(
new AnalyzerContext(
EsqlTestUtils.TEST_CFG,
config,
new EsqlFunctionRegistry(),
indexResolution,
lookupResolution,
defaultEnrichResolution(),
enrichResolution,
defaultInferenceResolution()
),
verifier
);
}

public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, Configuration config) {
return new Analyzer(
new AnalyzerContext(
config,
new EsqlFunctionRegistry(),
indexResolution,
defaultLookupResolution(),
defaultEnrichResolution(),
defaultInferenceResolution()
),
verifier
);
return analyzer(indexResolution, defaultLookupResolution(), defaultEnrichResolution(), verifier, config);
}

public static Analyzer analyzer(Verifier verifier) {
Expand Down Expand Up @@ -213,6 +212,25 @@ public static void loadEnrichPolicyResolution(
);
}

/**
 * Resolves {@code index} from {@code mapping} via {@code loadMapping} and registers the result
 * on {@code enrich} as a resolved policy named {@code policy} for the given {@code mode}.
 * The enrich field list is every key of the resolved mapping except {@code field}.
 */
public static void loadEnrichPolicyResolution(
    EnrichResolution enrich,
    Enrich.Mode mode,
    String policyType,
    String policy,
    String field,
    String index,
    String mapping
) {
    IndexResolution resolvedIndex = loadMapping(mapping, index);

    // Start from all mapped field names, then drop `field` itself.
    List<String> remainingFields = new ArrayList<>(resolvedIndex.get().mapping().keySet());
    remainingFields.remove(field);

    ResolvedEnrichPolicy resolvedPolicy = new ResolvedEnrichPolicy(
        field,
        policyType,
        remainingFields,
        Map.of("", index),
        resolvedIndex.get().mapping()
    );
    enrich.addResolvedPolicy(policy, mode, resolvedPolicy);
}

/**
 * Convenience overload that delegates to the policy-type variant with
 * {@code EnrichPolicy.MATCH_TYPE} as the policy type.
 */
public static void loadEnrichPolicyResolution(EnrichResolution enrich, String policy, String field, String index, String mapping) {
    String policyType = EnrichPolicy.MATCH_TYPE;
    loadEnrichPolicyResolution(enrich, policyType, policy, field, index, mapping);
}
Expand Down
Loading