Skip to content

Commit 2b5586f

Browse files
committed
Refine and clarify operations in asynchronous caching implementation.
Uses more descriptive names for operations, especially Reactive operations, by either calling a local, private method or introducing a (functional) variable with strongly typed parameters. Edits and refines Javadoc. Original Pull Request: #2717 Closes #2743
1 parent 9a81e6c commit 2b5586f

File tree

1 file changed

+111
-75
lines changed

1 file changed

+111
-75
lines changed

Diff for: src/main/java/org/springframework/data/redis/cache/DefaultRedisCacheWriter.java

+111-75
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
*/
1616
package org.springframework.data.redis.cache;
1717

18-
import reactor.core.publisher.Flux;
19-
import reactor.core.publisher.Mono;
20-
2118
import java.nio.ByteBuffer;
2219
import java.nio.charset.StandardCharsets;
2320
import java.time.Duration;
2421
import java.util.concurrent.CompletableFuture;
2522
import java.util.concurrent.TimeUnit;
2623
import java.util.concurrent.atomic.AtomicLong;
24+
import java.util.function.Consumer;
2725
import java.util.function.Function;
2826

2927
import org.springframework.dao.PessimisticLockingFailureException;
@@ -38,7 +36,10 @@
3836
import org.springframework.lang.Nullable;
3937
import org.springframework.util.Assert;
4038
import org.springframework.util.ClassUtils;
41-
import org.springframework.util.ObjectUtils;
39+
40+
import reactor.core.publisher.Flux;
41+
import reactor.core.publisher.Mono;
42+
import reactor.core.publisher.SignalType;
4243

4344
/**
4445
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
@@ -47,11 +48,11 @@
4748
* <p>
4849
* {@link DefaultRedisCacheWriter} can be used in
4950
* {@link RedisCacheWriter#lockingRedisCacheWriter(RedisConnectionFactory) locking} or
50-
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While
51-
* {@literal non-locking} aims for maximum performance it may result in overlapping, non-atomic, command execution for
52-
* operations spanning multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents
53-
* command overlap by setting an explicit lock key and checking against presence of this key which leads to additional
54-
* requests and potential command wait times.
51+
* {@link RedisCacheWriter#nonLockingRedisCacheWriter(RedisConnectionFactory) non-locking} mode. While {@literal non-locking}
52+
* aims for maximum performance it may result in overlapping, non-atomic, command execution for operations spanning
53+
* multiple Redis interactions like {@code putIfAbsent}. The {@literal locking} counterpart prevents command overlap
54+
* by setting an explicit lock key and checking against presence of this key which leads to additional requests
55+
* and potential command wait times.
5556
*
5657
* @author Christoph Strobl
5758
* @author Mark Paluch
@@ -61,8 +62,12 @@
6162
*/
6263
class DefaultRedisCacheWriter implements RedisCacheWriter {
6364

64-
private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT = ClassUtils
65-
.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
65+
public static final boolean FLUX_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", null);
66+
67+
private static final boolean REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT =
68+
ClassUtils.isPresent("org.springframework.data.redis.connection.ReactiveRedisConnectionFactory", null);
69+
70+
private final AsyncCacheWriter asyncCacheWriter;
6671

6772
private final BatchStrategy batchStrategy;
6873

@@ -74,8 +79,6 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
7479

7580
private final TtlFunction lockTtl;
7681

77-
private final AsyncCacheWriter asyncCacheWriter;
78-
7982
/**
8083
* @param connectionFactory must not be {@literal null}.
8184
* @param batchStrategy must not be {@literal null}.
@@ -86,8 +89,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
8689

8790
/**
8891
* @param connectionFactory must not be {@literal null}.
89-
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
90-
* to disable locking.
92+
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
93+
* Use {@link Duration#ZERO} to disable locking.
9194
* @param batchStrategy must not be {@literal null}.
9295
*/
9396
DefaultRedisCacheWriter(RedisConnectionFactory connectionFactory, Duration sleepTime, BatchStrategy batchStrategy) {
@@ -96,8 +99,8 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
9699

97100
/**
98101
* @param connectionFactory must not be {@literal null}.
99-
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}. Use {@link Duration#ZERO}
100-
* to disable locking.
102+
* @param sleepTime sleep time between lock request attempts. Must not be {@literal null}.
103+
* Use {@link Duration#ZERO} to disable locking.
101104
* @param lockTtl Lock TTL function must not be {@literal null}.
102105
* @param cacheStatisticsCollector must not be {@literal null}.
103106
* @param batchStrategy must not be {@literal null}.
@@ -116,12 +119,13 @@ class DefaultRedisCacheWriter implements RedisCacheWriter {
116119
this.lockTtl = lockTtl;
117120
this.statistics = cacheStatisticsCollector;
118121
this.batchStrategy = batchStrategy;
122+
this.asyncCacheWriter = isAsyncCacheSupportEnabled() ? new AsynchronousCacheWriterDelegate()
123+
: UnsupportedAsyncCacheWriter.INSTANCE;
124+
}
119125

120-
if (REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && this.connectionFactory instanceof ReactiveRedisConnectionFactory) {
121-
asyncCacheWriter = new AsynchronousCacheWriterDelegate();
122-
} else {
123-
asyncCacheWriter = UnsupportedAsyncCacheWriter.INSTANCE;
124-
}
126+
private boolean isAsyncCacheSupportEnabled() {
127+
return REACTIVE_REDIS_CONNECTION_FACTORY_PRESENT && FLUX_PRESENT
128+
&& this.connectionFactory instanceof ReactiveRedisConnectionFactory;
125129
}
126130

127131
@Override
@@ -168,7 +172,8 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
168172

169173
if (cachedValue != null) {
170174
statistics.incHits(name);
171-
} else {
175+
}
176+
else {
172177
statistics.incMisses(name);
173178
}
174179

@@ -186,8 +191,7 @@ public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {
186191
execute(name, connection -> {
187192

188193
if (shouldExpireWithin(ttl)) {
189-
connection.stringCommands().set(key, value, Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS),
190-
SetOption.upsert());
194+
connection.stringCommands().set(key, value, toExpiration(ttl), SetOption.upsert());
191195
} else {
192196
connection.stringCommands().set(key, value);
193197
}
@@ -224,16 +228,11 @@ public byte[] putIfAbsent(String name, byte[] key, byte[] value, @Nullable Durat
224228

225229
try {
226230

227-
boolean put;
228-
229-
if (shouldExpireWithin(ttl)) {
230-
put = ObjectUtils.nullSafeEquals(
231-
connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent()), true);
232-
} else {
233-
put = ObjectUtils.nullSafeEquals(connection.stringCommands().setNX(key, value), true);
234-
}
231+
Boolean wasSet = shouldExpireWithin(ttl)
232+
? connection.stringCommands().set(key, value, Expiration.from(ttl), SetOption.ifAbsent())
233+
: connection.stringCommands().setNX(key, value);
235234

236-
if (put) {
235+
if (Boolean.TRUE.equals(wasSet)) {
237236
statistics.incPuts(name);
238237
return null;
239238
}
@@ -322,9 +321,11 @@ void lock(String name) {
322321
private Boolean doLock(String name, Object contextualKey, @Nullable Object contextualValue,
323322
RedisConnection connection) {
324323

325-
Expiration expiration = Expiration.from(this.lockTtl.getTimeToLive(contextualKey, contextualValue));
324+
byte[] cacheLockKey = createCacheLockKey(name);
326325

327-
return connection.stringCommands().set(createCacheLockKey(name), new byte[0], expiration, SetOption.SET_IF_ABSENT);
326+
Expiration expiration = toExpiration(contextualKey, contextualValue);
327+
328+
return connection.stringCommands().set(cacheLockKey, new byte[0], expiration, SetOption.SET_IF_ABSENT);
328329
}
329330

330331
/**
@@ -378,29 +379,40 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
378379
Thread.sleep(this.sleepTime.toMillis());
379380
}
380381
} catch (InterruptedException cause) {
381-
382382
// Re-interrupt current Thread to allow other participants to react.
383383
Thread.currentThread().interrupt();
384-
385-
throw new PessimisticLockingFailureException(String.format("Interrupted while waiting to unlock cache %s", name),
386-
cause);
384+
String message = "Interrupted while waiting to unlock cache %s".formatted(name);
385+
throw new PessimisticLockingFailureException(message, cause);
387386
} finally {
388387
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
389388
}
390389
}
391390

392391
boolean doCheckLock(String name, RedisConnection connection) {
393-
return ObjectUtils.nullSafeEquals(connection.keyCommands().exists(createCacheLockKey(name)), true);
392+
Boolean cacheLockExists = connection.keyCommands().exists(createCacheLockKey(name));
393+
return Boolean.TRUE.equals(cacheLockExists);
394394
}
395395

396396
byte[] createCacheLockKey(String name) {
397397
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
398398
}
399399

400+
private ReactiveRedisConnectionFactory getReactiveConnectionFactory() {
401+
return (ReactiveRedisConnectionFactory) this.connectionFactory;
402+
}
403+
400404
private static boolean shouldExpireWithin(@Nullable Duration ttl) {
401405
return ttl != null && !ttl.isZero() && !ttl.isNegative();
402406
}
403407

408+
private Expiration toExpiration(Duration ttl) {
409+
return Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS);
410+
}
411+
412+
private Expiration toExpiration(Object key, @Nullable Object value) {
413+
return Expiration.from(this.lockTtl.getTimeToLive(key, value));
414+
}
415+
404416
/**
405417
* Interface for asynchronous cache retrieval.
406418
*
@@ -419,8 +431,8 @@ interface AsyncCacheWriter {
419431
* @param name the cache name from which to retrieve the cache entry.
420432
* @param key the cache entry key.
421433
* @param ttl optional TTL to set for Time-to-Idle eviction.
422-
* @return a future that completes either with a value if the value exists or completing with {@code null} if the
423-
* cache does not contain an entry.
434+
* @return a future that completes either with a value if the value exists or completing with {@code null}
435+
* if the cache does not contain an entry.
424436
*/
425437
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);
426438

@@ -463,8 +475,8 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
463475
}
464476

465477
/**
466-
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations using
467-
* {@link ReactiveRedisConnectionFactory}.
478+
* Delegate implementing {@link AsyncCacheWriter} to provide asynchronous cache retrieval and storage operations
479+
* using {@link ReactiveRedisConnectionFactory}.
468480
*
469481
* @since 3.2
470482
*/
@@ -481,11 +493,13 @@ public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Dur
481493
return doWithConnection(connection -> {
482494

483495
ByteBuffer wrappedKey = ByteBuffer.wrap(key);
496+
484497
Mono<?> cacheLockCheck = isLockingCacheWriter() ? waitForLock(connection, name) : Mono.empty();
498+
485499
ReactiveStringCommands stringCommands = connection.stringCommands();
486500

487501
Mono<ByteBuffer> get = shouldExpireWithin(ttl)
488-
? stringCommands.getEx(wrappedKey, Expiration.from(ttl))
502+
? stringCommands.getEx(wrappedKey, toExpiration(ttl))
489503
: stringCommands.get(wrappedKey);
490504

491505
return cacheLockCheck.then(get).map(ByteUtils::getBytes).toFuture();
@@ -498,75 +512,97 @@ public CompletableFuture<Void> store(String name, byte[] key, byte[] value, @Nul
498512
return doWithConnection(connection -> {
499513

500514
Mono<?> mono = isLockingCacheWriter()
501-
? doStoreWithLocking(name, key, value, ttl, connection)
515+
? doLockStoreUnlock(name, key, value, ttl, connection)
502516
: doStore(key, value, ttl, connection);
503517

504518
return mono.then().toFuture();
505519
});
506520
}
507521

508-
private Mono<Boolean> doStoreWithLocking(String name, byte[] key, byte[] value, @Nullable Duration ttl,
509-
ReactiveRedisConnection connection) {
510-
511-
return Mono.usingWhen(doLock(name, key, value, connection), unused -> doStore(key, value, ttl, connection),
512-
unused -> doUnlock(name, connection));
513-
}
514-
515522
private Mono<Boolean> doStore(byte[] cacheKey, byte[] value, @Nullable Duration ttl,
516523
ReactiveRedisConnection connection) {
517524

518525
ByteBuffer wrappedKey = ByteBuffer.wrap(cacheKey);
519526
ByteBuffer wrappedValue = ByteBuffer.wrap(value);
520527

521-
if (shouldExpireWithin(ttl)) {
522-
return connection.stringCommands().set(wrappedKey, wrappedValue,
523-
Expiration.from(ttl.toMillis(), TimeUnit.MILLISECONDS), SetOption.upsert());
524-
} else {
525-
return connection.stringCommands().set(wrappedKey, wrappedValue);
526-
}
528+
ReactiveStringCommands stringCommands = connection.stringCommands();
529+
530+
return shouldExpireWithin(ttl)
531+
? stringCommands.set(wrappedKey, wrappedValue, toExpiration(ttl), SetOption.upsert())
532+
: stringCommands.set(wrappedKey, wrappedValue);
527533
}
528534

535+
private Mono<Boolean> doLockStoreUnlock(String name, byte[] key, byte[] value, @Nullable Duration ttl,
536+
ReactiveRedisConnection connection) {
537+
538+
Mono<Object> lock = doLock(name, key, value, connection);
539+
540+
Function<Object, Mono<Boolean>> store = unused -> doStore(key, value, ttl, connection);
541+
Function<Object, Mono<Void>> unlock = unused -> doUnlock(name, connection);
542+
543+
return Mono.usingWhen(lock, store, unlock);
544+
}
529545

530546
private Mono<Object> doLock(String name, Object contextualKey, @Nullable Object contextualValue,
531547
ReactiveRedisConnection connection) {
532548

533-
ByteBuffer key = ByteBuffer.wrap(createCacheLockKey(name));
549+
ByteBuffer key = toCacheLockKey(name);
534550
ByteBuffer value = ByteBuffer.wrap(new byte[0]);
535-
Expiration expiration = Expiration.from(lockTtl.getTimeToLive(contextualKey, contextualValue));
551+
552+
Expiration expiration = toExpiration(contextualKey, contextualValue);
536553

537554
return connection.stringCommands().set(key, value, expiration, SetOption.SET_IF_ABSENT) //
538555
// Ensure we emit an object, otherwise, the Mono.usingWhen operator doesn't run the inner resource function.
539556
.thenReturn(Boolean.TRUE);
540557
}
541558

542559
private Mono<Void> doUnlock(String name, ReactiveRedisConnection connection) {
543-
return connection.keyCommands().del(ByteBuffer.wrap(createCacheLockKey(name))).then();
560+
return connection.keyCommands().del(toCacheLockKey(name)).then();
544561
}
545562

546563
private Mono<Void> waitForLock(ReactiveRedisConnection connection, String cacheName) {
547564

548-
AtomicLong lockWaitTimeNs = new AtomicLong();
549-
byte[] cacheLockKey = createCacheLockKey(cacheName);
565+
AtomicLong lockWaitNanoTime = new AtomicLong();
550566

551-
Flux<Long> wait = Flux.interval(Duration.ZERO, sleepTime);
552-
Mono<Boolean> exists = connection.keyCommands().exists(ByteBuffer.wrap(cacheLockKey)).filter(it -> !it);
567+
Consumer<org.reactivestreams.Subscription> setNanoTimeOnLockWait = subscription ->
568+
lockWaitNanoTime.set(System.nanoTime());
553569

554-
return wait.doOnSubscribe(subscription -> lockWaitTimeNs.set(System.nanoTime())) //
555-
.flatMap(it -> exists) //
556-
.doFinally(signalType -> statistics.incLockTime(cacheName, System.nanoTime() - lockWaitTimeNs.get())) //
570+
Consumer<SignalType> recordStatistics = signalType ->
571+
statistics.incLockTime(cacheName, System.nanoTime() - lockWaitNanoTime.get());
572+
573+
Function<Long, Mono<Boolean>> doWhileCacheLockExists = lockWaitTime -> connection.keyCommands()
574+
.exists(toCacheLockKey(cacheName)).filter(cacheLockKeyExists -> !cacheLockKeyExists);
575+
576+
return waitInterval(sleepTime) //
577+
.doOnSubscribe(setNanoTimeOnLockWait) //
578+
.flatMap(doWhileCacheLockExists) //
579+
.doFinally(recordStatistics) //
557580
.next() //
558581
.then();
559582
}
560583

584+
private Flux<Long> waitInterval(Duration period) {
585+
return Flux.interval(Duration.ZERO, period);
586+
}
587+
588+
private ByteBuffer toCacheLockKey(String cacheName) {
589+
return ByteBuffer.wrap(createCacheLockKey(cacheName));
590+
}
591+
561592
private <T> CompletableFuture<T> doWithConnection(
562593
Function<ReactiveRedisConnection, CompletableFuture<T>> callback) {
563594

564-
ReactiveRedisConnectionFactory cf = (ReactiveRedisConnectionFactory) connectionFactory;
595+
Mono<ReactiveRedisConnection> reactiveConnection =
596+
Mono.fromSupplier(getReactiveConnectionFactory()::getReactiveConnection);
597+
598+
Function<ReactiveRedisConnection, Mono<T>> commandExecution = connection ->
599+
Mono.fromCompletionStage(callback.apply(connection));
600+
601+
Function<ReactiveRedisConnection, Mono<Void>> connectionClose = ReactiveRedisConnection::closeLater;
602+
603+
Mono<T> result = Mono.usingWhen(reactiveConnection, commandExecution, connectionClose);
565604

566-
return Mono.usingWhen(Mono.fromSupplier(cf::getReactiveConnection), //
567-
it -> Mono.fromCompletionStage(callback.apply(it)), //
568-
ReactiveRedisConnection::closeLater) //
569-
.toFuture();
605+
return result.toFuture();
570606
}
571607
}
572608
}

0 commit comments

Comments
 (0)