Skip to content

Commit fcb0f7d

Browse files
committed
benches/avro_ingest: use kgen directly rather than kafka-avro-generator
kafka-avro-generator is going away soon. Use kgen directly instead.
1 parent 1e444ab commit fcb0f7d

File tree

1 file changed

+120
-19
lines changed

1 file changed

+120
-19
lines changed

misc/python/materialize/benches/avro_ingest.py

Lines changed: 120 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"""Ingest some Avro records, and report how long it takes"""
1111

1212
import argparse
13+
import json
1314
import os
1415
import time
1516
from typing import IO, NamedTuple
@@ -87,21 +88,14 @@ def main() -> None:
8788
type=int,
8889
help="Number of Avro records to generate",
8990
)
90-
parser.add_argument(
91-
"-d",
92-
"--distribution",
93-
default="benchmark",
94-
type=str,
95-
help="Distribution to use in kafka-avro-generator",
96-
)
9791
args = parser.parse_args()
9892

9993
os.chdir(ROOT)
10094
repo = mzbuild.Repository(ROOT)
10195

10296
wait_for_confluent(args.confluent_host)
10397

104-
images = ["kafka-avro-generator", "materialized"]
98+
images = ["kgen", "materialized"]
10599
deps = repo.resolve_dependencies([repo.images[name] for name in images])
106100
deps.acquire()
107101

@@ -114,18 +108,18 @@ def main() -> None:
114108
)
115109

116110
docker_client.containers.run(
117-
deps["kafka-avro-generator"].spec(),
111+
deps["kgen"].spec(),
118112
[
119-
"-n",
120-
str(args.records),
121-
"-b",
122-
f"{args.confluent_host}:9092",
123-
"-r",
124-
f"http://{args.confluent_host}:8081",
125-
"-t",
126-
"bench_data",
127-
"-d",
128-
args.distribution,
113+
f"--num-records={args.records}",
114+
f"--bootstrap-server={args.confluent_host}:9092",
115+
f"--schema-registry-url=http://{args.confluent_host}:8081",
116+
"--topic=bench_data",
117+
"--keys=avro",
118+
"--values=avro",
119+
f"--avro-schema={VALUE_SCHEMA}",
120+
f"--avro-distribution={VALUE_DISTRIBUTION}",
121+
f"--avro-key-schema={KEY_SCHEMA}",
122+
f"--avro-key-distribution={KEY_DISTRIBUTION}",
129123
],
130124
network_mode="host",
131125
)
@@ -158,5 +152,112 @@ def main() -> None:
158152
prev = print_stats(mz_container, prev, results_file)
159153

160154

155+
KEY_SCHEMA = json.dumps(
156+
{
157+
"name": "testrecordkey",
158+
"type": "record",
159+
"namespace": "com.acme.avro",
160+
"fields": [{"name": "Key1", "type": "long"}, {"name": "Key2", "type": "long"}],
161+
}
162+
)
163+
164+
KEY_DISTRIBUTION = json.dumps(
165+
{
166+
"com.acme.avro.testrecordkey::Key1": [0, 100],
167+
"com.acme.avro.testrecordkey::Key2": [0, 250000],
168+
}
169+
)
170+
171+
VALUE_SCHEMA = json.dumps(
172+
{
173+
"name": "testrecord",
174+
"type": "record",
175+
"namespace": "com.acme.avro",
176+
"fields": [
177+
{"name": "Key1Unused", "type": "long"},
178+
{"name": "Key2Unused", "type": "long"},
179+
{
180+
"name": "OuterRecord",
181+
"type": {
182+
"name": "OuterRecord",
183+
"type": "record",
184+
"fields": [
185+
{
186+
"name": "Record1",
187+
"type": {
188+
"name": "Record1",
189+
"type": "record",
190+
"fields": [
191+
{
192+
"name": "InnerRecord1",
193+
"type": {
194+
"name": "InnerRecord1",
195+
"type": "record",
196+
"fields": [
197+
{"name": "Point", "type": "long"}
198+
],
199+
},
200+
},
201+
{
202+
"name": "InnerRecord2",
203+
"type": {
204+
"name": "InnerRecord2",
205+
"type": "record",
206+
"fields": [
207+
{"name": "Point", "type": "long"}
208+
],
209+
},
210+
},
211+
],
212+
},
213+
},
214+
{
215+
"name": "Record2",
216+
"type": {
217+
"name": "Record2",
218+
"type": "record",
219+
"fields": [
220+
{
221+
"name": "InnerRecord3",
222+
"type": {
223+
"name": "InnerRecord3",
224+
"type": "record",
225+
"fields": [
226+
{"name": "Point", "type": "long"}
227+
],
228+
},
229+
},
230+
{
231+
"name": "InnerRecord4",
232+
"type": {
233+
"name": "InnerRecord4",
234+
"type": "record",
235+
"fields": [
236+
{"name": "Point", "type": "long"}
237+
],
238+
},
239+
},
240+
],
241+
},
242+
},
243+
],
244+
},
245+
},
246+
],
247+
}
248+
)
249+
250+
VALUE_DISTRIBUTION = json.dumps(
251+
{
252+
"com.acme.avro.testrecord::Key1Unused": [0, 100],
253+
"com.acme.avro.testrecord::Key2Unused": [0, 250000],
254+
"com.acme.avro.InnerRecord1::Point": [10000, 1000000000],
255+
"com.acme.avro.InnerRecord2::Point": [10000, 1000000000],
256+
"com.acme.avro.InnerRecord3::Point": [10000, 1000000000],
257+
"com.acme.avro.InnerRecord4::Point": [10000, 10000000000],
258+
}
259+
)
260+
261+
161262
if __name__ == "__main__":
162263
main()

0 commit comments

Comments
 (0)