Support for continuous integration data uploads #396
Conversation
Another sweet change. Added some comments for your review.
```typescript
try {
  // Failing to upload all integration data should not be considered a
  // fatal failure. We just want to mark this step as a partial success
  // and move on with our lives!
```
❤️
```
@@ -284,6 +297,20 @@ export function executeStepDependencyGraph<
    status = StepResultStatus.FAILURE;
  }

  await context.jobState.flush();
```
Interesting. Looks like we now flush after each new step. 😈 advocate: do we want to change the implementation of the maps in `localGraphDataStore`, which track `stepId` and `_key` for entities? It seems like we should no longer need to track `stepId` now.
This is confusing... I had slightly changed the behavior in the previous PR and ended up changing it back to the original in this PR. I should have just rolled back the change in the first PR. See here: https://github.com/JupiterOne/sdk/pull/395/files#diff-8912f1fec8d408545c592a8420e4d837119d01694950f32598718ba7abc57d7aL287
```typescript
async enqueue(graphObjectData) {
  if (queue.isPaused) {
    // This step already failed an upload. We do not want to enqueue more
    // for this step.
    return;
  }

  // OPTIMIZATION: We do not want to buffer a lot of graph objects
  // into memory inside of the queue. If the queue concurrency has been
  // reached, we wait for the queue to flush so that this step has the
  // opportunity to upload more data.
  if (
    queue.pending >= uploadConcurrency ||
    queue.size >= uploadConcurrency
  ) {
    if (onThrottleEnqueue) {
      // Mainly just used for testing that our custom throttling works.
      onThrottleEnqueue();
    }

    await queue.onIdle();
  }
```
Should the `enqueue` function be `async` (does it have to be)? I'm thinking from the caller's perspective:

```typescript
await graphObjectStore.addEntities(stepId, entities, async (entities) =>
  uploader?.enqueue({
    entities,
    relationships: [],
  }),
);
```

Maybe the caller should only need to wait for entities to hit the graph object store, and not for these to be published to the persister. 🤷
It does have to be async. See this comment: https://github.com/JupiterOne/sdk/pull/396/files#diff-38e59a3db7780e509c55171310fde11a5d109af248e2b859c4f0378ee65e049fR47
```typescript
    'Error uploading collected data',
  );
}
await uploadGraphObjectData(
```
I see that `uploadCollectedData` is still being called from `synchronizeCollectedData` (which is used in the `j1-integration sync` command). I haven't looked at the follow-on PRs yet, so maybe you will do this, but I think it'd be unsafe to leave this functionality in, on the off chance that it duplicates uploads to the persister.
I think we still want this. I don't personally use the `sync` command. `run` collects & uploads; `sync` only uploads. The `sync` command assumes that you already have data on disk that should now be uploaded.
It seems kind of weird to me that there is any interaction with the persister at all in the open-source SDK, as that is J1-specific. Am I wrong about the goal of the SDK?
```typescript
  expect(flushedRelationshipsCollected).toEqual([r1, r2, r3]);
});

test('#flushEntitiesToDisk should call flush callback when buffer threshold reached', async () => {
```
Seems like a copy-paste error? The buffer threshold has not been reached here.
Thanks. Will fix.
```typescript
  });
},

async waitUntilUploadsComplete() {
```
I saw something about `queue.isPaused` in `src/execution/uploader.ts`, should that be set somewhere? Here? IDK.
Good question, but no it should not be. We should only pause execution of the queue when we see an error. This function rethrows a collection of errors that we saw.
```typescript
),

async waitUntilUploadsComplete() {
  await uploader?.waitUntilUploadsComplete();
```
I may not have read the code correctly, but since `uploader.enqueue` is async and calls `await queue.onIdle()`, it does seem like this will always return right away...
I don't think that would be the case. The `enqueue` method is, in fact, `async`, but waiting for the enqueued function to actually settle is not. The only reason the `enqueue` function is async is so that we can throttle how many functions we are pushing into our queue. Throttling the `enqueue` will prevent us from buffering too many entities and relationships into memory while the uploads haven't actually completed.
```
@@ -622,6 +630,156 @@ describe('executeStepDependencyGraph', () => {
    expect(spyB).toHaveBeenCalledBefore(spyC);
  });

  test('should mark steps failed executionHandlers with status FAILURE a dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
```
I think there are typos here? Hard to understand.
```diff
- test('should mark steps failed executionHandlers with status FAILURE a dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
+ test('should mark steps with failed executionHandlers with status FAILURE and dependent steps with status PARTIAL_SUCCESS_DUE_TO_DEPENDENCY_FAILURE when step upload fails', async () => {
```
I guess this also doesn't seem like a test that's relevant to this PR. The scope seems to be about dependencies here, although the functionality is about creating a failing uploader. Consider trimming this, breaking it apart, or making the test name clearer.
Will fix the grammar. Thanks for the suggestion. The test is relevant to the overall changes though. The test validates that our uploads respect the existing behavior of our internal dependency graph.
```typescript
    await sleep(100);
    uploaded.push(d);
  } else {
    numQueued++;
    await sleep(200);
```
What's the purpose of these `sleep()`s? It's not very clear to me, and it makes the test a bit more confusing.
Good question. Will add a comment for clarity. The sleeps validate that the promises are indeed executed concurrently instead of in serial. We enqueue a bunch of uploads and, as long as we haven't reached our concurrency limit, the promises will all be run concurrently.
```typescript
  n: number,
) {
  const flushed = await createAndEnqueueUploads(uploader, n);
  await uploader.waitUntilUploadsComplete();
```
I'm not convinced that all data has necessarily been uploaded at this point in the process. Can you add a test to prove that un-uploaded data can exist in the upload queue and that `waitUntilUploadsComplete` does indeed properly wait for uploads to complete?

Alternatively, explain it to me like I'm a 5 year old.
Perhaps I'm not following, but the function is indeed supposed to wait until uploads are complete. It's used as a utility function throughout these tests, so that we can just assert that we've collected the correct data in the correct order.
From what I can tell, the queue will never be that big, but there is a small amount of time when the final concurrent requests need to drain from the queue. I believe `waitUntilUploadsComplete` will wait for those final requests to resolve.
Force-pushed from `9426a2a` to `b05beb8`:

- Only write prettified files to the file system on local collection
- Add tests for job state upload calls
- Share graph object creation test utils across tests and cleanup
```typescript
      onThrottleEnqueue();
    }

    await queue.onIdle();
```
We might not want to wait for `onIdle` here, as I think it loses some concurrency benefits. I think what this means is we won't start on the next batch of uploads until every one from the last group is finished. We should likely wait on `onEmpty` instead, and rather than checking that `queue.size` and `queue.pending` are both empty, just check that `queue.size` is empty. This way the maximum number of concurrent calls are being made at all times.
```typescript
} from '@jupiterone/integration-sdk-runtime';

import { loadConfig } from '../config';
import * as log from '../log';
import { createPersisterApiStepGraphObjectDataUploader } from '@jupiterone/integration-sdk-runtime/dist/src/execution/uploader';
```
Why does this have to be this way, digging into the `dist/` directory?
Our previous integration flow had two primary phases:

1. Collect all of the data
2. Upload all of the data

These changes improve this drastically by mixing the two phases. We will now queue up graph object uploads in each step and ensure that all of the uploads have successfully completed before triggering dependent steps.