From d2d7c20c9599b26dad360eca3f85474aadc81fc0 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Tue, 16 Jul 2024 09:48:25 +0200 Subject: [PATCH 01/17] move bodyWrapper into the internal package --- instrumentation/net/http/otelhttp/handler.go | 11 ++-- .../http/otelhttp/internal/body_wrapper.go | 53 +++++++++++++++++++ .../otelhttp/internal/body_wrapper_test.go | 27 ++++++++++ .../net/http/otelhttp/transport.go | 7 +-- instrumentation/net/http/otelhttp/wrap.go | 27 ---------- 5 files changed, 90 insertions(+), 35 deletions(-) create mode 100644 instrumentation/net/http/otelhttp/internal/body_wrapper.go create mode 100644 instrumentation/net/http/otelhttp/internal/body_wrapper_test.go diff --git a/instrumentation/net/http/otelhttp/handler.go b/instrumentation/net/http/otelhttp/handler.go index c49069d1f9d..5791147baa6 100644 --- a/instrumentation/net/http/otelhttp/handler.go +++ b/instrumentation/net/http/otelhttp/handler.go @@ -9,6 +9,7 @@ import ( "github.com/felixge/httpsnoop" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil" "go.opentelemetry.io/otel" @@ -166,13 +167,13 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } } - var bw bodyWrapper + var bw internal.BodyWrapper // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. if r.Body != nil && r.Body != http.NoBody { bw.ReadCloser = r.Body - bw.record = readRecordFunc + bw.OnRead = readRecordFunc r.Body = &bw } @@ -220,8 +221,8 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http span.SetStatus(semconv.ServerStatus(rww.statusCode)) span.SetAttributes(h.traceSemconv.ResponseTraceAttrs(semconv.ResponseTelemetry{ StatusCode: rww.statusCode, - ReadBytes: bw.read.Load(), - ReadError: bw.err, + ReadBytes: bw.BytesRead(), + ReadError: bw.Error(), WriteBytes: rww.written, WriteError: rww.err, })...) @@ -233,7 +234,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } o := metric.WithAttributeSet(attribute.NewSet(attributes...)) - h.requestBytesCounter.Add(ctx, bw.read.Load(), o) + h.requestBytesCounter.Add(ctx, bw.BytesRead(), o) h.responseBytesCounter.Add(ctx, rww.written, o) // Use floating point division here for higher precision (instead of Millisecond method). diff --git a/instrumentation/net/http/otelhttp/internal/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/body_wrapper.go new file mode 100644 index 00000000000..bdb50706b58 --- /dev/null +++ b/instrumentation/net/http/otelhttp/internal/body_wrapper.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" + +import ( + "io" + "sync/atomic" +) + +var _ io.ReadCloser = &BodyWrapper{} + +// BodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number +// of bytes read and the last error. +type BodyWrapper struct { + io.ReadCloser + OnRead func(n int64) // must not be nil + + read atomic.Int64 + err atomic.Value +} + +// Read reads the data from the io.ReadCloser, and stores the number of bytes +// read and the error. +func (w *BodyWrapper) Read(b []byte) (int, error) { + n, err := w.ReadCloser.Read(b) + n1 := int64(n) + w.read.Add(n1) + if err != nil { + w.err.Store(err) + } + w.OnRead(n1) + return n, err +} + +// Closes closes the io.ReadCloser. +func (w *BodyWrapper) Close() error { + return w.ReadCloser.Close() +} + +// BytesRead returns the number of bytes read up to this point. +func (w *BodyWrapper) BytesRead() int64 { + return w.read.Load() +} + +// Error returns the last error. +func (w *BodyWrapper) Error() error { + err, ok := w.err.Load().(error) + if !ok { + return nil + } + return err +} diff --git a/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go new file mode 100644 index 00000000000..205469e75b3 --- /dev/null +++ b/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBodyWrapper(t *testing.T) { + bw := &BodyWrapper{ + ReadCloser: io.NopCloser(strings.NewReader("hello world")), + OnRead: func(int64) {}, + } + + data, err := io.ReadAll(bw) + require.NoError(t, err) + assert.Equal(t, "hello world", string(data)) + + assert.Equal(t, int64(11), bw.BytesRead()) + assert.Equal(t, io.EOF, bw.Error()) +} diff --git a/instrumentation/net/http/otelhttp/transport.go b/instrumentation/net/http/otelhttp/transport.go index 06948c11cb1..ce864d898d4 100644 --- a/instrumentation/net/http/otelhttp/transport.go +++ b/instrumentation/net/http/otelhttp/transport.go @@ -11,6 +11,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil" "go.opentelemetry.io/otel" @@ -147,14 +148,14 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request. // use a body wrapper to determine the request size - var bw bodyWrapper + var bw internal.BodyWrapper // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. if r.Body != nil && r.Body != http.NoBody { bw.ReadCloser = r.Body // noop to prevent nil panic. not using this record fun yet. - bw.record = func(int64) {} + bw.OnRead = func(int64) {} r.Body = &bw } @@ -176,7 +177,7 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { } o := metric.WithAttributeSet(attribute.NewSet(metricAttrs...)) - t.requestBytesCounter.Add(ctx, bw.read.Load(), o) + t.requestBytesCounter.Add(ctx, bw.BytesRead(), o) // For handling response bytes we leverage a callback when the client reads the http response readRecordFunc := func(n int64) { t.responseBytesCounter.Add(ctx, n, o) diff --git a/instrumentation/net/http/otelhttp/wrap.go b/instrumentation/net/http/otelhttp/wrap.go index 948f8406c09..11c39b977c5 100644 --- a/instrumentation/net/http/otelhttp/wrap.go +++ b/instrumentation/net/http/otelhttp/wrap.go @@ -5,38 +5,11 @@ package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http import ( "context" - "io" "net/http" - "sync/atomic" "go.opentelemetry.io/otel/propagation" ) -var _ io.ReadCloser = &bodyWrapper{} - -// bodyWrapper wraps a http.Request.Body (an io.ReadCloser) to track the number -// of bytes read and the last error. -type bodyWrapper struct { - io.ReadCloser - record func(n int64) // must not be nil - - read atomic.Int64 - err error -} - -func (w *bodyWrapper) Read(b []byte) (int, error) { - n, err := w.ReadCloser.Read(b) - n1 := int64(n) - w.read.Add(n1) - w.err = err - w.record(n1) - return n, err -} - -func (w *bodyWrapper) Close() error { - return w.ReadCloser.Close() -} - var _ http.ResponseWriter = &respWriterWrapper{} // respWriterWrapper wraps a http.ResponseWriter in order to track the number of From 72fe270cadd01a26bde7993f61d2f73ac7a02a45 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Tue, 16 Jul 2024 10:16:29 +0200 Subject: [PATCH 02/17] move respWriterWrapper into internal package --- instrumentation/net/http/otelhttp/handler.go | 24 ++-- .../otelhttp/internal/resp_writer_wrapper.go | 106 ++++++++++++++++++ .../resp_writer_wrapper_test.go} | 17 +-- instrumentation/net/http/otelhttp/wrap.go | 72 ------------ 4 files changed, 120 insertions(+), 99 deletions(-) create mode 100644 instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go rename instrumentation/net/http/otelhttp/{wrap_test.go => internal/resp_writer_wrapper_test.go} (73%) delete mode 100644 instrumentation/net/http/otelhttp/wrap.go diff --git a/instrumentation/net/http/otelhttp/handler.go b/instrumentation/net/http/otelhttp/handler.go index 5791147baa6..74bed3fc2f8 100644 --- a/instrumentation/net/http/otelhttp/handler.go +++ b/instrumentation/net/http/otelhttp/handler.go @@ -184,13 +184,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } } - rww := &respWriterWrapper{ - ResponseWriter: w, - record: writeRecordFunc, - ctx: ctx, - props: h.propagators, - statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything - } + rww := internal.NewRespWriterWrapper(w, writeRecordFunc) // Wrap w to use our ResponseWriter methods while also exposing // other interfaces that w may implement (http.CloseNotifier, @@ -218,24 +212,26 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http next.ServeHTTP(w, r.WithContext(ctx)) - span.SetStatus(semconv.ServerStatus(rww.statusCode)) + statusCode := rww.StatusCode() + bytesWritten := rww.BytesWritten() + span.SetStatus(semconv.ServerStatus(statusCode)) span.SetAttributes(h.traceSemconv.ResponseTraceAttrs(semconv.ResponseTelemetry{ - StatusCode: rww.statusCode, + StatusCode: statusCode, ReadBytes: bw.BytesRead(), ReadError: bw.Error(), - WriteBytes: rww.written, - WriteError: rww.err, + WriteBytes: bytesWritten, + WriteError: rww.Error(), })...) // Add metrics attributes := append(labeler.Get(), semconvutil.HTTPServerRequestMetrics(h.server, r)...) - if rww.statusCode > 0 { - attributes = append(attributes, semconv.HTTPStatusCode(rww.statusCode)) + if statusCode > 0 { + attributes = append(attributes, semconv.HTTPStatusCode(statusCode)) } o := metric.WithAttributeSet(attribute.NewSet(attributes...)) h.requestBytesCounter.Add(ctx, bw.BytesRead(), o) - h.responseBytesCounter.Add(ctx, rww.written, o) + h.responseBytesCounter.Add(ctx, bytesWritten, o) // Use floating point division here for higher precision (instead of Millisecond method). elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond) diff --git a/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go new file mode 100644 index 00000000000..e0f4f2ad223 --- /dev/null +++ b/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" + +import ( + "net/http" + "sync" +) + +var _ http.ResponseWriter = &RespWriterWrapper{} + +// RespWriterWrapper wraps a http.ResponseWriter in order to track the number of +// bytes written, the last error, and to catch the first written statusCode. +// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional +// types (http.Hijacker, http.Pusher, http.CloseNotifier, etc) +// that may be useful when using it in real life situations. +type RespWriterWrapper struct { + http.ResponseWriter + OnWrite func(n int64) // must not be nil + + mu sync.RWMutex + written int64 + statusCode int + err error + wroteHeader bool +} + +// NewRespWriterWrapper creates a new RespWriterWrapper. +func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWriterWrapper { + return &RespWriterWrapper{ + ResponseWriter: w, + OnWrite: onWrite, + statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything + } +} + +// Header returns the response writer HTTP headers. +func (w *RespWriterWrapper) Header() http.Header { + return w.ResponseWriter.Header() +} + +// Write writes the bytes array into the [ResponseWriter], and tracks the +// number of bytes written and last error. +func (w *RespWriterWrapper) Write(p []byte) (int, error) { + w.WriteHeader(http.StatusOK) + + w.mu.RLock() + defer w.mu.RUnlock() + + n, err := w.ResponseWriter.Write(p) + n1 := int64(n) + w.OnWrite(n1) + w.written += n1 + w.err = err + return n, err +} + +// WriteHeader persists initial statusCode for span attribution. +// All calls to WriteHeader will be propagated to the underlying ResponseWriter +// and will persist the statusCode from the first call. +// Blocking consecutive calls to WriteHeader alters expected behavior and will +// remove warning logs from net/http where developers will notice incorrect handler implementations. +func (w *RespWriterWrapper) WriteHeader(statusCode int) { + w.mu.Lock() + defer w.mu.Unlock() + + if !w.wroteHeader { + w.wroteHeader = true + w.statusCode = statusCode + } + w.ResponseWriter.WriteHeader(statusCode) +} + +// Flush implements [http.Flusher]. +func (w *RespWriterWrapper) Flush() { + w.WriteHeader(http.StatusOK) + + if f, ok := w.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + +// BytesWritten returns the number of bytes written. +func (w *RespWriterWrapper) BytesWritten() int64 { + w.mu.RLock() + defer w.mu.RUnlock() + + return w.written +} + +// BytesWritten returns the HTTP status code that was sent. +func (w *RespWriterWrapper) StatusCode() int { + w.mu.RLock() + defer w.mu.RUnlock() + + return w.statusCode +} + +// Error returns the last error. +func (w *RespWriterWrapper) Error() error { + w.mu.RLock() + defer w.mu.RUnlock() + + return w.err +} diff --git a/instrumentation/net/http/otelhttp/wrap_test.go b/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper_test.go similarity index 73% rename from instrumentation/net/http/otelhttp/wrap_test.go rename to instrumentation/net/http/otelhttp/internal/resp_writer_wrapper_test.go index d4b89411a29..f67613a9307 100644 --- a/instrumentation/net/http/otelhttp/wrap_test.go +++ b/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package otelhttp +package internal import ( "net/http" @@ -12,10 +12,7 @@ import ( ) func TestRespWriterWriteHeader(t *testing.T) { - rw := &respWriterWrapper{ - ResponseWriter: &httptest.ResponseRecorder{}, - record: func(int64) {}, - } + rw := NewRespWriterWrapper(&httptest.ResponseRecorder{}, func(int64) {}) rw.WriteHeader(http.StatusTeapot) assert.Equal(t, http.StatusTeapot, rw.statusCode) @@ -26,10 +23,7 @@ func TestRespWriterWriteHeader(t *testing.T) { } func TestRespWriterFlush(t *testing.T) { - rw := &respWriterWrapper{ - ResponseWriter: &httptest.ResponseRecorder{}, - record: func(int64) {}, - } + rw := NewRespWriterWrapper(&httptest.ResponseRecorder{}, func(int64) {}) rw.Flush() assert.Equal(t, http.StatusOK, rw.statusCode) @@ -49,10 +43,7 @@ func (_ nonFlushableResponseWriter) Write([]byte) (int, error) { func (_ nonFlushableResponseWriter) WriteHeader(int) {} func TestRespWriterFlushNoFlusher(t *testing.T) { - rw := &respWriterWrapper{ - ResponseWriter: nonFlushableResponseWriter{}, - record: func(int64) {}, - } + rw := NewRespWriterWrapper(nonFlushableResponseWriter{}, func(int64) {}) rw.Flush() assert.Equal(t, http.StatusOK, rw.statusCode) diff --git a/instrumentation/net/http/otelhttp/wrap.go b/instrumentation/net/http/otelhttp/wrap.go deleted file mode 100644 index 11c39b977c5..00000000000 --- a/instrumentation/net/http/otelhttp/wrap.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package otelhttp // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" - -import ( - "context" - "net/http" - - "go.opentelemetry.io/otel/propagation" -) - -var _ http.ResponseWriter = &respWriterWrapper{} - -// respWriterWrapper wraps a http.ResponseWriter in order to track the number of -// bytes written, the last error, and to catch the first written statusCode. -// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional -// types (http.Hijacker, http.Pusher, http.CloseNotifier, http.Flusher, etc) -// that may be useful when using it in real life situations. -type respWriterWrapper struct { - http.ResponseWriter - record func(n int64) // must not be nil - - // used to inject the header - ctx context.Context - - props propagation.TextMapPropagator - - written int64 - statusCode int - err error - wroteHeader bool -} - -func (w *respWriterWrapper) Header() http.Header { - return w.ResponseWriter.Header() -} - -func (w *respWriterWrapper) Write(p []byte) (int, error) { - if !w.wroteHeader { - w.WriteHeader(http.StatusOK) - } - n, err := w.ResponseWriter.Write(p) - n1 := int64(n) - w.record(n1) - w.written += n1 - w.err = err - return n, err -} - -// WriteHeader persists initial statusCode for span attribution. -// All calls to WriteHeader will be propagated to the underlying ResponseWriter -// and will persist the statusCode from the first call. -// Blocking consecutive calls to WriteHeader alters expected behavior and will -// remove warning logs from net/http where developers will notice incorrect handler implementations. -func (w *respWriterWrapper) WriteHeader(statusCode int) { - if !w.wroteHeader { - w.wroteHeader = true - w.statusCode = statusCode - } - w.ResponseWriter.WriteHeader(statusCode) -} - -func (w *respWriterWrapper) Flush() { - if !w.wroteHeader { - w.WriteHeader(http.StatusOK) - } - - if f, ok := w.ResponseWriter.(http.Flusher); ok { - f.Flush() - } -} From 8a0aa5d32dc9be46c342eaa74806d5e2c2420241 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Tue, 16 Jul 2024 10:19:57 +0200 Subject: [PATCH 03/17] add changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7a01b70bb7..5367fe563eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The deprecated `go.opentelemetry.io/contrib/processors/baggagecopy` package is removed. (#5853) +### Fixed + +- Race condition when reading the HTTP body and writing the response in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#TBD) + From 32e4f2174263c07eba4447011d7a75018c1e6456 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Tue, 16 Jul 2024 10:20:18 +0200 Subject: [PATCH 04/17] PR number --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5367fe563eb..e0cfe77e010 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,7 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed -- Race condition when reading the HTTP body and writing the response in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#TBD) +- Race condition when reading the HTTP body and writing the response in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#5916) From 077c1a67d919a67edba4874b4f2dbe2abfc7f3ee Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Jul 2024 09:20:28 +0200 Subject: [PATCH 05/17] add NewBodyWrapper method --- instrumentation/net/http/otelhttp/handler.go | 6 ++---- .../net/http/otelhttp/internal/body_wrapper.go | 8 ++++++++ .../net/http/otelhttp/internal/body_wrapper_test.go | 5 +---- instrumentation/net/http/otelhttp/transport.go | 8 ++------ 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/instrumentation/net/http/otelhttp/handler.go b/instrumentation/net/http/otelhttp/handler.go index 74bed3fc2f8..187bb7094b4 100644 --- a/instrumentation/net/http/otelhttp/handler.go +++ b/instrumentation/net/http/otelhttp/handler.go @@ -167,14 +167,12 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } } - var bw internal.BodyWrapper // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. + bw := internal.NewBodyWrapper(r.Body, readRecordFunc) if r.Body != nil && r.Body != http.NoBody { - bw.ReadCloser = r.Body - bw.OnRead = readRecordFunc - r.Body = &bw + r.Body = bw } writeRecordFunc := func(int64) {} diff --git a/instrumentation/net/http/otelhttp/internal/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/body_wrapper.go index bdb50706b58..8d6cbcd32b0 100644 --- a/instrumentation/net/http/otelhttp/internal/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/body_wrapper.go @@ -20,6 +20,14 @@ type BodyWrapper struct { err atomic.Value } +// NewBodyWrapper creates a new BodyWrapper. +func NewBodyWrapper(rc io.ReadCloser, onRead func(int64)) *BodyWrapper { + return &BodyWrapper{ + ReadCloser: rc, + OnRead: onRead, + } +} + // Read reads the data from the io.ReadCloser, and stores the number of bytes // read and the error. func (w *BodyWrapper) Read(b []byte) (int, error) { diff --git a/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go index 205469e75b3..45fb9823317 100644 --- a/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go @@ -13,10 +13,7 @@ import ( ) func TestBodyWrapper(t *testing.T) { - bw := &BodyWrapper{ - ReadCloser: io.NopCloser(strings.NewReader("hello world")), - OnRead: func(int64) {}, - } + bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) data, err := io.ReadAll(bw) require.NoError(t, err) diff --git a/instrumentation/net/http/otelhttp/transport.go b/instrumentation/net/http/otelhttp/transport.go index ce864d898d4..06049e96ae9 100644 --- a/instrumentation/net/http/otelhttp/transport.go +++ b/instrumentation/net/http/otelhttp/transport.go @@ -147,16 +147,12 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request. - // use a body wrapper to determine the request size - var bw internal.BodyWrapper // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. + bw := internal.NewBodyWrapper(r.Body, func(int64) {}) if r.Body != nil && r.Body != http.NoBody { - bw.ReadCloser = r.Body - // noop to prevent nil panic. not using this record fun yet. - bw.OnRead = func(int64) {} - r.Body = &bw + r.Body = bw } span.SetAttributes(t.semconv.RequestTraceAttrs(r)...) From c3bfee3db96bdbb0174c87a35016a8002344ad98 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Jul 2024 09:22:15 +0200 Subject: [PATCH 06/17] move wrappers into an internal request package --- instrumentation/net/http/otelhttp/handler.go | 6 +++--- .../http/otelhttp/internal/{ => request}/body_wrapper.go | 2 +- .../otelhttp/internal/{ => request}/body_wrapper_test.go | 2 +- .../otelhttp/internal/{ => request}/resp_writer_wrapper.go | 2 +- .../internal/{ => request}/resp_writer_wrapper_test.go | 2 +- instrumentation/net/http/otelhttp/transport.go | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) rename instrumentation/net/http/otelhttp/internal/{ => request}/body_wrapper.go (92%) rename instrumentation/net/http/otelhttp/internal/{ => request}/body_wrapper_test.go (96%) rename instrumentation/net/http/otelhttp/internal/{ => request}/resp_writer_wrapper.go (96%) rename instrumentation/net/http/otelhttp/internal/{ => request}/resp_writer_wrapper_test.go (98%) diff --git a/instrumentation/net/http/otelhttp/handler.go b/instrumentation/net/http/otelhttp/handler.go index 187bb7094b4..5169ebf02eb 100644 --- a/instrumentation/net/http/otelhttp/handler.go +++ b/instrumentation/net/http/otelhttp/handler.go @@ -9,7 +9,7 @@ import ( "github.com/felixge/httpsnoop" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil" "go.opentelemetry.io/otel" @@ -170,7 +170,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. - bw := internal.NewBodyWrapper(r.Body, readRecordFunc) + bw := request.NewBodyWrapper(r.Body, readRecordFunc) if r.Body != nil && r.Body != http.NoBody { r.Body = bw } @@ -182,7 +182,7 @@ func (h *middleware) serveHTTP(w http.ResponseWriter, r *http.Request, next http } } - rww := internal.NewRespWriterWrapper(w, writeRecordFunc) + rww := request.NewRespWriterWrapper(w, writeRecordFunc) // Wrap w to use our ResponseWriter methods while also exposing // other interfaces that w may implement (http.CloseNotifier, diff --git a/instrumentation/net/http/otelhttp/internal/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go similarity index 92% rename from instrumentation/net/http/otelhttp/internal/body_wrapper.go rename to instrumentation/net/http/otelhttp/internal/request/body_wrapper.go index 8d6cbcd32b0..98890bac5cf 100644 --- a/instrumentation/net/http/otelhttp/internal/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" +package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" import ( "io" diff --git a/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go similarity index 96% rename from instrumentation/net/http/otelhttp/internal/body_wrapper_test.go rename to instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go index 45fb9823317..d5dbcd52418 100644 --- a/instrumentation/net/http/otelhttp/internal/body_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package request import ( "io" diff --git a/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go similarity index 96% rename from instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go rename to instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go index e0f4f2ad223..ce6d7bbe2fa 100644 --- a/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" +package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" import ( "net/http" diff --git a/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go similarity index 98% rename from instrumentation/net/http/otelhttp/internal/resp_writer_wrapper_test.go rename to instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go index f67613a9307..c75dfda754e 100644 --- a/instrumentation/net/http/otelhttp/internal/resp_writer_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal +package request import ( "net/http" diff --git a/instrumentation/net/http/otelhttp/transport.go b/instrumentation/net/http/otelhttp/transport.go index 06049e96ae9..80bcfee4f04 100644 --- a/instrumentation/net/http/otelhttp/transport.go +++ b/instrumentation/net/http/otelhttp/transport.go @@ -11,7 +11,7 @@ import ( "sync/atomic" "time" - "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/request" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconv" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil" "go.opentelemetry.io/otel" @@ -150,7 +150,7 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) { // if request body is nil or NoBody, we don't want to mutate the body as it // will affect the identity of it in an unforeseeable way because we assert // ReadCloser fulfills a certain interface and it is indeed nil or NoBody. - bw := internal.NewBodyWrapper(r.Body, func(int64) {}) + bw := request.NewBodyWrapper(r.Body, func(int64) {}) if r.Body != nil && r.Body != http.NoBody { r.Body = bw } From 267a87520e71c9250bc4f4c05f68bbcd156faf09 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Jul 2024 09:38:29 +0200 Subject: [PATCH 07/17] replace atomic types in BodyWrapper with a mutex atomic.Value requests the exact same type to be set at all times, which we can't guarantee with errors. --- .../otelhttp/internal/request/body_wrapper.go | 37 +++++++++------ .../internal/request/body_wrapper_test.go | 45 +++++++++++++++++++ 2 files changed, 69 insertions(+), 13 deletions(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go index 98890bac5cf..d7eba5300d1 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -5,7 +5,7 @@ package request // import "go.opentelemetry.io/contrib/instrumentation/net/http/ import ( "io" - "sync/atomic" + "sync" ) var _ io.ReadCloser = &BodyWrapper{} @@ -16,8 +16,9 @@ type BodyWrapper struct { io.ReadCloser OnRead func(n int64) // must not be nil - read atomic.Int64 - err atomic.Value + mu sync.Mutex + read int64 + err error } // NewBodyWrapper creates a new BodyWrapper. @@ -33,14 +34,22 @@ func NewBodyWrapper(rc io.ReadCloser, onRead func(int64)) *BodyWrapper { func (w *BodyWrapper) Read(b []byte) (int, error) { n, err := w.ReadCloser.Read(b) n1 := int64(n) - w.read.Add(n1) - if err != nil { - w.err.Store(err) - } + + w.updateReadData(n1, err) w.OnRead(n1) return n, err } +func (w *BodyWrapper) updateReadData(n int64, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + w.read = w.read + n + if err != nil { + w.err = err + } +} + // Closes closes the io.ReadCloser. func (w *BodyWrapper) Close() error { return w.ReadCloser.Close() @@ -48,14 +57,16 @@ func (w *BodyWrapper) Close() error { // BytesRead returns the number of bytes read up to this point. func (w *BodyWrapper) BytesRead() int64 { - return w.read.Load() + w.mu.Lock() + defer w.mu.Unlock() + + return w.read } // Error returns the last error. func (w *BodyWrapper) Error() error { - err, ok := w.err.Load().(error) - if !ok { - return nil - } - return err + w.mu.Lock() + defer w.mu.Unlock() + + return w.err } diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go index d5dbcd52418..58fa0fce49a 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go @@ -4,6 +4,7 @@ package request import ( + "errors" "io" "strings" "testing" @@ -22,3 +23,47 @@ func TestBodyWrapper(t *testing.T) { assert.Equal(t, int64(11), bw.BytesRead()) assert.Equal(t, io.EOF, bw.Error()) } + +type multipleErrorsReader struct { + calls int +} + +type errorWrapper struct{} + +func (errorWrapper) Error() string { + return "subsequent calls" +} + +func (mer *multipleErrorsReader) Read([]byte) (int, error) { + mer.calls = mer.calls + 1 + if mer.calls == 1 { + return 0, errors.New("first call") + } + + return 0, errorWrapper{} +} + +func TestBodyWrapperWithErrors(t *testing.T) { + bw := NewBodyWrapper(io.NopCloser(&multipleErrorsReader{}), func(int64) {}) + + data, err := io.ReadAll(bw) + require.Equal(t, errors.New("first call"), err) + assert.Equal(t, "", string(data)) + require.Equal(t, errors.New("first call"), bw.Error()) + + data, err = io.ReadAll(bw) + require.Equal(t, errorWrapper{}, err) + assert.Equal(t, "", string(data)) + require.Equal(t, errorWrapper{}, bw.Error()) +} + +func TestConcurrentBodyWrapper(t *testing.T) { + bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) + + go func() { + _, _ = io.ReadAll(bw) + }() + + assert.NotNil(t, bw.BytesRead()) + assert.NoError(t, bw.Error()) +} From cf2dba728ab014fd5a80aa0c97b2a246d2eed928 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Thu, 18 Jul 2024 09:41:14 +0200 Subject: [PATCH 08/17] add race detection test to resp writer test --- .../internal/request/resp_writer_wrapper_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go index c75dfda754e..21229b4dc69 100644 --- a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper_test.go @@ -49,3 +49,15 @@ func TestRespWriterFlushNoFlusher(t *testing.T) { assert.Equal(t, http.StatusOK, rw.statusCode) assert.True(t, rw.wroteHeader) } + +func TestConcurrentRespWriterWrapper(t *testing.T) { + rw := NewRespWriterWrapper(&httptest.ResponseRecorder{}, func(int64) {}) + + go func() { + _, _ = rw.Write([]byte("hello world")) + }() + + assert.NotNil(t, rw.BytesWritten()) + assert.NotNil(t, rw.StatusCode()) + assert.NoError(t, rw.Error()) +} From c747d41c6c6e55c5b3114044f35532fe88cf74af Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Tue, 23 Jul 2024 18:33:59 +0200 Subject: [PATCH 09/17] Update instrumentation/net/http/otelhttp/internal/request/body_wrapper.go Co-authored-by: Tyler Yahn --- .../net/http/otelhttp/internal/request/body_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go index d7eba5300d1..2f8d9c595d8 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -22,7 +22,7 @@ type BodyWrapper struct { } // NewBodyWrapper creates a new BodyWrapper. -func NewBodyWrapper(rc io.ReadCloser, onRead func(int64)) *BodyWrapper { +func NewBodyWrapper(body io.ReadCloser, onRead func(int64)) *BodyWrapper { return &BodyWrapper{ ReadCloser: rc, OnRead: onRead, From 549707bf293ebed510079cf3d3907bcc10038be2 Mon Sep 17 00:00:00 2001 From: Damien Mathieu <42@dmathieu.com> Date: Tue, 23 Jul 2024 18:34:14 +0200 Subject: [PATCH 10/17] Update instrumentation/net/http/otelhttp/internal/request/body_wrapper.go Co-authored-by: Tyler Yahn --- .../net/http/otelhttp/internal/request/body_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go index 2f8d9c595d8..5041faa5e7e 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -44,7 +44,7 @@ func (w *BodyWrapper) updateReadData(n int64, err error) { w.mu.Lock() defer w.mu.Unlock() - w.read = w.read + n + w.read += n if err != nil { w.err = err } From 9d67079bf8c53fa070affeae6709ea5135765afe Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 24 Jul 2024 09:22:30 +0200 Subject: [PATCH 11/17] fix var name --- .../net/http/otelhttp/internal/request/body_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go index 5041faa5e7e..8b4cfae1360 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -24,7 +24,7 @@ type BodyWrapper struct { // NewBodyWrapper creates a new BodyWrapper. func NewBodyWrapper(body io.ReadCloser, onRead func(int64)) *BodyWrapper { return &BodyWrapper{ - ReadCloser: rc, + ReadCloser: body, OnRead: onRead, } } From 6ea0cf459f972b6b6f0bd174c9e2a7cfb95da1b1 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 24 Jul 2024 09:24:08 +0200 Subject: [PATCH 12/17] use a global var --- .../http/otelhttp/internal/request/body_wrapper_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go index 58fa0fce49a..c558543097b 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" ) +var errFirstCall = errors.New("first call") + func TestBodyWrapper(t *testing.T) { bw := NewBodyWrapper(io.NopCloser(strings.NewReader("hello world")), func(int64) {}) @@ -37,7 +39,7 @@ func (errorWrapper) Error() string { func (mer *multipleErrorsReader) Read([]byte) (int, error) { mer.calls = mer.calls + 1 if mer.calls == 1 { - return 0, errors.New("first call") + return 0, errFirstCall } return 0, errorWrapper{} @@ -47,9 +49,9 @@ func TestBodyWrapperWithErrors(t *testing.T) { bw := NewBodyWrapper(io.NopCloser(&multipleErrorsReader{}), func(int64) {}) data, err := io.ReadAll(bw) - require.Equal(t, errors.New("first call"), err) + require.Equal(t, errFirstCall, err) assert.Equal(t, "", string(data)) - require.Equal(t, errors.New("first call"), bw.Error()) + require.Equal(t, errFirstCall, bw.Error()) data, err = io.ReadAll(bw) require.Equal(t, errorWrapper{}, err) From 300e96930d91422ec1f7d013cf4dea1e8a70197c Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 24 Jul 2024 09:24:45 +0200 Subject: [PATCH 13/17] rely on underlying object directly --- .../http/otelhttp/internal/request/resp_writer_wrapper.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go index ce6d7bbe2fa..1425c9b7b64 100644 --- a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go @@ -35,11 +35,6 @@ func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWrite } } -// Header returns the response writer HTTP headers. -func (w *RespWriterWrapper) Header() http.Header { - return w.ResponseWriter.Header() -} - // Write writes the bytes array into the [ResponseWriter], and tracks the // number of bytes written and last error. func (w *RespWriterWrapper) Write(p []byte) (int, error) { From e197b1de7dce908172d8e87385dbdb6024cc773d Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 24 Jul 2024 09:27:28 +0200 Subject: [PATCH 14/17] document the callbacks --- .../net/http/otelhttp/internal/request/body_wrapper.go | 3 +++ .../net/http/otelhttp/internal/request/resp_writer_wrapper.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go index 8b4cfae1360..a945f556616 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper.go @@ -22,6 +22,9 @@ type BodyWrapper struct { } // NewBodyWrapper creates a new BodyWrapper. +// +// The onRead attribute is a callback that will be called every time the data +// is read, with the number of bytes being read. func NewBodyWrapper(body io.ReadCloser, onRead func(int64)) *BodyWrapper { return &BodyWrapper{ ReadCloser: body, diff --git a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go index 1425c9b7b64..5ddc20954dd 100644 --- a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go @@ -27,6 +27,9 @@ type RespWriterWrapper struct { } // NewRespWriterWrapper creates a new RespWriterWrapper. +// +// The onWrite attribute is a callback that will be called every time the data +// is written, with the number of bytes that were written. func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWriterWrapper { return &RespWriterWrapper{ ResponseWriter: w, From 6946ddd22a373fb33c1f78392242294af5bc4fc4 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 24 Jul 2024 09:30:15 +0200 Subject: [PATCH 15/17] acquire the lock only once when writing --- .../otelhttp/internal/request/resp_writer_wrapper.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go index 5ddc20954dd..ca5c5e8c778 100644 --- a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go @@ -41,11 +41,11 @@ func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWrite // Write writes the bytes array into the [ResponseWriter], and tracks the // number of bytes written and last error. func (w *RespWriterWrapper) Write(p []byte) (int, error) { - w.WriteHeader(http.StatusOK) - w.mu.RLock() defer w.mu.RUnlock() + w.writeHeader(http.StatusOK) + n, err := w.ResponseWriter.Write(p) n1 := int64(n) w.OnWrite(n1) @@ -63,6 +63,14 @@ func (w *RespWriterWrapper) WriteHeader(statusCode int) { w.mu.Lock() defer w.mu.Unlock() + w.writeHeader(statusCode) +} + +// writeHeader persists the status code for span attribution, and propagates +// the call to the underlying ResponseWriter. +// It does not acquire a lock, and therefore assumes that is being handled by a +// parent method. +func (w *RespWriterWrapper) writeHeader(statusCode int) { if !w.wroteHeader { w.wroteHeader = true w.statusCode = statusCode From 01a1bb1d37ba5e5fedfee26c9a3d6a5111469d09 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Wed, 24 Jul 2024 09:42:00 +0200 Subject: [PATCH 16/17] Write needs a write lock --- .../net/http/otelhttp/internal/request/resp_writer_wrapper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go index ca5c5e8c778..aea171fb260 100644 --- a/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go +++ b/instrumentation/net/http/otelhttp/internal/request/resp_writer_wrapper.go @@ -41,8 +41,8 @@ func NewRespWriterWrapper(w http.ResponseWriter, onWrite func(int64)) *RespWrite // Write writes the bytes array into the [ResponseWriter], and tracks the // number of bytes written and last error. func (w *RespWriterWrapper) Write(p []byte) (int, error) { - w.mu.RLock() - defer w.mu.RUnlock() + w.mu.Lock() + defer w.mu.Unlock() w.writeHeader(http.StatusOK) From 28a972b5438073b578f373c48269d59c8007bf25 Mon Sep 17 00:00:00 2001 From: dmathieu <42@dmathieu.com> Date: Tue, 30 Jul 2024 15:46:08 +0200 Subject: [PATCH 17/17] wait until the reader has finished --- .../net/http/otelhttp/internal/request/body_wrapper_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go index c558543097b..794e54fb9c8 100644 --- a/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go +++ b/instrumentation/net/http/otelhttp/internal/request/body_wrapper_test.go @@ -8,6 +8,7 @@ import ( "io" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -67,5 +68,7 @@ func TestConcurrentBodyWrapper(t *testing.T) { }() assert.NotNil(t, bw.BytesRead()) - assert.NoError(t, bw.Error()) + assert.Eventually(t, func() bool { + return errors.Is(bw.Error(), io.EOF) + }, time.Second, 10*time.Millisecond) }