
Commit 5b54aa6

Integrate Schema Registry with ducktape load tests (#2027)
* basic sr test
* more tests
* update
* update
* lint
1 parent 16d65e1 · commit 5b54aa6

4 files changed: 284 additions & 6 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
```diff
@@ -34,3 +34,4 @@ venv_examples
 .coverage
 **/coverage.xml
 **/test-report.xml
+*.ducktape
```

tests/ducktape/README.md

Lines changed: 6 additions & 3 deletions
````diff
@@ -13,8 +13,11 @@ Ducktape-based producer tests for the Confluent Kafka Python client with compreh
 # Run all tests with integrated performance metrics
 ./tests/ducktape/run_ducktape_test.py
 
-# Run specific test with metrics
-./tests/ducktape/run_ducktape_test.py SimpleProducerTest.test_basic_produce
+# Run all tests in a file
+./tests/ducktape/run_ducktape_test.py test_producer.py
+
+# Run a specific test with metrics
+./tests/ducktape/run_ducktape_test.py test_producer.py SimpleProducerTest.test_basic_produce
 ```
 
 ## Test Cases
@@ -70,4 +73,4 @@ bounds = MetricsBounds()
 
 # Or load from a specific config file
 bounds = MetricsBounds.from_config_file("my_bounds.json")
-```
+```
````
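The `MetricsBounds` helper documented here comes from `tests/ducktape/benchmark_metrics`, which the new Schema Registry test below drives end to end. A minimal sketch of that flow, with call signatures inferred from the new test's call sites rather than from any documented API:

```python
# Sketch only: signatures inferred from how the new test in this commit uses
# tests/ducktape/benchmark_metrics, not from an authoritative reference.
from tests.ducktape.benchmark_metrics import (
    MetricsCollector, MetricsBounds, validate_metrics, print_metrics_report
)

metrics = MetricsCollector()
metrics.start()
# During a produce loop the test records per-message events:
#   metrics.record_sent(message_size, topic=..., partition=...)
#   metrics.record_delivered(latency_ms, topic=..., partition=...)  # delivery callback
metrics.finalize()

summary = metrics.get_summary()
is_valid, violations = validate_metrics(summary, MetricsBounds())
print_metrics_report(summary, is_valid, violations)
```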

tests/ducktape/run_ducktape_test.py

Lines changed: 3 additions & 3 deletions
```diff
@@ -46,7 +46,7 @@ def main():
 
     # Get test file path (ducktape expects file paths, not module paths)
     test_dir = os.path.dirname(os.path.abspath(__file__))
-    test_file = os.path.join(test_dir, "test_producer.py")
+    test_file = os.path.join(test_dir, sys.argv[1])
 
     if not os.path.exists(test_file):
         print(f"ERROR: Test file not found: {test_file}")
@@ -71,8 +71,8 @@ def main():
     ]
 
     # Add specific test if provided as argument
-    if len(sys.argv) > 1:
-        test_method = sys.argv[1]
+    if len(sys.argv) > 2:
+        test_method = sys.argv[2]
         cmd[-1] = f"{test_file}::{test_method}"
         print(f"Running specific test: {test_method}")
     else:
```
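With this change, the runner's first positional argument selects the test file and the optional second argument narrows the run to a single test. A minimal standalone sketch of the resulting resolution logic, using a hypothetical `resolve_target` helper for illustration (not part of the script):

```python
# Hypothetical helper mirroring the updated argument handling in
# run_ducktape_test.py; resolve_target is illustrative, not the script's API.
import os
import sys


def resolve_target(argv, test_dir):
    # argv[1]: test file inside tests/ducktape/, e.g. "test_producer.py"
    test_file = os.path.join(test_dir, argv[1])
    # argv[2] (optional): one test, e.g. "SimpleProducerTest.test_basic_produce"
    if len(argv) > 2:
        return f"{test_file}::{argv[2]}"
    return test_file


if __name__ == "__main__":
    print(resolve_target(sys.argv, os.path.dirname(os.path.abspath(__file__))))
```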
New test file

Lines changed: 274 additions & 0 deletions
```python
import json
import time
from uuid import uuid4
from ducktape.tests.test import Test

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry._sync.json_schema import JSONSerializer
from confluent_kafka.schema_registry._sync.protobuf import ProtobufSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import MessageField, SerializationContext, StringSerializer
from tests.ducktape.services.kafka import KafkaClient
from tests.ducktape.benchmark_metrics import MetricsCollector, MetricsBounds, validate_metrics, print_metrics_report
from tests.integration.schema_registry.data.proto import PublicTestProto_pb2
from confluent_kafka import Producer


class SimpleProducerTestWithSchemaRegistry(Test):
    """Test producer functionality with Schema Registry integration"""

    def __init__(self, test_context):
        super(SimpleProducerTestWithSchemaRegistry, self).__init__(test_context=test_context)

        # Set up Kafka client and Schema Registry client
        self.kafka = KafkaClient(test_context, bootstrap_servers="localhost:9092")
        self.schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

    def setUp(self):
        """Set up test environment"""
        self.logger.info("Verifying connection to external Kafka at localhost:9092")

        if not self.kafka.verify_connection():
            raise ConnectionError("Cannot connect to Kafka at localhost:9092. Please ensure Kafka is running.")

        self.logger.info("Successfully connected to Kafka")

    def calculate_and_verify_results(self, metrics_summary, bounds, serialization_type):
        """Calculate throughput and verify results using comprehensive metrics"""
        is_valid, violations = validate_metrics(metrics_summary, bounds)

        # Print comprehensive metrics report
        self.logger.info("%s serialization test with comprehensive metrics completed:", serialization_type)
        print_metrics_report(metrics_summary, is_valid, violations)

        # Enhanced assertions using metrics
        assert metrics_summary['messages_sent'] > 0, "No messages were sent during test duration"
        assert metrics_summary['messages_delivered'] > 0, "No messages were delivered"
        assert metrics_summary['send_throughput_msg_per_sec'] > 10, \
            f"Send throughput too low: {metrics_summary['send_throughput_msg_per_sec']:.2f} msg/s " \
            f"(expected > 10 msg/s)"

        # Validate against performance bounds
        if not is_valid:
            self.logger.warning("Performance bounds validation failed for %s: %s",
                                serialization_type, "; ".join(violations))

        self.logger.info("Successfully completed %s test with comprehensive metrics", serialization_type)

    def produce_messages_with_serialization(self, producer, topic_name, serializer, string_serializer,
                                            test_duration, message_value_func, serialization_type):
        """Produce messages using the given serializer with comprehensive metrics collection"""
        # Initialize metrics collection and bounds
        metrics = MetricsCollector()
        bounds = MetricsBounds()

        # Track send times for latency calculation
        send_times = {}

        def delivery_callback(err, msg):
            """Enhanced delivery report callback with metrics tracking"""
            if err is not None:
                self.logger.error("Message delivery failed: %s", err)
                metrics.record_failed(topic=msg.topic() if msg else topic_name,
                                      partition=msg.partition() if msg else 0)
            else:
                # Calculate actual latency if we have send time
                msg_key = msg.key().decode('utf-8', errors='replace') if msg.key() else 'unknown'
                if msg_key in send_times:
                    latency_ms = (time.time() - send_times[msg_key]) * 1000
                    del send_times[msg_key]  # Clean up
                else:
                    latency_ms = 0.0  # Default latency if timing info not available

                metrics.record_delivered(latency_ms, topic=msg.topic(), partition=msg.partition())

        # Start metrics collection
        metrics.start()

        self.logger.info("Producing messages with %s serialization and metrics for %.1f seconds to topic %s",
                         serialization_type, test_duration, topic_name)
        start_time = time.time()
        messages_sent = 0

        while time.time() - start_time < test_duration:
            message_value = message_value_func(messages_sent)
            message_key = str(uuid4())

            try:
                # Calculate message size for metrics
                serialized_key = string_serializer(message_key)
                serialized_value = serializer(
                    message_value, SerializationContext(topic_name, MessageField.VALUE)
                )
                message_size = len(serialized_key) + len(serialized_value)

                # Record message being sent with metrics
                metrics.record_sent(message_size, topic=topic_name, partition=0)

                # Track send time for latency calculation
                send_times[message_key] = time.time()

                producer.produce(
                    topic=topic_name,
                    key=serialized_key,
                    value=serialized_value,
                    callback=delivery_callback
                )
                messages_sent += 1

                # Poll frequently to trigger delivery callbacks and record poll operations
                if messages_sent % 100 == 0:
                    producer.poll(0)
                    metrics.record_poll()

            except BufferError:
                # Record buffer full events and poll
                metrics.record_buffer_full()
                producer.poll(0.001)
                continue

        # Flush to ensure all messages are sent
        self.logger.info("Flushing producer...")
        producer.flush(timeout=30)

        # Finalize metrics collection
        metrics.finalize()

        # Get comprehensive metrics summary and validate
        metrics_summary = metrics.get_summary()
        self.calculate_and_verify_results(metrics_summary, bounds, serialization_type)

    def test_basic_produce_with_avro_serialization(self):
        """Test producing messages with Avro serialization using Schema Registry"""
        topic_name = "test-topic-schema-registry"
        test_duration = 5.0  # 5 seconds

        # Create topic
        self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
        assert topic_ready, f"Topic {topic_name} was not created within timeout"

        # Define Avro schema
        avro_schema = {
            "type": "record",
            "name": "User",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "age", "type": "int"}
            ]
        }
        avro_schema_str = json.dumps(avro_schema)

        # Create serializers
        string_serializer = StringSerializer('utf8')
        avro_serializer = AvroSerializer(schema_registry_client=self.schema_registry_client,
                                         schema_str=avro_schema_str)

        # Configure producer
        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': 'ducktape-test-producer-schema-registry',
        }

        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Produce messages
        self.produce_messages_with_serialization(
            producer,
            topic_name,
            avro_serializer,
            string_serializer,
            test_duration,
            lambda messages_sent: {'name': f"User{messages_sent}", 'age': messages_sent},
            "Avro"
        )

    def test_basic_produce_with_json_serialization(self):
        """Test producing messages with JSON serialization using Schema Registry"""
        topic_name = "test-topic-json-serialization"
        test_duration = 5.0  # 5 seconds

        # Create topic
        self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
        assert topic_ready, f"Topic {topic_name} was not created within timeout"

        # Define JSON schema
        json_schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "age": {"type": "integer"}
            },
            "required": ["name", "age"]
        }
        json_schema_str = json.dumps(json_schema)

        # Create serializers
        string_serializer = StringSerializer('utf8')
        json_serializer = JSONSerializer(json_schema_str, self.schema_registry_client)

        # Configure producer
        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': 'ducktape-test-producer-json-serialization',
        }

        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Produce messages
        self.produce_messages_with_serialization(
            producer,
            topic_name,
            json_serializer,
            string_serializer,
            test_duration,
            lambda messages_sent: {'name': f"User{messages_sent}", 'age': messages_sent},
            "JSON"
        )

    def test_basic_produce_with_protobuf_serialization(self):
        """Test producing messages with Protobuf serialization using Schema Registry"""
        topic_name = "test-topic-protobuf-serialization"
        test_duration = 5.0  # 5 seconds

        # Create topic
        self.kafka.create_topic(topic_name, partitions=1, replication_factor=1)
        topic_ready = self.kafka.wait_for_topic(topic_name, max_wait_time=30)
        assert topic_ready, f"Topic {topic_name} was not created within timeout"

        # Create serializers
        string_serializer = StringSerializer('utf8')
        protobuf_serializer = ProtobufSerializer(
            PublicTestProto_pb2.TestMessage, self.schema_registry_client
        )

        # Configure producer
        producer_config = {
            'bootstrap.servers': self.kafka.bootstrap_servers(),
            'client.id': 'ducktape-test-producer-protobuf-serialization',
        }

        self.logger.info("Creating producer with config: %s", producer_config)
        producer = Producer(producer_config)

        # Produce messages
        self.produce_messages_with_serialization(
            producer,
            topic_name,
            protobuf_serializer,
            string_serializer,
            test_duration,
            lambda _: PublicTestProto_pb2.TestMessage(
                test_string="example string",
                test_bool=True,
                test_bytes=b'example bytes',
                test_double=1.0,
                test_float=12.0,
                test_fixed32=1,
                test_fixed64=1,
            ),
            "Protobuf"
        )
```
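Each serializer above registers its schema with Schema Registry on first use (auto-registration is the client default). A quick, hedged way to confirm a run actually reached the registry, assuming the default topic-name subject strategy and the local registry used by the test:

```python
# Sketch: look up the subject the Avro test's serializer should have
# auto-registered under the default <topic>-value subject naming.
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({'url': 'http://localhost:8081'})
registered = client.get_latest_version("test-topic-schema-registry-value")
print(registered.schema_id, registered.schema.schema_type)  # e.g. 1 "AVRO"
```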
