chore: decomposing action execution flow methods into smaller methods (#18762)
This commit is contained in:
parent
4877acf1eb
commit
654f94a94c
|
|
@ -162,5 +162,6 @@ public class FieldName {
|
|||
public static final String UNASSIGNED_USER_GROUPS_FROM_PERMISSION_GROUPS = "unAssignedGroups";
|
||||
public static final String ASSIGNED_TO_PERMISSION_GROUPS = "assignedUserAndGroups";
|
||||
public static final String UNASSIGNED_FROM_PERMISSION_GROUPS = "unAssignedUsersAndGroups";
|
||||
public static final String ENVIRONMENT_NAME ="environmentName";
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,8 +74,9 @@ public class ActionControllerCE {
|
|||
|
||||
@PostMapping(value = "/execute", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
|
||||
public Mono<ResponseDTO<ActionExecutionResult>> executeAction(@RequestBody Flux<Part> partFlux,
|
||||
@RequestHeader(name = FieldName.BRANCH_NAME, required = false) String branchName) {
|
||||
return newActionService.executeAction(partFlux, branchName)
|
||||
@RequestHeader(name = FieldName.BRANCH_NAME, required = false) String branchName,
|
||||
@RequestHeader(name = FieldName.ENVIRONMENT_NAME, required = false) String environmentName) {
|
||||
return newActionService.executeAction(partFlux, branchName, environmentName)
|
||||
.map(updatedResource -> new ResponseDTO<>(HttpStatus.OK.value(), updatedResource, null));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -36,9 +36,9 @@ public interface NewActionServiceCE extends CrudService<NewAction, String> {
|
|||
|
||||
Mono<ActionDTO> updateUnpublishedAction(String id, ActionDTO action);
|
||||
|
||||
Mono<ActionExecutionResult> executeAction(ExecuteActionDTO executeActionDTO);
|
||||
Mono<ActionExecutionResult> executeAction(ExecuteActionDTO executeActionDTO, String environmentName);
|
||||
|
||||
Mono<ActionExecutionResult> executeAction(Flux<Part> partsFlux, String branchName);
|
||||
Mono<ActionExecutionResult> executeAction(Flux<Part> partsFlux, String branchName, String environmentName);
|
||||
|
||||
Mono<ActionDTO> getValidActionForExecution(ExecuteActionDTO executeActionDTO, String actionId, NewAction newAction);
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,9 @@ import com.appsmith.external.models.Policy;
|
|||
import com.appsmith.external.models.Property;
|
||||
import com.appsmith.external.models.Provider;
|
||||
import com.appsmith.external.models.RequestParamDTO;
|
||||
import com.appsmith.external.models.ActionProvider;
|
||||
import com.appsmith.external.models.PluginType;
|
||||
import com.appsmith.external.models.ActionDTO;
|
||||
import com.appsmith.external.plugins.PluginExecutor;
|
||||
import com.appsmith.server.acl.AclPermission;
|
||||
import com.appsmith.server.acl.PolicyGenerator;
|
||||
|
|
@ -83,6 +86,7 @@ import reactor.core.publisher.Flux;
|
|||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.util.function.Tuple2;
|
||||
import reactor.util.function.Tuple5;
|
||||
|
||||
import javax.lang.model.SourceVersion;
|
||||
import javax.validation.Validator;
|
||||
|
|
@ -629,10 +633,12 @@ public class NewActionServiceCEImpl extends BaseService<NewActionRepository, New
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ActionExecutionResult> executeAction(ExecuteActionDTO executeActionDTO) {
|
||||
// 1. Validate input parameters which are required for mustache replacements
|
||||
List<Param> params = executeActionDTO.getParams();
|
||||
/**
|
||||
* Sets the param value to "" if key is not empty and value is null for each param
|
||||
* @param params
|
||||
*/
|
||||
protected void replaceNullWithQuotesForParamValues(List<Param> params) {
|
||||
|
||||
if (!CollectionUtils.isEmpty(params)) {
|
||||
for (Param param : params) {
|
||||
// In case the parameter values turn out to be null, set it to empty string instead to allow
|
||||
|
|
@ -642,157 +648,285 @@ public class NewActionServiceCEImpl extends BaseService<NewActionRepository, New
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String actionId = executeActionDTO.getActionId();
|
||||
AtomicReference<String> actionName = new AtomicReference<>();
|
||||
// Initialize the name to be empty value
|
||||
actionName.set("");
|
||||
// 2. Fetch the action from the DB and check if it can be executed
|
||||
Mono<NewAction> actionMono = repository.findById(actionId, actionPermission.getExecutePermission())
|
||||
/**
|
||||
* Fetches and caches action with permission.
|
||||
* @param actionId
|
||||
* @return actionMono
|
||||
*/
|
||||
protected Mono<NewAction> getCachedActionForActionExecution(String actionId) {
|
||||
|
||||
return repository.findById(actionId, actionPermission.getExecutePermission())
|
||||
.switchIfEmpty(Mono.error(new AppsmithException(AppsmithError.NO_RESOURCE_FOUND, FieldName.ACTION, actionId)))
|
||||
.cache();
|
||||
}
|
||||
|
||||
Mono<ActionDTO> actionDTOMono = actionMono
|
||||
/**
|
||||
* Retrieves and caches validated actionDTO from actionMono.
|
||||
* @param actionMono
|
||||
* @param executeActionDTO
|
||||
* @param actionId
|
||||
* @return actionDTOMono
|
||||
*/
|
||||
protected Mono<ActionDTO> getCachedActionDTOForActionExecution(Mono<NewAction> actionMono,
|
||||
ExecuteActionDTO executeActionDTO,
|
||||
String actionId) {
|
||||
return actionMono
|
||||
.flatMap(action -> getValidActionForExecution(executeActionDTO, actionId, action))
|
||||
.cache();
|
||||
}
|
||||
|
||||
// 3. Instantiate the implementation class based on the query type
|
||||
/**
|
||||
* Fetches, validates and caches the datasource from actionDTO
|
||||
* @param actionDTOMono
|
||||
* @return datasourceMono
|
||||
*/
|
||||
protected Mono<Datasource> getCachedDatasourceForActionExecution(Mono<ActionDTO> actionDTOMono) {
|
||||
|
||||
Mono<Datasource> datasourceMono = actionDTOMono
|
||||
.flatMap(actionDTO -> datasourceService.getValidDatasourceFromActionMono(actionDTO, datasourcePermission.getExecutePermission()))
|
||||
.cache();
|
||||
|
||||
Mono<Plugin> pluginMono = datasourceMono
|
||||
return actionDTOMono
|
||||
.flatMap(actionDTO -> datasourceService.getValidDatasourceFromActionMono(actionDTO,
|
||||
datasourcePermission.getExecutePermission()))
|
||||
.flatMap(datasource -> {
|
||||
// For embedded datasources, validate the datasource for each execution
|
||||
// For embedded datasource, validate the datasource for each execution
|
||||
if (datasource.getId() == null) {
|
||||
return datasourceService.validateDatasource(datasource);
|
||||
}
|
||||
|
||||
// The external datasources have already been validated. No need to validate again.
|
||||
// The external datasource have already been validated. No need to validate again.
|
||||
return Mono.just(datasource);
|
||||
})
|
||||
.cache();
|
||||
}
|
||||
|
||||
/**
|
||||
* fetches and caches plugin by pluginId after checking datasource for invalids(issues)
|
||||
* @param datasourceMono
|
||||
* @param actionId
|
||||
* @return pluginMono if datasource has no issues and plugin is find, else throws error
|
||||
*/
|
||||
protected Mono<Plugin> getCachedPluginForActionExecution(Mono<Datasource> datasourceMono, String actionId) {
|
||||
return datasourceMono
|
||||
.flatMap(datasource -> {
|
||||
Set<String> invalids = datasource.getInvalids();
|
||||
if (!CollectionUtils.isEmpty(invalids)) {
|
||||
log.error("Unable to execute actionId: {} because it's datasource is not valid. Cause: {}",
|
||||
actionId, ArrayUtils.toString(invalids));
|
||||
actionId, ArrayUtils.toString(invalids));
|
||||
return Mono.error(new AppsmithException(AppsmithError.INVALID_DATASOURCE,
|
||||
datasource.getName(),
|
||||
ArrayUtils.toString(invalids)));
|
||||
datasource.getName(),
|
||||
ArrayUtils.toString(invalids)));
|
||||
}
|
||||
return pluginService.findById(datasource.getPluginId());
|
||||
})
|
||||
.switchIfEmpty(Mono.error(new AppsmithException(AppsmithError.NO_RESOURCE_FOUND, FieldName.PLUGIN)))
|
||||
.cache();
|
||||
}
|
||||
|
||||
Mono<PluginExecutor> pluginExecutorMono = pluginExecutorHelper.getPluginExecutor(pluginMono);
|
||||
/**
|
||||
* Fetches and returns editorConfigLabelMap if datasourceId is present
|
||||
* @param datasourceMono
|
||||
* @return an Empty hashMap if datasource doesn't have id, else configLabelMap from plugin service
|
||||
*/
|
||||
protected Mono<Map> getEditorConfigLabelMap (Mono<Datasource> datasourceMono) {
|
||||
|
||||
// 4. Execute the query
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = Mono
|
||||
.zip(
|
||||
actionDTOMono,
|
||||
datasourceMono,
|
||||
pluginExecutorMono,
|
||||
pluginMono
|
||||
)
|
||||
return datasourceMono
|
||||
.flatMap(datasource -> {
|
||||
if (!StringUtils.hasLength(datasource.getId())) {
|
||||
return Mono.just(new HashMap());
|
||||
}
|
||||
|
||||
return pluginService.getEditorConfigLabelMap(datasource.getPluginId());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Passes the payload to pluginExecutor post datasource validation and context retrieval
|
||||
* <p>
|
||||
* This method validates the datasource, retrieves context and subsequently passes the payload to pluginExecutor for
|
||||
* further execution of the request.
|
||||
* </p>
|
||||
* <p> In case of failure the method retries to from context</p>
|
||||
*
|
||||
* @param executeActionDTO
|
||||
* @param actionDTO
|
||||
* @param datasource
|
||||
* @param plugin
|
||||
* @param pluginExecutor
|
||||
* @param environmentName
|
||||
* @return actionExecutionResultMono
|
||||
*/
|
||||
protected Mono<ActionExecutionResult> verifyDatasourceAndMakeRequest (ExecuteActionDTO executeActionDTO,
|
||||
ActionDTO actionDTO,
|
||||
Datasource datasource,
|
||||
Plugin plugin,
|
||||
PluginExecutor pluginExecutor,
|
||||
String environmentName) {
|
||||
// This method will be overridden in EE branch to make use of environmentName.
|
||||
Mono<Datasource> validatedDatasourceMono = getValidatedDatasourceForActionExecution(datasource, environmentName);
|
||||
|
||||
Mono<ActionExecutionResult> executionMono = validatedDatasourceMono
|
||||
.flatMap(datasource1 -> getDatasourceContextFromValidatedDatasourceForActionExecution(datasource1,
|
||||
plugin,
|
||||
environmentName))
|
||||
// Now that we have the context (connection details), execute the action.
|
||||
.flatMap(resourceContext -> validatedDatasourceMono
|
||||
.flatMap(datasource1 -> {
|
||||
final Instant requestedAt = Instant.now();
|
||||
return ((Mono<ActionExecutionResult>) pluginExecutor.
|
||||
executeParameterized(resourceContext.getConnection(),
|
||||
executeActionDTO,
|
||||
datasource1.getDatasourceConfiguration(),
|
||||
actionDTO.getActionConfiguration()))
|
||||
.map(actionExecutionResult -> {
|
||||
ActionExecutionRequest actionExecutionRequest = actionExecutionResult.getRequest();
|
||||
if (actionExecutionRequest == null) {
|
||||
actionExecutionRequest = new ActionExecutionRequest();
|
||||
}
|
||||
actionExecutionRequest.setActionId(executeActionDTO.getActionId());
|
||||
actionExecutionRequest.setRequestedAt(requestedAt);
|
||||
|
||||
actionExecutionResult.setRequest(actionExecutionRequest);
|
||||
return actionExecutionResult;
|
||||
});
|
||||
}));
|
||||
|
||||
return executionMono.onErrorResume(StaleConnectionException.class, error -> {
|
||||
log.info("Looks like the connection is stale. Retrying with a fresh context.");
|
||||
return deleteDatasourceContextForRetry(datasource, environmentName)
|
||||
.then(executionMono);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the datasource for further execution
|
||||
* @param datasource
|
||||
* @return
|
||||
*/
|
||||
protected Mono<Datasource> getValidatedDatasourceForActionExecution (Datasource datasource, String environmentName) {
|
||||
// the environmentName argument is not consumed over here
|
||||
// See EE override for usage of variable
|
||||
return authenticationValidator.validateAuthentication(datasource).cache();
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides datasource context for execution
|
||||
* @param validatedDatasource
|
||||
* @param plugin
|
||||
* @param environmentName
|
||||
* @return datasourceContextMono
|
||||
*/
|
||||
protected Mono<DatasourceContext<?>> getDatasourceContextFromValidatedDatasourceForActionExecution
|
||||
(Datasource validatedDatasource, Plugin plugin, String environmentName) {
|
||||
// the environmentName argument is not consumed over here
|
||||
// See EE override for usage of variable
|
||||
if (plugin.isRemotePlugin()) {
|
||||
return datasourceContextService.getRemoteDatasourceContext(plugin, validatedDatasource);
|
||||
}
|
||||
return datasourceContextService.getDatasourceContext(validatedDatasource);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the datasourceContext for the given datasource
|
||||
* @param datasource
|
||||
* @param environmentName
|
||||
* @return datasourceContextMono
|
||||
*/
|
||||
protected Mono<DatasourceContext<?>> deleteDatasourceContextForRetry(Datasource datasource, String environmentName) {
|
||||
// the environmentName argument is not consumed over here
|
||||
// See EE override for usage of variable
|
||||
return datasourceContextService.deleteDatasourceContext(datasource.getId());
|
||||
}
|
||||
|
||||
protected Mono<ActionExecutionResult> handleExecutionErrors(Mono<ActionExecutionResult> actionExecutionResultMono,
|
||||
ActionDTO actionDTO,
|
||||
Integer timeoutDuration,
|
||||
String actionId) {
|
||||
return actionExecutionResultMono
|
||||
.onErrorMap(TimeoutException.class, error ->
|
||||
new AppsmithPluginException(AppsmithPluginError.PLUGIN_QUERY_TIMEOUT_ERROR,
|
||||
actionDTO.getName(),
|
||||
timeoutDuration))
|
||||
.onErrorMap(StaleConnectionException.class, error ->
|
||||
new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR,
|
||||
"Secondary stale connection error."))
|
||||
.onErrorResume(e -> {
|
||||
log.debug("{}: In the action execution error mode.",
|
||||
Thread.currentThread().getName(), e);
|
||||
ActionExecutionResult result = new ActionExecutionResult();
|
||||
result.setBody(e.getMessage());
|
||||
result.setIsExecutionSuccess(false);
|
||||
final ActionExecutionRequest actionExecutionRequest = new ActionExecutionRequest();
|
||||
actionExecutionRequest.setActionId(actionId);
|
||||
actionExecutionRequest.setRequestedAt(Instant.now());
|
||||
result.setRequest(actionExecutionRequest);
|
||||
// Set the status code for Appsmith plugin errors
|
||||
if (e instanceof AppsmithPluginException) {
|
||||
result.setStatusCode(((AppsmithPluginException) e).getAppErrorCode().toString());
|
||||
result.setTitle(((AppsmithPluginException) e).getTitle());
|
||||
result.setErrorType(((AppsmithPluginException) e).getErrorType());
|
||||
} else {
|
||||
result.setStatusCode(AppsmithPluginError.PLUGIN_ERROR.getAppErrorCode().toString());
|
||||
|
||||
if (e instanceof AppsmithException) {
|
||||
result.setTitle(((AppsmithException) e).getTitle());
|
||||
result.setErrorType(((AppsmithException) e).getErrorType());
|
||||
}
|
||||
}
|
||||
return Mono.just(result);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the execution logic, call to pluginExecutor with the payload post retrieval and validation of action, datasource, and plugin
|
||||
* @param executeActionDTO
|
||||
* @param actionMono
|
||||
* @param actionDTOMono
|
||||
* @param datasourceMono
|
||||
* @param pluginMono
|
||||
* @param pluginExecutorMono
|
||||
* @param actionName
|
||||
* @param actionId
|
||||
* @param environmentName
|
||||
* @return actionExecutionResultMono
|
||||
*/
|
||||
protected Mono<ActionExecutionResult> getActionExecutionResult( ExecuteActionDTO executeActionDTO,
|
||||
Mono<NewAction> actionMono,
|
||||
Mono<ActionDTO> actionDTOMono,
|
||||
Mono<Datasource> datasourceMono,
|
||||
Mono<Plugin> pluginMono,
|
||||
Mono<PluginExecutor> pluginExecutorMono,
|
||||
AtomicReference<String> actionName,
|
||||
String actionId,
|
||||
String environmentName) {
|
||||
|
||||
Mono<Tuple5<ActionDTO, Datasource, PluginExecutor, Plugin, NewAction>> executeActionPublishersCache =
|
||||
Mono.zip(actionDTOMono, datasourceMono, pluginExecutorMono, pluginMono, actionMono).cache();
|
||||
|
||||
return executeActionPublishersCache
|
||||
.flatMap(tuple -> {
|
||||
final ActionDTO action = tuple.getT1();
|
||||
final ActionDTO actionDTO = tuple.getT1();
|
||||
final Datasource datasource = tuple.getT2();
|
||||
final PluginExecutor pluginExecutor = tuple.getT3();
|
||||
final Plugin plugin = tuple.getT4();
|
||||
final NewAction actionFromDb = tuple.getT5();
|
||||
|
||||
// Set the action name
|
||||
actionName.set(action.getName());
|
||||
|
||||
ActionConfiguration actionConfiguration = action.getActionConfiguration();
|
||||
|
||||
Integer timeoutDuration = actionConfiguration.getTimeoutInMillisecond();
|
||||
actionName.set(actionDTO.getName());
|
||||
|
||||
log.debug("[{}]Execute Action called in Page {}, for action id : {} action name : {}",
|
||||
Thread.currentThread().getName(),
|
||||
action.getPageId(), actionId, action.getName());
|
||||
actionDTO.getPageId(), actionId, actionDTO.getName());
|
||||
|
||||
Mono<Datasource> validatedDatasourceMono = authenticationValidator.validateAuthentication(datasource).cache();
|
||||
Integer timeoutDuration = actionDTO.getActionConfiguration().getTimeoutInMillisecond();
|
||||
|
||||
Mono<ActionExecutionResult> executionMono = validatedDatasourceMono
|
||||
.flatMap(datasource1 -> {
|
||||
if (plugin.isRemotePlugin()) {
|
||||
return datasourceContextService.getRemoteDatasourceContext(plugin, datasource1);
|
||||
} else {
|
||||
return datasourceContextService.getDatasourceContext(datasource1);
|
||||
}
|
||||
})
|
||||
// Now that we have the context (connection details), execute the action.
|
||||
.flatMap(resourceContext -> validatedDatasourceMono
|
||||
.flatMap(datasource1 -> {
|
||||
final Instant requestedAt = Instant.now();
|
||||
return ((Mono<ActionExecutionResult>) pluginExecutor.executeParameterized(
|
||||
resourceContext.getConnection(),
|
||||
executeActionDTO,
|
||||
datasource1.getDatasourceConfiguration(),
|
||||
actionConfiguration
|
||||
)).map(actionExecutionResult -> {
|
||||
ActionExecutionRequest actionExecutionRequest = actionExecutionResult.getRequest();
|
||||
if (actionExecutionRequest == null) {
|
||||
actionExecutionRequest = new ActionExecutionRequest();
|
||||
}
|
||||
actionExecutionRequest.setActionId(actionId);
|
||||
actionExecutionRequest.setRequestedAt(requestedAt);
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono =
|
||||
verifyDatasourceAndMakeRequest(executeActionDTO, actionDTO, datasource,
|
||||
plugin, pluginExecutor, environmentName)
|
||||
.timeout(Duration.ofMillis(timeoutDuration));
|
||||
|
||||
actionExecutionResult.setRequest(actionExecutionRequest);
|
||||
|
||||
return actionExecutionResult;
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
return executionMono
|
||||
.onErrorResume(StaleConnectionException.class, error -> {
|
||||
log.info("Looks like the connection is stale. Retrying with a fresh context.");
|
||||
return datasourceContextService
|
||||
.deleteDatasourceContext(datasource.getId())
|
||||
.then(executionMono);
|
||||
})
|
||||
.timeout(Duration.ofMillis(timeoutDuration))
|
||||
.onErrorMap(TimeoutException.class,
|
||||
error -> new AppsmithPluginException(
|
||||
AppsmithPluginError.PLUGIN_QUERY_TIMEOUT_ERROR,
|
||||
action.getName(), timeoutDuration
|
||||
)
|
||||
)
|
||||
.onErrorMap(
|
||||
StaleConnectionException.class,
|
||||
error -> new AppsmithPluginException(
|
||||
AppsmithPluginError.PLUGIN_ERROR,
|
||||
"Secondary stale connection error."
|
||||
)
|
||||
)
|
||||
.onErrorResume(e -> {
|
||||
log.debug("{}: In the action execution error mode.",
|
||||
Thread.currentThread().getName(), e);
|
||||
ActionExecutionResult result = new ActionExecutionResult();
|
||||
result.setBody(e.getMessage());
|
||||
result.setIsExecutionSuccess(false);
|
||||
final ActionExecutionRequest actionExecutionRequest = new ActionExecutionRequest();
|
||||
actionExecutionRequest.setActionId(actionId);
|
||||
actionExecutionRequest.setRequestedAt(Instant.now());
|
||||
result.setRequest(actionExecutionRequest);
|
||||
// Set the status code for Appsmith plugin errors
|
||||
if (e instanceof AppsmithPluginException) {
|
||||
result.setStatusCode(((AppsmithPluginException) e).getAppErrorCode().toString());
|
||||
result.setTitle(((AppsmithPluginException) e).getTitle());
|
||||
result.setErrorType(((AppsmithPluginException) e).getErrorType());
|
||||
} else {
|
||||
result.setStatusCode(AppsmithPluginError.PLUGIN_ERROR.getAppErrorCode().toString());
|
||||
|
||||
if (e instanceof AppsmithException) {
|
||||
result.setTitle(((AppsmithException) e).getTitle());
|
||||
result.setErrorType(((AppsmithException) e).getErrorType());
|
||||
}
|
||||
}
|
||||
return Mono.just(result);
|
||||
})
|
||||
return handleExecutionErrors(actionExecutionResultMono, actionDTO ,timeoutDuration ,actionId)
|
||||
.elapsed()
|
||||
// Now send the analytics event for this execution
|
||||
.flatMap(tuple1 -> {
|
||||
|
|
@ -806,18 +940,10 @@ public class NewActionServiceCEImpl extends BaseService<NewActionRepository, New
|
|||
timeElapsed
|
||||
);
|
||||
|
||||
return Mono.zip(actionMono, actionDTOMono, datasourceMono)
|
||||
.flatMap(tuple2 -> {
|
||||
ActionExecutionResult actionExecutionResult = result;
|
||||
NewAction actionFromDb = tuple2.getT1();
|
||||
ActionDTO actionDTO = tuple2.getT2();
|
||||
Datasource datasourceFromDb = tuple2.getT3();
|
||||
|
||||
return Mono.when(sendExecuteAnalyticsEvent(actionFromDb, actionDTO, datasourceFromDb, executeActionDTO, actionExecutionResult, timeElapsed))
|
||||
.thenReturn(result);
|
||||
});
|
||||
}
|
||||
);
|
||||
return sendExecuteAnalyticsEvent(actionFromDb, actionDTO, datasource,
|
||||
executeActionDTO, result, timeElapsed)
|
||||
.then(Mono.just(result));
|
||||
});
|
||||
})
|
||||
.onErrorResume(AppsmithException.class, error -> {
|
||||
ActionExecutionResult result = new ActionExecutionResult();
|
||||
|
|
@ -828,40 +954,63 @@ public class NewActionServiceCEImpl extends BaseService<NewActionRepository, New
|
|||
result.setErrorType(error.getErrorType());
|
||||
return Mono.just(result);
|
||||
});
|
||||
}
|
||||
|
||||
Mono<Map> editorConfigLabelMapMono = datasourceMono
|
||||
.flatMap(datasource -> {
|
||||
if (datasource.getId() != null) {
|
||||
return pluginService.getEditorConfigLabelMap(datasource.getPluginId());
|
||||
}
|
||||
/**
|
||||
* Fetches the required Mono (action, datasource, and plugin) and makes actionExecution call to plugin
|
||||
* @param executeActionDTO
|
||||
* @param environmentName
|
||||
* @return actionExecutionResult if query succeeds, error messages otherwise
|
||||
*/
|
||||
public Mono<ActionExecutionResult> executeAction(ExecuteActionDTO executeActionDTO, String environmentName) {
|
||||
|
||||
return Mono.just(new HashMap());
|
||||
});
|
||||
// 1. Validate input parameters which are required for mustache replacements
|
||||
replaceNullWithQuotesForParamValues(executeActionDTO.getParams());
|
||||
|
||||
return Mono.zip(actionExecutionResultMono, editorConfigLabelMapMono)
|
||||
.flatMap(tuple -> {
|
||||
ActionExecutionResult result = tuple.getT1();
|
||||
// In case the action was executed in view mode, do not return the request object
|
||||
String actionId = executeActionDTO.getActionId();
|
||||
AtomicReference<String> actionName = new AtomicReference<>();
|
||||
actionName.set("");
|
||||
|
||||
// 2. Fetch the action from the DB and check if it can be executed
|
||||
Mono<NewAction> actionMono = getCachedActionForActionExecution(actionId);
|
||||
Mono<ActionDTO> actionDTOMono = getCachedActionDTOForActionExecution(actionMono, executeActionDTO, actionId);
|
||||
|
||||
// 3. Instantiate the implementation class based on the query type
|
||||
Mono<Datasource> datasourceMono = getCachedDatasourceForActionExecution(actionDTOMono);
|
||||
Mono<Plugin> pluginMono = getCachedPluginForActionExecution(datasourceMono, actionId);
|
||||
Mono<PluginExecutor> pluginExecutorMono = pluginExecutorHelper.getPluginExecutor(pluginMono);
|
||||
|
||||
// 4. Execute the query
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = getActionExecutionResult(executeActionDTO,
|
||||
actionMono,
|
||||
actionDTOMono,
|
||||
datasourceMono,
|
||||
pluginMono,
|
||||
pluginExecutorMono,
|
||||
actionName,
|
||||
actionId,
|
||||
environmentName);
|
||||
|
||||
Mono<Map> editorConfigLabelMapMono = getEditorConfigLabelMap(datasourceMono);
|
||||
|
||||
return actionExecutionResultMono
|
||||
.zipWith(editorConfigLabelMapMono, (result, labelMap) -> {
|
||||
if (TRUE.equals(executeActionDTO.getViewMode())) {
|
||||
result.setRequest(null);
|
||||
return Mono.just(result);
|
||||
} else if (result.getRequest() != null && result.getRequest().getRequestParams()!= null) {
|
||||
transformRequestParams(result, labelMap);
|
||||
}
|
||||
|
||||
if (result.getRequest() == null || result.getRequest().getRequestParams() == null) {
|
||||
return Mono.just(result);
|
||||
}
|
||||
|
||||
Map labelMap = tuple.getT2();
|
||||
transformRequestParams(result, labelMap);
|
||||
|
||||
return Mono.just(result);
|
||||
return result;
|
||||
})
|
||||
.map(result -> addDataTypesAndSetSuggestedWidget(result, executeActionDTO.getViewMode()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ActionExecutionResult> executeAction(Flux<Part> partFlux, String branchName) {
|
||||
|
||||
/**
|
||||
* Creates the ExecuteActionDTO from Flux of ByteBuffers
|
||||
* @param partFlux
|
||||
* @return an executionDTO object with parameterMap
|
||||
*/
|
||||
protected Mono<ExecuteActionDTO> createExecuteActionDTO(Flux<Part> partFlux) {
|
||||
final ExecuteActionDTO dto = new ExecuteActionDTO();
|
||||
return partFlux
|
||||
.flatMap(part -> {
|
||||
|
|
@ -933,22 +1082,31 @@ public class NewActionServiceCEImpl extends BaseService<NewActionRepository, New
|
|||
params.forEach(
|
||||
param -> {
|
||||
String pseudoBindingName = param.getPseudoBindingName();
|
||||
param.setKey(dto.getInvertParameterMap().get(pseudoBindingName));
|
||||
param.setKey(dto.getInvertParameterMap()
|
||||
.get(pseudoBindingName));
|
||||
//if the type is not an array e.g. "k1": "string" or "k1": "boolean"
|
||||
if (dto.getParamProperties().get(pseudoBindingName) instanceof String) {
|
||||
param.setClientDataType(ClientDataType.valueOf(String.valueOf(dto.getParamProperties().get(pseudoBindingName)).toUpperCase()));
|
||||
} else if (dto.getParamProperties().get(pseudoBindingName) instanceof LinkedHashMap) {
|
||||
if (dto.getParamProperties()
|
||||
.get(pseudoBindingName) instanceof String) {
|
||||
param.setClientDataType(ClientDataType.valueOf(String.valueOf(dto.getParamProperties()
|
||||
.get(pseudoBindingName))
|
||||
.toUpperCase()));
|
||||
} else if (dto.getParamProperties()
|
||||
.get(pseudoBindingName) instanceof LinkedHashMap) {
|
||||
//if the type is an array e.g. "k1": { "array": [ "string", "number", "string", "boolean"]
|
||||
LinkedHashMap<String, ArrayList> stringArrayListLinkedHashMap =
|
||||
(LinkedHashMap<String, ArrayList>) dto.getParamProperties().get(pseudoBindingName);
|
||||
Optional<String> firstKeyOpt = stringArrayListLinkedHashMap.keySet().stream().findFirst();
|
||||
(LinkedHashMap<String, ArrayList>) dto.getParamProperties()
|
||||
.get(pseudoBindingName);
|
||||
Optional<String> firstKeyOpt = stringArrayListLinkedHashMap.keySet()
|
||||
.stream()
|
||||
.findFirst();
|
||||
if (firstKeyOpt.isPresent()) {
|
||||
String firstKey = firstKeyOpt.get();
|
||||
param.setClientDataType(ClientDataType.valueOf(firstKey.toUpperCase()));
|
||||
List<String> individualTypes = stringArrayListLinkedHashMap.get(firstKey);
|
||||
List<ClientDataType> dataTypesOfArrayElements =
|
||||
individualTypes.stream()
|
||||
.map(it -> ClientDataType.valueOf(String.valueOf(it).toUpperCase()))
|
||||
.map(it -> ClientDataType.valueOf(String.valueOf(it)
|
||||
.toUpperCase()))
|
||||
.collect(Collectors.toList());
|
||||
param.setDataTypesOfArrayElements(dataTypesOfArrayElements);
|
||||
}
|
||||
|
|
@ -958,17 +1116,30 @@ public class NewActionServiceCEImpl extends BaseService<NewActionRepository, New
|
|||
);
|
||||
dto.setParams(params);
|
||||
return Mono.just(dto);
|
||||
})
|
||||
.flatMap(executeActionDTO -> this
|
||||
.findByBranchNameAndDefaultActionId(branchName, executeActionDTO.getActionId(), actionPermission.getExecutePermission())
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the action(queries) by creating executeActionDTO and sending it to the plugin for further execution
|
||||
* @param partFlux
|
||||
* @param branchName
|
||||
* @param environmentName
|
||||
* @return Mono of actionExecutionResult if the query succeeds, error messages otherwise
|
||||
*/
|
||||
@Override
|
||||
public Mono<ActionExecutionResult> executeAction(Flux<Part> partFlux, String branchName, String environmentName) {
|
||||
return createExecuteActionDTO(partFlux)
|
||||
.flatMap(executeActionDTO -> findByBranchNameAndDefaultActionId(branchName,
|
||||
executeActionDTO.getActionId(),
|
||||
actionPermission.getExecutePermission())
|
||||
.map(branchedAction -> {
|
||||
executeActionDTO.setActionId(branchedAction.getId());
|
||||
return executeActionDTO;
|
||||
})
|
||||
)
|
||||
.flatMap(this::executeAction);
|
||||
}))
|
||||
.flatMap(executeActionDTO -> this.executeAction(executeActionDTO, environmentName));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Mono<ActionDTO> getValidActionForExecution(ExecuteActionDTO executeActionDTO, String actionId, NewAction newAction) {
|
||||
Mono<ActionDTO> actionDTOMono = Mono.just(newAction)
|
||||
|
|
|
|||
|
|
@ -839,7 +839,7 @@ public class ActionServiceCE_Test {
|
|||
Mockito.when(pluginExecutor.executeParameterized(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Mono.error(pluginException));
|
||||
Mockito.when(pluginExecutor.datasourceCreate(Mockito.any())).thenReturn(Mono.empty());
|
||||
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO);
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO, null);
|
||||
|
||||
StepVerifier.create(executionResultMono)
|
||||
.assertNext(result -> {
|
||||
|
|
@ -889,7 +889,7 @@ public class ActionServiceCE_Test {
|
|||
Mockito.when(pluginExecutor.executeParameterized(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Mono.error(pluginException));
|
||||
Mockito.when(pluginExecutor.datasourceCreate(Mockito.any())).thenReturn(Mono.empty());
|
||||
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO);
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO, null);
|
||||
|
||||
StepVerifier.create(executionResultMono)
|
||||
.assertNext(result -> {
|
||||
|
|
@ -933,7 +933,7 @@ public class ActionServiceCE_Test {
|
|||
.thenReturn(Mono.error(new StaleConnectionException())).thenReturn(Mono.error(new StaleConnectionException()));
|
||||
Mockito.when(pluginExecutor.datasourceCreate(Mockito.any())).thenReturn(Mono.empty());
|
||||
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO);
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO, null);
|
||||
|
||||
StepVerifier.create(executionResultMono)
|
||||
.assertNext(result -> {
|
||||
|
|
@ -977,7 +977,7 @@ public class ActionServiceCE_Test {
|
|||
.thenAnswer(x -> Mono.delay(Duration.ofMillis(1000)).ofType(ActionExecutionResult.class));
|
||||
Mockito.when(pluginExecutor.datasourceCreate(Mockito.any())).thenReturn(Mono.empty());
|
||||
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO);
|
||||
Mono<ActionExecutionResult> executionResultMono = newActionService.executeAction(executeActionDTO, null);
|
||||
|
||||
StepVerifier.create(executionResultMono)
|
||||
.assertNext(result -> {
|
||||
|
|
@ -1079,7 +1079,7 @@ public class ActionServiceCE_Test {
|
|||
executeActionDTO.setActionId(createdAction.getId());
|
||||
executeActionDTO.setViewMode(false);
|
||||
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(executeActionDTO);
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(executeActionDTO, null);
|
||||
|
||||
StepVerifier.create(actionExecutionResultMono)
|
||||
.assertNext(result -> {
|
||||
|
|
@ -1114,7 +1114,7 @@ public class ActionServiceCE_Test {
|
|||
Mockito.when(pluginExecutor.executeParameterized(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(Mono.just(mockResult));
|
||||
Mockito.when(pluginExecutor.datasourceCreate(Mockito.any())).thenReturn(Mono.empty());
|
||||
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(executeActionDTO);
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(executeActionDTO, null);
|
||||
return actionExecutionResultMono;
|
||||
}
|
||||
|
||||
|
|
@ -1181,7 +1181,7 @@ public class ActionServiceCE_Test {
|
|||
ExecuteActionDTO executeActionDTO = new ExecuteActionDTO();
|
||||
executeActionDTO.setActionId(savedAction.getId());
|
||||
executeActionDTO.setViewMode(false);
|
||||
return newActionService.executeAction(executeActionDTO);
|
||||
return newActionService.executeAction(executeActionDTO, null);
|
||||
});
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package com.appsmith.server.services.ce;
|
||||
|
||||
import com.appsmith.external.dtos.ExecuteActionDTO;
|
||||
import com.appsmith.external.models.ActionExecutionResult;
|
||||
import com.appsmith.external.models.Datasource;
|
||||
import com.appsmith.server.acl.PolicyGenerator;
|
||||
|
|
@ -198,7 +199,7 @@ public class NewActionServiceCEImplTest {
|
|||
|
||||
@Test
|
||||
public void testExecuteAction_withoutExecuteActionDTOPart_failsValidation() {
|
||||
final Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(Flux.empty(), null);
|
||||
final Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(Flux.empty(), null, null);
|
||||
|
||||
StepVerifier
|
||||
.create(actionExecutionResultMono)
|
||||
|
|
@ -219,7 +220,7 @@ public class NewActionServiceCEImplTest {
|
|||
final Flux<Part> partsFlux = BodyExtractors.toParts()
|
||||
.extract(mock, this.context);
|
||||
|
||||
final Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(partsFlux, null);
|
||||
final Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(partsFlux, null, null);
|
||||
|
||||
StepVerifier
|
||||
.create(actionExecutionResultMono)
|
||||
|
|
@ -240,7 +241,7 @@ public class NewActionServiceCEImplTest {
|
|||
final Flux<Part> partsFlux = BodyExtractors.toParts()
|
||||
.extract(mock, this.context);
|
||||
|
||||
final Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(partsFlux, null);
|
||||
final Mono<ActionExecutionResult> actionExecutionResultMono = newActionService.executeAction(partsFlux, null, null);
|
||||
|
||||
StepVerifier
|
||||
.create(actionExecutionResultMono)
|
||||
|
|
@ -330,7 +331,7 @@ public class NewActionServiceCEImplTest {
|
|||
|
||||
NewActionServiceCE newActionServiceSpy = spy(newActionService);
|
||||
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionServiceSpy.executeAction(partsFlux, null);
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionServiceSpy.executeAction(partsFlux, null, null);
|
||||
|
||||
ActionExecutionResult mockResult = new ActionExecutionResult();
|
||||
mockResult.setIsExecutionSuccess(true);
|
||||
|
|
@ -339,7 +340,7 @@ public class NewActionServiceCEImplTest {
|
|||
|
||||
NewAction newAction = new NewAction();
|
||||
newAction.setId("63285a3388e48972c7519b18");
|
||||
doReturn(Mono.just(mockResult)).when(newActionServiceSpy).executeAction(any());
|
||||
doReturn(Mono.just(mockResult)).when(newActionServiceSpy).executeAction(any(), any());
|
||||
doReturn(Mono.just(newAction)).when(newActionServiceSpy).findByBranchNameAndDefaultActionId(any(), any(), any());
|
||||
|
||||
|
||||
|
|
@ -380,7 +381,7 @@ public class NewActionServiceCEImplTest {
|
|||
|
||||
NewActionServiceCE newActionServiceSpy = spy(newActionService);
|
||||
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionServiceSpy.executeAction(partsFlux, null);
|
||||
Mono<ActionExecutionResult> actionExecutionResultMono = newActionServiceSpy.executeAction(partsFlux, null, null);
|
||||
|
||||
ActionExecutionResult mockResult = new ActionExecutionResult();
|
||||
mockResult.setIsExecutionSuccess(true);
|
||||
|
|
@ -389,7 +390,7 @@ public class NewActionServiceCEImplTest {
|
|||
|
||||
NewAction newAction = new NewAction();
|
||||
newAction.setId("63285a3388e48972c7519b18");
|
||||
doReturn(Mono.just(mockResult)).when(newActionServiceSpy).executeAction(any());
|
||||
doReturn(Mono.just(mockResult)).when(newActionServiceSpy).executeAction(any(), any());
|
||||
doReturn(Mono.just(newAction)).when(newActionServiceSpy).findByBranchNameAndDefaultActionId(any(), any(), any());
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user