Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract upsert from sink strategies as configurable option #162

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/MongoSinkConnector.properties
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ post.processor.chain=com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
# Write configuration
delete.on.null.values=false
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
writemodel.strategy.upsert=true

max.batch.size = 0
rate.limiting.timeout=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public String value() {
static final String WRITEMODEL_STRATEGY_DEFAULT =
"com.mongodb.kafka.connect.sink.writemodel.strategy.DefaultWriteModelStrategy";

public static final String WRITEMODEL_STRATEGY_UPSERT_CONFIG = "writemodel.strategy.upsert";
private static final String WRITEMODEL_STRATEGY_UPSERT_DISPLAY = "The upsert writeModel strategy";
private static final String WRITEMODEL_STRATEGY_UPSERT_DOC =
"Whether or not to use upserts for the write model strategy";
static final boolean WRITEMODEL_STRATEGY_UPSERT_DEFAULT = true;

public static final String DELETE_WRITEMODEL_STRATEGY_CONFIG = "delete.writemodel.strategy";
private static final String DELETE_WRITEMODEL_STRATEGY_DISPLAY = "The delete writeModel strategy";
private static final String DELETE_WRITEMODEL_STRATEGY_DOC =
Expand Down Expand Up @@ -518,6 +524,10 @@ public IdStrategy getIdStrategy() {
return idStrategy;
}

public boolean isUpsertEnabled() {
return getBoolean(WRITEMODEL_STRATEGY_UPSERT_CONFIG);
}

PostProcessors getPostProcessors() {
if (postProcessors == null) {
postProcessors = new PostProcessors(this, getList(POST_PROCESSOR_CHAIN_CONFIG));
Expand Down Expand Up @@ -832,6 +842,16 @@ private static ConfigDef createConfigDef() {
++orderInGroup,
ConfigDef.Width.MEDIUM,
WRITEMODEL_STRATEGY_DISPLAY);
configDef.define(
WRITEMODEL_STRATEGY_UPSERT_CONFIG,
ConfigDef.Type.BOOLEAN,
WRITEMODEL_STRATEGY_UPSERT_DEFAULT,
ConfigDef.Importance.LOW,
WRITEMODEL_STRATEGY_UPSERT_DOC,
group,
++orderInGroup,
ConfigDef.Width.MEDIUM,
WRITEMODEL_STRATEGY_UPSERT_DISPLAY);
configDef.define(
DELETE_WRITEMODEL_STRATEGY_CONFIG,
ConfigDef.Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@

public class ReplaceOneBusinessKeyStrategy implements WriteModelStrategy, Configurable {

private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
private boolean isPartialId = false;
private boolean isUpsertEnabled = true;

@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
ReplaceOptions replaceOptions = new ReplaceOptions().upsert(isUpsertEnabled);
BsonDocument vd =
document
.getValueDoc()
Expand All @@ -65,12 +66,13 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
if (isPartialId) {
businessKey = flattenKeys(businessKey);
}
return new ReplaceOneModel<>(businessKey, vd, REPLACE_OPTIONS);
return new ReplaceOneModel<>(businessKey, vd, replaceOptions);
}

@Override
public void configure(final MongoSinkTopicConfig configuration) {
IdStrategy idStrategy = configuration.getIdStrategy();
isUpsertEnabled = configuration.isUpsertEnabled();
isPartialId =
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.Configurable;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

public class ReplaceOneDefaultStrategy implements WriteModelStrategy {
public class ReplaceOneDefaultStrategy implements WriteModelStrategy, Configurable {

private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);
private boolean isUpsertEnabled = true;

@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
ReplaceOptions replaceOptions = new ReplaceOptions().upsert(isUpsertEnabled);
BsonDocument vd =
document
.getValueDoc()
Expand All @@ -51,6 +54,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
"Could not build the WriteModel,the `_id` field was missing unexpectedly");
}

return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, REPLACE_OPTIONS);
return new ReplaceOneModel<>(new BsonDocument(ID_FIELD, idValue), vd, replaceOptions);
}

@Override
public void configure(final MongoSinkTopicConfig configuration) {
isUpsertEnabled = configuration.isUpsertEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@

public class UpdateOneBusinessKeyTimestampStrategy implements WriteModelStrategy, Configurable {

private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS";
static final String FIELD_NAME_INSERTED_TS = "_insertedTS";
private boolean isPartialId = false;
private boolean isUpsertEnabled = true;

@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled);
BsonDocument vd =
document
.getValueDoc()
Expand All @@ -71,12 +72,13 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
businessKey,
new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
UPDATE_OPTIONS);
updateOptions);
}

@Override
public void configure(final MongoSinkTopicConfig configuration) {
IdStrategy idStrategy = configuration.getIdStrategy();
isUpsertEnabled = configuration.isUpsertEnabled();
isPartialId =
idStrategy instanceof PartialKeyStrategy || idStrategy instanceof PartialValueStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.Configurable;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

public class UpdateOneDefaultStrategy implements WriteModelStrategy {
public class UpdateOneDefaultStrategy implements WriteModelStrategy, Configurable {

private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
private boolean isUpsertEnabled = true;

@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled);
BsonDocument vd =
document
.getValueDoc()
Expand All @@ -52,6 +55,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
}
vd.remove(ID_FIELD);
return new UpdateOneModel<>(
new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), UPDATE_OPTIONS);
new BsonDocument(ID_FIELD, idValue), new BsonDocument("$set", vd), updateOptions);
}

@Override
public void configure(final MongoSinkTopicConfig configuration) {
isUpsertEnabled = configuration.isUpsertEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.Configurable;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;

public class UpdateOneTimestampsStrategy implements WriteModelStrategy {
private static final UpdateOptions UPDATE_OPTIONS = new UpdateOptions().upsert(true);
public class UpdateOneTimestampsStrategy implements WriteModelStrategy, Configurable {
static final String FIELD_NAME_MODIFIED_TS = "_modifiedTS";
static final String FIELD_NAME_INSERTED_TS = "_insertedTS";
private boolean isUpsertEnabled = true;

@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
UpdateOptions updateOptions = new UpdateOptions().upsert(isUpsertEnabled);
BsonDocument vd =
document
.getValueDoc()
Expand All @@ -58,6 +61,11 @@ public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
new BsonDocument(ID_FIELD, idValue),
new BsonDocument("$set", vd.append(FIELD_NAME_MODIFIED_TS, dateTime))
.append("$setOnInsert", new BsonDocument(FIELD_NAME_INSERTED_TS, dateTime)),
UPDATE_OPTIONS);
updateOptions);
}

@Override
public void configure(final MongoSinkTopicConfig configuration) {
isUpsertEnabled = configuration.isUpsertEnabled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static com.mongodb.kafka.connect.sink.SinkTestHelper.createTopicConfig;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -42,6 +43,7 @@
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;

import com.mongodb.kafka.connect.sink.Configurable;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.sink.converter.SinkDocument;
Expand Down Expand Up @@ -81,6 +83,7 @@ class WriteModelStrategyTest {
MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_CONFIG, PartialKeyStrategy.class.getName());
configMap.put(
MongoSinkTopicConfig.DOCUMENT_ID_STRATEGY_PARTIAL_KEY_PROJECTION_TYPE_CONFIG, "AllowList");
configMap.put(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true");

MongoSinkTopicConfig partialKeyConfig =
new MongoSinkConfig(configMap).getMongoSinkTopicConfig(TEST_TOPIC);
Expand Down Expand Up @@ -130,6 +133,59 @@ void testDefaultWriteModelStrategy() {
defaultWriteModelStrategy.getWriteModelStrategy() instanceof InsertOneDefaultStrategy);
}

@Test
@DisplayName("Ensure upsert config is correctly set")
void testWriteModelStrategyUpsertConfig() {
MongoSinkTopicConfig topicConfig;
ReplaceOneModel<BsonDocument> replaceOneModel;
UpdateOneModel<BsonDocument> updateOneModel;
WriteModel<BsonDocument> result;

Object[] replaceStrategies = {REPLACE_ONE_BUSINESS_KEY_STRATEGY, REPLACE_ONE_DEFAULT_STRATEGY};
for (Object strategy : replaceStrategies) {
Configurable configurableStrategy = (Configurable) strategy;
WriteModelStrategy writeStrategy = (WriteModelStrategy) strategy;

topicConfig =
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "false");
configurableStrategy.configure(topicConfig);
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
replaceOneModel = (ReplaceOneModel<BsonDocument>) result;
assertFalse(replaceOneModel.getReplaceOptions().isUpsert());

topicConfig =
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true");
configurableStrategy.configure(topicConfig);
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
replaceOneModel = (ReplaceOneModel<BsonDocument>) result;
assertTrue(replaceOneModel.getReplaceOptions().isUpsert());
}

Object[] updateStrategies = {
UPDATE_ONE_BUSINESS_KEY_TIMESTAMPS_STRATEGY,
UPDATE_ONE_DEFAULT_STRATEGY,
UPDATE_ONE_TIMESTAMPS_STRATEGY,
};
for (Object strategy : updateStrategies) {
Configurable configurableStrategy = (Configurable) strategy;
WriteModelStrategy writeStrategy = (WriteModelStrategy) strategy;

topicConfig =
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "false");
configurableStrategy.configure(topicConfig);
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
updateOneModel = (UpdateOneModel<BsonDocument>) result;
assertFalse(updateOneModel.getOptions().isUpsert());

topicConfig =
createTopicConfig(MongoSinkTopicConfig.WRITEMODEL_STRATEGY_UPSERT_CONFIG, "true");
configurableStrategy.configure(topicConfig);
result = writeStrategy.createWriteModel(new SinkDocument(null, VALUE_DOC.clone()));
updateOneModel = (UpdateOneModel<BsonDocument>) result;
assertTrue(updateOneModel.getOptions().isUpsert());
}
}

@Test
@DisplayName(
"when sink document is valid for InsertOneDefaultStrategy then correct InsertOneModel")
Expand Down