Merge pull request Aiven-Open#193 from aiven/hacka-schema-compat-docs
Documentation for compatibility modes
tvainika authored Mar 30, 2021
2 parents e18a059 + 23c47c0 commit a40492a
Showing 2 changed files with 68 additions and 19 deletions.
karapace/compatibility/__init__.py (54 additions, 15 deletions)

@@ -20,6 +20,16 @@

 @unique
 class CompatibilityModes(Enum):
+    """ Supported compatibility modes.
+
+    - none: no compatibility checks done.
+    - backward compatibility: new schema can *read* data produced by the older
+      schemas.
+    - forward compatibility: new schema can *produce* data compatible with old
+      schemas.
+    - transitive compatibility: new schema can read data produced by *all*
+      previous schemas, otherwise only the previous schema is checked.
+    """
     BACKWARD = "BACKWARD"
     BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE"
     FORWARD = "FORWARD"
@@ -53,45 +63,74 @@ def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator)


 def check_compatibility(
-    source: TypedSchema, target: TypedSchema, compatibility_mode: CompatibilityModes
+    old_schema: TypedSchema, new_schema: TypedSchema, compatibility_mode: CompatibilityModes
 ) -> SchemaCompatibilityResult:
-    if source.schema_type is not target.schema_type:
+    """ Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`. """
+    if old_schema.schema_type is not new_schema.schema_type:
         return SchemaCompatibilityResult.incompatible(
             incompat_type=SchemaIncompatibilityType.type_mismatch,
-            message=f"Comparing different schema types: {source.schema_type} with {target.schema_type}",
+            message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
             location=[],
         )
 
     if compatibility_mode is CompatibilityModes.NONE:
         LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
         return SchemaCompatibilityResult.compatible()
 
-    if source.schema_type is SchemaType.AVRO:
+    if old_schema.schema_type is SchemaType.AVRO:
         if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
-            result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema)
+            result = check_avro_compatibility(
+                reader_schema=new_schema.schema,
+                writer_schema=old_schema.schema,
+            )
 
         elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
-            result = check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema)
+            result = check_avro_compatibility(
+                reader_schema=old_schema.schema,
+                writer_schema=new_schema.schema,
+            )
 
         elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
-            result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema)
-            result = result.merged_with(check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema))
+            result = check_avro_compatibility(
+                reader_schema=new_schema.schema,
+                writer_schema=old_schema.schema,
+            )
+            result = result.merged_with(
+                check_avro_compatibility(
+                    reader_schema=old_schema.schema,
+                    writer_schema=new_schema.schema,
+                )
+            )
 
-    elif source.schema_type is SchemaType.JSONSCHEMA:
+    elif old_schema.schema_type is SchemaType.JSONSCHEMA:
         if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
-            result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)
+            result = check_jsonschema_compatibility(
+                reader=new_schema.schema,
+                writer=old_schema.schema,
+            )
 
         elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
-            result = check_jsonschema_compatibility(reader=source.schema, writer=target.schema)
+            result = check_jsonschema_compatibility(
+                reader=old_schema.schema,
+                writer=new_schema.schema,
+            )
 
         elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
-            result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema)
-            result = result.merged_with(check_jsonschema_compatibility(reader=source.schema, writer=target.schema))
+            result = check_jsonschema_compatibility(
+                reader=new_schema.schema,
+                writer=old_schema.schema,
+            )
+            result = result.merged_with(
+                check_jsonschema_compatibility(
+                    reader=old_schema.schema,
+                    writer=new_schema.schema,
+                )
+            )
 
     else:
         result = SchemaCompatibilityResult.incompatible(
             incompat_type=SchemaIncompatibilityType.type_mismatch,
-            message=f"Unknow schema_type {source.schema_type}",
+            message=f"Unknown schema_type {old_schema.schema_type}",
             location=[],
         )

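To make the new docstring and the renamed arguments concrete, here is a minimal usage sketch. It is not part of the commit: the import paths for TypedSchema and SchemaType are assumptions and may differ between Karapace versions, while check_compatibility's keyword arguments, CompatibilityModes.BACKWARD and TypedSchema.parse(schema_type, schema_str) match the diff above.

import json

# Usage sketch only; import paths are assumptions and may differ.
from karapace.compatibility import CompatibilityModes, check_compatibility
from karapace.schema_reader import SchemaType, TypedSchema

# Old schema: a record with a single required field.
old = TypedSchema.parse(SchemaType.AVRO, json.dumps({
    "type": "record",
    "name": "User",
    "fields": [{"name": "name", "type": "string"}],
}))

# New schema: adds a field with a default, so a reader using it can still
# decode data written with the old schema (backward compatible).
new = TypedSchema.parse(SchemaType.AVRO, json.dumps({
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int", "default": 0},
    ],
}))

# BACKWARD: the new schema is the reader, the old schema is the writer.
result = check_compatibility(
    old_schema=old,
    new_schema=new,
    compatibility_mode=CompatibilityModes.BACKWARD,
)

Under FORWARD the reader and writer roles are swapped, and FULL runs the check in both directions and merges the results, exactly as the dispatch above does.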
karapace/schema_registry_apis.py (14 additions, 4 deletions)

@@ -241,7 +241,7 @@ async def compatibility_check(self, content_type, *, subject, version, request):
self.log.info("Existing schema: %r, new_schema: %r", old["schema"], body["schema"])
try:
schema_type = SchemaType(body.get("schemaType", "AVRO"))
new = TypedSchema.parse(schema_type, body["schema"])
new_schema = TypedSchema.parse(schema_type, body["schema"])
except InvalidSchema:
self.log.warning("Invalid schema: %r", body["schema"])
self.r(
@@ -268,9 +268,15 @@ async def compatibility_check(self, content_type, *, subject, version, request):

         compatibility_mode = self._get_compatibility_mode(subject=old, content_type=content_type)
 
-        result = check_compatibility(source=old_schema, target=new, compatibility_mode=compatibility_mode)
+        result = check_compatibility(
+            old_schema=old_schema,
+            new_schema=new_schema,
+            compatibility_mode=compatibility_mode,
+        )
         if is_incompatible(result):
-            self.log.warning("Invalid schema %s found by compatibility check: old: %s new: %s", result, old_schema, new)
+            self.log.warning(
+                "Invalid schema %s found by compatibility check: old: %s new: %s", result, old_schema, new_schema
+            )
             self.r({"is_compatible": False}, content_type)
         self.r({"is_compatible": True}, content_type)

@@ -681,7 +687,11 @@ def write_new_schema_local(self, subject, body, content_type):

         for old_version in check_against:
             old_schema = subject_data["schemas"][old_version]["schema"]
-            result = check_compatibility(source=old_schema, target=new_schema, compatibility_mode=compatibility_mode)
+            result = check_compatibility(
+                old_schema=old_schema,
+                new_schema=new_schema,
+                compatibility_mode=compatibility_mode,
+            )
             if is_incompatible(result):
                 message = set(result.messages).pop() if result.messages else ""
                 self.log.warning("Incompatible schema: %s", result)
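The write path above differs from the compatibility endpoint mainly in how many old versions end up in check_against. Here is a sketch of that selection under the docstring's rule (transitive modes compare against every previous schema, plain modes only against the latest one); the helper name and the use of result.messages as the incompatibility signal are illustrative, not from the commit.

from karapace.compatibility import CompatibilityModes, check_compatibility

def incompatible_old_versions(old_schemas, new_schema, mode):
    # Illustrative helper, not part of the commit. `old_schemas` is assumed to
    # be a list of TypedSchema objects ordered from oldest to newest.
    transitive = mode in {
        CompatibilityModes.BACKWARD_TRANSITIVE,
        CompatibilityModes.FORWARD_TRANSITIVE,
        CompatibilityModes.FULL_TRANSITIVE,
    }
    # Transitive modes check every previous schema; plain modes only the latest.
    check_against = old_schemas if transitive else old_schemas[-1:]
    failures = []
    for old_schema in check_against:
        result = check_compatibility(
            old_schema=old_schema,
            new_schema=new_schema,
            compatibility_mode=mode,
        )
        if result.messages:  # assumption: messages are populated only when incompatible
            failures.append(old_schema)
    return failures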
