@@ -43,7 +43,8 @@ public class KvGcExecutor {
43
43
private final KvConfig kvConfig ;
44
44
private final MpscSlotHashExecutor deleteExecutor ;
45
45
private final ThreadPoolExecutor submitExecutor ;
46
- private final ScheduledThreadPoolExecutor scheduleExecutor ;
46
+ private final ScheduledThreadPoolExecutor scheduler ;
47
+ private final ThreadPoolExecutor scheduleExecutor ;
47
48
private RedisTemplate redisTemplate ;
48
49
49
50
public KvGcExecutor (KVClient kvClient , KeyDesign keyDesign , KvConfig kvConfig ) {
@@ -55,7 +56,9 @@ public KvGcExecutor(KVClient kvClient, KeyDesign keyDesign, KvConfig kvConfig) {
55
56
kvConfig .gcExecutorQueueSize (), new MpscSlotHashExecutor .CallerRunsPolicy ());
56
57
this .submitExecutor = new ThreadPoolExecutor (1 , 1 , 0 , TimeUnit .SECONDS ,
57
58
new LinkedBlockingQueue <>(1024 *128 ), new CamelliaThreadFactory ("camellia-kv-gc-submit" ), new ThreadPoolExecutor .AbortPolicy ());
58
- this .scheduleExecutor = new ScheduledThreadPoolExecutor (1 , new CamelliaThreadFactory ("camellia-kv-gc-scheduler" ));
59
+ this .scheduler = new ScheduledThreadPoolExecutor (1 , new CamelliaThreadFactory ("camellia-kv-gc-scheduler" ));
60
+ this .scheduleExecutor = new ThreadPoolExecutor (2 , 2 , 0 , TimeUnit .SECONDS ,
61
+ new LinkedBlockingQueue <>(32 ), new CamelliaThreadFactory ("camellia-kv-gc-scheduler-executor" ));
59
62
60
63
boolean gcScheduleEnable = ProxyDynamicConf .getBoolean ("kv.gc.schedule.enable" , false );
61
64
if (gcScheduleEnable ) {
@@ -74,7 +77,7 @@ public KvGcExecutor(KVClient kvClient, KeyDesign keyDesign, KvConfig kvConfig) {
74
77
}
75
78
76
79
public void start () {
77
- scheduleExecutor .scheduleAtFixedRate (() -> {
80
+ scheduler .scheduleAtFixedRate (() -> {
78
81
try {
79
82
boolean gcScheduleEnable = ProxyDynamicConf .getBoolean ("kv.gc.schedule.enable" , false );
80
83
if (!gcScheduleEnable ) {
@@ -95,12 +98,17 @@ public void start() {
95
98
return ;
96
99
}
97
100
98
- boolean scanMetaKeysSuccess = true ;
101
+ //meta-key scan
102
+ Future <Boolean > metaKeyScanFuture ;
99
103
if (!kvClient .supportTTL ()) {
100
- scanMetaKeysSuccess = scanMetaKeys ();
104
+ metaKeyScanFuture = scheduleExecutor .submit (this ::scanMetaKeys );
105
+ } else {
106
+ metaKeyScanFuture = CompletableFuture .completedFuture (true );
101
107
}
102
- boolean scanSubKeysSuccess = scanSubKeys ();
103
- if (scanMetaKeysSuccess && scanSubKeysSuccess ) {
108
+ //sub-key scan
109
+ Future <Boolean > subKeyScanFuture = scheduleExecutor .submit (this ::scanSubKeys );
110
+ //
111
+ if (metaKeyScanFuture .get () && subKeyScanFuture .get ()) {
104
112
KvGcEnv .updateGcTime (namespace , System .currentTimeMillis ());
105
113
}
106
114
} catch (Throwable e ) {
0 commit comments