Skip to content

Commit e568f64

Browse files
dylrichedenhill
authored andcommitted
avro: fix primitive and union schema parsing bugs
This commit cleans up a few bugs in the _schema_loads function: We use primitive types retrieved from the Confluent Registry and encountered an issue where _schema_loads would cause json deserialization errors by double quoting valid primitive declarations. Previous tests included incorrectly specified primitive declarations, according to the Avro spec primitive declarations are valid JSON documents, but they had been specified as strings of their type name with no quoting. I fixed the tests as well as the issue in _schema_loads Somewhat separately, there was also an issue with Avro union types. _schema_loads was incorrectly causing json serialization errors for unions because it included them on accident with its special-casing of primitive declarations. I added a check for json arrays to exclude them from the special casing. I also had to add a check later to ensure the _schema_name property was special-cased to None for unions. This should have no impact on names in the registry because _schema_name isn't used at all for the recommended subject name strategy with unions.
1 parent e2a11bc commit e568f64

File tree

3 files changed

+62
-14
lines changed

3 files changed

+62
-14
lines changed

src/confluent_kafka/schema_registry/avro.py

+16-7
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ def _schema_loads(schema_str):
6262
schema_str = schema_str.strip()
6363

6464
# canonical form primitive declarations are not supported
65-
if schema_str[0] != "{":
66-
schema_str = '{"type":"' + schema_str + '"}'
65+
if schema_str[0] != "{" and schema_str[0] != "[":
66+
schema_str = '{"type":' + schema_str + '}'
6767

6868
return Schema(schema_str, schema_type='AVRO')
6969

@@ -192,11 +192,20 @@ def __init__(self, schema_registry_client, schema_str,
192192
schema = _schema_loads(schema_str)
193193
schema_dict = loads(schema.schema_str)
194194
parsed_schema = parse_schema(schema_dict)
195-
# The Avro spec states primitives have a name equal to their type
196-
# i.e. {"type": "string"} has a name of string.
197-
# This function does not comply.
198-
# https://github.com/fastavro/fastavro/issues/415
199-
schema_name = parsed_schema.get('name', schema_dict['type'])
195+
196+
if isinstance(parsed_schema, list):
197+
# if parsed_schema is a list, we have an Avro union and there
198+
# is no valid schema name. This is fine because the only use of
199+
# schema_name is for supplying the subject name to the registry
200+
# and union types should use topic_subject_name_strategy, which
201+
# just discards the schema name anyway
202+
schema_name = None
203+
else:
204+
# The Avro spec states primitives have a name equal to their type
205+
# i.e. {"type": "string"} has a name of string.
206+
# This function does not comply.
207+
# https://github.com/fastavro/fastavro/issues/415
208+
schema_name = parsed_schema.get("name", schema_dict["type"])
200209

201210
self._schema = schema
202211
self._schema_name = schema_name
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[
2+
{
3+
"name": "RecordOne",
4+
"type": "record",
5+
"fields": [
6+
{
7+
"name": "field_one",
8+
"type": "string"
9+
}
10+
]
11+
},
12+
{
13+
"name": "RecordTwo",
14+
"type": "record",
15+
"fields": [
16+
{
17+
"name": "field_two",
18+
"type": "int"
19+
}
20+
]
21+
}
22+
]

tests/schema_registry/test_avro_serializer.py

+24-7
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def test_avro_serializer_config_auto_register_schemas():
3434
"""
3535
conf = {'url': TEST_URL}
3636
test_client = SchemaRegistryClient(conf)
37-
test_serializer = AvroSerializer(test_client, 'string',
37+
test_serializer = AvroSerializer(test_client, '"string"',
3838
conf={'auto.register.schemas': False})
3939
assert not test_serializer._auto_register
4040

@@ -60,7 +60,7 @@ def test_avro_serializer_config_auto_register_schemas_false(mock_schema_registry
6060
topic = "test-auto-register"
6161
subject = topic + '-key'
6262

63-
test_serializer = AvroSerializer(test_client, 'string',
63+
test_serializer = AvroSerializer(test_client, '"string"',
6464
conf={'auto.register.schemas': False})
6565

6666
test_serializer("test",
@@ -83,7 +83,7 @@ def test_avro_serializer_config_use_latest_version(mock_schema_registry):
8383
topic = "test-use-latest-version"
8484
subject = topic + '-key'
8585

86-
test_serializer = AvroSerializer(test_client, 'string',
86+
test_serializer = AvroSerializer(test_client, '"string"',
8787
conf={'auto.register.schemas': False, 'use.latest.version': True})
8888

8989
test_serializer("test",
@@ -104,7 +104,7 @@ def test_avro_serializer_config_subject_name_strategy():
104104

105105
conf = {'url': TEST_URL}
106106
test_client = SchemaRegistryClient(conf)
107-
test_serializer = AvroSerializer(test_client, 'int',
107+
test_serializer = AvroSerializer(test_client, '"int"',
108108
conf={'subject.name.strategy':
109109
record_subject_name_strategy})
110110

@@ -119,7 +119,7 @@ def test_avro_serializer_config_subject_name_strategy_invalid():
119119
conf = {'url': TEST_URL}
120120
test_client = SchemaRegistryClient(conf)
121121
with pytest.raises(ValueError, match="must be callable"):
122-
AvroSerializer(test_client, 'int',
122+
AvroSerializer(test_client, '"int"',
123123
conf={'subject.name.strategy': dict()})
124124

125125

@@ -148,7 +148,7 @@ def test_avro_serializer_record_subject_name_strategy_primitive(load_avsc):
148148
"""
149149
conf = {'url': TEST_URL}
150150
test_client = SchemaRegistryClient(conf)
151-
test_serializer = AvroSerializer(test_client, 'int',
151+
test_serializer = AvroSerializer(test_client, '"int"',
152152
conf={'subject.name.strategy':
153153
record_subject_name_strategy})
154154

@@ -181,7 +181,7 @@ def test_avro_serializer_topic_record_subject_name_strategy_primitive(load_avsc)
181181
"""
182182
conf = {'url': TEST_URL}
183183
test_client = SchemaRegistryClient(conf)
184-
test_serializer = AvroSerializer(test_client, 'int',
184+
test_serializer = AvroSerializer(test_client, '"int"',
185185
conf={'subject.name.strategy':
186186
topic_record_subject_name_strategy})
187187

@@ -204,3 +204,20 @@ def test_avro_serializer_subject_name_strategy_default(load_avsc):
204204
ctx = SerializationContext('test_subj', MessageField.VALUE)
205205
assert test_serializer._subject_name_func(
206206
ctx, test_serializer._schema_name) == 'test_subj-value'
207+
208+
209+
def test_avro_serializer_schema_loads_union(load_avsc):
210+
"""
211+
Ensures union types are correctly parsed
212+
"""
213+
conf = {'url': TEST_URL}
214+
test_client = SchemaRegistryClient(conf)
215+
test_serializer = AvroSerializer(test_client,
216+
load_avsc('union_schema.avsc'))
217+
218+
assert test_serializer._schema_name is None
219+
220+
schema = test_serializer._parsed_schema
221+
assert isinstance(schema, list)
222+
assert schema[0]["name"] == "RecordOne"
223+
assert schema[1]["name"] == "RecordTwo"

0 commit comments

Comments
 (0)