loki: implement queue_config for loki.write #4788
base: main
Conversation
Pull Request Overview
This PR refactors the Loki client implementation by introducing a new sharding architecture and removing the maxStreams parameter from function signatures in favor of storing it in the Config struct. The refactoring consolidates batch sending logic into a new shards component that manages parallel queues for improved throughput.
Key changes:
- Introduced `shards.go` with queue-based sharding logic for parallel entry processing (see the sketch after this list)
- Moved `maxStreams` from function parameters to the `Config` struct
- Refactored `client.go` and `queue_client.go` to use the new sharding architecture
- Replaced manual WaitGroup management with the `wg.Go()` pattern
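The diff itself isn't reproduced in this page, so the following is a minimal, hypothetical sketch of the architecture the overview describes: fingerprint-based routing of entries onto per-shard queues, using the `wg.Go()` pattern (Go 1.25) for worker lifecycles. Names such as `shards`, `newShards`, and `enqueue` are illustrative, not the PR's actual API.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/prometheus/common/model"
)

// entry is a stand-in for a log entry carrying its label set.
type entry struct {
	labels model.LabelSet
	line   string
}

// shards fans entries out to a fixed number of queues; one worker
// goroutine drains each queue and sends its batches to the endpoint.
type shards struct {
	queues []chan entry
	wg     sync.WaitGroup
}

func newShards(n int) *shards {
	s := &shards{queues: make([]chan entry, n)}
	for i := range s.queues {
		q := make(chan entry, 100) // per-shard buffer (illustrative size)
		s.queues[i] = q
		// wg.Go (Go 1.25) replaces manual wg.Add(1)/defer wg.Done() bookkeeping.
		s.wg.Go(func() {
			for e := range q {
				send(e) // the real client batches, compresses, and retries here
			}
		})
	}
	return s
}

// enqueue routes an entry to a shard based on its label fingerprint,
// so all entries of one stream land on the same queue.
func (s *shards) enqueue(e entry) {
	s.queues[uint64(e.labels.Fingerprint())%uint64(len(s.queues))] <- e
}

// stop closes all queues and waits for the workers to drain them.
func (s *shards) stop() {
	for _, q := range s.queues {
		close(q)
	}
	s.wg.Wait()
}

func send(e entry) { fmt.Println("send:", e.line) }

func main() {
	s := newShards(2)
	s.enqueue(entry{labels: model.LabelSet{"job": "app"}, line: "hello"})
	s.stop()
}
```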
Reviewed Changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 10 comments.
Summary per file:
| File | Description |
|---|---|
| `internal/component/loki/write/write.go` | Configured MaxStreams on client configs and removed from NewManager call |
| `internal/component/common/loki/wal/writer.go` | Refactored to use wg.Go() pattern |
| `internal/component/common/loki/client/shards.go` | New file implementing sharding logic for parallel batch processing |
| `internal/component/common/loki/client/shards_test.go` | Tests for the new sharding implementation |
| `internal/component/common/loki/client/queue_client.go` | Refactored to use shards instead of custom queue logic |
| `internal/component/common/loki/client/client.go` | Simplified by delegating to shards |
| `internal/component/common/loki/client/batch.go` | Removed addFromWAL, unified to single add method |
| `internal/component/common/loki/client/manager.go` | Removed maxStreams parameter, cleaned up naming |
| `internal/component/common/loki/client/config.go` | Added MaxStreams field |
| `internal/component/common/loki/client/metrics.go` | Moved Metrics struct from client.go |
Co-authored-by: Copilot <[email protected]>
💻 Deploy preview available (loki: implement queue_config for loki.write)
> When multiple `endpoint` blocks are provided, the `loki.write` component creates a client for each.
> Received log entries are fanned-out to these clients in succession. That means that if one client is bottlenecked, it may impact the rest.
We introduce the word 'client' here, but I think it's an unnecessary implementation detail. We can just talk about 'endpoints' and queues, each of which maybe has a 'worker' that sends the logs to its destination.
| Name | Type | Description | Default | Required |
| --- | --- | --- | --- | --- |
| `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
| `drain_timeout` | `duration` | Configures the maximum time the client can take to drain the send queue upon shutdown. During that time, it enqueues pending batches and drains the send queue sending each. | `"1m"` | no |
Suggested change:
| `drain_timeout` | `duration` | Configures the maximum time the endpoint can take to drain its send queue upon shutdown. | `"1m"` | no |
| `min_shards` | `number` | Minimum amount of concurrent shards sending samples to the endpoint. | `1` | no |
Suggested change:
| `min_shards` | `number` | Minimum number of concurrent workers sending logs to the endpoint. | `1` | no |
Maybe we should also stop using term 'shard' and use 'worker'?
I used the same wording that we do in prometheus.remote_write
> Each queue manages a number of concurrent _shards_ which is responsible for sending a fraction of batches to their respective endpoints.
Suggested change:
> Each queue manages a number of concurrent _workers_ which are responsible for sending batches to their respective endpoints.
Same as this
> Each queue manages a number of concurrent _shards_ which is responsible for sending a fraction of batches to their respective endpoints.
> Each shard has a buffer of batches it keeps in memory, controlled with the `capacity` argument.
- each queue manages concurrent shards
- each shard has buffer in memory controlled by capacity
- capacity is defined as the size of queue buffer
so what is the queue? is it the thing that manages the shards that have buffers? or is it the buffer inside of each shard and there is some kind of manager above them?
We need to document it in a way that will make the answers to above questions clear :)
This doc already had some of these issues, but I think we need to clean this up and make it easier to understand with this PR.
Ah yeah, this is wrong: each shard has its own queue.
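To pin down the model this thread converges on (each shard owns its own bounded queue; `capacity` bounds what that queue may buffer, `drain_timeout` bounds how long shutdown waits for it to empty), here is a small hedged sketch. It is illustrative only and not the component's actual code; `shard`, `newShard`, and `stop` are made-up names.

```go
package main

import (
	"fmt"
	"time"
)

// shard owns its own bounded queue of batches. The docs' `capacity`
// argument bounds how much such a queue may buffer in the worst case.
type shard struct {
	queue chan []byte // one queue per shard, not one queue shared by all shards
	done  chan struct{}
}

func newShard(capacityBatches int) *shard {
	s := &shard{
		queue: make(chan []byte, capacityBatches),
		done:  make(chan struct{}),
	}
	go func() {
		defer close(s.done)
		for b := range s.queue {
			fmt.Printf("sending batch of %d bytes\n", len(b)) // real code sends to the endpoint
		}
	}()
	return s
}

// stop closes the queue and waits up to drainTimeout for the worker to
// finish sending whatever is still buffered, mirroring `drain_timeout`.
func (s *shard) stop(drainTimeout time.Duration) {
	close(s.queue)
	select {
	case <-s.done:
	case <-time.After(drainTimeout):
		fmt.Println("drain timeout exceeded; remaining batches are dropped")
	}
}

func main() {
	sh := newShard(4) // capacity expressed in batches for simplicity
	sh.queue <- []byte("batch-1")
	sh.stop(time.Minute)
}
```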
Co-authored-by: Piotr <[email protected]>
PR Description
This PR implements `queue_config` for the `loki.write` component, enabling users to configure queue-based batching and parallel processing. The implementation introduces a new sharding architecture, based on Prometheus remote-write sharding, that distributes log entries across multiple parallel queues keyed by label fingerprints.

The shards implementation is used with both the "normal" client and the `queue_client`, so we no longer need to duplicate the same logic in both implementations.

Before this PR we had a `queue_config` block that was only used when the WAL was enabled. It is now always used.

Currently no automatic "resharding" is implemented. Implementing this without the WAL will most likely be pretty primitive, so for now `min_shards` is the only configurable value until we address this.
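As context for the planned follow-up, the rough shape of the Prometheus remote-write-style heuristic that automatic resharding could borrow is sketched below. This is purely illustrative and simplified, and it is not part of this PR; `desiredShards` and its parameters are made up for the example.

```go
package main

import "fmt"

// desiredShards is a simplified, illustrative take on the throughput-based
// heuristic Prometheus remote-write uses for resharding: if entries arrive
// faster than a single worker can send them, scale the shard count up,
// never going below minShards. This PR does not implement any of this.
func desiredShards(inRate, outRatePerShard float64, minShards, maxShards int) int {
	if outRatePerShard <= 0 {
		return minShards
	}
	n := int(inRate/outRatePerShard + 0.5) // round to the nearest whole shard
	if n < minShards {
		n = minShards
	}
	if n > maxShards {
		n = maxShards
	}
	return n
}

func main() {
	// 5000 entries/s arriving while one shard sustains ~1200 entries/s.
	fmt.Println(desiredShards(5000, 1200, 1, 50)) // prints 4
}
```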
Ideally we would move a couple of attributes from the `endpoint` block to the `queue_config` block to more closely match `prometheus.remote_write`, but we can't do that without a breaking change. These attributes are:
- `retry_on_http_429`
- `max_backoff_period`
- `min_backoff_period`
- `batch_size`
- `batch_wait`

Which issue(s) this PR fixes
Part of: #4728
Notes to the Reviewer
I moved WAL writer ownership into `client.Manager`. No need to expose it to the `loki.write` component.

I plan to work on resharding in a follow-up PR.
PR Checklist