Skip to content

Commit

Permalink
Remove github.com/gravitational/ttlmap dependency (#46899)
Browse files Browse the repository at this point in the history
* Replace sessionCache with FnCache.

* Use FnCache in hostCertificate cache.

* Use FnCache for forwarder transport cache.

* Use a normal map for bpf arg collection.

* Remove github.com/gravitational/ttlmap dependency.

* Use FnCache for bfp.

* fix: add test coverage to new FnCache methods

* fix: handle cache error for bpf args

* fix: bring back transport cache ttl const

* fix: restore comments and fix spacing

* fix: tidy integrations/terraform module

* fix: app session cache should reload on transient errors

---------

Co-authored-by: Tim Ross <[email protected]>
  • Loading branch information
Joerger and rosstimothy authored Oct 8, 2024
1 parent c4ddf0c commit d40c0db
Show file tree
Hide file tree
Showing 14 changed files with 251 additions and 249 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ require (
github.com/gravitational/roundtrip v1.0.2
github.com/gravitational/teleport/api v0.0.0
github.com/gravitational/trace v1.4.0
github.com/gravitational/ttlmap v0.0.0-20171116003245-91fd36b9004c
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
github.com/guptarohit/asciigraph v0.7.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1536,8 +1536,6 @@ github.com/gravitational/saml v0.4.15-teleport.1 h1:kYSLpxEBEc7JLJJ+VjsZU8PbWI4g
github.com/gravitational/saml v0.4.15-teleport.1/go.mod h1:S4+611dxnKt8z/ulbvaJzcgSHsuhjVc1QHNTcr1R7Fw=
github.com/gravitational/trace v1.4.0 h1:TtTeMElVwMX21Udb1nmK2tpWYAAMJoyjevzKOaxIFZQ=
github.com/gravitational/trace v1.4.0/go.mod h1:g79NZzwCjWS/VVubYowaFAQsTjVTohGi0hFbIWSyGoY=
github.com/gravitational/ttlmap v0.0.0-20171116003245-91fd36b9004c h1:C2iWDiod8vQ3YnOiCdMP9qYeg2UifQ8KSk36r0NswSE=
github.com/gravitational/ttlmap v0.0.0-20171116003245-91fd36b9004c/go.mod h1:erKVikttPjeHKDCQZcqowEqiccy23cJAqPadZgfjNm8=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpSQwGEnkcRpTqNOIR6bJbR0gAorgP9CSALpRcKoAA=
Expand Down
2 changes: 0 additions & 2 deletions integrations/terraform/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1333,8 +1333,6 @@ github.com/gravitational/terraform-plugin-docs v0.19.5-0.20240627183239-7e7e22a2
github.com/gravitational/terraform-plugin-docs v0.19.5-0.20240627183239-7e7e22a2c1f6/go.mod h1:8eiBaRanEugPy3lh7UZ5NW6yaISaXXS4R56pi1D962k=
github.com/gravitational/trace v1.4.0 h1:TtTeMElVwMX21Udb1nmK2tpWYAAMJoyjevzKOaxIFZQ=
github.com/gravitational/trace v1.4.0/go.mod h1:g79NZzwCjWS/VVubYowaFAQsTjVTohGi0hFbIWSyGoY=
github.com/gravitational/ttlmap v0.0.0-20171116003245-91fd36b9004c h1:C2iWDiod8vQ3YnOiCdMP9qYeg2UifQ8KSk36r0NswSE=
github.com/gravitational/ttlmap v0.0.0-20171116003245-91fd36b9004c/go.mod h1:erKVikttPjeHKDCQZcqowEqiccy23cJAqPadZgfjNm8=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 h1:qnpSQwGEnkcRpTqNOIR6bJbR0gAorgP9CSALpRcKoAA=
Expand Down
50 changes: 28 additions & 22 deletions lib/bpf/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ import (
"unsafe"

"github.com/gravitational/trace"
"github.com/gravitational/ttlmap"

ossteleport "github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/constants"
apievents "github.com/gravitational/teleport/api/types/events"
controlgroup "github.com/gravitational/teleport/lib/cgroup"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/service/servicecfg"
"github.com/gravitational/teleport/lib/utils"
)

//go:embed bytecode
Expand Down Expand Up @@ -96,7 +96,7 @@ type Service struct {

// argsCache holds the arguments to execve because they come a different
// event than the result.
argsCache *ttlmap.TTLMap
argsCache *utils.FnCache

// closeContext is used to signal the BPF service is shutting down to all
// goroutines.
Expand Down Expand Up @@ -133,10 +133,8 @@ func New(config *servicecfg.BPFConfig) (bpf BPF, err error) {
closeContext, closeFunc := context.WithCancel(context.Background())

s := &Service{
BPFConfig: config,

watch: NewSessionWatch(),

BPFConfig: config,
watch: NewSessionWatch(),
closeContext: closeContext,
closeFunc: closeFunc,
}
Expand All @@ -157,8 +155,9 @@ func New(config *servicecfg.BPFConfig) (bpf BPF, err error) {
}
}()

// Create args cache used by the exec BPF program.
s.argsCache, err = ttlmap.New(ArgsCacheSize)
s.argsCache, err = utils.NewFnCache(utils.FnCacheConfig{
TTL: 24 * time.Hour,
})
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -364,27 +363,34 @@ func (s *Service) emitCommandEvent(eventBytes []byte) {
// Args are sent in their own event by execsnoop to save stack space. Store
// the args in a ttlmap, so they can be retrieved when the return event arrives.
case eventArg:
var buf []string
buffer, ok := s.argsCache.Get(strconv.FormatUint(event.PID, 10))
if !ok {
buf = make([]string, 0)
} else {
buf = buffer.([]string)
key := strconv.FormatUint(event.PID, 10)

args, err := utils.FnCacheGet(s.closeContext, s.argsCache, key, func(ctx context.Context) ([]string, error) {
return make([]string, 0), nil
})
if err != nil {
log.WithError(err).Warn("Unable to retrieve args from FnCahe - this is a bug!")
args = []string{}
}

argv := (*C.char)(unsafe.Pointer(&event.Argv))
buf = append(buf, C.GoString(argv))
s.argsCache.Set(strconv.FormatUint(event.PID, 10), buf, 24*time.Hour)
args = append(args, C.GoString(argv))

s.argsCache.SetWithTTL(key, args, 24*time.Hour)
// The event has returned, emit the fully parsed event.
case eventRet:
// The args should have come in a previous event, find them by PID.
args, ok := s.argsCache.Get(strconv.FormatUint(event.PID, 10))
if !ok {
key := strconv.FormatUint(event.PID, 10)

args, err := utils.FnCacheGet(s.closeContext, s.argsCache, key, func(ctx context.Context) ([]string, error) {
return nil, trace.NotFound("args missing")
})

if err != nil {
log.Debugf("Got event with missing args: skipping.")
lostCommandEvents.Add(float64(1))
return
}
argv := args.([]string)

// Emit "command" event.
sessionCommandEvent := &apievents.SessionCommand{
Expand Down Expand Up @@ -412,15 +418,15 @@ func (s *Service) emitCommandEvent(eventBytes []byte) {
},
PPID: event.PPID,
ReturnCode: event.ReturnCode,
Path: argv[0],
Argv: argv[1:],
Path: args[0],
Argv: args[1:],
}
if err := ctx.Emitter.EmitAuditEvent(ctx.Context, sessionCommandEvent); err != nil {
log.WithError(err).Warn("Failed to emit command event.")
}

// Now that the event has been processed, remove from cache.
s.argsCache.Remove(strconv.FormatUint(event.PID, 10))
s.argsCache.Remove(key)
}
}

Expand Down
13 changes: 8 additions & 5 deletions lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/google/uuid"
gwebsocket "github.com/gorilla/websocket"
"github.com/gravitational/trace"
"github.com/gravitational/ttlmap"
"github.com/jonboulle/clockwork"
"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -277,6 +276,9 @@ func (f *ForwarderConfig) CheckAndSetDefaults() error {
return nil
}

// transportCacheTTL is the TTL for the transport cache.
const transportCacheTTL = 5 * time.Hour

// NewForwarder returns new instance of Kubernetes request
// forwarding proxy.
func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
Expand All @@ -288,7 +290,10 @@ func NewForwarder(cfg ForwarderConfig) (*Forwarder, error) {
// deleting expired entried clusters and kube_servers entries.
// In the meantime, we need to make sure that the cache is cleaned
// from time to time.
transportClients, err := ttlmap.New(defaults.ClientCacheSize, ttlmap.Clock(cfg.Clock))
transportClients, err := utils.NewFnCache(utils.FnCacheConfig{
TTL: transportCacheTTL,
Clock: cfg.Clock,
})
if err != nil {
return nil, trace.Wrap(err)
}
Expand Down Expand Up @@ -385,9 +390,7 @@ type Forwarder struct {
// cachedTransport is a cache of cachedTransportEntry objects used to
// connect to Teleport services.
// TODO(tigrato): Implement a cache eviction policy using watchers.
cachedTransport *ttlmap.TTLMap
// cachedTransportMu is a mutex used to protect the cachedTransport.
cachedTransportMu sync.Mutex
cachedTransport *utils.FnCache
}

// cachedTransportEntry is a cached transport entry used to connect to
Expand Down
7 changes: 4 additions & 3 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/gravitational/trace"
"github.com/gravitational/ttlmap"
"github.com/jonboulle/clockwork"
"github.com/julienschmidt/httprouter"
"github.com/sirupsen/logrus"
Expand All @@ -58,7 +57,6 @@ import (
"github.com/gravitational/teleport/lib/auth/testauthority"
"github.com/gravitational/teleport/lib/authz"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/fixtures"
testingkubemock "github.com/gravitational/teleport/lib/kube/proxy/testing/kube_server"
"github.com/gravitational/teleport/lib/modules"
Expand Down Expand Up @@ -1105,7 +1103,10 @@ func TestKubeFwdHTTPProxyEnv(t *testing.T) {

func newMockForwader(ctx context.Context, t *testing.T) *Forwarder {
clock := clockwork.NewFakeClock()
cachedTransport, err := ttlmap.New(defaults.ClientCacheSize, ttlmap.Clock(clock))
cachedTransport, err := utils.NewFnCache(utils.FnCacheConfig{
TTL: transportCacheTTL,
Clock: clock,
})
require.NoError(t, err)
csrClient, err := newMockCSRClient(clock)
require.NoError(t, err)
Expand Down
68 changes: 29 additions & 39 deletions lib/kube/proxy/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,56 +59,46 @@ type dialContextFunc func(context.Context, string, string) (net.Conn, error)
// The transport is cached in the forwarder so that it can be reused for future
// requests. If the transport is not cached, a new one is created and cached.
func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (http.RoundTripper, *tls.Config, error) {
// transportCacheTTL is the TTL for the transport cache.
const transportCacheTTL = 5 * time.Hour
// If the cluster is remote, the key is the teleport cluster name.
// If the cluster is local, the key is the teleport cluster name and the kubernetes
// cluster name: <teleport-cluster-name>/<kubernetes-cluster-name>.
key := transportCacheKey(sess)

// Check if the transport is cached.
f.cachedTransportMu.Lock()
cachedI, ok := f.cachedTransport.Get(key)
f.cachedTransportMu.Unlock()
if ok {
if cached, ok := cachedI.(cachedTransportEntry); ok {
return cached.transport, cached.tlsConfig.Clone(), nil
t, err := utils.FnCacheGet(f.ctx, f.cachedTransport, key, func(ctx context.Context) (*cachedTransportEntry, error) {
var (
httpTransport http.RoundTripper
tlsConfig *tls.Config
err error
)
if sess.teleportCluster.isRemote {
// If the cluster is remote, create a new transport for the remote cluster.
httpTransport, tlsConfig, err = f.newRemoteClusterTransport(sess.teleportCluster.name)
} else if sess.kubeAPICreds != nil {
// If agent is running in agent mode, get the transport from the configured cluster
// credentials.
httpTransport, tlsConfig = sess.kubeAPICreds.getTransport(), sess.kubeAPICreds.getTLSConfig()
} else if f.cfg.ReverseTunnelSrv != nil {
// If agent is running in proxy mode, create a new transport for the local cluster.
httpTransport, tlsConfig, err = f.newLocalClusterTransport(sess.kubeClusterName)
} else {
return nil, trace.BadParameter("no reverse tunnel server or credentials provided")
}
}

var (
httpTransport http.RoundTripper
err error
tlsConfig *tls.Config
)
if sess.teleportCluster.isRemote {
// If the cluster is remote, create a new transport for the remote cluster.
httpTransport, tlsConfig, err = f.newRemoteClusterTransport(sess.teleportCluster.name)
} else if sess.kubeAPICreds != nil {
// If agent is running in agent mode, get the transport from the configured cluster
// credentials.
return sess.kubeAPICreds.getTransport(), sess.kubeAPICreds.getTLSConfig(), nil
} else if f.cfg.ReverseTunnelSrv != nil {
// If agent is running in proxy mode, create a new transport for the local cluster.
httpTransport, tlsConfig, err = f.newLocalClusterTransport(sess.kubeClusterName)
} else {
return nil, nil, trace.BadParameter("no reverse tunnel server or credentials provided")
}
if err != nil {
return nil, nil, trace.Wrap(err)
}
if err != nil {
return nil, trace.Wrap(err)
}

// Cache the transport.
f.cachedTransportMu.Lock()
f.cachedTransport.Set(key,
cachedTransportEntry{
return &cachedTransportEntry{
transport: httpTransport,
tlsConfig: tlsConfig,
},
transportCacheTTL)
f.cachedTransportMu.Unlock()
}, nil
})

if err != nil {
return nil, nil, trace.Wrap(err)
}

return httpTransport, tlsConfig.Clone(), nil
return t.transport, t.tlsConfig.Clone(), nil
}

// transportCacheKey returns a key used to cache transports.
Expand Down
Loading

0 comments on commit d40c0db

Please sign in to comment.