feat: add basic data structures for MySQL SSH tunnel (#26349)

This commit is contained in:
Sumit Kumar 2023-08-25 15:28:16 +05:30 committed by GitHub
parent 1f5223c01d
commit 3173dd4e00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 219 additions and 80 deletions

View File

@ -18,6 +18,12 @@
</properties>
<dependencies>
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>sshj</artifactId>
<version>0.35.0</version>
</dependency>
<!-- Actual Junit5 implementation. Will transitively include junit-jupiter-api -->
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@ -33,4 +33,8 @@ public interface PluginConstants {
public static final String SMTP_PLUGIN_NAME = "Smtp";
public static final String SNOWFLAKE_PLUGIN_NAME = "Snowflake";
}
interface HostName {
public static final String LOCALHOST = "localhost";
}
}

View File

@ -0,0 +1,17 @@
package com.appsmith.external.helpers;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import net.schmizz.sshj.SSHClient;
import java.net.ServerSocket;
@Getter
@Setter
@AllArgsConstructor
public class SSHTunnelContext {
ServerSocket serverSocket;
Thread thread;
SSHClient sshClient;
}

View File

@ -0,0 +1,74 @@
package com.appsmith.external.helpers;
import lombok.NoArgsConstructor;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.connection.channel.direct.Parameters;
import net.schmizz.sshj.userauth.keyprovider.PKCS8KeyFile;
import net.schmizz.sshj.userauth.method.AuthPublickey;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import static com.appsmith.external.constants.PluginConstants.HostName.LOCALHOST;
/**
* This class is meant to provide methods that should help with the creation and management of SSH tunnel by
* various plugins.
*/
@NoArgsConstructor
public class SSHUtils {
static Object monitor = new Object(); // monitor object to be used for synchronization lock
public static final int RANDOM_FREE_PORT_NUM = 0; // using port 0 indicates `bind` method to acquire random free
// port
/**
* Create SSH tunnel and return the relevant connection context.
*
* @param sshHost : host address of remote SSH server
* @param sshPort : port number for remote SSH server
* @param username : login username for the remote SSH server account
* @param key : client private key
* @param dbHost : host address on which the DB is hosted relative to the SSH host
* @param dbPort : port address on which the DB is hosted relative to the SSH host
* @return
* @throws IOException
*/
public static SSHTunnelContext createSSHTunnel(
String sshHost, int sshPort, String username, String key, String dbHost, int dbPort) throws IOException {
final SSHClient client = new SSHClient();
client.connect(sshHost, sshPort);
PKCS8KeyFile keyFile = new PKCS8KeyFile();
keyFile.init(key, null);
client.auth(username, new AuthPublickey(keyFile));
final ServerSocket serverSocket = new ServerSocket();
final Parameters params = new Parameters(LOCALHOST, RANDOM_FREE_PORT_NUM, dbHost, dbPort);
serverSocket.setReuseAddress(true);
/**
* This method has been synchronized via a lock because it is not documented if the `bind` method is thread
* safe or not. In particular, this is a concern because we are acquiring a free port address via the `bind`
* method. Hypothetically, there could be a race condition when acquiring the free port.
*/
synchronized (monitor) {
serverSocket.bind(new InetSocketAddress(params.getLocalHost(), params.getLocalPort()));
}
Runnable serverTask = new Runnable() {
@Override
public void run() {
try {
client.newLocalPortForwarder(params, serverSocket).listen();
} catch (IOException e) {
e.printStackTrace();
}
}
};
Thread serverThread = new Thread(serverTask);
serverThread.start();
return new SSHTunnelContext(serverSocket, serverThread, client);
}
}

View File

@ -0,0 +1,14 @@
package com.appsmith.external.models;
import com.appsmith.external.helpers.SSHTunnelContext;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
public class ConnectionContext<C> {
C connection;
SSHTunnelContext sshTunnelContext;
}

View File

@ -10,6 +10,7 @@ import com.appsmith.external.helpers.MustacheHelper;
import com.appsmith.external.models.ActionConfiguration;
import com.appsmith.external.models.ActionExecutionRequest;
import com.appsmith.external.models.ActionExecutionResult;
import com.appsmith.external.models.ConnectionContext;
import com.appsmith.external.models.DatasourceConfiguration;
import com.appsmith.external.models.DatasourceStructure;
import com.appsmith.external.models.DatasourceStructure.Template;
@ -151,7 +152,8 @@ public class MySqlPlugin extends BasePlugin {
}
@Extension
public static class MySqlPluginExecutor implements PluginExecutor<ConnectionPool>, SmartSubstitutionInterface {
public static class MySqlPluginExecutor
implements PluginExecutor<ConnectionContext<ConnectionPool>>, SmartSubstitutionInterface {
private static final int PREPARED_STATEMENT_INDEX = 0;
private final Scheduler scheduler = Schedulers.boundedElastic();
@ -162,7 +164,7 @@ public class MySqlPlugin extends BasePlugin {
* supported by PreparedStatement. In case of PreparedStatement turned off, the action and datasource configurations are
* prepared (binding replacement) using PluginExecutor.variableSubstitution
*
* @param connection : This is the connection that is established to the data source. This connection is according
* @param connectionContext : This is the connection that is established to the data source. This connection is according
* to the parameters in Datasource Configuration
* @param executeActionDTO : This is the data structure sent by the client during execute. This contains the params
* which would be used for substitution
@ -172,7 +174,7 @@ public class MySqlPlugin extends BasePlugin {
*/
@Override
public Mono<ActionExecutionResult> executeParameterized(
ConnectionPool connection,
ConnectionContext<ConnectionPool> connectionContext,
ExecuteActionDTO executeActionDTO,
DatasourceConfiguration datasourceConfiguration,
ActionConfiguration actionConfiguration) {
@ -219,7 +221,7 @@ public class MySqlPlugin extends BasePlugin {
// In case of non prepared statement, simply do binding replacement and execute
if (FALSE.equals(isPreparedStatement)) {
prepareConfigurationsForExecution(executeActionDTO, actionConfiguration, datasourceConfiguration);
return executeCommon(connection, actionConfiguration, FALSE, null, null, requestData);
return executeCommon(connectionContext, actionConfiguration, FALSE, null, null, requestData);
}
// This has to be executed as Prepared Statement
@ -230,7 +232,7 @@ public class MySqlPlugin extends BasePlugin {
// Set the query with bindings extracted and replaced with '?' back in config
actionConfiguration.setBody(updatedQuery);
return executeCommon(
connection, actionConfiguration, TRUE, mustacheKeysInOrder, executeActionDTO, requestData);
connectionContext, actionConfiguration, TRUE, mustacheKeysInOrder, executeActionDTO, requestData);
}
@Override
@ -249,12 +251,13 @@ public class MySqlPlugin extends BasePlugin {
}
public Mono<ActionExecutionResult> executeCommon(
ConnectionPool connectionPool,
ConnectionContext<ConnectionPool> connectionContext,
ActionConfiguration actionConfiguration,
Boolean preparedStatement,
List<MustacheBindingToken> mustacheValuesInOrder,
ExecuteActionDTO executeActionDTO,
Map<String, Object> requestData) {
ConnectionPool connectionPool = connectionContext.getConnection();
String query = actionConfiguration.getBody();
/**
@ -447,7 +450,8 @@ public class MySqlPlugin extends BasePlugin {
}
@Override
public Mono<DatasourceTestResult> testDatasource(ConnectionPool pool) {
public Mono<DatasourceTestResult> testDatasource(ConnectionContext<ConnectionPool> connectionContext) {
ConnectionPool pool = connectionContext.getConnection();
return Mono.just(pool)
.flatMap(p -> p.create())
.flatMap(conn -> Mono.from(conn.close()))
@ -586,7 +590,7 @@ public class MySqlPlugin extends BasePlugin {
@Override
public Mono<ActionExecutionResult> execute(
ConnectionPool connection,
ConnectionContext connectionContext,
DatasourceConfiguration datasourceConfiguration,
ActionConfiguration actionConfiguration) {
// Unused function
@ -597,18 +601,21 @@ public class MySqlPlugin extends BasePlugin {
}
@Override
public Mono<ConnectionPool> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
public Mono<ConnectionContext<ConnectionPool>> datasourceCreate(
DatasourceConfiguration datasourceConfiguration) {
ConnectionPool pool = null;
try {
pool = getNewConnectionPool(datasourceConfiguration);
} catch (AppsmithPluginException e) {
return Mono.error(e);
}
return Mono.just(pool);
return Mono.just(new ConnectionContext(pool, null));
}
@Override
public void datasourceDestroy(ConnectionPool connectionPool) {
public void datasourceDestroy(ConnectionContext<ConnectionPool> connectionContext) {
ConnectionPool connectionPool = connectionContext.getConnection();
if (connectionPool != null) {
connectionPool
.disposeLater()
@ -628,11 +635,12 @@ public class MySqlPlugin extends BasePlugin {
@Override
public Mono<DatasourceStructure> getStructure(
ConnectionPool connectionPool, DatasourceConfiguration datasourceConfiguration) {
ConnectionContext<ConnectionPool> connectionContext, DatasourceConfiguration datasourceConfiguration) {
final DatasourceStructure structure = new DatasourceStructure();
final Map<String, DatasourceStructure.Table> tablesByName = new LinkedHashMap<>();
final Map<String, DatasourceStructure.Key> keyRegistry = new HashMap<>();
ConnectionPool connectionPool = connectionContext.getConnection();
return Mono.usingWhen(
connectionPool.create(),
connection -> Mono.from(connection.validate(ValidationDepth.REMOTE))

View File

@ -6,6 +6,7 @@ import com.appsmith.external.exceptions.pluginExceptions.AppsmithPluginException
import com.appsmith.external.exceptions.pluginExceptions.StaleConnectionException;
import com.appsmith.external.models.ActionConfiguration;
import com.appsmith.external.models.ActionExecutionResult;
import com.appsmith.external.models.ConnectionContext;
import com.appsmith.external.models.DBAuth;
import com.appsmith.external.models.DatasourceConfiguration;
import com.appsmith.external.models.DatasourceStructure;
@ -193,9 +194,9 @@ public class MySqlPluginTest {
@Test
public void testConnectMySQLContainer() {
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
StepVerifier.create(dsConnectionMono)
StepVerifier.create(connectionContextMono)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
}
@ -206,10 +207,10 @@ public class MySqlPluginTest {
dsConfig = createDatasourceConfiguration();
((DBAuth) dsConfig.getAuthentication()).setPassword("");
Mono<ConnectionPool> connectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<DatasourceTestResult> datasourceTestResultMono =
connectionMono.flatMap(connectionPool -> pluginExecutor.testDatasource(connectionPool));
connectionContextMono.flatMap(connectionPool -> pluginExecutor.testDatasource(connectionPool));
String gateway = mySQLContainer.getContainerInfo().getNetworkSettings().getGateway();
String expectedErrorMessage = new StringBuilder("Access denied for user 'mysql'@'")
@ -230,9 +231,9 @@ public class MySqlPluginTest {
final DatasourceConfiguration dsConfig = createDatasourceConfigForContainerWithInvalidTZ();
dsConfig.setProperties(List.of(new Property("serverTimezone", "UTC")));
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
StepVerifier.create(dsConnectionMono)
StepVerifier.create(connectionContextMono)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
}
@ -316,9 +317,9 @@ public class MySqlPluginTest {
Set<String> output = pluginExecutor.validateDatasource(dsConfig);
assertTrue(output.isEmpty());
// test connect
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
StepVerifier.create(dsConnectionMono)
StepVerifier.create(connectionContextMono)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
@ -342,9 +343,9 @@ public class MySqlPluginTest {
Set<String> output = pluginExecutor.validateDatasource(dsConfig);
assertTrue(output.isEmpty());
// test connect
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
StepVerifier.create(dsConnectionMono)
StepVerifier.create(connectionContextMono)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
@ -357,12 +358,12 @@ public class MySqlPluginTest {
@Test
public void testExecute() {
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("show databases");
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -377,12 +378,12 @@ public class MySqlPluginTest {
@Test
public void testExecuteWithFormattingWithShowCmd() {
dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("show\n\tdatabases");
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -399,12 +400,12 @@ public class MySqlPluginTest {
@Test
public void testExecuteWithFormattingWithSelectCmd() {
dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("select\n\t*\nfrom\nusers where id=1");
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -438,12 +439,12 @@ public class MySqlPluginTest {
@Test
public void testExecuteWithLongRunningQuery() {
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT SLEEP(20);");
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -459,11 +460,11 @@ public class MySqlPluginTest {
public void testStaleConnectionCheck() {
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("show databases");
ConnectionPool connectionPool =
ConnectionContext<ConnectionPool> connectionContext =
pluginExecutor.datasourceCreate(dsConfig).block();
Flux<ActionExecutionResult> resultFlux = Mono.from(connectionPool.disposeLater())
Flux<ActionExecutionResult> resultFlux = Mono.from((connectionContext.getConnection()).disposeLater())
.thenMany(pluginExecutor.executeParameterized(
connectionPool, new ExecuteActionDTO(), dsConfig, actionConfiguration));
connectionContext, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(resultFlux)
.expectErrorMatches(throwable -> throwable instanceof StaleConnectionException)
@ -519,12 +520,12 @@ public class MySqlPluginTest {
@Test
public void testAliasColumnNames() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT id as user_id FROM users WHERE id = 1");
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(conn ->
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -545,7 +546,7 @@ public class MySqlPluginTest {
@Test
public void testPreparedStatementErrorWithIsKeyword() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
/**
@ -573,7 +574,7 @@ public class MySqlPluginTest {
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono).verifyErrorSatisfies(error -> {
@ -595,7 +596,7 @@ public class MySqlPluginTest {
.blockLast(); // wait until completion of all the queries
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
/**
@ -635,7 +636,7 @@ public class MySqlPluginTest {
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -671,7 +672,7 @@ public class MySqlPluginTest {
.blockLast(); // wait until completion of all the queries
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT id FROM test_boolean_type WHERE c_boolean={{binding1}};");
@ -689,7 +690,7 @@ public class MySqlPluginTest {
params.add(param1);
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -711,7 +712,7 @@ public class MySqlPluginTest {
@Test
public void testExecuteWithPreparedStatement() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT id FROM users WHERE id = {{binding1}} limit 1 offset {{binding2}};");
@ -734,7 +735,7 @@ public class MySqlPluginTest {
params.add(param2);
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -792,12 +793,12 @@ public class MySqlPluginTest {
@Test
public void testExecuteDataTypes() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT * FROM users WHERE id = 1");
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(conn ->
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -931,10 +932,10 @@ public class MySqlPluginTest {
}
private void testExecute(String query, String expectedResult) {
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody(query);
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -1090,8 +1091,9 @@ public class MySqlPluginTest {
DatasourceConfiguration datasourceConfiguration = createDatasourceConfiguration();
datasourceConfiguration.getConnection().getSsl().setAuthType(SSLDetails.AuthType.DISABLED);
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(datasourceConfiguration);
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<ConnectionContext<ConnectionPool>> connectionContextMono =
pluginExecutor.datasourceCreate(datasourceConfiguration);
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -1112,8 +1114,9 @@ public class MySqlPluginTest {
DatasourceConfiguration datasourceConfiguration = createDatasourceConfiguration();
datasourceConfiguration.getConnection().getSsl().setAuthType(SSLDetails.AuthType.REQUIRED);
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(datasourceConfiguration);
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<ConnectionContext<ConnectionPool>> connectionContextMono =
pluginExecutor.datasourceCreate(datasourceConfiguration);
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -1136,8 +1139,9 @@ public class MySqlPluginTest {
DatasourceConfiguration datasourceConfiguration = createDatasourceConfiguration();
datasourceConfiguration.getConnection().getSsl().setAuthType(SSLDetails.AuthType.DEFAULT);
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(datasourceConfiguration);
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<ConnectionContext<ConnectionPool>> connectionContextMono =
pluginExecutor.datasourceCreate(datasourceConfiguration);
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -1154,12 +1158,12 @@ public class MySqlPluginTest {
@Test
public void testDuplicateColumnNames() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT id, username as id, password, email as password FROM users WHERE id = 1");
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(conn ->
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -1191,12 +1195,12 @@ public class MySqlPluginTest {
@Test
public void testExecuteDescribeTableCmd() {
dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("describe users");
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -1214,12 +1218,12 @@ public class MySqlPluginTest {
@Test
public void testExecuteDescTableCmd() {
dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("desc users");
Mono<Object> executeMono = dsConnectionMono.flatMap(conn ->
Mono<Object> executeMono = connectionContextMono.flatMap(conn ->
pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(obj -> {
@ -1239,7 +1243,7 @@ public class MySqlPluginTest {
pluginExecutor = spy(new MySqlPlugin.MySqlPluginExecutor());
doReturn(false).when(pluginExecutor).isIsOperatorUsed(any());
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT * from (\n" + "\tselect 'Appsmith' as company_name, true as open_source\n"
@ -1263,7 +1267,7 @@ public class MySqlPluginTest {
params.add(param1);
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -1286,7 +1290,7 @@ public class MySqlPluginTest {
@Test
public void testNullAsStringWithPreparedStatement() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT * from (\n" + "\tselect 'Appsmith' as company_name, true as open_source\n"
@ -1311,7 +1315,7 @@ public class MySqlPluginTest {
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -1334,7 +1338,7 @@ public class MySqlPluginTest {
@Test
public void testNumericValuesHavingLeadingZeroWithPreparedStatement() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT {{binding1}} as numeric_string;");
@ -1352,7 +1356,7 @@ public class MySqlPluginTest {
params.add(param1);
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -1376,7 +1380,7 @@ public class MySqlPluginTest {
@Test
public void testLongValueWithPreparedStatement() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
Mono<ConnectionContext<ConnectionPool>> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig);
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("select id from users LIMIT {{binding1}}");
@ -1394,7 +1398,7 @@ public class MySqlPluginTest {
params.add(param1);
executeActionDTO.setParams(params);
Mono<ActionExecutionResult> executeMono = dsConnectionMono.flatMap(
Mono<ActionExecutionResult> executeMono = connectionContextMono.flatMap(
conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
@ -1417,18 +1421,19 @@ public class MySqlPluginTest {
@Test
public void testDatasourceDestroy() {
dsConfig = createDatasourceConfiguration();
Mono<ConnectionPool> connPoolMonoCache =
Mono<ConnectionContext<ConnectionPool>> connectionContextMonoCache =
pluginExecutor.datasourceCreate(dsConfig).cache();
Mono<DatasourceTestResult> testConnResultMono =
connPoolMonoCache.flatMap(conn -> pluginExecutor.testDatasource(conn));
Mono<Tuple2<ConnectionPool, DatasourceTestResult>> zipMono = zip(connPoolMonoCache, testConnResultMono);
connectionContextMonoCache.flatMap(conn -> pluginExecutor.testDatasource(conn));
Mono<Tuple2<ConnectionContext<ConnectionPool>, DatasourceTestResult>> zipMono =
zip(connectionContextMonoCache, testConnResultMono);
StepVerifier.create(zipMono)
.assertNext(tuple2 -> {
DatasourceTestResult testDsResult = tuple2.getT2();
assertEquals(0, testDsResult.getInvalids().size());
ConnectionPool conn = tuple2.getT1();
pluginExecutor.datasourceDestroy(conn);
ConnectionContext<ConnectionPool> connectionContext = tuple2.getT1();
pluginExecutor.datasourceDestroy(connectionContext);
try {
/**
* We need to wait a few seconds before the next check because
@ -1440,7 +1445,7 @@ public class MySqlPluginTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertTrue(conn.isDisposed());
assertTrue((connectionContext.getConnection()).isDisposed());
})
.verifyComplete();
}
@ -1450,7 +1455,7 @@ public class MySqlPluginTest {
MySqlPlugin.MySqlPluginExecutor spyPlugin = spy(pluginExecutor);
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
ConnectionPool dsConnectionMono =
ConnectionContext<ConnectionPool> connectionContextMono =
pluginExecutor.datasourceCreate(dsConfig).block();
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT id FROM users WHERE -- IS operator\nid = 1 limit 1;");
@ -1461,7 +1466,7 @@ public class MySqlPluginTest {
HashMap<String, Object> requestData = new HashMap<>();
Mono<ActionExecutionResult> resultMono =
spyPlugin.executeCommon(dsConnectionMono, actionConfiguration, TRUE, null, null, requestData);
spyPlugin.executeCommon(connectionContextMono, actionConfiguration, TRUE, null, null, requestData);
StepVerifier.create(resultMono)
.assertNext(result -> {

View File

@ -4,6 +4,7 @@ import com.appsmith.external.dtos.ExecuteActionDTO;
import com.appsmith.external.exceptions.pluginExceptions.StaleConnectionException;
import com.appsmith.external.models.ActionConfiguration;
import com.appsmith.external.models.ActionExecutionResult;
import com.appsmith.external.models.ConnectionContext;
import com.external.utils.MySqlDatasourceUtils;
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.spi.Connection;
@ -35,8 +36,10 @@ public class MySqlStaleConnectionErrorMessageTest {
ConnectionPool mockConnectionPool = mock(ConnectionPool.class);
String expectedErrorMessage = "Timeout exception from MockConnectionPool";
when(mockConnectionPool.create()).thenReturn(Mono.error(new TimeoutException(expectedErrorMessage)));
ConnectionContext<ConnectionPool> connectionContext =
new ConnectionContext<ConnectionPool>(mockConnectionPool, null);
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.executeCommon(
mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
StepVerifier.create(actionExecutionResultMono)
.expectErrorSatisfies(error -> {
assertTrue(error instanceof StaleConnectionException);
@ -52,8 +55,10 @@ public class MySqlStaleConnectionErrorMessageTest {
ConnectionPool mockConnectionPool = mock(ConnectionPool.class);
String expectedErrorMessage = "Timeout exception from MockConnectionPool";
when(mockConnectionPool.create()).thenReturn(Mono.error(new PoolShutdownException(expectedErrorMessage)));
ConnectionContext<ConnectionPool> connectionContext =
new ConnectionContext<ConnectionPool>(mockConnectionPool, null);
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.executeCommon(
mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
StepVerifier.create(actionExecutionResultMono)
.expectErrorSatisfies(error -> {
assertTrue(error instanceof StaleConnectionException);
@ -69,8 +74,10 @@ public class MySqlStaleConnectionErrorMessageTest {
ConnectionPool mockConnectionPool = mock(ConnectionPool.class);
String expectedErrorMessage = "Timeout exception from MockConnectionPool";
when(mockConnectionPool.create()).thenReturn(Mono.error(new IllegalStateException(expectedErrorMessage)));
ConnectionContext<ConnectionPool> connectionContext =
new ConnectionContext<ConnectionPool>(mockConnectionPool, null);
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.executeCommon(
mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
StepVerifier.create(actionExecutionResultMono)
.expectErrorSatisfies(error -> {
assertTrue(error instanceof StaleConnectionException);
@ -87,8 +94,10 @@ public class MySqlStaleConnectionErrorMessageTest {
String expectedErrorMessage = "Timeout exception from MockConnectionPool";
when(mockConnectionPool.create())
.thenReturn(Mono.error(new R2dbcNonTransientResourceException(expectedErrorMessage)));
ConnectionContext<ConnectionPool> connectionContext =
new ConnectionContext<ConnectionPool>(mockConnectionPool, null);
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.executeCommon(
mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
StepVerifier.create(actionExecutionResultMono)
.expectErrorSatisfies(error -> {
assertTrue(error instanceof StaleConnectionException);
@ -106,8 +115,10 @@ public class MySqlStaleConnectionErrorMessageTest {
when(mockConnectionPool.create()).thenReturn(Mono.just(mockConnection));
when(mockConnection.validate(ValidationDepth.LOCAL)).thenReturn(Mono.just(false));
when(mockConnection.close()).thenReturn(Mono.empty());
ConnectionContext<ConnectionPool> connectionContext =
new ConnectionContext<ConnectionPool>(mockConnectionPool, null);
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.executeCommon(
mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>());
StepVerifier.create(actionExecutionResultMono)
.expectErrorSatisfies(error -> {
assertTrue(error instanceof StaleConnectionException);