Commit be72511

[improve] auto create Doris table for new MongoDB collections in Data (#573)
1 parent 4e41752 commit be72511

File tree

9 files changed: +404 -59 lines changed


flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java

+8 -38

@@ -28,14 +28,12 @@
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.TableSchema;
 import org.apache.doris.flink.cfg.DorisConnectionOptions;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisSystemException;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.schema.SchemaChangeMode;
 import org.apache.doris.flink.sink.writer.WriteMode;
@@ -44,12 +42,12 @@
 import org.apache.doris.flink.table.DorisConfigOptions;
 import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.apache.doris.flink.tools.cdc.oracle.OracleDatabaseSync;
+import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.sql.SQLSyntaxErrorException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -150,7 +148,13 @@ public boolean build() throws Exception {
             // Calculate the mapping relationship between upstream and downstream tables
             tableMapping.put(
                     schema.getTableIdentifier(), String.format("%s.%s", targetDb, dorisTable));
-            tryCreateTableIfAbsent(dorisSystem, targetDb, dorisTable, schema);
+            DorisTableUtil.tryCreateTableIfAbsent(
+                    dorisSystem,
+                    targetDb,
+                    dorisTable,
+                    schema,
+                    dorisTableConfig,
+                    ignoreIncompatible);
 
             if (!dorisTables.contains(Tuple2.of(targetDb, dorisTable))) {
                 dorisTables.add(Tuple2.of(targetDb, dorisTable));
@@ -470,40 +474,6 @@ public void setTableSchemaBuckets(
         }
     }
 
-    private void tryCreateTableIfAbsent(
-            DorisSystem dorisSystem, String targetDb, String dorisTable, SourceSchema schema) {
-        if (!dorisSystem.tableExists(targetDb, dorisTable)) {
-            if (dorisTableConfig.isConvertUniqToPk()
-                    && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
-                    && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
-                schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
-            }
-            TableSchema dorisSchema =
-                    DorisSchemaFactory.createTableSchema(
-                            targetDb,
-                            dorisTable,
-                            schema.getFields(),
-                            schema.getPrimaryKeys(),
-                            dorisTableConfig,
-                            schema.getTableComment());
-            try {
-                dorisSystem.createTable(dorisSchema);
-            } catch (Exception ex) {
-                handleTableCreationFailure(ex);
-            }
-        }
-    }
-
-    private void handleTableCreationFailure(Exception ex) throws DorisSystemException {
-        if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) {
-            LOG.warn(
-                    "Doris schema and source table schema are not compatible. Error: {} ",
-                    ex.getCause().toString());
-        } else {
-            throw new DorisSystemException("Failed to create table due to: ", ex);
-        }
-    }
-
     protected Properties getJdbcProperties() {
         Properties jdbcProps = new Properties();
         for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
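The table-creation logic removed above now lives in the shared DorisTableUtil so that both DatabaseSync and the MongoDB schema-change handler can call it (the latter with a 5-argument call, see below). A minimal sketch of what the extracted helper presumably looks like, reconstructed from the code removed in this hunk; the actual class may differ in naming and overloads:

// Sketch only: mirrors the logic removed from DatabaseSync, now parameterized.
package org.apache.doris.flink.tools.cdc.utils;

import org.apache.flink.util.CollectionUtil;

import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.doris.flink.tools.cdc.DorisTableConfig;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;

public class DorisTableUtil {
    private static final Logger LOG = LoggerFactory.getLogger(DorisTableUtil.class);

    /** Creates the Doris table when it does not exist yet. */
    public static void tryCreateTableIfAbsent(
            DorisSystem dorisSystem,
            String targetDb,
            String dorisTable,
            SourceSchema schema,
            DorisTableConfig tableConfig,
            boolean ignoreIncompatible) {
        if (!dorisSystem.tableExists(targetDb, dorisTable)) {
            // Promote unique indexes to primary keys when configured and no PK exists.
            if (tableConfig.isConvertUniqToPk()
                    && CollectionUtil.isNullOrEmpty(schema.primaryKeys)
                    && !CollectionUtil.isNullOrEmpty(schema.uniqueIndexs)) {
                schema.primaryKeys = new ArrayList<>(schema.uniqueIndexs);
            }
            TableSchema dorisSchema =
                    DorisSchemaFactory.createTableSchema(
                            targetDb,
                            dorisTable,
                            schema.getFields(),
                            schema.getPrimaryKeys(),
                            tableConfig,
                            schema.getTableComment());
            try {
                dorisSystem.createTable(dorisSchema);
            } catch (Exception ex) {
                // Tolerate incompatible schemas when requested, as the old code did.
                if (ignoreIncompatible && ex.getCause() instanceof SQLSyntaxErrorException) {
                    LOG.warn(
                            "Doris schema and source table schema are not compatible. Error: {}",
                            ex.getCause().toString());
                } else {
                    throw new DorisSystemException("Failed to create table due to: ", ex);
                }
            }
        }
    }
}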

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBDatabaseSync.java

+3

@@ -240,6 +240,9 @@ public DorisRecordSerializer<String> buildSchemaSerializer(
                 .setTableMapping(tableMapping)
                 .setTableConf(dorisTableConfig)
                 .setTargetDatabase(database)
+                .setTargetTablePrefix(tablePrefix)
+                .setTargetTableSuffix(tableSuffix)
+                .setTableNameConverter(converter)
                 .build();
     }
 
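The MongoDB sync now forwards the configured table prefix, suffix, and name converter into the serializer so that collections discovered at runtime can be mapped to Doris table names. A minimal wiring sketch; the TableNameConverter constructor shown here is an assumption and may differ from the connector's actual API:

// Hypothetical wiring, for illustration only.
TableNameConverter converter = new TableNameConverter(tablePrefix, tableSuffix); // assumed constructor

MongoDBJsonDebeziumSchemaSerializer serializer =
        new MongoDBJsonDebeziumSchemaSerializer.Builder()
                .setDorisOptions(dorisOptions)
                .setTargetDatabase(database)
                .setTargetTablePrefix(tablePrefix)
                .setTargetTableSuffix(tableSuffix)
                .setTableNameConverter(converter)
                .build();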

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/MongoDBSchema.java

+23

@@ -20,6 +20,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.doris.flink.catalog.doris.DorisType;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
@@ -64,6 +65,28 @@ public MongoDBSchema(
         primaryKeys.add("_id");
     }
 
+    public MongoDBSchema(
+            JsonNode jsonData, String databaseName, String tableName, String tableComment)
+            throws Exception {
+        super(databaseName, null, tableName, tableComment);
+        fields = new LinkedHashMap<>();
+        processSampleData(jsonData);
+
+        primaryKeys = new ArrayList<>();
+        primaryKeys.add("_id");
+    }
+
+    @VisibleForTesting
+    protected void processSampleData(JsonNode data) {
+        data.fieldNames()
+                .forEachRemaining(
+                        fieldName -> {
+                            JsonNode value = data.get(fieldName);
+                            String dorisType = MongoDBType.jsonNodeToDorisType(value);
+                            fields.put(fieldName, new FieldSchema(fieldName, dorisType, null));
+                        });
+    }
+
     @VisibleForTesting
     protected void processSampleData(Document sampleData) {
         for (Map.Entry<String, Object> entry : sampleData.entrySet()) {
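The new constructor lets a MongoDBSchema be inferred from a sampled JSON document instead of a BSON Document; each top-level field is mapped to a Doris type via MongoDBType.jsonNodeToDorisType and "_id" is used as the primary key. A short usage sketch with made-up sample data (assumes the enclosing method declares "throws Exception"):

// Illustrative only.
ObjectMapper mapper = new ObjectMapper();
JsonNode sample =
        mapper.readTree("{\"_id\": \"507f1f77bcf86cd799439011\", \"name\": \"alice\", \"age\": 30}");

// One Doris column per top-level field; "_id" becomes the primary key.
MongoDBSchema schema = new MongoDBSchema(sample, "test_db", "test_collection", "");
System.out.println(schema.getFields().keySet()); // [_id, name, age]
System.out.println(schema.getPrimaryKeys());     // [_id]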

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoDBJsonDebeziumSchemaSerializer.java

+27 -2

@@ -29,6 +29,7 @@
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import org.apache.doris.flink.tools.cdc.DorisTableConfig;
+import org.apache.doris.flink.tools.cdc.converter.TableNameConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +63,7 @@ public class MongoDBJsonDebeziumSchemaSerializer implements DorisRecordSerialize
 
     private String targetTablePrefix;
     private String targetTableSuffix;
+    private TableNameConverter tableNameConverter;
 
     public MongoDBJsonDebeziumSchemaSerializer(
             DorisOptions dorisOptions,
@@ -72,7 +74,8 @@ public MongoDBJsonDebeziumSchemaSerializer(
             DorisTableConfig dorisTableConfig,
             String targetDatabase,
             String targetTablePrefix,
-            String targetTableSuffix) {
+            String targetTableSuffix,
+            TableNameConverter tableNameConverter) {
         this.dorisOptions = dorisOptions;
         this.pattern = pattern;
         this.sourceTableName = sourceTableName;
@@ -85,6 +88,7 @@ public MongoDBJsonDebeziumSchemaSerializer(
         this.targetDatabase = targetDatabase;
         this.targetTablePrefix = targetTablePrefix;
         this.targetTableSuffix = targetTableSuffix;
+        this.tableNameConverter = tableNameConverter;
         if (executionOptions != null) {
             this.lineDelimiter =
                     executionOptions
@@ -111,6 +115,7 @@ private void init() {
                         targetTablePrefix,
                         targetTableSuffix,
                         enableDelete);
+        changeContext.setTableNameConverter(tableNameConverter);
        this.dataChange = new MongoJsonDebeziumDataChange(changeContext);
         this.schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);
     }
@@ -143,6 +148,7 @@ public static class Builder {
         private String targetDatabase;
         private String targetTablePrefix = "";
         private String targetTableSuffix = "";
+        private TableNameConverter tableNameConverter;
 
         public MongoDBJsonDebeziumSchemaSerializer.Builder setDorisOptions(
                 DorisOptions dorisOptions) {
@@ -192,6 +198,24 @@ public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetDatabase(
             return this;
         }
 
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetTablePrefix(
+                String targetTablePrefix) {
+            this.targetTablePrefix = targetTablePrefix;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTargetTableSuffix(
+                String targetTableSuffix) {
+            this.targetTableSuffix = targetTableSuffix;
+            return this;
+        }
+
+        public MongoDBJsonDebeziumSchemaSerializer.Builder setTableNameConverter(
+                TableNameConverter converter) {
+            this.tableNameConverter = converter;
+            return this;
+        }
+
         public MongoDBJsonDebeziumSchemaSerializer build() {
             return new MongoDBJsonDebeziumSchemaSerializer(
                     dorisOptions,
@@ -202,7 +226,8 @@ public MongoDBJsonDebeziumSchemaSerializer build() {
                     dorisTableConfig,
                     targetDatabase,
                     targetTablePrefix,
-                    targetTableSuffix);
+                    targetTableSuffix,
+                    tableNameConverter);
         }
     }
 }
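The serializer passes the converter to the data- and schema-change handlers through the shared JsonDebeziumChangeContext. The setter/getter pair on the context is not part of this commit's hunks; a plausible minimal form, shown only as an assumption of how the context might store it:

// Hypothetical addition to JsonDebeziumChangeContext; sketch only.
public class JsonDebeziumChangeContext {
    // ... existing fields ...
    private TableNameConverter tableNameConverter;

    public void setTableNameConverter(TableNameConverter tableNameConverter) {
        this.tableNameConverter = tableNameConverter;
    }

    public TableNameConverter getTableNameConverter() {
        return tableNameConverter;
    }
}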

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumDataChange.java

+2 -17

@@ -20,7 +20,6 @@
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.NullNode;
@@ -30,6 +29,7 @@
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcDataChange;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -127,16 +127,7 @@ public Map<String, Object> extractBeforeRow(JsonNode record) {
     @Override
     public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
         JsonNode dataNode = recordRoot.get(FIELD_DATA);
-        Map<String, Object> rowMap = extractRow(dataNode);
-        String objectId;
-        // if user specifies the `_id` field manually, the $oid field may not exist
-        if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
-            objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
-        } else {
-            objectId = rowMap.get(ID_FIELD).toString();
-        }
-        rowMap.put(ID_FIELD, objectId);
-        return rowMap;
+        return JsonNodeExtractUtil.extractAfterRow(dataNode, objectMapper);
     }
 
     private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
@@ -154,10 +145,4 @@ private Map<String, Object> extractDeleteRow(JsonNode recordRoot)
         row.put(ID_FIELD, objectId);
         return row;
     }
-
-    private Map<String, Object> extractRow(JsonNode recordRow) {
-        Map<String, Object> recordMap =
-                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
-        return recordMap != null ? recordMap : new HashMap<>();
-    }
 }
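The row extraction and `_id` flattening that used to live in this class is also needed by the schema-change path, which is why it moved into a shared utility. Based on the code removed above, JsonNodeExtractUtil.extractAfterRow presumably looks roughly like this sketch; the actual class may differ in constants and null handling:

// Sketch only: reconstructed from the code removed from MongoJsonDebeziumDataChange.
package org.apache.doris.flink.tools.cdc.utils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

public class JsonNodeExtractUtil {
    private static final String ID_FIELD = "_id";
    private static final String OID_FIELD = "$oid";

    /** Converts a Debezium "after" node to a row map and flattens the MongoDB ObjectId. */
    public static Map<String, Object> extractAfterRow(JsonNode recordRow, ObjectMapper objectMapper) {
        Map<String, Object> rowMap =
                objectMapper.convertValue(recordRow, new TypeReference<Map<String, Object>>() {});
        if (rowMap == null) {
            return new HashMap<>();
        }
        // If the user supplies `_id` manually, the $oid wrapper may be absent.
        String objectId;
        if (rowMap.get(ID_FIELD) instanceof Map<?, ?>) {
            objectId = ((Map<?, ?>) rowMap.get(ID_FIELD)).get(OID_FIELD).toString();
        } else {
            objectId = rowMap.get(ID_FIELD).toString();
        }
        rowMap.put(ID_FIELD, objectId);
        return rowMap;
    }
}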

flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java

+54 -2

@@ -17,10 +17,14 @@
 
 package org.apache.doris.flink.tools.cdc.mongodb.serializer;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.StringUtils;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.doris.flink.catalog.doris.DataModel;
 import org.apache.doris.flink.catalog.doris.DorisSystem;
 import org.apache.doris.flink.catalog.doris.FieldSchema;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -30,8 +34,11 @@
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.CdcSchemaChange;
 import org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumChangeContext;
 import org.apache.doris.flink.tools.cdc.SourceSchema;
+import org.apache.doris.flink.tools.cdc.mongodb.MongoDBSchema;
 import org.apache.doris.flink.tools.cdc.mongodb.MongoDBType;
 import org.apache.doris.flink.tools.cdc.mongodb.MongoDateConverter;
+import org.apache.doris.flink.tools.cdc.utils.DorisTableUtil;
+import org.apache.doris.flink.tools.cdc.utils.JsonNodeExtractUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,17 +67,19 @@ public class MongoJsonDebeziumSchemaChange extends CdcSchemaChange {
 
     private final Map<String, Map<String, String>> tableFields;
 
-    private final SchemaChangeManager schemaChangeManager;
+    private SchemaChangeManager schemaChangeManager;
 
-    private final DorisSystem dorisSystem;
+    private DorisSystem dorisSystem;
 
     public Map<String, String> tableMapping;
     private final DorisOptions dorisOptions;
+    public JsonDebeziumChangeContext changeContext;
 
     private final Set<String> specialFields =
             new HashSet<>(Arrays.asList(DATE_FIELD, TIMESTAMP_FIELD, DECIMAL_FIELD, LONG_FIELD));
 
     public MongoJsonDebeziumSchemaChange(JsonDebeziumChangeContext changeContext) {
+        this.changeContext = changeContext;
         this.objectMapper = changeContext.getObjectMapper();
         this.dorisOptions = changeContext.getDorisOptions();
         this.tableFields = new HashMap<>();
@@ -96,6 +105,35 @@ public boolean schemaChange(JsonNode recordRoot) {
         String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
         String dorisTableIdentifier =
                 getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
+
+        // if table dorisTableIdentifier is null, create table
+        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
+            String[] split = cdcTableIdentifier.split("\\.");
+            String targetDb = changeContext.getTargetDatabase();
+            String sourceTable = split[1];
+            String dorisTable = changeContext.getTableNameConverter().convert(sourceTable);
+            LOG.info(
+                    "The table [{}.{}] does not exist. Attempting to create a new table named: {}.{}",
+                    targetDb,
+                    sourceTable,
+                    targetDb,
+                    dorisTable);
+            tableMapping.put(cdcTableIdentifier, String.format("%s.%s", targetDb, dorisTable));
+            dorisTableIdentifier = tableMapping.get(cdcTableIdentifier);
+            Map<String, Object> stringObjectMap = extractAfterRow(logData);
+            JsonNode jsonNode = objectMapper.valueToTree(stringObjectMap);
+
+            MongoDBSchema mongoSchema = new MongoDBSchema(jsonNode, targetDb, dorisTable, "");
+
+            mongoSchema.setModel(DataModel.UNIQUE);
+            DorisTableUtil.tryCreateTableIfAbsent(
+                    dorisSystem,
+                    targetDb,
+                    dorisTable,
+                    mongoSchema,
+                    changeContext.getDorisTableConf());
+        }
+
         String[] tableInfo = dorisTableIdentifier.split("\\.");
         if (tableInfo.length != 2) {
             throw new DorisRuntimeException();
@@ -163,6 +201,10 @@ private JsonNode getFullDocument(JsonNode recordRoot) {
         }
     }
 
+    public Map<String, Object> extractAfterRow(JsonNode recordRoot) {
+        return JsonNodeExtractUtil.extractAfterRow(recordRoot, objectMapper);
+    }
+
     private void checkAndUpdateSchemaChange(
             JsonNode logData, String dorisTableIdentifier, String database, String table) {
         Map<String, String> tableFieldMap = tableFields.get(dorisTableIdentifier);
@@ -207,4 +249,14 @@ public String getCdcTableIdentifier(JsonNode record) {
         String db = nameSpace.get(FIELD_DATABASE).asText();
         return SourceSchema.getString(db, null, table);
     }
+
+    @VisibleForTesting
+    public void setDorisSystem(DorisSystem dorisSystem) {
+        this.dorisSystem = dorisSystem;
+    }
+
+    @VisibleForTesting
+    public void setSchemaChangeManager(SchemaChangeManager schemaChangeManager) {
+        this.schemaChangeManager = schemaChangeManager;
+    }
 }
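Because schemaChange can now create tables on the fly, the new @VisibleForTesting setters make it possible to swap the Doris dependencies for mocks. A rough test sketch of how the auto-create path could be driven; the change context and the Debezium record payload are placeholders, not the connector's actual test fixtures, and Mockito static imports (mock, when, verify, any, anyString) are assumed:

// Sketch only; changeContext and recordForNewCollection are prepared elsewhere.
MongoJsonDebeziumSchemaChange schemaChange = new MongoJsonDebeziumSchemaChange(changeContext);

DorisSystem mockDorisSystem = mock(DorisSystem.class);
when(mockDorisSystem.tableExists(anyString(), anyString())).thenReturn(false);

schemaChange.setDorisSystem(mockDorisSystem);
schemaChange.setSchemaChangeManager(mock(SchemaChangeManager.class));

// A record for a collection with no tableMapping entry should trigger table creation
// through DorisTableUtil.tryCreateTableIfAbsent before the regular schema check runs.
schemaChange.schemaChange(recordForNewCollection);
verify(mockDorisSystem).createTable(any());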
