diff --git a/Makefile b/Makefile index c37c76f5..26107359 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,10 @@ build: bounce: build undeploy deploy deploy-samples deploy-config deploy-demo # Integration tests expect K8s and Kafka to be running -integration-tests: +integration-tests: deploy-dev-environment deploy-samples + kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka + kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-1 --for=condition=Ready --timeout=10m -n kafka + kubectl wait kafkatopic.kafka.strimzi.io/existing-topic-2 --for=condition=Ready --timeout=10m -n kafka kubectl port-forward -n kafka svc/one-kafka-external-0 9092 & echo $$! > port-forward.pid ./gradlew intTest || kill `cat port-forward.pid` kill `cat port-forward.pid` @@ -50,8 +53,8 @@ deploy-dev-environment: deploy-config kubectl apply -f ./deploy/samples/kafkadb.yaml undeploy-dev-environment: - kubectl delete $(shell kubectl get kafkatopic.kafka.strimzi.io -o name -n kafka) -n kafka || echo "skipping" - kubectl delete $(shell kubectl get strimzi -o name -n kafka) -n kafka || echo "skipping" + kubectl delete kafkatopic.kafka.strimzi.io -n kafka --all || echo "skipping" + kubectl delete strimzi -n kafka --all || echo "skipping" kubectl delete pvc -l strimzi.io/name=one-kafka -n kafka || echo "skipping" kubectl delete -f "https://strimzi.io/install/latest?namespace=kafka" -n kafka || echo "skipping" kubectl delete -f ./deploy/samples/kafkadb.yaml || echo "skipping" diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index ccebba77..a4b76b95 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index bdc9a83b..e2847c82 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.11.1-bin.zip networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 79a61d42..f5feea6d 100755 --- a/gradlew +++ b/gradlew @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# SPDX-License-Identifier: Apache-2.0 +# ############################################################################## # @@ -55,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -83,10 +85,9 @@ done # This is normally unused # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s +' "$PWD" ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,10 +134,13 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. @@ -144,7 +148,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac @@ -152,7 +156,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then '' | soft) :;; #( *) # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -197,11 +201,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ diff --git a/gradlew.bat b/gradlew.bat index 6689b85b..9b42019c 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -13,6 +13,8 @@ @rem See the License for the specific language governing permissions and @rem limitations under the License. @rem +@rem SPDX-License-Identifier: Apache-2.0 +@rem @if "%DEBUG%"=="" @echo off @rem ########################################################################## @@ -43,11 +45,11 @@ set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 if %ERRORLEVEL% equ 0 goto execute -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail @@ -57,11 +59,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe if exist "%JAVA_EXE%" goto execute -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. +echo. 1>&2 +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2 +echo. 1>&2 +echo Please set the JAVA_HOME variable in your environment to match the 1>&2 +echo location of your Java installation. 1>&2 goto fail diff --git a/hoptimator-api/build.gradle b/hoptimator-api/build.gradle index 6aa79ea3..571a1964 100644 --- a/hoptimator-api/build.gradle +++ b/hoptimator-api/build.gradle @@ -1,7 +1,53 @@ plugins { id 'java' + id 'maven-publish' } dependencies { // plz keep it this way } + +publishing { + repositories { + maven { + name 'GitHubPackages' + url = 'https://maven.pkg.github.com/linkedin/Hoptimator' + credentials { + username = System.getenv('GITHUB_ACTOR') + password = System.getenv('GITHUB_TOKEN') + } + } + maven { + name 'LinkedInJFrog' + url 'https://linkedin.jfrog.io/artifactory/hoptimator' + credentials { + username = System.getenv('JFROG_USERNAME') + password = System.getenv('JFROG_API_KEY') + } + } + } + publications { + maven(MavenPublication) { + groupId = 'com.linkedin.hoptimator' + artifactId = 'hoptimator-api' + version = System.getenv('VERSION') + from components.java + pom { + name = 'hoptimator-api' + description = 'API for extending Hoptimator' + url = 'https://github.com/linkedin/Hoptimator' + licenses { + license { + name = 'BSD 2-Clause' + url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE' + } + } + scm { + connection = 'scm:git:git://github.com:linkedin/Hoptimator.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git' + url = 'https://github.com/linkedin/Hoptimator' + } + } + } + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java similarity index 84% rename from hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java rename to hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java index 39456709..3181a8c7 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Job.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java @@ -1,9 +1,7 @@ -package com.linkedin.hoptimator.util; +package com.linkedin.hoptimator; import java.util.function.Function; -import org.apache.calcite.sql.SqlDialect; - public class Job { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/MaterializedView.java similarity index 91% rename from hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java rename to hoptimator-api/src/main/java/com/linkedin/hoptimator/MaterializedView.java index 7f5f8cf0..6fd662de 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/MaterializedView.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/MaterializedView.java @@ -1,13 +1,9 @@ -package com.linkedin.hoptimator.util; +package com.linkedin.hoptimator; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.calcite.sql.SqlDialect; - -import com.linkedin.hoptimator.util.planner.Pipeline; - public class MaterializedView { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Pipeline.java similarity index 91% rename from hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java rename to hoptimator-api/src/main/java/com/linkedin/hoptimator/Pipeline.java index 15f6c350..ae9ee805 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/Pipeline.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Pipeline.java @@ -1,11 +1,9 @@ -package com.linkedin.hoptimator.util.planner; +package com.linkedin.hoptimator; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import com.linkedin.hoptimator.Deployable; - /** * A set of Deployable objects that work together to deliver data. diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Sink.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Sink.java new file mode 100644 index 00000000..43ca2f53 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Sink.java @@ -0,0 +1,17 @@ +package com.linkedin.hoptimator; + +import java.util.List; +import java.util.Map; + + +public class Sink extends Source { + + public Sink(String database, List path, Map options) { + super(database, path, options); + } + + @Override + public String toString() { + return "Sink[" + pathString() + "]"; + } +} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Source.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java similarity index 74% rename from hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Source.java rename to hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java index 1702de5f..7536edb5 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Source.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java @@ -1,23 +1,19 @@ -package com.linkedin.hoptimator.util; +package com.linkedin.hoptimator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.apache.calcite.rel.type.RelDataType; - public class Source { private final String database; private final List path; - private final RelDataType rowType; private final Map options; - public Source(String database, List path, RelDataType rowType, Map options) { + public Source(String database, List path, Map options) { this.database = database; this.path = path; - this.rowType = rowType; this.options = options; } @@ -42,10 +38,6 @@ public List path() { return path; } - public RelDataType rowType() { - return rowType; - } - protected String pathString() { return path.stream().collect(Collectors.joining(".")); } diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/SqlDialect.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/SqlDialect.java new file mode 100644 index 00000000..32735348 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/SqlDialect.java @@ -0,0 +1,7 @@ +package com.linkedin.hoptimator; + + +public enum SqlDialect { + ANSI, + FLINK +} diff --git a/hoptimator-avro/build.gradle b/hoptimator-avro/build.gradle index 3d6282b8..8acd2c84 100644 --- a/hoptimator-avro/build.gradle +++ b/hoptimator-avro/build.gradle @@ -2,8 +2,6 @@ plugins { id 'java' } - - dependencies { implementation project(':hoptimator-api') implementation libs.avro diff --git a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java index ce5ec8af..13ea1623 100644 --- a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java +++ b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java @@ -9,9 +9,9 @@ import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.rel.RelNode; -import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.jline.reader.Completer; +import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; import com.linkedin.hoptimator.util.DeploymentService; import com.linkedin.hoptimator.util.planner.PipelineRel; @@ -89,7 +89,7 @@ public void execute(String line, DispatchCallback dispatchCallback) { try { RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel; PipelineRel.Implementor plan = DeploymentService.plan(rel); - sqlline.output(plan.sql().apply(AnsiSqlDialect.DEFAULT)); + sqlline.output(plan.sql().apply(SqlDialect.ANSI)); } catch (SQLException e) { sqlline.error(e); dispatchCallback.setToFailure(); diff --git a/hoptimator-jdbc-driver/build.gradle b/hoptimator-jdbc-driver/build.gradle new file mode 100644 index 00000000..d39f757a --- /dev/null +++ b/hoptimator-jdbc-driver/build.gradle @@ -0,0 +1,63 @@ +plugins { + id 'com.gradleup.shadow' version '8.3.5' + id 'java' + id 'maven-publish' +} + +dependencies { + implementation project(':hoptimator-avro') + implementation project(':hoptimator-demodb') + implementation project(':hoptimator-jdbc') + implementation project(':hoptimator-util') + implementation project(':hoptimator-k8s') +} + +shadowJar { + zip64 true + mergeServiceFiles() +} + +publishing { + repositories { + maven { + name 'GitHubPackages' + url = 'https://maven.pkg.github.com/linkedin/Hoptimator' + credentials { + username = System.getenv('GITHUB_ACTOR') + password = System.getenv('GITHUB_TOKEN') + } + } + maven { + name 'LinkedInJFrog' + url 'https://linkedin.jfrog.io/artifactory/hoptimator' + credentials { + username = System.getenv('JFROG_USERNAME') + password = System.getenv('JFROG_API_KEY') + } + } + } + publications { + shadow(MavenPublication) { + groupId = 'com.linkedin.hoptimator' + artifactId = 'hoptimator-jdbc-driver' + version = System.getenv('VERSION') + from components.shadow + pom { + name = 'hoptimator-jdbc-driver' + description = 'Hoptimator JDBC driver fat jar' + url = 'https://github.com/linkedin/Hoptimator' + licenses { + license { + name = 'BSD 2-Clause' + url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE' + } + } + scm { + connection = 'scm:git:git://github.com:linkedin/Hoptimator.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git' + url = 'https://github.com/linkedin/Hoptimator' + } + } + } + } +} diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 8ec6db27..73ba94b6 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -57,9 +57,9 @@ import org.apache.calcite.util.Util; import com.linkedin.hoptimator.Database; +import com.linkedin.hoptimator.MaterializedView; +import com.linkedin.hoptimator.Pipeline; import com.linkedin.hoptimator.util.DeploymentService; -import com.linkedin.hoptimator.util.MaterializedView; -import com.linkedin.hoptimator.util.planner.Pipeline; import com.linkedin.hoptimator.util.planner.PipelineRel; import static org.apache.calcite.util.Static.RESOURCE; diff --git a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java index fe8ee49b..207ba779 100644 --- a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java +++ b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java @@ -73,7 +73,8 @@ public void execute(Context context, boolean execute) throws Exception { CalciteConnection conn = (CalciteConnection) context.connection(); RelNode rel = HoptimatorDriver.convert(conn.createPrepareContext(), sql).root.rel; String specs = - DeploymentService.plan(rel).pipeline().specify().stream().collect(Collectors.joining("---\n")); + DeploymentService.plan(rel).pipeline().specify().stream().sorted() + .collect(Collectors.joining("---\n")); String[] lines = specs.replaceAll(";\n", "\n").split("\n"); context.echo(Arrays.asList(lines)); } else { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java index bfcab3e6..9cd54af1 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java @@ -9,10 +9,10 @@ import java.util.Properties; import java.util.stream.Collectors; +import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.Connector; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList; -import com.linkedin.hoptimator.util.Source; import com.linkedin.hoptimator.util.Template; diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnectorProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnectorProvider.java index 01f854ac..9fcb7622 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnectorProvider.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnectorProvider.java @@ -6,7 +6,7 @@ import com.linkedin.hoptimator.Connector; import com.linkedin.hoptimator.ConnectorProvider; -import com.linkedin.hoptimator.util.Source; +import com.linkedin.hoptimator.Source; public class K8sConnectorProvider implements ConnectorProvider { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java index e556ea03..26a684a1 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sDeployerProvider.java @@ -6,12 +6,11 @@ import org.apache.calcite.schema.impl.ViewTable; +import com.linkedin.hoptimator.Job; +import com.linkedin.hoptimator.MaterializedView; +import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.Deployer; import com.linkedin.hoptimator.DeployerProvider; -import com.linkedin.hoptimator.util.Job; -import com.linkedin.hoptimator.util.MaterializedView; -import com.linkedin.hoptimator.util.Source; - public class K8sDeployerProvider implements DeployerProvider { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java index 3b1ff684..cb4e92fd 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java @@ -6,14 +6,14 @@ import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; import org.apache.calcite.sql.dialect.CalciteSqlDialect; import org.apache.calcite.sql.dialect.MysqlSqlDialect; +import com.linkedin.hoptimator.Job; +import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; -import com.linkedin.hoptimator.util.Job; import com.linkedin.hoptimator.util.Template; @@ -35,9 +35,8 @@ public List specify(Job job) throws SQLException { .with("database", job.sink().database()) .with("schema", job.sink().schema()) .with("table", job.sink().table()) - .with("sql", sql.apply(MysqlSqlDialect.DEFAULT)) - .with("ansisql", sql.apply(AnsiSqlDialect.DEFAULT)) - .with("calcitesql", sql.apply(CalciteSqlDialect.DEFAULT)) + .with("sql", sql.apply(SqlDialect.ANSI)) + .with("flinksql", sql.apply(SqlDialect.FLINK)) .with(job.sink().options()); return jobTemplateApi.list() .stream() diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java index cf4f8c27..b82fde93 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sMaterializedViewDeployer.java @@ -4,10 +4,10 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta; +import com.linkedin.hoptimator.MaterializedView; import com.linkedin.hoptimator.k8s.models.V1alpha1View; import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList; import com.linkedin.hoptimator.k8s.models.V1alpha1ViewSpec; -import com.linkedin.hoptimator.util.MaterializedView; class K8sMaterializedViewDeployer extends K8sDeployer { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java index 2fb9e8da..d8a795ea 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sPipelineDeployer.java @@ -3,14 +3,13 @@ import java.sql.SQLException; import java.util.stream.Collectors; -import org.apache.calcite.sql.dialect.AnsiSqlDialect; - import io.kubernetes.client.openapi.models.V1ObjectMeta; +import com.linkedin.hoptimator.MaterializedView; +import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineSpec; -import com.linkedin.hoptimator.util.MaterializedView; /** Deploys the Pipeline behind a MaterializedView. */ @@ -24,7 +23,7 @@ class K8sPipelineDeployer extends K8sDeployer path, RelDataType rowType, Map options) { - super(database, path, rowType, options); - } - - @Override - public String toString() { - return "Sink[" + pathString() + "]"; - } -} diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 28557450..b78859a3 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -12,15 +13,16 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.util.Litmus; import com.linkedin.hoptimator.Deployable; +import com.linkedin.hoptimator.Job; +import com.linkedin.hoptimator.Pipeline; +import com.linkedin.hoptimator.Sink; +import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.SqlDialect; import com.linkedin.hoptimator.util.ConnectionService; import com.linkedin.hoptimator.util.DeploymentService; -import com.linkedin.hoptimator.util.Job; -import com.linkedin.hoptimator.util.Sink; -import com.linkedin.hoptimator.util.Source; /** @@ -39,7 +41,7 @@ public interface PipelineRel extends RelNode { /** Implements a deployable Pipeline. */ class Implementor { - private final List sources = new ArrayList<>(); + private final Map sources = new HashMap<>(); private RelNode query; private String sinkDatabase = "pipeline"; private List sinkPath = Arrays.asList(new String[]{"PIPELINE", "SINK"}); @@ -63,7 +65,7 @@ public void visit(RelNode node) throws SQLException { * a connector. The connector is configured via `CREATE TABLE...WITH(...)`. */ public void addSource(String database, List path, RelDataType rowType, Map options) { - sources.add(new Source(database, path, rowType, options)); + sources.put(new Source(database, path, options), rowType); } /** @@ -86,7 +88,7 @@ public void setQuery(RelNode query) { /** Combine Deployables into a Pipeline */ public Pipeline pipeline() throws SQLException { List deployables = new ArrayList<>(); - for (Source source : sources) { + for (Source source : sources.keySet()) { DeploymentService.deployables(source, Source.class).forEach(x -> deployables.add(x)); Map configs = ConnectionService.configure(source, Source.class); } @@ -94,10 +96,10 @@ public Pipeline pipeline() throws SQLException { if (targetRowType == null) { targetRowType = query.getRowType(); } - Sink sink = new Sink(sinkDatabase, sinkPath, targetRowType, sinkOptions); + Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions); Map sinkConfigs = ConnectionService.configure(sink, Sink.class); Job job = new Job(sink, sql()); - RelOptUtil.eq(sink.table(), sink.rowType(), "pipeline", query.getRowType(), Litmus.THROW); + RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); DeploymentService.deployables(sink, Sink.class).forEach(x -> deployables.add(x)); DeploymentService.deployables(job, Job.class).forEach(x -> deployables.add(x)); return new Pipeline(deployables); @@ -106,19 +108,19 @@ public Pipeline pipeline() throws SQLException { public Function sql() throws SQLException { ScriptImplementor script = ScriptImplementor.empty(); List deployables = new ArrayList<>(); - for (Source source : sources) { - Map configs = ConnectionService.configure(source, Source.class); - script = script.connector(source.table(), source.rowType(), configs); + for (Map.Entry source : sources.entrySet()) { + Map configs = ConnectionService.configure(source.getKey(), Source.class); + script = script.connector(source.getKey().table(), source.getValue(), configs); } RelDataType targetRowType = sinkRowType; if (targetRowType == null) { targetRowType = query.getRowType(); } - Sink sink = new Sink(sinkDatabase, sinkPath, targetRowType, sinkOptions); + Sink sink = new Sink(sinkDatabase, sinkPath, sinkOptions); Map sinkConfigs = ConnectionService.configure(sink, Sink.class); - script = script.connector(sink.table(), sink.rowType(), sinkConfigs); + script = script.connector(sink.table(), targetRowType, sinkConfigs); script = script.insert(sink.table(), query); - RelOptUtil.eq(sink.table(), sink.rowType(), "pipeline", query.getRowType(), Litmus.THROW); + RelOptUtil.eq(sink.table(), targetRowType, "pipeline", query.getRowType(), Litmus.THROW); return script.seal(); } } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 0804b16a..78e96a30 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -28,6 +28,7 @@ import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.SqlWriterConfig; import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.dialect.MysqlSqlDialect; import org.apache.calcite.sql.fun.SqlRowOperator; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.pretty.SqlPrettyWriter; @@ -107,8 +108,22 @@ default String sql(SqlDialect dialect) { } /** Generate SQL for a given dialect */ - default Function seal() { - return x -> sql(x); + default Function seal() { + return x -> { + final String sql; + switch(x) { + case ANSI: + sql = sql(AnsiSqlDialect.DEFAULT); + break; + case FLINK: + // Flink uses MySQL dialect, more or less + sql = sql(MysqlSqlDialect.DEFAULT); + break; + default: + throw new IllegalStateException("unreachable"); + }; + return sql; + }; } /** Implements an arbitrary RelNode as a statement */ diff --git a/settings.gradle b/settings.gradle index 39374b4d..94efd91c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ include 'hoptimator-cli' include 'hoptimator-demodb' include 'hoptimator-flink-runner' include 'hoptimator-jdbc' +include 'hoptimator-jdbc-driver' include 'hoptimator-k8s' include 'hoptimator-kafka-controller' include 'hoptimator-kafka'