Skip to content

Commit 51e23f0

Browse files
authored
Merge pull request #1231 Fixed data race on close topic writer
2 parents 79ae31a + 799c0b4 commit 51e23f0

File tree

4 files changed

+15
-9
lines changed

4 files changed

+15
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed race of stop internal processes on close topic writer
12
* Fixed goroutines leak within topic reader on network problems
23

34
## v3.67.0

internal/background/worker.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package background
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"runtime/pprof"
78
"sync"
89

@@ -75,7 +76,9 @@ func (b *Worker) Close(ctx context.Context, err error) error {
7576
var resErr error
7677
b.m.WithLock(func() {
7778
if b.closed {
78-
resErr = xerrors.WithStackTrace(ErrAlreadyClosed)
79+
// The error of Close is second close, close reason added for describe previous close only, for better debug
80+
//nolint:errorlint
81+
resErr = xerrors.WithStackTrace(fmt.Errorf("%w with reason: %+v", ErrAlreadyClosed, b.closeReason))
7982

8083
return
8184
}

internal/topic/topicwriterinternal/writer_reconnector.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -347,16 +347,17 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err
347347
onDone(resErr)
348348
}()
349349

350-
closeErr := w.queue.Close(reason)
351-
if resErr == nil && closeErr != nil {
352-
resErr = closeErr
353-
}
354-
350+
// stop background work and single stream writer
355351
bgErr := w.background.Close(ctx, reason)
356352
if resErr == nil && bgErr != nil {
357353
resErr = bgErr
358354
}
359355

356+
closeErr := w.queue.Close(reason)
357+
if resErr == nil && closeErr != nil {
358+
resErr = closeErr
359+
}
360+
360361
return resErr
361362
}
362363

internal/topic/topicwriterinternal/writer_reconnector_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,8 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
555555
},
556556
},
557557
Codec: rawtopiccommon.CodecRaw,
558+
}).Do(func(_ *rawtopicwriter.WriteRequest) {
559+
close(writeCompleted)
558560
}).Return(nil)
559561

560562
flushCompleted := make(empty.Chan)
@@ -564,7 +566,6 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
564566
CreatedAt: messageTime,
565567
Data: bytes.NewReader(messageData),
566568
}})
567-
close(writeCompleted)
568569
require.NoError(t, err)
569570
}()
570571

@@ -611,11 +612,11 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
611612
{
612613
name: "flush",
613614
flush: func(ctx context.Context, writer *WriterReconnector) error {
614-
return writer.Close(ctx)
615+
return writer.Flush(ctx)
615616
},
616617
},
617618
{
618-
name: "flush and close",
619+
name: "flush_and_close",
619620
flush: func(ctx context.Context, writer *WriterReconnector) error {
620621
err := writer.Flush(ctx)
621622
if err != nil {

0 commit comments

Comments
 (0)