@@ -192,12 +192,8 @@ func (wl *walForge) rotateLogIfNeeded(entrySize uint32) error {
192
192
193
193
// rotateLog rotates the log by closing the current segment file,
194
194
// incrementing the current segment index, and opening a new segment file.
195
- // This method is thread safe.
196
195
func (wl * walForge ) rotateLog () error {
197
196
fmt .Println ("rotating log" )
198
- wl .mu .Lock ()
199
- defer wl .mu .Unlock ()
200
-
201
197
// TODO: Ideally this function should not return any error
202
198
// Check for the conditions where it can return an error
203
199
// and handle them gracefully.
@@ -237,11 +233,7 @@ func (wl *walForge) rotateLog() error {
237
233
238
234
// Writes out any data in the WAL's in-memory buffer to the segment file.
239
235
// and syncs the segment file to disk.
240
- // This method is thread safe.
241
236
func (wl * walForge ) sync () error {
242
- wl .mu .Lock ()
243
- defer wl .mu .Unlock ()
244
-
245
237
// Flush the buffer to the segment file
246
238
if err := wl .csWriter .Flush (); err != nil {
247
239
return err
@@ -265,10 +257,12 @@ func (wl *walForge) periodicSyncBuffer() {
265
257
for {
266
258
select {
267
259
case <- wl .bufferSyncTicker .C :
260
+ wl .mu .Lock ()
268
261
err := wl .sync ()
269
262
if err != nil {
270
263
slog .Error ("failed to sync buffer" , slog .String ("error" , err .Error ()))
271
264
}
265
+ wl .mu .Unlock ()
272
266
case <- wl .ctx .Done ():
273
267
return
274
268
}
@@ -281,9 +275,11 @@ func (wl *walForge) periodicRotateSegment() {
281
275
select {
282
276
case <- wl .segmentRotationTicker .C :
283
277
// TODO: Remove this error handling once we clean up the error handling in the rotateLog function.
278
+ wl .mu .Lock ()
284
279
if err := wl .rotateLog (); err != nil {
285
280
slog .Error ("failed to rotate segment" , slog .String ("error" , err .Error ()))
286
281
}
282
+ wl .mu .Unlock ()
287
283
case <- wl .ctx .Done ():
288
284
return
289
285
}
0 commit comments