-
Notifications
You must be signed in to change notification settings - Fork 150
flush
does not guarantee that all inflight messages are sent
#309
Comments
flush
does not guarantee that all queued messages are sentflush
does not guarantee that all in-flight messages are sent
flush
does not guarantee that all in-flight messages are sentflush
does not guarantee that all inflight messages are sent
Hello @yo1dog, thanks for your contribution to the library! I just tested the cases mentioned above and clearly flush creates a race condition in which 2 simultaneous downloads are executed in parallel. I'll be working on a solution to solve this next week. Thanks, Segment team. |
I'm also experiencing issues with queueing and flush. Would love to see this fixed to help eliminate issues in my debugging. edit: we are using serverless functions. |
@benjaminhoffman My quick workaround for Lambda functions was to manually track the callbacks for every Segment call. I used a small class to help. This works OK for cloud function executions as they have a predictably short and linear lifecycle. This solution would not work in a long-lived environment as the class SegmentBatch {
constructor() {
this.promises = [];
}
createCB() {
let resolve;
this.promises.push(new Promise(_resolve => {resolve = _resolve;}));
return err => resolve(err);
}
async finish() {
const results = await Promise.all(this.promises);
const err = results.find(err => err);
if (err) throw err;
}
} All the class does is maintain a list of promises which are resolved when the callback is called. Then we can await all the promises to ensure they all callbacks have been called and thus all events have been fullsent. I use it like so: const Analytics = require('analytics-node');
const client = new Analytics(writeKey);
client.flushed = true; // Prevent flushing after first event.
const batch = new SegmentBatch();
client.track({...}, batch.createCB());
client.track({...}, batch.createCB());
...
client.track({...}, batch.createCB());
client.flush();
await batch.finish(); You could modify this for use in long-lived environments by adding logic that removes promises from the array once they have been resolved. |
That worked great for me! Thank you 🙏 . I rewrote this in typescript and it's worth noting the @yo1dog should I be concerned about latency or any delay from the Segment API? Even at 200ms, I hate to add unnecessary delay. Or what if the Segment API times out? |
Yes. The only way to guarantee events are fully sent to Segment is to wait for the underlying HTTP request to complete. Network unreliability can cause this to take a significant amount of time: 100s to 10,000s of milliseconds depending on your setup and timeout settings. Some cloud functions provide execution environments independent of individual executions (outside of the function's handler). You could move the management and flushing of events to this space so long as you can guarantee the flush can be awaited before the environment is stopped/shutdown. This is not possible with AWS Lambda, for example, as the execution environment is destroyed without warning. In this case, the only alternative to blocking invocation ("add unnecessary delay") would be to not emit events to Segment directly but instead to send the events to an external queue to be processed later. Presumably that queue could be "nearby" (peer process, or in same data center, etc.) resulting in much lower latency would and much higher reliability. |
@benjaminhoffman I just realized you stated you are "not using serverless functions." The solution I posted above was geared specifically for short-lived environments such as a cloud function invocation. In a traditional long-lived server process, rather than flushing events and blocking, you should manage the events outside of request handlers (as in, you should send events to Segment API "in the background"). This is what the Segment library aims to provide (I assume. I am not affiliated with Segment or this library in any way): You can simply call My solution here was to use a wrapper around the Segment library which was similar to the batch class above. Something like this (untested): class ManagedSegmentClient {
private readonly client: Analytics;
private readonly promises: Promise<Error | undefined>[];
public constructor(client: Analytics) {
this.client = client;
this.promises = [];
}
public track(...args: Parameters<Analytics['track']>) {
this.client.track(args[0], this.createCB(args[1]));
}
public identify(...args: Parameters<Analytics['identify']>) {
this.client.identify(args[0], this.createCB(args[1]));
}
public async finish() {
const results = await Promise.all(this.promises);
const err = results.find(err => err);
if (err) throw err;
}
private createCB(cb?: (err: Error) => void) {
let resolve: (err?: Error) => void;
const promise = new Promise<Error | undefined>(_resolve => {resolve = _resolve;});
this.promises.push(promise);
promise.finally(() => {
const index = this.promises.indexOf(promise);
if (index > -1) this.promises.splice(index, 1);
});
return (err?: Error) => {
resolve(err);
if (cb) cb(err);
};
}
} And use it like so: const managedSegmentClient = new ManagedSegmentClient(new Analytics(writeKey));
expressApp.get('/fubar', (req, res) => {
// ...
const utmSource = req.query.utm_source;
managedSegmentClient.track({event: 'blah', properties: {utmSource}});
return res.status(200).json({ok: true});
});
process.once('SIGINT', () => exitGracefully());
process.once('SIGTERM', () => exitGracefully());
function exitGracefully() {
// ...
// Flush Segment messages before exiting.
managedSegmentClient.finish(err => {
if (err) console.error('Error flushing Segment messages.');
else console.log('Segment messages flushed.');
// Now exit.
process.exit(exitCode);
});
} Whenever a track or identify event is called, it adds a promise to an array which is resolved when the callback is called (which is after the message has been fully sent). Whenever the promise is resolved, it is removed from the array. Then the |
super helpful @yo1dog and i think others reading this will also gain from your sample code... turns our we are using serverless functions (specifically, nextJS API routes deployed on Vercel) and your first solution worked great. I spoke with our CTO and he said the latency concerns at this point would be a pre-mature optimization and to keep an eye on things. If we see trouble from the |
We are hitting this issue as well. Seems like a pretty fundamental error in the Segment sdk that can cause dropped events which can corrupt data/monitoring in downstream systems. Surprised it has been open for so long. Here is the example I provided Segment support to reproduce. Set a
|
@yo1dog I think you rather want to use |
@emont01 I could see that. However, in this case It doesn’t matter as it is impossible for the promises to reject by design. The promises are resolved with an optional error instead of being rejected. Which you could argue is strange. But I was specially avoiding throwing and rejecting as the intent was to collect errors to process later. I thought ‘allSettled’ could give the impression that a rejected promise was a possibility that could or should be handled. |
@seekayel and @yo1dog: you guys probably are aware of this, but your examples seems to me like an illustration of a misuse of
Here, the expected behavior would be that the long-running processes to be terminated without enough time to complete. Now if we apply this to the original ticket: "if you were to call flush while another flush was already in progress" in fact the second flush callback should be called first. See the following example: // Simple promise that resolves after a given time
const resolveAfter = t => new Promise((resolve, reject) => {
setTimeout(() => resolve(`Completed in ${t}`), t)
})
Promise.allSettled([
resolveAfter(20000),
resolveAfter(20000)
]);
Promise.allSettled([
resolveAfter(10000),
resolveAfter(10000)
]);
Promise.resolve(0).then(() => {
// The process would exit before any promise is processed.
console.log('done');
process.exit(0);
}); I agree with @yo1dog that you probably want to handle closing events (SIGINT, SIGTERM...) to call flush before the app is terminated to avoid losing queued events. process.once('SIGINT', () => exitGracefully());
process.once('SIGTERM', () => exitGracefully()); |
@emont01: I structured the example code using the flush example syntax given in the segment docs: (at the end of this section) https://segment.com/docs/connections/sources/catalog/libraries/server/node/#batching
analytics.flush(function(err, batch){
console.log('Flushed, and now this program can exit!');
}); This code implies that all segment api calls will have completed by the time the callback passed to the invoke of flush is called. So I think we are in agreement? |
@seekayel I agree that you must call flush, but you shouldn't call exit directly, instead make sure there is no additional work pending in the event loop, in case you need to change the exit code set it using |
@emont01 I'm not trying to call exit 😆 ... just pointing out that flush doesn't actually behave as described in the docs. My solution is to just call the api directly with axios and then track the promises myself instead of trying to manage through the sdk. |
@emont01 Yes, I believe it is clear to everyone here that calling As for you workaround of setting the exit code and simply waiting for the event loop to end (which means waiting for the JavaScript event queue to empty), this is not always practical or possible. Database connection pools, HTTP servers, TCP connections, message queues (AMQP), scheduled tasks, etc. could all keep the event queue populated for an indiscriminate amount of time. Ideally these would be shutdown gracefully as well, but not all libraries handle this properly. Further, some environment lifecycles are not dictated by the event loop at all. Cloud functions such as AWS Lambda may execute multiple functions in the same Node.js worker (thus sharing the same event loop across multiple function executions). It may also abruptly end the worker after your function has returned (possibly before the event queue is emptied). In all these cases, the ability to explicitly await a full flushing of Segment messages is necessary. Regardless, the library does not operate as described. Thus this ticket. |
Hey folks, reading through the thread, looks like we have a solution here: #309 (comment) ... closing this issue. Hopefully in the next version of our library, we will support a Promise based API and add the ability to wait for all the inflight messages to be flushed. |
It does. It's workaround for part of the issue (deterministic flushing) but it is not a "solution" in that it does not solve the underlying problem.
Looking forward to the new version. |
Anywhere I can stay tuned to this updated library's progress? I was surprised to find my analytics bugs were coming from the library and not my code. I don't have a ton of time for analytics and I need it to all work. Adding analytics to make sure my analytics is working seems like madness. |
@pooyaj What's the status of this issue and the status of the new SDK since the 23 march ? |
Running into this problem too. Really scary to find out some of my events are being dropped. Considering dropping segment all together and just building my own pipeline, but I really don't want to have to do that. |
This is a bit embarrassing. Lambdas are a very good candidate for event processing and make sense for sending data to segment. This library is unusable at the moment |
Unfortunately, I think "embarrassing" is the right word. I recommend that no one use this library. At least not directly. Just grab your favorite async batching library instead. Segment has failed to fix a critical bug 9 months after it was pointed out. Nor has the documentation been updated. In fact, I found that they have copied my suggestions and workarounds, sometimes verbatim, into their documentation. Normally I wouldn't mind this at all, but they are being used incorrectly: near opposite in meaning. The documentation is now even more misleading.
https://segment.com/docs/connections/sources/catalog/libraries/server/node/#long-running-process I feel I have explained in minute, step-by-step detail exactly what the issue is and how to fix it. For this issue and several others. Segment even emailed asking me to open a PR... to just do it for them... for free. They did not respond when I asked if they had a bug bounty program. |
Any updates here? I don't understand why this has not been prioritized ASAP, this library serves one purpose and it's not being fulfilled due to this 10 month bug. |
@yo1dog they did you dirty. sorry man. they should at least get the bounty program running. fwiw your solution helped me so much that without it, we wouldn’t have been able to use this at all within our app. much appreciated 🙏🏽 |
We too are in this situation. I have opened a support ticket citing this issue. I will report back with their response. |
What is so pernicious about this bug is it manifests in only intermittent drops of events. The only way to catch is if you have an out of band measurement system. The reputation risk to an analytics company dropping events seems huge. I guess they don’t see it that way. |
Yup. 10 months in and not an ETA in sight. I don't blame the devs. Segment is clearly not devoting any resources to this. A complete re-write should take a single dev 1 month tops. And if you don't reinvent the async-batching wheel, 2 weeks tops. Segment, I do consulting work if you still want me to do it. lol |
I have started a very early npm module to do just that. Ran into a wall trying to get esm and cjs both to work. Might just fall back to cjs. Would not say it is production ready yet. Open to PRs, suggestions or guidance. Still finding my way as a side project. |
FYI, that is what they said 6 months ago as well:
|
Would it be possible to update the docs to reference that |
@pooyaj Any updates? |
@benjaminhoffman See my comment: #353 (comment) They are just throwing spaghetti at the wall and not even bothering to test if the solution fixes the example scenarios listed in the bug. Much less solves the class of problem. Given this is the 3rd (4th?) failed attempt to fix this, it's obvious they either don't know what they are doing and/or are putting forth 0 effort. Either way, I must again stress that no one use this library. Extremely unlikely it will be fixed anytime soon. |
I mean, just to be clear, this library is built and maintained by a commercial company with the sole purpose to integrate node apps with their services, correct? I'm struggling to understand why this library is not deserving of attention & budget from Segment? I mean, they're a multi-billion dollar company... We're paying for this. |
Hey everyone, wanted to share an update on where we are with analytics-node. We've created a new package - As part of that work we also focused on the flushing behavior. We introduced a The README for the package has some usage instructions - we tried to keep most of the public API the same to ease migration - and we're working on more thorough documentation to add to the Segment documentation website. We'd love to get any early feedback on the library - here's a link to the repo, and the beta has been published to npm: |
New code base looks fantastic! Great work. Love to see queue classes and event emitters. Code quality looks very high as well. Complements to the chef. Until the new library is released, please, at least update the incorrect and misleading documentation to reflect the current (broken) behavior of Still sucks that the current library has been left broken for so long. |
flush
does not guarantee that all inflight messages are sent before calling the given callback. Instead,flush
simply sends a batch of queued messages and waits for only that batch's response before callback.This is contrary to common expectations that a "flush" function completely empties the buffer to the destination. In fact, there are several open issues caused by this assumption.
Further, this means there is no way to know when both all queued and all inflight messages are full sent (fully flushed). For example, if you were to call
flush
while another flush was already in progress, the callback could be called before the previous flush was complete.This can be a very common occurrence in short-lived environments like cloud functions.
Example:
Similarly:
I suggest adding a new function that
flush()
es all queued messages and does not callback until all concurrent flushes are complete. Also updating the documentation to make it clear thatflush
may defy assumptions about awaiting all queued and inflight messages.The text was updated successfully, but these errors were encountered: