Skip to content

Commit 921ab85

Browse files
committed
Move walWriter into manager and contain it there
1 parent 5c43189 commit 921ab85

File tree

4 files changed

+54
-55
lines changed

4 files changed

+54
-55
lines changed

internal/component/common/loki/client/manager.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ func (p watcherClientPair) Stop(drain bool) {
7272
// https://github.com/grafana/loki/issues/8197, this Manager will be
7373
// responsible for instantiating all client types: Logger, Multi and WAL.
7474
type Manager struct {
75-
clients []Client
76-
pairs []watcherClientPair
75+
clients []Client
76+
pairs []watcherClientPair
77+
walWriter *wal.Writer
7778

7879
entries chan loki.Entry
7980
once sync.Once
@@ -82,20 +83,34 @@ type Manager struct {
8283
}
8384

8485
// NewManager creates a new Manager
85-
func NewManager(metrics *Metrics, logger log.Logger, reg prometheus.Registerer, walCfg wal.Config, notifier WriterEventsNotifier, clientCfgs ...Config) (*Manager, error) {
86+
func NewManager(metrics *Metrics, logger log.Logger, reg prometheus.Registerer, walCfg wal.Config, clientCfgs ...Config) (*Manager, error) {
8687
var fake struct{}
8788

88-
walWatcherMetrics := wal.NewWatcherMetrics(reg)
89-
walMarkerMetrics := internal.NewMarkerMetrics(reg)
90-
queueClientMetrics := NewQueueClientMetrics(reg)
91-
9289
if len(clientCfgs) == 0 {
9390
return nil, fmt.Errorf("at least one client config must be provided")
9491
}
9592

96-
clientsCheck := make(map[string]struct{})
97-
clients := make([]Client, 0, len(clientCfgs))
98-
pairs := make([]watcherClientPair, 0, len(clientCfgs))
93+
var walWriter *wal.Writer
94+
if walCfg.Enabled {
95+
var err error
96+
walWriter, err = wal.NewWriter(walCfg, logger, reg)
97+
if err != nil {
98+
return nil, fmt.Errorf("error creating wal writer: %w", err)
99+
}
100+
}
101+
102+
var (
103+
walWatcherMetrics = wal.NewWatcherMetrics(reg)
104+
walMarkerMetrics = internal.NewMarkerMetrics(reg)
105+
queueClientMetrics = NewQueueClientMetrics(reg)
106+
)
107+
108+
var (
109+
clientsCheck = make(map[string]struct{})
110+
clients = make([]Client, 0, len(clientCfgs))
111+
pairs = make([]watcherClientPair, 0, len(clientCfgs))
112+
)
113+
99114
for _, cfg := range clientCfgs {
100115
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
101116
clientName := getClientName(cfg)
@@ -120,13 +135,13 @@ func NewManager(metrics *Metrics, logger log.Logger, reg prometheus.Registerer,
120135
return nil, fmt.Errorf("error starting queue client: %w", err)
121136
}
122137

138+
watcher := wal.NewWatcher(walCfg.Dir, clientName, walWatcherMetrics, queue, wlog, walCfg.WatchConfig, markerHandler)
139+
123140
// subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo
124141
// series cache whenever a segment is deleted.
125-
notifier.SubscribeCleanup(queue)
126-
127-
watcher := wal.NewWatcher(walCfg.Dir, clientName, walWatcherMetrics, queue, wlog, walCfg.WatchConfig, markerHandler)
142+
walWriter.SubscribeCleanup(queue)
128143
// subscribe watcher to wal write events
129-
notifier.SubscribeWrite(watcher)
144+
walWriter.SubscribeWrite(watcher)
130145

131146
level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName)
132147
watcher.Start()
@@ -149,9 +164,10 @@ func NewManager(metrics *Metrics, logger log.Logger, reg prometheus.Registerer,
149164
}
150165
}
151166
manager := &Manager{
152-
clients: clients,
153-
pairs: pairs,
154-
entries: make(chan loki.Entry),
167+
clients: clients,
168+
pairs: pairs,
169+
walWriter: walWriter,
170+
entries: make(chan loki.Entry),
155171
}
156172

157173
if walCfg.Enabled {
@@ -187,6 +203,9 @@ func (m *Manager) startWithForward() {
187203
}
188204

189205
func (m *Manager) Chan() chan<- loki.Entry {
206+
if m.walWriter != nil {
207+
return m.walWriter.Chan()
208+
}
190209
return m.entries
191210
}
192211

@@ -195,11 +214,15 @@ func (m *Manager) Stop() {
195214
m.StopWithDrain(false)
196215
}
197216

198-
// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
217+
// StopWithDrain will stop the manager, its WalWriter, Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
199218
// the Watchers will attempt to drain the WAL completely.
200-
// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then
201-
// the clients are shut down accordingly.
219+
// The shutdown procedure first stops the WalWriter, Then Watchers, allowing them to flush as much data into the clients as possible.
220+
// Lastly the clients are shut down accordingly.
202221
func (m *Manager) StopWithDrain(drain bool) {
222+
if m.walWriter != nil {
223+
m.walWriter.Stop()
224+
}
225+
203226
// first stop the receiving channel
204227
m.once.Do(func() { close(m.entries) })
205228
m.wg.Wait()

internal/component/common/loki/client/manager_test.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func TestManager_NoDuplicateMetricsPanic(t *testing.T) {
3737
for range 2 {
3838
_, err := NewManager(metrics, log.NewNopLogger(), reg, wal.Config{
3939
WatchConfig: wal.DefaultWatchConfig,
40-
}, NilNotifier, Config{
40+
}, Config{
4141
URL: flagext.URLValue{URL: host},
4242
})
4343
require.NoError(t, err)
@@ -53,7 +53,7 @@ func TestManager_ErrorCreatingWhenNoClientConfigsProvided(t *testing.T) {
5353
Dir: walDir,
5454
Enabled: walEnabled,
5555
WatchConfig: wal.DefaultWatchConfig,
56-
}, NilNotifier)
56+
})
5757
require.Error(t, err)
5858
})
5959
}
@@ -74,7 +74,7 @@ func TestManager_ErrorCreatingWhenRepeatedConfigs(t *testing.T) {
7474
Dir: walDir,
7575
Enabled: walEnabled,
7676
WatchConfig: wal.DefaultWatchConfig,
77-
}, NilNotifier, config1, config1Copy)
77+
}, config1, config1Copy)
7878
require.Error(t, err)
7979
})
8080
}
@@ -131,10 +131,7 @@ func TestManager_WALEnabled(t *testing.T) {
131131
testClientConfig, rwReceivedReqs, closeServer := newServerAndClientConfig(t)
132132
clientMetrics := NewMetrics(reg)
133133

134-
// start writer and manager
135-
writer, err := wal.NewWriter(walConfig, logger, reg)
136-
require.NoError(t, err)
137-
manager, err := NewManager(clientMetrics, logger, prometheus.NewRegistry(), walConfig, writer, testClientConfig)
134+
manager, err := NewManager(clientMetrics, logger, prometheus.NewRegistry(), walConfig, testClientConfig)
138135
require.NoError(t, err)
139136

140137
receivedRequests := utils.NewSyncSlice[utils.RemoteWriteRequest]()
@@ -145,7 +142,6 @@ func TestManager_WALEnabled(t *testing.T) {
145142
}()
146143

147144
defer func() {
148-
writer.Stop()
149145
manager.Stop()
150146
closeServer.Close()
151147
}()
@@ -155,7 +151,7 @@ func TestManager_WALEnabled(t *testing.T) {
155151
}
156152
var totalLines = 100
157153
for i := range totalLines {
158-
writer.Chan() <- loki.Entry{
154+
manager.Chan() <- loki.Entry{
159155
Labels: testLabels,
160156
Entry: push.Entry{
161157
Timestamp: time.Now(),
@@ -189,7 +185,7 @@ func TestManager_WALDisabled(t *testing.T) {
189185
clientMetrics := NewMetrics(reg)
190186

191187
// start writer and manager
192-
manager, err := NewManager(clientMetrics, logger, prometheus.NewRegistry(), walConfig, NilNotifier, testClientConfig)
188+
manager, err := NewManager(clientMetrics, logger, prometheus.NewRegistry(), walConfig, testClientConfig)
193189
require.NoError(t, err)
194190

195191
receivedRequests := utils.NewSyncSlice[utils.RemoteWriteRequest]()
@@ -247,7 +243,7 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) {
247243
clientMetrics := NewMetrics(reg)
248244

249245
// start writer and manager
250-
manager, err := NewManager(clientMetrics, logger, prometheus.NewRegistry(), walConfig, NilNotifier, testClientConfig, testClientConfig2)
246+
manager, err := NewManager(clientMetrics, logger, prometheus.NewRegistry(), walConfig, testClientConfig, testClientConfig2)
251247
require.NoError(t, err)
252248

253249
receivedRequests := utils.NewSyncSlice[utils.RemoteWriteRequest]()

internal/component/common/loki/client/shards.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ type shards struct {
233233
// Each shard gets its own queue and a dedicated worker that processes batches
234234
// from that queue. The number of shards determines the parallelism level.
235235
func (s *shards) start(n int) {
236+
n = max(n, 1)
237+
236238
s.mut.Lock()
237239
defer s.mut.Unlock()
238240

internal/component/loki/write/write.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,11 @@ func New(o component.Options, args Arguments) (*Component, error) {
118118
// Run implements component.Component.
119119
func (c *Component) Run(ctx context.Context) error {
120120
defer func() {
121-
// First we need to stop the sink, this is either wrapped clientManger or walWriter.
122-
// Stopping the sink will not stop the inner handler
121+
// First we need to stop the sink. Stopping the sink will not stop the wrapped handler.
123122
if c.sink != nil {
124123
c.sink.Stop()
125124
}
126125

127-
// when exiting Run, proceed to shut down first the writer component, and then
128-
// the client manager, with the WAL and remote-write client inside
129-
if c.walWriter != nil {
130-
c.walWriter.Stop()
131-
}
132126
if c.clientManger != nil {
133127
// drain, since the component is shutting down. That means Alloy is shutting down as well
134128
c.clientManger.StopWithDrain(true)
@@ -164,10 +158,6 @@ func (c *Component) Update(args component.Arguments) error {
164158
c.sink.Stop()
165159
}
166160

167-
if c.walWriter != nil {
168-
c.walWriter.Stop()
169-
}
170-
171161
if c.clientManger != nil {
172162
// only drain on component shutdown
173163
c.clientManger.Stop()
@@ -199,19 +189,7 @@ func (c *Component) Update(args component.Arguments) error {
199189
walCfg.Dir = filepath.Join(c.opts.DataPath, "wal")
200190

201191
var err error
202-
var notifier client.WriterEventsNotifier = client.NilNotifier
203-
// nil-out wal writer in case WAL was disabled
204-
c.walWriter = nil
205-
// only configure WAL Writer if enabled
206-
if walCfg.Enabled {
207-
c.walWriter, err = wal.NewWriter(walCfg, c.opts.Logger, c.opts.Registerer)
208-
if err != nil {
209-
return fmt.Errorf("error creating wal writer: %w", err)
210-
}
211-
notifier = c.walWriter
212-
}
213-
214-
c.clientManger, err = client.NewManager(c.metrics, c.opts.Logger, c.opts.Registerer, walCfg, notifier, cfgs...)
192+
c.clientManger, err = client.NewManager(c.metrics, c.opts.Logger, c.opts.Registerer, walCfg, cfgs...)
215193
if err != nil {
216194
return fmt.Errorf("failed to create client manager: %w", err)
217195
}

0 commit comments

Comments
 (0)