fix: eliminate concurrency bug in file deletion logic by removing side effects from parallel stream (#40744)
## Description Refactored the `updateEntitiesInRepo` method to avoid mutating a non-thread-safe `HashSet` (`filesInRepo`) inside a parallel stream. The previous implementation added file paths to `filesInRepo` as a side effect within a `.parallelStream()`, which could lead to race conditions and data corruption. The new approach uses a pure mapping operation to build the map and then derives the set of file paths, ensuring thread safety and improving code clarity. Fixes #`Issue Number` _or_ Fixes `Issue URL` > [!WARNING] > _If no issue exists, please create an issue first, and check with the maintainers if the issue is valid._ ## Automation /ok-to-test tags="@tag.All" ### 🔍 Cypress test results <!-- This is an auto-generated comment: Cypress test results --> > [!IMPORTANT] > 🟣 🟣 🟣 Your tests are running. > Tests running at: <https://github.com/appsmithorg/appsmith/actions/runs/15209038828> > Commit: 3e3b28385cb1153c08f7ee7aa66e04cea58f665b > Workflow: `PR Automation test suite` > Tags: `@tag.All` > Spec: `` > <hr>Fri, 23 May 2025 11:19:05 UTC <!-- end of auto-generated comment: Cypress test results --> ## Communication Should the DevRel and Marketing teams inform users about this change? - [ ] Yes - [ ] No <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Bug Fixes** - Improved error recovery when saving updates to the Git repository, ensuring changes are still applied even if an error occurs during the update process. - **Refactor** - Enhanced the process for detecting and handling updated or deleted files in the Git repository, leading to more reliable synchronization and cleaner repository state. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
parent
1552b1d98c
commit
e5ff4c6f13
|
|
@ -265,14 +265,27 @@ public class FileUtilsCEImpl implements FileInterface {
|
||||||
baseRepoSuffix, branchName)
|
baseRepoSuffix, branchName)
|
||||||
.name("constructGitResourceMapFromGitRepo");
|
.name("constructGitResourceMapFromGitRepo");
|
||||||
|
|
||||||
return gitResourceMapFromFSMono.flatMap(gitResourceMapFromFS -> {
|
return gitResourceMapFromFSMono
|
||||||
try {
|
.flatMap(gitResourceMapFromFS -> {
|
||||||
updateEntitiesInRepo(gitResourceMapFromDB, baseRepo, gitResourceMapFromFS);
|
try {
|
||||||
} catch (IOException e) {
|
updateEntitiesInRepo(gitResourceMapFromDB, baseRepo, gitResourceMapFromFS);
|
||||||
return Mono.error(e);
|
} catch (IOException e) {
|
||||||
}
|
return Mono.error(e);
|
||||||
return Mono.just(baseRepo);
|
}
|
||||||
});
|
return Mono.just(baseRepo);
|
||||||
|
})
|
||||||
|
.onErrorResume(error -> {
|
||||||
|
return Mono.defer(() -> {
|
||||||
|
return Mono.just(baseRepo).flatMap(baseRepo1 -> {
|
||||||
|
try {
|
||||||
|
updateEntitiesInRepoFallback(gitResourceMapFromDB, baseRepo);
|
||||||
|
return Mono.just(baseRepo1);
|
||||||
|
} catch (IOException e) {
|
||||||
|
return Mono.error(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
})
|
})
|
||||||
.subscribeOn(scheduler);
|
.subscribeOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
@ -336,23 +349,79 @@ public class FileUtilsCEImpl implements FileInterface {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Map<GitResourceIdentity, Object> resourceMapFromDB = gitResourceMapFromDB.getGitResourceMap();
|
Map<GitResourceIdentity, Object> resourceMapFromDB = gitResourceMapFromDB.getGitResourceMap();
|
||||||
|
|
||||||
Set<String> filesInRepo = new HashSet<>();
|
Set<String> filesPathsFromDB = resourceMapFromDB.keySet().parallelStream()
|
||||||
Set<String> updatedFiles = new HashSet<>();
|
|
||||||
Set<String> filesInDB = resourceMapFromDB.keySet().parallelStream()
|
|
||||||
.map(gitResourceIdentity -> gitResourceIdentity.getFilePath())
|
.map(gitResourceIdentity -> gitResourceIdentity.getFilePath())
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
Map<String, Object> filesInFS = gitResourceMapFromFS.getGitResourceMap().entrySet().parallelStream()
|
Map<String, Object> filePathToObjectsFromFS =
|
||||||
|
gitResourceMapFromFS.getGitResourceMap().entrySet().parallelStream()
|
||||||
|
.collect(Collectors.toMap(entry -> entry.getKey().getFilePath(), entry -> entry.getValue()));
|
||||||
|
|
||||||
|
Set<String> filePathsFromFS = new HashSet<>(filePathToObjectsFromFS.keySet());
|
||||||
|
|
||||||
|
// Readme files shouldn't be modified/deleted/or updated.
|
||||||
|
filePathsFromFS.remove(README_FILE_NAME);
|
||||||
|
filePathsFromFS.removeAll(filesPathsFromDB);
|
||||||
|
|
||||||
|
// Delete all the files because they are no longer needed
|
||||||
|
// This covers both older structures of storing files and,
|
||||||
|
// legitimate changes in the artifact that might cause deletions
|
||||||
|
filePathsFromFS.stream().parallel().forEach(filePath -> {
|
||||||
|
try {
|
||||||
|
Files.deleteIfExists(baseRepo.resolve(filePath));
|
||||||
|
} catch (IOException e) {
|
||||||
|
// We ignore files that could not be deleted and expect to come back to this at a later point
|
||||||
|
// Just log the path for now
|
||||||
|
log.error("Unable to delete file at path: {}", filePath);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Now go through the resource map and based on resource type, check if the resource is modified before
|
||||||
|
// serialization
|
||||||
|
Set<String> newAndUpdatedFilePaths = resourceMapFromDB.entrySet().parallelStream()
|
||||||
.map(entry -> {
|
.map(entry -> {
|
||||||
filesInRepo.add(entry.getKey().getFilePath());
|
GitResourceIdentity key = entry.getKey();
|
||||||
return entry;
|
boolean resourceUpdated = true;
|
||||||
|
try {
|
||||||
|
resourceUpdated = fileOperations.hasFileChanged(
|
||||||
|
entry.getValue(), filePathToObjectsFromFS.get(key.getFilePath()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("Error while checking if file has changed", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (resourceUpdated) {
|
||||||
|
log.info("Resource updated: {}", key.getFilePath());
|
||||||
|
String filePath = key.getFilePath();
|
||||||
|
saveResourceCommon(entry.getValue(), baseRepo.resolve(filePath));
|
||||||
|
|
||||||
|
return filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
})
|
})
|
||||||
.collect(Collectors.toMap(entry -> entry.getKey().getFilePath(), entry -> entry.getValue()));
|
.filter(Objects::nonNull)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
|
Set<String> allFileChanges = new HashSet<>();
|
||||||
|
allFileChanges.addAll(newAndUpdatedFilePaths);
|
||||||
|
allFileChanges.addAll(filePathsFromFS);
|
||||||
|
return allFileChanges;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Set<String> updateEntitiesInRepoFallback(GitResourceMap gitResourceMap, Path baseRepo)
|
||||||
|
throws IOException {
|
||||||
|
ModifiedResources modifiedResources = gitResourceMap.getModifiedResources();
|
||||||
|
Map<GitResourceIdentity, Object> resourceMap = gitResourceMap.getGitResourceMap();
|
||||||
|
|
||||||
|
Set<String> filesInRepo = getExistingFilesInRepo(baseRepo);
|
||||||
|
|
||||||
|
Set<String> updatedFilesToBeSerialized = resourceMap.keySet().parallelStream()
|
||||||
|
.map(gitResourceIdentity -> gitResourceIdentity.getFilePath())
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
// Remove all files that need to be serialized from the existing files list, as well as the README file
|
// Remove all files that need to be serialized from the existing files list, as well as the README file
|
||||||
// What we are left with are all the files to be deleted
|
// What we are left with are all the files to be deleted
|
||||||
|
filesInRepo.removeAll(updatedFilesToBeSerialized);
|
||||||
filesInRepo.removeAll(filesInDB);
|
|
||||||
filesInRepo.remove(README_FILE_NAME);
|
filesInRepo.remove(README_FILE_NAME);
|
||||||
|
|
||||||
// Delete all the files because they are no longer needed
|
// Delete all the files because they are no longer needed
|
||||||
|
|
@ -370,19 +439,20 @@ public class FileUtilsCEImpl implements FileInterface {
|
||||||
|
|
||||||
// Now go through the resource map and based on resource type, check if the resource is modified before
|
// Now go through the resource map and based on resource type, check if the resource is modified before
|
||||||
// serialization
|
// serialization
|
||||||
resourceMapFromDB.entrySet().parallelStream()
|
// Or simply choose the mechanism for serialization
|
||||||
|
Map<GitResourceType, GitResourceType> modifiedResourcesTypes = getModifiedResourcesTypes();
|
||||||
|
return resourceMap.entrySet().parallelStream()
|
||||||
.map(entry -> {
|
.map(entry -> {
|
||||||
GitResourceIdentity key = entry.getKey();
|
GitResourceIdentity key = entry.getKey();
|
||||||
boolean resourceUpdated = true;
|
boolean resourceUpdated = true;
|
||||||
try {
|
if (modifiedResourcesTypes.containsKey(key.getResourceType()) && modifiedResources != null) {
|
||||||
|
GitResourceType comparisonType = modifiedResourcesTypes.get(key.getResourceType());
|
||||||
|
|
||||||
resourceUpdated =
|
resourceUpdated =
|
||||||
fileOperations.hasFileChanged(entry.getValue(), filesInFS.get(key.getFilePath()));
|
modifiedResources.isResourceUpdatedNew(comparisonType, key.getResourceIdentifier());
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("Error while checking if file has changed", e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (resourceUpdated) {
|
if (resourceUpdated) {
|
||||||
log.info("Resource updated: {}", key.getFilePath());
|
|
||||||
String filePath = key.getFilePath();
|
String filePath = key.getFilePath();
|
||||||
saveResourceCommon(entry.getValue(), baseRepo.resolve(filePath));
|
saveResourceCommon(entry.getValue(), baseRepo.resolve(filePath));
|
||||||
|
|
||||||
|
|
@ -392,7 +462,6 @@ public class FileUtilsCEImpl implements FileInterface {
|
||||||
})
|
})
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
return updatedFiles;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Set<String> updateEntitiesInRepo(ApplicationGitReference applicationGitReference, Path baseRepo) {
|
protected Set<String> updateEntitiesInRepo(ApplicationGitReference applicationGitReference, Path baseRepo) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user