diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/CommonConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/CommonConfig.java index 72545773fb..aedf383937 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/CommonConfig.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/CommonConfig.java @@ -1,5 +1,6 @@ package com.appsmith.server.configurations; +import com.appsmith.server.helpers.LoadShifter; import com.appsmith.util.JSONPrettyPrinter; import com.appsmith.util.SerializationUtils; import com.fasterxml.jackson.core.PrettyPrinter; @@ -19,7 +20,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.time.Instant; import java.util.ArrayList; @@ -34,7 +34,6 @@ import java.util.Set; @Configuration public class CommonConfig { - private static final String ELASTIC_THREAD_POOL_NAME = "appsmith-elastic-pool"; public static final Integer LATEST_INSTANCE_SCHEMA_VERSION = 2; @Setter(AccessLevel.NONE) @@ -77,11 +76,8 @@ public class CommonConfig { private static String adminEmailDomainHash; @Bean - public Scheduler scheduler() { - return Schedulers.newBoundedElastic( - Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, - Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, - ELASTIC_THREAD_POOL_NAME); + public Scheduler elasticScheduler() { + return LoadShifter.elasticScheduler; } @Bean diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/LoadShifter.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/LoadShifter.java new file mode 100644 index 0000000000..b1a9f4c928 --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/LoadShifter.java @@ -0,0 +1,47 @@ +package com.appsmith.server.helpers; + +import lombok.extern.slf4j.Slf4j; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +/** + * This class is used configure the load shifts for the Monos. + * The schedulers which are configured in the CommonConfig class. + */ +@Slf4j +public class LoadShifter { + private static final String PARALLEL_THREAD_POOL_NAME = "appsmith-parallel-pool"; + private static final String ELASTIC_THREAD_POOL_NAME = "appsmith-elastic-pool"; + public static final Scheduler parallelScheduler = Schedulers.newParallel(PARALLEL_THREAD_POOL_NAME); + + public static final Scheduler elasticScheduler = Schedulers.newBoundedElastic( + Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE, + Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, + ELASTIC_THREAD_POOL_NAME); + + /** + * This method is used to shift the subscription from the current thread to the elastic scheduler + * and then publish the result on the parallel scheduler. + * @param mono The mono to be shifted. + * This mono will be subscribed on the elastic scheduler and the result will be published on the parallel scheduler. + * @param message The message to be logged. + * @return The shifted mono. + * @param The type of the mono. + */ + public static Mono subscribeOnElasticPublishOnParallel(Mono mono, String message) { + return mono.subscribeOn(elasticScheduler).publishOn(parallelScheduler); + } + + /** + * This method is used to shift the subscription from the current thread to the elastic scheduler. + * @param mono The mono to be shifted. + * This mono will be subscribed on the elastic scheduler. + * @param message The message to be logged. + * @return The shifted mono. + * @param The type of the mono. + */ + public static Mono subscribeOnElastic(Mono mono, String message) { + return mono.subscribeOn(elasticScheduler); + } +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index 5a1b029916..b35035d4fb 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -9,6 +9,7 @@ import com.appsmith.server.domains.Config; import com.appsmith.server.dtos.ResponseDTO; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; +import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.helpers.NetworkUtils; import com.appsmith.server.helpers.RTSCaller; import com.appsmith.server.services.AnalyticsService; @@ -125,7 +126,7 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE { analyticsProperties, false); }) - .subscribeOn(commonConfig.scheduler()) + .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/plugins/base/PluginServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/plugins/base/PluginServiceCEImpl.java index ca2c8d5009..f29d56e980 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/plugins/base/PluginServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/plugins/base/PluginServiceCEImpl.java @@ -11,6 +11,7 @@ import com.appsmith.server.dtos.PluginWorkspaceDTO; import com.appsmith.server.dtos.WorkspacePluginStatus; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; +import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.repositories.PluginRepository; import com.appsmith.server.services.AnalyticsService; import com.appsmith.server.services.BaseService; @@ -403,7 +404,13 @@ public class PluginServiceCEImpl extends BaseService> loadPluginResource(String pluginId, String resourcePath) { - return findById(pluginId).map(plugin -> { + return findById(pluginId).flatMap(plugin -> { if ("editor.json".equals(resourcePath)) { // UI config will be available if this plugin is sourced from the cloud if (plugin.getActionUiConfig() != null) { - return plugin.getActionUiConfig(); + return Mono.just(plugin.getActionUiConfig()); } // For UQI, use another format of loading the config if (UQI_DB_EDITOR_FORM.equals(plugin.getUiComponent())) { - return loadEditorPluginResourceUqi(plugin); + return Mono.just(loadEditorPluginResourceUqi(plugin)); } } if ("form.json".equals(resourcePath)) { // UI config will be available if this plugin is sourced from the cloud if (plugin.getDatasourceUiConfig() != null) { - return plugin.getDatasourceUiConfig(); + return Mono.just(plugin.getDatasourceUiConfig()); } } - return loadPluginResourceGivenPluginAsMap(plugin, resourcePath); + /* + * The method loadPluginResourceGivenPluginAsMap is reads a file from the system, and this is a blocking + * process. Since, we need to keep the nioEventLoop thread pool free, we are shifting the subscription to + * elastic thread pool and then publishing the result on the parallel thread pool. + */ + Mono> pluginResourceMono = + Mono.fromCallable(() -> loadPluginResourceGivenPluginAsMap(plugin, resourcePath)); + + return LoadShifter.subscribeOnElasticPublishOnParallel(pluginResourceMono, "pluginResourceMono"); }); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/ratelimiting/ce/RateLimitServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/ratelimiting/ce/RateLimitServiceCEImpl.java index 49687bdef9..174a58ff79 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/ratelimiting/ce/RateLimitServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/ratelimiting/ce/RateLimitServiceCEImpl.java @@ -2,13 +2,12 @@ package com.appsmith.server.ratelimiting.ce; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; +import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.helpers.RedisUtils; import com.appsmith.server.ratelimiting.RateLimitConfig; import io.github.bucket4j.distributed.BucketProxy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.time.Duration; import java.util.Map; @@ -19,7 +18,6 @@ import static java.lang.Boolean.TRUE; @Slf4j public class RateLimitServiceCEImpl implements RateLimitServiceCE { - private final Scheduler scheduler = Schedulers.boundedElastic(); private final Map apiBuckets; private final RateLimitConfig rateLimitConfig; // this number of tokens can later be customised per API in the configuration. @@ -58,7 +56,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE { }) // Since we are interacting with redis, we want to make sure that the operation is done on a separate // thread pool - .subscribeOn(scheduler); + .subscribeOn(LoadShifter.elasticScheduler); } @Override @@ -75,7 +73,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE { .then() // Since we are interacting with redis, we want to make sure that the operation is done on a separate // thread pool - .subscribeOn(scheduler); + .subscribeOn(LoadShifter.elasticScheduler); } /* **************************************************************************************************** */ @@ -112,7 +110,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE { }) // Since we are interacting with redis, we want to make sure that the operation is done on a separate // thread pool - .subscribeOn(scheduler); + .subscribeOn(LoadShifter.elasticScheduler); } /* @@ -139,7 +137,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE { }) // Since we are interacting with redis, we want to make sure that the operation is done on a separate // thread pool - .subscribeOn(scheduler); + .subscribeOn(LoadShifter.elasticScheduler); } private Mono sanitizeInput(String apiIdentifier, String userIdentifier) { @@ -156,6 +154,6 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE { return Mono.just(true); }) - .subscribeOn(scheduler); + .subscribeOn(LoadShifter.elasticScheduler); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/TenantServiceImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/TenantServiceImpl.java index fba38e745a..53fdf6b1be 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/TenantServiceImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/TenantServiceImpl.java @@ -10,13 +10,11 @@ import io.micrometer.observation.ObservationRegistry; import jakarta.validation.Validator; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; -import reactor.core.scheduler.Scheduler; @Service public class TenantServiceImpl extends TenantServiceCEImpl implements TenantService { public TenantServiceImpl( - Scheduler scheduler, Validator validator, TenantRepository repository, AnalyticsService analyticsService, diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/ProductAlertServiceCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/ProductAlertServiceCEImpl.java index f034595494..0cb04d5716 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/ProductAlertServiceCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/ce/ProductAlertServiceCEImpl.java @@ -4,6 +4,7 @@ import com.appsmith.server.configurations.CommonConfig; import com.appsmith.server.dtos.ProductAlertResponseDTO; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; +import com.appsmith.server.helpers.LoadShifter; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.expression.EvaluationContext; @@ -14,8 +15,6 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -31,8 +30,6 @@ public class ProductAlertServiceCEImpl implements ProductAlertServiceCE { private final ProductAlertResponseDTO[] messages; - private final Scheduler scheduler = Schedulers.boundedElastic(); - public ProductAlertServiceCEImpl(ObjectMapper objectMapper, CommonConfig commonConfig) { this.commonConfig = commonConfig; this.mapper = objectMapper; @@ -64,7 +61,7 @@ public class ProductAlertServiceCEImpl implements ProductAlertServiceCE { log.error("exception while getting and filtering product alert messages", error); throw new AppsmithException(AppsmithError.INTERNAL_SERVER_ERROR, error.getMessage()); }) - .subscribeOn(scheduler); + .subscribeOn(LoadShifter.elasticScheduler); } public Boolean evaluateAlertApplicability(ProductAlertResponseDTO productAlertResponseDTO) { diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java index 6ab52cbf88..7b16eae7de 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java @@ -3,10 +3,9 @@ package com.appsmith.server.solutions; import com.appsmith.server.services.FeatureFlagService; import com.appsmith.server.services.TenantService; import com.appsmith.server.solutions.ce.ScheduledTaskCEImpl; -import reactor.core.scheduler.Scheduler; public class ScheduledTaskImpl extends ScheduledTaskCEImpl implements ScheduledTask { - public ScheduledTaskImpl(FeatureFlagService featureFlagService, TenantService tenantService, Scheduler scheduler) { - super(featureFlagService, tenantService, scheduler); + public ScheduledTaskImpl(FeatureFlagService featureFlagService, TenantService tenantService) { + super(featureFlagService, tenantService); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/UserSignupImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/UserSignupImpl.java index df46969d4e..29b03db32a 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/UserSignupImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/UserSignupImpl.java @@ -1,7 +1,6 @@ package com.appsmith.server.solutions; import com.appsmith.server.authentication.handlers.AuthenticationSuccessHandler; -import com.appsmith.server.configurations.CommonConfig; import com.appsmith.server.helpers.NetworkUtils; import com.appsmith.server.helpers.UserUtils; import com.appsmith.server.services.AnalyticsService; @@ -27,7 +26,6 @@ public class UserSignupImpl extends UserSignupCEImpl implements UserSignup { ConfigService configService, AnalyticsService analyticsService, EnvManager envManager, - CommonConfig commonConfig, UserUtils userUtils, NetworkUtils networkUtils, EmailService emailService, @@ -41,7 +39,6 @@ public class UserSignupImpl extends UserSignupCEImpl implements UserSignup { configService, analyticsService, envManager, - commonConfig, userUtils, networkUtils, emailService, diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index e49dedfcb8..9d385f2655 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -1,5 +1,6 @@ package com.appsmith.server.solutions.ce; +import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.services.FeatureFlagService; import com.appsmith.server.services.TenantService; import io.micrometer.observation.annotation.Observed; @@ -7,7 +8,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import reactor.core.scheduler.Scheduler; @RequiredArgsConstructor @Slf4j @@ -18,8 +18,6 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { private final TenantService tenantService; - private final Scheduler scheduler; - @Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 30 * 60 * 1000 /* thirty minutes */) @Observed(name = "fetchFeatures") public void fetchFeatures() { @@ -31,7 +29,7 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { .flatMap(featureFlagService::checkAndExecuteMigrationsForTenantFeatureFlags) .then(tenantService.restartTenant())) .doOnError(error -> log.error("Error while fetching tenant feature flags", error)) - .subscribeOn(scheduler) + .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/UserSignupCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/UserSignupCEImpl.java index f19dc32a01..931d850e7a 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/UserSignupCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/UserSignupCEImpl.java @@ -2,7 +2,6 @@ package com.appsmith.server.solutions.ce; import com.appsmith.external.constants.AnalyticsEvents; import com.appsmith.server.authentication.handlers.AuthenticationSuccessHandler; -import com.appsmith.server.configurations.CommonConfig; import com.appsmith.server.constants.FieldName; import com.appsmith.server.domains.LoginSource; import com.appsmith.server.domains.Tenant; @@ -14,6 +13,7 @@ import com.appsmith.server.dtos.UserSignupDTO; import com.appsmith.server.dtos.UserSignupRequestDTO; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; +import com.appsmith.server.helpers.LoadShifter; import com.appsmith.server.helpers.NetworkUtils; import com.appsmith.server.helpers.UserUtils; import com.appsmith.server.services.AnalyticsService; @@ -79,7 +79,6 @@ public class UserSignupCEImpl implements UserSignupCE { private final ConfigService configService; private final AnalyticsService analyticsService; private final EnvManager envManager; - private final CommonConfig commonConfig; private final UserUtils userUtils; private final NetworkUtils networkUtils; private final EmailService emailService; @@ -97,7 +96,6 @@ public class UserSignupCEImpl implements UserSignupCE { ConfigService configService, AnalyticsService analyticsService, EnvManager envManager, - CommonConfig commonConfig, UserUtils userUtils, NetworkUtils networkUtils, EmailService emailService, @@ -110,7 +108,6 @@ public class UserSignupCEImpl implements UserSignupCE { this.configService = configService; this.analyticsService = analyticsService; this.envManager = envManager; - this.commonConfig = commonConfig; this.userUtils = userUtils; this.networkUtils = networkUtils; this.emailService = emailService; @@ -350,7 +347,7 @@ public class UserSignupCEImpl implements UserSignupCE { // because // of any other secondary function mono throwing an exception sendInstallationSetupAnalytics(userFromRequest, user, userData) - .subscribeOn(commonConfig.scheduler()) + .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); Mono allSecondaryFunctions = Mono.when( diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/PluginServiceCEImplTest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/PluginServiceCEImplTest.java index a2f648007f..cb17b8cbf9 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/PluginServiceCEImplTest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/services/ce/PluginServiceCEImplTest.java @@ -19,7 +19,6 @@ import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.test.context.junit.jupiter.SpringExtension; -import reactor.core.scheduler.Scheduler; import java.io.IOException; import java.util.LinkedHashMap; @@ -33,9 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(SpringExtension.class) public class PluginServiceCEImplTest { - @MockBean - Scheduler scheduler; - @MockBean Validator validator; diff --git a/app/server/appsmith-server/src/test/java/com/appsmith/server/solutions/UserSignupTest.java b/app/server/appsmith-server/src/test/java/com/appsmith/server/solutions/UserSignupTest.java index eb85d05b67..b00c0e0bd6 100644 --- a/app/server/appsmith-server/src/test/java/com/appsmith/server/solutions/UserSignupTest.java +++ b/app/server/appsmith-server/src/test/java/com/appsmith/server/solutions/UserSignupTest.java @@ -1,7 +1,6 @@ package com.appsmith.server.solutions; import com.appsmith.server.authentication.handlers.AuthenticationSuccessHandler; -import com.appsmith.server.configurations.CommonConfig; import com.appsmith.server.domains.Tenant; import com.appsmith.server.domains.TenantConfiguration; import com.appsmith.server.domains.User; @@ -55,9 +54,6 @@ public class UserSignupTest { @MockBean private EnvManager envManager; - @MockBean - private CommonConfig commonConfig; - @MockBean private UserUtils userUtils; @@ -86,7 +82,6 @@ public class UserSignupTest { configService, analyticsService, envManager, - commonConfig, userUtils, networkUtils, emailService,