BE: SR: Add compatibility w/ GCP SR #1153
Conversation
Hi magnusdriver! 👋
Welcome, and thank you for opening your first PR in the repo!
Please wait for triaging by our maintainers.
Please take a look at our contributing guide.
Thanks for your contribution! I believe it would be more effective to implement this in a more generic manner by exposing the BEARER_SCOPE and BEARER_AUTH_CUSTOM_PROVIDER_CLASS parameters through SchemaRegistryAuth. Would you mind changing it?
api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java
I finally exposed the BEARER_AUTH_CUSTOM_PROVIDER_CLASS parameter. I didn't need the bearer scope for this use case. If you think it's still worth exposing that parameter, I'll add it. Maybe pass it to WebClientConfigurator.configureBearerTokenAuth as a parameter for future use cases?
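To make the discussion concrete, here is a hedged sketch of how a `bearerAuthCustomProviderClass` setting could be forwarded into the Schema Registry client's properties. The class name `SrClientPropsSketch` and the `buildProps` helper are illustrative, not the PR's actual code; the property keys follow the Confluent client naming conventions referenced in the review.

```java
// Sketch (assumed names): wiring a custom bearer-auth provider class into
// the Schema Registry client configuration map.
import java.util.HashMap;
import java.util.Map;

public class SrClientPropsSketch {
  static Map<String, Object> buildProps(String url, String customProviderClass) {
    Map<String, Object> props = new HashMap<>();
    props.put("schema.registry.url", url);
    if (customProviderClass != null) {
      // Tell the client to obtain bearer tokens from a custom provider,
      // e.g. GCP's GcpBearerAuthCredentialProvider.
      props.put("bearer.auth.credentials.source", "CUSTOM");
      props.put("bearer.auth.custom.provider.class", customProviderClass);
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, Object> p = buildProps(
        "https://example-sr-endpoint",
        "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider");
    System.out.println(p.get("bearer.auth.credentials.source"));
  }
}
```

When `customProviderClass` is left unset, the map carries no bearer-auth keys at all, so existing non-GCP configurations are unaffected.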
Left a few inline comments. Once we decide on a higher level of abstraction, we can dive deep into reviewing the concrete implementations. Right now it's a nailed-down solution that doesn't take future maintainability (or possible expansion towards other implementations) into account.
properties:
  compatibilityLevel:
    $ref: '#/components/schemas/Compatibility'
required:
Why has compatibility become no longer required?
I'm trying to change it
I finally added a new compatibilityConfigGcp schema detached from the existing compatibilityConfig, and I had to modify code in the SchemaRegistryService class.
If this new modification is not OK, please let me know if you have any recommendation on how to address it, as the only alternative I can think of is to add a new kafka-gcp-sr-api and a new GcpSchemaRegistryService class.
Not sure it's the best solution here:
- this breaks backward compatibility
- every other new value (AwsCompatibility, AzureCompatibility, etc) will expand this list and break the backward compatibility further
@germanosin what do you think?
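To illustrate the backward-compatibility concern, here is a hedged OpenAPI sketch of the two shapes under discussion (schema names and the enum value list are illustrative, not the repo's actual spec):

```yaml
# Widening the shared enum: clients that validate responses against the old
# value set break as soon as a provider-specific value appears.
Compatibility:
  type: string
  enum: [BACKWARD, FORWARD, FULL, NONE]   # adding GCP-only values here is a breaking change

# Alternative: a detached, GCP-specific schema leaves the original contract intact.
CompatibilityConfigGcp:
  type: object
  properties:
    compatibilityLevel:
      $ref: '#/components/schemas/CompatibilityGcp'   # hypothetical GCP-only enum
```

The second shape is the direction the PR ultimately took with compatibilityConfigGcp.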
      keyStorePassword);
}

if (bearerAuthCustomProviderClass != null) {
Why is this called a class? The implementation rather retrieves a token via custom implementation from GCP which is later used as a bearer token value, there are no "classes" in use per se.
I use this to reference this class, and I changed the bearer token implementation for the WebClient to use this class too. Sorry 🙏.
properties:
  compatibilityLevel:
    $ref: '#/components/schemas/Compatibility'
required:
application/json:
  schema:
    $ref: '#/components/schemas/CompatibilityConfig'
oneOf:
This approach seems messy. If GCP SR uses a different API, we should define a separate contract for it.
Sorry for taking so long, but I finally managed to decouple both APIs. To make it work I decided to create a GcpSchemaRegistryService and a GcpKafkaSrMapper and use them in the SchemaController based on a KafkaCluster boolean field called "isGcpSchemaRegistry", initialized in the KafkaClusterFactory. I think this approach would make it easy to undo these changes if Google decides at some point to make their Schema Registry fully compliant with the standard SR API.
The Serdes code remains as it was because the problematic "compatibility" field is not used there; for that component only the custom auth class is needed.
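The dispatch described above can be sketched as follows. `DispatchSketch`, `SrService`, and the stub service classes are illustrative stand-ins for the PR's actual `SchemaRegistryService`/`GcpSchemaRegistryService` types; only the selection-by-flag pattern is taken from the description.

```java
// Sketch (assumed names): the controller picks the GCP or standard Schema
// Registry service based on the per-cluster isGcpSchemaRegistry flag.
public class DispatchSketch {
  interface SrService { String describe(); }

  static class StandardSrService implements SrService {
    public String describe() { return "standard"; }
  }

  // Talks GCP's API shape and maps responses back to the common model.
  static class GcpSrService implements SrService {
    public String describe() { return "gcp"; }
  }

  static SrService serviceFor(boolean isGcpSchemaRegistry) {
    return isGcpSchemaRegistry ? new GcpSrService() : new StandardSrService();
  }

  public static void main(String[] args) {
    System.out.println(serviceFor(true).describe());
  }
}
```

Keeping the fork at a single boolean in the controller means removing the GCP branch later is a one-line revert per call site.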
What changes did you make? (Give an overview)
Added a new cluster property "schemaRegistryAuth.bearerAuthCustomProviderClass" to enable compatibility with the new GCP Schema Registries.
This property can be used in config.yaml file like this:
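A hedged sketch of what the config.yaml entry might look like, assuming the property path from the description above (cluster name and URL are placeholders):

```yaml
kafka:
  clusters:
    - name: gcp-cluster                      # placeholder cluster name
      schemaRegistry: https://example-sr-endpoint   # placeholder URL
      schemaRegistryAuth:
        bearerAuthCustomProviderClass: com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider
```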
Or it can be enabled with the env variable:
KAFKA_CLUSTERS_0_SCHEMA_REGISTRY_AUTH_BEARER_AUTH_CUSTOM_PROVIDER_CLASS="com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"
KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_BEARERAUTHCUSTOMPROVIDERCLASS is also valid.
Important changes in code are:
- New bearerAuthCustomProviderClass config property in kafbat-ui-api
- Use of this property in SchemaRegistryService
You need to run gcloud auth application-default login to get the credentials to connect to GCP Schema Registries, or run kafka-ui in a Compute Engine instance or GKE with a service account that has the needed permissions.
Note: this functionality currently works only with Avro schema types, as those are the ones I work with. It could work with Protobuf schemas, but not with JSON ones, as GCP Schema Registries don't support them.
How Has This Been Tested? (put an "x" (case-sensitive!) next to an item)
Manually (please, describe, if necessary)
Changes were tested by connecting to GCP and Confluent Schema Registries to check:
It was impossible to pass the unit tests even before adding any change :'(
Checklist (put an "x" (case-sensitive!) next to all the items, otherwise the build will fail)
Check out Contributing and Code of Conduct