Skip to content

Commit 41989a8

Browse files
libailinzoudaokoulife
authored and committed
[Feature-#1851][socket] Support reading data from socket connector in sql mode
1 parent 3a72ca9 commit 41989a8

File tree

7 files changed

+543
-3
lines changed

7 files changed

+543
-3
lines changed
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,338 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.chunjun.connector.socket.converter;
20+
21+
import com.dtstack.chunjun.converter.AbstractRowConverter;
22+
import com.dtstack.chunjun.converter.IDeserializationConverter;
23+
import com.dtstack.chunjun.converter.ISerializationConverter;
24+
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
25+
import com.dtstack.chunjun.util.ExternalDataUtil;
26+
import com.dtstack.chunjun.util.GsonUtil;
27+
28+
import org.apache.flink.table.data.*;
29+
import org.apache.flink.table.types.logical.*;
30+
31+
import java.math.BigDecimal;
32+
import java.math.BigInteger;
33+
import java.sql.Array;
34+
import java.sql.Date;
35+
import java.sql.Time;
36+
import java.sql.Timestamp;
37+
import java.time.LocalDate;
38+
import java.time.LocalTime;
39+
import java.util.HashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.Random;
43+
44+
public class SocketSqlConverter
45+
extends AbstractRowConverter<RowData, RowData, RowData, LogicalType> {
46+
47+
private static final long serialVersionUID = 6652637680662065910L;
48+
49+
private final Random random = new Random();
50+
51+
public SocketSqlConverter(RowType rowType) {
52+
super(rowType);
53+
List<RowType.RowField> fields = rowType.getFields();
54+
for (RowType.RowField field : fields) {
55+
toInternalConverters.add(
56+
wrapIntoNullableInternalConverter(createInternalConverter(field)));
57+
toExternalConverters.add(
58+
wrapIntoNullableExternalConverter(
59+
createExternalConverter(field), field.getType()));
60+
}
61+
}
62+
63+
@Override
64+
protected ISerializationConverter<GenericRowData> wrapIntoNullableExternalConverter(
65+
ISerializationConverter serializationConverter, LogicalType type) {
66+
return (val, index, rowData) -> {
67+
if (val == null
68+
|| val.isNullAt(index)
69+
|| LogicalTypeRoot.NULL.equals(type.getTypeRoot())) {
70+
rowData.setField(index, null);
71+
} else {
72+
serializationConverter.serialize(val, index, rowData);
73+
}
74+
};
75+
}
76+
77+
@Override
78+
public RowData toInternal(RowData input) throws Exception {
79+
GenericRowData row = new GenericRowData(input.getArity());
80+
if (input instanceof GenericRowData) {
81+
GenericRowData genericRowData = (GenericRowData) input;
82+
for (int i = 0; i < input.getArity(); i++) {
83+
row.setField(
84+
i, toInternalConverters.get(i).deserialize(genericRowData.getField(i)));
85+
}
86+
} else {
87+
throw new ChunJunRuntimeException(
88+
"Error RowData type, RowData:["
89+
+ input
90+
+ "] should be instance of GenericRowData.");
91+
}
92+
return row;
93+
}
94+
95+
@Override
96+
public RowData toExternal(RowData rowData, RowData output) throws Exception {
97+
for (int index = 0; index < fieldTypes.length; index++) {
98+
toExternalConverters.get(index).serialize(rowData, index, output);
99+
}
100+
return output;
101+
}
102+
103+
protected IDeserializationConverter createInternalConverter(RowType.RowField rowField) {
104+
LogicalType type = rowField.getType();
105+
switch (type.getTypeRoot()) {
106+
case NULL:
107+
return val -> null;
108+
case BOOLEAN:
109+
return val -> val == null ? null : Boolean.valueOf(String.valueOf(val));
110+
case FLOAT:
111+
return val -> val == null ? null : new Float(String.valueOf(val));
112+
case DOUBLE:
113+
return val -> val == null ? null : new Double(String.valueOf(val));
114+
case INTERVAL_YEAR_MONTH:
115+
case INTERVAL_DAY_TIME:
116+
return val -> val == null ? null : Time.valueOf(String.valueOf(val));
117+
case INTEGER:
118+
return val -> val == null ? null : new Integer(String.valueOf(val));
119+
case BIGINT:
120+
return val -> val == null ? null : new Long(String.valueOf(val));
121+
case TINYINT:
122+
return val -> val == null ? null : new Integer(String.valueOf(val)).byteValue();
123+
case SMALLINT:
124+
// Converter for small type that casts value to int and then return short value,
125+
// since
126+
// JDBC 1.0 use int type for small values.
127+
return val -> val == null ? null : new Integer(String.valueOf(val)).shortValue();
128+
case DECIMAL:
129+
final int precision = ((DecimalType) type).getPrecision();
130+
final int scale = ((DecimalType) type).getScale();
131+
// using decimal(20, 0) to support db type bigint unsigned, user should define
132+
// decimal(20, 0) in SQL,
133+
// but other precision like decimal(30, 0) can work too from lenient consideration.
134+
return val ->
135+
val == null
136+
? null
137+
: val instanceof BigInteger
138+
? DecimalData.fromBigDecimal(
139+
new BigDecimal((BigInteger) val, 0),
140+
precision,
141+
scale)
142+
: DecimalData.fromBigDecimal(
143+
new BigDecimal(String.valueOf(val)),
144+
precision,
145+
scale);
146+
case DATE:
147+
return val ->
148+
val == null
149+
? null
150+
: (int)
151+
((Date.valueOf(String.valueOf(val)))
152+
.toLocalDate()
153+
.toEpochDay());
154+
case TIME_WITHOUT_TIME_ZONE:
155+
return val ->
156+
val == null
157+
? null
158+
: (int)
159+
((Time.valueOf(String.valueOf(val)))
160+
.toLocalTime()
161+
.toNanoOfDay()
162+
/ 1_000_000L);
163+
case TIMESTAMP_WITH_TIME_ZONE:
164+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
165+
case TIMESTAMP_WITHOUT_TIME_ZONE:
166+
return val ->
167+
val == null
168+
? null
169+
: TimestampData.fromTimestamp(
170+
Timestamp.valueOf(String.valueOf(val)));
171+
case CHAR:
172+
case VARCHAR:
173+
return val -> val == null ? null : StringData.fromString(val.toString());
174+
case BINARY:
175+
case VARBINARY:
176+
return val -> val == null ? null : (byte[]) val;
177+
case ARRAY:
178+
return (val) -> {
179+
Array val1 = (Array) val;
180+
Object[] array = (Object[]) val1.getArray();
181+
Object[] result = new Object[array.length];
182+
LogicalType logicalType = type.getChildren().get(0);
183+
RowType.RowField rowField1 = new RowType.RowField("", logicalType, "");
184+
IDeserializationConverter internalConverter =
185+
createInternalConverter(rowField1);
186+
for (int i = 0; i < array.length; i++) {
187+
Object value = internalConverter.deserialize(array[i]);
188+
result[i] = value;
189+
}
190+
return new GenericArrayData(result);
191+
};
192+
case ROW:
193+
return val -> {
194+
List<RowType.RowField> childrenFields = ((RowType) type).getFields();
195+
HashMap childrenData = GsonUtil.GSON.fromJson(val.toString(), HashMap.class);
196+
GenericRowData genericRowData = new GenericRowData(childrenFields.size());
197+
for (int i = 0; i < childrenFields.size(); i++) {
198+
Object value =
199+
createInternalConverter(childrenFields.get(i))
200+
.deserialize(
201+
childrenData.get(childrenFields.get(i).getName()));
202+
genericRowData.setField(i, value);
203+
}
204+
return genericRowData;
205+
};
206+
case MAP:
207+
return val -> {
208+
if (val == null) {
209+
return null;
210+
}
211+
HashMap<Object, Object> resultMap = new HashMap<>();
212+
Map map = GsonUtil.GSON.fromJson(val.toString(), Map.class);
213+
LogicalType keyType = ((MapType) type).getKeyType();
214+
LogicalType valueType = ((MapType) type).getValueType();
215+
RowType.RowField keyRowField = new RowType.RowField("", keyType, "");
216+
RowType.RowField valueRowField = new RowType.RowField("", valueType, "");
217+
IDeserializationConverter keyInternalConverter =
218+
createInternalConverter(keyRowField);
219+
IDeserializationConverter valueInternalConverter =
220+
createInternalConverter(valueRowField);
221+
for (Object key : map.keySet()) {
222+
resultMap.put(
223+
keyInternalConverter.deserialize(key),
224+
valueInternalConverter.deserialize(map.get(key)));
225+
}
226+
227+
return new GenericMapData(resultMap);
228+
};
229+
case RAW:
230+
default:
231+
throw new UnsupportedOperationException("Unsupported type:" + type);
232+
}
233+
}
234+
235+
protected ISerializationConverter<GenericRowData> createExternalConverter(
236+
RowType.RowField rowField) {
237+
LogicalType type = rowField.getType();
238+
switch (type.getTypeRoot()) {
239+
case BOOLEAN:
240+
return (val, index, rowData) -> rowData.setField(index, val.getBoolean(index));
241+
case TINYINT:
242+
return (val, index, rowData) -> rowData.setField(index, val.getByte(index));
243+
case SMALLINT:
244+
return (val, index, rowData) -> rowData.setField(index, val.getShort(index));
245+
case INTEGER:
246+
case INTERVAL_YEAR_MONTH:
247+
return (val, index, rowData) -> rowData.setField(index, val.getInt(index));
248+
case BIGINT:
249+
case INTERVAL_DAY_TIME:
250+
return (val, index, rowData) -> rowData.setField(index, val.getLong(index));
251+
case FLOAT:
252+
return (val, index, rowData) -> rowData.setField(index, val.getFloat(index));
253+
case DOUBLE:
254+
return (val, index, rowData) -> rowData.setField(index, val.getDouble(index));
255+
case CHAR:
256+
case VARCHAR:
257+
// value is BinaryString
258+
return (val, index, rowData) ->
259+
rowData.setField(index, val.getString(index).toString());
260+
case BINARY:
261+
case VARBINARY:
262+
return (val, index, rowData) -> rowData.setField(index, val.getBinary(index));
263+
case DATE:
264+
return (val, index, rowData) ->
265+
rowData.setField(
266+
index, Date.valueOf(LocalDate.ofEpochDay(val.getInt(index))));
267+
case TIME_WITHOUT_TIME_ZONE:
268+
return (val, index, rowData) ->
269+
rowData.setField(
270+
index,
271+
Time.valueOf(
272+
LocalTime.ofNanoOfDay(val.getInt(index) * 1_000_000L)));
273+
case TIMESTAMP_WITH_TIME_ZONE:
274+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
275+
case TIMESTAMP_WITHOUT_TIME_ZONE:
276+
int timestampPrecision;
277+
if (type instanceof LocalZonedTimestampType) {
278+
timestampPrecision = ((LocalZonedTimestampType) type).getPrecision();
279+
} else {
280+
timestampPrecision = ((TimestampType) type).getPrecision();
281+
}
282+
return (val, index, rowData) ->
283+
rowData.setField(
284+
index, val.getTimestamp(index, timestampPrecision).toTimestamp());
285+
case DECIMAL:
286+
final int decimalPrecision = ((DecimalType) type).getPrecision();
287+
final int decimalScale = ((DecimalType) type).getScale();
288+
return (val, index, rowData) ->
289+
rowData.setField(
290+
index,
291+
val.getDecimal(index, decimalPrecision, decimalScale)
292+
.toBigDecimal());
293+
case ARRAY:
294+
return (val, index, rowData) -> {
295+
ArrayData array = val.getArray(index);
296+
Object[] obj = new Object[array.size()];
297+
ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, array);
298+
rowData.setField(index, GsonUtil.GSON.toJson(obj));
299+
};
300+
case MAP:
301+
return (val, index, rowData) -> {
302+
MapData map = val.getMap(index);
303+
Map<Object, Object> resultMap = new HashMap<>();
304+
ExternalDataUtil.mapDataToExternal(
305+
map,
306+
((MapType) type).getKeyType(),
307+
((MapType) type).getValueType(),
308+
resultMap);
309+
rowData.setField(index, GsonUtil.GSON.toJson(resultMap));
310+
};
311+
case MULTISET:
312+
return (val, index, rowData) -> {
313+
MapData map = val.getMap(index);
314+
ArrayData arrayData = map.keyArray();
315+
Object[] obj = new Object[arrayData.size()];
316+
ExternalDataUtil.arrayDataToExternal(type.getChildren().get(0), obj, arrayData);
317+
rowData.setField(index, GsonUtil.GSON.toJson(obj));
318+
};
319+
case ROW:
320+
return (val, index, rowData) -> {
321+
List<RowType.RowField> fields = ((RowType) type).getFields();
322+
HashMap<String, Object> map = new HashMap<>();
323+
for (int i = 0; i < fields.size(); i++) {
324+
ExternalDataUtil.rowDataToExternal(
325+
val.getRow(index, fields.size()),
326+
i,
327+
fields.get(i).getType(),
328+
map,
329+
fields.get(i).getName());
330+
}
331+
rowData.setField(index, GsonUtil.GSON.toJson(map));
332+
};
333+
case RAW:
334+
default:
335+
throw new UnsupportedOperationException("Unsupported type:" + type);
336+
}
337+
}
338+
}

chunjun-connectors/chunjun-connector-socket/src/main/java/com/dtstack/chunjun/connector/socket/inputformat/SocketInputFormat.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -81,15 +81,19 @@ protected RowData nextRecordInternal(RowData row) throws ReadRecordException {
8181
try {
8282
row = queue.take();
8383
// 设置特殊字符串,作为失败标志
84-
if (StringUtils.startsWith((String) ((GenericRowData) row).getField(0), KEY_EXIT0)) {
84+
if (StringUtils.startsWith(
85+
String.valueOf(((GenericRowData) row).getField(0)), KEY_EXIT0)) {
8586
throw new ReadRecordException(
8687
"socket client lost connection completely, job failed "
8788
+ ((GenericRowData) row).getField(0),
8889
new Exception("receive data error"));
8990
}
91+
row = rowConverter.toInternal(row);
9092
} catch (InterruptedException e) {
9193
log.error("takeEvent interrupted error: {}", ExceptionUtil.getErrorMessage(e));
9294
throw new ReadRecordException(row.toString(), e);
95+
} catch (Exception e) {
96+
throw new ReadRecordException("", e, 0, row);
9397
}
9498
return row;
9599
}

chunjun-connectors/chunjun-connector-socket/src/main/java/com/dtstack/chunjun/connector/socket/inputformat/SocketInputFormatBuilder.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
public class SocketInputFormatBuilder extends BaseRichInputFormatBuilder<SocketInputFormat> {
3030

31-
protected SocketInputFormat format;
32-
3331
protected SocketConfig socketConfig;
3432

3533
private static final int ADDRESS_SPLITS = 2;

0 commit comments

Comments
 (0)