Skip to content

Commit 8a61462

Browse files
committed
feat: extend RetryDelayFn to take the command to be retried
Signed-off-by: Rueian <[email protected]>
1 parent 113c567 commit 8a61462

File tree

6 files changed

+81
-80
lines changed

6 files changed

+81
-80
lines changed

client.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ retry:
4949
resp = c.conn.Do(ctx, cmd)
5050
if c.retry && cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
5151
shouldRetry := c.retryHandler.WaitOrSkipRetry(
52-
ctx, attempts, resp.Error(),
52+
ctx, attempts, cmd, resp.Error(),
5353
)
5454
if shouldRetry {
5555
attempts++
@@ -87,10 +87,10 @@ func (c *singleClient) DoMulti(ctx context.Context, multi ...Completed) (resps [
8787
retry:
8888
resps = c.conn.DoMulti(ctx, multi...).s
8989
if c.retry && allReadOnly(multi) {
90-
for _, resp := range resps {
90+
for i, resp := range resps {
9191
if c.isRetryable(resp.NonRedisError(), ctx) {
9292
shouldRetry := c.retryHandler.WaitOrSkipRetry(
93-
ctx, attempts, resp.Error(),
93+
ctx, attempts, multi[i], resp.Error(),
9494
)
9595
if shouldRetry {
9696
attempts++
@@ -115,10 +115,10 @@ func (c *singleClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
115115
retry:
116116
resps = c.conn.DoMultiCache(ctx, multi...).s
117117
if c.retry {
118-
for _, resp := range resps {
118+
for i, resp := range resps {
119119
if c.isRetryable(resp.NonRedisError(), ctx) {
120120
shouldRetry := c.retryHandler.WaitOrSkipRetry(
121-
ctx, attempts, resp.Error(),
121+
ctx, attempts, Completed(multi[i].Cmd), resp.Error(),
122122
)
123123
if shouldRetry {
124124
attempts++
@@ -140,7 +140,7 @@ func (c *singleClient) DoCache(ctx context.Context, cmd Cacheable, ttl time.Dura
140140
retry:
141141
resp = c.conn.DoCache(ctx, cmd, ttl)
142142
if c.retry && c.isRetryable(resp.NonRedisError(), ctx) {
143-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
143+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
144144
if shouldRetry {
145145
attempts++
146146
goto retry
@@ -158,7 +158,7 @@ retry:
158158
err = c.conn.Receive(ctx, subscribe, fn)
159159
if c.retry {
160160
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
161-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
161+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
162162
if shouldRetry {
163163
attempts++
164164
goto retry
@@ -217,7 +217,7 @@ retry:
217217
resp = c.wire.Do(ctx, cmd)
218218
if c.retry && cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
219219
shouldRetry := c.retryHandler.WaitOrSkipRetry(
220-
ctx, attempts, resp.Error(),
220+
ctx, attempts, cmd, resp.Error(),
221221
)
222222
if shouldRetry {
223223
attempts++
@@ -247,7 +247,7 @@ retry:
247247
for i, cmd := range multi {
248248
if retryable && isRetryable(resp[i].NonRedisError(), c.wire, ctx) {
249249
shouldRetry := c.retryHandler.WaitOrSkipRetry(
250-
ctx, attempts, resp[i].Error(),
250+
ctx, attempts, multi[i], resp[i].Error(),
251251
)
252252
if shouldRetry {
253253
attempts++
@@ -271,7 +271,7 @@ retry:
271271
if c.retry {
272272
if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) {
273273
shouldRetry := c.retryHandler.WaitOrSkipRetry(
274-
ctx, attempts, err,
274+
ctx, attempts, subscribe, err,
275275
)
276276
if shouldRetry {
277277
attempts++

client_test.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -698,21 +698,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
698698
c, m := setup()
699699
if cli, ok := c.(*sentinelClient); ok {
700700
cli.retryHandler = &mockRetryHandler{
701-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
701+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
702702
return false
703703
},
704704
}
705705
}
706706
if cli, ok := c.(*clusterClient); ok {
707707
cli.retryHandler = &mockRetryHandler{
708-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
708+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
709709
return false
710710
},
711711
}
712712
}
713713
if cli, ok := c.(*singleClient); ok {
714714
cli.retryHandler = &mockRetryHandler{
715-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
715+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
716716
return false
717717
},
718718
}
@@ -768,17 +768,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
768768
c, m := setup()
769769
if cli, ok := c.(*sentinelClient); ok {
770770
cli.retryHandler = &mockRetryHandler{
771-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
771+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
772772
return false
773773
},
774774
}
775775
}
776776
if cli, ok := c.(*clusterClient); ok {
777777
cli.retryHandler = &mockRetryHandler{
778-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
778+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
779779
return false
780780
},
781-
RetryDelayFn: func(attempts int, err error) time.Duration {
781+
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
782782
return -1
783783
},
784784
WaitForRetryFn: func(ctx context.Context, duration time.Duration) {
@@ -790,7 +790,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
790790
}
791791
if cli, ok := c.(*singleClient); ok {
792792
cli.retryHandler = &mockRetryHandler{
793-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
793+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
794794
return false
795795
},
796796
}
@@ -846,21 +846,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
846846
c, m := setup()
847847
if cli, ok := c.(*sentinelClient); ok {
848848
cli.retryHandler = &mockRetryHandler{
849-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
849+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
850850
return false
851851
},
852852
}
853853
}
854854
if cli, ok := c.(*clusterClient); ok {
855855
cli.retryHandler = &mockRetryHandler{
856-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
856+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
857857
return false
858858
},
859859
}
860860
}
861861
if cli, ok := c.(*singleClient); ok {
862862
cli.retryHandler = &mockRetryHandler{
863-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
863+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
864864
return false
865865
},
866866
}
@@ -908,17 +908,17 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
908908
c, m := setup()
909909
if cli, ok := c.(*sentinelClient); ok {
910910
cli.retryHandler = &mockRetryHandler{
911-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
911+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
912912
return false
913913
},
914914
}
915915
}
916916
if cli, ok := c.(*clusterClient); ok {
917917
cli.retryHandler = &mockRetryHandler{
918-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
918+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
919919
return false
920920
},
921-
RetryDelayFn: func(attempts int, err error) time.Duration {
921+
RetryDelayFn: func(attempts int, _ Completed, err error) time.Duration {
922922
return -1
923923
},
924924
WaitForRetryFn: func(ctx context.Context, duration time.Duration) {
@@ -930,7 +930,7 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
930930
}
931931
if cli, ok := c.(*singleClient); ok {
932932
cli.retryHandler = &mockRetryHandler{
933-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
933+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
934934
return false
935935
},
936936
}
@@ -975,21 +975,21 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
975975
c, m := setup()
976976
if cli, ok := c.(*sentinelClient); ok {
977977
cli.retryHandler = &mockRetryHandler{
978-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
978+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
979979
return false
980980
},
981981
}
982982
}
983983
if cli, ok := c.(*clusterClient); ok {
984984
cli.retryHandler = &mockRetryHandler{
985-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
985+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
986986
return false
987987
},
988988
}
989989
}
990990
if cli, ok := c.(*singleClient); ok {
991991
cli.retryHandler = &mockRetryHandler{
992-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
992+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
993993
return false
994994
},
995995
}
@@ -1052,14 +1052,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
10521052
if ret := c.Dedicated(func(cc DedicatedClient) error {
10531053
if cli, ok := cc.(*dedicatedClusterClient); ok {
10541054
cli.retryHandler = &mockRetryHandler{
1055-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
1055+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
10561056
return false
10571057
},
10581058
}
10591059
}
10601060
if cli, ok := cc.(*dedicatedSingleClient); ok {
10611061
cli.retryHandler = &mockRetryHandler{
1062-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
1062+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
10631063
return false
10641064
},
10651065
}
@@ -1137,14 +1137,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
11371137
if ret := c.Dedicated(func(cc DedicatedClient) error {
11381138
if cli, ok := cc.(*dedicatedClusterClient); ok {
11391139
cli.retryHandler = &mockRetryHandler{
1140-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
1140+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
11411141
return false
11421142
},
11431143
}
11441144
}
11451145
if cli, ok := cc.(*dedicatedSingleClient); ok {
11461146
cli.retryHandler = &mockRetryHandler{
1147-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
1147+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
11481148
return false
11491149
},
11501150
}
@@ -1216,14 +1216,14 @@ func SetupClientRetry(t *testing.T, fn func(mock *mockConn) Client) {
12161216
if ret := c.Dedicated(func(cc DedicatedClient) error {
12171217
if cli, ok := cc.(*dedicatedClusterClient); ok {
12181218
cli.retryHandler = &mockRetryHandler{
1219-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
1219+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
12201220
return false
12211221
},
12221222
}
12231223
}
12241224
if cli, ok := cc.(*dedicatedSingleClient); ok {
12251225
cli.retryHandler = &mockRetryHandler{
1226-
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, err error) bool {
1226+
WaitOrSkipRetryFunc: func(ctx context.Context, attempts int, _ Completed, err error) bool {
12271227
return false
12281228
},
12291229
}

cluster.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ process:
481481
goto process
482482
case RedirectRetry:
483483
if c.retry && cmd.IsReadOnly() {
484-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
484+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, cmd, resp.Error())
485485
if shouldRetry {
486486
attempts++
487487
goto retry
@@ -614,7 +614,7 @@ func (c *clusterClient) doresultfn(
614614
continue
615615
}
616616

617-
retryDelay = c.retryHandler.RetryDelay(attempts, resp.Error())
617+
retryDelay = c.retryHandler.RetryDelay(attempts, cm, resp.Error())
618618
} else {
619619
nc = c.redirectOrNew(addr, cc, cm.Slot(), mode)
620620
}
@@ -753,7 +753,7 @@ process:
753753
}
754754
case RedirectRetry:
755755
if c.retry && allReadOnly(multi) {
756-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
756+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, multi[i], resp.Error())
757757
if shouldRetry {
758758
resultsp.Put(resps)
759759
attempts++
@@ -786,7 +786,7 @@ process:
786786
goto process
787787
case RedirectRetry:
788788
if c.retry {
789-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, resp.Error())
789+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, Completed(cmd), resp.Error())
790790
if shouldRetry {
791791
attempts++
792792
goto retry
@@ -930,7 +930,7 @@ func (c *clusterClient) resultcachefn(
930930
continue
931931
}
932932

933-
retryDelay = c.retryHandler.RetryDelay(attempts, resp.Error())
933+
retryDelay = c.retryHandler.RetryDelay(attempts, Completed(cm.Cmd), resp.Error())
934934
} else {
935935
nc = c.redirectOrNew(addr, cc, cm.Cmd.Slot(), mode)
936936
}
@@ -1037,7 +1037,7 @@ retry:
10371037
}
10381038
err = cc.Receive(ctx, subscribe, fn)
10391039
if _, mode := c.shouldRefreshRetry(err, ctx); c.retry && mode != RedirectNone {
1040-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
1040+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
10411041
if shouldRetry {
10421042
attempts++
10431043
goto retry
@@ -1222,7 +1222,7 @@ retry:
12221222
case RedirectRetry:
12231223
if c.retry && cmd.IsReadOnly() && w.Error() == nil {
12241224
shouldRetry := c.retryHandler.WaitOrSkipRetry(
1225-
ctx, attempts, resp.Error(),
1225+
ctx, attempts, cmd, resp.Error(),
12261226
)
12271227
if shouldRetry {
12281228
attempts++
@@ -1253,11 +1253,11 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...Completed
12531253
retry:
12541254
if w, err := c.acquire(ctx, slot); err == nil {
12551255
resp = w.DoMulti(ctx, multi...).s
1256-
for _, r := range resp {
1256+
for i, r := range resp {
12571257
_, mode := c.client.shouldRefreshRetry(r.Error(), ctx)
12581258
if mode == RedirectRetry && retryable && w.Error() == nil {
12591259
shouldRetry := c.retryHandler.WaitOrSkipRetry(
1260-
ctx, attempts, r.Error(),
1260+
ctx, attempts, multi[i], r.Error(),
12611261
)
12621262
if shouldRetry {
12631263
attempts++
@@ -1291,7 +1291,7 @@ retry:
12911291
if w, err = c.acquire(ctx, subscribe.Slot()); err == nil {
12921292
err = w.Receive(ctx, subscribe, fn)
12931293
if _, mode := c.client.shouldRefreshRetry(err, ctx); c.retry && mode == RedirectRetry && w.Error() == nil {
1294-
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, err)
1294+
shouldRetry := c.retryHandler.WaitOrSkipRetry(ctx, attempts, subscribe, err)
12951295
if shouldRetry {
12961296
attempts++
12971297
goto retry

0 commit comments

Comments
 (0)