Skip to content

Commit b32fc05

Browse files
Feat: Customize retry error type and processing handler (#8)
Signed-off-by: bodong.ybd <[email protected]>
1 parent aff619b commit b32fc05

File tree

4 files changed

+164
-0
lines changed

4 files changed

+164
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.valkey;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import java.util.Optional;
6+
import java.util.function.Predicate;
7+
8+
public class ExceptionHandler {
9+
private Map<Predicate<String>, ErrorCallback> patternCallbacks;
10+
11+
public interface ErrorCallback {
12+
void onError(String errorMessage);
13+
}
14+
15+
public ExceptionHandler() {
16+
this.patternCallbacks = new HashMap<>();
17+
}
18+
19+
public void register(Predicate<String> pattern, ErrorCallback callback) {
20+
patternCallbacks.put(pattern, callback);
21+
}
22+
23+
// This method allows the registration of a new error pattern and its corresponding callback.
24+
// It takes a Predicate<String> that defines the error pattern to match,
25+
// and an ErrorCallback that will be executed when the pattern is matched.
26+
// This enables dynamic handling of different types of errors based on their messages.
27+
public void handleException(Exception e) {
28+
Optional.ofNullable(e.getMessage()).ifPresent(errorMessage -> {
29+
patternCallbacks.forEach((pattern, callback) -> {
30+
if (pattern.test(errorMessage)) {
31+
callback.onError(errorMessage);
32+
}
33+
});
34+
});
35+
}
36+
}
37+

src/main/java/io/valkey/UnifiedJedis.java

+5
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,11 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
291291
this(new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider);
292292
}
293293

294+
public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
295+
ExceptionHandler handler) {
296+
this(new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration, handler), provider);
297+
}
298+
294299
/**
295300
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
296301
* <p>

src/main/java/io/valkey/executors/RetryableCommandExecutor.java

+14
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
import io.valkey.CommandObject;
88
import io.valkey.Connection;
9+
import io.valkey.ExceptionHandler;
910
import io.valkey.annots.VisibleForTesting;
11+
import io.valkey.exceptions.JedisDataException;
1012
import io.valkey.util.IOUtils;
1113
import org.slf4j.Logger;
1214
import org.slf4j.LoggerFactory;
@@ -22,12 +24,19 @@ public class RetryableCommandExecutor implements CommandExecutor {
2224
protected final ConnectionProvider provider;
2325
protected final int maxAttempts;
2426
protected final Duration maxTotalRetriesDuration;
27+
protected final ExceptionHandler handler;
2528

2629
public RetryableCommandExecutor(ConnectionProvider provider, int maxAttempts,
2730
Duration maxTotalRetriesDuration) {
31+
this(provider, maxAttempts, maxTotalRetriesDuration, null);
32+
}
33+
34+
public RetryableCommandExecutor(ConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
35+
ExceptionHandler handler) {
2836
this.provider = provider;
2937
this.maxAttempts = maxAttempts;
3038
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
39+
this.handler = handler;
3140
}
3241

3342
@Override
@@ -58,6 +67,11 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
5867
if (reset) {
5968
consecutiveConnectionFailures = 0;
6069
}
70+
} catch (JedisException e) {
71+
lastException = e;
72+
if (handler != null) {
73+
handler.handleException(e);
74+
}
6175
} finally {
6276
if (connection != null) {
6377
connection.close();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package io.valkey.util;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.UUID;
6+
7+
import io.valkey.DefaultJedisClientConfig;
8+
import io.valkey.ExceptionHandler;
9+
import io.valkey.HostAndPort;
10+
import io.valkey.HostAndPorts;
11+
import io.valkey.Jedis;
12+
import io.valkey.UnifiedJedis;
13+
import io.valkey.providers.PooledConnectionProvider;
14+
import org.junit.Assert;
15+
import org.junit.Before;
16+
import org.junit.Test;
17+
18+
public class ExceptionHandlerTest {
19+
private final HostAndPort hostAndPort = HostAndPorts.getRedisServers().get(0);
20+
private Jedis jedis;
21+
22+
@Before
23+
public void setUp() {
24+
jedis = new Jedis(hostAndPort);
25+
jedis.auth("foobared");
26+
}
27+
28+
@Test
29+
public void retryWithExceptionHandler() {
30+
int maxAttempts = 10;
31+
Duration maxTotalRetriesDuration = Duration.ofSeconds(10);
32+
PooledConnectionProvider provider = new PooledConnectionProvider(hostAndPort,
33+
DefaultJedisClientConfig.builder().password("foobared").build());
34+
ExceptionHandler handler = new ExceptionHandler();
35+
handler.register(
36+
message -> message.contains("WRONGTYPE Operation against a key holding the wrong kind of value"),
37+
errorMessage -> {
38+
try {
39+
System.out.println("Retrying...");
40+
Thread.sleep(1000);
41+
} catch (InterruptedException e) {
42+
throw new RuntimeException(e);
43+
}
44+
}
45+
);
46+
47+
String key = UUID.randomUUID().toString();
48+
Assert.assertEquals("OK", jedis.set(key, key));
49+
new Thread(new Runnable() {
50+
@Override
51+
public void run() {
52+
try {
53+
// after 5s, del this key.
54+
Thread.sleep(5000);
55+
Assert.assertEquals(1, jedis.del(key));
56+
} catch (Exception e) {
57+
e.printStackTrace();
58+
}
59+
}
60+
}).start();
61+
62+
UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler);
63+
Map<String, String> map = unifiedJedis.hgetAll(key);
64+
Assert.assertTrue(map.isEmpty());
65+
}
66+
67+
@Test
68+
public void retryWithExponentialBackoffCallback() {
69+
int maxAttempts = 4;
70+
Duration maxTotalRetriesDuration = Duration.ofSeconds(20);
71+
PooledConnectionProvider provider = new PooledConnectionProvider(hostAndPort,
72+
DefaultJedisClientConfig.builder().password("foobared").build());
73+
String key = UUID.randomUUID().toString();
74+
Assert.assertEquals("OK", jedis.set(key, key));
75+
ExceptionHandler handler = new ExceptionHandler();
76+
class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback {
77+
private int attempt = 0; // 计数器
78+
79+
@Override
80+
public void onError(String errorMessage) {
81+
// 计算 sleep 时间(2^attempt 秒)
82+
int sleepTime = (int) Math.pow(2, attempt);
83+
try {
84+
System.out.println("Sleeping for " + sleepTime + " seconds before handling: " + errorMessage);
85+
Thread.sleep(sleepTime * 1000); // 转换为毫秒
86+
if (attempt == maxAttempts - 2) {
87+
Assert.assertEquals(1, jedis.del(key));
88+
}
89+
} catch (InterruptedException ie) {
90+
Thread.currentThread().interrupt(); // 恢复中断状态
91+
}
92+
attempt++; // 增加计数
93+
}
94+
}
95+
96+
handler.register(
97+
message -> message.contains("WRONGTYPE Operation against a key holding the wrong kind of value"),
98+
new ExponentialBackoffCallback()
99+
);
100+
101+
UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler);
102+
long start = System.currentTimeMillis();
103+
Map<String, String> map = unifiedJedis.hgetAll(key);
104+
long end = System.currentTimeMillis();
105+
Assert.assertTrue(map.isEmpty());
106+
Assert.assertTrue(end - start >= 7000); // 2^0 + 2^1 + 2^2 = 7 seconds
107+
}
108+
}

0 commit comments

Comments
 (0)