Skip to content

Commit 012075c

Browse files
authored
Cancel Http safe client requests before getting connection (#254)
* Fix race condition on gauge shutdown * Refuse requests when trying to get connection instead of when it already has it
1 parent 0931c4e commit 012075c

File tree

4 files changed

+14
-8
lines changed

4 files changed

+14
-8
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ test:
1515
integration-test:
1616
docker-compose -f Dockercompose.test.yml up --build --abort-on-container-exit --always-recreate-deps
1717
docker-compose -f Dockercompose.test.yml down --volumes
18-
18+
1919
clean:
2020
docker-compose -f Dockercompose.test.yml rm -f
2121

2222
kafkacat:
23-
docker run -it --network=host confluentinc/cp-kafkacat kafkacat -b localhost:19092 -C -t gotest -J
23+
docker run -it --network=host confluentinc/cp-kafkacat kafkacat -b localhost:19092 -C -t gotest -J

http/http.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ func (no noLocalTransport) RoundTrip(req *http.Request) (*http.Response, error)
5252
ctx, cancel := context.WithCancel(req.Context())
5353

5454
ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{
55-
GotConn: func(info httptrace.GotConnInfo) {
56-
addr := info.Conn.RemoteAddr().String()
57-
host, _, err := net.SplitHostPort(addr)
55+
GetConn: func(hostPort string) {
56+
host, _, err := net.SplitHostPort(hostPort)
5857
if err != nil {
5958
cancel()
6059
no.errlog.WithError(err).Error("Cancelled request due to error in address parsing")

http/http_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/sirupsen/logrus"
1111
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
1213
)
1314

1415
func TestIsPrivateIP(t *testing.T) {
@@ -44,15 +45,16 @@ func TestSafeHTTPClient(t *testing.T) {
4445

4546
// It blocks the local IP.
4647
_, err = client.Get(ts.URL)
47-
assert.NotNil(t, err)
48+
require.NotNil(t, err)
4849

4950
// It blocks localhost.
5051
_, err = client.Get("http://localhost:" + tsURL.Port())
51-
assert.NotNil(t, err)
52+
require.NotNil(t, err)
5253

5354
// It works when reusing pooled connections.
5455
for i := 0; i < 50; i++ {
55-
_, err = client.Get("http://localhost:" + tsURL.Port())
56+
res, err := client.Get("http://localhost:" + tsURL.Port())
57+
assert.Nil(t, res)
5658
assert.NotNil(t, err)
5759
}
5860

metriks/gauge.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package metriks
22

33
import (
44
"context"
5+
"sync"
56
"sync/atomic"
67
"time"
78

@@ -25,6 +26,7 @@ type PersistentGauge struct {
2526
ticker *time.Ticker
2627
cancel context.CancelFunc
2728
dur time.Duration
29+
wg sync.WaitGroup
2830
}
2931

3032
// Set will replace the value with a new one, it returns the old value
@@ -55,9 +57,11 @@ func (g *PersistentGauge) report(v int32) {
5557
}
5658

5759
func (g *PersistentGauge) start(ctx context.Context) {
60+
g.wg.Add(1)
5861
for {
5962
select {
6063
case <-ctx.Done():
64+
g.wg.Done()
6165
return
6266
case <-g.ticker.C:
6367
g.report(g.value)
@@ -69,6 +73,7 @@ func (g *PersistentGauge) start(ctx context.Context) {
6973
// to the metrics collector
7074
func (g *PersistentGauge) Stop() {
7175
g.cancel()
76+
g.wg.Wait()
7277
}
7378

7479
// NewPersistentGauge will create and start a PersistentGauge that reports the current value every 10s

0 commit comments

Comments
 (0)