Skip to content

Commit

Permalink
revert protocol v1 migration to noop (#5678)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 6, 2023
1 parent c2f922f commit 28d0093
Show file tree
Hide file tree
Showing 16 changed files with 2,336 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,29 @@

package io.airbyte.commons.protocol.migrations.v1;

import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.REF_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.protocol.migrations.AirbyteMessageMigration;
import io.airbyte.commons.protocol.migrations.util.RecordMigrations;
import io.airbyte.commons.protocol.migrations.util.RecordMigrations.MigratedNode;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.Version;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaReferenceTypes;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

/**
Expand All @@ -19,20 +36,136 @@
// @Singleton
public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration<io.airbyte.protocol.models.v0.AirbyteMessage, AirbyteMessage> {

private final JsonSchemaValidator validator;

/**
 * Creates a migration backed by a newly constructed {@link JsonSchemaValidator}.
 */
public AirbyteMessageMigrationV1() {
this(new JsonSchemaValidator());
}

/**
 * Creates a migration that uses the supplied validator. Marked {@code @VisibleForTesting} so tests
 * can inject their own instance.
 *
 * @param validator validator kept on this instance; it is handed to
 *        {@code RecordMigrations.mutateDataNode} when record data is downgraded
 */
@VisibleForTesting
public AirbyteMessageMigrationV1(final JsonSchemaValidator validator) {
this.validator = validator;
}

@Override
public io.airbyte.protocol.models.v0.AirbyteMessage downgrade(final AirbyteMessage oldMessage,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return Jsons.object(
final io.airbyte.protocol.models.v0.AirbyteMessage newMessage = Jsons.object(
Jsons.jsonNode(oldMessage),
io.airbyte.protocol.models.v0.AirbyteMessage.class);
if (oldMessage.getType() == Type.CATALOG && oldMessage.getCatalog() != null) {
for (final io.airbyte.protocol.models.v0.AirbyteStream stream : newMessage.getCatalog().getStreams()) {
final JsonNode schema = stream.getJsonSchema();
SchemaMigrationV1.downgradeSchema(schema);
}
} else if (oldMessage.getType() == Type.RECORD && oldMessage.getRecord() != null) {
if (configuredAirbyteCatalog.isPresent()) {
final ConfiguredAirbyteCatalog catalog = configuredAirbyteCatalog.get();
final io.airbyte.protocol.models.v0.AirbyteRecordMessage record = newMessage.getRecord();
final Optional<ConfiguredAirbyteStream> maybeStream = catalog.getStreams().stream()
.filter(stream -> Objects.equals(stream.getStream().getName(), record.getStream())
&& Objects.equals(stream.getStream().getNamespace(), record.getNamespace()))
.findFirst();
// If this record doesn't belong to any configured stream, then there's no point downgrading it
// So only do the downgrade if we can find its stream
if (maybeStream.isPresent()) {
final JsonNode schema = maybeStream.get().getStream().getJsonSchema();
final JsonNode oldData = record.getData();
final MigratedNode downgradedNode = downgradeRecord(oldData, schema);
record.setData(downgradedNode.node());
}
}
}
return newMessage;
}

@Override
public AirbyteMessage upgrade(final io.airbyte.protocol.models.v0.AirbyteMessage oldMessage,
final Optional<ConfiguredAirbyteCatalog> configuredAirbyteCatalog) {
return Jsons.object(
// We're not introducing any changes to the structure of the record/catalog
// so just clone a new message object, which we can edit in-place
final AirbyteMessage newMessage = Jsons.object(
Jsons.jsonNode(oldMessage),
AirbyteMessage.class);
if (oldMessage.getType() == io.airbyte.protocol.models.v0.AirbyteMessage.Type.CATALOG && oldMessage.getCatalog() != null) {
for (final AirbyteStream stream : newMessage.getCatalog().getStreams()) {
final JsonNode schema = stream.getJsonSchema();
SchemaMigrationV1.upgradeSchema(schema);
}
} else if (oldMessage.getType() == io.airbyte.protocol.models.v0.AirbyteMessage.Type.RECORD && oldMessage.getRecord() != null) {
final JsonNode oldData = newMessage.getRecord().getData();
final JsonNode newData = upgradeRecord(oldData);
newMessage.getRecord().setData(newData);
}
return newMessage;
}

/**
 * Builds the v1 form of a record's data. Numeric nodes are replaced by text nodes holding the
 * number's textual form, objects and arrays are rebuilt recursively, and every other node (e.g.
 * strings and booleans) is handed back as the very same instance, so the result is not a true deep
 * copy.
 */
private static JsonNode upgradeRecord(final JsonNode oldData) {
  // Leaf case: numbers become strings.
  if (oldData.isNumber()) {
    return Jsons.convertValue(oldData.asText(), TextNode.class);
  }

  // Objects: rebuild with every field value upgraded.
  if (oldData.isObject()) {
    final ObjectNode upgradedObject = (ObjectNode) Jsons.emptyObject();
    oldData.fields().forEachRemaining(field -> {
      upgradedObject.set(field.getKey(), upgradeRecord(field.getValue()));
    });
    return upgradedObject;
  }

  // Arrays: rebuild with every element upgraded, preserving order.
  if (oldData.isArray()) {
    final ArrayNode upgradedArray = Jsons.arrayNode();
    oldData.forEach(item -> upgradedArray.add(upgradeRecord(item)));
    return upgradedArray;
  }

  // Leaf case: nothing to convert, reuse the node as-is.
  return oldData;
}

/**
 * Matches plain decimal literals such as {@code 12}, {@code -3} or {@code 4.50}. Compiled once
 * instead of on every {@code String.matches} call.
 */
private static final java.util.regex.Pattern NUMERIC_LITERAL_PATTERN = java.util.regex.Pattern.compile("-?\\d+(\\.\\d+)?");

/**
 * We need the schema to recognize which fields are integers, since it would be wrong to just assume
 * any numerical string should be parsed out.
 *
 * Works on a best-effort basis. If the schema doesn't match the data, we'll do our best to
 * downgrade anything that we can definitively say is a number. Should _not_ throw an exception if
 * bad things happen (e.g. we try to parse a non-numerical string as a number).
 *
 * @param data the v1 record data to downgrade
 * @param schema the schema of the stream the record belongs to
 * @return the result of {@code RecordMigrations.mutateDataNode} for the given data and schema
 */
private MigratedNode downgradeRecord(final JsonNode data, final JsonNode schema) {
  return RecordMigrations.mutateDataNode(
      validator,
      s -> {
        // Only schemas that reference the v1 integer or number type are candidates for conversion.
        if (s.hasNonNull(REF_KEY)) {
          final String type = s.get(REF_KEY).asText();
          return JsonSchemaReferenceTypes.INTEGER_REFERENCE.equals(type)
              || JsonSchemaReferenceTypes.NUMBER_REFERENCE.equals(type);
        } else {
          return false;
        }
      },
      (s, d) -> {
        final String text = d.asText();
        if (NUMERIC_LITERAL_PATTERN.matcher(text).matches()) {
          try {
            // If this string is a numeric literal, convert it to a numeric node.
            return new MigratedNode(Jsons.deserialize(text), true);
          } catch (final RuntimeException ignored) {
            // The pattern also accepts text that is not a valid JSON number (e.g. leading zeros
            // such as "007"). NOTE(review): Jsons.deserialize presumably reports parse failures
            // with an unchecked exception - confirm. Stay best-effort and fall through.
          }
        }
        // Otherwise, just leave the node unchanged.
        return new MigratedNode(d, false);
      },
      data, schema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.protocol.migrations.v1;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.protocol.migrations.util.SchemaMigrations;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;

/**
* For the v0 to v1 migration, it appears that we are persisting some protocol objects without a
* version. Until this gets addressed more properly, this class contains the helper functions used
* to handle this on-the-fly migration.
*
* Once persisted objects are versioned, this code should be deleted.
*/
public class CatalogMigrationV1Helper {

  /**
   * Performs an in-place migration of the schema from v0 to v1 if v0 data types are detected in any
   * stream of the catalog.
   *
   * @param configuredAirbyteCatalog to migrate; {@code null} is tolerated and left untouched
   */
  public static void upgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (containsV0DataTypes(configuredAirbyteCatalog)) {
      upgradeSchema(configuredAirbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1 if v0 data types are detected in any
   * stream of the catalog.
   *
   * @param airbyteCatalog to migrate; {@code null} is tolerated and left untouched
   */
  public static void upgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
    if (containsV0DataTypes(airbyteCatalog)) {
      upgradeSchema(airbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1.
   *
   * @param configuredAirbyteCatalog to migrate
   */
  private static void upgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    for (final ConfiguredAirbyteStream configuredStream : configuredAirbyteCatalog.getStreams()) {
      if (configuredStream != null) {
        upgradeStreamSchema(configuredStream.getStream());
      }
    }
  }

  /**
   * Performs an in-place migration of the schema from v0 to v1.
   *
   * @param airbyteCatalog to migrate
   */
  private static void upgradeSchema(final AirbyteCatalog airbyteCatalog) {
    for (final AirbyteStream stream : airbyteCatalog.getStreams()) {
      upgradeStreamSchema(stream);
    }
  }

  /**
   * Upgrades the schema of a single stream in place; streams that are null or carry no schema are
   * skipped.
   */
  private static void upgradeStreamSchema(final AirbyteStream airbyteStream) {
    if (airbyteStream != null && airbyteStream.getJsonSchema() != null) {
      SchemaMigrationV1.upgradeSchema(airbyteStream.getJsonSchema());
    }
  }

  /**
   * Returns true if any stream of the catalog contains v0 data types. Every stream is inspected: a
   * first stream without matching types must not hide v0 types declared by later streams.
   */
  private static boolean containsV0DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (configuredAirbyteCatalog == null || configuredAirbyteCatalog.getStreams() == null) {
      return false;
    }

    for (final ConfiguredAirbyteStream configuredStream : configuredAirbyteCatalog.getStreams()) {
      if (configuredStream != null && streamContainsV0DataTypes(configuredStream.getStream())) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns true if any stream of the catalog contains v0 data types. Every stream is inspected: a
   * first stream without matching types must not hide v0 types declared by later streams.
   */
  private static boolean containsV0DataTypes(final AirbyteCatalog airbyteCatalog) {
    if (airbyteCatalog == null || airbyteCatalog.getStreams() == null) {
      return false;
    }

    for (final AirbyteStream stream : airbyteCatalog.getStreams()) {
      if (streamContainsV0DataTypes(stream)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns true if the stream has a schema that contains a v0 data type; null streams and streams
   * without a schema yield false.
   */
  private static boolean streamContainsV0DataTypes(final AirbyteStream airbyteStream) {
    if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
      return false;
    }
    return hasV0DataType(airbyteStream.getJsonSchema());
  }

  /**
   * Performs a recursive search for a v0 data type node, returning true at the first node found.
   */
  private static boolean hasV0DataType(final JsonNode schema) {
    if (SchemaMigrationV1.isPrimitiveTypeDeclaration(schema)) {
      return true;
    }

    for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
      if (hasV0DataType(subSchema)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected in any
   * stream of the catalog.
   *
   * @param configuredAirbyteCatalog to migrate; {@code null} is tolerated and left untouched
   */
  public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (containsV1DataTypes(configuredAirbyteCatalog)) {
      downgradeSchema(configuredAirbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0 if v1 data types are detected in any
   * stream of the catalog.
   *
   * @param airbyteCatalog to migrate; {@code null} is tolerated and left untouched
   */
  public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
    if (containsV1DataTypes(airbyteCatalog)) {
      downgradeSchema(airbyteCatalog);
    }
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0.
   *
   * @param configuredAirbyteCatalog to migrate
   */
  private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    for (final ConfiguredAirbyteStream configuredStream : configuredAirbyteCatalog.getStreams()) {
      if (configuredStream != null) {
        downgradeStreamSchema(configuredStream.getStream());
      }
    }
  }

  /**
   * Performs an in-place migration of the schema from v1 to v0.
   *
   * @param airbyteCatalog to migrate
   */
  private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) {
    for (final AirbyteStream stream : airbyteCatalog.getStreams()) {
      downgradeStreamSchema(stream);
    }
  }

  /**
   * Downgrades the schema of a single stream in place; streams that are null or carry no schema are
   * skipped.
   */
  private static void downgradeStreamSchema(final AirbyteStream airbyteStream) {
    if (airbyteStream != null && airbyteStream.getJsonSchema() != null) {
      SchemaMigrationV1.downgradeSchema(airbyteStream.getJsonSchema());
    }
  }

  /**
   * Returns true if any stream of the catalog contains v1 data types. Every stream is inspected: a
   * first stream without matching types must not hide v1 types declared by later streams.
   */
  private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
    if (configuredAirbyteCatalog == null || configuredAirbyteCatalog.getStreams() == null) {
      return false;
    }

    for (final ConfiguredAirbyteStream configuredStream : configuredAirbyteCatalog.getStreams()) {
      if (configuredStream != null && streamContainsV1DataTypes(configuredStream.getStream())) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns true if any stream of the catalog contains v1 data types. Every stream is inspected: a
   * first stream without matching types must not hide v1 types declared by later streams.
   */
  private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) {
    if (airbyteCatalog == null || airbyteCatalog.getStreams() == null) {
      return false;
    }

    for (final AirbyteStream stream : airbyteCatalog.getStreams()) {
      if (streamContainsV1DataTypes(stream)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns true if the stream has a schema that contains a v1 data type; null streams and streams
   * without a schema yield false.
   */
  private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) {
    if (airbyteStream == null || airbyteStream.getJsonSchema() == null) {
      return false;
    }
    return hasV1DataType(airbyteStream.getJsonSchema());
  }

  /**
   * Performs a recursive search for a v1 data type node, returning true at the first node found.
   */
  private static boolean hasV1DataType(final JsonNode schema) {
    if (SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema)) {
      return true;
    }

    for (final JsonNode subSchema : SchemaMigrations.findSubschemas(schema)) {
      if (hasV1DataType(subSchema)) {
        return true;
      }
    }
    return false;
  }

}
Loading

0 comments on commit 28d0093

Please sign in to comment.