Skip to content

Commit cccef0a

Browse files
committed
precommit
1 parent 3648364 commit cccef0a

File tree

4 files changed

+44
-60
lines changed

4 files changed

+44
-60
lines changed

server/src/main/java/org/elasticsearch/action/bulk/FailureStoreDocument.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public IndexRequest convert() throws IOException {
4343
// up that the parent data stream needs to be created.
4444
// One option is to make use of the eventual flag to perform an operation on the failure store. Ughh who would have thought the
4545
// dependencies would be swapped like that...
46-
return new IndexRequest()
47-
.index(targetIndexName)
46+
return new IndexRequest().index(targetIndexName)
4847
.source(createSource())
4948
.opType(DocWriteRequest.OpType.CREATE)
5049
.setWriteToFailureStore(true);

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -969,21 +969,14 @@ public boolean isForceExecution() {
969969
);
970970
}
971971

972-
static boolean shouldStoreFailure(
973-
String indexName,
974-
Metadata metadata,
975-
long epochMillis
976-
) {
977-
return DataStream.isFailureStoreEnabled() && resolveFailureStoreFromMetadata(indexName, metadata, epochMillis)
978-
.or(() -> resolveFailureStoreFromTemplate(indexName, metadata))
979-
.orElse(false);
972+
static boolean shouldStoreFailure(String indexName, Metadata metadata, long epochMillis) {
973+
return DataStream.isFailureStoreEnabled()
974+
&& resolveFailureStoreFromMetadata(indexName, metadata, epochMillis).or(
975+
() -> resolveFailureStoreFromTemplate(indexName, metadata)
976+
).orElse(false);
980977
}
981978

982-
private static Optional<Boolean> resolveFailureStoreFromMetadata(
983-
String indexName,
984-
Metadata metadata,
985-
long epochMillis
986-
) {
979+
private static Optional<Boolean> resolveFailureStoreFromMetadata(String indexName, Metadata metadata, long epochMillis) {
987980
if (indexName == null) {
988981
return Optional.empty();
989982
}
@@ -1009,10 +1002,7 @@ private static Optional<Boolean> resolveFailureStoreFromMetadata(
10091002
return Optional.of(targetDataStream != null && targetDataStream.isFailureStore());
10101003
}
10111004

1012-
private static Optional<Boolean> resolveFailureStoreFromTemplate(
1013-
String indexName,
1014-
Metadata metadata
1015-
) {
1005+
private static Optional<Boolean> resolveFailureStoreFromTemplate(String indexName, Metadata metadata) {
10161006
if (indexName == null) {
10171007
return Optional.empty();
10181008
}

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -670,12 +670,15 @@ void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, String pipelin
670670
}
671671

672672
private record IngestPipelinesExecutionResult(boolean success, boolean kept, Exception exception, String failedIndex) {}
673+
673674
private static IngestPipelinesExecutionResult successResult() {
674675
return new IngestPipelinesExecutionResult(true, true, null, null);
675676
}
677+
676678
private static IngestPipelinesExecutionResult discardResult() {
677679
return new IngestPipelinesExecutionResult(true, false, null, null);
678680
}
681+
679682
private static IngestPipelinesExecutionResult failAndStoreFor(String index, Exception e) {
680683
return new IngestPipelinesExecutionResult(false, true, e, index);
681684
}
@@ -726,43 +729,40 @@ protected void doRun() {
726729
final IngestDocument ingestDocument = newIngestDocument(indexRequest, documentParsingObserver);
727730
// the document listener gives us three-way logic: a document can fail processing (1), or it can
728731
// be successfully processed. a successfully processed document can be kept (2) or dropped (3).
729-
final ActionListener<IngestPipelinesExecutionResult> documentListener = ActionListener.runAfter(new ActionListener<>() {
730-
@Override
731-
public void onResponse(IngestPipelinesExecutionResult result) {
732-
assert result != null;
733-
if (result.success) {
734-
if (result.kept == false) {
735-
onDropped.accept(slot);
732+
final ActionListener<IngestPipelinesExecutionResult> documentListener = ActionListener.runAfter(
733+
new ActionListener<>() {
734+
@Override
735+
public void onResponse(IngestPipelinesExecutionResult result) {
736+
assert result != null;
737+
if (result.success) {
738+
if (result.kept == false) {
739+
onDropped.accept(slot);
740+
}
741+
} else {
742+
// We were given a failure result in the onResponse method, so we must store the failure
743+
// Recover the original document state, track a failed ingest, and pass it along
744+
updateIndexRequestMetadata(indexRequest, ingestDocument.getOriginalMetadata());
745+
totalMetrics.ingestFailed();
746+
onStoreFailure.apply(slot, result.failedIndex, result.exception);
736747
}
737-
} else {
738-
// We were given a failure result in the onResponse method, so we must store the failure
739-
// Recover the original document state, track a failed ingest, and pass it along
740-
updateIndexRequestMetadata(indexRequest, ingestDocument.getOriginalMetadata());
741-
totalMetrics.ingestFailed();
742-
onStoreFailure.apply(slot, result.failedIndex, result.exception);
743748
}
744-
}
745749

746-
@Override
747-
public void onFailure(Exception e) {
748-
totalMetrics.ingestFailed();
749-
onFailure.accept(slot, e);
750+
@Override
751+
public void onFailure(Exception e) {
752+
totalMetrics.ingestFailed();
753+
onFailure.accept(slot, e);
754+
}
755+
},
756+
() -> {
757+
// regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
758+
// that we're finished with this document
759+
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
760+
totalMetrics.postIngest(ingestTimeInNanos);
761+
ref.close();
750762
}
751-
}, () -> {
752-
// regardless of success or failure, we always stop the ingest "stopwatch" and release the ref to indicate
753-
// that we're finished with this document
754-
final long ingestTimeInNanos = System.nanoTime() - startTimeInNanos;
755-
totalMetrics.postIngest(ingestTimeInNanos);
756-
ref.close();
757-
});
758-
759-
executePipelines(
760-
pipelines,
761-
indexRequest,
762-
ingestDocument,
763-
shouldStoreFailure,
764-
documentListener
765763
);
764+
765+
executePipelines(pipelines, indexRequest, ingestDocument, shouldStoreFailure, documentListener);
766766
indexRequest.setPipelinesHaveRun();
767767

768768
assert actionRequest.index() != null;

server/src/test/java/org/elasticsearch/action/bulk/FailureStoreDocumentTests.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,9 @@
2424
public class FailureStoreDocumentTests extends ESTestCase {
2525

2626
public void testFailureStoreDocumentConverstion() throws Exception {
27-
IndexRequest source = new IndexRequest("original_index")
28-
.routing("fake_routing")
27+
IndexRequest source = new IndexRequest("original_index").routing("fake_routing")
2928
.id("1")
30-
.source(JsonXContent.contentBuilder()
31-
.startObject()
32-
.field("key", "value")
33-
.endObject()
34-
);
29+
.source(JsonXContent.contentBuilder().startObject().field("key", "value").endObject());
3530

3631
// The exception will be wrapped for the test to make sure the converter correctly unwraps it
3732
Exception exception = new ElasticsearchException("Test exception please ignore");
@@ -64,8 +59,8 @@ public void testFailureStoreDocumentConverstion() throws Exception {
6459
assertThat(
6560
ObjectPath.eval("error.stack_trace", convertedRequest.sourceAsMap()),
6661
startsWith(
67-
"org.elasticsearch.ElasticsearchException: Test exception please ignore\n" +
68-
"\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConverstion"
62+
"org.elasticsearch.ElasticsearchException: Test exception please ignore\n"
63+
+ "\tat org.elasticsearch.action.bulk.FailureStoreDocumentTests.testFailureStoreDocumentConverstion"
6964
)
7065
);
7166

0 commit comments

Comments
 (0)