Elastic search plugin's blocking calls moved to elastic scheduler thread (#1855)

This commit is contained in:
Trisha Anand 2020-11-23 17:01:34 +05:30 committed by GitHub
parent ad52842e1c
commit 0bb99dd087
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -30,6 +30,7 @@ import org.pf4j.PluginWrapper;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.io.IOException;
import java.net.MalformedURLException;
@ -54,50 +55,56 @@ public class ElasticSearchPlugin extends BasePlugin {
public Mono<ActionExecutionResult> execute(RestClient client,
DatasourceConfiguration datasourceConfiguration,
ActionConfiguration actionConfiguration) {
final ActionExecutionResult result = new ActionExecutionResult();
String body = actionConfiguration.getBody();
return (Mono<ActionExecutionResult>) Mono.fromCallable(() -> {
final ActionExecutionResult result = new ActionExecutionResult();
final String path = actionConfiguration.getPath();
final Request request = new Request(actionConfiguration.getHttpMethod().toString(), path);
ContentType contentType = ContentType.APPLICATION_JSON;
String body = actionConfiguration.getBody();
if (isBulkQuery(path)) {
contentType = ContentType.create("application/x-ndjson");
final String path = actionConfiguration.getPath();
final Request request = new Request(actionConfiguration.getHttpMethod().toString(), path);
ContentType contentType = ContentType.APPLICATION_JSON;
// If body is a JSON Array, convert it to an ND-JSON string.
if (body != null && body.trim().startsWith("[")) {
final StringBuilder ndJsonBuilder = new StringBuilder();
try {
List<Object> commands = objectMapper.readValue(body, ArrayList.class);
for (Object object : commands) {
ndJsonBuilder.append(objectMapper.writeValueAsString(object)).append("\n");
if (isBulkQuery(path)) {
contentType = ContentType.create("application/x-ndjson");
// If body is a JSON Array, convert it to an ND-JSON string.
if (body != null && body.trim().startsWith("[")) {
final StringBuilder ndJsonBuilder = new StringBuilder();
try {
List<Object> commands = objectMapper.readValue(body, ArrayList.class);
for (Object object : commands) {
ndJsonBuilder.append(objectMapper.writeValueAsString(object)).append("\n");
}
} catch (IOException e) {
final String message = "Error converting array to ND-JSON: " + e.getMessage();
log.warn(message, e);
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message));
}
} catch (IOException e) {
final String message = "Error converting array to ND-JSON: " + e.getMessage();
log.warn(message, e);
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message));
body = ndJsonBuilder.toString();
}
body = ndJsonBuilder.toString();
}
}
if (body != null) {
request.setEntity(new NStringEntity(body, contentType));
}
if (body != null) {
request.setEntity(new NStringEntity(body, contentType));
}
try {
final String responseBody = new String(
client.performRequest(request).getEntity().getContent().readAllBytes());
result.setBody(objectMapper.readValue(responseBody, HashMap.class));
} catch (IOException e) {
final String message = "Error performing request: " + e.getMessage();
log.warn(message, e);
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message));
}
try {
final String responseBody = new String(
client.performRequest(request).getEntity().getContent().readAllBytes());
result.setBody(objectMapper.readValue(responseBody, HashMap.class));
} catch (IOException e) {
final String message = "Error performing request: " + e.getMessage();
log.warn(message, e);
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, message));
}
result.setIsExecutionSuccess(true);
return Mono.just(result);
result.setIsExecutionSuccess(true);
System.out.println(Thread.currentThread().getName() + ": In the Elastic Search Plugin, got action execution result: " + result.toString());
return Mono.just(result);
})
.flatMap(obj -> obj)
.subscribeOn(Schedulers.elastic());
}
private static boolean isBulkQuery(String path) {
@ -106,52 +113,57 @@ public class ElasticSearchPlugin extends BasePlugin {
@Override
public Mono<RestClient> datasourceCreate(DatasourceConfiguration datasourceConfiguration) {
final List<HttpHost> hosts = new ArrayList<>();
for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) {
URL url;
try {
url = new URL(endpoint.getHost());
} catch (MalformedURLException e) {
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Invalid host provided. It should be of the form http(s)://your-es-url.com"));
}
String scheme = "http";
if (url.getProtocol() != null) {
scheme = url.getProtocol();
return (Mono<RestClient>) Mono.fromCallable(() -> {
final List<HttpHost> hosts = new ArrayList<>();
for (Endpoint endpoint : datasourceConfiguration.getEndpoints()) {
URL url;
try {
url = new URL(endpoint.getHost());
} catch (MalformedURLException e) {
return Mono.error(new AppsmithPluginException(AppsmithPluginError.PLUGIN_ERROR, "Invalid host provided. It should be of the form http(s)://your-es-url.com"));
}
String scheme = "http";
if (url.getProtocol() != null) {
scheme = url.getProtocol();
}
hosts.add(new HttpHost(url.getHost(), endpoint.getPort().intValue(), scheme));
}
hosts.add(new HttpHost(url.getHost(), endpoint.getPort().intValue(), scheme));
}
final RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[]{}));
final RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[]{}));
final AuthenticationDTO authentication = datasourceConfiguration.getAuthentication();
if (authentication != null
&& !StringUtils.isEmpty(authentication.getUsername())
&& !StringUtils.isEmpty(authentication.getPassword())) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(authentication.getUsername(), authentication.getPassword())
);
final AuthenticationDTO authentication = datasourceConfiguration.getAuthentication();
if (authentication != null
&& !StringUtils.isEmpty(authentication.getUsername())
&& !StringUtils.isEmpty(authentication.getPassword())) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(authentication.getUsername(), authentication.getPassword())
);
clientBuilder
.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
);
}
clientBuilder
.setHttpClientConfigCallback(
httpClientBuilder -> httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
);
}
if (!CollectionUtils.isEmpty(datasourceConfiguration.getHeaders())) {
clientBuilder.setDefaultHeaders(
(Header[]) datasourceConfiguration.getHeaders()
.stream()
.map(h -> new BasicHeader(h.getKey(), h.getValue()))
.toArray()
);
}
if (!CollectionUtils.isEmpty(datasourceConfiguration.getHeaders())) {
clientBuilder.setDefaultHeaders(
(Header[]) datasourceConfiguration.getHeaders()
.stream()
.map(h -> new BasicHeader(h.getKey(), h.getValue()))
.toArray()
);
}
return Mono.just(clientBuilder.build());
return Mono.just(clientBuilder.build());
})
.flatMap(obj -> obj)
.subscribeOn(Schedulers.elastic());
}
@Override
@ -225,7 +237,8 @@ public class ElasticSearchPlugin extends BasePlugin {
return new DatasourceTestResult();
})
.onErrorResume(error -> Mono.just(new DatasourceTestResult(error.getMessage())));
.onErrorResume(error -> Mono.just(new DatasourceTestResult(error.getMessage())))
.subscribeOn(Schedulers.elastic());
}
}
}