1717
1818import reactor .core .publisher .Flux ;
1919import reactor .core .publisher .Mono ;
20+ import reactor .core .publisher .SignalType ;
2021
2122import java .nio .ByteBuffer ;
2223import java .nio .charset .StandardCharsets ;
2324import java .time .Duration ;
2425import java .util .concurrent .CompletableFuture ;
2526import java .util .concurrent .TimeUnit ;
2627import java .util .concurrent .atomic .AtomicLong ;
28+ import java .util .function .Consumer ;
2729import java .util .function .Function ;
2830
2931import org .springframework .dao .PessimisticLockingFailureException ;
3739import org .springframework .data .redis .util .ByteUtils ;
3840import org .springframework .lang .Nullable ;
3941import org .springframework .util .Assert ;
40- import org .springframework .util .ClassUtils ;
41- import org .springframework .util .ObjectUtils ;
4242
4343/**
4444 * {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
4747 * <p>
4848 * {@link DefaultRedisCacheWriter} can be used in
4949 * {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
50- * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
51- * {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
52- * operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
53- * command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
54- * requests and potential command wait times.
50+ * {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
51+ * aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
52+ * multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
53+ * by setting an explicit lock key and checking against presence of this key which leads to additional requests
54+ * and potential command wait times.
5555 *
5656 * @author Christoph Strobl
5757 * @author Mark Paluch
6161 */
6262class DefaultRedisCacheWriter implements RedisCacheWriter {
6363
64- private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
65- .isPresent ("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory" , null );
64+ private final AsyncCacheWriter asyncCacheWriter ;
6665
6766 private final BatchStrategy batchStrategy ;
6867
@@ -74,33 +73,21 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
7473
7574 private final TtlFunction lockTtl ;
7675
77- private final AsyncCacheWriter asyncCacheWriter ;
78-
79- /**
80- * @param connectionFactory must not be {@literal null}.
81- * @param batchStrategy must not be {@literal null}.
82- */
8376 DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , BatchStrategy batchStrategy ) {
8477 this (connectionFactory , Duration .ZERO , batchStrategy );
8578 }
8679
8780 /**
88- * @param connectionFactory must not be {@literal null}.
89- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
90- * to disable locking.
91- * @param batchStrategy must not be {@literal null}.
81+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
82+ * Use {@link Duration#ZERO} to disable locking.
9283 */
9384 DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , BatchStrategy batchStrategy ) {
9485 this (connectionFactory , sleepTime , TtlFunction .persistent (), CacheStatisticsCollector .none (), batchStrategy );
9586 }
9687
9788 /**
98- * @param connectionFactory must not be {@literal null}.
99- * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
100- * to disable locking.
101- * @param lockTtl Lock TTL function must not be {@literal null}.
102- * @param cacheStatisticsCollector must not be {@literal null}.
103- * @param batchStrategy must not be {@literal null}.
89+ * @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
90+ * Use {@link Duration#ZERO} to disable locking.
10491 */
10592 DefaultRedisCacheWriter (RedisConnectionFactory connectionFactory , Duration sleepTime , TtlFunction lockTtl ,
10693 CacheStatisticsCollector cacheStatisticsCollector , BatchStrategy batchStrategy ) {
@@ -116,12 +103,12 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
116103 this .lockTtl = lockTtl ;
117104 this .statistics = cacheStatisticsCollector ;
118105 this .batchStrategy = batchStrategy ;
106+ this .asyncCacheWriter = isAsyncCacheSupportEnabled () ? new AsynchronousCacheWriterDelegate ()
107+ : UnsupportedAsyncCacheWriter .INSTANCE ;
108+ }
119109
120- if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this .connectionFactory instanceof ReactiveRedisConnectionFactory ) {
121- asyncCacheWriter = new AsynchronousCacheWriterDelegate ();
122- } else {
123- asyncCacheWriter = UnsupportedAsyncCacheWriter .INSTANCE ;
124- }
110+ private boolean isAsyncCacheSupportEnabled () {
111+ return this .connectionFactory instanceof ReactiveRedisConnectionFactory ;
125112 }
126113
127114 @ Override
@@ -162,18 +149,19 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
162149 Assert .notNull (key , "Key must not be null" );
163150
164151 return asyncCacheWriter .retrieve (name , key , ttl ) //
165- .thenApply (cachedValue -> {
152+ .thenApply (cachedValue -> {
166153
167- statistics .incGets (name );
154+ statistics .incGets (name );
168155
169- if (cachedValue != null ) {
170- statistics .incHits (name );
171- } else {
172- statistics .incMisses (name );
173- }
156+ if (cachedValue != null ) {
157+ statistics .incHits (name );
158+ }
159+ else {
160+ statistics .incMisses (name );
161+ }
174162
175- return cachedValue ;
176- });
163+ return cachedValue ;
164+ });
177165 }
178166
179167 @ Override
@@ -186,8 +174,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
186174 execute (name , connection -> {
187175
188176 if (shouldExpireWithin (ttl )) {
189- connection .stringCommands ().set (key , value , Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ),
190- SetOption .upsert ());
177+ connection .stringCommands ().set (key , value , toExpiration (ttl ), SetOption .upsert ());
191178 } else {
192179 connection .stringCommands ().set (key , value );
193180 }
@@ -224,16 +211,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
224211
225212 try {
226213
227- boolean put ;
228-
229- if (shouldExpireWithin (ttl )) {
230- put = ObjectUtils .nullSafeEquals (
231- connection .stringCommands ().set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ()), true );
232- } else {
233- put = ObjectUtils .nullSafeEquals (connection .stringCommands ().setNX (key , value ), true );
234- }
214+ Boolean wasSet = shouldExpireWithin (ttl )
215+ ? connection .stringCommands ().set (key , value , Expiration .from (ttl ), SetOption .ifAbsent ())
216+ : connection .stringCommands ().setNX (key , value );
235217
236- if (put ) {
218+ if (Boolean . TRUE . equals ( wasSet ) ) {
237219 statistics .incPuts (name );
238220 return null ;
239221 }
@@ -378,19 +360,18 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
378360 Thread .sleep (this .sleepTime .toMillis ());
379361 }
380362 } catch (InterruptedException cause ) {
381-
382363 // Re-interrupt current Thread to allow other participants to react.
383364 Thread .currentThread ().interrupt ();
384-
385- throw new PessimisticLockingFailureException (String .format ("Interrupted while waiting to unlock cache %s" , name ),
386- cause );
365+ String message = "Interrupted while waiting to unlock cache %s" .formatted (name );
366+ throw new PessimisticLockingFailureException (message , cause );
387367 } finally {
388368 this .statistics .incLockTime (name , System .nanoTime () - lockWaitTimeNs );
389369 }
390370 }
391371
392372 boolean doCheckLock (String name , RedisConnection connection ) {
393- return ObjectUtils .nullSafeEquals (connection .keyCommands ().exists (createCacheLockKey (name )), true );
373+ Boolean cacheLockExists = connection .keyCommands ().exists (createCacheLockKey (name ));
374+ return Boolean .TRUE .equals (cacheLockExists );
394375 }
395376
396377 byte [] createCacheLockKey (String name ) {
@@ -401,6 +382,14 @@ private static boolean shouldExpireWithin(@Nullable Duration ttl) {
401382 return ttl != null && !ttl .isZero () && !ttl .isNegative ();
402383 }
403384
385+ private Expiration toExpiration (Duration ttl ) {
386+ return Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS );
387+ }
388+
389+ private Expiration toExpiration (Object key , @ Nullable Object value ) {
390+ return Expiration .from (this .lockTtl .getTimeToLive (key , value ));
391+ }
392+
404393 /**
405394 * Interface for asynchronous cache retrieval.
406395 *
@@ -419,8 +408,8 @@ interface AsyncCacheWriter {
419408 * @param name the cache name from which to retrieve the cache entry.
420409 * @param key the cache entry key.
421410 * @param ttl optional TTL to set for Time-to-Idle eviction.
422- * @return a future that completes either with a value if the value exists or completing with {@code null} if the
423- * cache does not contain an entry.
411+ * @return a future that completes either with a value if the value exists or completing with {@code null}
412+ * if the cache does not contain an entry.
424413 */
425414 CompletableFuture <byte []> retrieve (String name , byte [] key , @ Nullable Duration ttl );
426415
@@ -463,8 +452,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
463452 }
464453
465454 /**
466- * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
467- * {@link ReactiveRedisConnectionFactory}.
455+ * Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
456+ * using {@link ReactiveRedisConnectionFactory}.
468457 *
469458 * @since 3.2
470459 */
@@ -481,11 +470,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
481470 return doWithConnection (connection -> {
482471
483472 ByteBuffer wrappedKey = ByteBuffer .wrap (key );
473+
484474 Mono <?> cacheLockCheck = isLockingCacheWriter () ? waitForLock (connection , name ) : Mono .empty ();
475+
485476 ReactiveStringCommands stringCommands = connection .stringCommands ();
486477
487478 Mono <ByteBuffer > get = shouldExpireWithin (ttl )
488- ? stringCommands .getEx (wrappedKey , Expiration . from (ttl ))
479+ ? stringCommands .getEx (wrappedKey , toExpiration (ttl ))
489480 : stringCommands .get (wrappedKey );
490481
491482 return cacheLockCheck .then (get ).map (ByteUtils ::getBytes ).toFuture ();
@@ -498,41 +489,44 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
498489 return doWithConnection (connection -> {
499490
500491 Mono <?> mono = isLockingCacheWriter ()
501- ? doStoreWithLocking (name , key , value , ttl , connection )
492+ ? doLockStoreUnlock (name , key , value , ttl , connection )
502493 : doStore (key , value , ttl , connection );
503494
504495 return mono .then ().toFuture ();
505496 });
506497 }
507498
508- private Mono <Boolean > doStoreWithLocking (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
509- ReactiveRedisConnection connection ) {
510-
511- return Mono .usingWhen (doLock (name , key , value , connection ), unused -> doStore (key , value , ttl , connection ),
512- unused -> doUnlock (name , connection ));
513- }
514-
515499 private Mono <Boolean > doStore (byte [] cacheKey , byte [] value , @ Nullable Duration ttl ,
516500 ReactiveRedisConnection connection ) {
517501
518502 ByteBuffer wrappedKey = ByteBuffer .wrap (cacheKey );
519503 ByteBuffer wrappedValue = ByteBuffer .wrap (value );
520504
521- if (shouldExpireWithin (ttl )) {
522- return connection .stringCommands ().set (wrappedKey , wrappedValue ,
523- Expiration .from (ttl .toMillis (), TimeUnit .MILLISECONDS ), SetOption .upsert ());
524- } else {
525- return connection .stringCommands ().set (wrappedKey , wrappedValue );
526- }
505+ ReactiveStringCommands stringCommands = connection .stringCommands ();
506+
507+ return shouldExpireWithin (ttl )
508+ ? stringCommands .set (wrappedKey , wrappedValue , toExpiration (ttl ), SetOption .upsert ())
509+ : stringCommands .set (wrappedKey , wrappedValue );
527510 }
528511
512+ private Mono <Boolean > doLockStoreUnlock (String name , byte [] key , byte [] value , @ Nullable Duration ttl ,
513+ ReactiveRedisConnection connection ) {
514+
515+ Mono <Object > lock = doLock (name , key , value , connection );
516+
517+ Function <Object , Mono <Boolean >> store = unused -> doStore (key , value , ttl , connection );
518+ Function <Object , Mono <Void >> unlock = unused -> doUnlock (name , connection );
519+
520+ return Mono .usingWhen (lock , store , unlock );
521+ }
529522
530523 private Mono <Object > doLock (String name , Object contextualKey , @ Nullable Object contextualValue ,
531524 ReactiveRedisConnection connection ) {
532525
533526 ByteBuffer key = ByteBuffer .wrap (createCacheLockKey (name ));
534527 ByteBuffer value = ByteBuffer .wrap (new byte [0 ]);
535- Expiration expiration = Expiration .from (lockTtl .getTimeToLive (contextualKey , contextualValue ));
528+
529+ Expiration expiration = toExpiration (contextualKey , contextualValue );
536530
537531 return connection .stringCommands ().set (key , value , expiration , SetOption .SET_IF_ABSENT ) //
538532 // Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
@@ -545,28 +539,52 @@ private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
545539
546540 private Mono <Void > waitForLock (ReactiveRedisConnection connection , String cacheName ) {
547541
548- AtomicLong lockWaitTimeNs = new AtomicLong ();
549- byte [] cacheLockKey = createCacheLockKey (cacheName );
542+ AtomicLong lockWaitNanoTime = new AtomicLong ();
543+
544+ Consumer <org .reactivestreams .Subscription > setNanoTimeOnLockWait = subscription ->
545+ lockWaitNanoTime .set (System .nanoTime ());
550546
551- Flux < Long > wait = Flux . interval ( Duration . ZERO , sleepTime );
552- Mono < Boolean > exists = connection . keyCommands (). exists ( ByteBuffer . wrap ( cacheLockKey )). filter ( it -> ! it );
547+ Consumer < SignalType > recordStatistics = signalType ->
548+ statistics . incLockTime ( cacheName , System . nanoTime () - lockWaitNanoTime . get () );
553549
554- return wait .doOnSubscribe (subscription -> lockWaitTimeNs .set (System .nanoTime ())) //
555- .flatMap (it -> exists ) //
556- .doFinally (signalType -> statistics .incLockTime (cacheName , System .nanoTime () - lockWaitTimeNs .get ())) //
550+ Function <Long , Mono <Boolean >> doWhileCacheLockExists = lockWaitTime -> connection .keyCommands ()
551+ .exists (toCacheLockKey (cacheName )).filter (cacheLockKeyExists -> !cacheLockKeyExists );
552+
553+ return waitInterval (sleepTime ) //
554+ .doOnSubscribe (setNanoTimeOnLockWait ) //
555+ .flatMap (doWhileCacheLockExists ) //
556+ .doFinally (recordStatistics ) //
557557 .next () //
558558 .then ();
559559 }
560560
561+ private Flux <Long > waitInterval (Duration period ) {
562+ return Flux .interval (Duration .ZERO , period );
563+ }
564+
565+ private ByteBuffer toCacheLockKey (String cacheName ) {
566+ return ByteBuffer .wrap (createCacheLockKey (cacheName ));
567+ }
568+
569+ private ReactiveRedisConnectionFactory getReactiveConnectionFactory () {
570+ return (ReactiveRedisConnectionFactory ) DefaultRedisCacheWriter .this .connectionFactory ;
571+ }
572+
573+ private Mono <ReactiveRedisConnection > getReactiveConnection () {
574+ return Mono .fromSupplier (getReactiveConnectionFactory ()::getReactiveConnection );
575+ }
576+
561577 private <T > CompletableFuture <T > doWithConnection (
562578 Function <ReactiveRedisConnection , CompletableFuture <T >> callback ) {
563579
564- ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory ) connectionFactory ;
580+ Function <ReactiveRedisConnection , Mono <T >> commandExecution = connection ->
581+ Mono .fromCompletionStage (callback .apply (connection ));
582+
583+ Function <ReactiveRedisConnection , Mono <Void >> connectionClose = ReactiveRedisConnection ::closeLater ;
584+
585+ Mono <T > result = Mono .usingWhen (getReactiveConnection (), commandExecution , connectionClose );
565586
566- return Mono .usingWhen (Mono .fromSupplier (cf ::getReactiveConnection ), //
567- it -> Mono .fromCompletionStage (callback .apply (it )), //
568- ReactiveRedisConnection ::closeLater ) //
569- .toFuture ();
587+ return result .toFuture ();
570588 }
571589 }
572590}
0 commit comments