From b22e7e1c23a7624a291ef5d7b71763ee5832b70e Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Fri, 18 Dec 2020 15:59:02 +0100 Subject: [PATCH 1/5] Add pubsub_alternative_host to gcp pubsub input --- x-pack/filebeat/input/gcppubsub/config.go | 2 ++ x-pack/filebeat/input/gcppubsub/input.go | 30 ++++++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/gcppubsub/config.go b/x-pack/filebeat/input/gcppubsub/config.go index fdb4c30dfeec..827e4e502825 100644 --- a/x-pack/filebeat/input/gcppubsub/config.go +++ b/x-pack/filebeat/input/gcppubsub/config.go @@ -36,6 +36,8 @@ type config struct { // JSON blob containing authentication credentials and key. CredentialsJSON []byte `config:"credentials_json"` + + PubsubAlternativeHost string `config:"pubsub_alternative_host"` } func (c *config) Validate() error { diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go index 69e8bb64cc26..9f804dc6cb5a 100644 --- a/x-pack/filebeat/input/gcppubsub/input.go +++ b/x-pack/filebeat/input/gcppubsub/input.go @@ -8,9 +8,12 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "sync" "time" + "google.golang.org/grpc" + "cloud.google.com/go/pubsub" "github.com/pkg/errors" "google.golang.org/api/option" @@ -147,15 +150,7 @@ func (in *pubsubInput) run() error { ctx, cancel := context.WithCancel(in.workerCtx) defer cancel() - // Make pubsub client. - opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat"))} - if in.CredentialsFile != "" { - opts = append(opts, option.WithCredentialsFile(in.CredentialsFile)) - } else if len(in.CredentialsJSON) > 0 { - option.WithCredentialsJSON(in.CredentialsJSON) - } - - client, err := pubsub.NewClient(ctx, in.ProjectID, opts...) + client, err := in.newPubsubClient(ctx) if err != nil { return err } @@ -250,3 +245,20 @@ func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubs return nil, errors.New("no subscription exists and 'subscription.create' is not enabled") } + +func (in *pubsubInput) newPubsubClient(ctx context.Context) (*pubsub.Client, error) { + opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat"))} + if in.PubsubAlternativeHost != "" { + // this will be typically set because we want to point the input to a testing pubsub emulator + conn, err := grpc.Dial(in.PubsubAlternativeHost, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("grpc.Dial: %v", err) + } + opts = append(opts, option.WithGRPCConn(conn), option.WithTelemetryDisabled()) + } else if in.CredentialsFile != "" { + opts = append(opts, option.WithCredentialsFile(in.CredentialsFile)) + } else if len(in.CredentialsJSON) > 0 { + opts = append(opts, option.WithCredentialsJSON(in.CredentialsJSON)) + } + return pubsub.NewClient(ctx, in.ProjectID, opts...) +} From b8af3f89ac76ef2db856c14366c30aa9e85f5759 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Dec 2020 09:12:58 +0100 Subject: [PATCH 2/5] Apply suggestions --- x-pack/filebeat/input/gcppubsub/config.go | 2 +- x-pack/filebeat/input/gcppubsub/input.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/x-pack/filebeat/input/gcppubsub/config.go b/x-pack/filebeat/input/gcppubsub/config.go index 827e4e502825..06792e7684f0 100644 --- a/x-pack/filebeat/input/gcppubsub/config.go +++ b/x-pack/filebeat/input/gcppubsub/config.go @@ -37,7 +37,7 @@ type config struct { // JSON blob containing authentication credentials and key. CredentialsJSON []byte `config:"credentials_json"` - PubsubAlternativeHost string `config:"pubsub_alternative_host"` + AlternativeHost string `config:"alternative_host"` } func (c *config) Validate() error { diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go index 9f804dc6cb5a..c59c02c48d51 100644 --- a/x-pack/filebeat/input/gcppubsub/input.go +++ b/x-pack/filebeat/input/gcppubsub/input.go @@ -248,17 +248,21 @@ func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubs func (in *pubsubInput) newPubsubClient(ctx context.Context) (*pubsub.Client, error) { opts := []option.ClientOption{option.WithUserAgent(useragent.UserAgent("Filebeat"))} - if in.PubsubAlternativeHost != "" { + + if in.AlternativeHost != "" { // this will be typically set because we want to point the input to a testing pubsub emulator - conn, err := grpc.Dial(in.PubsubAlternativeHost, grpc.WithInsecure()) + conn, err := grpc.Dial(in.AlternativeHost, grpc.WithInsecure()) if err != nil { - return nil, fmt.Errorf("grpc.Dial: %v", err) + return nil, fmt.Errorf("grpc.Dial: %w", err) } opts = append(opts, option.WithGRPCConn(conn), option.WithTelemetryDisabled()) - } else if in.CredentialsFile != "" { + } + + if in.CredentialsFile != "" { opts = append(opts, option.WithCredentialsFile(in.CredentialsFile)) } else if len(in.CredentialsJSON) > 0 { opts = append(opts, option.WithCredentialsJSON(in.CredentialsJSON)) } + return pubsub.NewClient(ctx, in.ProjectID, opts...) } From 951ffdb29d17289ad7ccbfd995f69b5b2d73bf5e Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Dec 2020 09:16:05 +0100 Subject: [PATCH 3/5] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4268362a21f1..9c2f888ea1fd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -786,6 +786,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Misp improvements: Migration to httpjson v2 config, pagination and deduplication ID {pull}23070[23070] - Add Google Workspace module and mark Gsuite module as deprecated {pull}22950[22950] - Mark m365 defender, defender atp, okta and google workspace modules as GA {pull}23113[23113] +- Added `alternative_host` option to google pubsub input {pull}23215[23215] *Heartbeat* From fef6689fddcd917bcb9a7e571b756aa454bd95f1 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Dec 2020 09:18:28 +0100 Subject: [PATCH 4/5] Add new option comment --- x-pack/filebeat/input/gcppubsub/config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/filebeat/input/gcppubsub/config.go b/x-pack/filebeat/input/gcppubsub/config.go index 06792e7684f0..ec49368c0d9c 100644 --- a/x-pack/filebeat/input/gcppubsub/config.go +++ b/x-pack/filebeat/input/gcppubsub/config.go @@ -37,6 +37,7 @@ type config struct { // JSON blob containing authentication credentials and key. CredentialsJSON []byte `config:"credentials_json"` + // Overrides the default Pub/Sub service address and disables TLS. For testing. AlternativeHost string `config:"alternative_host"` } From 5e9b6741ff5f927fd81f4b1224cf5e4c918daab8 Mon Sep 17 00:00:00 2001 From: Marc Guasch Date: Mon, 21 Dec 2020 12:20:19 +0100 Subject: [PATCH 5/5] Make error more descriptive and reorder imports --- x-pack/filebeat/input/gcppubsub/input.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/gcppubsub/input.go b/x-pack/filebeat/input/gcppubsub/input.go index c59c02c48d51..d71af4bb2d47 100644 --- a/x-pack/filebeat/input/gcppubsub/input.go +++ b/x-pack/filebeat/input/gcppubsub/input.go @@ -12,11 +12,10 @@ import ( "sync" "time" - "google.golang.org/grpc" - "cloud.google.com/go/pubsub" "github.com/pkg/errors" "google.golang.org/api/option" + "google.golang.org/grpc" "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" @@ -253,7 +252,7 @@ func (in *pubsubInput) newPubsubClient(ctx context.Context) (*pubsub.Client, err // this will be typically set because we want to point the input to a testing pubsub emulator conn, err := grpc.Dial(in.AlternativeHost, grpc.WithInsecure()) if err != nil { - return nil, fmt.Errorf("grpc.Dial: %w", err) + return nil, fmt.Errorf("cannot connect to alternative host %q: %w", in.AlternativeHost, err) } opts = append(opts, option.WithGRPCConn(conn), option.WithTelemetryDisabled()) }