Skip to content

Commit

Permalink
feature: logBody() filter to log request or response body while strea…
Browse files Browse the repository at this point in the history
…ming (#2827)

doc: add logBody()
test: add 100% coverage

Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs authored Jan 9, 2024
1 parent 26fa294 commit 37a7fac
Show file tree
Hide file tree
Showing 6 changed files with 750 additions and 2 deletions.
28 changes: 27 additions & 1 deletion docs/reference/filters.md
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ The histogramResponseLatency adds latency to responses according to the histogra

### logHeader

The logHeader filter prints the request line and the header, but not the body, to
The `logHeader` filter prints the request line and the header, but not the body, to
stderr. Note that this filter should be used only in diagnostics setup and with care,
since the request headers may contain sensitive data, and they also can explode the
amount of logs. Authorization headers will be truncated in request and
Expand All @@ -730,6 +730,32 @@ Example:
* -> logHeader("request", "response") -> "https://www.example.org";
```

### logBody

The `logBody` filter logs the request or response body in chunks while
streaming. Chunks start with `logBody("request") $flowid: ` or
`logBody("response") $flowid: `, such that you can find all chunks
belonging to a given flow. See also [flowId()](#flowid) filter.

Note that this filter should be used only in diagnostics setup and
with care, since the request and response body may contain sensitive
data. Logs can also explode in the amount of bytes, so you have to
choose a limit. You can log request or response bodies. This filter
has close to no overhead other than the I/O created by the logger.

Parameters:

* type: "request" or "response" (string)
* limit: maximum number of bytes to log (int)

Example:

```
* -> logBody("request", 1024) -> "https://www.example.org";
* -> logBody("response", 1024) -> "https://www.example.org";
* -> logBody("request", 1024) -> logBody("response", 1024) -> "https://www.example.org";
```

## Timeout

### backendTimeout
Expand Down
1 change: 1 addition & 0 deletions filters/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func Filters() []filters.Spec {
diag.NewAbsorb(),
diag.NewAbsorbSilent(),
diag.NewLogHeader(),
diag.NewLogBody(),
diag.NewUniformRequestLatency(),
diag.NewUniformResponseLatency(),
diag.NewNormalRequestLatency(),
Expand Down
130 changes: 130 additions & 0 deletions filters/diag/logbody.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package diag

import (
"fmt"
"io"

"github.com/zalando/skipper/filters"
"github.com/zalando/skipper/filters/flowid"
)

type logBody struct {
limit int
request bool
response bool
}

// NewLogBody creates a filter specification for the 'logBody()' filter.
func NewLogBody() filters.Spec { return logBody{} }

// Name returns the logBody filtern name.
func (logBody) Name() string {
return filters.LogBodyName
}

func (logBody) CreateFilter(args []interface{}) (filters.Filter, error) {
var (
request = false
response = false
)

if len(args) != 2 {
return nil, filters.ErrInvalidFilterParameters
}

opt, ok := args[0].(string)
if !ok {
return nil, filters.ErrInvalidFilterParameters
}
switch opt {
case "response":
response = true
case "request":
request = true
default:
return nil, fmt.Errorf("failed to match %q: %w", opt, filters.ErrInvalidFilterParameters)
}

limit, ok := args[1].(float64)
if !ok || float64(int(limit)) != limit {
return nil, fmt.Errorf("failed to convert to int: %w", filters.ErrInvalidFilterParameters)
}

return &logBody{
limit: int(limit),
request: request,
response: response,
}, nil
}

func (lb *logBody) Request(ctx filters.FilterContext) {
if !lb.request {
return
}

req := ctx.Request()
if req.Body != nil {
req.Body = newLogBodyStream(
lb.limit,
func(chunk []byte) {
ctx.Logger().Infof(
`logBody("request") %s: %s`,
req.Header.Get(flowid.HeaderName),
chunk)
},
req.Body,
)
}
}

func (lb *logBody) Response(ctx filters.FilterContext) {
if !lb.response {
return
}

rsp := ctx.Response()
if rsp.Body != nil {
rsp.Body = newLogBodyStream(
lb.limit,
func(chunk []byte) {
ctx.Logger().Infof(
`logBody("response") %s: %s`,
ctx.Request().Header.Get(flowid.HeaderName),
chunk)
},
rsp.Body,
)
}
}

type logBodyStream struct {
left int
f func([]byte)
input io.ReadCloser
}

func newLogBodyStream(left int, f func([]byte), rc io.ReadCloser) io.ReadCloser {
return &logBodyStream{
left: left,
f: f,
input: rc,
}
}

func (lb *logBodyStream) Read(p []byte) (n int, err error) {
if lb.left <= 0 {
return lb.input.Read(p)
}

n, err = lb.input.Read(p)
if n > 0 {
lb.f(p[:min(n, lb.left)])
}
lb.left -= n

return n, err
}

func (lb *logBodyStream) Close() error {
return lb.input.Close()
}
Loading

0 comments on commit 37a7fac

Please sign in to comment.