diff --git a/app/server/appsmith-interfaces/pom.xml b/app/server/appsmith-interfaces/pom.xml index 34561dd3a9..24305c1191 100644 --- a/app/server/appsmith-interfaces/pom.xml +++ b/app/server/appsmith-interfaces/pom.xml @@ -18,6 +18,12 @@ + + com.hierynomus + sshj + 0.35.0 + + org.junit.jupiter diff --git a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java index 4609cfee8c..3d12fee22c 100644 --- a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java +++ b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/constants/PluginConstants.java @@ -33,4 +33,8 @@ public interface PluginConstants { public static final String SMTP_PLUGIN_NAME = "Smtp"; public static final String SNOWFLAKE_PLUGIN_NAME = "Snowflake"; } + + interface HostName { + public static final String LOCALHOST = "localhost"; + } } diff --git a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/helpers/SSHTunnelContext.java b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/helpers/SSHTunnelContext.java new file mode 100644 index 0000000000..a2c41718f5 --- /dev/null +++ b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/helpers/SSHTunnelContext.java @@ -0,0 +1,17 @@ +package com.appsmith.external.helpers; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import net.schmizz.sshj.SSHClient; + +import java.net.ServerSocket; + +@Getter +@Setter +@AllArgsConstructor +public class SSHTunnelContext { + ServerSocket serverSocket; + Thread thread; + SSHClient sshClient; +} diff --git a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/helpers/SSHUtils.java b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/helpers/SSHUtils.java new file mode 100644 index 0000000000..fe96229ea2 --- /dev/null +++ b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/helpers/SSHUtils.java @@ -0,0 +1,74 @@ +package com.appsmith.external.helpers; + +import lombok.NoArgsConstructor; +import net.schmizz.sshj.SSHClient; +import net.schmizz.sshj.connection.channel.direct.Parameters; +import net.schmizz.sshj.userauth.keyprovider.PKCS8KeyFile; +import net.schmizz.sshj.userauth.method.AuthPublickey; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; + +import static com.appsmith.external.constants.PluginConstants.HostName.LOCALHOST; + +/** + * This class is meant to provide methods that should help with the creation and management of SSH tunnel by + * various plugins. + */ +@NoArgsConstructor +public class SSHUtils { + static Object monitor = new Object(); // monitor object to be used for synchronization lock + public static final int RANDOM_FREE_PORT_NUM = 0; // using port 0 indicates `bind` method to acquire random free + // port + + /** + * Create SSH tunnel and return the relevant connection context. + * + * @param sshHost : host address of remote SSH server + * @param sshPort : port number for remote SSH server + * @param username : login username for the remote SSH server account + * @param key : client private key + * @param dbHost : host address on which the DB is hosted relative to the SSH host + * @param dbPort : port address on which the DB is hosted relative to the SSH host + * @return + * @throws IOException + */ + public static SSHTunnelContext createSSHTunnel( + String sshHost, int sshPort, String username, String key, String dbHost, int dbPort) throws IOException { + final SSHClient client = new SSHClient(); + + client.connect(sshHost, sshPort); + PKCS8KeyFile keyFile = new PKCS8KeyFile(); + keyFile.init(key, null); + client.auth(username, new AuthPublickey(keyFile)); + + final ServerSocket serverSocket = new ServerSocket(); + final Parameters params = new Parameters(LOCALHOST, RANDOM_FREE_PORT_NUM, dbHost, dbPort); + serverSocket.setReuseAddress(true); + + /** + * This method has been synchronized via a lock because it is not documented if the `bind` method is thread + * safe or not. In particular, this is a concern because we are acquiring a free port address via the `bind` + * method. Hypothetically, there could be a race condition when acquiring the free port. + */ + synchronized (monitor) { + serverSocket.bind(new InetSocketAddress(params.getLocalHost(), params.getLocalPort())); + } + + Runnable serverTask = new Runnable() { + @Override + public void run() { + try { + client.newLocalPortForwarder(params, serverSocket).listen(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }; + Thread serverThread = new Thread(serverTask); + serverThread.start(); + + return new SSHTunnelContext(serverSocket, serverThread, client); + } +} diff --git a/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/models/ConnectionContext.java b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/models/ConnectionContext.java new file mode 100644 index 0000000000..c91f4aed28 --- /dev/null +++ b/app/server/appsmith-interfaces/src/main/java/com/appsmith/external/models/ConnectionContext.java @@ -0,0 +1,14 @@ +package com.appsmith.external.models; + +import com.appsmith.external.helpers.SSHTunnelContext; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor +public class ConnectionContext { + C connection; + SSHTunnelContext sshTunnelContext; +} diff --git a/app/server/appsmith-plugins/mysqlPlugin/src/main/java/com/external/plugins/MySqlPlugin.java b/app/server/appsmith-plugins/mysqlPlugin/src/main/java/com/external/plugins/MySqlPlugin.java index 1c7b82466e..e7749e9b74 100644 --- a/app/server/appsmith-plugins/mysqlPlugin/src/main/java/com/external/plugins/MySqlPlugin.java +++ b/app/server/appsmith-plugins/mysqlPlugin/src/main/java/com/external/plugins/MySqlPlugin.java @@ -10,6 +10,7 @@ import com.appsmith.external.helpers.MustacheHelper; import com.appsmith.external.models.ActionConfiguration; import com.appsmith.external.models.ActionExecutionRequest; import com.appsmith.external.models.ActionExecutionResult; +import com.appsmith.external.models.ConnectionContext; import com.appsmith.external.models.DatasourceConfiguration; import com.appsmith.external.models.DatasourceStructure; import com.appsmith.external.models.DatasourceStructure.Template; @@ -151,7 +152,8 @@ public class MySqlPlugin extends BasePlugin { } @Extension - public static class MySqlPluginExecutor implements PluginExecutor, SmartSubstitutionInterface { + public static class MySqlPluginExecutor + implements PluginExecutor>, SmartSubstitutionInterface { private static final int PREPARED_STATEMENT_INDEX = 0; private final Scheduler scheduler = Schedulers.boundedElastic(); @@ -162,7 +164,7 @@ public class MySqlPlugin extends BasePlugin { * supported by PreparedStatement. In case of PreparedStatement turned off, the action and datasource configurations are * prepared (binding replacement) using PluginExecutor.variableSubstitution * - * @param connection : This is the connection that is established to the data source. This connection is according + * @param connectionContext : This is the connection that is established to the data source. This connection is according * to the parameters in Datasource Configuration * @param executeActionDTO : This is the data structure sent by the client during execute. This contains the params * which would be used for substitution @@ -172,7 +174,7 @@ public class MySqlPlugin extends BasePlugin { */ @Override public Mono executeParameterized( - ConnectionPool connection, + ConnectionContext connectionContext, ExecuteActionDTO executeActionDTO, DatasourceConfiguration datasourceConfiguration, ActionConfiguration actionConfiguration) { @@ -219,7 +221,7 @@ public class MySqlPlugin extends BasePlugin { // In case of non prepared statement, simply do binding replacement and execute if (FALSE.equals(isPreparedStatement)) { prepareConfigurationsForExecution(executeActionDTO, actionConfiguration, datasourceConfiguration); - return executeCommon(connection, actionConfiguration, FALSE, null, null, requestData); + return executeCommon(connectionContext, actionConfiguration, FALSE, null, null, requestData); } // This has to be executed as Prepared Statement @@ -230,7 +232,7 @@ public class MySqlPlugin extends BasePlugin { // Set the query with bindings extracted and replaced with '?' back in config actionConfiguration.setBody(updatedQuery); return executeCommon( - connection, actionConfiguration, TRUE, mustacheKeysInOrder, executeActionDTO, requestData); + connectionContext, actionConfiguration, TRUE, mustacheKeysInOrder, executeActionDTO, requestData); } @Override @@ -249,12 +251,13 @@ public class MySqlPlugin extends BasePlugin { } public Mono executeCommon( - ConnectionPool connectionPool, + ConnectionContext connectionContext, ActionConfiguration actionConfiguration, Boolean preparedStatement, List mustacheValuesInOrder, ExecuteActionDTO executeActionDTO, Map requestData) { + ConnectionPool connectionPool = connectionContext.getConnection(); String query = actionConfiguration.getBody(); /** @@ -447,7 +450,8 @@ public class MySqlPlugin extends BasePlugin { } @Override - public Mono testDatasource(ConnectionPool pool) { + public Mono testDatasource(ConnectionContext connectionContext) { + ConnectionPool pool = connectionContext.getConnection(); return Mono.just(pool) .flatMap(p -> p.create()) .flatMap(conn -> Mono.from(conn.close())) @@ -586,7 +590,7 @@ public class MySqlPlugin extends BasePlugin { @Override public Mono execute( - ConnectionPool connection, + ConnectionContext connectionContext, DatasourceConfiguration datasourceConfiguration, ActionConfiguration actionConfiguration) { // Unused function @@ -597,18 +601,21 @@ public class MySqlPlugin extends BasePlugin { } @Override - public Mono datasourceCreate(DatasourceConfiguration datasourceConfiguration) { + public Mono> datasourceCreate( + DatasourceConfiguration datasourceConfiguration) { ConnectionPool pool = null; try { pool = getNewConnectionPool(datasourceConfiguration); } catch (AppsmithPluginException e) { return Mono.error(e); } - return Mono.just(pool); + + return Mono.just(new ConnectionContext(pool, null)); } @Override - public void datasourceDestroy(ConnectionPool connectionPool) { + public void datasourceDestroy(ConnectionContext connectionContext) { + ConnectionPool connectionPool = connectionContext.getConnection(); if (connectionPool != null) { connectionPool .disposeLater() @@ -628,11 +635,12 @@ public class MySqlPlugin extends BasePlugin { @Override public Mono getStructure( - ConnectionPool connectionPool, DatasourceConfiguration datasourceConfiguration) { + ConnectionContext connectionContext, DatasourceConfiguration datasourceConfiguration) { final DatasourceStructure structure = new DatasourceStructure(); final Map tablesByName = new LinkedHashMap<>(); final Map keyRegistry = new HashMap<>(); + ConnectionPool connectionPool = connectionContext.getConnection(); return Mono.usingWhen( connectionPool.create(), connection -> Mono.from(connection.validate(ValidationDepth.REMOTE)) diff --git a/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlPluginTest.java b/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlPluginTest.java index 90065f179a..eac06a1548 100755 --- a/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlPluginTest.java +++ b/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlPluginTest.java @@ -6,6 +6,7 @@ import com.appsmith.external.exceptions.pluginExceptions.AppsmithPluginException import com.appsmith.external.exceptions.pluginExceptions.StaleConnectionException; import com.appsmith.external.models.ActionConfiguration; import com.appsmith.external.models.ActionExecutionResult; +import com.appsmith.external.models.ConnectionContext; import com.appsmith.external.models.DBAuth; import com.appsmith.external.models.DatasourceConfiguration; import com.appsmith.external.models.DatasourceStructure; @@ -193,9 +194,9 @@ public class MySqlPluginTest { @Test public void testConnectMySQLContainer() { - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); - StepVerifier.create(dsConnectionMono) + StepVerifier.create(connectionContextMono) .assertNext(Assertions::assertNotNull) .verifyComplete(); } @@ -206,10 +207,10 @@ public class MySqlPluginTest { dsConfig = createDatasourceConfiguration(); ((DBAuth) dsConfig.getAuthentication()).setPassword(""); - Mono connectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); Mono datasourceTestResultMono = - connectionMono.flatMap(connectionPool -> pluginExecutor.testDatasource(connectionPool)); + connectionContextMono.flatMap(connectionPool -> pluginExecutor.testDatasource(connectionPool)); String gateway = mySQLContainer.getContainerInfo().getNetworkSettings().getGateway(); String expectedErrorMessage = new StringBuilder("Access denied for user 'mysql'@'") @@ -230,9 +231,9 @@ public class MySqlPluginTest { final DatasourceConfiguration dsConfig = createDatasourceConfigForContainerWithInvalidTZ(); dsConfig.setProperties(List.of(new Property("serverTimezone", "UTC"))); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); - StepVerifier.create(dsConnectionMono) + StepVerifier.create(connectionContextMono) .assertNext(Assertions::assertNotNull) .verifyComplete(); } @@ -316,9 +317,9 @@ public class MySqlPluginTest { Set output = pluginExecutor.validateDatasource(dsConfig); assertTrue(output.isEmpty()); // test connect - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); - StepVerifier.create(dsConnectionMono) + StepVerifier.create(connectionContextMono) .assertNext(Assertions::assertNotNull) .verifyComplete(); @@ -342,9 +343,9 @@ public class MySqlPluginTest { Set output = pluginExecutor.validateDatasource(dsConfig); assertTrue(output.isEmpty()); // test connect - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); - StepVerifier.create(dsConnectionMono) + StepVerifier.create(connectionContextMono) .assertNext(Assertions::assertNotNull) .verifyComplete(); @@ -357,12 +358,12 @@ public class MySqlPluginTest { @Test public void testExecute() { - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("show databases"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -377,12 +378,12 @@ public class MySqlPluginTest { @Test public void testExecuteWithFormattingWithShowCmd() { dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("show\n\tdatabases"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -399,12 +400,12 @@ public class MySqlPluginTest { @Test public void testExecuteWithFormattingWithSelectCmd() { dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("select\n\t*\nfrom\nusers where id=1"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -438,12 +439,12 @@ public class MySqlPluginTest { @Test public void testExecuteWithLongRunningQuery() { - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT SLEEP(20);"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -459,11 +460,11 @@ public class MySqlPluginTest { public void testStaleConnectionCheck() { ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("show databases"); - ConnectionPool connectionPool = + ConnectionContext connectionContext = pluginExecutor.datasourceCreate(dsConfig).block(); - Flux resultFlux = Mono.from(connectionPool.disposeLater()) + Flux resultFlux = Mono.from((connectionContext.getConnection()).disposeLater()) .thenMany(pluginExecutor.executeParameterized( - connectionPool, new ExecuteActionDTO(), dsConfig, actionConfiguration)); + connectionContext, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(resultFlux) .expectErrorMatches(throwable -> throwable instanceof StaleConnectionException) @@ -519,12 +520,12 @@ public class MySqlPluginTest { @Test public void testAliasColumnNames() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id as user_id FROM users WHERE id = 1"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -545,7 +546,7 @@ public class MySqlPluginTest { @Test public void testPreparedStatementErrorWithIsKeyword() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); /** @@ -573,7 +574,7 @@ public class MySqlPluginTest { executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono).verifyErrorSatisfies(error -> { @@ -595,7 +596,7 @@ public class MySqlPluginTest { .blockLast(); // wait until completion of all the queries DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); /** @@ -635,7 +636,7 @@ public class MySqlPluginTest { executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -671,7 +672,7 @@ public class MySqlPluginTest { .blockLast(); // wait until completion of all the queries DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id FROM test_boolean_type WHERE c_boolean={{binding1}};"); @@ -689,7 +690,7 @@ public class MySqlPluginTest { params.add(param1); executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -711,7 +712,7 @@ public class MySqlPluginTest { @Test public void testExecuteWithPreparedStatement() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id FROM users WHERE id = {{binding1}} limit 1 offset {{binding2}};"); @@ -734,7 +735,7 @@ public class MySqlPluginTest { params.add(param2); executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -792,12 +793,12 @@ public class MySqlPluginTest { @Test public void testExecuteDataTypes() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT * FROM users WHERE id = 1"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -931,10 +932,10 @@ public class MySqlPluginTest { } private void testExecute(String query, String expectedResult) { - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody(query); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -1090,8 +1091,9 @@ public class MySqlPluginTest { DatasourceConfiguration datasourceConfiguration = createDatasourceConfiguration(); datasourceConfiguration.getConnection().getSsl().setAuthType(SSLDetails.AuthType.DISABLED); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(datasourceConfiguration); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono> connectionContextMono = + pluginExecutor.datasourceCreate(datasourceConfiguration); + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -1112,8 +1114,9 @@ public class MySqlPluginTest { DatasourceConfiguration datasourceConfiguration = createDatasourceConfiguration(); datasourceConfiguration.getConnection().getSsl().setAuthType(SSLDetails.AuthType.REQUIRED); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(datasourceConfiguration); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono> connectionContextMono = + pluginExecutor.datasourceCreate(datasourceConfiguration); + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -1136,8 +1139,9 @@ public class MySqlPluginTest { DatasourceConfiguration datasourceConfiguration = createDatasourceConfiguration(); datasourceConfiguration.getConnection().getSsl().setAuthType(SSLDetails.AuthType.DEFAULT); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(datasourceConfiguration); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono> connectionContextMono = + pluginExecutor.datasourceCreate(datasourceConfiguration); + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -1154,12 +1158,12 @@ public class MySqlPluginTest { @Test public void testDuplicateColumnNames() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id, username as id, password, email as password FROM users WHERE id = 1"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -1191,12 +1195,12 @@ public class MySqlPluginTest { @Test public void testExecuteDescribeTableCmd() { dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("describe users"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -1214,12 +1218,12 @@ public class MySqlPluginTest { @Test public void testExecuteDescTableCmd() { dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("desc users"); - Mono executeMono = dsConnectionMono.flatMap(conn -> + Mono executeMono = connectionContextMono.flatMap(conn -> pluginExecutor.executeParameterized(conn, new ExecuteActionDTO(), dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(obj -> { @@ -1239,7 +1243,7 @@ public class MySqlPluginTest { pluginExecutor = spy(new MySqlPlugin.MySqlPluginExecutor()); doReturn(false).when(pluginExecutor).isIsOperatorUsed(any()); DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT * from (\n" + "\tselect 'Appsmith' as company_name, true as open_source\n" @@ -1263,7 +1267,7 @@ public class MySqlPluginTest { params.add(param1); executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -1286,7 +1290,7 @@ public class MySqlPluginTest { @Test public void testNullAsStringWithPreparedStatement() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT * from (\n" + "\tselect 'Appsmith' as company_name, true as open_source\n" @@ -1311,7 +1315,7 @@ public class MySqlPluginTest { executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -1334,7 +1338,7 @@ public class MySqlPluginTest { @Test public void testNumericValuesHavingLeadingZeroWithPreparedStatement() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT {{binding1}} as numeric_string;"); @@ -1352,7 +1356,7 @@ public class MySqlPluginTest { params.add(param1); executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -1376,7 +1380,7 @@ public class MySqlPluginTest { @Test public void testLongValueWithPreparedStatement() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono> connectionContextMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("select id from users LIMIT {{binding1}}"); @@ -1394,7 +1398,7 @@ public class MySqlPluginTest { params.add(param1); executeActionDTO.setParams(params); - Mono executeMono = dsConnectionMono.flatMap( + Mono executeMono = connectionContextMono.flatMap( conn -> pluginExecutor.executeParameterized(conn, executeActionDTO, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) @@ -1417,18 +1421,19 @@ public class MySqlPluginTest { @Test public void testDatasourceDestroy() { dsConfig = createDatasourceConfiguration(); - Mono connPoolMonoCache = + Mono> connectionContextMonoCache = pluginExecutor.datasourceCreate(dsConfig).cache(); Mono testConnResultMono = - connPoolMonoCache.flatMap(conn -> pluginExecutor.testDatasource(conn)); - Mono> zipMono = zip(connPoolMonoCache, testConnResultMono); + connectionContextMonoCache.flatMap(conn -> pluginExecutor.testDatasource(conn)); + Mono, DatasourceTestResult>> zipMono = + zip(connectionContextMonoCache, testConnResultMono); StepVerifier.create(zipMono) .assertNext(tuple2 -> { DatasourceTestResult testDsResult = tuple2.getT2(); assertEquals(0, testDsResult.getInvalids().size()); - ConnectionPool conn = tuple2.getT1(); - pluginExecutor.datasourceDestroy(conn); + ConnectionContext connectionContext = tuple2.getT1(); + pluginExecutor.datasourceDestroy(connectionContext); try { /** * We need to wait a few seconds before the next check because @@ -1440,7 +1445,7 @@ public class MySqlPluginTest { } catch (InterruptedException e) { throw new RuntimeException(e); } - assertTrue(conn.isDisposed()); + assertTrue((connectionContext.getConnection()).isDisposed()); }) .verifyComplete(); } @@ -1450,7 +1455,7 @@ public class MySqlPluginTest { MySqlPlugin.MySqlPluginExecutor spyPlugin = spy(pluginExecutor); DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - ConnectionPool dsConnectionMono = + ConnectionContext connectionContextMono = pluginExecutor.datasourceCreate(dsConfig).block(); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id FROM users WHERE -- IS operator\nid = 1 limit 1;"); @@ -1461,7 +1466,7 @@ public class MySqlPluginTest { HashMap requestData = new HashMap<>(); Mono resultMono = - spyPlugin.executeCommon(dsConnectionMono, actionConfiguration, TRUE, null, null, requestData); + spyPlugin.executeCommon(connectionContextMono, actionConfiguration, TRUE, null, null, requestData); StepVerifier.create(resultMono) .assertNext(result -> { diff --git a/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlStaleConnectionErrorMessageTest.java b/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlStaleConnectionErrorMessageTest.java index b03538cc1e..f765c74346 100644 --- a/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlStaleConnectionErrorMessageTest.java +++ b/app/server/appsmith-plugins/mysqlPlugin/src/test/java/com/external/plugins/MySqlStaleConnectionErrorMessageTest.java @@ -4,6 +4,7 @@ import com.appsmith.external.dtos.ExecuteActionDTO; import com.appsmith.external.exceptions.pluginExceptions.StaleConnectionException; import com.appsmith.external.models.ActionConfiguration; import com.appsmith.external.models.ActionExecutionResult; +import com.appsmith.external.models.ConnectionContext; import com.external.utils.MySqlDatasourceUtils; import io.r2dbc.pool.ConnectionPool; import io.r2dbc.spi.Connection; @@ -35,8 +36,10 @@ public class MySqlStaleConnectionErrorMessageTest { ConnectionPool mockConnectionPool = mock(ConnectionPool.class); String expectedErrorMessage = "Timeout exception from MockConnectionPool"; when(mockConnectionPool.create()).thenReturn(Mono.error(new TimeoutException(expectedErrorMessage))); + ConnectionContext connectionContext = + new ConnectionContext(mockConnectionPool, null); Mono actionExecutionResultMono = pluginExecutor.executeCommon( - mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); + connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); StepVerifier.create(actionExecutionResultMono) .expectErrorSatisfies(error -> { assertTrue(error instanceof StaleConnectionException); @@ -52,8 +55,10 @@ public class MySqlStaleConnectionErrorMessageTest { ConnectionPool mockConnectionPool = mock(ConnectionPool.class); String expectedErrorMessage = "Timeout exception from MockConnectionPool"; when(mockConnectionPool.create()).thenReturn(Mono.error(new PoolShutdownException(expectedErrorMessage))); + ConnectionContext connectionContext = + new ConnectionContext(mockConnectionPool, null); Mono actionExecutionResultMono = pluginExecutor.executeCommon( - mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); + connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); StepVerifier.create(actionExecutionResultMono) .expectErrorSatisfies(error -> { assertTrue(error instanceof StaleConnectionException); @@ -69,8 +74,10 @@ public class MySqlStaleConnectionErrorMessageTest { ConnectionPool mockConnectionPool = mock(ConnectionPool.class); String expectedErrorMessage = "Timeout exception from MockConnectionPool"; when(mockConnectionPool.create()).thenReturn(Mono.error(new IllegalStateException(expectedErrorMessage))); + ConnectionContext connectionContext = + new ConnectionContext(mockConnectionPool, null); Mono actionExecutionResultMono = pluginExecutor.executeCommon( - mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); + connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); StepVerifier.create(actionExecutionResultMono) .expectErrorSatisfies(error -> { assertTrue(error instanceof StaleConnectionException); @@ -87,8 +94,10 @@ public class MySqlStaleConnectionErrorMessageTest { String expectedErrorMessage = "Timeout exception from MockConnectionPool"; when(mockConnectionPool.create()) .thenReturn(Mono.error(new R2dbcNonTransientResourceException(expectedErrorMessage))); + ConnectionContext connectionContext = + new ConnectionContext(mockConnectionPool, null); Mono actionExecutionResultMono = pluginExecutor.executeCommon( - mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); + connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); StepVerifier.create(actionExecutionResultMono) .expectErrorSatisfies(error -> { assertTrue(error instanceof StaleConnectionException); @@ -106,8 +115,10 @@ public class MySqlStaleConnectionErrorMessageTest { when(mockConnectionPool.create()).thenReturn(Mono.just(mockConnection)); when(mockConnection.validate(ValidationDepth.LOCAL)).thenReturn(Mono.just(false)); when(mockConnection.close()).thenReturn(Mono.empty()); + ConnectionContext connectionContext = + new ConnectionContext(mockConnectionPool, null); Mono actionExecutionResultMono = pluginExecutor.executeCommon( - mockConnectionPool, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); + connectionContext, actionConfiguration, false, List.of(), new ExecuteActionDTO(), new HashMap<>()); StepVerifier.create(actionExecutionResultMono) .expectErrorSatisfies(error -> { assertTrue(error instanceof StaleConnectionException);