Multi data source support for plugins using Resource Context

This commit is contained in:
Trisha Anand 2019-10-11 10:32:18 +00:00
parent 9d01cecbd5
commit e1d59ca3a6
7 changed files with 258 additions and 74 deletions

View File

@ -11,5 +11,27 @@ import java.util.List;
public interface PluginExecutor extends ExtensionPoint {
Mono<ActionExecutionResult> execute(ResourceConfiguration resourceConfiguration, ActionConfiguration action, List<Param> params);
/**
* This function is used to execute the action.
* @param connection : This is the connection that is established to the data source. This connection is according
* to the parameters in Resource Configuration
* @param resourceConfiguration : These are the configurations which have been used to create a Resource from a Plugin
* @param actionConfiguration : These are the configurations which have been used to create an Action from a Resource.
* @return ActionExecutionResult : This object is returned to the user which contains the result values from the execution.
*/
Mono<ActionExecutionResult> execute(Object connection, ResourceConfiguration resourceConfiguration, ActionConfiguration actionConfiguration);
/**
* This function is responsible for creating the connection to the data source and returning the connection variable
* on success. For executing actions, this connection object would be passed for each function call.
* @param resourceConfiguration
* @return Connection object
*/
Object resourceCreate(ResourceConfiguration resourceConfiguration);
/**
* This function is used to bring down/destroy the connection to the data source.
* @param connection
*/
void resourceDestroy(Object connection);
}

View File

@ -6,7 +6,10 @@ import com.appsmith.external.models.Param;
import com.appsmith.external.models.ResourceConfiguration;
import com.appsmith.external.plugins.BasePlugin;
import com.appsmith.external.plugins.PluginExecutor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONObject;
import org.pf4j.Extension;
import org.pf4j.PluginException;
import org.pf4j.PluginWrapper;
@ -25,48 +28,14 @@ import java.util.List;
@Slf4j
public class PostgresPlugin extends BasePlugin {
private static ObjectMapper objectMapper;
static String JDBC_DRIVER = "org.postgresql.Driver";
static String DB_URL = "jdbc:postgresql://localhost/mobtools";
// Database credentials
static String DB_USER = "root";
static String DB_PASS = "root";
static Connection conn = null;
public PostgresPlugin(PluginWrapper wrapper) {
super(wrapper);
}
@Override
public void start() throws PluginException {
log.debug("Going to initialize the PostgresDBPlugin");
try {
// Load the class into JVM
Class.forName(JDBC_DRIVER);
log.debug("Got the jdbc url as {}", DB_URL);
// Create the connection
conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS);
} catch (ClassNotFoundException e) {
log.error("", e);
} catch (SQLException e) {
log.error("", e);
}
}
@Override
public void stop() throws PluginException {
log.debug("PostgresPlugin.stop()");
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
log.error("", e);
throw new PluginException(e);
}
this.objectMapper = new ObjectMapper();
}
@Slf4j
@ -74,12 +43,11 @@ public class PostgresPlugin extends BasePlugin {
public static class PostgresPluginExecutor implements PluginExecutor {
@Override
public Mono<ActionExecutionResult> execute(ResourceConfiguration resourceConfiguration,
ActionConfiguration actionConfiguration,
List<Param> params) {
public Mono<ActionExecutionResult> execute(Object connection,
ResourceConfiguration resourceConfiguration,
ActionConfiguration actionConfiguration) {
log.debug("In the PostgresPlugin execute with resourceConfiguration: {}, ActionConfig: {}",
resourceConfiguration, actionConfiguration);
Connection conn = (Connection) connection;
Assert.notNull(conn);
ArrayList list = new ArrayList(50);
@ -99,13 +67,49 @@ public class PostgresPlugin extends BasePlugin {
} catch (SQLException e) {
log.error("", e);
}
//Return list because list is the actual result. ActionExecutionResult is just a stop gap measure
list.forEach(System.out::println);
ActionExecutionResult result = new ActionExecutionResult();
result.setBody(objectMapper.valueToTree(list));
return Mono.just(result);
}
@Override
public Object resourceCreate(ResourceConfiguration resourceConfiguration) {
Connection conn = null;
try {
// Load the class into JVM
Class.forName(JDBC_DRIVER);
// Create the connection
conn = DriverManager.getConnection(resourceConfiguration.getUrl(),
resourceConfiguration.getAuthentication().getUsername(),
resourceConfiguration.getAuthentication().getPassword());
return conn;
} catch (ClassNotFoundException e) {
log.error("", e);
} catch (SQLException e) {
log.error("", e);
}
return conn;
}
@Override
public void resourceDestroy(Object connection) {
Connection conn = (Connection) connection;
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
log.error("", e);
try {
throw new PluginException(e);
} catch (PluginException ex) {
ex.printStackTrace();
}
}
}
}
}

View File

@ -39,19 +39,17 @@ public class RestApiPlugin extends BasePlugin {
public static class RestApiPluginExecutor implements PluginExecutor {
@Override
public Mono<ActionExecutionResult> execute(ResourceConfiguration resourceConfiguration,
ActionConfiguration actionConfiguration,
List<Param> params) {
public Mono<ActionExecutionResult> execute(Object connection,
ResourceConfiguration resourceConfiguration,
ActionConfiguration actionConfiguration) {
Map<String, Object> requestBody = actionConfiguration.getBody();
if (requestBody == null) {
requestBody = (Map<String, Object>) new HashMap<String, Object>();
}
Map<String, Param> propertyMap = params.stream()
.collect(Collectors.toMap(Param::getKey, param -> param));
String path = (actionConfiguration.getPath() == null) ? "" : actionConfiguration.getPath();
String url = resourceConfiguration.getUrl() + path;
HttpMethod httpMethod = actionConfiguration.getHttpMethod();
if (httpMethod == null) {
return Mono.error(new Exception("HttpMethod must not be null"));
@ -62,6 +60,7 @@ public class RestApiPlugin extends BasePlugin {
if (resourceConfiguration.getHeaders() != null) {
List<Property> headers = resourceConfiguration.getHeaders();
for (Property header : headers) {
webClientBuilder.defaultHeader(header.getKey(), header.getValue());
}
}
@ -90,9 +89,13 @@ public class RestApiPlugin extends BasePlugin {
ActionExecutionResult result = new ActionExecutionResult();
result.setStatusCode(statusCode.toString());
try {
result.setBody(objectMapper.readTree(body));
String headerInJsonString = objectMapper.writeValueAsString(headers);
result.setHeaders(objectMapper.readTree(headerInJsonString));
if (body!=null) {
result.setBody(objectMapper.readTree(body));
}
if (headers != null) {
String headerInJsonString = objectMapper.writeValueAsString(headers);
result.setHeaders(objectMapper.readTree(headerInJsonString));
}
} catch (IOException e) {
e.printStackTrace();
}
@ -100,5 +103,15 @@ public class RestApiPlugin extends BasePlugin {
return result;
});
}
@Override
public Object resourceCreate(ResourceConfiguration resourceConfiguration) {
return null;
}
@Override
public void resourceDestroy(Object connection) {
}
}
}

View File

@ -0,0 +1,14 @@
package com.appsmith.server.domains;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
@Getter
@Setter
@ToString
@NoArgsConstructor
public class ResourceContext {
Object connection;
}

View File

@ -10,6 +10,7 @@ import com.appsmith.server.domains.Page;
import com.appsmith.server.domains.PageAction;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.domains.Resource;
import com.appsmith.server.domains.ResourceContext;
import com.appsmith.server.domains.User;
import com.appsmith.server.dtos.ExecuteActionDTO;
import com.appsmith.server.exceptions.AppsmithError;
@ -49,6 +50,7 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
private final PageService pageService;
private final PluginManager pluginManager;
private final ObjectMapper objectMapper;
private final ResourceContextService resourceContextService;
@Autowired
public ActionServiceImpl(Scheduler scheduler,
@ -62,7 +64,7 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
PluginManager pluginManager,
Analytics analytics,
SessionUserService sessionUserService,
ObjectMapper objectMapper) {
ObjectMapper objectMapper, ResourceContextService resourceContextService) {
super(scheduler, validator, mongoConverter, reactiveMongoTemplate, repository, analytics, sessionUserService);
this.repository = repository;
this.resourceService = resourceService;
@ -70,6 +72,7 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
this.pageService = pageService;
this.pluginManager = pluginManager;
this.objectMapper = objectMapper;
this.resourceContextService = resourceContextService;
}
@Override
@ -125,7 +128,6 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
@Override
public Mono<ActionExecutionResult> executeAction(ExecuteActionDTO executeActionDTO) {
String actionId = executeActionDTO.getActionId();
log.debug("Going to execute action with id: {}", actionId);
// 1. Fetch the query from the DB to get the type
Mono<Action> actionMono = repository.findById(actionId)
@ -147,28 +149,36 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
}
);
// 3. Execute the query
return actionMono
.flatMap(action -> resourceMono.zipWith(pluginExecutorMono, (resource, pluginExecutor) -> {
log.debug("Variable substitutions before invoking the plugin");
ResourceConfiguration resourceConfiguration;
ActionConfiguration actionConfiguration;
//Do variable substitution before invoking the plugin
Map<String, String> replaceParamsMap = executeActionDTO
.getParams()
.stream()
.collect(Collectors.toMap(Param::getKey, Param::getValue,
// Incase there's a conflict, we pick the older value
(oldValue, newValue) -> oldValue)
);
ResourceConfiguration resourceConfiguration = (ResourceConfiguration) variableSubstitution(resource.getResourceConfiguration(), replaceParamsMap);
ActionConfiguration actionConfiguration = (ActionConfiguration) variableSubstitution(action.getActionConfiguration(), replaceParamsMap);
//Do this only if params have been provided in the execute command
if (executeActionDTO.getParams() != null && !executeActionDTO.getParams().isEmpty()) {
Map<String, String> replaceParamsMap = executeActionDTO
.getParams()
.stream()
.collect(Collectors.toMap(Param::getKey, Param::getValue,
// Incase there's a conflict, we pick the older value
(oldValue, newValue) -> oldValue)
);
resourceConfiguration = (ResourceConfiguration) variableSubstitution(resource.getResourceConfiguration(), replaceParamsMap);
actionConfiguration = (ActionConfiguration) variableSubstitution(action.getActionConfiguration(), replaceParamsMap);
} else {
resourceConfiguration = resource.getResourceConfiguration();
actionConfiguration = action.getActionConfiguration();
}
log.debug("About to invoke the plugin");
long start = System.currentTimeMillis();
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.execute(resourceConfiguration, actionConfiguration, executeActionDTO.getParams());
long end = System.currentTimeMillis();
log.debug("Time taken by plugin executor is : {} ms",(end-start));
return actionExecutionResultMono;
return resourceContextService
.getResourceContext(resource.getId())
//Now that we have the context (connection details, execute the action
.flatMap(resourceContext -> pluginExecutor.execute(
resourceContext.getConnection(),
resourceConfiguration,
actionConfiguration));
}))
.flatMap(obj -> obj);
}

View File

@ -0,0 +1,18 @@
package com.appsmith.server.services;
import com.appsmith.server.domains.ResourceContext;
import reactor.core.publisher.Mono;
public interface ResourceContextService {
/**
* This function is responsible for returning the resource context stored
* against the resource id. In case the resourceId is not found in the
* map, create a new resource context and return that.
* @param resourceId
* @return ResourceContext
*/
Mono<ResourceContext> getResourceContext(String resourceId);
Mono<ResourceContext> deleteResourceContext(String resourceId);
}

View File

@ -0,0 +1,103 @@
package com.appsmith.server.services;
import com.appsmith.external.plugins.PluginExecutor;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.domains.Resource;
import com.appsmith.server.domains.ResourceContext;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class ResourceContextServiceImpl implements ResourceContextService {
//This is ResourceID mapped to the ResourceContext
Map<String, ResourceContext> resourceContextHashMap;
private final ResourceService resourceService;
private final PluginManager pluginManager;
private final PluginService pluginService;
@Autowired
public ResourceContextServiceImpl(ResourceService resourceService, PluginManager pluginManager, PluginService pluginService) {
this.resourceService = resourceService;
this.pluginManager = pluginManager;
this.pluginService = pluginService;
this.resourceContextHashMap = new HashMap<>();
}
@Override
public Mono<ResourceContext> getResourceContext(String resourceId) {
if (resourceContextHashMap.get(resourceId) != null) {
log.debug("resource context exists. Returning the same.");
return Mono.just(resourceContextHashMap.get(resourceId));
}
log.debug("Resource context doesnt exist. Creating connection");
Mono<Resource> resourceMono = resourceService
.findById(resourceId);
Mono<Plugin> pluginMono = resourceMono
.flatMap(resource -> pluginService.findById(resource.getPluginId()));
//Resource Context has not been created for this resource on this machine. Create one now.
Mono<PluginExecutor> pluginExecutorMono = pluginMono.flatMap(plugin -> {
List<PluginExecutor> executorList = pluginManager.getExtensions(PluginExecutor.class, plugin.getExecutorClass());
if (executorList.isEmpty()) {
return Mono.error(new AppsmithException(AppsmithError.NO_RESOURCE_FOUND, "plugin", plugin.getExecutorClass()));
}
return Mono.just(executorList.get(0));
}
);
return Mono.zip(resourceMono, pluginExecutorMono, ((resource, pluginExecutor) -> {
log.debug("calling plugin create connection.");
Object connection = pluginExecutor.resourceCreate(resource.getResourceConfiguration());
ResourceContext resourceContext = new ResourceContext();
resourceContext.setConnection(connection);
resourceContextHashMap.put(resourceId, resourceContext);
return resourceContext;
}));
}
@Override
public Mono<ResourceContext> deleteResourceContext(String resourceId) {
ResourceContext resourceContext = resourceContextHashMap.get(resourceId);
if (resourceContext == null) {
//No resource context exists for this resource. Return void;
return Mono.empty();
}
Mono<Resource> resourceMono = resourceService
.findById(resourceId);
Mono<Plugin> pluginMono = resourceMono
.flatMap(resource -> pluginService.findById(resource.getPluginId()));
//Resource Context has not been created for this resource on this machine. Create one now.
Mono<PluginExecutor> pluginExecutorMono = pluginMono.flatMap(plugin -> {
List<PluginExecutor> executorList = pluginManager.getExtensions(PluginExecutor.class, plugin.getExecutorClass());
if (executorList.isEmpty()) {
return Mono.error(new AppsmithException(AppsmithError.NO_RESOURCE_FOUND, "plugin", plugin.getExecutorClass()));
}
return Mono.just(executorList.get(0));
}
);
return Mono.zip(resourceMono, pluginExecutorMono, ((resource, pluginExecutor) -> {
pluginExecutor.resourceDestroy(resourceContext.getConnection());
resourceContextHashMap.remove(resourceId);
return resourceContext;
}));
}
}