Skip to content

Commit 1693490

Browse files
committed
move where batch is marked as sent data
1 parent 07a054f commit 1693490

File tree

2 files changed

+3
-4
lines changed

2 files changed

+3
-4
lines changed

internal/component/common/loki/client/queue_client.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ func newQueueClient(metrics *Metrics, qcMetrics *QueueClientMetrics, cfg Config,
8181
markerHandler: markerHandler,
8282
}
8383

84-
// FIXME: resharding loop and better place for this
8584
c.shards.start(1)
8685

8786
return c, nil

internal/component/common/loki/client/shards.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func newShards(metrics *Metrics, logger log.Logger, markerHandler SentDataMarker
196196
logger: logger,
197197
metrics: metrics,
198198
client: client,
199-
marketHandler: markerHandler,
199+
markerHandler: markerHandler,
200200
tenants: make(map[string]struct{}),
201201
}, nil
202202
}
@@ -210,7 +210,7 @@ type shards struct {
210210
logger log.Logger
211211
metrics *Metrics
212212
client *http.Client
213-
marketHandler SentDataMarkerHandler
213+
markerHandler SentDataMarkerHandler
214214

215215
mut sync.Mutex
216216
tenants map[string]struct{}
@@ -313,7 +313,6 @@ func (s *shards) runShard(q *queue) {
313313
// Drain all batches that have exceeded the max wait time.
314314
for _, b := range q.drain() {
315315
s.sendBatch(b.TenantID, b.Batch)
316-
b.Batch.reportAsSentData(s.marketHandler)
317316
}
318317
}
319318
}
@@ -368,6 +367,7 @@ func (s *shards) processEntry(e loki.Entry) (loki.Entry, string) {
368367

369368
// sendBatch encodes a batch and sends it to Loki with retry logic.
370369
func (s *shards) sendBatch(tenantID string, batch *batch) {
370+
defer batch.reportAsSentData(s.markerHandler)
371371
buf, entriesCount, err := batch.encode()
372372

373373
if err != nil {

0 commit comments

Comments
 (0)