Adding hikariCP for postgres (#2130)
This commit is contained in:
parent
85d958c2c0
commit
74342f8598
|
|
@ -52,6 +52,18 @@
|
|||
<scope>runtime</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.zaxxer</groupId>
|
||||
<artifactId>HikariCP</artifactId>
|
||||
<version>3.4.5</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- Test Dependencies -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ import com.appsmith.external.pluginExceptions.AppsmithPluginException;
|
|||
import com.appsmith.external.pluginExceptions.StaleConnectionException;
|
||||
import com.appsmith.external.plugins.BasePlugin;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import lombok.NonNull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.apache.commons.lang.ObjectUtils;
|
||||
import org.pf4j.Extension;
|
||||
import org.pf4j.PluginWrapper;
|
||||
|
|
@ -25,7 +25,6 @@ import reactor.core.scheduler.Scheduler;
|
|||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
|
|
@ -39,21 +38,15 @@ import java.util.HashSet;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static com.appsmith.external.models.Connection.Mode.READ_ONLY;
|
||||
|
||||
public class PostgresPlugin extends BasePlugin {
|
||||
|
||||
static final String JDBC_DRIVER = "org.postgresql.Driver";
|
||||
|
||||
private static final String USER = "user";
|
||||
private static final String PASSWORD = "password";
|
||||
private static final String SSL = "ssl";
|
||||
private static final int VALIDITY_CHECK_TIMEOUT = 5;
|
||||
private static final String SSL = "useSSL";
|
||||
|
||||
private static final String DATE_COLUMN_TYPE_NAME = "date";
|
||||
|
||||
|
|
@ -61,13 +54,8 @@ public class PostgresPlugin extends BasePlugin {
|
|||
super(wrapper);
|
||||
}
|
||||
|
||||
/**
|
||||
* Postgres plugin receives the query as json of the following format :
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Extension
|
||||
public static class PostgresPluginExecutor implements PluginExecutor<Connection> {
|
||||
public static class PostgresPluginExecutor implements PluginExecutor<HikariDataSource> {
|
||||
|
||||
private final Scheduler scheduler = Schedulers.elastic();
|
||||
|
||||
|
|
@ -114,25 +102,23 @@ public class PostgresPlugin extends BasePlugin {
|
|||
"order by self_schema, self_table;";
|
||||
|
||||
@Override
|
||||
public Mono<ActionExecutionResult> execute(Connection connection,
|
||||
public Mono<ActionExecutionResult> execute(HikariDataSource connection,
|
||||
DatasourceConfiguration datasourceConfiguration,
|
||||
ActionConfiguration actionConfiguration) {
|
||||
|
||||
return (Mono<ActionExecutionResult>) Mono.fromCallable(() -> {
|
||||
return Mono.fromCallable(() -> {
|
||||
|
||||
Connection connectionFromPool;
|
||||
|
||||
try {
|
||||
if (connection == null || connection.isClosed() || !connection.isValid(VALIDITY_CHECK_TIMEOUT)) {
|
||||
System.out.println("Encountered stale connection in Postgres plugin. Reporting back.");
|
||||
throw new StaleConnectionException();
|
||||
}
|
||||
} catch (SQLException error) {
|
||||
// This exception is thrown only when the timeout to `isValid` is negative. Since, that's not the case,
|
||||
// here, this should never happen.
|
||||
System.out.println("Error checking validity of Postgres connection." + error);
|
||||
connectionFromPool = getConnectionFromConnectionPool(connection);
|
||||
} catch (StaleConnectionException e) {
|
||||
return Mono.error(e);
|
||||
} catch (SQLException e) {
|
||||
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage()));
|
||||
}
|
||||
|
||||
String query = actionConfiguration.getBody();
|
||||
|
||||
if (query == null) {
|
||||
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Missing required parameter: Query."));
|
||||
}
|
||||
|
|
@ -142,7 +128,7 @@ public class PostgresPlugin extends BasePlugin {
|
|||
Statement statement = null;
|
||||
ResultSet resultSet = null;
|
||||
try {
|
||||
statement = connection.createStatement();
|
||||
statement = connectionFromPool.createStatement();
|
||||
boolean isResultSet = statement.execute(query);
|
||||
|
||||
if (isResultSet) {
|
||||
|
|
@ -209,7 +195,8 @@ public class PostgresPlugin extends BasePlugin {
|
|||
try {
|
||||
resultSet.close();
|
||||
} catch (SQLException e) {
|
||||
System.out.println("Error closing Postgres ResultSet" + e.getMessage());
|
||||
System.out.println(Thread.currentThread().getName() +
|
||||
": Execute Error closing Postgres ResultSet" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -217,7 +204,18 @@ public class PostgresPlugin extends BasePlugin {
|
|||
try {
|
||||
statement.close();
|
||||
} catch (SQLException e) {
|
||||
System.out.println("Error closing Postgres Statement" + e.getMessage());
|
||||
System.out.println(Thread.currentThread().getName() +
|
||||
": Execute Error closing Postgres Statement" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (connectionFromPool != null) {
|
||||
try {
|
||||
// Return the connetion back to the pool
|
||||
connectionFromPool.close();
|
||||
} catch (SQLException e) {
|
||||
System.out.println(Thread.currentThread().getName() +
|
||||
": Execute Error returning Postgres connection to pool" + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -230,87 +228,39 @@ public class PostgresPlugin extends BasePlugin {
|
|||
return Mono.just(result);
|
||||
})
|
||||
.flatMap(obj -> obj)
|
||||
.map(obj -> {
|
||||
ActionExecutionResult result = (ActionExecutionResult) obj;
|
||||
return result;
|
||||
})
|
||||
.subscribeOn(scheduler);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Connection> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
|
||||
public Mono<HikariDataSource> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
|
||||
try {
|
||||
Class.forName(JDBC_DRIVER);
|
||||
} catch (ClassNotFoundException e) {
|
||||
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error loading Postgres JDBC Driver class."));
|
||||
}
|
||||
|
||||
String url;
|
||||
AuthenticationDTO authentication = datasourceConfiguration.getAuthentication();
|
||||
|
||||
com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection();
|
||||
|
||||
final boolean isSslEnabled = configurationConnection != null
|
||||
&& configurationConnection.getSsl() != null
|
||||
&& !SSLDetails.AuthType.NO_SSL.equals(configurationConnection.getSsl().getAuthType());
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put(SSL, isSslEnabled);
|
||||
if (authentication.getUsername() != null) {
|
||||
properties.put(USER, authentication.getUsername());
|
||||
}
|
||||
if (authentication.getPassword() != null) {
|
||||
properties.put(PASSWORD, authentication.getPassword());
|
||||
}
|
||||
|
||||
if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) {
|
||||
url = datasourceConfiguration.getUrl();
|
||||
|
||||
} else {
|
||||
StringBuilder urlBuilder = new StringBuilder("jdbc:postgresql://");
|
||||
for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) {
|
||||
urlBuilder
|
||||
.append(endpoint.getHost())
|
||||
.append(':')
|
||||
.append(ObjectUtils.defaultIfNull(endpoint.getPort(), 5432L))
|
||||
.append('/');
|
||||
|
||||
if (!StringUtils.isEmpty(authentication.getDatabaseName())) {
|
||||
urlBuilder.append(authentication.getDatabaseName());
|
||||
}
|
||||
}
|
||||
url = urlBuilder.toString();
|
||||
|
||||
}
|
||||
|
||||
return Mono.fromCallable(() -> {
|
||||
try {
|
||||
System.out.println(Thread.currentThread().getName() + ": Connecting to Postgres db");
|
||||
Connection connection = DriverManager.getConnection(url, properties);
|
||||
connection.setReadOnly(
|
||||
configurationConnection != null && READ_ONLY.equals(configurationConnection.getMode()));
|
||||
return Mono.just(connection);
|
||||
|
||||
} catch (SQLException e) {
|
||||
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error connecting to Postgres.", e));
|
||||
|
||||
}
|
||||
})
|
||||
.flatMap(obj -> obj)
|
||||
.map(conn -> (Connection) conn)
|
||||
return Mono
|
||||
.fromCallable(() -> {
|
||||
System.out.println(Thread.currentThread().getName() + ": Connecting to Postgres db");
|
||||
return createConnectionPool(datasourceConfiguration);
|
||||
})
|
||||
.subscribeOn(scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void datasourceDestroy(Connection connection) {
|
||||
try {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
System.out.println("Error closing Postgres Connection." + e.getMessage());
|
||||
public void datasourceDestroy(HikariDataSource connection) {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> validateDatasource(@NonNull DatasourceConfiguration datasourceConfiguration) {
|
||||
public Set<String> validateDatasource(DatasourceConfiguration datasourceConfiguration) {
|
||||
Set<String> invalids = new HashSet<>();
|
||||
|
||||
if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) {
|
||||
|
|
@ -355,12 +305,8 @@ public class PostgresPlugin extends BasePlugin {
|
|||
public Mono<DatasourceTestResult> testDatasource(DatasourceConfiguration datasourceConfiguration) {
|
||||
return datasourceCreate(datasourceConfiguration)
|
||||
.map(connection -> {
|
||||
try {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
System.out.println(Thread.currentThread().getName() + ": Error closing Postgres connection that was made for testing." + e.getMessage());
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
return new DatasourceTestResult();
|
||||
|
|
@ -369,27 +315,24 @@ public class PostgresPlugin extends BasePlugin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Mono<DatasourceStructure> getStructure(Connection connection, DatasourceConfiguration datasourceConfiguration) {
|
||||
|
||||
try {
|
||||
if (connection == null || connection.isClosed() || !connection.isValid(VALIDITY_CHECK_TIMEOUT)) {
|
||||
System.out.println(Thread.currentThread().getName() + ": Encountered stale connection in Postgres plugin. Reporting back.");
|
||||
throw new StaleConnectionException();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
// This exception is thrown only when the timeout to `isValid` is negative. Since, that's not the case,
|
||||
// here, this should never happen.
|
||||
System.out.println(Thread.currentThread().getName() + ": Error checking validity of Postgres connection." + e.getMessage());
|
||||
}
|
||||
public Mono<DatasourceStructure> getStructure(HikariDataSource connection, DatasourceConfiguration datasourceConfiguration) {
|
||||
|
||||
final DatasourceStructure structure = new DatasourceStructure();
|
||||
final Map<String, DatasourceStructure.Table> tablesByName = new LinkedHashMap<>();
|
||||
|
||||
return Mono.fromSupplier(() -> {
|
||||
|
||||
// Ref: <https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/DatabaseMetaData.html>.
|
||||
Connection connectionFromPool;
|
||||
try {
|
||||
connectionFromPool = getConnectionFromConnectionPool(connection);
|
||||
} catch (StaleConnectionException e) {
|
||||
return Mono.error(e);
|
||||
} catch (SQLException e) {
|
||||
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage()));
|
||||
}
|
||||
|
||||
try (Statement statement = connection.createStatement()) {
|
||||
// Ref: <https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/DatabaseMetaData.html>.
|
||||
try (Statement statement = connectionFromPool.createStatement()) {
|
||||
|
||||
// Get tables and fill up their columns.
|
||||
try (ResultSet columnsResultSet = statement.executeQuery(TABLES_QUERY)) {
|
||||
|
|
@ -514,6 +457,17 @@ public class PostgresPlugin extends BasePlugin {
|
|||
|
||||
} catch (SQLException throwable) {
|
||||
return Mono.error(throwable);
|
||||
} finally {
|
||||
|
||||
if (connectionFromPool != null) {
|
||||
try {
|
||||
// Return the connection back to the pool
|
||||
connectionFromPool.close();
|
||||
} catch (SQLException e) {
|
||||
System.out.println(Thread.currentThread().getName() +
|
||||
": Error returning Postgres connection to pool during get structure" + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
structure.setTables(new ArrayList<>(tablesByName.values()));
|
||||
|
|
@ -528,4 +482,78 @@ public class PostgresPlugin extends BasePlugin {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is blocking in nature which connects to the database and creates a connection pool
|
||||
* @param datasourceConfiguration
|
||||
* @return connection pool
|
||||
*/
|
||||
private static HikariDataSource createConnectionPool(DatasourceConfiguration datasourceConfiguration)
|
||||
{
|
||||
HikariConfig config = new HikariConfig();
|
||||
|
||||
config.setDriverClassName(JDBC_DRIVER);
|
||||
|
||||
// Set SSL property
|
||||
com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection();
|
||||
|
||||
final boolean isSslEnabled = configurationConnection != null
|
||||
&& configurationConnection.getSsl() != null
|
||||
&& !SSLDetails.AuthType.NO_SSL.equals(configurationConnection.getSsl().getAuthType());
|
||||
|
||||
config.addDataSourceProperty(SSL, isSslEnabled);
|
||||
|
||||
// Set authentication properties
|
||||
AuthenticationDTO authentication = datasourceConfiguration.getAuthentication();
|
||||
if (authentication.getUsername() != null) {
|
||||
config.setUsername(authentication.getUsername());
|
||||
}
|
||||
if (authentication.getPassword() != null) {
|
||||
config.setPassword(authentication.getPassword());
|
||||
}
|
||||
|
||||
// Set up the connection URL
|
||||
String url;
|
||||
if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) {
|
||||
url = datasourceConfiguration.getUrl();
|
||||
|
||||
} else {
|
||||
StringBuilder urlBuilder = new StringBuilder("jdbc:postgresql://");
|
||||
for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) {
|
||||
urlBuilder
|
||||
.append(endpoint.getHost())
|
||||
.append(':')
|
||||
.append(ObjectUtils.defaultIfNull(endpoint.getPort(), 5432L))
|
||||
.append('/');
|
||||
|
||||
if (!StringUtils.isEmpty(authentication.getDatabaseName())) {
|
||||
urlBuilder.append(authentication.getDatabaseName());
|
||||
}
|
||||
}
|
||||
url = urlBuilder.toString();
|
||||
}
|
||||
config.setJdbcUrl(url);
|
||||
|
||||
// Now create the connection pool from the configuration
|
||||
HikariDataSource datasource = new HikariDataSource(config);
|
||||
|
||||
return datasource;
|
||||
}
|
||||
|
||||
/**
|
||||
* First checks if the connection pool is still valid. If yes, we fetch a connection from the pool and return
|
||||
* In case a connection is not available in the pool, SQL Exception is thrown
|
||||
* @param connectionPool
|
||||
* @return SQL Connection
|
||||
*/
|
||||
private static Connection getConnectionFromConnectionPool(HikariDataSource connectionPool) throws SQLException {
|
||||
|
||||
if (connectionPool == null || connectionPool.isClosed() || !connectionPool.isRunning()) {
|
||||
System.out.println(Thread.currentThread().getName() +
|
||||
": Encountered stale connection pool in Postgres plugin. Reporting back.");
|
||||
throw new StaleConnectionException();
|
||||
}
|
||||
|
||||
return connectionPool.getConnection();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,10 +6,11 @@ import com.appsmith.external.models.AuthenticationDTO;
|
|||
import com.appsmith.external.models.DatasourceConfiguration;
|
||||
import com.appsmith.external.models.DatasourceStructure;
|
||||
import com.appsmith.external.models.Endpoint;
|
||||
import com.appsmith.external.pluginExceptions.StaleConnectionException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
|
|
@ -37,7 +38,6 @@ import static org.junit.Assert.assertTrue;
|
|||
/**
|
||||
* Unit tests for the PostgresPlugin
|
||||
*/
|
||||
@Slf4j
|
||||
public class PostgresPluginTest {
|
||||
|
||||
PostgresPlugin.PostgresPluginExecutor pluginExecutor = new PostgresPlugin.PostgresPluginExecutor();
|
||||
|
|
@ -159,7 +159,7 @@ public class PostgresPluginTest {
|
|||
|
||||
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
|
||||
|
||||
Mono<Connection> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
Mono<HikariDataSource> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
|
||||
StepVerifier.create(dsConnectionMono)
|
||||
.assertNext(Assert::assertNotNull)
|
||||
|
|
@ -200,7 +200,7 @@ public class PostgresPluginTest {
|
|||
@Test
|
||||
public void testAliasColumnNames() {
|
||||
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
|
||||
Mono<Connection> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
Mono<HikariDataSource> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
|
||||
ActionConfiguration actionConfiguration = new ActionConfiguration();
|
||||
actionConfiguration.setBody("SELECT id as user_id FROM users WHERE id = 1");
|
||||
|
|
@ -227,7 +227,7 @@ public class PostgresPluginTest {
|
|||
@Test
|
||||
public void testExecute() {
|
||||
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
|
||||
Mono<Connection> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
Mono<HikariDataSource> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
|
||||
ActionConfiguration actionConfiguration = new ActionConfiguration();
|
||||
actionConfiguration.setBody("SELECT * FROM users WHERE id = 1");
|
||||
|
|
@ -388,4 +388,23 @@ public class PostgresPluginTest {
|
|||
})
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStaleConnectionCheck() {
|
||||
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
|
||||
|
||||
ActionConfiguration actionConfiguration = new ActionConfiguration();
|
||||
actionConfiguration.setBody("show databases");
|
||||
Mono<HikariDataSource> connectionCreateMono = pluginExecutor.datasourceCreate(dsConfig);
|
||||
|
||||
Mono<ActionExecutionResult> resultMono = connectionCreateMono
|
||||
.flatMap(pool -> {
|
||||
pool.close();
|
||||
return pluginExecutor.execute(pool, dsConfig, actionConfiguration);
|
||||
});
|
||||
|
||||
StepVerifier.create(resultMono)
|
||||
.expectErrorMatches(throwable -> throwable instanceof StaleConnectionException)
|
||||
.verify();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,8 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static com.appsmith.server.acl.AclPermission.EXECUTE_DATASOURCES;
|
||||
|
|
@ -38,7 +38,7 @@ public class DatasourceContextServiceImpl implements DatasourceContextService {
|
|||
this.pluginService = pluginService;
|
||||
this.pluginExecutorHelper = pluginExecutorHelper;
|
||||
this.encryptionService = encryptionService;
|
||||
this.datasourceContextMap = new HashMap<>();
|
||||
this.datasourceContextMap = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user