Skip to content

Commit 17e9758

Browse files
tball-dev and Jerome Revillard
authored and committed
feat: added ability to manage schema registry
1 parent 8f3afc9 commit 17e9758

25 files changed

+578
-8
lines changed

build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ sourceCompatibility = 1.8
1919

2020
repositories {
2121
mavenCentral()
22+
maven {
23+
url "https://packages.confluent.io/maven/"
24+
}
2225
}
2326

2427
dependencies {
@@ -28,6 +31,9 @@ dependencies {
2831
compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.2"
2932
compile 'info.picocli:picocli:4.1.4'
3033

34+
implementation ('io.confluent:kafka-schema-registry-client:6.1.1')
35+
implementation('com.flipkart.zjsonpatch:zjsonpatch:0.4.11')
36+
3137
compile 'org.slf4j:slf4j-api:1.7.30'
3238
compile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
3339
compile group: 'ch.qos.logback', name: 'logback-core', version: '1.2.3'

src/main/java/com/devshawn/kafka/gitops/StateManager.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import ch.qos.logback.classic.Level;
44
import ch.qos.logback.classic.Logger;
5-
import com.devshawn.kafka.gitops.config.KafkaGitopsConfig;
65
import com.devshawn.kafka.gitops.config.KafkaGitopsConfigLoader;
76
import com.devshawn.kafka.gitops.config.ManagerConfig;
87
import com.devshawn.kafka.gitops.domain.confluent.ServiceAccount;
@@ -61,19 +60,20 @@ public StateManager(ManagerConfig managerConfig, ParserService parserService) {
6160
initializeLogger(managerConfig.isVerboseRequested());
6261
this.managerConfig = managerConfig;
6362
this.objectMapper = initializeObjectMapper();
64-
KafkaGitopsConfig config = KafkaGitopsConfigLoader.load();
65-
this.kafkaService = new KafkaService(config);
63+
this.kafkaService = new KafkaService(KafkaGitopsConfigLoader.load());
64+
this.schemaRegistryService = new SchemaRegistryService(SchemaRegistryConfigLoader.load());
6665
this.parserService = parserService;
6766
this.roleService = new RoleService();
6867
this.confluentCloudService = new ConfluentCloudService(objectMapper);
69-
this.planManager = new PlanManager(managerConfig, kafkaService, objectMapper);
70-
this.applyManager = new ApplyManager(managerConfig, kafkaService);
68+
this.planManager = new PlanManager(managerConfig, kafkaService, schemaRegistryService, objectMapper);
69+
this.applyManager = new ApplyManager(managerConfig, kafkaService, schemaRegistryService);
7170
}
7271

7372
public DesiredStateFile getAndValidateStateFile() {
7473
DesiredStateFile desiredStateFile = parserService.parseStateFile();
7574
validateTopics(desiredStateFile);
7675
validateCustomAcls(desiredStateFile);
76+
validateSchemas(desiredStateFile);
7777
this.describeAclEnabled = StateUtil.isDescribeTopicAclEnabled(desiredStateFile);
7878
return desiredStateFile;
7979
}
@@ -92,6 +92,7 @@ private DesiredPlan generatePlan() {
9292
planManager.planAcls(desiredState, desiredPlan);
9393
}
9494
planManager.planTopics(desiredState, desiredPlan);
95+
planManager.planSchemas(desiredState, desiredPlan);
9596
return desiredPlan.build();
9697
}
9798

@@ -107,6 +108,7 @@ public DesiredPlan apply() {
107108
if (!managerConfig.isSkipAclsDisabled()) {
108109
applyManager.applyAcls(desiredPlan);
109110
}
111+
applyManager.applySchemas(desiredPlan);
110112

111113
return desiredPlan;
112114
}
@@ -147,6 +149,7 @@ private DesiredState getDesiredState() {
147149
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
148150

149151
generateTopicsState(desiredState, desiredStateFile);
152+
generateSchemasState(desiredState, desiredStateFile);
150153

151154
if (isConfluentCloudEnabled(desiredStateFile)) {
152155
generateConfluentCloudServiceAcls(desiredState, desiredStateFile);
@@ -171,6 +174,10 @@ private void generateTopicsState(DesiredState.Builder desiredState, DesiredState
171174
}
172175
}
173176

177+
private void generateSchemasState(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
178+
desiredState.putAllSchemas(desiredStateFile.getSchemas());
179+
}
180+
174181
private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
175182
List<ServiceAccount> serviceAccounts = confluentCloudService.getServiceAccounts();
176183
desiredStateFile.getServices().forEach((name, service) -> {
@@ -323,6 +330,47 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
323330
}
324331
}
325332

333+
private void validateSchemas(DesiredStateFile desiredStateFile) {
334+
if (!desiredStateFile.getSchemas().isEmpty()) {
335+
SchemaRegistryConfig schemaRegistryConfig = SchemaRegistryConfigLoader.load();
336+
desiredStateFile.getSchemas().forEach((s, schemaDetails) -> {
337+
if (!schemaDetails.getType().equalsIgnoreCase("Avro")) {
338+
throw new ValidationException(String.format("Schema type %s is currently not supported.", schemaDetails.getType()));
339+
}
340+
if (!Files.exists(Paths.get(schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY") + "/" + schemaDetails.getFile()))) {
341+
throw new ValidationException(String.format("Schema file %s not found in schema directory at %s", schemaDetails.getFile(), schemaRegistryConfig.getConfig().get("SCHEMA_DIRECTORY")));
342+
}
343+
if (schemaDetails.getType().equalsIgnoreCase("Avro")) {
344+
AvroSchemaProvider avroSchemaProvider = new AvroSchemaProvider();
345+
if (schemaDetails.getReferences().isEmpty() && schemaDetails.getType().equalsIgnoreCase("Avro")) {
346+
Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), Collections.emptyList());
347+
if (!parsedSchema.isPresent()) {
348+
throw new ValidationException(String.format("Avro schema %s could not be parsed.", schemaDetails.getFile()));
349+
}
350+
} else {
351+
List<SchemaReference> schemaReferences = new ArrayList<>();
352+
schemaDetails.getReferences().forEach(referenceDetails -> {
353+
SchemaReference schemaReference = new SchemaReference(referenceDetails.getName(), referenceDetails.getSubject(), referenceDetails.getVersion());
354+
schemaReferences.add(schemaReference);
355+
});
356+
// we need to pass a schema registry client as a config because the underlying code validates against the current state
357+
avroSchemaProvider.configure(Collections.singletonMap(SchemaProvider.SCHEMA_VERSION_FETCHER_CONFIG, schemaRegistryService.createSchemaRegistryClient()));
358+
try {
359+
Optional<ParsedSchema> parsedSchema = avroSchemaProvider.parseSchema(schemaRegistryService.loadSchemaFromDisk(schemaDetails.getFile()), schemaReferences);
360+
if (!parsedSchema.isPresent()) {
361+
throw new ValidationException(String.format("Avro schema %s could not be parsed.", schemaDetails.getFile()));
362+
}
363+
} catch (IllegalStateException ex) {
364+
throw new ValidationException(String.format("Reference validation error: %s", ex.getMessage()));
365+
} catch (RuntimeException ex) {
366+
throw new ValidationException(String.format("Error thrown when attempting to validate schema with reference", ex.getMessage()));
367+
}
368+
}
369+
}
370+
});
371+
}
372+
}
373+
326374
private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
327375
if (desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getCcloud().isPresent()) {
328376
return desiredStateFile.getSettings().get().getCcloud().get().isEnabled();

src/main/java/com/devshawn/kafka/gitops/cli/ApplyCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
99
import com.devshawn.kafka.gitops.exception.PlanIsUpToDateException;
1010
import com.devshawn.kafka.gitops.exception.ReadPlanInputException;
11+
import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
1112
import com.devshawn.kafka.gitops.exception.ValidationException;
1213
import com.devshawn.kafka.gitops.service.ParserService;
1314
import com.devshawn.kafka.gitops.util.LogUtil;
@@ -45,6 +46,8 @@ public Integer call() {
4546
LogUtil.printValidationResult(ex.getMessage(), false);
4647
} catch (KafkaExecutionException ex) {
4748
LogUtil.printKafkaExecutionError(ex, true);
49+
} catch (SchemaRegistryExecutionException ex) {
50+
LogUtil.printSchemaRegistryExecutionError(ex, true);
4851
}
4952
return 2;
5053
}

src/main/java/com/devshawn/kafka/gitops/cli/PlanCommand.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.devshawn.kafka.gitops.exception.KafkaExecutionException;
88
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
99
import com.devshawn.kafka.gitops.exception.PlanIsUpToDateException;
10+
import com.devshawn.kafka.gitops.exception.SchemaRegistryExecutionException;
1011
import com.devshawn.kafka.gitops.exception.ValidationException;
1112
import com.devshawn.kafka.gitops.exception.WritePlanOutputException;
1213
import com.devshawn.kafka.gitops.service.ParserService;
@@ -49,6 +50,8 @@ public Integer call() {
4950
LogUtil.printKafkaExecutionError(ex);
5051
} catch (WritePlanOutputException ex) {
5152
LogUtil.printPlanOutputError(ex);
53+
} catch (SchemaRegistryExecutionException ex) {
54+
LogUtil.printSchemaRegistryExecutionError(ex);
5255
}
5356
return 2;
5457
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.devshawn.kafka.gitops.config;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Map;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SchemaRegistryConfig.Builder.class)
10+
public interface SchemaRegistryConfig {
11+
12+
Map<String, Object> getConfig();
13+
14+
class Builder extends SchemaRegistryConfig_Builder {
15+
16+
}
17+
18+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.devshawn.kafka.gitops.config;
2+
3+
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
4+
import com.devshawn.kafka.gitops.exception.MissingMultipleConfigurationException;
5+
import org.slf4j.LoggerFactory;
6+
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
11+
public class SchemaRegistryConfigLoader {
12+
13+
private static org.slf4j.Logger log = LoggerFactory.getLogger(SchemaRegistryConfigLoader.class);
14+
15+
public static SchemaRegistryConfig load() {
16+
SchemaRegistryConfig.Builder builder = new SchemaRegistryConfig.Builder();
17+
setConfig(builder);
18+
return builder.build();
19+
}
20+
21+
private static void setConfig(SchemaRegistryConfig.Builder builder) {
22+
Map<String, Object> config = new HashMap<>();
23+
AtomicReference<String> username = new AtomicReference<>();
24+
AtomicReference<String> password = new AtomicReference<>();
25+
26+
Map<String, String> environment = System.getenv();
27+
28+
environment.forEach((key, value) -> {
29+
if (key.equals("SCHEMA_REGISTRY_SASL_JAAS_USERNAME")) {
30+
username.set(value);
31+
} else if (key.equals("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD")) {
32+
password.set(value);
33+
} else if (key.equals("SCHEMA_REGISTRY_URL")) {
34+
config.put("SCHEMA_REGISTRY_URL", value);
35+
} else if (key.equals("SCHEMA_DIRECTORY")) {
36+
config.put("SCHEMA_DIRECTORY", value);
37+
}
38+
});
39+
40+
handleDefaultConfig(config);
41+
handleAuthentication(username, password, config);
42+
43+
log.info("Schema Registry Config: {}", config);
44+
45+
builder.putAllConfig(config);
46+
}
47+
48+
private static void handleDefaultConfig(Map<String, Object> config) {
49+
final String DEFAULT_URL = "http://localhost:8081";
50+
final String CURRENT_WORKING_DIR = System.getProperty("user.dir");
51+
if (!config.containsKey("SCHEMA_REGISTRY_URL")) {
52+
log.info("SCHEMA_REGISTRY_URL not set. Using default value of {}", DEFAULT_URL);
53+
config.put("SCHEMA_REGISTRY_URL", DEFAULT_URL);
54+
}
55+
if (!config.containsKey("SCHEMA_DIRECTORY")) {
56+
log.info("SCHEMA_DIRECTORY not set. Defaulting to current working directory: {}", CURRENT_WORKING_DIR);
57+
config.put("SCHEMA_DIRECTORY", CURRENT_WORKING_DIR);
58+
}
59+
}
60+
61+
private static void handleAuthentication(AtomicReference<String> username, AtomicReference<String> password, Map<String, Object> config) {
62+
if (username.get() != null && password.get() != null) {
63+
String loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
64+
String value = String.format("%s required username=\"%s\" password=\"%s\";",
65+
loginModule, escape(username.get()), escape(password.get()));
66+
config.put("SCHEMA_REGISTRY_SASL_CONFIG", value);
67+
} else if (username.get() != null) {
68+
throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
69+
} else if (password.get() != null) {
70+
throw new MissingConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD");
71+
} else if (username.get() == null & password.get() == null) {
72+
throw new MissingMultipleConfigurationException("SCHEMA_REGISTRY_SASL_JAAS_PASSWORD", "SCHEMA_REGISTRY_SASL_JAAS_USERNAME");
73+
}
74+
}
75+
76+
private static String escape(String value) {
77+
if (value != null) {
78+
return value.replace("\"", "\\\"");
79+
}
80+
return null;
81+
}
82+
}

src/main/java/com/devshawn/kafka/gitops/domain/plan/DesiredPlan.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@ public interface DesiredPlan {
1212

1313
List<TopicPlan> getTopicPlans();
1414

15+
List<SchemaPlan> getSchemaPlans();
16+
1517
List<AclPlan> getAclPlans();
1618

1719
default DesiredPlan toChangesOnlyPlan() {
1820
DesiredPlan.Builder builder = new DesiredPlan.Builder();
1921
getTopicPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).map(TopicPlan::toChangesOnlyPlan).forEach(builder::addTopicPlans);
2022
getAclPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addAclPlans);
23+
getSchemaPlans().stream().filter(it -> !it.getAction().equals(PlanAction.NO_CHANGE)).forEach(builder::addSchemaPlans);
2124
return builder.build();
2225
}
2326

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.devshawn.kafka.gitops.domain.plan;
2+
3+
import com.devshawn.kafka.gitops.domain.state.SchemaDetails;
4+
import com.devshawn.kafka.gitops.enums.PlanAction;
5+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
6+
import org.inferred.freebuilder.FreeBuilder;
7+
8+
import java.util.Optional;
9+
10+
@FreeBuilder
11+
@JsonDeserialize(builder = SchemaPlan.Builder.class)
12+
public interface SchemaPlan {
13+
14+
String getName();
15+
16+
PlanAction getAction();
17+
18+
Optional<SchemaDetails> getSchemaDetails();
19+
20+
class Builder extends SchemaPlan_Builder {
21+
}
22+
}

src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredState.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ public interface DesiredState {
1414

1515
Map<String, AclDetails> getAcls();
1616

17+
Map<String, SchemaDetails> getSchemas();
18+
1719
List<String> getPrefixedTopicsToIgnore();
1820

1921
class Builder extends DesiredState_Builder {

src/main/java/com/devshawn/kafka/gitops/domain/state/DesiredStateFile.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ public interface DesiredStateFile {
1818

1919
Map<String, TopicDetails> getTopics();
2020

21+
Map<String, SchemaDetails> getSchemas();
22+
2123
Map<String, UserDetails> getUsers();
2224

2325
Map<String, Map<String, CustomAclDetails>> getCustomServiceAcls();

0 commit comments

Comments
 (0)