Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a persistent cache for cloudfoundry metadata based on badger #20775

Merged
merged 18 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added experimental dataset `juniper/netscreen`. {pull}20820[20820]
- Added experimental dataset `sophos/utm`. {pull}20820[20820]
- Add Cloud Foundry tags in related events. {pull}21177[21177]
- Cloud Foundry metadata is cached to disk. {pull}20775[20775]
- Add option to select the type of index template to load: legacy, component, index. {pull}21212[21212]

*Auditbeat*
Expand Down
6,521 changes: 4,453 additions & 2,068 deletions NOTICE.txt

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20200206145737-bbfc9a55622e
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible // indirect
github.com/digitalocean/go-libvirt v0.0.0-20180301200012-6075ea3c39a1
github.com/dlclark/regexp2 v1.1.7-0.20171009020623-7632a260cbaf // indirect
Expand All @@ -55,7 +56,7 @@ require (
github.com/docker/go-units v0.4.0
github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
github.com/dustin/go-humanize v1.0.0
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
Expand Down Expand Up @@ -142,12 +143,13 @@ require (
github.com/satori/go.uuid v1.2.0 // indirect
github.com/shirou/gopsutil v2.19.11+incompatible
github.com/shopspring/decimal v1.2.0
github.com/spf13/cobra v0.0.3
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/tsg/go-daemon v0.0.0-20200207173439-e704b93fd89b
github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786
github.com/ugorji/go/codec v1.1.8
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
Expand Down
66 changes: 42 additions & 24 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion libbeat/tests/system/test_cmd_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_bash_completion(self):
def test_zsh_completion(self):
exit_code = self.run_beat(extra_args=["completion", "zsh"])
assert exit_code == 0
assert self.log_contains("#compdef mockbeat")
assert self.log_contains("#compdef _mockbeat mockbeat")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change related to the PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we are not using a fixed version of cobra, and badger depends on a newer version. This version modifies autogenerated completion. I can move the upgrade of cobra to a different PR (actually this could be interesting to do because a 1.0 version has been released).


def test_unknown_completion(self):
exit_code = self.run_beat(extra_args=["completion", "awesomeshell"])
Expand Down
109 changes: 80 additions & 29 deletions x-pack/libbeat/common/cloudfoundry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
package cloudfoundry

import (
"crypto/sha1"
"encoding/base64"
"fmt"
"time"

"github.com/cloudfoundry-community/go-cfclient"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/libbeat/persistentcache"
)

// cfClient interface is provided so unit tests can mock the actual client.
Expand All @@ -22,65 +25,113 @@ type cfClient interface {

// clientCacheWrap wraps the cloudfoundry client to add a cache in front of GetAppByGuid.
type clientCacheWrap struct {
cache *common.Cache
cache *persistentcache.PersistentCache
client cfClient
log *logp.Logger
errorTTL time.Duration
}

// newClientCacheWrap creates a new cache for application data.
func newClientCacheWrap(client cfClient, ttl time.Duration, errorTTL time.Duration, log *logp.Logger) *clientCacheWrap {
func newClientCacheWrap(client cfClient, cacheName string, ttl time.Duration, errorTTL time.Duration, log *logp.Logger) (*clientCacheWrap, error) {
options := persistentcache.Options{
Timeout: ttl,
}

name := "cloudfoundry"
if cacheName != "" {
name = name + "-" + sanitizeCacheName(cacheName)
}

cache, err := persistentcache.New(name, options)
if err != nil {
return nil, fmt.Errorf("creating metadata cache: %w", err)
}

return &clientCacheWrap{
cache: common.NewCacheWithExpireOnAdd(ttl, 100),
cache: cache,
client: client,
errorTTL: errorTTL,
log: log,
}
}, nil
}

type appResponse struct {
app *cfclient.App
err error
App AppMeta `json:"a"`
Error cfclient.CloudFoundryError `json:"e,omitempty"`
ErrorMessage string `json:"em,omitempty"`
}

func (r *appResponse) fromStructs(app cfclient.App, err error) {
if err != nil {
cause := errors.Cause(err)
if cferr, ok := cause.(cfclient.CloudFoundryError); ok {
r.Error = cferr
}
r.ErrorMessage = err.Error()
return
}
r.App = AppMeta{
Name: app.Name,
Guid: app.Guid,
SpaceName: app.SpaceData.Entity.Name,
SpaceGuid: app.SpaceData.Meta.Guid,
OrgName: app.SpaceData.Entity.OrgData.Entity.Name,
OrgGuid: app.SpaceData.Entity.OrgData.Meta.Guid,
}
}

func (r *appResponse) toStructs() (*AppMeta, error) {
var empty cfclient.CloudFoundryError
if r.Error != empty {
// Wrapping the error so cfclient.IsAppNotFoundError can identify it
return nil, errors.Wrap(r.Error, r.ErrorMessage)
}
if len(r.ErrorMessage) > 0 {
return nil, errors.New(r.ErrorMessage)
}
return &r.App, nil
}

// fetchApp uses the cfClient to retrieve an App entity and
// stores it in the internal cache
func (c *clientCacheWrap) fetchAppByGuid(guid string) (*cfclient.App, error) {
func (c *clientCacheWrap) fetchAppByGuid(guid string) (*AppMeta, error) {
app, err := c.client.GetAppByGuid(guid)
resp := appResponse{
app: &app,
err: err,
}
var resp appResponse
resp.fromStructs(app, err)
timeout := time.Duration(0)
if err != nil {
// Cache nil, because is what we want to return when there was an error
resp.app = nil
timeout = c.errorTTL
}
c.cache.PutWithTimeout(guid, &resp, timeout)
return resp.app, resp.err
err = c.cache.PutWithTimeout(guid, resp, timeout)
if err != nil {
return nil, fmt.Errorf("storing app response in cache: %w", err)
}
return resp.toStructs()
}

// GetApp returns CF Application info, either from the cache or
// using the CF client.
func (c *clientCacheWrap) GetAppByGuid(guid string) (*cfclient.App, error) {
cachedResp := c.cache.Get(guid)
if cachedResp == nil {
func (c *clientCacheWrap) GetAppByGuid(guid string) (*AppMeta, error) {
var resp appResponse
err := c.cache.Get(guid, &resp)
if err != nil {
return c.fetchAppByGuid(guid)
}
resp, ok := cachedResp.(*appResponse)
if !ok {
return nil, fmt.Errorf("error converting cached app response (of type %T), this is likely a bug", cachedResp)
}
return resp.app, resp.err
return resp.toStructs()
}

// StartJanitor starts a goroutine that will periodically clean the applications cache.
func (c *clientCacheWrap) StartJanitor(interval time.Duration) {
c.cache.StartJanitor(interval)
// Close release resources associated with this client
func (c *clientCacheWrap) Close() error {
err := c.cache.Close()
if err != nil {
return fmt.Errorf("closing cache: %w", err)
}
return nil
}

// StopJanitor stops the goroutine that periodically clean the applications cache.
func (c *clientCacheWrap) StopJanitor() {
c.cache.StopJanitor()
// sanitizeCacheName returns a unique string that can be used safely as part of a file name
func sanitizeCacheName(name string) string {
hash := sha1.Sum([]byte(name))
return base64.RawURLEncoding.EncodeToString(hash[:])
}
11 changes: 7 additions & 4 deletions x-pack/libbeat/common/cloudfoundry/cache_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestGetApps(t *testing.T) {

client, err := hub.Client()
require.NoError(t, err)
apps, err := client.(*clientCacheWrap).client.(*cfclient.Client).ListApps()
apps, err := client.ListApps()
require.NoError(t, err)

t.Logf("%d applications available", len(apps))
Expand All @@ -40,8 +40,9 @@ func TestGetApps(t *testing.T) {
if len(apps) == 0 {
t.Skip("no apps in account?")
}
client, err := hub.Client()
client, err := hub.ClientWithCache()
require.NoError(t, err)
defer client.Close()

guid := apps[0].Guid
app, err := client.GetAppByGuid(guid)
Expand All @@ -50,13 +51,15 @@ func TestGetApps(t *testing.T) {
})

t.Run("handle error when application is not available", func(t *testing.T) {
client, err := hub.Client()
client, err := hub.ClientWithCache()
require.NoError(t, err)
defer client.Close()

testNotExists := func(t *testing.T) {
app, err := client.GetAppByGuid("notexists")
assert.Nil(t, app)
assert.True(t, cfclient.IsAppNotFoundError(err))
assert.Error(t, err)
assert.True(t, cfclient.IsAppNotFoundError(err), "Error found: %v", err)
}

var firstTimeDuration time.Duration
Expand Down
27 changes: 18 additions & 9 deletions x-pack/libbeat/common/cloudfoundry/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,25 @@ import (
"github.com/cloudfoundry-community/go-cfclient"
"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
)

func TestClientCacheWrap(t *testing.T) {
ttl := 500 * time.Millisecond
if testing.Short() {
t.Skip("skipping in short mode")
}

ttl := 2 * time.Second
guid := mustCreateFakeGuid()
app := cfclient.App{
Guid: guid,
Memory: 1, // use this field to track if from cache or from client
Guid: guid,
Name: "Foo", // use this field to track if from cache or from client
}
fakeClient := &fakeCFClient{app, 0}
cache := newClientCacheWrap(fakeClient, ttl, ttl, logp.NewLogger("cloudfoundry"))
cache, err := newClientCacheWrap(fakeClient, "test", ttl, ttl, logp.NewLogger("cloudfoundry"))
require.NoError(t, err)

missingAppGuid := mustCreateFakeGuid()

Expand All @@ -44,25 +50,28 @@ func TestClientCacheWrap(t *testing.T) {
// fetched from client for the first time
one, err = cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, app, *one)
assert.Equal(t, app.Guid, one.Guid)
assert.Equal(t, app.Name, one.Name)
assert.Equal(t, 2, fakeClient.callCount)

// updated app in fake client, new fetch should not have updated app
updatedApp := cfclient.App{
Guid: guid,
Memory: 2,
Guid: guid,
Name: "Bar",
}
fakeClient.app = updatedApp
two, err := cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, app, *two)
assert.Equal(t, app.Guid, two.Guid)
assert.Equal(t, app.Name, two.Name)
assert.Equal(t, 2, fakeClient.callCount)

// wait the ttl, then it should have updated app
time.Sleep(ttl)
three, err := cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, updatedApp, *three)
assert.Equal(t, updatedApp.Guid, three.Guid)
assert.Equal(t, updatedApp.Name, three.Name)
assert.Equal(t, 3, fakeClient.callCount)
}

Expand Down
Loading