feat: Add instanceId to distributed lock keys for multi-instance isolation (#40966)

This commit is contained in:
Abhijeet 2025-06-19 12:20:59 +05:30 committed by GitHub
parent ad36f76920
commit 24ec7954a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 129 additions and 31 deletions

View File

@ -0,0 +1,19 @@
package com.appsmith.server.configurations;
import com.appsmith.caching.components.InstanceIdProvider;
import com.appsmith.server.services.ConfigService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
@RequiredArgsConstructor
public class InstanceIdProviderImpl implements InstanceIdProvider {
private final ConfigService configService;
@Override
public Mono<String> getInstanceId() {
return configService.getInstanceId();
}
}

View File

@ -1,8 +1,10 @@
package com.appsmith.server.aspect;
import com.appsmith.caching.components.InstanceIdProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ -16,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@SpringBootTest
class DistributedLockAspectTest {
@ -26,45 +29,61 @@ class DistributedLockAspectTest {
@Autowired
private ReactiveRedisOperations<String, String> redisOperations;
private static final String LOCK_PREFIX = "lock:";
@MockBean
private InstanceIdProvider instanceIdProvider;
private static final String LOCK_PREFIX = "lock";
private static final String TEST_INSTANCE_ID = "test-instance-123";
private String getLockKey(String key) {
return LOCK_PREFIX + ":" + TEST_INSTANCE_ID + ":" + key;
}
@Test
void testMonoOperation() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
StepVerifier.create(testLockService.monoOperation())
.expectNext("mono-success")
.verifyComplete();
// Verify lock is released
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "mono-test"))
StepVerifier.create(redisOperations.hasKey(getLockKey("mono-test")))
.expectNext(false)
.verifyComplete();
}
@Test
void testFluxOperation() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
StepVerifier.create(testLockService.fluxOperation().collectList())
.expectNext(List.of("flux-success-1", "flux-success-2"))
.verifyComplete();
// Verify lock is released
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "flux-test"))
StepVerifier.create(redisOperations.hasKey(getLockKey("flux-test")))
.expectNext(false)
.verifyComplete();
}
@Test
void testBlockingOperation() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
String result = testLockService.blockingOperation();
assertEquals("blocking-success", result);
// Verify lock is released
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "blocking-test"))
StepVerifier.create(redisOperations.hasKey(getLockKey("blocking-test")))
.expectNext(false)
.verifyComplete();
}
@Test
void testConcurrentAccess() throws InterruptedException {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
AtomicReference<String> thread1Result = new AtomicReference<>();
AtomicReference<String> thread2Result = new AtomicReference<>();
CountDownLatch thread1Started = new CountDownLatch(1);
@ -106,13 +125,15 @@ class DistributedLockAspectTest {
@Test
void testPersistentLock() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
// First operation acquires lock and doesn't release it
StepVerifier.create(testLockService.operationWithPersistentLock())
.expectNext("success")
.verifyComplete();
// Verify lock still exists after operation completes
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "persistent-lock"))
StepVerifier.create(redisOperations.hasKey(getLockKey("persistent-lock")))
.expectNext(true)
.verifyComplete();
@ -121,20 +142,22 @@ class DistributedLockAspectTest {
.verifyComplete(); // Completes empty because lock is still held
// Cleanup: Release lock for other tests
StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations))
StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations, TEST_INSTANCE_ID))
.expectNext(1L)
.verifyComplete();
}
@Test
void testPersistentLockExpiration() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
// Execute operation with short-lived lock
StepVerifier.create(Mono.just(testLockService.operationWithShortLivedLock()))
.expectNext("success")
.verifyComplete();
// Verify lock exists immediately after
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock"))
StepVerifier.create(redisOperations.hasKey(getLockKey("short-lived-lock")))
.expectNext(true)
.verifyComplete();
@ -146,43 +169,49 @@ class DistributedLockAspectTest {
}
// Verify lock has expired
StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock"))
StepVerifier.create(redisOperations.hasKey(getLockKey("short-lived-lock")))
.expectNext(false)
.verifyComplete();
}
@Test
void testLockReleasedOnBlockingError() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
// Execute operation that throws error
assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError());
// Verify lock is released despite shouldReleaseLock = false
StepVerifier.create(redisOperations.hasKey("lock:error-lock"))
StepVerifier.create(redisOperations.hasKey(getLockKey("error-lock")))
.expectNext(false)
.verifyComplete();
}
@Test
void testLockReleasedOnReactiveError() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
// Execute operation that returns Mono.error
StepVerifier.create(testLockService.reactiveMethodWithError())
.expectError(RuntimeException.class)
.verify();
// Verify lock is released despite shouldReleaseLock = false
StepVerifier.create(redisOperations.hasKey("lock:error-lock"))
StepVerifier.create(redisOperations.hasKey(getLockKey("error-lock")))
.expectNext(false)
.verifyComplete();
}
@Test
void testLockReleasedOnErrorAllowsSubsequentExecution() {
when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID));
// First call throws error
assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError());
// Verify we can acquire the same lock immediately after error
AtomicBoolean lockAcquired = new AtomicBoolean(false);
StepVerifier.create(redisOperations.opsForValue().setIfAbsent("lock:error-lock", "test-value"))
StepVerifier.create(redisOperations.opsForValue().setIfAbsent(getLockKey("error-lock"), "test-value"))
.consumeNextWith(result -> lockAcquired.set(result))
.verifyComplete();
@ -190,6 +219,6 @@ class DistributedLockAspectTest {
assertTrue(lockAcquired.get());
// Cleanup
redisOperations.delete("lock:error-lock").block();
redisOperations.delete(getLockKey("error-lock")).block();
}
}

View File

@ -42,8 +42,9 @@ public class TestLockService {
}
// Method to manually release the lock (for testing cleanup)
public Mono<Long> releaseLock(String lockKey, ReactiveRedisOperations<String, String> redisOperations) {
return redisOperations.delete("lock:" + lockKey);
public Mono<Long> releaseLock(
String lockKey, ReactiveRedisOperations<String, String> redisOperations, String instanceId) {
return redisOperations.delete("lock:" + instanceId + ":" + lockKey);
}
@DistributedLock(key = "error-lock", shouldReleaseLock = false)

View File

@ -1,6 +1,7 @@
package com.appsmith.caching.aspects;
import com.appsmith.caching.annotations.DistributedLock;
import com.appsmith.caching.components.InstanceIdProvider;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
@ -23,11 +24,14 @@ import java.time.temporal.ChronoUnit;
public class DistributedLockAspect {
private final ReactiveRedisOperations<String, String> redisOperations;
private final InstanceIdProvider instanceIdProvider;
private static final String LOCK_PREFIX = "lock:";
private static final String LOCK_PREFIX = "lock";
public DistributedLockAspect(ReactiveRedisOperations<String, String> redisOperations) {
public DistributedLockAspect(
ReactiveRedisOperations<String, String> redisOperations, InstanceIdProvider instanceIdProvider) {
this.redisOperations = redisOperations;
this.instanceIdProvider = instanceIdProvider;
}
// Method to acquire a distributed lock before executing the annotated method.
@ -57,12 +61,14 @@ public class DistributedLockAspect {
}
}
private LockDetails createLockDetails(DistributedLock lock) {
String lockKey = LOCK_PREFIX + lock.key();
long ttl = lock.ttl();
String value =
"locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString();
return new LockDetails(lockKey, value, Duration.ofSeconds(ttl));
private Mono<LockDetails> createLockDetails(DistributedLock lock) {
return instanceIdProvider.getInstanceId().defaultIfEmpty("unknown").map(instanceId -> {
String lockKey = LOCK_PREFIX + ":" + instanceId + ":" + lock.key();
long ttl = lock.ttl();
String value = "locked until "
+ Instant.now().plus(ttl, ChronoUnit.SECONDS).toString();
return new LockDetails(lockKey, value, Duration.ofSeconds(ttl));
});
}
private void releaseLock(String lockKey) {
@ -80,9 +86,7 @@ public class DistributedLockAspect {
}
private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) {
LockDetails lockDetails = createLockDetails(lock);
return redisOperations
return createLockDetails(lock).flatMap(lockDetails -> redisOperations
.opsForValue()
.setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration)
.flatMap(acquired -> {
@ -103,13 +107,11 @@ public class DistributedLockAspect {
}
log.info("Lock already acquired for: {}", lockDetails.key);
return Mono.empty();
});
}));
}
private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) {
LockDetails lockDetails = createLockDetails(lock);
return redisOperations
return createLockDetails(lock).flatMapMany(lockDetails -> redisOperations
.opsForValue()
.setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration)
.flatMapMany(acquired -> {
@ -130,11 +132,11 @@ public class DistributedLockAspect {
}
log.info("Lock already acquired for: {}", lockDetails.key);
return Flux.empty();
});
}));
}
private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable {
LockDetails lockDetails = createLockDetails(lock);
LockDetails lockDetails = createLockDetails(lock).block();
Boolean acquired = null;
try {

View File

@ -0,0 +1,16 @@
package com.appsmith.caching.components;
import reactor.core.publisher.Mono;
/**
* Interface to provide instanceId for distributed lock keys.
* This allows the reactive-caching module to get instanceId without depending on higher-level modules.
*/
public interface InstanceIdProvider {
/**
* Get the instance ID for this Appsmith instance
* @return Mono containing the instance ID
*/
Mono<String> getInstanceId();
}

View File

@ -0,0 +1,14 @@
package com.appsmith.testcaching;
import com.appsmith.caching.components.InstanceIdProvider;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
@TestConfiguration
public class TestConfig {
@Bean
public InstanceIdProvider instanceIdProvider() {
return new TestInstanceIdProvider();
}
}

View File

@ -0,0 +1,14 @@
package com.appsmith.testcaching;
import com.appsmith.caching.components.InstanceIdProvider;
import reactor.core.publisher.Mono;
public class TestInstanceIdProvider implements InstanceIdProvider {
private static final String TEST_INSTANCE_ID = "test-instance-123";
@Override
public Mono<String> getInstanceId() {
return Mono.just(TEST_INSTANCE_ID);
}
}

View File

@ -1,6 +1,7 @@
package com.appsmith.testcaching.test;
import com.appsmith.caching.components.CacheManager;
import com.appsmith.testcaching.TestConfig;
import com.appsmith.testcaching.model.ArgumentModel;
import com.appsmith.testcaching.model.TestModel;
import com.appsmith.testcaching.service.CacheTestService;
@ -10,6 +11,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import java.util.List;
@ -20,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Slf4j
@ContextConfiguration(classes = TestConfig.class)
public class TestCachingMethods {
@Autowired