feat: Support mongodb transactions for import application flow (#14939)

## Description

Import application flow is a multi-stage process where we touch all the
mongo DB collections required to store the application in DB, which
today includes pages, actions, JSObjects, etc. When the flow is complete
then only we can say the application is successfully imported and DB
won't have any stale objects stored. But in a negative scenario where
the flow might break because of some unknown exceptions, objects stored
earlier become stale and reside in DB forever. With this PR we are
establishing the infra for transactions to avoid saving stale DB
objects. This is achieved by a rollback mechanism in case an exception
is thrown in the middle of execution.

Note: Since transactions are built on concepts of logical sessions they
require mecahnics (like oplog) which are only available in replica set
environment.

You can always convert a standalone to a single noded replica set and
transactions will work with this one node.


[https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/](https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/?_ga=2.53712296.1453502529.1669195955-2065030687.1664009965)

```
For local dev earlier we used to run the single node without replica set. Henceforth if we want to take the advantage of transactions please follow the steps to run mongod single node in local with replica set:

- Upgrade the MongoDB version to 4.4 or higher if it’s 4.2 or lower (https://www.mongodb.com/docs/manual/release-notes/4.4/#std-label-4.4-upgrade)
- Close the mongoDB instance running in your local
- Start the mongoDB in replica set mode and initiate the replica set
    - mongod --port 27017 --dbpath <path/to/db> --replSet <replica-set-name> && mongo --eval “rs.initiate()”
- One can use following commands to check replica set status: 
    - mongo appsmith
    - rs.status()
- By this time you should have the mongo running with replica set
```
<img width="1788" alt="Screenshot 2022-07-01 at 10 31 27 PM"
src="https://user-images.githubusercontent.com/41686026/176944386-f9d94715-c0cf-4900-93b7-f73647132d60.png">

This also means mongodb connection string used in env file will now
include the replica-set name if one wants to leverage transactions:
`mongodb://localhost:27017/appsmith?replicaSet={replica-set-name}`

Fixes https://github.com/appsmithorg/appsmith/issues/14543

## Type of change

- Bug fix (non-breaking change which fixes an issue)
- This change requires a documentation update

## How Has This Been Tested?

> JUnit 
> Manual test

## Checklist:

- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] New and existing unit tests pass locally with my changes

---------

Co-authored-by: Anagh Hegde <anagh@appsmith.com>
Co-authored-by: Aishwarya UR <aishwarya@appsmith.com>
This commit is contained in:
Abhijeet 2023-02-09 14:50:04 +05:30 committed by GitHub
parent 2e80ccac29
commit 9f6ef23f5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 256 additions and 40 deletions

View File

@ -218,6 +218,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>

View File

@ -22,6 +22,7 @@ import org.springframework.data.convert.SimpleTypeInformationMapper;
import org.springframework.data.convert.TypeInformationMapper;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
import org.springframework.data.mongodb.config.EnableReactiveMongoAuditing;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
@ -32,6 +33,8 @@ import org.springframework.data.mongodb.core.convert.MongoTypeMapper;
import org.springframework.data.mongodb.core.convert.NoOpDbRefResolver;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.reactive.TransactionalOperator;
import java.util.Arrays;
import java.util.Collections;
@ -125,4 +128,13 @@ public class MongoConfig {
return new EncryptionMongoEventListener(encryptionService);
}
@Bean
public ReactiveTransactionManager reactiveTransactionManager(ReactiveMongoDatabaseFactory factory) {
return new ReactiveMongoTransactionManager(factory);
}
@Bean
public TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
return TransactionalOperator.create(transactionManager);
}
}

View File

@ -68,4 +68,6 @@ public interface CustomApplicationRepositoryCE extends AppsmithRepository<Applic
Mono<UpdateResult> updateFieldByDefaultIdAndBranchName(String defaultId, String defaultIdPath, Map<String,
Object> fieldNameValueMap, String branchName, String branchNamePath, AclPermission permission);
Mono<Application> findByNameAndWorkspaceId(String applicationName, String workspaceId, AclPermission permission);
}

View File

@ -275,4 +275,11 @@ public class CustomApplicationRepositoryCEImpl extends BaseAppsmithRepositoryImp
return super.updateFieldByDefaultIdAndBranchName(defaultId, defaultIdPath, fieldValueMap, branchName,
branchNamePath, permission);
}
@Override
public Mono<Application> findByNameAndWorkspaceId(String applicationName, String workspaceId, AclPermission permission) {
Criteria workspaceIdCriteria = where(fieldName(QApplication.application.workspaceId)).is(workspaceId);
Criteria applicationNameCriteria = where(fieldName(QApplication.application.name)).is(applicationName);
return queryOne(List.of(workspaceIdCriteria, applicationNameCriteria), permission);
}
}

View File

@ -14,7 +14,6 @@ import com.appsmith.server.solutions.ApplicationPermission;
import com.appsmith.server.solutions.DatasourcePermission;
import com.appsmith.server.solutions.ImportExportApplicationService;
import com.appsmith.server.solutions.PagePermission;
import io.sentry.protocol.App;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;

View File

@ -53,7 +53,6 @@ import jakarta.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.bson.types.ObjectId;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -71,6 +70,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static com.appsmith.external.helpers.AppsmithBeanUtils.copyNestedNonNullProperties;
import static com.appsmith.server.acl.AclPermission.MANAGE_APPLICATIONS;
import static org.apache.commons.lang.ObjectUtils.defaultIfNull;
@ -1175,22 +1175,25 @@ public class ApplicationPageServiceCEImpl implements ApplicationPageServiceCE {
Mono<User> userMono = sessionUserService.getCurrentUser().cache();
Mono<Application> applicationWithPoliciesMono = this.setApplicationPolicies(userMono, application.getWorkspaceId(), application);
Mono<Application> applicationMono = applicationService.findByNameAndWorkspaceId(actualName, application.getWorkspaceId(), MANAGE_APPLICATIONS);
return applicationWithPoliciesMono
.zipWith(userMono)
.flatMap(tuple -> {
Application application1 = tuple.getT1();
application1.setModifiedBy(tuple.getT2().getUsername()); // setting modified by to current user
// We can't use create or createApplication method here as we are expecting update operation if the
// _id is available with application object
return applicationService.save(application);
})
.onErrorResume(DuplicateKeyException.class, error -> {
if (error.getMessage() != null) {
return this.createOrUpdateSuffixedApplication(application, name, 1 + suffix);
}
throw error;
});
// We are taking pessimistic approach as this flow is used in import application where we are using transactions
// which creates problem if we hit duplicate key exception
return applicationMono
.flatMap(application1 ->
this.createOrUpdateSuffixedApplication(application, name, 1 + suffix)
)
.switchIfEmpty(Mono.defer(() ->
applicationWithPoliciesMono
.zipWith(userMono)
.flatMap(tuple -> {
Application application1 = tuple.getT1();
application1.setModifiedBy(tuple.getT2().getUsername()); // setting modified by to current user
// We can't use create or createApplication method here as we are expecting update operation if the
// _id is available with application object
return applicationService.save(application);
})
));
}
/**

View File

@ -78,7 +78,6 @@ public interface ApplicationServiceCE extends CrudService<Application, String> {
String defaultApplicationId,
String fieldName,
AclPermission aclPermission);
Mono<String> findBranchedApplicationId(String branchName, String defaultApplicationId, AclPermission permission);
Flux<Application> findAllApplicationsByDefaultApplicationId(String defaultApplicationId, AclPermission permission);
@ -99,4 +98,5 @@ public interface ApplicationServiceCE extends CrudService<Application, String> {
public Mono<Void> deleteAppNavigationLogo(String branchName, String applicationId);
Mono<Application> findByNameAndWorkspaceId(String applicationName, String workspaceId, AclPermission permission);
}

View File

@ -813,7 +813,7 @@ public class ApplicationServiceCEImpl extends BaseService<ApplicationRepository,
final Mono<Application> updateMono = this.update(applicationId, branchedApplication, branchName);
if (!StringUtils.hasLength(oldAssetId)){
if (!StringUtils.hasLength(oldAssetId)) {
return updateMono;
} else {
return assetService.remove(oldAssetId).then(updateMono);
@ -822,7 +822,12 @@ public class ApplicationServiceCEImpl extends BaseService<ApplicationRepository,
});
});
}
@Override
public Mono<Application> findByNameAndWorkspaceId(String applicationName, String workspaceId, AclPermission permission) {
return repository.findByNameAndWorkspaceId(applicationName, workspaceId, permission);
}
@Override

View File

@ -22,6 +22,7 @@ import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.transaction.reactive.TransactionalOperator;
@Slf4j
@Component
@ -51,12 +52,13 @@ public class ImportExportApplicationServiceImpl extends ImportExportApplicationS
ApplicationPermission applicationPermission,
PagePermission pagePermission,
ActionPermission actionPermission,
Gson gson) {
Gson gson,
TransactionalOperator transactionalOperator) {
super(datasourceService, sessionUserService, newActionRepository, datasourceRepository, pluginRepository,
workspaceService, applicationService, newPageService, applicationPageService, newPageRepository,
newActionService, sequenceService, examplesWorkspaceCloner, actionCollectionRepository,
actionCollectionService, themeService, analyticsService, customJSLibService, datasourcePermission,
workspacePermission, applicationPermission, pagePermission, actionPermission, gson);
workspacePermission, applicationPermission, pagePermission, actionPermission, gson, transactionalOperator);
}
}

View File

@ -22,6 +22,8 @@ import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
@Slf4j
@Component
@ -51,12 +53,13 @@ public class ImportExportApplicationServiceImplV2 extends ImportExportApplicatio
ApplicationPermission applicationPermission,
PagePermission pagePermission,
ActionPermission actionPermission,
Gson gson) {
Gson gson,
TransactionalOperator transactionalOperator) {
super(datasourceService, sessionUserService, newActionRepository, datasourceRepository, pluginRepository,
workspaceService, applicationService, newPageService, applicationPageService, newPageRepository,
newActionService, sequenceService, examplesWorkspaceCloner, actionCollectionRepository,
actionCollectionService, themeService, analyticsService, customJSLibService, datasourcePermission,
workspacePermission, applicationPermission, pagePermission, actionPermission, gson);
workspacePermission, applicationPermission, pagePermission, actionPermission, gson, transactionalOperator);
}
}

View File

@ -23,6 +23,7 @@ import com.appsmith.server.domains.Application;
import com.appsmith.server.domains.ApplicationPage;
import com.appsmith.server.domains.CustomJSLib;
import com.appsmith.server.domains.GitApplicationMetadata;
import com.appsmith.server.domains.GitApplicationMetadata;
import com.appsmith.server.domains.Layout;
import com.appsmith.server.domains.NewAction;
import com.appsmith.server.domains.NewPage;
@ -78,6 +79,7 @@ import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.Part;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -133,6 +135,7 @@ public class ImportExportApplicationServiceCEImpl implements ImportExportApplica
private final PagePermission pagePermission;
private final ActionPermission actionPermission;
private final Gson gson;
private final TransactionalOperator transactionalOperator;
/**
* This function will give the application resource to rebuild the application in import application flow
@ -1202,12 +1205,9 @@ public class ImportExportApplicationServiceCEImpl implements ImportExportApplica
})
.onErrorResume(throwable -> {
log.error("Error while importing the application ", throwable.getMessage());
if (importedApplication.getId() != null && applicationId == null) {
return applicationPageService.deleteApplication(importedApplication.getId())
.then(Mono.error(new AppsmithException(AppsmithError.GENERIC_JSON_IMPORT_ERROR, workspaceId, throwable.getMessage())));
}
return Mono.error(new AppsmithException(AppsmithError.UNKNOWN_PLUGIN_REFERENCE));
});
return Mono.error(new AppsmithException(AppsmithError.GENERIC_JSON_IMPORT_ERROR, workspaceId, ""));
})
.as(transactionalOperator::transactional);
// Import Application is currently a slow API because it needs to import and create application, pages, actions
// and action collection. This process may take time and the client may cancel the request. This leads to the flow

View File

@ -78,6 +78,8 @@ import org.springframework.http.ContentDisposition;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.Part;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.reactive.TransactionalOperator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -132,6 +134,7 @@ public class ImportExportApplicationServiceCEImplV2 implements ImportExportAppli
private final PagePermission pagePermission;
private final ActionPermission actionPermission;
private final Gson gson;
private final TransactionalOperator transactionalOperator;
private static final Set<MediaType> ALLOWED_CONTENT_TYPES = Set.of(MediaType.APPLICATION_JSON);
private static final String INVALID_JSON_FILE = "invalid json file";
@ -1231,12 +1234,9 @@ public class ImportExportApplicationServiceCEImplV2 implements ImportExportAppli
})
.onErrorResume(throwable -> {
log.error("Error while importing the application ", throwable.getMessage());
if (importedApplication.getId() != null && applicationId == null) {
return applicationPageService.deleteApplication(importedApplication.getId())
.then(Mono.error(new AppsmithException(AppsmithError.GENERIC_JSON_IMPORT_ERROR, workspaceId, throwable.getMessage())));
}
return Mono.error(new AppsmithException(AppsmithError.GENERIC_JSON_IMPORT_ERROR, workspaceId, ""));
});
})
.as(transactionalOperator::transactional);
// Import Application is currently a slow API because it needs to import and create application, pages, actions
// and action collection. This process may take time and the client may cancel the request. This leads to the flow

View File

@ -0,0 +1,17 @@
package com.appsmith.server.configurations;
import de.flapdoodle.embed.mongo.commands.MongodArguments;
import de.flapdoodle.embed.mongo.config.Storage;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TransactionalConfig {
@Bean
MongodArguments mongodArguments() {
return MongodArguments.builder()
.replication(Storage.of("appsmith-replica-set", 10))
.build();
}
}

View File

@ -2901,14 +2901,13 @@ public class ImportExportApplicationServiceTests {
assert appJson != null;
final String randomId = UUID.randomUUID().toString();
appJson.getDatasourceList().get(0).setPluginId(randomId);
final Mono<Application> resultMono = workspaceService
.create(newWorkspace)
.flatMap(workspace -> importExportApplicationService.importApplicationInWorkspace(workspace.getId(), appJson));
Workspace createdWorkspace = workspaceService.create(newWorkspace).block();
final Mono<Application> resultMono = importExportApplicationService.importApplicationInWorkspace(createdWorkspace.getId(), appJson);
StepVerifier
.create(resultMono)
.expectErrorMatches(throwable -> throwable instanceof AppsmithException &&
throwable.getMessage().equals(AppsmithError.UNKNOWN_PLUGIN_REFERENCE.getMessage(randomId)))
throwable.getMessage().equals(AppsmithError.GENERIC_JSON_IMPORT_ERROR.getMessage(createdWorkspace.getId(), "")))
.verify();
}

View File

@ -0,0 +1,162 @@
package com.appsmith.server.transactions;
import com.appsmith.server.domains.ActionCollection;
import com.appsmith.server.domains.Application;
import com.appsmith.server.domains.NewAction;
import com.appsmith.server.domains.NewPage;
import com.appsmith.server.domains.Workspace;
import com.appsmith.server.dtos.ApplicationJson;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.MockPluginExecutor;
import com.appsmith.server.helpers.PluginExecutorHelper;
import com.appsmith.server.migrations.JsonSchemaMigration;
import com.appsmith.server.repositories.ActionCollectionRepository;
import com.appsmith.server.repositories.NewActionRepository;
import com.appsmith.server.services.ActionCollectionService;
import com.appsmith.server.services.NewActionService;
import com.appsmith.server.services.WorkspaceService;
import com.appsmith.server.solutions.ImportExportApplicationService;
import com.google.gson.Gson;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.security.test.context.support.WithUserDetails;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.TestPropertySource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
// All the test case are for failure or exception. Test cases for valid json file is already present in ImportExportApplicationServiceTest class
@AutoConfigureDataMongo
@SpringBootTest(
properties = "de.flapdoodle.mongodb.embedded.version=5.0.5"
)
@EnableAutoConfiguration()
@TestPropertySource(properties = "property=C")
@DirtiesContext
public class ImportApplicationTransactionServiceTest {
@Autowired
@Qualifier("importExportServiceCEImplV2")
ImportExportApplicationService importExportApplicationService;
@Autowired
WorkspaceService workspaceService;
@Autowired
MongoTemplate mongoTemplate;
@MockBean
NewActionService newActionService;
@MockBean
NewActionRepository newActionRepository;
@MockBean
ActionCollectionService actionCollectionService;
@MockBean
ActionCollectionRepository actionCollectionRepository;
@MockBean
PluginExecutorHelper pluginExecutorHelper;
private ApplicationJson applicationJson = new ApplicationJson();
Long applicationCount = 0L, pageCount = 0L, actionCount = 0L, actionCollectionCount = 0L;
@BeforeEach
public void setup() {
Mockito
.when(pluginExecutorHelper.getPluginExecutor(Mockito.any()))
.thenReturn(Mono.just(new MockPluginExecutor()));
applicationJson = createAppJson("test_assets/ImportExportServiceTest/valid-application.json").block();
applicationCount = mongoTemplate.count(new Query(), Application.class);
pageCount = mongoTemplate.count(new Query(), NewPage.class);
actionCount = mongoTemplate.count(new Query(), NewAction.class);
actionCollectionCount = mongoTemplate.count(new Query(), ActionCollection.class);
}
private FilePart createFilePart(String filePath) {
FilePart filepart = Mockito.mock(FilePart.class, Mockito.RETURNS_DEEP_STUBS);
Flux<DataBuffer> dataBufferFlux = DataBufferUtils
.read(
new ClassPathResource(filePath),
new DefaultDataBufferFactory(),
4096)
.cache();
Mockito.when(filepart.content()).thenReturn(dataBufferFlux);
Mockito.when(filepart.headers().getContentType()).thenReturn(MediaType.APPLICATION_JSON);
return filepart;
}
private Mono<ApplicationJson> createAppJson(String filePath) {
FilePart filePart = createFilePart(filePath);
Mono<String> stringifiedFile = DataBufferUtils.join(filePart.content())
.map(dataBuffer -> {
byte[] data = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(data);
DataBufferUtils.release(dataBuffer);
return new String(data);
});
return stringifiedFile
.map(data -> {
Gson gson = new Gson();
return gson.fromJson(data, ApplicationJson.class);
})
.map(JsonSchemaMigration::migrateApplicationToLatestSchema);
}
@Test
@WithUserDetails(value = "api_user")
public void importApplication_exceptionDuringActionSave_savedPagesAndApplicationReverted() {
Workspace newWorkspace = new Workspace();
newWorkspace.setName("Template Workspace");
Mockito.when(newActionService.save(Mockito.any()))
.thenThrow(new AppsmithException(AppsmithError.GENERIC_BAD_REQUEST));
Workspace createdWorkspace = workspaceService.create(newWorkspace).block();
Mono<Application> resultMono = importExportApplicationService.importApplicationInWorkspace(createdWorkspace.getId(), applicationJson);
// Check if expected exception is thrown
StepVerifier
.create(resultMono)
.expectErrorMatches(error -> error instanceof AppsmithException && error.getMessage().equals(AppsmithError.GENERIC_JSON_IMPORT_ERROR.getMessage(createdWorkspace.getId(), "")))
.verify();
// After the import application failed in the middle of execution after the application and pages are saved to DB
// check if the saved pages reverted after the exception
assertThat(mongoTemplate.count(new Query(), Application.class)).isEqualTo(applicationCount);
assertThat(mongoTemplate.count(new Query(), NewPage.class)).isEqualTo(pageCount);
assertThat(mongoTemplate.count(new Query(), NewAction.class)).isEqualTo(actionCount);
}
}

View File

@ -1,2 +1,2 @@
# embedded mongo DB version which is used during junit tests
de.flapdoodle.mongodb.embedded.version=5.0.14
de.flapdoodle.mongodb.embedded.version=5.0.5

View File

@ -1,6 +1,6 @@
#!/bin/sh
APPSMITH_MONGODB_URI="mongodb://localhost:27017/appsmith"
APPSMITH_MONGODB_URI="mongodb://localhost:27017/appsmith?replicaSet=appsmith-replica-set"
APPSMITH_REDIS_URL="redis://127.0.0.1:6379"