Skip to content

Commit cf34ed6

Browse files
authored
Merge pull request #145 from /issues/136
[Fix] Adjust data conversion logic for compatibility with clickhouse-jdbc 0.6 #136
2 parents 7346c14 + 7cce8b1 commit cf34ed6

File tree

3 files changed

+78
-29
lines changed

3 files changed

+78
-29
lines changed

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseConverterUtils.java

+36-12
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,14 @@
2929
import org.apache.flink.table.types.logical.LogicalType;
3030
import org.apache.flink.table.types.logical.MapType;
3131

32+
import com.clickhouse.data.value.UnsignedByte;
33+
import com.clickhouse.data.value.UnsignedInteger;
34+
import com.clickhouse.data.value.UnsignedLong;
35+
import com.clickhouse.data.value.UnsignedShort;
36+
3237
import java.math.BigDecimal;
3338
import java.math.BigInteger;
39+
import java.net.InetAddress;
3440
import java.sql.Array;
3541
import java.sql.Date;
3642
import java.sql.SQLException;
@@ -39,8 +45,10 @@
3945
import java.time.LocalDate;
4046
import java.time.LocalDateTime;
4147
import java.time.LocalTime;
48+
import java.time.OffsetDateTime;
4249
import java.util.HashMap;
4350
import java.util.Map;
51+
import java.util.UUID;
4452

4553
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
4654
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;
@@ -129,35 +137,51 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
129137
case DOUBLE:
130138
case INTERVAL_YEAR_MONTH:
131139
case INTERVAL_DAY_TIME:
132-
case INTEGER:
133-
case BIGINT:
140+
case TINYINT:
134141
case BINARY:
135142
case VARBINARY:
136143
return value;
137-
case TINYINT:
138-
return ((Integer) value).byteValue();
139144
case SMALLINT:
140-
return value instanceof Integer ? ((Integer) value).shortValue() : value;
145+
return value instanceof UnsignedByte ? ((UnsignedByte) value).shortValue() : value;
146+
case INTEGER:
147+
return value instanceof UnsignedShort ? ((UnsignedShort) value).intValue() : value;
148+
case BIGINT:
149+
return value instanceof UnsignedInteger
150+
? ((UnsignedInteger) value).longValue()
151+
: value;
141152
case DECIMAL:
142153
final int precision = ((DecimalType) type).getPrecision();
143154
final int scale = ((DecimalType) type).getScale();
144-
return value instanceof BigInteger
145-
? DecimalData.fromBigDecimal(
146-
new BigDecimal((BigInteger) value, 0), precision, scale)
147-
: DecimalData.fromBigDecimal((BigDecimal) value, precision, scale);
155+
BigDecimal decimalValue =
156+
value instanceof BigDecimal
157+
? (BigDecimal) value
158+
: new BigDecimal(
159+
value instanceof UnsignedLong
160+
? ((UnsignedLong) value).bigIntegerValue()
161+
: (BigInteger) value);
162+
return DecimalData.fromBigDecimal(decimalValue, precision, scale);
148163
case DATE:
149-
return (int) (((Date) value).toLocalDate().toEpochDay());
164+
return (int) (((LocalDate) value).toEpochDay());
150165
case TIME_WITHOUT_TIME_ZONE:
151166
return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L);
152167
case TIMESTAMP_WITH_TIME_ZONE:
153168
case TIMESTAMP_WITHOUT_TIME_ZONE:
154-
return TimestampData.fromLocalDateTime((LocalDateTime) value);
169+
return TimestampData.fromLocalDateTime(
170+
value instanceof OffsetDateTime
171+
? ((OffsetDateTime) value).toLocalDateTime()
172+
: (LocalDateTime) value);
155173
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
156174
return TimestampData.fromInstant(
157175
((LocalDateTime) value).atZone(getFlinkTimeZone().toZoneId()).toInstant());
158176
case CHAR:
159177
case VARCHAR:
160-
return StringData.fromString((String) value);
178+
if (value instanceof UUID) {
179+
return StringData.fromString(value.toString());
180+
} else if (value instanceof InetAddress) {
181+
return StringData.fromString(((InetAddress) value).getHostAddress());
182+
} else {
183+
return StringData.fromString((String) value);
184+
}
161185
case ARRAY:
162186
LogicalType elementType =
163187
type.getChildren().stream()

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/converter/ClickHouseRowConverter.java

+40-16
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,17 @@
3131
import org.apache.flink.table.types.logical.TimestampType;
3232
import org.apache.flink.util.Preconditions;
3333

34+
import com.clickhouse.data.value.UnsignedByte;
35+
import com.clickhouse.data.value.UnsignedInteger;
36+
import com.clickhouse.data.value.UnsignedLong;
37+
import com.clickhouse.data.value.UnsignedShort;
3438
import com.clickhouse.jdbc.ClickHousePreparedStatement;
3539
import com.clickhouse.jdbc.ClickHouseResultSet;
3640

3741
import java.io.Serializable;
3842
import java.math.BigDecimal;
3943
import java.math.BigInteger;
44+
import java.net.InetAddress;
4045
import java.sql.Date;
4146
import java.sql.ResultSet;
4247
import java.sql.SQLException;
@@ -45,6 +50,7 @@
4550
import java.time.LocalDate;
4651
import java.time.LocalDateTime;
4752
import java.time.LocalTime;
53+
import java.time.OffsetDateTime;
4854
import java.util.UUID;
4955

5056
import static org.apache.flink.connector.clickhouse.internal.converter.ClickHouseConverterUtils.BOOL_TRUE;
@@ -109,30 +115,41 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
109115
case DOUBLE:
110116
case INTERVAL_YEAR_MONTH:
111117
case INTERVAL_DAY_TIME:
112-
case INTEGER:
113-
case BIGINT:
118+
case TINYINT:
114119
case BINARY:
115120
case VARBINARY:
116121
return val -> val;
117-
case TINYINT:
118-
return val -> ((Integer) val).byteValue();
119122
case SMALLINT:
120-
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
123+
return val -> val instanceof UnsignedByte ? ((UnsignedByte) val).shortValue() : val;
124+
case INTEGER:
125+
return val -> val instanceof UnsignedShort ? ((UnsignedShort) val).intValue() : val;
126+
case BIGINT:
127+
return val ->
128+
val instanceof UnsignedInteger ? ((UnsignedInteger) val).longValue() : val;
121129
case DECIMAL:
122130
final int precision = ((DecimalType) type).getPrecision();
123131
final int scale = ((DecimalType) type).getScale();
124-
return val ->
125-
val instanceof BigInteger
126-
? DecimalData.fromBigDecimal(
127-
new BigDecimal((BigInteger) val, 0), precision, scale)
128-
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
132+
return val -> {
133+
BigDecimal decimalValue =
134+
val instanceof BigDecimal
135+
? (BigDecimal) val
136+
: new BigDecimal(
137+
val instanceof UnsignedLong
138+
? ((UnsignedLong) val).bigIntegerValue()
139+
: (BigInteger) val);
140+
return DecimalData.fromBigDecimal(decimalValue, precision, scale);
141+
};
129142
case DATE:
130-
return val -> (int) ((Date) val).toLocalDate().toEpochDay();
143+
return val -> (int) ((LocalDate) val).toEpochDay();
131144
case TIME_WITHOUT_TIME_ZONE:
132145
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
133146
case TIMESTAMP_WITH_TIME_ZONE:
134147
case TIMESTAMP_WITHOUT_TIME_ZONE:
135-
return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
148+
return val ->
149+
TimestampData.fromLocalDateTime(
150+
val instanceof OffsetDateTime
151+
? ((OffsetDateTime) val).toLocalDateTime()
152+
: (LocalDateTime) val);
136153
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
137154
return val ->
138155
TimestampData.fromInstant(
@@ -141,10 +158,15 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
141158
.toInstant());
142159
case CHAR:
143160
case VARCHAR:
144-
return val ->
145-
val instanceof UUID
146-
? StringData.fromString(val.toString())
147-
: StringData.fromString((String) val);
161+
return val -> {
162+
if (val instanceof UUID) {
163+
return StringData.fromString(val.toString());
164+
} else if (val instanceof InetAddress) {
165+
return StringData.fromString(((InetAddress) val).getHostAddress());
166+
} else {
167+
return StringData.fromString((String) val);
168+
}
169+
};
148170
case ARRAY:
149171
case MAP:
150172
return val -> ClickHouseConverterUtils.toInternal(val, type);
@@ -242,6 +264,7 @@ private SerializationConverter createToExternalConverter(LogicalType type) {
242264

243265
@FunctionalInterface
244266
interface SerializationConverter extends Serializable {
267+
245268
/**
246269
* Convert an internal field to java object and fill into the {@link
247270
* ClickHousePreparedStatement}.
@@ -252,6 +275,7 @@ void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement)
252275

253276
@FunctionalInterface
254277
interface DeserializationConverter extends Serializable {
278+
255279
/**
256280
* Convert an object of {@link ClickHouseResultSet} to the internal data structure object.
257281
*/

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/util/DataTypeUtil.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
4141
switch (clickHouseColumnInfo.getDataType()) {
4242
case Int8:
4343
return DataTypes.TINYINT();
44-
case Int16:
4544
case Bool:
4645
return DataTypes.BOOLEAN();
46+
case Int16:
4747
case UInt8:
4848
return DataTypes.SMALLINT();
4949
case Int32:
@@ -92,6 +92,7 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
9292
case UUID:
9393
return DataTypes.VARCHAR(clickHouseColumnInfo.getPrecision());
9494
case Date:
95+
case Date32:
9596
return DataTypes.DATE();
9697
case DateTime:
9798
case DateTime32:

0 commit comments

Comments
 (0)