
Support for continuous integration data uploads #396

Merged — 16 commits merged into 1786-optimize-flushing on Dec 16, 2020

Conversation

@austinkelleher (Contributor) commented Dec 16, 2020:

Our previous integration flow had two primary phases:

Phase 1: Collect all of the data
Phase 2: Upload all of the data

These changes improve this drastically by interleaving the two phases. We now queue up graph object uploads in each step and ensure that all of the uploads have completed successfully before triggering dependent steps.
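A rough sketch of the interleaved flow (the names mirror the PR, but the bodies are illustrative stand-ins, not the actual SDK implementation):

```typescript
type GraphObjectData = { entities: string[]; relationships: string[] };

// Hypothetical in-memory uploader: enqueue starts the upload immediately
// and waitUntilUploadsComplete blocks until every upload settles.
function createInMemoryUploader(uploaded: GraphObjectData[]) {
  const pending: Promise<void>[] = [];
  return {
    async enqueue(data: GraphObjectData): Promise<void> {
      // Start the upload right away; collection continues while it runs.
      pending.push(Promise.resolve().then(() => { uploaded.push(data); }));
    },
    async waitUntilUploadsComplete(): Promise<void> {
      await Promise.all(pending);
    },
  };
}

async function executeStep(
  collect: () => GraphObjectData[],
  uploader: ReturnType<typeof createInMemoryUploader>,
): Promise<void> {
  for (const data of collect()) {
    await uploader.enqueue(data); // phases 1 and 2 interleaved per step
  }
  // Dependent steps are only triggered after every upload for this
  // step has settled.
  await uploader.waitUntilUploadsComplete();
}
```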

@ndowmon (Contributor) left a comment:

Another sweet change. Added some comments for your review.

try {
// Failing to upload all integration data should not be considered a
// fatal failure. We just want to mark this step as a partial success
// and move on with our lives!
Contributor:

❤️

@@ -284,6 +297,20 @@ export function executeStepDependencyGraph<
status = StepResultStatus.FAILURE;
}

await context.jobState.flush();
@ndowmon (Contributor) commented Dec 16, 2020:

Interesting. Looks like we now flush after each step. Devil's advocate 😈: do we want to change the implementation of the maps in localGraphDataStore, which track stepId and _key for entities? It seems like we should no longer need to track stepId now.

@austinkelleher (Contributor, Author):

This is confusing... I had slightly changed the behavior in the previous PR and ended up changing it back to the original in this PR. I should have just rolled back the change in the first PR. See here: https://github.com/JupiterOne/sdk/pull/395/files#diff-8912f1fec8d408545c592a8420e4d837119d01694950f32598718ba7abc57d7aL287

Comment on lines +40 to +61
async enqueue(graphObjectData) {
  if (queue.isPaused) {
    // This step already failed an upload. We do not want to enqueue more
    // for this step.
    return;
  }

  // OPTIMIZATION: We do not want to buffer a lot of graph objects
  // into memory inside of the queue. If the queue concurrency has been
  // reached, we wait for the queue to flush so that this step has the
  // opportunity to upload more data.
  if (
    queue.pending >= uploadConcurrency ||
    queue.size >= uploadConcurrency
  ) {
    if (onThrottleEnqueue) {
      // Mainly just used for testing that our custom throttling works.
      onThrottleEnqueue();
    }

    await queue.onIdle();
  }
Contributor:

Should the enqueue function be async (does it have to be)? I'm thinking from the caller's perspective:

    await graphObjectStore.addEntities(stepId, entities, async (entities) =>
      uploader?.enqueue({
        entities,
        relationships: [],
      }),
    );

Maybe the caller should only need to wait for entities to hit the graph object store, and not for these to be published to the persister. 🤷


'Error uploading collected data',
);
}
await uploadGraphObjectData(
Contributor:

I see that uploadCollectedData is still being called from synchronizeCollectedData (which is used in the j1-integration sync command). I haven't looked at the follow-on PRs yet so maybe you will do this, but I think it'd be unsafe to leave this functionality in the off chance that it duplicates uploads to the persister.

@austinkelleher (Contributor, Author):

I think we still want this. I don't personally use the sync command, but the distinction is: run collects and uploads, while sync only uploads. The sync command assumes that you already have data on disk that should now be uploaded.

Contributor:

It seems kind of weird to me that there is any interaction with the persister at all in the open-source SDK as that is J1 specific. Am I wrong with the goal of the SDK?

expect(flushedRelationshipsCollected).toEqual([r1, r2, r3]);
});

test('#flushEntitiesToDisk should call flush callback when buffer threshold reached', async () => {
Contributor:

Seems like copy-paste error? The buffer threshold has not been reached here.

@austinkelleher (Contributor, Author):

Thanks. Will fix.

});
},

async waitUntilUploadsComplete() {
Contributor:

I saw something about queue.isPaused in src/execution/uploader.ts, should that be set somewhere? Here? IDK.

@austinkelleher (Contributor, Author):

Good question, but no it should not be. We should only pause execution of the queue when we see an error. This function rethrows a collection of errors that we saw.
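The pause-on-error behavior described here could be sketched roughly as follows (an illustrative class, not the SDK's actual uploader.ts, which uses p-queue):

```typescript
// Illustrative sketch: the queue is paused ONLY when an upload fails, and
// waitUntilUploadsComplete rethrows the collection of errors we saw.
class SketchUploader {
  private errors: Error[] = [];
  private paused = false;
  private pending: Promise<void>[] = [];

  enqueue(upload: () => Promise<void>): void {
    if (this.paused) {
      // A previous upload for this step failed; drop further work.
      return;
    }
    this.pending.push(
      upload().catch((err: Error) => {
        this.errors.push(err);
        this.paused = true; // pause only when we see an error
      }),
    );
  }

  async waitUntilUploadsComplete(): Promise<void> {
    await Promise.all(this.pending);
    if (this.errors.length > 0) {
      // Rethrow the collected upload errors to the caller.
      throw new Error(
        'Error(s) uploading collected data: ' +
          this.errors.map((e) => e.message).join('; '),
      );
    }
  }
}
```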

),

async waitUntilUploadsComplete() {
await uploader?.waitUntilUploadsComplete();
Contributor:

I may not have read the code correctly, but it does seem that, since uploader.enqueue is async and calls await queue.onIdle(), this will always return right away...

@austinkelleher (Contributor, Author):

I don't think that would be the case. The enqueue method is, in fact, async, but waiting for the enqueued function to actually settle is not async. The only reason why the enqueue function is async is so that we can throttle how many functions we are pushing into our queue. Throttling the enqueue will prevent us from buffering too many entities and relationships into memory while the uploads haven't actually completed.

See: https://github.com/JupiterOne/sdk/pull/396/files#diff-38e59a3db7780e509c55171310fde11a5d109af248e2b859c4f0378ee65e049fR47
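A minimal model of that distinction (assumed p-queue-like behavior, not the SDK source): enqueue is awaited only for backpressure, while the upload itself settles later and is awaited by onIdle / waitUntilUploadsComplete.

```typescript
// Hypothetical throttled queue: enqueue() is async solely so it can wait
// when too many uploads are in flight; it never waits for its own upload.
function createThrottledQueue(uploadConcurrency: number) {
  const inFlight = new Set<Promise<void>>();

  return {
    async enqueue(upload: () => Promise<void>): Promise<void> {
      if (inFlight.size >= uploadConcurrency) {
        // Backpressure: wait for the current batch to drain before
        // buffering more graph objects into memory.
        await Promise.all(inFlight);
      }
      const p = upload().finally(() => {
        inFlight.delete(p);
      });
      inFlight.add(p);
      // Note: we return WITHOUT awaiting p — the upload settles later.
    },
    async onIdle(): Promise<void> {
      while (inFlight.size > 0) {
        await Promise.all(inFlight);
      }
    },
  };
}
```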

@@ -622,6 +630,156 @@ describe('executeStepDependencyGraph', () => {
expect(spyB).toHaveBeenCalledBefore(spyC);
});

test('should mark steps failed executionHandlers with status FAILURE a dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
Contributor:

I think there are typos here? Hard to understand.

Suggested change
test('should mark steps failed executionHandlers with status FAILURE a dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {

Contributor:

I guess this also doesn't seem like a test that's relevant to this PR. The scope seems to be about dependencies here, although the functionality is about creating a failing uploader. Consider trimming this, breaking it apart, or making the test name clearer.

@austinkelleher (Contributor, Author):

Will fix the grammar. Thanks for the suggestion. The test is relevant to the overall changes though. The test validates that our uploads respect the existing behavior of our internal dependency graph.

Comment on lines +57 to +61
await sleep(100);
uploaded.push(d);
} else {
numQueued++;
await sleep(200);
Contributor:

What's the purpose of these sleep()s? It's not very clear to me and makes the test a bit more confusing.

@austinkelleher (Contributor, Author):

Good question. Will add a comment for clarity. The sleeps validate that the promises are indeed executed concurrently instead of in serial. We enqueue a bunch of uploads and, as long as we haven't reached our concurrency limit, the promises will all be run concurrently.
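A simplified illustration of that technique (assumed shape, not the actual test file): if the uploads ran serially, total wall time would be roughly n × sleep duration; run concurrently, it stays near a single sleep duration.

```typescript
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));

// Kick off n sleeping "uploads" at once and measure total wall time.
// Concurrent execution keeps elapsed time close to `ms`, not `n * ms`.
async function timeConcurrentUploads(n: number, ms: number): Promise<number> {
  const start = Date.now();
  await Promise.all(Array.from({ length: n }, () => sleep(ms)));
  return Date.now() - start;
}
```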

n: number,
) {
const flushed = await createAndEnqueueUploads(uploader, n);
await uploader.waitUntilUploadsComplete();
Contributor:

I'm not convinced that all data has been uploaded at this point in the process. Can you add a test to prove that un-uploaded data can exist in the upload queue and that waitUntilUploadsComplete does indeed properly wait for uploads to complete?

Alternatively, explain it to me like I'm a 5 year old.

@austinkelleher (Contributor, Author):

Perhaps I'm not following, but the function is indeed supposed to wait until uploads are complete. It's used as a utility function throughout these tests, so that we can just assert that we've collected the correct data in the correct order.

Contributor:

From what I can tell, the queue will never be that big, but there is a small amount of time when the final concurrent requests need to drain from the queue. I believe waitUntilUploadsComplete will wait for those final requests to resolve.

@austinkelleher austinkelleher force-pushed the 1765-continuous-uploads branch from 9426a2a to b05beb8 Compare December 16, 2020 15:59
Only write prettified files to the file system on local collection
Share graph object creation test utils across tests and cleanup
@austinkelleher austinkelleher merged commit b2b44c3 into 1786-optimize-flushing Dec 16, 2020
@austinkelleher austinkelleher deleted the 1765-continuous-uploads branch December 16, 2020 18:28
onThrottleEnqueue();
}

await queue.onIdle();
@mknoedel (Contributor) commented Dec 17, 2020:

We might not want to wait for onIdle here, as I think it loses some concurrency benefits. I think what this means is we won't start on the next batch of uploads until every upload from the last group is finished. We should likely wait on onEmpty instead, and rather than checking that queue.size and queue.pending have both reached the limit, just check queue.size. This way the maximum number of concurrent calls are being made at all times.
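The distinction can be modeled with p-queue-like semantics. The MiniQueue class below is an illustrative stand-in (not p-queue itself): onEmpty resolves as soon as the backlog drains, while up to `concurrency` uploads may still be pending, whereas onIdle waits for everything to finish.

```typescript
// Minimal p-queue-like model to show the onEmpty vs onIdle difference.
class MiniQueue {
  size = 0;    // tasks queued, not yet started
  pending = 0; // tasks currently running
  private backlog: Array<() => Promise<void>> = [];
  private waiters: Array<{ idle: boolean; resolve: () => void }> = [];

  constructor(private concurrency: number) {}

  add(task: () => Promise<void>): void {
    this.backlog.push(task);
    this.size++;
    this.pump();
  }

  onEmpty(): Promise<void> {
    // Resolves once the backlog drains, even while workers are pending.
    return new Promise((resolve) => {
      this.waiters.push({ idle: false, resolve });
      this.notify();
    });
  }

  onIdle(): Promise<void> {
    // Resolves only when the backlog drains AND every worker finishes.
    return new Promise((resolve) => {
      this.waiters.push({ idle: true, resolve });
      this.notify();
    });
  }

  private pump(): void {
    while (this.pending < this.concurrency && this.backlog.length > 0) {
      const task = this.backlog.shift()!;
      this.size--;
      this.pending++;
      task().finally(() => {
        this.pending--;
        this.pump();
      });
    }
    this.notify();
  }

  private notify(): void {
    this.waiters = this.waiters.filter((w) => {
      const done = w.idle
        ? this.size === 0 && this.pending === 0
        : this.size === 0;
      if (done) w.resolve();
      return !done;
    });
  }
}
```

Waiting on onEmpty keeps the worker pool full: new tasks can be admitted as soon as there is backlog room, instead of stalling until the whole batch drains.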

} from '@jupiterone/integration-sdk-runtime';

import { loadConfig } from '../config';
import * as log from '../log';
import { createPersisterApiStepGraphObjectDataUploader } from '@jupiterone/integration-sdk-runtime/dist/src/execution/uploader';
Contributor:

Why does this have to be this way, digging into the dist/ directory?
