Skip to content

Commit

Permalink
Merge branch 'master' into mcp-canonical-command
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Aug 30, 2022
2 parents ad2e105 + 6aec44d commit c25a3ed
Show file tree
Hide file tree
Showing 41 changed files with 1,230 additions and 469 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import com.linkedin.datahub.graphql.resolvers.ingest.execution.CreateTestConnectionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.GetIngestionExecutionRequestResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.IngestionSourceExecutionRequestsResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.execution.RollbackIngestionResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.CreateSecretResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.DeleteSecretResolver;
import com.linkedin.datahub.graphql.resolvers.ingest.secret.GetSecretValuesResolver;
Expand Down Expand Up @@ -756,7 +757,7 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("createNativeUserInviteToken", new CreateNativeUserInviteTokenResolver(this.nativeUserService))
.dataFetcher("createNativeUserResetToken", new CreateNativeUserResetTokenResolver(this.nativeUserService))
.dataFetcher("batchUpdateSoftDeleted", new BatchUpdateSoftDeletedResolver(this.entityService))

.dataFetcher("rollbackIngestion", new RollbackIngestionResolver(this.entityClient))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ public static IngestionConfig mapIngestionSourceConfig(final DataHubIngestionSou
result.setRecipe(config.getRecipe());
result.setVersion(config.getVersion());
result.setExecutorId(config.getExecutorId());
result.setDebugMode(config.isDebugMode());
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class CreateIngestionExecutionRequestResolver implements DataFetcher<Comp
private static final String MANUAL_EXECUTION_SOURCE_NAME = "MANUAL_INGESTION_SOURCE";
private static final String RECIPE_ARG_NAME = "recipe";
private static final String VERSION_ARG_NAME = "version";
private static final String DEBUG_MODE_ARG_NAME = "debug_mode";

private final EntityClient _entityClient;
private final IngestionConfiguration _ingestionConfiguration;
Expand Down Expand Up @@ -112,6 +113,11 @@ public CompletableFuture<String> get(final DataFetchingEnvironment environment)
if (ingestionSourceInfo.getConfig().hasVersion()) {
arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().getVersion());
}
String debugMode = "false";
if (ingestionSourceInfo.getConfig().hasDebugMode()) {
debugMode = ingestionSourceInfo.getConfig().isDebugMode() ? "true" : "false";
}
arguments.put(DEBUG_MODE_ARG_NAME, debugMode);
execInput.setArgs(new StringMap(arguments));

proposal.setEntityType(Constants.EXECUTION_REQUEST_ENTITY_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.linkedin.datahub.graphql.resolvers.ingest.execution;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.RollbackIngestionInput;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
import com.linkedin.entity.client.EntityClient;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;

import java.util.concurrent.CompletableFuture;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.bindArgument;

public class RollbackIngestionResolver implements DataFetcher<CompletableFuture<Boolean>> {
private final EntityClient _entityClient;

public RollbackIngestionResolver(final EntityClient entityClient) {
_entityClient = entityClient;
}

@Override
public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();

return CompletableFuture.supplyAsync(() -> {

if (!IngestionAuthUtils.canManageIngestion(context)) {
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

final RollbackIngestionInput input = bindArgument(environment.getArgument("input"), RollbackIngestionInput.class);
final String runId = input.getRunId();

rollbackIngestion(runId, context);
return true;
});
}

public CompletableFuture<Boolean> rollbackIngestion(final String runId, final QueryContext context) {
return CompletableFuture.supplyAsync(() -> {
try {
_entityClient.rollbackIngestion(runId, context.getAuthentication());
return true;
} catch (Exception e) {
throw new RuntimeException("Failed to rollback ingestion execution", e);
}
});

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig
if (input.getExecutorId() != null) {
result.setExecutorId(input.getExecutorId());
}
result.setDebugMode(input.getDebugMode());
return result;
}

Expand Down
26 changes: 26 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ extend type Mutation {
input: Input required for creating a test connection request
"""
createTestConnectionRequest(input: CreateTestConnectionRequestInput!): String

"""
Rollback a specific ingestion execution run based on its runId
"""
rollbackIngestion(input: RollbackIngestionInput!): String
}

"""
Expand Down Expand Up @@ -317,6 +322,11 @@ type IngestionConfig {
Advanced: The version of the ingestion framework to use
"""
version: String

"""
Advanced: Whether or not to run ingestion in debug mode
"""
debugMode: Boolean
}

"""
Expand Down Expand Up @@ -453,6 +463,11 @@ input UpdateIngestionSourceConfigInput {
The id of the executor to use for executing the recipe
"""
executorId: String!

"""
Whether or not to run ingestion in debug mode
"""
debugMode: Boolean!
}

"""
Expand Down Expand Up @@ -544,3 +559,14 @@ type SecretValue {
"""
value: String!
}

"""
Input for rolling back an ingestion execution
"""
input RollbackIngestionInput {
"""
An ingestion run ID
"""
runId: String!
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.linkedin.datahub.graphql.resolvers.ingest.execution;

import com.datahub.authentication.Authentication;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.RollbackIngestionInput;
import com.linkedin.entity.client.EntityClient;
import graphql.schema.DataFetchingEnvironment;
import org.mockito.Mockito;
import org.testng.annotations.Test;

import static com.linkedin.datahub.graphql.resolvers.ingest.IngestTestUtils.*;
import static org.testng.Assert.*;


public class RollbackIngestionResolverTest {

private static final String RUN_ID = "testRunId";
private static final RollbackIngestionInput TEST_INPUT = new RollbackIngestionInput(RUN_ID);

@Test
public void testGetSuccess() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
RollbackIngestionResolver resolver = new RollbackIngestionResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

Boolean result = resolver.get(mockEnv).get();
assertTrue(result);
}

@Test
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
RollbackIngestionResolver resolver = new RollbackIngestionResolver(mockClient);

// Execute resolver
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
QueryContext mockContext = getMockDenyContext();
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(TEST_INPUT);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(RuntimeException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).rollbackIngestion(
Mockito.eq(RUN_ID),
Mockito.any(Authentication.class));
}

@Test
public void testRollbackIngestionMethod() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
RollbackIngestionResolver resolver = new RollbackIngestionResolver(mockClient);

QueryContext mockContext = getMockAllowContext();
resolver.rollbackIngestion(RUN_ID, mockContext).get();

Mockito.verify(mockClient, Mockito.times(1)).rollbackIngestion(
Mockito.eq(RUN_ID),
Mockito.any(Authentication.class)
);
}

@Test
public void testGetEntityClientException() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);
Mockito.doThrow(RuntimeException.class).when(mockClient).rollbackIngestion(
Mockito.any(),
Mockito.any(Authentication.class));

RollbackIngestionResolver resolver = new RollbackIngestionResolver(mockClient);

QueryContext mockContext = getMockAllowContext();

assertThrows(RuntimeException.class, () -> resolver.rollbackIngestion(RUN_ID, mockContext).join());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class UpsertIngestionSourceResolverTest {
"Test source",
"mysql", "Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id")
new UpdateIngestionSourceConfigInput("my test recipe", "0.8.18", "executor id", false)
);

@Test
Expand Down Expand Up @@ -58,6 +58,7 @@ public void testGetSuccess() throws Exception {
.setRecipe(TEST_INPUT.getConfig().getRecipe())
.setVersion(TEST_INPUT.getConfig().getVersion())
.setExecutorId(TEST_INPUT.getConfig().getExecutorId())
.setDebugMode(TEST_INPUT.getConfig().getDebugMode())
);

Mockito.verify(mockClient, Mockito.times(1)).ingestProposal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
public class RestoreIndices implements Upgrade {
public static final String BATCH_SIZE_ARG_NAME = "batchSize";
public static final String BATCH_DELAY_MS_ARG_NAME = "batchDelayMs";
public static final String NUM_THREADS_ARG_NAME = "numThreads";
public static final String ASPECT_NAME_ARG_NAME = "aspectName";
public static final String URN_ARG_NAME = "urn";

private final List<UpgradeStep> _steps;

Expand Down
Loading

0 comments on commit c25a3ed

Please sign in to comment.