3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -93,6 +93,9 @@ Main (unreleased)

- `loki.source.api` during component shutdown will now reject all the inflight requests with status code 503 after `graceful_shutdown_timeout` has expired. (@kalleep)

- `loki.write` now supports `queue_config` both with and without the WAL.
- It was also extended with configuration to start multiple shards, each with its own queue, so batches can be pushed in parallel.

### Bugfixes

- Stop `loki.source.kubernetes` discarding log lines with duplicate timestamps. (@ciaranj)
16 changes: 9 additions & 7 deletions docs/sources/reference/components/loki/loki.write.md
@@ -46,7 +46,7 @@ You can use the following blocks with `loki.write`:
| `endpoint` > [`basic_auth`][basic_auth] | Configure `basic_auth` for authenticating to the endpoint. | no |
| `endpoint` > [`oauth2`][oauth2] | Configure OAuth 2.0 for authenticating to the endpoint. | no |
| `endpoint` > `oauth2` > [`tls_config`][tls_config] | Configure TLS settings for connecting to the endpoint. | no |
| `endpoint` > [`queue_config`][queue_config] | When WAL is enabled, configures the queue client. | no |
| `endpoint` > [`queue_config`][queue_config] | Configure the queue used for the endpoint. | no |
| `endpoint` > [`tls_config`][tls_config] | Configure TLS settings for connecting to the endpoint. | no |
| [`wal`][wal] | Write-ahead log configuration. | no |

@@ -104,8 +104,9 @@ The following arguments are supported:
If no `tenant_id` is provided, the component assumes that the Loki instance at `endpoint` is running in single-tenant mode and no X-Scope-OrgID header is sent.

When multiple `endpoint` blocks are provided, the `loki.write` component creates a client for each.
Received log entries are fanned-out to these clients in succession.
That means that if one client is bottlenecked, it may impact the rest.
Received log entries are fanned out to these clients in succession. That means that if one client is bottlenecked, it may impact the rest.
Contributor:

We introduce the word 'client' here, while I think it's an unnecessary implementation detail - we can just talk about 'endpoints' and queues, each of which has a 'worker' that sends the logs to its destination.

Each endpoint has a _queue_ which is used to queue log entries for sending. The `queue_config` block can be used to customize the behavior of the queue.


Endpoints can be named for easier identification in debug metrics by using the `name` argument. If the `name` argument isn't provided, a name is generated based on a hash of the endpoint settings.
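For example, a minimal sketch with two named endpoints (the names and URLs below are illustrative placeholders, not part of the original change):

```alloy
loki.write "default" {
  // Each endpoint gets its own queue; received entries are fanned out to both.
  endpoint {
    name = "primary"
    url  = "http://loki-primary:3100/loki/api/v1/push"
  }

  endpoint {
    name = "backup"
    url  = "http://loki-backup:3100/loki/api/v1/push"
  }
}
```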

@@ -127,17 +128,18 @@ When `retry_on_http_429` is enabled, the retry mechanism is governed by the back

### `queue_config`

{{< docs/shared lookup="stability/experimental_feature.md" source="alloy" version="<ALLOY_VERSION>" >}}

The optional `queue_config` block configures, when WAL is enabled, how the underlying client queues batches of logs sent to Loki.
Refer to [Write-Ahead block](#wal) for more information.
The optional `queue_config` block configures how the endpoint queues batches of logs sent to Loki.

The following arguments are supported:

| Name | Type | Description | Default | Required |
| --------------- | ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------- |
| `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case scenario of memory consumption, in which all enqueued batches are full. | `10MiB` | no |
| `drain_timeout` | `duration` | Configures the maximum time the client can take to drain the send queue upon shutdown. During that time, it enqueues pending batches and drains the send queue sending each. | `"1m"` | no |
Contributor:

Suggested change
| `drain_timeout` | `duration` | Configures the maximum time the client can take to drain the send queue upon shutdown. During that time, it enqueues pending batches and drains the send queue sending each. | `"1m"` | no |
| `drain_timeout` | `duration` | Configures the maximum time the endpoint can take to drain its send queue upon shutdown. | `"1m"` | no |

| `min_shards` | `number` | Minimum amount of concurrent shards sending samples to the endpoint. | `1` | no |
Contributor:

Suggested change
| `min_shards` | `number` | Minimum amount of concurrent shards sending samples to the endpoint. | `1` | no |
| `min_shards` | `number` | Minimum number of concurrent workers sending logs to the endpoint. | `1` | no |

Maybe we should also stop using the term 'shard' and use 'worker'?

Contributor Author:

I used the same wording that we do in prometheus.remote_write


Each queue manages a number of concurrent _shards_ which is responsible for sending a fraction of batches to their respective endpoints.
Contributor:

Suggested change
Each queue manages a number of concurrent _shards_ which is responsible for sending a fraction of batches to their respective endpoints.
Each queue manages a number of concurrent _workers_ which are responsible for sending batches to their respective endpoints.

Contributor Author:

Same as this

Each shard has a buffer of batches it keeps in memory, controlled with the `capacity` argument.
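For illustration, a minimal sketch of an `endpoint` block with an explicit `queue_config` follows; the URL and values are placeholders, and only the arguments documented above are set:

```alloy
loki.write "default" {
  endpoint {
    url = "http://loki:3100/loki/api/v1/push"

    queue_config {
      capacity      = "20MiB" // worst-case memory use if every enqueued batch is full
      drain_timeout = "30s"   // how long to keep draining the send queue on shutdown
      min_shards    = 2       // minimum number of concurrent shards pushing batches
    }
  }
}
```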
Contributor:

  • each queue manages concurrent shards
  • each shard has buffer in memory controlled by capacity
  • capacity is defined as the size of queue buffer

so what is the queue? is it the thing that manages the shards that have buffers? or is it the buffer inside of each shard and there is some kind of manager above them?

We need to document it in a way that will make the answers to above questions clear :)

This doc already had some of these issues, but I think we need to clean this up and make it easier to understand with this PR.

Contributor Author:

Ah yeah this is wrong, each shard has its own queue


### `tls_config`

38 changes: 4 additions & 34 deletions internal/component/common/loki/client/batch.go
@@ -42,7 +42,7 @@ type batch struct {
segmentCounter map[int]int
}

func newBatch(maxStreams int, entries ...loki.Entry) *batch {
func newBatch(maxStreams int) *batch {
b := &batch{
streams: map[string]*push.Stream{},
totalBytes: 0,
@@ -51,17 +51,12 @@ func newBatch(maxStreams int, entries ...loki.Entry) *batch {
segmentCounter: map[int]int{},
}

// Add entries to the batch
for _, entry := range entries {
//never error here
_ = b.add(entry)
}

return b
}

// add an entry to the batch
func (b *batch) add(entry loki.Entry) error {
// add an entry to the batch, tracking that the data being added
// comes from segment segmentNum read from the WAL.
func (b *batch) add(entry loki.Entry, segmentNum int) error {
b.totalBytes += entrySize(entry.Entry)

// Append the entry to an already existing stream (if any)
@@ -80,32 +75,7 @@ func (b *batch) add(entry loki.Entry, segmentNum int) error {
Labels: labels,
Entries: []push.Entry{entry.Entry},
}
return nil
}

// addFromWAL adds an entry to the batch, tracking that the data being added comes from segment segmentNum read from the
// WAL.
func (b *batch) addFromWAL(lbs model.LabelSet, entry push.Entry, segmentNum int) error {
b.totalBytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
labels := labelsMapToString(lbs)
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry)
b.countForSegment(segmentNum)
return nil
}

streams := len(b.streams)
if b.maxStreams > 0 && streams >= b.maxStreams {
return fmt.Errorf("%w, streams: %d exceeds limit: %d, stream: '%s'", errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
}

// Add the entry as a new stream
b.streams[labels] = &push.Stream{
Labels: labels,
Entries: []push.Entry{entry},
}
b.countForSegment(segmentNum)

return nil
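As a rough illustration of the unified `add` signature above, the following sketch shows a caller threading the originating WAL segment number through each entry. The `walEntry` type and `fillBatchFromWAL` helper are hypothetical and exist only for this example; `newBatch` and `add` are the functions from the diff.

```go
package client

import (
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/alloy/internal/component/common/loki"
)

// walEntry pairs a log entry with the WAL segment it was read from.
// Hypothetical type, used only to illustrate the new add signature.
type walEntry struct {
	entry      loki.Entry
	segmentNum int
}

// fillBatchFromWAL sketches how a caller might use the unified add signature:
// every entry carries the segment it came from, so the batch can count entries
// per segment and the sender can track segment progress once a batch is flushed.
func fillBatchFromWAL(logger log.Logger, maxStreams int, entries []walEntry) *batch {
	b := newBatch(maxStreams)
	for _, e := range entries {
		if err := b.add(e.entry, e.segmentNum); err != nil {
			// add only fails when the per-batch stream limit is exceeded.
			level.Error(logger).Log("msg", "dropping entry", "err", err)
		}
	}
	return b
}
```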
41 changes: 23 additions & 18 deletions internal/component/common/loki/client/batch_test.go
@@ -5,12 +5,12 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/loki/pkg/push"
)

func TestBatch_MaxStreams(t *testing.T) {
@@ -27,7 +27,7 @@ func TestBatch_MaxStreams(t *testing.T) {

errCount := 0
for _, entry := range inputEntries {
err := b.add(entry)
err := b.add(entry, 0)
if err != nil {
errCount++
assert.ErrorIs(t, err, errMaxStreamsLimitExceeded)
@@ -78,7 +78,7 @@ func TestBatch_add(t *testing.T) {
b := newBatch(0)

for _, entry := range testData.inputEntries {
err := b.add(entry)
err := b.add(entry, 0)
assert.NoError(t, err)
}

@@ -99,24 +99,30 @@ func TestBatch_encode(t *testing.T) {
expectedEntriesCount: 0,
},
"single stream with single log entry": {
inputBatch: newBatch(0,
loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
),
inputBatch: func() *batch {
b := newBatch(0)
_ = b.add(loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry}, 0)
return b
}(),
expectedEntriesCount: 1,
},
"single stream with multiple log entries": {
inputBatch: newBatch(0,
loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry},
loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[1].Entry},
),
inputBatch: func() *batch {
b := newBatch(0)
_ = b.add(loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[0].Entry}, 0)
_ = b.add(loki.Entry{Labels: model.LabelSet{}, Entry: logEntries[1].Entry}, 0)
return b
}(),
expectedEntriesCount: 2,
},
"multiple streams with multiple log entries": {
inputBatch: newBatch(0,
loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry},
loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry},
loki.Entry{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry},
),
inputBatch: func() *batch {
b := newBatch(0)
_ = b.add(loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[0].Entry}, 0)
_ = b.add(loki.Entry{Labels: model.LabelSet{"type": "a"}, Entry: logEntries[1].Entry}, 0)
_ = b.add(loki.Entry{Labels: model.LabelSet{"type": "b"}, Entry: logEntries[2].Entry}, 0)
return b
}(),
expectedEntriesCount: 3,
},
}
@@ -146,9 +152,8 @@ func TestHashCollisions(t *testing.T) {
const entriesPerLabel = 10

for i := 0; i < entriesPerLabel; i++ {
_ = b.add(loki.Entry{Labels: ls1, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})

_ = b.add(loki.Entry{Labels: ls2, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}})
_ = b.add(loki.Entry{Labels: ls1, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, 0)
_ = b.add(loki.Entry{Labels: ls2, Entry: push.Entry{Timestamp: time.Now(), Line: fmt.Sprintf("line %d", i)}}, 0)
}

// make sure that colliding labels are stored properly as independent streams