Skip to content

Commit ebda382

Browse files
authored
backport: fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789) (#2813)
fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789) * move bufio reader creation out of for loop if the bufio reader is created in the for loop we get unmarshaling errors * fix linter issue * add fixed ut * fix existing unit test flake due to closing pipe on error a previous fix ensured the socket closed on error, but this caused an existing ut to nondeterministically fail without the previous fix, the socket wouldn't have been closed on error * make read inline
1 parent 2b45998 commit ebda382

File tree

2 files changed

+37
-15
lines changed

2 files changed

+37
-15
lines changed

telemetry/telemetrybuffer.go

+5-13
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,14 @@ func (tb *TelemetryBuffer) StartServer() error {
136136
tb.connections = remove(tb.connections, index)
137137
}
138138
}()
139-
139+
reader := bufio.NewReader(conn)
140140
for {
141-
reportStr, err := read(conn)
142-
if err != nil {
141+
reportStr, readErr := reader.ReadBytes(Delimiter)
142+
if readErr != nil {
143143
return
144144
}
145+
reportStr = reportStr[:len(reportStr)-1]
146+
145147
var tmp map[string]interface{}
146148
err = json.Unmarshal(reportStr, &tmp)
147149
if err != nil {
@@ -228,16 +230,6 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) {
228230
}
229231
}
230232

231-
// read - read from the file descriptor
232-
func read(conn net.Conn) (b []byte, err error) {
233-
b, err = bufio.NewReader(conn).ReadBytes(Delimiter)
234-
if err == nil {
235-
b = b[:len(b)-1]
236-
}
237-
238-
return
239-
}
240-
241233
// Write - write to the file descriptor.
242234
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
243235
buf := make([]byte, len(b))

telemetry/telemetrybuffer_test.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,36 @@ func TestClientConnClose(t *testing.T) {
7070
tbClient.Close()
7171
}
7272

73+
func TestCloseOnWriteError(t *testing.T) {
74+
tbServer, closeTBServer := createTBServer(t)
75+
defer closeTBServer()
76+
77+
tbClient := NewTelemetryBuffer(nil)
78+
err := tbClient.Connect()
79+
require.NoError(t, err)
80+
defer tbClient.Close()
81+
82+
data := []byte("{\"good\":1}")
83+
_, err = tbClient.Write(data)
84+
require.NoError(t, err)
85+
// need to wait for connection to populate in server
86+
time.Sleep(1 * time.Second)
87+
tbServer.mutex.Lock()
88+
conns := tbServer.connections
89+
tbServer.mutex.Unlock()
90+
require.Len(t, conns, 1)
91+
92+
// the connection should be automatically closed on failure
93+
badData := []byte("} malformed json }}}")
94+
_, err = tbClient.Write(badData)
95+
require.NoError(t, err)
96+
time.Sleep(1 * time.Second)
97+
tbServer.mutex.Lock()
98+
conns = tbServer.connections
99+
tbServer.mutex.Unlock()
100+
require.Empty(t, conns)
101+
}
102+
73103
func TestWrite(t *testing.T) {
74104
_, closeTBServer := createTBServer(t)
75105
defer closeTBServer()
@@ -87,8 +117,8 @@ func TestWrite(t *testing.T) {
87117
}{
88118
{
89119
name: "write",
90-
data: []byte("testdata"),
91-
want: len("testdata") + 1, // +1 due to Delimiter('\n)
120+
data: []byte("{\"testdata\":1}"),
121+
want: len("{\"testdata\":1}") + 1, // +1 due to Delimiter('\n)
92122
wantErr: false,
93123
},
94124
{

0 commit comments

Comments
 (0)