Adding the redis listener via spring-data-redis-reactive.

The listeners need to be configured in the RedisConfig class via Beans. These beans can then invoke complex business logic based on requirements.
This commit is contained in:
Trisha Anand 2019-10-07 09:02:02 +00:00
parent 0d63de8404
commit d1bcc282f8
5 changed files with 161 additions and 70 deletions

View File

@ -4,11 +4,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@ -17,6 +15,11 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
@Slf4j
public class RedisConfig {
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
}
/**
* This is the topic to which we will publish & subscribe to. We can have multiple topics based on the messages
* that we wish to broadcast. Starting with a single one for now.
@ -27,38 +30,6 @@ public class RedisConfig {
return new ChannelTopic("appsmith:queue");
}
@Bean
public ReactiveRedisTemplate<String, String> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string());
}
/**
* This is the listener that will receive all the messages from the Redis channel topic configured in topic().
* Adding dummy implementation with log message for now, but can be extended to include more complex behaviour
*
* @param factory
* @return
*/
@Bean
ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) {
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
container
// The receive function can subscribe to multiple topics as well. Can also subscribe via regex pattern
// to multiple channels
.receive(topic())
// Extract the message from the incoming object. By default it's String serialization. The receive() fxn
// can also configure different serialization classes based on requirements
.map(ReactiveSubscription.Message::getMessage)
// Actual processing of the message.
.map(msg -> {
log.info("**** In the redis subscribe **** : {}", msg);
return msg;
})
// Required to subscribe else this chain is never invoked
.subscribe();
return container;
}
@Bean
ReactiveRedisOperations<String, String> reactiveRedisOperations(ReactiveRedisConnectionFactory factory) {
Jackson2JsonRedisSerializer<String> serializer = new Jackson2JsonRedisSerializer<>(String.class);
@ -70,5 +41,4 @@ public class RedisConfig {
return new ReactiveRedisTemplate<>(factory, context);
}
}

View File

@ -0,0 +1,62 @@
package com.appsmith.server.configurations;
import com.appsmith.server.dtos.InstallPluginRedisDTO;
import com.appsmith.server.services.PluginService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import reactor.core.publisher.Mono;
@Configuration
@Slf4j
public class RedisListenerConfig {
private final ObjectMapper objectMapper;
private final PluginService pluginService;
private final ChannelTopic topic;
@Autowired
public RedisListenerConfig(ObjectMapper objectMapper, PluginService pluginService, ChannelTopic topic) {
this.objectMapper = objectMapper;
this.pluginService = pluginService;
this.topic = topic;
}
/**
* This is the listener that will receive all the messages from the Redis channel topic configured in topic().
* Currently the only topic we are listening to is for install plugin requests.
* @param factory
* @return
*/
@Bean
public ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) {
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);
container
// The receive function can subscribe to multiple topics as well. Can also subscribe via regex pattern
// to multiple channels
.receive(topic)
// Extract the message from the incoming object. By default it's String serialization. The receive() fxn
// can also configure different serialization classes based on requirements
.map(p -> p.getMessage())
.map(msg -> {
try {
InstallPluginRedisDTO installPluginRedisDTO = objectMapper.readValue(msg, InstallPluginRedisDTO.class);
return installPluginRedisDTO;
} catch (Exception e) {
log.error("", e);
return Mono.error(e);
}
})
// Actual processing of the message.
.map(redisPluginObj -> pluginService.redisInstallPlugin((InstallPluginRedisDTO) redisPluginObj))
// Required to subscribe else this chain is never invoked
.subscribe();
return container;
}
}

View File

@ -0,0 +1,11 @@
package com.appsmith.server.dtos;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class InstallPluginRedisDTO {
String organizationId;
PluginOrgDTO pluginOrgDTO;
}

View File

@ -3,6 +3,7 @@ package com.appsmith.server.services;
import com.appsmith.server.domains.Organization;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.domains.PluginType;
import com.appsmith.server.dtos.InstallPluginRedisDTO;
import com.appsmith.server.dtos.PluginOrgDTO;
import reactor.core.publisher.Mono;
@ -25,4 +26,6 @@ public interface PluginService extends CrudService<Plugin, String> {
Mono<Plugin> findByName(String name);
Mono<Plugin> findById(String id);
Plugin redisInstallPlugin(InstallPluginRedisDTO installPluginRedisDTO);
}

View File

@ -5,11 +5,14 @@ import com.appsmith.server.domains.OrganizationPlugin;
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.domains.PluginType;
import com.appsmith.server.domains.User;
import com.appsmith.server.dtos.InstallPluginRedisDTO;
import com.appsmith.server.dtos.OrganizationPluginStatus;
import com.appsmith.server.dtos.PluginOrgDTO;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.repositories.PluginRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.segment.analytics.Analytics;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
@ -18,6 +21,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
@ -33,13 +38,15 @@ import java.util.List;
@Service
public class PluginServiceImpl extends BaseService<PluginRepository, Plugin, String> implements PluginService {
private final PluginRepository pluginRepository;
private final ApplicationContext applicationContext;
private final OrganizationService organizationService;
private final PluginManager pluginManager;
private final ReactiveRedisTemplate<String, String> reactiveTemplate;
private final ChannelTopic topic;
private final ObjectMapper objectMapper;
private static final int CONNECTION_TIMEOUT = 1000;
private static final int READ_TIMEOUT = 1000;
private static final int CONNECTION_TIMEOUT = 10000;
private static final int READ_TIMEOUT = 10000;
@Autowired
public PluginServiceImpl(Scheduler scheduler,
@ -50,12 +57,16 @@ public class PluginServiceImpl extends BaseService<PluginRepository, Plugin, Str
ApplicationContext applicationContext,
OrganizationService organizationService,
Analytics analytics,
SessionUserService sessionUserService, PluginManager pluginManager) {
SessionUserService sessionUserService, PluginManager pluginManager,
ReactiveRedisTemplate<String, String> reactiveTemplate,
ChannelTopic topic, ObjectMapper objectMapper) {
super(scheduler, validator, mongoConverter, reactiveMongoTemplate, repository, analytics, sessionUserService);
this.applicationContext = applicationContext;
pluginRepository = repository;
this.organizationService = organizationService;
this.pluginManager = pluginManager;
this.reactiveTemplate = reactiveTemplate;
this.topic = topic;
this.objectMapper = objectMapper;
}
public OldPluginExecutor getPluginExecutor(PluginType pluginType, String className) {
@ -78,7 +89,7 @@ public class PluginServiceImpl extends BaseService<PluginRepository, Plugin, Str
Mono<User> userMono = super.sessionUserService.getCurrentUser();
plugin.setDeleted(false);
return pluginRepository
return repository
.save(plugin)
.flatMap(this::segmentTrackCreate);
}
@ -130,38 +141,27 @@ public class PluginServiceImpl extends BaseService<PluginRepository, Plugin, Str
//If plugin is already present for the organization, just return the organization, else install and return organization
return pluginInOrganizationMono
.switchIfEmpty(Mono.defer(() -> {
log.debug("Plugin not already installed. Running the switch if empty code block");
//If the plugin is not found in the organization, its not installed already. Install now.
return pluginRepository
return repository
.findById(pluginDTO.getPluginId())
.map(plugin -> {
if (plugin.getJarLocation() == null) {
// Plugin jar location not set. Must be local
/** TODO
* In future throw an error if jar location is not set
*/
return plugin;
}
.zipWith(userMono, (plugin, user) -> {
String baseUrl = "../dist/plugins/";
String pluginJar = plugin.getName() + ".jar";
// Else download the plugin jar to the local
log.debug("Before publishing to the redis queue");
//Publish the event to the pub/sub queue
InstallPluginRedisDTO installPluginRedisDTO = new InstallPluginRedisDTO();
installPluginRedisDTO.setOrganizationId(user.getOrganizationId());
installPluginRedisDTO.setPluginOrgDTO(pluginDTO);
String jsonString;
try {
FileUtils.copyURLToFile(
new URL(plugin.getJarLocation()),
new File(baseUrl, pluginJar),
CONNECTION_TIMEOUT,
READ_TIMEOUT);
} catch (Exception e) {
log.error("",e);
return Mono.error(new AppsmithException(AppsmithError.PLUGIN_INSTALLATION_FAILED_DOWNLOAD_ERROR));
jsonString = objectMapper.writeValueAsString(installPluginRedisDTO);
} catch (JsonProcessingException e) {
log.error("", e);
return Mono.error(e);
}
//Now that the plugin has been downloaded, load and restart the plugin
pluginManager.loadPlugin(Path.of(baseUrl + pluginJar));
pluginManager.startPlugins();
return plugin;
return reactiveTemplate
.convertAndSend(topic.getTopic(), jsonString)
.subscribe();
})
//Now that the plugin jar has been successfully downloaded, go on and add the plugin to the organization
.then(userMono)
@ -172,14 +172,15 @@ public class PluginServiceImpl extends BaseService<PluginRepository, Plugin, Str
if (organizationPluginList == null) {
organizationPluginList = new ArrayList<OrganizationPlugin>();
}
log.debug("Installing plugin {} for organization {}", pluginDTO.getPluginId(), organization.getName());
OrganizationPlugin organizationPlugin = new OrganizationPlugin();
organizationPlugin.setPluginId(pluginDTO.getPluginId());
organizationPlugin.setStatus(status);
organizationPluginList.add(organizationPlugin);
organization.setPlugins(organizationPluginList);
//return the organization
log.debug("Going to save the organization with install plugin. This means that installation has been successful");
return organization;
})
.flatMap(organizationService::save);
@ -194,4 +195,48 @@ public class PluginServiceImpl extends BaseService<PluginRepository, Plugin, Str
public Mono<Plugin> findById(String id) {
return repository.findById(id);
}
@Override
public Plugin redisInstallPlugin(InstallPluginRedisDTO installPluginRedisDTO) {
Mono<Plugin> pluginMono = repository.findById(installPluginRedisDTO.getPluginOrgDTO().getPluginId());
return pluginMono
.flatMap(plugin -> downloadAndStartPlugin(installPluginRedisDTO.getOrganizationId(), plugin))
.switchIfEmpty(Mono.defer(() -> {
log.debug("During redisInstallPlugin, no plugin with plugin id {} found. Returning without download and install", installPluginRedisDTO.getPluginOrgDTO().getPluginId());
return Mono.just(new Plugin());
})).block();
}
private Mono<Plugin> downloadAndStartPlugin(String organizationId, Plugin plugin) {
if (plugin.getJarLocation() == null) {
// Plugin jar location not set. Must be local
/** TODO
* In future throw an error if jar location is not set
*/
log.debug("plugin jarLocation is null. Not downloading and starting. Returning now");
return Mono.just(plugin);
}
String baseUrl = "../dist/plugins/";
String pluginJar = plugin.getName() + "-" + organizationId + ".jar";
log.debug("Going to download plugin jar with name : {}", baseUrl+pluginJar);
try {
FileUtils.copyURLToFile(
new URL(plugin.getJarLocation()),
new File(baseUrl, pluginJar),
CONNECTION_TIMEOUT,
READ_TIMEOUT);
} catch (Exception e) {
log.error("",e);
return Mono.error(new AppsmithException(AppsmithError.PLUGIN_INSTALLATION_FAILED_DOWNLOAD_ERROR));
}
//Now that the plugin has been downloaded, load and restart the plugin
pluginManager.loadPlugin(Path.of(baseUrl + pluginJar));
//The following only starts plugins which have been loaded but hasn't been started yet.
pluginManager.startPlugins();
return Mono.just(plugin);
}
}