diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceIdProviderImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceIdProviderImpl.java new file mode 100644 index 0000000000..01bb7b11e3 --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/InstanceIdProviderImpl.java @@ -0,0 +1,19 @@ +package com.appsmith.server.configurations; + +import com.appsmith.caching.components.InstanceIdProvider; +import com.appsmith.server.services.ConfigService; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +@Component +@RequiredArgsConstructor +public class InstanceIdProviderImpl implements InstanceIdProvider { + + private final ConfigService configService; + + @Override + public Mono getInstanceId() { + return configService.getInstanceId(); + } +} diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java index a03b1eaada..55e0387ed0 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/DistributedLockAspectTest.java @@ -1,8 +1,10 @@ package com.appsmith.server.aspect; +import com.appsmith.caching.components.InstanceIdProvider; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.data.redis.core.ReactiveRedisOperations; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -16,6 +18,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; @SpringBootTest class DistributedLockAspectTest { @@ -26,45 +29,61 @@ class DistributedLockAspectTest { @Autowired private ReactiveRedisOperations redisOperations; - private static final String LOCK_PREFIX = "lock:"; + @MockBean + private InstanceIdProvider instanceIdProvider; + + private static final String LOCK_PREFIX = "lock"; + private static final String TEST_INSTANCE_ID = "test-instance-123"; + + private String getLockKey(String key) { + return LOCK_PREFIX + ":" + TEST_INSTANCE_ID + ":" + key; + } @Test void testMonoOperation() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + StepVerifier.create(testLockService.monoOperation()) .expectNext("mono-success") .verifyComplete(); // Verify lock is released - StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "mono-test")) + StepVerifier.create(redisOperations.hasKey(getLockKey("mono-test"))) .expectNext(false) .verifyComplete(); } @Test void testFluxOperation() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + StepVerifier.create(testLockService.fluxOperation().collectList()) .expectNext(List.of("flux-success-1", "flux-success-2")) .verifyComplete(); // Verify lock is released - StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "flux-test")) + StepVerifier.create(redisOperations.hasKey(getLockKey("flux-test"))) .expectNext(false) .verifyComplete(); } @Test void testBlockingOperation() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + String result = testLockService.blockingOperation(); assertEquals("blocking-success", result); // Verify lock is released - StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "blocking-test")) + StepVerifier.create(redisOperations.hasKey(getLockKey("blocking-test"))) .expectNext(false) .verifyComplete(); } @Test void testConcurrentAccess() throws InterruptedException { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + AtomicReference thread1Result = new AtomicReference<>(); AtomicReference thread2Result = new AtomicReference<>(); CountDownLatch thread1Started = new CountDownLatch(1); @@ -106,13 +125,15 @@ class DistributedLockAspectTest { @Test void testPersistentLock() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + // First operation acquires lock and doesn't release it StepVerifier.create(testLockService.operationWithPersistentLock()) .expectNext("success") .verifyComplete(); // Verify lock still exists after operation completes - StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "persistent-lock")) + StepVerifier.create(redisOperations.hasKey(getLockKey("persistent-lock"))) .expectNext(true) .verifyComplete(); @@ -121,20 +142,22 @@ class DistributedLockAspectTest { .verifyComplete(); // Completes empty because lock is still held // Cleanup: Release lock for other tests - StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations)) + StepVerifier.create(testLockService.releaseLock("persistent-lock", redisOperations, TEST_INSTANCE_ID)) .expectNext(1L) .verifyComplete(); } @Test void testPersistentLockExpiration() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + // Execute operation with short-lived lock StepVerifier.create(Mono.just(testLockService.operationWithShortLivedLock())) .expectNext("success") .verifyComplete(); // Verify lock exists immediately after - StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock")) + StepVerifier.create(redisOperations.hasKey(getLockKey("short-lived-lock"))) .expectNext(true) .verifyComplete(); @@ -146,43 +169,49 @@ class DistributedLockAspectTest { } // Verify lock has expired - StepVerifier.create(redisOperations.hasKey(LOCK_PREFIX + "short-lived-lock")) + StepVerifier.create(redisOperations.hasKey(getLockKey("short-lived-lock"))) .expectNext(false) .verifyComplete(); } @Test void testLockReleasedOnBlockingError() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + // Execute operation that throws error assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError()); // Verify lock is released despite shouldReleaseLock = false - StepVerifier.create(redisOperations.hasKey("lock:error-lock")) + StepVerifier.create(redisOperations.hasKey(getLockKey("error-lock"))) .expectNext(false) .verifyComplete(); } @Test void testLockReleasedOnReactiveError() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + // Execute operation that returns Mono.error StepVerifier.create(testLockService.reactiveMethodWithError()) .expectError(RuntimeException.class) .verify(); // Verify lock is released despite shouldReleaseLock = false - StepVerifier.create(redisOperations.hasKey("lock:error-lock")) + StepVerifier.create(redisOperations.hasKey(getLockKey("error-lock"))) .expectNext(false) .verifyComplete(); } @Test void testLockReleasedOnErrorAllowsSubsequentExecution() { + when(instanceIdProvider.getInstanceId()).thenReturn(Mono.just(TEST_INSTANCE_ID)); + // First call throws error assertThrows(RuntimeException.class, () -> testLockService.blockingMethodWithError()); // Verify we can acquire the same lock immediately after error AtomicBoolean lockAcquired = new AtomicBoolean(false); - StepVerifier.create(redisOperations.opsForValue().setIfAbsent("lock:error-lock", "test-value")) + StepVerifier.create(redisOperations.opsForValue().setIfAbsent(getLockKey("error-lock"), "test-value")) .consumeNextWith(result -> lockAcquired.set(result)) .verifyComplete(); @@ -190,6 +219,6 @@ class DistributedLockAspectTest { assertTrue(lockAcquired.get()); // Cleanup - redisOperations.delete("lock:error-lock").block(); + redisOperations.delete(getLockKey("error-lock")).block(); } } diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java index 0ff5a8d193..6ebf814cc4 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/aspect/TestLockService.java @@ -42,8 +42,9 @@ public class TestLockService { } // Method to manually release the lock (for testing cleanup) - public Mono releaseLock(String lockKey, ReactiveRedisOperations redisOperations) { - return redisOperations.delete("lock:" + lockKey); + public Mono releaseLock( + String lockKey, ReactiveRedisOperations redisOperations, String instanceId) { + return redisOperations.delete("lock:" + instanceId + ":" + lockKey); } @DistributedLock(key = "error-lock", shouldReleaseLock = false) diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java index 3bf198ecca..2c60deabea 100644 --- a/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/aspects/DistributedLockAspect.java @@ -1,6 +1,7 @@ package com.appsmith.caching.aspects; import com.appsmith.caching.annotations.DistributedLock; +import com.appsmith.caching.components.InstanceIdProvider; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -23,11 +24,14 @@ import java.time.temporal.ChronoUnit; public class DistributedLockAspect { private final ReactiveRedisOperations redisOperations; + private final InstanceIdProvider instanceIdProvider; - private static final String LOCK_PREFIX = "lock:"; + private static final String LOCK_PREFIX = "lock"; - public DistributedLockAspect(ReactiveRedisOperations redisOperations) { + public DistributedLockAspect( + ReactiveRedisOperations redisOperations, InstanceIdProvider instanceIdProvider) { this.redisOperations = redisOperations; + this.instanceIdProvider = instanceIdProvider; } // Method to acquire a distributed lock before executing the annotated method. @@ -57,12 +61,14 @@ public class DistributedLockAspect { } } - private LockDetails createLockDetails(DistributedLock lock) { - String lockKey = LOCK_PREFIX + lock.key(); - long ttl = lock.ttl(); - String value = - "locked until " + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); - return new LockDetails(lockKey, value, Duration.ofSeconds(ttl)); + private Mono createLockDetails(DistributedLock lock) { + return instanceIdProvider.getInstanceId().defaultIfEmpty("unknown").map(instanceId -> { + String lockKey = LOCK_PREFIX + ":" + instanceId + ":" + lock.key(); + long ttl = lock.ttl(); + String value = "locked until " + + Instant.now().plus(ttl, ChronoUnit.SECONDS).toString(); + return new LockDetails(lockKey, value, Duration.ofSeconds(ttl)); + }); } private void releaseLock(String lockKey) { @@ -80,9 +86,7 @@ public class DistributedLockAspect { } private Object handleMono(ProceedingJoinPoint joinPoint, DistributedLock lock) { - LockDetails lockDetails = createLockDetails(lock); - - return redisOperations + return createLockDetails(lock).flatMap(lockDetails -> redisOperations .opsForValue() .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) .flatMap(acquired -> { @@ -103,13 +107,11 @@ public class DistributedLockAspect { } log.info("Lock already acquired for: {}", lockDetails.key); return Mono.empty(); - }); + })); } private Object handleFlux(ProceedingJoinPoint joinPoint, DistributedLock lock) { - LockDetails lockDetails = createLockDetails(lock); - - return redisOperations + return createLockDetails(lock).flatMapMany(lockDetails -> redisOperations .opsForValue() .setIfAbsent(lockDetails.key, lockDetails.value, lockDetails.duration) .flatMapMany(acquired -> { @@ -130,11 +132,11 @@ public class DistributedLockAspect { } log.info("Lock already acquired for: {}", lockDetails.key); return Flux.empty(); - }); + })); } private Object handleBlocking(ProceedingJoinPoint joinPoint, DistributedLock lock) throws Throwable { - LockDetails lockDetails = createLockDetails(lock); + LockDetails lockDetails = createLockDetails(lock).block(); Boolean acquired = null; try { diff --git a/app/server/reactive-caching/src/main/java/com/appsmith/caching/components/InstanceIdProvider.java b/app/server/reactive-caching/src/main/java/com/appsmith/caching/components/InstanceIdProvider.java new file mode 100644 index 0000000000..08cd0f68f5 --- /dev/null +++ b/app/server/reactive-caching/src/main/java/com/appsmith/caching/components/InstanceIdProvider.java @@ -0,0 +1,16 @@ +package com.appsmith.caching.components; + +import reactor.core.publisher.Mono; + +/** + * Interface to provide instanceId for distributed lock keys. + * This allows the reactive-caching module to get instanceId without depending on higher-level modules. + */ +public interface InstanceIdProvider { + + /** + * Get the instance ID for this Appsmith instance + * @return Mono containing the instance ID + */ + Mono getInstanceId(); +} diff --git a/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/TestConfig.java b/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/TestConfig.java new file mode 100644 index 0000000000..20842faee1 --- /dev/null +++ b/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/TestConfig.java @@ -0,0 +1,14 @@ +package com.appsmith.testcaching; + +import com.appsmith.caching.components.InstanceIdProvider; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; + +@TestConfiguration +public class TestConfig { + + @Bean + public InstanceIdProvider instanceIdProvider() { + return new TestInstanceIdProvider(); + } +} diff --git a/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/TestInstanceIdProvider.java b/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/TestInstanceIdProvider.java new file mode 100644 index 0000000000..52bffa52b3 --- /dev/null +++ b/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/TestInstanceIdProvider.java @@ -0,0 +1,14 @@ +package com.appsmith.testcaching; + +import com.appsmith.caching.components.InstanceIdProvider; +import reactor.core.publisher.Mono; + +public class TestInstanceIdProvider implements InstanceIdProvider { + + private static final String TEST_INSTANCE_ID = "test-instance-123"; + + @Override + public Mono getInstanceId() { + return Mono.just(TEST_INSTANCE_ID); + } +} diff --git a/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/test/TestCachingMethods.java b/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/test/TestCachingMethods.java index 03118403f2..e047b8d541 100644 --- a/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/test/TestCachingMethods.java +++ b/app/server/reactive-caching/src/test/java/com/appsmith/testcaching/test/TestCachingMethods.java @@ -1,6 +1,7 @@ package com.appsmith.testcaching.test; import com.appsmith.caching.components.CacheManager; +import com.appsmith.testcaching.TestConfig; import com.appsmith.testcaching.model.ArgumentModel; import com.appsmith.testcaching.model.TestModel; import com.appsmith.testcaching.service.CacheTestService; @@ -10,6 +11,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ContextConfiguration; import java.util.List; @@ -20,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; @SpringBootTest @TestInstance(TestInstance.Lifecycle.PER_CLASS) @Slf4j +@ContextConfiguration(classes = TestConfig.class) public class TestCachingMethods { @Autowired