Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package com.linkedin.datahub.graphql.types.mappers;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.TestUtils;
import com.linkedin.datahub.graphql.generated.MatchedField;
import com.linkedin.metadata.entity.validation.ValidationApiUtils;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.net.URISyntaxException;
import java.util.List;
import org.testng.annotations.BeforeTest;
Expand Down Expand Up @@ -44,9 +42,7 @@ public void testMatchedFieldValidation() throws URISyntaxException {
IllegalArgumentException.class,
() -> ValidationApiUtils.validateUrn(entityRegistry, invalidUrn));

QueryContext mockContext = mock(QueryContext.class);
when(mockContext.getOperationContext())
.thenReturn(TestOperationContexts.systemContextNoSearchAuthorization(entityRegistry));
QueryContext mockContext = TestUtils.getMockAllowContext();

List<MatchedField> actualMatched =
MapperUtils.getMatchedFieldEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,25 @@ public LoadIndicesIndexManager(
public List<ReindexConfig> discoverDataHubIndexConfigs() throws IOException {
List<ReindexConfig> configs = new ArrayList<>();

// Get entity indices using IndexConvention pattern
String entityPattern = indexConvention.getAllEntityIndicesPattern();
log.debug("Querying entity indices with pattern: {}", entityPattern);
GetIndexRequest entityRequest = new GetIndexRequest(entityPattern);
GetIndexResponse entityResponse = searchClient.getIndex(entityRequest, RequestOptions.DEFAULT);
String[] entityIndices = entityResponse.getIndices();

for (String indexName : entityIndices) {
try {
ReindexConfig config = indexBuilder.buildReindexState(indexName, Map.of(), Map.of());
configs.add(config);
log.debug("Added entity index config: {}", indexName);
} catch (IOException e) {
log.warn(
"Failed to build reindex config for entity index {}: {}", indexName, e.getMessage());
// Get entity indices using IndexConvention patterns
List<String> entityPatterns = indexConvention.getAllEntityIndicesPatterns();
log.debug("Querying entity indices with patterns: {}", entityPatterns);

for (String entityPattern : entityPatterns) {
GetIndexRequest entityRequest = new GetIndexRequest(entityPattern);
GetIndexResponse entityResponse =
searchClient.getIndex(entityRequest, RequestOptions.DEFAULT);
String[] entityIndices = entityResponse.getIndices();

for (String indexName : entityIndices) {
try {
ReindexConfig config = indexBuilder.buildReindexState(indexName, Map.of(), Map.of());
configs.add(config);
log.debug("Added entity index config: {}", indexName);
} catch (IOException e) {
log.warn(
"Failed to build reindex config for entity index {}: {}", indexName, e.getMessage());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
CronArgs args = createArgs(context);
try {
for (ElasticSearchIndexed service : services) {
service.tweakReplicasAll(structuredProperties, args.dryRun);
service.tweakReplicasAll(context.opContext(), structuredProperties, args.dryRun);
}
} catch (Exception e) {
log.error("TweakReplicasStep failed.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
try {

List<ReindexConfig> indexConfigs =
getAllReindexConfigs(services, structuredProperties).stream()
getAllReindexConfigs(context.opContext(), services, structuredProperties).stream()
.filter(ReindexConfig::requiresReindex)
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
final List<ReindexConfig> reindexConfigs =
getAllReindexConfigs(services, structuredProperties);
getAllReindexConfigs(context.opContext(), services, structuredProperties);

// Get indices to update
List<ReindexConfig> indexConfigs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
for (ElasticSearchIndexed service : services) {
service.reindexAll(structuredProperties);
service.reindexAll(context.opContext(), structuredProperties);
}
} catch (Exception e) {
log.error("BuildIndicesStep failed.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
// Each service enumerates and cleans up their own indices
IndexUtils.getAllReindexConfigs(indexedServices, structuredProperties)
IndexUtils.getAllReindexConfigs(context.opContext(), indexedServices, structuredProperties)
.forEach(
reindexConfig ->
ESIndexBuilder.cleanOrphanedIndices(searchClient, esConfig, reindexConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.upgrade.DataHubUpgradeState;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -80,7 +81,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
log.error("ReindexDebugStep failed: No ElasticSearchService found");
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
}
setConfig(args.index);
setConfig(context.opContext(), args.index);
if (config == null) {
log.error("ReindexDebugStep failed: No matching config found for index: {}", args.index);
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
Expand All @@ -99,10 +100,11 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
};
}

void setConfig(String targetIndex) throws IOException, IllegalAccessException {
void setConfig(OperationContext opContext, String targetIndex)
throws IOException, IllegalAccessException {
// datahubpolicyindex_v2 has some docs upon starting quickdebug...
// String targetIndex = "datahubpolicyindex_v2";
List<ReindexConfig> configs = service.buildReindexConfigs(structuredProperties);
List<ReindexConfig> configs = service.buildReindexConfigs(opContext, structuredProperties);
config = null; // Reset config to null
for (ReindexConfig cfg : configs) {
String cfgname = cfg.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.utils.elasticsearch.SearchClientShim;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -30,14 +31,16 @@ private IndexUtils() {}
private static List<ReindexConfig> _reindexConfigs = new ArrayList<>();

public static List<ReindexConfig> getAllReindexConfigs(
OperationContext opContext,
List<ElasticSearchIndexed> elasticSearchIndexedList,
Collection<Pair<Urn, StructuredPropertyDefinition>> structuredProperties)
throws IOException {
// Avoid locking & reprocessing
List<ReindexConfig> reindexConfigs = new ArrayList<>(_reindexConfigs);
if (reindexConfigs.isEmpty()) {
for (ElasticSearchIndexed elasticSearchIndexed : elasticSearchIndexedList) {
reindexConfigs.addAll(elasticSearchIndexed.buildReindexConfigs(structuredProperties));
reindexConfigs.addAll(
elasticSearchIndexed.buildReindexConfigs(opContext, structuredProperties));
}
_reindexConfigs = new ArrayList<>(reindexConfigs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,10 @@ public void testRestoreIndicesInit() {
public void testBuildIndicesInit() {
assertEquals("BuildIndices", buildIndices.id());
assertTrue(buildIndices.steps().size() >= 3);
assertNotNull(esIndexBuilder.getElasticSearchConfiguration());
assertNotNull(esIndexBuilder.getElasticSearchConfiguration().getBuildIndices());
assertTrue(esIndexBuilder.getElasticSearchConfiguration().getBuildIndices().isCloneIndices());
assertFalse(
esIndexBuilder.getElasticSearchConfiguration().getBuildIndices().isAllowDocCountMismatch());
assertNotNull(esIndexBuilder.getConfig());
assertNotNull(esIndexBuilder.getConfig().getBuildIndices());
assertTrue(esIndexBuilder.getConfig().getBuildIndices().isCloneIndices());
assertFalse(esIndexBuilder.getConfig().getBuildIndices().isAllowDocCountMismatch());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.UUID;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
Expand All @@ -28,6 +29,7 @@

@TestConfiguration
@Import(value = {SystemAuthenticationFactory.class})
@EnableConfigurationProperties
public class UpgradeCliApplicationTestConfiguration {

// TODO: We cannot remove the MockBean annotation here because with MockitoBean it is still trying
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.metadata.utils.elasticsearch.SearchClientShim;
import com.linkedin.metadata.utils.elasticsearch.responses.GetIndexResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.RequestOptions;
Expand Down Expand Up @@ -69,7 +70,8 @@ public void testDiscoverDataHubIndexConfigs() throws IOException {
.thenReturn(mockSystemMetadataResponse);

// Mock index convention patterns
when(mockIndexConvention.getAllEntityIndicesPattern()).thenReturn("datahub_*index_v2");
when(mockIndexConvention.getAllEntityIndicesPatterns())
.thenReturn(List.of("datahub_*index_v2"));
when(mockIndexConvention.getIndexName(ElasticSearchGraphService.INDEX_NAME))
.thenReturn("datahub_graph_service_v1");
when(mockIndexConvention.getIndexName(ElasticSearchSystemMetadataService.INDEX_NAME))
Expand Down Expand Up @@ -137,6 +139,10 @@ public void testDiscoverDataHubIndexConfigs() throws IOException {

@Test
public void testDiscoverDataHubIndexConfigsWithIOException() throws IOException {
// Mock getAllEntityIndicesPatterns to return a pattern so the loop executes
when(mockIndexConvention.getAllEntityIndicesPatterns())
.thenReturn(List.of("datahub_*index_v2"));

// Mock IOException
when(mockSearchClient.getIndex(any(GetIndexRequest.class), any(RequestOptions.class)))
.thenThrow(new IOException("Connection failed"));
Expand Down Expand Up @@ -176,6 +182,10 @@ public void testOptimizeForBulkOperations() throws IOException {

@Test
public void testOptimizeForBulkOperationsWithIOException() throws IOException {
// Mock getAllEntityIndicesPatterns to return a pattern so the loop executes
when(mockIndexConvention.getAllEntityIndicesPatterns())
.thenReturn(List.of("datahub_*index_v2"));

// Mock IOException during discovery
when(mockSearchClient.getIndex(any(GetIndexRequest.class), any(RequestOptions.class)))
.thenThrow(new IOException("Discovery failed"));
Expand Down Expand Up @@ -273,7 +283,8 @@ public void testOptimizeForBulkOperationsManagesBothRefreshAndReplicas() throws
.thenReturn(mockSystemMetadataResponse);

// Mock index convention patterns
when(mockIndexConvention.getAllEntityIndicesPattern()).thenReturn("datahub_*index_v2");
when(mockIndexConvention.getAllEntityIndicesPatterns())
.thenReturn(List.of("datahub_*index_v2"));
when(mockIndexConvention.getIndexName(ElasticSearchGraphService.INDEX_NAME))
.thenReturn("datahub_graph_service_v1");
when(mockIndexConvention.getIndexName(ElasticSearchSystemMetadataService.INDEX_NAME))
Expand Down Expand Up @@ -327,7 +338,8 @@ public void testRestoreFromConfigurationWithPerIndexOverrides() throws IOExcepti
.thenReturn(mockSystemMetadataResponse);

// Mock index convention patterns
when(mockIndexConvention.getAllEntityIndicesPattern()).thenReturn("datahub_*index_v2");
when(mockIndexConvention.getAllEntityIndicesPatterns())
.thenReturn(List.of("datahub_*index_v2"));
when(mockIndexConvention.getIndexName(ElasticSearchGraphService.INDEX_NAME))
.thenReturn("datahub_graph_service_v1");
when(mockIndexConvention.getIndexName(ElasticSearchSystemMetadataService.INDEX_NAME))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ public void testExecutableSuccess() {

// Verify that tweakReplicasAll was called with the correct parameters
Mockito.verify(mockService)
.tweakReplicasAll(ArgumentMatchers.eq(structuredProperties), ArgumentMatchers.eq(true));
.tweakReplicasAll(
ArgumentMatchers.eq(mockOpContext),
ArgumentMatchers.eq(structuredProperties),
ArgumentMatchers.eq(true));
}

@Test
Expand All @@ -142,7 +145,10 @@ public void testExecutableWithException() {
Mockito.when(mockContext.opContext()).thenReturn(mockOpContext);
Mockito.doThrow(new RuntimeException("Test exception"))
.when(mockService)
.tweakReplicasAll(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean());
.tweakReplicasAll(
ArgumentMatchers.any(OperationContext.class),
ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean());

UpgradeStepResult result = tweakReplicasStep.executable().apply(mockContext);

Expand All @@ -169,8 +175,14 @@ public void testExecutableWithMultipleServices() {

// Verify that tweakReplicasAll was called on both services
Mockito.verify(mockService)
.tweakReplicasAll(ArgumentMatchers.eq(structuredProperties), ArgumentMatchers.eq(false));
.tweakReplicasAll(
ArgumentMatchers.eq(mockOpContext),
ArgumentMatchers.eq(structuredProperties),
ArgumentMatchers.eq(false));
Mockito.verify(mockService2)
.tweakReplicasAll(ArgumentMatchers.eq(structuredProperties), ArgumentMatchers.eq(false));
.tweakReplicasAll(
ArgumentMatchers.eq(mockOpContext),
ArgumentMatchers.eq(structuredProperties),
ArgumentMatchers.eq(false));
}
}
Loading
Loading