I'm trying to execute a method asynchronously within an existing reactive chain in my Project Reactor-based application. The method doUpdateLayoutInAsync is intended to perform a heavy background task, but it seems like my approach isn't working as expected. Here's my current implementation:
public Mono<Boolean> publishPackage(String branchedPackageId) {
PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
publishingMetaDTO.setPublishEvent(true);
return packageRepository
.findById(branchedPackageId, packagePermission.getPublishPermission())
.switchIfEmpty(Mono.error(new AppsmithException(
AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
.flatMap(originalPackage -> {
String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());
Package packageToBePublished = constructPackageToBePublished(originalPackage);
originalPackage.setVersion(nextVersion);
originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
publishingMetaDTO.setOriginPackageId(branchedPackageId);
publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());
Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage);
Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished);
return unsetCurrentLatestMono
.then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
.flatMap(tuple2 -> {
Package publishedPackage = tuple2.getT2();
publishingMetaDTO.setPublishedPackage(publishedPackage);
return modulePackagePublishableService
.publishEntities(publishingMetaDTO)
.flatMap(publishedModules -> {
if (publishedModules.isEmpty()) {
return Mono.error(new AppsmithException(
AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
originalPackage.getUnpublishedPackage().getName()));
}
return moduleInstancePackagePublishableService
.publishEntities(publishingMetaDTO)
.then(Mono.defer(() ->
newActionPackagePublishableService.publishEntities(publishingMetaDTO))
.then(Mono.defer(() ->
actionCollectionPackagePublishableService
.publishEntities(publishingMetaDTO))));
})
.then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
})
.as(transactionalOperator::transactional)
.then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
});
}
private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
.flatMap(pageId -> updateLayoutService
.updatePageLayoutsByPageId(pageId)
.onErrorResume(throwable -> {
log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage());
return Mono.just(pageId);
}))
.collectList();
// Running the updateLayoutsMono task asynchronously
updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();
return Mono.just(Boolean.TRUE);
}
Issue: I want doUpdateLayoutInAsync
to run in the background while the rest of the reactive chain completes. However, the method seems to execute synchronously, and the reactive chain does not continue as expected.
Question: How can I ensure that doUpdateLayoutInAsync
runs asynchronously and does not block the continuation of the reactive chain?
Top comments (0)