Skip to content

Commit e20088b

Browse files
committed
Fixed one of races, improve logs
1 parent 79ae31a commit e20088b

File tree

2 files changed

+7
-5
lines changed

2 files changed

+7
-5
lines changed

internal/background/worker.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package background
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"runtime/pprof"
78
"sync"
89

@@ -75,7 +76,7 @@ func (b *Worker) Close(ctx context.Context, err error) error {
7576
var resErr error
7677
b.m.WithLock(func() {
7778
if b.closed {
78-
resErr = xerrors.WithStackTrace(ErrAlreadyClosed)
79+
resErr = xerrors.WithStackTrace(fmt.Errorf("%w with reason: %+v", ErrAlreadyClosed, b.closeReason))
7980

8081
return
8182
}

internal/topic/topicwriterinternal/writer_reconnector_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,8 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
555555
},
556556
},
557557
Codec: rawtopiccommon.CodecRaw,
558+
}).Do(func(_ *rawtopicwriter.WriteRequest) {
559+
close(writeCompleted)
558560
}).Return(nil)
559561

560562
flushCompleted := make(empty.Chan)
@@ -564,7 +566,6 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
564566
CreatedAt: messageTime,
565567
Data: bytes.NewReader(messageData),
566568
}})
567-
close(writeCompleted)
568569
require.NoError(t, err)
569570
}()
570571

@@ -611,11 +612,11 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
611612
{
612613
name: "flush",
613614
flush: func(ctx context.Context, writer *WriterReconnector) error {
614-
return writer.Close(ctx)
615+
return writer.Flush(ctx)
615616
},
616617
},
617618
{
618-
name: "flush and close",
619+
name: "flush_and_close",
619620
flush: func(ctx context.Context, writer *WriterReconnector) error {
620621
err := writer.Flush(ctx)
621622
if err != nil {
@@ -631,7 +632,7 @@ func TestWriterImpl_CloseWithFlush(t *testing.T) {
631632
t.Run(test.name, func(t *testing.T) {
632633
xtest.TestManyTimes(t, func(t testing.TB) {
633634
f(t, test.flush)
634-
})
635+
}, xtest.StopAfter(time.Minute))
635636
})
636637
}
637638
}

0 commit comments

Comments
 (0)