Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,3 @@ pom.xml.*
# Scala Plugin for VSCode
.metals
.bloop/

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public class Mappings {

public static final Collection<Mapping> ALL = Arrays.asList(
new FilterMapping(),
new ProjectionMapping()
new ProjectionMapping(),
new TableSinkMapping()
);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.genericjdbc.mapping;

import org.apache.wayang.basic.operators.TableSink;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.genericjdbc.operators.GenericJdbcTableSinkOperator;
import org.apache.wayang.genericjdbc.platform.GenericJdbcPlatform;

import java.util.Collection;
import java.util.Collections;

/**
* Mapping from {@link TableSink} to {@link GenericJdbcTableSinkOperator}.
*/
public class TableSinkMapping implements Mapping {

@Override
public Collection<PlanTransformation> getTransformations() {
return Collections.singleton(new PlanTransformation(
this.createSubplanPattern(),
this.createReplacementSubplanFactory(),
GenericJdbcPlatform.getInstance()
));
}

private SubplanPattern createSubplanPattern() {
final OperatorPattern<TableSink> operatorPattern = new OperatorPattern<>(
"sink", new TableSink<>(null, null, null), false
);
return SubplanPattern.createSingleton(operatorPattern);
}

private ReplacementSubplanFactory createReplacementSubplanFactory() {
return new ReplacementSubplanFactory.OfSingleOperators<TableSink>(
(matchedOperator, epoch) -> new GenericJdbcTableSinkOperator(matchedOperator).at(epoch)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.genericjdbc.operators;

import org.apache.wayang.basic.operators.TableSink;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator;

/**
* GenericJdbc implementation of the {@link JdbcTableSinkOperator}.
*/
public class GenericJdbcTableSinkOperator extends JdbcTableSinkOperator implements GenericJdbcExecutionOperator {

public GenericJdbcTableSinkOperator(String tableName, String[] columnNames) {
super(tableName, columnNames);
}

public GenericJdbcTableSinkOperator(TableSink<Record> that) {
super(that);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.wayang.jdbc.operators.JdbcFilterOperator;
import org.apache.wayang.jdbc.operators.JdbcJoinOperator;
import org.apache.wayang.jdbc.operators.JdbcProjectionOperator;
import org.apache.wayang.jdbc.operators.JdbcTableSinkOperator;
import org.apache.wayang.jdbc.operators.JdbcTableSource;
import org.apache.wayang.jdbc.platform.JdbcPlatformTemplate;
import org.apache.logging.log4j.LogManager;
Expand All @@ -52,6 +53,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
Expand All @@ -78,14 +80,96 @@ public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) {

@Override
public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) {
final Tuple2<String, SqlQueryChannel.Instance> pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
final String query = pair.field0;
final SqlQueryChannel.Instance queryChannel = pair.field1;
// Check if this stage ends with a sink operator
final Collection<?> termTasks = stage.getTerminalTasks();
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];

if (termTask.getOperator() instanceof JdbcTableSinkOperator) {
// If it is a sink stage: compose and execute SQL directly within the database
JdbcExecutor.executeSinkStage(stage, optimizationContext, this);
} else {
//If it is normal stage: compose SQL and store in channel for downstream consumption
final Tuple2<String, SqlQueryChannel.Instance> pair = JdbcExecutor.createSqlQuery(stage, optimizationContext, this);
final String query = pair.field0;
final SqlQueryChannel.Instance queryChannel = pair.field1;
queryChannel.setSqlQuery(query);
executionState.register(queryChannel);
}
}

queryChannel.setSqlQuery(query);
/**
* Handles execution stages that end with a {@link JdbcTableSinkOperator}.
* Composes a SQL query from the stage's operators and executes it directly
* on the database connection, keeping all data within the database.
*
* @param stage the execution stage ending with a sink
* @param optimizationContext provides optimization information
* @param jdbcExecutor the executor with the database connection
*/
private static void executeSinkStage(final ExecutionStage stage,
final OptimizationContext optimizationContext,
final JdbcExecutor jdbcExecutor) {
final Collection<?> startTasks = stage.getStartTasks();
final Collection<?> termTasks = stage.getTerminalTasks();

// Return the tipChannelInstance.
executionState.register(queryChannel);
assert startTasks.size() == 1 : "Invalid JDBC stage: multiple sources are not currently supported";
final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0];
assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported.";
final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0];
assert startTask.getOperator() instanceof TableSource
: "Invalid JDBC stage: Start task has to be a TableSource";
assert termTask.getOperator() instanceof JdbcTableSinkOperator
: "Invalid JDBC stage: Terminal task has to be a JdbcTableSinkOperator";

// Extract operators from the stage
final JdbcTableSource tableOp = (JdbcTableSource) startTask.getOperator();
final JdbcTableSinkOperator sinkOp = (JdbcTableSinkOperator) termTask.getOperator();
final Collection<JdbcFilterOperator> filterTasks = new ArrayList<>(4);
JdbcProjectionOperator projectionTask = null;
final Collection<JdbcJoinOperator<?>> joinTasks = new ArrayList<>();

// Walk through intermediate operators, stopping at the sink
ExecutionTask nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(startTask, stage);
while (nextTask != null && !(nextTask.getOperator() instanceof JdbcTableSinkOperator)) {
if (nextTask.getOperator() instanceof final JdbcFilterOperator filterOperator) {
filterTasks.add(filterOperator);
} else if (nextTask.getOperator() instanceof JdbcProjectionOperator projectionOperator) {
assert projectionTask == null;
projectionTask = projectionOperator;
} else if (nextTask.getOperator() instanceof JdbcJoinOperator joinOperator) {
joinTasks.add(joinOperator);
} else {
throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString()));
}
nextTask = JdbcExecutor.findJdbcExecutionOperatorTaskInStage(nextTask, stage);
}

// Compose the SELECT query
final StringBuilder selectQuery = createSqlString(jdbcExecutor, tableOp, filterTasks, projectionTask, joinTasks);

// Remove trailing semicolon from SELECT
String selectSql = selectQuery.toString();
if (selectSql.endsWith(";")) {
selectSql = selectSql.substring(0, selectSql.length() - 1);
}

// Get the sink's SQL clause
final String sinkClause = sinkOp.createSqlClause(jdbcExecutor.connection, jdbcExecutor.functionCompiler);

// Execute on the database
try (Statement stmt = jdbcExecutor.connection.createStatement()) {
// Handle overwrite: drop existing table first
if ("overwrite".equals(sinkOp.getMode())) {
stmt.execute("DROP TABLE IF EXISTS " + sinkOp.getTableName());
}
// Execute the composed query: CREATE TABLE x AS SELECT ... or INSERT INTO x SELECT ...
final String fullSql = sinkClause + " " + selectSql + sinkOp.createSqlSuffix();
stmt.execute(fullSql);
jdbcExecutor.logger.info("Executed SQL sink: {}", fullSql);
} catch (SQLException e) {
throw new WayangException("Failed to execute SQL sink on table: " + sinkOp.getTableName(), e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.jdbc.operators;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.operators.TableSink;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.jdbc.compiler.FunctionCompiler;

import java.sql.Connection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
* Abstract JDBC-based implementation of {@link TableSink} that operates within
* the {@link org.apache.wayang.jdbc.channels.SqlQueryChannel} ecosystem.
* Instead of pulling data into Java/Spark memory and inserting via JDBC,
* this operator wraps the composed SQL query in a CREATE TABLE AS SELECT
* or INSERT INTO ... SELECT statement, keeping all data within the database.
*/
public abstract class JdbcTableSinkOperator extends TableSink<Record> implements JdbcExecutionOperator {

public JdbcTableSinkOperator(String tableName, String[] columnNames) {
super(null, null, tableName, columnNames);
}

public JdbcTableSinkOperator(TableSink<Record> that) {
super(that);
}

@Override
public String createSqlClause(Connection connection, FunctionCompiler compiler) {
String mode = this.getMode();
if ("overwrite".equals(mode)) {
return "CREATE TABLE " + this.getTableName() + " AS";
}
return "INSERT INTO " + this.getTableName();
}

/**
* Returns a SQL suffix appended after the composed SELECT query.
* Default is empty, which works for most databases (PostgreSQL, SQLite, MySQL).
* Subclasses can potentiallyoverride for dialect-specific syntax (e.g., HSQLDB that we used for the tests requires
* parenthesized subquery form: {@code CREATE TABLE x AS (SELECT ...)}).
*/
public String createSqlSuffix() {
return "";
}

@Override
public List<ChannelDescriptor> getSupportedInputChannels(int index) {
return Collections.singletonList(this.getPlatform().getSqlQueryChannelDescriptor());
}

@Override
public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
throw new UnsupportedOperationException("This operator has no outputs.");
}

@Override
public String getLoadProfileEstimatorConfigurationKey() {
return String.format("wayang.%s.tablesink.load", this.getPlatform().getPlatformId());
}

@Override
public Optional<LoadProfileEstimator> createLoadProfileEstimator(Configuration configuration) {
return JdbcExecutionOperator.super.createLoadProfileEstimator(configuration);
}
}
Loading
Loading