feat: Implement ReadN #276
base: main
Conversation
```go
case rec := <-i.records:
	recs = append(recs, rec)
}
```
I tried out a couple of things with channels in this gist that might help here too. The test sends a number of objects from one goroutine to another through a channel.
The sending goroutine sends 20M objects in batches of different sizes (1-100k). The channel shared by the sending and receiving goroutines is either buffered or unbuffered.
On my machine, sending those 20M objects in batches of 1 record over an unbuffered channel takes around 5.5 seconds.
With the batch size still set to 1 and a buffered channel of size 50, that drops to 1.8 seconds (about 3x faster).
The biggest improvement, though, came from setting the batch size to 1k or 10k (whether the channel was buffered didn't matter much): the result was around 80ms.
With that, I think it's worth trying to send the data from the CDC and snapshot iterators in batches.
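For reference, a rough, self-contained version of that kind of comparison (not the actual gist; the `record` type, counts, and buffer size here are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

type record struct{ id int }

// sendBatched pushes total records through ch in slices of batchSize, so the
// receiver pays the channel synchronization cost once per batch instead of
// once per record.
func sendBatched(ch chan []record, total, batchSize int) {
	batch := make([]record, 0, batchSize)
	for i := 0; i < total; i++ {
		batch = append(batch, record{id: i})
		if len(batch) == batchSize {
			ch <- batch
			batch = make([]record, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		ch <- batch
	}
	close(ch)
}

func main() {
	const total = 1_000_000 // smaller than the 20M in the gist, same idea

	for _, batchSize := range []int{1, 1_000, 10_000} {
		ch := make(chan []record, 50) // buffered channel shared by both goroutines
		start := time.Now()
		go sendBatched(ch, total, batchSize)

		received := 0
		for batch := range ch {
			received += len(batch)
		}
		fmt.Printf("batch=%-6d received=%d in %v\n", batchSize, received, time.Since(start))
	}
}
```

The point is only that the per-send synchronization cost dominates at batch size 1, which is why batching before the channel pays off.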
This is what I meant by iterators sending data in batches: https://github.com/ConduitIO/conduit-connector-postgres/compare/read-n...haris/read-n-batches?expand=1
Good point. We should collect records as early as we can, to cut down on the amount of code that processes records one by one.
> Good point. We should collect records as early as we can, to cut down on the amount of code that processes records one by one.
This may work OK without a timer. "Collecting" normally needs two parameters (a batch size and a time interval), and both requirements can be satisfied here. There's a side effect we can use: Postgres sends keep-alive messages so often that a dedicated timer may not be required, and we can measure elapsed time based on the keep-alive messages. Something like:
```
1. I want 3 messages
2. recv message -> record (n++)
3. n < 3
4. recv message -> keepalive (t < waitFor) -> continue
5. recv message -> record (n++)
6. n < 3
7. recv message -> keepalive (t >= waitFor) -> return 2 messages
```
This is mostly implementation detail.
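For illustration only, a rough sketch of that loop; the `message` type, its fields, and the channel are stand-ins I'm assuming here, not the iterator's actual types:

```go
import (
	"context"
	"time"

	"github.com/conduitio/conduit-commons/opencdc"
)

// message is a hypothetical stand-in for what the CDC iterator receives from
// the replication connection (records and keep-alives).
type message struct {
	keepAlive bool
	rec       opencdc.Record
}

// readBatch collects up to n records from msgs, using keep-alive messages as
// the clock: once waitFor has elapsed, whatever was collected so far is
// returned, even if it's fewer than n records.
func readBatch(ctx context.Context, msgs <-chan message, n int, waitFor time.Duration) ([]opencdc.Record, error) {
	deadline := time.Now().Add(waitFor)
	recs := make([]opencdc.Record, 0, n)

	for len(recs) < n {
		select {
		case <-ctx.Done():
			return recs, ctx.Err()
		case msg := <-msgs:
			if msg.keepAlive {
				// Keep-alives arrive often enough to act as a timer: if we've
				// waited long enough, return the partial batch.
				if time.Now().After(deadline) {
					return recs, nil
				}
				continue
			}
			recs = append(recs, msg.rec)
		}
	}
	return recs, nil
}
```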
```go
return nil, fmt.Errorf("n must be greater than 0, got %d", n)
}

var records []opencdc.Record
```
There's an optimization we can do here by pre-allocating the slice; otherwise we're creating more allocations than we should, especially for big batches.
Now, what size should we pick? An easy solution would be to just allocate `n` elements and expect that we'll fill the slice up. Of course, that will waste space if we don't actually fill the slice up. So I'm thinking that we could store the last batch size in the iterator and use that to allocate the slice, if it's smaller than `n`. The assumption here is that we will mostly see batches of the same size. For instance, if records are steadily streaming and batches are filled, we'll consistently create slices of the full batch size. Then, if the stream dies down and we don't have a lot to process, the batch size will decrease and we'll allocate smaller slices.
```diff
- var records []opencdc.Record
+ records := make([]opencdc.Record, 0, min(i.lastBatchSize, n))
+ ...
+ // before returning the records, store the batch size
+ i.lastBatchSize = len(records)
+ return records, nil
```
Or do you see a better strategy?
> Of course, that will waste space if we don't actually fill the slice up.
How much space would actually be wasted?
There's another thing that plays a role, and that's the `snapshot.fetchSize` parameter (which I think can probably be removed, because we have `sdk.batch.size`). While the iterator is active, we can always expect the database to return that number of records (except for the last fetch).
@hariso good point. I've actually been confused by the two whenever I used this connector as a source. This change would require deprecating the field and using only the SDK parameter, but I'll see how to approach it in a way that doesn't increase the scope too much.
This is a bit more nuanced than the CDC iterator, because records arrive in batches of the fetch size defined in the config. As it stands, each record from that cursor fetch is fed one by one into the channel. If there is caching at that level, you can essentially just take N and not have to pre-allocate.
> There's another thing that plays a role, and that's the `snapshot.fetchSize` parameter (which I think can probably be removed, because we have `sdk.batch.size`). While the iterator is active, we can always expect the database to return that number of records (except for the last fetch).
@hariso I'm unclear about this. The `fetchSize` is the cursor fetch size, i.e. how many rows to get from Postgres at a time; it's primarily used to optimize the retrieval of rows. Removing it would result in a plain `select * from table`, which may or may not perform better. That's something we should try, but it's somewhat tangential to this PR.
@lyuboxa what I meant is to keep the `FETCH`, but not have a `fetchSize` parameter in the connector. Instead, we should do `FETCH` with `sdk.batch.size`.
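For illustration, assuming the snapshot keeps its server-side cursor, sizing each fetch with the `n` passed to `ReadN` could look roughly like this (the helper names and transaction handling are hypothetical, not the connector's actual code):

```go
import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

// declareCursor opens a cursor over the snapshot query; FETCH calls then pull
// rows in batches of whatever size Conduit passes to ReadN (sdk.batch.size),
// instead of a dedicated fetchSize parameter.
func declareCursor(ctx context.Context, tx pgx.Tx, cursorName, table string) error {
	query := fmt.Sprintf(
		"DECLARE %s CURSOR FOR SELECT * FROM %s",
		pgx.Identifier{cursorName}.Sanitize(),
		pgx.Identifier{table}.Sanitize(),
	)
	_, err := tx.Exec(ctx, query)
	return err
}

// fetchBatch reads up to n rows from the already-declared cursor.
func fetchBatch(ctx context.Context, tx pgx.Tx, cursorName string, n int) (pgx.Rows, error) {
	return tx.Query(ctx, fmt.Sprintf("FETCH %d FROM %s", n, pgx.Identifier{cursorName}.Sanitize()))
}
```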
@hariso I think this will be a bit confusing, since `sdk.batch.size` is a general config, while the fetch size is specific to the snapshot.
Why does it need to be specific? When I find `sdk.batch.size: 100` in a source connector's configuration, I expect the connector to read 100 records at a time, which is the fetch size in snapshots, no?
Because it does not apply to CDC. It's specific to Postgres, and it may become irrelevant if we decide to move to `select * from table`. ¯\_(ツ)_/¯
@hariso and I had a chat. The TL;DR is that `sdk.batch.size` is what Conduit passes to `ReadN`, and that may change with the configuration, so allocating a slice with capacity `n` based on the `ReadN` argument makes sense.

```go
records := make([]opencdc.Record, 0, sdkBatchSize)
```

The compiler has no idea how to allocate this on the stack, so it will go on the heap anyway. Appending to it should be fairly efficient, and you can either return the full slice or something like `records[:nread]`, but that doesn't matter; the slice will have the same capacity. You need to get that value from the source batch configuration in the config.

What @lovromazgon is suggesting will be needed if we are returning `[]opencdc.Record`, but I don't think we need it in this implementation.
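To tie the thread together, a minimal sketch of a `ReadN` that pre-allocates with the `n` it receives; the `Iterator` fields and the records channel are assumptions for the sketch, not the PR's actual implementation:

```go
import (
	"context"
	"fmt"

	"github.com/conduitio/conduit-commons/opencdc"
)

// Iterator's fields are assumed for this sketch.
type Iterator struct {
	records chan opencdc.Record
}

// ReadN blocks until at least one record is available, then opportunistically
// drains up to n records that are already buffered. The slice is allocated
// with capacity n, since that's the batch size Conduit passes in
// (sdk.batch.size).
func (i *Iterator) ReadN(ctx context.Context, n int) ([]opencdc.Record, error) {
	if n <= 0 {
		return nil, fmt.Errorf("n must be greater than 0, got %d", n)
	}

	recs := make([]opencdc.Record, 0, n)

	// Block until the first record arrives or the context is canceled.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case rec := <-i.records:
		recs = append(recs, rec)
	}

	// Drain whatever else is already buffered, without blocking, up to n.
	for len(recs) < n {
		select {
		case rec := <-i.records:
			recs = append(recs, rec)
		default:
			return recs, nil
		}
	}
	return recs, nil
}
```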
Description
Since the introduction of end-to-end batching in the Connector SDK (part of Connector SDK 0.12.0), source connectors can add support for batching by implementing the `ReadN` method. This wasn't a breaking change, since the SDK falls back to the traditional `Read` method, as can be seen in the logs when starting Conduit. Example:

```
2025-04-24T19:21:21+00:00 INF source does not support batch reads, falling back to single reads component=plugin connector_id=postgres-to-kafka:postgres-source plugin_name=builtin:postgres plugin_type=source
```

This pull request implements `ReadN` while still maintaining support for the `Read` method, for those who wish to use this connector with an older version of Conduit. This is probably not necessary considering Postgres is a built-in connector.

To really take advantage of this, it is recommended to run Conduit with the flag that enables the new upcoming architecture, `--preview.pipeline-arch-v2`. More information in this blog post.

Running benchmarks
Inserting 20M records using Benchi with CDC:

- `Read`: 142824.47 msg/s
- `ReadN`: 153107.18 msg/s (previously )

This is 7.2% better on CDC.
Quick checks: