Skip to content

close dropped connection on balancer.Update() #1694

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed the `ydb_go_sdk_ydb_driver_conns` metric so that it correctly reflects the nodes that are currently discovered

## v3.111.0
* Added `sugar.PrintErrorWithoutStack` helper for removing stack records from an error string
* Added `sugar.UnwrapError` helper for unwrapping a source error to its root errors
Expand Down
61 changes: 35 additions & 26 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,35 @@ func (b *Balancer) clusterDiscoveryAttempt(ctx context.Context, cc *grpc.ClientC
return nil
}

func buildConnectionsState(ctx context.Context,
pool interface {
GetIfPresent(endpoint endpoint.Endpoint) conn.Conn
Allow(ctx context.Context, cc conn.Conn)
EndpointsToConnections(endpoints []endpoint.Endpoint) []conn.Conn
},
newest []endpoint.Endpoint,
dropped []endpoint.Endpoint,
balancerConfig *balancerConfig.Config,
selfLocation balancerConfig.Info,
) *connectionsState {
connections := pool.EndpointsToConnections(newest)
for _, c := range connections {
pool.Allow(ctx, c)
c.Endpoint().Touch()
}

state := newConnectionsState(connections, balancerConfig.Filter, selfLocation, balancerConfig.AllowFallback)

for _, e := range dropped {
c := pool.GetIfPresent(e)
if c != nil {
_ = c.Close(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing the connection here may interrupt in-flight requests, such as long-running stream reads. Requests need to be terminated gracefully before the connection is closed.

}
}

return state
}

func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoint.Endpoint, localDC string) {
var (
onDone = trace.DriverOnBalancerUpdate(
Expand All @@ -186,10 +215,12 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
)
previous = b.connections().All()
)

_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
return strings.Compare(lhs.Address(), rhs.Address())
})

defer func() {
_, added, dropped := xslices.Diff(previous, newest, func(lhs, rhs endpoint.Endpoint) int {
return strings.Compare(lhs.Address(), rhs.Address())
})
onDone(
xslices.Transform(newest, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
xslices.Transform(added, func(t endpoint.Endpoint) trace.EndpointInfo { return t }),
Expand All @@ -198,21 +229,8 @@ func (b *Balancer) applyDiscoveredEndpoints(ctx context.Context, newest []endpoi
)
}()

connections := endpointsToConnections(b.pool, newest)
for _, c := range connections {
b.pool.Allow(ctx, c)
c.Endpoint().Touch()
}

info := balancerConfig.Info{SelfLocation: localDC}
state := newConnectionsState(connections, b.balancerConfig.Filter, info, b.balancerConfig.AllowFallback)

endpointsInfo := make([]endpoint.Info, len(newest))
for i, e := range newest {
endpointsInfo[i] = e
}

b.connectionsState.Store(state)
b.connectionsState.Store(buildConnectionsState(ctx, b.pool, newest, dropped, &b.balancerConfig, info))
}

func (b *Balancer) Close(ctx context.Context) (err error) {
Expand Down Expand Up @@ -444,12 +462,3 @@ func (b *Balancer) nextConn(ctx context.Context) (c conn.Conn, err error) {

return c, nil
}

// endpointsToConnections resolves every endpoint to a connection by calling
// p.Get and returns the connections in the same order as the endpoints.
func endpointsToConnections(p *conn.Pool, endpoints []endpoint.Endpoint) []conn.Conn {
	result := make([]conn.Conn, len(endpoints))
	for i := range endpoints {
		result[i] = p.Get(endpoints[i])
	}

	return result
}
110 changes: 110 additions & 0 deletions internal/balancer/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package balancer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

balancerConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/balancer/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/conn"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/mock"
)

// fakePool is a test double for the pool argument of buildConnectionsState.
// It is backed by an in-memory map of mock connections.
type fakePool struct {
// connections holds the mock connections, looked up by endpoint address.
connections map[string]*mock.Conn
}

// EndpointsToConnections returns the mock connections registered for the given
// endpoints, in endpoint order. Endpoints whose address is not in the map are
// skipped, so the result is nil when nothing matches.
func (fp *fakePool) EndpointsToConnections(eps []endpoint.Endpoint) []conn.Conn {
	var found []conn.Conn
	for i := range eps {
		known, ok := fp.connections[eps[i].Address()]
		if !ok {
			continue
		}
		found = append(found, known)
	}

	return found
}

// Allow sets the Allowed flag on the mock connection registered under the
// address of c. Connections that are not in the map are ignored.
func (fp *fakePool) Allow(_ context.Context, c conn.Conn) {
	known, ok := fp.connections[c.Endpoint().Address()]
	if !ok {
		return
	}
	known.Allowed.Store(true)
}

// GetIfPresent returns the mock connection registered under the address of ep,
// or nil when the map has no entry for that address.
func (fp *fakePool) GetIfPresent(ep endpoint.Endpoint) conn.Conn {
	known, ok := fp.connections[ep.Address()]
	if !ok {
		// Return a literal nil so the caller's "c != nil" check sees an empty
		// interface rather than one wrapping a nil *mock.Conn.
		return nil
	}

	return known
}

// TestBuildConnectionsState checks that buildConnectionsState marks every
// connection of the newest endpoints as allowed and closes every connection of
// the dropped endpoints.
func TestBuildConnectionsState(t *testing.T) {
	ctx := context.Background()

	tests := []struct {
		name          string
		newEndpoints  []endpoint.Endpoint
		oldEndpoints  []endpoint.Endpoint
		initialConns  map[string]*mock.Conn
		conf          balancerConfig.Config
		selfLoc       balancerConfig.Info
		expectAllowed []string
		expectClosed  []string
	}{
		{
			// A name keeps the subtest identifiable in `go test -run` and in
			// failure output instead of showing up as an anonymous "#00".
			name:         "allow newest and close dropped",
			newEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a1"}, &mock.Endpoint{AddrField: "a2"}},
			oldEndpoints: []endpoint.Endpoint{&mock.Endpoint{AddrField: "a3"}, &mock.Endpoint{AddrField: "a4"}},
			initialConns: map[string]*mock.Conn{
				"a1": {
					AddrField:     "a1",
					LocationField: "local",
					State:         conn.Offline,
				},
				"a2": {
					AddrField: "a2",
					State:     conn.Offline,
					PingErr:   ErrNoEndpoints,
				},
				"a3": {
					AddrField: "a3",
					State:     conn.Online,
				},
				"a4": {
					AddrField: "a4",
					State:     conn.Online,
				},
			},
			conf: balancerConfig.Config{
				AllowFallback:   true,
				DetectNearestDC: true,
			},
			selfLoc:       balancerConfig.Info{SelfLocation: "local"},
			expectAllowed: []string{"a1", "a2"},
			expectClosed:  []string{"a3", "a4"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			fp := &fakePool{connections: make(map[string]*mock.Conn, len(tt.initialConns))}
			for addr, c := range tt.initialConns {
				fp.connections[addr] = c
			}

			// oldEndpoints plays the role of the "dropped" argument.
			state := buildConnectionsState(ctx, fp, tt.newEndpoints, tt.oldEndpoints, &tt.conf, tt.selfLoc)
			assert.NotNil(t, state)

			for _, addr := range tt.expectAllowed {
				c, ok := fp.connections[addr]
				if !assert.True(t, ok, "connection %s should be present in the pool", addr) {
					// Avoid a nil dereference that would hide the real failure.
					continue
				}
				assert.True(t, c.Allowed.Load(), "connection %s should be allowed", addr)
			}

			for _, addr := range tt.expectClosed {
				c, ok := fp.connections[addr]
				if !assert.True(t, ok, "connection %s should be present in the pool", addr) {
					continue
				}
				assert.True(t, c.Closed.Load(), "connection %s should be closed", addr)
				// Equal reports the actual state on failure, unlike True(a == b).
				assert.Equal(t, conn.Offline, c.State, "connection %s should be offline", addr)
			}
		})
	}
}
2 changes: 2 additions & 0 deletions internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
Expand All @@ -36,6 +37,7 @@ var (

type Conn interface {
grpc.ClientConnInterface
closer.Closer

Endpoint() endpoint.Endpoint

Expand Down
23 changes: 23 additions & 0 deletions internal/conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,20 @@ func (p *Pool) GrpcDialOptions() []grpc.DialOption {
return p.dialOptions
}

// GetIfPresent returns the connection stored in the pool for the address of
// the given endpoint, or nil when the pool has no entry for that address.
func (p *Pool) GetIfPresent(endpoint endpoint.Endpoint) Conn {
	cc, ok := p.conns.Get(endpoint.Address())
	if !ok {
		// A literal nil keeps the returned interface comparable to nil.
		return nil
	}

	return cc
}

func (p *Pool) Get(endpoint endpoint.Endpoint) Conn {
var (
address = endpoint.Address()
Expand Down Expand Up @@ -252,3 +266,12 @@ func NewPool(ctx context.Context, config Config) *Pool {

return p
}

// EndpointsToConnections resolves every endpoint to a connection by calling
// Get and returns the connections in the same order as the endpoints.
func (p *Pool) EndpointsToConnections(endpoints []endpoint.Endpoint) []Conn {
	result := make([]Conn, len(endpoints))
	for i := range endpoints {
		result[i] = p.Get(endpoints[i])
	}

	return result
}
18 changes: 17 additions & 1 deletion internal/mock/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"context"
"sync/atomic"
"time"

"google.golang.org/grpc"
Expand All @@ -17,6 +18,8 @@ type Conn struct {
NodeIDField uint32
State conn.State
LocalDCField bool
Allowed atomic.Bool
Closed atomic.Bool
}

func (c *Conn) Invoke(
Expand Down Expand Up @@ -53,7 +56,18 @@ func (c *Conn) Park(ctx context.Context) (err error) {
panic("not implemented in mock")
}

// Close records that the mock connection was closed by setting Closed and
// then switches its state to conn.Offline. It always returns nil.
func (c *Conn) Close(ctx context.Context) error {
c.Closed.Store(true)
c.SetState(ctx, conn.Offline)

return nil
}

// Ping returns the preconfigured PingErr. When no error is configured the
// mock connection is switched to conn.Online before returning.
func (c *Conn) Ping(ctx context.Context) error {
	if c.PingErr != nil {
		return c.PingErr
	}
	c.SetState(ctx, conn.Online)

	return c.PingErr
}

Expand Down Expand Up @@ -82,6 +96,7 @@ type Endpoint struct {
LocationField string
NodeIDField uint32
LocalDCField bool
Touched uint32
}

func (e *Endpoint) Choose(bool) {
Expand Down Expand Up @@ -116,7 +131,7 @@ func (e *Endpoint) LoadFactor() float32 {
}

// OverrideHost reports that the mock endpoint has no host override by
// returning an empty string, so callers can use the mock without panicking.
func (e *Endpoint) OverrideHost() string {
	return ""
}

func (e *Endpoint) String() string {
Expand All @@ -130,4 +145,5 @@ func (e *Endpoint) Copy() endpoint.Endpoint {
}

// Touch counts the call by incrementing Touched; opts are not used by the mock.
func (e *Endpoint) Touch(opts ...endpoint.Option) {
e.Touched++
}
2 changes: 1 addition & 1 deletion log/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func internalDriver(l Logger, d trace.Detailer) trace.Driver {
if d.Details()&trace.DriverBalancerEvents == 0 {
return nil
}
ctx := with(*info.Context, TRACE, "ydb", "driver", "balancer", "update")
ctx := with(*info.Context, INFO, "ydb", "driver", "balancer", "update")
l.Log(ctx, "driver balancer update starting...",
kv.Bool("needLocalDC", info.NeedLocalDC),
kv.String("database", info.Database),
Expand Down
47 changes: 47 additions & 0 deletions tests/integration/ydb_conn_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//go:build integration
// +build integration

package integration

import (
"context"
"testing"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/metrics"
"github.com/ydb-platform/ydb-go-sdk/v3/query"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
)

// TestYDBConnMetrics opens a driver with the metrics adapter attached, runs a
// trivial query and checks that the "ydb.driver.conns" gauge labeled with the
// local endpoint and node_id "1" has the value 1.
func TestYDBConnMetrics(t *testing.T) {
	var (
		scope    = newScope(t)
		registry = &registryConfig{
			details:    trace.DriverConnEvents,
			gauges:     newVec[gaugeVec](),
			counters:   newVec[counterVec](),
			timers:     newVec[timerVec](),
			histograms: newVec[histogramVec](),
		}
	)

	db, err := ydb.Open(scope.Ctx, "grpc://localhost:2136/local", metrics.WithTraces(registry))
	scope.Require.NoError(err)
	defer db.Close(scope.Ctx) // cleanup resources

	// Check the query result explicitly: if the query fails, the test should
	// report that error rather than a misleading "gauge not found" assertion.
	err = db.Query().Do(scope.Ctx, func(ctx context.Context, s query.Session) error {
		return s.Exec(ctx, "SELECT 42")
	})
	scope.Require.NoError(err)

	metric := "ydb.driver.conns"
	labelsNames := []string{"endpoint", "node_id"}
	labels := map[string]string{"endpoint": "localhost:2136", "node_id": "1"}

	gauges := registry.gauges.data[nameLabelNamesToString(metric, labelsNames)]
	scope.Require.NotNil(gauges)

	labeledGauges := gauges.gauges[nameLabelValuesToString(metric, labels)]
	scope.Require.NotNil(labeledGauges)

	scope.Require.EqualValues(1, labeledGauges.value)
}
Loading