Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support protocol: Cassandra #1959

Merged
merged 30 commits into from
Aug 17, 2016
Merged
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1cfa948
Add protocol: Cassandra
medcl Jul 5, 2016
323b1bb
Fix gofmt style
medcl Jul 5, 2016
ecc5e37
Remove recursive call in errType
medcl Jul 5, 2016
11a8d23
remove duplicated snappy in glide.yaml
medcl Jul 18, 2016
589ae11
update beats config
medcl Jul 18, 2016
26ddaca
fix logging bug
medcl Jul 18, 2016
e3d0122
refactor and try to direct use streambuf.Buffer
medcl Jul 20, 2016
7616dfd
update config comments
medcl Jul 20, 2016
79cac0e
refactor cassandra decoder
medcl Aug 3, 2016
864b1f5
bugfix and refactor
medcl Aug 8, 2016
34ca244
cleanup comments
medcl Aug 8, 2016
beef417
bugfix
medcl Aug 9, 2016
6c4003c
prefer to use debugf, instead of logp.Debug()
medcl Aug 10, 2016
448fd48
remove unused import
medcl Aug 10, 2016
81ad366
minor improve to parser
medcl Aug 10, 2016
34ad18f
refactor framops ignoring, add test methond
medcl Aug 10, 2016
0297c36
fix build
medcl Aug 12, 2016
d0b915c
fix array_decoder
medcl Aug 12, 2016
ff7ed3d
refactor and fix tracing protocol bug
medcl Aug 12, 2016
665de42
minor improvments, add trace test
medcl Aug 15, 2016
9c6547f
minor change to pub.go
medcl Aug 15, 2016
0739c75
throw error while compressor is not set but hit compress flag
medcl Aug 16, 2016
e54663f
add test for compressed frame
medcl Aug 16, 2016
0ce69fa
add docs
medcl Aug 16, 2016
d1c467f
add ignore ops list
medcl Aug 16, 2016
2117cb6
update section in alphabetical order
medcl Aug 16, 2016
615ce3a
add sample output and kibana dashboard
medcl Aug 16, 2016
2675b38
update kibana dashboard
medcl Aug 17, 2016
ccce922
update changelog
medcl Aug 17, 2016
d5f9fbd
update dashboard
medcl Aug 17, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
minor improvments, add trace test
  • Loading branch information
medcl committed Aug 17, 2016
commit 665de42a64a4dd455bbf7485ea7bb24bbc7cac1e
5 changes: 2 additions & 3 deletions packetbeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
@@ -441,7 +441,6 @@ packetbeat.protocols.cassandra:
# the stream will decode by the compressor, currently support: `snappy` , The default is empty.
#compressor: "snappy"

# This option indicates which Operator/Operators will be ignored,
# multi value can be sperated by `,`
#ignored_ops: "SUPPORTED,OPTIONS"
# This option indicates which Operator/Operators will be ignored
#ignored_ops: ["SUPPORTED","OPTIONS"]

8 changes: 4 additions & 4 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
@@ -438,12 +438,12 @@ packetbeat.protocols.cassandra:
#send_response_header: true

# If this option is enabled, and also the flag indicates that the frame body was compressed, and
# the stream will decode by the compressor, currently support: `snappy` , The default is empty.
# the stream will decode by the compressor, currently support: `snappy` , The default compressor is nil.
#compressor: "snappy"

# This option indicates which Operator/Operators will be ignored,
# multi value can be sperated by `,`
#ignored_ops: "SUPPORTED,OPTIONS"
# This option indicates which Operator/Operators will be ignored
#ignored_ops: ["SUPPORTED","OPTIONS"]


#================================ General =====================================

15 changes: 1 addition & 14 deletions packetbeat/protos/cassandra/cassandra.go
Original file line number Diff line number Diff line change
@@ -10,7 +10,6 @@ import (
. "github.com/elastic/beats/packetbeat/protos/cassandra/internal/gocql"
"github.com/elastic/beats/packetbeat/protos/tcp"
"github.com/elastic/beats/packetbeat/publish"
"strings"
)

// cassandra application level protocol analyzer plugin
@@ -89,20 +88,8 @@ func (cassandra *cassandra) setFromConfig(config *cassandraConfig) error {
// parsed ignored ops
if len(config.OPsIgnored) > 0 {
maps := map[FrameOp]bool{}

if strings.Contains(config.OPsIgnored, ",") {
array := strings.Split(config.OPsIgnored, ",")
for i := 0; i < len(array); i++ {
str := array[i]
if len(str) > 0 {
op := FrameOpFromString(strings.ToUpper(strings.TrimSpace(str)))
maps[op] = true
}
}
} else {
op := FrameOpFromString(strings.ToUpper(strings.TrimSpace(config.OPsIgnored)))
for _, op := range config.OPsIgnored {
maps[op] = true

}
parser.ignoredOps = maps
debugf("parsed config IgnoredOPs: %v ", parser.ignoredOps)
9 changes: 5 additions & 4 deletions packetbeat/protos/cassandra/config.go
Original file line number Diff line number Diff line change
@@ -4,15 +4,16 @@ import (
"fmt"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
. "github.com/elastic/beats/packetbeat/protos/cassandra/internal/gocql"
"github.com/pkg/errors"
)

type cassandraConfig struct {
config.ProtocolCommon `config:",inline"`
SendRequestHeader bool `config:"send_request_header"`
SendResponseHeader bool `config:"send_response_header"`
Compressor string `config:"compressor"`
OPsIgnored string `config:"ignored_ops"`
SendRequestHeader bool `config:"send_request_header"`
SendResponseHeader bool `config:"send_response_header"`
Compressor string `config:"compressor"`
OPsIgnored []FrameOp `config:"ignored_ops"`
}

var (
2 changes: 0 additions & 2 deletions packetbeat/protos/cassandra/internal/gocql/frame.go
Original file line number Diff line number Diff line change
@@ -212,8 +212,6 @@ func (f *Framer) ReadFrame() (data map[string]interface{}, err error) {
f.decoder = decoder

debugf("hit compress flags")

return nil, errors.New("Compressed content not supported yet")
}

// assumes that the frame body has been read into rbuf
77 changes: 40 additions & 37 deletions packetbeat/protos/cassandra/internal/gocql/marshal.go
Original file line number Diff line number Diff line change
@@ -502,44 +502,47 @@ func (f FrameOp) String() string {
}
}

func FrameOpFromString(str string) FrameOp {
switch str {
case "ERROR":
return opError
case "STARTUP":
return opStartup
case "READY":
return opReady
case "AUTHENTICATE":
return opAuthenticate
case "OPTIONS":
return opOptions
case "SUPPORTED":
return opSupported
case "QUERY":
return opQuery
case "RESULT":
return opResult
case "PREPARE":
return opPrepare
case "EXECUTE":
return opExecute
case "REGISTER":
return opRegister
case "EVENT":
return opEvent
case "BATCH":
return opBatch
case "AUTH_CHALLENGE":
return opAuthChallenge
case "AUTH_RESPONSE":
return opAuthResponse
case "AUTH_SUCCESS":
return opAuthSuccess
default:
debugf("unknown Op while convert: %s", str)
return opUnknown
var frameOps = map[string]FrameOp{
"ERROR": opError,
"STARTUP": opStartup,
"READY": opReady,
"AUTHENTICATE": opAuthenticate,
"OPTIONS": opOptions,
"SUPPORTED": opSupported,
"QUERY": opQuery,
"RESULT": opResult,
"PREPARE": opPrepare,
"EXECUTE": opExecute,
"REGISTER": opRegister,
"EVENT": opEvent,
"BATCH": opBatch,
"AUTH_CHALLENGE": opAuthChallenge,
"AUTH_RESPONSE": opAuthResponse,
"AUTH_SUCCESS": opAuthSuccess,
}

func FrameOpFromString(s string) (FrameOp, error) {
s = strings.ToUpper(strings.TrimSpace(s))
op, found := frameOps[s]
if !found {
return opUnknown, fmt.Errorf("unknown frame op: %v", s)
}
return op, nil
}

func (f *FrameOp) Unpack(in interface{}) error {
s, ok := in.(string)
if !ok {
return errors.New("expected string")
}

op, err := FrameOpFromString(s)
if err != nil {
return err
}

*f = op
return nil
}

const (
99 changes: 47 additions & 52 deletions packetbeat/protos/cassandra/parser.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ type parserConfig struct {
}

// check whether this ops is enabled or not
func (p *parser) FrameOpsIgnored() bool {
func (p *parser) CheckFrameOpsIgnored() bool {

if p.config.ignoredOps != nil && len(p.config.ignoredOps) > 0 {
//default map value is false
@@ -97,7 +97,6 @@ func (p *parser) feed(ts time.Time, data []byte) error {
if p.message == nil {
// allocate new message object to be used by parser with current timestamp
p.message = p.newMessage(ts)
p.message.data = map[string]interface{}{}
}

msg, err := p.parse()
@@ -129,66 +128,64 @@ func (p *parser) newMessage(ts time.Time) *message {
}
}

func (p *parser) parserBody() (map[string]interface{}, error) {
func (p *parser) parserBody() (bool, error) {

if p.framer.Header.BodyLength > 0 {
debugf("bodyLength: %d", p.framer.Header.BodyLength)
headLen := p.framer.Header.HeadLength
bdyLen := p.framer.Header.BodyLength
if bdyLen <= 0 {
return true, nil
}

//let's wait for enough buf
if !p.buf.Avail(p.framer.Header.BodyLength) {
if isDebug {
debugf("buf not enough for body, waiting for more, return")
}
return nil, nil
//let's wait for enough buf
debugf("bodyLength: %d", bdyLen)
if !p.buf.Avail(bdyLen) {
if isDebug {
debugf("buf not enough for body, waiting for more, return")
}
return false, nil
}

//check if the ops already ignored
if p.message.ignored {
//check if the ops already ignored
if p.message.ignored {
if isDebug {
debugf("message marked to be ignored, let's do this")
p.buf.Collect(p.framer.Header.BodyLength)
finalCollectedFrameLength := p.buf.BufferConsumed()
if finalCollectedFrameLength-p.framer.Header.HeadLength != p.framer.Header.BodyLength {
return nil, errors.New("data messed while parse frame body")
}

}

p.buf.Collect(bdyLen)
} else {
// start to parse body
if !p.message.ignored {
data, err := p.framer.ReadFrame()
if err != nil {
// if the frame parsed failed, should ignore the whole message
p.framer = nil
return nil, err
}

// dealing with un-parsed content
frameParsedLength := p.buf.BufferConsumed()
data, err := p.framer.ReadFrame()
if err != nil {
// if the frame parsed failed, should ignore the whole message
p.framer = nil
return false, err
}

// collect leftover
unParsedSize := p.framer.Header.BodyLength + p.framer.Header.HeadLength - frameParsedLength
if unParsedSize > 0 {
// dealing with un-parsed content
frameParsedLength := p.buf.BufferConsumed()

// double check the buf size
if p.buf.Avail(unParsedSize) {
p.buf.Collect(unParsedSize)
} else {
logp.Err("should be enough bytes for cleanup,but not enough")
return nil, errors.New("should be enough bytes,but not enough")
}
// collect leftover
unParsedSize := bdyLen + headLen - frameParsedLength
if unParsedSize > 0 {
if !p.buf.Avail(unParsedSize) {
err := errors.New("should be enough bytes for cleanup,but not enough")
logp.Err("Finishing frame failed with: %v", err)
return false, err
}
return data, nil
}

finalCollectedFrameLength := p.buf.BufferConsumed()
if finalCollectedFrameLength-p.framer.Header.HeadLength != p.framer.Header.BodyLength {
logp.Err("body_length:%d, head_length:%d, all_consumed:%d", p.framer.Header.BodyLength, p.framer.Header.HeadLength, finalCollectedFrameLength)
return nil, errors.New("data messed while parse frame body")
p.buf.Collect(unParsedSize)
}

p.message.data = data
}

finalCollectedFrameLength := p.buf.BufferConsumed()
if finalCollectedFrameLength-headLen != bdyLen {
logp.Err("body_length:%d, head_length:%d, all_consumed:%d",
bdyLen, headLen, finalCollectedFrameLength)
return false, errors.New("data messed while parse frame body")
}

return map[string]interface{}{}, nil
return true, nil
}

func (p *parser) parse() (*message, error) {
@@ -220,7 +217,7 @@ func (p *parser) parse() (*message, error) {
}

//check if the ops need to be ignored
if p.FrameOpsIgnored() {
if p.CheckFrameOpsIgnored() {
// as we already ignore the content, we now mark the result is ignored
p.message.ignored = true
if isDebug {
@@ -230,18 +227,16 @@ func (p *parser) parse() (*message, error) {

msg := p.message

data, err := p.parserBody()
finished, err := p.parserBody()
if err != nil {
return nil, err
}

//ignore and wait for more data
if data == nil {
if !finished {
return nil, nil
}

msg.data = data

dir := applayer.NetOriginalDirection

isRequest := true
17 changes: 15 additions & 2 deletions packetbeat/protos/cassandra/pub.go
Original file line number Diff line number Diff line change
@@ -67,11 +67,17 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
}

if pub.sendRequest {
if requ.data == nil {
requ.data = map[string]interface{}{}
}

if pub.sendRequestHeader {
requ.data["request_headers"] = requ.header
}

event["cassandra_request"] = requ.data
if len(requ.data) > 0 {
event["cassandra_request"] = requ.data
}
Copy link

@urso urso Aug 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

block can be changed to:

if pub.sendRequestHeader {
    if requ.data == nil {
        requ.data = map[string]interface{}{}
    }
    requ.data["request_headers"] = requ.header
}

if len(requ.data) > 0 {
    event["cassandra_request"] = requ.data
}

len(requ.data) == 0 if requ.data == nil. This way we don't need to create a map if sendRequestHeader is false

}

dst := &common.Endpoint{
@@ -97,11 +103,18 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {
event["bytes_out"] = resp.Size

if pub.sendResponse {
if resp.data == nil {
resp.data = map[string]interface{}{}
}

if pub.sendResponseHeader {
resp.data["response_headers"] = resp.header
}

event["cassandra_response"] = resp.data
if len(resp.data) > 0 {
event["cassandra_response"] = resp.data
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as for requ.data


}

return event
Binary file not shown.
Loading