fix: Making login rate limit service reactive (#27903)

Refactor : While debugging the production downtime, took a deeper look
into the code path of rate limiting. Blocking calls to redis were being
made on the main threads which can impact the overall response times of
the appsmith server (since the number of main threads equals no of CPUs
on the machine). No blocking calls should be made in the main thread.
Moving executions to bounded elastic threadpool and making the reset
flow reactive. Also the code was not split according to the code split
guidelines of the server. Taking care of that as well.
This commit is contained in:
Trisha Anand 2023-10-10 16:52:40 +05:30 committed by GitHub
parent 31ce25f275
commit 7a3792ed97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 144 additions and 55 deletions

View File

@ -254,7 +254,6 @@ public class AuthenticationSuccessHandlerCE implements ServerAuthenticationSucce
log.debug("Login succeeded for user: {}", authentication.getPrincipal()); log.debug("Login succeeded for user: {}", authentication.getPrincipal());
Mono<Void> redirectionMono = null; Mono<Void> redirectionMono = null;
User user = (User) authentication.getPrincipal(); User user = (User) authentication.getPrincipal();
rateLimitService.resetCounter(RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, user.getEmail());
String originHeader = String originHeader =
webFilterExchange.getExchange().getRequest().getHeaders().getOrigin(); webFilterExchange.getExchange().getRequest().getHeaders().getOrigin();
@ -316,6 +315,11 @@ public class AuthenticationSuccessHandlerCE implements ServerAuthenticationSucce
.getCurrentUser() .getCurrentUser()
.flatMap(currentUser -> { .flatMap(currentUser -> {
List<Mono<?>> monos = new ArrayList<>(); List<Mono<?>> monos = new ArrayList<>();
// Since the user has successfully logged in, lets reset the rate limit counter for the user.
monos.add(rateLimitService.resetCounter(
RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, user.getEmail()));
monos.add(userDataService.ensureViewedCurrentVersionReleaseNotes(currentUser)); monos.add(userDataService.ensureViewedCurrentVersionReleaseNotes(currentUser));
String modeOfLogin = FieldName.FORM_LOGIN; String modeOfLogin = FieldName.FORM_LOGIN;

View File

@ -15,6 +15,8 @@ import java.net.URI;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import static java.lang.Boolean.FALSE;
@Slf4j @Slf4j
public class PreAuth implements WebFilter { public class PreAuth implements WebFilter {
@ -27,21 +29,23 @@ public class PreAuth implements WebFilter {
@Override @Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
Mono<Void> filterMono = chain.filter(exchange);
return getUsername(exchange).flatMap(username -> { return getUsername(exchange).flatMap(username -> {
if (!username.isEmpty()) { if (!username.isEmpty()) {
return rateLimitService return rateLimitService
.tryIncreaseCounter(RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, username) .tryIncreaseCounter(RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, username)
.flatMap(counterIncreaseAttemptSuccessful -> { .flatMap(counterIncreaseAttemptSuccessful -> {
if (Boolean.FALSE.equals(counterIncreaseAttemptSuccessful)) { if (FALSE.equals(counterIncreaseAttemptSuccessful)) {
log.error("Rate limit exceeded. Redirecting to login page.");
return handleRateLimitExceeded(exchange); return handleRateLimitExceeded(exchange);
} }
return chain.filter(exchange); return filterMono;
}); });
} else { } else {
// If username is empty, simply continue with the filter chain // If username is empty, simply continue with the filter chain
return chain.filter(exchange); return filterMono;
} }
}); });
} }

View File

@ -10,6 +10,7 @@ import io.github.bucket4j.redis.lettuce.cas.LettuceBasedProxyManager;
import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient; import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.RedisClusterClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -20,8 +21,9 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
@Configuration @Configuration
@Slf4j
public class RateLimitConfig { public class RateLimitConfig {
private static final Map<String, BucketConfiguration> apiConfigurations = new HashMap<>(); private static final Map<String, BucketConfiguration> apiConfigurationMap = new HashMap<>();
@Autowired @Autowired
private final AbstractRedisClient redisClient; private final AbstractRedisClient redisClient;
@ -31,7 +33,7 @@ public class RateLimitConfig {
} }
static { static {
apiConfigurations.put( apiConfigurationMap.put(
RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, createBucketConfiguration(Duration.ofDays(1), 5)); RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, createBucketConfiguration(Duration.ofDays(1), 5));
// Add more API configurations as needed // Add more API configurations as needed
} }
@ -60,7 +62,7 @@ public class RateLimitConfig {
public Map<String, BucketProxy> apiBuckets() { public Map<String, BucketProxy> apiBuckets() {
Map<String, BucketProxy> apiBuckets = new HashMap<>(); Map<String, BucketProxy> apiBuckets = new HashMap<>();
apiConfigurations.forEach((apiIdentifier, configuration) -> apiConfigurationMap.forEach((apiIdentifier, configuration) ->
apiBuckets.put(apiIdentifier, proxyManager().builder().build(apiIdentifier.getBytes(), configuration))); apiBuckets.put(apiIdentifier, proxyManager().builder().build(apiIdentifier.getBytes(), configuration)));
return apiBuckets; return apiBuckets;
@ -73,7 +75,7 @@ public class RateLimitConfig {
return proxyManager().builder().build(bucketIdentifier.getBytes(), bucketProxy.get()); return proxyManager().builder().build(bucketIdentifier.getBytes(), bucketProxy.get());
} }
return proxyManager().builder().build(bucketIdentifier.getBytes(), apiConfigurations.get(apiIdentifier)); return proxyManager().builder().build(bucketIdentifier.getBytes(), apiConfigurationMap.get(apiIdentifier));
} }
private static BucketConfiguration createBucketConfiguration(Duration refillDuration, int limit) { private static BucketConfiguration createBucketConfiguration(Duration refillDuration, int limit) {

View File

@ -1,45 +1,5 @@
package com.appsmith.server.ratelimiting; package com.appsmith.server.ratelimiting;
import io.github.bucket4j.distributed.BucketProxy; import com.appsmith.server.ratelimiting.ce.RateLimitServiceCE;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.Map; public interface RateLimitService extends RateLimitServiceCE {}
@Slf4j
@Service
public class RateLimitService {
private final Map<String, BucketProxy> apiBuckets;
private final RateLimitConfig rateLimitConfig;
// this number of tokens var can later be customised per API in the configuration.
private final Integer DEFAULT_NUMBER_OF_TOKENS_CONSUMED_PER_REQUEST = 1;
public RateLimitService(Map<String, BucketProxy> apiBuckets, RateLimitConfig rateLimitConfig) {
this.apiBuckets = apiBuckets;
this.rateLimitConfig = rateLimitConfig;
}
public Mono<Boolean> tryIncreaseCounter(String apiIdentifier, String userIdentifier) {
log.debug(
"RateLimitService.tryIncreaseCounter() called with apiIdentifier = {}, userIdentifier = {}",
apiIdentifier,
userIdentifier);
// handle the case where API itself is not rate limited
log.debug(
apiBuckets.containsKey(apiIdentifier) ? "apiBuckets contains key" : "apiBuckets does not contain key");
if (!apiBuckets.containsKey(apiIdentifier)) return Mono.just(false);
BucketProxy userSpecificBucket =
rateLimitConfig.getOrCreateAPIUserSpecificBucket(apiIdentifier, userIdentifier);
log.debug("userSpecificBucket = {}", userSpecificBucket);
return Mono.just(userSpecificBucket.tryConsume(DEFAULT_NUMBER_OF_TOKENS_CONSUMED_PER_REQUEST));
}
public void resetCounter(String apiIdentifier, String userIdentifier) {
rateLimitConfig
.getOrCreateAPIUserSpecificBucket(apiIdentifier, userIdentifier)
.reset();
}
}

View File

@ -0,0 +1,17 @@
package com.appsmith.server.ratelimiting;
import com.appsmith.server.ratelimiting.ce.RateLimitServiceCEImpl;
import io.github.bucket4j.distributed.BucketProxy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.Map;
@Slf4j
@Service
public class RateLimitServiceImpl extends RateLimitServiceCEImpl implements RateLimitService {
public RateLimitServiceImpl(Map<String, BucketProxy> apiBuckets, RateLimitConfig rateLimitConfig) {
super(apiBuckets, rateLimitConfig);
}
}

View File

@ -42,8 +42,7 @@ public class RateLimitAspect {
Object result = joinPoint.proceed(); Object result = joinPoint.proceed();
return result instanceof Mono ? (Mono) result : Mono.just(result); return result instanceof Mono ? (Mono) result : Mono.just(result);
} catch (Throwable e) { } catch (Throwable e) {
AppsmithError error = AppsmithError.INTERNAL_SERVER_ERROR; return Mono.error(new AppsmithException(AppsmithError.INTERNAL_SERVER_ERROR));
throw new AppsmithException(error, e.getMessage());
} }
}); });
}); });

View File

@ -0,0 +1,9 @@
package com.appsmith.server.ratelimiting.ce;
import reactor.core.publisher.Mono;
public interface RateLimitServiceCE {
Mono<Boolean> tryIncreaseCounter(String apiIdentifier, String userIdentifier);
Mono<Void> resetCounter(String apiIdentifier, String userIdentifier);
}

View File

@ -0,0 +1,90 @@
package com.appsmith.server.ratelimiting.ce;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.ratelimiting.RateLimitConfig;
import io.github.bucket4j.distributed.BucketProxy;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.Map;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
@Slf4j
public class RateLimitServiceCEImpl implements RateLimitServiceCE {
private final Scheduler scheduler = Schedulers.boundedElastic();
private final Map<String, BucketProxy> apiBuckets;
private final RateLimitConfig rateLimitConfig;
// this number of tokens can later be customised per API in the configuration.
private final Integer DEFAULT_NUMBER_OF_TOKENS_CONSUMED_PER_REQUEST = 1;
public RateLimitServiceCEImpl(Map<String, BucketProxy> apiBuckets, RateLimitConfig rateLimitConfig) {
this.apiBuckets = apiBuckets;
this.rateLimitConfig = rateLimitConfig;
}
@Override
public Mono<Boolean> tryIncreaseCounter(String apiIdentifier, String userIdentifier) {
return sanitizeInput(apiIdentifier, userIdentifier)
.flatMap(isInputValid -> {
BucketProxy userSpecificBucket =
rateLimitConfig.getOrCreateAPIUserSpecificBucket(apiIdentifier, userIdentifier);
return Mono.just(userSpecificBucket.tryConsume(DEFAULT_NUMBER_OF_TOKENS_CONSUMED_PER_REQUEST));
})
.map(isSuccessful -> {
if (FALSE.equals(isSuccessful)) {
log.debug(
"{} - Rate Limit exceeded for apiIdentifier = {}, userIdentifier = {}",
Thread.currentThread().getName(),
apiIdentifier,
userIdentifier);
}
return isSuccessful;
})
// Since we are interacting with redis, we want to make sure that the operation is done on a separate
// thread pool
.subscribeOn(scheduler);
}
@Override
public Mono<Void> resetCounter(String apiIdentifier, String userIdentifier) {
return sanitizeInput(apiIdentifier, userIdentifier)
.flatMap(isInputValid -> {
rateLimitConfig
.getOrCreateAPIUserSpecificBucket(apiIdentifier, userIdentifier)
.reset();
return Mono.just(TRUE);
})
.then()
// Since we are interacting with redis, we want to make sure that the operation is done on a separate
// thread pool
.subscribeOn(scheduler);
}
private Mono<Boolean> sanitizeInput(String apiIdentifier, String userIdentifier) {
if (userIdentifier == null) {
return Mono.error(new AppsmithException(AppsmithError.INTERNAL_SERVER_ERROR));
}
return Mono.just(userIdentifier)
.flatMap(username -> {
// Handle the case where API itself is not rate limited.
if (!apiBuckets.containsKey(apiIdentifier)) {
return Mono.error(new AppsmithException(AppsmithError.UNSUPPORTED_OPERATION));
}
return Mono.just(true);
})
.subscribeOn(scheduler);
}
}

View File

@ -425,8 +425,12 @@ public class UserServiceCEImpl extends BaseService<UserRepository, User, String>
.subscribe(); .subscribe();
// we reset the counter for user's login attempts once password is reset // we reset the counter for user's login attempts once password is reset
rateLimitService.resetCounter( rateLimitService
RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API, userFromDb.getEmail()); .resetCounter(
RateLimitConstants.BUCKET_KEY_FOR_LOGIN_API,
userFromDb.getEmail())
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}) })
.thenReturn(true); .thenReturn(true);
})); }));