chore: subscribe file IO to elastic and publish to parallel thread when fetching plugins (#35213)

## Description
> This code change focuses on moving the File I/O operations related to
plugins API to elastic and parallel threads for subscription and
publishing. It will allow the `nioEventLoop` threadpool to not be locked
on file I/O.


Fixes #`Issue Number`  
_or_  
Fixes `Issue URL`
> [!WARNING]  
> _If no issue exists, please create an issue first, and check with the
maintainers if the issue is valid._

## Automation

/ok-to-test tags="@tag.Sanity"

### 🔍 Cypress test results
<!-- This is an auto-generated comment: Cypress test results  -->
> [!TIP]
> 🟢 🟢 🟢 All cypress tests have passed! 🎉 🎉 🎉
> Workflow run:
<https://github.com/appsmithorg/appsmith/actions/runs/10138493266>
> Commit: cc3b5f1ac44138985ba228e8b2d4f8e324ed6e7f
> <a
href="https://internal.appsmith.com/app/cypress-dashboard/rundetails-65890b3c81d7400d08fa9ee5?branch=master&workflowId=10138493266&attempt=1"
target="_blank">Cypress dashboard</a>.
> Tags: `@tag.Sanity`
> Spec:
> <hr>Mon, 29 Jul 2024 05:34:16 UTC
<!-- end of auto-generated comment: Cypress test results  -->


## Communication
Should the DevRel and Marketing teams inform users about this change?
- [ ] Yes
- [x] No


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
- Introduced a new `LoadShifter` helper class for improved thread
scheduling in reactive programming.
- Enhanced asynchronous operations in various services using elastic and
parallel scheduling mechanisms.

- **Bug Fixes**
- Updated scheduling strategies in services to improve performance and
responsiveness under varying loads.

- **Chores**
- Streamlined dependency management by removing unnecessary `Scheduler`
parameters and dependencies in multiple classes.
  
- **Tests**
- Refined test setups by removing unused `Scheduler` and `CommonConfig`
mock beans for cleaner test definitions.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Nilesh Sarupriya <20905988+nsarupr@users.noreply.github.com>
This commit is contained in:
Nilesh Sarupriya 2024-07-29 12:02:22 +05:30 committed by GitHub
parent 492fb353d7
commit 871c4263a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 87 additions and 53 deletions

View File

@ -1,5 +1,6 @@
package com.appsmith.server.configurations;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.util.JSONPrettyPrinter;
import com.appsmith.util.SerializationUtils;
import com.fasterxml.jackson.core.PrettyPrinter;
@ -19,7 +20,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.time.Instant;
import java.util.ArrayList;
@ -34,7 +34,6 @@ import java.util.Set;
@Configuration
public class CommonConfig {
private static final String ELASTIC_THREAD_POOL_NAME = "appsmith-elastic-pool";
public static final Integer LATEST_INSTANCE_SCHEMA_VERSION = 2;
@Setter(AccessLevel.NONE)
@ -77,11 +76,8 @@ public class CommonConfig {
private static String adminEmailDomainHash;
@Bean
public Scheduler scheduler() {
return Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
ELASTIC_THREAD_POOL_NAME);
public Scheduler elasticScheduler() {
return LoadShifter.elasticScheduler;
}
@Bean

View File

@ -0,0 +1,47 @@
package com.appsmith.server.helpers;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* This class is used configure the load shifts for the Monos.
* The schedulers which are configured in the CommonConfig class.
*/
@Slf4j
public class LoadShifter {
private static final String PARALLEL_THREAD_POOL_NAME = "appsmith-parallel-pool";
private static final String ELASTIC_THREAD_POOL_NAME = "appsmith-elastic-pool";
public static final Scheduler parallelScheduler = Schedulers.newParallel(PARALLEL_THREAD_POOL_NAME);
public static final Scheduler elasticScheduler = Schedulers.newBoundedElastic(
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
ELASTIC_THREAD_POOL_NAME);
/**
* This method is used to shift the subscription from the current thread to the elastic scheduler
* and then publish the result on the parallel scheduler.
* @param mono The mono to be shifted.
* This mono will be subscribed on the elastic scheduler and the result will be published on the parallel scheduler.
* @param message The message to be logged.
* @return The shifted mono.
* @param <T> The type of the mono.
*/
public static <T> Mono<T> subscribeOnElasticPublishOnParallel(Mono<T> mono, String message) {
return mono.subscribeOn(elasticScheduler).publishOn(parallelScheduler);
}
/**
* This method is used to shift the subscription from the current thread to the elastic scheduler.
* @param mono The mono to be shifted.
* This mono will be subscribed on the elastic scheduler.
* @param message The message to be logged.
* @return The shifted mono.
* @param <T> The type of the mono.
*/
public static <T> Mono<T> subscribeOnElastic(Mono<T> mono, String message) {
return mono.subscribeOn(elasticScheduler);
}
}

View File

@ -9,6 +9,7 @@ import com.appsmith.server.domains.Config;
import com.appsmith.server.dtos.ResponseDTO;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.server.helpers.NetworkUtils;
import com.appsmith.server.helpers.RTSCaller;
import com.appsmith.server.services.AnalyticsService;
@ -125,7 +126,7 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE {
analyticsProperties,
false);
})
.subscribeOn(commonConfig.scheduler())
.subscribeOn(LoadShifter.elasticScheduler)
.subscribe();
}

View File

@ -11,6 +11,7 @@ import com.appsmith.server.dtos.PluginWorkspaceDTO;
import com.appsmith.server.dtos.WorkspacePluginStatus;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.server.repositories.PluginRepository;
import com.appsmith.server.services.AnalyticsService;
import com.appsmith.server.services.BaseService;
@ -403,7 +404,13 @@ public class PluginServiceCEImpl extends BaseService<PluginRepository, Plugin, S
})
.cache();
templateCache.put(pluginId, mono);
/*
* The method loadTemplatesFromPlugin is reads a file from the system, and this is a blocking process.
* Since, we need to keep the nioEventLoop thread pool free, we are shifting the subscription to elastic
* thread pool and then publishing the result on the parallel thread pool.
*/
templateCache.put(
pluginId, LoadShifter.subscribeOnElasticPublishOnParallel(mono, "loadTemplatesFromPlugin"));
}
return templateCache.get(pluginId);
@ -600,24 +607,32 @@ public class PluginServiceCEImpl extends BaseService<PluginRepository, Plugin, S
@Override
public Mono<Map<?, ?>> loadPluginResource(String pluginId, String resourcePath) {
return findById(pluginId).map(plugin -> {
return findById(pluginId).flatMap(plugin -> {
if ("editor.json".equals(resourcePath)) {
// UI config will be available if this plugin is sourced from the cloud
if (plugin.getActionUiConfig() != null) {
return plugin.getActionUiConfig();
return Mono.just(plugin.getActionUiConfig());
}
// For UQI, use another format of loading the config
if (UQI_DB_EDITOR_FORM.equals(plugin.getUiComponent())) {
return loadEditorPluginResourceUqi(plugin);
return Mono.just(loadEditorPluginResourceUqi(plugin));
}
}
if ("form.json".equals(resourcePath)) {
// UI config will be available if this plugin is sourced from the cloud
if (plugin.getDatasourceUiConfig() != null) {
return plugin.getDatasourceUiConfig();
return Mono.just(plugin.getDatasourceUiConfig());
}
}
return loadPluginResourceGivenPluginAsMap(plugin, resourcePath);
/*
* The method loadPluginResourceGivenPluginAsMap is reads a file from the system, and this is a blocking
* process. Since, we need to keep the nioEventLoop thread pool free, we are shifting the subscription to
* elastic thread pool and then publishing the result on the parallel thread pool.
*/
Mono<? extends Map<?, ?>> pluginResourceMono =
Mono.fromCallable(() -> loadPluginResourceGivenPluginAsMap(plugin, resourcePath));
return LoadShifter.subscribeOnElasticPublishOnParallel(pluginResourceMono, "pluginResourceMono");
});
}

View File

@ -2,13 +2,12 @@ package com.appsmith.server.ratelimiting.ce;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.server.helpers.RedisUtils;
import com.appsmith.server.ratelimiting.RateLimitConfig;
import io.github.bucket4j.distributed.BucketProxy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.Map;
@ -19,7 +18,6 @@ import static java.lang.Boolean.TRUE;
@Slf4j
public class RateLimitServiceCEImpl implements RateLimitServiceCE {
private final Scheduler scheduler = Schedulers.boundedElastic();
private final Map<String, BucketProxy> apiBuckets;
private final RateLimitConfig rateLimitConfig;
// this number of tokens can later be customised per API in the configuration.
@ -58,7 +56,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE {
})
// Since we are interacting with redis, we want to make sure that the operation is done on a separate
// thread pool
.subscribeOn(scheduler);
.subscribeOn(LoadShifter.elasticScheduler);
}
@Override
@ -75,7 +73,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE {
.then()
// Since we are interacting with redis, we want to make sure that the operation is done on a separate
// thread pool
.subscribeOn(scheduler);
.subscribeOn(LoadShifter.elasticScheduler);
}
/* **************************************************************************************************** */
@ -112,7 +110,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE {
})
// Since we are interacting with redis, we want to make sure that the operation is done on a separate
// thread pool
.subscribeOn(scheduler);
.subscribeOn(LoadShifter.elasticScheduler);
}
/*
@ -139,7 +137,7 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE {
})
// Since we are interacting with redis, we want to make sure that the operation is done on a separate
// thread pool
.subscribeOn(scheduler);
.subscribeOn(LoadShifter.elasticScheduler);
}
private Mono<Boolean> sanitizeInput(String apiIdentifier, String userIdentifier) {
@ -156,6 +154,6 @@ public class RateLimitServiceCEImpl implements RateLimitServiceCE {
return Mono.just(true);
})
.subscribeOn(scheduler);
.subscribeOn(LoadShifter.elasticScheduler);
}
}

View File

@ -10,13 +10,11 @@ import io.micrometer.observation.ObservationRegistry;
import jakarta.validation.Validator;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import reactor.core.scheduler.Scheduler;
@Service
public class TenantServiceImpl extends TenantServiceCEImpl implements TenantService {
public TenantServiceImpl(
Scheduler scheduler,
Validator validator,
TenantRepository repository,
AnalyticsService analyticsService,

View File

@ -4,6 +4,7 @@ import com.appsmith.server.configurations.CommonConfig;
import com.appsmith.server.dtos.ProductAlertResponseDTO;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.LoadShifter;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.EvaluationContext;
@ -14,8 +15,6 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@ -31,8 +30,6 @@ public class ProductAlertServiceCEImpl implements ProductAlertServiceCE {
private final ProductAlertResponseDTO[] messages;
private final Scheduler scheduler = Schedulers.boundedElastic();
public ProductAlertServiceCEImpl(ObjectMapper objectMapper, CommonConfig commonConfig) {
this.commonConfig = commonConfig;
this.mapper = objectMapper;
@ -64,7 +61,7 @@ public class ProductAlertServiceCEImpl implements ProductAlertServiceCE {
log.error("exception while getting and filtering product alert messages", error);
throw new AppsmithException(AppsmithError.INTERNAL_SERVER_ERROR, error.getMessage());
})
.subscribeOn(scheduler);
.subscribeOn(LoadShifter.elasticScheduler);
}
public Boolean evaluateAlertApplicability(ProductAlertResponseDTO productAlertResponseDTO) {

View File

@ -3,10 +3,9 @@ package com.appsmith.server.solutions;
import com.appsmith.server.services.FeatureFlagService;
import com.appsmith.server.services.TenantService;
import com.appsmith.server.solutions.ce.ScheduledTaskCEImpl;
import reactor.core.scheduler.Scheduler;
public class ScheduledTaskImpl extends ScheduledTaskCEImpl implements ScheduledTask {
public ScheduledTaskImpl(FeatureFlagService featureFlagService, TenantService tenantService, Scheduler scheduler) {
super(featureFlagService, tenantService, scheduler);
public ScheduledTaskImpl(FeatureFlagService featureFlagService, TenantService tenantService) {
super(featureFlagService, tenantService);
}
}

View File

@ -1,7 +1,6 @@
package com.appsmith.server.solutions;
import com.appsmith.server.authentication.handlers.AuthenticationSuccessHandler;
import com.appsmith.server.configurations.CommonConfig;
import com.appsmith.server.helpers.NetworkUtils;
import com.appsmith.server.helpers.UserUtils;
import com.appsmith.server.services.AnalyticsService;
@ -27,7 +26,6 @@ public class UserSignupImpl extends UserSignupCEImpl implements UserSignup {
ConfigService configService,
AnalyticsService analyticsService,
EnvManager envManager,
CommonConfig commonConfig,
UserUtils userUtils,
NetworkUtils networkUtils,
EmailService emailService,
@ -41,7 +39,6 @@ public class UserSignupImpl extends UserSignupCEImpl implements UserSignup {
configService,
analyticsService,
envManager,
commonConfig,
userUtils,
networkUtils,
emailService,

View File

@ -1,5 +1,6 @@
package com.appsmith.server.solutions.ce;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.server.services.FeatureFlagService;
import com.appsmith.server.services.TenantService;
import io.micrometer.observation.annotation.Observed;
@ -7,7 +8,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.scheduler.Scheduler;
@RequiredArgsConstructor
@Slf4j
@ -18,8 +18,6 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE {
private final TenantService tenantService;
private final Scheduler scheduler;
@Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 30 * 60 * 1000 /* thirty minutes */)
@Observed(name = "fetchFeatures")
public void fetchFeatures() {
@ -31,7 +29,7 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE {
.flatMap(featureFlagService::checkAndExecuteMigrationsForTenantFeatureFlags)
.then(tenantService.restartTenant()))
.doOnError(error -> log.error("Error while fetching tenant feature flags", error))
.subscribeOn(scheduler)
.subscribeOn(LoadShifter.elasticScheduler)
.subscribe();
}
}

View File

@ -2,7 +2,6 @@ package com.appsmith.server.solutions.ce;
import com.appsmith.external.constants.AnalyticsEvents;
import com.appsmith.server.authentication.handlers.AuthenticationSuccessHandler;
import com.appsmith.server.configurations.CommonConfig;
import com.appsmith.server.constants.FieldName;
import com.appsmith.server.domains.LoginSource;
import com.appsmith.server.domains.Tenant;
@ -14,6 +13,7 @@ import com.appsmith.server.dtos.UserSignupDTO;
import com.appsmith.server.dtos.UserSignupRequestDTO;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.server.helpers.NetworkUtils;
import com.appsmith.server.helpers.UserUtils;
import com.appsmith.server.services.AnalyticsService;
@ -79,7 +79,6 @@ public class UserSignupCEImpl implements UserSignupCE {
private final ConfigService configService;
private final AnalyticsService analyticsService;
private final EnvManager envManager;
private final CommonConfig commonConfig;
private final UserUtils userUtils;
private final NetworkUtils networkUtils;
private final EmailService emailService;
@ -97,7 +96,6 @@ public class UserSignupCEImpl implements UserSignupCE {
ConfigService configService,
AnalyticsService analyticsService,
EnvManager envManager,
CommonConfig commonConfig,
UserUtils userUtils,
NetworkUtils networkUtils,
EmailService emailService,
@ -110,7 +108,6 @@ public class UserSignupCEImpl implements UserSignupCE {
this.configService = configService;
this.analyticsService = analyticsService;
this.envManager = envManager;
this.commonConfig = commonConfig;
this.userUtils = userUtils;
this.networkUtils = networkUtils;
this.emailService = emailService;
@ -350,7 +347,7 @@ public class UserSignupCEImpl implements UserSignupCE {
// because
// of any other secondary function mono throwing an exception
sendInstallationSetupAnalytics(userFromRequest, user, userData)
.subscribeOn(commonConfig.scheduler())
.subscribeOn(LoadShifter.elasticScheduler)
.subscribe();
Mono<Long> allSecondaryFunctions = Mono.when(

View File

@ -19,7 +19,6 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.scheduler.Scheduler;
import java.io.IOException;
import java.util.LinkedHashMap;
@ -33,9 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(SpringExtension.class)
public class PluginServiceCEImplTest {
@MockBean
Scheduler scheduler;
@MockBean
Validator validator;

View File

@ -1,7 +1,6 @@
package com.appsmith.server.solutions;
import com.appsmith.server.authentication.handlers.AuthenticationSuccessHandler;
import com.appsmith.server.configurations.CommonConfig;
import com.appsmith.server.domains.Tenant;
import com.appsmith.server.domains.TenantConfiguration;
import com.appsmith.server.domains.User;
@ -55,9 +54,6 @@ public class UserSignupTest {
@MockBean
private EnvManager envManager;
@MockBean
private CommonConfig commonConfig;
@MockBean
private UserUtils userUtils;
@ -86,7 +82,6 @@ public class UserSignupTest {
configService,
analyticsService,
envManager,
commonConfig,
userUtils,
networkUtils,
emailService,