From 7930f9ed0f75696bfc8feff4400a896232184231 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Tue, 7 Jul 2020 09:26:52 +0200 Subject: [PATCH] Use a cache instead of sync.Pool in script processor (#19562) This updates the script processor to keep a simple cache of Javascript VM sessions instead of relying on sync.Pool for this task. The size of this cache can be controlled by the new `max_cached_sessions` option. The reasoning behind this change is to avoid sync.Pool discarding and re-allocating new sessions every garbage collection cycle. For large Javascript pipelines, allocating a new session is a very expensive operation that can take hundreds of milliseconds or even seconds to complete. This has a severe impact in ingestion rates. --- CHANGELOG.next.asciidoc | 1 + .../processors/script/docs/script.asciidoc | 3 ++ .../processors/script/javascript/config.go | 18 +++++---- .../processors/script/javascript/session.go | 40 +++++++++++++------ 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index aadf24c6a95..ce1dd3b7bb0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -323,6 +323,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add the `ignore_failure` configuration option to the dissect processor. {pull}19464[19464] - Add the `overwrite_keys` configuration option to the dissect processor. {pull}19464[19464] - Add support to trim captured values in the dissect processor. {pull}19464[19464] +- Added the `max_cached_sessions` option to the script processor. {pull}19562[19562] *Auditbeat* diff --git a/libbeat/processors/script/docs/script.asciidoc b/libbeat/processors/script/docs/script.asciidoc index 106a98a001b..60636f3fcac 100644 --- a/libbeat/processors/script/docs/script.asciidoc +++ b/libbeat/processors/script/docs/script.asciidoc @@ -113,6 +113,9 @@ is interrupted. You can set this option to prevent a script from running for too long (like preventing an infinite `while` loop). By default there is no timeout. +`max_cached_sessions`:: This sets the maximum number of Javascript VM sessions +that will be cached to avoid reallocation. The default is `4`. + [float] ==== Event API diff --git a/libbeat/processors/script/javascript/config.go b/libbeat/processors/script/javascript/config.go index 16aae135242..56d17bcfd2d 100644 --- a/libbeat/processors/script/javascript/config.go +++ b/libbeat/processors/script/javascript/config.go @@ -25,13 +25,14 @@ import ( // Config defines the Javascript source files to use for the processor. type Config struct { - Tag string `config:"tag"` // Processor ID for debug and metrics. - Source string `config:"source"` // Inline script to execute. - File string `config:"file"` // Source file. - Files []string `config:"files"` // Multiple source files. - Params map[string]interface{} `config:"params"` // Parameters to pass to script. - Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout. - TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens. + Tag string `config:"tag"` // Processor ID for debug and metrics. + Source string `config:"source"` // Inline script to execute. + File string `config:"file"` // Source file. + Files []string `config:"files"` // Multiple source files. + Params map[string]interface{} `config:"params"` // Parameters to pass to script. + Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout. + TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens. + MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions. } // Validate returns an error if one (and only one) option is not set. @@ -57,6 +58,7 @@ func (c Config) Validate() error { func defaultConfig() Config { return Config{ - TagOnException: "_js_exception", + TagOnException: "_js_exception", + MaxCachedSessions: 4, } } diff --git a/libbeat/processors/script/javascript/session.go b/libbeat/processors/script/javascript/session.go index 1bade0cc2ba..adfacd9214a 100644 --- a/libbeat/processors/script/javascript/session.go +++ b/libbeat/processors/script/javascript/session.go @@ -19,7 +19,6 @@ package javascript import ( "reflect" - "sync" "time" "github.com/dop251/goja" @@ -83,17 +82,25 @@ type session struct { } func newSession(p *goja.Program, conf Config, test bool) (*session, error) { + // Create a logger + logger := logp.NewLogger(logName) + if conf.Tag != "" { + logger = logger.With("instance_id", conf.Tag) + } + // Measure load times + start := time.Now() + defer func() { + took := time.Now().Sub(start) + logger.Debugf("Load of javascript pipeline took %v", took) + }() // Setup JS runtime. s := &session{ vm: goja.New(), - log: logp.NewLogger(logName), + log: logger, makeEvent: newBeatEventV0, timeout: conf.Timeout, tagOnException: conf.TagOnException, } - if conf.Tag != "" { - s.log = s.log.With("instance_id", conf.Tag) - } // Register modules. for _, registerModule := range sessionHooks { @@ -266,7 +273,8 @@ func init() { } type sessionPool struct { - pool *sync.Pool + New func() *session + C chan *session } func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) { @@ -275,24 +283,32 @@ func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) { return nil, err } - pool := &sync.Pool{ - New: func() interface{} { + pool := sessionPool{ + New: func() *session { s, _ := newSession(p, c, false) return s }, + C: make(chan *session, c.MaxCachedSessions), } pool.Put(s) - return &sessionPool{pool}, nil + return &pool, nil } func (p *sessionPool) Get() *session { - s, _ := p.pool.Get().(*session) - return s + select { + case s := <-p.C: + return s + default: + return p.New() + } } func (p *sessionPool) Put(s *session) { if s != nil { - p.pool.Put(s) + select { + case p.C <- s: + default: + } } }