From 02f1451443577a18ea7d10efdddf62c3ad7c0b6c Mon Sep 17 00:00:00 2001 From: Sumit Kumar Date: Thu, 28 Jul 2022 17:01:17 +0530 Subject: [PATCH] fix: fix query failure on simultaneous execution of multiple queries (#15458) * update driver remove connection closure when not required * add connection pool remove ssl options * got working with postgres driver * use Redshift driver instead of postgres * updated JUnit TC added comments minor refactor * add comment cleanup * update default port --- .../external/plugins/PostgresPluginTest.java | 1 + .../appsmith-plugins/redshiftPlugin/pom.xml | 15 +- .../com/external/plugins/RedshiftPlugin.java | 459 +++++++++--------- .../utils/RedshiftDatasourceUtils.java | 106 ++++ .../src/main/resources/form.json | 1 + .../external/plugins/RedshiftPluginTest.java | 85 +++- 6 files changed, 405 insertions(+), 262 deletions(-) create mode 100644 app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/utils/RedshiftDatasourceUtils.java diff --git a/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java b/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java index afe1c66e6b..2455fa6246 100644 --- a/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java +++ b/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java @@ -1344,6 +1344,7 @@ public class PostgresPluginTest { .verifyComplete(); } + @Test public void testReadOnlyMode() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); dsConfig.getConnection().setMode(com.appsmith.external.models.Connection.Mode.READ_ONLY); diff --git a/app/server/appsmith-plugins/redshiftPlugin/pom.xml b/app/server/appsmith-plugins/redshiftPlugin/pom.xml index 9f58e8b805..bf96f22b7f 100644 --- a/app/server/appsmith-plugins/redshiftPlugin/pom.xml +++ b/app/server/appsmith-plugins/redshiftPlugin/pom.xml @@ -39,9 +39,21 @@ com.amazon.redshift redshift-jdbc42 - 2.1.0.1 + 2.1.0.9 runtime + + com.zaxxer + HikariCP + 3.4.5 + + + org.slf4j + slf4j-api + + + + @@ -51,7 +63,6 @@ 1.3 test - diff --git a/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/plugins/RedshiftPlugin.java b/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/plugins/RedshiftPlugin.java index 3fe3352fdb..3ee52a0684 100644 --- a/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/plugins/RedshiftPlugin.java +++ b/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/plugins/RedshiftPlugin.java @@ -12,9 +12,10 @@ import com.appsmith.external.models.DatasourceStructure; import com.appsmith.external.models.DatasourceTestResult; import com.appsmith.external.models.Endpoint; import com.appsmith.external.models.RequestParamDTO; -import com.appsmith.external.models.SSLDetails; import com.appsmith.external.plugins.BasePlugin; import com.appsmith.external.plugins.PluginExecutor; +import com.zaxxer.hikari.HikariDataSource; +import com.zaxxer.hikari.HikariPoolMXBean; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.ObjectUtils; @@ -27,7 +28,6 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -42,23 +42,18 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import static com.appsmith.external.constants.ActionConstants.ACTION_CONFIGURATION_BODY; import static com.appsmith.external.helpers.PluginUtils.getColumnsListForJdbcPlugin; import static com.appsmith.external.helpers.PluginUtils.getIdenticalColumns; -import static com.appsmith.external.models.Connection.Mode.READ_ONLY; +import static com.external.utils.RedshiftDatasourceUtils.createConnectionPool; +import static com.external.utils.RedshiftDatasourceUtils.getConnectionFromConnectionPool; @Slf4j public class RedshiftPlugin extends BasePlugin { - static final String JDBC_DRIVER = "com.amazon.redshift.jdbc.Driver"; - private static final String JDBC_PROTOCOL = "jdbc:redshift://"; - private static final String USER = "user"; - private static final String PASSWORD = "password"; - private static final String SSL = "ssl"; - private static final int VALIDITY_CHECK_TIMEOUT = 5; /* must be positive, otherwise may receive exception */ + public static final String JDBC_DRIVER = "com.amazon.redshift.jdbc.Driver"; private static final String DATE_COLUMN_TYPE_NAME = "date"; public RedshiftPlugin(PluginWrapper wrapper) { @@ -66,7 +61,7 @@ public class RedshiftPlugin extends BasePlugin { } @Extension - public static class RedshiftPluginExecutor implements PluginExecutor { + public static class RedshiftPluginExecutor implements PluginExecutor { private final Scheduler scheduler = Schedulers.elastic(); @@ -197,19 +192,8 @@ public class RedshiftPlugin extends BasePlugin { return row; } - /* - * 1. This method can throw SQLException via connection.isClosed() or connection.isValid(...) - * 2. StaleConnectionException thrown by this method needs to be propagated to upper layers so that a retry - * can be triggered. - */ - private void checkConnectionValidity(Connection connection) throws SQLException { - if (connection == null || connection.isClosed()) { - throw new StaleConnectionException(); - } - } - @Override - public Mono execute(Connection connection, + public Mono execute(HikariDataSource connectionPool, DatasourceConfiguration datasourceConfiguration, ActionConfiguration actionConfiguration) { @@ -227,104 +211,139 @@ public class RedshiftPlugin extends BasePlugin { } return Mono.fromCallable(() -> { - /* - * 1. If there is any issue with checking connection validity then assume that the connection is stale. - */ + Connection connection = null; + try { + connection = getConnectionFromConnectionPool(connectionPool); + } catch (SQLException | StaleConnectionException e) { + e.printStackTrace(); + + /** + * When the user configured time limit for the query execution is over, and the query is still + * queued in the connectionPool then InterruptedException is thrown as the execution thread is + * prepared for termination. This exception is wrapped inside SQLException and hence needs to be + * checked via getCause method. This exception does not indicate a Stale connection. + */ + if (e.getCause() != null && e.getCause().getClass().equals(InterruptedException.class)) { + return Mono.error(e); + } + + // The function can throw either StaleConnectionException or SQLException. The underlying hikari + // library throws SQLException in case the pool is closed or there is an issue initializing + // the connection pool which can also be translated in our world to StaleConnectionException + // and should then trigger the destruction and recreation of the pool. + return Mono.error(new StaleConnectionException()); + } + + + /** + * Keeping this print statement post call to getConnectionFromConnectionPool because it checks for + * stale connection pool. + */ + printConnectionPoolStatus(connectionPool, false); + + List> rowsList = new ArrayList<>(50); + final List columnsList = new ArrayList<>(); + Statement statement = null; + ResultSet resultSet = null; + + try { + statement = connection.createStatement(); + boolean isResultSet = statement.execute(query); + + if (isResultSet) { + resultSet = statement.getResultSet(); + ResultSetMetaData metaData = resultSet.getMetaData(); + columnsList.addAll(getColumnsListForJdbcPlugin(metaData)); + + while (resultSet.next()) { + Map row = getRow(resultSet); + rowsList.add(row); + } + } else { + rowsList.add(Map.of( + "affectedRows", + ObjectUtils.defaultIfNull(statement.getUpdateCount(), 0)) + ); + + } + } catch (SQLException e) { + e.printStackTrace(); + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_EXECUTE_ARGUMENT_ERROR, e.getMessage())); + } finally { + if (resultSet != null) { try { - checkConnectionValidity(connection); + resultSet.close(); } catch (SQLException e) { - return Mono.error(new StaleConnectionException()); + log.warn("Error closing Redshift ResultSet", e); } + } - List> rowsList = new ArrayList<>(50); - final List columnsList = new ArrayList<>(); - Statement statement = null; - ResultSet resultSet = null; - + if (statement != null) { try { - statement = connection.createStatement(); - boolean isResultSet = statement.execute(query); - - if (isResultSet) { - resultSet = statement.getResultSet(); - ResultSetMetaData metaData = resultSet.getMetaData(); - columnsList.addAll(getColumnsListForJdbcPlugin(metaData)); - - while (resultSet.next()) { - Map row = getRow(resultSet); - rowsList.add(row); - } - } else { - rowsList.add(Map.of( - "affectedRows", - ObjectUtils.defaultIfNull(statement.getUpdateCount(), 0)) - ); - - } + statement.close(); } catch (SQLException e) { - e.printStackTrace(); - return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_EXECUTE_ARGUMENT_ERROR, e.getMessage())); - } finally { - if (resultSet != null) { - try { - resultSet.close(); - } catch (SQLException e) { - log.warn("Error closing Redshift ResultSet", e); - } - } - - if (statement != null) { - try { - statement.close(); - } catch (SQLException e) { - log.warn("Error closing Redshift Statement", e); - } - } - - try { - connection.close(); - } catch (SQLException e) { - log.warn("Error closing Redshift Connection", e); - } - + log.warn("Error closing Redshift Statement", e); } + } - ActionExecutionResult result = new ActionExecutionResult(); - result.setBody(objectMapper.valueToTree(rowsList)); - result.setMessages(populateHintMessages(columnsList)); - result.setIsExecutionSuccess(true); - log.debug("In RedshiftPlugin, got action execution result"); - return Mono.just(result); - }) - .flatMap(obj -> obj) - .map(obj -> (ActionExecutionResult) obj) - .onErrorMap(e -> { - if (!(e instanceof AppsmithPluginException) && !(e instanceof StaleConnectionException)) { - return new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage()); - } + try { + connection.close(); + } catch (SQLException e) { + log.warn("Error closing Redshift Connection", e); + } - return e; - }) - .onErrorResume(error -> { - error.printStackTrace(); - if (error instanceof StaleConnectionException) { - return Mono.error(error); - } - ActionExecutionResult result = new ActionExecutionResult(); - result.setIsExecutionSuccess(false); - result.setErrorInfo(error); - return Mono.just(result); - }) - // Now set the request in the result to be returned back to the server - .map(actionExecutionResult -> { - ActionExecutionRequest request = new ActionExecutionRequest(); - request.setQuery(query); - request.setRequestParams(requestParams); - ActionExecutionResult result = actionExecutionResult; - result.setRequest(request); - return result; - }) - .subscribeOn(scheduler); + } + + ActionExecutionResult result = new ActionExecutionResult(); + result.setBody(objectMapper.valueToTree(rowsList)); + result.setMessages(populateHintMessages(columnsList)); + result.setIsExecutionSuccess(true); + log.debug("In RedshiftPlugin, got action execution result"); + return Mono.just(result); + }) + .flatMap(obj -> obj) + .map(obj -> (ActionExecutionResult) obj) + .onErrorMap(e -> { + if (!(e instanceof AppsmithPluginException) && !(e instanceof StaleConnectionException)) { + return new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage()); + } + + return e; + }) + .onErrorResume(error -> { + error.printStackTrace(); + if (error instanceof StaleConnectionException) { + return Mono.error(error); + } + ActionExecutionResult result = new ActionExecutionResult(); + result.setIsExecutionSuccess(false); + result.setErrorInfo(error); + return Mono.just(result); + }) + // Now set the request in the result to be returned back to the server + .map(actionExecutionResult -> { + ActionExecutionRequest request = new ActionExecutionRequest(); + request.setQuery(query); + request.setRequestParams(requestParams); + ActionExecutionResult result = actionExecutionResult; + result.setRequest(request); + return result; + }) + .subscribeOn(scheduler); + } + + public void printConnectionPoolStatus(HikariDataSource connectionPool, boolean isFetchingStructure) { + HikariPoolMXBean poolProxy = connectionPool.getHikariPoolMXBean(); + int idleConnections = poolProxy.getIdleConnections(); + int activeConnections = poolProxy.getActiveConnections(); + int totalConnections = poolProxy.getTotalConnections(); + int threadsAwaitingConnection = poolProxy.getThreadsAwaitingConnection(); + log.debug(Thread.currentThread().getName() + (isFetchingStructure ? "Before fetching Redshift db" + + " structure." : "Before executing Redshift query.") + " Hikari Pool stats : " + + " active - " + activeConnections + + ", idle - " + idleConnections + + ", awaiting - " + threadsAwaitingConnection + + ", total - " + totalConnections); } private Set populateHintMessages(List columnNames) { @@ -342,80 +361,26 @@ public class RedshiftPlugin extends BasePlugin { } @Override - public Mono datasourceCreate(DatasourceConfiguration datasourceConfiguration) { + public Mono datasourceCreate(DatasourceConfiguration datasourceConfiguration) { try { Class.forName(JDBC_DRIVER); } catch (ClassNotFoundException e) { - return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error loading Redshift JDBC Driver class.")); + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error loading " + + "Redshift JDBC Driver class.")); } - String url; - DBAuth authentication = (DBAuth) datasourceConfiguration.getAuthentication(); - - com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection(); - - final boolean isSslEnabled = configurationConnection != null - && configurationConnection.getSsl() != null - && !SSLDetails.AuthType.NO_SSL.equals(configurationConnection.getSsl().getAuthType()); - - Properties properties = new Properties(); - properties.put(SSL, isSslEnabled); - if (authentication.getUsername() != null) { - properties.put(USER, authentication.getUsername()); - } - if (authentication.getPassword() != null) { - properties.put(PASSWORD, authentication.getPassword()); - } - - if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) { - url = datasourceConfiguration.getUrl(); - - } else { - StringBuilder urlBuilder = new StringBuilder(JDBC_PROTOCOL); - for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) { - urlBuilder - .append(endpoint.getHost()) - .append(':') - .append(ObjectUtils.defaultIfNull(endpoint.getPort(), 5439L)) - .append('/'); - - if (!StringUtils.isEmpty(authentication.getDatabaseName())) { - urlBuilder.append(authentication.getDatabaseName()); - } - } - url = urlBuilder.toString(); - } - - return Mono.fromCallable(() -> { - try { - log.debug("Connecting to Redshift db"); - Connection connection = DriverManager.getConnection(url, properties); - connection.setReadOnly( - configurationConnection != null && READ_ONLY.equals(configurationConnection.getMode())); - return Mono.just(connection); - } catch (SQLException e) { - e.printStackTrace(); - return Mono.error( - new AppsmithPluginException( - AppsmithPluginError.PLUGIN_DATASOURCE_ARGUMENT_ERROR, - e.getMessage() - ) - ); - } + return Mono + .fromCallable(() -> { + log.debug(Thread.currentThread().getName() + ": Connecting to Redshift db"); + return createConnectionPool(datasourceConfiguration); }) - .flatMap(obj -> obj) - .map(conn -> (Connection) conn) .subscribeOn(scheduler); } @Override - public void datasourceDestroy(Connection connection) { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException e) { - log.error("Error closing Redshift Connection.", e); + public void datasourceDestroy(HikariDataSource connectionPool) { + if (connectionPool != null) { + connectionPool.close(); } } @@ -465,12 +430,8 @@ public class RedshiftPlugin extends BasePlugin { public Mono testDatasource(DatasourceConfiguration datasourceConfiguration) { return datasourceCreate(datasourceConfiguration) .map(connection -> { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException e) { - log.warn("Error closing Redshift connection that was made for testing.", e); + if (connection != null) { + connection.close(); } return new DatasourceTestResult(); @@ -619,77 +580,99 @@ public class RedshiftPlugin extends BasePlugin { } @Override - public Mono getStructure(Connection connection, DatasourceConfiguration datasourceConfiguration) { - /* - * 1. If there is any issue with checking connection validity then assume that the connection is stale. - */ - try { - checkConnectionValidity(connection); - } catch (SQLException e) { - return Mono.error(new StaleConnectionException()); - } - + public Mono getStructure(HikariDataSource connectionPool, + DatasourceConfiguration datasourceConfiguration) { final DatasourceStructure structure = new DatasourceStructure(); final Map tablesByName = new LinkedHashMap<>(); final Map keyRegistry = new HashMap<>(); return Mono.fromSupplier(() -> { - // Ref: . - log.debug("Getting Redshift Db structure"); - try (Statement statement = connection.createStatement()) { + Connection connection = null; + try { + connection = getConnectionFromConnectionPool(connectionPool); + } catch (SQLException | StaleConnectionException e) { + e.printStackTrace(); - // Get tables' schema and fill up their columns. - ResultSet columnsResultSet = statement.executeQuery(TABLES_QUERY); - getTablesInfo(columnsResultSet, tablesByName); + /** + * When the user configured time limit for the query execution is over, and the query is still + * queued in the connectionPool then InterruptedException is thrown as the execution thread is + * prepared for termination. This exception is wrapped inside SQLException and hence needs to be + * checked via getCause method. This exception does not indicate a Stale connection. + */ + if (e.getCause() != null && e.getCause().getClass().equals(InterruptedException.class)) { + return Mono.error(e); + } - // Get tables' primary key constraints and fill those up. - ResultSet primaryKeyConstraintsResultSet = statement.executeQuery(KEYS_QUERY_PRIMARY_KEY); - getKeysInfo(primaryKeyConstraintsResultSet, tablesByName, keyRegistry); + // The function can throw either StaleConnectionException or SQLException. The underlying hikari + // library throws SQLException in case the pool is closed or there is an issue initializing + // the connection pool which can also be translated in our world to StaleConnectionException + // and should then trigger the destruction and recreation of the pool. + return Mono.error(new StaleConnectionException()); + } - // Get tables' foreign key constraints and fill those up. - ResultSet foreignKeyConstraintsResultSet = statement.executeQuery(KEYS_QUERY_FOREIGN_KEY); - getKeysInfo(foreignKeyConstraintsResultSet, tablesByName, keyRegistry); + /** + * Keeping this print statement post call to getConnectionFromConnectionPool because it checks for + * stale connection pool. + */ + printConnectionPoolStatus(connectionPool, true); - // Get templates for each table and put those in. - getTemplates(tablesByName); - } catch (SQLException e) { - e.printStackTrace(); - return Mono.error( - new AppsmithPluginException( - AppsmithPluginError.PLUGIN_GET_STRUCTURE_ERROR, - e.getMessage() - ) - ); - } catch (AppsmithPluginException e) { - e.printStackTrace(); - return Mono.error(e); + // Ref: . + log.debug(Thread.currentThread().getName() + ": Getting Redshift Db structure"); + try (Statement statement = connection.createStatement()) { - } finally { - try { - connection.close(); - } catch (SQLException e) { - log.warn("Error closing Redshift Connection", e); - } + // Get tables' schema and fill up their columns. + ResultSet columnsResultSet = statement.executeQuery(TABLES_QUERY); + getTablesInfo(columnsResultSet, tablesByName); - } + // Get tables' primary key constraints and fill those up. + ResultSet primaryKeyConstraintsResultSet = statement.executeQuery(KEYS_QUERY_PRIMARY_KEY); + getKeysInfo(primaryKeyConstraintsResultSet, tablesByName, keyRegistry); - structure.setTables(new ArrayList<>(tablesByName.values())); + // Get tables' foreign key constraints and fill those up. + ResultSet foreignKeyConstraintsResultSet = statement.executeQuery(KEYS_QUERY_FOREIGN_KEY); + getKeysInfo(foreignKeyConstraintsResultSet, tablesByName, keyRegistry); - for (DatasourceStructure.Table table : structure.getTables()) { - table.getKeys().sort(Comparator.naturalOrder()); - } + // Get templates for each table and put those in. + getTemplates(tablesByName); + } catch (SQLException e) { + e.printStackTrace(); + return Mono.error( + new AppsmithPluginException( + AppsmithPluginError.PLUGIN_GET_STRUCTURE_ERROR, + e.getMessage() + ) + ); + } catch (AppsmithPluginException e) { + e.printStackTrace(); + return Mono.error(e); - return structure; - }) - .map(resultStructure -> (DatasourceStructure) resultStructure) - .onErrorMap(e -> { - if (!(e instanceof AppsmithPluginException)) { - return new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage()); - } + } + finally { + try { + connection.close(); + } catch (SQLException e) { + log.warn("Error closing Redshift Connection", e); + } - return e; - }) - .subscribeOn(scheduler); + } + + structure.setTables(new ArrayList<>(tablesByName.values())); + + for (DatasourceStructure.Table table : structure.getTables()) { + table.getKeys().sort(Comparator.naturalOrder()); + } + + return structure; + }) + .map(resultStructure -> (DatasourceStructure) resultStructure) + .onErrorMap(e -> { + if ((e instanceof AppsmithPluginException) || (e instanceof StaleConnectionException)) { + return e; + } + + return new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage()); + }) + .subscribeOn(scheduler); } } } diff --git a/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/utils/RedshiftDatasourceUtils.java b/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/utils/RedshiftDatasourceUtils.java new file mode 100644 index 0000000000..7931ac5c9e --- /dev/null +++ b/app/server/appsmith-plugins/redshiftPlugin/src/main/java/com/external/utils/RedshiftDatasourceUtils.java @@ -0,0 +1,106 @@ +package com.external.utils; + +import com.appsmith.external.exceptions.pluginExceptions.AppsmithPluginError; +import com.appsmith.external.exceptions.pluginExceptions.AppsmithPluginException; +import com.appsmith.external.exceptions.pluginExceptions.StaleConnectionException; +import com.appsmith.external.models.DBAuth; +import com.appsmith.external.models.DatasourceConfiguration; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import com.zaxxer.hikari.pool.HikariPool; +import org.apache.commons.lang.ObjectUtils; +import org.springframework.util.StringUtils; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; +import java.util.stream.Collectors; + +import static com.external.plugins.RedshiftPlugin.JDBC_DRIVER; + +public class RedshiftDatasourceUtils { + + private static final int MINIMUM_POOL_SIZE = 1; + private static final int MAXIMUM_POOL_SIZE = 5; + private static final long LEAK_DETECTION_TIME_MS = 60 * 1000; + private static final String JDBC_PROTOCOL = "jdbc:redshift://"; + + + public static HikariDataSource createConnectionPool(DatasourceConfiguration datasourceConfiguration) throws AppsmithPluginException { + HikariConfig config = new HikariConfig(); + + config.setDriverClassName(JDBC_DRIVER); + config.setMinimumIdle(MINIMUM_POOL_SIZE); + config.setMaximumPoolSize(MAXIMUM_POOL_SIZE); + + // Set authentication properties + DBAuth authentication = (DBAuth) datasourceConfiguration.getAuthentication(); + if (authentication.getUsername() != null) { + config.setUsername(authentication.getUsername()); + } + if (authentication.getPassword() != null) { + config.setPassword(authentication.getPassword()); + } + + // Set up the connection URL + StringBuilder urlBuilder = new StringBuilder(JDBC_PROTOCOL); + + List hosts = datasourceConfiguration + .getEndpoints() + .stream() + .map(endpoint -> endpoint.getHost() + ":" + ObjectUtils.defaultIfNull(endpoint.getPort(), 5439L)) + .collect(Collectors.toList()); + + urlBuilder.append(String.join(",", hosts)).append("/"); + + if (!StringUtils.isEmpty(authentication.getDatabaseName())) { + urlBuilder.append(authentication.getDatabaseName()); + } + + String url = urlBuilder.toString(); + config.setJdbcUrl(url); + + // Configuring leak detection threshold for 60 seconds. Any connection which hasn't been released in 60 seconds + // should get tracked (may be falsely for long running queries) as leaked connection + config.setLeakDetectionThreshold(LEAK_DETECTION_TIME_MS); + config.setConnectionTimeout(60 * 1000); + + // Set read only mode if applicable + com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection(); + switch (configurationConnection.getMode()) { + case READ_WRITE: { + config.setReadOnly(false); + break; + } + case READ_ONLY: { + config.setReadOnly(true); + config.addDataSourceProperty("readOnlyMode", "always"); + break; + } + } + + // Now create the connection pool from the configuration + HikariDataSource datasource = null; + try { + datasource = new HikariDataSource(config); + } catch (HikariPool.PoolInitializationException e) { + throw new AppsmithPluginException( + AppsmithPluginError.PLUGIN_DATASOURCE_ARGUMENT_ERROR, + e.getMessage() + ); + } + + return datasource; + } + + public static Connection getConnectionFromConnectionPool(HikariDataSource connectionPool) throws SQLException { + + if (connectionPool == null || connectionPool.isClosed() || !connectionPool.isRunning()) { + System.out.println(Thread.currentThread().getName() + + ": Encountered stale connection pool in Redshift plugin. Reporting back."); + throw new StaleConnectionException(); + } + + return connectionPool.getConnection(); + } +} diff --git a/app/server/appsmith-plugins/redshiftPlugin/src/main/resources/form.json b/app/server/appsmith-plugins/redshiftPlugin/src/main/resources/form.json index eb58eb62e8..1b10f5bf55 100644 --- a/app/server/appsmith-plugins/redshiftPlugin/src/main/resources/form.json +++ b/app/server/appsmith-plugins/redshiftPlugin/src/main/resources/form.json @@ -76,6 +76,7 @@ { "id": 3, "sectionName": "SSL (optional)", + "hidden": true, "children": [ { "label": "SSL Mode", diff --git a/app/server/appsmith-plugins/redshiftPlugin/src/test/java/com/external/plugins/RedshiftPluginTest.java b/app/server/appsmith-plugins/redshiftPlugin/src/test/java/com/external/plugins/RedshiftPluginTest.java index 9a112c4888..6fde6d8c4e 100644 --- a/app/server/appsmith-plugins/redshiftPlugin/src/test/java/com/external/plugins/RedshiftPluginTest.java +++ b/app/server/appsmith-plugins/redshiftPlugin/src/test/java/com/external/plugins/RedshiftPluginTest.java @@ -9,11 +9,12 @@ import com.appsmith.external.models.DBAuth; import com.appsmith.external.models.DatasourceConfiguration; import com.appsmith.external.models.DatasourceStructure; import com.appsmith.external.models.Endpoint; +import com.appsmith.external.models.Property; import com.appsmith.external.models.RequestParamDTO; -import com.appsmith.external.plugins.PluginExecutor; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.zaxxer.hikari.HikariDataSource; import lombok.extern.slf4j.Slf4j; import org.junit.Assert; import org.junit.BeforeClass; @@ -45,8 +46,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; /** @@ -54,7 +58,7 @@ import static org.mockito.Mockito.when; */ @Slf4j public class RedshiftPluginTest { - PluginExecutor pluginExecutor = new RedshiftPlugin.RedshiftPluginExecutor(); + RedshiftPlugin.RedshiftPluginExecutor pluginExecutor = new RedshiftPlugin.RedshiftPluginExecutor(); private static String address; private static Integer port; @@ -91,7 +95,14 @@ public class RedshiftPluginTest { @Test public void testDatasourceCreateConnectionFailure() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + DBAuth authentication = (DBAuth) dsConfig.getAuthentication(); + authentication.setUsername("user"); + authentication.setPassword("pass"); + authentication.setDatabaseName("dbName"); + dsConfig.setEndpoints(List.of(new Endpoint("host", 1234L))); + dsConfig.setConnection(new com.appsmith.external.models.Connection()); + dsConfig.getConnection().setMode(com.appsmith.external.models.Connection.Mode.READ_ONLY); + Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); StepVerifier.create(dsConnectionMono) .expectErrorMatches(throwable -> @@ -99,7 +110,7 @@ public class RedshiftPluginTest { throwable.getMessage().equals( new AppsmithPluginException( AppsmithPluginError.PLUGIN_DATASOURCE_ARGUMENT_ERROR, - "The connection attempt failed." + "Failed to initialize pool: The connection attempt failed." ) .getMessage() ) @@ -117,9 +128,9 @@ public class RedshiftPluginTest { * a. isClosed(): return true * b. isValid() : return false */ - Connection mockConnection = mock(Connection.class); + HikariDataSource mockConnection = mock(HikariDataSource.class); when(mockConnection.isClosed()).thenReturn(true); - when(mockConnection.isValid(Mockito.anyInt())).thenReturn(false); + when(mockConnection.isRunning()).thenReturn(false); Mono resultMono = pluginExecutor.execute(mockConnection, dsConfig, actionConfiguration); @@ -184,13 +195,18 @@ public class RedshiftPluginTest { */ @Test public void testExecute() throws SQLException { - /* Mock java.sql.Connection: + /* Mock Hikari connection pool object: * a. isClosed() - * b. isValid() + * b. isRunning() */ + HikariDataSource mockConnectionPool = mock(HikariDataSource.class); + when(mockConnectionPool.isClosed()).thenReturn(false); + when(mockConnectionPool.isRunning()).thenReturn(true); + Connection mockConnection = mock(Connection.class); when(mockConnection.isClosed()).thenReturn(false); when(mockConnection.isValid(Mockito.anyInt())).thenReturn(true); + when(mockConnectionPool.getConnection()).thenReturn(mockConnection); /* Mock java.sql.Statement: * a. execute(...) @@ -198,7 +214,7 @@ public class RedshiftPluginTest { */ Statement mockStatement = mock(Statement.class); when(mockConnection.createStatement()).thenReturn(mockStatement); - when(mockStatement.execute(Mockito.any())).thenReturn(true); + when(mockStatement.execute(any())).thenReturn(true); doNothing().when(mockStatement).close(); /* Mock java.sql.ResultSet: @@ -217,7 +233,7 @@ public class RedshiftPluginTest { when(mockResultSet.getDate(Mockito.anyInt())).thenReturn(Date.valueOf("2018-12-31"), Date.valueOf("2018-11-30")); when(mockResultSet.getString(Mockito.anyInt())).thenReturn("18:32:45", "12:05:06+00"); when(mockResultSet.getTime(Mockito.anyInt())).thenReturn(Time.valueOf("20:45:15")); - when(mockResultSet.getObject(Mockito.anyInt(), Mockito.any(Class.class))).thenReturn(OffsetDateTime.parse( + when(mockResultSet.getObject(Mockito.anyInt(), any(Class.class))).thenReturn(OffsetDateTime.parse( "2018-11-30T19:45:15+00")); when(mockResultSet.next()).thenReturn(true).thenReturn(false); doNothing().when(mockResultSet).close(); @@ -238,10 +254,13 @@ public class RedshiftPluginTest { ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT * FROM users WHERE id = 1"); DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = Mono.just(mockConnection); + Mono dsConnectionMono = Mono.just(mockConnectionPool); + + RedshiftPlugin.RedshiftPluginExecutor spyPluginExecutor = spy(new RedshiftPlugin.RedshiftPluginExecutor()); + doNothing().when(spyPluginExecutor).printConnectionPoolStatus(mockConnectionPool, false); Mono executeMono = dsConnectionMono - .flatMap(conn -> pluginExecutor.execute(conn, dsConfig, actionConfiguration)); + .flatMap(connPool -> spyPluginExecutor.execute(connPool, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(result -> { @@ -308,13 +327,24 @@ public class RedshiftPluginTest { */ @Test public void testStructure() throws SQLException { + + /* Mock Hikari connection pool object: + * a. isClosed() + * b. isRunning() + */ + HikariDataSource mockConnectionPool = mock(HikariDataSource.class); + when(mockConnectionPool.isClosed()).thenReturn(false); + when(mockConnectionPool.isRunning()).thenReturn(true); + /* Mock java.sql.Connection: * a. isClosed() * b. isValid() + * Also, return mock connection object from mock connection pool */ Connection mockConnection = mock(Connection.class); when(mockConnection.isClosed()).thenReturn(false); when(mockConnection.isValid(Mockito.anyInt())).thenReturn(true); + when(mockConnectionPool.getConnection()).thenReturn(mockConnection); /* Mock java.sql.Statement: * a. execute(...) @@ -322,7 +352,7 @@ public class RedshiftPluginTest { */ Statement mockStatement = mock(Statement.class); when(mockConnection.createStatement()).thenReturn(mockStatement); - when(mockStatement.execute(Mockito.any())).thenReturn(true); + when(mockStatement.execute(any())).thenReturn(true); doNothing().when(mockStatement).close(); /* Mock java.sql.ResultSet: @@ -367,10 +397,13 @@ public class RedshiftPluginTest { when(mockResultSet.getString("foreign_column")).thenReturn("id"); // KEYS_QUERY_FOREIGN_KEY doNothing().when(mockResultSet).close(); + RedshiftPlugin.RedshiftPluginExecutor spyPluginExecutor = spy(new RedshiftPlugin.RedshiftPluginExecutor()); + doNothing().when(spyPluginExecutor).printConnectionPoolStatus(mockConnectionPool, true); + DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = Mono.just(mockConnection); + Mono dsConnectionMono = Mono.just(mockConnectionPool); Mono structureMono = dsConnectionMono - .flatMap(connection -> pluginExecutor.getStructure(connection, dsConfig)); + .flatMap(connectionPool -> spyPluginExecutor.getStructure(connectionPool, dsConfig)); StepVerifier.create(structureMono) .assertNext(structure -> { @@ -469,6 +502,14 @@ public class RedshiftPluginTest { @Test public void testDuplicateColumnNames() throws SQLException { + /* Mock Hikari connection pool object: + * a. isClosed() + * b. isRunning() + */ + HikariDataSource mockConnectionPool = mock(HikariDataSource.class); + when(mockConnectionPool.isClosed()).thenReturn(false); + when(mockConnectionPool.isRunning()).thenReturn(true); + /* Mock java.sql.Connection: * a. isClosed() * b. isValid() @@ -476,14 +517,11 @@ public class RedshiftPluginTest { Connection mockConnection = mock(Connection.class); when(mockConnection.isClosed()).thenReturn(false); when(mockConnection.isValid(Mockito.anyInt())).thenReturn(true); + when(mockConnectionPool.getConnection()).thenReturn(mockConnection); - /* Mock java.sql.Statement: - * a. execute(...) - * b. close() - */ Statement mockStatement = mock(Statement.class); when(mockConnection.createStatement()).thenReturn(mockStatement); - when(mockStatement.execute(Mockito.any())).thenReturn(true); + when(mockStatement.execute(any())).thenReturn(true); doNothing().when(mockStatement).close(); /* Mock java.sql.ResultSet: @@ -512,10 +550,13 @@ public class RedshiftPluginTest { ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id, id, username, username FROM users WHERE id = 1"); DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = Mono.just(mockConnection); + Mono dsConnectionMono = Mono.just(mockConnectionPool); + + RedshiftPlugin.RedshiftPluginExecutor spyPluginExecutor = spy(new RedshiftPlugin.RedshiftPluginExecutor()); + doNothing().when(spyPluginExecutor).printConnectionPoolStatus(mockConnectionPool, false); Mono executeMono = dsConnectionMono - .flatMap(conn -> pluginExecutor.execute(conn, dsConfig, actionConfiguration)); + .flatMap(connPool -> spyPluginExecutor.execute(connPool, dsConfig, actionConfiguration)); StepVerifier.create(executeMono) .assertNext(result -> {