Skip to content

Commit

Permalink
refactor datasource import flow to add support for dryOps queries
Browse files Browse the repository at this point in the history
  • Loading branch information
AnaghHegde committed Jun 26, 2024
1 parent 854d3f0 commit 0853b38
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,6 @@ public class FieldNameCE {
public static final String ARTIFACT_CONTEXT = "artifactContext";
public static final String ARTIFACT_ID = "artifactId";
public static final String BODY = "body";

public static final String CREATE = "save";
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.Set;

public interface DatasourceServiceCE {
Expand Down Expand Up @@ -39,6 +40,8 @@ public interface DatasourceServiceCE {

Mono<Datasource> save(Datasource datasource);

Mono<Datasource> save(Datasource datasource, boolean isDryOps);

/**
* Retrieves all datasources based on input params, currently only workspaceId.
* The retrieved datasources will contain configuration from the default environment,
Expand All @@ -63,7 +66,8 @@ public interface DatasourceServiceCE {

Mono<Datasource> create(Datasource datasource);

Mono<Datasource> createWithoutPermissions(Datasource datasource);
Mono<Datasource> createWithoutPermissions(
Datasource datasource, Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries);

Mono<Datasource> updateDatasourceStorage(
DatasourceStorageDTO datasourceStorageDTO, String activeEnvironmentId, Boolean IsUserRefreshedUpdate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,21 @@ public DatasourceServiceCEImpl(

@Override
public Mono<Datasource> create(Datasource datasource) {
return createEx(datasource, workspacePermission.getDatasourceCreatePermission());
return createEx(datasource, workspacePermission.getDatasourceCreatePermission(), false, null);
}

// TODO: Check usage
@Override
public Mono<Datasource> createWithoutPermissions(Datasource datasource) {
return createEx(datasource, null);
public Mono<Datasource> createWithoutPermissions(
Datasource datasource, Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries) {
return createEx(datasource, null, true, datasourceStorageDryRunQueries);
}

private Mono<Datasource> createEx(@NotNull Datasource datasource, AclPermission permission) {
private Mono<Datasource> createEx(
@NotNull Datasource datasource,
AclPermission permission,
boolean isDryOps,
Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries) {
// Validate incoming request
String workspaceId = datasource.getWorkspaceId();
if (!hasText(workspaceId)) {
Expand Down Expand Up @@ -193,7 +198,7 @@ private Mono<Datasource> createEx(@NotNull Datasource datasource, AclPermission
Mono<User> userMono = sessionUserService.getCurrentUser();
return generateAndSetDatasourcePolicies(userMono, datasource1, permission);
})
.flatMap(this::validateAndSaveDatasourceToRepository)
.flatMap(datasourceInDb -> validateAndSaveDatasourceToRepository(datasourceInDb, isDryOps))
.flatMap(savedDatasource ->
analyticsService.sendCreateEvent(savedDatasource, getAnalyticsProperties(savedDatasource)));
} else {
Expand All @@ -217,7 +222,18 @@ private Mono<Datasource> createEx(@NotNull Datasource datasource, AclPermission
return Mono.just(datasourceStorage);
}

return datasourceStorageService.create(datasourceStorage);
return datasourceStorageService
.create(datasourceStorage, true)
.map(datasourceStorage1 -> {
if (datasourceStorageDryRunQueries.containsKey(FieldName.CREATE)) {
datasourceStorageDryRunQueries
.get(FieldName.CREATE)
.add(datasourceStorage1);
} else {
datasourceStorageDryRunQueries.put(FieldName.CREATE, List.of(datasourceStorage1));
}
return datasourceStorage1;
});
})
.map(datasourceStorageService::createDatasourceStorageDTOFromDatasourceStorage)
.collectMap(DatasourceStorageDTO::getEnvironmentId)
Expand Down Expand Up @@ -303,7 +319,7 @@ public Mono<Datasource> updateDatasource(
copyNestedNonNullProperties(datasource, datasourceInDb);
return datasourceInDb;
})
.flatMap(this::validateAndSaveDatasourceToRepository)
.flatMap(datasourceInDb -> validateAndSaveDatasourceToRepository(datasourceInDb, false))
.map(savedDatasource -> {
// not required by client side in order to avoid updating it to a null storage,
// one alternative is that we find and send datasourceStorages along, but that is an expensive call
Expand Down Expand Up @@ -355,7 +371,7 @@ public Mono<Datasource> updateDatasourceStorage(
datasourceStorage.prepareTransientFields(dbDatasource);

return datasourceStorageService
.updateDatasourceStorage(datasourceStorage, activeEnvironmentId, Boolean.TRUE)
.updateDatasourceStorage(datasourceStorage, activeEnvironmentId, Boolean.TRUE, false)
.map(datasourceStorageService::createDatasourceStorageDTOFromDatasourceStorage)
.map(datasourceStorageDTO1 -> {
dbDatasource.getDatasourceStorages().put(trueEnvironmentId, datasourceStorageDTO1);
Expand All @@ -373,12 +389,29 @@ public Mono<Datasource> save(Datasource datasource) {
return repository.save(datasource);
}

private Mono<Datasource> validateAndSaveDatasourceToRepository(Datasource datasource) {
@Override
public Mono<Datasource> save(Datasource datasource, boolean isDryOps) {
if (datasource.getGitSyncId() == null) {
datasource.setGitSyncId(
datasource.getWorkspaceId() + "_" + Instant.now().toString());
}
datasource.updateForBulkWriteOperation();
return Mono.just(datasource);
}

private Mono<Datasource> validateAndSaveDatasourceToRepository(Datasource datasource, boolean isDryOps) {

return Mono.just(datasource)
.flatMap(this::validateDatasource)
.flatMap(unsavedDatasource -> {
return repository.save(unsavedDatasource).map(savedDatasource -> {
Mono<Datasource> datasourceMono;
if (isDryOps) {
unsavedDatasource.updateForBulkWriteOperation();
datasourceMono = Mono.just(unsavedDatasource);
} else {
datasourceMono = repository.save(unsavedDatasource);
}
return datasourceMono.map(savedDatasource -> {
// datasource.pluginName is a transient field. It was set by validateDatasource method
// object from db will have pluginName=null so set it manually from the unsaved datasource obj
savedDatasource.setPluginName(unsavedDatasource.getPluginName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,16 @@ private Mono<Map<String, String>> importDatasources(
// Don't update the datasource configuration for already available datasources
existingDatasource.setDatasourceConfiguration(null);
return datasourceService
.save(existingDatasource)
.map(createdDatasource ->
Tuples.of(createdDatasource.getName(), createdDatasource.getId()));
.save(existingDatasource, true)
.map(createdDatasource -> {
// Add dry run queries for the datasource
addDryOpsForEntity(
FieldName.CREATE,
mappedImportableResourcesDTO.getDatasourceDryRunQueries(),
createdDatasource);
return Tuples.of(
createdDatasource.getName(), createdDatasource.getId());
});
} else {
// This is explicitly copied over from the map we created before
datasourceStorage.setPluginId(pluginMap.get(datasourceStorage.getPluginId()));
Expand All @@ -235,9 +242,16 @@ private Mono<Map<String, String>> importDatasources(
datasourceStorage,
workspace,
environmentId,
importingMetaDTO.getPermissionProvider())
.map(createdDatasource ->
Tuples.of(importedDatasourceName, createdDatasource.getId()));
importingMetaDTO.getPermissionProvider(),
mappedImportableResourcesDTO)
.map(createdDatasource -> {
// Add dry run queries for the datasource
addDryOpsForEntity(
FieldName.CREATE,
mappedImportableResourcesDTO.getDatasourceDryRunQueries(),
createdDatasource);
return Tuples.of(importedDatasourceName, createdDatasource.getId());
});
}
});
})
Expand Down Expand Up @@ -266,7 +280,8 @@ private Mono<Datasource> createUniqueDatasourceIfNotPresent(
DatasourceStorage datasourceStorage,
Workspace workspace,
String environmentId,
ImportArtifactPermissionProvider permissionProvider) {
ImportArtifactPermissionProvider permissionProvider,
MappedImportableResourcesDTO mappedImportableResourcesDTO) {
/*
1. If same datasource is present return
2. If unable to find the datasource create a new datasource with unique name and return
Expand Down Expand Up @@ -311,11 +326,15 @@ private Mono<Datasource> createUniqueDatasourceIfNotPresent(
getUniqueSuffixForDuplicateNameEntity(duplicateNameDatasource, workspace.getId()))
.map(dsName -> {
datasourceStorage.setName(datasourceStorage.getName() + dsName);

return datasourceService.createDatasourceFromDatasourceStorage(datasourceStorage);
})
.switchIfEmpty(Mono.just(
datasourceService.createDatasourceFromDatasourceStorage(datasourceStorage)))
.flatMap(datasourceService::createWithoutPermissions);
// DRY RUN queries are not saved, so we need to create them separately at the import service
// solution
.flatMap(datasource -> datasourceService.createWithoutPermissions(
datasource, mappedImportableResourcesDTO.getDatasourceStorageDryRunQueries()));
}))
.onErrorResume(throwable -> {
log.error("failed to import datasource", throwable);
Expand Down Expand Up @@ -386,4 +405,13 @@ private Mono<String> getUniqueSuffixForDuplicateNameEntity(BaseDomain sourceEnti
public Flux<Datasource> getEntitiesPresentInWorkspace(String workspaceId) {
return datasourceService.getAllByWorkspaceIdWithStorages(workspaceId, null);
}

private void addDryOpsForEntity(
String queryType, Map<String, List<Datasource>> dryRunOpsMap, Datasource createdDatasource) {
if (dryRunOpsMap.containsKey(queryType)) {
dryRunOpsMap.get(queryType).add(createdDatasource);
} else {
dryRunOpsMap.put(queryType, List.of(createdDatasource));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public interface DatasourceStorageServiceCE {

Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage);

Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage, boolean isDryOps);

Mono<DatasourceStorage> save(DatasourceStorage datasourceStorage);

Mono<DatasourceStorage> archive(DatasourceStorage datasourceStorage);
Expand All @@ -30,7 +32,10 @@ public interface DatasourceStorageServiceCE {
Mono<DatasourceStorage> findStrictlyByDatasourceIdAndEnvironmentId(String datasourceId, String environmentId);

Mono<DatasourceStorage> updateDatasourceStorage(
DatasourceStorage datasourceStorage, String activeEnvironmentId, Boolean IsUserRefreshedUpdate);
DatasourceStorage datasourceStorage,
String activeEnvironmentId,
Boolean IsUserRefreshedUpdate,
boolean isDryOps);

Mono<DatasourceStorage> validateDatasourceStorage(DatasourceStorage datasourceStorage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,16 @@ public DatasourceStorageServiceCEImpl(
@Override
public Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage) {
return this.checkDuplicateDatasourceStorage(datasourceStorage)
.then(this.validateAndSaveDatasourceStorageToRepository(datasourceStorage))
.then(validateAndSaveDatasourceStorageToRepository(datasourceStorage, false))
.flatMap(this::populateHintMessages) // For REST API datasource create flow.
.flatMap(savedDatasourceStorage -> analyticsService.sendCreateEvent(
savedDatasourceStorage, getAnalyticsProperties(savedDatasourceStorage)));
}

@Override
public Mono<DatasourceStorage> create(DatasourceStorage datasourceStorage, boolean isDryOps) {
return this.checkDuplicateDatasourceStorage(datasourceStorage)
.then(validateAndSaveDatasourceStorageToRepository(datasourceStorage, isDryOps))
.flatMap(this::populateHintMessages) // For REST API datasource create flow.
.flatMap(savedDatasourceStorage -> analyticsService.sendCreateEvent(
savedDatasourceStorage, getAnalyticsProperties(savedDatasourceStorage)));
Expand Down Expand Up @@ -126,7 +135,10 @@ public Mono<DatasourceStorage> findStrictlyByDatasourceIdAndEnvironmentId(

@Override
public Mono<DatasourceStorage> updateDatasourceStorage(
DatasourceStorage datasourceStorage, String activeEnvironmentId, Boolean isUserRefreshedUpdate) {
DatasourceStorage datasourceStorage,
String activeEnvironmentId,
Boolean isUserRefreshedUpdate,
boolean isDryOps) {
String datasourceId = datasourceStorage.getDatasourceId();
String environmentId = datasourceStorage.getEnvironmentId();

Expand All @@ -143,7 +155,8 @@ public Mono<DatasourceStorage> updateDatasourceStorage(
}
return dbStorage;
})
.flatMap(this::validateAndSaveDatasourceStorageToRepository)
.flatMap(datasourceStorage1 ->
validateAndSaveDatasourceStorageToRepository(datasourceStorage1, isDryOps))
.flatMap(savedDatasourceStorage -> {
Map<String, Object> analyticsProperties = getAnalyticsProperties(savedDatasourceStorage);
Boolean isUserInvokedUpdate = TRUE.equals(isUserRefreshedUpdate) ? TRUE : FALSE;
Expand Down Expand Up @@ -211,14 +224,20 @@ public Mono<DatasourceStorage> validateDatasourceConfiguration(DatasourceStorage
});
}

private Mono<DatasourceStorage> validateAndSaveDatasourceStorageToRepository(DatasourceStorage datasourceStorage) {
private Mono<DatasourceStorage> validateAndSaveDatasourceStorageToRepository(
DatasourceStorage datasourceStorage, boolean isDryOps) {

return Mono.just(datasourceStorage)
.map(this::sanitizeDatasourceStorage)
.flatMap(datasourceStorage1 -> validateDatasourceStorage(datasourceStorage1))
.flatMap(this::executePreSaveActions)
.flatMap(unsavedDatasourceStorage ->
repository.save(unsavedDatasourceStorage).thenReturn(unsavedDatasourceStorage));
.flatMap(unsavedDatasourceStorage -> {
if (isDryOps) {
unsavedDatasourceStorage.updateForBulkWriteOperation();
return Mono.just(unsavedDatasourceStorage);
}
return repository.save(unsavedDatasourceStorage).thenReturn(unsavedDatasourceStorage);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.appsmith.server.dtos.ce;

import com.appsmith.external.models.Datasource;
import com.appsmith.external.models.DatasourceStorage;
import com.appsmith.server.domains.Context;
import com.appsmith.server.dtos.CustomJSLibContextDTO;
import com.appsmith.server.dtos.ImportActionCollectionResultDTO;
Expand Down Expand Up @@ -43,4 +45,9 @@ public class MappedImportableResourcesCE_DTO {

// This is being used to carry the resources from ArtifactExchangeJson
Map<String, Object> resourceStoreFromArtifactExchangeJson = new HashMap<>();

// Dry ops queries
Map<String, List<Datasource>> datasourceDryRunQueries = new HashMap<>();

Map<String, List<DatasourceStorage>> datasourceStorageDryRunQueries = new HashMap<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Mono<Object> updateDatasourceAndSetAuthentication(Object connection, Data
.setAuthentication(updatableConnection.getAuthenticationDTO(
datasourceStorage.getDatasourceConfiguration().getAuthentication()));
datasourceStorageMono = datasourceStorageService.updateDatasourceStorage(
datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE);
datasourceStorage, datasourceStorage.getEnvironmentId(), Boolean.FALSE, false);
}
return datasourceStorageMono.thenReturn(connection);
}
Expand Down

0 comments on commit 0853b38

Please sign in to comment.