Handle stale database connection from datasources (#151)
* Handle stale database connection from datasources * Fix potential secondary case of stale connection error * Fix Postgres to MySQL * Move validity check timeout to a constant field * Add test for recovery when stale connection error is thrown
This commit is contained in:
parent
8e5fb14f70
commit
dfcabab4cc
|
|
@ -29,7 +29,7 @@ public class AppsmithPluginException extends Exception {
|
|||
}
|
||||
|
||||
public Integer getAppErrorCode() {
|
||||
return this.error.getAppErrorCode();
|
||||
return this.error == null ? 0 : this.error.getAppErrorCode();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
package com.appsmith.external.pluginExceptions;
|
||||
|
||||
public class StaleConnectionException extends RuntimeException {
|
||||
}
|
||||
|
|
@ -1,8 +1,14 @@
|
|||
package com.external.plugins;
|
||||
|
||||
import com.appsmith.external.models.*;
|
||||
import com.appsmith.external.models.ActionConfiguration;
|
||||
import com.appsmith.external.models.ActionExecutionResult;
|
||||
import com.appsmith.external.models.AuthenticationDTO;
|
||||
import com.appsmith.external.models.DatasourceConfiguration;
|
||||
import com.appsmith.external.models.DatasourceTestResult;
|
||||
import com.appsmith.external.models.Endpoint;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginError;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginException;
|
||||
import com.appsmith.external.pluginExceptions.StaleConnectionException;
|
||||
import com.appsmith.external.plugins.BasePlugin;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
|
@ -13,9 +19,19 @@ import org.springframework.util.CollectionUtils;
|
|||
import org.springframework.util.StringUtils;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.sql.*;
|
||||
import java.sql.Connection;
|
||||
import java.util.*;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import static com.appsmith.external.models.Connection.Mode.READ_ONLY;
|
||||
|
||||
|
|
@ -25,6 +41,7 @@ public class MySqlPlugin extends BasePlugin {
|
|||
|
||||
private static final String USER = "user";
|
||||
private static final String PASSWORD = "password";
|
||||
private static final int VALIDITY_CHECK_TIMEOUT = 5;
|
||||
|
||||
public MySqlPlugin(PluginWrapper wrapper) {
|
||||
super(wrapper);
|
||||
|
|
@ -39,11 +56,23 @@ public class MySqlPlugin extends BasePlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<Object> execute(Object connection, DatasourceConfiguration datasourceConfiguration,
|
||||
public Mono<Object> execute(Object connection,
|
||||
DatasourceConfiguration datasourceConfiguration,
|
||||
ActionConfiguration actionConfiguration) {
|
||||
|
||||
Connection conn = (Connection) connection;
|
||||
|
||||
try {
|
||||
if (conn == null || conn.isClosed() || !conn.isValid(VALIDITY_CHECK_TIMEOUT)) {
|
||||
log.info("Encountered stale connection in MySQL plugin. Reporting back.");
|
||||
throw new StaleConnectionException();
|
||||
}
|
||||
} catch (SQLException error) {
|
||||
// This exception is thrown only when the timeout to `isValid` is negative. Since, that's not the case,
|
||||
// here, this should never happen.
|
||||
log.error("Error checking validity of MySQL connection.", error);
|
||||
}
|
||||
|
||||
String query = actionConfiguration.getBody();
|
||||
|
||||
if (query == null) {
|
||||
|
|
@ -149,7 +178,7 @@ public class MySqlPlugin extends BasePlugin {
|
|||
configurationConnection != null && READ_ONLY.equals(configurationConnection.getMode()));
|
||||
return Mono.just(connection);
|
||||
} catch (SQLException e) {
|
||||
return pluginErrorMono("Error connecting to MySql.", e);
|
||||
return pluginErrorMono("Error connecting to MySQL: " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import com.appsmith.external.models.Endpoint;
|
|||
import com.appsmith.external.models.SSLDetails;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginError;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginException;
|
||||
import com.appsmith.external.pluginExceptions.StaleConnectionException;
|
||||
import com.appsmith.external.plugins.BasePlugin;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import lombok.NonNull;
|
||||
|
|
@ -47,6 +48,7 @@ public class PostgresPlugin extends BasePlugin {
|
|||
private static final String PASSWORD = "password";
|
||||
private static final String SSL = "ssl";
|
||||
private static final String DATE_COLUMN_TYPE_NAME = "date";
|
||||
private static final int VALIDITY_CHECK_TIMEOUT = 5;
|
||||
|
||||
public PostgresPlugin(PluginWrapper wrapper) {
|
||||
super(wrapper);
|
||||
|
|
@ -61,12 +63,23 @@ public class PostgresPlugin extends BasePlugin {
|
|||
public static class PostgresPluginExecutor implements PluginExecutor {
|
||||
|
||||
@Override
|
||||
public Mono<Object> execute(@NonNull Object connection,
|
||||
public Mono<Object> execute(Object connection,
|
||||
DatasourceConfiguration datasourceConfiguration,
|
||||
ActionConfiguration actionConfiguration) {
|
||||
|
||||
Connection conn = (Connection) connection;
|
||||
|
||||
try {
|
||||
if (conn == null || conn.isClosed() || !conn.isValid(VALIDITY_CHECK_TIMEOUT)) {
|
||||
log.info("Encountered stale connection in Postgres plugin. Reporting back.");
|
||||
throw new StaleConnectionException();
|
||||
}
|
||||
} catch (SQLException error) {
|
||||
// This exception is thrown only when the timeout to `isValid` is negative. Since, that's not the case,
|
||||
// here, this should never happen.
|
||||
log.error("Error checking validity of Postgres connection.", error);
|
||||
}
|
||||
|
||||
String query = actionConfiguration.getBody();
|
||||
|
||||
if (query == null) {
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import com.appsmith.external.models.Property;
|
|||
import com.appsmith.external.models.Provider;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginError;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginException;
|
||||
import com.appsmith.external.pluginExceptions.StaleConnectionException;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import com.appsmith.server.acl.AclPermission;
|
||||
import com.appsmith.server.acl.PolicyGenerator;
|
||||
|
|
@ -412,16 +413,34 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
|
|||
action.getPageId(), action.getId(), action.getName(), datasourceConfiguration,
|
||||
actionConfiguration);
|
||||
|
||||
return datasourceContextService
|
||||
.getDatasourceContext(datasource)
|
||||
//Now that we have the context (connection details, execute the action
|
||||
.flatMap(resourceContext -> pluginExecutor.execute(
|
||||
resourceContext.getConnection(),
|
||||
datasourceConfiguration,
|
||||
actionConfiguration))
|
||||
Mono<Object> executionMono = Mono.just(datasource)
|
||||
.flatMap(datasourceContextService::getDatasourceContext)
|
||||
// Now that we have the context (connection details), execute the action.
|
||||
.flatMap(
|
||||
resourceContext -> pluginExecutor.execute(
|
||||
resourceContext.getConnection(),
|
||||
datasourceConfiguration,
|
||||
actionConfiguration
|
||||
)
|
||||
);
|
||||
|
||||
return executionMono
|
||||
.onErrorResume(StaleConnectionException.class, error -> {
|
||||
log.info("Looks like the connection is stale. Retrying with a fresh context.");
|
||||
return datasourceContextService
|
||||
.deleteDatasourceContext(datasource.getId())
|
||||
.then(executionMono);
|
||||
})
|
||||
.timeout(Duration.ofMillis(timeoutDuration))
|
||||
.onErrorMap(
|
||||
StaleConnectionException.class,
|
||||
error -> new AppsmithPluginException(
|
||||
AppsmithPluginError.PLUGIN_ERROR,
|
||||
"Secondary stale connection error."
|
||||
)
|
||||
)
|
||||
.onErrorResume(e -> {
|
||||
log.debug("In the action execution error mode. Cause: {}", e.getMessage());
|
||||
log.debug("In the action execution error mode.", e);
|
||||
ActionExecutionResult result = new ActionExecutionResult();
|
||||
result.setBody(e.getMessage());
|
||||
result.setIsExecutionSuccess(false);
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ public class DatasourceContextServiceImpl implements DatasourceContextService {
|
|||
return Mono.just(datasourceContextMap.get(datasourceId));
|
||||
}
|
||||
|
||||
log.debug("Datasource context doesn't exist. Creating connection");
|
||||
log.debug("Datasource context doesn't exist. Creating connection.");
|
||||
|
||||
Mono<Datasource> datasourceMono;
|
||||
|
||||
|
|
@ -124,26 +124,24 @@ public class DatasourceContextServiceImpl implements DatasourceContextService {
|
|||
|
||||
@Override
|
||||
public Mono<DatasourceContext> deleteDatasourceContext(String datasourceId) {
|
||||
|
||||
DatasourceContext datasourceContext = datasourceContextMap.get(datasourceId);
|
||||
if (datasourceContext == null) {
|
||||
//No resource context exists for this resource. Return void;
|
||||
// No resource context exists for this resource. Return void.
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
Mono<Datasource> datasourceMono = datasourceService.findById(datasourceId, EXECUTE_DATASOURCES);
|
||||
|
||||
Mono<Plugin> pluginMono = datasourceMono
|
||||
.flatMap(datasource -> pluginService.findById(datasource.getPluginId()));
|
||||
|
||||
//Datasource Context has not been created for this resource on this machine. Create one now.
|
||||
Mono<PluginExecutor> pluginExecutorMono = pluginExecutorHelper.getPluginExecutor(pluginMono);
|
||||
|
||||
return Mono.zip(datasourceMono, pluginExecutorMono, ((datasource, pluginExecutor) -> {
|
||||
pluginExecutor.datasourceDestroy(datasourceContext.getConnection());
|
||||
datasourceContextMap.remove(datasourceId);
|
||||
return datasourceContext;
|
||||
}));
|
||||
return datasourceService
|
||||
.findById(datasourceId, EXECUTE_DATASOURCES)
|
||||
.zipWhen(datasource1 ->
|
||||
pluginExecutorHelper.getPluginExecutor(pluginService.findById(datasource1.getPluginId()))
|
||||
)
|
||||
.map(tuple -> {
|
||||
final Datasource datasource = tuple.getT1();
|
||||
final PluginExecutor pluginExecutor = tuple.getT2();
|
||||
log.info("Clearing datasource context for datasource ID {}.", datasource.getId());
|
||||
pluginExecutor.datasourceDestroy(datasourceContext.getConnection());
|
||||
return datasourceContextMap.remove(datasourceId);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import com.appsmith.external.models.Policy;
|
|||
import com.appsmith.external.models.Property;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginError;
|
||||
import com.appsmith.external.pluginExceptions.AppsmithPluginException;
|
||||
import com.appsmith.external.pluginExceptions.StaleConnectionException;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import com.appsmith.server.acl.AclPermission;
|
||||
import com.appsmith.server.constants.FieldName;
|
||||
|
|
@ -517,6 +518,37 @@ public class ActionServiceTest {
|
|||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
@WithUserDetails(value = "api_user")
|
||||
public void checkRecoveryFromStaleConnections() {
|
||||
ActionExecutionResult mockResult = new ActionExecutionResult();
|
||||
mockResult.setIsExecutionSuccess(true);
|
||||
mockResult.setBody("response-body");
|
||||
|
||||
Action action = new Action();
|
||||
ActionConfiguration actionConfiguration = new ActionConfiguration();
|
||||
actionConfiguration.setBody("select * from users");
|
||||
action.setActionConfiguration(actionConfiguration);
|
||||
|
||||
ExecuteActionDTO executeActionDTO = new ExecuteActionDTO();
|
||||
executeActionDTO.setAction(action);
|
||||
|
||||
Mockito.when(pluginExecutorHelper.getPluginExecutor(Mockito.any())).thenReturn(Mono.just(pluginExecutor));
|
||||
Mockito.when(pluginExecutor.execute(Mockito.any(), Mockito.any(), Mockito.any()))
|
||||
.thenThrow(new StaleConnectionException())
|
||||
.thenReturn(Mono.just(mockResult));
|
||||
Mockito.when(pluginExecutor.datasourceCreate(Mockito.any())).thenReturn(Mono.empty());
|
||||
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = actionService.executeAction(executeActionDTO);
|
||||
|
||||
StepVerifier.create(actionExecutionResultMono)
|
||||
.assertNext(result -> {
|
||||
assertThat(result).isNotNull();
|
||||
assertThat(result.getBody()).isEqualTo(mockResult.getBody());
|
||||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
private void executeAndAssertAction(ExecuteActionDTO executeActionDTO, ActionConfiguration actionConfiguration, ActionExecutionResult mockResult) {
|
||||
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = executeAction(executeActionDTO, actionConfiguration, mockResult);
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user