diff --git a/hack/e2e/values.venafi-kubernetes-agent.yaml b/hack/e2e/values.venafi-kubernetes-agent.yaml index 18d5a073..aead734f 100644 --- a/hack/e2e/values.venafi-kubernetes-agent.yaml +++ b/hack/e2e/values.venafi-kubernetes-agent.yaml @@ -9,5 +9,7 @@ authentication: enabled: true extraArgs: -- --logging-format=json -- --log-level=2 +- --logging-format=text +# Show trace logs for the venafi-connection-lib client +# See https://github.com/jetstack/venafi-connection-lib/blob/13c2342fe0140ff084d2aabfd29ae3d10721691b/internal/http_client/metrics_transport.go#L93-L115 +- --vmodule=metrics_transport=6 diff --git a/pkg/agent/run.go b/pkg/agent/run.go index 54832dbe..091c9338 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -231,8 +231,6 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { // If any of the go routines exit (with nil or error) the main context will // be cancelled, which will cause this blocking loop to exit // instead of waiting for the time period. - // TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't - // have to wait for it to finish before exiting the process. for { if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil { return err @@ -397,9 +395,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client if config.AuthMode == VenafiCloudKeypair || config.AuthMode == VenafiCloudVenafiConnection { // orgID and clusterID are not required for Venafi Cloud auth - // TODO(wallrj): Pass the context to PostDataReadingsWithOptions, so - // that its network operations can be cancelled. - err := preflightClient.PostDataReadingsWithOptions(readings, client.Options{ + err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{ ClusterName: config.ClusterID, ClusterDescription: config.ClusterDescription, }) @@ -427,10 +423,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client if path == "" { path = "/api/v1/datareadings" } - // TODO(wallrj): Pass the context to Post, so that its network - // operations can be cancelled. - res, err := preflightClient.Post(path, bytes.NewBuffer(data)) - + res, err := preflightClient.Post(ctx, path, bytes.NewBuffer(data)) if err != nil { return fmt.Errorf("failed to post data: %+v", err) } @@ -453,9 +446,7 @@ func postData(ctx context.Context, config CombinedConfig, preflightClient client return fmt.Errorf("post to server failed: missing clusterID from agent configuration") } - // TODO(wallrj): Pass the context to PostDataReadings, so - // that its network operations can be cancelled. - err := preflightClient.PostDataReadings(config.OrganizationID, config.ClusterID, readings) + err := preflightClient.PostDataReadings(ctx, config.OrganizationID, config.ClusterID, readings) if err != nil { return fmt.Errorf("post to server failed: %+v", err) } diff --git a/pkg/client/client.go b/pkg/client/client.go index fef5be65..fea102bb 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -1,6 +1,7 @@ package client import ( + "context" "fmt" "io" "net/http" @@ -29,9 +30,9 @@ type ( // The Client interface describes types that perform requests against the Jetstack Secure backend. Client interface { - PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error - PostDataReadingsWithOptions(readings []*api.DataReading, options Options) error - Post(path string, body io.Reader) (*http.Response, error) + PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error + PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, options Options) error + Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) } // The Credentials interface describes methods for credential types to implement for verification. diff --git a/pkg/client/client_api_token.go b/pkg/client/client_api_token.go index 2ee04cb1..ad94e2f1 100644 --- a/pkg/client/client_api_token.go +++ b/pkg/client/client_api_token.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -40,13 +41,13 @@ func NewAPITokenClient(agentMetadata *api.AgentMetadata, apiToken, baseURL strin // PostDataReadingsWithOptions uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later // viewing in the user-interface. -func (c *APITokenClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error { - return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings) +func (c *APITokenClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error { + return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings) } // PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later // viewing in the user-interface. -func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error { +func (c *APITokenClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error { payload := api.DataReadingsPost{ AgentMetadata: c.agentMetadata, DataGatherTime: time.Now().UTC(), @@ -57,7 +58,7 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a return err } - res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data)) + res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data)) if err != nil { return err } @@ -77,8 +78,8 @@ func (c *APITokenClient) PostDataReadings(orgID, clusterID string, readings []*a } // Post performs an HTTP POST request. -func (c *APITokenClient) Post(path string, body io.Reader) (*http.Response, error) { - req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body) +func (c *APITokenClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body) if err != nil { return nil, err } diff --git a/pkg/client/client_oauth.go b/pkg/client/client_oauth.go index 89aca601..b1df16fa 100644 --- a/pkg/client/client_oauth.go +++ b/pkg/client/client_oauth.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -97,13 +98,13 @@ func NewOAuthClient(agentMetadata *api.AgentMetadata, credentials *OAuthCredenti }, nil } -func (c *OAuthClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error { - return c.PostDataReadings(opts.OrgID, opts.ClusterID, readings) +func (c *OAuthClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error { + return c.PostDataReadings(ctx, opts.OrgID, opts.ClusterID, readings) } // PostDataReadings uploads the slice of api.DataReading to the Jetstack Secure backend to be processed for later // viewing in the user-interface. -func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api.DataReading) error { +func (c *OAuthClient) PostDataReadings(ctx context.Context, orgID, clusterID string, readings []*api.DataReading) error { payload := api.DataReadingsPost{ AgentMetadata: c.agentMetadata, DataGatherTime: time.Now().UTC(), @@ -114,7 +115,7 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api. return err } - res, err := c.Post(filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data)) + res, err := c.Post(ctx, filepath.Join("/api/v1/org", orgID, "datareadings", clusterID), bytes.NewBuffer(data)) if err != nil { return err } @@ -134,13 +135,13 @@ func (c *OAuthClient) PostDataReadings(orgID, clusterID string, readings []*api. } // Post performs an HTTP POST request. -func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error) { - token, err := c.getValidAccessToken() +func (c *OAuthClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) { + token, err := c.getValidAccessToken(ctx) if err != nil { return nil, err } - req, err := http.NewRequest(http.MethodPost, fullURL(c.baseURL, path), body) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(c.baseURL, path), body) if err != nil { return nil, err } @@ -157,9 +158,9 @@ func (c *OAuthClient) Post(path string, body io.Reader) (*http.Response, error) // getValidAccessToken returns a valid access token. It will fetch a new access // token from the auth server in case the current access token does not exist // or it is expired. -func (c *OAuthClient) getValidAccessToken() (*accessToken, error) { +func (c *OAuthClient) getValidAccessToken(ctx context.Context) (*accessToken, error) { if c.accessToken.needsRenew() { - err := c.renewAccessToken() + err := c.renewAccessToken(ctx) if err != nil { return nil, err } @@ -168,7 +169,7 @@ func (c *OAuthClient) getValidAccessToken() (*accessToken, error) { return c.accessToken, nil } -func (c *OAuthClient) renewAccessToken() error { +func (c *OAuthClient) renewAccessToken(ctx context.Context) error { tokenURL := fmt.Sprintf("https://%s/oauth/token", c.credentials.AuthServerDomain) audience := "https://preflight.jetstack.io/api/v1" payload := url.Values{} @@ -178,7 +179,7 @@ func (c *OAuthClient) renewAccessToken() error { payload.Set("audience", audience) payload.Set("username", c.credentials.UserID) payload.Set("password", c.credentials.UserSecret) - req, err := http.NewRequest("POST", tokenURL, strings.NewReader(payload.Encode())) + req, err := http.NewRequestWithContext(ctx, "POST", tokenURL, strings.NewReader(payload.Encode())) if err != nil { return errors.WithStack(err) } @@ -188,7 +189,8 @@ func (c *OAuthClient) renewAccessToken() error { if err != nil { return errors.WithStack(err) } - + // TODO(wallrj): This will block. Read the body incrementally and check for + // context cancellation. body, err := io.ReadAll(res.Body) if err != nil { return errors.WithStack(err) diff --git a/pkg/client/client_venafi_cloud.go b/pkg/client/client_venafi_cloud.go index 7e317faf..eec7a952 100644 --- a/pkg/client/client_venafi_cloud.go +++ b/pkg/client/client_venafi_cloud.go @@ -2,6 +2,7 @@ package client import ( "bytes" + "context" "crypto" "crypto/ecdsa" "crypto/ed25519" @@ -168,7 +169,7 @@ func (c *VenafiSvcAccountCredentials) IsClientSet() (ok bool, why string) { // PostDataReadingsWithOptions uploads the slice of api.DataReading to the Venafi Cloud backend to be processed. // The Options are then passed as URL params in the request -func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error { +func (c *VenafiCloudClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error { payload := api.DataReadingsPost{ AgentMetadata: c.agentMetadata, DataGatherTime: time.Now().UTC(), @@ -199,7 +200,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead } venafiCloudUploadURL.RawQuery = query.Encode() - res, err := c.Post(venafiCloudUploadURL.String(), bytes.NewBuffer(data)) + res, err := c.Post(ctx, venafiCloudUploadURL.String(), bytes.NewBuffer(data)) if err != nil { return err } @@ -219,7 +220,7 @@ func (c *VenafiCloudClient) PostDataReadingsWithOptions(readings []*api.DataRead // PostDataReadings uploads the slice of api.DataReading to the Venafi Cloud backend to be processed for later // viewing in the user-interface. -func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api.DataReading) error { +func (c *VenafiCloudClient) PostDataReadings(ctx context.Context, _ string, _ string, readings []*api.DataReading) error { // orgID and clusterID are ignored in Venafi Cloud auth payload := api.DataReadingsPost{ @@ -235,7 +236,7 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api if !strings.HasSuffix(c.uploadPath, "/") { c.uploadPath = fmt.Sprintf("%s/", c.uploadPath) } - res, err := c.Post(filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data)) + res, err := c.Post(ctx, filepath.Join(c.uploadPath, c.uploaderID), bytes.NewBuffer(data)) if err != nil { return err } @@ -254,8 +255,8 @@ func (c *VenafiCloudClient) PostDataReadings(_ string, _ string, readings []*api } // Post performs an HTTP POST request. -func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, error) { - token, err := c.getValidAccessToken() +func (c *VenafiCloudClient) Post(ctx context.Context, path string, body io.Reader) (*http.Response, error) { + token, err := c.getValidAccessToken(ctx) if err != nil { return nil, err } @@ -278,9 +279,9 @@ func (c *VenafiCloudClient) Post(path string, body io.Reader) (*http.Response, e // getValidAccessToken returns a valid access token. It will fetch a new access // token from the auth server in case the current access token does not exist // or it is expired. -func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, error) { +func (c *VenafiCloudClient) getValidAccessToken(ctx context.Context) (*venafiCloudAccessToken, error) { if c.accessToken == nil || time.Now().Add(time.Minute).After(c.accessToken.expirationTime) { - err := c.updateAccessToken() + err := c.updateAccessToken(ctx) if err != nil { return nil, err } @@ -289,7 +290,7 @@ func (c *VenafiCloudClient) getValidAccessToken() (*venafiCloudAccessToken, erro return c.accessToken, nil } -func (c *VenafiCloudClient) updateAccessToken() error { +func (c *VenafiCloudClient) updateAccessToken(ctx context.Context) error { jwtToken, err := c.generateAndSignJwtToken() if err != nil { return err @@ -302,7 +303,7 @@ func (c *VenafiCloudClient) updateAccessToken() error { tokenURL := fullURL(c.baseURL, accessTokenEndpoint) encoded := values.Encode() - request, err := http.NewRequest(http.MethodPost, tokenURL, strings.NewReader(encoded)) + request, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL, strings.NewReader(encoded)) if err != nil { return err } diff --git a/pkg/client/client_venconn.go b/pkg/client/client_venconn.go index d8553624..606331e9 100644 --- a/pkg/client/client_venconn.go +++ b/pkg/client/client_venconn.go @@ -123,12 +123,12 @@ func (c *VenConnClient) Start(ctx context.Context) error { // `opts.ClusterName` and `opts.ClusterDescription` are the only values used // from the Options struct. OrgID and ClusterID are not used in Venafi Cloud. -func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading, opts Options) error { +func (c *VenConnClient) PostDataReadingsWithOptions(ctx context.Context, readings []*api.DataReading, opts Options) error { if opts.ClusterName == "" { return fmt.Errorf("programmer mistake: the cluster name (aka `cluster_id` in the config file) cannot be left empty") } - _, token, err := c.connHandler.Get(context.Background(), c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS}) + _, token, err := c.connHandler.Get(ctx, c.installNS, auth.Scope{}, types.NamespacedName{Name: c.venConnName, Namespace: c.venConnNS}) if err != nil { return fmt.Errorf("while loading the VenafiConnection %s/%s: %w", c.venConnNS, c.venConnName, err) } @@ -161,7 +161,7 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading, // The path parameter "no" is a dummy parameter to make the Venafi Cloud // backend happy. This parameter, named `uploaderID` in the backend, is not // actually used by the backend. - req, err := http.NewRequest(http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fullURL(token.BaseURL, "/v1/tlspk/upload/clusterdata/no"), encodedBody) if err != nil { return err } @@ -206,13 +206,13 @@ func (c *VenConnClient) PostDataReadingsWithOptions(readings []*api.DataReading, // Cloud needs a `clusterName` and `clusterDescription`, but this function can // only pass `orgID` and `clusterID` which are both useless in Venafi Cloud. Use // PostDataReadingsWithOptions instead. -func (c *VenConnClient) PostDataReadings(_orgID, _clusterID string, readings []*api.DataReading) error { +func (c *VenConnClient) PostDataReadings(_ context.Context, _orgID, _clusterID string, readings []*api.DataReading) error { return fmt.Errorf("programmer mistake: PostDataReadings is not implemented for Venafi Cloud") } // Post isn't implemented for Venafi Cloud because /v1/tlspk/upload/clusterdata // requires using the query parameters `name` and `description` which can't be // set using Post. Use PostDataReadingsWithOptions instead. -func (c *VenConnClient) Post(path string, body io.Reader) (*http.Response, error) { +func (c *VenConnClient) Post(_ context.Context, path string, body io.Reader) (*http.Response, error) { return nil, fmt.Errorf("programmer mistake: Post is not implemented for Venafi Cloud") }