diff --git a/packages/cli/package.json b/packages/cli/package.json index c4b6a104a..97576bc1d 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/cli", - "version": "5.1.0", + "version": "5.2.0", "description": "The JupiterOne cli", "main": "dist/src/index.js", "types": "dist/src/index.d.ts", @@ -24,8 +24,8 @@ "test": "jest" }, "dependencies": { - "@jupiterone/integration-sdk-core": "^5.1.0", - "@jupiterone/integration-sdk-runtime": "^5.1.0", + "@jupiterone/integration-sdk-core": "^5.2.0", + "@jupiterone/integration-sdk-runtime": "^5.2.0", "@lifeomic/attempt": "^3.0.0", "commander": "^5.0.0", "globby": "^11.0.1", diff --git a/packages/integration-sdk-cli/package.json b/packages/integration-sdk-cli/package.json index 516a2fd6f..d15f07e99 100644 --- a/packages/integration-sdk-cli/package.json +++ b/packages/integration-sdk-cli/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/integration-sdk-cli", - "version": "5.1.0", + "version": "5.2.0", "description": "The SDK for developing JupiterOne integrations", "main": "dist/src/index.js", "types": "dist/src/index.d.ts", @@ -22,7 +22,7 @@ "prepack": "yarn build:dist" }, "dependencies": { - "@jupiterone/integration-sdk-runtime": "^5.1.0", + "@jupiterone/integration-sdk-runtime": "^5.2.0", "commander": "^5.0.0", "globby": "^11.0.0", "lodash": "^4.17.19", @@ -31,7 +31,7 @@ "vis": "^4.21.0-EOL" }, "devDependencies": { - "@jupiterone/integration-sdk-private-test-utils": "^5.1.0", + "@jupiterone/integration-sdk-private-test-utils": "^5.2.0", "@pollyjs/adapter-node-http": "^4.0.4", "@pollyjs/core": "^4.0.4", "@pollyjs/persister-fs": "^4.0.4", diff --git a/packages/integration-sdk-cli/src/commands/collect.ts b/packages/integration-sdk-cli/src/commands/collect.ts index 509eb9ee3..a66e33395 100644 --- a/packages/integration-sdk-cli/src/commands/collect.ts +++ b/packages/integration-sdk-cli/src/commands/collect.ts @@ -2,6 +2,7 @@ import { createCommand } from 'commander'; import { executeIntegrationLocally, + FileSystemGraphObjectStore, prepareLocalStepCollection, } from '@jupiterone/integration-sdk-runtime'; @@ -30,6 +31,11 @@ export function collect() { const enableSchemaValidation = !options.disableSchemaValidation; const config = prepareLocalStepCollection(await loadConfig(), options); log.info('\nConfiguration loaded! Running integration...\n'); + + const graphObjectStore = new FileSystemGraphObjectStore({ + prettifyFiles: true, + }); + const results = await executeIntegrationLocally( config, { @@ -39,6 +45,7 @@ export function collect() { }, { enableSchemaValidation, + graphObjectStore, }, ); log.displayExecutionResults(results); diff --git a/packages/integration-sdk-cli/src/commands/run.ts b/packages/integration-sdk-cli/src/commands/run.ts index 9e985c066..d81d9a320 100644 --- a/packages/integration-sdk-cli/src/commands/run.ts +++ b/packages/integration-sdk-cli/src/commands/run.ts @@ -8,16 +8,19 @@ import { createIntegrationInstanceForLocalExecution, createIntegrationLogger, executeIntegrationInstance, + FileSystemGraphObjectStore, finalizeSynchronization, getAccountFromEnvironment, getApiBaseUrl, getApiKeyFromEnvironment, initiateSynchronization, - uploadCollectedData, } from '@jupiterone/integration-sdk-runtime'; import { loadConfig } from '../config'; import * as log from '../log'; +import { createPersisterApiStepGraphObjectDataUploader } from '@jupiterone/integration-sdk-runtime/dist/src/execution/uploader'; + +const DEFAULT_UPLOAD_CONCURRENCY = 5; export function run() { return createCommand('run') @@ -70,6 +73,10 @@ export function run() { const invocationConfig = await loadConfig(); + const graphObjectStore = new FileSystemGraphObjectStore({ + prettifyFiles: true, + }); + try { const executionResults = await executeIntegrationInstance( logger, @@ -82,6 +89,14 @@ export function run() { }, { enableSchemaValidation: true, + graphObjectStore, + createStepGraphObjectDataUploader(stepId) { + return createPersisterApiStepGraphObjectDataUploader({ + stepId, + synchronizationJobContext: synchronizationContext, + uploadConcurrency: DEFAULT_UPLOAD_CONCURRENCY, + }); + }, }, ); @@ -89,8 +104,6 @@ export function run() { log.displayExecutionResults(executionResults); - await uploadCollectedData(synchronizationContext); - const synchronizationResult = await finalizeSynchronization({ ...synchronizationContext, partialDatasets: executionResults.metadata.partialDatasets, diff --git a/packages/integration-sdk-cli/src/visualization/generateVisualization.ts b/packages/integration-sdk-cli/src/visualization/generateVisualization.ts index 1e134e38d..17b3ba48b 100644 --- a/packages/integration-sdk-cli/src/visualization/generateVisualization.ts +++ b/packages/integration-sdk-cli/src/visualization/generateVisualization.ts @@ -27,9 +27,11 @@ export async function generateVisualization( log.warn(`Unable to find any files under path: ${resolvedIntegrationPath}`); } - const { entities, relationships, mappedRelationships } = await retrieveIntegrationData( - entitiesAndRelationshipPaths, - ); + const { + entities, + relationships, + mappedRelationships, + } = await retrieveIntegrationData(entitiesAndRelationshipPaths); const nodeDataSets = entities.map((entity) => ({ id: getNodeIdFromEntity(entity, []), @@ -45,10 +47,10 @@ export async function generateVisualization( ); const { - mappedRelationshipEdges, + mappedRelationshipEdges, mappedRelationshipNodes, } = createMappedRelationshipNodesAndEdges({ - mappedRelationships, + mappedRelationships, explicitEntities: entities, }); @@ -56,7 +58,10 @@ export async function generateVisualization( await writeFileToPath({ path: htmlFileLocation, - content: generateVisHTML([...nodeDataSets, ...mappedRelationshipNodes], [...explicitEdgeDataSets, ...mappedRelationshipEdges]), + content: generateVisHTML( + [...nodeDataSets, ...mappedRelationshipNodes], + [...explicitEdgeDataSets, ...mappedRelationshipEdges], + ), }); return htmlFileLocation; diff --git a/packages/integration-sdk-core/package.json b/packages/integration-sdk-core/package.json index 8bd976a8f..c9285ae2e 100644 --- a/packages/integration-sdk-core/package.json +++ b/packages/integration-sdk-core/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/integration-sdk-core", - "version": "5.1.0", + "version": "5.2.0", "description": "The SDK for developing JupiterOne integrations", "main": "dist/src/index.js", "types": "dist/src/index.d.ts", diff --git a/packages/integration-sdk-core/src/types/jobState.ts b/packages/integration-sdk-core/src/types/jobState.ts index 54bae931e..f4e775efc 100644 --- a/packages/integration-sdk-core/src/types/jobState.ts +++ b/packages/integration-sdk-core/src/types/jobState.ts @@ -112,4 +112,10 @@ export interface JobState { * flushed to reduce memory consumption. */ flush: () => Promise; + + /** + * A job state may be created with a graph object uploader. This function + * resolves when all uploads have been completed. + */ + waitUntilUploadsComplete?: () => Promise; } diff --git a/packages/integration-sdk-core/src/types/storage.ts b/packages/integration-sdk-core/src/types/storage.ts index bd80f3626..2b37c38b6 100644 --- a/packages/integration-sdk-core/src/types/storage.ts +++ b/packages/integration-sdk-core/src/types/storage.ts @@ -11,11 +11,16 @@ import { Relationship } from './relationship'; * integration execution. */ export interface GraphObjectStore { - addEntities(stepId: string, newEntities: Entity[]): Promise; + addEntities( + stepId: string, + newEntities: Entity[], + onEntitiesFlushed?: (entities: Entity[]) => Promise, + ): Promise; addRelationships( stepId: string, newRelationships: Relationship[], + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, ): Promise; getEntity({ _key, _type }: GraphObjectLookupKey): Promise; @@ -30,5 +35,8 @@ export interface GraphObjectStore { iteratee: GraphObjectIteratee, ): Promise; - flush(): Promise; + flush( + onEntitiesFlushed?: (entities: Entity[]) => Promise, + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + ): Promise; } diff --git a/packages/integration-sdk-dev-tools/package.json b/packages/integration-sdk-dev-tools/package.json index 15f757f8a..4b8c6165b 100644 --- a/packages/integration-sdk-dev-tools/package.json +++ b/packages/integration-sdk-dev-tools/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/integration-sdk-dev-tools", - "version": "5.1.0", + "version": "5.2.0", "description": "A collection of developer tools that will assist with building integrations.", "repository": "git@github.com:JupiterOne/sdk.git", "author": "JupiterOne ", @@ -15,8 +15,8 @@ "access": "public" }, "dependencies": { - "@jupiterone/integration-sdk-cli": "^5.1.0", - "@jupiterone/integration-sdk-testing": "^5.1.0", + "@jupiterone/integration-sdk-cli": "^5.2.0", + "@jupiterone/integration-sdk-testing": "^5.2.0", "@types/jest": "^25.2.3", "@types/node": "^14.0.5", "@typescript-eslint/eslint-plugin": "^3.8.0", diff --git a/packages/integration-sdk-private-test-utils/package.json b/packages/integration-sdk-private-test-utils/package.json index 8ac9c592a..d134b1997 100644 --- a/packages/integration-sdk-private-test-utils/package.json +++ b/packages/integration-sdk-private-test-utils/package.json @@ -1,7 +1,7 @@ { "name": "@jupiterone/integration-sdk-private-test-utils", "private": true, - "version": "5.1.0", + "version": "5.2.0", "description": "The SDK for developing JupiterOne integrations", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -12,7 +12,7 @@ "node": "10.x || 12.x || 14.x" }, "dependencies": { - "@jupiterone/integration-sdk-core": "^5.1.0", + "@jupiterone/integration-sdk-core": "^5.2.0", "lodash": "^4.17.15", "uuid": "^7.0.3" }, diff --git a/packages/integration-sdk-private-test-utils/src/graphObject.ts b/packages/integration-sdk-private-test-utils/src/graphObject.ts new file mode 100644 index 000000000..5575c9e1d --- /dev/null +++ b/packages/integration-sdk-private-test-utils/src/graphObject.ts @@ -0,0 +1,26 @@ +import { Entity, ExplicitRelationship } from '@jupiterone/integration-sdk-core'; +import { v4 as uuid } from 'uuid'; + +export function createTestEntity(partial?: Partial): Entity { + return { + _key: uuid(), + _class: uuid(), + _type: uuid(), + [uuid()]: uuid(), + ...partial, + }; +} + +export function createTestRelationship( + partial?: Partial, +): ExplicitRelationship { + return { + _key: uuid(), + _toEntityKey: uuid(), + _fromEntityKey: uuid(), + _class: uuid(), + _type: uuid(), + [uuid()]: uuid(), + ...partial, + }; +} diff --git a/packages/integration-sdk-private-test-utils/src/index.ts b/packages/integration-sdk-private-test-utils/src/index.ts index a0959c6c8..8c2a76365 100644 --- a/packages/integration-sdk-private-test-utils/src/index.ts +++ b/packages/integration-sdk-private-test-utils/src/index.ts @@ -1,3 +1,5 @@ export * from './loadProjectStructure'; export * from './toUnixPath'; export * from './graphObjectStore'; +export * from './graphObject'; +export * from './util'; diff --git a/packages/integration-sdk-private-test-utils/src/util.ts b/packages/integration-sdk-private-test-utils/src/util.ts new file mode 100644 index 000000000..3e36c8125 --- /dev/null +++ b/packages/integration-sdk-private-test-utils/src/util.ts @@ -0,0 +1,5 @@ +export function sleep(ms: number) { + return new Promise((resolve) => { + setTimeout(() => resolve(), ms); + }); +} diff --git a/packages/integration-sdk-runtime/package.json b/packages/integration-sdk-runtime/package.json index 15efc5346..85eca9629 100644 --- a/packages/integration-sdk-runtime/package.json +++ b/packages/integration-sdk-runtime/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/integration-sdk-runtime", - "version": "5.1.0", + "version": "5.2.0", "description": "The SDK for developing JupiterOne integrations", "main": "dist/src/index.js", "types": "dist/src/index.d.ts", @@ -24,7 +24,7 @@ "prepack": "yarn build:dist" }, "dependencies": { - "@jupiterone/integration-sdk-core": "^5.1.0", + "@jupiterone/integration-sdk-core": "^5.2.0", "@lifeomic/alpha": "^1.1.3", "@lifeomic/attempt": "^3.0.0", "async-sema": "^3.1.0", @@ -43,7 +43,7 @@ "uuid": "^7.0.3" }, "devDependencies": { - "@jupiterone/integration-sdk-private-test-utils": "^5.1.0", + "@jupiterone/integration-sdk-private-test-utils": "^5.2.0", "@types/uuid": "^7.0.2", "get-port": "^5.1.1", "memfs": "^3.2.0", diff --git a/packages/integration-sdk-runtime/src/__tests__/index.test.ts b/packages/integration-sdk-runtime/src/__tests__/index.test.ts index 21707b0e8..c354c9b55 100644 --- a/packages/integration-sdk-runtime/src/__tests__/index.test.ts +++ b/packages/integration-sdk-runtime/src/__tests__/index.test.ts @@ -1,10 +1,6 @@ -import { BucketMap, FileSystemGraphObjectStore } from '../index'; +import { FileSystemGraphObjectStore } from '../index'; describe('#storage', () => { - test('should expose BucketMap', () => { - expect(BucketMap).not.toEqual(undefined); - }); - test('should expose FileSystemGraphObjectStore', () => { expect(FileSystemGraphObjectStore).not.toEqual(undefined); }); diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts index ce95f17bf..9c11b2f22 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/dependencyGraph.test.ts @@ -6,6 +6,7 @@ import waitForExpect from 'wait-for-expect'; import { Entity, + GraphObjectStore, IntegrationError, IntegrationExecutionContext, IntegrationInstance, @@ -30,6 +31,12 @@ import { import { LOCAL_INTEGRATION_INSTANCE } from '../instance'; import { DuplicateKeyTracker } from '../jobState'; import { getDefaultStepStartStates } from '../step'; +import { + CreateStepGraphObjectDataUploaderFunction, + StepGraphObjectDataUploader, +} from '../uploader'; +import { FlushedGraphObjectData } from '../../storage/types'; +import { createTestEntity } from '@jupiterone/integration-sdk-private-test-utils'; jest.mock('fs'); @@ -132,13 +139,16 @@ describe('executeStepDependencyGraph', () => { async function executeSteps( steps: IntegrationStep[], stepStartStates: StepStartStates = getDefaultStepStartStates(steps), + graphObjectStore: GraphObjectStore = new FileSystemGraphObjectStore(), + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction, ) { return executeStepDependencyGraph({ executionContext, inputGraph: buildStepDependencyGraph(steps), stepStartStates, duplicateKeyTracker: new DuplicateKeyTracker(), - graphObjectStore: new FileSystemGraphObjectStore(), + graphObjectStore, + createStepGraphObjectDataUploader, }); } @@ -419,24 +429,31 @@ describe('executeStepDependencyGraph', () => { // each step should have just generated one file const writtenData = await fs.readFile(`${directory}/${files[0]}`, 'utf8'); - expect(writtenData).toEqual(JSON.stringify({ [type]: data }, null, 2)); + expect(writtenData).toEqual(JSON.stringify({ [type]: data })); } }); - test('should perform a flush of the jobState after a step was executed', async () => { - let jobStateFlushSpy; - - await executeSteps([ + test('should perform a flush of the jobState after execution completed', async () => { + const steps: IntegrationStep[] = [ { id: 'a', name: 'a', entities: [], relationships: [], - executionHandler: ({ jobState }) => { - jobStateFlushSpy = jest.spyOn(jobState, 'flush'); + executionHandler: () => { + return Promise.resolve(); }, }, - ]); + ]; + + const graphObjectStore = new FileSystemGraphObjectStore(); + const jobStateFlushSpy = jest.spyOn(graphObjectStore, 'flush'); + + await executeSteps( + steps, + getDefaultStepStartStates(steps), + graphObjectStore, + ); expect(jobStateFlushSpy).toHaveBeenCalledTimes(1); }); @@ -613,6 +630,156 @@ describe('executeStepDependencyGraph', () => { expect(spyB).toHaveBeenCalledBefore(spyC); }); + test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => { + const spyA = jest.fn(); + const spyB = jest.fn(); + const spyC = jest.fn(); + + const eA = createTestEntity(); + const eB = createTestEntity(); + const eC = createTestEntity(); + + const steps: IntegrationStep[] = [ + { + id: 'a', + name: 'a', + entities: [], + relationships: [], + async executionHandler({ jobState }) { + await jobState.addEntity(eA); + spyA(); + }, + }, + { + id: 'b', + name: 'b', + entities: [], + relationships: [], + dependsOn: ['a'], + async executionHandler({ jobState }) { + await jobState.addEntity(eB); + spyB(); + }, + }, + { + id: 'c', + name: 'c', + entities: [], + relationships: [], + dependsOn: ['b'], + async executionHandler({ jobState }) { + await jobState.addEntity(eC); + spyC(); + }, + }, + ]; + + const stepStartStates = getDefaultStepStartStates(steps); + const graphObjectStore = new FileSystemGraphObjectStore(); + + function createPassingUploader( + stepId: string, + collector: FlushedGraphObjectData[], + ): StepGraphObjectDataUploader { + return { + stepId, + async enqueue(graphObjectData) { + collector.push(graphObjectData); + return Promise.resolve(); + }, + waitUntilUploadsComplete() { + return Promise.resolve(); + }, + }; + } + + function createFailingUploader( + stepId: string, + ): StepGraphObjectDataUploader { + return { + stepId, + async enqueue() { + return Promise.resolve(); + }, + waitUntilUploadsComplete() { + return Promise.reject(new Error('expected upload wait failure')); + }, + }; + } + + const passingUploaderCollector: FlushedGraphObjectData[] = []; + + /** + * Graph: + * a - b - c + * + * In this situation, 'a' is the leaf node + * 'b' depends on 'a', + * 'c' depends on 'b' + */ + const result = await executeSteps( + steps, + stepStartStates, + graphObjectStore, + (stepId) => { + if (stepId === 'b') { + return createFailingUploader(stepId); + } else { + return createPassingUploader(stepId, passingUploaderCollector); + } + }, + ); + + const expectedCollected: FlushedGraphObjectData[] = [ + { + entities: [eA], + relationships: [], + }, + { + entities: [eC], + relationships: [], + }, + ]; + + expect(passingUploaderCollector).toEqual(expectedCollected); + + expect(result).toEqual([ + { + id: 'a', + name: 'a', + declaredTypes: [], + partialTypes: [], + encounteredTypes: [eA._type], + status: StepResultStatus.SUCCESS, + }, + { + id: 'b', + name: 'b', + declaredTypes: [], + partialTypes: [], + encounteredTypes: [eB._type], + dependsOn: ['a'], + status: StepResultStatus.FAILURE, + }, + { + id: 'c', + name: 'c', + declaredTypes: [], + partialTypes: [], + encounteredTypes: [eC._type], + dependsOn: ['b'], + status: StepResultStatus.PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE, + }, + ]); + + expect(spyA).toHaveBeenCalledTimes(1); + expect(spyB).toHaveBeenCalledTimes(1); + expect(spyC).toHaveBeenCalledTimes(1); + + expect(spyA).toHaveBeenCalledBefore(spyB); + expect(spyB).toHaveBeenCalledBefore(spyC); + }); + test('logs error after step fails', async () => { const error = new IntegrationError({ code: 'ABC-123', diff --git a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts index ac404f6bc..29f716e4f 100644 --- a/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts +++ b/packages/integration-sdk-runtime/src/execution/__tests__/jobState.test.ts @@ -8,35 +8,65 @@ import { import { v4 as uuid } from 'uuid'; import { FileSystemGraphObjectStore } from '../../storage'; import { vol } from 'memfs'; +import { Entity } from '@jupiterone/integration-sdk-core'; import { - Entity, - KeyNormalizationFunction, -} from '@jupiterone/integration-sdk-core'; + createTestEntity, + createTestRelationship, + sleep, +} from '@jupiterone/integration-sdk-private-test-utils'; +import { + createQueuedStepGraphObjectDataUploader, + CreateQueuedStepGraphObjectDataUploaderParams, +} from '../uploader'; +import { FlushedGraphObjectData } from '../../storage/types'; jest.mock('fs'); -function getMockCreateStepJobStateParams(options?: { - normalizeGraphObjectKey?: KeyNormalizationFunction; -}): CreateStepJobStateParams { +function createInMemoryStepGraphObjectDataUploaderCollector( + partial?: CreateQueuedStepGraphObjectDataUploaderParams, +) { + const graphObjectDataCollection: FlushedGraphObjectData[] = []; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: 5, + upload(graphObjectData) { + graphObjectDataCollection.push(graphObjectData); + return Promise.resolve(); + }, + ...partial, + }); + + return { + uploader, + graphObjectDataCollection, + }; +} + +function getMockCreateStepJobStateParams( + partial?: Partial, +): CreateStepJobStateParams { return { stepId: uuid(), graphObjectStore: new FileSystemGraphObjectStore(), - duplicateKeyTracker: new DuplicateKeyTracker( - options?.normalizeGraphObjectKey, - ), + duplicateKeyTracker: new DuplicateKeyTracker(), typeTracker: new TypeTracker(), dataStore: new MemoryDataStore(), + ...partial, }; } +function createTestStepJobState(params?: Partial) { + return createStepJobState(getMockCreateStepJobStateParams(params)); +} + describe('#createStepJobState', () => { afterEach(() => { vol.reset(); }); test('should allow creating job state and adding a single entity with "addEntity"', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobState(); const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -48,8 +78,7 @@ describe('#createStepJobState', () => { }); test('should allow creating job state and adding a multiple entities with "addEntities"', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobState(); const entities: Entity[] = [ { _type: 'a_entity', @@ -74,8 +103,7 @@ describe('#findEntity', () => { }); test('should find entity by _key', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobState(); const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -87,10 +115,12 @@ describe('#findEntity', () => { }); test('should find entity by _key with key normalization', async () => { - const params = getMockCreateStepJobStateParams({ - normalizeGraphObjectKey: (_key) => _key.toLowerCase(), + const jobState = createTestStepJobState({ + duplicateKeyTracker: new DuplicateKeyTracker((_key) => + _key.toLowerCase(), + ), }); - const jobState = createStepJobState(params); + const entity: Entity = { _type: 'a_entity', _class: 'A', @@ -104,8 +134,136 @@ describe('#findEntity', () => { }); test('should return "null" if entity not found', async () => { - const params = getMockCreateStepJobStateParams(); - const jobState = createStepJobState(params); + const jobState = createTestStepJobState(); expect(await jobState.findEntity('invalid-entity-key')).toEqual(null); }); }); + +describe('upload callbacks', () => { + test('#addEntities should call uploader enqueue on flushed', async () => { + const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); + const jobState = createTestStepJobState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 2, + }), + uploader: uploadCollector.uploader, + }); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + + await jobState.addEntities([e1]); + await jobState.addEntities([e2]); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [e1, e2], + relationships: [], + }, + ]; + + expect(uploadCollector.graphObjectDataCollection).toEqual(expectedUploaded); + }); + + test('#addRelationships should call uploader enqueue on flushed', async () => { + const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); + const jobState = createTestStepJobState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 2, + }), + uploader: uploadCollector.uploader, + }); + + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + + await jobState.addRelationships([r1]); + await jobState.addRelationships([r2]); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [], + relationships: [r1, r2], + }, + ]; + + expect(uploadCollector.graphObjectDataCollection).toEqual(expectedUploaded); + }); + + test('#flush should call uploader enqueue for entities and relationships', async () => { + const uploadCollector = createInMemoryStepGraphObjectDataUploaderCollector(); + const jobState = createTestStepJobState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 5, + }), + uploader: uploadCollector.uploader, + }); + + const r1 = createTestRelationship(); + const e1 = createTestEntity(); + + await jobState.addRelationships([r1]); + await jobState.addEntities([e1]); + expect(uploadCollector.graphObjectDataCollection).toEqual([]); + + await jobState.flush(); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [e1], + relationships: [], + }, + { + entities: [], + relationships: [r1], + }, + ]; + + expect(uploadCollector.graphObjectDataCollection).toEqual(expectedUploaded); + }); + + test('#waitUntilUploadsComplete should resolve when all uploads completed', async () => { + const graphObjectDataCollection: FlushedGraphObjectData[] = []; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: 5, + async upload(graphObjectData) { + await sleep(200); + graphObjectDataCollection.push(graphObjectData); + return Promise.resolve(); + }, + }); + + const jobState = createTestStepJobState({ + graphObjectStore: new FileSystemGraphObjectStore({ + graphObjectBufferThreshold: 2, + }), + uploader, + }); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + + await jobState.addEntities([e1]); + await jobState.addEntities([e2]); + + if (!jobState.waitUntilUploadsComplete) { + throw new Error( + 'Default job state should have "waitUntilUploadsComplete" function', + ); + } + + expect(graphObjectDataCollection).toEqual([]); + await jobState.waitUntilUploadsComplete(); + + const expectedUploaded: FlushedGraphObjectData[] = [ + { + entities: [e1, e2], + relationships: [], + }, + ]; + + expect(graphObjectDataCollection).toEqual(expectedUploaded); + }); +}); diff --git a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts index 69771a7cb..25e2aa6fa 100644 --- a/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts +++ b/packages/integration-sdk-runtime/src/execution/dependencyGraph.ts @@ -18,6 +18,10 @@ import { MemoryDataStore, TypeTracker, } from './jobState'; +import { + StepGraphObjectDataUploader, + CreateStepGraphObjectDataUploaderFunction, +} from './uploader'; /** * This function accepts a list of steps and constructs a dependency graph @@ -70,12 +74,14 @@ export function executeStepDependencyGraph< stepStartStates, duplicateKeyTracker, graphObjectStore, + createStepGraphObjectDataUploader, }: { executionContext: TExecutionContext; inputGraph: DepGraph>; stepStartStates: StepStartStates; duplicateKeyTracker: DuplicateKeyTracker; graphObjectStore: GraphObjectStore; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; }): Promise { // create a clone of the dependencyGraph because mutating // the input graph is icky @@ -248,6 +254,12 @@ export function executeStepDependencyGraph< const { id: stepId } = step; const typeTracker = new TypeTracker(); + let uploader: StepGraphObjectDataUploader | undefined; + + if (createStepGraphObjectDataUploader) { + uploader = createStepGraphObjectDataUploader(stepId); + } + const context = buildStepContext({ context: executionContext, duplicateKeyTracker, @@ -255,6 +267,7 @@ export function executeStepDependencyGraph< graphObjectStore, dataStore, stepId, + uploader, }); const { logger } = context; @@ -285,6 +298,19 @@ export function executeStepDependencyGraph< } await context.jobState.flush(); + + if (context.jobState.waitUntilUploadsComplete) { + try { + // Failing to upload all integration data should not be considered a + // fatal failure. We just want to make this step as a partial success + // and move on with our lives! + await context.jobState.waitUntilUploadsComplete(); + } catch (err) { + context.logger.stepFailure(step, err); + status = StepResultStatus.FAILURE; + } + } + updateStepResultStatus(stepId, status, typeTracker); enqueueLeafSteps(); } @@ -306,6 +332,7 @@ function buildStepContext({ typeTracker, graphObjectStore, dataStore, + uploader, }: { stepId: string; context: ExecutionContext; @@ -313,6 +340,7 @@ function buildStepContext({ typeTracker: TypeTracker; graphObjectStore: GraphObjectStore; dataStore: MemoryDataStore; + uploader?: StepGraphObjectDataUploader; }): TStepExecutionContext { const stepExecutionContext: StepExecutionContext = { ...context, @@ -325,6 +353,7 @@ function buildStepContext({ typeTracker, graphObjectStore, dataStore, + uploader, }), }; diff --git a/packages/integration-sdk-runtime/src/execution/executeIntegration.ts b/packages/integration-sdk-runtime/src/execution/executeIntegration.ts index a421fb29a..4dcb7b673 100644 --- a/packages/integration-sdk-runtime/src/execution/executeIntegration.ts +++ b/packages/integration-sdk-runtime/src/execution/executeIntegration.ts @@ -32,6 +32,7 @@ import { executeSteps, getDefaultStepStartStates, } from './step'; +import { CreateStepGraphObjectDataUploaderFunction } from './uploader'; import { validateStepStartStates } from './validation'; export interface ExecuteIntegrationResult { @@ -44,10 +45,12 @@ export interface ExecuteIntegrationResult { export interface ExecuteIntegrationOptions { enableSchemaValidation?: boolean; graphObjectStore?: GraphObjectStore; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; } -interface ExecuteWithContextOptions { +export interface ExecuteWithContextOptions { graphObjectStore?: GraphObjectStore; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; } const THIRTY_SECONDS_STORAGE_INTERVAL_MS = 60000 / 2; @@ -111,9 +114,7 @@ export async function executeIntegrationInstance( executionHistory, }, config, - { - graphObjectStore: options.graphObjectStore, - }, + options, ), }); } @@ -203,7 +204,10 @@ export async function executeWithContext< }); }, THIRTY_SECONDS_STORAGE_INTERVAL_MS); - const { graphObjectStore = new FileSystemGraphObjectStore() } = options; + const { + graphObjectStore = new FileSystemGraphObjectStore(), + createStepGraphObjectDataUploader, + } = options; const integrationStepResults = await executeSteps({ executionContext: context, @@ -213,6 +217,7 @@ export async function executeWithContext< config.normalizeGraphObjectKey, ), graphObjectStore, + createStepGraphObjectDataUploader, }); const partialDatasets = determinePartialDatasetsFromStepExecutionResults( diff --git a/packages/integration-sdk-runtime/src/execution/index.ts b/packages/integration-sdk-runtime/src/execution/index.ts index f9439b107..0ccc2a359 100644 --- a/packages/integration-sdk-runtime/src/execution/index.ts +++ b/packages/integration-sdk-runtime/src/execution/index.ts @@ -7,3 +7,11 @@ export { prepareLocalStepCollection } from './step'; export { loadConfigFromEnvironmentVariables } from './config'; export { DuplicateKeyTracker, TypeTracker, MemoryDataStore } from './jobState'; export { buildStepDependencyGraph } from './dependencyGraph'; +export { + StepGraphObjectDataUploader, + CreateStepGraphObjectDataUploaderFunction, + CreateQueuedStepGraphObjectDataUploaderParams, + createQueuedStepGraphObjectDataUploader, + CreatePersisterApiStepGraphObjectDataUploaderParams, + createPersisterApiStepGraphObjectDataUploader, +} from './uploader'; diff --git a/packages/integration-sdk-runtime/src/execution/jobState.ts b/packages/integration-sdk-runtime/src/execution/jobState.ts index 148ce8f28..1aed5ee0a 100644 --- a/packages/integration-sdk-runtime/src/execution/jobState.ts +++ b/packages/integration-sdk-runtime/src/execution/jobState.ts @@ -7,6 +7,7 @@ import { } from '@jupiterone/integration-sdk-core'; import { GraphObjectStore } from '../storage'; +import { StepGraphObjectDataUploader } from './uploader'; export interface DuplicateKeyTrackerGraphObjectMetadata { _type: string; @@ -78,6 +79,7 @@ export interface CreateStepJobStateParams { typeTracker: TypeTracker; graphObjectStore: GraphObjectStore; dataStore: MemoryDataStore; + uploader?: StepGraphObjectDataUploader; } export function createStepJobState({ @@ -86,6 +88,7 @@ export function createStepJobState({ typeTracker, graphObjectStore, dataStore, + uploader, }: CreateStepJobStateParams): JobState { const addEntities = async (entities: Entity[]): Promise => { entities.forEach((e) => { @@ -97,7 +100,12 @@ export function createStepJobState({ typeTracker.registerType(e._type); }); - await graphObjectStore.addEntities(stepId, entities); + await graphObjectStore.addEntities(stepId, entities, async (entities) => + uploader?.enqueue({ + entities, + relationships: [], + }), + ); return entities; }; @@ -111,7 +119,15 @@ export function createStepJobState({ typeTracker.registerType(r._type as string); }); - return graphObjectStore.addRelationships(stepId, relationships); + return graphObjectStore.addRelationships( + stepId, + relationships, + async (relationships) => + uploader?.enqueue({ + entities: [], + relationships, + }), + ); }; return { @@ -158,6 +174,22 @@ export function createStepJobState({ iterateRelationships: (filter, iteratee) => graphObjectStore.iterateRelationships(filter, iteratee), - flush: () => graphObjectStore.flush(), + flush: () => + graphObjectStore.flush( + async (entities) => + uploader?.enqueue({ + entities, + relationships: [], + }), + async (relationships) => + uploader?.enqueue({ + entities: [], + relationships, + }), + ), + + async waitUntilUploadsComplete() { + await uploader?.waitUntilUploadsComplete(); + }, }; } diff --git a/packages/integration-sdk-runtime/src/execution/step.ts b/packages/integration-sdk-runtime/src/execution/step.ts index d7c3c622b..e00aca256 100644 --- a/packages/integration-sdk-runtime/src/execution/step.ts +++ b/packages/integration-sdk-runtime/src/execution/step.ts @@ -17,6 +17,7 @@ import { executeStepDependencyGraph, } from './dependencyGraph'; import { DuplicateKeyTracker } from './jobState'; +import { CreateStepGraphObjectDataUploaderFunction } from './uploader'; export async function executeSteps< TExecutionContext extends ExecutionContext, @@ -27,12 +28,14 @@ export async function executeSteps< stepStartStates, duplicateKeyTracker, graphObjectStore, + createStepGraphObjectDataUploader, }: { executionContext: TExecutionContext; integrationSteps: Step[]; stepStartStates: StepStartStates; duplicateKeyTracker: DuplicateKeyTracker; graphObjectStore: GraphObjectStore; + createStepGraphObjectDataUploader?: CreateStepGraphObjectDataUploaderFunction; }): Promise { return executeStepDependencyGraph({ executionContext, @@ -40,6 +43,7 @@ export async function executeSteps< stepStartStates, duplicateKeyTracker, graphObjectStore, + createStepGraphObjectDataUploader, }); } diff --git a/packages/integration-sdk-runtime/src/execution/uploader.test.ts b/packages/integration-sdk-runtime/src/execution/uploader.test.ts new file mode 100644 index 000000000..e266d34a6 --- /dev/null +++ b/packages/integration-sdk-runtime/src/execution/uploader.test.ts @@ -0,0 +1,182 @@ +import { FlushedGraphObjectData } from '../storage/types'; +import { + createPersisterApiStepGraphObjectDataUploader, + createQueuedStepGraphObjectDataUploader, + StepGraphObjectDataUploader, +} from './uploader'; +import { + sleep, + createTestEntity, + createTestRelationship, +} from '@jupiterone/integration-sdk-private-test-utils'; +import times from 'lodash/times'; +import { v4 as uuid } from 'uuid'; +import { SynchronizationJobContext } from '../synchronization'; +import { createApiClient, getApiBaseUrl } from '../api'; +import { generateSynchronizationJob } from '../synchronization/__tests__/util/generateSynchronizationJob'; +import { createMockIntegrationLogger } from '../../test/util/fixtures'; + +function createFlushedGraphObjectData(): FlushedGraphObjectData { + return { + entities: [createTestEntity(), createTestEntity()], + relationships: [createTestRelationship(), createTestRelationship()], + }; +} + +async function createAndEnqueueUploads( + uploader: StepGraphObjectDataUploader, + n: number, +) { + const flushed = times(n, createFlushedGraphObjectData); + + for (const data of flushed) { + await uploader.enqueue(data); + } + + return flushed; +} + +async function createFlushedDataAndWaitForUploads( + uploader: StepGraphObjectDataUploader, + n: number, +) { + const flushed = await createAndEnqueueUploads(uploader, n); + await uploader.waitUntilUploadsComplete(); + return flushed; +} + +describe('#createQueuedStepGraphObjectDataUploader', () => { + test('should wait for all enqueued uploads to complete', async () => { + const uploaded: FlushedGraphObjectData[] = []; + let numQueued = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: Infinity, + async upload(d) { + if (numQueued) { + await sleep(100); + uploaded.push(d); + } else { + numQueued++; + await sleep(200); + uploaded.push(d); + } + + numQueued++; + }, + }); + + const flushed = await createFlushedDataAndWaitForUploads(uploader, 2); + expect(uploaded).toEqual(flushed.reverse()); + }); + + test('should throttle enqueuing if concurrency threshold hit', async () => { + const uploaded: FlushedGraphObjectData[] = []; + let throttleCount = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId: uuid(), + uploadConcurrency: 2, + async upload(d) { + await sleep(300); + uploaded.push(d); + }, + onThrottleEnqueue() { + throttleCount++; + }, + }); + + const flushed = await createFlushedDataAndWaitForUploads(uploader, 6); + expect(uploaded).toEqual(flushed); + expect(throttleCount).toEqual(1); + }); + + test('should allow enqueue after a failure', async () => { + const uploaded: FlushedGraphObjectData[] = []; + const stepId = uuid(); + const expectedErrorMessage = `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=Error: expected upload error)`; + + let numQueued = 0; + + const uploader = createQueuedStepGraphObjectDataUploader({ + stepId, + uploadConcurrency: 2, + async upload(d) { + numQueued++; + + if (numQueued === 2) { + await sleep(100); + throw new Error('expected upload error'); + } else { + await sleep(200); + uploaded.push(d); + } + }, + }); + + const flushed = await createAndEnqueueUploads(uploader, 3); + + // Ensure that the next enqueue happens _after_ a failure has occurred. + await sleep(300); + const flushedAfterFailure = createFlushedGraphObjectData(); + await uploader.enqueue(flushedAfterFailure); + + await expect(uploader.waitUntilUploadsComplete()).rejects.toThrowError( + expectedErrorMessage, + ); + + // This should _not_ be actually processed by our queue. After the + // `waitUntilUploadsComplete` promise has settled, we do not allow additional + // tasks to be added to the queue. + const flushedAfterCompleted = createFlushedGraphObjectData(); + await uploader.enqueue(flushedAfterCompleted); + + expect(uploaded).toEqual([flushed[0], flushed[2], flushedAfterFailure]); + }); +}); + +describe('#createPersisterApiStepGraphObjectDataUploader', () => { + test('should upload to persister API', async () => { + const accountId = uuid(); + + const apiClient = createApiClient({ + apiBaseUrl: getApiBaseUrl(), + account: accountId, + }); + + const postSpy = jest.spyOn(apiClient, 'post').mockResolvedValue({}); + + const job = generateSynchronizationJob(); + const synchronizationJobContext: SynchronizationJobContext = { + logger: createMockIntegrationLogger(), + apiClient, + job, + }; + + const uploader = createPersisterApiStepGraphObjectDataUploader({ + synchronizationJobContext, + stepId: uuid(), + uploadConcurrency: 2, + }); + + const flushed = await createFlushedDataAndWaitForUploads(uploader, 3); + expect(postSpy).toHaveBeenCalledTimes(6); + + for (const { entities, relationships } of flushed) { + expect(postSpy).toHaveBeenCalledWith( + `/persister/synchronization/jobs/${job.id}/entities`, + { + entities, + }, + ); + + expect(postSpy).toHaveBeenCalledWith( + `/persister/synchronization/jobs/${job.id}/relationships`, + { + relationships, + }, + ); + } + }); +}); diff --git a/packages/integration-sdk-runtime/src/execution/uploader.ts b/packages/integration-sdk-runtime/src/execution/uploader.ts new file mode 100644 index 000000000..cbbac5f6e --- /dev/null +++ b/packages/integration-sdk-runtime/src/execution/uploader.ts @@ -0,0 +1,126 @@ +import { IntegrationError } from '@jupiterone/integration-sdk-core'; +import PQueue from 'p-queue/dist'; +import { FlushedGraphObjectData } from '../storage/types'; +import { + uploadGraphObjectData, + SynchronizationJobContext, +} from '../synchronization'; + +export interface StepGraphObjectDataUploader { + stepId: string; + enqueue: (graphObjectData: FlushedGraphObjectData) => Promise; + waitUntilUploadsComplete: () => Promise; +} + +export type CreateStepGraphObjectDataUploaderFunction = ( + stepId: string, +) => StepGraphObjectDataUploader; + +export interface CreateQueuedStepGraphObjectDataUploaderParams { + stepId: string; + uploadConcurrency: number; + upload: (graphObjectData: FlushedGraphObjectData) => Promise; + onThrottleEnqueue?: () => void; +} + +export function createQueuedStepGraphObjectDataUploader({ + stepId, + uploadConcurrency: maximumQueueSize, + upload, + onThrottleEnqueue, +}: CreateQueuedStepGraphObjectDataUploaderParams): StepGraphObjectDataUploader { + const queue = new PQueue({ + concurrency: maximumQueueSize, + }); + + let completed = false; + const uploadErrors: Error[] = []; + + return { + stepId, + async enqueue(graphObjectData) { + if (completed) { + // This step has already called ran `waitUntilUploadsComplete`, so we + // do not want to allow any additional enqueuing. + return; + } + + // OPTIMIZATION: We do not want to buffer a lot of graph objects + // into memory inside of the queue. If the queue concurrency has been + // reached, we wait for the queue to become `empty` so that this step has + // the opportunity to upload more data. + // + // NOTE: Internally, `p-queue` will respect concurrency and _not_ kick off + // a task until there is bandwidth to do so. `onEmpty` is a better choice + // than `onIdle` in this case because we really don't care whether all + // promises have settled, we only care that additional work is not being + // created and subsequently kicked off. + if (queue.size >= maximumQueueSize) { + if (onThrottleEnqueue) { + // Mainly just used for testing that our custom throttling works. + onThrottleEnqueue(); + } + + await queue.onEmpty(); + } + + queue + .add(() => upload(graphObjectData)) + .catch((err) => { + // Do not pause the queue entirely. We will try to prevent additional + // tasks from being added to the queue, but even if an error occurs, + // we should try uploading the remaining data that we have queued up. + // The JupiterOne synchronization should be resilient enough to handle + // cases where this could cause an issue (e.g. a relationship getting + // uploaded that references an entity that failed to upload). + uploadErrors.push(err); + }); + }, + + async waitUntilUploadsComplete() { + try { + // Even if our `uploadErrors.length > 0` right now, we let the remaining + // promises that were added to the queue settle before rethrowing to + // maximize the amount of data that we are capable of actually uploading. + await queue.onIdle(); + } finally { + // Wait until the entire queue has settled to mark as completed. During + // this time, we could be receiving additional tasks in our queue that + // will grow the queue. + completed = true; + } + + if (uploadErrors.length) { + throw new IntegrationError({ + code: 'UPLOAD_ERROR', + message: `Error(s) uploading graph object data (stepId=${stepId}, errorMessages=${uploadErrors.join( + ',', + )})`, + // Just include the first error cause. We should be able to gather + // additional information from the joined error messages. + cause: uploadErrors[0], + }); + } + }, + }; +} + +export interface CreatePersisterApiStepGraphObjectDataUploaderParams { + stepId: string; + synchronizationJobContext: SynchronizationJobContext; + uploadConcurrency: number; +} + +export function createPersisterApiStepGraphObjectDataUploader({ + stepId, + synchronizationJobContext, + uploadConcurrency, +}: CreatePersisterApiStepGraphObjectDataUploaderParams) { + return createQueuedStepGraphObjectDataUploader({ + stepId, + uploadConcurrency, + upload(graphObjectData) { + return uploadGraphObjectData(synchronizationJobContext, graphObjectData); + }, + }); +} diff --git a/packages/integration-sdk-runtime/src/fileSystem.ts b/packages/integration-sdk-runtime/src/fileSystem.ts index a7393b3c4..bb836d761 100644 --- a/packages/integration-sdk-runtime/src/fileSystem.ts +++ b/packages/integration-sdk-runtime/src/fileSystem.ts @@ -35,6 +35,7 @@ export function getRootStorageDirectorySize(): Promise { interface WriteDataToPathInput { path: string; data: object; + pretty?: boolean; } /** @@ -47,10 +48,13 @@ interface WriteDataToPathInput { export async function writeJsonToPath({ path: relativePath, data, + pretty = false, }: WriteDataToPathInput) { + const content = pretty ? JSON.stringify(data, null, 2) : JSON.stringify(data); + await writeFileToPath({ path: relativePath, - content: JSON.stringify(data, null, 2), + content, }); } @@ -191,7 +195,7 @@ export async function removeStorageDirectory() { } function removeDirectory(directory: string) { - return new Promise((resolve, reject) => + return new Promise((resolve, reject) => rimraf(directory, (err) => { if (err) { return reject(err); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/BucketMap.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/BucketMap.ts deleted file mode 100644 index c2c4a0b2d..000000000 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/BucketMap.ts +++ /dev/null @@ -1,35 +0,0 @@ -/** - * An extension of a regular JS Map that for dealing - * with arrays. - * - * This supports appending values to an existing key - * and also tracks the total count of objects stored. - */ -export class BucketMap extends Map { - private internalTotalItemCount = 0; - - get totalItemCount() { - return this.internalTotalItemCount; - } - - add(key: string, values: T[]) { - const existingValues = this.get(key); - this.set(key, [...(existingValues ?? []), ...values]); - } - - set(key: string, values: T[]) { - const existingItemCount = this.get(key)?.length ?? 0; - super.set(key, values); - this.internalTotalItemCount += values.length - existingItemCount; - return this; - } - - delete(key: string) { - const existingItemCount = this.get(key)?.length ?? 0; - const deleteResult = super.delete(key); - - this.internalTotalItemCount -= existingItemCount; - - return deleteResult; - } -} diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts index 1d7254967..0f10a18a9 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/FileSystemGraphObjectStore.ts @@ -12,54 +12,90 @@ import { GraphObjectStore, } from '@jupiterone/integration-sdk-core'; -import { BucketMap } from './BucketMap'; import { flushDataToDisk } from './flushDataToDisk'; import { iterateEntityTypeIndex, iterateRelationshipTypeIndex, } from './indices'; +import { InMemoryGraphObjectStore } from '../memory'; -export const GRAPH_OBJECT_BUFFER_THRESHOLD = 500; // arbitrarily selected, subject to tuning +export const DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD = 500; // it is important that this value is set to 1 // to ensure that only one operation can be performed at a time. const BINARY_SEMAPHORE_CONCURRENCY = 1; +export interface FileSystemGraphObjectStoreParams { + /** + * The maximum number of graph objects that this store can buffer in memory + * before writing to disk. Machines with more memory should consider bumping + * this value up. + * + * Default: 500 + */ + graphObjectBufferThreshold?: number; + + /** + * Whether the files that are written to disk should be minified or not + */ + prettifyFiles?: boolean; +} + export class FileSystemGraphObjectStore implements GraphObjectStore { - semaphore: Sema; - entityStorageMap: BucketMap; - relationshipStorageMap: BucketMap; + private readonly semaphore: Sema; + private readonly localGraphObjectStore = new InMemoryGraphObjectStore(); + private readonly graphObjectBufferThreshold: number; + private readonly prettifyFiles: boolean; - constructor() { - this.entityStorageMap = new BucketMap(); - this.relationshipStorageMap = new BucketMap(); + constructor(params?: FileSystemGraphObjectStoreParams) { this.semaphore = new Sema(BINARY_SEMAPHORE_CONCURRENCY); + this.graphObjectBufferThreshold = + params?.graphObjectBufferThreshold || + DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD; + this.prettifyFiles = params?.prettifyFiles || false; } - async addEntities(storageDirectoryPath: string, newEntities: Entity[]) { - this.entityStorageMap.add(storageDirectoryPath, newEntities); + async addEntities( + storageDirectoryPath: string, + newEntities: Entity[], + onEntitiesFlushed?: (entities: Entity[]) => Promise, + ) { + this.localGraphObjectStore.addEntities(storageDirectoryPath, newEntities); - if (this.entityStorageMap.totalItemCount >= GRAPH_OBJECT_BUFFER_THRESHOLD) { - await this.flushEntitiesToDisk(); + if ( + this.localGraphObjectStore.getTotalEntityItemCount() >= + this.graphObjectBufferThreshold + ) { + await this.flushEntitiesToDisk(onEntitiesFlushed); } } async addRelationships( storageDirectoryPath: string, newRelationships: Relationship[], + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, ) { - this.relationshipStorageMap.add(storageDirectoryPath, newRelationships); + this.localGraphObjectStore.addRelationships( + storageDirectoryPath, + newRelationships, + ); if ( - this.relationshipStorageMap.totalItemCount >= - GRAPH_OBJECT_BUFFER_THRESHOLD + this.localGraphObjectStore.getTotalRelationshipItemCount() >= + this.graphObjectBufferThreshold ) { - await this.flushRelationshipsToDisk(); + await this.flushRelationshipsToDisk(onRelationshipsFlushed); } } async getEntity({ _key, _type }: GraphObjectLookupKey): Promise { - await this.flushEntitiesToDisk(); + const bufferedEntity = this.localGraphObjectStore.findEntity(_key); + + if (bufferedEntity) { + // This entity has not yet been flushed to disk + return bufferedEntity; + } + const entities: Entity[] = []; await this.iterateEntities({ _type }, async (e) => { @@ -87,7 +123,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { filter: GraphObjectFilter, iteratee: GraphObjectIteratee, ) { - await this.flushEntitiesToDisk(); + await this.localGraphObjectStore.iterateEntities(filter, iteratee); await iterateEntityTypeIndex({ type: filter._type, @@ -99,7 +135,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { filter: GraphObjectFilter, iteratee: GraphObjectIteratee, ) { - await this.flushRelationshipsToDisk(); + await this.localGraphObjectStore.iterateRelationships(filter, iteratee); await iterateRelationshipTypeIndex({ type: filter._type, @@ -107,41 +143,61 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { }); } - async flush() { + async flush( + onEntitiesFlushed?: (entities: Entity[]) => Promise, + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + ) { await Promise.all([ - this.flushEntitiesToDisk(), - this.flushRelationshipsToDisk(), + this.flushEntitiesToDisk(onEntitiesFlushed), + this.flushRelationshipsToDisk(onRelationshipsFlushed), ]); } - async flushEntitiesToDisk() { + async flushEntitiesToDisk( + onEntitiesFlushed?: (entities: Entity[]) => Promise, + ) { await this.lockOperation(() => - pMap([...this.entityStorageMap.keys()], (storageDirectoryPath) => { - const entities = this.entityStorageMap.get(storageDirectoryPath) ?? []; - this.entityStorageMap.delete(storageDirectoryPath); - - return flushDataToDisk({ - storageDirectoryPath, - collectionType: 'entities', - data: entities, - }); - }), + pMap( + this.localGraphObjectStore.collectEntitiesByStep(), + async ([stepId, entities]) => { + await flushDataToDisk({ + storageDirectoryPath: stepId, + collectionType: 'entities', + data: entities, + pretty: this.prettifyFiles, + }); + + this.localGraphObjectStore.flushEntities(entities); + + if (onEntitiesFlushed) { + await onEntitiesFlushed(entities); + } + }, + ), ); } - async flushRelationshipsToDisk() { + async flushRelationshipsToDisk( + onRelationshipsFlushed?: (relationships: Relationship[]) => Promise, + ) { await this.lockOperation(() => - pMap([...this.relationshipStorageMap.keys()], (storageDirectoryPath) => { - const relationships = - this.relationshipStorageMap.get(storageDirectoryPath) ?? []; - this.relationshipStorageMap.delete(storageDirectoryPath); - - return flushDataToDisk({ - storageDirectoryPath, - collectionType: 'relationships', - data: relationships, - }); - }), + pMap( + this.localGraphObjectStore.collectRelationshipsByStep(), + async ([stepId, relationships]) => { + await flushDataToDisk({ + storageDirectoryPath: stepId, + collectionType: 'relationships', + data: relationships, + pretty: this.prettifyFiles, + }); + + this.localGraphObjectStore.flushRelationships(relationships); + + if (onRelationshipsFlushed) { + await onRelationshipsFlushed(relationships); + } + }, + ), ); } @@ -166,7 +222,7 @@ export class FileSystemGraphObjectStore implements GraphObjectStore { * and prematurely end, causing the next step to start up before the * data it depends on is present on disk. */ - async lockOperation(operation: () => Promise) { + private async lockOperation(operation: () => Promise) { await this.semaphore.acquire(); try { await operation(); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/BucketMap.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/BucketMap.test.ts deleted file mode 100644 index 7b00f9c7c..000000000 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/BucketMap.test.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { BucketMap } from '../BucketMap'; - -describe('add', () => { - test('sets key with values if entry does not exist', () => { - const bucketMap = new BucketMap(); - bucketMap.add('test', [1, 2, 3]); - - expect(bucketMap.get('test')).toEqual([1, 2, 3]); - }); - - test('appends values to existing key', () => { - const bucketMap = new BucketMap(); - bucketMap.set('test', [1, 2, 3]); - - bucketMap.add('test', [4, 5, 6]); - expect(bucketMap.get('test')).toEqual([1, 2, 3, 4, 5, 6]); - }); - - test('updates total item count as objects are added', () => { - const bucketMap = new BucketMap(); - bucketMap.set('samekey', [1, 2, 3]); - expect(bucketMap.totalItemCount).toEqual(3); - - bucketMap.add('samekey', [4, 5, 6]); - expect(bucketMap.totalItemCount).toEqual(6); - - bucketMap.add('differentkey', [4, 5, 6]); - expect(bucketMap.totalItemCount).toEqual(9); - }); -}); - -describe('set', () => { - test('updates count of total count items for new keys', () => { - const bucketMap = new BucketMap(); - bucketMap.set('test', [1, 2, 3]); - expect(bucketMap.totalItemCount).toEqual(3); - - bucketMap.set('test2', [1, 2, 3]); - expect(bucketMap.totalItemCount).toEqual(6); - }); - - test('maintains accurate count when keys are replaced', () => { - const bucketMap = new BucketMap(); - bucketMap.set('samekey', [1, 2, 3]); - expect(bucketMap.totalItemCount).toEqual(3); - - bucketMap.set('samekey', [4, 5, 6]); - expect(bucketMap.totalItemCount).toEqual(3); - - bucketMap.set('samekey', [1]); - expect(bucketMap.totalItemCount).toEqual(1); - - bucketMap.set('samekey', [4, 3]); - expect(bucketMap.totalItemCount).toEqual(2); - }); -}); - -describe('delete', () => { - test('decreases count of items stored in key from total count', () => { - const bucketMap = new BucketMap(); - bucketMap.set('test', [1, 2, 3]); - expect(bucketMap.totalItemCount).toEqual(3); - - bucketMap.delete('test'); - expect(bucketMap.totalItemCount).toEqual(0); - }); -}); diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts index bf1d088e4..c1bf3072f 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/FileSystemGraphObjectStore.test.ts @@ -9,10 +9,14 @@ import { getRootStorageDirectory } from '../../../fileSystem'; import { FileSystemGraphObjectStore, - GRAPH_OBJECT_BUFFER_THRESHOLD, + DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD, + FileSystemGraphObjectStoreParams, } from '../FileSystemGraphObjectStore'; -import { generateEntity, generateRelationship } from './util/graphObjects'; +import { + createTestEntity, + createTestRelationship, +} from '@jupiterone/integration-sdk-private-test-utils'; import { Entity, @@ -32,7 +36,7 @@ describe('flushEntitiesToDisk', () => { test('should write entities to the graph directory and symlink files to the index directory', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const entityType = uuid(); - const entities = times(25, () => generateEntity({ _type: entityType })); + const entities = times(25, () => createTestEntity({ _type: entityType })); await store.addEntities(storageDirectoryPath, entities); await store.flushEntitiesToDisk(); @@ -74,7 +78,7 @@ describe('flushRelationshipsToDisk', () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const relationshipType = uuid(); const relationships = times(25, () => - generateRelationship({ _type: relationshipType }), + createTestRelationship({ _type: relationshipType }), ); await store.addRelationships(storageDirectoryPath, relationships); @@ -117,9 +121,9 @@ describe('flushRelationshipsToDisk', () => { describe('flush', () => { test('should flush both entities and relationships to disk', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); - await store.addEntities(storageDirectoryPath, [generateEntity()]); + await store.addEntities(storageDirectoryPath, [createTestEntity()]); await store.addRelationships(storageDirectoryPath, [ - generateRelationship(), + createTestRelationship(), ]); const flushEntitiesSpy = jest.spyOn(store, 'flushEntitiesToDisk'); @@ -135,8 +139,8 @@ describe('flush', () => { describe('addEntities', () => { test('should automatically flush entities to disk after hitting a certain threshold', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); - const entities = times(GRAPH_OBJECT_BUFFER_THRESHOLD - 1, () => - generateEntity(), + const entities = times(DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD - 1, () => + createTestEntity(), ); const flushEntitiesSpy = jest.spyOn(store, 'flushEntitiesToDisk'); @@ -146,7 +150,7 @@ describe('addEntities', () => { expect(flushEntitiesSpy).toHaveBeenCalledTimes(0); // adding an additional entity should trigger the flushing - await store.addEntities(storageDirectoryPath, [generateEntity()]); + await store.addEntities(storageDirectoryPath, [createTestEntity()]); expect(flushEntitiesSpy).toHaveBeenCalledTimes(1); }); @@ -183,8 +187,8 @@ describe('addEntities', () => { describe('addRelationships', () => { test('should automatically flush relationships to disk after hitting a certain threshold', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); - const relationships = times(GRAPH_OBJECT_BUFFER_THRESHOLD - 1, () => - generateRelationship(), + const relationships = times(DEFAULT_GRAPH_OBJECT_BUFFER_THRESHOLD - 1, () => + createTestRelationship(), ); const flushRelationshipsSpy = jest.spyOn(store, 'flushRelationshipsToDisk'); @@ -195,7 +199,7 @@ describe('addRelationships', () => { // adding an additional relationship should trigger the flushing await store.addRelationships(storageDirectoryPath, [ - generateRelationship(), + createTestRelationship(), ]); expect(flushRelationshipsSpy).toHaveBeenCalledTimes(1); }); @@ -244,14 +248,14 @@ describe('addRelationships', () => { }); describe('getEntity', () => { - test('should flush buffered entities and get an entity by "_type" and "_key"', async () => { + test('should find buffered entities and get an entity by "_type" and "_key"', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const _type = uuid(); const _key = uuid(); - const nonMatchingEntities = times(25, () => generateEntity({ _type })); - const matchingEntity = generateEntity({ _type, _key }); + const nonMatchingEntities = times(25, () => createTestEntity({ _type })); + const matchingEntity = createTestEntity({ _type, _key }); await store.addEntities(storageDirectoryPath, [ ...nonMatchingEntities, @@ -259,23 +263,35 @@ describe('getEntity', () => { ]); const entity = await store.getEntity({ _key, _type }); - expect(store.entityStorageMap.totalItemCount).toEqual(0); - expect(entity).toEqual(matchingEntity); }); + + test('should find non-buffered entities and get an entity by "_type" and "_key"', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 2, + }); + + const _type = uuid(); + + const entities = times(2, () => createTestEntity({ _type })); + await store.addEntities(storageDirectoryPath, entities); + + const entity = await store.getEntity({ _key: entities[1]._key, _type }); + expect(entity).toEqual(entities[1]); + }); }); describe('iterateEntities', () => { - test('should flush buffered entities and iterate the entity "_type" index stored on disk', async () => { + test('should find buffered & non-buffered entities and iterate the entity "_type" index stored on disk', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const matchingType = uuid(); const nonMatchingEntities = times(25, () => - generateEntity({ _type: uuid() }), + createTestEntity({ _type: uuid() }), ); const matchingEntities = times(25, () => - generateEntity({ _type: matchingType }), + createTestEntity({ _type: matchingType }), ); await store.addEntities(storageDirectoryPath, [ @@ -283,15 +299,26 @@ describe('iterateEntities', () => { ...matchingEntities, ]); - const collectedEntities: Entity[] = []; + await store.flushEntitiesToDisk(); + + const bufferedEntity = createTestEntity({ _type: matchingType }); + await store.addEntities(storageDirectoryPath, [bufferedEntity]); + + const collectedEntities = new Map(); const collectEntity = (e: Entity) => { - collectedEntities.push(e); + if (collectedEntities.has(e._key)) { + throw new Error( + `duplicate entity _key found in iterateEntities (_key=${e._key})`, + ); + } + collectedEntities.set(e._key, e); }; await store.iterateEntities({ _type: matchingType }, collectEntity); - expect(store.entityStorageMap.totalItemCount).toEqual(0); - - expect(collectedEntities).toEqual(matchingEntities); + expect(Array.from(collectedEntities.values())).toEqual([ + bufferedEntity, + ...matchingEntities, + ]); }); test('should allow extended types to be iterated', async () => { @@ -302,7 +329,7 @@ describe('iterateEntities', () => { type TestEntity = Entity & { randomField: string }; const entities = times(25, () => - generateEntity({ _type: entityType, randomField: 'field' }), + createTestEntity({ _type: entityType, randomField: 'field' }), ); await store.addEntities(storageDirectoryPath, entities); @@ -328,16 +355,16 @@ describe('iterateEntities', () => { }); describe('iterateRelationships', () => { - test('should flush buffered relationshipos and iterate the relationship "_type" index stored on disk', async () => { + test('should find buffered & non-buffered relationshipos and iterate the relationship "_type" index stored on disk', async () => { const { storageDirectoryPath, store } = setupFileSystemObjectStore(); const matchingType = uuid(); const nonMatchingRelationships = times(25, () => - generateRelationship({ _type: uuid() }), + createTestRelationship({ _type: uuid() }), ); const matchingRelationships = times(25, () => - generateRelationship({ _type: matchingType }), + createTestRelationship({ _type: matchingType }), ); await store.addRelationships(storageDirectoryPath, [ @@ -346,18 +373,29 @@ describe('iterateRelationships', () => { ]); await store.flush(); - const collectedRelationships: Relationship[] = []; + const bufferedRelationship = createTestRelationship({ + _type: matchingType, + }); + await store.addRelationships(storageDirectoryPath, [bufferedRelationship]); + + const collectedRelationships = new Map(); const collectRelationship = (r: Relationship) => { - collectedRelationships.push(r); + if (collectedRelationships.has(r._key)) { + throw new Error( + `duplicate relationship_key found in iterateRelationships (_key=${r._key})`, + ); + } + collectedRelationships.set(r._key, r); }; await store.iterateRelationships( { _type: matchingType }, collectRelationship, ); - expect(store.relationshipStorageMap.totalItemCount).toEqual(0); - - expect(collectedRelationships).toEqual(matchingRelationships); + expect(Array.from(collectedRelationships.values())).toEqual([ + bufferedRelationship, + ...matchingRelationships, + ]); }); test('should allow extended types to be iterated', async () => { @@ -368,7 +406,7 @@ describe('iterateRelationships', () => { type TestRelationship = Relationship & { randomField: string }; const relationships = times(25, () => - generateRelationship({ _type: relationshipType, randomField: 'field' }), + createTestRelationship({ _type: relationshipType, randomField: 'field' }), ); await store.addRelationships(storageDirectoryPath, relationships); @@ -393,9 +431,189 @@ describe('iterateRelationships', () => { }); }); -function setupFileSystemObjectStore() { +describe('flush callbacks', () => { + test('#addEntity should call flush callback when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 2, + }); + + let flushedEntitiesCollected: Entity[] = []; + let addEntitiesFlushCalledTimes = 0; + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + const e3 = createTestEntity(); + + await store.addEntities(storageDirectoryPath, [e1], async (entities) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + }); + + await store.addEntities( + storageDirectoryPath, + [e2, e3], + async (entities) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + }, + ); + + expect(addEntitiesFlushCalledTimes).toEqual(1); + expect(flushedEntitiesCollected).toEqual([e1, e2, e3]); + }); + + test('#addRelationships should call flush callback when buffer threshold reached', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 2, + }); + + let flushedRelationshipsCollected: Relationship[] = []; + let addRelationshipsFlushCalledTimes = 0; + + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + const r3 = createTestRelationship(); + + await store.addRelationships( + storageDirectoryPath, + [r1], + async (relationships) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addRelationshipsFlushCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + }, + ); + + await store.addRelationships( + storageDirectoryPath, + [r2, r3], + async (relationships) => { + // Should not call first time because `graphObjectBufferThreshold` = 2 + addRelationshipsFlushCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + }, + ); + + expect(addRelationshipsFlushCalledTimes).toEqual(1); + expect(flushedRelationshipsCollected).toEqual([r1, r2, r3]); + }); + + test('#flushEntitiesToDisk should call flush callback when flushEntitiesToDisk called', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 10, + }); + + let flushedEntitiesCollected: Entity[] = []; + let addEntitiesFlushCalledTimes = 0; + + const entities = times(3, () => createTestEntity()); + + async function onEntitiesFlushed(entities) { + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + } + + await store.addEntities(storageDirectoryPath, entities, onEntitiesFlushed); + + expect(addEntitiesFlushCalledTimes).toEqual(0); + expect(flushedEntitiesCollected).toEqual([]); + + await store.flushEntitiesToDisk(onEntitiesFlushed); + + expect(addEntitiesFlushCalledTimes).toEqual(1); + expect(flushedEntitiesCollected).toEqual(entities); + }); + + test('#flushRelationshipsToDisk should call flush callback when flushRelationshipsToDisk called', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 10, + }); + + let flushedRelationshipsCollected: Relationship[] = []; + let addRelationshipsFlushedCalledTimes = 0; + + const relationships = times(3, () => createTestRelationship()); + + async function onRelationshipsFlushed(relationships) { + addRelationshipsFlushedCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + } + + await store.addRelationships( + storageDirectoryPath, + relationships, + onRelationshipsFlushed, + ); + + expect(addRelationshipsFlushedCalledTimes).toEqual(0); + expect(flushedRelationshipsCollected).toEqual([]); + + await store.flushRelationshipsToDisk(onRelationshipsFlushed); + + expect(addRelationshipsFlushedCalledTimes).toEqual(1); + expect(flushedRelationshipsCollected).toEqual(relationships); + }); + + test('#flush should call both entity and relationship flush callbacks when flush called', async () => { + const { storageDirectoryPath, store } = setupFileSystemObjectStore({ + graphObjectBufferThreshold: 10, + }); + + let flushedRelationshipsCollected: Relationship[] = []; + let addRelationshipsFlushedCalledTimes = 0; + + let flushedEntitiesCollected: Entity[] = []; + let addEntitiesFlushCalledTimes = 0; + + const entities = times(3, () => createTestEntity()); + const relationships = times(3, () => createTestRelationship()); + + async function onEntitiesFlushed(entities) { + addEntitiesFlushCalledTimes++; + flushedEntitiesCollected = flushedEntitiesCollected.concat(entities); + return Promise.resolve(); + } + + async function onRelationshipsFlushed(relationships) { + addRelationshipsFlushedCalledTimes++; + flushedRelationshipsCollected = flushedRelationshipsCollected.concat( + relationships, + ); + return Promise.resolve(); + } + + await store.addRelationships( + storageDirectoryPath, + relationships, + onRelationshipsFlushed, + ); + await store.addEntities(storageDirectoryPath, entities, onEntitiesFlushed); + await store.flush(onEntitiesFlushed, onRelationshipsFlushed); + + expect(addEntitiesFlushCalledTimes).toEqual(1); + expect(addRelationshipsFlushedCalledTimes).toEqual(1); + expect(flushedEntitiesCollected).toEqual(entities); + expect(flushedRelationshipsCollected).toEqual(relationships); + }); +}); + +function setupFileSystemObjectStore(params?: FileSystemGraphObjectStoreParams) { const storageDirectoryPath = uuid(); - const store = new FileSystemGraphObjectStore(); + const store = new FileSystemGraphObjectStore(params); return { storageDirectoryPath, diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts index 6d2156782..13743bd97 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/flushDataToDisk.test.ts @@ -10,7 +10,7 @@ import flatten from 'lodash/flatten'; import { getRootStorageDirectory } from '../../../fileSystem'; import { flushDataToDisk } from '../flushDataToDisk'; -import { generateEntity } from './util/graphObjects'; +import { createTestEntity } from '@jupiterone/integration-sdk-private-test-utils'; jest.mock('fs'); @@ -18,9 +18,9 @@ afterEach(() => vol.reset()); test('should group objects by "_type" and write them to separate files', async () => { const testEntityData = { - A: times(25, () => generateEntity({ _type: 'A' })), - B: times(25, () => generateEntity({ _type: 'B' })), - C: times(25, () => generateEntity({ _type: 'C' })), + A: times(25, () => createTestEntity({ _type: 'A' })), + B: times(25, () => createTestEntity({ _type: 'B' })), + C: times(25, () => createTestEntity({ _type: 'C' })), }; const allEntities = randomizeOrder(flatten(Object.values(testEntityData))); @@ -70,6 +70,38 @@ test('should group objects by "_type" and write them to separate files', async ( }); }); +test('should allow prettifying files', async () => { + const _type = uuid(); + const entities = times(2, () => createTestEntity({ _type })); + const storageDirectoryPath = uuid(); + + await flushDataToDisk({ + storageDirectoryPath, + collectionType: 'entities', + data: entities, + pretty: true, + }); + + const entitiesDirectory = path.join( + getRootStorageDirectory(), + 'graph', + storageDirectoryPath, + 'entities', + ); + + const entityFiles = await fs.readdir(entitiesDirectory); + expect(entityFiles).toHaveLength(1); + + const fileData = await fs.readFile( + path.join(entitiesDirectory, entityFiles[0]), + { + encoding: 'utf-8', + }, + ); + + expect(fileData).toEqual(JSON.stringify({ entities }, null, 2)); +}); + function randomizeOrder(things: T[]): T[] { return sortBy(things, () => Math.random()); } diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts deleted file mode 100644 index f5a551462..000000000 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/__tests__/util/graphObjects.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { v4 as uuid } from 'uuid'; -import { Entity, Relationship } from '@jupiterone/integration-sdk-core'; - -export function generateEntity(overrides?: Partial): Entity { - return { - _key: uuid(), - _type: uuid(), - _class: uuid(), - ...overrides, - }; -} - -export function generateRelationship( - overrides?: Partial, -): Relationship { - return { - _key: uuid(), - _type: uuid(), - _class: uuid(), - _toEntityKey: uuid(), - _fromEntityKey: uuid(), - ...overrides, - }; -} diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts index 6171f3a61..f729d2481 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/flushDataToDisk.ts @@ -16,6 +16,7 @@ interface FlushDataToDiskInput { storageDirectoryPath: string; collectionType: CollectionType; data: Entity[] | Relationship[]; + pretty?: boolean; } /** @@ -27,6 +28,7 @@ export async function flushDataToDisk({ storageDirectoryPath, collectionType, data, + pretty, }: FlushDataToDiskInput) { // split the data by type first const groupedCollections = groupBy(data, '_type'); @@ -49,6 +51,7 @@ export async function flushDataToDisk({ data: { [collectionType]: collection, }, + pretty, }); await symlink({ diff --git a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/index.ts b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/index.ts index 2f6f20cd9..1aca85803 100644 --- a/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/index.ts +++ b/packages/integration-sdk-runtime/src/storage/FileSystemGraphObjectStore/index.ts @@ -1,2 +1 @@ export * from './FileSystemGraphObjectStore'; -export * from './BucketMap'; diff --git a/packages/integration-sdk-runtime/src/storage/index.ts b/packages/integration-sdk-runtime/src/storage/index.ts index fe96124b7..998467169 100644 --- a/packages/integration-sdk-runtime/src/storage/index.ts +++ b/packages/integration-sdk-runtime/src/storage/index.ts @@ -1,2 +1,3 @@ export { GraphObjectStore } from '@jupiterone/integration-sdk-core'; export * from './FileSystemGraphObjectStore'; +export * from './memory'; diff --git a/packages/integration-sdk-runtime/src/storage/memory.test.ts b/packages/integration-sdk-runtime/src/storage/memory.test.ts new file mode 100644 index 000000000..f9d1dd1c0 --- /dev/null +++ b/packages/integration-sdk-runtime/src/storage/memory.test.ts @@ -0,0 +1,190 @@ +import { Entity, Relationship } from '@jupiterone/integration-sdk-core'; +import { InMemoryGraphObjectStore } from './memory'; +import { v4 as uuid } from 'uuid'; +import { + createTestEntity, + createTestRelationship, +} from '@jupiterone/integration-sdk-private-test-utils'; + +async function collectEntitiesByType( + store: InMemoryGraphObjectStore, + _type: string, +): Promise { + const entities: Entity[] = []; + + await store.iterateEntities({ _type }, (e) => { + entities.push(e); + }); + + return entities; +} + +async function collectRelationshipsByType( + store: InMemoryGraphObjectStore, + _type: string, +): Promise { + const relationships: Relationship[] = []; + + await store.iterateRelationships({ _type }, (r) => { + relationships.push(r); + }); + + return relationships; +} + +describe('#InMemoryGraphObjectStore', () => { + test('should allow adding entities and finding by _key', () => { + const store = new InMemoryGraphObjectStore(); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + store.addEntities(uuid(), [e1, e2]); + + expect(store.findEntity(e1._key)).toEqual(e1); + expect(store.findEntity(e2._key)).toEqual(e2); + }); + + test('should add entities to same _type map if already exists', async () => { + const store = new InMemoryGraphObjectStore(); + + const _type = uuid(); + const e1 = createTestEntity({ _type }); + const e2 = createTestEntity({ _type }); + + store.addEntities(uuid(), [e1]); + store.addEntities(uuid(), [e2]); + + expect(await collectEntitiesByType(store, _type)).toEqual([e1, e2]); + }); + + test('should add relationships to same _type map if already exists', async () => { + const store = new InMemoryGraphObjectStore(); + + const _type = uuid(); + const r1 = createTestRelationship({ _type }); + const r2 = createTestRelationship({ _type }); + + store.addRelationships(uuid(), [r1]); + store.addRelationships(uuid(), [r2]); + + expect(await collectRelationshipsByType(store, _type)).toEqual([r1, r2]); + }); + + test('should allow iterating entities', async () => { + const store = new InMemoryGraphObjectStore(); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + store.addEntities(uuid(), [e1, e2]); + + expect(await collectEntitiesByType(store, e1._type)).toEqual([e1]); + expect(await collectEntitiesByType(store, e2._type)).toEqual([e2]); + }); + + test('should not throw if iterating entity _type that does not exist', async () => { + const store = new InMemoryGraphObjectStore(); + expect(await collectEntitiesByType(store, uuid())).toEqual([]); + }); + + test('should not throw if iterating relationship _type that does not exist', async () => { + const store = new InMemoryGraphObjectStore(); + expect(await collectRelationshipsByType(store, uuid())).toEqual([]); + }); + + test('should allow adding relationships and iterating by _type', async () => { + const store = new InMemoryGraphObjectStore(); + + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + store.addRelationships(uuid(), [r1, r2]); + + expect(await collectRelationshipsByType(store, r1._type)).toEqual([r1]); + expect(await collectRelationshipsByType(store, r2._type)).toEqual([r2]); + }); + + test('should return "undefined" if entity not found', () => { + const store = new InMemoryGraphObjectStore(); + expect(store.findEntity(uuid())).toEqual(undefined); + }); + + test('should allow collecting entities by step', () => { + const store = new InMemoryGraphObjectStore(); + + const step1 = uuid(); + const e1 = createTestEntity(); + const e2 = createTestEntity(); + store.addEntities(step1, [e1, e2]); + + const step2 = uuid(); + const e3 = createTestEntity(); + const e4 = createTestEntity(); + store.addEntities(step2, [e3, e4]); + + const collected = store.collectEntitiesByStep(); + expect(collected.size).toEqual(2); + expect(collected.get(step1)).toEqual([e1, e2]); + expect(collected.get(step2)).toEqual([e3, e4]); + }); + + test('should allow collecting relationships by step', () => { + const store = new InMemoryGraphObjectStore(); + + const step1 = uuid(); + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + store.addRelationships(step1, [r1, r2]); + + const step2 = uuid(); + const r3 = createTestRelationship(); + const r4 = createTestRelationship(); + store.addRelationships(step2, [r3, r4]); + + const collected = store.collectRelationshipsByStep(); + expect(collected.size).toEqual(2); + expect(collected.get(step1)).toEqual([r1, r2]); + expect(collected.get(step2)).toEqual([r3, r4]); + }); + + test('should get correct total entity and relationship counts', () => { + const store = new InMemoryGraphObjectStore(); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + + store.addEntities(uuid(), [e1, e2]); + store.addRelationships(uuid(), [r1, r2]); + + expect(store.getTotalEntityItemCount()).toEqual(2); + expect(store.getTotalRelationshipItemCount()).toEqual(2); + }); + + test('should delete only selected entities when flushing', () => { + const store = new InMemoryGraphObjectStore(); + + const e1 = createTestEntity(); + const e2 = createTestEntity(); + + store.addEntities(uuid(), [e1, e2]); + store.flushEntities([e1]); + + expect(store.findEntity(e1._key)).toEqual(undefined); + expect(store.findEntity(e2._key)).toEqual(e2); + expect(store.getTotalEntityItemCount()).toEqual(1); + }); + + test('should delete only selected relationships when flushing', async () => { + const store = new InMemoryGraphObjectStore(); + + const r1 = createTestRelationship(); + const r2 = createTestRelationship(); + + store.addRelationships(uuid(), [r1, r2]); + store.flushRelationships([r1]); + + expect(await collectRelationshipsByType(store, r1._type)).toEqual([]); + expect(await collectRelationshipsByType(store, r2._type)).toEqual([r2]); + expect(store.getTotalRelationshipItemCount()).toEqual(1); + }); +}); diff --git a/packages/integration-sdk-runtime/src/storage/memory.ts b/packages/integration-sdk-runtime/src/storage/memory.ts new file mode 100644 index 000000000..f1b2de228 --- /dev/null +++ b/packages/integration-sdk-runtime/src/storage/memory.ts @@ -0,0 +1,257 @@ +import { + Entity, + GraphObjectFilter, + GraphObjectIteratee, + IntegrationMissingKeyError, + Relationship, +} from '@jupiterone/integration-sdk-core'; + +export interface GraphObjectMetadata { + stepId: string; +} + +interface InMemoryGraphObjectStoreEntityData extends GraphObjectMetadata { + entity: Entity; +} + +interface InMemoryGraphObjectStoreRelationshipData extends GraphObjectMetadata { + relationship: Relationship; +} + +/** + * Stores entities and relationships in memory and may be used to buffer data in + * memory before flushing to another datastore (e.g. disk). + */ +export class InMemoryGraphObjectStore { + /** + * Maps to lookup entity/relationship with metadata by _key + * + * { + * "my_user_key1": { stepId: 'fetch-users', entity: { ...ENTITY1 } }, + * "my_user_key2": { stepId: 'fetch-users', entity: { ...ENTITY2 } } + * } + */ + private readonly entityKeyToEntityMap = new Map< + string, + InMemoryGraphObjectStoreEntityData + >(); + private readonly relationshipKeyToRelationshipMap = new Map< + string, + InMemoryGraphObjectStoreRelationshipData + >(); + + /** + * Maps to lookup all entity/relationship _key's by _type. The value of this + * data structure is Map<_key{string}, boolean> to make deleting individual + * keys super fast. See "flushEntities" for more information. + * + * { + * 'my_token_type': { + * 'my_token_key1': true, + * 'my_token_key2': true + * }, + * 'my_user_type': { + * 'my_user_key1': true, + * 'my_user_key2': true + * } + * } + */ + private readonly entityTypeToKeysMap: Map< + string, + Map + > = new Map(); + private readonly relationshipTypeToKeysMap: Map< + string, + Map + > = new Map(); + + addEntities(stepId: string, newEntities: Entity[]): void { + for (const entity of newEntities) { + this.entityKeyToEntityMap.set(entity._key, { + stepId, + entity, + }); + + const entityTypeKeysMap = this.entityTypeToKeysMap.get(entity._type); + if (entityTypeKeysMap) { + entityTypeKeysMap.set(entity._key, true); + } else { + this.entityTypeToKeysMap.set( + entity._type, + new Map([[entity._key, true]]), + ); + } + } + } + + addRelationships(stepId: string, newRelationships: Relationship[]): void { + for (const relationship of newRelationships) { + this.relationshipKeyToRelationshipMap.set(relationship._key, { + stepId, + relationship, + }); + + const relationshipTypeKeysMap = this.relationshipTypeToKeysMap.get( + relationship._type, + ); + + if (relationshipTypeKeysMap) { + relationshipTypeKeysMap.set(relationship._key, true); + } else { + this.relationshipTypeToKeysMap.set( + relationship._type, + new Map([[relationship._key, true]]), + ); + } + } + } + + findEntity(_key: string): Entity | undefined { + return this.entityKeyToEntityMap.get(_key)?.entity; + } + + async iterateEntities( + filter: GraphObjectFilter, + iteratee: GraphObjectIteratee, + ): Promise { + const entityTypeKeysMap = this.entityTypeToKeysMap.get(filter._type); + + if (!entityTypeKeysMap) { + return; + } + + for (const [_key] of entityTypeKeysMap) { + const graphObjectData = this.entityKeyToEntityMap.get(_key); + + if (!graphObjectData) { + // NOTE: This should never happen. Our data structures should stay in + // sync. + throw new IntegrationMissingKeyError( + `Failed to find entity (_type=${filter._type}, _key=${_key})`, + ); + } + + await iteratee(graphObjectData.entity as T); + } + } + + async iterateRelationships( + filter: GraphObjectFilter, + iteratee: GraphObjectIteratee, + ): Promise { + const relationshipTypeKeysMap = this.relationshipTypeToKeysMap.get( + filter._type, + ); + + if (!relationshipTypeKeysMap) { + return; + } + + for (const [_key] of relationshipTypeKeysMap) { + const graphObjectData = this.relationshipKeyToRelationshipMap.get(_key); + + if (!graphObjectData) { + // NOTE: This should never happen. Our data structures should stay in + // sync. + throw new IntegrationMissingKeyError( + `Failed to find relationship (_type=${filter._type}, _key=${_key})`, + ); + } + + await iteratee(graphObjectData.relationship as T); + } + } + + /** + * Instead of clearing the entire map, we want to delete old data individually. + * This will allow us to eventually run steps in parallel without locking when + * steps have no interdependence. If we cleared it out, it's possible that + * two steps could be running in parallel and one of the steps may clear out + * the maps while the other is still relying on it. + */ + flushEntities(entities: Entity[]) { + for (const entity of entities) { + this.entityKeyToEntityMap.delete(entity._key); + const entityTypeKeysMap = this.entityTypeToKeysMap.get(entity._type); + + if (!entityTypeKeysMap) { + // NOTE: This should never happen. It's an indicator that there is a + // bug in keeping our two maps in syc. + throw new Error( + `Could not delete entity from type keys map (_key=${entity._key}, _type=${entity._type})`, + ); + } + + entityTypeKeysMap.delete(entity._key); + } + } + + /** + * Instead of clearing the entire map, we want to delete old data individually. + * This will allow us to eventually run steps in parallel without locking when + * steps have no interdependence. If we cleared it out, it's possible that + * two steps could be running in parallel and one of the steps may clear out + * the maps while the other is still relying on it. + */ + flushRelationships(relationships: Relationship[]) { + for (const relationship of relationships) { + this.relationshipKeyToRelationshipMap.delete(relationship._key); + const relationshipTypeKeysMap = this.relationshipTypeToKeysMap.get( + relationship._type, + ); + + if (!relationshipTypeKeysMap) { + // NOTE: This should never happen. It's an indicator that there is a + // bug in keeping our two maps in syc. + throw new Error( + `Could not delete relationship from type keys map (_key=${relationship._key}, _type=${relationship._type})`, + ); + } + + relationshipTypeKeysMap.delete(relationship._key); + } + } + + collectEntitiesByStep(): Map { + const entitiesByStepMap = new Map(); + + for (const [_key, graphObjectData] of this.entityKeyToEntityMap) { + const { stepId, entity } = graphObjectData; + + const entitiesByStepArray = entitiesByStepMap.get(stepId); + if (entitiesByStepArray) { + entitiesByStepArray.push(entity); + } else { + entitiesByStepMap.set(stepId, [entity]); + } + } + + return entitiesByStepMap; + } + + collectRelationshipsByStep(): Map { + const relationshipsByStepMap = new Map(); + + for (const [_key, graphObjectData] of this + .relationshipKeyToRelationshipMap) { + const { stepId, relationship } = graphObjectData; + + const relationshipsByStepArray = relationshipsByStepMap.get(stepId); + if (relationshipsByStepArray) { + relationshipsByStepArray.push(relationship); + } else { + relationshipsByStepMap.set(stepId, [relationship]); + } + } + + return relationshipsByStepMap; + } + + getTotalEntityItemCount() { + return this.entityKeyToEntityMap.size; + } + + getTotalRelationshipItemCount() { + return this.relationshipKeyToRelationshipMap.size; + } +} diff --git a/packages/integration-sdk-runtime/src/synchronization/index.ts b/packages/integration-sdk-runtime/src/synchronization/index.ts index 901aed7e7..a6f8fba31 100644 --- a/packages/integration-sdk-runtime/src/synchronization/index.ts +++ b/packages/integration-sdk-runtime/src/synchronization/index.ts @@ -170,6 +170,45 @@ async function getPartialDatasets() { return summary.metadata.partialDatasets; } +export async function uploadGraphObjectData( + synchronizationJobContext: SynchronizationJobContext, + graphObjectData: FlushedGraphObjectData, +) { + try { + if (Array.isArray(graphObjectData.entities)) { + synchronizationJobContext.logger.info( + { + entities: graphObjectData.entities.length, + }, + 'Uploading entities', + ); + + await uploadData( + synchronizationJobContext, + 'entities', + graphObjectData.entities, + ); + } + + if (Array.isArray(graphObjectData.relationships)) { + synchronizationJobContext.logger.info( + { + relationships: graphObjectData.relationships.length, + }, + 'Uploading relationships', + ); + + await uploadData( + synchronizationJobContext, + 'relationships', + graphObjectData.relationships, + ); + } + } catch (err) { + throw synchronizationApiError(err, 'Error uploading collected data'); + } +} + /** * Uploads data collected by the integration into the */ @@ -183,28 +222,12 @@ export async function uploadCollectedData(context: SynchronizationJobContext) { walkDirectory({ path: 'graph', async iteratee({ filePath }) { - const parsedData = await readGraphObjectFile({ - filePath, - }); - - try { - if (Array.isArray(parsedData.entities)) { - await uploadData(context, 'entities', parsedData.entities); - } - - if (Array.isArray(parsedData.relationships)) { - await uploadData( - context, - 'relationships', - parsedData.relationships, - ); - } - } catch (err) { - throw synchronizationApiError( - err, - 'Error uploading collected data', - ); - } + await uploadGraphObjectData( + context, + await readGraphObjectFile({ + filePath, + }), + ); }, }), }); diff --git a/packages/integration-sdk-runtime/test/util/fixtures.ts b/packages/integration-sdk-runtime/test/util/fixtures.ts index 98cd88b3d..e906b4f03 100644 --- a/packages/integration-sdk-runtime/test/util/fixtures.ts +++ b/packages/integration-sdk-runtime/test/util/fixtures.ts @@ -1,4 +1,8 @@ -import { ExecutionHistory, IntegrationInstance } from "@jupiterone/integration-sdk-core/src"; +import { + ExecutionHistory, + IntegrationInstance, + IntegrationLogger, +} from '@jupiterone/integration-sdk-core'; export const LOCAL_INTEGRATION_INSTANCE: IntegrationInstance = { id: 'local-integration-instance', @@ -18,7 +22,9 @@ export const LOCAL_EXECUTION_HISTORY: ExecutionHistory = { // eslint-disable-next-line @typescript-eslint/no-empty-function const noop = () => {}; -export function createMockIntegrationLogger(overrides) { +export function createMockIntegrationLogger( + overrides?: Partial, +) { return { trace: noop, debug: noop, @@ -38,5 +44,5 @@ export function createMockIntegrationLogger(overrides) { publishEvent: noop, publishErrorEvent: noop, ...overrides, - }; + } as IntegrationLogger; } diff --git a/packages/integration-sdk-testing/package.json b/packages/integration-sdk-testing/package.json index e7a09e285..d9b73b706 100644 --- a/packages/integration-sdk-testing/package.json +++ b/packages/integration-sdk-testing/package.json @@ -1,6 +1,6 @@ { "name": "@jupiterone/integration-sdk-testing", - "version": "5.1.0", + "version": "5.2.0", "description": "Testing utilities for JupiterOne integrations", "main": "dist/src/index.js", "types": "dist/src/index.d.ts", @@ -23,8 +23,8 @@ "prepack": "yarn build:dist" }, "dependencies": { - "@jupiterone/integration-sdk-core": "^5.1.0", - "@jupiterone/integration-sdk-runtime": "^5.1.0", + "@jupiterone/integration-sdk-core": "^5.2.0", + "@jupiterone/integration-sdk-runtime": "^5.2.0", "@pollyjs/adapter-node-http": "^4.0.4", "@pollyjs/core": "^4.0.4", "@pollyjs/persister-fs": "^4.0.4", @@ -36,7 +36,7 @@ "lodash": "^4.17.15" }, "devDependencies": { - "@jupiterone/integration-sdk-private-test-utils": "^5.1.0", + "@jupiterone/integration-sdk-private-test-utils": "^5.2.0", "@types/lodash": "^4.14.149", "get-port": "^5.1.1", "memfs": "^3.2.0" diff --git a/packages/integration-sdk/CHANGELOG.md b/packages/integration-sdk/CHANGELOG.md index d5a352906..bfd639651 100644 --- a/packages/integration-sdk/CHANGELOG.md +++ b/packages/integration-sdk/CHANGELOG.md @@ -9,6 +9,47 @@ and this project adheres to ## Unreleased +## 5.2.0 - 2020-12-18 + +### Changed + +- OPTIMIZATION: Buffer entities and relationships in memory and allow for fast + lookups. This change allows us to skip flushing to disk anytime there is a + call to `findEntity`, `iterateEntities` or `iterateRelationships`. + + See PR [#395](https://github.com/JupiterOne/sdk/pull/395) + +- OPTIMIZATION: Allow `FileSystemGraphObjectStore` to specify + `graphObjectBufferThreshold`, which defines the maximum number of graph + objects that the graph object store can buffer into memory before flushing to + disk. Machines with more memory should consider increasing this value as the + default is `500`. + + See PR [#395](https://github.com/JupiterOne/sdk/pull/395) + +- OPTIMIZATION: Continuously upload integration data during collection phase. + Previously, our integrations had two primary phases. The first phase was the + collection phase, and the second phase was the upload phase. The integration + SDK now mixes these two phases and the integrations will upload the collected + data to JupiterOne during the integration execution step that it has been + collected in. + + See PR [#396](https://github.com/JupiterOne/sdk/pull/396) + +- OPTIMIZATION: Reduce the size of the graph object files that are stored on + disk by default. Previously, all graph object files written to disk while + running the integration locally and running the integration in the managed + JupiterOne runtime, were created with whitespace. The file whitespace is now + only created when running the integration locally via the CLI commands. + + See PR [#399](https://github.com/JupiterOne/sdk/pull/399) + +### Removed + +- Remove `BucketMap` as it's no longer used in the `FileSystemGraphObjectStore`. + `BucketMap` was technically exposed externally, but should not have been used + externally. + ## 5.1.0 - 2020-12-08 ### Added