Skip to content

Commit e04f99b

Browse files
author
dapeng
committed
hbase 结果表支持 rowKey
1 parent 1fc0d53 commit e04f99b

File tree

7 files changed

+419
-23
lines changed

7 files changed

+419
-23
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
24+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceOpType;
25+
26+
import java.util.Map;
27+
28+
/**
29+
* Reason:
30+
* Date: 2018/8/23
31+
* Company: www.dtstack.com
32+
* @author xuchao
33+
*/
34+
35+
public abstract class AbstractReplaceOperator {
36+
37+
private EReplaceOpType opType;
38+
39+
public AbstractReplaceOperator(EReplaceOpType opType){
40+
this.opType = opType;
41+
}
42+
43+
public String doOperator(Map<String, Object> refData){
44+
String replaceStr = replaceStr(refData);
45+
return doFunc(replaceStr);
46+
}
47+
48+
public String replaceStr(Map<String, Object> refData){
49+
return "";
50+
}
51+
52+
/**
53+
* The processing function to provide custom
54+
* @param replaceStr
55+
* @return
56+
*/
57+
abstract String doFunc(String replaceStr);
58+
}

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseOutputFormat.java

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.dtstack.flink.sql.enums.EUpdateMode;
2424
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
2525
import com.google.common.collect.Lists;
26+
import com.google.common.collect.Maps;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.flink.api.java.tuple.Tuple2;
2829
import org.apache.flink.configuration.Configuration;
@@ -40,7 +41,6 @@
4041
import org.slf4j.LoggerFactory;
4142

4243
import java.io.IOException;
43-
import java.text.SimpleDateFormat;
4444
import java.util.List;
4545
import java.util.Map;
4646
import java.util.Set;
@@ -69,8 +69,6 @@ public class HbaseOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
6969
private transient Connection conn;
7070
private transient Table table;
7171

72-
public final SimpleDateFormat ROWKEY_DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss");
73-
7472
@Override
7573
public void configure(Configuration parameters) {
7674
LOG.warn("---configure---");
@@ -178,30 +176,23 @@ private String buildRowKey(Row record) {
178176

179177
private List<String> getRowKeyValues(Row record) {
180178
List<String> rowKeyValues = Lists.newArrayList();
181-
for (int i = 0; i < rowkey.length; ++i) {
182-
String colName = rowkey[i];
183-
int rowKeyIndex = 0;
184-
for (; rowKeyIndex < columnNames.length; ++rowKeyIndex) {
185-
if (columnNames[rowKeyIndex].equals(colName)) {
186-
break;
187-
}
188-
}
189-
190-
if (rowKeyIndex != columnNames.length && record.getField(rowKeyIndex) != null) {
191-
Object field = record.getField(rowKeyIndex);
192-
if (field == null) {
193-
continue;
194-
} else if (field instanceof java.util.Date) {
195-
java.util.Date d = (java.util.Date) field;
196-
rowKeyValues.add(ROWKEY_DATE_FORMAT.format(d));
197-
} else {
198-
rowKeyValues.add(field.toString());
199-
}
200-
}
179+
Map<String, Object> row = rowConvertMap(record);
180+
for (String key : rowkey) {
181+
RowKeyBuilder rowKeyBuilder = new RowKeyBuilder();
182+
rowKeyBuilder.init(key);
183+
rowKeyValues.add(rowKeyBuilder.getRowKey(row));
201184
}
202185
return rowKeyValues;
203186
}
204187

188+
private Map<String, Object> rowConvertMap(Row record){
189+
Map<String, Object> rowValue = Maps.newHashMap();
190+
for(int i = 0; i < columnNames.length; i++){
191+
rowValue.put(columnNames[i], record.getField(i));
192+
}
193+
return rowValue;
194+
}
195+
205196
@Override
206197
public void close() throws IOException {
207198
if (conn != null) {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceOpType;
24+
import com.dtstack.flink.sql.util.MD5Utils;
25+
26+
/**
27+
* Reason:
28+
* Date: 2018/8/23
29+
* Company: www.dtstack.com
30+
* @author xuchao
31+
*/
32+
33+
public class Md5ReplaceOperator extends AbstractReplaceOperator {
34+
35+
public Md5ReplaceOperator(EReplaceOpType opType) {
36+
super(opType);
37+
}
38+
39+
@Override
40+
String doFunc(String replaceStr) {
41+
return MD5Utils.getMD5String(replaceStr);
42+
}
43+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
24+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceType;
25+
26+
import java.io.Serializable;
27+
import java.util.List;
28+
29+
/**
30+
* Reason:
31+
* Date: 2018/8/23
32+
* Company: www.dtstack.com
33+
*
34+
* @author xuchao
35+
*/
36+
37+
public class ReplaceInfo implements Serializable {
38+
39+
private static final long serialVersionUID = 2058635242957737717L;
40+
41+
private EReplaceType type;
42+
43+
private String param;
44+
45+
private List<ReplaceInfo> subReplaceInfos;
46+
47+
public ReplaceInfo(EReplaceType type){
48+
this.type = type;
49+
}
50+
51+
public EReplaceType getType() {
52+
return type;
53+
}
54+
55+
public void setType(EReplaceType type) {
56+
this.type = type;
57+
}
58+
59+
public String getParam() {
60+
return param;
61+
}
62+
63+
public void setParam(String param) {
64+
this.param = param;
65+
}
66+
67+
public List<ReplaceInfo> getSubReplaceInfos() {
68+
return subReplaceInfos;
69+
}
70+
71+
public void setSubReplaceInfos(List<ReplaceInfo> subReplaceInfos) {
72+
this.subReplaceInfos = subReplaceInfos;
73+
}
74+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
20+
21+
package com.dtstack.flink.sql.sink.hbase;
22+
23+
import com.dtstack.flink.sql.sink.hbase.enums.EReplaceType;
24+
import com.dtstack.flink.sql.util.MD5Utils;
25+
import com.google.common.collect.Lists;
26+
import org.apache.commons.collections.CollectionUtils;
27+
28+
import javax.xml.crypto.Data;
29+
import java.io.Serializable;
30+
import java.text.SimpleDateFormat;
31+
import java.util.Date;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.regex.Matcher;
35+
import java.util.regex.Pattern;
36+
37+
/**
38+
* rowkey rule
39+
* Date: 2018/8/23
40+
* Company: www.dtstack.com
41+
* @author xuchao
42+
*/
43+
44+
public class RowKeyBuilder implements Serializable{
45+
46+
private static final long serialVersionUID = 2058635242857937717L;
47+
48+
private static Pattern Md5Operator = Pattern.compile("(?i)^md5\\(\\s*(.*)\\s*\\)$");
49+
50+
private List<ReplaceInfo> operatorChain = Lists.newArrayList();
51+
52+
public void init(String rowKeyTempl){
53+
operatorChain.addAll(makeFormula(rowKeyTempl));
54+
}
55+
56+
/**
57+
*
58+
* @param refData
59+
* @return
60+
*/
61+
public String getRowKey(Map<String, Object> refData){
62+
return buildStr(operatorChain, refData);
63+
}
64+
65+
66+
67+
private String buildStr(List<ReplaceInfo> fieldList, Map<String, Object> refData){
68+
if(CollectionUtils.isEmpty(fieldList)){
69+
return "";
70+
}
71+
StringBuffer sb = new StringBuffer("");
72+
for(ReplaceInfo replaceInfo : fieldList){
73+
74+
if(replaceInfo.getType() == EReplaceType.CONSTANT){
75+
sb.append(replaceInfo.getParam());
76+
continue;
77+
}
78+
79+
if(replaceInfo.getType() == EReplaceType.FUNC){
80+
sb.append(MD5Utils.getMD5String(buildStr(replaceInfo.getSubReplaceInfos(), refData)));
81+
continue;
82+
}
83+
String replaceName = replaceInfo.getParam();
84+
if(!refData.containsKey(replaceName)){
85+
throw new RuntimeException(String.format("build rowKey with field %s which value not found.", replaceName));
86+
}
87+
Object value = refData.get(replaceName);
88+
if(value instanceof Date){
89+
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
90+
java.util.Date d = (java.util.Date) value;
91+
sb.append(sdf.format(d));
92+
} else {
93+
sb.append(value);
94+
}
95+
}
96+
97+
return sb.toString();
98+
}
99+
100+
public static String[] splitIgnoreQuotaBrackets(String str, String delimiter){
101+
String splitPatternStr = delimiter + "(?![^()]*+\\))(?![^{}]*+})(?![^\\[\\]]*+\\])";
102+
return str.split(splitPatternStr);
103+
}
104+
105+
/**
106+
*
107+
* @param field
108+
* @return
109+
*/
110+
public ReplaceInfo getReplaceInfo(String field){
111+
112+
field = field.trim();
113+
if(field.length() <= 2){
114+
throw new RuntimeException(field + " \n" +
115+
"Format defined exceptions");
116+
}
117+
118+
//判断是不是常量==>''包裹的标识
119+
if(field.startsWith("'") && field.endsWith("'")){
120+
ReplaceInfo replaceInfo = new ReplaceInfo(EReplaceType.CONSTANT);
121+
field = field.substring(1, field.length() - 1);
122+
replaceInfo.setParam(field);
123+
return replaceInfo;
124+
}
125+
126+
ReplaceInfo replaceInfo = new ReplaceInfo(EReplaceType.PARAM);
127+
replaceInfo.setParam(field);
128+
return replaceInfo;
129+
}
130+
131+
private List<ReplaceInfo> makeFormula(String formula){
132+
if(formula == null || formula.length() <= 0){
133+
Lists.newArrayList();
134+
}
135+
List<ReplaceInfo> result = Lists.newArrayList();
136+
for(String meta: splitIgnoreQuotaBrackets(formula, "\\+")){
137+
Matcher matcher = Md5Operator.matcher(meta.trim());
138+
if(matcher.find()){
139+
ReplaceInfo replaceInfo = new ReplaceInfo(EReplaceType.FUNC);
140+
replaceInfo.setSubReplaceInfos(makeFormula(matcher.group(1)));
141+
result.add(replaceInfo);
142+
} else {
143+
result.add(getReplaceInfo(meta));
144+
}
145+
}
146+
return result;
147+
}
148+
}

0 commit comments

Comments
 (0)