2 changes: 1 addition & 1 deletion build.gradle
@@ -216,7 +216,7 @@ project.ext.externalDependency = [
'log4jApi': "org.apache.logging.log4j:log4j-api:$log4jVersion",
'log4j12Api': "org.slf4j:log4j-over-slf4j:$slf4jVersion",
'log4j2Api': "org.apache.logging.log4j:log4j-to-slf4j:$log4jVersion",
'lombok': 'org.projectlombok:lombok:1.18.30',
'lombok': 'org.projectlombok:lombok:1.18.42',
'mariadbConnector': 'org.mariadb.jdbc:mariadb-java-client:2.6.0',
'mavenArtifact': "org.apache.maven:maven-artifact:$mavenVersion",
'mixpanel': 'com.mixpanel:mixpanel-java:1.4.4',
79 changes: 79 additions & 0 deletions datahub-upgrade/build.gradle
@@ -22,6 +22,7 @@ dependencies {

implementation externalDependency.mustache
implementation externalDependency.javaxInject
implementation externalDependency.springActuator
implementation(externalDependency.hadoopClient) {
exclude group: 'net.minidev', module: 'json-smart'
exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt'
@@ -229,6 +230,84 @@ task runRestoreIndicesUrn(type: Exec) {
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100", "-a", "urnBasedPagination=true"
}

/**
* Runs LoadIndices on a locally running system. This ensures indices have the correct mappings
* and then loads data from the database into Elasticsearch.
*
* The process includes two steps:
* 1. BuildIndicesStep: Ensures indices have the correct mappings (creates/updates index structure)
* 2. LoadIndicesStep: Loads data from the database into Elasticsearch
*
* The task automatically configures ES_BULK_REQUESTS_LIMIT to match the batch size
* and sets ES_BULK_FLUSH_PERIOD to 300 seconds so the bulk processor flushes full
* batches rather than flushing early on a timer.
*
* Optional parameters:
* - limit: Maximum number of records to process (default: no limit)
* - batchSize: Number of records per batch (default: 10000)
* - esThreadCount: Elasticsearch I/O thread count (default: 4, enables async bulk processing)
* - urnLike: URN pattern filter (e.g., "urn:li:dataset:%")
* - aspectNames: Comma-separated aspect names to filter
* - lePitEpochMs: Process records created before this timestamp
* - gePitEpochMs: Process records created after this timestamp
*
* Usage examples:
* ./gradlew runLoadIndices
* ./gradlew runLoadIndices -Plimit=5000
* ./gradlew runLoadIndices -Plimit=1000 -PbatchSize=2500
* ./gradlew runLoadIndices -PurnLike="urn:li:dataset:%"
* ./gradlew runLoadIndices -PesThreadCount=3
*/
task runLoadIndices(type: Exec) {
dependsOn bootJar
group = "Execution"
description = "Run the load indices process locally - ensures correct mappings and loads data from database to Elasticsearch."
environment "ENTITY_REGISTRY_CONFIG_PATH", "../metadata-models/src/main/resources/entity-registry.yml"

def args = ["java", "-agentlib:jdwp=transport=dt_socket,address=5003,server=y,suspend=n",
"-jar", "-Dserver.port=8083", bootJar.getArchiveFile().get(), "-u", "LoadIndices"]

// Add batchSize (default: 10000)
def batchSize = project.hasProperty('batchSize') ? project.getProperty('batchSize') : '10000'
args.addAll(["-a", "batchSize=${batchSize}"])

// Optimize bulk request limit for sustained load
environment "ES_BULK_REQUESTS_LIMIT", batchSize
// Set flush period to 5 minutes to allow proper batching
environment "ES_BULK_FLUSH_PERIOD", "300"

// Configure Elasticsearch I/O threads for better BulkProcessor performance
// Increase from default 2 to allow more concurrent HTTP connections
def threadCount = project.hasProperty('esThreadCount') ? project.getProperty('esThreadCount') : '4'
environment "ELASTICSEARCH_THREAD_COUNT", threadCount

// Add limit if specified
if (project.hasProperty('limit')) {
args.addAll(["-a", "limit=${project.getProperty('limit')}"])
}

// Add urnLike if specified
if (project.hasProperty('urnLike')) {
args.addAll(["-a", "urnLike=${project.getProperty('urnLike')}"])
}

// Add aspectNames if specified
if (project.hasProperty('aspectNames')) {
args.addAll(["-a", "aspectNames=${project.getProperty('aspectNames')}"])
}

// Add lePitEpochMs if specified
if (project.hasProperty('lePitEpochMs')) {
args.addAll(["-a", "lePitEpochMs=${project.getProperty('lePitEpochMs')}"])
}

// Add gePitEpochMs if specified
if (project.hasProperty('gePitEpochMs')) {
args.addAll(["-a", "gePitEpochMs=${project.getProperty('gePitEpochMs')}"])
}

commandLine args
}

docker {
dependsOn(bootJar)
name "${docker_registry}/${docker_repo}:${versionTag}"
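Aside: the tasks above hand every tunable to the jar as repeated "-a key=value" pairs. A minimal sketch of collecting such pairs into a map (the helper below is hypothetical, for illustration only, not the actual UpgradeCli parser):

import java.util.HashMap;
import java.util.Map;

public class UpgradeArgsSketch {
  // Collect each "-a key=value" pair; a pair without '=' maps to an empty string.
  static Map<String, String> parseUpgradeArgs(String... argv) {
    Map<String, String> parsed = new HashMap<>();
    for (int i = 0; i + 1 < argv.length; i++) {
      if ("-a".equals(argv[i])) {
        String[] kv = argv[i + 1].split("=", 2);
        parsed.put(kv[0], kv.length > 1 ? kv[1] : "");
      }
    }
    return parsed;
  }

  public static void main(String[] args) {
    // Same shape as the command runLoadIndices builds.
    Map<String, String> parsed =
        parseUpgradeArgs("-u", "LoadIndices", "-a", "batchSize=10000", "-a", "urnLike=urn:li:dataset:%");
    System.out.println(parsed); // both pairs present; map iteration order may vary
  }
}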
datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java
@@ -1,6 +1,7 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.loadindices.LoadIndices;
import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects;
import com.linkedin.datahub.upgrade.restorebackup.RestoreBackup;
import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
@@ -12,7 +13,6 @@
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,15 +34,19 @@ private static final class Args {

private final UpgradeManager _upgradeManager = new DefaultUpgradeManager();

@Inject
@Autowired(required = false)
@Named("loadIndices")
private LoadIndices loadIndices;

@Autowired(required = false)
@Named("restoreIndices")
private RestoreIndices restoreIndices;

@Inject
@Autowired(required = false)
@Named("restoreBackup")
private RestoreBackup restoreBackup;

@Inject
@Autowired(required = false)
@Named("removeUnknownAspects")
private RemoveUnknownAspects removeUnknownAspects;

@@ -66,29 +70,65 @@ private static final class Args {
@Named("systemUpdateCron")
private SystemUpdateCron systemUpdateCron;

@Autowired
@Autowired(required = false)
@Named("reindexDebug")
private ReindexDebug reindexDebug;

@Override
public void run(String... cmdLineArgs) {
_upgradeManager.register(restoreIndices);
_upgradeManager.register(restoreBackup);
_upgradeManager.register(removeUnknownAspects);
// Register upgrades with null checks and warnings
if (restoreIndices != null) {
_upgradeManager.register(restoreIndices);
} else {
log.warn("RestoreIndices upgrade not available - bean not found");
}

if (restoreBackup != null) {
_upgradeManager.register(restoreBackup);
} else {
log.warn("RestoreBackup upgrade not available - bean not found");
}

if (removeUnknownAspects != null) {
_upgradeManager.register(removeUnknownAspects);
} else {
log.warn("RemoveUnknownAspects upgrade not available - bean not found");
}

if (loadIndices != null) {
_upgradeManager.register(loadIndices);
} else {
log.warn("LoadIndices upgrade not available - bean not found");
}

if (systemUpdate != null) {
_upgradeManager.register(systemUpdate);
} else {
log.warn("SystemUpdate upgrade not available - bean not found");
}

if (systemUpdateBlocking != null) {
_upgradeManager.register(systemUpdateBlocking);
} else {
log.warn("SystemUpdateBlocking upgrade not available - bean not found");
}

if (systemUpdateNonBlocking != null) {
_upgradeManager.register(systemUpdateNonBlocking);
} else {
log.warn("SystemUpdateNonBlocking upgrade not available - bean not found");
}

if (systemUpdateCron != null) {
_upgradeManager.register(systemUpdateCron);
} else {
log.warn("SystemUpdateCron upgrade not available - bean not found");
}

if (reindexDebug != null) {
_upgradeManager.register(reindexDebug);
} else {
log.warn("ReindexDebug upgrade not available - bean not found");
}

final Args args = new Args();
datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
@@ -1,47 +1,13 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.gms.factory.auth.AuthorizerChainFactory;
import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory;
import com.linkedin.gms.factory.event.ExternalEventsServiceFactory;
import com.linkedin.gms.factory.event.KafkaConsumerPoolFactory;
import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
import com.linkedin.gms.factory.kafka.trace.KafkaTraceReaderFactory;
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
import com.linkedin.gms.factory.trace.TraceServiceFactory;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class})
@ComponentScan(
basePackages = {
"com.linkedin.gms.factory",
"com.linkedin.datahub.upgrade.config",
"com.linkedin.datahub.upgrade.system.cdc",
"com.linkedin.metadata.dao.producer"
},
excludeFilters = {
@ComponentScan.Filter(
type = FilterType.ASSIGNABLE_TYPE,
classes = {
ScheduledAnalyticsFactory.class,
AuthorizerChainFactory.class,
DataHubAuthorizerFactory.class,
SimpleKafkaConsumerFactory.class,
KafkaEventConsumerFactory.class,
GraphQLEngineFactory.class,
KafkaTraceReaderFactory.class,
TraceServiceFactory.class,
KafkaConsumerPoolFactory.class,
ExternalEventsServiceFactory.class
})
})
@Configuration
@Import(UpgradeConfigurationSelector.class)
public class UpgradeCliApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(UpgradeCliApplication.class, UpgradeCli.class)
datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeConfigurationSelector.java
@@ -0,0 +1,29 @@
package com.linkedin.datahub.upgrade;

import com.linkedin.datahub.upgrade.conditions.GeneralUpgradeCondition;
import com.linkedin.datahub.upgrade.conditions.LoadIndicesCondition;
import com.linkedin.datahub.upgrade.config.GeneralUpgradeConfiguration;
import com.linkedin.datahub.upgrade.loadindices.LoadIndicesUpgradeConfig;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
* Configuration selector that chooses the appropriate upgrade configuration based on command-line
* arguments.
*/
@Configuration
public class UpgradeConfigurationSelector {

/** Configuration for LoadIndices upgrade - excludes Kafka components */
@Configuration
@Conditional(LoadIndicesCondition.class)
@Import(LoadIndicesUpgradeConfig.class)
public static class LoadIndicesConfiguration {}

/** Configuration for general upgrades - includes all components */
@Configuration
@Conditional(GeneralUpgradeCondition.class)
@Import(GeneralUpgradeConfiguration.class)
public static class GeneralConfiguration {}
}
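For intuition, a self-contained toy analogue of this selector pattern (hypothetical names, keyed off a system property instead of ApplicationArguments): two nested @Configuration classes guarded by opposite Conditions, so exactly one branch's beans are registered per run.

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class SelectorPatternDemo {

  static class FlagSet implements Condition {
    @Override
    public boolean matches(ConditionContext ctx, AnnotatedTypeMetadata md) {
      return Boolean.getBoolean("demo.loadIndices");
    }
  }

  static class FlagUnset implements Condition {
    @Override
    public boolean matches(ConditionContext ctx, AnnotatedTypeMetadata md) {
      return !Boolean.getBoolean("demo.loadIndices");
    }
  }

  @Configuration
  @Conditional(FlagSet.class)
  static class LoadIndicesBranch {
    @Bean
    String activeBranch() {
      return "load-indices";
    }
  }

  @Configuration
  @Conditional(FlagUnset.class)
  static class GeneralBranch {
    @Bean
    String activeBranch() {
      return "general";
    }
  }

  public static void main(String[] args) {
    // Run with -Ddemo.loadIndices=true to activate the other branch.
    try (AnnotationConfigApplicationContext ctx =
        new AnnotationConfigApplicationContext(LoadIndicesBranch.class, GeneralBranch.class)) {
      System.out.println("active branch: " + ctx.getBean("activeBranch"));
    }
  }
}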
datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/conditions/GeneralUpgradeCondition.java
@@ -0,0 +1,21 @@
package com.linkedin.datahub.upgrade.conditions;

import java.util.Objects;
import java.util.Set;
import org.springframework.boot.ApplicationArguments;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class GeneralUpgradeCondition implements Condition {
public static final String LOAD_INDICES_ARG = "LoadIndices";
public static final Set<String> EXCLUDED_ARGS = Set.of(LOAD_INDICES_ARG);

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
// This condition matches when LoadIndices is NOT in the arguments
return !context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream()
.filter(Objects::nonNull)
.anyMatch(EXCLUDED_ARGS::contains);
}
}
datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/conditions/LoadIndicesCondition.java
@@ -0,0 +1,24 @@
package com.linkedin.datahub.upgrade.conditions;

import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.springframework.boot.ApplicationArguments;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class LoadIndicesCondition implements Condition {
public static final String LOAD_INDICES_ARG = "LoadIndices";
public static final Set<String> LOAD_INDICES_ARGS = Set.of(LOAD_INDICES_ARG);

@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
List<String> nonOptionArgs =
context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs();
if (nonOptionArgs == null) {
return false;
}
return nonOptionArgs.stream().filter(Objects::nonNull).anyMatch(LOAD_INDICES_ARGS::contains);
}
}
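A quick way to see what these conditions inspect: Spring Boot treats only "--name[=value]" arguments as options, so the "-u" flag and the "LoadIndices" token from the Gradle tasks above both show up in getNonOptionArgs(). A minimal check using DefaultApplicationArguments:

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.DefaultApplicationArguments;

public class NonOptionArgsDemo {
  public static void main(String[] args) {
    // Same shape as the invocation built by runLoadIndices in build.gradle.
    ApplicationArguments parsed =
        new DefaultApplicationArguments("-u", "LoadIndices", "-a", "batchSize=10000");
    // Prints: [-u, LoadIndices, -a, batchSize=10000]
    // "LoadIndices" is present, so LoadIndicesCondition matches and
    // GeneralUpgradeCondition does not.
    System.out.println(parsed.getNonOptionArgs());
  }
}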
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.config;
package com.linkedin.datahub.upgrade.conditions;

import java.util.Objects;
import org.springframework.boot.ApplicationArguments;
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.config;
package com.linkedin.datahub.upgrade.conditions;

import java.util.Objects;
import java.util.Set;
@@ -1,4 +1,4 @@
package com.linkedin.datahub.upgrade.config;
package com.linkedin.datahub.upgrade.conditions;

import java.util.Objects;
import org.springframework.boot.ApplicationArguments;
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.conditions.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.lineage.BackfillDatasetLineageIndexFields;
import com.linkedin.metadata.entity.EntityService;
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.conditions.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.elasticsearch.BuildIndices;
import com.linkedin.gms.factory.config.ConfigurationProvider;
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.conditions.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.BlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.cdc.CDCSourceSetup;
import java.util.List;
@@ -1,5 +1,6 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.conditions.SystemUpdateCondition;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.elasticsearch.CleanIndices;
import com.linkedin.gms.factory.config.ConfigurationProvider;