Merge branch 'feature/postgres' into 'master'
Multi data source support for plugins using Resource Context See merge request theappsmith/internal-tools-server!38
This commit is contained in:
commit
0d4d9dd7c2
|
|
@ -11,5 +11,27 @@ import java.util.List;
|
|||
|
||||
public interface PluginExecutor extends ExtensionPoint {
|
||||
|
||||
Mono<ActionExecutionResult> execute(ResourceConfiguration resourceConfiguration, ActionConfiguration action, List<Param> params);
|
||||
/**
|
||||
* This function is used to execute the action.
|
||||
* @param connection : This is the connection that is established to the data source. This connection is according
|
||||
* to the parameters in Resource Configuration
|
||||
* @param resourceConfiguration : These are the configurations which have been used to create a Resource from a Plugin
|
||||
* @param actionConfiguration : These are the configurations which have been used to create an Action from a Resource.
|
||||
* @return ActionExecutionResult : This object is returned to the user which contains the result values from the execution.
|
||||
*/
|
||||
Mono<ActionExecutionResult> execute(Object connection, ResourceConfiguration resourceConfiguration, ActionConfiguration actionConfiguration);
|
||||
|
||||
/**
|
||||
* This function is responsible for creating the connection to the data source and returning the connection variable
|
||||
* on success. For executing actions, this connection object would be passed for each function call.
|
||||
* @param resourceConfiguration
|
||||
* @return Connection object
|
||||
*/
|
||||
Object resourceCreate(ResourceConfiguration resourceConfiguration);
|
||||
|
||||
/**
|
||||
* This function is used to bring down/destroy the connection to the data source.
|
||||
* @param connection
|
||||
*/
|
||||
void resourceDestroy(Object connection);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,10 @@ import com.appsmith.external.models.Param;
|
|||
import com.appsmith.external.models.ResourceConfiguration;
|
||||
import com.appsmith.external.plugins.BasePlugin;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.json.JSONObject;
|
||||
import org.pf4j.Extension;
|
||||
import org.pf4j.PluginException;
|
||||
import org.pf4j.PluginWrapper;
|
||||
|
|
@ -25,48 +28,14 @@ import java.util.List;
|
|||
|
||||
@Slf4j
|
||||
public class PostgresPlugin extends BasePlugin {
|
||||
|
||||
private static ObjectMapper objectMapper;
|
||||
|
||||
static String JDBC_DRIVER = "org.postgresql.Driver";
|
||||
|
||||
static String DB_URL = "jdbc:postgresql://localhost/mobtools";
|
||||
|
||||
// Database credentials
|
||||
static String DB_USER = "root";
|
||||
|
||||
static String DB_PASS = "root";
|
||||
|
||||
static Connection conn = null;
|
||||
|
||||
public PostgresPlugin(PluginWrapper wrapper) {
|
||||
super(wrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws PluginException {
|
||||
log.debug("Going to initialize the PostgresDBPlugin");
|
||||
try {
|
||||
// Load the class into JVM
|
||||
Class.forName(JDBC_DRIVER);
|
||||
log.debug("Got the jdbc url as {}", DB_URL);
|
||||
// Create the connection
|
||||
conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASS);
|
||||
} catch (ClassNotFoundException e) {
|
||||
log.error("", e);
|
||||
} catch (SQLException e) {
|
||||
log.error("", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws PluginException {
|
||||
log.debug("PostgresPlugin.stop()");
|
||||
try {
|
||||
if (conn != null) {
|
||||
conn.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
log.error("", e);
|
||||
throw new PluginException(e);
|
||||
}
|
||||
this.objectMapper = new ObjectMapper();
|
||||
}
|
||||
|
||||
@Slf4j
|
||||
|
|
@ -74,12 +43,11 @@ public class PostgresPlugin extends BasePlugin {
|
|||
public static class PostgresPluginExecutor implements PluginExecutor {
|
||||
|
||||
@Override
|
||||
public Mono<ActionExecutionResult> execute(ResourceConfiguration resourceConfiguration,
|
||||
ActionConfiguration actionConfiguration,
|
||||
List<Param> params) {
|
||||
public Mono<ActionExecutionResult> execute(Object connection,
|
||||
ResourceConfiguration resourceConfiguration,
|
||||
ActionConfiguration actionConfiguration) {
|
||||
|
||||
log.debug("In the PostgresPlugin execute with resourceConfiguration: {}, ActionConfig: {}",
|
||||
resourceConfiguration, actionConfiguration);
|
||||
Connection conn = (Connection) connection;
|
||||
Assert.notNull(conn);
|
||||
|
||||
ArrayList list = new ArrayList(50);
|
||||
|
|
@ -99,13 +67,49 @@ public class PostgresPlugin extends BasePlugin {
|
|||
} catch (SQLException e) {
|
||||
log.error("", e);
|
||||
}
|
||||
//Return list because list is the actual result. ActionExecutionResult is just a stop gap measure
|
||||
list.forEach(System.out::println);
|
||||
|
||||
ActionExecutionResult result = new ActionExecutionResult();
|
||||
result.setBody(objectMapper.valueToTree(list));
|
||||
return Mono.just(result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object resourceCreate(ResourceConfiguration resourceConfiguration) {
|
||||
Connection conn = null;
|
||||
try {
|
||||
// Load the class into JVM
|
||||
Class.forName(JDBC_DRIVER);
|
||||
|
||||
// Create the connection
|
||||
conn = DriverManager.getConnection(resourceConfiguration.getUrl(),
|
||||
resourceConfiguration.getAuthentication().getUsername(),
|
||||
resourceConfiguration.getAuthentication().getPassword());
|
||||
return conn;
|
||||
} catch (ClassNotFoundException e) {
|
||||
log.error("", e);
|
||||
} catch (SQLException e) {
|
||||
log.error("", e);
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceDestroy(Object connection) {
|
||||
Connection conn = (Connection) connection;
|
||||
try {
|
||||
if (conn != null) {
|
||||
conn.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
log.error("", e);
|
||||
try {
|
||||
throw new PluginException(e);
|
||||
} catch (PluginException ex) {
|
||||
ex.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,19 +39,17 @@ public class RestApiPlugin extends BasePlugin {
|
|||
public static class RestApiPluginExecutor implements PluginExecutor {
|
||||
|
||||
@Override
|
||||
public Mono<ActionExecutionResult> execute(ResourceConfiguration resourceConfiguration,
|
||||
ActionConfiguration actionConfiguration,
|
||||
List<Param> params) {
|
||||
public Mono<ActionExecutionResult> execute(Object connection,
|
||||
ResourceConfiguration resourceConfiguration,
|
||||
ActionConfiguration actionConfiguration) {
|
||||
Map<String, Object> requestBody = actionConfiguration.getBody();
|
||||
if (requestBody == null) {
|
||||
requestBody = (Map<String, Object>) new HashMap<String, Object>();
|
||||
}
|
||||
|
||||
Map<String, Param> propertyMap = params.stream()
|
||||
.collect(Collectors.toMap(Param::getKey, param -> param));
|
||||
|
||||
String path = (actionConfiguration.getPath() == null) ? "" : actionConfiguration.getPath();
|
||||
String url = resourceConfiguration.getUrl() + path;
|
||||
|
||||
HttpMethod httpMethod = actionConfiguration.getHttpMethod();
|
||||
if (httpMethod == null) {
|
||||
return Mono.error(new Exception("HttpMethod must not be null"));
|
||||
|
|
@ -62,6 +60,7 @@ public class RestApiPlugin extends BasePlugin {
|
|||
if (resourceConfiguration.getHeaders() != null) {
|
||||
List<Property> headers = resourceConfiguration.getHeaders();
|
||||
for (Property header : headers) {
|
||||
|
||||
webClientBuilder.defaultHeader(header.getKey(), header.getValue());
|
||||
}
|
||||
}
|
||||
|
|
@ -90,9 +89,13 @@ public class RestApiPlugin extends BasePlugin {
|
|||
ActionExecutionResult result = new ActionExecutionResult();
|
||||
result.setStatusCode(statusCode.toString());
|
||||
try {
|
||||
result.setBody(objectMapper.readTree(body));
|
||||
String headerInJsonString = objectMapper.writeValueAsString(headers);
|
||||
result.setHeaders(objectMapper.readTree(headerInJsonString));
|
||||
if (body!=null) {
|
||||
result.setBody(objectMapper.readTree(body));
|
||||
}
|
||||
if (headers != null) {
|
||||
String headerInJsonString = objectMapper.writeValueAsString(headers);
|
||||
result.setHeaders(objectMapper.readTree(headerInJsonString));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
@ -100,5 +103,15 @@ public class RestApiPlugin extends BasePlugin {
|
|||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object resourceCreate(ResourceConfiguration resourceConfiguration) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resourceDestroy(Object connection) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
package com.appsmith.server.domains;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import lombok.ToString;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
public class ResourceContext {
|
||||
Object connection;
|
||||
}
|
||||
|
|
@ -10,6 +10,7 @@ import com.appsmith.server.domains.Page;
|
|||
import com.appsmith.server.domains.PageAction;
|
||||
import com.appsmith.server.domains.Plugin;
|
||||
import com.appsmith.server.domains.Resource;
|
||||
import com.appsmith.server.domains.ResourceContext;
|
||||
import com.appsmith.server.domains.User;
|
||||
import com.appsmith.server.dtos.ExecuteActionDTO;
|
||||
import com.appsmith.server.exceptions.AppsmithError;
|
||||
|
|
@ -49,6 +50,7 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
|
|||
private final PageService pageService;
|
||||
private final PluginManager pluginManager;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ResourceContextService resourceContextService;
|
||||
|
||||
@Autowired
|
||||
public ActionServiceImpl(Scheduler scheduler,
|
||||
|
|
@ -62,7 +64,7 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
|
|||
PluginManager pluginManager,
|
||||
Analytics analytics,
|
||||
SessionUserService sessionUserService,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper, ResourceContextService resourceContextService) {
|
||||
super(scheduler, validator, mongoConverter, reactiveMongoTemplate, repository, analytics, sessionUserService);
|
||||
this.repository = repository;
|
||||
this.resourceService = resourceService;
|
||||
|
|
@ -70,6 +72,7 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
|
|||
this.pageService = pageService;
|
||||
this.pluginManager = pluginManager;
|
||||
this.objectMapper = objectMapper;
|
||||
this.resourceContextService = resourceContextService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -125,7 +128,6 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
|
|||
@Override
|
||||
public Mono<ActionExecutionResult> executeAction(ExecuteActionDTO executeActionDTO) {
|
||||
String actionId = executeActionDTO.getActionId();
|
||||
log.debug("Going to execute action with id: {}", actionId);
|
||||
|
||||
// 1. Fetch the query from the DB to get the type
|
||||
Mono<Action> actionMono = repository.findById(actionId)
|
||||
|
|
@ -147,28 +149,36 @@ public class ActionServiceImpl extends BaseService<ActionRepository, Action, Str
|
|||
}
|
||||
);
|
||||
|
||||
|
||||
// 3. Execute the query
|
||||
return actionMono
|
||||
.flatMap(action -> resourceMono.zipWith(pluginExecutorMono, (resource, pluginExecutor) -> {
|
||||
log.debug("Variable substitutions before invoking the plugin");
|
||||
|
||||
ResourceConfiguration resourceConfiguration;
|
||||
ActionConfiguration actionConfiguration;
|
||||
//Do variable substitution before invoking the plugin
|
||||
Map<String, String> replaceParamsMap = executeActionDTO
|
||||
.getParams()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Param::getKey, Param::getValue,
|
||||
// Incase there's a conflict, we pick the older value
|
||||
(oldValue, newValue) -> oldValue)
|
||||
);
|
||||
ResourceConfiguration resourceConfiguration = (ResourceConfiguration) variableSubstitution(resource.getResourceConfiguration(), replaceParamsMap);
|
||||
ActionConfiguration actionConfiguration = (ActionConfiguration) variableSubstitution(action.getActionConfiguration(), replaceParamsMap);
|
||||
//Do this only if params have been provided in the execute command
|
||||
if (executeActionDTO.getParams() != null && !executeActionDTO.getParams().isEmpty()) {
|
||||
Map<String, String> replaceParamsMap = executeActionDTO
|
||||
.getParams()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Param::getKey, Param::getValue,
|
||||
// Incase there's a conflict, we pick the older value
|
||||
(oldValue, newValue) -> oldValue)
|
||||
);
|
||||
resourceConfiguration = (ResourceConfiguration) variableSubstitution(resource.getResourceConfiguration(), replaceParamsMap);
|
||||
actionConfiguration = (ActionConfiguration) variableSubstitution(action.getActionConfiguration(), replaceParamsMap);
|
||||
} else {
|
||||
resourceConfiguration = resource.getResourceConfiguration();
|
||||
actionConfiguration = action.getActionConfiguration();
|
||||
}
|
||||
|
||||
log.debug("About to invoke the plugin");
|
||||
long start = System.currentTimeMillis();
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = pluginExecutor.execute(resourceConfiguration, actionConfiguration, executeActionDTO.getParams());
|
||||
long end = System.currentTimeMillis();
|
||||
log.debug("Time taken by plugin executor is : {} ms",(end-start));
|
||||
return actionExecutionResultMono;
|
||||
return resourceContextService
|
||||
.getResourceContext(resource.getId())
|
||||
//Now that we have the context (connection details, execute the action
|
||||
.flatMap(resourceContext -> pluginExecutor.execute(
|
||||
resourceContext.getConnection(),
|
||||
resourceConfiguration,
|
||||
actionConfiguration));
|
||||
}))
|
||||
.flatMap(obj -> obj);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
package com.appsmith.server.services;
|
||||
|
||||
import com.appsmith.server.domains.ResourceContext;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface ResourceContextService {
|
||||
|
||||
/**
|
||||
* This function is responsible for returning the resource context stored
|
||||
* against the resource id. In case the resourceId is not found in the
|
||||
* map, create a new resource context and return that.
|
||||
* @param resourceId
|
||||
* @return ResourceContext
|
||||
*/
|
||||
Mono<ResourceContext> getResourceContext(String resourceId);
|
||||
|
||||
Mono<ResourceContext> deleteResourceContext(String resourceId);
|
||||
}
|
||||
|
|
@ -0,0 +1,103 @@
|
|||
package com.appsmith.server.services;
|
||||
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import com.appsmith.server.domains.Plugin;
|
||||
import com.appsmith.server.domains.Resource;
|
||||
import com.appsmith.server.domains.ResourceContext;
|
||||
import com.appsmith.server.exceptions.AppsmithError;
|
||||
import com.appsmith.server.exceptions.AppsmithException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.pf4j.PluginManager;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ResourceContextServiceImpl implements ResourceContextService {
|
||||
|
||||
//This is ResourceID mapped to the ResourceContext
|
||||
Map<String, ResourceContext> resourceContextHashMap;
|
||||
private final ResourceService resourceService;
|
||||
private final PluginManager pluginManager;
|
||||
private final PluginService pluginService;
|
||||
|
||||
@Autowired
|
||||
public ResourceContextServiceImpl(ResourceService resourceService, PluginManager pluginManager, PluginService pluginService) {
|
||||
this.resourceService = resourceService;
|
||||
this.pluginManager = pluginManager;
|
||||
this.pluginService = pluginService;
|
||||
this.resourceContextHashMap = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResourceContext> getResourceContext(String resourceId) {
|
||||
if (resourceContextHashMap.get(resourceId) != null) {
|
||||
log.debug("resource context exists. Returning the same.");
|
||||
return Mono.just(resourceContextHashMap.get(resourceId));
|
||||
}
|
||||
log.debug("Resource context doesnt exist. Creating connection");
|
||||
|
||||
Mono<Resource> resourceMono = resourceService
|
||||
.findById(resourceId);
|
||||
|
||||
Mono<Plugin> pluginMono = resourceMono
|
||||
.flatMap(resource -> pluginService.findById(resource.getPluginId()));
|
||||
|
||||
//Resource Context has not been created for this resource on this machine. Create one now.
|
||||
Mono<PluginExecutor> pluginExecutorMono = pluginMono.flatMap(plugin -> {
|
||||
List<PluginExecutor> executorList = pluginManager.getExtensions(PluginExecutor.class, plugin.getExecutorClass());
|
||||
if (executorList.isEmpty()) {
|
||||
return Mono.error(new AppsmithException(AppsmithError.NO_RESOURCE_FOUND, "plugin", plugin.getExecutorClass()));
|
||||
}
|
||||
return Mono.just(executorList.get(0));
|
||||
}
|
||||
);
|
||||
|
||||
return Mono.zip(resourceMono, pluginExecutorMono, ((resource, pluginExecutor) -> {
|
||||
log.debug("calling plugin create connection.");
|
||||
Object connection = pluginExecutor.resourceCreate(resource.getResourceConfiguration());
|
||||
ResourceContext resourceContext = new ResourceContext();
|
||||
resourceContext.setConnection(connection);
|
||||
|
||||
resourceContextHashMap.put(resourceId, resourceContext);
|
||||
return resourceContext;
|
||||
}));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResourceContext> deleteResourceContext(String resourceId) {
|
||||
|
||||
ResourceContext resourceContext = resourceContextHashMap.get(resourceId);
|
||||
if (resourceContext == null) {
|
||||
//No resource context exists for this resource. Return void;
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
Mono<Resource> resourceMono = resourceService
|
||||
.findById(resourceId);
|
||||
|
||||
Mono<Plugin> pluginMono = resourceMono
|
||||
.flatMap(resource -> pluginService.findById(resource.getPluginId()));
|
||||
|
||||
//Resource Context has not been created for this resource on this machine. Create one now.
|
||||
Mono<PluginExecutor> pluginExecutorMono = pluginMono.flatMap(plugin -> {
|
||||
List<PluginExecutor> executorList = pluginManager.getExtensions(PluginExecutor.class, plugin.getExecutorClass());
|
||||
if (executorList.isEmpty()) {
|
||||
return Mono.error(new AppsmithException(AppsmithError.NO_RESOURCE_FOUND, "plugin", plugin.getExecutorClass()));
|
||||
}
|
||||
return Mono.just(executorList.get(0));
|
||||
}
|
||||
);
|
||||
|
||||
return Mono.zip(resourceMono, pluginExecutorMono, ((resource, pluginExecutor) -> {
|
||||
pluginExecutor.resourceDestroy(resourceContext.getConnection());
|
||||
resourceContextHashMap.remove(resourceId);
|
||||
return resourceContext;
|
||||
}));
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user