[FLINK-33461][Connector/JDBC] Support streaming related semantics for the new JDBC source #119

Merged Jun 14, 2024 (2 commits)
486 changes: 463 additions & 23 deletions docs/content.zh/docs/connectors/datastream/jdbc.md

Large diffs are not rendered by default.

200 changes: 197 additions & 3 deletions docs/content/docs/connectors/datastream/jdbc.md
@@ -26,14 +26,208 @@ under the License.

# JDBC Connector

This connector provides a source that reads data from a JDBC database and
a sink that writes data to a JDBC database.

To use it, add the following dependency to your project (along with your JDBC driver):

{{< connector_artifact flink-connector-jdbc jdbc >}}

Note that the streaming connectors are currently __NOT__ part of the binary distribution.
See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
A driver dependency is also required to connect to a specified database.
Please consult your database documentation on how to add the corresponding driver.

## JDBC Source

Configuration goes as follows (see also {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSource.html" name="JdbcSource javadoc" >}}
and {{< javadoc file="org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.html" name="JdbcSourceBuilder javadoc" >}}).

### Usage

{{< tabs "4ab65f13-607a-411a-8d24-e709f701cd41" >}}
{{< tab "Java" >}}
```java
JdbcSource<OUT> source = JdbcSource.<OUT>builder()
// Required
.setSql(...)
.setResultExtractor(...)
.setUsername(...)
.setPassword(...)
.setDriverName(...)
.setDBUrl(...)
.setTypeInformation(...)

// Optional
.setContinuousUnBoundingSettings(...)
.setJdbcParameterValuesProvider(...)
.setDeliveryGuarantee(...)
.setConnectionCheckTimeoutSeconds(...)

// The extended JDBC connection property passing
.setConnectionProperty("key", "value")

// other attributes
.setSplitReaderFetchBatchSize(...)
.setResultSetType(...)
.setResultSetConcurrency(...)
.setAutoCommit(...)
.setResultSetFetchSize(...)
.setConnectionProvider(...)
.build();

```
{{< /tab >}}
{{< tab "Python" >}}
```python
Not yet supported in the Python API.
```
{{< /tab >}}
{{< /tabs >}}

### Delivery guarantee

The JDBC source provides an `at-least-once`, `at-most-once` (the default), or `exactly-once` delivery guarantee.
The `JdbcSource` implements these semantics on top of the concurrency mode of the underlying `ResultSet`.

**NOTE:** There is a caveat: these semantics only hold if the `ResultSet` corresponding to the SQL of a `JdbcSourceSplit`
remains unchanged for the whole lifecycle of the split's processing.
Unfortunately, this condition is not met by most databases and data scenarios.
See [FLIP-239](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217386271) for more details.
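The `ResultSet` characteristics involved can be illustrated with plain `java.sql`, without any Flink dependency. The helper method below is ours, purely for illustration; re-delivery after a failure relies on a cursor that can be re-read from the start:

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ResultSetSettings {

    // Hypothetical helper: creates a statement whose ResultSet is scrollable
    // and read-only, so rows can be revisited when a split is replayed.
    public static Statement scrollableStatement(Connection connection) throws SQLException {
        return connection.createStatement(
                ResultSet.TYPE_SCROLL_INSENSITIVE, // cursor can move backwards; insensitive to concurrent changes
                ResultSet.CONCUR_READ_ONLY);       // the source only reads, never updates
    }

    public static void main(String[] args) {
        // The JDBC constants are plain ints; a forward-only cursor cannot be replayed.
        System.out.println(ResultSet.TYPE_SCROLL_INSENSITIVE != ResultSet.TYPE_FORWARD_ONLY);
    }
}
```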

### ResultExtractor

A `ResultExtractor` extracts one record from each row of the `ResultSet` produced by executing the configured SQL statement.

```java
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;

import java.sql.ResultSet;
import java.sql.SQLException;

class Book {
    public Book(Long id, String title) {
        this.id = id;
        this.title = title;
    }

    final Long id;
    final String title;
}

ResultExtractor<Book> resultExtractor = new ResultExtractor<Book>() {
    @Override
    public Book extract(ResultSet resultSet) throws SQLException {
        return new Book(resultSet.getLong("id"), resultSet.getString("title"));
    }
};
```

### JdbcParameterValuesProvider

A provider that supplies the actual values for the `?` placeholders in the SQL statement, in the form of a two-dimensional array: each inner array holds the parameter values for one parameterized query, and each such query becomes one split.
See {{< javadoc file="org/apache/flink/connector/jdbc/split/JdbcParameterValuesProvider.html" name="JdbcParameterValuesProvider javadoc" >}} for more details.

```java
class TestEntry {
    ...
}

ResultExtractor<TestEntry> extractor = ...;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

JdbcSource<TestEntry> jdbcSource =
        JdbcSource.<TestEntry>builder()
                .setTypeInformation(TypeInformation.of(TestEntry.class))
                .setSql("select * from testing_table where id >= ? and id <= ?")
                .setJdbcParameterValuesProvider(
                        new JdbcGenericParameterValuesProvider(
                                new Serializable[][] {{1001, 1005}, {1006, 1010}}))
                ...
                .build();

env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "TestSource")
        .addSink(new DiscardingSink<>());
env.execute();
```
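The mapping from the two-dimensional array to concrete queries can be sketched without any Flink dependency. The `renderSplits` helper below is ours, purely for illustration: each outer element corresponds to one split, and its inner values fill the `?` placeholders from left to right.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ParameterBinding {

    // Renders the query each split would effectively execute by substituting
    // the split's parameter row into the '?' placeholders, left to right.
    public static List<String> renderSplits(String sql, Serializable[][] params) {
        List<String> queries = new ArrayList<>();
        for (Serializable[] row : params) {
            String query = sql;
            for (Serializable value : row) {
                query = query.replaceFirst("\\?", String.valueOf(value));
            }
            queries.add(query);
        }
        return queries;
    }

    public static void main(String[] args) {
        Serializable[][] params = {{1001, 1005}, {1006, 1010}};
        // Prints one effective query per split.
        renderSplits("select * from testing_table where id >= ? and id <= ?", params)
                .forEach(System.out::println);
    }
}
```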

### Minimalist Streaming Semantics and ContinuousUnBoundingSettings

If you want to generate continuous millisecond parameters based on a sliding window,
please try setting the following attributes of `JdbcSource`:

```java
JdbcSourceBuilder<TestEntry> jdbcSourceBuilder =
        JdbcSource.<TestEntry>builder()
                .setSql("select * from testing_table where ts >= ? and ts < ?")

                // Required for streaming-related semantics.
                .setContinuousUnBoundingSettings(
                        new ContinuousUnBoundingSettings(Duration.ofMillis(10L), Duration.ofSeconds(1L)))
                .setJdbcParameterValuesProvider(
                        new JdbcSlideTimingParameterProvider(0L, 1000L, 1000L, 100L))
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE);

// other attributes
...

JdbcSource<TestEntry> source = jdbcSourceBuilder.build();
```

See {{< javadoc file="org/apache/flink/connector/jdbc/utils/ContinuousUnBoundingSettings.html" name="ContinuousUnBoundingSettings javadoc" >}} for more details.
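Assuming the four numeric arguments in the example above denote a start timestamp, a slide step, and a window size (all in milliseconds) plus a window count — our reading of the example, not a documented signature — the `[start, end)` timing pairs handed to the placeholders can be sketched in plain Java:

```java
import java.util.ArrayList;
import java.util.List;

public class SlideTimingSketch {

    // Generates n consecutive [start, end) millisecond windows, each of size
    // windowMillis, sliding by slideMillis from startMillis. With slide == size
    // the windows tumble back to back, as in the example above.
    public static List<long[]> windows(long startMillis, long slideMillis, long windowMillis, int n) {
        List<long[]> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            long start = startMillis + i * slideMillis;
            result.add(new long[] {start, start + windowMillis});
        }
        return result;
    }

    public static void main(String[] args) {
        // Each pair fills "ts >= ?" and "ts < ?" for one split.
        for (long[] w : windows(0L, 1000L, 1000L, 3)) {
            System.out.println(w[0] + " .. " + w[1]);
        }
    }
}
```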

### Full example

{{< tabs "4ab65f13-608a-411a-8d24-e303f348ds81" >}}
{{< tab "Java" >}}

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.source.JdbcSource;
import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import java.io.Serializable;

public class JdbcSourceExample {

    static class Book {
        public Book(Long id, String title) {
            this.id = id;
            this.title = title;
        }

        final Long id;
        final String title;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        JdbcSource<Book> jdbcSource =
                JdbcSource.<Book>builder()
                        .setTypeInformation(TypeInformation.of(Book.class))
                        .setSql("select * from testing_table where id < ?")
                        .setDBUrl(...)
                        .setJdbcParameterValuesProvider(
                                new JdbcGenericParameterValuesProvider(
                                        new Serializable[][] {{1001L}}))
                        .setDriverName(...)
                        .setResultExtractor(resultSet ->
                                new Book(
                                        resultSet.getLong("id"),
                                        resultSet.getString("title")))
                        .build();
        env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "TestSource")
                .addSink(new DiscardingSink<>());
        env.execute();
    }
}
```
{{< /tab >}}
{{< tab "Python" >}}
```python
Not yet supported in the Python API.
```
{{< /tab >}}
{{< /tabs >}}

## `JdbcSink.sink`

@@ -79,6 +79,7 @@ public Properties getProperties() {
@Override
public boolean isConnectionValid() throws SQLException {
return connection != null
&& !connection.isClosed()
&& connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}

@@ -40,9 +40,12 @@
import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
import org.apache.flink.connector.jdbc.utils.ContinuousUnBoundingSettings;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Objects;
@@ -55,6 +58,7 @@ public class JdbcSource<OUT>

private final Boundedness boundedness;
private final TypeInformation<OUT> typeInformation;
private final @Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings;

private final Configuration configuration;
private final JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider;
@@ -69,29 +73,20 @@ public class JdbcSource<OUT>
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
ResultExtractor<OUT> resultExtractor,
TypeInformation<OUT> typeInformation,
DeliveryGuarantee deliveryGuarantee) {
@Nullable DeliveryGuarantee deliveryGuarantee,
@Nullable ContinuousUnBoundingSettings continuousUnBoundingSettings) {
this.configuration = Preconditions.checkNotNull(configuration);
this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
this.resultExtractor = Preconditions.checkNotNull(resultExtractor);
this.deliveryGuarantee = Preconditions.checkNotNull(deliveryGuarantee);
this.deliveryGuarantee =
Objects.isNull(deliveryGuarantee) ? DeliveryGuarantee.NONE : deliveryGuarantee;
this.typeInformation = Preconditions.checkNotNull(typeInformation);
this.boundedness = Boundedness.BOUNDED;
}

JdbcSource(
Configuration configuration,
JdbcConnectionProvider connectionProvider,
JdbcSqlSplitEnumeratorBase.Provider<JdbcSourceSplit> sqlSplitEnumeratorProvider,
ResultExtractor<OUT> resultExtractor,
TypeInformation<OUT> typeInformation) {
this(
configuration,
connectionProvider,
sqlSplitEnumeratorProvider,
resultExtractor,
typeInformation,
DeliveryGuarantee.NONE);
this.continuousUnBoundingSettings = continuousUnBoundingSettings;
this.boundedness =
Objects.isNull(continuousUnBoundingSettings)
? Boundedness.BOUNDED
: Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
@@ -119,7 +114,10 @@ public SourceReader<OUT, JdbcSourceSplit> createReader(SourceReaderContext reade
public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> createEnumerator(
SplitEnumeratorContext<JdbcSourceSplit> enumContext) throws Exception {
return new JdbcSourceEnumerator(
enumContext, sqlSplitEnumeratorProvider.create(), new ArrayList<>());
enumContext,
sqlSplitEnumeratorProvider.create(),
continuousUnBoundingSettings,
new ArrayList<>());
}

@Override
@@ -132,6 +130,7 @@ public SplitEnumerator<JdbcSourceSplit, JdbcSourceEnumeratorState> restoreEnumer
return new JdbcSourceEnumerator(
enumContext,
sqlSplitEnumeratorProvider.restore(optionalUserDefinedSplitEnumeratorState),
continuousUnBoundingSettings,
checkpoint.getRemainingSplits());
}

@@ -193,6 +192,7 @@ public boolean equals(Object o) {
&& Objects.equals(sqlSplitEnumeratorProvider, that.sqlSplitEnumeratorProvider)
&& Objects.equals(connectionProvider, that.connectionProvider)
&& Objects.equals(resultExtractor, that.resultExtractor)
&& deliveryGuarantee == that.deliveryGuarantee;
&& deliveryGuarantee == that.deliveryGuarantee
&& Objects.equals(continuousUnBoundingSettings, that.continuousUnBoundingSettings);
}
}