Skip to content

Commit 5c43189

Browse files
committed
Add MinShards and configure number of shards based on that
1 parent db85a5b commit 5c43189

File tree

4 files changed

+13
-6
lines changed

4 files changed

+13
-6
lines changed

internal/component/common/loki/client/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,13 @@ func newClient(metrics *Metrics, cfg Config, logger log.Logger) (*client, error)
6868
cancel: cancel,
6969
}
7070

71+
c.shards.start(cfg.Queue.MinShards)
72+
7173
c.wg.Go(func() { c.run() })
7274
return c, nil
7375
}
7476

7577
func (c *client) run() {
76-
c.shards.start(1)
7778

7879
for {
7980
select {

internal/component/common/loki/client/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ type QueueConfig struct {
4646
// is the 1 MiB default, and a capacity of 100 MiB, the underlying buffered channel would buffer up to 100 batches.
4747
Capacity int
4848

49+
// MinShards is the minimum amount of concurrent shards sending batches to the endpoint.
50+
MinShards int
51+
4952
// DrainTimeout controls the maximum time that draining the send queue can take.
5053
DrainTimeout time.Duration
5154
}

internal/component/common/loki/client/queue_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config,
8181
markerHandler: markerHandler,
8282
}
8383

84-
c.shards.start(1)
84+
c.shards.start(cfg.Queue.MinShards)
8585

8686
return c, nil
8787
}

internal/component/loki/write/types.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,20 @@ func (r *EndpointOptions) Validate() error {
7070
return nil
7171
}
7272

73-
// QueueConfig controls how the queue logs remote write client is configured. Note that this client is only used when the
74-
// loki.write component has WAL support enabled.
73+
// QueueConfig controls how the queue logs remote write client is configured.
7574
type QueueConfig struct {
76-
Capacity units.Base2Bytes `alloy:"capacity,attr,optional"`
77-
DrainTimeout time.Duration `alloy:"drain_timeout,attr,optional"`
75+
Capacity units.Base2Bytes `alloy:"capacity,attr,optional"`
76+
77+
MinShards int `alloy:"min_shards,attr,optional"`
78+
79+
DrainTimeout time.Duration `alloy:"drain_timeout,attr,optional"`
7880
}
7981

8082
// SetToDefault implements syntax.Defaulter.
8183
func (q *QueueConfig) SetToDefault() {
8284
*q = QueueConfig{
8385
Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10
86+
MinShards: 1,
8487
DrainTimeout: 15 * time.Second,
8588
}
8689
}

0 commit comments

Comments
 (0)