
Commit 94c2784

Merge remote-tracking branch 'origin/feat_1.8_kafkaSourceAvro' into v1.8.0_dev
# Conflicts:
#	kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java
2 parents 59b996b + 0b22f9c

10 files changed: +328 -586 lines changed

core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 4 additions & 4 deletions
@@ -219,14 +219,14 @@ public static String addJdbcParam(String dbUrl, Map<String, String> addParams, b
         return preStr + "?" + sb.toString();
     }
 
-    public static boolean isJosn(String str){
+    public static boolean isJson(String str) {
         boolean flag = false;
-        if(StringUtils.isNotBlank(str)){
+        if (StringUtils.isNotBlank(str)) {
             try {
-                objectMapper.readValue(str,Map.class);
+                objectMapper.readValue(str, Map.class);
                 flag = true;
             } catch (Throwable e) {
-                flag=false;
+                flag = false;
             }
         }
         return flag;
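The renamed helper is what the new AbstractKafkaSource (added later in this commit) uses to decide whether an offsetReset value is a JSON partition/offset map rather than latest/earliest. A minimal standalone sketch of the same check, assuming only Jackson databind on the classpath; the class name is illustrative and not part of the commit:

```
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class IsJsonSketch {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Mirrors DtStringUtil.isJson: a string counts as JSON only if it
    // deserializes into a Map, i.e. a JSON object such as {"0":12312}.
    public static boolean isJson(String str) {
        if (str == null || str.trim().isEmpty()) {
            return false;
        }
        try {
            objectMapper.readValue(str, Map.class);
            return true;
        } catch (Throwable e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isJson("{\"0\":12312,\"1\":12321}")); // true
        System.out.println(isJson("latest"));                    // false
    }
}
```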

docs/kafkaSource.md

Lines changed: 26 additions & 198 deletions
@@ -1,6 +1,5 @@
 ## 1. Format:
 ```
-Data currently supports the JSON format, e.g. {"xx":"bb","cc":"dd"}
 
 CREATE TABLE tableName(
     colName colType,
@@ -15,9 +14,8 @@ CREATE TABLE tableName(
     topic ='topicName',
     groupId='test',
     parallelism ='parllNum',
-    --timezone='America/Los_Angeles',
     timezone='Asia/Shanghai',
-    sourcedatatype ='json' #optional
+    sourcedatatype ='dt_nest' #optional
 );
 ```
 
@@ -47,7 +45,9 @@ CREATE TABLE tableName(
 |topicIsPattern | whether the topic is a regular expression pattern (true&#124;false) |No| false
 |offsetReset | initial offset for the topic [latest&#124;earliest&#124;a specified offset value ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]||latest|
 |parallelism | parallelism setting||1|
-|sourcedatatype | data type||json|
+|sourcedatatype | data type: avro, csv, json or dt_nest. dt_nest is the default JSON parser and can handle nested JSON; the other formats only support flat (non-nested) data||dt_nest|
+|schemaInfo | schema used by the avro type|||
+|fieldDelimiter | field delimiter used by the csv type|| | |
 |timezone| time zone setting ([supported timezone values](timeZone.md))|No|'Asia/Shanghai'
 **Kafka parameters can be customized by prefixing them with kafka.**
 ```
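When offsetReset is itself a JSON object, it is interpreted as a per-partition starting-offset map ({"partition_no": offset_value}); the connector-side handling lives in AbstractKafkaSource.buildOffsetMap further down this commit. A hedged, standalone sketch of how such a value decomposes, using Jackson purely for illustration (the class name is hypothetical):

```
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class OffsetResetValueSketch {
    public static void main(String[] args) throws Exception {
        // A specific-offset value for offsetReset: partition number -> starting offset.
        String offsetReset = "{\"0\":12312,\"1\":12321,\"2\":12312}";

        Map<?, ?> offsets = new ObjectMapper().readValue(offsetReset, Map.class);
        offsets.forEach((partition, offset) ->
                System.out.println("partition " + partition + " starts at offset " + offset));
    }
}
```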
@@ -169,24 +169,10 @@ CREATE TABLE MyTable(
     parallelism ='1'
 );
 ```
-# 2. csv-format data source
-Data is split on the field delimiter and matched, in order, against the columns configured in the SQL. If the number of split fields equals the number of configured columns they are matched directly; otherwise the lengthcheckpolicy strategy applies.
-## 1. Parameters:
-
-|Parameter|Meaning|Required|Default|
-|----|---|---|---|
-|type | kafka09 |||
-|bootstrapServers | kafka bootstrap-server address (separate multiple entries with commas)|||
-|zookeeperQuorum | kafka zk address (separate multiple entries with commas)|||
-|topic | topic to read|||
-|offsetReset | initial offset for the topic [latest&#124;earliest]||latest|
-|parallelism | parallelism setting ||1|
-|sourcedatatype | data type||csv|
-|fielddelimiter | field delimiter|||
-|lengthcheckpolicy | per-row field-count check policy ||optional, defaults to SKIP; other values are EXCEPTION and PAD. SKIP: skip rows whose field count does not match. EXCEPTION: throw an exception when the field count does not match. PAD: fill fields in order and set missing ones to null.|
-**Kafka parameters can be customized by prefixing them with kafka.**
 
-## 2. Example:
+## 7. csv-format data source
+
+
 ```
 CREATE TABLE MyTable(
     name varchar,
@@ -203,186 +189,28 @@ CREATE TABLE MyTable(
     --topic ='mqTest.*',
     --topicIsPattern='true'
     parallelism ='1',
-    sourcedatatype ='csv',
-    fielddelimiter ='\|',
-    lengthcheckpolicy = 'PAD'
+    sourceDatatype ='csv'
 );
 ```
-# 3. text-format data source with custom UDF splitting
-Parsing flow for a Kafka source table: Kafka Source Table -> UDTF -> Realtime Compute -> SINK. Every record read from Kafka is VARBINARY (binary), and each record must be parsed into structured data with a UDTF.
-Unlike the other formats, the DDL for this format must be exactly the same as the SQL below, and the order of the five table fields must stay the same:
-
-## 1. Define the source table. Note: the fields of the Kafka source-table DDL must match the example below exactly; the parameters in WITH may be changed.
-```
-create table kafka_stream(
-    _topic STRING,
-    _messageKey STRING,
-    _message STRING,
-    _partition INT,
-    _offset BIGINT,
-) with (
-    type ='kafka09',
-    bootstrapServers ='172.16.8.198:9092',
-    zookeeperQuorum ='172.16.8.198:2181/kafka',
-    offsetReset ='latest',
-    topic ='nbTest1',
-    parallelism ='1',
-    sourcedatatype='text'
-
-```
-## 2. Parameters:
-
-|Parameter|Meaning|Required|Default|
-|----|---|---|---|
-|type | kafka09 |||
-|bootstrapServers | kafka bootstrap-server address (separate multiple entries with commas)|||
-|zookeeperQuorum | kafka zk address (separate multiple entries with commas)|||
-|topic | topic to read|||
-|offsetReset | initial offset for the topic [latest&#124;earliest]||latest|
-|parallelism | parallelism setting||1|
-|sourcedatatype | data type||text|
-**Kafka parameters can be customized by prefixing them with kafka.**
+## 8. avro-format data source
 
-## 2. Custom parsing:
-Data read from Kafka needs windowed computation. Under the current realtime-compute design, window operations such as tumbling/sliding windows require (and must have) a watermark defined in the source-table DDL. Kafka source tables are special: to run window operations on the event time carried in the Kafka message field,
-the event time must first be parsed out of the message field with a UDX before the watermark can be defined. In the Kafka source-table scenario this requires a computed column. Suppose the data written to Kafka looks like:
-2018-11-11 00:00:00|1|Anna|female. The overall flow is: Kafka SOURCE->UDTF->Realtime Compute->RDS SINK (a single delimiter can use the csv-style template directly; custom parsing is intended for more complex data types, and this description is for reference only)
-
-**SQL**
 ```
--- define the UDTF that parses the Kafka message
-CREATE FUNCTION kafkapaser AS 'com.XXXX.kafkaUDTF';
-CREATE FUNCTION kafkaUDF AS 'com.XXXX.kafkaUDF';
--- define the source table; note: the Kafka source-table DDL fields must match this example exactly, only the parameters in WITH may be changed.
-create table kafka_src (
-    _topic STRING,
-    _messageKey STRING,
-    _message STRING,
-    _partition INT,
-    _offset BIGINT,
-    ctime AS TO_TIMESTAMP(kafkaUDF(_message)), -- a computed column; think of it as a placeholder: the column does not exist in the source and its value is computed downstream. Its type must be timestamp to be usable in a watermark.
-    watermark for ctime as withoffset(ctime,0) -- define the watermark on the computed column
-) WITH (
-    type = 'kafka010', -- Kafka source type, strongly tied to the Kafka version; see this document for the supported versions
-    topic = 'test_kafka_topic',
-    ...
-);
-create table rds_sink (
-    name VARCHAR,
-    age INT,
-    grade VARCHAR,
-    updateTime TIMESTAMP
-) WITH(
-    type='mysql',
-    url='jdbc:mysql://localhost:3306/test',
-    tableName='test4',
-    userName='test',
-    password='XXXXXX'
-);
--- use the UDTF to parse the binary data into structured data
-CREATE VIEW input_view (
-    name,
-    age,
-    grade,
-    updateTime
-) AS
-SELECT
-    COUNT(*) as cnt,
-    T.ctime,
-    T.order,
-    T.name,
-    T.sex
-from
-    kafka_src as S,
-    LATERAL TABLE (kafkapaser _message)) as T (
-    ctime,
-    order,
-    name,
-    sex
-)
-Group BY T.sex,
-    TUMBLE(ctime, INTERVAL '1' MINUTE);
--- compute over the data emitted by input_view
-CREATE VIEW view2 (
-    cnt,
-    sex
-) AS
-SELECT
-    COUNT(*) as cnt,
-    T.sex
-from
-    input_view
-Group BY sex, TUMBLE(ctime, INTERVAL '1' MINUTE);
--- compute on the parsed data and write the result to RDS
-insert into rds_sink
-SELECT
-    cnt,sex
-from view2;
-```
-**UDF&UDTF**
+CREATE TABLE MyTable(
+    channel varchar,
+    pv varchar
+    --xctime bigint
+)WITH(
+    type='kafka',
+    bootstrapServers='172.16.8.107:9092',
+    groupId='mqTest01',
+    offsetReset='latest',
+    topic='mqTest01',
+    parallelism ='1',
+    topicIsPattern ='false',
+    kafka.group.id='mqTest',
+    sourceDataType ='avro',
+    schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"},{"name":"pv","type":"string"}]}'
+);
+
 ```
-package com.XXXX;
-import com.XXXX.fastjson.JSONObject;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.DataTypes;
-import org.apache.flink.types.Row;
-import java.io.UnsupportedEncodingException;
-/**
-The example below parses the JSON string read from Kafka and emits it as structured output
-**/
-public class kafkaUDTF extends TableFunction<Row> {
-    public void eval(byte[] message) {
-        try {
-            // read one binary record and convert it to a String
-            String msg = new String(message, "UTF-8");
-            // extract the fields from the JSON object
-            String ctime = Timestamp.valueOf(data.split('\\|')[0]);
-            String order = data.split('\\|')[1];
-            String name = data.split('\\|')[2];
-            String sex = data.split('\\|')[3];
-            // put the parsed fields into the Row() to be emitted
-            Row row = new Row(4);
-            row.setField(0, ctime);
-            row.setField(1, age);
-            row.setField(2, grade);
-            row.setField(3, updateTime);
-            System.out.println("Kafka message str ==>" + row.toString());
-            // emit one row
-            collect(row);
-        } catch (ClassCastException e) {
-            System.out.println("Input data format error. Input data " + msg + "is not json string");
-        }
-    } catch (UnsupportedEncodingException e) {
-        e.printStackTrace();
-    }
-}
-@Override
-// if the return type is Row, this method must be overridden to declare the returned field types explicitly
-// define the field types of the emitted Row()
-public DataType getResultType(Object[] arguments, Class[] argTypes) {
-    return DataTypes.createRowType(DataTypes.TIMESTAMP,DataTypes.STRING, DataTypes.Integer, DataTypes.STRING,DataTypes.STRING);
-}
-}
-
-package com.dp58;
-package com.dp58.sql.udx;
-import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.ScalarFunction;
-public class KafkaUDF extends ScalarFunction {
-    // optional; the open method may be omitted
-    // requires import org.apache.flink.table.functions.FunctionContext;
-    public String eval(byte[] message) {
-        // read one binary record and convert it to a String
-        String msg = new String(message, "UTF-8");
-        return msg.split('\\|')[0];
-    }
-    public long eval(String b, String c) {
-        return eval(b) + eval(c);
-    }
-    // optional; the close method may be omitted
-    @Override
-    public void close() {
-    }
-}
-```
+
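The avro example relies on schemaInfo carrying an Avro record schema as a JSON string. As a standalone sanity check (not part of the commit), the same string can be parsed with the stock Avro API; this sketch assumes org.apache.avro:avro is on the classpath and the class name is illustrative:

```
import org.apache.avro.Schema;

public class SchemaInfoCheck {
    public static void main(String[] args) {
        // The schemaInfo value from the avro example above.
        String schemaInfo = "{\"type\":\"record\",\"name\":\"MyResult\",\"fields\":["
                + "{\"name\":\"channel\",\"type\":\"string\"},"
                + "{\"name\":\"pv\",\"type\":\"string\"}]}";

        // Schema.Parser throws SchemaParseException if the string is not a valid Avro schema,
        // so this doubles as a quick validation of the DDL property.
        Schema schema = new Schema.Parser().parse(schemaInfo);
        System.out.println(schema.getFields()); // the two declared fields: channel and pv
    }
}
```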
kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.source.IStreamSourceGener;
import com.dtstack.flink.sql.source.kafka.enums.EKafkaOffset;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import com.dtstack.flink.sql.util.DtStringUtil;
import com.dtstack.flink.sql.util.PluginUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Date: 2020/3/20
 * Company: www.dtstack.com
 * @author maqi
 */
public abstract class AbstractKafkaSource implements IStreamSourceGener<Table> {

    private static final String SOURCE_OPERATOR_NAME_TPL = "${topic}_${table}";

    protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInfo) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSourceTableInfo.getBootstrapServers());

        if (DtStringUtil.isJson(kafkaSourceTableInfo.getOffsetReset())) {
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EKafkaOffset.NONE.name().toLowerCase());
        } else {
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceTableInfo.getOffsetReset());
        }

        if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) {
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceTableInfo.getGroupId());
        }

        for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
            props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
        }
        return props;
    }

    protected String generateOperatorName(String tabName, String topicName) {
        return SOURCE_OPERATOR_NAME_TPL.replace("${topic}", topicName).replace("${table}", tabName);
    }

    protected TypeInformation<Row> getRowTypeInformation(KafkaSourceTableInfo kafkaSourceTableInfo) {
        Class<?>[] fieldClasses = kafkaSourceTableInfo.getFieldClasses();
        TypeInformation[] types = IntStream.range(0, fieldClasses.length)
                .mapToObj(i -> TypeInformation.of(fieldClasses[i]))
                .toArray(TypeInformation[]::new);

        return new RowTypeInfo(types, kafkaSourceTableInfo.getFields());
    }

    protected void setStartPosition(String offset, String topicName, FlinkKafkaConsumerBase<Row> kafkaSrc) {
        if (StringUtils.equalsIgnoreCase(offset, EKafkaOffset.EARLIEST.name())) {
            kafkaSrc.setStartFromEarliest();
        } else if (DtStringUtil.isJson(offset)) {
            Map<KafkaTopicPartition, Long> specificStartupOffsets = buildOffsetMap(offset, topicName);
            kafkaSrc.setStartFromSpecificOffsets(specificStartupOffsets);
        } else {
            kafkaSrc.setStartFromLatest();
        }
    }

    /**
     * kafka offset, e.g. {"0":12312,"1":12321,"2":12312}
     * @param offsetJson
     * @param topicName
     * @return
     */
    protected Map<KafkaTopicPartition, Long> buildOffsetMap(String offsetJson, String topicName) {
        try {
            Properties properties = PluginUtil.jsonStrToObject(offsetJson, Properties.class);
            Map<String, Object> offsetMap = PluginUtil.objectToMap(properties);
            Map<KafkaTopicPartition, Long> specificStartupOffsets = offsetMap
                    .entrySet()
                    .stream()
                    .collect(Collectors.toMap(
                            (Map.Entry<String, Object> entry) -> new KafkaTopicPartition(topicName, Integer.valueOf(entry.getKey())),
                            (Map.Entry<String, Object> entry) -> Long.valueOf(entry.getValue().toString()))
                    );

            return specificStartupOffsets;
        } catch (Exception e) {
            throw new RuntimeException("not support offsetReset type:" + offsetJson);
        }
    }

}
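The concrete per-version source generators (kafka, kafka09, kafka10, ...) in this branch are expected to build on these protected helpers, but they are not shown in this excerpt. A hedged sketch of how the pieces might compose, assuming a Kafka 0.10 consumer; KafkaSourceTableInfo.getTopic(), the table name literal, and the DeserializationSchema passed in are assumptions, and the class itself is illustrative rather than part of the commit:

```
package com.dtstack.flink.sql.source.kafka;

import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.types.Row;

import java.util.Properties;

// Illustrative only: shows the intended call order of the AbstractKafkaSource helpers.
public abstract class Kafka010SourceSketch extends AbstractKafkaSource {

    protected FlinkKafkaConsumer010<Row> createConsumer(KafkaSourceTableInfo tableInfo,
                                                        DeserializationSchema<Row> deserializer) {
        // bootstrap servers, group id, auto.offset.reset and any kafka.* passthrough keys
        Properties props = getKafkaProperties(tableInfo);

        // Row type derived from the DDL column names and classes; a real generator would
        // hand this to its deserialization schema or a returns() call.
        TypeInformation<Row> rowType = getRowTypeInformation(tableInfo);

        // "${topic}_${table}", used when naming the source operator; getTopic() is assumed here.
        String operatorName = generateOperatorName("MyTable", tableInfo.getTopic());

        FlinkKafkaConsumer010<Row> consumer =
                new FlinkKafkaConsumer010<>(tableInfo.getTopic(), deserializer, props);

        // latest, earliest, or a {"partition":offset} JSON map are all handled here.
        setStartPosition(tableInfo.getOffsetReset(), tableInfo.getTopic(), consumer);
        return consumer;
    }
}
```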
