Skip to content

Commit 7575fbb

Browse files
Work with the same routeMap and nameToReplicasetRef within one method (resolve issue #14)
1 parent 0f6388e commit 7575fbb

File tree

4 files changed

+43
-22
lines changed

4 files changed

+43
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ CHANGES:
88
* Add configurable pause before retrying r.Route in Router.Call method.
99
* Add ability to set custom dialer in InstaceInfo.
1010
* Router.Call: retry on VShardErrNameTransferIsInProgress error too (resolve issue #75).
11+
* Work with the same routeMap and nameToReplicasetRef within one method (resolve issue #14).
1112

1213
BUG FIXES:
1314
* Router.bucketSearchBatched: store rsFuture.rs instead of rs for bucketID.

api.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -249,13 +249,13 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
249249
// poolMode, vshardMode = pool.PreferRO, ReadMode
250250
// since go-tarantool always use balance=true politic,
251251
// we can't support this case until: https://github.com/tarantool/go-tarantool/issues/400
252-
return VshardRouterCallResp{}, fmt.Errorf("mode VshardCallModeRE is not supported yet")
252+
return VshardRouterCallResp{}, fmt.Errorf("mode CallModeRE is not supported yet")
253253
case CallModeBRO:
254254
poolMode, vshardMode = pool.ANY, ReadMode
255255
case CallModeBRE:
256256
poolMode, vshardMode = pool.PreferRO, ReadMode
257257
default:
258-
return VshardRouterCallResp{}, fmt.Errorf("unknown VshardCallMode(%d)", mode)
258+
return VshardRouterCallResp{}, fmt.Errorf("unknown CallMode(%d)", mode)
259259
}
260260

261261
timeout := callTimeoutDefault
@@ -277,6 +277,9 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
277277

278278
requestStartTime := time.Now()
279279

280+
nameToReplicasetRef := r.getNameToReplicaset()
281+
routerMap := r.getRouteMap()
282+
280283
var err error
281284

282285
for {
@@ -293,7 +296,7 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
293296

294297
var rs *Replicaset
295298

296-
rs, err = r.Route(ctx, bucketID)
299+
rs, err = r.route(ctx, nameToReplicasetRef, routerMap, bucketID)
297300
if err != nil {
298301
r.metrics().RetryOnCall("bucket_resolve_error")
299302

@@ -330,14 +333,12 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
330333
switch vshardError.Name {
331334
case VShardErrNameWrongBucket, VShardErrNameBucketIsLocked, VShardErrNameTransferIsInProgress:
332335
// We reproduce here behavior in https://github.com/tarantool/vshard/blob/0.1.34/vshard/router/init.lua#L667
333-
r.BucketReset(bucketID)
336+
r.bucketReset(routerMap, bucketID)
334337

335338
destination := vshardError.Destination
336339
if destination != "" {
337340
var loggedOnce bool
338341
for {
339-
nameToReplicasetRef := r.getNameToReplicaset()
340-
341342
// In some cases destination contains UUID (prior to tnt 3.x), in some cases it contains replicaset name.
342343
// So, at this point we don't know what destination is: a name or an UUID.
343344
// But we need a name to access values in nameToReplicasetRef map, so let's find it out.
@@ -359,11 +360,11 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
359360
}
360361

361362
if destinationExists {
362-
_, err := r.BucketSet(bucketID, destinationName)
363+
_, err := r.bucketSet(nameToReplicasetRef, routerMap, bucketID, destinationName)
363364
if err == nil {
364365
break // breaks loop
365366
}
366-
r.log().Warnf(ctx, "Failed set bucket %d to %v (possible race): %v", bucketID, destinationName, err)
367+
r.log().Warnf(ctx, "Failed set bucket %d to %v (this should not happen): %v", bucketID, destinationName, err)
367368
}
368369

369370
if !loggedOnce {
@@ -378,6 +379,9 @@ func (r *Router) Call(ctx context.Context, bucketID uint64, mode CallMode,
378379
if spent := time.Since(requestStartTime); spent > timeout {
379380
return VshardRouterCallResp{}, vshardError
380381
}
382+
383+
// update nameToReplicasetRef explicitly before next try, the topology might changed.
384+
nameToReplicasetRef = r.getNameToReplicaset()
381385
}
382386
}
383387

discovery.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,26 @@ func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error
5252
}
5353

5454
routeMap := r.getRouteMap()
55+
nameToReplicasetRef := r.getNameToReplicaset()
56+
57+
return r.route(ctx, nameToReplicasetRef, routeMap, bucketID)
58+
}
59+
60+
func (r *Router) route(ctx context.Context, nameToReplicasetRef map[string]*Replicaset,
61+
routeMap routeMap, bucketID uint64) (*Replicaset, error) {
5562

5663
rs := routeMap[bucketID].Load()
5764
if rs != nil {
58-
nameToReplicasetRef := r.getNameToReplicaset()
59-
6065
actualRs := nameToReplicasetRef[rs.info.Name]
6166
switch {
6267
case actualRs == nil:
6368
// rs is outdated, can't use it -- let's discover bucket again
64-
r.BucketReset(bucketID)
69+
r.bucketReset(routeMap, bucketID)
6570
case actualRs == rs:
6671
return rs, nil
6772
default: // actualRs != rs
6873
// update rs -> actualRs for this bucket
69-
_, _ = r.BucketSet(bucketID, actualRs.info.Name)
74+
_, _ = r.bucketSet(nameToReplicasetRef, routeMap, bucketID, actualRs.info.Name)
7075
return actualRs, nil
7176
}
7277
}
@@ -75,14 +80,14 @@ func (r *Router) Route(ctx context.Context, bucketID uint64) (*Replicaset, error
7580
r.log().Infof(ctx, "Discovering bucket %d", bucketID)
7681

7782
if r.cfg.BucketsSearchMode == BucketsSearchLegacy {
78-
return r.bucketSearchLegacy(ctx, bucketID)
83+
return r.bucketSearchLegacy(ctx, nameToReplicasetRef, routeMap, bucketID)
7984
}
8085

81-
return r.bucketSearchBatched(ctx, bucketID)
86+
return r.bucketSearchBatched(ctx, nameToReplicasetRef, routeMap, bucketID)
8287
}
8388

84-
func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Replicaset, error) {
85-
nameToReplicasetRef := r.getNameToReplicaset()
89+
func (r *Router) bucketSearchLegacy(ctx context.Context,
90+
nameToReplicasetRef map[string]*Replicaset, routeMap routeMap, bucketID uint64) (*Replicaset, error) {
8691

8792
type rsFuture struct {
8893
rsName string
@@ -109,7 +114,7 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
109114
}
110115

111116
// It's ok if several replicasets return ok to bucket_stat command for the same bucketID, just pick any of them.
112-
rs, err := r.BucketSet(bucketID, rsFuture.rsName)
117+
rs, err := r.bucketSet(nameToReplicasetRef, routeMap, bucketID, rsFuture.rsName)
113118
if err != nil {
114119
r.log().Errorf(ctx, "bucketSearchLegacy: can't set rsID %v for bucketID %d: %v", rsFuture.rsName, bucketID, err)
115120
return nil, newVShardErrorNoRouteToBucket(bucketID)
@@ -133,9 +138,8 @@ func (r *Router) bucketSearchLegacy(ctx context.Context, bucketID uint64) (*Repl
133138
// P.S. 1000 is a batch size in response of buckets_discovery, see:
134139
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/storage/init.lua#L1700
135140
// https://github.com/tarantool/vshard/blob/dfa2cc8a2aff221d5f421298851a9a229b2e0434/vshard/consts.lua#L37
136-
func (r *Router) bucketSearchBatched(ctx context.Context, bucketIDToFind uint64) (*Replicaset, error) {
137-
nameToReplicasetRef := r.getNameToReplicaset()
138-
routeMap := r.getRouteMap()
141+
func (r *Router) bucketSearchBatched(ctx context.Context,
142+
nameToReplicasetRef map[string]*Replicaset, routeMap routeMap, bucketIDToFind uint64) (*Replicaset, error) {
139143

140144
type rsFuture struct {
141145
rs *Replicaset

vshard.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,25 +238,37 @@ func NewRouter(ctx context.Context, cfg Config) (*Router, error) {
238238

239239
// BucketSet Set a bucket to a replicaset.
240240
func (r *Router) BucketSet(bucketID uint64, rsName string) (*Replicaset, error) {
241+
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
242+
return nil, fmt.Errorf("bucket id is out of range: %d (total %d)", bucketID, r.cfg.TotalBucketCount)
243+
}
244+
241245
nameToReplicasetRef := r.getNameToReplicaset()
246+
routeMap := r.getRouteMap()
247+
248+
return r.bucketSet(nameToReplicasetRef, routeMap, bucketID, rsName)
249+
}
242250

251+
func (r *Router) bucketSet(nameToReplicasetRef map[string]*Replicaset, routeMap routeMap, bucketID uint64, rsName string) (*Replicaset, error) {
243252
rs := nameToReplicasetRef[rsName]
244253
if rs == nil {
245254
return nil, newVShardErrorNoRouteToBucket(bucketID)
246255
}
247256

248-
routeMap := r.getRouteMap()
249257
routeMap[bucketID].Store(rs)
250258

251259
return rs, nil
252260
}
253261

254262
func (r *Router) BucketReset(bucketID uint64) {
255-
if bucketID > r.cfg.TotalBucketCount {
263+
if bucketID < 1 || r.cfg.TotalBucketCount < bucketID {
256264
return
257265
}
258266

259267
routeMap := r.getRouteMap()
268+
r.bucketReset(routeMap, bucketID)
269+
}
270+
271+
func (r *Router) bucketReset(routeMap routeMap, bucketID uint64) {
260272
routeMap[bucketID].Store(nil)
261273
}
262274

0 commit comments

Comments
 (0)