1
-
2
- ## 1.格式:
3
- All:
4
- ```
5
- create table sideTable(
6
- id int,
7
- tablename1 VARCHAR,
8
- PRIMARY KEY(id),
9
- PERIOD FOR SYSTEM_TIME
10
- )WITH(
11
- type='kudu',
12
- kuduMasters ='ip1,ip2,ip3',
13
- tableName ='impala::default.testSide',
14
- cache ='ALL',
15
- primaryKey='id,xx',
16
- lowerBoundPrimaryKey='10,xx',
17
- upperBoundPrimaryKey='15,xx',
18
- workerCount='1',
19
- defaultOperationTimeoutMs='600000',
20
- defaultSocketReadTimeoutMs='6000000',
21
- batchSizeBytes='100000000',
22
- limitNum='1000',
23
- isFaultTolerant='false',
24
- partitionedJoin='false'
25
- );
26
- ```
27
- LRU:
1
+ ## 1.格式
2
+ 通过建表语句中的` PERIOD FOR SYSTEM_TIME ` 将表标识为维表,其中` PRIMARY KEY(keyInfo) ` 中的keyInfo,表示用来和源表进行关联的字段,
3
+ 维表JOIN的条件必须与` keyInfo ` 字段一致。
28
4
```
29
- create table sideTable (
30
- id int ,
31
- tablename1 VARCHAR,
32
- PRIMARY KEY(id) ,
5
+ CREATE TABLE tableName (
6
+ colName colType ,
7
+ ...
8
+ PRIMARY KEY(colName1,colName2) ,
33
9
PERIOD FOR SYSTEM_TIME
34
- )WITH(
35
- type='kudu',
10
+ )WITH(
11
+ type ='kudu',
36
12
kuduMasters ='ip1,ip2,ip3',
37
13
tableName ='impala::default.testSide',
38
- cache ='LRU',
14
+ primaryKey='id,xx',
15
+ lowerBoundPrimaryKey='10,xx',
16
+ upperBoundPrimaryKey='15,xx',
39
17
workerCount='1',
40
18
defaultOperationTimeoutMs='600000',
41
19
defaultSocketReadTimeoutMs='6000000',
42
20
batchSizeBytes='100000000',
43
21
limitNum='1000',
44
22
isFaultTolerant='false',
23
+ cache ='LRU',
24
+ cacheSize ='10000',
25
+ cacheTTLMs ='60000',
26
+ parallelism ='1',
45
27
partitionedJoin='false'
46
- );
47
- ```
48
-
28
+ );
29
+ ```
49
30
## 2.支持版本
50
- kudu 1.10.0+cdh6.2.0
31
+ kudu 1.10.0+cdh6.2.0
51
32
52
33
## 3.表结构定义
53
34
54
35
| 参数名称| 含义|
55
36
| ----| ---|
56
37
| tableName | 注册到flink的表名称(可选填;不填默认和kudu对应的表名称相同)|
57
38
| colName | 列名称|
58
- | colType | 列类型 [ colType支持的类型] ( docs/ colType.md) |
39
+ | colType | 列类型,参见 [colType支持的类型](colType.md) |
59
40
| PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息|
60
41
| PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开|
61
42
62
- ## 3 .参数
43
+ ## 4.参数
63
44
45
+ 参数详细说明请看 [参数详细说明](./sideParams.md)
64
46
65
47
| 参数名称| 含义| 是否必填| 默认值|
66
48
| ----| ---| ---| -----|
@@ -78,17 +60,25 @@ kudu 1.10.0+cdh6.2.0
78
60
| isFaultTolerant | 查询是否容错(查询失败时是否扫描第二个副本)| 否| false|
79
61
| cache | 维表缓存策略(NONE/LRU/ALL)| 否| NONE|
80
62
| partitionedJoin | 是否在维表join之前先根据设定的key做一次keyBy操作(可以减少维表的数据缓存量)| 否| false|
81
-
82
-
83
63
--------------
84
- > 缓存策略
85
- * NONE: 不做内存缓存
86
- * LRU:
87
- * cacheSize: 缓存的条目数量
88
- * cacheTTLMs:缓存的过期时间(ms)
89
64
90
- ## 4.样例
91
- All:
65
+ ## 5.样例
66
+ ### LRU维表示例
67
+ ```
68
+ create table sideTable(
69
+ id int,
70
+ tablename1 VARCHAR,
71
+ PRIMARY KEY(id),
72
+ PERIOD FOR SYSTEM_TIME
73
+ )WITH(
74
+ type='kudu',
75
+ kuduMasters ='ip1,ip2,ip3',
76
+ tableName ='impala::default.testSide',
77
+ cache ='LRU',
78
+ partitionedJoin='false'
79
+ );
80
+ ```
81
+ ### ALL维表示例
92
82
```
93
83
create table sideTable(
94
84
id int,
@@ -106,19 +96,70 @@ create table sideTable(
106
96
partitionedJoin='false'
107
97
);
108
98
```
109
- LRU:
99
+
100
+ ## 6.kudu异步关联完整样例
101
+
110
102
```
111
- create table sideTable(
103
+ CREATE TABLE MyTable(
104
+ id bigint,
105
+ name varchar,
106
+ address varchar
107
+ )WITH(
108
+ type = 'kafka10',
109
+ bootstrapServers = '172.16.101.224:9092',
110
+ zookeeperQuorm = '172.16.100.188:2181/kafka',
111
+ offsetReset = 'latest',
112
+ topic = 'tiezhu_test_in2',
113
+ timezone = 'Asia/Shanghai',
114
+ topicIsPattern = 'false',
115
+ parallelism = '1'
116
+ );
117
+
118
+ CREATE TABLE sideTable(
112
119
id int,
113
- tablename1 VARCHAR ,
120
+ message varchar ,
114
121
PRIMARY KEY(id),
115
122
PERIOD FOR SYSTEM_TIME
116
- )WITH(
123
+ )WITH(
117
124
type='kudu',
118
125
kuduMasters ='ip1,ip2,ip3',
119
126
tableName ='impala::default.testSide',
120
127
cache ='LRU',
121
128
partitionedJoin='false'
122
- );
123
- ```
129
+ );
130
+
131
+ CREATE TABLE MyResult(
132
+ id bigint,
133
+ name varchar,
134
+ address varchar,
135
+ message varchar
136
+ )WITH(
137
+ type ='console',
138
+ address ='192.168.80.106:9042,192.168.80.107:9042',
139
+ userName='cassandra',
140
+ password='cassandra',
141
+ database ='tiezhu',
142
+ tableName ='stu_out',
143
+ parallelism ='1'
144
+ );
145
+
146
+ insert
147
+ into
148
+ MyResult
149
+ select
150
+ t1.id AS id,
151
+ t1.name AS name,
152
+ t1.address AS address,
153
+ t2.message AS message
154
+ from(
155
+ select
156
+ id,
157
+ name,
158
+ address
159
+ from
160
+ MyTable
161
+ ) t1
162
+ join sideTable t2
163
+ on t1.id = t2.id;
164
+ ```
124
165
0 commit comments