Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit a8ac5a3

Browse files
authored
Fixes force merge failing on long executions, changes some action mes… (#267)
* Fixes force merge failing on long executions, changes some action messaging, adds better try/catch on actions to deal with remote transport exceptions * Adds unit tests for failures in steps * Adds more tests * Updates Allocation step with message and execute changes * Updates Allocation test message * Updates jacoco source so the source files show up in the report * Adds tests for wait for snapshot step * Adds tests for attempt snapshot step and some missing license headers * Addressing comments on naming and spacing
1 parent 3a967aa commit a8ac5a3

File tree

46 files changed

+1570
-323
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1570
-323
lines changed

README.md

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,15 +64,6 @@ The project in this package uses the [Gradle](https://docs.gradle.org/current/us
6464

6565
However, to build the `index management` plugin project, we also use the Elastic build tools for Gradle. These tools are idiosyncratic and don't always follow the conventions and instructions for building regular Java code using Gradle. Not everything in `index management` will work the way it's described in the Gradle documentation. If you encounter such a situation, the Elastic build tools [source code](https://github.com/elastic/elasticsearch/tree/master/buildSrc/src/main/groovy/org/elasticsearch/gradle) is your best bet for figuring out what's going on.
6666

67-
This project currently uses the Notification subproject from the [Alerting plugin](https://github.com/opendistro-for-elasticsearch/alerting). There is an [open PR](https://github.com/opendistro-for-elasticsearch/alerting/pull/97) that introduces the maven publish task in Alerting for publishing the Notification jars. Until this PR is fully merged and jars published you will need to pull down the PR yourself and publish the jars to your local maven repository in order to build Index Management.
68-
69-
1. Visit the PR [here](https://github.com/opendistro-for-elasticsearch/alerting/pull/97) and pull down the Alerting plugin along with the PR changes
70-
2. You may need to cherry-pick the changes into a separate branch if you require a specific version to be published
71-
3. Build the Alerting plugin (w/ the changes in PR) and publish the artifacts to your local maven repository
72-
1. `./gradlew clean`
73-
2. `./gradlew build` or `./gradlew assemble` build will run the tests and build artifacts, assemble will only build the artifacts
74-
3. `./gradlew publishToMavenLocal` publishes artifacts to your local maven repository
75-
7667
### Building from the command line
7768

7869
1. `./gradlew build` builds and tests project.

build-tools/esplugin-coverage.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ integTest.runner {
5858
jacocoTestReport {
5959
dependsOn integTest, test
6060
executionData dummyTest.jacoco.destinationFile, dummyIntegTest.jacoco.destinationFile
61-
sourceDirectories.from = sourceSets.main.allSource
61+
sourceDirectories.from = "src/main/kotlin"
6262
classDirectories.from = sourceSets.main.output
6363
reports {
6464
html.enabled = true // human readable

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/ManagedIndexRunner.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
290290

291291
if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
292292
// Step null check is done in getStartingManagedIndexMetaData
293-
step.execute()
293+
step.preExecute(logger).execute().postExecute(logger)
294294
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)
295295

296296
if (executedManagedIndexMetaData.isFailed) {

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/elasticapi/ElasticExtensions.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
2424
import kotlinx.coroutines.delay
2525
import org.apache.logging.log4j.Logger
2626
import org.elasticsearch.ElasticsearchException
27+
import org.elasticsearch.ExceptionsHelper
2728
import org.elasticsearch.action.ActionListener
2829
import org.elasticsearch.action.bulk.BackoffPolicy
30+
import org.elasticsearch.action.support.DefaultShardOperationFailedException
2931
import org.elasticsearch.client.ElasticsearchClient
3032
import org.elasticsearch.cluster.metadata.IndexMetadata
3133
import org.elasticsearch.common.bytes.BytesReference
@@ -36,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser
3638
import org.elasticsearch.common.xcontent.XContentParserUtils
3739
import org.elasticsearch.common.xcontent.XContentType
3840
import org.elasticsearch.rest.RestStatus
41+
import org.elasticsearch.transport.RemoteTransportException
3942
import java.time.Instant
4043
import kotlin.coroutines.resume
4144
import kotlin.coroutines.resumeWithException
@@ -202,9 +205,7 @@ fun IndexMetadata.getRolloverAlias(): String? {
202205
fun IndexMetadata.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? {
203206
val index = this.index.name
204207
val uuid = this.index.uuid
205-
val policyID = this.getPolicyID()
206-
207-
if (policyID == null) return null
208+
val policyID = this.getPolicyID() ?: return null
208209

209210
return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID)
210211
}
@@ -217,3 +218,13 @@ fun IndexMetadata.getManagedIndexMetaData(): ManagedIndexMetaData? {
217218
}
218219
return null
219220
}
221+
222+
fun Throwable.findRemoteTransportException(): RemoteTransportException? {
223+
if (this is RemoteTransportException) return this
224+
return this.cause?.findRemoteTransportException()
225+
}
226+
227+
fun DefaultShardOperationFailedException.getUsefulCauseString(): String {
228+
val rte = this.cause?.findRemoteTransportException()
229+
return if (rte == null) this.toString() else ExceptionsHelper.unwrapCause(rte).toString()
230+
}

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/coordinator/SweptManagedIndexConfig.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ data class SweptManagedIndexConfig(
4040
) {
4141

4242
companion object {
43+
@Suppress("ComplexMethod")
4344
@JvmStatic
4445
@Throws(IOException::class)
4546
fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig {

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/Step.kt

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step
1717

1818
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
1919
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
20+
import org.apache.logging.log4j.Logger
2021
import org.elasticsearch.common.io.stream.StreamInput
2122
import org.elasticsearch.common.io.stream.StreamOutput
2223
import org.elasticsearch.common.io.stream.Writeable
@@ -25,7 +26,17 @@ import java.util.Locale
2526

2627
abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) {
2728

28-
abstract suspend fun execute()
29+
fun preExecute(logger: Logger): Step {
30+
logger.info("Executing $name for ${managedIndexMetaData.index}")
31+
return this
32+
}
33+
34+
abstract suspend fun execute(): Step
35+
36+
fun postExecute(logger: Logger): Step {
37+
logger.info("Finished executing $name for ${managedIndexMetaData.index}")
38+
return this
39+
}
2940

3041
abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData
3142

@@ -44,9 +55,7 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
4455
*/
4556
abstract fun isIdempotent(): Boolean
4657

47-
fun getStartingStepMetaData(): StepMetaData {
48-
return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)
49-
}
58+
fun getStartingStepMetaData(): StepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)
5059

5160
fun getStepStartTime(): Instant {
5261
if (managedIndexMetaData.stepMetaData == null || managedIndexMetaData.stepMetaData.name != this.name) {
@@ -55,6 +64,8 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
5564
return Instant.ofEpochMilli(managedIndexMetaData.stepMetaData.startTime)
5665
}
5766

67+
protected val indexName: String = managedIndexMetaData.index
68+
5869
enum class StepStatus(val status: String) : Writeable {
5970
STARTING("starting"),
6071
CONDITION_NOT_MET("condition_not_met"),

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/allocation/AttemptAllocationStep.kt

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,17 @@ class AttemptAllocationStep(
3939

4040
override fun isIdempotent() = true
4141

42-
override suspend fun execute() {
42+
override suspend fun execute(): AttemptAllocationStep {
4343
try {
4444
val response: AcknowledgedResponse = client.admin()
4545
.indices()
4646
.suspendUntil { updateSettings(UpdateSettingsRequest(buildSettings(), managedIndexMetaData.index), it) }
4747
handleResponse(response)
4848
} catch (e: Exception) {
49-
logger.error(ERROR_MESSAGE, e)
50-
stepStatus = StepStatus.FAILED
51-
val mutableInfo = mutableMapOf("message" to ERROR_MESSAGE)
52-
val errorMessage = e.message
53-
if (errorMessage != null) mutableInfo["cause"] = errorMessage
54-
info = mutableInfo.toMap()
49+
handleException(e)
5550
}
51+
52+
return this
5653
}
5754

5855
private fun buildSettings(): Settings {
@@ -63,13 +60,23 @@ class AttemptAllocationStep(
6360
return builder.build()
6461
}
6562

63+
private fun handleException(e: Exception) {
64+
val message = getFailedMessage(indexName)
65+
logger.error(message, e)
66+
stepStatus = StepStatus.FAILED
67+
val mutableInfo = mutableMapOf("message" to message)
68+
val errorMessage = e.message
69+
if (errorMessage != null) mutableInfo["cause"] = errorMessage
70+
info = mutableInfo.toMap()
71+
}
72+
6673
private fun handleResponse(response: AcknowledgedResponse) {
6774
if (response.isAcknowledged) {
6875
stepStatus = StepStatus.COMPLETED
69-
info = mapOf("message" to "Updated settings with allocation.")
76+
info = mapOf("message" to getSuccessMessage(indexName))
7077
} else {
7178
stepStatus = StepStatus.FAILED
72-
info = mapOf("message" to ERROR_MESSAGE)
79+
info = mapOf("message" to getFailedMessage(indexName))
7380
}
7481
}
7582

@@ -82,7 +89,8 @@ class AttemptAllocationStep(
8289
}
8390

8491
companion object {
85-
private const val ERROR_MESSAGE = "Failed to update settings with allocation."
8692
private const val SETTINGS_PREFIX = "index.routing.allocation."
93+
fun getFailedMessage(index: String) = "Failed to update allocation setting [index=$index]"
94+
fun getSuccessMessage(index: String) = "Successfully updated allocation setting [index=$index]"
8795
}
8896
}

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/close/AttemptCloseStep.kt

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.C
2121
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
2222
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
2323
import org.apache.logging.log4j.LogManager
24+
import org.elasticsearch.ExceptionsHelper
2425
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
2526
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse
2627
import org.elasticsearch.client.Client
2728
import org.elasticsearch.cluster.service.ClusterService
2829
import org.elasticsearch.snapshots.SnapshotInProgressException
30+
import org.elasticsearch.transport.RemoteTransportException
2931

3032
class AttemptCloseStep(
3133
val clusterService: ClusterService,
@@ -41,34 +43,52 @@ class AttemptCloseStep(
4143
override fun isIdempotent() = true
4244

4345
@Suppress("TooGenericExceptionCaught")
44-
override suspend fun execute() {
45-
val index = managedIndexMetaData.index
46+
override suspend fun execute(): AttemptCloseStep {
4647
try {
47-
logger.info("Executing close on $index")
4848
val closeIndexRequest = CloseIndexRequest()
49-
.indices(index)
49+
.indices(indexName)
5050

5151
val response: CloseIndexResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
52-
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
5352
if (response.isAcknowledged) {
5453
stepStatus = StepStatus.COMPLETED
55-
info = mapOf("message" to "Successfully closed index")
54+
info = mapOf("message" to getSuccessMessage(indexName))
5655
} else {
56+
val message = getFailedMessage(indexName)
57+
logger.warn(message)
5758
stepStatus = StepStatus.FAILED
58-
info = mapOf("message" to "Failed to close index")
59+
info = mapOf("message" to message)
60+
}
61+
} catch (e: RemoteTransportException) {
62+
val cause = ExceptionsHelper.unwrapCause(e)
63+
if (cause is SnapshotInProgressException) {
64+
handleSnapshotException(cause)
65+
} else {
66+
handleException(cause as Exception)
5967
}
6068
} catch (e: SnapshotInProgressException) {
61-
logger.warn("Failed to close index [index=$index] with snapshot in progress")
62-
stepStatus = StepStatus.CONDITION_NOT_MET
63-
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
69+
handleSnapshotException(e)
6470
} catch (e: Exception) {
65-
logger.error("Failed to set index to close [index=$index]", e)
66-
stepStatus = StepStatus.FAILED
67-
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
68-
val errorMessage = e.message
69-
if (errorMessage != null) mutableInfo["cause"] = errorMessage
70-
info = mutableInfo.toMap()
71+
handleException(e)
7172
}
73+
74+
return this
75+
}
76+
77+
private fun handleSnapshotException(e: SnapshotInProgressException) {
78+
val message = getSnapshotMessage(indexName)
79+
logger.warn(message, e)
80+
stepStatus = StepStatus.CONDITION_NOT_MET
81+
info = mapOf("message" to message)
82+
}
83+
84+
private fun handleException(e: Exception) {
85+
val message = getFailedMessage(indexName)
86+
logger.error(message, e)
87+
stepStatus = StepStatus.FAILED
88+
val mutableInfo = mutableMapOf("message" to message)
89+
val errorMessage = e.message
90+
if (errorMessage != null) mutableInfo["cause"] = errorMessage
91+
info = mutableInfo.toMap()
7292
}
7393

7494
override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
@@ -78,4 +98,10 @@ class AttemptCloseStep(
7898
info = info
7999
)
80100
}
101+
102+
companion object {
103+
fun getFailedMessage(index: String) = "Failed to close index [index=$index]"
104+
fun getSuccessMessage(index: String) = "Successfully closed index [index=$index]"
105+
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying closing [index=$index]"
106+
}
81107
}

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/delete/AttemptDeleteStep.kt

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.D
2121
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
2222
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
2323
import org.apache.logging.log4j.LogManager
24+
import org.elasticsearch.ExceptionsHelper
2425
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
2526
import org.elasticsearch.action.support.master.AcknowledgedResponse
2627
import org.elasticsearch.client.Client
2728
import org.elasticsearch.cluster.service.ClusterService
2829
import org.elasticsearch.snapshots.SnapshotInProgressException
30+
import org.elasticsearch.transport.RemoteTransportException
2931
import java.lang.Exception
3032

3133
class AttemptDeleteStep(
@@ -42,30 +44,51 @@ class AttemptDeleteStep(
4244
override fun isIdempotent() = true
4345

4446
@Suppress("TooGenericExceptionCaught")
45-
override suspend fun execute() {
47+
override suspend fun execute(): AttemptDeleteStep {
4648
try {
4749
val response: AcknowledgedResponse = client.admin().indices()
48-
.suspendUntil { delete(DeleteIndexRequest(managedIndexMetaData.index), it) }
50+
.suspendUntil { delete(DeleteIndexRequest(indexName), it) }
4951

5052
if (response.isAcknowledged) {
5153
stepStatus = StepStatus.COMPLETED
52-
info = mapOf("message" to "Deleted index")
54+
info = mapOf("message" to getSuccessMessage(indexName))
5355
} else {
56+
val message = getFailedMessage(indexName)
57+
logger.warn(message)
5458
stepStatus = StepStatus.FAILED
55-
info = mapOf("message" to "Failed to delete index")
59+
info = mapOf("message" to message)
60+
}
61+
} catch (e: RemoteTransportException) {
62+
val cause = ExceptionsHelper.unwrapCause(e)
63+
if (cause is SnapshotInProgressException) {
64+
handleSnapshotException(cause)
65+
} else {
66+
handleException(cause as Exception)
5667
}
5768
} catch (e: SnapshotInProgressException) {
58-
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
59-
stepStatus = StepStatus.CONDITION_NOT_MET
60-
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
69+
handleSnapshotException(e)
6170
} catch (e: Exception) {
62-
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
63-
stepStatus = StepStatus.FAILED
64-
val mutableInfo = mutableMapOf("message" to "Failed to delete index")
65-
val errorMessage = e.message
66-
if (errorMessage != null) mutableInfo["cause"] = errorMessage
67-
info = mutableInfo.toMap()
71+
handleException(e)
6872
}
73+
74+
return this
75+
}
76+
77+
private fun handleSnapshotException(e: SnapshotInProgressException) {
78+
val message = getSnapshotMessage(indexName)
79+
logger.warn(message, e)
80+
stepStatus = StepStatus.CONDITION_NOT_MET
81+
info = mapOf("message" to message)
82+
}
83+
84+
private fun handleException(e: Exception) {
85+
val message = getFailedMessage(indexName)
86+
logger.error(message, e)
87+
stepStatus = StepStatus.FAILED
88+
val mutableInfo = mutableMapOf("message" to message)
89+
val errorMessage = e.message
90+
if (errorMessage != null) mutableInfo["cause"] = errorMessage
91+
info = mutableInfo.toMap()
6992
}
7093

7194
override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
@@ -78,5 +101,8 @@ class AttemptDeleteStep(
78101

79102
companion object {
80103
const val name = "attempt_delete"
104+
fun getFailedMessage(index: String) = "Failed to delete index [index=$index]"
105+
fun getSuccessMessage(index: String) = "Successfully deleted index [index=$index]"
106+
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying deletion [index=$index]"
81107
}
82108
}

0 commit comments

Comments
 (0)