influxdb output is messed up if the server shuts down the connection non-gracefully #6614
The separate Serializer sounds good, can you open a pull request?
The reader may be used in an async manner, as with CompressWithGzip, which may continue to consume the serializer even after the server returns an error. If the client retries within a short period, there may be more than one reader consuming the single serializer, which leads to malformed output data. Refer to influxdata#6614 for details.
@danielnelson I found the first solution is not enough: if a single client sends output at a very short interval, there is still a chance of more than one Reader consuming the serializer, since CompressWithGzip has no way to know the first request is gone. To overcome this, we should either create a new serializer for each request, or find a way to tell CompressWithGzip to stop. The second commit adds a "close flag" to the reader, which is closed when the request finishes; any subsequent read on the reader then receives an EOF.
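A minimal sketch of the close-flag idea, assuming a wrapper around the serializer's reader; `flaggedReader` and its fields are illustrative names, not telegraf's actual `influx.Reader`:

```go
import (
	"io"
	"sync"
)

// flaggedReader wraps a reader with a close flag so that a lingering
// CompressWithGzip goroutine stops consuming the shared serializer.
type flaggedReader struct {
	r      io.Reader
	closed chan struct{}
	once   sync.Once
}

func newFlaggedReader(r io.Reader) *flaggedReader {
	return &flaggedReader{r: r, closed: make(chan struct{})}
}

func (fr *flaggedReader) Read(p []byte) (int, error) {
	select {
	case <-fr.closed:
		// The request that owned this reader has finished; report EOF
		// so any further reads stop immediately.
		return 0, io.EOF
	default:
		return fr.r.Read(p)
	}
}

// Close sets the flag; it is safe to call more than once.
func (fr *flaggedReader) Close() error {
	fr.once.Do(func() { close(fr.closed) })
	return nil
}
```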
Or, I think the most "correct" way is to use a sync.Pool for the serializer, and for each write request, grab an instance from the pool.
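A minimal sketch of the sync.Pool idea; `influx.NewSerializer` is telegraf's real constructor, while the `writeBatch` call site is hypothetical:

```go
import (
	"sync"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

// Pool of serializers so each in-flight request gets its own instance.
var serializerPool = sync.Pool{
	New: func() interface{} {
		return influx.NewSerializer()
	},
}

// writeBatch is a hypothetical call site: take a serializer from the pool
// for the duration of one request, then return it.
func writeBatch(metrics []telegraf.Metric) error {
	s := serializerPool.Get().(*influx.Serializer)
	defer serializerPool.Put(s)

	for _, m := range metrics {
		if _, err := s.Serialize(m); err != nil {
			return err
		}
		// ... append the serialized bytes to the request body ...
	}
	return nil
}
```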
I'd really like to avoid a sync.Pool. Only one query at a time is run, so it seems like what we really need to do is ensure the goroutine within CompressWithGzip has finished before the next write starts. Could we have CompressWithGzip return a new type that wraps the PipeReader with a Close function that waits for the goroutine to finish?

```go
import (
	"io"
	"sync"
)

type CloseWaitReader struct {
	reader io.ReadCloser
	wg     sync.WaitGroup
}

func (r *CloseWaitReader) Close() error {
	r.wg.Wait()
	return r.reader.Close()
}
```

The goroutine in CompressWithGzip would call Done() on the waitgroup as its last action.
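A hedged sketch of how CompressWithGzip might wire up the waitgroup; the pipe-and-gzip structure follows the standard io.Pipe pattern, not necessarily telegraf's exact implementation:

```go
import (
	"compress/gzip"
	"io"
)

func CompressWithGzip(data io.Reader) (io.ReadCloser, error) {
	pipeReader, pipeWriter := io.Pipe()
	gzipWriter := gzip.NewWriter(pipeWriter)

	rc := &CloseWaitReader{reader: pipeReader}
	rc.wg.Add(1)

	go func() {
		// Done() is the goroutine's last action, so Close() cannot
		// return while the pipe is still being written.
		defer rc.wg.Done()
		_, err := io.Copy(gzipWriter, data)
		gzipWriter.Close()
		// Propagate any copy error to readers of the pipe.
		pipeWriter.CloseWithError(err)
	}()

	return rc, nil
}
```

Note that, as the following comments point out, this Close would block until io.Copy finishes; that limitation is what motivates the revised design below.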
Sounds good, but I found it's better to implement the logic in the source (influx.Reader). The HTTP client will not automatically close the reader when it receives an error response, which means we may wait a pretty long time before CompressWithGzip's goroutine returns. I have updated the second commit, please have a look.
My issue with this method is that it still leaves the argument to CompressWithGzip unsafe for future use without similar code in every caller, since we can't determine when the goroutine has exited. It imposes the concurrency-related sync on every user of the function, instead of encapsulating the concurrency, which is a detail of its implementation. Often the influx.Reader isn't even using gzip, so it doesn't feel like it should have sync code. To me it feels like it's in the wrong spot.

But I see what you mean: until io.Copy completes we can't exit the goroutine, which would mean we need to compress the full body even when we have already failed. I think what this means for my previous solution is that we would need to take an io.ReadCloser as an argument to CompressWithGzip and close it when the returned io.ReadCloser is closed, so io.Copy will return immediately, instead of handling this only in the influx.Reader. We need to make sure we have a handle to the returned reader, so maybe add a function to the output plugin that creates the body io.Reader?

```go
import (
	"io"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

func (c *httpClient) requestBodyReader(metrics []telegraf.Metric) (io.ReadCloser, error) {
	reader := influx.NewReader(metrics, c.config.Serializer)
	if c.config.ContentEncoding == "gzip" {
		rc, err := internal.CompressWithGzip(reader)
		if err != nil {
			return nil, err
		}
		return rc, nil
	}
	return io.NopCloser(reader), nil
}
```

I hope that makes sense... it's all fairly tricky code in my book.
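A hedged sketch of the call site, assuming the requestBodyReader helper above; the url and client fields on httpClient are illustrative:

```go
import (
	"net/http"

	"github.com/influxdata/telegraf"
)

func (c *httpClient) write(metrics []telegraf.Metric) error {
	body, err := c.requestBodyReader(metrics)
	if err != nil {
		return err
	}
	// Closing the body even on an early failure is what unblocks
	// CompressWithGzip's io.Copy and tears down its goroutine.
	defer body.Close()

	req, err := http.NewRequest("POST", c.url, body)
	if err != nil {
		return err
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// ... status code checks and retry handling elsewhere ...
	return nil
}
```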
I got your point, makes sense to me. I've updated the PR, and I will add unit tests later if you think the API is ok.
PR looks great, thanks so much. Let's go ahead with some unit tests if possible.
I added a test; please have a look~
Relevant telegraf.conf:
System info:
telegraf version: master branch 9efc376
os: Ubuntu 19.04
Steps to reproduce:
Expected behavior:
To see all data written correctly, with some "429 Too Many Requests" error logs.
Actual behavior:
Output is messed up during the retry on the second URL: lots of malformed line protocol errors are printed:
Additional info:
I think the root cause is that multiple clients share a single Serializer instance, and the first async gzip pipe (CompressWithGzip) is still working after the output receives an error response and retries on the second URL with another client.

I have two solutions, both tested (a sketch of the first follows this list):

1. create a separate Serializer for each client;
2. add a close flag to influx.Reader, which will be set when the error response arrives.

The first one is preferred, since the Serializer is a lightweight object, and one Serializer per client makes the code cleaner and more bug-proof.
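A minimal sketch of solution 1, with a hypothetical newHTTPClient constructor; the point is simply that each client owns its own Serializer, so concurrent retries on different URLs can never interleave reads from a shared instance:

```go
import (
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

type httpClient struct {
	url        string
	serializer *influx.Serializer
}

// newHTTPClient gives every client its own Serializer instead of
// sharing one across all configured URLs.
func newHTTPClient(url string) *httpClient {
	return &httpClient{
		url:        url,
		serializer: influx.NewSerializer(), // per-client, never shared
	}
}
```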