Skip to content

Commit 63ed57a

Browse files
committed
Add new trace hook for endpoint discovery
1 parent 3146f19 commit 63ed57a

File tree

8 files changed

+113
-62
lines changed

8 files changed

+113
-62
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
* Pinged new connections on discovery attempt, closed dropped ones, so `ydb_go_sdk_ydb_driver_conns` metric is correct
1+
* Fixed `ydb_go_sdk_ydb_driver_conns` metric to correctly display nodes that are discovered at the moment
22

33
## v3.110.1
44
* Added the ability to send BulkRequest exceeding the GrpcMaxMessageSize

internal/balancer/balancer.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,24 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context, cc *grpc.ClientC
175175
return nil
176176
}
177177

178-
func buildConnectionsState(ctx context.Context, pool interface {
179-
GetIfPresent(endpoint endpoint.Endpoint) conn.Conn
180-
Allow(ctx context.Context, cc conn.Conn)
181-
EndpointsToConnections(endpoints []endpoint.Endpoint) []conn.Conn
182-
}, newest []endpoint.Endpoint,
178+
func buildConnectionsState(ctx context.Context,
179+
pool interface {
180+
GetIfPresent(endpoint endpoint.Endpoint) conn.Conn
181+
Allow(ctx context.Context, cc conn.Conn)
182+
EndpointsToConnections(endpoints []endpoint.Endpoint) []conn.Conn
183+
},
184+
newest []endpoint.Endpoint,
183185
dropped []endpoint.Endpoint,
184-
config balancerConfig.Config,
186+
balancerConfig *balancerConfig.Config,
185187
selfLocation balancerConfig.Info,
186188
) *connectionsState {
187189
connections := pool.EndpointsToConnections(newest)
188190
for _, c := range connections {
189191
pool.Allow(ctx, c)
190192
c.Endpoint().Touch()
191-
_ = c.Ping(ctx)
192193
}
193194

194-
state := newConnectionsState(connections, config.Filter, selfLocation, config.AllowFallback)
195+
state := newConnectionsState(connections, balancerConfig.Filter, selfLocation, balancerConfig.AllowFallback)
195196

196197
for _, e := range dropped {
197198
c := pool.GetIfPresent(e)
@@ -229,7 +230,14 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
229230
}()
230231

231232
info := balancerConfig.Info{SelfLocation: localDC}
232-
b.connectionsState.Store(buildConnectionsState(ctx, b.pool, newest, dropped, b.balancerConfig, info))
233+
b.connectionsState.Store(buildConnectionsState(ctx, b.pool, newest, dropped, &b.balancerConfig, info))
234+
for _, e := range added {
235+
trace.DriverOnConnDiscover(
236+
b.driverConfig.Trace(), &ctx,
237+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer.(*Balancer).applyDiscoveredEndpoints"),
238+
e,
239+
)
240+
}
233241
}
234242

235243
func (b *Balancer) Close(ctx context.Context) (err error) {

internal/balancer/balancer_test.go

Lines changed: 25 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ func (fp *fakePool) EndpointsToConnections(eps []endpoint.Endpoint) []conn.Conn
2727
return conns
2828
}
2929

30-
func (fp *fakePool) Allow(_ context.Context, _ conn.Conn) {}
30+
func (fp *fakePool) Allow(_ context.Context, c conn.Conn) {
31+
if c, ok := fp.connections[c.Endpoint().Address()]; ok {
32+
c.Allowed.Store(true)
33+
}
34+
}
3135

3236
func (fp *fakePool) GetIfPresent(ep endpoint.Endpoint) conn.Conn {
3337
if c, ok := fp.connections[ep.Address()]; ok {
@@ -41,40 +45,18 @@ func TestBuildConnectionsState(t *testing.T) {
4145
ctx := context.Background()
4246

4347
tests := []struct {
44-
name string
45-
newEndpoints []endpoint.Endpoint
46-
oldEndpoints []endpoint.Endpoint
47-
initialConns map[string]*mock.Conn
48-
conf balancerConfig.Config
49-
selfLoc balancerConfig.Info
50-
expectPinged []string
51-
expectClosed []string
48+
name string
49+
newEndpoints []endpoint.Endpoint
50+
oldEndpoints []endpoint.Endpoint
51+
initialConns map[string]*mock.Conn
52+
conf balancerConfig.Config
53+
selfLoc balancerConfig.Info
54+
expectAllowed []string
55+
expectClosed []string
5256
}{
53-
{
54-
name: "single new and old endpoint",
55-
newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "127.0.0.1"}},
56-
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "127.0.0.2"}},
57-
initialConns: map[string]*mock.Conn{
58-
"127.0.0.1": {
59-
AddrField: "127.0.0.1",
60-
State: conn.Online,
61-
},
62-
"127.0.0.2": {
63-
AddrField: "127.0.0.2",
64-
State: conn.Offline,
65-
},
66-
},
67-
conf: balancerConfig.Config{
68-
AllowFallback: true,
69-
DetectNearestDC: true,
70-
},
71-
selfLoc: balancerConfig.Info{SelfLocation: "local"},
72-
expectPinged: []string{"127.0.0.1"},
73-
expectClosed: []string{"127.0.0.2"},
74-
},
7557
{
7658
newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a1"}, &mock.Endpoint{AddrField: "a2"}},
77-
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a3"}},
59+
oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a3"}, &mock.Endpoint{AddrField: "a4"}},
7860
initialConns: map[string]*mock.Conn{
7961
"a1": {
8062
AddrField: "a1",
@@ -84,19 +66,24 @@ func TestBuildConnectionsState(t *testing.T) {
8466
"a2": {
8567
AddrField: "a2",
8668
State: conn.Offline,
69+
PingErr: ErrNoEndpoints,
8770
},
8871
"a3": {
8972
AddrField: "a3",
9073
State: conn.Online,
9174
},
75+
"a4": {
76+
AddrField: "a4",
77+
State: conn.Online,
78+
},
9279
},
9380
conf: balancerConfig.Config{
9481
AllowFallback: true,
9582
DetectNearestDC: true,
9683
},
97-
selfLoc: balancerConfig.Info{SelfLocation: "local"},
98-
expectPinged: []string{"a1", "a2"},
99-
expectClosed: []string{"a3"},
84+
selfLoc: balancerConfig.Info{SelfLocation: "local"},
85+
expectAllowed: []string{"a1", "a2"},
86+
expectClosed: []string{"a3", "a4"},
10087
},
10188
}
10289

@@ -107,12 +94,11 @@ func TestBuildConnectionsState(t *testing.T) {
10794
fp.connections[addr] = c
10895
}
10996

110-
state := buildConnectionsState(ctx, fp, tt.newEndpoints, tt.oldEndpoints, tt.conf, tt.selfLoc)
97+
state := buildConnectionsState(ctx, fp, tt.newEndpoints, tt.oldEndpoints, &tt.conf, tt.selfLoc)
11198
assert.NotNil(t, state)
112-
for _, addr := range tt.expectPinged {
99+
for _, addr := range tt.expectAllowed {
113100
c := fp.connections[addr]
114-
assert.True(t, c.Pinged.Load(), "connection %s should be pinged", addr)
115-
assert.True(t, c.State == conn.Online || c.PingErr != nil)
101+
assert.True(t, c.Allowed.Load(), "connection %s should be allowed", addr)
116102
}
117103
for _, addr := range tt.expectClosed {
118104
c := fp.connections[addr]

internal/mock/conn.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type Conn struct {
1818
NodeIDField uint32
1919
State conn.State
2020
LocalDCField bool
21-
Pinged atomic.Bool
21+
Allowed atomic.Bool
2222
Closed atomic.Bool
2323
}
2424

@@ -64,7 +64,6 @@ func (c *Conn) Close(ctx context.Context) error {
6464
}
6565

6666
func (c *Conn) Ping(ctx context.Context) error {
67-
c.Pinged.Store(true)
6867
if c.PingErr == nil {
6968
c.SetState(ctx, conn.Online)
7069
}
@@ -97,6 +96,7 @@ type Endpoint struct {
9796
LocationField string
9897
NodeIDField uint32
9998
LocalDCField bool
99+
Touched uint32
100100
}
101101

102102
func (e *Endpoint) Choose(bool) {
@@ -145,4 +145,5 @@ func (e *Endpoint) Copy() endpoint.Endpoint {
145145
}
146146

147147
func (e *Endpoint) Touch(opts ...endpoint.Option) {
148+
e.Touched++
148149
}

log/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver {
460460
if d.Details()&trace.DriverBalancerEvents == 0 {
461461
return nil
462462
}
463-
ctx := with(*info.Context, TRACE, "ydb", "driver", "balancer", "update")
463+
ctx := with(*info.Context, INFO, "ydb", "driver", "balancer", "update")
464464
l.Log(ctx, "driver balancer update starting...",
465465
kv.Bool("needLocalDC", info.NeedLocalDC),
466466
kv.String("database", info.Database),

metrics/driver.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -139,19 +139,15 @@ func driver(config Config) (t trace.Driver) {
139139
}
140140
}
141141
}
142-
t.OnConnDial = func(info trace.DriverConnDialStartInfo) func(trace.DriverConnDialDoneInfo) {
142+
t.OnConnDiscover = func(info trace.DriverConnDiscoverInfo) {
143143
endpoint := info.Endpoint.Address()
144144
nodeID := info.Endpoint.NodeID()
145145

146-
return func(info trace.DriverConnDialDoneInfo) {
147-
if config.Details()&trace.DriverConnEvents != 0 {
148-
if info.Error == nil {
149-
conns.With(map[string]string{
150-
"endpoint": endpoint,
151-
"node_id": idToString(nodeID),
152-
}).Add(1)
153-
}
154-
}
146+
if config.Details()&trace.DriverConnEvents != 0 {
147+
conns.With(map[string]string{
148+
"endpoint": endpoint,
149+
"node_id": idToString(nodeID),
150+
}).Add(1)
155151
}
156152
}
157153
t.OnConnClose = func(info trace.DriverConnCloseStartInfo) func(trace.DriverConnCloseDoneInfo) {

trace/driver.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ type (
5252
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
5353
OnConnDial func(DriverConnDialStartInfo) func(DriverConnDialDoneInfo)
5454
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
55+
OnConnDiscover func(DriverConnDiscoverInfo)
56+
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
5557
OnConnBan func(DriverConnBanStartInfo) func(DriverConnBanDoneInfo)
5658
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
5759
OnConnAllow func(DriverConnAllowStartInfo) func(DriverConnAllowDoneInfo)
@@ -273,6 +275,16 @@ type (
273275
Error error
274276
}
275277
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
278+
DriverConnDiscoverInfo struct {
279+
// Context make available context in trace callback function.
280+
// Pointer to context provide replacement of context in trace callback function.
281+
// Warning: concurrent access to pointer on client side must be excluded.
282+
// Safe replacement of context are provided only inside callback function
283+
Context *context.Context
284+
Call call
285+
Endpoint EndpointInfo
286+
}
287+
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
276288
DriverConnDialStartInfo struct {
277289
// Context make available context in trace callback function.
278290
// Pointer to context provide replacement of context in trace callback function.
@@ -282,10 +294,24 @@ type (
282294
Call call
283295
Endpoint EndpointInfo
284296
}
297+
285298
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
286299
DriverConnDialDoneInfo struct {
287300
Error error
288301
}
302+
DriverConnDiscoverStartInfo struct {
303+
// Context make available context in trace callback function.
304+
// Pointer to context provide replacement of context in trace callback function.
305+
// Warning: concurrent access to pointer on client side must be excluded.
306+
// Safe replacement of context are provided only inside callback function
307+
Context *context.Context
308+
Call call
309+
Endpoint EndpointInfo
310+
}
311+
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
312+
DriverConnDiscoverDoneInfo struct {
313+
Error error
314+
}
289315
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
290316
DriverConnParkStartInfo struct {
291317
// Context make available context in trace callback function.

trace/driver_gtrace.go

Lines changed: 34 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)