diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisConfig.java index c0bd7fb88a..a347f8da75 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisConfig.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisConfig.java @@ -4,11 +4,9 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; -import org.springframework.data.redis.connection.ReactiveSubscription; import org.springframework.data.redis.core.ReactiveRedisOperations; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; -import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -17,6 +15,11 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; @Slf4j public class RedisConfig { + @Bean + public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) { + return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string()); + } + /** * This is the topic to which we will publish & subscribe to. We can have multiple topics based on the messages * that we wish to broadcast. Starting with a single one for now. @@ -27,38 +30,6 @@ public class RedisConfig { return new ChannelTopic("appsmith:queue"); } - @Bean - public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) { - return new ReactiveRedisTemplate<>(factory, RedisSerializationContext.string()); - } - - /** - * This is the listener that will receive all the messages from the Redis channel topic configured in topic(). - * Adding dummy implementation with log message for now, but can be extended to include more complex behaviour - * - * @param factory - * @return - */ - @Bean - ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) { - ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory); - container - // The receive function can subscribe to multiple topics as well. Can also subscribe via regex pattern - // to multiple channels - .receive(topic()) - // Extract the message from the incoming object. By default it's String serialization. The receive() fxn - // can also configure different serialization classes based on requirements - .map(ReactiveSubscription.Message::getMessage) - // Actual processing of the message. - .map(msg -> { - log.info("**** In the redis subscribe **** : {}", msg); - return msg; - }) - // Required to subscribe else this chain is never invoked - .subscribe(); - return container; - } - @Bean ReactiveRedisOperations reactiveRedisOperations(ReactiveRedisConnectionFactory factory) { Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer<>(String.class); @@ -70,5 +41,4 @@ public class RedisConfig { return new ReactiveRedisTemplate<>(factory, context); } - } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisListenerConfig.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisListenerConfig.java new file mode 100644 index 0000000000..72e4f707a9 --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/configurations/RedisListenerConfig.java @@ -0,0 +1,62 @@ +package com.appsmith.server.configurations; + +import com.appsmith.server.dtos.InstallPluginRedisDTO; +import com.appsmith.server.services.PluginService; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer; +import reactor.core.publisher.Mono; + +@Configuration +@Slf4j +public class RedisListenerConfig { + + private final ObjectMapper objectMapper; + private final PluginService pluginService; + private final ChannelTopic topic; + + @Autowired + public RedisListenerConfig(ObjectMapper objectMapper, PluginService pluginService, ChannelTopic topic) { + this.objectMapper = objectMapper; + this.pluginService = pluginService; + this.topic = topic; + } + + /** + * This is the listener that will receive all the messages from the Redis channel topic configured in topic(). + * Currently the only topic we are listening to is for install plugin requests. + * @param factory + * @return + */ + @Bean + public ReactiveRedisMessageListenerContainer container(ReactiveRedisConnectionFactory factory) { + ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory); + container + // The receive function can subscribe to multiple topics as well. Can also subscribe via regex pattern + // to multiple channels + .receive(topic) + // Extract the message from the incoming object. By default it's String serialization. The receive() fxn + // can also configure different serialization classes based on requirements + .map(p -> p.getMessage()) + .map(msg -> { + try { + InstallPluginRedisDTO installPluginRedisDTO = objectMapper.readValue(msg, InstallPluginRedisDTO.class); + return installPluginRedisDTO; + } catch (Exception e) { + log.error("", e); + return Mono.error(e); + } + }) + // Actual processing of the message. + .map(redisPluginObj -> pluginService.redisInstallPlugin((InstallPluginRedisDTO) redisPluginObj)) + // Required to subscribe else this chain is never invoked + .subscribe(); + return container; + } + +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/dtos/InstallPluginRedisDTO.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/dtos/InstallPluginRedisDTO.java new file mode 100644 index 0000000000..114f6462e6 --- /dev/null +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/dtos/InstallPluginRedisDTO.java @@ -0,0 +1,11 @@ +package com.appsmith.server.dtos; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class InstallPluginRedisDTO { + String organizationId; + PluginOrgDTO pluginOrgDTO; +} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginService.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginService.java index b0ab2dd79b..514a5fde25 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginService.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginService.java @@ -3,6 +3,7 @@ package com.appsmith.server.services; import com.appsmith.server.domains.Organization; import com.appsmith.server.domains.Plugin; import com.appsmith.server.domains.PluginType; +import com.appsmith.server.dtos.InstallPluginRedisDTO; import com.appsmith.server.dtos.PluginOrgDTO; import reactor.core.publisher.Mono; @@ -25,4 +26,6 @@ public interface PluginService extends CrudService { Mono findByName(String name); Mono findById(String id); + + Plugin redisInstallPlugin(InstallPluginRedisDTO installPluginRedisDTO); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginServiceImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginServiceImpl.java index bbd7148045..b1c38fe535 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginServiceImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/services/PluginServiceImpl.java @@ -5,11 +5,14 @@ import com.appsmith.server.domains.OrganizationPlugin; import com.appsmith.server.domains.Plugin; import com.appsmith.server.domains.PluginType; import com.appsmith.server.domains.User; +import com.appsmith.server.dtos.InstallPluginRedisDTO; import com.appsmith.server.dtos.OrganizationPluginStatus; import com.appsmith.server.dtos.PluginOrgDTO; import com.appsmith.server.exceptions.AppsmithError; import com.appsmith.server.exceptions.AppsmithException; import com.appsmith.server.repositories.PluginRepository; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.segment.analytics.Analytics; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -18,6 +21,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.redis.core.ReactiveRedisTemplate; +import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; @@ -33,13 +38,15 @@ import java.util.List; @Service public class PluginServiceImpl extends BaseService implements PluginService { - private final PluginRepository pluginRepository; private final ApplicationContext applicationContext; private final OrganizationService organizationService; private final PluginManager pluginManager; + private final ReactiveRedisTemplate reactiveTemplate; + private final ChannelTopic topic; + private final ObjectMapper objectMapper; - private static final int CONNECTION_TIMEOUT = 1000; - private static final int READ_TIMEOUT = 1000; + private static final int CONNECTION_TIMEOUT = 10000; + private static final int READ_TIMEOUT = 10000; @Autowired public PluginServiceImpl(Scheduler scheduler, @@ -50,12 +57,16 @@ public class PluginServiceImpl extends BaseService reactiveTemplate, + ChannelTopic topic, ObjectMapper objectMapper) { super(scheduler, validator, mongoConverter, reactiveMongoTemplate, repository, analytics, sessionUserService); this.applicationContext = applicationContext; - pluginRepository = repository; this.organizationService = organizationService; this.pluginManager = pluginManager; + this.reactiveTemplate = reactiveTemplate; + this.topic = topic; + this.objectMapper = objectMapper; } public OldPluginExecutor getPluginExecutor(PluginType pluginType, String className) { @@ -78,7 +89,7 @@ public class PluginServiceImpl extends BaseService userMono = super.sessionUserService.getCurrentUser(); plugin.setDeleted(false); - return pluginRepository + return repository .save(plugin) .flatMap(this::segmentTrackCreate); } @@ -130,38 +141,27 @@ public class PluginServiceImpl extends BaseService { + log.debug("Plugin not already installed. Running the switch if empty code block"); //If the plugin is not found in the organization, its not installed already. Install now. - return pluginRepository + return repository .findById(pluginDTO.getPluginId()) - .map(plugin -> { - if (plugin.getJarLocation() == null) { - // Plugin jar location not set. Must be local - /** TODO - * In future throw an error if jar location is not set - */ - return plugin; - } + .zipWith(userMono, (plugin, user) -> { - String baseUrl = "../dist/plugins/"; - String pluginJar = plugin.getName() + ".jar"; - - // Else download the plugin jar to the local + log.debug("Before publishing to the redis queue"); + //Publish the event to the pub/sub queue + InstallPluginRedisDTO installPluginRedisDTO = new InstallPluginRedisDTO(); + installPluginRedisDTO.setOrganizationId(user.getOrganizationId()); + installPluginRedisDTO.setPluginOrgDTO(pluginDTO); + String jsonString; try { - FileUtils.copyURLToFile( - new URL(plugin.getJarLocation()), - new File(baseUrl, pluginJar), - CONNECTION_TIMEOUT, - READ_TIMEOUT); - } catch (Exception e) { - log.error("",e); - return Mono.error(new AppsmithException(AppsmithError.PLUGIN_INSTALLATION_FAILED_DOWNLOAD_ERROR)); + jsonString = objectMapper.writeValueAsString(installPluginRedisDTO); + } catch (JsonProcessingException e) { + log.error("", e); + return Mono.error(e); } - - //Now that the plugin has been downloaded, load and restart the plugin - pluginManager.loadPlugin(Path.of(baseUrl + pluginJar)); - pluginManager.startPlugins(); - - return plugin; + return reactiveTemplate + .convertAndSend(topic.getTopic(), jsonString) + .subscribe(); }) //Now that the plugin jar has been successfully downloaded, go on and add the plugin to the organization .then(userMono) @@ -172,14 +172,15 @@ public class PluginServiceImpl extends BaseService(); } - log.debug("Installing plugin {} for organization {}", pluginDTO.getPluginId(), organization.getName()); + OrganizationPlugin organizationPlugin = new OrganizationPlugin(); organizationPlugin.setPluginId(pluginDTO.getPluginId()); organizationPlugin.setStatus(status); organizationPluginList.add(organizationPlugin); organization.setPlugins(organizationPluginList); - //return the organization + log.debug("Going to save the organization with install plugin. This means that installation has been successful"); + return organization; }) .flatMap(organizationService::save); @@ -194,4 +195,48 @@ public class PluginServiceImpl extends BaseService findById(String id) { return repository.findById(id); } + + @Override + public Plugin redisInstallPlugin(InstallPluginRedisDTO installPluginRedisDTO) { + Mono pluginMono = repository.findById(installPluginRedisDTO.getPluginOrgDTO().getPluginId()); + return pluginMono + .flatMap(plugin -> downloadAndStartPlugin(installPluginRedisDTO.getOrganizationId(), plugin)) + .switchIfEmpty(Mono.defer(() -> { + log.debug("During redisInstallPlugin, no plugin with plugin id {} found. Returning without download and install", installPluginRedisDTO.getPluginOrgDTO().getPluginId()); + return Mono.just(new Plugin()); + })).block(); + } + + private Mono downloadAndStartPlugin(String organizationId, Plugin plugin) { + if (plugin.getJarLocation() == null) { + // Plugin jar location not set. Must be local + /** TODO + * In future throw an error if jar location is not set + */ + log.debug("plugin jarLocation is null. Not downloading and starting. Returning now"); + return Mono.just(plugin); + } + + String baseUrl = "../dist/plugins/"; + String pluginJar = plugin.getName() + "-" + organizationId + ".jar"; + log.debug("Going to download plugin jar with name : {}", baseUrl+pluginJar); + + try { + FileUtils.copyURLToFile( + new URL(plugin.getJarLocation()), + new File(baseUrl, pluginJar), + CONNECTION_TIMEOUT, + READ_TIMEOUT); + } catch (Exception e) { + log.error("",e); + return Mono.error(new AppsmithException(AppsmithError.PLUGIN_INSTALLATION_FAILED_DOWNLOAD_ERROR)); + } + + //Now that the plugin has been downloaded, load and restart the plugin + pluginManager.loadPlugin(Path.of(baseUrl + pluginJar)); + //The following only starts plugins which have been loaded but hasn't been started yet. + pluginManager.startPlugins(); + + return Mono.just(plugin); + } }