From 78a6e0e5c1c658f3eb5029548925c742db684cf3 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Mon, 2 Dec 2024 20:49:41 +0530 Subject: [PATCH 1/7] initial commit for tls & proxy support for websocket --- x-pack/filebeat/input/streaming/config.go | 1 - x-pack/filebeat/input/streaming/websocket.go | 55 ++++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 67ee6e1eb318..590289ae12ab 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -43,7 +43,6 @@ type config struct { Retry *retry `config:"retry"` Transport httpcommon.HTTPTransportSettings `config:",inline"` - // CrowdstrikeAppID is the value used to set the // appId request parameter in the FalconHose stream // discovery request. diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 0c8de94f5ad3..edaf2d9dff40 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -7,6 +7,7 @@ package streaming import ( "bytes" "context" + "crypto/tls" "errors" "fmt" "io" @@ -14,6 +15,7 @@ import ( "math/rand/v2" "net" "net/http" + "net/url" "strings" "time" @@ -22,6 +24,8 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) type websocketStream struct { @@ -220,11 +224,14 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log var response *http.Response var err error headers := formHeader(cfg) - + dialer, err := createWebSocketDialer(cfg) + if err != nil { + return nil, nil, err + } if cfg.Retry != nil { retryConfig := cfg.Retry for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = websocket.DefaultDialer.DialContext(ctx, url, headers) + conn, response, err = dialer.DialContext(ctx, url, headers) if err == nil { return conn, response, nil } @@ -239,7 +246,7 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } - return websocket.DefaultDialer.DialContext(ctx, url, headers) + return dialer.DialContext(ctx, url, headers) } // calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm. @@ -269,3 +276,45 @@ func (s *websocketStream) Close() error { s.metrics.Close() return nil } + +func createWebSocketDialer(cfg config) (*websocket.Dialer, error) { + var tlsConfig *tls.Config + dialer := &websocket.Dialer{ + // set default handshake timeout of 20 seconds + HandshakeTimeout: 20 * time.Second, + // no proxy by default + Proxy: http.ProxyFromEnvironment, + } + // override handshake timeout if timeout is set in the transport config + if cfg.Transport.Timeout != 0 { + dialer.HandshakeTimeout = cfg.Transport.Timeout + } + // load proxy configuration if available + if cfg.Transport.Proxy.URL != nil { + var proxy func(*http.Request) (*url.URL, error) + proxyURL, err := httpcommon.NewProxyURIFromString(cfg.Transport.Proxy.URL.String()) + if err != nil { + return nil, fmt.Errorf("failed to parse proxy URL: %w", err) + } + // create a custom HTTP Transport with proxy configuration + proxyTransport := &http.Transport{ + Proxy: http.ProxyURL(proxyURL.URI()), + ProxyConnectHeader: cfg.Transport.Proxy.Headers.Headers(), + } + dialer.NetDialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return proxyTransport.DialContext(ctx, network, addr) + } + dialer.Proxy = proxy + } + // load TLS config if available + if cfg.Transport.TLS != nil { + TLSConfig, err := tlscommon.LoadTLSConfig(cfg.Transport.TLS) + if err != nil { + return nil, fmt.Errorf("failed to load TLS config: %w", err) + } + tlsConfig = TLSConfig.ToConfig() + dialer.TLSClientConfig = tlsConfig + } + + return dialer, nil +} From 8a7e3883c6d2da5a3404d776075f6f9829bc8672 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Thu, 5 Dec 2024 18:45:14 +0530 Subject: [PATCH 2/7] added test to simulate TLS validation with necessary dummy certificates & keys --- x-pack/filebeat/input/streaming/input_test.go | 110 ++++++++++++++++++ .../input/streaming/testdata/certs/ca.crt | 21 ++++ .../input/streaming/testdata/certs/cert.pem | 21 ++++ .../input/streaming/testdata/certs/key.pem | 28 +++++ 4 files changed, 180 insertions(+) create mode 100644 x-pack/filebeat/input/streaming/testdata/certs/ca.crt create mode 100644 x-pack/filebeat/input/streaming/testdata/certs/cert.pem create mode 100644 x-pack/filebeat/input/streaming/testdata/certs/key.pem diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index 6d382bdf6645..f252594872c4 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -6,6 +6,7 @@ package streaming import ( "context" + "crypto/tls" "errors" "fmt" "net/http" @@ -450,6 +451,75 @@ var inputTests = []struct { }, wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), }, + { + name: "single_event_tls", + server: webSocketServerWithTLS(httptest.NewUnstartedServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "ssl": map[string]interface{}{ + "certificate_authorities": []string{"testdata/certs/ca.crt"}, + "certificate": "testdata/certs/cert.pem", + "key": "testdata/certs/key.pem", + }, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, } var urlEvalTests = []struct { @@ -771,6 +841,46 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t } } +// webSocketServerWithTLS returns a function that creates a WebSocket server with TLS. +func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { + return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { + server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("error upgrading connection to WebSocket: %v", err) + return + } + + handler(t, conn, response) + })) + //nolint:gosec // there is no need to use a secure cert for testing + server.TLS = &tls.Config{ + Certificates: []tls.Certificate{generateSelfSignedCert(t)}, + } + server.StartTLS() + + if config["url"] == nil { + config["url"] = "ws" + server.URL[4:] + } + t.Cleanup(server.Close) + } +} + +// generateSelfSignedCert returns a self-signed certificate for testing purposes based on the dummy certs in the testdata directory +func generateSelfSignedCert(t *testing.T) tls.Certificate { + cert, err := tls.LoadX509KeyPair("testdata/certs/cert.pem", "testdata/certs/key.pem") + if err != nil { + t.Fatalf("failed to generate self-signed cert: %v", err) + } + return cert +} + // defaultHandler is a default handler for WebSocket connections. func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { for _, r := range response { diff --git a/x-pack/filebeat/input/streaming/testdata/certs/ca.crt b/x-pack/filebeat/input/streaming/testdata/certs/ca.crt new file mode 100644 index 000000000000..43e187f1367f --- /dev/null +++ b/x-pack/filebeat/input/streaming/testdata/certs/ca.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0 +MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx +ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN +AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7 +HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB +twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm +vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN +LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL +sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD +VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn +cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB +AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd +UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25 +TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU +COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3 +z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV +5h0bTbbPOyEIe5ydEIbr5kA= +-----END CERTIFICATE----- diff --git a/x-pack/filebeat/input/streaming/testdata/certs/cert.pem b/x-pack/filebeat/input/streaming/testdata/certs/cert.pem new file mode 100644 index 000000000000..43e187f1367f --- /dev/null +++ b/x-pack/filebeat/input/streaming/testdata/certs/cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0 +MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx +ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN +AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7 +HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB +twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm +vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN +LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL +sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD +VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn +cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB +AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd +UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25 +TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU +COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3 +z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV +5h0bTbbPOyEIe5ydEIbr5kA= +-----END CERTIFICATE----- diff --git a/x-pack/filebeat/input/streaming/testdata/certs/key.pem b/x-pack/filebeat/input/streaming/testdata/certs/key.pem new file mode 100644 index 000000000000..1b6ce6bade3c --- /dev/null +++ b/x-pack/filebeat/input/streaming/testdata/certs/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC4RGhVahbmMEsB ++Bh4THqoYpQvrFlymkD/EJmh+x1kUk/CqOmE0/TYsVC+ELOTpswkBWr0TfarRKOM +LagiQThMl0BtcAB7i1iGPijggbcFIPHz/rmGmFDkDmDS5Ac+0i5Xo7mWE8hsXqSJ +HPlHiUXDhN+mpyDx6XNikq9x5ryicWVFaUBGyb0O6Xu/oiy+I0jn86UDXFfUv280 +6OrWmV7qPq8hD5bIZFPzBOVQjSxa1NwFtn8yltF2v7JJFuYswogFPCjRMvOFEDOm +mZCXxYvTt/eIve7BoGIPqt+ly7F22frAlxyNwhsfqcDsdIe0vJ+4iSewW5sDOkLi +xRemxWJJAgMBAAECggEAQprvf5hWaKQiKLcN2UYDvCPN3qGUv3kEb24HqmZDjIS4 +MeuuZQXcZgtJ3TnaP0+2UHro2x/nPqcT2tKSCLe8aurtLeGjOwT2XafQTL52clMj +Qgfb9cvOyXBtDS3BdLKyb5lNtvK1qn5XSPyBGpuC7RZ1ZR7aKLcyrvnIkwpNOwXW +zH5F6pI6HAUPfgYcHfIkQ5kuPCRcvfmv6m9XLYlmiQNkReQ2fWtFF6517R6FGtZu +Z8F0pFz8VtIGQoamX9vEwQhYqBK67msl9gnKjyH3ONckkSRMVagXrwx9F5o+NeRD +IgDFnjH1HgLCXmeCa7BN+eYfGMZ24xisItD7XBzGtQKBgQDlmEncIgpkmtXZSvXs +r5i7epJDDcxC8/ZObsn3zI01t4nmI9+phu7a4fAA+AUP7+HFVdi22JcHDkHZ5J1a +93t+Tcc4yzXk8FovaavRRvJNXv3WhHvgNpe1tvgyMUc8p9QpfbHuafVhQN2qWivh +nnEWagBoguiXaXEIFRXFK6dt5wKBgQDNdZYQ4/Am7HNjI2vqmMccZxuSufUX0xxM +LuDY8UAsPbgRN8wqfY67xCdMztax5y15gF9UPw0hHMlk4m2J4lsqCqgRRWXnTser +rNAseZ1j7MZY0cKACRxvXNtPKmmHKvYHUEZkADT/HrhfZWIce7KEXaxdoGTkssPd +9/WbahLITwKBgFBa2VbTDyIg0sGHK8UXu/O5tWEEfj3clpLi0YsJq05mmzvRyGDT +2dr/gnlEVLk8Mp9XKU7tRQZyJff1vGDvBuiwng4xiP5EZLv9VuYa14jeuyaOHbDe +SoCNthYTCySedHHFDTYtHXVZN3t8raj8RAYdOWFal78OZ0H15zWnzqR3AoGBAJne +mxFxM3RjFpNDftmFq3BpA6xiGdzK3OFtJjUykAXR/xzd9chImfGjGG+cZAt9/3+E +FWCpi7KltWoZbUGbRPz6WB3/JC8Tv9OhK5JzTdz9ARqZlRmAOUxpdVEXiUqScQjP +JLhVs1rw7dF7wvtj5DDfWmwP6B+iha+huM24pfJfAoGAUYeJQ1XqEx+0+b6xrtCm +qxPiiGnJSi4J9CGaZRMTG5qFSQ5jCaWTJSWdiZJgyBMnmQsiuPacefAg91z/NjJC +xM0/sKLe/yPWP1UlwrCl1MDjMwIl/qtWiKXYXpY1qcOONLCFc6OASd1cTOtX/7Km +2g49JWZEY71f7DxLdcWfqWM= +-----END PRIVATE KEY----- From 2a00984ff2c8f292851a0a05aa5aa2736736f71b Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 6 Dec 2024 19:58:40 +0530 Subject: [PATCH 3/7] added support for functionsl tls & forward proxy configurations for websocket --- x-pack/filebeat/input/streaming/config.go | 20 ++- .../filebeat/input/streaming/input_manager.go | 2 +- x-pack/filebeat/input/streaming/input_test.go | 141 +++++++++++++++++- x-pack/filebeat/input/streaming/websocket.go | 15 +- 4 files changed, 166 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 590289ae12ab..d6733e7312df 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -41,8 +41,8 @@ type config struct { Redact *redact `config:"redact"` // Retry is the configuration for retrying failed connections. Retry *retry `config:"retry"` - - Transport httpcommon.HTTPTransportSettings `config:",inline"` + // Transport is the HTTP transport configuration. + Transport transport `config:",inline"` // CrowdstrikeAppID is the value used to set the // appId request parameter in the FalconHose stream // discovery request. @@ -94,6 +94,11 @@ type urlConfig struct { *url.URL } +type transport struct { + httpcommon.HTTPTransportSettings `config:",inline"` + HandShakeTimeOut time.Duration `config:"handshake_timeout" validate:"min=10"` +} + func (u *urlConfig) Unpack(in string) error { parsed, err := url.Parse(in) if err != nil { @@ -165,3 +170,14 @@ func checkURLScheme(c config) error { return fmt.Errorf("unknown stream type: %s", c.Type) } } + +func defaultConfig() config { + return config{ + Transport: transport{ + HTTPTransportSettings: httpcommon.HTTPTransportSettings{ + Timeout: 90 * time.Second, + }, + HandShakeTimeOut: 20 * time.Second, + }, + } +} diff --git a/x-pack/filebeat/input/streaming/input_manager.go b/x-pack/filebeat/input/streaming/input_manager.go index f20b867755b2..c685452c34f1 100644 --- a/x-pack/filebeat/input/streaming/input_manager.go +++ b/x-pack/filebeat/input/streaming/input_manager.go @@ -34,7 +34,7 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage } func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) { - src := &source{cfg: config{}} + src := &source{cfg: defaultConfig()} if err := cfg.Unpack(&src.cfg); err != nil { return nil, nil, err } diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index f252594872c4..c11784ea3dbf 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -42,6 +42,7 @@ type WebSocketHandler func(*testing.T, *websocket.Conn, []string) var inputTests = []struct { name string server func(*testing.T, WebSocketHandler, map[string]interface{}, []string) + proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server handler WebSocketHandler config map[string]interface{} response []string @@ -461,6 +462,7 @@ var inputTests = []struct { "events": [inner_body], })`, "ssl": map[string]interface{}{ + "enabled": true, "certificate_authorities": []string{"testdata/certs/ca.crt"}, "certificate": "testdata/certs/cert.pem", "key": "testdata/certs/key.pem", @@ -520,6 +522,70 @@ var inputTests = []struct { }, }, }, + { + name: "basic_proxy_forwarding", + proxyServer: newWebSocketProxyTestServer, + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, } var urlEvalTests = []struct { @@ -630,6 +696,9 @@ func TestInput(t *testing.T) { if test.server != nil { test.server(t, test.handler, test.config, test.response) } + if test.proxyServer != nil { + test.proxyServer(t, test.handler, test.config, test.response) + } cfg := conf.MustNewConfigFrom(test.config) @@ -841,7 +910,7 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t } } -// webSocketServerWithTLS returns a function that creates a WebSocket server with TLS. +// webSocketServerWithTLS simulates a WebSocket server with TLS based authentication. func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -890,3 +959,73 @@ func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { } } } + +// webSocketTestServer creates a WebSocket target server that communicates with the proxy handler. +func webSocketTestServer(t *testing.T, handler WebSocketHandler, response []string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("failed to upgrade WebSocket connection: %v", err) + return + } + handler(t, conn, response) + })) +} + +// webSocketProxyHandler forwards WebSocket connections to the target server. +// +//nolint:errcheck //we can safely ignore errors checks here +func webSocketProxyHandler(targetURL string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Response.Body.Close() + //nolint:bodyclose // we can ignore the body close here + targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil) + if err != nil { + http.Error(w, "failed to connect to backend WebSocket server", http.StatusBadGateway) + return + } + defer targetConn.Close() + + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + clientConn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + http.Error(w, "failed to upgrade client connection", http.StatusInternalServerError) + return + } + defer clientConn.Close() + // forward messages between client and target server + go func() { + for { + messageType, message, err := targetConn.ReadMessage() + if err != nil { + break + } + clientConn.WriteMessage(messageType, message) + } + }() + for { + messageType, message, err := clientConn.ReadMessage() + if err != nil { + break + } + targetConn.WriteMessage(messageType, message) + } + } +} + +// newWebSocketProxyTestServer creates a proxy server forwarding WebSocket traffic. +func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) *httptest.Server { + backendServer := webSocketTestServer(t, handler, response) + config["url"] = "ws" + backendServer.URL[4:] + config["proxy_url"] = "ws" + backendServer.URL[4:] + return httptest.NewServer(webSocketProxyHandler(config["url"].(string))) +} diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index edaf2d9dff40..647ea655108c 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -235,6 +235,7 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log if err == nil { return conn, response, nil } + //nolint:errorlint // it will never be a wrapped error at this point if err == websocket.ErrBadHandshake { log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) continue @@ -280,15 +281,10 @@ func (s *websocketStream) Close() error { func createWebSocketDialer(cfg config) (*websocket.Dialer, error) { var tlsConfig *tls.Config dialer := &websocket.Dialer{ - // set default handshake timeout of 20 seconds - HandshakeTimeout: 20 * time.Second, - // no proxy by default - Proxy: http.ProxyFromEnvironment, - } - // override handshake timeout if timeout is set in the transport config - if cfg.Transport.Timeout != 0 { - dialer.HandshakeTimeout = cfg.Transport.Timeout + HandshakeTimeout: cfg.Transport.HandShakeTimeOut, + Proxy: http.ProxyFromEnvironment, } + // load proxy configuration if available if cfg.Transport.Proxy.URL != nil { var proxy func(*http.Request) (*url.URL, error) @@ -300,6 +296,9 @@ func createWebSocketDialer(cfg config) (*websocket.Dialer, error) { proxyTransport := &http.Transport{ Proxy: http.ProxyURL(proxyURL.URI()), ProxyConnectHeader: cfg.Transport.Proxy.Headers.Headers(), + DialContext: (&net.Dialer{ + Timeout: cfg.Transport.HTTPTransportSettings.Timeout, + }).DialContext, } dialer.NetDialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { return proxyTransport.DialContext(ctx, network, addr) From 151f674d1f02f198f75059017fac327530a14ed3 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 6 Dec 2024 21:16:16 +0530 Subject: [PATCH 4/7] updated docs --- .../docs/inputs/input-streaming.asciidoc | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index b80deda9c778..f1100e27f75f 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -349,6 +349,33 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. +[float] +==== `handshake_timeout` +This specifies the time to wait for the `websocket` handshake and the `http.Upgrade()` operation to complete. This timeout occurs at the application layer and not at the TCP layer. The `default value` is `20` seconds. This setting is specific to the `websocket` streaming input type only. + +[float] +=== `timeout` +This specifies the timeout that occurs after the initial handshake has taken place. This timeout occurs at the TCP layer and deals with the time taken to establish the actual tcp connection. The `default value` is `90` seconds. + +[float] +==== `proxy_url` +This specifies the forward proxy URL to use for the connection. The `proxy_url` configuration is optional and can be used to configure the proxy settings for the connection. The `proxy_url` default value is set by `http.ProxyFromEnvironment` which reads the `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables. + +[float] +==== `proxy_headers` +This specifies the headers to be sent to the proxy server. The `proxy_headers` configuration is optional and can be used to configure the headers to be sent to the proxy server. + +[float] +==== `ssl` +This specifies the SSL configuration for the connection. The `ssl` configuration is optional and can be used to configure the SSL settings for the connection. The `ssl` configuration has the following subfields: + + - `certificate_authorities`: A list of root certificates to use for verifying the server's certificate. + - `certificate`: The (PEM encoded) certificate to use for client authentication. + - `key`: The (PEM encoded) private key to use for client authentication. + +If this is a self-signed certificate, the `certificate_authorities` field should be set to the certificate itself. + + [float] === Metrics From 3aa901d0d091feb33fbf23b6dcda6cff7f251107 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 6 Dec 2024 21:19:57 +0530 Subject: [PATCH 5/7] updated changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a846236e934b..93ff52ecb9f3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -357,6 +357,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add evaluation state dump debugging option to CEL input. {pull}41335[41335] - Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] +- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934] *Auditbeat* From 9d86b0bed9311143b3062580870a8f6de515d1f7 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Fri, 6 Dec 2024 21:21:26 +0530 Subject: [PATCH 6/7] removed extra newline --- x-pack/filebeat/docs/inputs/input-streaming.asciidoc | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index f1100e27f75f..eb407f9da5e5 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -375,7 +375,6 @@ This specifies the SSL configuration for the connection. The `ssl` configuration If this is a self-signed certificate, the `certificate_authorities` field should be set to the certificate itself. - [float] === Metrics From b2fc1c15bc99bd367353e8a093b44910b9a59f24 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 10 Dec 2024 10:02:47 +0530 Subject: [PATCH 7/7] Adressed dan's PR suggestions --- .../docs/inputs/input-streaming.asciidoc | 6 +----- x-pack/filebeat/input/streaming/config.go | 16 ++++------------ x-pack/filebeat/input/streaming/websocket.go | 5 ++--- 3 files changed, 7 insertions(+), 20 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index eb407f9da5e5..9a6f67e5bc49 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -349,13 +349,9 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. -[float] -==== `handshake_timeout` -This specifies the time to wait for the `websocket` handshake and the `http.Upgrade()` operation to complete. This timeout occurs at the application layer and not at the TCP layer. The `default value` is `20` seconds. This setting is specific to the `websocket` streaming input type only. - [float] === `timeout` -This specifies the timeout that occurs after the initial handshake has taken place. This timeout occurs at the TCP layer and deals with the time taken to establish the actual tcp connection. The `default value` is `90` seconds. +Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. [float] ==== `proxy_url` diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index d6733e7312df..6ccaf0c73493 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -41,8 +41,8 @@ type config struct { Redact *redact `config:"redact"` // Retry is the configuration for retrying failed connections. Retry *retry `config:"retry"` - // Transport is the HTTP transport configuration. - Transport transport `config:",inline"` + // Transport is the common the transport config. + Transport httpcommon.HTTPTransportSettings `config:",inline"` // CrowdstrikeAppID is the value used to set the // appId request parameter in the FalconHose stream // discovery request. @@ -94,11 +94,6 @@ type urlConfig struct { *url.URL } -type transport struct { - httpcommon.HTTPTransportSettings `config:",inline"` - HandShakeTimeOut time.Duration `config:"handshake_timeout" validate:"min=10"` -} - func (u *urlConfig) Unpack(in string) error { parsed, err := url.Parse(in) if err != nil { @@ -173,11 +168,8 @@ func checkURLScheme(c config) error { func defaultConfig() config { return config{ - Transport: transport{ - HTTPTransportSettings: httpcommon.HTTPTransportSettings{ - Timeout: 90 * time.Second, - }, - HandShakeTimeOut: 20 * time.Second, + Transport: httpcommon.HTTPTransportSettings{ + Timeout: 180 * time.Second, }, } } diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 647ea655108c..8a78757f0e96 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -281,8 +281,7 @@ func (s *websocketStream) Close() error { func createWebSocketDialer(cfg config) (*websocket.Dialer, error) { var tlsConfig *tls.Config dialer := &websocket.Dialer{ - HandshakeTimeout: cfg.Transport.HandShakeTimeOut, - Proxy: http.ProxyFromEnvironment, + Proxy: http.ProxyFromEnvironment, } // load proxy configuration if available @@ -297,7 +296,7 @@ func createWebSocketDialer(cfg config) (*websocket.Dialer, error) { Proxy: http.ProxyURL(proxyURL.URI()), ProxyConnectHeader: cfg.Transport.Proxy.Headers.Headers(), DialContext: (&net.Dialer{ - Timeout: cfg.Transport.HTTPTransportSettings.Timeout, + Timeout: cfg.Transport.Timeout, }).DialContext, } dialer.NetDialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {