Skip to content

Commit

Permalink
Add a persistent cache for cloudfoundry metadata based on badger (#20775
Browse files Browse the repository at this point in the history
)

Cache on disk is used by add_cloudfoundry_metadata.
Cache is written into the beats data directory. Objects in cache
are serialized using CBOR encoding.
Badger DB is added as dependency.
  • Loading branch information
jsoriano authored Oct 6, 2020
1 parent 889854e commit 76905a2
Show file tree
Hide file tree
Showing 17 changed files with 5,487 additions and 2,223 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Added experimental dataset `juniper/netscreen`. {pull}20820[20820]
- Added experimental dataset `sophos/utm`. {pull}20820[20820]
- Add Cloud Foundry tags in related events. {pull}21177[21177]
- Cloud Foundry metadata is cached to disk. {pull}20775[20775]
- Add option to select the type of index template to load: legacy, component, index. {pull}21212[21212]

*Auditbeat*
Expand Down
6,521 changes: 4,453 additions & 2,068 deletions NOTICE.txt

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 // indirect
github.com/denisenkom/go-mssqldb v0.0.0-20200206145737-bbfc9a55622e
github.com/devigned/tab v0.1.2-0.20190607222403-0c15cf42f9a2 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2
github.com/dgrijalva/jwt-go v3.2.1-0.20190620180102-5e25c22bd5d6+incompatible // indirect
github.com/digitalocean/go-libvirt v0.0.0-20180301200012-6075ea3c39a1
github.com/dlclark/regexp2 v1.1.7-0.20171009020623-7632a260cbaf // indirect
Expand All @@ -55,7 +56,7 @@ require (
github.com/docker/go-units v0.4.0
github.com/dop251/goja v0.0.0-20200831102558-9af81ddcf0e1
github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
github.com/dustin/go-humanize v1.0.0
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
Expand Down Expand Up @@ -142,12 +143,13 @@ require (
github.com/satori/go.uuid v1.2.0 // indirect
github.com/shirou/gopsutil v2.19.11+incompatible
github.com/shopspring/decimal v1.2.0
github.com/spf13/cobra v0.0.3
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/tsg/go-daemon v0.0.0-20200207173439-e704b93fd89b
github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786
github.com/ugorji/go/codec v1.1.8
github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec
github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
Expand Down
66 changes: 42 additions & 24 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion libbeat/tests/system/test_cmd_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_bash_completion(self):
def test_zsh_completion(self):
exit_code = self.run_beat(extra_args=["completion", "zsh"])
assert exit_code == 0
assert self.log_contains("#compdef mockbeat")
assert self.log_contains("#compdef _mockbeat mockbeat")

def test_unknown_completion(self):
exit_code = self.run_beat(extra_args=["completion", "awesomeshell"])
Expand Down
109 changes: 80 additions & 29 deletions x-pack/libbeat/common/cloudfoundry/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
package cloudfoundry

import (
"crypto/sha1"
"encoding/base64"
"fmt"
"time"

"github.com/cloudfoundry-community/go-cfclient"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/libbeat/persistentcache"
)

// cfClient interface is provided so unit tests can mock the actual client.
Expand All @@ -22,65 +25,113 @@ type cfClient interface {

// clientCacheWrap wraps the cloudfoundry client to add a cache in front of GetAppByGuid.
type clientCacheWrap struct {
cache *common.Cache
cache *persistentcache.PersistentCache
client cfClient
log *logp.Logger
errorTTL time.Duration
}

// newClientCacheWrap creates a new cache for application data.
func newClientCacheWrap(client cfClient, ttl time.Duration, errorTTL time.Duration, log *logp.Logger) *clientCacheWrap {
func newClientCacheWrap(client cfClient, cacheName string, ttl time.Duration, errorTTL time.Duration, log *logp.Logger) (*clientCacheWrap, error) {
options := persistentcache.Options{
Timeout: ttl,
}

name := "cloudfoundry"
if cacheName != "" {
name = name + "-" + sanitizeCacheName(cacheName)
}

cache, err := persistentcache.New(name, options)
if err != nil {
return nil, fmt.Errorf("creating metadata cache: %w", err)
}

return &clientCacheWrap{
cache: common.NewCacheWithExpireOnAdd(ttl, 100),
cache: cache,
client: client,
errorTTL: errorTTL,
log: log,
}
}, nil
}

type appResponse struct {
app *cfclient.App
err error
App AppMeta `json:"a"`
Error cfclient.CloudFoundryError `json:"e,omitempty"`
ErrorMessage string `json:"em,omitempty"`
}

func (r *appResponse) fromStructs(app cfclient.App, err error) {
if err != nil {
cause := errors.Cause(err)
if cferr, ok := cause.(cfclient.CloudFoundryError); ok {
r.Error = cferr
}
r.ErrorMessage = err.Error()
return
}
r.App = AppMeta{
Name: app.Name,
Guid: app.Guid,
SpaceName: app.SpaceData.Entity.Name,
SpaceGuid: app.SpaceData.Meta.Guid,
OrgName: app.SpaceData.Entity.OrgData.Entity.Name,
OrgGuid: app.SpaceData.Entity.OrgData.Meta.Guid,
}
}

func (r *appResponse) toStructs() (*AppMeta, error) {
var empty cfclient.CloudFoundryError
if r.Error != empty {
// Wrapping the error so cfclient.IsAppNotFoundError can identify it
return nil, errors.Wrap(r.Error, r.ErrorMessage)
}
if len(r.ErrorMessage) > 0 {
return nil, errors.New(r.ErrorMessage)
}
return &r.App, nil
}

// fetchApp uses the cfClient to retrieve an App entity and
// stores it in the internal cache
func (c *clientCacheWrap) fetchAppByGuid(guid string) (*cfclient.App, error) {
func (c *clientCacheWrap) fetchAppByGuid(guid string) (*AppMeta, error) {
app, err := c.client.GetAppByGuid(guid)
resp := appResponse{
app: &app,
err: err,
}
var resp appResponse
resp.fromStructs(app, err)
timeout := time.Duration(0)
if err != nil {
// Cache nil, because is what we want to return when there was an error
resp.app = nil
timeout = c.errorTTL
}
c.cache.PutWithTimeout(guid, &resp, timeout)
return resp.app, resp.err
err = c.cache.PutWithTimeout(guid, resp, timeout)
if err != nil {
return nil, fmt.Errorf("storing app response in cache: %w", err)
}
return resp.toStructs()
}

// GetApp returns CF Application info, either from the cache or
// using the CF client.
func (c *clientCacheWrap) GetAppByGuid(guid string) (*cfclient.App, error) {
cachedResp := c.cache.Get(guid)
if cachedResp == nil {
func (c *clientCacheWrap) GetAppByGuid(guid string) (*AppMeta, error) {
var resp appResponse
err := c.cache.Get(guid, &resp)
if err != nil {
return c.fetchAppByGuid(guid)
}
resp, ok := cachedResp.(*appResponse)
if !ok {
return nil, fmt.Errorf("error converting cached app response (of type %T), this is likely a bug", cachedResp)
}
return resp.app, resp.err
return resp.toStructs()
}

// StartJanitor starts a goroutine that will periodically clean the applications cache.
func (c *clientCacheWrap) StartJanitor(interval time.Duration) {
c.cache.StartJanitor(interval)
// Close release resources associated with this client
func (c *clientCacheWrap) Close() error {
err := c.cache.Close()
if err != nil {
return fmt.Errorf("closing cache: %w", err)
}
return nil
}

// StopJanitor stops the goroutine that periodically clean the applications cache.
func (c *clientCacheWrap) StopJanitor() {
c.cache.StopJanitor()
// sanitizeCacheName returns a unique string that can be used safely as part of a file name
func sanitizeCacheName(name string) string {
hash := sha1.Sum([]byte(name))
return base64.RawURLEncoding.EncodeToString(hash[:])
}
11 changes: 7 additions & 4 deletions x-pack/libbeat/common/cloudfoundry/cache_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestGetApps(t *testing.T) {

client, err := hub.Client()
require.NoError(t, err)
apps, err := client.(*clientCacheWrap).client.(*cfclient.Client).ListApps()
apps, err := client.ListApps()
require.NoError(t, err)

t.Logf("%d applications available", len(apps))
Expand All @@ -40,8 +40,9 @@ func TestGetApps(t *testing.T) {
if len(apps) == 0 {
t.Skip("no apps in account?")
}
client, err := hub.Client()
client, err := hub.ClientWithCache()
require.NoError(t, err)
defer client.Close()

guid := apps[0].Guid
app, err := client.GetAppByGuid(guid)
Expand All @@ -50,13 +51,15 @@ func TestGetApps(t *testing.T) {
})

t.Run("handle error when application is not available", func(t *testing.T) {
client, err := hub.Client()
client, err := hub.ClientWithCache()
require.NoError(t, err)
defer client.Close()

testNotExists := func(t *testing.T) {
app, err := client.GetAppByGuid("notexists")
assert.Nil(t, app)
assert.True(t, cfclient.IsAppNotFoundError(err))
assert.Error(t, err)
assert.True(t, cfclient.IsAppNotFoundError(err), "Error found: %v", err)
}

var firstTimeDuration time.Duration
Expand Down
27 changes: 18 additions & 9 deletions x-pack/libbeat/common/cloudfoundry/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,25 @@ import (
"github.com/cloudfoundry-community/go-cfclient"
"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
)

func TestClientCacheWrap(t *testing.T) {
ttl := 500 * time.Millisecond
if testing.Short() {
t.Skip("skipping in short mode")
}

ttl := 2 * time.Second
guid := mustCreateFakeGuid()
app := cfclient.App{
Guid: guid,
Memory: 1, // use this field to track if from cache or from client
Guid: guid,
Name: "Foo", // use this field to track if from cache or from client
}
fakeClient := &fakeCFClient{app, 0}
cache := newClientCacheWrap(fakeClient, ttl, ttl, logp.NewLogger("cloudfoundry"))
cache, err := newClientCacheWrap(fakeClient, "test", ttl, ttl, logp.NewLogger("cloudfoundry"))
require.NoError(t, err)

missingAppGuid := mustCreateFakeGuid()

Expand All @@ -44,25 +50,28 @@ func TestClientCacheWrap(t *testing.T) {
// fetched from client for the first time
one, err = cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, app, *one)
assert.Equal(t, app.Guid, one.Guid)
assert.Equal(t, app.Name, one.Name)
assert.Equal(t, 2, fakeClient.callCount)

// updated app in fake client, new fetch should not have updated app
updatedApp := cfclient.App{
Guid: guid,
Memory: 2,
Guid: guid,
Name: "Bar",
}
fakeClient.app = updatedApp
two, err := cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, app, *two)
assert.Equal(t, app.Guid, two.Guid)
assert.Equal(t, app.Name, two.Name)
assert.Equal(t, 2, fakeClient.callCount)

// wait the ttl, then it should have updated app
time.Sleep(ttl)
three, err := cache.GetAppByGuid(guid)
assert.NoError(t, err)
assert.Equal(t, updatedApp, *three)
assert.Equal(t, updatedApp.Guid, three.Guid)
assert.Equal(t, updatedApp.Name, three.Name)
assert.Equal(t, 3, fakeClient.callCount)
}

Expand Down
Loading

0 comments on commit 76905a2

Please sign in to comment.