 import com.dtstack.flink.sql.format.FormatType;
 import com.dtstack.flink.sql.format.SerializationMetricWrapper;
-import com.dtstack.flink.sql.sink.kafka.serialization.AvroCRowSerializationSchema;
-import com.dtstack.flink.sql.sink.kafka.serialization.CsvCRowSerializationSchema;
-import com.dtstack.flink.sql.sink.kafka.serialization.JsonCRowSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.AvroTuple2SerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.CsvTupleSerializationSchema;
+import com.dtstack.flink.sql.sink.kafka.serialization.JsonTupleSerializationSchema;
 import com.dtstack.flink.sql.sink.kafka.table.KafkaSinkTableInfo;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.types.Row;

 import java.util.Optional;
 import java.util.Properties;
@@ -51,28 +52,29 @@ public abstract class AbstractKafkaProducerFactory {
      * @param partitioner
      * @return
      */
-    public abstract RichSinkFunction<CRow> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation, Properties properties, Optional<FlinkKafkaPartitioner<CRow>> partitioner, String[] partitionKeys);
+    public abstract RichSinkFunction<Tuple2<Boolean, Row>> createKafkaProducer(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation,
+                                                                               Properties properties, Optional<FlinkKafkaPartitioner<Tuple2<Boolean, Row>>> partitioner, String[] partitionKeys);

-    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
-        SerializationSchema<CRow> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
+    protected SerializationMetricWrapper createSerializationMetricWrapper(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation) {
+        SerializationSchema<Tuple2<Boolean, Row>> serializationSchema = createSerializationSchema(kafkaSinkTableInfo, typeInformation);
         return new SerializationMetricWrapper(serializationSchema);
     }

-    private SerializationSchema<CRow> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<CRow> typeInformation) {
-        SerializationSchema<CRow> serializationSchema = null;
+    private SerializationSchema<Tuple2<Boolean, Row>> createSerializationSchema(KafkaSinkTableInfo kafkaSinkTableInfo, TypeInformation<Tuple2<Boolean, Row>> typeInformation) {
+        SerializationSchema<Tuple2<Boolean, Row>> serializationSchema = null;
         if (FormatType.JSON.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
             if (StringUtils.isNotBlank(kafkaSinkTableInfo.getSchemaString())) {
-                serializationSchema = new JsonCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
+                serializationSchema = new JsonTupleSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
             } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                serializationSchema = new JsonCRowSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
+                serializationSchema = new JsonTupleSerializationSchema(typeInformation, kafkaSinkTableInfo.getUpdateMode());
             } else {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema) or TypeInformation<Row>");
             }
         } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSinkTableInfo.getSinkDataType())) {
             if (StringUtils.isBlank(kafkaSinkTableInfo.getFieldDelimiter())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
             }
-            final CsvCRowSerializationSchema.Builder serSchemaBuilder = new CsvCRowSerializationSchema.Builder(typeInformation);
+            final CsvTupleSerializationSchema.Builder serSchemaBuilder = new CsvTupleSerializationSchema.Builder(typeInformation);
             serSchemaBuilder.setFieldDelimiter(kafkaSinkTableInfo.getFieldDelimiter().toCharArray()[0]);
             serSchemaBuilder.setUpdateMode(kafkaSinkTableInfo.getUpdateMode());
@@ -81,7 +83,7 @@ private SerializationSchema<CRow> createSerializationSchema(KafkaSinkTableInfo k
             if (StringUtils.isBlank(kafkaSinkTableInfo.getSchemaString())) {
                 throw new IllegalArgumentException("sinkDataType:" + FormatType.AVRO.name() + " must set schemaString");
             }
-            serializationSchema = new AvroCRowSerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
+            serializationSchema = new AvroTuple2SerializationSchema(kafkaSinkTableInfo.getSchemaString(), kafkaSinkTableInfo.getUpdateMode());
         }

         if (null == serializationSchema) {
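For context on the type change: `CRow` bundled a `Row` with a boolean change flag, and `Tuple2<Boolean, Row>` carries the same retract-stream semantics explicitly, with the Boolean marking an insert/update (`true`) versus a retraction (`false`). Below is a minimal sketch of the shape of the records these serialization schemas now consume; the class name and sample values are hypothetical, not part of this change.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Hypothetical illustration of retract-stream records, as produced by the
// Table API's toRetractStream() and consumed by the Tuple2-based schemas.
public class RetractRecordExample {
    public static void main(String[] args) {
        Row row = Row.of("user-1", 42L);

        // true  -> the row is an insert/update to be written to Kafka
        Tuple2<Boolean, Row> add = Tuple2.of(Boolean.TRUE, row);
        // false -> the row retracts a previously emitted record
        Tuple2<Boolean, Row> retract = Tuple2.of(Boolean.FALSE, row);

        System.out.println(add);      // (true,user-1,42)
        System.out.println(retract);  // (false,user-1,42)
    }
}
```

Presumably the `updateMode` passed into each schema decides how retraction records (flag = `false`) are handled; that logic lives in the serialization classes renamed in this change.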