From 6db9779abae329f7c7899f0f54e8c5ef5eb10b02 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Tue, 15 Dec 2020 16:35:29 -0500 Subject: [PATCH 01/13] Initial continuous upload support --- .../integration-sdk-cli/src/commands/run.ts | 13 ++- .../src/types/jobState.ts | 6 ++ .../integration-sdk-core/src/types/storage.ts | 12 ++- .../src/execution/dependencyGraph.ts | 14 ++- .../src/execution/executeIntegration.ts | 15 ++- .../src/execution/jobState.ts | 38 +++++++- .../src/execution/step.ts | 4 + .../src/execution/uploader.ts | 95 +++++++++++++++++++ .../FileSystemGraphObjectStore.ts | 36 +++++-- .../src/synchronization/index.ts | 53 ++++++----- 10 files changed, 242 insertions(+), 44 deletions(-) create mode 100644 packages/integration-sdk-runtime/src/execution/uploader.ts diff --git a/packages/integration-sdk-cli/src/commands/run.ts b/packages/integration-sdk-cli/src/commands/run.ts index 9e985c066..544e9cc32 100644 --- a/packages/integration-sdk-cli/src/commands/run.ts +++ b/packages/integration-sdk-cli/src/commands/run.ts @@ -13,11 +13,14 @@ import { getApiBaseUrl, getApiKeyFromEnvironment, initiateSynchronization, - uploadCollectedData, + // uploadCollectedData } from '@jupiterone/integration-sdk-runtime'; import { loadConfig } from '../config'; import * as log from '../log'; +import { createPersisterApiStepGraphObjectDataUploader } from '@jupiterone/integration-sdk-runtime/dist/src/execution/uploader'; + +const DEFAULT_UPLOAD_CONCURRENCY = 5; export function run() { return createCommand('run') @@ -70,6 +73,11 @@ export function run() { const invocationConfig = await loadConfig(); + const uploader = createPersisterApiStepGraphObjectDataUploader({ + synchronizationJobContext: synchronizationContext, + uploadConcurrency: DEFAULT_UPLOAD_CONCURRENCY, + }); + try { const executionResults = await executeIntegrationInstance( logger, @@ -82,6 +90,7 @@ export function run() { }, { enableSchemaValidation: true, + uploader, }, ); @@ -89,8 +98,6 @@ export function run() { log.displayExecutionResults(executionResults); - await uploadCollectedData(synchronizationContext); - const synchronizationResult = await finalizeSynchronization({ ...synchronizationContext, partialDatasets: executionResults.metadata.partialDatasets, diff --git a/packages/integration-sdk-core/src/types/jobState.ts b/packages/integration-sdk-core/src/types/jobState.ts index 54bae931e..f4e775efc 100644 --- a/packages/integration-sdk-core/src/types/jobState.ts +++ b/packages/integration-sdk-core/src/types/jobState.ts @@ -112,4 +112,10 @@ export interface JobState { * flushed to reduce memory consumption. */ flush: () => Promise; + + /** + * A job state may be created with a graph object uploader. This function + * resolves when all uploads have been completed. + */ + waitUntilUploadsComplete?: () => Promise; } diff --git a/packages/integration-sdk-core/src/types/storage.ts b/packages/integration-sdk-core/src/types/storage.ts index bd80f3626..2b37c38b6 100644 --- a/packages/integration-sdk-core/src/types/storage.ts +++ b/packages/integration-sdk-core/src/types/storage.ts @@ -11,11 +11,16 @@ import { Relationship } from './relationship'; * integration execution. */ export interface GraphObjectStore { - addEntities(stepId: string, newEntities: Entity[]): Promise; + addEntities( + stepId: string, + newEntities: Entity[], + onEntitiesFlushed?: (entities: Entity[]) => Promise, + ): Promise; addRelationships( stepId: string, newRelationships: Relationship[], + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, ): Promise; getEntity({ _key, _type }: GraphObjectLookupKey): Promise; @@ -30,5 +35,8 @@ export interface GraphObjectStore { iteratee: GraphObjectIteratee, ): Promise; - flush(): Promise; + flush( + onEntitiesFlushed?: (entities: Entity[]) => Promise, + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + ): Promise; } diff --git a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts index ffb76b700..85c35f379 100644 --- a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts +++ b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts @@ -18,6 +18,7 @@ import { MemoryDataStore, TypeTracker, } from './jobState'; +import { StepGraphObjectDataUploader } from './uploader'; /** * This function accepts a list of steps and constructs a dependency graph @@ -70,12 +71,14 @@ export function executeStepDependencyGraph< stepStartStates, duplicateKeyTracker, graphObjectStore, + uploader, }: { executionContext: TExecutionContext; inputGraph: DepGraph>; stepStartStates: StepStartStates; duplicateKeyTracker: DuplicateKeyTracker; graphObjectStore: GraphObjectStore; + uploader?: StepGraphObjectDataUploader; }): Promise { // create a clone of the dependencyGraph because mutating // the input graph is icky @@ -255,6 +258,7 @@ export function executeStepDependencyGraph< graphObjectStore, dataStore, stepId, + uploader, }); const { logger } = context; @@ -284,6 +288,12 @@ export function executeStepDependencyGraph< status = StepResultStatus.FAILURE; } + await context.jobState.flush(); + + if (context.jobState.waitUntilUploadsComplete) { + await context.jobState.waitUntilUploadsComplete(); + } + updateStepResultStatus(stepId, status, typeTracker); enqueueLeafSteps(); } @@ -293,7 +303,6 @@ export function executeStepDependencyGraph< void promiseQueue .onIdle() - .then(() => graphObjectStore.flush()) .then(() => resolve([...stepResultsMap.values()])) .catch(reject); }); @@ -306,6 +315,7 @@ function buildStepContext({ typeTracker, graphObjectStore, dataStore, + uploader, }: { stepId: string; context: ExecutionContext; @@ -313,6 +323,7 @@ function buildStepContext({ typeTracker: TypeTracker; graphObjectStore: GraphObjectStore; dataStore: MemoryDataStore; + uploader?: StepGraphObjectDataUploader; }): TStepExecutionContext { const stepExecutionContext: StepExecutionContext = { ...context, @@ -325,6 +336,7 @@ function buildStepContext({ typeTracker, graphObjectStore, dataStore, + uploader, }), }; diff --git a/packages/integration-sdk-runtime/src/execution/executeIntegration.ts b/packages/integration-sdk-runtime/src/execution/executeIntegration.ts index a421fb29a..14311db06 100644 --- a/packages/integration-sdk-runtime/src/execution/executeIntegration.ts +++ b/packages/integration-sdk-runtime/src/execution/executeIntegration.ts @@ -32,6 +32,7 @@ import { executeSteps, getDefaultStepStartStates, } from './step'; +import { StepGraphObjectDataUploader } from './uploader'; import { validateStepStartStates } from './validation'; export interface ExecuteIntegrationResult { @@ -44,10 +45,12 @@ export interface ExecuteIntegrationResult { export interface ExecuteIntegrationOptions { enableSchemaValidation?: boolean; graphObjectStore?: GraphObjectStore; + uploader?: StepGraphObjectDataUploader; } -interface ExecuteWithContextOptions { +export interface ExecuteWithContextOptions { graphObjectStore?: GraphObjectStore; + uploader?: StepGraphObjectDataUploader; } const THIRTY_SECONDS_STORAGE_INTERVAL_MS = 60000 / 2; @@ -111,9 +114,7 @@ export async function executeIntegrationInstance( executionHistory, }, config, - { - graphObjectStore: options.graphObjectStore, - }, + options, ), }); } @@ -203,7 +204,10 @@ export async function executeWithContext< }); }, THIRTY_SECONDS_STORAGE_INTERVAL_MS); - const { graphObjectStore = new FileSystemGraphObjectStore() } = options; + const { + graphObjectStore = new FileSystemGraphObjectStore(), + uploader, + } = options; const integrationStepResults = await executeSteps({ executionContext: context, @@ -213,6 +217,7 @@ export async function executeWithContext< config.normalizeGraphObjectKey, ), graphObjectStore, + uploader, }); const partialDatasets = determinePartialDatasetsFromStepExecutionResults( diff --git a/packages/integration-sdk-runtime/src/execution/jobState.ts b/packages/integration-sdk-runtime/src/execution/jobState.ts index 148ce8f28..1aed5ee0a 100644 --- a/packages/integration-sdk-runtime/src/execution/jobState.ts +++ b/packages/integration-sdk-runtime/src/execution/jobState.ts @@ -7,6 +7,7 @@ import { } from '@jupiterone/integration-sdk-core'; import { GraphObjectStore } from '../storage'; +import { StepGraphObjectDataUploader } from './uploader'; export interface DuplicateKeyTrackerGraphObjectMetadata { _type: string; @@ -78,6 +79,7 @@ export interface CreateStepJobStateParams { typeTracker: TypeTracker; graphObjectStore: GraphObjectStore; dataStore: MemoryDataStore; + uploader?: StepGraphObjectDataUploader; } export function createStepJobState({ @@ -86,6 +88,7 @@ export function createStepJobState({ typeTracker, graphObjectStore, dataStore, + uploader, }: CreateStepJobStateParams): JobState { const addEntities = async (entities: Entity[]): Promise => { entities.forEach((e) => { @@ -97,7 +100,12 @@ export function createStepJobState({ typeTracker.registerType(e._type); }); - await graphObjectStore.addEntities(stepId, entities); + await graphObjectStore.addEntities(stepId, entities, async (entities) => + uploader?.enqueue({ + entities, + relationships: [], + }), + ); return entities; }; @@ -111,7 +119,15 @@ export function createStepJobState({ typeTracker.registerType(r._type as string); }); - return graphObjectStore.addRelationships(stepId, relationships); + return graphObjectStore.addRelationships( + stepId, + relationships, + async (relationships) => + uploader?.enqueue({ + entities: [], + relationships, + }), + ); }; return { @@ -158,6 +174,22 @@ export function createStepJobState({ iterateRelationships: (filter, iteratee) => graphObjectStore.iterateRelationships(filter, iteratee), - flush: () => graphObjectStore.flush(), + flush: () => + graphObjectStore.flush( + async (entities) => + uploader?.enqueue({ + entities, + relationships: [], + }), + async (relationships) => + uploader?.enqueue({ + entities: [], + relationships, + }), + ), + + async waitUntilUploadsComplete() { + await uploader?.waitUntilUploadsComplete(); + }, }; } diff --git a/packages/integration-sdk-runtime/src/execution/step.ts b/packages/integration-sdk-runtime/src/execution/step.ts index d7c3c622b..50600f4f3 100644 --- a/packages/integration-sdk-runtime/src/execution/step.ts +++ b/packages/integration-sdk-runtime/src/execution/step.ts @@ -17,6 +17,7 @@ import { executeStepDependencyGraph, } from './dependencyGraph'; import { DuplicateKeyTracker } from './jobState'; +import { StepGraphObjectDataUploader } from './uploader'; export async function executeSteps< TExecutionContext extends ExecutionContext, @@ -27,12 +28,14 @@ export async function executeSteps< stepStartStates, duplicateKeyTracker, graphObjectStore, + uploader, }: { executionContext: TExecutionContext; integrationSteps: Step[]; stepStartStates: StepStartStates; duplicateKeyTracker: DuplicateKeyTracker; graphObjectStore: GraphObjectStore; + uploader?: StepGraphObjectDataUploader; }): Promise { return executeStepDependencyGraph({ executionContext, @@ -40,6 +43,7 @@ export async function executeSteps< stepStartStates, duplicateKeyTracker, graphObjectStore, + uploader, }); } diff --git a/packages/integration-sdk-runtime/src/execution/uploader.ts b/packages/integration-sdk-runtime/src/execution/uploader.ts new file mode 100644 index 000000000..351f92ec9 --- /dev/null +++ b/packages/integration-sdk-runtime/src/execution/uploader.ts @@ -0,0 +1,95 @@ +import { IntegrationError } from '@jupiterone/integration-sdk-core'; +import PQueue from 'p-queue/dist'; +import { FlushedGraphObjectData } from '../storage/types'; +import { + uploadGraphObjectData, + SynchronizationJobContext, +} from '../synchronization'; + +export interface StepGraphObjectDataUploader { + enqueue: (graphObjectData: FlushedGraphObjectData) => Promise; + waitUntilUploadsComplete: () => Promise; +} + +export interface CreateQueuedStepGraphObjectDataUploaderParams { + uploadConcurrency: number; + upload: (graphObjectData: FlushedGraphObjectData) => Promise; +} + +export function createQueuedStepGraphObjectDataUploader({ + uploadConcurrency, + upload, +}: CreateQueuedStepGraphObjectDataUploaderParams): StepGraphObjectDataUploader { + const queue = new PQueue({ + concurrency: uploadConcurrency, + }); + + const uploadErrors: Error[] = []; + + return { + async enqueue(graphObjectData) { + if (queue.isPaused) { + // This step already failed an upload. We do not want to enqueue more + // for this step. + return; + } + + // OPTIMIZATION: We do not want to buffer a lot of graph objects + // into memory inside of the queue. If the queue concurrency has been + // reached, we wait for the queue to flush so that this step has the + // opportunity to upload more data. + if (queue.size >= uploadConcurrency) { + await queue.onIdle(); + } + + queue + .add(() => upload(graphObjectData)) + .catch((err) => { + // Pause this queue and free up any memory on it. We do not want to + // continue processing if there is a failure uploading for a step. This + // step should get marked as "partial" and other steps can continue + // to run. + queue.pause(); + queue.clear(); + + uploadErrors.push(err); + }); + }, + + async waitUntilUploadsComplete() { + if (uploadErrors.length) { + throw new IntegrationError({ + code: 'UPLOAD_ERROR', + message: `Error(s) uploading graph object data (errorMessages=${uploadErrors.join( + ',', + )})`, + }); + } + + await queue.onIdle(); + }, + }; +} + +export interface CreatePersisterApiStepGraphObjectDataUploaderParams { + synchronizationJobContext: SynchronizationJobContext; + uploadConcurrency: number; +} + +export function createPersisterApiStepGraphObjectDataUploader({ + synchronizationJobContext, + uploadConcurrency, +}: CreatePersisterApiStepGraphObjectDataUploaderParams) { + return createQueuedStepGraphObjectDataUploader({ + uploadConcurrency, + upload(graphObjectData) { + // TODO: Remove this log + synchronizationJobContext.logger.trace('Uploading graph objects', { + entities: graphObjectData.entities.length, + relationships: graphObjectData.relationships.length, + }); + + return uploadGraphObjectData(synchronizationJobContext, graphObjectData); + }, + }); +} diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 50a98cbf2..7c95a98a2 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -48,20 +48,25 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD; } - async addEntities(storageDirectoryPath: string, newEntities: Entity[]) { + async addEntities( + storageDirectoryPath: string, + newEntities: Entity[], + onEntitiesFlushed?: (entities: Entity[]) => Promise, + ) { this.localGraphObjectStore.addEntities(storageDirectoryPath, newEntities); if ( this.localGraphObjectStore.getTotalEntityItemCount() >= this.graphObjectBufferThreshold ) { - await this.flushEntitiesToDisk(); + await this.flushEntitiesToDisk(onEntitiesFlushed); } } async addRelationships( storageDirectoryPath: string, newRelationships: Relationship[], + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, ) { this.localGraphObjectStore.addRelationships( storageDirectoryPath, @@ -72,7 +77,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { this.localGraphObjectStore.getTotalRelationshipItemCount() >= this.graphObjectBufferThreshold ) { - await this.flushRelationshipsToDisk(); + await this.flushRelationshipsToDisk(onRelationshipsFlushed); } } @@ -131,14 +136,19 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { }); } - async flush() { + async flush( + onEntitiesFlushed?: (entities: Entity[]) => Promise, + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + ) { await Promise.all([ - this.flushEntitiesToDisk(), - this.flushRelationshipsToDisk(), + this.flushEntitiesToDisk(onEntitiesFlushed), + this.flushRelationshipsToDisk(onRelationshipsFlushed), ]); } - async flushEntitiesToDisk() { + async flushEntitiesToDisk( + onEntitiesFlushed?: (entities: Entity[]) => Promise, + ) { await this.lockOperation(() => pMap( this.localGraphObjectStore.collectEntitiesByStep(), @@ -150,12 +160,18 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { }); this.localGraphObjectStore.flushEntities(entities); + + if (onEntitiesFlushed) { + await onEntitiesFlushed(entities); + } }, ), ); } - async flushRelationshipsToDisk() { + async flushRelationshipsToDisk( + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + ) { await this.lockOperation(() => pMap( this.localGraphObjectStore.collectRelationshipsByStep(), @@ -167,6 +183,10 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { }); this.localGraphObjectStore.flushRelationships(relationships); + + if (onRelationshipsFlushed) { + await onRelationshipsFlushed(relationships); + } }, ), ); diff --git a/packages/integration-sdk-runtime/src/synchronization/index.ts b/packages/integration-sdk-runtime/src/synchronization/index.ts index 901aed7e7..7e5f17a1a 100644 --- a/packages/integration-sdk-runtime/src/synchronization/index.ts +++ b/packages/integration-sdk-runtime/src/synchronization/index.ts @@ -170,6 +170,31 @@ async function getPartialDatasets() { return summary.metadata.partialDatasets; } +export async function uploadGraphObjectData( + synchronizationJobContext: SynchronizationJobContext, + graphObjectData: FlushedGraphObjectData, +) { + try { + if (Array.isArray(graphObjectData.entities)) { + await uploadData( + synchronizationJobContext, + 'entities', + graphObjectData.entities, + ); + } + + if (Array.isArray(graphObjectData.relationships)) { + await uploadData( + synchronizationJobContext, + 'relationships', + graphObjectData.relationships, + ); + } + } catch (err) { + throw synchronizationApiError(err, 'Error uploading collected data'); + } +} + /** * Uploads data collected by the integration into the */ @@ -183,28 +208,12 @@ export async function uploadCollectedData(context: SynchronizationJobContext) { walkDirectory({ path: 'graph', async iteratee({ filePath }) { - const parsedData = await readGraphObjectFile({ - filePath, - }); - - try { - if (Array.isArray(parsedData.entities)) { - await uploadData(context, 'entities', parsedData.entities); - } - - if (Array.isArray(parsedData.relationships)) { - await uploadData( - context, - 'relationships', - parsedData.relationships, - ); - } - } catch (err) { - throw synchronizationApiError( - err, - 'Error uploading collected data', - ); - } + await uploadGraphObjectData( + context, + await readGraphObjectFile({ + filePath, + }), + ); }, }), }); From 8d22caffd655e4192c03dcb3c1f3fd788da8f007 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Tue, 15 Dec 2020 19:51:31 -0500 Subject: [PATCH 02/13] More tests around continuous uploads and various improvements --- .../integration-sdk-cli/src/commands/run.ts | 13 +- .../src/graphObject.ts | 26 +++ .../src/index.ts | 2 + .../src/util.ts | 5 + .../src/execution/dependencyGraph.ts | 15 +- .../src/execution/executeIntegration.ts | 10 +- .../src/execution/step.ts | 8 +- .../src/execution/uploader.test.ts | 177 ++++++++++++++++++ .../src/execution/uploader.ts | 31 ++- 9 files changed, 261 insertions(+), 26 deletions(-) create mode 100644 packages/integration-sdk-private-test-utils/src/graphObject.ts create mode 100644 packages/integration-sdk-private-test-utils/src/util.ts create mode 100644 packages/integration-sdk-runtime/src/execution/uploader.test.ts diff --git a/packages/integration-sdk-cli/src/commands/run.ts b/packages/integration-sdk-cli/src/commands/run.ts index 544e9cc32..f235c0ff0 100644 --- a/packages/integration-sdk-cli/src/commands/run.ts +++ b/packages/integration-sdk-cli/src/commands/run.ts @@ -73,11 +73,6 @@ export function run() { const invocationConfig = await loadConfig(); - const uploader = createPersisterApiStepGraphObjectDataUploader({ - synchronizationJobContext: synchronizationContext, - uploadConcurrency: DEFAULT_UPLOAD_CONCURRENCY, - }); - try { const executionResults = await executeIntegrationInstance( logger, @@ -90,7 +85,13 @@ export function run() { }, { enableSchemaValidation: true, - uploader, + createStepGraphObjectDataUploader(stepId) { + return createPersisterApiStepGraphObjectDataUploader({ + stepId, + synchronizationJobContext: synchronizationContext, + uploadConcurrency: DEFAULT_UPLOAD_CONCURRENCY, + }); + }, }, ); diff --git a/packages/integration-sdk-private-test-utils/src/graphObject.ts b/packages/integration-sdk-private-test-utils/src/graphObject.ts new file mode 100644 index 000000000..5575c9e1d --- /dev/null +++ b/packages/integration-sdk-private-test-utils/src/graphObject.ts @@ -0,0 +1,26 @@ +import { Entity, ExplicitRelationship } from '@jupiterone/integration-sdk-core'; +import { v4 as uuid } from 'uuid'; + +export function createTestEntity(partial?: Partial): Entity { + return { + _key: uuid(), + _class: uuid(), + _type: uuid(), + [uuid()]: uuid(), + ...partial, + }; +} + +export function createTestRelationship( + partial?: Partial, +): ExplicitRelationship { + return { + _key: uuid(), + _toEntityKey: uuid(), + _fromEntityKey: uuid(), + _class: uuid(), + _type: uuid(), + [uuid()]: uuid(), + ...partial, + }; +} diff --git a/packages/integration-sdk-private-test-utils/src/index.ts b/packages/integration-sdk-private-test-utils/src/index.ts index a0959c6c8..8c2a76365 100644 --- a/packages/integration-sdk-private-test-utils/src/index.ts +++ b/packages/integration-sdk-private-test-utils/src/index.ts @@ -1,3 +1,5 @@ export * from './loadProjectStructure'; export * from './toUnixPath'; export * from './graphObjectStore'; +export * from './graphObject'; +export * from './util'; diff --git a/packages/integration-sdk-private-test-utils/src/util.ts b/packages/integration-sdk-private-test-utils/src/util.ts new file mode 100644 index 000000000..3e36c8125 --- /dev/null +++ b/packages/integration-sdk-private-test-utils/src/util.ts @@ -0,0 +1,5 @@ +export function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(() => resolve(), ms); + }); +} diff --git a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts index 85c35f379..ba6cec10d 100644 --- a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts +++ b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts @@ -18,7 +18,10 @@ import { MemoryDataStore, TypeTracker, } from './jobState'; -import { StepGraphObjectDataUploader } from './uploader'; +import { + StepGraphObjectDataUploader, + CreateStepGraphObjectDataUploaderFunction, +} from './uploader'; /** * This function accepts a list of steps and constructs a dependency graph @@ -71,14 +74,14 @@ export function executeStepDependencyGraph< stepStartStates, duplicateKeyTracker, graphObjectStore, - uploader, + createStepGraphObjectDataUploader, }: { executionContext: TExecutionContext; inputGraph: DepGraph>; stepStartStates: StepStartStates; duplicateKeyTracker: DuplicateKeyTracker; graphObjectStore: GraphObjectStore; - uploader?: StepGraphObjectDataUploader; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; }): Promise { // create a clone of the dependencyGraph because mutating // the input graph is icky @@ -251,6 +254,12 @@ export function executeStepDependencyGraph< const { id: stepId } = step; const typeTracker = new TypeTracker(); + let uploader: StepGraphObjectDataUploader | undefined; + + if (createStepGraphObjectDataUploader) { + uploader = createStepGraphObjectDataUploader(stepId); + } + const context = buildStepContext({ context: executionContext, duplicateKeyTracker, diff --git a/packages/integration-sdk-runtime/src/execution/executeIntegration.ts b/packages/integration-sdk-runtime/src/execution/executeIntegration.ts index 14311db06..4dcb7b673 100644 --- a/packages/integration-sdk-runtime/src/execution/executeIntegration.ts +++ b/packages/integration-sdk-runtime/src/execution/executeIntegration.ts @@ -32,7 +32,7 @@ import { executeSteps, getDefaultStepStartStates, } from './step'; -import { StepGraphObjectDataUploader } from './uploader'; +import { CreateStepGraphObjectDataUploaderFunction } from './uploader'; import { validateStepStartStates } from './validation'; export interface ExecuteIntegrationResult { @@ -45,12 +45,12 @@ export interface ExecuteIntegrationResult { export interface ExecuteIntegrationOptions { enableSchemaValidation?: boolean; graphObjectStore?: GraphObjectStore; - uploader?: StepGraphObjectDataUploader; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; } export interface ExecuteWithContextOptions { graphObjectStore?: GraphObjectStore; - uploader?: StepGraphObjectDataUploader; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; } const THIRTY_SECONDS_STORAGE_INTERVAL_MS = 60000 / 2; @@ -206,7 +206,7 @@ export async function executeWithContext< const { graphObjectStore = new FileSystemGraphObjectStore(), - uploader, + createStepGraphObjectDataUploader, } = options; const integrationStepResults = await executeSteps({ @@ -217,7 +217,7 @@ export async function executeWithContext< config.normalizeGraphObjectKey, ), graphObjectStore, - uploader, + createStepGraphObjectDataUploader, }); const partialDatasets = determinePartialDatasetsFromStepExecutionResults( diff --git a/packages/integration-sdk-runtime/src/execution/step.ts b/packages/integration-sdk-runtime/src/execution/step.ts index 50600f4f3..e00aca256 100644 --- a/packages/integration-sdk-runtime/src/execution/step.ts +++ b/packages/integration-sdk-runtime/src/execution/step.ts @@ -17,7 +17,7 @@ import { executeStepDependencyGraph, } from './dependencyGraph'; import { DuplicateKeyTracker } from './jobState'; -import { StepGraphObjectDataUploader } from './uploader'; +import { CreateStepGraphObjectDataUploaderFunction } from './uploader'; export async function executeSteps< TExecutionContext extends ExecutionContext, @@ -28,14 +28,14 @@ export async function executeSteps< stepStartStates, duplicateKeyTracker, graphObjectStore, - uploader, + createStepGraphObjectDataUploader, }: { executionContext: TExecutionContext; integrationSteps: Step[]; stepStartStates: StepStartStates; duplicateKeyTracker: DuplicateKeyTracker; graphObjectStore: GraphObjectStore; - uploader?: StepGraphObjectDataUploader; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; }): Promise { return executeStepDependencyGraph({ executionContext, @@ -43,7 +43,7 @@ export async function executeSteps< stepStartStates, duplicateKeyTracker, graphObjectStore, - uploader, + createStepGraphObjectDataUploader, }); } diff --git a/packages/integration-sdk-runtime/src/execution/uploader.test.ts b/packages/integration-sdk-runtime/src/execution/uploader.test.ts new file mode 100644 index 000000000..f3471e063 --- /dev/null +++ b/packages/integration-sdk-runtime/src/execution/uploader.test.ts @@ -0,0 +1,177 @@ +import { FlushedGraphObjectData } from '../storage/types'; +import { + createPersisterApiStepGraphObjectDataUploader, + createQueuedStepGraphObjectDataUploader, + StepGraphObjectDataUploader, +} from './uploader'; +import { + sleep, + createTestEntity, + createTestRelationship, +} from '@jupiterone/integration-sdk-private-test-utils'; +import times from 'lodash/times'; +import { v4 as uuid } from 'uuid'; +import { SynchronizationJobContext } from '../synchronization'; +import { createApiClient, getApiBaseUrl } from '../api'; +import { generateSynchronizationJob } from '../synchronization/__tests__/util/generateSynchronizationJob'; + +function createFlushedGraphObjectData(): FlushedGraphObjectData { + return { + entities: [createTestEntity(), createTestEntity()], + relationships: [createTestRelationship(), createTestRelationship()], + }; +} + +async function createAndEnqueueUploads( + uploader: StepGraphObjectDataUploader, + n: number, +) { + const flushed = times(n, createFlushedGraphObjectData); + + for (const data of flushed) { + await uploader.enqueue(data); + } + + return flushed; +} + +async function createFlushedDataAndWaitForUploads( + uploader: StepGraphObjectDataUploader, + n: number, +) { + const flushed = await createAndEnqueueUploads(uploader, n); + await uploader.waitUntilUploadsComplete(); + return flushed; +} + +describe('#createQueuedStepGraphObjectDataUploader', () => { + test('should wait for all enqueued uploads to complete', async () => { + const uploaded: FlushedGraphObjectData[] = []; + let numQueued = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: Infinity, + async upload(d) { + if (numQueued) { + await sleep(100); + uploaded.push(d); + } else { + numQueued++; + await sleep(200); + uploaded.push(d); + } + + numQueued++; + }, + }); + + const flushed = await createFlushedDataAndWaitForUploads(uploader, 2); + expect(uploaded).toEqual(flushed.reverse()); + }); + + test('should throttle enqueuing if concurrency threshold hit', async () => { + const uploaded: FlushedGraphObjectData[] = []; + let throttleCount = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: 2, + async upload(d) { + await sleep(300); + uploaded.push(d); + }, + onThrottleEnqueue() { + throttleCount++; + }, + }); + + const flushed = await createFlushedDataAndWaitForUploads(uploader, 6); + expect(uploaded).toEqual(flushed); + expect(throttleCount).toEqual(2); + }); + + test('should pause and clear queue if error occurs', async () => { + const uploaded: FlushedGraphObjectData[] = []; + const stepId = uuid(); + const expectedErrorMessage = `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=Error: expected upload error)`; + + let numQueued = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId, + uploadConcurrency: 2, + async upload(d) { + numQueued++; + + if (numQueued === 2) { + await sleep(100); + throw new Error('expected upload error'); + } else { + await sleep(200); + uploaded.push(d); + } + }, + }); + + const flushed = await createAndEnqueueUploads(uploader, 3); + + await expect(uploader.waitUntilUploadsComplete()).rejects.toThrowError( + expectedErrorMessage, + ); + + const flushedAfterPause = createFlushedGraphObjectData(); + await uploader.enqueue(flushedAfterPause); + + await expect(uploader.waitUntilUploadsComplete()).rejects.toThrowError( + expectedErrorMessage, + ); + + expect(uploaded).toEqual([flushed[0]]); + }); +}); + +describe('#createPersisterApiStepGraphObjectDataUploader', () => { + test('should upload to persister API', async () => { + const accountId = uuid(); + + const apiClient = createApiClient({ + apiBaseUrl: getApiBaseUrl(), + account: accountId, + }); + + const postSpy = jest.spyOn(apiClient, 'post').mockResolvedValue({}); + + const job = generateSynchronizationJob(); + const synchronizationJobContext: SynchronizationJobContext = { + logger: ({} as unknown) as any, + apiClient, + job, + }; + + const uploader = createPersisterApiStepGraphObjectDataUploader({ + synchronizationJobContext, + stepId: uuid(), + uploadConcurrency: 2, + }); + + const flushed = await createFlushedDataAndWaitForUploads(uploader, 3); + expect(postSpy).toHaveBeenCalledTimes(6); + + for (const { entities, relationships } of flushed) { + expect(postSpy).toHaveBeenCalledWith( + `/persister/synchronization/jobs/${job.id}/entities`, + { + entities, + }, + ); + + expect(postSpy).toHaveBeenCalledWith( + `/persister/synchronization/jobs/${job.id}/relationships`, + { + relationships, + }, + ); + } + }); +}); diff --git a/packages/integration-sdk-runtime/src/execution/uploader.ts b/packages/integration-sdk-runtime/src/execution/uploader.ts index 351f92ec9..02ec670a2 100644 --- a/packages/integration-sdk-runtime/src/execution/uploader.ts +++ b/packages/integration-sdk-runtime/src/execution/uploader.ts @@ -7,18 +7,27 @@ import { } from '../synchronization'; export interface StepGraphObjectDataUploader { + stepId: string; enqueue: (graphObjectData: FlushedGraphObjectData) => Promise; waitUntilUploadsComplete: () => Promise; } +export type CreateStepGraphObjectDataUploaderFunction = ( + stepId: string, +) => StepGraphObjectDataUploader; + export interface CreateQueuedStepGraphObjectDataUploaderParams { + stepId: string; uploadConcurrency: number; upload: (graphObjectData: FlushedGraphObjectData) => Promise; + onThrottleEnqueue?: () => void; } export function createQueuedStepGraphObjectDataUploader({ + stepId, uploadConcurrency, upload, + onThrottleEnqueue, }: CreateQueuedStepGraphObjectDataUploaderParams): StepGraphObjectDataUploader { const queue = new PQueue({ concurrency: uploadConcurrency, @@ -27,6 +36,7 @@ export function createQueuedStepGraphObjectDataUploader({ const uploadErrors: Error[] = []; return { + stepId, async enqueue(graphObjectData) { if (queue.isPaused) { // This step already failed an upload. We do not want to enqueue more @@ -38,7 +48,15 @@ export function createQueuedStepGraphObjectDataUploader({ // into memory inside of the queue. If the queue concurrency has been // reached, we wait for the queue to flush so that this step has the // opportunity to upload more data. - if (queue.size >= uploadConcurrency) { + if ( + queue.pending >= uploadConcurrency || + queue.size >= uploadConcurrency + ) { + if (onThrottleEnqueue) { + // Mainly just used for testing that our custom throttling works. + onThrottleEnqueue(); + } + await queue.onIdle(); } @@ -60,7 +78,7 @@ export function createQueuedStepGraphObjectDataUploader({ if (uploadErrors.length) { throw new IntegrationError({ code: 'UPLOAD_ERROR', - message: `Error(s) uploading graph object data (errorMessages=${uploadErrors.join( + message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join( ',', )})`, }); @@ -72,23 +90,20 @@ export function createQueuedStepGraphObjectDataUploader({ } export interface CreatePersisterApiStepGraphObjectDataUploaderParams { + stepId: string; synchronizationJobContext: SynchronizationJobContext; uploadConcurrency: number; } export function createPersisterApiStepGraphObjectDataUploader({ + stepId, synchronizationJobContext, uploadConcurrency, }: CreatePersisterApiStepGraphObjectDataUploaderParams) { return createQueuedStepGraphObjectDataUploader({ + stepId, uploadConcurrency, upload(graphObjectData) { - // TODO: Remove this log - synchronizationJobContext.logger.trace('Uploading graph objects', { - entities: graphObjectData.entities.length, - relationships: graphObjectData.relationships.length, - }); - return uploadGraphObjectData(synchronizationJobContext, graphObjectData); }, }); From 81963394f92af4e293a22f18fa08acab35447c7c Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Tue, 15 Dec 2020 20:18:20 -0500 Subject: [PATCH 03/13] Additional test for FileSystemGraphObjectStore callbacks --- .../FileSystemGraphObjectStore.test.ts | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index 49b5533f8..218e9010f 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -426,6 +426,186 @@ describe('iterateRelationships', () => { }); }); +describe('flush callbacks', () => { + test('#addEntity should call flush callback when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 2, + }); + + let flushedEntitiesCollected: Entity[] = []; + let addEntitiesFlushCalledTimes = 0; + + const e1 = generateEntity(); + const e2 = generateEntity(); + const e3 = generateEntity(); + + await store.addEntities(storageDirectoryPath, [e1], async (entities) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + }); + + await store.addEntities( + storageDirectoryPath, + [e2, e3], + async (entities) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + }, + ); + + expect(addEntitiesFlushCalledTimes).toEqual(1); + expect(flushedEntitiesCollected).toEqual([e1, e2, e3]); + }); + + test('#addRelationships should call flush callback when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 2, + }); + + let flushedRelationshipsCollected: Relationship[] = []; + let addRelationshipsFlushCalledTimes = 0; + + const r1 = generateRelationship(); + const r2 = generateRelationship(); + const r3 = generateRelationship(); + + await store.addRelationships( + storageDirectoryPath, + [r1], + async (relationships) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addRelationshipsFlushCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + }, + ); + + await store.addRelationships( + storageDirectoryPath, + [r2, r3], + async (relationships) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addRelationshipsFlushCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + }, + ); + + expect(addRelationshipsFlushCalledTimes).toEqual(1); + expect(flushedRelationshipsCollected).toEqual([r1, r2, r3]); + }); + + test('#flushEntitiesToDisk should call flush callback when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 10, + }); + + let flushedEntitiesCollected: Entity[] = []; + let addEntitiesFlushCalledTimes = 0; + + const entities = times(3, () => generateEntity()); + + async function onEntitiesFlushed(entities) { + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + } + + await store.addEntities(storageDirectoryPath, entities, onEntitiesFlushed); + + expect(addEntitiesFlushCalledTimes).toEqual(0); + expect(flushedEntitiesCollected).toEqual([]); + + await store.flushEntitiesToDisk(onEntitiesFlushed); + + expect(addEntitiesFlushCalledTimes).toEqual(1); + expect(flushedEntitiesCollected).toEqual(entities); + }); + + test('#flushRelationshipsToDisk should call flush callback when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 10, + }); + + let flushedRelationshipsCollected: Relationship[] = []; + let addRelationshipsFlushedCalledTimes = 0; + + const relationships = times(3, () => generateRelationship()); + + async function onRelationshipsFlushed(relationships) { + addRelationshipsFlushedCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + } + + await store.addRelationships( + storageDirectoryPath, + relationships, + onRelationshipsFlushed, + ); + + expect(addRelationshipsFlushedCalledTimes).toEqual(0); + expect(flushedRelationshipsCollected).toEqual([]); + + await store.flushRelationshipsToDisk(onRelationshipsFlushed); + + expect(addRelationshipsFlushedCalledTimes).toEqual(1); + expect(flushedRelationshipsCollected).toEqual(relationships); + }); + + test('#flush should call both entity and relationship flush callbacks when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 10, + }); + + let flushedRelationshipsCollected: Relationship[] = []; + let addRelationshipsFlushedCalledTimes = 0; + + let flushedEntitiesCollected: Entity[] = []; + let addEntitiesFlushCalledTimes = 0; + + const entities = times(3, () => generateEntity()); + const relationships = times(3, () => generateRelationship()); + + async function onEntitiesFlushed(entities) { + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + } + + async function onRelationshipsFlushed(relationships) { + addRelationshipsFlushedCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + } + + await store.addRelationships( + storageDirectoryPath, + relationships, + onRelationshipsFlushed, + ); + await store.addEntities(storageDirectoryPath, entities, onEntitiesFlushed); + await store.flush(onEntitiesFlushed, onRelationshipsFlushed); + + expect(addEntitiesFlushCalledTimes).toEqual(1); + expect(addRelationshipsFlushedCalledTimes).toEqual(1); + expect(flushedEntitiesCollected).toEqual(entities); + expect(flushedRelationshipsCollected).toEqual(relationships); + }); +}); + function setupFileSystemObjectStore(params?: FileSystemGraphObjectStoreParams) { const storageDirectoryPath = uuid(); const store = new FileSystemGraphObjectStore(params); From 1271491c81b009296056c2b07401c0b35877f42f Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 07:51:39 -0500 Subject: [PATCH 04/13] Mark step as a failure if uploading fails in a step --- .../__tests__/dependencyGraph.test.ts | 158 ++++++++++++++++++ .../src/execution/dependencyGraph.ts | 10 +- 2 files changed, 167 insertions(+), 1 deletion(-) diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts index 13d6d472b..c7c052192 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts @@ -31,6 +31,12 @@ import { import { LOCAL_INTEGRATION_INSTANCE } from '../instance'; import { DuplicateKeyTracker } from '../jobState'; import { getDefaultStepStartStates } from '../step'; +import { + CreateStepGraphObjectDataUploaderFunction, + StepGraphObjectDataUploader, +} from '../uploader'; +import { FlushedGraphObjectData } from '../../storage/types'; +import { createTestEntity } from '@jupiterone/integration-sdk-private-test-utils'; jest.mock('fs'); @@ -134,6 +140,7 @@ describe('executeStepDependencyGraph', () => { steps: IntegrationStep[], stepStartStates: StepStartStates = getDefaultStepStartStates(steps), graphObjectStore: GraphObjectStore = new FileSystemGraphObjectStore(), + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction, ) { return executeStepDependencyGraph({ executionContext, @@ -141,6 +148,7 @@ describe('executeStepDependencyGraph', () => { stepStartStates, duplicateKeyTracker: new DuplicateKeyTracker(), graphObjectStore, + createStepGraphObjectDataUploader, }); } @@ -622,6 +630,156 @@ describe('executeStepDependencyGraph', () => { expect(spyB).toHaveBeenCalledBefore(spyC); }); + test('should mark steps failed executionHandlers with status FAILURE a dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => { + const spyA = jest.fn(); + const spyB = jest.fn(); + const spyC = jest.fn(); + + const eA = createTestEntity(); + const eB = createTestEntity(); + const eC = createTestEntity(); + + const steps: IntegrationStep[] = [ + { + id: 'a', + name: 'a', + entities: [], + relationships: [], + async executionHandler({ jobState }) { + await jobState.addEntity(eA); + spyA(); + }, + }, + { + id: 'b', + name: 'b', + entities: [], + relationships: [], + dependsOn: ['a'], + async executionHandler({ jobState }) { + await jobState.addEntity(eB); + spyB(); + }, + }, + { + id: 'c', + name: 'c', + entities: [], + relationships: [], + dependsOn: ['b'], + async executionHandler({ jobState }) { + await jobState.addEntity(eC); + spyC(); + }, + }, + ]; + + const stepStartStates = getDefaultStepStartStates(steps); + const graphObjectStore = new FileSystemGraphObjectStore(); + + function createPassingUploader( + stepId: string, + collector: FlushedGraphObjectData[], + ): StepGraphObjectDataUploader { + return { + stepId, + async enqueue(graphObjectData) { + collector.push(graphObjectData); + return Promise.resolve(); + }, + waitUntilUploadsComplete() { + return Promise.resolve(); + }, + }; + } + + function createFailingUploader( + stepId: string, + ): StepGraphObjectDataUploader { + return { + stepId, + async enqueue() { + return Promise.resolve(); + }, + waitUntilUploadsComplete() { + return Promise.reject(new Error('expected upload wait failure')); + }, + }; + } + + const passingUploaderCollector: FlushedGraphObjectData[] = []; + + /** + * Graph: + * a - b - c + * + * In this situation, 'a' is the leaf node + * 'b' depends on 'a', + * 'c' depends on 'b' + */ + const result = await executeSteps( + steps, + stepStartStates, + graphObjectStore, + (stepId) => { + if (stepId === 'b') { + return createFailingUploader(stepId); + } else { + return createPassingUploader(stepId, passingUploaderCollector); + } + }, + ); + + const expectedCollected: FlushedGraphObjectData[] = [ + { + entities: [eA], + relationships: [], + }, + { + entities: [eC], + relationships: [], + }, + ]; + + expect(passingUploaderCollector).toEqual(expectedCollected); + + expect(result).toEqual([ + { + id: 'a', + name: 'a', + declaredTypes: [], + partialTypes: [], + encounteredTypes: [eA._type], + status: StepResultStatus.SUCCESS, + }, + { + id: 'b', + name: 'b', + declaredTypes: [], + partialTypes: [], + encounteredTypes: [eB._type], + dependsOn: ['a'], + status: StepResultStatus.FAILURE, + }, + { + id: 'c', + name: 'c', + declaredTypes: [], + partialTypes: [], + encounteredTypes: [eC._type], + dependsOn: ['b'], + status: StepResultStatus.PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE, + }, + ]); + + expect(spyA).toHaveBeenCalledTimes(1); + expect(spyB).toHaveBeenCalledTimes(1); + expect(spyC).toHaveBeenCalledTimes(1); + + expect(spyA).toHaveBeenCalledBefore(spyB); + expect(spyB).toHaveBeenCalledBefore(spyC); + }); + test('logs error after step fails', async () => { const error = new IntegrationError({ code: 'ABC-123', diff --git a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts index ba6cec10d..25e2aa6fa 100644 --- a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts +++ b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts @@ -300,7 +300,15 @@ export function executeStepDependencyGraph< await context.jobState.flush(); if (context.jobState.waitUntilUploadsComplete) { - await context.jobState.waitUntilUploadsComplete(); + try { + // Failing to upload all integration data should not be considered a + // fatal failure. We just want to make this step as a partial success + // and move on with our lives! + await context.jobState.waitUntilUploadsComplete(); + } catch (err) { + context.logger.stepFailure(step, err); + status = StepResultStatus.FAILURE; + } } updateStepResultStatus(stepId, status, typeTracker); From b05beb82e7ea8daac73e4ebb009e36f5db5388d0 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 08:13:18 -0500 Subject: [PATCH 05/13] Export relevant functions and types from uploader --- packages/integration-sdk-runtime/src/execution/index.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/integration-sdk-runtime/src/execution/index.ts b/packages/integration-sdk-runtime/src/execution/index.ts index f9439b107..0ccc2a359 100644 --- a/packages/integration-sdk-runtime/src/execution/index.ts +++ b/packages/integration-sdk-runtime/src/execution/index.ts @@ -7,3 +7,11 @@ export { prepareLocalStepCollection } from './step'; export { loadConfigFromEnvironmentVariables } from './config'; export { DuplicateKeyTracker, TypeTracker, MemoryDataStore } from './jobState'; export { buildStepDependencyGraph } from './dependencyGraph'; +export { + StepGraphObjectDataUploader, + CreateStepGraphObjectDataUploaderFunction, + CreateQueuedStepGraphObjectDataUploaderParams, + createQueuedStepGraphObjectDataUploader, + CreatePersisterApiStepGraphObjectDataUploaderParams, + createPersisterApiStepGraphObjectDataUploader, +} from './uploader'; From 5c0ad9893ffa8f74842ec9c532440a48ba7bb1c5 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 11:37:34 -0500 Subject: [PATCH 06/13] Remove old comment, update test descriptions. --- packages/integration-sdk-cli/src/commands/run.ts | 1 - .../src/execution/__tests__/dependencyGraph.test.ts | 2 +- .../__tests__/FileSystemGraphObjectStore.test.ts | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/integration-sdk-cli/src/commands/run.ts b/packages/integration-sdk-cli/src/commands/run.ts index f235c0ff0..b6def649a 100644 --- a/packages/integration-sdk-cli/src/commands/run.ts +++ b/packages/integration-sdk-cli/src/commands/run.ts @@ -13,7 +13,6 @@ import { getApiBaseUrl, getApiKeyFromEnvironment, initiateSynchronization, - // uploadCollectedData } from '@jupiterone/integration-sdk-runtime'; import { loadConfig } from '../config'; diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts index c7c052192..b8efa66d7 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts @@ -630,7 +630,7 @@ describe('executeStepDependencyGraph', () => { expect(spyB).toHaveBeenCalledBefore(spyC); }); - test('should mark steps failed executionHandlers with status FAILURE a dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => { + test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => { const spyA = jest.fn(); const spyB = jest.fn(); const spyC = jest.fn(); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index 218e9010f..a9b0b993b 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -503,7 +503,7 @@ describe('flush callbacks', () => { expect(flushedRelationshipsCollected).toEqual([r1, r2, r3]); }); - test('#flushEntitiesToDisk should call flush callback when buffer threshold reached', async () => { + test('#flushEntitiesToDisk should call flush callback when flushEntitiesToDisk called', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore({ graphObjectBufferThreshold: 10, }); @@ -530,7 +530,7 @@ describe('flush callbacks', () => { expect(flushedEntitiesCollected).toEqual(entities); }); - test('#flushRelationshipsToDisk should call flush callback when buffer threshold reached', async () => { + test('#flushRelationshipsToDisk should call flush callback when flushRelationshipsToDisk called', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore({ graphObjectBufferThreshold: 10, }); @@ -563,7 +563,7 @@ describe('flush callbacks', () => { expect(flushedRelationshipsCollected).toEqual(relationships); }); - test('#flush should call both entity and relationship flush callbacks when buffer threshold reached', async () => { + test('#flush should call both entity and relationship flush callbacks when flush called', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore({ graphObjectBufferThreshold: 10, }); From b1c08047bbb31959dde0dd5178ce2c4d592d3e39 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 06:16:37 -0500 Subject: [PATCH 07/13] Share graph object creation test utils across tests and cleanup --- .../src/execution/__tests__/jobState.test.ts | 39 +++++------ .../FileSystemGraphObjectStore.test.ts | 65 ++++++++++--------- .../__tests__/flushDataToDisk.test.ts | 8 +-- .../__tests__/util/graphObjects.ts | 24 ------- 4 files changed, 59 insertions(+), 77 deletions(-) delete mode 100644 packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts index ac404f6bc..9bf76a9ae 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts @@ -8,35 +8,36 @@ import { import { v4 as uuid } from 'uuid'; import { FileSystemGraphObjectStore } from '../../storage'; import { vol } from 'memfs'; -import { - Entity, - KeyNormalizationFunction, -} from '@jupiterone/integration-sdk-core'; +import { Entity } from '@jupiterone/integration-sdk-core'; jest.mock('fs'); -function getMockCreateStepJobStateParams(options?: { - normalizeGraphObjectKey?: KeyNormalizationFunction; -}): CreateStepJobStateParams { +function getMockCreateStepJobStateParams( + partial?: Partial, +): CreateStepJobStateParams { return { stepId: uuid(), graphObjectStore: new FileSystemGraphObjectStore(), - duplicateKeyTracker: new DuplicateKeyTracker( - options?.normalizeGraphObjectKey, - ), + duplicateKeyTracker: new DuplicateKeyTracker(), typeTracker: new TypeTracker(), dataStore: new MemoryDataStore(), + ...partial, }; } +function createTestStepJobStateState( + params?: Partial, +) { + return createStepJobState(getMockCreateStepJobStateParams(params)); +} + describe('#createStepJobState', () => { afterEach(() => { vol.reset(); }); test('should allow creating job state and adding a single entity with "addEntity"', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobStateState(); const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -48,8 +49,7 @@ describe('#createStepJobState', () => { }); test('should allow creating job state and adding a multiple entities with "addEntities"', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobStateState(); const entities: Entity[] = [ { _type: 'a_entity', @@ -74,8 +74,7 @@ describe('#findEntity', () => { }); test('should find entity by _key', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobStateState(); const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -87,10 +86,12 @@ describe('#findEntity', () => { }); test('should find entity by _key with key normalization', async () => { - const params = getMockCreateStepJobStateParams({ - normalizeGraphObjectKey: (_key) => _key.toLowerCase(), + const jobState = createTestStepJobStateState({ + duplicateKeyTracker: new DuplicateKeyTracker((_key) => + _key.toLowerCase(), + ), }); - const jobState = createStepJobState(params); + const entity: Entity = { _type: 'a_entity', _class: 'A', diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index a9b0b993b..c1bf3072f 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -13,7 +13,10 @@ import { FileSystemGraphObjectStoreParams, } from '../FileSystemGraphObjectStore'; -import { generateEntity, generateRelationship } from './util/graphObjects'; +import { + createTestEntity, + createTestRelationship, +} from '@jupiterone/integration-sdk-private-test-utils'; import { Entity, @@ -33,7 +36,7 @@ describe('flushEntitiesToDisk', () => { test('should write entities to the graph directory and symlink files to the index directory', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const entityType = uuid(); - const entities = times(25, () => generateEntity({ _type: entityType })); + const entities = times(25, () => createTestEntity({ _type: entityType })); await store.addEntities(storageDirectoryPath, entities); await store.flushEntitiesToDisk(); @@ -75,7 +78,7 @@ describe('flushRelationshipsToDisk', () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const relationshipType = uuid(); const relationships = times(25, () => - generateRelationship({ _type: relationshipType }), + createTestRelationship({ _type: relationshipType }), ); await store.addRelationships(storageDirectoryPath, relationships); @@ -118,9 +121,9 @@ describe('flushRelationshipsToDisk', () => { describe('flush', () => { test('should flush both entities and relationships to disk', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); - await store.addEntities(storageDirectoryPath, [generateEntity()]); + await store.addEntities(storageDirectoryPath, [createTestEntity()]); await store.addRelationships(storageDirectoryPath, [ - generateRelationship(), + createTestRelationship(), ]); const flushEntitiesSpy = jest.spyOn(store, 'flushEntitiesToDisk'); @@ -137,7 +140,7 @@ describe('addEntities', () => { test('should automatically flush entities to disk after hitting a certain threshold', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const entities = times(DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD - 1, () => - generateEntity(), + createTestEntity(), ); const flushEntitiesSpy = jest.spyOn(store, 'flushEntitiesToDisk'); @@ -147,7 +150,7 @@ describe('addEntities', () => { expect(flushEntitiesSpy).toHaveBeenCalledTimes(0); // adding an additional entity should trigger the flushing - await store.addEntities(storageDirectoryPath, [generateEntity()]); + await store.addEntities(storageDirectoryPath, [createTestEntity()]); expect(flushEntitiesSpy).toHaveBeenCalledTimes(1); }); @@ -185,7 +188,7 @@ describe('addRelationships', () => { test('should automatically flush relationships to disk after hitting a certain threshold', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const relationships = times(DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD - 1, () => - generateRelationship(), + createTestRelationship(), ); const flushRelationshipsSpy = jest.spyOn(store, 'flushRelationshipsToDisk'); @@ -196,7 +199,7 @@ describe('addRelationships', () => { // adding an additional relationship should trigger the flushing await store.addRelationships(storageDirectoryPath, [ - generateRelationship(), + createTestRelationship(), ]); expect(flushRelationshipsSpy).toHaveBeenCalledTimes(1); }); @@ -251,8 +254,8 @@ describe('getEntity', () => { const _type = uuid(); const _key = uuid(); - const nonMatchingEntities = times(25, () => generateEntity({ _type })); - const matchingEntity = generateEntity({ _type, _key }); + const nonMatchingEntities = times(25, () => createTestEntity({ _type })); + const matchingEntity = createTestEntity({ _type, _key }); await store.addEntities(storageDirectoryPath, [ ...nonMatchingEntities, @@ -270,7 +273,7 @@ describe('getEntity', () => { const _type = uuid(); - const entities = times(2, () => generateEntity({ _type })); + const entities = times(2, () => createTestEntity({ _type })); await store.addEntities(storageDirectoryPath, entities); const entity = await store.getEntity({ _key: entities[1]._key, _type }); @@ -285,10 +288,10 @@ describe('iterateEntities', () => { const matchingType = uuid(); const nonMatchingEntities = times(25, () => - generateEntity({ _type: uuid() }), + createTestEntity({ _type: uuid() }), ); const matchingEntities = times(25, () => - generateEntity({ _type: matchingType }), + createTestEntity({ _type: matchingType }), ); await store.addEntities(storageDirectoryPath, [ @@ -298,7 +301,7 @@ describe('iterateEntities', () => { await store.flushEntitiesToDisk(); - const bufferedEntity = generateEntity({ _type: matchingType }); + const bufferedEntity = createTestEntity({ _type: matchingType }); await store.addEntities(storageDirectoryPath, [bufferedEntity]); const collectedEntities = new Map(); @@ -326,7 +329,7 @@ describe('iterateEntities', () => { type TestEntity = Entity & { randomField: string }; const entities = times(25, () => - generateEntity({ _type: entityType, randomField: 'field' }), + createTestEntity({ _type: entityType, randomField: 'field' }), ); await store.addEntities(storageDirectoryPath, entities); @@ -358,10 +361,10 @@ describe('iterateRelationships', () => { const matchingType = uuid(); const nonMatchingRelationships = times(25, () => - generateRelationship({ _type: uuid() }), + createTestRelationship({ _type: uuid() }), ); const matchingRelationships = times(25, () => - generateRelationship({ _type: matchingType }), + createTestRelationship({ _type: matchingType }), ); await store.addRelationships(storageDirectoryPath, [ @@ -370,7 +373,9 @@ describe('iterateRelationships', () => { ]); await store.flush(); - const bufferedRelationship = generateRelationship({ _type: matchingType }); + const bufferedRelationship = createTestRelationship({ + _type: matchingType, + }); await store.addRelationships(storageDirectoryPath, [bufferedRelationship]); const collectedRelationships = new Map(); @@ -401,7 +406,7 @@ describe('iterateRelationships', () => { type TestRelationship = Relationship & { randomField: string }; const relationships = times(25, () => - generateRelationship({ _type: relationshipType, randomField: 'field' }), + createTestRelationship({ _type: relationshipType, randomField: 'field' }), ); await store.addRelationships(storageDirectoryPath, relationships); @@ -435,9 +440,9 @@ describe('flush callbacks', () => { let flushedEntitiesCollected: Entity[] = []; let addEntitiesFlushCalledTimes = 0; - const e1 = generateEntity(); - const e2 = generateEntity(); - const e3 = generateEntity(); + const e1 = createTestEntity(); + const e2 = createTestEntity(); + const e3 = createTestEntity(); await store.addEntities(storageDirectoryPath, [e1], async (entities) => { // Should not call first time because `graphObjectBufferThreshold` = 2 @@ -469,9 +474,9 @@ describe('flush callbacks', () => { let flushedRelationshipsCollected: Relationship[] = []; let addRelationshipsFlushCalledTimes = 0; - const r1 = generateRelationship(); - const r2 = generateRelationship(); - const r3 = generateRelationship(); + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + const r3 = createTestRelationship(); await store.addRelationships( storageDirectoryPath, @@ -511,7 +516,7 @@ describe('flush callbacks', () => { let flushedEntitiesCollected: Entity[] = []; let addEntitiesFlushCalledTimes = 0; - const entities = times(3, () => generateEntity()); + const entities = times(3, () => createTestEntity()); async function onEntitiesFlushed(entities) { addEntitiesFlushCalledTimes++; @@ -538,7 +543,7 @@ describe('flush callbacks', () => { let flushedRelationshipsCollected: Relationship[] = []; let addRelationshipsFlushedCalledTimes = 0; - const relationships = times(3, () => generateRelationship()); + const relationships = times(3, () => createTestRelationship()); async function onRelationshipsFlushed(relationships) { addRelationshipsFlushedCalledTimes++; @@ -574,8 +579,8 @@ describe('flush callbacks', () => { let flushedEntitiesCollected: Entity[] = []; let addEntitiesFlushCalledTimes = 0; - const entities = times(3, () => generateEntity()); - const relationships = times(3, () => generateRelationship()); + const entities = times(3, () => createTestEntity()); + const relationships = times(3, () => createTestRelationship()); async function onEntitiesFlushed(entities) { addEntitiesFlushCalledTimes++; diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts index 6d2156782..608dfec21 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts @@ -10,7 +10,7 @@ import flatten from 'lodash/flatten'; import { getRootStorageDirectory } from '../../../fileSystem'; import { flushDataToDisk } from '../flushDataToDisk'; -import { generateEntity } from './util/graphObjects'; +import { createTestEntity } from '@jupiterone/integration-sdk-private-test-utils'; jest.mock('fs'); @@ -18,9 +18,9 @@ afterEach(() => vol.reset()); test('should group objects by "_type" and write them to separate files', async () => { const testEntityData = { - A: times(25, () => generateEntity({ _type: 'A' })), - B: times(25, () => generateEntity({ _type: 'B' })), - C: times(25, () => generateEntity({ _type: 'C' })), + A: times(25, () => createTestEntity({ _type: 'A' })), + B: times(25, () => createTestEntity({ _type: 'B' })), + C: times(25, () => createTestEntity({ _type: 'C' })), }; const allEntities = randomizeOrder(flatten(Object.values(testEntityData))); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts deleted file mode 100644 index f5a551462..000000000 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { v4 as uuid } from 'uuid'; -import { Entity, Relationship } from '@jupiterone/integration-sdk-core'; - -export function generateEntity(overrides?: Partial): Entity { - return { - _key: uuid(), - _type: uuid(), - _class: uuid(), - ...overrides, - }; -} - -export function generateRelationship( - overrides?: Partial, -): Relationship { - return { - _key: uuid(), - _type: uuid(), - _class: uuid(), - _toEntityKey: uuid(), - _fromEntityKey: uuid(), - ...overrides, - }; -} From 43e57a908e483812ceba54fcd777a1367889c5d9 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 11:39:46 -0500 Subject: [PATCH 08/13] Fix typo in test function --- .../src/execution/__tests__/jobState.test.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts index 9bf76a9ae..f465a6d5a 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts @@ -25,9 +25,7 @@ function getMockCreateStepJobStateParams( }; } -function createTestStepJobStateState( - params?: Partial, -) { +function createTestStepJobState(params?: Partial) { return createStepJobState(getMockCreateStepJobStateParams(params)); } @@ -37,7 +35,7 @@ describe('#createStepJobState', () => { }); test('should allow creating job state and adding a single entity with "addEntity"', async () => { - const jobState = createTestStepJobStateState(); + const jobState = createTestStepJobState(); const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -49,7 +47,7 @@ describe('#createStepJobState', () => { }); test('should allow creating job state and adding a multiple entities with "addEntities"', async () => { - const jobState = createTestStepJobStateState(); + const jobState = createTestStepJobState(); const entities: Entity[] = [ { _type: 'a_entity', @@ -74,7 +72,7 @@ describe('#findEntity', () => { }); test('should find entity by _key', async () => { - const jobState = createTestStepJobStateState(); + const jobState = createTestStepJobState(); const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -86,7 +84,7 @@ describe('#findEntity', () => { }); test('should find entity by _key with key normalization', async () => { - const jobState = createTestStepJobStateState({ + const jobState = createTestStepJobState({ duplicateKeyTracker: new DuplicateKeyTracker((_key) => _key.toLowerCase(), ), From 2b1052caac63a0ac706f4299cb464d708348281d Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 06:42:49 -0500 Subject: [PATCH 09/13] Add tests for job state upload calls --- .../src/execution/__tests__/jobState.test.ts | 160 +++++++++++++++++- 1 file changed, 158 insertions(+), 2 deletions(-) diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts index f465a6d5a..6395fd963 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts @@ -9,9 +9,40 @@ import { v4 as uuid } from 'uuid'; import { FileSystemGraphObjectStore } from '../../storage'; import { vol } from 'memfs'; import { Entity } from '@jupiterone/integration-sdk-core'; +import { + createTestEntity, + createTestRelationship, + sleep, +} from '@jupiterone/integration-sdk-private-test-utils'; +import { + createQueuedStepGraphObjectDataUploader, + CreateQueuedStepGraphObjectDataUploaderParams, +} from '../uploader'; +import { FlushedGraphObjectData } from '../../storage/types'; jest.mock('fs'); +function createInMemoryStepGraphObjectDataUploaderCollector( + partial?: CreateQueuedStepGraphObjectDataUploaderParams, +) { + const graphObjectDataCollection: FlushedGraphObjectData[] = []; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: 5, + upload(graphObjectData) { + graphObjectDataCollection.push(graphObjectData); + return Promise.resolve(); + }, + ...partial, + }); + + return { + uploader, + graphObjectDataCollection, + }; +} + function getMockCreateStepJobStateParams( partial?: Partial, ): CreateStepJobStateParams { @@ -103,8 +134,133 @@ describe('#findEntity', () => { }); test('should return "null" if entity not found', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobStateState(); expect(await jobState.findEntity('invalid-entity-key')).toEqual(null); }); }); + +describe('upload callbacks', () => { + test('#addEntities should call uploader enqueue on flushed', async () => { + const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); + const jobState = createTestStepJobStateState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 2, + }), + uploader: uploadCollector.uploader, + }); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + + await jobState.addEntities([e1]); + await jobState.addEntities([e2]); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [e1, e2], + relationships: [], + }, + ]; + + expect(uploadCollector.graphObjectDataCollection).toEqual(expectedUploaded); + }); + + test('#addRelationships should call uploader enqueue on flushed', async () => { + const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); + const jobState = createTestStepJobStateState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 2, + }), + uploader: uploadCollector.uploader, + }); + + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + + await jobState.addRelationships([r1]); + await jobState.addRelationships([r2]); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [], + relationships: [r1, r2], + }, + ]; + + expect(uploadCollector.graphObjectDataCollection).toEqual(expectedUploaded); + }); + + test('#flush should call uploader enqueue for entities and relationships', async () => { + const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); + const jobState = createTestStepJobStateState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 5, + }), + uploader: uploadCollector.uploader, + }); + + const r1 = createTestRelationship(); + const e1 = createTestEntity(); + + await jobState.addRelationships([r1]); + await jobState.addEntities([e1]); + await jobState.flush(); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [e1], + relationships: [], + }, + { + entities: [], + relationships: [r1], + }, + ]; + + expect(uploadCollector.graphObjectDataCollection).toEqual(expectedUploaded); + }); + + test('#waitUntilUploadsComplete should resolve when all uploads completed', async () => { + const graphObjectDataCollection: FlushedGraphObjectData[] = []; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: 5, + async upload(graphObjectData) { + await sleep(200); + graphObjectDataCollection.push(graphObjectData); + return Promise.resolve(); + }, + }); + + const jobState = createTestStepJobStateState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 2, + }), + uploader, + }); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + + await jobState.addEntities([e1]); + await jobState.addEntities([e2]); + + if (!jobState.waitUntilUploadsComplete) { + throw new Error( + 'Default job state should have "waitUntilUploadsComplete" function', + ); + } + + await jobState.waitUntilUploadsComplete(); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [e1, e2], + relationships: [], + }, + ]; + + expect(graphObjectDataCollection).toEqual(expectedUploaded); + }); +}); From 47843b74baba3d3a7e4802d791f7af5a7792d41a Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 11:48:08 -0500 Subject: [PATCH 10/13] Fix test function names --- .../src/execution/__tests__/jobState.test.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts index 6395fd963..0d2a14d6d 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts @@ -134,7 +134,7 @@ describe('#findEntity', () => { }); test('should return "null" if entity not found', async () => { - const jobState = createTestStepJobStateState(); + const jobState = createTestStepJobState(); expect(await jobState.findEntity('invalid-entity-key')).toEqual(null); }); }); @@ -142,7 +142,7 @@ describe('#findEntity', () => { describe('upload callbacks', () => { test('#addEntities should call uploader enqueue on flushed', async () => { const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); - const jobState = createTestStepJobStateState({ + const jobState = createTestStepJobState({ graphObjectStore: new FileSystemGraphObjectStore({ graphObjectBufferThreshold: 2, }), @@ -167,7 +167,7 @@ describe('upload callbacks', () => { test('#addRelationships should call uploader enqueue on flushed', async () => { const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); - const jobState = createTestStepJobStateState({ + const jobState = createTestStepJobState({ graphObjectStore: new FileSystemGraphObjectStore({ graphObjectBufferThreshold: 2, }), @@ -192,7 +192,7 @@ describe('upload callbacks', () => { test('#flush should call uploader enqueue for entities and relationships', async () => { const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); - const jobState = createTestStepJobStateState({ + const jobState = createTestStepJobState({ graphObjectStore: new FileSystemGraphObjectStore({ graphObjectBufferThreshold: 5, }), @@ -233,7 +233,7 @@ describe('upload callbacks', () => { }, }); - const jobState = createTestStepJobStateState({ + const jobState = createTestStepJobState({ graphObjectStore: new FileSystemGraphObjectStore({ graphObjectBufferThreshold: 2, }), From ca68b8a6c35b14e0e6a4a88305f6ad49e9aa0b74 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 12:39:11 -0500 Subject: [PATCH 11/13] Test assertion improvements --- .../src/execution/__tests__/jobState.test.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts index 0d2a14d6d..29f716e4f 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts @@ -204,6 +204,8 @@ describe('upload callbacks', () => { await jobState.addRelationships([r1]); await jobState.addEntities([e1]); + expect(uploadCollector.graphObjectDataCollection).toEqual([]); + await jobState.flush(); const expectedUploaded: FlushedGraphObjectData[] = [ @@ -252,6 +254,7 @@ describe('upload callbacks', () => { ); } + expect(graphObjectDataCollection).toEqual([]); await jobState.waitUntilUploadsComplete(); const expectedUploaded: FlushedGraphObjectData[] = [ From 06b339460775a15a7161564f8076425bcfd0fb0c Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 08:47:32 -0500 Subject: [PATCH 12/13] Only write prettified files to the file system on local collection --- .../src/commands/collect.ts | 7 ++++ .../integration-sdk-cli/src/commands/run.ts | 6 ++++ .../visualization/generateVisualization.ts | 17 ++++++---- .../__tests__/dependencyGraph.test.ts | 2 +- .../integration-sdk-runtime/src/fileSystem.ts | 8 +++-- .../FileSystemGraphObjectStore.ts | 11 ++++++- .../__tests__/flushDataToDisk.test.ts | 32 +++++++++++++++++++ .../flushDataToDisk.ts | 3 ++ 8 files changed, 76 insertions(+), 10 deletions(-) diff --git a/packages/integration-sdk-cli/src/commands/collect.ts b/packages/integration-sdk-cli/src/commands/collect.ts index 509eb9ee3..a3cec8266 100644 --- a/packages/integration-sdk-cli/src/commands/collect.ts +++ b/packages/integration-sdk-cli/src/commands/collect.ts @@ -2,6 +2,7 @@ import { createCommand } from 'commander'; import { executeIntegrationLocally, + FileSystemGraphObjectStore, prepareLocalStepCollection, } from '@jupiterone/integration-sdk-runtime'; @@ -30,6 +31,11 @@ export function collect() { const enableSchemaValidation = !options.disableSchemaValidation; const config = prepareLocalStepCollection(await loadConfig(), options); log.info('\nConfiguration loaded! Running integration...\n'); + + const graphObjectStore = new FileSystemGraphObjectStore({ + prettyFile: true, + }); + const results = await executeIntegrationLocally( config, { @@ -39,6 +45,7 @@ export function collect() { }, { enableSchemaValidation, + graphObjectStore, }, ); log.displayExecutionResults(results); diff --git a/packages/integration-sdk-cli/src/commands/run.ts b/packages/integration-sdk-cli/src/commands/run.ts index b6def649a..9e30c9381 100644 --- a/packages/integration-sdk-cli/src/commands/run.ts +++ b/packages/integration-sdk-cli/src/commands/run.ts @@ -8,6 +8,7 @@ import { createIntegrationInstanceForLocalExecution, createIntegrationLogger, executeIntegrationInstance, + FileSystemGraphObjectStore, finalizeSynchronization, getAccountFromEnvironment, getApiBaseUrl, @@ -72,6 +73,10 @@ export function run() { const invocationConfig = await loadConfig(); + const graphObjectStore = new FileSystemGraphObjectStore({ + prettyFile: true, + }); + try { const executionResults = await executeIntegrationInstance( logger, @@ -84,6 +89,7 @@ export function run() { }, { enableSchemaValidation: true, + graphObjectStore, createStepGraphObjectDataUploader(stepId) { return createPersisterApiStepGraphObjectDataUploader({ stepId, diff --git a/packages/integration-sdk-cli/src/visualization/generateVisualization.ts b/packages/integration-sdk-cli/src/visualization/generateVisualization.ts index 1e134e38d..17b3ba48b 100644 --- a/packages/integration-sdk-cli/src/visualization/generateVisualization.ts +++ b/packages/integration-sdk-cli/src/visualization/generateVisualization.ts @@ -27,9 +27,11 @@ export async function generateVisualization( log.warn(`Unable to find any files under path: ${resolvedIntegrationPath}`); } - const { entities, relationships, mappedRelationships } = await retrieveIntegrationData( - entitiesAndRelationshipPaths, - ); + const { + entities, + relationships, + mappedRelationships, + } = await retrieveIntegrationData(entitiesAndRelationshipPaths); const nodeDataSets = entities.map((entity) => ({ id: getNodeIdFromEntity(entity, []), @@ -45,10 +47,10 @@ export async function generateVisualization( ); const { - mappedRelationshipEdges, + mappedRelationshipEdges, mappedRelationshipNodes, } = createMappedRelationshipNodesAndEdges({ - mappedRelationships, + mappedRelationships, explicitEntities: entities, }); @@ -56,7 +58,10 @@ export async function generateVisualization( await writeFileToPath({ path: htmlFileLocation, - content: generateVisHTML([...nodeDataSets, ...mappedRelationshipNodes], [...explicitEdgeDataSets, ...mappedRelationshipEdges]), + content: generateVisHTML( + [...nodeDataSets, ...mappedRelationshipNodes], + [...explicitEdgeDataSets, ...mappedRelationshipEdges], + ), }); return htmlFileLocation; diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts index b8efa66d7..9c11b2f22 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts @@ -429,7 +429,7 @@ describe('executeStepDependencyGraph', () => { // each step should have just generated one file const writtenData = await fs.readFile(`${directory}/${files[0]}`, 'utf8'); - expect(writtenData).toEqual(JSON.stringify({ [type]: data }, null, 2)); + expect(writtenData).toEqual(JSON.stringify({ [type]: data })); } }); diff --git a/packages/integration-sdk-runtime/src/fileSystem.ts b/packages/integration-sdk-runtime/src/fileSystem.ts index a7393b3c4..bb836d761 100644 --- a/packages/integration-sdk-runtime/src/fileSystem.ts +++ b/packages/integration-sdk-runtime/src/fileSystem.ts @@ -35,6 +35,7 @@ export function getRootStorageDirectorySize(): Promise { interface WriteDataToPathInput { path: string; data: object; + pretty?: boolean; } /** @@ -47,10 +48,13 @@ interface WriteDataToPathInput { export async function writeJsonToPath({ path: relativePath, data, + pretty = false, }: WriteDataToPathInput) { + const content = pretty ? JSON.stringify(data, null, 2) : JSON.stringify(data); + await writeFileToPath({ path: relativePath, - content: JSON.stringify(data, null, 2), + content, }); } @@ -191,7 +195,7 @@ export async function removeStorageDirectory() { } function removeDirectory(directory: string) { - return new Promise((resolve, reject) => + return new Promise((resolve, reject) => rimraf(directory, (err) => { if (err) { return reject(err); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 7c95a98a2..1c5ff6c10 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -33,19 +33,26 @@ export interface FileSystemGraphObjectStoreParams { * * Default: 500 */ - graphObjectBufferThreshold: number; + graphObjectBufferThreshold?: number; + + /** + * Whether the files that are written to disk should be minified or not + */ + prettyFile?: boolean; } export class FileSystemGraphObjectStore implements GraphObjectStore { private readonly semaphore: Sema; private readonly localGraphObjectStore = new InMemoryGraphObjectStore(); private readonly graphObjectBufferThreshold: number; + private readonly prettyFile: boolean; constructor(params?: FileSystemGraphObjectStoreParams) { this.semaphore = new Sema(BINARY_SEMAPHORE_CONCURRENCY); this.graphObjectBufferThreshold = params?.graphObjectBufferThreshold || DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD; + this.prettyFile = params?.prettyFile || false; } async addEntities( @@ -157,6 +164,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { storageDirectoryPath: stepId, collectionType: 'entities', data: entities, + pretty: this.prettyFile, }); this.localGraphObjectStore.flushEntities(entities); @@ -180,6 +188,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { storageDirectoryPath: stepId, collectionType: 'relationships', data: relationships, + pretty: this.prettyFile, }); this.localGraphObjectStore.flushRelationships(relationships); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts index 608dfec21..13743bd97 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts @@ -70,6 +70,38 @@ test('should group objects by "_type" and write them to separate files', async ( }); }); +test('should allow prettifying files', async () => { + const _type = uuid(); + const entities = times(2, () => createTestEntity({ _type })); + const storageDirectoryPath = uuid(); + + await flushDataToDisk({ + storageDirectoryPath, + collectionType: 'entities', + data: entities, + pretty: true, + }); + + const entitiesDirectory = path.join( + getRootStorageDirectory(), + 'graph', + storageDirectoryPath, + 'entities', + ); + + const entityFiles = await fs.readdir(entitiesDirectory); + expect(entityFiles).toHaveLength(1); + + const fileData = await fs.readFile( + path.join(entitiesDirectory, entityFiles[0]), + { + encoding: 'utf-8', + }, + ); + + expect(fileData).toEqual(JSON.stringify({ entities }, null, 2)); +}); + function randomizeOrder(things: T[]): T[] { return sortBy(things, () => Math.random()); } diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts index 6171f3a61..f729d2481 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts @@ -16,6 +16,7 @@ interface FlushDataToDiskInput { storageDirectoryPath: string; collectionType: CollectionType; data: Entity[] | Relationship[]; + pretty?: boolean; } /** @@ -27,6 +28,7 @@ export async function flushDataToDisk({ storageDirectoryPath, collectionType, data, + pretty, }: FlushDataToDiskInput) { // split the data by type first const groupedCollections = groupBy(data, '_type'); @@ -49,6 +51,7 @@ export async function flushDataToDisk({ data: { [collectionType]: collection, }, + pretty, }); await symlink({ From 55bb7b55f89838c84188c14305f2b45607269733 Mon Sep 17 00:00:00 2001 From: Austin Kelleher Date: Wed, 16 Dec 2020 12:41:41 -0500 Subject: [PATCH 13/13] Change prettyFile to prettifyFiles --- packages/integration-sdk-cli/src/commands/collect.ts | 2 +- packages/integration-sdk-cli/src/commands/run.ts | 2 +- .../FileSystemGraphObjectStore.ts | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/integration-sdk-cli/src/commands/collect.ts b/packages/integration-sdk-cli/src/commands/collect.ts index a3cec8266..a66e33395 100644 --- a/packages/integration-sdk-cli/src/commands/collect.ts +++ b/packages/integration-sdk-cli/src/commands/collect.ts @@ -33,7 +33,7 @@ export function collect() { log.info('\nConfiguration loaded! Running integration...\n'); const graphObjectStore = new FileSystemGraphObjectStore({ - prettyFile: true, + prettifyFiles: true, }); const results = await executeIntegrationLocally( diff --git a/packages/integration-sdk-cli/src/commands/run.ts b/packages/integration-sdk-cli/src/commands/run.ts index 9e30c9381..d81d9a320 100644 --- a/packages/integration-sdk-cli/src/commands/run.ts +++ b/packages/integration-sdk-cli/src/commands/run.ts @@ -74,7 +74,7 @@ export function run() { const invocationConfig = await loadConfig(); const graphObjectStore = new FileSystemGraphObjectStore({ - prettyFile: true, + prettifyFiles: true, }); try { diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 1c5ff6c10..0f10a18a9 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -38,21 +38,21 @@ export interface FileSystemGraphObjectStoreParams { /** * Whether the files that are written to disk should be minified or not */ - prettyFile?: boolean; + prettifyFiles?: boolean; } export class FileSystemGraphObjectStore implements GraphObjectStore { private readonly semaphore: Sema; private readonly localGraphObjectStore = new InMemoryGraphObjectStore(); private readonly graphObjectBufferThreshold: number; - private readonly prettyFile: boolean; + private readonly prettifyFiles: boolean; constructor(params?: FileSystemGraphObjectStoreParams) { this.semaphore = new Sema(BINARY_SEMAPHORE_CONCURRENCY); this.graphObjectBufferThreshold = params?.graphObjectBufferThreshold || DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD; - this.prettyFile = params?.prettyFile || false; + this.prettifyFiles = params?.prettifyFiles || false; } async addEntities( @@ -164,7 +164,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { storageDirectoryPath: stepId, collectionType: 'entities', data: entities, - pretty: this.prettyFile, + pretty: this.prettifyFiles, }); this.localGraphObjectStore.flushEntities(entities); @@ -188,7 +188,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { storageDirectoryPath: stepId, collectionType: 'relationships', data: relationships, - pretty: this.prettyFile, + pretty: this.prettifyFiles, }); this.localGraphObjectStore.flushRelationships(relationships);