Skip to content

Commit

Permalink
Refactor logic of "updateConnectorDefintions" (#6123)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ella Rohm-Ensing committed Apr 27, 2023
1 parent 9da47d2 commit bb7601a
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String getPatchVersion() {
}

/**
* Compares two Version to check if they are equivalent. Only the major and minor part of the
* Compares two Versions to check if they are compatible. Only the major and minor part of the
* Version is taken into account.
*
* @param another version to compare
Expand All @@ -84,43 +84,43 @@ public int compatibleVersionCompareTo(final Version another) {
}

/**
* Test is a provided version is greater than this version.
* Test if this version is greater than another version.
*
* @param other version to compare
* @return true if this is greater than other. otherwise false.
* @return true if this version is greater than the other, otherwise false.
*/
public boolean greaterThan(final Version other) {
return patchVersionCompareTo(other) > 0;
return versionCompareTo(other) > 0;
}

/**
* Test is a provided version is greater than or equal to this version.
* Test if this version is greater than or equal to another version.
*
* @param other version to compare
* @return true if this is greater than or equal to other. otherwise false.
* @return true if this version is greater than or equal to the other, otherwise false.
*/
public boolean greaterThanOrEqualTo(final Version other) {
return patchVersionCompareTo(other) >= 0;
return versionCompareTo(other) >= 0;
}

/**
* Test is a provided version is less than to this version.
* Test if a provided version is less than another version.
*
* @param other version to compare
* @return true if this is greater than other. otherwise false.
* @return true if this version is less than the other, otherwise false.
*/
public boolean lessThan(final Version other) {
return patchVersionCompareTo(other) < 0;
return versionCompareTo(other) < 0;
}

/**
* Compares two Version to check if they are equivalent (including patch version).
* Compares two Versions to check if they are equivalent.
*
* @param another version to compare
* @return the value 0 if version == another; a value less than 0 if version < another; and a value
* greater than 0 if version > another
*/
public int patchVersionCompareTo(final Version another) {
public int versionCompareTo(final Version another) {
if (isDev() || another.isDev()) {
return 0;
}
Expand All @@ -136,7 +136,7 @@ public int patchVersionCompareTo(final Version another) {
}

/**
* Compares two Version to check if only the patch version was updated.
* Compares two Versions to check if only the patch version was updated.
*
* @param another version to compare
* @return true if exactly the same version or if the same version except for the patch. otherwise,
Expand Down Expand Up @@ -176,7 +176,7 @@ private static int compareVersion(final String v1, final String v2) {
}

/**
* Compares two Version to check if they are equivalent. Only the major and minor part of the
* Compares two Versions to check if they are compatible. Only the major and minor part of the
* Version is taken into account.
*
* @param v1 version to compare
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ void testCompatibleVersionCompareTo() {
}

@Test
void testPatchVersionCompareTo() {
assertEquals(0, new AirbyteVersion(VERSION_678_OMEGA).patchVersionCompareTo(new AirbyteVersion(VERSION_678_GAMMA)));
assertTrue(0 > new AirbyteVersion(VERSION_678_ALPHA).patchVersionCompareTo(new AirbyteVersion(VERSION_679_ALPHA)));
assertTrue(0 > new AirbyteVersion(VERSION_678_ALPHA).patchVersionCompareTo(new AirbyteVersion("6.7.11-alpha")));
assertTrue(0 < new AirbyteVersion(VERSION_680_ALPHA).patchVersionCompareTo(new AirbyteVersion(VERSION_678_ALPHA)));
assertTrue(0 < new AirbyteVersion(VERSION_6110_ALPHA).patchVersionCompareTo(new AirbyteVersion(VERSION_678_ALPHA)));
assertTrue(0 > new AirbyteVersion(VERSION_380_ALPHA).patchVersionCompareTo(new AirbyteVersion(VERSION_678_ALPHA)));
assertTrue(0 > new AirbyteVersion(VERSION_380_ALPHA).patchVersionCompareTo(new AirbyteVersion("11.7.8-alpha")));
assertEquals(0, new AirbyteVersion(VERSION_123_PROD).patchVersionCompareTo(new AirbyteVersion(DEV)));
assertEquals(0, new AirbyteVersion(DEV).patchVersionCompareTo(new AirbyteVersion(VERSION_123_PROD)));
void testversionCompareTo() {
assertEquals(0, new AirbyteVersion(VERSION_678_OMEGA).versionCompareTo(new AirbyteVersion(VERSION_678_GAMMA)));
assertTrue(0 > new AirbyteVersion(VERSION_678_ALPHA).versionCompareTo(new AirbyteVersion(VERSION_679_ALPHA)));
assertTrue(0 > new AirbyteVersion(VERSION_678_ALPHA).versionCompareTo(new AirbyteVersion("6.7.11-alpha")));
assertTrue(0 < new AirbyteVersion(VERSION_680_ALPHA).versionCompareTo(new AirbyteVersion(VERSION_678_ALPHA)));
assertTrue(0 < new AirbyteVersion(VERSION_6110_ALPHA).versionCompareTo(new AirbyteVersion(VERSION_678_ALPHA)));
assertTrue(0 > new AirbyteVersion(VERSION_380_ALPHA).versionCompareTo(new AirbyteVersion(VERSION_678_ALPHA)));
assertTrue(0 > new AirbyteVersion(VERSION_380_ALPHA).versionCompareTo(new AirbyteVersion("11.7.8-alpha")));
assertEquals(0, new AirbyteVersion(VERSION_123_PROD).versionCompareTo(new AirbyteVersion(DEV)));
assertEquals(0, new AirbyteVersion(DEV).versionCompareTo(new AirbyteVersion(VERSION_123_PROD)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
package io.airbyte.commons.version;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.json.Jsons;
import org.junit.jupiter.api.Test;

class VersionTest {

private static final Version LOWER_VERSION = new Version("1.0.0");
private static final Version HIGHER_VERSION = new Version("1.2.3");

@Test
void testJsonSerializationDeserialization() {
final String jsonString = """
Expand All @@ -25,4 +30,25 @@ void testJsonSerializationDeserialization() {
assertEquals(expectedVersion, deserializedVersionLoop);
}

@Test
void testGreaterThanOrEqualTo() {
assertTrue(LOWER_VERSION.greaterThanOrEqualTo(LOWER_VERSION));
assertTrue(HIGHER_VERSION.greaterThanOrEqualTo(LOWER_VERSION));
assertFalse(LOWER_VERSION.greaterThanOrEqualTo(HIGHER_VERSION));
}

@Test
void testGreaterThan() {
assertFalse(LOWER_VERSION.greaterThan(LOWER_VERSION));
assertTrue(HIGHER_VERSION.greaterThan(LOWER_VERSION));
assertFalse(LOWER_VERSION.greaterThan(HIGHER_VERSION));
}

@Test
void testLessThan() {
assertFalse(LOWER_VERSION.lessThan(LOWER_VERSION));
assertFalse(HIGHER_VERSION.lessThan(LOWER_VERSION));
assertTrue(LOWER_VERSION.lessThan(HIGHER_VERSION));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
import static org.jooq.impl.DSL.asterisk;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AirbyteConfig;
Expand Down Expand Up @@ -95,7 +92,7 @@ void updateConfigsFromSeed(final DSLContext ctx,
newConnectorCount += destinationConnectorCounter.newCount;
updatedConnectorCount += destinationConnectorCounter.updateCount;

LOGGER.info("Connector definitions have been updated ({} new connectors, and {} updates)", newConnectorCount, updatedConnectorCount);
LOGGER.info("Connector definitions have been updated ({} new connectors, and {} version updates)", newConnectorCount, updatedConnectorCount);
}

/**
Expand Down Expand Up @@ -133,7 +130,7 @@ Map<String, ConnectorInfo> getConnectorRepositoryToInfoMap(final DSLContext ctx)
final AirbyteVersion v2 = new AirbyteVersion(c2.dockerImageTag);
LOGGER.warn("Duplicated connector version found for {}: {} ({}) vs {} ({})",
c1.dockerRepository, c1.dockerImageTag, c1.definitionId, c2.dockerImageTag, c2.definitionId);
final int comparison = v1.patchVersionCompareTo(v2);
final int comparison = v1.versionCompareTo(v2);
if (comparison >= 0) {
return c1;
} else {
Expand Down Expand Up @@ -188,87 +185,61 @@ <T> ConnectorCounter updateConnectorDefinitions(final DSLContext ctx,
final AirbyteConfig configType,
final List<T> latestDefinitions,
final Set<String> connectorRepositoriesInUse,
final Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap)
throws IOException {
final Map<String, ConnectorInfo> connectorRepositoryToIdVersionMap) {
int newCount = 0;
int updatedCount = 0;

for (final T definition : latestDefinitions) {
final JsonNode latestDefinition = Jsons.jsonNode(definition);
final String repository = latestDefinition.get("dockerRepository").asText();
for (final T latestDefinition : latestDefinitions) {
final JsonNode latestDefinitionJson = Jsons.jsonNode(latestDefinition);
final String repository = latestDefinitionJson.get("dockerRepository").asText();
final boolean connectorIsInUse = connectorRepositoriesInUse.contains(repository);

final Map<String, ConnectorInfo> connectorRepositoryToIdVersionMapWithoutCustom = filterCustomConnector(connectorRepositoryToIdVersionMap,
configType);

// Add new connector
if (!connectorRepositoryToIdVersionMapWithoutCustom.containsKey(repository)) {
LOGGER.info("Adding new connector {}: {}", repository, latestDefinition);
LOGGER.info("Adding new connector {}: {}", repository, latestDefinitionJson);
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
newCount++;
continue;
}

// Handle existing connectors
final ConnectorInfo connectorInfo = connectorRepositoryToIdVersionMapWithoutCustom.get(repository);
final JsonNode currentDefinition = connectorInfo.definition;

// todo (lmossman) - this logic to remove the "spec" field is temporary; it is necessary to avoid
// breaking users who are actively using an old connector version, otherwise specs from the most
// recent connector versions may be inserted into the db which could be incompatible with the
// version they are actually using.
// Once the faux major version bump has been merged, this "new field" logic will be removed
// entirely.
final Set<String> newFields = Sets.difference(getNewFields(currentDefinition, latestDefinition), Set.of("spec"));

// Process connector in use
if (connectorRepositoriesInUse.contains(repository)) {
final String latestImageTag = latestDefinition.get("dockerImageTag").asText();
if (hasNewPatchVersion(connectorInfo.dockerImageTag, latestImageTag)) {
// Update connector to the latest patch version
final String latestImageTag = latestDefinitionJson.get("dockerImageTag").asText();

if (updateIsAvailable(connectorInfo.dockerImageTag, latestImageTag)) {
if (updateIsPatchOnly(connectorInfo.dockerImageTag, latestImageTag)) {
// Always update the connector to a new patch version if available
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
updatedCount++;
} else if (newFields.isEmpty()) {
LOGGER.info("Connector {} is in use and has all fields; skip updating", repository);
} else {
// Add new fields to the connector definition
final JsonNode definitionToUpdate = getDefinitionWithNewFields(currentDefinition, latestDefinition, newFields);
LOGGER.info("Connector {} has new fields: {}", repository, String.join(", ", newFields));
writeOrUpdateStandardDefinition(ctx, configType, definitionToUpdate);
} else if (!connectorIsInUse) {
// Only update the connector to new major/minor versions if it's not in use
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
updatedCount++;
}
continue;
}

// Process unused connector
final String latestImageTag = latestDefinition.get("dockerImageTag").asText();
if (hasNewVersion(connectorInfo.dockerImageTag, latestImageTag)) {
// Update connector to the latest version
LOGGER.info("Connector {} needs update: {} vs {}", repository, connectorInfo.dockerImageTag, latestImageTag);
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
updatedCount++;
} else if (!newFields.isEmpty()) {
// Add new fields to the connector definition
final JsonNode definitionToUpdate = getDefinitionWithNewFields(currentDefinition, latestDefinition, newFields);
LOGGER.info("Connector {} has new fields: {}", repository, String.join(", ", newFields));
writeOrUpdateStandardDefinition(ctx, configType, definitionToUpdate);
updatedCount++;
} else {
LOGGER.info("Connector {} does not need update: {}", repository, connectorInfo.dockerImageTag);
// If no new version, still upsert in case something changed in the definition
// without the version being updated. We won't count that toward updatedCount though.
writeOrUpdateStandardDefinition(ctx, configType, latestDefinition);
}
}

return new ConnectorCounter(newCount, updatedCount);
}

private void writeOrUpdateStandardDefinition(final DSLContext ctx,
final AirbyteConfig configType,
final JsonNode definition) {
private <T> void writeOrUpdateStandardDefinition(final DSLContext ctx,
final AirbyteConfig configType,
final T definition) {
if (configType == ConfigSchema.STANDARD_SOURCE_DEFINITION) {
final StandardSourceDefinition sourceDef = Jsons.object(definition, StandardSourceDefinition.class);
final StandardSourceDefinition sourceDef = (StandardSourceDefinition) definition;
sourceDef.withProtocolVersion(getProtocolVersion(sourceDef.getSpec()));
ConfigWriter.writeStandardSourceDefinition(Collections.singletonList(sourceDef), ctx);
} else if (configType == ConfigSchema.STANDARD_DESTINATION_DEFINITION) {
final StandardDestinationDefinition destDef = Jsons.object(definition, StandardDestinationDefinition.class);
final StandardDestinationDefinition destDef = (StandardDestinationDefinition) definition;
destDef.withProtocolVersion(getProtocolVersion(destDef.getSpec()));
ConfigWriter.writeStandardDestinationDefinition(Collections.singletonList(destDef), ctx);
} else {
Expand All @@ -281,40 +252,17 @@ private static String getProtocolVersion(final ConnectorSpecification spec) {
}

@VisibleForTesting
static Set<String> getNewFields(final JsonNode currentDefinition, final JsonNode latestDefinition) {
final Set<String> currentFields = MoreIterators.toSet(currentDefinition.fieldNames());
final Set<String> latestFields = MoreIterators.toSet(latestDefinition.fieldNames());
return Sets.difference(latestFields, currentFields);
}

/**
* Get definition with new fields. Adds new fields to currentDefinition by pulling them out of
* latestDefinition.
*
* @param currentDefinition current definition
* @param latestDefinition latest definition
* @param newFields fields to add
* @return a clone of the current definition with the new fields from the latest definition.
*/
@VisibleForTesting
static JsonNode getDefinitionWithNewFields(final JsonNode currentDefinition, final JsonNode latestDefinition, final Set<String> newFields) {
final ObjectNode currentClone = (ObjectNode) Jsons.clone(currentDefinition);
newFields.forEach(field -> currentClone.set(field, latestDefinition.get(field)));
return currentClone;
}

@VisibleForTesting
static boolean hasNewVersion(final String currentVersion, final String latestVersion) {
static boolean updateIsAvailable(final String currentVersion, final String latestVersion) {
try {
return new AirbyteVersion(latestVersion).patchVersionCompareTo(new AirbyteVersion(currentVersion)) > 0;
return new AirbyteVersion(latestVersion).versionCompareTo(new AirbyteVersion(currentVersion)) > 0;
} catch (final Exception e) {
LOGGER.error("Failed to check version: {} vs {}", currentVersion, latestVersion);
return false;
}
}

@VisibleForTesting
static boolean hasNewPatchVersion(final String currentVersion, final String latestVersion) {
static boolean updateIsPatchOnly(final String currentVersion, final String latestVersion) {
try {
return new AirbyteVersion(latestVersion).checkOnlyPatchVersionIsUpdatedComparedTo(new AirbyteVersion(currentVersion));
} catch (final Exception e) {
Expand Down
Loading

0 comments on commit bb7601a

Please sign in to comment.