Skip to content

Commit

Permalink
chore: refactor datasource import flow to add support for transaction…
Browse files Browse the repository at this point in the history
… in pg (#34514)

## Description


## Automation

/ok-to-test tags="@tag.Git"

### 🔍 Cypress test results
<!-- This is an auto-generated comment: Cypress test results  -->
> [!TIP]
> 🟢 🟢 🟢 All cypress tests have passed! 🎉 🎉 🎉
> Workflow run:
<https://github.com/appsmithorg/appsmith/actions/runs/9772380637>
> Commit: 1f5ab41
> <a
href="https://internal.appsmith.com/app/cypress-dashboard/rundetails-65890b3c81d7400d08fa9ee5?branch=master&workflowId=9772380637&attempt=1"
target="_blank">Cypress dashboard</a>.
> Tags: `@tag.Git`
<!-- end of auto-generated comment: Cypress test results  -->




## Communication
Should the DevRel and Marketing teams inform users about this change?
- [ ] Yes
- [ ] No


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced support for dry run queries in datasource operations. Users
can now perform dry runs when creating, saving, or importing
datasources.

- **Improvements**
- Enhanced datasource import functionality by adding logic to handle and
save dry run queries.
- Improved validation and correction processes for datasources during
action imports.

- **Internal Enhancements**
- Added a new `DryOperationRepository` for managing dry run operations
for datasources and datasource storage.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
AnaghHegde authored Jul 3, 2024
1 parent 0b4d664 commit ffc09cf
Show file tree
Hide file tree
Showing 19 changed files with 273 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,6 @@ public class FieldNameCE {
public static final String ARTIFACT_CONTEXT = "artifactContext";
public static final String ARTIFACT_ID = "artifactId";
public static final String BODY = "body";

public static final String CREATE = "save";
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.Set;

public interface DatasourceServiceCE {
Expand All @@ -36,7 +38,7 @@ public interface DatasourceServiceCE {

Mono<Set<MustacheBindingToken>> extractKeysFromDatasource(Datasource datasource);

Mono<Datasource> save(Datasource datasource);
Mono<Datasource> save(Datasource datasource, boolean isDryOps);

/**
* Retrieves all datasources based on input params, currently only workspaceId.
Expand All @@ -58,10 +60,15 @@ public interface DatasourceServiceCE {
*/
Flux<Datasource> getAllByWorkspaceIdWithStorages(String workspaceId, AclPermission permission);

Flux<Datasource> saveAll(List<Datasource> datasourceList);

Mono<Datasource> create(Datasource datasource);

Mono<Datasource> createWithoutPermissions(Datasource datasource);

Mono<Datasource> createWithoutPermissions(
Datasource datasource, Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries);

Mono<Datasource> updateDatasourceStorage(
DatasourceStorageDTO datasourceStorageDTO, String activeEnvironmentId, Boolean IsUserRefreshedUpdate);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.appsmith.server.domains.Plugin;
import com.appsmith.server.domains.User;
import com.appsmith.server.domains.Workspace;
import com.appsmith.server.dtos.DBOpsType;
import com.appsmith.server.exceptions.AppsmithError;
import com.appsmith.server.exceptions.AppsmithException;
import com.appsmith.server.helpers.PluginExecutorHelper;
Expand Down Expand Up @@ -140,16 +141,26 @@ public DatasourceServiceCEImpl(

@Override
public Mono<Datasource> create(Datasource datasource) {
return createEx(datasource, workspacePermission.getDatasourceCreatePermission());
return createEx(datasource, workspacePermission.getDatasourceCreatePermission(), false, null);
}

// TODO: Check usage
@Override
public Mono<Datasource> createWithoutPermissions(
Datasource datasource, Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries) {
return createEx(datasource, null, true, datasourceStorageDryRunQueries);
}

@Override
public Mono<Datasource> createWithoutPermissions(Datasource datasource) {
return createEx(datasource, null);
return createEx(datasource, null, false, null);
}

private Mono<Datasource> createEx(@NotNull Datasource datasource, AclPermission permission) {
private Mono<Datasource> createEx(
@NotNull Datasource datasource,
AclPermission permission,
boolean isDryOps,
Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries) {
// Validate incoming request
String workspaceId = datasource.getWorkspaceId();
if (!hasText(workspaceId)) {
Expand Down Expand Up @@ -193,7 +204,7 @@ private Mono<Datasource> createEx(@NotNull Datasource datasource, AclPermission
Mono<User> userMono = sessionUserService.getCurrentUser();
return generateAndSetDatasourcePolicies(userMono, datasource1, permission);
})
.flatMap(this::validateAndSaveDatasourceToRepository)
.flatMap(datasourceInDb -> validateAndSaveDatasourceToRepository(datasourceInDb, isDryOps))
.flatMap(savedDatasource ->
analyticsService.sendCreateEvent(savedDatasource, getAnalyticsProperties(savedDatasource)));
} else {
Expand All @@ -217,7 +228,16 @@ private Mono<Datasource> createEx(@NotNull Datasource datasource, AclPermission
return Mono.just(datasourceStorage);
}

return datasourceStorageService.create(datasourceStorage);
return datasourceStorageService
.create(datasourceStorage, isDryOps)
.map(datasourceStorage1 -> {
if (datasourceStorageDryRunQueries != null && isDryOps) {
datasourceStorageDryRunQueries
.computeIfAbsent(DBOpsType.SAVE.name(), k -> new ArrayList<>())
.add(datasourceStorage1);
}
return datasourceStorage1;
});
})
.map(datasourceStorageService::createDatasourceStorageDTOFromDatasourceStorage)
.collectMap(DatasourceStorageDTO::getEnvironmentId)
Expand Down Expand Up @@ -303,7 +323,7 @@ public Mono<Datasource> updateDatasource(
copyNestedNonNullProperties(datasource, datasourceInDb);
return datasourceInDb;
})
.flatMap(this::validateAndSaveDatasourceToRepository)
.flatMap(datasourceInDb -> validateAndSaveDatasourceToRepository(datasourceInDb, false))
.map(savedDatasource -> {
// not required by client side in order to avoid updating it to a null storage,
// one alternative is that we find and send datasourceStorages along, but that is an expensive call
Expand Down Expand Up @@ -355,7 +375,7 @@ public Mono<Datasource> updateDatasourceStorage(
datasourceStorage.prepareTransientFields(dbDatasource);

return datasourceStorageService
.updateDatasourceStorage(datasourceStorage, activeEnvironmentId, Boolean.TRUE)
.updateDatasourceStorage(datasourceStorage, activeEnvironmentId, Boolean.TRUE, false)
.map(datasourceStorageService::createDatasourceStorageDTOFromDatasourceStorage)
.map(datasourceStorageDTO1 -> {
dbDatasource.getDatasourceStorages().put(trueEnvironmentId, datasourceStorageDTO1);
Expand All @@ -365,20 +385,31 @@ public Mono<Datasource> updateDatasourceStorage(
}

@Override
public Mono<Datasource> save(Datasource datasource) {
public Mono<Datasource> save(Datasource datasource, boolean isDryOps) {
if (datasource.getGitSyncId() == null) {
datasource.setGitSyncId(
datasource.getWorkspaceId() + "_" + Instant.now().toString());
}
if (isDryOps) {
datasource.updateForBulkWriteOperation();
return Mono.just(datasource);
}
return repository.save(datasource);
}

private Mono<Datasource> validateAndSaveDatasourceToRepository(Datasource datasource) {
private Mono<Datasource> validateAndSaveDatasourceToRepository(Datasource datasource, boolean isDryOps) {

return Mono.just(datasource)
.flatMap(this::validateDatasource)
.flatMap(unsavedDatasource -> {
return repository.save(unsavedDatasource).map(savedDatasource -> {
Mono<Datasource> datasourceMono;
if (isDryOps) {
unsavedDatasource.updateForBulkWriteOperation();
datasourceMono = Mono.just(unsavedDatasource);
} else {
datasourceMono = repository.save(unsavedDatasource);
}
return datasourceMono.map(savedDatasource -> {
// datasource.pluginName is a transient field. It was set by validateDatasource method
// object from db will have pluginName=null so set it manually from the unsaved datasource obj
savedDatasource.setPluginName(unsavedDatasource.getPluginName());
Expand Down Expand Up @@ -852,6 +883,15 @@ public Mono<Datasource> archiveById(String id) {
});
}

@Override
public Flux<Datasource> saveAll(List<Datasource> datasourceList) {
datasourceList.stream()
.filter(datasource -> datasource.getGitSyncId() == null)
.forEach(datasource -> datasource.setGitSyncId(
datasource.getWorkspaceId() + "_" + Instant.now().toString()));
return repository.saveAll(datasourceList);
}

private Mono<PluginExecutor> findPluginExecutor(String pluginId) {
final Mono<Plugin> pluginMono = pluginService.findById(pluginId).cache();
return pluginExecutorHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.appsmith.server.domains.Artifact;
import com.appsmith.server.domains.Workspace;
import com.appsmith.server.dtos.ArtifactExchangeJson;
import com.appsmith.server.dtos.DBOpsType;
import com.appsmith.server.dtos.ImportingMetaDTO;
import com.appsmith.server.dtos.MappedImportableResourcesDTO;
import com.appsmith.server.exceptions.AppsmithError;
Expand Down Expand Up @@ -211,9 +212,16 @@ private Mono<Map<String, String>> importDatasources(
// Don't update the datasource configuration for already available datasources
existingDatasource.setDatasourceConfiguration(null);
return datasourceService
.save(existingDatasource)
.map(createdDatasource ->
Tuples.of(createdDatasource.getName(), createdDatasource.getId()));
.save(existingDatasource, true)
.map(createdDatasource -> {
// Add dry run queries for the datasource
addDryOpsForEntity(
DBOpsType.SAVE,
mappedImportableResourcesDTO.getDatasourceDryRunQueries(),
createdDatasource);
return Tuples.of(
createdDatasource.getName(), createdDatasource.getId());
});
} else {
// This is explicitly copied over from the map we created before
datasourceStorage.setPluginId(pluginMap.get(datasourceStorage.getPluginId()));
Expand All @@ -235,9 +243,16 @@ private Mono<Map<String, String>> importDatasources(
datasourceStorage,
workspace,
environmentId,
importingMetaDTO.getPermissionProvider())
.map(createdDatasource ->
Tuples.of(importedDatasourceName, createdDatasource.getId()));
importingMetaDTO.getPermissionProvider(),
mappedImportableResourcesDTO)
.map(createdDatasource -> {
// Add dry run queries for the datasource
addDryOpsForEntity(
DBOpsType.SAVE,
mappedImportableResourcesDTO.getDatasourceDryRunQueries(),
createdDatasource);
return Tuples.of(importedDatasourceName, createdDatasource.getId());
});
}
});
})
Expand Down Expand Up @@ -266,7 +281,8 @@ private Mono<Datasource> createUniqueDatasourceIfNotPresent(
DatasourceStorage datasourceStorage,
Workspace workspace,
String environmentId,
ImportArtifactPermissionProvider permissionProvider) {
ImportArtifactPermissionProvider permissionProvider,
MappedImportableResourcesDTO mappedImportableResourcesDTO) {
/*
1. If same datasource is present return
2. If unable to find the datasource create a new datasource with unique name and return
Expand Down Expand Up @@ -311,11 +327,15 @@ private Mono<Datasource> createUniqueDatasourceIfNotPresent(
getUniqueSuffixForDuplicateNameEntity(duplicateNameDatasource, workspace.getId()))
.map(dsName -> {
datasourceStorage.setName(datasourceStorage.getName() + dsName);

return datasourceService.createDatasourceFromDatasourceStorage(datasourceStorage);
})
.switchIfEmpty(Mono.just(
datasourceService.createDatasourceFromDatasourceStorage(datasourceStorage)))
.flatMap(datasourceService::createWithoutPermissions);
// DRY RUN queries are not saved, so we need to create them separately at the import service
// solution
.flatMap(datasource -> datasourceService.createWithoutPermissions(
datasource, mappedImportableResourcesDTO.getDatasourceStorageDryRunQueries()));
}))
.onErrorResume(throwable -> {
log.error("failed to import datasource", throwable);
Expand Down Expand Up @@ -386,4 +406,9 @@ private Mono<String> getUniqueSuffixForDuplicateNameEntity(BaseDomain sourceEnti
public Flux<Datasource> getEntitiesPresentInWorkspace(String workspaceId) {
return datasourceService.getAllByWorkspaceIdWithStorages(workspaceId, null);
}

private void addDryOpsForEntity(
DBOpsType queryType, Map<String, List<Datasource>> dryRunOpsMap, Datasource createdDatasource) {
dryRunOpsMap.computeIfAbsent(queryType.name(), k -> new ArrayList<>()).add(createdDatasource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public interface DatasourceStorageServiceCE {

Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage);

Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage, boolean isDryOps);

Mono<DatasourceStorage> save(DatasourceStorage datasourceStorage);

Mono<DatasourceStorage> archive(DatasourceStorage datasourceStorage);
Expand All @@ -32,6 +34,12 @@ public interface DatasourceStorageServiceCE {
Mono<DatasourceStorage> updateDatasourceStorage(
DatasourceStorage datasourceStorage, String activeEnvironmentId, Boolean IsUserRefreshedUpdate);

Mono<DatasourceStorage> updateDatasourceStorage(
DatasourceStorage datasourceStorage,
String activeEnvironmentId,
Boolean IsUserRefreshedUpdate,
boolean isDryOps);

Mono<DatasourceStorage> validateDatasourceStorage(DatasourceStorage datasourceStorage);

Mono<DatasourceStorage> validateDatasourceConfiguration(DatasourceStorage datasourceStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ public DatasourceStorageServiceCEImpl(
@Override
public Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage) {
return this.checkDuplicateDatasourceStorage(datasourceStorage)
.then(this.validateAndSaveDatasourceStorageToRepository(datasourceStorage))
.then(validateAndSaveDatasourceStorageToRepository(datasourceStorage, false))
.flatMap(this::populateHintMessages) // For REST API datasource create flow.
.flatMap(savedDatasourceStorage -> analyticsService.sendCreateEvent(
savedDatasourceStorage, getAnalyticsProperties(savedDatasourceStorage)));
}

@Override
public Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage, boolean isDryOps) {
return this.checkDuplicateDatasourceStorage(datasourceStorage)
.then(validateAndSaveDatasourceStorageToRepository(datasourceStorage, isDryOps))
.flatMap(this::populateHintMessages) // For REST API datasource create flow.
.flatMap(savedDatasourceStorage -> analyticsService.sendCreateEvent(
savedDatasourceStorage, getAnalyticsProperties(savedDatasourceStorage)));
Expand Down Expand Up @@ -127,6 +136,15 @@ public Mono<DatasourceStorage> findStrictlyByDatasourceIdAndEnvironmentId(
@Override
public Mono<DatasourceStorage> updateDatasourceStorage(
DatasourceStorage datasourceStorage, String activeEnvironmentId, Boolean isUserRefreshedUpdate) {
return updateDatasourceStorage(datasourceStorage, activeEnvironmentId, isUserRefreshedUpdate, false);
}

@Override
public Mono<DatasourceStorage> updateDatasourceStorage(
DatasourceStorage datasourceStorage,
String activeEnvironmentId,
Boolean isUserRefreshedUpdate,
boolean isDryOps) {
String datasourceId = datasourceStorage.getDatasourceId();
String environmentId = datasourceStorage.getEnvironmentId();

Expand All @@ -143,7 +161,8 @@ public Mono<DatasourceStorage> updateDatasourceStorage(
}
return dbStorage;
})
.flatMap(this::validateAndSaveDatasourceStorageToRepository)
.flatMap(datasourceStorage1 ->
validateAndSaveDatasourceStorageToRepository(datasourceStorage1, isDryOps))
.flatMap(savedDatasourceStorage -> {
Map<String, Object> analyticsProperties = getAnalyticsProperties(savedDatasourceStorage);
Boolean isUserInvokedUpdate = TRUE.equals(isUserRefreshedUpdate) ? TRUE : FALSE;
Expand Down Expand Up @@ -211,14 +230,20 @@ public Mono<DatasourceStorage> validateDatasourceConfiguration(DatasourceStorage
});
}

private Mono<DatasourceStorage> validateAndSaveDatasourceStorageToRepository(DatasourceStorage datasourceStorage) {
private Mono<DatasourceStorage> validateAndSaveDatasourceStorageToRepository(
DatasourceStorage datasourceStorage, boolean isDryOps) {

return Mono.just(datasourceStorage)
.map(this::sanitizeDatasourceStorage)
.flatMap(datasourceStorage1 -> validateDatasourceStorage(datasourceStorage1))
.flatMap(this::executePreSaveActions)
.flatMap(unsavedDatasourceStorage ->
repository.save(unsavedDatasourceStorage).thenReturn(unsavedDatasourceStorage));
.flatMap(unsavedDatasourceStorage -> {
if (isDryOps) {
unsavedDatasourceStorage.updateForBulkWriteOperation();
return Mono.just(unsavedDatasourceStorage);
}
return repository.save(unsavedDatasourceStorage).thenReturn(unsavedDatasourceStorage);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.appsmith.server.dtos;

public enum DBOpsType {
SAVE,

UPDATE,

DELETE
}
Loading

0 comments on commit ffc09cf

Please sign in to comment.