Commit
[chunjun-core] bug fix for local mode.
zoudaokoulife committed Feb 20, 2024
1 parent 786f68a commit 4b610c9
Showing 5 changed files with 72 additions and 48 deletions.
29 changes: 1 addition & 28 deletions chunjun-core/pom.xml
@@ -188,7 +188,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink sql -->
@@ -376,33 +376,6 @@

<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>

<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
2 changes: 2 additions & 0 deletions chunjun-core/src/main/java/com/dtstack/chunjun/Main.java
@@ -70,6 +70,7 @@
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -330,6 +331,7 @@ private static void configStreamExecutionEnvironment(
Thread.currentThread().getContextClassLoader(),
ConstantValue.DIRTY_DATA_DIR_NAME);
// TODO: support restore for SQL.
FactoryUtil.setFactoryHelper(factoryHelper);
}
PluginUtil.registerShipfileToCachedFile(options.getAddShipfile(), env);
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.dtstack.chunjun.util.PropertiesUtil;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -50,11 +51,15 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(Option
flinkConf = GlobalConfiguration.loadConfiguration(options.getFlinkConfDir());
}
StreamExecutionEnvironment env;
if (StringUtils.equalsIgnoreCase(ClusterMode.local.name(), options.getMode())) {
if (StringUtils.equalsIgnoreCase(ClusterMode.localTest.name(), options.getMode())) {
flinkConf.addAll(cfg);
env = new MyLocalStreamEnvironment(flinkConf);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment(cfg);
// If no default parallelism is configured, ChunJun sets the parallelism to 1 by default.
if (!cfg.contains(CoreOptions.DEFAULT_PARALLELISM)) {
env.setParallelism(1);
}
}
env.getConfig().disableClosureCleaner();
env.getConfig().setGlobalJobParameters(cfg);
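The change above only forces a parallelism of 1 when parallelism.default is absent from the effective configuration. A minimal stand-alone sketch of that fallback, using Flink's public Configuration API (the main wrapper and the value 4 are illustrative only):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismFallbackSketch {
    public static void main(String[] args) {
        Configuration cfg = new Configuration();
        // Uncomment to let a user-supplied default win over ChunJun's fallback of 1:
        // cfg.set(CoreOptions.DEFAULT_PARALLELISM, 4);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(cfg);

        // Same guard as in the commit: only force parallelism to 1 when
        // parallelism.default was not configured explicitly.
        if (!cfg.contains(CoreOptions.DEFAULT_PARALLELISM)) {
            env.setParallelism(1);
        }
        System.out.println("effective parallelism: " + env.getParallelism());
    }
}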
Original file line number Diff line number Diff line change
@@ -17,26 +17,58 @@
*/
package com.dtstack.chunjun.util;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;

import java.util.HashMap;
import java.util.Map;

public class ConnectorNameConvertUtil {

// tuple: f0 = package name && directory name, f1 = class name, f2 = plugin name
private static Map<String, Tuple2<String, String>> connectorNameMap = new HashMap<>();
private static final Map<String, Tuple3<String, String, String>> connectorNameMap =
new HashMap<>();

static {
connectorNameMap.put("es", new Tuple2<>("elasticsearch6", "elasticsearch6"));
connectorNameMap.put("hbase", new Tuple2<>("hbase14", "HBase14"));
connectorNameMap.put("tidb", new Tuple2<>("mysql", "mysql"));
connectorNameMap.put("restapi", new Tuple2<>("http", "http"));
connectorNameMap.put("adbpostgresql", new Tuple2<>("postgresql", "postgresql"));
connectorNameMap.put("dorisbatch", new Tuple2<>("doris", "doris"));
connectorNameMap.put("gbase", new Tuple2<>("gBase", "gBase"));
connectorNameMap.put("protobuf", new Tuple2<>("pbformat", "pbformat"));
connectorNameMap.put("starrocks", new Tuple2<>("starrocks", "starRocks"));
connectorNameMap.put(
"es", new Tuple3<>("elasticsearch7", "elasticsearch7", "elasticsearch7"));
connectorNameMap.put("hbase", new Tuple3<>("hbase14", "HBase14", "hbase14"));
connectorNameMap.put("hbase2", new Tuple3<>("hbase2", "HBase2", null));
connectorNameMap.put("tidb", new Tuple3<>("mysql", "mysql", "mysql"));
connectorNameMap.put("restapi", new Tuple3<>("http", "http", "http"));
connectorNameMap.put(
"adbpostgresql", new Tuple3<>("postgresql", "postgresql", "postgresql"));
connectorNameMap.put("metadatamysql", new Tuple3<>("mysql", "metaDataMysql", "mysql"));
connectorNameMap.put("metadatakafka", new Tuple3<>("kafka", "metaDataKafka", "kafka"));
connectorNameMap.put(
"metadatavertica", new Tuple3<>("vertica", "metaDataVertica", "vertica"));
connectorNameMap.put(
"metadatahbase", new Tuple3<>("hbase14", "metaDataHBase14", "hbase14"));
connectorNameMap.put("metadatahive2", new Tuple3<>("hive", "metaDataHive", "hive"));
connectorNameMap.put("metadatahive1", new Tuple3<>("hive1", "metaDataHive1", "hive1"));
connectorNameMap.put(
"metadatasparkthrift",
new Tuple3<>("sparkthrift", "metaDataSparkThrift", "sparkthrift"));
connectorNameMap.put(
"metadatasqlserver", new Tuple3<>("sqlserver", "metaDataSqlServer", "sqlserver"));
connectorNameMap.put("metadataoracle", new Tuple3<>("oracle", "metaDataOracle", "oracle"));
connectorNameMap.put(
"metadataphoenix5", new Tuple3<>("phoenix5", "metaDataPhoenix5", "phoenix5"));
connectorNameMap.put("metadatahive1cdc", new Tuple3<>("hive", "metaDataHiveCdc", "hive"));
connectorNameMap.put("metadatahive2cdc", new Tuple3<>("hive", "metaDataHiveCdc", "hive"));
connectorNameMap.put(
"metadatasparkthriftcdc", new Tuple3<>("hive", "metaDataHiveCdc", "hive"));
connectorNameMap.put("metadatatidb", new Tuple3<>("tidb", "metaDataTidb", "tidb"));
connectorNameMap.put("dorisbatch", new Tuple3<>("doris", "doris", "doris"));
connectorNameMap.put("starrocks", new Tuple3<>("starrocks", "starRocks", null));
connectorNameMap.put("gbase", new Tuple3<>("gbase", "gBase", null));
connectorNameMap.put("protobuf", new Tuple3<>("pbformat", "pbformat", null));
connectorNameMap.put("huaweihbase", new Tuple3<>("huaweihbase", "huaweiHbase", null));
connectorNameMap.put("huaweihdfs", new Tuple3<>("huaweihdfs", "huaweiHdfs", null));
connectorNameMap.put("pgwal", new Tuple3<>("pgwal", "PGWal", null));
connectorNameMap.put("kafka-hw", new Tuple3<>("huaweikafka", "huaweikafka", null));
connectorNameMap.put("hudi", new Tuple3<>("hudi", "hoodie", null));
connectorNameMap.put(
"inceptorhyperbase", new Tuple3<>("inceptorhyperbase", "DirectHyperbase", null));
}

public static String convertClassPrefix(String originName) {
@@ -52,4 +84,12 @@ public static String convertPackageName(String originName) {
}
return connectorNameMap.get(originName).f0;
}

public static String convertPluginName(String originName) {
Tuple3<String, String, String> tuple3 = connectorNameMap.get(originName);
if (tuple3 == null) {
return originName;
}
return tuple3.f2 != null ? tuple3.f2 : originName;
}
}
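A short usage sketch of the Tuple3 mapping above. It assumes, as the comment on the map suggests, that convertPackageName returns f0 and convertClassPrefix returns f1, while convertPluginName (shown in this diff) returns f2 with a fallback to the original name; the expected values in the comments follow from the table in this commit.

import com.dtstack.chunjun.util.ConnectorNameConvertUtil;

public class ConnectorNameConvertSketch {
    public static void main(String[] args) {
        // f0: package / directory name
        System.out.println(ConnectorNameConvertUtil.convertPackageName("tidb"));   // "mysql"
        // f1: class name prefix
        System.out.println(ConnectorNameConvertUtil.convertClassPrefix("hbase"));  // "HBase14"
        // f2: plugin name
        System.out.println(ConnectorNameConvertUtil.convertPluginName("es"));      // "elasticsearch7"
        // f2 is null for "gbase", so the original name is returned unchanged
        System.out.println(ConnectorNameConvertUtil.convertPluginName("gbase"));   // "gbase"
    }
}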
Original file line number Diff line number Diff line change
@@ -484,17 +484,26 @@ public static List<String> setPipelineOptionsToEnvConfig(
jarList.addAll(urlList);

List<String> pipelineJars = new ArrayList<>();

List<String> classpathList = configuration.get(PipelineOptions.CLASSPATHS);
if (classpathList == null) {
classpathList = new ArrayList<>(urlList.size());
}

log.info("ChunJun executionMode: " + executionMode);
if (ClusterMode.getByName(executionMode) == ClusterMode.kubernetesApplication) {
for (String jarUrl : jarList) {
String newJarUrl = jarUrl;
if (StringUtils.startsWith(jarUrl, File.separator)) {
newJarUrl = "file:" + jarUrl;
}
if (pipelineJars.contains(newJarUrl)) {
continue;
if (!pipelineJars.contains(newJarUrl)) {
pipelineJars.add(newJarUrl);
}

if (!classpathList.contains(newJarUrl)) {
classpathList.add(newJarUrl);
}
pipelineJars.add(newJarUrl);
}
} else {
pipelineJars.addAll(jarList);
@@ -503,11 +512,6 @@ public static List<String> setPipelineOptionsToEnvConfig(
log.info("ChunJun reset pipeline.jars: " + pipelineJars);
configuration.set(PipelineOptions.JARS, pipelineJars);

List<String> classpathList = configuration.get(PipelineOptions.CLASSPATHS);
if (classpathList == null) {
classpathList = new ArrayList<>(urlList.size());
}
classpathList.addAll(pipelineJars);
configuration.set(PipelineOptions.CLASSPATHS, classpathList);
return pipelineJars;
} catch (Exception e) {
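A stand-alone sketch of the reworked loop in the kubernetesApplication branch: each jar URL is normalized to a file: URL once, then added to both pipeline.jars and pipeline.classpaths only if it is not already present. The input paths below are made up for illustration.

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelineJarDedupSketch {
    public static void main(String[] args) {
        // Hypothetical inputs: the same jar given once as a plain path and once as a file: URL.
        List<String> jarList = Arrays.asList("/opt/chunjun/lib/core.jar", "file:/opt/chunjun/lib/core.jar");
        List<String> pipelineJars = new ArrayList<>();
        List<String> classpathList = new ArrayList<>();

        for (String jarUrl : jarList) {
            // Absolute local paths become file: URLs, as in the commit.
            String newJarUrl = jarUrl.startsWith(File.separator) ? "file:" + jarUrl : jarUrl;
            if (!pipelineJars.contains(newJarUrl)) {
                pipelineJars.add(newJarUrl);
            }
            if (!classpathList.contains(newJarUrl)) {
                classpathList.add(newJarUrl);
            }
        }

        // Both lists end up with a single "file:/opt/chunjun/lib/core.jar" entry.
        System.out.println(pipelineJars);
        System.out.println(classpathList);
    }
}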
