Skip to content

Commit 7f28d8b

Browse files
committed
Kafka sink: pass the full output type to the producer factory (was `getOutputType().getTypeAt(1)`), and rename copy-pasted `kafka11SinkTableInfo` locals to match each module's Kafka version
1 parent 648ce78 commit 7f28d8b

File tree

4 files changed

+28
-46
lines changed
  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka

4 files changed

+28
-46
lines changed

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,8 @@
1818

1919
package com.dtstack.flink.sql.sink.kafka;
2020

21-
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2221
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
2322
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
24-
import org.apache.commons.lang3.StringUtils;
25-
import org.apache.flink.api.common.typeinfo.TypeInformation;
26-
import org.apache.flink.api.java.tuple.Tuple2;
27-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
28-
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
29-
import org.apache.flink.streaming.api.datastream.DataStream;
30-
import org.apache.flink.streaming.api.datastream.DataStreamSink;
31-
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
32-
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
33-
import org.apache.flink.table.api.TableSchema;
34-
import org.apache.flink.table.sinks.RetractStreamTableSink;
35-
import org.apache.flink.table.sinks.TableSink;
36-
import org.apache.flink.types.Row;
3723

3824
import java.util.Optional;
3925
import java.util.Properties;
@@ -46,20 +32,19 @@
4632
public class KafkaSink extends AbstractKafkaSink {
4733
@Override
4834
public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
49-
KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
35+
KafkaSinkTableInfo kafkaSinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
5036

51-
Properties kafkaProperties = getKafkaProperties(kafka11SinkTableInfo);
52-
this.tableName = kafka11SinkTableInfo.getName();
53-
this.topic = kafka11SinkTableInfo.getTopic();
37+
Properties kafkaProperties = getKafkaProperties(kafkaSinkTableInfo);
38+
this.tableName = kafkaSinkTableInfo.getName();
39+
this.topic = kafkaSinkTableInfo.getTopic();
5440
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
55-
this.partitionKeys = getPartitionKeys(kafka11SinkTableInfo);
56-
this.fieldNames = kafka11SinkTableInfo.getFields();
57-
this.fieldTypes = getTypeInformations(kafka11SinkTableInfo);
41+
this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);
42+
this.fieldNames = kafkaSinkTableInfo.getFields();
43+
this.fieldTypes = getTypeInformations(kafkaSinkTableInfo);
5844
this.schema = buildTableSchema(fieldNames, fieldTypes);
59-
this.parallelism = kafka11SinkTableInfo.getParallelism();
45+
this.parallelism = kafkaSinkTableInfo.getParallelism();
6046
this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
61-
this.kafkaProducer011 = new KafkaProducerFactory()
62-
.createKafkaProducer(kafka11SinkTableInfo, getOutputType().getTypeAt(1), kafkaProperties, partitioner, partitionKeys);
47+
this.kafkaProducer011 = new KafkaProducerFactory().createKafkaProducer(kafkaSinkTableInfo, getOutputType(), kafkaProperties, partitioner, partitionKeys);
6348
return this;
6449
}
6550
}

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,19 @@
3434
public class KafkaSink extends AbstractKafkaSink {
3535
@Override
3636
public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
37-
KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
37+
KafkaSinkTableInfo kafka09SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
3838

39-
Properties kafkaProperties = getKafkaProperties(kafka11SinkTableInfo);
40-
this.tableName = kafka11SinkTableInfo.getName();
41-
this.topic = kafka11SinkTableInfo.getTopic();
39+
Properties kafkaProperties = getKafkaProperties(kafka09SinkTableInfo);
40+
this.tableName = kafka09SinkTableInfo.getName();
41+
this.topic = kafka09SinkTableInfo.getTopic();
4242
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
43-
this.partitionKeys = getPartitionKeys(kafka11SinkTableInfo);
44-
this.fieldNames = kafka11SinkTableInfo.getFields();
45-
this.fieldTypes = getTypeInformations(kafka11SinkTableInfo);
43+
this.partitionKeys = getPartitionKeys(kafka09SinkTableInfo);
44+
this.fieldNames = kafka09SinkTableInfo.getFields();
45+
this.fieldTypes = getTypeInformations(kafka09SinkTableInfo);
4646
this.schema = buildTableSchema(fieldNames, fieldTypes);
47-
this.parallelism = kafka11SinkTableInfo.getParallelism();
47+
this.parallelism = kafka09SinkTableInfo.getParallelism();
4848
this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
49-
this.kafkaProducer011 = new KafkaProducer09Factory()
50-
.createKafkaProducer(kafka11SinkTableInfo, getOutputType().getTypeAt(1), kafkaProperties, partitioner, partitionKeys);
49+
this.kafkaProducer011 = new KafkaProducer09Factory().createKafkaProducer(kafka09SinkTableInfo, getOutputType(), kafkaProperties, partitioner, partitionKeys);
5150
return this;
5251
}
5352
}

kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,20 +38,19 @@ public class KafkaSink extends AbstractKafkaSink {
3838

3939
@Override
4040
public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
41-
KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
41+
KafkaSinkTableInfo kafka10SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
4242

43-
Properties kafkaProperties = getKafkaProperties(kafka11SinkTableInfo);
44-
this.tableName = kafka11SinkTableInfo.getName();
45-
this.topic = kafka11SinkTableInfo.getTopic();
43+
Properties kafkaProperties = getKafkaProperties(kafka10SinkTableInfo);
44+
this.tableName = kafka10SinkTableInfo.getName();
45+
this.topic = kafka10SinkTableInfo.getTopic();
4646
this.partitioner = Optional.of(new CustomerFlinkPartition<>());
47-
this.partitionKeys = getPartitionKeys(kafka11SinkTableInfo);
48-
this.fieldNames = kafka11SinkTableInfo.getFields();
49-
this.fieldTypes = getTypeInformations(kafka11SinkTableInfo);
47+
this.partitionKeys = getPartitionKeys(kafka10SinkTableInfo);
48+
this.fieldNames = kafka10SinkTableInfo.getFields();
49+
this.fieldTypes = getTypeInformations(kafka10SinkTableInfo);
5050
this.schema = buildTableSchema(fieldNames, fieldTypes);
51-
this.parallelism = kafka11SinkTableInfo.getParallelism();
51+
this.parallelism = kafka10SinkTableInfo.getParallelism();
5252
this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
53-
this.kafkaProducer011 = new KafkaProducer010Factory()
54-
.createKafkaProducer(kafka11SinkTableInfo, getOutputType().getTypeAt(1), kafkaProperties, partitioner, partitionKeys);
53+
this.kafkaProducer011 = new KafkaProducer010Factory().createKafkaProducer(kafka10SinkTableInfo, getOutputType(), kafkaProperties, partitioner, partitionKeys);
5554
return this;
5655
}
5756
}

kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
4949
this.schema = buildTableSchema(fieldNames, fieldTypes);
5050
this.parallelism = kafka11SinkTableInfo.getParallelism();
5151
this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
52-
this.kafkaProducer011 = new KafkaProducer011Factory()
53-
.createKafkaProducer(kafka11SinkTableInfo, getOutputType().getTypeAt(1), kafkaProperties, partitioner, partitionKeys);
52+
this.kafkaProducer011 = new KafkaProducer011Factory().createKafkaProducer(kafka11SinkTableInfo, getOutputType(), kafkaProperties, partitioner, partitionKeys);
5453
return this;
5554
}
5655
}

0 commit comments

Comments (0)