
Commit b85c6e3

Merge master

2 parents: bae70fe + 27ac1e1

109 files changed: +12889, -1143 lines changed


.github/pull_request_template.md (+43)

@@ -0,0 +1,43 @@
+<!--
+Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block.
+-->
+What
+----
+<!--
+Briefly describe **what** you have changed and **why**.
+Optionally include implementation strategy.
+-->
+
+Checklist
+------------------
+- [ ] Contains customer facing changes? Including API/behavior changes <!-- This can help identify if it has introduced any breaking changes -->
+- [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
+  - If not, please explain why it is not required
+
+References
+----------
+JIRA:
+<!--
+Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations...
+For code bumps: link to PR, tag or GitHub `/compare/master...master`
+-->
+
+Test & Review
+------------
+<!--
+Has it been tested? how?
+Copy&paste any handy instructions, steps or requirements that can save time to the reviewer or any reader.
+-->
+
+Open questions / Follow-ups
+--------------------------
+<!--
+Optional: anything open to discussion for the reviewer, out of scope, or follow-ups.
+-->
+
+<!--
+Review stakeholders
+------------------
+<!--
+Optional: mention stakeholders or if special context that is required to review.
+-->

.gitignore (+1)

@@ -29,3 +29,4 @@ tmp-KafkaCluster
 .venv
 venv_test
 venv_examples
+*Zone.Identifier

.semaphore/semaphore.yml (+1 -1)

@@ -8,7 +8,7 @@ execution_time_limit:
 global_job_config:
   env_vars:
     - name: LIBRDKAFKA_VERSION
-      value: v2.6.0
+      value: v2.6.1
   prologue:
     commands:
       - checkout

CHANGELOG.md (+25 -1)

@@ -1,10 +1,34 @@
 # Confluent's Python client for Apache Kafka
 
+## v2.6.2
+
+v2.6.2 is a feature release with the following features, fixes and enhancements:
+
+- Support for Data Contracts with Schema Registry, including
+  - Data Quality rules
+  - Data Transformation rules
+  - Client-Side Field Level Encryption (CSFLE)
+  - Schema Migration rules (requires Python 3.9+)
+- Migrated the Schema Registry client from requests to httpx
+- Add support for multiple URLs (#409)
+- Allow configuring timeout (#622)
+- Fix deletion semantics (#1127)
+- Python deserializer can take SR client (#1174)
+- Fix handling of Avro unions (#1562)
+- Remove deprecated RefResolver for JSON (#1840)
+- Support delete of subject version (#1851)
+
+confluent-kafka-python is based on librdkafka v2.6.1, see the
+[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.6.1)
+for a complete list of changes, enhancements, fixes and upgrade considerations.
+
+
 ## v2.6.1
 
 v2.6.1 is a maintenance release with the following fixes and enhancements:
 
-- Migrated build system from `setup.py` to `pyproject.toml` in accordance with `PEP 517` and `PEP 518`, improving project configuration, build system requirements management, and compatibility with modern Python packaging tools like `pip` and `build`.
+- Migrated build system from `setup.py` to `pyproject.toml` in accordance with `PEP 517` and `PEP 518`, improving project configuration, build system requirements management, and compatibility with modern Python packaging tools like `pip` and `build`. (#1592)
+- Removed python 3.6 support. (#1592)
 - Added an example for OAUTH OIDC producer with support for confluent cloud (#1769, @sarwarbhuiyan)
 
 confluent-kafka-python is based on librdkafka v2.6.1, see the
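
As a quick orientation for the Schema Registry client changes listed above, here is a minimal, hedged sketch; the comma-separated multiple-URL form is an assumption based on the "support for multiple URLs (#409)" entry, and the addresses are placeholders:

    # Sketch only: construct the (now httpx-based) Schema Registry client.
    # Assumption: additional comma-separated URLs act as fallbacks (#409).
    from confluent_kafka.schema_registry import SchemaRegistryClient

    sr_conf = {'url': 'http://sr-1.example.com:8081,http://sr-2.example.com:8081'}
    sr_client = SchemaRegistryClient(sr_conf)
    print(sr_client.get_subjects())  # simple round-trip against the registry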

README.md (+16)

@@ -134,6 +134,22 @@ The `Producer`, `Consumer` and `AdminClient` are all thread safe.
 confluent-kafka using the instructions in the
 "Install from source" section below.
 
+To use Schema Registry with the Avro serializer/deserializer:
+
+    $ pip install confluent-kafka[avro,schemaregistry]
+
+To use Schema Registry with the JSON serializer/deserializer:
+
+    $ pip install confluent-kafka[json,schemaregistry]
+
+To use Schema Registry with the Protobuf serializer/deserializer:
+
+    $ pip install confluent-kafka[protobuf,schemaregistry]
+
+When using Data Contract rules (including CSFLE) add the `rules` extra, e.g.:
+
+    $ pip install confluent-kafka[avro,schemaregistry,rules]
+
 **Install from source**
 
 For source install, see the *Install from source* section in [INSTALL.md](INSTALL.md).
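
After installing the extras above, a minimal, hedged smoke test (the Schema Registry URL is a placeholder; no schema string is passed, so it is fetched from the registry, per #1174):

    # Sketch only: verify the avro + schemaregistry extras are usable.
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer

    sr_client = SchemaRegistryClient({'url': 'http://localhost:8081'})  # placeholder URL
    deserializer = AvroDeserializer(sr_client)  # schema resolved from Schema Registry
    print(type(deserializer).__name__)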

docs/conf.py (+1 -1)

@@ -27,7 +27,7 @@
 # built documents.
 #
 # The short X.Y version.
-version = '2.6.0'
+version = '2.6.2'
 # The full version, including alpha/beta/rc tags.
 release = version
 ######################################################################

examples/avro/user_generic.avsc (+2 -1)

@@ -4,7 +4,8 @@
     "fields": [
         {
             "name": "name",
-            "type": "string"
+            "type": "string",
+            "confluent:tags": ["PII"]
         },
         {
             "name": "favorite_number",

examples/avro/user_specific.avsc (+2 -1)

@@ -5,7 +5,8 @@
     "fields": [
         {
             "name": "name",
-            "type": "string"
+            "type": "string",
+            "confluent:tags": ["PII"]
         },
         {
             "name": "favorite_number",

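The `confluent:tags` additions above mark the `name` field as PII, which is what tag-based Data Contract rules (for example CSFLE) match against. A small sketch, assuming the repository root as the working directory:

    # Sketch only: show the custom "confluent:tags" attribute on the "name" field.
    import json

    with open("examples/avro/user_generic.avsc") as f:
        schema = json.load(f)

    name_field = next(fld for fld in schema["fields"] if fld["name"] == "name")
    print(name_field.get("confluent:tags"))  # expected: ['PII']
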
examples/avro_consumer_encryption.py (+145)

@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2024 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# A simple example demonstrating use of AvroDeserializer.
+
+import argparse
+
+from confluent_kafka.schema_registry.rules.encryption.encrypt_executor import \
+    FieldEncryptionExecutor
+
+from confluent_kafka.schema_registry.rules.encryption.localkms.local_driver import \
+    LocalKmsDriver
+
+from confluent_kafka.schema_registry.rules.encryption.hcvault.hcvault_driver import \
+    HcVaultKmsDriver
+
+from confluent_kafka.schema_registry.rules.encryption.gcpkms.gcp_driver import \
+    GcpKmsDriver
+
+from confluent_kafka.schema_registry.rules.encryption.azurekms.azure_driver import \
+    AzureKmsDriver
+
+from confluent_kafka.schema_registry.rules.encryption.awskms.aws_driver import \
+    AwsKmsDriver
+
+from confluent_kafka import Consumer
+from confluent_kafka.serialization import SerializationContext, MessageField
+from confluent_kafka.schema_registry import SchemaRegistryClient
+from confluent_kafka.schema_registry.avro import AvroDeserializer
+
+
+class User(object):
+    """
+    User record
+
+    Args:
+        name (str): User's name
+
+        favorite_number (int): User's favorite number
+
+        favorite_color (str): User's favorite color
+    """
+
+    def __init__(self, name=None, favorite_number=None, favorite_color=None):
+        self.name = name
+        self.favorite_number = favorite_number
+        self.favorite_color = favorite_color
+
+
+def dict_to_user(obj, ctx):
+    """
+    Converts object literal(dict) to a User instance.
+
+    Args:
+        obj (dict): Object literal(dict)
+
+        ctx (SerializationContext): Metadata pertaining to the serialization
+            operation.
+    """
+
+    if obj is None:
+        return None
+
+    return User(name=obj['name'],
+                favorite_number=obj['favorite_number'],
+                favorite_color=obj['favorite_color'])
+
+
+def main(args):
+    # Register the KMS drivers and the field-level encryption executor
+    AwsKmsDriver.register()
+    AzureKmsDriver.register()
+    GcpKmsDriver.register()
+    HcVaultKmsDriver.register()
+    LocalKmsDriver.register()
+    FieldEncryptionExecutor.register()
+
+    topic = args.topic
+
+    # When using Data Contract rules, a schema should not be passed to the
+    # AvroDeserializer. The schema is fetched from the Schema Registry.
+    schema_str = None
+
+    sr_conf = {'url': args.schema_registry}
+    schema_registry_client = SchemaRegistryClient(sr_conf)
+
+    avro_deserializer = AvroDeserializer(schema_registry_client,
+                                         schema_str,
+                                         dict_to_user)
+
+    consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
+                     'group.id': args.group,
+                     'auto.offset.reset': "earliest"}
+
+    consumer = Consumer(consumer_conf)
+    consumer.subscribe([topic])
+
+    while True:
+        try:
+            # SIGINT can't be handled when polling, limit timeout to 1 second.
+            msg = consumer.poll(1.0)
+            if msg is None:
+                continue
+
+            user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
+            if user is not None:
+                print("User record {}: name: {}\n"
+                      "\tfavorite_number: {}\n"
+                      "\tfavorite_color: {}\n"
+                      .format(msg.key(), user.name,
+                              user.favorite_number,
+                              user.favorite_color))
+        except KeyboardInterrupt:
+            break
+
+    consumer.close()
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(description="AvroDeserializer example")
+    parser.add_argument('-b', dest="bootstrap_servers", required=True,
+                        help="Bootstrap broker(s) (host[:port])")
+    parser.add_argument('-s', dest="schema_registry", required=True,
+                        help="Schema Registry (http(s)://host[:port]")
+    parser.add_argument('-t', dest="topic", default="example_serde_avro",
+                        help="Topic name")
+    parser.add_argument('-g', dest="group", default="example_serde_avro",
+                        help="Consumer group")
+
+    main(parser.parse_args())
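
For reference, the new example can be run with the flags defined in its argparse section, assuming a matching producer and KMS setup; the broker and Schema Registry addresses below are placeholders, and the topic/group values are the script defaults:

    $ python examples/avro_consumer_encryption.py -b localhost:9092 -s http://localhost:8081 -t example_serde_avro -g example_serde_avro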
