fix: Handle burst traffic for fetching feature flags during cron job (#40808)

This commit is contained in:
Abhijeet 2025-05-30 12:58:44 +05:30 committed by GitHub
parent 0377b37bbc
commit 39636498ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 95 additions and 11 deletions

View File

@ -24,6 +24,7 @@ import reactor.netty.resources.ConnectionProvider;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -44,6 +45,22 @@ public class WebClientUtils {
public static final ExchangeFilterFunction IP_CHECK_FILTER =
ExchangeFilterFunction.ofRequestProcessor(WebClientUtils::requestFilterFn);
// Cloud Services specific configuration
public static final Duration CLOUD_SERVICES_API_TIMEOUT = Duration.ofSeconds(60);
// Dedicated connection pool for Cloud Services API calls to prevent connection exhaustion
public static final ConnectionProvider CLOUD_SERVICES_CONNECTION_PROVIDER = ConnectionProvider.builder(
"cloud-services")
.maxConnections(100)
.maxIdleTime(Duration.ofSeconds(30))
.maxLifeTime(Duration.ofSeconds(120))
.pendingAcquireTimeout(Duration.ofSeconds(10))
.evictInBackground(Duration.ofSeconds(150))
.build();
// Singleton WebClient instance for Cloud Services to avoid creating multiple instances
private static volatile WebClient cloudServicesWebClient;
private WebClientUtils() {}
private static Set<String> computeDisallowedHosts() {
@ -74,6 +91,52 @@ public class WebClientUtils {
return builder(provider).baseUrl(baseUrl).build();
}
/**
* Creates a WebClient specifically optimized for Cloud Services API calls.
* This WebClient includes:
* - Dedicated connection pool to prevent connection exhaustion
* - Optimized timeouts for CS API patterns
* - Standard IP filtering and memory limits
*
* Returns a singleton instance to avoid creating multiple WebClient instances.
*
* @return Singleton WebClient configured for Cloud Services calls
*/
public static WebClient createForCloudServices() {
if (cloudServicesWebClient == null) {
synchronized (WebClientUtils.class) {
if (cloudServicesWebClient == null) {
cloudServicesWebClient =
builder(CLOUD_SERVICES_CONNECTION_PROVIDER).build();
}
}
}
return cloudServicesWebClient;
}
/**
* Gets the singleton WebClient instance for Cloud Services.
* This is an alias for createForCloudServices() but makes the singleton nature more explicit.
*
* @return Singleton WebClient configured for Cloud Services calls
*/
public static WebClient getCloudServicesWebClient() {
return createForCloudServices();
}
/**
* Resets the singleton Cloud Services WebClient instance.
* This method is primarily intended for testing purposes.
*
* @deprecated This method should only be used in tests
*/
@Deprecated
static void resetCloudServicesWebClient() {
synchronized (WebClientUtils.class) {
cloudServicesWebClient = null;
}
}
private static boolean shouldUseSystemProxy() {
return "true".equals(System.getProperty("java.net.useSystemProxies"))
&& (!System.getProperty("http.proxyHost", "").isEmpty()

View File

@ -21,7 +21,6 @@ import com.appsmith.server.services.ConfigService;
import com.appsmith.server.services.UserIdentifierService;
import com.appsmith.server.solutions.ReleaseNotesService;
import com.appsmith.util.WebClientUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.core.ParameterizedTypeReference;
@ -29,6 +28,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Instant;
@ -42,8 +42,11 @@ import static com.appsmith.server.constants.ApiConstants.CLOUD_SERVICES_SIGNATUR
import static com.appsmith.server.constants.ce.FieldNameCE.DEFAULT;
@Slf4j
@RequiredArgsConstructor
public class CacheableFeatureFlagHelperCEImpl implements CacheableFeatureFlagHelperCE {
// Dedicated WebClient for Cloud Services calls with optimized connection pool
private final WebClient cloudServicesWebClient;
private final OrganizationRepository organizationRepository;
private final ConfigService configService;
private final CloudServicesConfig cloudServicesConfig;
@ -51,6 +54,24 @@ public class CacheableFeatureFlagHelperCEImpl implements CacheableFeatureFlagHel
private final UserIdentifierService userIdentifierService;
private final ReleaseNotesService releaseNotesService;
public CacheableFeatureFlagHelperCEImpl(
OrganizationRepository organizationRepository,
ConfigService configService,
CloudServicesConfig cloudServicesConfig,
CommonConfig commonConfig,
UserIdentifierService userIdentifierService,
ReleaseNotesService releaseNotesService) {
this.organizationRepository = organizationRepository;
this.configService = configService;
this.cloudServicesConfig = cloudServicesConfig;
this.commonConfig = commonConfig;
this.userIdentifierService = userIdentifierService;
this.releaseNotesService = releaseNotesService;
// Initialize dedicated WebClient for Cloud Services with optimized connection pool
this.cloudServicesWebClient = WebClientUtils.createForCloudServices();
}
@Cache(cacheName = "featureFlag", key = "{#userIdentifier}")
@Override
public Mono<CachedFlags> fetchUserCachedFlags(String userIdentifier, User user) {
@ -131,9 +152,9 @@ public class CacheableFeatureFlagHelperCEImpl implements CacheableFeatureFlagHel
*/
private Mono<Map<String, Map<String, Boolean>>> getRemoteFeatureFlagsByIdentity(
FeatureFlagIdentityTraits featureFlagIdentityTraits) {
return WebClientUtils.create(cloudServicesConfig.getBaseUrlWithSignatureVerification())
return cloudServicesWebClient
.post()
.uri("/api/v1/feature-flags")
.uri(cloudServicesConfig.getBaseUrlWithSignatureVerification() + "/api/v1/feature-flags")
.body(BodyInserters.fromValue(featureFlagIdentityTraits))
.exchangeToMono(clientResponse -> {
if (clientResponse.statusCode().is2xxSuccessful()) {
@ -149,6 +170,7 @@ public class CacheableFeatureFlagHelperCEImpl implements CacheableFeatureFlagHel
}
})
.map(ResponseDTO::getData)
.timeout(WebClientUtils.CLOUD_SERVICES_API_TIMEOUT)
.onErrorMap(
// Only map errors if we haven't already wrapped them into an AppsmithException
e -> !(e instanceof AppsmithException),
@ -231,10 +253,9 @@ public class CacheableFeatureFlagHelperCEImpl implements CacheableFeatureFlagHel
*/
@Override
public Mono<FeaturesResponseDTO> getRemoteFeaturesForOrganization(FeaturesRequestDTO featuresRequestDTO) {
Mono<ResponseEntity<ResponseDTO<FeaturesResponseDTO>>> responseEntityMono = WebClientUtils.create(
cloudServicesConfig.getBaseUrlWithSignatureVerification())
Mono<ResponseEntity<ResponseDTO<FeaturesResponseDTO>>> responseEntityMono = cloudServicesWebClient
.post()
.uri("/api/v1/business-features")
.uri(cloudServicesConfig.getBaseUrlWithSignatureVerification() + "/api/v1/business-features")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(BodyInserters.fromValue(featuresRequestDTO))
@ -251,6 +272,7 @@ public class CacheableFeatureFlagHelperCEImpl implements CacheableFeatureFlagHel
return Mono.just(Objects.requireNonNull(entity.getBody()));
})
.map(ResponseDTO::getData)
.timeout(WebClientUtils.CLOUD_SERVICES_API_TIMEOUT)
.onErrorMap(
// Only map errors if we haven't already wrapped them into an AppsmithException
e -> !(e instanceof AppsmithException),

View File

@ -7,7 +7,6 @@ import com.appsmith.server.configurations.DeploymentProperties;
import com.appsmith.server.configurations.ProjectProperties;
import com.appsmith.server.configurations.SegmentConfig;
import com.appsmith.server.domains.Organization;
import com.appsmith.server.helpers.LoadShifter;
import com.appsmith.server.helpers.NetworkUtils;
import com.appsmith.server.repositories.ApplicationRepository;
import com.appsmith.server.repositories.DatasourceRepository;
@ -258,10 +257,10 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE {
UserTrackingType.MAU.name(), tuple.getT3()));
}
@Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 30 * 60 * 1000 /* thirty minutes */)
@Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 1 * 60 * 60 * 1000 /* one hour */)
@DistributedLock(
key = "fetchFeatures",
ttl = 20 * 60, // 20 minutes
ttl = 45 * 60, // 45 minutes
shouldReleaseLock = false) // Ensure only one pod executes this
@Observed(name = "fetchFeatures")
public void fetchFeatures() {
@ -277,7 +276,7 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE {
return Mono.empty();
})
.contextWrite(Context.of(ORGANIZATION_ID, organization.getId())))
.subscribeOn(LoadShifter.elasticScheduler)
.subscribeOn(Schedulers.single())
.subscribe();
}
}