Skip to content

Commit a4df40f

Browse files
authored
Merge branch 'master' into fix-driver-conns-metric
2 parents 01560f5 + 329010f commit a4df40f

32 files changed

+776
-215
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ jobs:
5353
fail-fast: false
5454
matrix:
5555
go-version: [1.21.x, 1.24.x]
56-
ydb-version: [24.1, 24.2, 24.3, 24.4, 25.1]
56+
ydb-version: [latest, 24.4, 25.1]
5757
os: [ubuntu]
5858
services:
5959
ydb:

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
* Pinged new connections on discovery attempt, closed dropped ones, so `ydb_go_sdk_ydb_driver_conns` metric is correct
22

3+
## v3.108.4
4+
* Removed `experimental` from coordination API
5+
* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields
6+
37
## v3.108.3
48
* Fixed handling of zero values for DyNumber
59
* Fixed the decimal yql slice bounds out of range

coordination/client.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ type Client interface {
2525
// - call Close on the Session,
2626
// - close the Client which the session was created with,
2727
// - call any method of the Session until the ErrSessionClosed is returned.
28-
//
29-
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
3028
Session(ctx context.Context, path string, opts ...options.SessionOption) (Session, error)
3129
}
3230

go.mod

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
github.com/jonboulle/clockwork v0.5.0
1111
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77
1212
go.uber.org/goleak v1.3.0
13+
go.uber.org/zap v1.27.0
1314
golang.org/x/net v0.38.0
1415
golang.org/x/sync v0.12.0
1516
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
@@ -20,17 +21,18 @@ require (
2021
// requires for tests only
2122
require (
2223
github.com/rekby/fixenv v0.6.1
23-
github.com/stretchr/testify v1.8.0
24+
github.com/stretchr/testify v1.10.0
2425
go.uber.org/mock v0.4.0
2526
)
2627

2728
require (
2829
github.com/davecgh/go-spew v1.1.1 // indirect
29-
github.com/kr/text v0.2.0 // indirect
3030
github.com/pmezard/go-difflib v1.0.0 // indirect
31+
go.uber.org/multierr v1.10.0 // indirect
3132
golang.org/x/sys v0.31.0 // indirect
3233
golang.org/x/text v0.23.0 // indirect
3334
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
35+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
3436
gopkg.in/yaml.v3 v3.0.1 // indirect
3537
)
3638

go.sum

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
1111
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1212
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1313
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
14-
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
1514
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1615
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1716
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -60,8 +59,10 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
6059
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
6160
github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
6261
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
63-
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
64-
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
62+
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
63+
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
64+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
65+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
6566
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
6667
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
6768
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -71,12 +72,10 @@ github.com/rekby/fixenv v0.6.1 h1:jUFiSPpajT4WY2cYuc++7Y1zWrnCxnovGCIX72PZniM=
7172
github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+XWk4c=
7273
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
7374
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
74-
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
7575
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
7676
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
77-
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
78-
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
79-
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
77+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
78+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
8079
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q=
8180
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
8281
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
@@ -94,6 +93,10 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
9493
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
9594
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
9695
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
96+
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
97+
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
98+
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
99+
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
97100
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
98101
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
99102
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -171,8 +174,8 @@ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
171174
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
172175
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
173176
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
174-
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
175-
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
177+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
178+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
176179
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
177180
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
178181
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/grpcwrapper/rawtopic/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (c *Client) StreamWrite(
126126
Stream: protoResp,
127127
Tracer: tracer,
128128
InternalStreamID: uuid.New().String(),
129+
LogContext: &ctxStreamLifeTime,
129130
}, nil
130131
}
131132

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rawtopicwriter
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -34,6 +35,7 @@ type StreamWriter struct {
3435
readMessagesCount int
3536
writtenMessagesCount int
3637
sessionID string
38+
LogContext *context.Context
3739
}
3840

3941
//nolint:funlen
@@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
5052
defer func() {
5153
// defer needs for set good session id on first init response before trace the message
5254
trace.TopicOnWriterReceiveGRPCMessage(
53-
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
55+
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
5456
)
5557
}()
5658
if sendErr != nil {
@@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
139141

140142
err = w.Stream.Send(&protoMsg)
141143
w.writtenMessagesCount++
142-
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
144+
trace.TopicOnWriterSentGRPCMessage(
145+
w.Tracer,
146+
w.LogContext,
147+
w.InternalStreamID,
148+
w.sessionID,
149+
w.writtenMessagesCount,
150+
&protoMsg,
151+
err,
152+
)
143153
if err != nil {
144154
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
145155
}

internal/topic/topicclientinternal/client.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ func (c *Client) StartReader(
322322
if err != nil {
323323
return nil, err
324324
}
325-
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
325+
326+
internalReader.TopicOnReaderStart(consumer, err)
326327

327328
return topicreader.NewReader(internalReader), nil
328329
}
@@ -365,15 +366,8 @@ func (c *Client) createWriterConfig(
365366
topicPath string,
366367
opts []topicoptions.WriterOption,
367368
) topicwriterinternal.WriterReconnectorConfig {
368-
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
369-
topicwriterinternal.RawTopicWriterStream,
370-
error,
371-
) {
372-
return c.rawClient.StreamWrite(ctx, tracer)
373-
}
374-
375369
options := []topicoptions.WriterOption{
376-
topicwriterinternal.WithConnectFunc(connector),
370+
topicwriterinternal.WithRawClient(&c.rawClient),
377371
topicwriterinternal.WithTopic(topicPath),
378372
topicwriterinternal.WithCommonConfig(c.cfg.Common),
379373
topicwriterinternal.WithTrace(c.cfg.Trace),

internal/topic/topicreadercommon/committer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {
148148

149149
onDone := trace.TopicOnReaderSendCommitMessage(
150150
c.tracer,
151+
&ctx,
151152
&commits,
152153
)
153154
err := c.send(commits.ToRawMessage())

internal/topic/topicreaderinternal/batched_stream_reader_interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ type batchedStreamReader interface {
1515
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
1616
CloseWithError(ctx context.Context, err error) error
1717
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
18+
TopicOnReaderStart(consumer string, err error)
1819
}

internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

Lines changed: 36 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicreaderinternal/reader.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ type Reader struct {
3939
readerID int64
4040
}
4141

42+
func (r *Reader) TopicOnReaderStart(consumer string, err error) {
43+
r.reader.TopicOnReaderStart(consumer, err)
44+
}
45+
4246
type ReadMessageBatchOptions struct {
4347
batcherGetOptions
4448
}
@@ -93,14 +97,17 @@ func NewReader(
9397
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
9498
}
9599

100+
reader := newReaderReconnector(
101+
cfg.BaseContext,
102+
readerID,
103+
readerConnector,
104+
cfg.OperationTimeout(),
105+
cfg.RetrySettings,
106+
cfg.Trace,
107+
)
108+
96109
res := Reader{
97-
reader: newReaderReconnector(
98-
readerID,
99-
readerConnector,
100-
cfg.OperationTimeout(),
101-
cfg.RetrySettings,
102-
cfg.Trace,
103-
),
110+
reader: reader,
104111
defaultBatchConfig: cfg.DefaultBatchConfig,
105112
tracer: cfg.Trace,
106113
readerID: readerID,

0 commit comments

Comments
 (0)