[wip] implement v2 with official confluent client #6

Open · wants to merge 1 commit into master
17 changes: 11 additions & 6 deletions project.clj
@@ -8,18 +8,23 @@
:url "http://www.eclipse.org/legal/epl-v10.html"}

:dependencies [[org.clojure/clojure "1.9.0"]
[org.clojure/core.memoize "0.7.1"]
[com.damballa/abracad "0.4.13"]
[cheshire "5.8.1"]

[clj-http "3.9.1"]
[org.apache.kafka/kafka-clients "1.1.1" :exclusions [org.scala-lang/scala-library]]

;; v2
[org.akvo/kfk.avro-bridge "1.5.0b144984c335ea011dad0a891409710f2d0b75a0"]
[io.confluent/kafka-avro-serializer "5.0.0"]

[org.apache.kafka/kafka-clients "1.1.1"
:exclusions [org.scala-lang/scala-library]]
;; legacy
[cheshire "5.8.1"]
[clj-http "3.9.1"]
[com.damballa/abracad "0.4.13"]
[org.clojure/core.memoize "0.7.1"]
[org.clojure/tools.logging "0.4.1"]]

:aot [kafka-avro-confluent.serializers
kafka-avro-confluent.deserializers
kafka-avro-confluent.v2.common
kafka-avro-confluent.v2.serializer
kafka-avro-confluent.v2.deserializer]

24 changes: 24 additions & 0 deletions src/kafka_avro_confluent/v2/common.clj
@@ -0,0 +1,24 @@
(ns kafka-avro-confluent.v2.common
(:require [clojure.walk :as walk]))

(defn clj-config->confluent-config
[config]
(let [;; NOTE it supports the keys defined in kafka-avro-confluent.v2.specs
{:keys [schema-registry/base-url
schema-registry/username
schema-registry/password]
:as config} (walk/keywordize-keys config)
converted-keys (cond-> {:schema.registry.url base-url}
(and username password)
(assoc :basic.auth.credentials.source "USER_INFO"
:basic.auth.user.info (str username ":" password)))
;; ... but the official confluent config will take precedence if it is present
;; https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
confluent-keys (select-keys config
[:schema.registry.url
:basic.auth.credentials.source
:basic.auth.user.info])]
(walk/stringify-keys (merge converted-keys
confluent-keys))))
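
A rough sketch of the conversion this gives, assuming the spec-style keys from kafka-avro-confluent.v2.specs (the URL and credentials below are made up):

(comment
  ;; Hypothetical REPL session; input keys follow the v2 specs.
  (clj-config->confluent-config
   {:schema-registry/base-url "http://localhost:8081"
    :schema-registry/username "user"
    :schema-registry/password "pass"})
  ;; => {"schema.registry.url"           "http://localhost:8081"
  ;;     "basic.auth.credentials.source" "USER_INFO"
  ;;     "basic.auth.user.info"          "user:pass"}
  )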


80 changes: 31 additions & 49 deletions src/kafka_avro_confluent/v2/deserializer.clj
@@ -2,72 +2,54 @@
"Avro deserializer for Apache Kafka using Confluent's Schema Registry.
Use for deserializing Kafka keys and values.
See https://avro.apache.org/
See http://docs.confluent.io/current/schema-registry/docs
See https://github.com/damballa/abracad"
(:require [abracad.avro :as avro]
[clojure.spec.alpha :as s]
[kafka-avro-confluent.magic :as magic]
[kafka-avro-confluent.v2.schema-registry-client :as sr])
(:import java.nio.ByteBuffer
See http://docs.confluent.io/current/schema-registry/docs"
(:require [clojure.spec.alpha :as s]
[kafka-avro-confluent.v2.common :as kc]
[thdr.kfk.avro-bridge.core :as ab])
(:import io.confluent.kafka.serializers.KafkaAvroDeserializer
org.apache.kafka.common.serialization.Deserializer))
(require 'kafka-avro-confluent.v2.specs)

(defn #^"[B" byte-buffer->bytes
[^ByteBuffer buffer]
(let [array (byte-array (.remaining buffer))]
(.get buffer array)
array))

(defn- -deserialize*
[schema-registry data]
(when data
(let [buffer (ByteBuffer/wrap data)
magic (.get buffer)
_ (assert (= magic/magic magic) (str "Found different magic byte: " magic))
schema-id (.getInt buffer)
schema (sr/get-schema-by-id schema-registry schema-id)]
(avro/decode schema (byte-buffer->bytes buffer)))))

;;,------------
;;| Boilerplate
;;`------------
(require 'kafka-avro-confluent.v2.specs)

(gen-class
:name kafka_avro_confluent.v2.AvroDeserializer
:implements [org.apache.kafka.common.serialization.Deserializer]
:state state
:state confluent_deserializer
:init init
:main false
:methods [])

(defn- get-field [this key] (@(.state this) key))

(defn -init "Default, no arg constructor." [] [[] (atom nil)])
(defn -init "Default, no arg constructor." []
[[] (KafkaAvroDeserializer.)])

(s/fdef -configure
:args (s/cat :this some?
:config :kafka.serde/config
:_key? boolean?))
(defn -configure [this config _key?]
(reset! (.state this)
{:schema-registry-client (->> config
(s/conform :kafka.serde/config)
sr/->schema-registry-client)}))

:args (s/cat :this some?
:config :kafka.serde/config
:key? boolean?))
(defn -configure [this config key?]
(.configure (.confluent-deserializer this)
(kc/clj-config->confluent-config config)
key?))

(def ^:private clj-field-fn (comp keyword str))
(s/fdef -deserialize
:args (s/cat :this some?
:topic string?
:data bytes?))
(defn -deserialize [this _topic data]
(-deserialize* (get-field this :schema-registry-client) data))

;; TODO cleanup memo caches?
(defn -close [this])
:args (s/cat :this some?
:topic string?
:data bytes?))
(defn -deserialize [this topic data]
(let [avro-record (.deserialize (.confluent-deserializer this)
topic
data)]
(ab/->clj avro-record {:clj-field-fn clj-field-fn})))

(defn -close [this]
(-> this
.confluent-deserializer
.close))

(defn ->avro-deserializer
"Avro deserializer for Apache Kafka using Confluent's Schema Registry."
^kafka_avro_confluent.v2.AvroDeserializer
[config]
(doto (kafka_avro_confluent.v2.AvroDeserializer.)
;; NOTE .. `key?` is ignored in the deserializer;
(.configure config false)))
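
A minimal usage sketch for the new deserializer, assuming a Schema Registry at http://localhost:8081 (the URL, topic, and payload are illustrative):

(comment
  ;; Hypothetical usage; the payload must be Confluent-framed Avro bytes.
  (def deserializer
    (->avro-deserializer {:schema-registry/base-url "http://localhost:8081"}))

  ;; Returns a Clojure map with keywordized field names via avro-bridge:
  ;; (.deserialize deserializer "some-topic" payload-bytes)
  )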
83 changes: 29 additions & 54 deletions src/kafka_avro_confluent/v2/serializer.clj
@@ -3,78 +3,53 @@
Use for serializing Kafka keys and values.

See https://avro.apache.org/
See http://docs.confluent.io/current/schema-registry/docs
See https://github.com/damballa/abracad"
See http://docs.confluent.io/current/schema-registry/docs"
(:require [abracad.avro :as avro]
[clojure.spec.alpha :as s]
[kafka-avro-confluent.magic :as magic]
[kafka-avro-confluent.v2.schema-registry-client :as sr]
[kafka-avro-confluent.v2.specs :as ks])
(:import java.io.ByteArrayOutputStream
java.nio.ByteBuffer
[kafka-avro-confluent.v2.common :as kc]
[kafka-avro-confluent.v2.specs :as ks]
[thdr.kfk.avro-bridge.core :as ab])
(:import io.confluent.kafka.serializers.KafkaAvroSerializer
org.apache.kafka.common.serialization.Serializer))
(require 'kafka-avro-confluent.v2.specs)

(defn- #^"[B" schema-id->bytes [schema-id]
(-> (ByteBuffer/allocate 4)
(.putInt schema-id)
.array))

(defn- ->serialized-bytes [schema-id avro-schema data]
(with-open [out (ByteArrayOutputStream.)]
(.write out magic/magic)
(.write out (schema-id->bytes schema-id))
(.write out #^"[B" (avro/binary-encoded avro-schema data))
(.toByteArray out)))

(defn- -serialize*
[schema-registry key? topic avro-schema data]
(when data
(let [subject (format "%s-%s" topic (if key? "key" "value"))
schema-id (sr/post-schema schema-registry subject avro-schema)
serialized-bytes (->serialized-bytes schema-id avro-schema data)]
serialized-bytes)))

;;,------------
;;| Boilerplate
;;`------------
(require 'kafka-avro-confluent.v2.specs)

(gen-class
:name kafka_avro_confluent.v2.AvroSerializer
:implements [org.apache.kafka.common.serialization.Serializer]
:state state
:state confluent_serializer
:init init
:main false
:methods [])

(defn- get-field [this key] (@(.state this) key))

(defn -init "Default, no arg constructor." [] [[] (atom nil)])
(defn -init "Default, no arg constructor." []
[[] (KafkaAvroSerializer.)])

(s/fdef -configure
:args (s/cat :this some?
:config :kafka.serde/config
:key? boolean?))
:args (s/cat :this some?
:config :kafka.serde/config
:key? boolean?))
(defn -configure [this config key?]
(reset! (.state this)
{:schema-registry-client (->> config
(s/conform :kafka.serde/config)
sr/->schema-registry-client)
:key? key?}))
(.configure (.confluent-serializer this)
(kc/clj-config->confluent-config config)
(boolean key?)))

(s/fdef -serialize
:args (s/cat :this some?
:topic string?
:avro-record ::ks/avro-record))
:args (s/cat :this some?
:topic string?
:avro-record ::ks/avro-record))
(defn -serialize [this topic avro-record]
(-serialize* (get-field this :schema-registry-client)
(get-field this :key?)
topic
(:schema avro-record)
(:value avro-record)))

;; TODO cleanup memo caches?
(defn -close [this])
(let [avro-record (ab/->java (avro/parse-schema (:schema avro-record))
(:value avro-record)
{:java-field-fn name})]
(.serialize (.confluent-serializer this)
topic
avro-record)))

(defn -close [this]
(-> this
.confluent-serializer
.close))

(defn ->avro-serializer
"Avro serializer for Apache Kafka using Confluent's Schema Registry."
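The record shape passed to the serializer is unchanged: a map with :schema and :value. A hedged round-trip sketch (registry URL, topic, and schema are made up):

(comment
  ;; Hypothetical usage of the new serializer.
  (def value-schema
    {:type   "record"
     :name   "Payment"
     :fields [{:name "amount" :type "double"}]})

  (def serializer
    (->avro-serializer {:schema-registry/base-url "http://localhost:8081"}))

  ;; (.serialize serializer "payments" {:schema value-schema
  ;;                                    :value  {:amount 4.2}})
  )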
18 changes: 12 additions & 6 deletions src/kafka_avro_confluent/v2/specs.clj
@@ -9,16 +9,22 @@
(s/def :schema-registry/username ::non-blank-string)
(s/def :schema-registry/password ::non-blank-string)

(s/def :confluent/schema.registry.url ::non-blank-string)
(s/def :confluent/basic.auth.credentials.source ::non-blank-string)
(s/def :confluent/basic.auth.user.info ::non-blank-string)

(s/def :kafka.serde/config
(s/and (s/conformer #(try
(->> %
(into {})
w/keywordize-keys)
(w/keywordize-keys %)
(catch Exception ex
:clojure.spec.alpha/invalid)))
(s/keys :req [:schema-registry/base-url]
:opt [:schema-registry/username
:schema-registry/password])))
(s/or :clj (s/keys :req [:schema-registry/base-url]
:opt [:schema-registry/username
:schema-registry/password])
:confluent
(s/keys :req-un [:confluent/schema.registry.url]
:opt-un [:confluent/basic.auth.credentials.source
:confluent/basic.auth.user.info]))))
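
Both config shapes should conform under the updated spec; an illustrative check (values are placeholders):

(comment
  ;; Spec-style keys, as before:
  (s/valid? :kafka.serde/config
            {:schema-registry/base-url "http://localhost:8081"})
  ;; => true

  ;; Official Confluent keys, passed through as-is:
  (s/valid? :kafka.serde/config
            {:schema.registry.url           "http://localhost:8081"
             :basic.auth.credentials.source "USER_INFO"
             :basic.auth.user.info          "user:pass"})
  ;; => true
  )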

(s/def :avro-record/schema any?)
(s/def :avro-record/value any?)
21 changes: 21 additions & 0 deletions test/kafka_avro_confluent/v2/core_test.clj
@@ -28,6 +28,26 @@
(def schema-registry-client
(sut-reg/->schema-registry-client config))

(deftest init-close-test
(testing "can initialize and close"
(with-open [ser (kafka_avro_confluent.v2.AvroSerializer.)
des (kafka_avro_confluent.v2.AvroDeserializer.)]
(is ser)
(is des))))

(deftest raw-confluent-config-test
;; https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
(testing "supports official config keys"
(let [config {:schema.registry.url "http://localhost:8081,http://foohost.com:8081"
:basic.auth.credentials.source "USER_INFO"
:basic.auth.user.info "user:pass"}]
(with-open [ser (kafka_avro_confluent.v2.AvroSerializer.)
des (kafka_avro_confluent.v2.AvroDeserializer.)]
(.configure ser config false)
(.configure des config false)
(is ser)
(is des)))))

(deftest avro-serde
(testing "Can round-trip"
(let [serializer (sut-ser/->avro-serializer config)
@@ -41,6 +41,7 @@
(is (sut-reg/get-latest-schema-by-subject schema-registry-client
(str topic "-value")))))))


(deftest avro-serde-with-key?=true-test
(let [serializer (sut-ser/->avro-serializer config :key? true)
deserializer (sut-des/->avro-deserializer config)