fix: query failure on simultaneous execution of multiple queries (#15458)

* update driver
remove connection closure when not required

* add connection pool
remove ssl options

* got working with postgres driver

* use Redshift driver instead of postgres

* updated JUnit TC
added comments
minor refactor

* add comment
cleanup

* update default port
This commit is contained in:
Sumit Kumar 2022-07-28 17:01:17 +05:30 committed by GitHub
parent bae0b75583
commit 02f1451443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 405 additions and 262 deletions

View File

@ -1344,6 +1344,7 @@ public class PostgresPluginTest {
.verifyComplete();
}
@Test
public void testReadOnlyMode() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
dsConfig.getConnection().setMode(com.appsmith.external.models.Connection.Mode.READ_ONLY);

View File

@ -39,9 +39,21 @@
<dependency>
<groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId>
<version>2.1.0.1</version>
<version>2.1.0.9</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.4.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- ******************* Test Dependencies ******************* -->
@ -51,7 +63,6 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -12,9 +12,10 @@ import com.appsmith.external.models.DatasourceStructure;
import com.appsmith.external.models.DatasourceTestResult;
import com.appsmith.external.models.Endpoint;
import com.appsmith.external.models.RequestParamDTO;
import com.appsmith.external.models.SSLDetails;
import com.appsmith.external.plugins.BasePlugin;
import com.appsmith.external.plugins.PluginExecutor;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.ObjectUtils;
@ -27,7 +28,6 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@ -42,23 +42,18 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static com.appsmith.external.constants.ActionConstants.ACTION_CONFIGURATION_BODY;
import static com.appsmith.external.helpers.PluginUtils.getColumnsListForJdbcPlugin;
import static com.appsmith.external.helpers.PluginUtils.getIdenticalColumns;
import static com.appsmith.external.models.Connection.Mode.READ_ONLY;
import static com.external.utils.RedshiftDatasourceUtils.createConnectionPool;
import static com.external.utils.RedshiftDatasourceUtils.getConnectionFromConnectionPool;
@Slf4j
public class RedshiftPlugin extends BasePlugin {
static final String JDBC_DRIVER = "com.amazon.redshift.jdbc.Driver";
private static final String JDBC_PROTOCOL = "jdbc:redshift://";
private static final String USER = "user";
private static final String PASSWORD = "password";
private static final String SSL = "ssl";
private static final int VALIDITY_CHECK_TIMEOUT = 5; /* must be positive, otherwise may receive exception */
public static final String JDBC_DRIVER = "com.amazon.redshift.jdbc.Driver";
private static final String DATE_COLUMN_TYPE_NAME = "date";
public RedshiftPlugin(PluginWrapper wrapper) {
@ -66,7 +61,7 @@ public class RedshiftPlugin extends BasePlugin {
}
@Extension
public static class RedshiftPluginExecutor implements PluginExecutor<Connection> {
public static class RedshiftPluginExecutor implements PluginExecutor<HikariDataSource> {
private final Scheduler scheduler = Schedulers.elastic();
@ -197,19 +192,8 @@ public class RedshiftPlugin extends BasePlugin {
return row;
}
/*
* 1. This method can throw SQLException via connection.isClosed() or connection.isValid(...)
* 2. StaleConnectionException thrown by this method needs to be propagated to upper layers so that a retry
* can be triggered.
*/
private void checkConnectionValidity(Connection connection) throws SQLException {
if (connection == null || connection.isClosed()) {
throw new StaleConnectionException();
}
}
@Override
public Mono<ActionExecutionResult> execute(Connection connection,
public Mono<ActionExecutionResult> execute(HikariDataSource connectionPool,
DatasourceConfiguration datasourceConfiguration,
ActionConfiguration actionConfiguration) {
@ -227,15 +211,36 @@ public class RedshiftPlugin extends BasePlugin {
}
return Mono.fromCallable(() -> {
/*
* 1. If there is any issue with checking connection validity then assume that the connection is stale.
*/
Connection connection = null;
try {
checkConnectionValidity(connection);
} catch (SQLException e) {
connection = getConnectionFromConnectionPool(connectionPool);
} catch (SQLException | StaleConnectionException e) {
e.printStackTrace();
/**
* When the user configured time limit for the query execution is over, and the query is still
* queued in the connectionPool then InterruptedException is thrown as the execution thread is
* prepared for termination. This exception is wrapped inside SQLException and hence needs to be
* checked via getCause method. This exception does not indicate a Stale connection.
*/
if (e.getCause() != null && e.getCause().getClass().equals(InterruptedException.class)) {
return Mono.error(e);
}
// The function can throw either StaleConnectionException or SQLException. The underlying hikari
// library throws SQLException in case the pool is closed or there is an issue initializing
// the connection pool which can also be translated in our world to StaleConnectionException
// and should then trigger the destruction and recreation of the pool.
return Mono.error(new StaleConnectionException());
}
/**
* Keeping this print statement post call to getConnectionFromConnectionPool because it checks for
* stale connection pool.
*/
printConnectionPoolStatus(connectionPool, false);
List<Map<String, Object>> rowsList = new ArrayList<>(50);
final List<String> columnsList = new ArrayList<>();
Statement statement = null;
@ -327,6 +332,20 @@ public class RedshiftPlugin extends BasePlugin {
.subscribeOn(scheduler);
}
/**
 * Logs the current state of the Hikari connection pool (active / idle / awaiting / total
 * connections) to help debug failures that occur when multiple queries execute simultaneously.
 *
 * @param connectionPool      the pool whose stats are logged; callers invoke this only after
 *                            getConnectionFromConnectionPool has verified the pool is not stale
 * @param isFetchingStructure true when called from the structure-fetch path, false from the
 *                            query-execution path — only changes the log message prefix
 */
public void printConnectionPoolStatus(HikariDataSource connectionPool, boolean isFetchingStructure) {
    // HikariPoolMXBean exposes live pool statistics via JMX accessors.
    HikariPoolMXBean poolProxy = connectionPool.getHikariPoolMXBean();
    int idleConnections = poolProxy.getIdleConnections();
    int activeConnections = poolProxy.getActiveConnections();
    int totalConnections = poolProxy.getTotalConnections();
    int threadsAwaitingConnection = poolProxy.getThreadsAwaitingConnection();
    // Fix: a ": " separator was missing after the thread name, producing logs like
    // "pool-1-thread-1Before executing Redshift query." — other log lines in this plugin
    // use "Thread.currentThread().getName() + \": \"", so match that convention here.
    log.debug(Thread.currentThread().getName() + ": " + (isFetchingStructure ? "Before fetching Redshift db" +
            " structure." : "Before executing Redshift query.") + " Hikari Pool stats : " +
            " active - " + activeConnections +
            ", idle - " + idleConnections +
            ", awaiting - " + threadsAwaitingConnection +
            ", total - " + totalConnections);
}
private Set<String> populateHintMessages(List<String> columnNames) {
Set<String> messages = new HashSet<>();
@ -342,80 +361,26 @@ public class RedshiftPlugin extends BasePlugin {
}
@Override
public Mono<Connection> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
public Mono<HikariDataSource> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
try {
Class.forName(JDBC_DRIVER);
} catch (ClassNotFoundException e) {
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error loading Redshift JDBC Driver class."));
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Error loading " +
"Redshift JDBC Driver class."));
}
String url;
DBAuth authentication = (DBAuth) datasourceConfiguration.getAuthentication();
com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection();
final boolean isSslEnabled = configurationConnection != null
&& configurationConnection.getSsl() != null
&& !SSLDetails.AuthType.NO_SSL.equals(configurationConnection.getSsl().getAuthType());
Properties properties = new Properties();
properties.put(SSL, isSslEnabled);
if (authentication.getUsername() != null) {
properties.put(USER, authentication.getUsername());
}
if (authentication.getPassword() != null) {
properties.put(PASSWORD, authentication.getPassword());
}
if (CollectionUtils.isEmpty(datasourceConfiguration.getEndpoints())) {
url = datasourceConfiguration.getUrl();
} else {
StringBuilder urlBuilder = new StringBuilder(JDBC_PROTOCOL);
for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) {
urlBuilder
.append(endpoint.getHost())
.append(':')
.append(ObjectUtils.defaultIfNull(endpoint.getPort(), 5439L))
.append('/');
if (!StringUtils.isEmpty(authentication.getDatabaseName())) {
urlBuilder.append(authentication.getDatabaseName());
}
}
url = urlBuilder.toString();
}
return Mono.fromCallable(() -> {
try {
log.debug("Connecting to Redshift db");
Connection connection = DriverManager.getConnection(url, properties);
connection.setReadOnly(
configurationConnection != null && READ_ONLY.equals(configurationConnection.getMode()));
return Mono.just(connection);
} catch (SQLException e) {
e.printStackTrace();
return Mono.error(
new AppsmithPluginException(
AppsmithPluginError.PLUGIN_DATASOURCE_ARGUMENT_ERROR,
e.getMessage()
)
);
}
return Mono
.fromCallable(() -> {
log.debug(Thread.currentThread().getName() + ": Connecting to Redshift db");
return createConnectionPool(datasourceConfiguration);
})
.flatMap(obj -> obj)
.map(conn -> (Connection) conn)
.subscribeOn(scheduler);
}
@Override
public void datasourceDestroy(Connection connection) {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
log.error("Error closing Redshift Connection.", e);
public void datasourceDestroy(HikariDataSource connectionPool) {
if (connectionPool != null) {
connectionPool.close();
}
}
@ -465,13 +430,9 @@ public class RedshiftPlugin extends BasePlugin {
public Mono<DatasourceTestResult> testDatasource(DatasourceConfiguration datasourceConfiguration) {
return datasourceCreate(datasourceConfiguration)
.map(connection -> {
try {
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
log.warn("Error closing Redshift connection that was made for testing.", e);
}
return new DatasourceTestResult();
})
@ -619,23 +580,44 @@ public class RedshiftPlugin extends BasePlugin {
}
@Override
public Mono<DatasourceStructure> getStructure(Connection connection, DatasourceConfiguration datasourceConfiguration) {
/*
* 1. If there is any issue with checking connection validity then assume that the connection is stale.
*/
try {
checkConnectionValidity(connection);
} catch (SQLException e) {
return Mono.error(new StaleConnectionException());
}
public Mono<DatasourceStructure> getStructure(HikariDataSource connectionPool,
DatasourceConfiguration datasourceConfiguration) {
final DatasourceStructure structure = new DatasourceStructure();
final Map<String, DatasourceStructure.Table> tablesByName = new LinkedHashMap<>();
final Map<String, DatasourceStructure.Key> keyRegistry = new HashMap<>();
return Mono.fromSupplier(() -> {
Connection connection = null;
try {
connection = getConnectionFromConnectionPool(connectionPool);
} catch (SQLException | StaleConnectionException e) {
e.printStackTrace();
/**
* When the user configured time limit for the query execution is over, and the query is still
* queued in the connectionPool then InterruptedException is thrown as the execution thread is
* prepared for termination. This exception is wrapped inside SQLException and hence needs to be
* checked via getCause method. This exception does not indicate a Stale connection.
*/
if (e.getCause() != null && e.getCause().getClass().equals(InterruptedException.class)) {
return Mono.error(e);
}
// The function can throw either StaleConnectionException or SQLException. The underlying hikari
// library throws SQLException in case the pool is closed or there is an issue initializing
// the connection pool which can also be translated in our world to StaleConnectionException
// and should then trigger the destruction and recreation of the pool.
return Mono.error(new StaleConnectionException());
}
/**
* Keeping this print statement post call to getConnectionFromConnectionPool because it checks for
* stale connection pool.
*/
printConnectionPoolStatus(connectionPool, true);
// Ref: <https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/DatabaseMetaData.html>.
log.debug("Getting Redshift Db structure");
log.debug(Thread.currentThread().getName() + ": Getting Redshift Db structure");
try (Statement statement = connection.createStatement()) {
// Get tables' schema and fill up their columns.
@ -664,7 +646,8 @@ public class RedshiftPlugin extends BasePlugin {
e.printStackTrace();
return Mono.error(e);
} finally {
}
finally {
try {
connection.close();
} catch (SQLException e) {
@ -683,11 +666,11 @@ public class RedshiftPlugin extends BasePlugin {
})
.map(resultStructure -> (DatasourceStructure) resultStructure)
.onErrorMap(e -> {
if (!(e instanceof AppsmithPluginException)) {
return new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage());
if ((e instanceof AppsmithPluginException) || (e instanceof StaleConnectionException)) {
return e;
}
return e;
return new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e.getMessage());
})
.subscribeOn(scheduler);
}

View File

@ -0,0 +1,106 @@
package com.external.utils;
import com.appsmith.external.exceptions.pluginExceptions.AppsmithPluginError;
import com.appsmith.external.exceptions.pluginExceptions.AppsmithPluginException;
import com.appsmith.external.exceptions.pluginExceptions.StaleConnectionException;
import com.appsmith.external.models.DBAuth;
import com.appsmith.external.models.DatasourceConfiguration;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.pool.HikariPool;
import org.apache.commons.lang.ObjectUtils;
import org.springframework.util.StringUtils;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import static com.external.plugins.RedshiftPlugin.JDBC_DRIVER;
/**
 * Utility methods for creating and using a Hikari connection pool for the Redshift plugin.
 * Introduced so that simultaneous query executions each get their own pooled connection
 * instead of sharing (and racing on) a single java.sql.Connection.
 */
public class RedshiftDatasourceUtils {
    private static final int MINIMUM_POOL_SIZE = 1;
    private static final int MAXIMUM_POOL_SIZE = 5;
    // Connections held longer than this are reported by Hikari as potential leaks
    // (may falsely flag legitimately long-running queries).
    private static final long LEAK_DETECTION_TIME_MS = 60 * 1000;
    // Maximum time a caller waits for a free connection before getConnection throws SQLException.
    private static final long CONNECTION_TIMEOUT_MS = 60 * 1000;
    // Used when an endpoint does not specify a port explicitly.
    private static final long DEFAULT_REDSHIFT_PORT = 5439L;
    private static final String JDBC_PROTOCOL = "jdbc:redshift://";

    /**
     * Builds a Hikari connection pool for the Redshift datasource described by the given
     * configuration (credentials, endpoints / URL, read-only mode).
     *
     * @param datasourceConfiguration datasource settings; authentication is expected to be DBAuth
     * @return an initialized pool; callers own it and must close it in datasourceDestroy
     * @throws AppsmithPluginException when Hikari fails to initialize the pool (e.g. the
     *         connection attempt to the configured host fails)
     */
    public static HikariDataSource createConnectionPool(DatasourceConfiguration datasourceConfiguration) throws AppsmithPluginException {
        HikariConfig config = new HikariConfig();
        config.setDriverClassName(JDBC_DRIVER);
        config.setMinimumIdle(MINIMUM_POOL_SIZE);
        config.setMaximumPoolSize(MAXIMUM_POOL_SIZE);

        // Set authentication properties.
        DBAuth authentication = (DBAuth) datasourceConfiguration.getAuthentication();
        if (authentication.getUsername() != null) {
            config.setUsername(authentication.getUsername());
        }
        if (authentication.getPassword() != null) {
            config.setPassword(authentication.getPassword());
        }

        // Set up the connection URL. Fall back to the user-supplied URL when no endpoints are
        // configured — the DriverManager-based implementation this replaces did the same, and
        // streaming over a null endpoint list would throw an NPE here.
        String url;
        if (datasourceConfiguration.getEndpoints() == null || datasourceConfiguration.getEndpoints().isEmpty()) {
            url = datasourceConfiguration.getUrl();
        } else {
            StringBuilder urlBuilder = new StringBuilder(JDBC_PROTOCOL);
            List<String> hosts = datasourceConfiguration
                    .getEndpoints()
                    .stream()
                    .map(endpoint -> endpoint.getHost() + ":" + ObjectUtils.defaultIfNull(endpoint.getPort(), DEFAULT_REDSHIFT_PORT))
                    .collect(Collectors.toList());
            urlBuilder.append(String.join(",", hosts)).append("/");
            if (!StringUtils.isEmpty(authentication.getDatabaseName())) {
                urlBuilder.append(authentication.getDatabaseName());
            }
            url = urlBuilder.toString();
        }
        config.setJdbcUrl(url);

        // Configuring leak detection threshold for 60 seconds. Any connection which hasn't been
        // released in 60 seconds gets tracked (possibly falsely, for long-running queries) as a
        // leaked connection.
        config.setLeakDetectionThreshold(LEAK_DETECTION_TIME_MS);
        config.setConnectionTimeout(CONNECTION_TIMEOUT_MS);

        // Set read-only mode if applicable. Guard against a missing connection section or mode:
        // the previous implementation tolerated a null connection config, and a raw switch on
        // getMode() would throw an NPE for such datasources.
        com.appsmith.external.models.Connection configurationConnection = datasourceConfiguration.getConnection();
        if (configurationConnection != null && configurationConnection.getMode() != null) {
            switch (configurationConnection.getMode()) {
                case READ_WRITE: {
                    config.setReadOnly(false);
                    break;
                }
                case READ_ONLY: {
                    config.setReadOnly(true);
                    // Redshift JDBC driver property that enforces read-only behaviour.
                    config.addDataSourceProperty("readOnlyMode", "always");
                    break;
                }
            }
        }

        // Now create the connection pool from the configuration.
        try {
            return new HikariDataSource(config);
        } catch (HikariPool.PoolInitializationException e) {
            // Surface the underlying driver message (e.g. "The connection attempt failed.")
            // as a datasource-argument error so the UI shows an actionable message.
            throw new AppsmithPluginException(
                    AppsmithPluginError.PLUGIN_DATASOURCE_ARGUMENT_ERROR,
                    e.getMessage()
            );
        }
    }

    /**
     * Fetches a connection from the pool after verifying that the pool is still alive.
     *
     * @param connectionPool pool previously created via createConnectionPool; may be stale
     * @return a live connection; the caller is responsible for closing it (returns it to the pool)
     * @throws StaleConnectionException when the pool is null, closed or not running — signals the
     *         upper layers to destroy and recreate the pool, then retry
     * @throws SQLException propagated from HikariDataSource#getConnection, e.g. on timeout while
     *         waiting for a free connection
     */
    public static Connection getConnectionFromConnectionPool(HikariDataSource connectionPool) throws SQLException {
        if (connectionPool == null || connectionPool.isClosed() || !connectionPool.isRunning()) {
            System.out.println(Thread.currentThread().getName() +
                    ": Encountered stale connection pool in Redshift plugin. Reporting back.");
            throw new StaleConnectionException();
        }
        return connectionPool.getConnection();
    }
}

View File

@ -76,6 +76,7 @@
{
"id": 3,
"sectionName": "SSL (optional)",
"hidden": true,
"children": [
{
"label": "SSL Mode",

View File

@ -9,11 +9,12 @@ import com.appsmith.external.models.DBAuth;
import com.appsmith.external.models.DatasourceConfiguration;
import com.appsmith.external.models.DatasourceStructure;
import com.appsmith.external.models.Endpoint;
import com.appsmith.external.models.Property;
import com.appsmith.external.models.RequestParamDTO;
import com.appsmith.external.plugins.PluginExecutor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -45,8 +46,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
@ -54,7 +58,7 @@ import static org.mockito.Mockito.when;
*/
@Slf4j
public class RedshiftPluginTest {
PluginExecutor pluginExecutor = new RedshiftPlugin.RedshiftPluginExecutor();
RedshiftPlugin.RedshiftPluginExecutor pluginExecutor = new RedshiftPlugin.RedshiftPluginExecutor();
private static String address;
private static Integer port;
@ -91,7 +95,14 @@ public class RedshiftPluginTest {
@Test
public void testDatasourceCreateConnectionFailure() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<Connection> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
DBAuth authentication = (DBAuth) dsConfig.getAuthentication();
authentication.setUsername("user");
authentication.setPassword("pass");
authentication.setDatabaseName("dbName");
dsConfig.setEndpoints(List.of(new Endpoint("host", 1234L)));
dsConfig.setConnection(new com.appsmith.external.models.Connection());
dsConfig.getConnection().setMode(com.appsmith.external.models.Connection.Mode.READ_ONLY);
Mono<HikariDataSource> dsConnectionMono = pluginExecutor.datasourceCreate(dsConfig);
StepVerifier.create(dsConnectionMono)
.expectErrorMatches(throwable ->
@ -99,7 +110,7 @@ public class RedshiftPluginTest {
throwable.getMessage().equals(
new AppsmithPluginException(
AppsmithPluginError.PLUGIN_DATASOURCE_ARGUMENT_ERROR,
"The connection attempt failed."
"Failed to initialize pool: The connection attempt failed."
)
.getMessage()
)
@ -117,9 +128,9 @@ public class RedshiftPluginTest {
* a. isClosed(): return true
* b. isValid() : return false
*/
Connection mockConnection = mock(Connection.class);
HikariDataSource mockConnection = mock(HikariDataSource.class);
when(mockConnection.isClosed()).thenReturn(true);
when(mockConnection.isValid(Mockito.anyInt())).thenReturn(false);
when(mockConnection.isRunning()).thenReturn(false);
Mono<ActionExecutionResult> resultMono = pluginExecutor.execute(mockConnection, dsConfig, actionConfiguration);
@ -184,13 +195,18 @@ public class RedshiftPluginTest {
*/
@Test
public void testExecute() throws SQLException {
/* Mock java.sql.Connection:
/* Mock Hikari connection pool object:
* a. isClosed()
* b. isValid()
* b. isRunning()
*/
HikariDataSource mockConnectionPool = mock(HikariDataSource.class);
when(mockConnectionPool.isClosed()).thenReturn(false);
when(mockConnectionPool.isRunning()).thenReturn(true);
Connection mockConnection = mock(Connection.class);
when(mockConnection.isClosed()).thenReturn(false);
when(mockConnection.isValid(Mockito.anyInt())).thenReturn(true);
when(mockConnectionPool.getConnection()).thenReturn(mockConnection);
/* Mock java.sql.Statement:
* a. execute(...)
@ -198,7 +214,7 @@ public class RedshiftPluginTest {
*/
Statement mockStatement = mock(Statement.class);
when(mockConnection.createStatement()).thenReturn(mockStatement);
when(mockStatement.execute(Mockito.any())).thenReturn(true);
when(mockStatement.execute(any())).thenReturn(true);
doNothing().when(mockStatement).close();
/* Mock java.sql.ResultSet:
@ -217,7 +233,7 @@ public class RedshiftPluginTest {
when(mockResultSet.getDate(Mockito.anyInt())).thenReturn(Date.valueOf("2018-12-31"), Date.valueOf("2018-11-30"));
when(mockResultSet.getString(Mockito.anyInt())).thenReturn("18:32:45", "12:05:06+00");
when(mockResultSet.getTime(Mockito.anyInt())).thenReturn(Time.valueOf("20:45:15"));
when(mockResultSet.getObject(Mockito.anyInt(), Mockito.any(Class.class))).thenReturn(OffsetDateTime.parse(
when(mockResultSet.getObject(Mockito.anyInt(), any(Class.class))).thenReturn(OffsetDateTime.parse(
"2018-11-30T19:45:15+00"));
when(mockResultSet.next()).thenReturn(true).thenReturn(false);
doNothing().when(mockResultSet).close();
@ -238,10 +254,13 @@ public class RedshiftPluginTest {
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT * FROM users WHERE id = 1");
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<Connection> dsConnectionMono = Mono.just(mockConnection);
Mono<HikariDataSource> dsConnectionMono = Mono.just(mockConnectionPool);
RedshiftPlugin.RedshiftPluginExecutor spyPluginExecutor = spy(new RedshiftPlugin.RedshiftPluginExecutor());
doNothing().when(spyPluginExecutor).printConnectionPoolStatus(mockConnectionPool, false);
Mono<ActionExecutionResult> executeMono = dsConnectionMono
.flatMap(conn -> pluginExecutor.execute(conn, dsConfig, actionConfiguration));
.flatMap(connPool -> spyPluginExecutor.execute(connPool, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(result -> {
@ -308,13 +327,24 @@ public class RedshiftPluginTest {
*/
@Test
public void testStructure() throws SQLException {
/* Mock Hikari connection pool object:
* a. isClosed()
* b. isRunning()
*/
HikariDataSource mockConnectionPool = mock(HikariDataSource.class);
when(mockConnectionPool.isClosed()).thenReturn(false);
when(mockConnectionPool.isRunning()).thenReturn(true);
/* Mock java.sql.Connection:
* a. isClosed()
* b. isValid()
* Also, return mock connection object from mock connection pool
*/
Connection mockConnection = mock(Connection.class);
when(mockConnection.isClosed()).thenReturn(false);
when(mockConnection.isValid(Mockito.anyInt())).thenReturn(true);
when(mockConnectionPool.getConnection()).thenReturn(mockConnection);
/* Mock java.sql.Statement:
* a. execute(...)
@ -322,7 +352,7 @@ public class RedshiftPluginTest {
*/
Statement mockStatement = mock(Statement.class);
when(mockConnection.createStatement()).thenReturn(mockStatement);
when(mockStatement.execute(Mockito.any())).thenReturn(true);
when(mockStatement.execute(any())).thenReturn(true);
doNothing().when(mockStatement).close();
/* Mock java.sql.ResultSet:
@ -367,10 +397,13 @@ public class RedshiftPluginTest {
when(mockResultSet.getString("foreign_column")).thenReturn("id"); // KEYS_QUERY_FOREIGN_KEY
doNothing().when(mockResultSet).close();
RedshiftPlugin.RedshiftPluginExecutor spyPluginExecutor = spy(new RedshiftPlugin.RedshiftPluginExecutor());
doNothing().when(spyPluginExecutor).printConnectionPoolStatus(mockConnectionPool, true);
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<Connection> dsConnectionMono = Mono.just(mockConnection);
Mono<HikariDataSource> dsConnectionMono = Mono.just(mockConnectionPool);
Mono<DatasourceStructure> structureMono = dsConnectionMono
.flatMap(connection -> pluginExecutor.getStructure(connection, dsConfig));
.flatMap(connectionPool -> spyPluginExecutor.getStructure(connectionPool, dsConfig));
StepVerifier.create(structureMono)
.assertNext(structure -> {
@ -469,6 +502,14 @@ public class RedshiftPluginTest {
@Test
public void testDuplicateColumnNames() throws SQLException {
/* Mock Hikari connection pool object:
* a. isClosed()
* b. isRunning()
*/
HikariDataSource mockConnectionPool = mock(HikariDataSource.class);
when(mockConnectionPool.isClosed()).thenReturn(false);
when(mockConnectionPool.isRunning()).thenReturn(true);
/* Mock java.sql.Connection:
* a. isClosed()
* b. isValid()
@ -476,14 +517,11 @@ public class RedshiftPluginTest {
Connection mockConnection = mock(Connection.class);
when(mockConnection.isClosed()).thenReturn(false);
when(mockConnection.isValid(Mockito.anyInt())).thenReturn(true);
when(mockConnectionPool.getConnection()).thenReturn(mockConnection);
/* Mock java.sql.Statement:
* a. execute(...)
* b. close()
*/
Statement mockStatement = mock(Statement.class);
when(mockConnection.createStatement()).thenReturn(mockStatement);
when(mockStatement.execute(Mockito.any())).thenReturn(true);
when(mockStatement.execute(any())).thenReturn(true);
doNothing().when(mockStatement).close();
/* Mock java.sql.ResultSet:
@ -512,10 +550,13 @@ public class RedshiftPluginTest {
ActionConfiguration actionConfiguration = new ActionConfiguration();
actionConfiguration.setBody("SELECT id, id, username, username FROM users WHERE id = 1");
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
Mono<Connection> dsConnectionMono = Mono.just(mockConnection);
Mono<HikariDataSource> dsConnectionMono = Mono.just(mockConnectionPool);
RedshiftPlugin.RedshiftPluginExecutor spyPluginExecutor = spy(new RedshiftPlugin.RedshiftPluginExecutor());
doNothing().when(spyPluginExecutor).printConnectionPoolStatus(mockConnectionPool, false);
Mono<ActionExecutionResult> executeMono = dsConnectionMono
.flatMap(conn -> pluginExecutor.execute(conn, dsConfig, actionConfiguration));
.flatMap(connPool -> spyPluginExecutor.execute(connPool, dsConfig, actionConfiguration));
StepVerifier.create(executeMono)
.assertNext(result -> {