Skip to content

Commit 995488e

Browse files
committed
feat(loadIndices): loadIndices upgrade
1 parent eea4670 commit 995488e

File tree

78 files changed

+5014
-205
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+5014
-205
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ project.ext.externalDependency = [
213213
'log4jApi': "org.apache.logging.log4j:log4j-api:$log4jVersion",
214214
'log4j12Api': "org.slf4j:log4j-over-slf4j:$slf4jVersion",
215215
'log4j2Api': "org.apache.logging.log4j:log4j-to-slf4j:$log4jVersion",
216-
'lombok': 'org.projectlombok:lombok:1.18.30',
216+
'lombok': 'org.projectlombok:lombok:1.18.42',
217217
'mariadbConnector': 'org.mariadb.jdbc:mariadb-java-client:2.6.0',
218218
'mavenArtifact': "org.apache.maven:maven-artifact:$mavenVersion",
219219
'mixpanel': 'com.mixpanel:mixpanel-java:1.4.4',

datahub-upgrade/build.gradle

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222

2323
implementation externalDependency.mustache
2424
implementation externalDependency.javaxInject
25+
implementation externalDependency.springActuator
2526
implementation(externalDependency.hadoopClient) {
2627
exclude group: 'net.minidev', module: 'json-smart'
2728
exclude group: 'com.nimbusds', module: 'nimbus-jose-jwt'
@@ -229,6 +230,84 @@ task runRestoreIndicesUrn(type: Exec) {
229230
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100", "-a", "urnBasedPagination=true"
230231
}
231232

233+
/**
234+
* Runs LoadIndices on locally running system. This ensures indices have the correct mappings
235+
* and then loads data from database to Elasticsearch.
236+
*
237+
* The process includes two steps:
238+
* 1. BuildIndicesStep: Ensures indices have the correct mappings (creates/updates index structure)
239+
* 2. LoadIndicesStep: Loads data from database to Elasticsearch
240+
*
241+
* The task automatically configures ES_BULK_REQUESTS_LIMIT to match the batch size
242+
* and disables ES_BULK_FLUSH_PERIOD for optimal bulk processor performance.
243+
*
244+
* Optional parameters:
245+
* - limit: Maximum number of records to process (default: no limit)
246+
* - batchSize: Number of records per batch (default: 10000)
247+
* - esThreadCount: Elasticsearch I/O thread count (default: 3, enables async bulk processing)
248+
* - urnLike: URN pattern filter (e.g., "urn:li:dataset:%")
249+
* - aspectNames: Comma-separated aspect names to filter
250+
* - lePitEpochMs: Process records created before this timestamp
251+
* - gePitEpochMs: Process records created after this timestamp
252+
*
253+
* Usage examples:
254+
* ./gradlew runLoadIndices
255+
* ./gradlew runLoadIndices -Plimit=5000
256+
* ./gradlew runLoadIndices -Plimit=1000 -PbatchSize=2500
257+
* ./gradlew runLoadIndices -PurnLike="urn:li:dataset:%"
258+
* ./gradlew runLoadIndices -PesThreadCount=3
259+
*/
260+
task runLoadIndices(type: Exec) {
261+
dependsOn bootJar
262+
group = "Execution"
263+
description = "Run the load indices process locally - ensures correct mappings and loads data from database to Elasticsearch."
264+
environment "ENTITY_REGISTRY_CONFIG_PATH", "../metadata-models/src/main/resources/entity-registry.yml"
265+
266+
def args = ["java", "-agentlib:jdwp=transport=dt_socket,address=5003,server=y,suspend=n",
267+
"-jar", "-Dserver.port=8083", bootJar.getArchiveFile().get(), "-u", "LoadIndices"]
268+
269+
// Add batchSize (default: 10000)
270+
def batchSize = project.hasProperty('batchSize') ? project.getProperty('batchSize') : '10000'
271+
args.addAll(["-a", "batchSize=${batchSize}"])
272+
273+
// Optimize bulk request limit for sustained load
274+
environment "ES_BULK_REQUESTS_LIMIT", batchSize
275+
// Set flush period to 5 minutes to allow proper batching
276+
environment "ES_BULK_FLUSH_PERIOD", "300"
277+
278+
// Configure Elasticsearch I/O threads for better BulkProcessor performance
279+
// Increase from default 2 to allow more concurrent HTTP connections
280+
def threadCount = project.hasProperty('esThreadCount') ? project.getProperty('esThreadCount') : '4'
281+
environment "ELASTICSEARCH_THREAD_COUNT", threadCount
282+
283+
// Add limit if specified
284+
if (project.hasProperty('limit')) {
285+
args.addAll(["-a", "limit=${project.getProperty('limit')}"])
286+
}
287+
288+
// Add urnLike if specified
289+
if (project.hasProperty('urnLike')) {
290+
args.addAll(["-a", "urnLike=${project.getProperty('urnLike')}"])
291+
}
292+
293+
// Add aspectNames if specified
294+
if (project.hasProperty('aspectNames')) {
295+
args.addAll(["-a", "aspectNames=${project.getProperty('aspectNames')}"])
296+
}
297+
298+
// Add lePitEpochMs if specified
299+
if (project.hasProperty('lePitEpochMs')) {
300+
args.addAll(["-a", "lePitEpochMs=${project.getProperty('lePitEpochMs')}"])
301+
}
302+
303+
// Add gePitEpochMs if specified
304+
if (project.hasProperty('gePitEpochMs')) {
305+
args.addAll(["-a", "gePitEpochMs=${project.getProperty('gePitEpochMs')}"])
306+
}
307+
308+
commandLine args
309+
}
310+
232311
docker {
233312
dependsOn(bootJar)
234313
name "${docker_registry}/${docker_repo}:${versionTag}"

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCli.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.linkedin.datahub.upgrade;
22

33
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
4+
import com.linkedin.datahub.upgrade.loadindices.LoadIndices;
45
import com.linkedin.datahub.upgrade.removeunknownaspects.RemoveUnknownAspects;
56
import com.linkedin.datahub.upgrade.restorebackup.RestoreBackup;
67
import com.linkedin.datahub.upgrade.restoreindices.RestoreIndices;
@@ -12,7 +13,6 @@
1213
import com.linkedin.upgrade.DataHubUpgradeState;
1314
import io.datahubproject.metadata.context.OperationContext;
1415
import java.util.List;
15-
import javax.inject.Inject;
1616
import javax.inject.Named;
1717
import lombok.extern.slf4j.Slf4j;
1818
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,15 +34,19 @@ private static final class Args {
3434

3535
private final UpgradeManager _upgradeManager = new DefaultUpgradeManager();
3636

37-
@Inject
37+
@Autowired(required = false)
38+
@Named("loadIndices")
39+
private LoadIndices loadIndices;
40+
41+
@Autowired(required = false)
3842
@Named("restoreIndices")
3943
private RestoreIndices restoreIndices;
4044

41-
@Inject
45+
@Autowired(required = false)
4246
@Named("restoreBackup")
4347
private RestoreBackup restoreBackup;
4448

45-
@Inject
49+
@Autowired(required = false)
4650
@Named("removeUnknownAspects")
4751
private RemoveUnknownAspects removeUnknownAspects;
4852

@@ -66,29 +70,65 @@ private static final class Args {
6670
@Named("systemUpdateCron")
6771
private SystemUpdateCron systemUpdateCron;
6872

69-
@Autowired
73+
@Autowired(required = false)
7074
@Named("reindexDebug")
7175
private ReindexDebug reindexDebug;
7276

7377
@Override
7478
public void run(String... cmdLineArgs) {
75-
_upgradeManager.register(restoreIndices);
76-
_upgradeManager.register(restoreBackup);
77-
_upgradeManager.register(removeUnknownAspects);
79+
// Register upgrades with null checks and warnings
80+
if (restoreIndices != null) {
81+
_upgradeManager.register(restoreIndices);
82+
} else {
83+
log.warn("RestoreIndices upgrade not available - bean not found");
84+
}
85+
86+
if (restoreBackup != null) {
87+
_upgradeManager.register(restoreBackup);
88+
} else {
89+
log.warn("RestoreBackup upgrade not available - bean not found");
90+
}
91+
92+
if (removeUnknownAspects != null) {
93+
_upgradeManager.register(removeUnknownAspects);
94+
} else {
95+
log.warn("RemoveUnknownAspects upgrade not available - bean not found");
96+
}
97+
98+
if (loadIndices != null) {
99+
_upgradeManager.register(loadIndices);
100+
} else {
101+
log.warn("LoadIndices upgrade not available - bean not found");
102+
}
103+
78104
if (systemUpdate != null) {
79105
_upgradeManager.register(systemUpdate);
106+
} else {
107+
log.warn("SystemUpdate upgrade not available - bean not found");
80108
}
109+
81110
if (systemUpdateBlocking != null) {
82111
_upgradeManager.register(systemUpdateBlocking);
112+
} else {
113+
log.warn("SystemUpdateBlocking upgrade not available - bean not found");
83114
}
115+
84116
if (systemUpdateNonBlocking != null) {
85117
_upgradeManager.register(systemUpdateNonBlocking);
118+
} else {
119+
log.warn("SystemUpdateNonBlocking upgrade not available - bean not found");
86120
}
121+
87122
if (systemUpdateCron != null) {
88123
_upgradeManager.register(systemUpdateCron);
124+
} else {
125+
log.warn("SystemUpdateCron upgrade not available - bean not found");
89126
}
127+
90128
if (reindexDebug != null) {
91129
_upgradeManager.register(reindexDebug);
130+
} else {
131+
log.warn("ReindexDebug upgrade not available - bean not found");
92132
}
93133

94134
final Args args = new Args();

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,13 @@
11
package com.linkedin.datahub.upgrade;
22

3-
import com.linkedin.gms.factory.auth.AuthorizerChainFactory;
4-
import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory;
5-
import com.linkedin.gms.factory.event.ExternalEventsServiceFactory;
6-
import com.linkedin.gms.factory.event.KafkaConsumerPoolFactory;
7-
import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
8-
import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
9-
import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
10-
import com.linkedin.gms.factory.kafka.trace.KafkaTraceReaderFactory;
11-
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
12-
import com.linkedin.gms.factory.trace.TraceServiceFactory;
133
import org.springframework.boot.WebApplicationType;
14-
import org.springframework.boot.autoconfigure.SpringBootApplication;
15-
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
164
import org.springframework.boot.builder.SpringApplicationBuilder;
17-
import org.springframework.context.annotation.ComponentScan;
18-
import org.springframework.context.annotation.FilterType;
5+
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.context.annotation.Import;
197

208
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
21-
@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class})
22-
@ComponentScan(
23-
basePackages = {
24-
"com.linkedin.gms.factory",
25-
"com.linkedin.datahub.upgrade.config",
26-
"com.linkedin.datahub.upgrade.system.cdc",
27-
"com.linkedin.metadata.dao.producer"
28-
},
29-
excludeFilters = {
30-
@ComponentScan.Filter(
31-
type = FilterType.ASSIGNABLE_TYPE,
32-
classes = {
33-
ScheduledAnalyticsFactory.class,
34-
AuthorizerChainFactory.class,
35-
DataHubAuthorizerFactory.class,
36-
SimpleKafkaConsumerFactory.class,
37-
KafkaEventConsumerFactory.class,
38-
GraphQLEngineFactory.class,
39-
KafkaTraceReaderFactory.class,
40-
TraceServiceFactory.class,
41-
KafkaConsumerPoolFactory.class,
42-
ExternalEventsServiceFactory.class
43-
})
44-
})
9+
@Configuration
10+
@Import(UpgradeConfigurationSelector.class)
4511
public class UpgradeCliApplication {
4612
public static void main(String[] args) {
4713
new SpringApplicationBuilder(UpgradeCliApplication.class, UpgradeCli.class)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.linkedin.datahub.upgrade;
2+
3+
import com.linkedin.datahub.upgrade.conditions.GeneralUpgradeCondition;
4+
import com.linkedin.datahub.upgrade.conditions.LoadIndicesCondition;
5+
import com.linkedin.datahub.upgrade.config.GeneralUpgradeConfiguration;
6+
import com.linkedin.datahub.upgrade.loadindices.LoadIndicesUpgradeConfig;
7+
import org.springframework.context.annotation.Conditional;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.context.annotation.Import;
10+
11+
/**
12+
* Configuration selector that chooses the appropriate upgrade configuration based on command-line
13+
* arguments.
14+
*/
15+
@Configuration
16+
public class UpgradeConfigurationSelector {
17+
18+
/** Configuration for LoadIndices upgrade - excludes Kafka components */
19+
@Configuration
20+
@Conditional(LoadIndicesCondition.class)
21+
@Import(LoadIndicesUpgradeConfig.class)
22+
public static class LoadIndicesConfiguration {}
23+
24+
/** Configuration for general upgrades - includes all components */
25+
@Configuration
26+
@Conditional(GeneralUpgradeCondition.class)
27+
@Import(GeneralUpgradeConfiguration.class)
28+
public static class GeneralConfiguration {}
29+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.linkedin.datahub.upgrade.conditions;
2+
3+
import java.util.Objects;
4+
import java.util.Set;
5+
import org.springframework.boot.ApplicationArguments;
6+
import org.springframework.context.annotation.Condition;
7+
import org.springframework.context.annotation.ConditionContext;
8+
import org.springframework.core.type.AnnotatedTypeMetadata;
9+
10+
public class GeneralUpgradeCondition implements Condition {
11+
public static final String LOAD_INDICES_ARG = "LoadIndices";
12+
public static final Set<String> EXCLUDED_ARGS = Set.of(LOAD_INDICES_ARG);
13+
14+
@Override
15+
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
16+
// This condition matches when LoadIndices is NOT in the arguments
17+
return !context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream()
18+
.filter(Objects::nonNull)
19+
.anyMatch(EXCLUDED_ARGS::contains);
20+
}
21+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.linkedin.datahub.upgrade.conditions;
2+
3+
import java.util.List;
4+
import java.util.Objects;
5+
import java.util.Set;
6+
import org.springframework.boot.ApplicationArguments;
7+
import org.springframework.context.annotation.Condition;
8+
import org.springframework.context.annotation.ConditionContext;
9+
import org.springframework.core.type.AnnotatedTypeMetadata;
10+
11+
public class LoadIndicesCondition implements Condition {
12+
public static final String LOAD_INDICES_ARG = "LoadIndices";
13+
public static final Set<String> LOAD_INDICES_ARGS = Set.of(LOAD_INDICES_ARG);
14+
15+
@Override
16+
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
17+
List<String> nonOptionArgs =
18+
context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs();
19+
if (nonOptionArgs == null) {
20+
return false;
21+
}
22+
return nonOptionArgs.stream().filter(Objects::nonNull).anyMatch(LOAD_INDICES_ARGS::contains);
23+
}
24+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.linkedin.datahub.upgrade.config;
1+
package com.linkedin.datahub.upgrade.conditions;
22

33
import java.util.Objects;
44
import org.springframework.boot.ApplicationArguments;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.linkedin.datahub.upgrade.config;
1+
package com.linkedin.datahub.upgrade.conditions;
22

33
import java.util.Objects;
44
import java.util.Set;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.linkedin.datahub.upgrade.config;
1+
package com.linkedin.datahub.upgrade.conditions;
22

33
import java.util.Objects;
44
import org.springframework.boot.ApplicationArguments;

0 commit comments

Comments
 (0)