diff --git a/app/server/appsmith-plugins/elasticSearchPlugin/src/main/java/com/external/plugins/ElasticSearchPlugin.java b/app/server/appsmith-plugins/elasticSearchPlugin/src/main/java/com/external/plugins/ElasticSearchPlugin.java index fbe05759ab..69435d3772 100644 --- a/app/server/appsmith-plugins/elasticSearchPlugin/src/main/java/com/external/plugins/ElasticSearchPlugin.java +++ b/app/server/appsmith-plugins/elasticSearchPlugin/src/main/java/com/external/plugins/ElasticSearchPlugin.java @@ -30,6 +30,7 @@ import org.pf4j.PluginWrapper; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.io.IOException; import java.net.MalformedURLException; @@ -54,50 +55,56 @@ public class ElasticSearchPlugin extends BasePlugin { public Mono execute(RestClient client, DatasourceConfiguration datasourceConfiguration, ActionConfiguration actionConfiguration) { - final ActionExecutionResult result = new ActionExecutionResult(); - String body = actionConfiguration.getBody(); + return (Mono) Mono.fromCallable(() -> { + final ActionExecutionResult result = new ActionExecutionResult(); - final String path = actionConfiguration.getPath(); - final Request request = new Request(actionConfiguration.getHttpMethod().toString(), path); - ContentType contentType = ContentType.APPLICATION_JSON; + String body = actionConfiguration.getBody(); - if (isBulkQuery(path)) { - contentType = ContentType.create("application/x-ndjson"); + final String path = actionConfiguration.getPath(); + final Request request = new Request(actionConfiguration.getHttpMethod().toString(), path); + ContentType contentType = ContentType.APPLICATION_JSON; - // If body is a JSON Array, convert it to an ND-JSON string. - if (body != null && body.trim().startsWith("[")) { - final StringBuilder ndJsonBuilder = new StringBuilder(); - try { - List commands = objectMapper.readValue(body, ArrayList.class); - for (Object object : commands) { - ndJsonBuilder.append(objectMapper.writeValueAsString(object)).append("\n"); + if (isBulkQuery(path)) { + contentType = ContentType.create("application/x-ndjson"); + + // If body is a JSON Array, convert it to an ND-JSON string. + if (body != null && body.trim().startsWith("[")) { + final StringBuilder ndJsonBuilder = new StringBuilder(); + try { + List commands = objectMapper.readValue(body, ArrayList.class); + for (Object object : commands) { + ndJsonBuilder.append(objectMapper.writeValueAsString(object)).append("\n"); + } + } catch (IOException e) { + final String message = "Error converting array to ND-JSON: " + e.getMessage(); + log.warn(message, e); + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message)); } - } catch (IOException e) { - final String message = "Error converting array to ND-JSON: " + e.getMessage(); - log.warn(message, e); - return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message)); + body = ndJsonBuilder.toString(); } - body = ndJsonBuilder.toString(); } - } - if (body != null) { - request.setEntity(new NStringEntity(body, contentType)); - } + if (body != null) { + request.setEntity(new NStringEntity(body, contentType)); + } - try { - final String responseBody = new String( - client.performRequest(request).getEntity().getContent().readAllBytes()); - result.setBody(objectMapper.readValue(responseBody, HashMap.class)); - } catch (IOException e) { - final String message = "Error performing request: " + e.getMessage(); - log.warn(message, e); - return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message)); - } + try { + final String responseBody = new String( + client.performRequest(request).getEntity().getContent().readAllBytes()); + result.setBody(objectMapper.readValue(responseBody, HashMap.class)); + } catch (IOException e) { + final String message = "Error performing request: " + e.getMessage(); + log.warn(message, e); + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message)); + } - result.setIsExecutionSuccess(true); - return Mono.just(result); + result.setIsExecutionSuccess(true); + System.out.println(Thread.currentThread().getName() + ": In the Elastic Search Plugin, got action execution result: " + result.toString()); + return Mono.just(result); + }) + .flatMap(obj -> obj) + .subscribeOn(Schedulers.elastic()); } private static boolean isBulkQuery(String path) { @@ -106,52 +113,57 @@ public class ElasticSearchPlugin extends BasePlugin { @Override public Mono datasourceCreate(DatasourceConfiguration datasourceConfiguration) { - final List hosts = new ArrayList<>(); - for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) { - URL url; - try { - url = new URL(endpoint.getHost()); - } catch (MalformedURLException e) { - return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Invalid host provided. It should be of the form http(s)://your-es-url.com")); - } - String scheme = "http"; - if (url.getProtocol() != null) { - scheme = url.getProtocol(); + return (Mono) Mono.fromCallable(() -> { + final List hosts = new ArrayList<>(); + + for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) { + URL url; + try { + url = new URL(endpoint.getHost()); + } catch (MalformedURLException e) { + return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Invalid host provided. It should be of the form http(s)://your-es-url.com")); + } + String scheme = "http"; + if (url.getProtocol() != null) { + scheme = url.getProtocol(); + } + + hosts.add(new HttpHost(url.getHost(), endpoint.getPort().intValue(), scheme)); } - hosts.add(new HttpHost(url.getHost(), endpoint.getPort().intValue(), scheme)); - } + final RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[]{})); - final RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[]{})); + final AuthenticationDTO authentication = datasourceConfiguration.getAuthentication(); + if (authentication != null + && !StringUtils.isEmpty(authentication.getUsername()) + && !StringUtils.isEmpty(authentication.getPassword())) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + AuthScope.ANY, + new UsernamePasswordCredentials(authentication.getUsername(), authentication.getPassword()) + ); - final AuthenticationDTO authentication = datasourceConfiguration.getAuthentication(); - if (authentication != null - && !StringUtils.isEmpty(authentication.getUsername()) - && !StringUtils.isEmpty(authentication.getPassword())) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials( - AuthScope.ANY, - new UsernamePasswordCredentials(authentication.getUsername(), authentication.getPassword()) - ); + clientBuilder + .setHttpClientConfigCallback( + httpClientBuilder -> httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + ); + } - clientBuilder - .setHttpClientConfigCallback( - httpClientBuilder -> httpClientBuilder - .setDefaultCredentialsProvider(credentialsProvider) - ); - } + if (!CollectionUtils.isEmpty(datasourceConfiguration.getHeaders())) { + clientBuilder.setDefaultHeaders( + (Header[]) datasourceConfiguration.getHeaders() + .stream() + .map(h -> new BasicHeader(h.getKey(), h.getValue())) + .toArray() + ); + } - if (!CollectionUtils.isEmpty(datasourceConfiguration.getHeaders())) { - clientBuilder.setDefaultHeaders( - (Header[]) datasourceConfiguration.getHeaders() - .stream() - .map(h -> new BasicHeader(h.getKey(), h.getValue())) - .toArray() - ); - } - - return Mono.just(clientBuilder.build()); + return Mono.just(clientBuilder.build()); + }) + .flatMap(obj -> obj) + .subscribeOn(Schedulers.elastic()); } @Override @@ -225,7 +237,8 @@ public class ElasticSearchPlugin extends BasePlugin { return new DatasourceTestResult(); }) - .onErrorResume(error -> Mono.just(new DatasourceTestResult(error.getMessage()))); + .onErrorResume(error -> Mono.just(new DatasourceTestResult(error.getMessage()))) + .subscribeOn(Schedulers.elastic()); } } }