Skip to content

Commit 0724b08

Browse files
committed
Add support for JSON Schemas in Confluent Schema Registry
1 parent 2bd6f4b commit 0724b08

File tree

3 files changed

+100
-19
lines changed

3 files changed

+100
-19
lines changed

nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
<groupId>org.apache.nifi</groupId>
3030
<artifactId>nifi-record</artifactId>
3131
</dependency>
32+
<dependency>
33+
<groupId>org.apache.nifi</groupId>
34+
<artifactId>nifi-json-schema-shared</artifactId>
35+
<version>${project.version}</version>
36+
</dependency>
3237
<dependency>
3338
<groupId>com.github.ben-manes.caffeine</groupId>
3439
<artifactId>caffeine</artifactId>

nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222
import org.apache.avro.SchemaParseException;
2323
import org.apache.commons.lang3.StringUtils;
2424
import org.apache.nifi.avro.AvroTypeUtil;
25+
import org.apache.nifi.json.schema.record.JsonSchemaToRecordSchemaConverter;
2526
import org.apache.nifi.logging.ComponentLog;
2627
import org.apache.nifi.schema.access.SchemaNotFoundException;
2728
import org.apache.nifi.schemaregistry.services.SchemaDefinition;
2829
import org.apache.nifi.schemaregistry.services.StandardSchemaDefinition;
30+
import org.apache.nifi.serialization.SimpleRecordSchema;
2931
import org.apache.nifi.serialization.record.RecordSchema;
3032
import org.apache.nifi.serialization.record.SchemaIdentifier;
3133
import org.apache.nifi.ssl.SSLContextProvider;
@@ -92,6 +94,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
9294
private static final String APPLICATION_JSON_CONTENT_TYPE = "application/json";
9395
private static final String BASIC_CREDENTIALS_FORMAT = "%s:%s";
9496
private static final String BASIC_AUTHORIZATION_FORMAT = "Basic %s";
97+
private final JsonSchemaToRecordSchemaConverter jsonSchemaConverter = new JsonSchemaToRecordSchemaConverter();
9598

9699
public RestSchemaRegistryClient(final List<String> baseUrls,
97100
final int timeoutMillis,
@@ -178,6 +181,7 @@ public RecordSchema getSchema(final int schemaId) throws SchemaNotFoundException
178181
// GET /schemas/ids/{int: id}
179182
final String schemaPath = getSchemaPath(schemaId);
180183
final JsonNode schemaJson = fetchJsonResponse(schemaPath, "id " + schemaId);
184+
final SchemaType schemaType = extractSchemaType(schemaJson);
181185

182186
// Get subject name by id, works only with v5.3.1+ Confluent Schema Registry
183187
// GET /schemas/ids/{int: id}/subjects
@@ -224,7 +228,7 @@ public RecordSchema getSchema(final int schemaId) throws SchemaNotFoundException
224228
}
225229

226230
if (subjectName != null) {
227-
return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
231+
return createRecordSchema(subjectName, maxVersion, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText(), schemaType);
228232
}
229233
}
230234
} catch (SchemaNotFoundException e) {
@@ -254,7 +258,7 @@ public RecordSchema getSchema(final int schemaId) throws SchemaNotFoundException
254258
// At this point, we could not get a subject/version associated to the schema and its ID
255259
// we add the schema and its ID in the cache without a subject/version
256260
if (completeSchema == null) {
257-
return createRecordSchema(null, null, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText());
261+
return createRecordSchema(null, null, schemaId, schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText(), schemaType);
258262
}
259263

260264
return createRecordSchema(completeSchema);
@@ -286,8 +290,7 @@ public SchemaDefinition getSchemaDefinition(SchemaIdentifier identifier) throws
286290

287291
// Extract schema information
288292
String schemaText = schemaJson.get(SCHEMA_TEXT_FIELD_NAME).asText();
289-
String schemaTypeText = schemaJson.get(SCHEMA_TYPE_FIELD_NAME).asText();
290-
SchemaType schemaType = toSchemaType(schemaTypeText);
293+
SchemaType schemaType = extractSchemaType(schemaJson);
291294

292295
long schemaId;
293296
if (schemaJson.has(ID_FIELD_NAME)) {
@@ -333,6 +336,13 @@ public SchemaDefinition getSchemaDefinition(SchemaIdentifier identifier) throws
333336
return new StandardSchemaDefinition(schemaIdentifier, schemaText, schemaType, references);
334337
}
335338

339+
private SchemaType extractSchemaType(final JsonNode schemaNode) {
340+
if (schemaNode != null && schemaNode.hasNonNull(SCHEMA_TYPE_FIELD_NAME)) {
341+
return toSchemaType(schemaNode.get(SCHEMA_TYPE_FIELD_NAME).asText());
342+
}
343+
return SchemaType.AVRO;
344+
}
345+
336346
private SchemaType toSchemaType(final String schemaTypeText) {
337347
try {
338348
if (schemaTypeText == null || schemaTypeText.isEmpty()) {
@@ -345,33 +355,59 @@ private SchemaType toSchemaType(final String schemaTypeText) {
345355
}
346356
}
347357

348-
private RecordSchema createRecordSchema(final String name, final Integer version, final int id, final String schema) throws SchemaNotFoundException {
349-
try {
350-
final Schema avroSchema = new Schema.Parser().parse(schema);
351-
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(name).id((long) id).version(version).build();
352-
return AvroTypeUtil.createSchema(avroSchema, schema, schemaId);
353-
} catch (final SchemaParseException spe) {
354-
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name
355-
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
356-
}
358+
private RecordSchema createRecordSchema(final String name, final Integer version, final int id, final String schema, final SchemaType schemaType) throws SchemaNotFoundException {
359+
return switch (schemaType) {
360+
case AVRO -> createAvroRecordSchema(name, version, id, schema);
361+
case JSON -> createJsonRecordSchema(name, version, id, schema);
362+
default -> throw new SchemaNotFoundException("Schema type " + schemaType + " is not supported for NiFi RecordSchemas");
363+
};
357364
}
358365

359366
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
360367
final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText();
361368
final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
362369
final int id = schemaNode.get(ID_FIELD_NAME).asInt();
363370
final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).asText();
371+
final SchemaType schemaType = extractSchemaType(schemaNode);
372+
373+
return createRecordSchema(subject, version, id, schemaText, schemaType);
374+
}
364375

376+
private RecordSchema createAvroRecordSchema(final String name, final Integer version, final int id, final String schema) throws SchemaNotFoundException {
365377
try {
366-
final Schema avroSchema = new Schema.Parser().parse(schemaText);
367-
final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
368-
return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
378+
final Schema avroSchema = new Schema.Parser().parse(schema);
379+
final SchemaIdentifier schemaId = buildSchemaIdentifier(name, version, id);
380+
return AvroTypeUtil.createSchema(avroSchema, schema, schemaId);
369381
} catch (final SchemaParseException spe) {
370-
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject
371-
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
382+
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name
383+
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema", spe);
384+
}
385+
}
386+
387+
private RecordSchema createJsonRecordSchema(final String name, final Integer version, final int id, final String schema) throws SchemaNotFoundException {
388+
try {
389+
final RecordSchema converted = jsonSchemaConverter.convert(schema);
390+
final SchemaIdentifier schemaIdentifier = buildSchemaIdentifier(name, version, id);
391+
final String schemaFormat = converted.getSchemaFormat().orElse("json-schema");
392+
final SimpleRecordSchema schemaWithId = new SimpleRecordSchema(converted.getFields(), schema, schemaFormat, schemaIdentifier);
393+
converted.getSchemaName().ifPresent(schemaWithId::setSchemaName);
394+
converted.getSchemaNamespace().ifPresent(schemaWithId::setSchemaNamespace);
395+
schemaWithId.setRecordValidators(converted.getRecordValidators());
396+
return schemaWithId;
397+
} catch (final IllegalArgumentException e) {
398+
throw new SchemaNotFoundException("Obtained Schema with id " + id + " and name " + name
399+
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid JSON Schema", e);
372400
}
373401
}
374402

403+
private SchemaIdentifier buildSchemaIdentifier(final String name, final Integer version, final int id) {
404+
return SchemaIdentifier.builder()
405+
.name(name)
406+
.id((long) id)
407+
.version(version)
408+
.build();
409+
}
410+
375411
private String getSubjectPath(final String schemaName, final Integer schemaVersion) {
376412
return "/subjects/" + URLEncoder.encode(schemaName, StandardCharsets.UTF_8) + "/versions/" +
377413
(schemaVersion == null ? "latest" : URLEncoder.encode(String.valueOf(schemaVersion), StandardCharsets.UTF_8));

nifi-extension-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClientTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,18 @@ enum Status {
122122
PENDING = 3;
123123
}""";
124124

125+
private static final String JSON_SCHEMA_TEXT = """
126+
{
127+
"$id": "urn:nifi:test:user",
128+
"title": "User",
129+
"type": "object",
130+
"properties": {
131+
"id": { "type": "integer" },
132+
"name": { "type": "string" }
133+
},
134+
"required": ["id"]
135+
}""";
136+
125137
private MockWebServer mockWebServer;
126138
private String baseUrl;
127139
private RestSchemaRegistryClient client;
@@ -332,6 +344,27 @@ void testGetSchemaByIdInvalidAvroSchema() throws JsonProcessingException, Interr
332344
verifyRequest("GET", "/subjects");
333345
}
334346

347+
@Test
348+
void testGetSchemaByIdJsonSchema() throws IOException, SchemaNotFoundException, InterruptedException {
349+
enqueueJsonSchemaByIdResponse(JSON_SCHEMA_TEXT);
350+
enqueueNotFoundResponse();
351+
enqueueNotFoundResponse();
352+
enqueueNotFoundResponse();
353+
354+
final RecordSchema schema = client.getSchema(SCHEMA_ID);
355+
356+
assertNotNull(schema);
357+
assertTrue(schema.getSchemaFormat().isPresent());
358+
assertEquals("json-schema", schema.getSchemaFormat().get());
359+
assertTrue(schema.getField("id").isPresent());
360+
assertTrue(schema.getField("name").isPresent());
361+
362+
verifyRequest("GET", "/schemas/ids/" + SCHEMA_ID);
363+
verifyRequest("GET", "/schemas/ids/" + SCHEMA_ID + "/subjects");
364+
verifyRequest("GET", "/schemas/ids/" + SCHEMA_ID + "/versions");
365+
verifyRequest("GET", "/subjects");
366+
}
367+
335368
@Test
336369
void testGetSchemaDefinitionWithProtobufAndReferences() throws IOException, SchemaNotFoundException, InterruptedException {
337370
/*
@@ -414,6 +447,13 @@ private void enqueueSchemaByIdResponse(String schemaText) throws JsonProcessingE
414447
mockWebServer.enqueue(new MockResponse.Builder().code(200).addHeader("Content-Type", CONTENT_TYPE).body(jsonResponse).build());
415448
}
416449

450+
private void enqueueJsonSchemaByIdResponse(String schemaText) throws JsonProcessingException {
451+
SchemaResponse response = new SchemaResponse(schemaText, "JSON", List.of());
452+
String jsonResponse = objectMapper.writeValueAsString(response);
453+
454+
mockWebServer.enqueue(new MockResponse.Builder().code(200).addHeader("Content-Type", CONTENT_TYPE).body(jsonResponse).build());
455+
}
456+
417457
private void enqueueSchemaByIdResponseWithReferences(String schemaText, List<SchemaReference> references) throws JsonProcessingException {
418458
SchemaResponse response = new SchemaResponse(schemaText, PROTOBUF, references);
419459
String jsonResponse = objectMapper.writeValueAsString(response);
@@ -471,4 +511,4 @@ private RecordedRequest verifyRequest(String method, String expectedPath) throws
471511
assertEquals(expectedPath, request.getTarget());
472512
return request;
473513
}
474-
}
514+
}

0 commit comments

Comments
 (0)