diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration065_CopyTenantIdToOrganizationId.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration065_CopyTenantIdToOrganizationId.java index 7cf8d0945f..77aee87773 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration065_CopyTenantIdToOrganizationId.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration065_CopyTenantIdToOrganizationId.java @@ -57,7 +57,10 @@ public class Migration065_CopyTenantIdToOrganizationId { public void execute() { migrateTenantCollection(); migrateMongoCollections(); - migrateRedisData(); + + // Removing this for environments where the migration hasn't run yet since for k8 clusters with default + // configurations, the redis session migration is taking longer than startup probe (which is 2 minutes). + // migrateRedisData(); } private void migrateTenantCollection() { diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration066_CacheBustTenantOrgMigration.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration066_CacheBustTenantOrgMigration.java new file mode 100644 index 0000000000..295b5bf232 --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/migrations/db/ce/Migration066_CacheBustTenantOrgMigration.java @@ -0,0 +1,58 @@ +package com.appsmith.server.migrations.db.ce; + +import io.mongock.api.annotations.ChangeUnit; +import io.mongock.api.annotations.Execution; +import io.mongock.api.annotations.RollbackExecution; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.core.ReactiveRedisOperations; +import org.springframework.data.redis.core.ScanOptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +@ChangeUnit(order = "066", id = "cache-bust-tenant-org-migration") +public class Migration066_CacheBustTenantOrgMigration { + @RollbackExecution + public void rollbackExecution() {} + + @Execution + public void execute( + @Qualifier("reactiveRedisOperations") ReactiveRedisOperations reactiveRedisOperations) { + log.info("Starting cache bust migration"); + scanForKeysAcrossCluster(reactiveRedisOperations, "*").block(); + log.info("Completed cache bust migration"); + } + + private Mono scanForKeysAcrossCluster( + ReactiveRedisOperations reactiveRedisOperations, String pattern) { + AtomicInteger deletedKeysCount = new AtomicInteger(0); + + return reactiveRedisOperations + .execute(connection -> { + Flux scanFlux = connection + .keyCommands() + .scan(ScanOptions.scanOptions() + .match(pattern) + .count(1000) + .build()); + + return scanFlux.flatMap(scannedKey -> connection + .keyCommands() + .del(scannedKey) + .doOnSuccess(result -> { + int count = deletedKeysCount.incrementAndGet(); + if (count % 10000 == 0) { + log.info("Processed {} Redis keys", count); + } + })) + .then() + .doOnSuccess(v -> log.info("Total Redis keys processed: {}", deletedKeysCount.get())) + .doOnError(error -> log.error("Redis key deletion error: {}", error.getMessage())); + }) + .then(); + } +}