From eddc84c5599b01d434ac35ce1dc81bb2b5df6de1 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 31 Dec 2024 18:50:23 +0530 Subject: [PATCH 1/6] initial oauth2 token refresh logic done --- x-pack/filebeat/input/streaming/config.go | 6 ++ x-pack/filebeat/input/streaming/input.go | 2 + x-pack/filebeat/input/streaming/websocket.go | 65 ++++++++++++++++++-- 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index eea8c2afc704..1b63c1d31628 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -88,6 +88,12 @@ type oAuth2Config struct { EndpointParams map[string][]string `config:"endpoint_params"` Scopes []string `config:"scopes"` TokenURL string `config:"token_url"` + // accessToken is only used internally to set the initial headers via formHeader() if oauth2 is enabled + accessToken string +} + +func (o oAuth2Config) isEnabled() bool { + return o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != "" } type urlConfig struct { diff --git a/x-pack/filebeat/input/streaming/input.go b/x-pack/filebeat/input/streaming/input.go index 12a362625bff..07ba8dfd1afd 100644 --- a/x-pack/filebeat/input/streaming/input.go +++ b/x-pack/filebeat/input/streaming/input.go @@ -382,6 +382,8 @@ func formHeader(cfg config) map[string][]string { header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value} case cfg.Auth.BearerToken != "": header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken} + case cfg.Auth.OAuth2.accessToken != "": + header["Authorization"] = []string{"Bearer " + cfg.Auth.OAuth2.accessToken} case cfg.Auth.BasicToken != "": header["Authorization"] = []string{"Basic " + cfg.Auth.BasicToken} } diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 225bc76e8d9f..d1e0c500eabd 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -21,6 +21,7 @@ import ( "github.com/gorilla/websocket" "go.uber.org/zap/zapcore" + "golang.org/x/oauth2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/elastic-agent-libs/logp" @@ -31,11 +32,13 @@ import ( type websocketStream struct { processor - id string - cfg config - cursor map[string]any - - time func() time.Time + id string + cfg config + cursor map[string]any + isOauth2Enabled bool + tokenSource oauth2.TokenSource + tokenExpiry <-chan time.Time + time func() time.Time } // NewWebsocketFollower performs environment construction including CEL @@ -53,9 +56,35 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map redact: cfg.Redact, metrics: newInputMetrics(id), }, + tokenExpiry: nil, } s.metrics.url.Set(cfg.URL.String()) s.metrics.errorsTotal.Set(0) + // initialize the oauth2 token source if oauth2 is enabled and set access token in the config + if cfg.Auth.OAuth2.isEnabled() { + config := &oauth2.Config{ + ClientID: cfg.Auth.OAuth2.ClientID, + ClientSecret: cfg.Auth.OAuth2.ClientSecret, + Endpoint: oauth2.Endpoint{ + TokenURL: cfg.Auth.OAuth2.TokenURL, + }, + Scopes: cfg.Auth.OAuth2.Scopes, + } + s.tokenSource = config.TokenSource(ctx, nil) + s.isOauth2Enabled = true + // get the initial token + token, err := s.tokenSource.Token() + if err != nil { + s.metrics.errorsTotal.Inc() + s.Close() + return nil, fmt.Errorf("failed to obtain oauth2 token: %w", err) + } + // set the initial token in the config if oauth2 is enabled + // this allows seamless header creation in formHeader() for the initial connection + s.cfg.Auth.OAuth2.accessToken = token.AccessToken + // set the initial token expiry channel with buffer of 2 mins + s.tokenExpiry = time.After(time.Until(token.Expiry) - 120*time.Second) + } patterns, err := regexpsFromConfig(cfg) if err != nil { @@ -114,6 +143,32 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { case <-ctx.Done(): s.log.Debugw("context cancelled, closing websocket connection") return ctx.Err() + // s.tokenExpiry channel will only trigger if oauth2 is enabled and the token is about to expire + case <-s.tokenExpiry: + // get the new token + token, err := s.tokenSource.Token() + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to obtain oauth2 token during token refresh", "error", err) + return err + } + // gracefully close current connection + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + // set the new token in the config + s.cfg.Auth.OAuth2.accessToken = token.AccessToken + // set the new token expiry channel with 2 mins buffer + s.tokenExpiry = time.After(time.Until(token.Expiry) - 120*time.Second) + // establish a new connection with the new token + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.metrics, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to establish websocket connection on token refresh", "error", err) + return err + } default: _, message, err := c.ReadMessage() if err != nil { From 8370e651e9f210c23ecddeb02e1f197ff2cb3c3b Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 3 Jan 2025 18:06:33 +0530 Subject: [PATCH 2/6] working oauth2 implementation with tests --- x-pack/filebeat/input/streaming/config.go | 41 +++- .../filebeat/input/streaming/config_test.go | 73 ++++++ x-pack/filebeat/input/streaming/input.go | 8 +- .../filebeat/input/streaming/input_manager.go | 1 - x-pack/filebeat/input/streaming/input_test.go | 217 +++++++++++++++++- x-pack/filebeat/input/streaming/websocket.go | 49 +++- 6 files changed, 359 insertions(+), 30 deletions(-) diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 1b63c1d31628..08eebf7262d2 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -12,10 +12,17 @@ import ( "regexp" "time" + "golang.org/x/oauth2" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) +const ( + authStyleInHeader = "in_header" + authStyleInParams = "in_params" +) + type config struct { // Type is the type of the stream being followed. The // zero value indicates websocket. @@ -83,11 +90,13 @@ type customAuthConfig struct { type oAuth2Config struct { // common oauth fields - ClientID string `config:"client_id"` - ClientSecret string `config:"client_secret"` - EndpointParams map[string][]string `config:"endpoint_params"` - Scopes []string `config:"scopes"` - TokenURL string `config:"token_url"` + AuthStyle string `config:"auth_style"` + ClientID string `config:"client_id"` + ClientSecret string `config:"client_secret"` + EndpointParams url.Values `config:"endpoint_params"` + Scopes []string `config:"scopes"` + TokenExpiryBuffer time.Duration `config:"token_expiry_buffer" validate:"min=0"` + TokenURL string `config:"token_url"` // accessToken is only used internally to set the initial headers via formHeader() if oauth2 is enabled accessToken string } @@ -96,6 +105,17 @@ func (o oAuth2Config) isEnabled() bool { return o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != "" } +func (o oAuth2Config) getAuthStyle() oauth2.AuthStyle { + switch o.AuthStyle { + case authStyleInHeader: + return oauth2.AuthStyleInHeader + case authStyleInParams: + return oauth2.AuthStyleInParams + default: + return oauth2.AuthStyleAutoDetect + } +} + type urlConfig struct { *url.URL } @@ -148,6 +168,12 @@ func (c config) Validate() error { return errors.New("wait_min must be less than or equal to wait_max") } } + + if c.Auth.OAuth2.isEnabled() { + if c.Auth.OAuth2.AuthStyle != authStyleInHeader && c.Auth.OAuth2.AuthStyle != authStyleInParams && c.Auth.OAuth2.AuthStyle != "" { + return fmt.Errorf("unsupported auth style: %s", c.Auth.OAuth2.AuthStyle) + } + } return nil } @@ -177,6 +203,11 @@ func defaultConfig() config { Transport: httpcommon.HTTPTransportSettings{ Timeout: 180 * time.Second, }, + Auth: authConfig{ + OAuth2: oAuth2Config{ + TokenExpiryBuffer: 2 * time.Minute, + }, + }, Retry: &retry{ MaxAttempts: 5, WaitMin: 1 * time.Second, diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 840c35d400fa..b58f17bc7628 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -130,6 +130,79 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_authStyle_default", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "valid_authStyle_in_params", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_params", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "valid_authStyle_in_header", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_header", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "invalid_authStyle", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_query", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("unsupported auth style: in_query accessing config"), + }, + { + name: "valid_tokenExpiryBuffer", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + "token_expiry_buffer": "5m", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "invalid_tokenExpiryBuffer", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + "token_expiry_buffer": "-1s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("requires duration >= 0 accessing 'auth.token_expiry_buffer'"), + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input.go b/x-pack/filebeat/input/streaming/input.go index 07ba8dfd1afd..3df6b3845548 100644 --- a/x-pack/filebeat/input/streaming/input.go +++ b/x-pack/filebeat/input/streaming/input.go @@ -378,14 +378,14 @@ func errorMessage(msg string) map[string]interface{} { func formHeader(cfg config) map[string][]string { header := make(map[string][]string) switch { - case cfg.Auth.CustomAuth != nil: - header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value} - case cfg.Auth.BearerToken != "": - header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken} case cfg.Auth.OAuth2.accessToken != "": header["Authorization"] = []string{"Bearer " + cfg.Auth.OAuth2.accessToken} + case cfg.Auth.BearerToken != "": + header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken} case cfg.Auth.BasicToken != "": header["Authorization"] = []string{"Basic " + cfg.Auth.BasicToken} + case cfg.Auth.CustomAuth != nil: + header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value} } return header } diff --git a/x-pack/filebeat/input/streaming/input_manager.go b/x-pack/filebeat/input/streaming/input_manager.go index c685452c34f1..6a1bd8bc5a46 100644 --- a/x-pack/filebeat/input/streaming/input_manager.go +++ b/x-pack/filebeat/input/streaming/input_manager.go @@ -38,7 +38,6 @@ func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, erro if err := cfg.Unpack(&src.cfg); err != nil { return nil, nil, err } - if src.cfg.Program == "" { // set default program src.cfg.Program = ` diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index c11784ea3dbf..70ba545b3abf 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -43,7 +43,9 @@ var inputTests = []struct { name string server func(*testing.T, WebSocketHandler, map[string]interface{}, []string) proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server + oauth2Server func(*testing.T, http.HandlerFunc, map[string]interface{}) handler WebSocketHandler + oauth2Handler http.HandlerFunc config map[string]interface{} response []string time func() time.Time @@ -417,13 +419,13 @@ var inputTests = []struct { }, }, response: []string{` - { - "pps": { - "agent": "example.proofpoint.com", - "cid": "mmeng_uivm071" - }, - "ts": 1502908200 - }`, + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, }, want: []map[string]interface{}{ { @@ -586,6 +588,171 @@ var inputTests = []struct { }, }, }, + { + name: "oauth2_blank_auth_style", + oauth2Server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + s := httptest.NewServer(h) + config["auth.token_url"] = s.URL + "/token" + config["url"] = "ws://placeholder" + t.Cleanup(s.Close) + }, + oauth2Handler: oauth2TokenHandler, + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "scopes": []string{ + "scope1", + "scope2", + }, + "endpoint_params": map[string]string{ + "param1": "v1", + }, + }, + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, + { + name: "oauth2_in_params_auth_style", + oauth2Server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + s := httptest.NewServer(h) + config["auth.token_url"] = s.URL + "/token" + config["url"] = "ws://placeholder" + t.Cleanup(s.Close) + }, + oauth2Handler: oauth2TokenHandler, + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_params", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "scopes": []string{ + "scope1", + "scope2", + }, + "endpoint_params": map[string]string{ + "param1": "v1", + }, + }, + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, } var urlEvalTests = []struct { @@ -693,6 +860,9 @@ func TestInput(t *testing.T) { logp.TestingSetup() for _, test := range inputTests { t.Run(test.name, func(t *testing.T) { + if test.oauth2Server != nil { + test.oauth2Server(t, test.oauth2Handler, test.config) + } if test.server != nil { test.server(t, test.handler, test.config, test.response) } @@ -870,7 +1040,7 @@ func webSocketTestServerWithAuth(serve func(http.Handler) *httptest.Server) func handler(t, conn, response) })) // only set the resource URL if it is not already set - if config["url"] == nil { + if config["url"] == nil || config["url"] == "ws://placeholder" { config["url"] = "ws" + server.URL[4:] } t.Cleanup(server.Close) @@ -1029,3 +1199,34 @@ func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config config["proxy_url"] = "ws" + backendServer.URL[4:] return httptest.NewServer(webSocketProxyHandler(config["url"].(string))) } + +//nolint:errcheck // no point checking errors in test server. +func oauth2TokenHandler(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/token" { + return + } + w.Header().Set("content-type", "application/json") + r.ParseForm() + switch { + case r.Method != http.MethodPost: + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong method"}`)) + case r.FormValue("grant_type") != "client_credentials": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong grant_type"}`)) + case r.FormValue("client_id") != "a_client_id": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong client_id"}`)) + case r.FormValue("client_secret") != "a_client_secret": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong client_secret"}`)) + case r.FormValue("scope") != "scope1 scope2": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong scope"}`)) + case r.FormValue("param1") != "v1": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong param1"}`)) + default: + w.Write([]byte(`{"token_type": "Bearer", "expires_in": "3600", "access_token": "` + bearerToken + `"}`)) + } +} diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index d1e0c500eabd..5746f9040cbe 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -22,6 +22,7 @@ import ( "github.com/gorilla/websocket" "go.uber.org/zap/zapcore" "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/elastic-agent-libs/logp" @@ -41,6 +42,23 @@ type websocketStream struct { time func() time.Time } +type loggingRoundTripper struct { + rt http.RoundTripper + log *logp.Logger +} + +func (l *loggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := l.rt.RoundTrip(req) + // avoided logging request and and response body as it may contain sensitive information and can be huge + if l.log.Core().Enabled(zapcore.DebugLevel) { + l.log.Debugf("request: %v %v\nHeaders: %v\n", req.Method, req.URL, req.Header) + if err == nil { + l.log.Debugf("response: %v\nHeaders: %v\n", resp.Status, resp.Header) + } + } + return resp, err +} + // NewWebsocketFollower performs environment construction including CEL // program and regexp compilation, and input metrics set-up for a websocket // stream follower. @@ -56,21 +74,28 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map redact: cfg.Redact, metrics: newInputMetrics(id), }, + // this will never trigger unless a valid expiry time is assigned tokenExpiry: nil, } s.metrics.url.Set(cfg.URL.String()) s.metrics.errorsTotal.Set(0) // initialize the oauth2 token source if oauth2 is enabled and set access token in the config if cfg.Auth.OAuth2.isEnabled() { - config := &oauth2.Config{ - ClientID: cfg.Auth.OAuth2.ClientID, - ClientSecret: cfg.Auth.OAuth2.ClientSecret, - Endpoint: oauth2.Endpoint{ - TokenURL: cfg.Auth.OAuth2.TokenURL, - }, - Scopes: cfg.Auth.OAuth2.Scopes, + config := &clientcredentials.Config{ + AuthStyle: cfg.Auth.OAuth2.getAuthStyle(), + ClientID: cfg.Auth.OAuth2.ClientID, + ClientSecret: cfg.Auth.OAuth2.ClientSecret, + TokenURL: cfg.Auth.OAuth2.TokenURL, + Scopes: cfg.Auth.OAuth2.Scopes, + EndpointParams: cfg.Auth.OAuth2.EndpointParams, } - s.tokenSource = config.TokenSource(ctx, nil) + // injecting a custom http client with loggingRoundTripper to debug log the request and response for oauth2 token + client := &http.Client{ + Transport: &loggingRoundTripper{http.DefaultTransport, log}, + } + oauth2Ctx := context.WithValue(ctx, oauth2.HTTPClient, client) + + s.tokenSource = config.TokenSource(oauth2Ctx) s.isOauth2Enabled = true // get the initial token token, err := s.tokenSource.Token() @@ -83,7 +108,7 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map // this allows seamless header creation in formHeader() for the initial connection s.cfg.Auth.OAuth2.accessToken = token.AccessToken // set the initial token expiry channel with buffer of 2 mins - s.tokenExpiry = time.After(time.Until(token.Expiry) - 120*time.Second) + s.tokenExpiry = time.After(time.Until(token.Expiry) - cfg.Auth.OAuth2.TokenExpiryBuffer) } patterns, err := regexpsFromConfig(cfg) @@ -155,18 +180,18 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { // gracefully close current connection if err := c.Close(); err != nil { s.metrics.errorsTotal.Inc() - s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + s.log.Errorw("encountered an error while closing the existing websocket connection during token refresh", "error", err) } // set the new token in the config s.cfg.Auth.OAuth2.accessToken = token.AccessToken // set the new token expiry channel with 2 mins buffer - s.tokenExpiry = time.After(time.Until(token.Expiry) - 120*time.Second) + s.tokenExpiry = time.After(time.Until(token.Expiry) - s.cfg.Auth.OAuth2.TokenExpiryBuffer) // establish a new connection with the new token c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) handleConnectionResponse(resp, s.metrics, s.log) if err != nil { s.metrics.errorsTotal.Inc() - s.log.Errorw("failed to establish websocket connection on token refresh", "error", err) + s.log.Errorw("failed to establish a new websocket connection on token refresh", "error", err) return err } default: From 538cd43b2ff1135352fb68845411ae59ca781726 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 3 Jan 2025 22:28:27 +0530 Subject: [PATCH 3/6] updated documentations --- .../docs/inputs/input-streaming.asciidoc | 45 ++++++++++++++++++- x-pack/filebeat/input/streaming/websocket.go | 2 +- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 7f07fb4954f6..5239c7a78edd 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -20,6 +20,7 @@ The websocket streaming input supports: ** Basic ** Bearer ** Custom +** OAuth2.0 NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart. @@ -113,7 +114,7 @@ This will include any sensitive or secret information kept in the `state` object ==== Authentication -The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively. +The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication, authentication via a custom auth config and OAuth2 based authentication. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively. Example configurations with authentication: @@ -166,6 +167,48 @@ filebeat.inputs: token_url: https://api.crowdstrike.com/oauth2/token ---- +==== Websocket OAuth2.0 + +The `websocket` streaming input supports OAuth2.0 authentication. The `auth` configuration field is used to specify the OAuth2.0 configuration. These values are not exposed to the `state` object. + +The `auth` configuration field has the following subfields: + + - `client_id`: The client ID to use for OAuth2.0 authentication. + - `client_secret`: The client secret to use for OAuth2.0 authentication. + - `token_url`: The token URL to use for OAuth2.0 authentication. + - `scopes`: The scopes to use for OAuth2.0 authentication. + - `endpoint_params`: The endpoint parameters to use for OAuth2.0 authentication. + - `auth_style`: The authentication style to use for OAuth2.0 authentication. The default value is `AuthStyleAutoDetect`. + - `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The default value is `2m`. + +**Explanations for `auth_style` and `token_expiry_buffer`:** + +- `auth_style`: The authentication style to use for OAuth2.0 authentication which determines how the values of sensitive information like `client_id` and `client_secret` are sent in the token request. The default value is `AuthStyleAutoDetect` and is configured internally if no value is provided. With `AuthStyleAutoDetect`, the OAuth2 client decides the authentication style automatically based on internal handshakes with the OAuth2 server. The `auth_style` configuration field is optional and can be used to specify the authentication style to use for OAuth2.0 authentication. The `auth_style` configuration field supports the following configurable values: + + * `in_header`: The `client_id` and `client_secret` is sent in the header as a base64 encoded `Authorization` header. + * `in_params`: The `client_id` and `client_secret` is sent in the request body along with the other OAuth2 parameters. + +- `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The `token_expiry_buffer` is used as a safety net to ensure that the token does not expire before the input can refresh it. The `token_expiry_buffer` configuration field is optional. If the `token_expiry_buffer` configuration field is not set, the default value of `2m` is used. + +NOTE: We recommend leaving the `auth_style` configuration field unset (`AuthStyleAutoDetect` is used internally) for most scenarios, except where manual intervention is required. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: streaming + auth: + client_id: a23fcea2643868ef1a41565a1a8a1c7c + client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK + token_url: https://api.sample-url.com/oauth2/token + scopes: ["read", "write"] + endpoint_params: + param1: value1 + param2: value2 + auth_style: in_params + token_expiry_buffer: 5m + url: wss://localhost:443/_stream +---- + [[input-state-streaming]] ==== Input state diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 5746f9040cbe..d72dde995971 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -89,7 +89,7 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map Scopes: cfg.Auth.OAuth2.Scopes, EndpointParams: cfg.Auth.OAuth2.EndpointParams, } - // injecting a custom http client with loggingRoundTripper to debug log the request and response for oauth2 token + // injecting a custom http client with loggingRoundTripper to debug-log request and response attributes for oauth2 token client := &http.Client{ Transport: &loggingRoundTripper{http.DefaultTransport, log}, } From 171a55da85377eddc96b437acc5ec2e217e6289e Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 3 Jan 2025 22:40:10 +0530 Subject: [PATCH 4/6] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7107b4197c10..69616b44deb3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -377,6 +377,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] - Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094] +- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212] *Auditbeat* From 589c8c82304862c92424e6cb72580654304b799a Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 3 Jan 2025 22:51:39 +0530 Subject: [PATCH 5/6] removed unused variable --- x-pack/filebeat/input/streaming/websocket.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index d72dde995971..486db1f204d4 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -33,13 +33,12 @@ import ( type websocketStream struct { processor - id string - cfg config - cursor map[string]any - isOauth2Enabled bool - tokenSource oauth2.TokenSource - tokenExpiry <-chan time.Time - time func() time.Time + id string + cfg config + cursor map[string]any + tokenSource oauth2.TokenSource + tokenExpiry <-chan time.Time + time func() time.Time } type loggingRoundTripper struct { @@ -94,9 +93,7 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map Transport: &loggingRoundTripper{http.DefaultTransport, log}, } oauth2Ctx := context.WithValue(ctx, oauth2.HTTPClient, client) - s.tokenSource = config.TokenSource(oauth2Ctx) - s.isOauth2Enabled = true // get the initial token token, err := s.tokenSource.Token() if err != nil { From 38d95787096ee3fb3ad1f99e59c7c8400733bb2c Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 7 Jan 2025 18:29:51 +0530 Subject: [PATCH 6/6] resolved merge conflicts and addressed Chris's suggested PR changes --- .../docs/inputs/input-streaming.asciidoc | 8 +++---- .../filebeat/input/streaming/config_test.go | 22 +++++++++---------- x-pack/filebeat/input/streaming/websocket.go | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index bebb5260bef1..1ee343e4a9b7 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -178,19 +178,19 @@ The `auth` configuration field has the following subfields: - `token_url`: The token URL to use for OAuth2.0 authentication. - `scopes`: The scopes to use for OAuth2.0 authentication. - `endpoint_params`: The endpoint parameters to use for OAuth2.0 authentication. - - `auth_style`: The authentication style to use for OAuth2.0 authentication. The default value is `AuthStyleAutoDetect`. - - `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The default value is `2m`. + - `auth_style`: The authentication style to use for OAuth2.0 authentication. If left unset, the style will be automatically detected. + - `token_expiry_buffer`: Minimum valid time remaining before attempting an OAuth2 token renewal. The default value is `2m`. **Explanations for `auth_style` and `token_expiry_buffer`:** -- `auth_style`: The authentication style to use for OAuth2.0 authentication which determines how the values of sensitive information like `client_id` and `client_secret` are sent in the token request. The default value is `AuthStyleAutoDetect` and is configured internally if no value is provided. With `AuthStyleAutoDetect`, the OAuth2 client decides the authentication style automatically based on internal handshakes with the OAuth2 server. The `auth_style` configuration field is optional and can be used to specify the authentication style to use for OAuth2.0 authentication. The `auth_style` configuration field supports the following configurable values: +- `auth_style`: The authentication style to use for OAuth2.0 authentication which determines how the values of sensitive information like `client_id` and `client_secret` are sent in the token request. The default style value is automatically inferred and used appropriately if no value is provided. The `auth_style` configuration field is optional and can be used to specify the authentication style to use for OAuth2.0 authentication. The `auth_style` configuration field supports the following configurable values: * `in_header`: The `client_id` and `client_secret` is sent in the header as a base64 encoded `Authorization` header. * `in_params`: The `client_id` and `client_secret` is sent in the request body along with the other OAuth2 parameters. - `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The `token_expiry_buffer` is used as a safety net to ensure that the token does not expire before the input can refresh it. The `token_expiry_buffer` configuration field is optional. If the `token_expiry_buffer` configuration field is not set, the default value of `2m` is used. -NOTE: We recommend leaving the `auth_style` configuration field unset (`AuthStyleAutoDetect` is used internally) for most scenarios, except where manual intervention is required. +NOTE: We recommend leaving the `auth_style` configuration field unset (automatically inferred internally) for most scenarios, except where manual intervention is required. ["source","yaml",subs="attributes"] ---- diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 5adb9b3ed81f..99b3cc805597 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -130,17 +130,6 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, - { - name: "valid_authStyle_default", - config: map[string]interface{}{ - "auth": map[string]interface{}{ - "client_id": "a_client_id", - "client_secret": "a_client_secret", - "token_url": "https://localhost:443/token", - }, - "url": "wss://localhost:443/v1/stream", - }, - }, { name: "valid_retry_with_infinite", config: map[string]interface{}{ @@ -153,6 +142,17 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_authStyle_default", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, { name: "valid_authStyle_in_params", config: map[string]interface{}{ diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 375101d81c12..eeb89ad5c9b8 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -73,7 +73,7 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map redact: cfg.Redact, metrics: newInputMetrics(id), }, - // this will never trigger unless a valid expiry time is assigned + // the token expiry handler will never trigger unless a valid expiry time is assigned tokenExpiry: nil, } s.metrics.url.Set(cfg.URL.String())