Skip to content

Commit f4963e9

Browse files
Bowbaqsnorecone
authored and committed
Fix race condition leading to duplicate tailing (#181)
* Fix race condition leading to duplicate tailing
* Add threadsafe testWriter
* Make sure the test runs in a clean folder
* Remove line check, panic must have been caused by something else
* Switch to a test registry
* Fix race condition in logger
1 parent f31e3ca commit f4963e9

File tree

4 files changed

+111
-21
lines changed

4 files changed

+111
-21
lines changed

remote_syslog.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ var (
2626
type Server struct {
2727
config *Config
2828
logger *syslog.Logger
29-
registry *WorkerRegistry
29+
registry WorkerRegistry
3030
stopChan chan struct{}
3131
stopped bool
3232
mu sync.RWMutex
@@ -35,7 +35,7 @@ type Server struct {
3535
func NewServer(config *Config) *Server {
3636
return &Server{
3737
config: config,
38-
registry: NewWorkerRegistry(),
38+
registry: NewInMemoryRegistry(),
3939
stopChan: make(chan struct{}),
4040
}
4141
}
@@ -99,7 +99,6 @@ func (s *Server) closing() bool {
9999
// Tails a single file
100100
func (s *Server) tailOne(file, tag string, whence int) {
101101
defer s.registry.Remove(file)
102-
s.registry.Add(file)
103102

104103
t, err := tail.TailFile(file, tail.Config{
105104
ReOpen: true,
@@ -196,6 +195,7 @@ func (s *Server) globFiles(firstPass bool) {
196195
whence = io.SeekEnd
197196
}
198197

198+
s.registry.Add(file)
199199
go s.tailOne(file, tag, whence)
200200
}
201201
}

remote_syslog_test.go

+74
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"net"
77
"os"
88
"regexp"
9+
"strconv"
10+
"sync"
911
"testing"
1012
"time"
1113

@@ -83,6 +85,51 @@ func TestNewFileSeek(t *testing.T) {
8385
}
8486
}
8587

88+
func TestGlobCollisions(t *testing.T) {
89+
assert := assert.New(t)
90+
91+
// Make sure we're running on a clean directory
92+
os.RemoveAll(tmpdir)
93+
os.Mkdir(tmpdir, 0755)
94+
95+
// Add colliding globs
96+
config := testConfig()
97+
config.Files = append(config.Files, LogFile{
98+
Path: "tmp/*.log",
99+
})
100+
101+
// Use an observable registry
102+
testRegistry := &testRegistry{workers: make(map[string]int)}
103+
104+
s := NewServer(config)
105+
s.registry = testRegistry
106+
go s.Start()
107+
defer s.Close()
108+
109+
// just a quick rest to get the server started
110+
time.Sleep(1 * time.Second)
111+
112+
var files []*os.File
113+
for i := 0; i < 50; i++ {
114+
file := tmpLogFile()
115+
files = append(files, file)
116+
writeLog(file, "the most important message"+strconv.Itoa(i))
117+
}
118+
119+
// NewFileCheckInterval = 1 second, so wait 3000ms (three intervals) for messages
120+
time.Sleep(3000 * time.Millisecond)
121+
122+
testRegistry.mu.RLock()
123+
for file, forwardCount := range testRegistry.workers {
124+
assert.Equal(1, forwardCount, "Expected %s to be added once, got %d", file, forwardCount)
125+
}
126+
testRegistry.mu.RUnlock()
127+
128+
for _, file := range files {
129+
file.Close()
130+
}
131+
}
132+
86133
// write to test log file
87134
func writeLog(file *os.File, msg string) {
88135
w := bufio.NewWriterSize(file, 1024*32)
@@ -148,3 +195,30 @@ func testConfig() *Config {
148195
},
149196
}
150197
}
198+
199+
// testRegistry is a WorkerRegistry implementation that keeps track of how many times a file was added
200+
type testRegistry struct {
201+
mu sync.RWMutex
202+
workers map[string]int
203+
}
204+
205+
func (tr *testRegistry) Exists(worker string) bool {
206+
tr.mu.RLock()
207+
defer tr.mu.RUnlock()
208+
_, ok := tr.workers[worker]
209+
return ok
210+
}
211+
212+
func (tr *testRegistry) Add(worker string) {
213+
tr.mu.Lock()
214+
defer tr.mu.Unlock()
215+
log.Tracef("Adding %s to worker registry", worker)
216+
tr.workers[worker] += 1
217+
}
218+
219+
func (tr *testRegistry) Remove(worker string) {
220+
tr.mu.Lock()
221+
defer tr.mu.Unlock()
222+
log.Tracef("Removing %s from worker registry", worker)
223+
delete(tr.workers, worker)
224+
}

syslog/syslog.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ type Logger struct {
9898
connectTimeout time.Duration
9999
writeTimeout time.Duration
100100
tcpMaxLineLength int
101-
mu sync.Mutex
101+
mu sync.RWMutex
102102
stopChan chan struct{}
103103
stopped bool
104104
}
@@ -127,6 +127,9 @@ func Dial(clientHostname, network, raddr string, rootCAs *x509.CertPool, connect
127127
}
128128

129129
func (l *Logger) Write(packet Packet) {
130+
l.mu.RLock()
131+
defer l.mu.RUnlock()
132+
130133
if l.stopped {
131134
return
132135
}

worker_registry.go

+30-17
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,46 @@ import (
44
"sync"
55
)
66

7-
func NewWorkerRegistry() *WorkerRegistry {
8-
return &WorkerRegistry{workers: make(map[string]bool)}
7+
// WorkerRegistry keeps track of which files are being actively tailed in order to avoid sending the same logs
8+
// multiple times. Implementations must be thread-safe.
9+
type WorkerRegistry interface {
10+
// Exists returns true if a log file is currently being tailed
11+
Exists(worker string) bool
12+
13+
// Add marks a log file as being currently tailed
14+
Add(worker string)
15+
16+
// Remove clears a log file from the registry
17+
Remove(worker string)
918
}
1019

11-
type WorkerRegistry struct {
12-
workers map[string]bool
20+
// InMemoryRegistry is a simple WorkerRegistry implementation that uses a map protected by a sync.RWMutex.
21+
type InMemoryRegistry struct {
1322
mu sync.RWMutex
23+
workers map[string]bool
24+
}
25+
26+
func NewInMemoryRegistry() WorkerRegistry {
27+
return &InMemoryRegistry{workers: make(map[string]bool)}
1428
}
1529

16-
func (w *WorkerRegistry) Exists(worker string) bool {
17-
w.mu.RLock()
18-
defer w.mu.RUnlock()
19-
_, ok := w.workers[worker]
30+
func (imr *InMemoryRegistry) Exists(worker string) bool {
31+
imr.mu.RLock()
32+
defer imr.mu.RUnlock()
33+
_, ok := imr.workers[worker]
2034
return ok
2135
}
2236

23-
func (w *WorkerRegistry) Add(worker string) {
24-
w.mu.Lock()
25-
defer w.mu.Unlock()
37+
func (imr *InMemoryRegistry) Add(worker string) {
38+
imr.mu.Lock()
39+
defer imr.mu.Unlock()
2640
log.Tracef("Adding %s to worker registry", worker)
27-
w.workers[worker] = true
41+
imr.workers[worker] = true
2842
}
2943

30-
func (w *WorkerRegistry) Remove(worker string) error {
31-
w.mu.Lock()
32-
defer w.mu.Unlock()
44+
func (imr *InMemoryRegistry) Remove(worker string) {
45+
imr.mu.Lock()
46+
defer imr.mu.Unlock()
3347
log.Tracef("Removing %s from worker registry", worker)
34-
delete(w.workers, worker)
35-
return nil
48+
delete(imr.workers, worker)
3649
}

0 commit comments

Comments
 (0)