Skip to content

Commit

Permalink
Load ML using Kibana API (#6604)
Browse files Browse the repository at this point in the history
Since Kibana 6.1 new ML API is available to load jobs, datafeeds, etc. Thus, it's not necessary to store ML configs in Beats and load them using setup.

From now on if Filebeat connects to a Kibana 6.1 or newer the Kibana API is called. If Kibana is older than 6.1, Filebeat loads the ML config from the repo, as it's done in previous versions.
  • Loading branch information
kvch authored and tsg committed Mar 25, 2018
1 parent d679264 commit 6e27e4c
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 17 deletions.
44 changes: 38 additions & 6 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/setup/kibana"

fbautodiscover "github.com/elastic/beats/filebeat/autodiscover"
"github.com/elastic/beats/filebeat/channel"
Expand Down Expand Up @@ -120,8 +121,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

// register `setup` callback for ML jobs
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
b.SetupMLCallback = func(b *beat.Beat, kibanaConfig *common.Config) error {
return fb.loadModulesML(b, kibanaConfig)
}
return fb, nil
}
Expand All @@ -144,10 +145,11 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
return nil
}

func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")
func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) error {
var errs multierror.Errors

logp.Debug("machine-learning", "Setting up ML jobs for modules")

if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
" modules because the Elasticsearch output is not configured/enabled.")
Expand All @@ -159,7 +161,22 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
if err := fb.moduleRegistry.LoadML(esClient); err != nil {

if kibanaConfig == nil {
kibanaConfig = common.NewConfig()
}

kibanaClient, err := kibana.NewKibanaClient(kibanaConfig)
if err != nil {
return errors.Errorf("Error creating Kibana client: %v", err)
}

kibanaVersion, err := common.NewVersion(kibanaClient.GetVersion())
if err != nil {
return err
}

if err := setupMLBasedOnVersion(fb.moduleRegistry, esClient, kibanaClient, kibanaVersion); err != nil {
errs = append(errs, err)
}

Expand All @@ -185,15 +202,30 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
continue
}

if err := set.LoadML(esClient); err != nil {
if err := setupMLBasedOnVersion(set, esClient, kibanaClient, kibanaVersion); err != nil {
errs = append(errs, err)
}

}
}

return errs.Err()
}

func setupMLBasedOnVersion(reg *fileset.ModuleRegistry, esClient *elasticsearch.Client, kibanaClient *kibana.Client, kibanaVersion *common.Version) error {
if isElasticsearchLoads(kibanaVersion) {
return reg.LoadML(esClient)
}
return reg.SetupML(esClient, kibanaClient)
}

func isElasticsearchLoads(kibanaVersion *common.Version) bool {
if kibanaVersion.Major < 6 || kibanaVersion.Major == 6 && kibanaVersion.Minor < 1 {
return true
}
return false
}

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
Expand Down
57 changes: 52 additions & 5 deletions filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ import (
"github.com/elastic/beats/libbeat/logp"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/setup/kibana"
)

var availableMLModules = map[string]string{
"apache2": "access",
"nginx": "access",
}

type ModuleRegistry struct {
registry map[string]map[string]*Fileset // module -> fileset -> Fileset
}
Expand Down Expand Up @@ -449,23 +455,23 @@ func interpretError(initialErr error, body []byte) error {
return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// LoadML loads the machine-learning configurations into Elasticsearch, if Xpack is available
// LoadML loads the machine-learning configurations into Elasticsearch, if X-Pack is available
func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
haveXpack, err := mlimporter.HaveXpackML(esClient)
if err != nil {
return errors.Errorf("Error checking if xpack is available: %v", err)
return errors.Errorf("error checking if xpack is available: %v", err)
}
if !haveXpack {
logp.Warn("Xpack Machine Learning is not enabled")
logp.Warn("X-Pack Machine Learning is not enabled")
return nil
}

for module, filesets := range reg.registry {
for name, fileset := range filesets {
for _, mlConfig := range fileset.GetMLConfigs() {
err = mlimporter.ImportMachineLearningJob(esClient, &mlConfig)
err := mlimporter.ImportMachineLearningJob(esClient, &mlConfig)
if err != nil {
return errors.Errorf("Error loading ML config from %s/%s: %v", module, name, err)
return errors.Errorf("error loading ML config from %s/%s: %v", module, name, err)
}
}
}
Expand All @@ -474,10 +480,51 @@ func (reg *ModuleRegistry) LoadML(esClient PipelineLoader) error {
return nil
}

// SetupML sets up the machine-learning configurations into Elasticsearch using Kibana, if X-Pack is available
func (reg *ModuleRegistry) SetupML(esClient PipelineLoader, kibanaClient *kibana.Client) error {
haveXpack, err := mlimporter.HaveXpackML(esClient)
if err != nil {
return errors.Errorf("Error checking if xpack is available: %v", err)
}
if !haveXpack {
logp.Warn("X-Pack Machine Learning is not enabled")
return nil
}

modules := make(map[string]string)
if reg.Empty() {
modules = availableMLModules
} else {
for _, module := range reg.ModuleNames() {
if fileset, ok := availableMLModules[module]; ok {
modules[module] = fileset
}
}
}

for module, fileset := range modules {
prefix := fmt.Sprintf("filebeat-%s-%s-", module, fileset)
err := mlimporter.SetupModule(kibanaClient, module, prefix)
if err != nil {
return errors.Errorf("Error setting up ML for %s: %v", module, err)
}
}
return nil
}

func (reg *ModuleRegistry) Empty() bool {
count := 0
for _, filesets := range reg.registry {
count += len(filesets)
}
return count == 0
}

// ModuleNames returns the names of modules in the ModuleRegistry.
func (reg *ModuleRegistry) ModuleNames() []string {
var modules []string
for m := range reg.registry {
modules = append(modules, m)
}
return modules
}
2 changes: 1 addition & 1 deletion libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ type BeatConfig struct {

// SetupMLCallback can be used by the Beat to register MachineLearning configurations
// for the enabled modules.
type SetupMLCallback func(*Beat) error
type SetupMLCallback func(*Beat, *common.Config) error
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (b *Beat) launch(bt beat.Creator) error {
return err
}
if setup && b.SetupMLCallback != nil {
err = b.SetupMLCallback(&b.Beat)
err = b.SetupMLCallback(&b.Beat, b.Config.Kibana)
if err != nil {
return err
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning bool
}

if machineLearning && b.SetupMLCallback != nil {
err = b.SetupMLCallback(&b.Beat)
err = b.SetupMLCallback(&b.Beat, b.Config.Kibana)
if err != nil {
return err
}
Expand Down
145 changes: 143 additions & 2 deletions libbeat/ml-importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@ package mlimporter
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/url"
"strings"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

var (
esDataFeedURL = "/_xpack/ml/datafeeds/datafeed-%s"
esJobURL = "/_xpack/ml/anomaly_detectors/%s"
kibanaGetModuleURL = "/api/ml/modules/get_module/%s"
kibanaRecognizeURL = "/api/ml/modules/recognize/%s"
kibanaSetupModuleURL = "/api/ml/modules/setup/%s"
)

// MLConfig contains the required configuration for loading one job and the associated
// datafeed.
type MLConfig struct {
Expand All @@ -29,6 +41,56 @@ type MLLoader interface {
GetVersion() string
}

// MLSetupper is a subset of the Kibana client API capable of setting up ML objects.
type MLSetupper interface {
Request(method, path string, params url.Values, body io.Reader) (int, []byte, error)
GetVersion() string
}

// MLResponse stores the relevant parts of the response from Kibana to check for errors.
type MLResponse struct {
Datafeeds []struct {
ID string
Success bool
Error struct {
Msg string
}
}
Jobs []struct {
ID string
Success bool
Error struct {
Msg string
}
}
Kibana struct {
Dashboard []struct {
Success bool
ID string
Exists bool
Error struct {
Message string
}
}
Search []struct {
Success bool
ID string
Exists bool
Error struct {
Message string
}
}
Visualization []struct {
Success bool
ID string
Exists bool
Error struct {
Message string
}
}
}
}

func readJSONFile(path string) (common.MapStr, error) {
file, err := ioutil.ReadFile(path)
if err != nil {
Expand All @@ -41,8 +103,8 @@ func readJSONFile(path string) (common.MapStr, error) {

// ImportMachineLearningJob uploads the job and datafeed configuration to ES/xpack.
func ImportMachineLearningJob(esClient MLLoader, cfg *MLConfig) error {
jobURL := fmt.Sprintf("/_xpack/ml/anomaly_detectors/%s", cfg.ID)
datafeedURL := fmt.Sprintf("/_xpack/ml/datafeeds/datafeed-%s", cfg.ID)
jobURL := fmt.Sprintf(esJobURL, cfg.ID)
datafeedURL := fmt.Sprintf(esDataFeedURL, cfg.ID)

if len(cfg.MinVersion) > 0 {
esVersion, err := common.NewVersion(esClient.GetVersion())
Expand Down Expand Up @@ -121,3 +183,82 @@ func HaveXpackML(esClient MLLoader) (bool, error) {
}
return xpack.Features.ML.Available && xpack.Features.ML.Enabled, nil
}

// SetupModule creates ML jobs, data feeds and dashboards for modules.
func SetupModule(kibanaClient MLSetupper, module, prefix string) error {
setupURL := fmt.Sprintf(kibanaSetupModuleURL, module)
prefixPayload := fmt.Sprintf("{\"prefix\": \"%s\"}", prefix)
status, response, err := kibanaClient.Request("POST", setupURL, nil, strings.NewReader(prefixPayload))
if status != 200 {
return errors.Errorf("cannot set up ML with prefix: %s", prefix)
}
if err != nil {
return err
}

return checkResponse(response)
}

func checkResponse(r []byte) error {
var errs multierror.Errors

var resp MLResponse
err := json.Unmarshal(r, &resp)
if err != nil {
return err
}

for _, feed := range resp.Datafeeds {
if !feed.Success {
if strings.HasPrefix(feed.Error.Msg, "[resource_already_exists_exception]") {
logp.Debug("machine-learning", "Datafeed already exists: %s", feed.ID)
continue
}
errs = append(errs, errors.Errorf(feed.Error.Msg))
}
}
for _, job := range resp.Jobs {
if strings.HasPrefix(job.Error.Msg, "[resource_already_exists_exception]") {
logp.Debug("machine-learning", "Job already exists: %s", job.ID)
continue
}
if !job.Success {
errs = append(errs, errors.Errorf(job.Error.Msg))
}
}
for _, dashboard := range resp.Kibana.Dashboard {
if !dashboard.Success {
if dashboard.Exists {
logp.Debug("machine-learning", "Dashboard already exists: %s", dashboard.ID)
} else if strings.Contains(dashboard.Error.Message, "version conflict, document already exists") {
continue
} else {
errs = append(errs, errors.Errorf("error while setting up dashboard: %s", dashboard.ID))
}
}
}
for _, search := range resp.Kibana.Search {
if !search.Success {
if search.Exists {
logp.Debug("machine-learning", "Search already exists: %s", search.ID)
} else if strings.Contains(search.Error.Message, "version conflict, document already exists") {
continue
} else {
errs = append(errs, errors.Errorf("error while setting up search: %s", search.ID))
}
}
}
for _, visualization := range resp.Kibana.Visualization {
if !visualization.Success {
if visualization.Exists {
logp.Debug("machine-learning", "Visualization already exists: %s", visualization.ID)
} else if strings.Contains(visualization.Error.Message, "version conflict, document already exists") {
continue
} else {
errs = append(errs, errors.Errorf("error while setting up visualization: %s", visualization.ID))
}
}
}

return errs.Err()
}
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/estest/estest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func GetTestingElasticsearch(t internal.TestLogger) *elasticsearch.Client {
URL: internal.GetURL(),
Index: outil.MakeSelector(),
Username: internal.GetUser(),
Password: internal.GetUser(),
Password: internal.GetPass(),
Timeout: 60 * time.Second,
CompressionLevel: 3,
}, nil)
Expand Down

0 comments on commit 6e27e4c

Please sign in to comment.