Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: Report unmatched requests or responses #6794

Merged
merged 4 commits into from
Apr 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add support for condition on bool type {issue}5659[5659] {pull}5954[5954]
- Fix high memory usage on HTTP body if body is not published. {pull}6680[6680]
- Allow to capture the HTTP request or response bodies independently. {pull}6784[6784]
- HTTP publishes an Error event for unmatched requests or responses. {pull}6794[6794]

*Winlogbeat*

Expand Down
180 changes: 121 additions & 59 deletions packetbeat/protos/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
)

var debugf = logp.MakeDebug("http")
Expand All @@ -33,6 +32,7 @@ const (

var (
unmatchedResponses = monitoring.NewInt(nil, "http.unmatched_responses")
unmatchedRequests = monitoring.NewInt(nil, "http.unmatched_requests")
)

type stream struct {
Expand Down Expand Up @@ -415,14 +415,31 @@ func (http *httpPlugin) handleHTTP(
}
}

func (http *httpPlugin) flushResponses(conn *httpConnectionData) {
for !conn.responses.empty() {
unmatchedResponses.Add(1)
resp := conn.responses.pop()
debugf("Response from unknown transaction: %s. Reporting error.", resp.tcpTuple)
event := http.newTransaction(nil, resp)
http.publishTransaction(event)
}
}

func (http *httpPlugin) flushRequests(conn *httpConnectionData) {
for !conn.requests.empty() {
unmatchedRequests.Add(1)
requ := conn.requests.pop()
debugf("Request from unknown transaction %s. Reporting error.", requ.tcpTuple)
event := http.newTransaction(requ, nil)
http.publishTransaction(event)
}
}

func (http *httpPlugin) correlate(conn *httpConnectionData) {

// drop responses with missing requests
if conn.requests.empty() {
for !conn.responses.empty() {
debugf("Response from unknown transaction. Ignoring.")
unmatchedResponses.Add(1)
conn.responses.pop()
}
http.flushResponses(conn)
return
}

Expand All @@ -441,74 +458,92 @@ func (http *httpPlugin) correlate(conn *httpConnectionData) {

func (http *httpPlugin) newTransaction(requ, resp *message) beat.Event {
status := common.OK_STATUS
if resp.statusCode >= 400 {
if resp == nil {
status = common.ERROR_STATUS
if requ != nil {
requ.notes = append(requ.notes, "Unmatched request")
}
} else if resp.statusCode >= 400 {
status = common.ERROR_STATUS
}

// resp_time in milliseconds
responseTime := int32(resp.ts.Sub(requ.ts).Nanoseconds() / 1e6)

path, params, err := http.extractParameters(requ)
if err != nil {
logp.Warn("Fail to parse HTTP parameters: %v", err)
if requ == nil {
status = common.ERROR_STATUS
if resp != nil {
resp.notes = append(resp.notes, "Unmatched response")
}
}

src := common.Endpoint{
IP: requ.tcpTuple.SrcIP.String(),
Port: requ.tcpTuple.SrcPort,
Proc: string(requ.cmdlineTuple.Src),
}
dst := common.Endpoint{
IP: requ.tcpTuple.DstIP.String(),
Port: requ.tcpTuple.DstPort,
Proc: string(requ.cmdlineTuple.Dst),
}
if requ.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
httpDetails := common.MapStr{}
fields := common.MapStr{
"type": "http",
"status": status,
"http": httpDetails,
}

httpDetails := common.MapStr{
"request": common.MapStr{
var timestamp time.Time

if requ != nil {
path, params, err := http.extractParameters(requ)
if err != nil {
logp.Warn("Fail to parse HTTP parameters: %v", err)
}
httpDetails["request"] = common.MapStr{
"params": params,
"headers": http.collectHeaders(requ),
},
"response": common.MapStr{
}
fields["method"] = requ.method
fields["path"] = path
fields["query"] = fmt.Sprintf("%s %s", requ.method, path)
fields["bytes_in"] = requ.size

fields["src"], fields["dst"] = requ.getEndpoints()

http.setBody(httpDetails["request"].(common.MapStr), requ)

timestamp = requ.ts

if len(requ.notes) > 0 {
fields["notes"] = requ.notes
}

if len(requ.realIP) > 0 {
fields["real_ip"] = requ.realIP
}

if http.sendRequest {
fields["request"] = string(http.makeRawMessage(requ))
}
}

if resp != nil {
httpDetails["response"] = common.MapStr{
"code": resp.statusCode,
"phrase": resp.statusPhrase,
"headers": http.collectHeaders(resp),
},
}

http.setBody(httpDetails["request"].(common.MapStr), requ)
http.setBody(httpDetails["response"].(common.MapStr), resp)
}
http.setBody(httpDetails["response"].(common.MapStr), resp)
fields["bytes_out"] = resp.size

timestamp := requ.ts
fields := common.MapStr{
"type": "http",
"status": status,
"responsetime": responseTime,
"method": requ.method,
"path": path,
"query": fmt.Sprintf("%s %s", requ.method, path),
"http": httpDetails,
"bytes_out": resp.size,
"bytes_in": requ.size,
"src": &src,
"dst": &dst,
}
if http.sendResponse {
fields["response"] = string(http.makeRawMessage(resp))
}

if http.sendRequest {
fields["request"] = http.makeRawMessage(requ)
}
if http.sendResponse {
fields["response"] = http.makeRawMessage(resp)
if len(resp.notes) > 0 {
if fields["notes"] != nil {
fields["notes"] = append(fields["notes"].([]string), resp.notes...)
} else {
fields["notes"] = resp.notes
}
}
if requ == nil {
timestamp = resp.ts
fields["src"], fields["dst"] = resp.getEndpoints()
}
}

if len(requ.notes)+len(resp.notes) > 0 {
fields["notes"] = append(requ.notes, resp.notes...)
}
if len(requ.realIP) > 0 {
fields["real_ip"] = requ.realIP
// resp_time in milliseconds
if requ != nil && resp != nil {
fields["responsetime"] = int32(resp.ts.Sub(requ.ts).Nanoseconds() / 1e6)
}

return beat.Event{
Expand Down Expand Up @@ -706,6 +741,33 @@ func (http *httpPlugin) isSecretParameter(key string) bool {
return false
}

func (http *httpPlugin) Expired(tuple *common.TCPTuple, private protos.ProtocolData) {
conn := getHTTPConnection(private)
if conn == nil {
return
}
if isDebug {
debugf("expired connection %s", tuple)
}
// terminate streams
for dir, s := range conn.streams {
// Do not send incomplete or empty messages
if s != nil && s.message != nil && s.message.headersReceived() {
if isDebug {
debugf("got message %+v", s.message)
}
http.handleHTTP(conn, s.message, tuple, uint8(dir))
s.PrepareForNewMessage()
}
}
// correlate transactions
http.correlate(conn)

// flush uncorrelated requests and responses
http.flushRequests(conn)
http.flushResponses(conn)
}

func (ml *messageList) append(msg *message) {
if ml.tail == nil {
ml.head = msg
Expand Down
22 changes: 22 additions & 0 deletions packetbeat/protos/http/http_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/streambuf"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/tcp"
)

// Http Message
Expand Down Expand Up @@ -552,6 +553,27 @@ func (parser *parser) shouldIncludeInBody(contenttype []byte, capturedContentTyp
return false
}

func (m *message) headersReceived() bool {
return m.headerOffset > 0
}

func (m *message) getEndpoints() (src *common.Endpoint, dst *common.Endpoint) {
src = &common.Endpoint{
IP: m.tcpTuple.SrcIP.String(),
Port: m.tcpTuple.SrcPort,
Proc: string(m.cmdlineTuple.Src),
}
dst = &common.Endpoint{
IP: m.tcpTuple.DstIP.String(),
Port: m.tcpTuple.DstPort,
Proc: string(m.cmdlineTuple.Dst),
}
if m.direction == tcp.TCPDirectionReverse {
src, dst = dst, src
}
return src, dst
}

func isVersion(v version, major, minor uint8) bool {
return v.major == major && v.minor == minor
}
Expand Down
9 changes: 9 additions & 0 deletions packetbeat/protos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ type UDPPlugin interface {
ParseUDP(pkt *Packet)
}

// ExpirationAwareTCPPlugin is a TCPPlugin that also provides the Expired()
// method. No need to use this type directly, just implement the method.
type ExpirationAwareTCPPlugin interface {
TCPPlugin

// Expired is called when the TCP stream is expired due to connection timeout.
Expired(tuple *common.TCPTuple, private ProtocolData)
}

// Protocol identifier.
type Protocol uint16

Expand Down
67 changes: 60 additions & 7 deletions packetbeat/protos/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tcp

import (
"fmt"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -22,10 +23,21 @@ const (
)

type TCP struct {
id uint32
streams *common.Cache
portMap map[uint16]protos.Protocol
protocols protos.Protocols
id uint32
streams *common.Cache
portMap map[uint16]protos.Protocol
protocols protos.Protocols
expiredConns expirationQueue
}

type expiredConnection struct {
mod protos.ExpirationAwareTCPPlugin
conn *TCPConnection
}

type expirationQueue struct {
mutex sync.Mutex
conns []expiredConnection
}

type Processor interface {
Expand Down Expand Up @@ -132,6 +144,8 @@ func (tcp *TCP) Process(id *flows.FlowID, tcphdr *layers.TCP, pkt *protos.Packet
// protocol modules.
defer logp.Recover("Process tcp exception")

tcp.expiredConns.notifyAll()

stream, created := tcp.getStream(pkt)
if stream.conn == nil {
return
Expand Down Expand Up @@ -298,14 +312,53 @@ func NewTCP(p protos.Protocols) (*TCP, error) {
tcp := &TCP{
protocols: p,
portMap: portMap,
streams: common.NewCache(
protos.DefaultTransactionExpiration,
protos.DefaultTransactionHashSize),
}
tcp.streams = common.NewCacheWithRemovalListener(
protos.DefaultTransactionExpiration,
protos.DefaultTransactionHashSize,
tcp.removalListener)

tcp.streams.StartJanitor(protos.DefaultTransactionExpiration)
if isDebug {
debugf("tcp", "Port map: %v", portMap)
}

return tcp, nil
}

func (tcp *TCP) removalListener(_ common.Key, value common.Value) {
conn := value.(*TCPConnection)
mod := conn.tcp.protocols.GetTCP(conn.protocol)
if mod != nil {
awareMod, ok := mod.(protos.ExpirationAwareTCPPlugin)
if ok {
tcp.expiredConns.add(awareMod, conn)
}
}
}

func (ec *expiredConnection) notify() {
ec.mod.Expired(&ec.conn.tcptuple, ec.conn.data)
}

func (eq *expirationQueue) add(mod protos.ExpirationAwareTCPPlugin, conn *TCPConnection) {
eq.mutex.Lock()
eq.conns = append(eq.conns, expiredConnection{
mod: mod,
conn: conn,
})
eq.mutex.Unlock()
}

func (eq *expirationQueue) getExpired() (conns []expiredConnection) {
eq.mutex.Lock()
conns, eq.conns = eq.conns, nil
eq.mutex.Unlock()
return conns
}

func (eq *expirationQueue) notifyAll() {
for _, expiration := range eq.getExpired() {
expiration.notify()
}
}
Loading