Add support for reactive streams with mongodb. (#1720)

Add support for reactive streams with mongodb. (fixes 1480)

1. Replace mongodb driver with reactive mongodb driver. Change APIs accordingly.
2. Use webflux + reactor framework to execute mongodb queries in event loop model.
3. Add test to test MongoPluginExecutor class' method "testDatasource".
This commit is contained in:
Sumit Kumar 2020-11-18 10:58:50 +05:30 committed by GitHub
parent fc3197b78f
commit 2c2aa06e32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 319 additions and 297 deletions

View File

@ -44,11 +44,6 @@
<version>1.18.8</version> <version>1.18.8</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.11.1</version>
</dependency>
<!-- Test Dependencies --> <!-- Test Dependencies -->
<dependency> <dependency>

View File

@ -14,12 +14,16 @@ import com.appsmith.external.pluginExceptions.AppsmithPluginException;
import com.appsmith.external.pluginExceptions.StaleConnectionException; import com.appsmith.external.pluginExceptions.StaleConnectionException;
import com.appsmith.external.plugins.BasePlugin; import com.appsmith.external.plugins.BasePlugin;
import com.appsmith.external.plugins.PluginExecutor; import com.appsmith.external.plugins.PluginExecutor;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoCommandException; import com.mongodb.MongoCommandException;
import com.mongodb.MongoTimeoutException; import com.mongodb.MongoTimeoutException;
import com.mongodb.client.ClientSession; import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoDatabase; import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.Success;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.bson.Document; import org.bson.Document;
import org.bson.conversions.Bson; import org.bson.conversions.Bson;
@ -32,6 +36,7 @@ import org.pf4j.PluginWrapper;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
@ -96,17 +101,15 @@ public class MongoPlugin extends BasePlugin {
throw new StaleConnectionException(); throw new StaleConnectionException();
} }
MongoDatabase database = mongoClient.getDatabase(getDatabaseName(datasourceConfiguration));
Bson command = Document.parse(actionConfiguration.getBody());
Mono<Document> mongoOutputMono = Mono.from(database.runCommand(command));
ActionExecutionResult result = new ActionExecutionResult(); ActionExecutionResult result = new ActionExecutionResult();
MongoDatabase database = mongoClient.getDatabase(getDatabaseName(datasourceConfiguration)); return mongoOutputMono
.flatMap(mongoOutput -> {
Bson command = Document.parse(actionConfiguration.getBody());
try { try {
Document mongoOutput = database.runCommand(command);
JSONObject outputJson = new JSONObject(mongoOutput.toJson()); JSONObject outputJson = new JSONObject(mongoOutput.toJson());
//The output json contains the key "ok". This is the status of the command //The output json contains the key "ok". This is the status of the command
BigInteger status = outputJson.getBigInteger("ok"); BigInteger status = outputJson.getBigInteger("ok");
JSONArray headerArray = new JSONArray(); JSONArray headerArray = new JSONArray();
@ -114,33 +117,42 @@ public class MongoPlugin extends BasePlugin {
if (BigInteger.ONE.equals(status)) { if (BigInteger.ONE.equals(status)) {
result.setIsExecutionSuccess(true); result.setIsExecutionSuccess(true);
// For the `findAndModify` command, we don't get the count of modifications made. Instead, we either /**
// get the modified new value or the pre-modified old value (depending on the `new` field in the * For the `findAndModify` command, we don't get the count of modifications made. Instead,
// command. Let's return that value to the user. * we either get the modified new value or the pre-modified old value (depending on the
* `new` field in the command. Let's return that value to the user.
*/
if (outputJson.has(VALUE_STR)) { if (outputJson.has(VALUE_STR)) {
result.setBody(objectMapper.readTree( result.setBody(objectMapper.readTree(
cleanUp(new JSONObject().put(VALUE_STR, outputJson.get(VALUE_STR))).toString() cleanUp(new JSONObject().put(VALUE_STR, outputJson.get(VALUE_STR))).toString()
)); ));
} }
//The json contains key "cursor" when find command was issued and there are 1 or more results. In case /**
//there are no results for find, this key is not present in the result json. * The json contains key "cursor" when find command was issued and there are 1 or more
* results. In case there are no results for find, this key is not present in the result json.
*/
if (outputJson.has("cursor")) { if (outputJson.has("cursor")) {
JSONArray outputResult = (JSONArray) cleanUp( JSONArray outputResult = (JSONArray) cleanUp(
outputJson.getJSONObject("cursor").getJSONArray("firstBatch")); outputJson.getJSONObject("cursor").getJSONArray("firstBatch"));
result.setBody(objectMapper.readTree(outputResult.toString())); result.setBody(objectMapper.readTree(outputResult.toString()));
} }
//The json contains key "n" when insert/update command is issued. "n" for update signifies the no of /**
//documents selected for update. "n" in case of insert signifies the number of documents inserted. * The json contains key "n" when insert/update command is issued. "n" for update
* signifies the no of documents selected for update. "n" in case of insert signifies the
* number of documents inserted.
*/
if (outputJson.has("n")) { if (outputJson.has("n")) {
JSONObject body = new JSONObject().put("n", outputJson.getBigInteger("n")); JSONObject body = new JSONObject().put("n", outputJson.getBigInteger("n"));
result.setBody(body); result.setBody(body);
headerArray.put(body); headerArray.put(body);
} }
//The json key contains key "nModified" in case of update command. This signifies the no of /**
//documents updated. * The json key contains key "nModified" in case of update command. This signifies the no of
* documents updated.
*/
if (outputJson.has(N_MODIFIED)) { if (outputJson.has(N_MODIFIED)) {
JSONObject body = new JSONObject().put(N_MODIFIED, outputJson.getBigInteger(N_MODIFIED)); JSONObject body = new JSONObject().put(N_MODIFIED, outputJson.getBigInteger(N_MODIFIED));
result.setBody(body); result.setBody(body);
@ -161,6 +173,7 @@ public class MongoPlugin extends BasePlugin {
} }
return Mono.just(result); return Mono.just(result);
});
} }
private String getDatabaseName(DatasourceConfiguration datasourceConfiguration) { private String getDatabaseName(DatasourceConfiguration datasourceConfiguration) {
@ -178,18 +191,20 @@ public class MongoPlugin extends BasePlugin {
@Override @Override
public Mono<MongoClient> datasourceCreate(DatasourceConfiguration datasourceConfiguration) { public Mono<MongoClient> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
// TODO: ReadOnly seems to be not supported at the driver level. The recommendation is to connect with a /**
// user that doesn't have write permissions on the database. * TODO: ReadOnly seems to be not supported at the driver level. The recommendation is to connect with a
// Ref: https://api.mongodb.com/java/2.13/com/mongodb/DB.html#setReadOnly-java.lang.Boolean- * user that doesn't have write permissions on the database.
* Ref: https://api.mongodb.com/java/2.13/com/mongodb/DB.html#setReadOnly-java.lang.Boolean-
*/
try { try {
return Mono.just(new MongoClient(buildClientURI(datasourceConfiguration))); return Mono.just(MongoClients.create(buildClientURI(datasourceConfiguration)));
} catch (Exception e) { } catch (Exception e) {
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e)); return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, e));
} }
} }
public static MongoClientURI buildClientURI(DatasourceConfiguration datasourceConfiguration) { public static String buildClientURI(DatasourceConfiguration datasourceConfiguration) {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
final Connection connection = datasourceConfiguration.getConnection(); final Connection connection = datasourceConfiguration.getConnection();
@ -252,9 +267,7 @@ public class MongoPlugin extends BasePlugin {
builder.deleteCharAt(builder.length() - 1); builder.deleteCharAt(builder.length() - 1);
} }
final String uri = builder.toString(); return builder.toString();
log.info("MongoPlugin URI: `{}`.", uri);
return new MongoClientURI(uri);
} }
@Override @Override
@ -313,56 +326,22 @@ public class MongoPlugin extends BasePlugin {
@Override @Override
public Mono<DatasourceTestResult> testDatasource(DatasourceConfiguration datasourceConfiguration) { public Mono<DatasourceTestResult> testDatasource(DatasourceConfiguration datasourceConfiguration) {
final Connection.Type connectionType = datasourceConfiguration.getConnection().getType();
return datasourceCreate(datasourceConfiguration) return datasourceCreate(datasourceConfiguration)
.map(mongoClient -> { .flatMap(mongoClient -> {
ClientSession clientSession = null; return Mono.zip(Mono.just(mongoClient),
Mono.from(mongoClient.getDatabase("admin").runCommand(new Document(
"listDatabases", 1))));
})
.doOnSuccess(tuple -> {
MongoClient mongoClient = tuple.getT1();
try { if(mongoClient != null) {
// Not using try-with-resources here since we want to close the *session* before closing the
// MongoClient instance.
if (Connection.Type.REPLICA_SET.equals(connectionType)) {
// For REPLICA_SET connections, we check by creating a session, as this is faster.
clientSession = mongoClient.startSession();
} else {
// For DIRECT connections, we check by running a DB command, as it's the only reliable
// method of checking if the connection is usable.
mongoClient
.getDatabase("admin")
.runCommand(new Document("listDatabases", 1));
return new DatasourceTestResult();
}
} catch (MongoTimeoutException e) {
log.warn("Timeout connecting to MongoDB from MongoPlugin.", e);
return new DatasourceTestResult("Timed out trying to connect to MongoDB host.");
} catch (MongoCommandException e) {
// The fact that we got a response saying "Unauthorized" means that the connection to the
// MongoDB instance is valid. It also means we don't have access to the admin database, but
// that's okay for our purposes here.
return "Unauthorized".equals(e.getErrorCodeName())
? new DatasourceTestResult()
: new DatasourceTestResult(e.getMessage());
} catch (Exception e) {
return new DatasourceTestResult(e.getMessage());
} finally {
if (clientSession != null) {
clientSession.close();
}
if (mongoClient != null) {
mongoClient.close(); mongoClient.close();
} }
}
return new DatasourceTestResult();
}) })
.onErrorResume(error -> Mono.just(new DatasourceTestResult(error.getMessage()))); .then(Mono.just(new DatasourceTestResult()))
.onErrorResume(error -> {
return Mono.just(new DatasourceTestResult(error.getMessage()));});
} }
@Override @Override
@ -370,12 +349,10 @@ public class MongoPlugin extends BasePlugin {
final DatasourceStructure structure = new DatasourceStructure(); final DatasourceStructure structure = new DatasourceStructure();
List<DatasourceStructure.Table> tables = new ArrayList<>(); List<DatasourceStructure.Table> tables = new ArrayList<>();
structure.setTables(tables); structure.setTables(tables);
final MongoDatabase database = mongoClient.getDatabase(getDatabaseName(datasourceConfiguration)); final MongoDatabase database = mongoClient.getDatabase(getDatabaseName(datasourceConfiguration));
for (Document collection : database.listCollections()) { return Flux.from(database.listCollectionNames()).
final String collectionName = collection.getString("name"); flatMap(collectionName -> {
final ArrayList<DatasourceStructure.Column> columns = new ArrayList<>(); final ArrayList<DatasourceStructure.Column> columns = new ArrayList<>();
final ArrayList<DatasourceStructure.Template> templates = new ArrayList<>(); final ArrayList<DatasourceStructure.Template> templates = new ArrayList<>();
tables.add(new DatasourceStructure.Table( tables.add(new DatasourceStructure.Table(
@ -386,16 +363,38 @@ public class MongoPlugin extends BasePlugin {
templates templates
)); ));
final Document first = database.getCollection(collectionName).find().limit(1).first(); return Mono.zip(
if (first == null) { Mono.just(columns),
continue; Mono.just(templates),
Mono.just(collectionName),
Mono.from(database.getCollection(collectionName).find().limit(1).first())
);
}).
flatMap(tuple -> {
final ArrayList<DatasourceStructure.Column> columns = tuple.getT1();
final ArrayList<DatasourceStructure.Template> templates = tuple.getT2();
String collectionName = tuple.getT3();
Document document = tuple.getT4();
generateTemplatesAndStructureForACollection(collectionName, document, columns, templates);
return Mono.just(structure);
}).
collectList().
flatMap(documentList -> {
return Mono.just(structure);
});
} }
private static void generateTemplatesAndStructureForACollection(String collectionName,
Document document,
ArrayList<DatasourceStructure.Column> columns,
ArrayList<DatasourceStructure.Template> templates) {
String filterFieldName = null; String filterFieldName = null;
String filterFieldValue = null; String filterFieldValue = null;
Map<String, String> sampleInsertValues = new LinkedHashMap<>(); Map<String, String> sampleInsertValues = new LinkedHashMap<>();
for (Map.Entry<String, Object> entry : first.entrySet()) { for (Map.Entry<String, Object> entry : document.entrySet()) {
final String name = entry.getKey(); final String name = entry.getKey();
final Object value = entry.getValue(); final Object value = entry.getValue();
String type; String type;
@ -525,11 +524,6 @@ public class MongoPlugin extends BasePlugin {
) )
); );
} }
tables.sort(Comparator.comparing(DatasourceStructure.Table::getName));
return Mono.just(structure);
}
} }
private static String urlEncode(String text) { private static String urlEncode(String text) {

View File

@ -9,14 +9,24 @@ import com.appsmith.external.models.Endpoint;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.Success;
import org.reactivestreams.Publisher;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
import org.bson.Document; import org.bson.Document;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.GenericContainer;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier; import reactor.test.StepVerifier;
import java.math.BigDecimal; import java.math.BigDecimal;
@ -25,14 +35,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/** /**
* Unit tests for MongoPlugin * Unit tests for MongoPlugin
*/ */
public class MongoPluginTest { public class MongoPluginTest {
MongoPlugin.MongoPluginExecutor pluginExecutor = new MongoPlugin.MongoPluginExecutor(); MongoPlugin.MongoPluginExecutor pluginExecutor = new MongoPlugin.MongoPluginExecutor();
@ -51,11 +59,15 @@ public class MongoPluginTest {
public static void setUp() { public static void setUp() {
address = mongoContainer.getContainerIpAddress(); address = mongoContainer.getContainerIpAddress();
port = mongoContainer.getFirstMappedPort(); port = mongoContainer.getFirstMappedPort();
String uri = "mongodb://" + address + ":" + Integer.toString(port);
final MongoClient mongoClient = MongoClients.create(uri);
final MongoClient mongoClient = new MongoClient(address, port); Flux.from(mongoClient.getDatabase("test").listCollectionNames()).collectList().
if (!mongoClient.getDatabase("test").listCollectionNames().iterator().hasNext()) { flatMap(collectionNamesList -> {
final MongoCollection<Document> usersCollection = mongoClient.getDatabase("test").getCollection("users"); final MongoCollection<Document> usersCollection = mongoClient.getDatabase("test").getCollection(
usersCollection.insertMany(List.of( "users");
if(collectionNamesList.size() == 0) {
Mono.from(usersCollection.insertMany(List.of(
new Document(Map.of( new Document(Map.of(
"name", "Cierra Vega", "name", "Cierra Vega",
"gender", "F", "gender", "F",
@ -66,8 +78,11 @@ public class MongoPluginTest {
)), )),
new Document(Map.of("name", "Alden Cantrell", "gender", "M", "age", 30)), new Document(Map.of("name", "Alden Cantrell", "gender", "M", "age", 30)),
new Document(Map.of("name", "Kierra Gentry", "gender", "F", "age", 40)) new Document(Map.of("name", "Kierra Gentry", "gender", "F", "age", 40))
)); ))).block();
} }
return Mono.just(usersCollection);
}).block();
} }
private DatasourceConfiguration createDatasourceConfiguration() { private DatasourceConfiguration createDatasourceConfiguration() {
@ -103,6 +118,25 @@ public class MongoPluginTest {
.verifyComplete(); .verifyComplete();
} }
/**
* 1. Test "testDatasource" method in MongoPluginExecutor class.
*/
@Test
public void testDatasourceFail() {
System.out.println(mongoContainer.getContainerIpAddress());
System.out.println(mongoContainer.getFirstMappedPort());
DatasourceConfiguration dsConfig = createDatasourceConfiguration();
dsConfig.getEndpoints().get(0).setHost("badHost");
System.out.println(dsConfig);
StepVerifier.create(pluginExecutor.testDatasource(dsConfig))
.assertNext(datasourceTestResult -> {
assertNotNull(datasourceTestResult);
assertFalse(datasourceTestResult.isSuccess());
})
.verifyComplete();
}
@Test @Test
public void testExecuteReadQuery() { public void testExecuteReadQuery() {
DatasourceConfiguration dsConfig = createDatasourceConfiguration(); DatasourceConfiguration dsConfig = createDatasourceConfiguration();
@ -311,5 +345,4 @@ public class MongoPluginTest {
}) })
.verifyComplete(); .verifyComplete();
} }
} }