diff --git a/app/server/appsmith-plugins/postgresPlugin/pom.xml b/app/server/appsmith-plugins/postgresPlugin/pom.xml index a1ec8a2f3b..0a4fbe5ecb 100644 --- a/app/server/appsmith-plugins/postgresPlugin/pom.xml +++ b/app/server/appsmith-plugins/postgresPlugin/pom.xml @@ -52,6 +52,18 @@ runtime + + com.zaxxer + HikariCP + 3.4.5 + + + org.slf4j + slf4j-api + + + + junit diff --git a/app/server/appsmith-plugins/postgresPlugin/src/main/java/com/external/plugins/PostgresPlugin.java b/app/server/appsmith-plugins/postgresPlugin/src/main/java/com/external/plugins/PostgresPlugin.java index 0e6f2c820c..4833e8fb41 100644 --- a/app/server/appsmith-plugins/postgresPlugin/src/main/java/com/external/plugins/PostgresPlugin.java +++ b/app/server/appsmith-plugins/postgresPlugin/src/main/java/com/external/plugins/PostgresPlugin.java @@ -13,8 +13,8 @@ import com.appsmith.external.pluginExceptions.AppsmithPluginException; import com.appsmith.external.pluginExceptions.StaleConnectionException; import com.appsmith.external.plugins.BasePlugin; import com.appsmith.external.plugins.PluginExecutor; -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; import org.apache.commons.lang.ObjectUtils; import org.pf4j.Extension; import org.pf4j.PluginWrapper; @@ -25,7 +25,6 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; @@ -39,21 +38,15 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static com.appsmith.external.models.Connection.Mode.READ_ONLY; - public class PostgresPlugin extends BasePlugin { static final String JDBC_DRIVER = "org.postgresql.Driver"; - private static final String USER = "user"; - private static final String PASSWORD = "password"; - private static final String SSL = "ssl"; - private static final int VALIDITY_CHECK_TIMEOUT = 5; + private static final String SSL = "useSSL"; private static final String DATE_COLUMN_TYPE_NAME = "date"; @@ -61,13 +54,8 @@ public class PostgresPlugin extends BasePlugin { super(wrapper); } - /** - * Postgres plugin receives the query as json of the following format : - */ - - @Slf4j @Extension - public static class PostgresPluginExecutor implements PluginExecutor { + public static class PostgresPluginExecutor implements PluginExecutor { private final Scheduler scheduler = Schedulers.elastic(); @@ -114,25 +102,23 @@ public class PostgresPlugin extends BasePlugin { "order by self_schema, self_table;"; @Override - public Mono execute(Connection connection, + public Mono execute(HikariDataSource connection, DatasourceConfiguration datasourceConfiguration, ActionConfiguration actionConfiguration) { - return (Mono) Mono.fromCallable(() -> { + return Mono.fromCallable(() -> { + + Connection connectionFromPool; try { - if (connection == null || connection.isClosed() || !connection.isValid(VALIDITY_CHECK_TIMEOUT)) { - System.out.println("Encountered stale connection in Postgres plugin. Reporting back."); - throw new StaleConnectionException(); - } - } catch (SQLException error) { - // This exception is thrown only when the timeout to `isValid` is negative. Since, that's not the case, - // here, this should never happen. - System.out.println("Error checking validity of Postgres connection." + error); + connectionFromPool = getConnectionFromConnectionPool(connection); + } catch (StaleConnectionException e) { + return Mono.error(e); + } catch (SQLException e) { + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage())); } String query = actionConfiguration.getBody(); - if (query == null) { return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Missing required parameter: Query.")); } @@ -142,7 +128,7 @@ public class PostgresPlugin extends BasePlugin { Statement statement = null; ResultSet resultSet = null; try { - statement = connection.createStatement(); + statement = connectionFromPool.createStatement(); boolean isResultSet = statement.execute(query); if (isResultSet) { @@ -209,7 +195,8 @@ public class PostgresPlugin extends BasePlugin { try { resultSet.close(); } catch (SQLException e) { - System.out.println("Error closing Postgres ResultSet" + e.getMessage()); + System.out.println(Thread.currentThread().getName() + + ": Execute Error closing Postgres ResultSet" + e.getMessage()); } } @@ -217,7 +204,18 @@ public class PostgresPlugin extends BasePlugin { try { statement.close(); } catch (SQLException e) { - System.out.println("Error closing Postgres Statement" + e.getMessage()); + System.out.println(Thread.currentThread().getName() + + ": Execute Error closing Postgres Statement" + e.getMessage()); + } + } + + if (connectionFromPool != null) { + try { + // Return the connetion back to the pool + connectionFromPool.close(); + } catch (SQLException e) { + System.out.println(Thread.currentThread().getName() + + ": Execute Error returning Postgres connection to pool" + e.getMessage()); } } @@ -230,87 +228,39 @@ public class PostgresPlugin extends BasePlugin { return Mono.just(result); }) .flatMap(obj -> obj) + .map(obj -> { + ActionExecutionResult result = (ActionExecutionResult) obj; + return result; + }) .subscribeOn(scheduler); } @Override - public Mono datasourceCreate(DatasourceConfiguration datasourceConfiguration) { + public Mono datasourceCreate(DatasourceConfiguration datasourceConfiguration) { try { Class.forName(JDBC_DRIVER); } catch (ClassNotFoundException e) { return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error loading Postgres JDBC Driver class.")); } - String url; - AuthenticationDTO authentication = datasourceConfiguration.getAuthentication(); - - com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection(); - - final boolean isSslEnabled = configurationConnection != null - && configurationConnection.getSsl() != null - && !SSLDetails.AuthType.NO_SSL.equals(configurationConnection.getSsl().getAuthType()); - - Properties properties = new Properties(); - properties.put(SSL, isSslEnabled); - if (authentication.getUsername() != null) { - properties.put(USER, authentication.getUsername()); - } - if (authentication.getPassword() != null) { - properties.put(PASSWORD, authentication.getPassword()); - } - - if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) { - url = datasourceConfiguration.getUrl(); - - } else { - StringBuilder urlBuilder = new StringBuilder("jdbc:postgresql://"); - for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) { - urlBuilder - .append(endpoint.getHost()) - .append(':') - .append(ObjectUtils.defaultIfNull(endpoint.getPort(), 5432L)) - .append('/'); - - if (!StringUtils.isEmpty(authentication.getDatabaseName())) { - urlBuilder.append(authentication.getDatabaseName()); - } - } - url = urlBuilder.toString(); - - } - - return Mono.fromCallable(() -> { - try { - System.out.println(Thread.currentThread().getName() + ": Connecting to Postgres db"); - Connection connection = DriverManager.getConnection(url, properties); - connection.setReadOnly( - configurationConnection != null && READ_ONLY.equals(configurationConnection.getMode())); - return Mono.just(connection); - - } catch (SQLException e) { - return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error connecting to Postgres.", e)); - - } - }) - .flatMap(obj -> obj) - .map(conn -> (Connection) conn) + return Mono + .fromCallable(() -> { + System.out.println(Thread.currentThread().getName() + ": Connecting to Postgres db"); + return createConnectionPool(datasourceConfiguration); + }) .subscribeOn(scheduler); } @Override - public void datasourceDestroy(Connection connection) { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException e) { - System.out.println("Error closing Postgres Connection." + e.getMessage()); + public void datasourceDestroy(HikariDataSource connection) { + if (connection != null) { + connection.close(); } } @Override - public Set validateDatasource(@NonNull DatasourceConfiguration datasourceConfiguration) { + public Set validateDatasource(DatasourceConfiguration datasourceConfiguration) { Set invalids = new HashSet<>(); if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) { @@ -355,12 +305,8 @@ public class PostgresPlugin extends BasePlugin { public Mono testDatasource(DatasourceConfiguration datasourceConfiguration) { return datasourceCreate(datasourceConfiguration) .map(connection -> { - try { - if (connection != null) { - connection.close(); - } - } catch (SQLException e) { - System.out.println(Thread.currentThread().getName() + ": Error closing Postgres connection that was made for testing." + e.getMessage()); + if (connection != null) { + connection.close(); } return new DatasourceTestResult(); @@ -369,27 +315,24 @@ public class PostgresPlugin extends BasePlugin { } @Override - public Mono getStructure(Connection connection, DatasourceConfiguration datasourceConfiguration) { - - try { - if (connection == null || connection.isClosed() || !connection.isValid(VALIDITY_CHECK_TIMEOUT)) { - System.out.println(Thread.currentThread().getName() + ": Encountered stale connection in Postgres plugin. Reporting back."); - throw new StaleConnectionException(); - } - } catch (SQLException e) { - // This exception is thrown only when the timeout to `isValid` is negative. Since, that's not the case, - // here, this should never happen. - System.out.println(Thread.currentThread().getName() + ": Error checking validity of Postgres connection." + e.getMessage()); - } + public Mono getStructure(HikariDataSource connection, DatasourceConfiguration datasourceConfiguration) { final DatasourceStructure structure = new DatasourceStructure(); final Map tablesByName = new LinkedHashMap<>(); return Mono.fromSupplier(() -> { - // Ref: . + Connection connectionFromPool; + try { + connectionFromPool = getConnectionFromConnectionPool(connection); + } catch (StaleConnectionException e) { + return Mono.error(e); + } catch (SQLException e) { + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage())); + } - try (Statement statement = connection.createStatement()) { + // Ref: . + try (Statement statement = connectionFromPool.createStatement()) { // Get tables and fill up their columns. try (ResultSet columnsResultSet = statement.executeQuery(TABLES_QUERY)) { @@ -514,6 +457,17 @@ public class PostgresPlugin extends BasePlugin { } catch (SQLException throwable) { return Mono.error(throwable); + } finally { + + if (connectionFromPool != null) { + try { + // Return the connection back to the pool + connectionFromPool.close(); + } catch (SQLException e) { + System.out.println(Thread.currentThread().getName() + + ": Error returning Postgres connection to pool during get structure" + e.getMessage()); + } + } } structure.setTables(new ArrayList<>(tablesByName.values())); @@ -528,4 +482,78 @@ public class PostgresPlugin extends BasePlugin { } } + /** + * This function is blocking in nature which connects to the database and creates a connection pool + * @param datasourceConfiguration + * @return connection pool + */ + private static HikariDataSource createConnectionPool(DatasourceConfiguration datasourceConfiguration) + { + HikariConfig config = new HikariConfig(); + + config.setDriverClassName(JDBC_DRIVER); + + // Set SSL property + com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection(); + + final boolean isSslEnabled = configurationConnection != null + && configurationConnection.getSsl() != null + && !SSLDetails.AuthType.NO_SSL.equals(configurationConnection.getSsl().getAuthType()); + + config.addDataSourceProperty(SSL, isSslEnabled); + + // Set authentication properties + AuthenticationDTO authentication = datasourceConfiguration.getAuthentication(); + if (authentication.getUsername() != null) { + config.setUsername(authentication.getUsername()); + } + if (authentication.getPassword() != null) { + config.setPassword(authentication.getPassword()); + } + + // Set up the connection URL + String url; + if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) { + url = datasourceConfiguration.getUrl(); + + } else { + StringBuilder urlBuilder = new StringBuilder("jdbc:postgresql://"); + for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) { + urlBuilder + .append(endpoint.getHost()) + .append(':') + .append(ObjectUtils.defaultIfNull(endpoint.getPort(), 5432L)) + .append('/'); + + if (!StringUtils.isEmpty(authentication.getDatabaseName())) { + urlBuilder.append(authentication.getDatabaseName()); + } + } + url = urlBuilder.toString(); + } + config.setJdbcUrl(url); + + // Now create the connection pool from the configuration + HikariDataSource datasource = new HikariDataSource(config); + + return datasource; + } + + /** + * First checks if the connection pool is still valid. If yes, we fetch a connection from the pool and return + * In case a connection is not available in the pool, SQL Exception is thrown + * @param connectionPool + * @return SQL Connection + */ + private static Connection getConnectionFromConnectionPool(HikariDataSource connectionPool) throws SQLException { + + if (connectionPool == null || connectionPool.isClosed() || !connectionPool.isRunning()) { + System.out.println(Thread.currentThread().getName() + + ": Encountered stale connection pool in Postgres plugin. Reporting back."); + throw new StaleConnectionException(); + } + + return connectionPool.getConnection(); + } + } diff --git a/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java b/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java index 17fd1c187b..e9ec5ecfdd 100644 --- a/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java +++ b/app/server/appsmith-plugins/postgresPlugin/src/test/java/com/external/plugins/PostgresPluginTest.java @@ -6,10 +6,11 @@ import com.appsmith.external.models.AuthenticationDTO; import com.appsmith.external.models.DatasourceConfiguration; import com.appsmith.external.models.DatasourceStructure; import com.appsmith.external.models.Endpoint; +import com.appsmith.external.pluginExceptions.StaleConnectionException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; -import lombok.extern.slf4j.Slf4j; +import com.zaxxer.hikari.HikariDataSource; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -37,7 +38,6 @@ import static org.junit.Assert.assertTrue; /** * Unit tests for the PostgresPlugin */ -@Slf4j public class PostgresPluginTest { PostgresPlugin.PostgresPluginExecutor pluginExecutor = new PostgresPlugin.PostgresPluginExecutor(); @@ -159,7 +159,7 @@ public class PostgresPluginTest { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); StepVerifier.create(dsConnectionMono) .assertNext(Assert::assertNotNull) @@ -200,7 +200,7 @@ public class PostgresPluginTest { @Test public void testAliasColumnNames() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT id as user_id FROM users WHERE id = 1"); @@ -227,7 +227,7 @@ public class PostgresPluginTest { @Test public void testExecute() { DatasourceConfiguration dsConfig = createDatasourceConfiguration(); - Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); + Mono dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig); ActionConfiguration actionConfiguration = new ActionConfiguration(); actionConfiguration.setBody("SELECT * FROM users WHERE id = 1"); @@ -388,4 +388,23 @@ public class PostgresPluginTest { }) .verifyComplete(); } + + @Test + public void testStaleConnectionCheck() { + DatasourceConfiguration dsConfig = createDatasourceConfiguration(); + + ActionConfiguration actionConfiguration = new ActionConfiguration(); + actionConfiguration.setBody("show databases"); + Mono connectionCreateMono = pluginExecutor.datasourceCreate(dsConfig); + + Mono resultMono = connectionCreateMono + .flatMap(pool -> { + pool.close(); + return pluginExecutor.execute(pool, dsConfig, actionConfiguration); + }); + + StepVerifier.create(resultMono) + .expectErrorMatches(throwable -> throwable instanceof StaleConnectionException) + .verify(); + } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/DatasourceContextServiceImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/DatasourceContextServiceImpl.java index 566d56bd53..dd22618a22 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/DatasourceContextServiceImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/DatasourceContextServiceImpl.java @@ -12,8 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import static com.appsmith.server.acl.AclPermission.EXECUTE_DATASOURCES; @@ -38,7 +38,7 @@ public class DatasourceContextServiceImpl implements DatasourceContextService { this.pluginService = pluginService; this.pluginExecutorHelper = pluginExecutorHelper; this.encryptionService = encryptionService; - this.datasourceContextMap = new HashMap<>(); + this.datasourceContextMap = new ConcurrentHashMap<>(); } @Override