
Commit 648ce78

add abstractKafkaSink
1 parent 6134850 commit 648ce78

File tree

5 files changed (+194, -450 lines)
  • kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka10/kafka10-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka11/kafka11-sink/src/main/java/com/dtstack/flink/sql/sink/kafka
  • kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Optional;
import java.util.Properties;
import java.util.stream.IntStream;

import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;

/**
 * Date: 2020/3/30
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener {
    public static final String SINK_OPERATOR_NAME_TPL = "${topic}_${table}";

    protected String[] fieldNames;
    protected TypeInformation<?>[] fieldTypes;

    protected String[] partitionKeys;
    protected String sinkOperatorName;
    protected Properties properties;
    protected int parallelism;
    protected String topic;
    protected String tableName;

    protected TableSchema schema;
    protected SinkFunction<Tuple2<Boolean, Row>> kafkaProducer011;

    protected Optional<FlinkKafkaPartitioner<Tuple2<Boolean, Row>>> partitioner;

    protected Properties getKafkaProperties(KafkaSinkTableInfo kafkaSinkTableInfo) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkTableInfo.getBootstrapServers());

        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
            props.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
        }
        return props;
    }

    protected TypeInformation[] getTypeInformations(KafkaSinkTableInfo kafkaSinkTableInfo) {
        Class<?>[] fieldClasses = kafkaSinkTableInfo.getFieldClasses();
        return IntStream.range(0, fieldClasses.length)
                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                .toArray(TypeInformation[]::new);
    }

    protected TableSchema buildTableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "fieldNames length must equal fieldTypes length!");

        DataType[] dataTypes = IntStream.range(0, fieldTypes.length)
                .mapToObj(i -> fromLegacyInfoToDataType(fieldTypes[i]))
                .toArray(DataType[]::new);

        return TableSchema.builder()
                .fields(fieldNames, dataTypes)
                .build();
    }

    protected String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
        }
        return null;
    }

    @Override
    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStreamSink<Tuple2<Boolean, Row>> dataStreamSink = dataStream.addSink(kafkaProducer011).name(sinkOperatorName);
        if (parallelism > 0) {
            dataStreamSink.setParallelism(parallelism);
        }
        return dataStreamSink;
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
        return this;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}

kafka/kafka-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 13 additions & 103 deletions
@@ -43,113 +43,23 @@
  * @create: 2019-11-05 11:45
  * @description:
  **/
-public class KafkaSink implements RetractStreamTableSink<Row>, IStreamSinkGener<KafkaSink> {
-    private static final String SINK_OPERATOR_NAME_TPL = "${topic}_${table}";
-
-    protected String[] fieldNames;
-
-    protected TypeInformation<?>[] fieldTypes;
-
-    protected String topic;
-
-    protected int parallelism;
-
-    protected Properties properties;
-
-    protected FlinkKafkaProducer<Tuple2<Boolean, Row>> flinkKafkaProducer;
-
-    /** The schema of the table. */
-    private TableSchema schema;
-
-    /** Partitioner to select Kafka partition for each item. */
-    protected Optional<FlinkKafkaPartitioner<Tuple2<Boolean, Row>>> partitioner;
-
-    private String[] partitionKeys;
-
-    protected String sinkOperatorName;
-
-
+public class KafkaSink extends AbstractKafkaSink {
     @Override
     public KafkaSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
-        KafkaSinkTableInfo kafkaSinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
-        this.topic = kafkaSinkTableInfo.getTopic();
-
-        properties = new Properties();
-        properties.setProperty("bootstrap.servers", kafkaSinkTableInfo.getBootstrapServers());
+        KafkaSinkTableInfo kafka11SinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;
 
-        for (String key : kafkaSinkTableInfo.getKafkaParamKeys()) {
-            properties.setProperty(key, kafkaSinkTableInfo.getKafkaParam(key));
-        }
+        Properties kafkaProperties = getKafkaProperties(kafka11SinkTableInfo);
+        this.tableName = kafka11SinkTableInfo.getName();
+        this.topic = kafka11SinkTableInfo.getTopic();
         this.partitioner = Optional.of(new CustomerFlinkPartition<>());
-        this.partitionKeys = getPartitionKeys(kafkaSinkTableInfo);
-        this.fieldNames = kafkaSinkTableInfo.getFields();
-        TypeInformation[] types = new TypeInformation[kafkaSinkTableInfo.getFields().length];
-        for (int i = 0; i < kafkaSinkTableInfo.getFieldClasses().length; i++) {
-            types[i] = TypeInformation.of(kafkaSinkTableInfo.getFieldClasses()[i]);
-        }
-        this.fieldTypes = types;
-
-        TableSchema.Builder schemaBuilder = TableSchema.builder();
-        for (int i = 0; i < fieldNames.length; i++) {
-            schemaBuilder.field(fieldNames[i], fieldTypes[i]);
-        }
-        this.schema = schemaBuilder.build();
-
-        Integer parallelism = kafkaSinkTableInfo.getParallelism();
-        if (parallelism != null) {
-            this.parallelism = parallelism;
-        }
-
-        this.flinkKafkaProducer = (FlinkKafkaProducer<Tuple2<Boolean, Row>>) new KafkaProducerFactory()
-                .createKafkaProducer(kafkaSinkTableInfo, getOutputType().getTypeAt(1), properties, partitioner, partitionKeys);
-
-        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", kafkaSinkTableInfo.getName());
+        this.partitionKeys = getPartitionKeys(kafka11SinkTableInfo);
+        this.fieldNames = kafka11SinkTableInfo.getFields();
+        this.fieldTypes = getTypeInformations(kafka11SinkTableInfo);
+        this.schema = buildTableSchema(fieldNames, fieldTypes);
+        this.parallelism = kafka11SinkTableInfo.getParallelism();
+        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);
+        this.kafkaProducer011 = new KafkaProducerFactory()
+                .createKafkaProducer(kafka11SinkTableInfo, getOutputType().getTypeAt(1), kafkaProperties, partitioner, partitionKeys);
         return this;
     }
-
-    @Override
-    public TypeInformation<Row> getRecordType() {
-        return new RowTypeInfo(fieldTypes, fieldNames);
-    }
-
-    @Override
-    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        consumeDataStream(dataStream);
-    }
-
-    @Override
-    public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-        DataStreamSink<Tuple2<Boolean, Row>> dataStreamSink = dataStream.addSink(flinkKafkaProducer).name(sinkOperatorName);
-        if (parallelism > 0) {
-            dataStreamSink.setParallelism(parallelism);
-        }
-        return dataStreamSink;
-
-    }
-
-    @Override
-    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
-        return new TupleTypeInfo(org.apache.flink.table.api.Types.BOOLEAN(), new RowTypeInfo(fieldTypes, fieldNames));
-    }
-
-    @Override
-    public TableSchema getTableSchema() {
-        return schema;
-    }
-
-    @Override
-    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        this.fieldNames = fieldNames;
-        this.fieldTypes = fieldTypes;
-        return this;
-    }
-
-    private String[] getPartitionKeys(KafkaSinkTableInfo kafkaSinkTableInfo) {
-        if (StringUtils.isNotBlank(kafkaSinkTableInfo.getPartitionKeys())) {
-            return StringUtils.split(kafkaSinkTableInfo.getPartitionKeys(), ',');
-        }
-        return null;
-    }
 }
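
The kafka09, kafka10, and kafka11 sink modules listed in the file tree are reworked along the same lines, but their diffs are not reproduced on this page. As a rough illustration only, here is a minimal sketch of what a version-specific subclass looks like after this refactor. Kafka09Sink and KafkaProducerFactory09 are hypothetical names chosen by analogy with the kafka module above, and the AbstractTargetTableInfo import path is assumed; the actual sources may differ.

// Hypothetical sketch, not part of this commit: a per-version sink reduced to
// configuration plus producer wiring, with everything else inherited from AbstractKafkaSink.
package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
// Assumption: AbstractTargetTableInfo lives here, as referenced by KafkaSink above.
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;

import java.util.Optional;
import java.util.Properties;

public class Kafka09Sink extends AbstractKafkaSink {
    @Override
    public Kafka09Sink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
        KafkaSinkTableInfo sinkTableInfo = (KafkaSinkTableInfo) targetTableInfo;

        // Shared plumbing now comes from the base class helpers.
        Properties kafkaProperties = getKafkaProperties(sinkTableInfo);
        this.tableName = sinkTableInfo.getName();
        this.topic = sinkTableInfo.getTopic();
        this.partitioner = Optional.of(new CustomerFlinkPartition<>()); // assumed same package, as in KafkaSink
        this.partitionKeys = getPartitionKeys(sinkTableInfo);
        this.fieldNames = sinkTableInfo.getFields();
        this.fieldTypes = getTypeInformations(sinkTableInfo);
        this.schema = buildTableSchema(fieldNames, fieldTypes);
        this.parallelism = sinkTableInfo.getParallelism();
        this.sinkOperatorName = SINK_OPERATOR_NAME_TPL.replace("${topic}", topic).replace("${table}", tableName);

        // The genuinely version-specific piece: each module supplies its own
        // producer factory (KafkaProducerFactory09 is a hypothetical name).
        this.kafkaProducer011 = new KafkaProducerFactory09()
                .createKafkaProducer(sinkTableInfo, getOutputType().getTypeAt(1), kafkaProperties, partitioner, partitionKeys);
        return this;
    }
}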
