Feature description
While working on streaming benchmarks and implementing ReadN, I found that there's a big performance difference depending on how we currently allocate memory for the slice of records.
To illustrate, these are the results I got on my machine (Apple M4 Max, 36GB) when inserting 20M records through CDC with sdk.batch.size: 10000:
// 40K msg/s
// recs := make([]opencdc.Record, 0, n)

// 150K msg/s
var recs []opencdc.Record
I had a similar experience on an EC2 c7a.xlarge instance (80GB):
// 15649.41 msg/s
// recs := make([]opencdc.Record, 0, n)

// 70734.26 msg/s
var recs []opencdc.Record
Here's some further testing:
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
	"context"
	"fmt"
	"testing"

	"github.com/conduitio/conduit-commons/opencdc"
)

func BenchmarkSliceAllocated_WithChannel_Allocated(b *testing.B) {
	runBenchmark_WithChannel(b, func() []opencdc.Record {
		recs := make([]opencdc.Record, 0, b.N)
		return recs
	})
}

func BenchmarkSliceAllocated_WithChannel_Unallocated(b *testing.B) {
	runBenchmark_WithChannel(b, func() []opencdc.Record {
		var recs []opencdc.Record
		return recs
	})
}

// runBenchmark_WithChannel runs a benchmark of the NextN function
// that simulates the NextN function from the Postgres connector's CDCIterator.
// It sets up a channel to which a goroutine will be sending records.
func runBenchmark_WithChannel(b *testing.B, sliceFn func() []opencdc.Record) {
	ctx := context.Background()

	recordsCh := make(chan opencdc.Record)
	go func() {
		for i := 0; i < b.N; i++ {
			r := opencdc.Record{
				Key: opencdc.StructuredData{"n": i},
			}
			recordsCh <- r
		}
	}()

	fetched := 0
	for fetched != b.N {
		got, err := NextN(ctx, recordsCh, b.N, sliceFn)
		if err != nil {
			b.Fatalf("NextN failed: %v", err)
		}
		fetched += len(got)
	}
}

func NextN(ctx context.Context, recordsCh chan opencdc.Record, n int, sliceFn func() []opencdc.Record) ([]opencdc.Record, error) {
	if n <= 0 {
		return nil, fmt.Errorf("n must be greater than 0, got %d", n)
	}

	// 40K msg/s
	// recs := make([]opencdc.Record, 0, n)
	// var recs []opencdc.Record
	recs := sliceFn()

	// Block until at least one record is received or context is canceled
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case rec := <-recordsCh:
		recs = append(recs, rec)
	}

	for len(recs) < n {
		select {
		case rec := <-recordsCh:
			recs = append(recs, rec)
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
			// No more records currently available
			return recs, nil
		}
	}

	return recs, nil
}
With results:
goos: darwin
goarch: arm64
pkg: github.com/trash/test-benchmark
cpu: Apple M4 Max
BenchmarkSliceAllocated_WithChannel_Allocated
BenchmarkSliceAllocated_WithChannel_Allocated-14 21981 74243 ns/op
BenchmarkSliceAllocated_WithChannel_Unallocated
BenchmarkSliceAllocated_WithChannel_Unallocated-14 4114660 292.5 ns/op
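A plausible reading of these numbers (an assumption on my part, not something that has been profiled yet): with an unbuffered channel the default branch in NextN fires quickly, so each call returns only a handful of records, yet the preallocating variant still reserves capacity for all n records on every call. If that's the case, a hypothetical middle ground would be to cap the initial capacity rather than reserving the full batch size up front. The sketch below just mirrors the NextN above; the nextNCapped name and the maxPrealloc value are made up for illustration and are not what #279 does:

// nextNCapped is a hypothetical variant of NextN that caps the initial
// slice capacity instead of reserving room for all n records up front.
func nextNCapped(ctx context.Context, recordsCh chan opencdc.Record, n int) ([]opencdc.Record, error) {
	if n <= 0 {
		return nil, fmt.Errorf("n must be greater than 0, got %d", n)
	}

	// Assumed cap, picked arbitrarily for this sketch; a real value would be
	// tuned against how full CDC batches actually get in practice.
	const maxPrealloc = 64
	capHint := n
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	recs := make([]opencdc.Record, 0, capHint)

	// Block until at least one record is received or the context is canceled.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case rec := <-recordsCh:
		recs = append(recs, rec)
	}

	// Drain whatever else is immediately available, up to n records.
	for len(recs) < n {
		select {
		case rec := <-recordsCh:
			recs = append(recs, rec)
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
			return recs, nil
		}
	}
	return recs, nil
}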
Even though I chose to release #279 as a way to improve performance, this deserves further investigation: it's possible the bottleneck lives elsewhere.
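As a cheap first step for that investigation (just a suggestion), the benchmarks above could also report allocation counts and bytes per operation next to ns/op, either by running go test with -benchmem or by calling b.ReportAllocs() inside the benchmark:

func BenchmarkSliceAllocated_WithChannel_Allocated(b *testing.B) {
	b.ReportAllocs() // same effect as passing -benchmem on the command line
	runBenchmark_WithChannel(b, func() []opencdc.Record {
		return make([]opencdc.Record, 0, b.N)
	})
}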