Skip to content

Adding a merge_type parameter to the ingest simulate API #132210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Aug 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/132210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 132210
summary: Adding a `merge_type` parameter to the ingest simulate API
area: Ingest Node
type: enhancement
issues:
- 131608
Original file line number Diff line number Diff line change
Expand Up @@ -1931,3 +1931,157 @@ setup:
- match: { docs.0.doc._index: "simple-data-stream1" }
- match: { docs.0.doc._source.bar: "baz" }
- match: { docs.0.doc.error.type: "document_parsing_exception" }

---
"Test ingest simulate with mapping addition on subobjects":

- skip:
features:
- headers
- allowed_warnings

- do:
indices.put_index_template:
name: subobject-template
body:
index_patterns: subobject-index*
template:
mappings:
properties:
a.b:
type: match_only_text

- do:
headers:
Content-Type: application/json
simulate.ingest:
body: >
{
"docs": [
{
"_index": "subobject-index-1",
"_id": "AZgsHA0B41JjTOmNiBKC",
"_source": {
"a.b": "some text"
}
}
],
"mapping_addition": {
"properties": {
"a.b": {
"type": "keyword"
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "subobject-index-1" }
- match: { docs.0.doc._source.a\.b: "some text" }
- match: { docs.0.doc.error.type: "mapper_parsing_exception" }

# Here we provide a mapping_substitution to the subobject, and make sure that it is applied rather than throwing an
# exception.
- do:
headers:
Content-Type: application/json
simulate.ingest:
merge_type: "template"
body: >
{
"docs": [
{
"_index": "subobject-index-1",
"_id": "AZgsHA0B41JjTOmNiBKC",
"_source": {
"a.b": "some text"
}
}
],
"mapping_addition": {
"properties": {
"a.b": {
"type": "keyword"
}
}
}
Comment on lines +2000 to +2006
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is it worth testing with mapping in index_template_substitutions or component_template_substitutions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it probably is.

}
- length: { docs: 1 }
- match: { docs.0.doc._index: "subobject-index-1" }
- match: { docs.0.doc._source.a\.b: "some text" }
- not_exists: docs.0.doc.error

# Now we run the same test but with index_template_substitutions rather than mapping_addition. In this case though,
# the mappings are _substituted_, not merged. That is, the original template and its mappings are completely replaced
# with the new one. So the merge_type has no impact.
- do:
headers:
Content-Type: application/json
simulate.ingest:
merge_type: "index"
body: >
{
"docs": [
{
"_index": "subobject-index-1",
"_id": "AZgsHA0B41JjTOmNiBKC",
"_source": {
"a.b": "some text"
}
}
],
"index_template_substitutions": {
"subobject-template": {
"index_patterns": ["subobject-index*"],
"template": {
"mappings": {
"properties": {
"a.b": {
"type": "keyword"
}
}
}
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "subobject-index-1" }
- match: { docs.0.doc._source.a\.b: "some text" }
- not_exists: docs.0.doc.error

# This makes sure that we get the same result for merge_type "template" for index_template_substitutions
- do:
headers:
Content-Type: application/json
simulate.ingest:
merge_type: "template"
body: >
{
"docs": [
{
"_index": "subobject-index-1",
"_id": "AZgsHA0B41JjTOmNiBKC",
"_source": {
"a.b": "some text"
}
}
],
"index_template_substitutions": {
"subobject-template": {
"index_patterns": ["subobject-index*"],
"template": {
"mappings": {
"properties": {
"a.b": {
"type": "keyword"
}
}
}
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "subobject-index-1" }
- match: { docs.0.doc._source.a\.b: "some text" }
- not_exists: docs.0.doc.error
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
"pipeline":{
"type":"string",
"description":"The pipeline id to preprocess incoming documents with if no pipeline is given for a particular document"
},
"merge_type":{
"type":"string",
"description":"The mapping merge type if mapping overrides are being provided in mapping_addition. The allowed values are one of index or template. The index option merges mappings the way they would be merged into an existing index. The template option merges mappings the way they would be merged into a template.",
"default": "index"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testMappingValidationIndexExists() {
}
""";
indicesAdmin().create(new CreateIndexRequest(indexName).mapping(mapping)).actionGet();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -163,7 +163,7 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
""", XContentType.JSON).id(randomUUID());
{
// First we use the original component template, and expect a failure in the second document:
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
Expand Down Expand Up @@ -197,7 +197,8 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
)
),
Map.of(),
Map.of()
Map.of(),
null
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
Expand Down Expand Up @@ -235,7 +236,8 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
indexTemplateName,
Map.of("index_patterns", List.of(indexName), "composed_of", List.of("test-component-template-2"))
),
Map.of()
Map.of(),
null
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
Expand All @@ -258,7 +260,8 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
Map.of(
"_doc",
Map.of("dynamic", "strict", "properties", Map.of("foo1", Map.of("type", "text"), "foo3", Map.of("type", "text")))
)
),
null
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
Expand All @@ -277,7 +280,7 @@ public void testMappingValidationIndexDoesNotExistsNoTemplate() {
* mapping-less "random-index-template" created by the parent class), so we expect no mapping validation failure.
*/
String indexName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -324,7 +327,7 @@ public void testMappingValidationIndexDoesNotExistsV2Template() throws IOExcepti
request.indexTemplate(composableIndexTemplate);

client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -356,7 +359,7 @@ public void testMappingValidationIndexDoesNotExistsV1Template() {
indicesAdmin().putTemplate(
new PutIndexTemplateRequest("test-template").patterns(List.of("my-index-*")).mapping("foo1", "type=integer")
).actionGet();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -410,7 +413,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
{
// First, try with no @timestamp to make sure we're picking up data-stream-specific templates
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand All @@ -437,7 +440,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
}
{
// Now with @timestamp
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"@timestamp": "2024-08-27",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ static TransportVersion def(int id) {
public static final TransportVersion TRANSPORT_NODE_USAGE_STATS_FOR_THREAD_POOLS_ACTION = def(9_135_0_00);
public static final TransportVersion INDEX_TEMPLATE_TRACKING_INFO = def(9_136_0_00);
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class SimulateBulkRequest extends BulkRequest {
private final Map<String, Map<String, Object>> componentTemplateSubstitutions;
private final Map<String, Map<String, Object>> indexTemplateSubstitutions;
private final Map<String, Object> mappingAddition;
private final String mappingMergeType;

/**
* @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with
Expand All @@ -118,7 +119,8 @@ public SimulateBulkRequest(
Map<String, Map<String, Object>> pipelineSubstitutions,
Map<String, Map<String, Object>> componentTemplateSubstitutions,
Map<String, Map<String, Object>> indexTemplateSubstitutions,
Map<String, Object> mappingAddition
Map<String, Object> mappingAddition,
String mappingMergeType
) {
super();
Objects.requireNonNull(pipelineSubstitutions);
Expand All @@ -129,6 +131,7 @@ public SimulateBulkRequest(
this.componentTemplateSubstitutions = componentTemplateSubstitutions;
this.indexTemplateSubstitutions = indexTemplateSubstitutions;
this.mappingAddition = mappingAddition;
this.mappingMergeType = mappingMergeType;
}

@SuppressWarnings("unchecked")
Expand All @@ -147,6 +150,11 @@ public SimulateBulkRequest(StreamInput in) throws IOException {
} else {
mappingAddition = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_MAPPING_MERGE_TYPE)) {
mappingMergeType = in.readOptionalString();
} else {
mappingMergeType = null;
}
}

@Override
Expand All @@ -160,6 +168,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
out.writeGenericValue(mappingAddition);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_MAPPING_MERGE_TYPE)) {
out.writeOptionalString(mappingMergeType);
}
}

public Map<String, Map<String, Object>> getPipelineSubstitutions() {
Expand Down Expand Up @@ -189,6 +200,10 @@ public Map<String, Object> getMappingAddition() {
return mappingAddition;
}

public String getMappingMergeType() {
return mappingMergeType;
}

private static ComponentTemplate convertRawTemplateToComponentTemplate(Map<String, Object> rawTemplate) {
ComponentTemplate componentTemplate;
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawTemplate)) {
Expand All @@ -215,7 +230,8 @@ public BulkRequest shallowClone() {
pipelineSubstitutions,
componentTemplateSubstitutions,
indexTemplateSubstitutions,
mappingAddition
mappingAddition,
mappingMergeType
);
bulkRequest.setRefreshPolicy(getRefreshPolicy());
bulkRequest.waitForActiveShards(waitForActiveShards());
Expand Down
Loading