diff --git a/karapace/compatibility/__init__.py b/karapace/compatibility/__init__.py index b2f3914d4..57cc2593b 100644 --- a/karapace/compatibility/__init__.py +++ b/karapace/compatibility/__init__.py @@ -20,6 +20,16 @@ @unique class CompatibilityModes(Enum): + """ Supported compatibility modes. + + - none: no compatibility checks done. + - backward compatibility: new schema can *read* data produced by the olders + schemas. + - forward compatibility: new schema can *produce* data compatible with old + schemas. + - transitive compatibility: new schema can read data produced by *all* + previous schemas, otherwise only the previous schema is checked. + """ BACKWARD = "BACKWARD" BACKWARD_TRANSITIVE = "BACKWARD_TRANSITIVE" FORWARD = "FORWARD" @@ -53,12 +63,13 @@ def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Valida def check_compatibility( - source: TypedSchema, target: TypedSchema, compatibility_mode: CompatibilityModes + old_schema: TypedSchema, new_schema: TypedSchema, compatibility_mode: CompatibilityModes ) -> SchemaCompatibilityResult: - if source.schema_type is not target.schema_type: + """ Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`. """ + if old_schema.schema_type is not new_schema.schema_type: return SchemaCompatibilityResult.incompatible( incompat_type=SchemaIncompatibilityType.type_mismatch, - message=f"Comparing different schema types: {source.schema_type} with {target.schema_type}", + message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}", location=[], ) @@ -66,32 +77,60 @@ def check_compatibility( LOG.info("Compatibility level set to NONE, no schema compatibility checks performed") return SchemaCompatibilityResult.compatible() - if source.schema_type is SchemaType.AVRO: + if old_schema.schema_type is SchemaType.AVRO: if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema) + result = check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema) + result = check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ) elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_avro_compatibility(reader_schema=target.schema, writer_schema=source.schema) - result = result.merged_with(check_avro_compatibility(reader_schema=source.schema, writer_schema=target.schema)) - - elif source.schema_type is SchemaType.JSONSCHEMA: + result = check_avro_compatibility( + reader_schema=new_schema.schema, + writer_schema=old_schema.schema, + ) + result = result.merged_with( + check_avro_compatibility( + reader_schema=old_schema.schema, + writer_schema=new_schema.schema, + ) + ) + + elif old_schema.schema_type is SchemaType.JSONSCHEMA: if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}: - result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema) + result = check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}: - result = check_jsonschema_compatibility(reader=source.schema, writer=target.schema) + result = check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}: - result = check_jsonschema_compatibility(reader=target.schema, writer=source.schema) - result = result.merged_with(check_jsonschema_compatibility(reader=source.schema, writer=target.schema)) + result = check_jsonschema_compatibility( + reader=new_schema.schema, + writer=old_schema.schema, + ) + result = result.merged_with( + check_jsonschema_compatibility( + reader=old_schema.schema, + writer=new_schema.schema, + ) + ) else: result = SchemaCompatibilityResult.incompatible( incompat_type=SchemaIncompatibilityType.type_mismatch, - message=f"Unknow schema_type {source.schema_type}", + message=f"Unknow schema_type {old_schema.schema_type}", location=[], ) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 16fb70c7c..21ab396bb 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -241,7 +241,7 @@ async def compatibility_check(self, content_type, *, subject, version, request): self.log.info("Existing schema: %r, new_schema: %r", old["schema"], body["schema"]) try: schema_type = SchemaType(body.get("schemaType", "AVRO")) - new = TypedSchema.parse(schema_type, body["schema"]) + new_schema = TypedSchema.parse(schema_type, body["schema"]) except InvalidSchema: self.log.warning("Invalid schema: %r", body["schema"]) self.r( @@ -268,9 +268,15 @@ async def compatibility_check(self, content_type, *, subject, version, request): compatibility_mode = self._get_compatibility_mode(subject=old, content_type=content_type) - result = check_compatibility(source=old_schema, target=new, compatibility_mode=compatibility_mode) + result = check_compatibility( + old_schema=old_schema, + new_schema=new_schema, + compatibility_mode=compatibility_mode, + ) if is_incompatible(result): - self.log.warning("Invalid schema %s found by compatibility check: old: %s new: %s", result, old_schema, new) + self.log.warning( + "Invalid schema %s found by compatibility check: old: %s new: %s", result, old_schema, new_schema + ) self.r({"is_compatible": False}, content_type) self.r({"is_compatible": True}, content_type) @@ -681,7 +687,11 @@ def write_new_schema_local(self, subject, body, content_type): for old_version in check_against: old_schema = subject_data["schemas"][old_version]["schema"] - result = check_compatibility(source=old_schema, target=new_schema, compatibility_mode=compatibility_mode) + result = check_compatibility( + old_schema=old_schema, + new_schema=new_schema, + compatibility_mode=compatibility_mode, + ) if is_incompatible(result): message = set(result.messages).pop() if result.messages else "" self.log.warning("Incompatible schema: %s", result)