Skip to content

Commit 2cac138

Browse files
authored
Merge pull request #314 from rita7lopes/master
Add ping to authenticate
2 parents e925b43 + 486a7db commit 2cac138

File tree

2 files changed

+54
-11
lines changed

2 files changed

+54
-11
lines changed

publish/hep.go

+36-11
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ type HEPOutputer struct {
2828
msgPing []byte
2929
}
3030

31+
func writeAndFlush(client *HEPConn, data []byte, action string) (int, error) {
32+
hl, err := client.conn.Write(data)
33+
if err != nil {
34+
promstats.HepFileFlushesError.Inc()
35+
return 0, fmt.Errorf("error writing to socket during %s: %w", action, err)
36+
}
37+
38+
if err := client.writer.Flush(); err != nil {
39+
promstats.HepFileFlushesError.Inc()
40+
return 0, fmt.Errorf("error flushing writer during %s: %w", action, err)
41+
}
42+
43+
return hl, nil
44+
}
45+
3146
func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) {
3247
a := strings.Split(cutSpace(serverAddr), ",")
3348
l := len(a)
@@ -43,6 +58,11 @@ func NewHEPOutputer(serverAddr string) (*HEPOutputer, error) {
4358
errCnt++
4459
} else {
4560
if config.Cfg.HEPBufferEnable {
61+
logp.Debug("collector", "send ping packet after disconnect")
62+
_, err := writeAndFlush(&h.client[n], h.msgPing, "ping operation establish connection")
63+
if err != nil {
64+
return nil, err
65+
}
4666
if _, err := os.Stat(config.Cfg.HEPBufferFile); err == nil {
4767
if _, err := h.copyHEPFileOut(n); err != nil {
4868
logp.Err("Sending HEP from file error: %v", err)
@@ -161,8 +181,10 @@ func (h *HEPOutputer) Send(msg []byte) {
161181
logp.Debug("Connection is not up", fmt.Sprintf("index: %d, Len: %d, once: %v", n, len(h.addr), onceSent))
162182
err = fmt.Errorf("connection is broken")
163183
} else {
164-
h.client[n].writer.Write(msg)
165-
err = h.client[n].writer.Flush()
184+
_, err := writeAndFlush(&h.client[n], msg, "sending message")
185+
if err != nil {
186+
logp.Err("Failed to send message: %s", err.Error())
187+
}
166188
}
167189

168190
if err != nil {
@@ -183,17 +205,15 @@ func (h *HEPOutputer) Send(msg []byte) {
183205
} else {
184206
if h.msgPing != nil {
185207
logp.Debug("collector", "send ping packet after disconnect")
186-
h.client[n].writer.Write(h.msgPing)
187-
err = h.client[n].writer.Flush()
208+
_, err := writeAndFlush(&h.client[n], h.msgPing, "Error during resend ping packet")
188209
if err != nil {
189-
logp.Err("Bad during resend ping packet : %v", err)
210+
logp.Err("Error sending ping packet: %s", err.Error())
190211
}
191212
}
192213

193-
h.client[n].writer.Write(msg)
194-
err = h.client[n].writer.Flush()
214+
_, err := writeAndFlush(&h.client[n], msg, "Bad resend")
195215
if err != nil {
196-
logp.Err("Bad resend: %v", err)
216+
logp.Err(err.Error())
197217
if config.Cfg.HEPBufferEnable && (!onceSent && n == (len(h.addr)-1)) {
198218
h.copyHEPbufftoFile(msg)
199219
}
@@ -241,11 +261,16 @@ func (h *HEPOutputer) copyHEPFileOut(n int) (int, error) {
241261
return 0, fmt.Errorf("Connection is broken")
242262
}
243263

244-
//Send Logged HEP upon reconnect out to backend
245-
hl, err := h.client[n].conn.Write(HEPFileData)
264+
hl, err := writeAndFlush(&h.client[n], h.msgPing, "ping operation")
265+
if err != nil {
266+
return 0, err
267+
}
268+
269+
// Send Logged HEP upon reconnect
270+
hl, err = writeAndFlush(&h.client[n], HEPFileData, "HEP reconnect")
246271
if err != nil {
247272
promstats.HepFileFlushesError.Inc()
248-
return 0, fmt.Errorf("Bad write to socket")
273+
return 0, err
249274
}
250275

251276
err = h.client[n].writer.Flush()

sniffer/sniffer.go

+18
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,12 @@ LOOP:
374374
conn, err := sniffer.collectorTCPconn.Accept()
375375
if err != nil {
376376
logp.Err("Error accepting tcp connection: %s", err.Error())
377+
// Close connection if it exists
378+
if conn != nil {
379+
conn.Close()
380+
}
381+
// Add delay before next attempt
382+
time.Sleep(time.Second)
377383
continue
378384
}
379385

@@ -657,6 +663,9 @@ func (sniffer *SnifferSetup) handleRequestSimple(conn net.Conn) {
657663
_, err := conn.Read(message)
658664
if err != nil {
659665
fmt.Println("Error reading:", err.Error())
666+
if conn != nil {
667+
conn.Close() // Ensure the connection is closed
668+
}
660669
break
661670
}
662671

@@ -674,6 +683,9 @@ func (sniffer *SnifferSetup) handleRequestSimple(conn net.Conn) {
674683
_, err := conn.Read(data)
675684
if err != nil {
676685
fmt.Println("Error reading:", err.Error())
686+
if conn != nil {
687+
conn.Close() // Ensure the connection is closed
688+
}
677689
break
678690
}
679691

@@ -715,6 +727,9 @@ func (sniffer *SnifferSetup) handleRequestExtended(conn net.Conn) {
715727
n, err := conn.Read(message)
716728
if err != nil {
717729
logp.Err("Incoming tcp connection closed during read with error [1]: %s", err.Error())
730+
if conn != nil {
731+
conn.Close() // Ensure the connection is closed
732+
}
718733
break
719734
}
720735

@@ -778,6 +793,9 @@ func (sniffer *SnifferSetup) handleRequestExtended(conn net.Conn) {
778793
if err != nil {
779794
logp.Err("Incoming tcp connection closed during direct read from buffer with error [2]: %s", err.Error())
780795
bufferPool.Reset()
796+
if conn != nil {
797+
conn.Close() // Ensure the connection is closed
798+
}
781799
break
782800
}
783801

0 commit comments

Comments
 (0)