diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index 28680cce7..34301bb55 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -34,7 +34,7 @@ default-retention-policy = "" # Password for basic user authorization when using meta API. meta-username must also be set. # meta-password = "kapapass" - # Shared secret for JWT bearer token authentication when using meta API. + # Shared secret for JWT bearer token authentication when using meta API. # If this is set, then the `meta-username` and `meta-password` settings are ignored. # This should match the `[meta] internal-shared-secret` setting on the meta nodes. # meta-internal-shared-secret = "MyVoiceIsMyPassport" @@ -573,27 +573,46 @@ default-retention-policy = "" # Use SSL but skip chain & host verification insecure-skip-verify = false ## Optional SASL Config - # sasl_username = "kafka" - # sasl_password = "secret" + # sasl-username = "kafka" + # sasl-password = "secret" + ## Arbitrary key value string pairs to pass as a TOML table. For example: + # {logicalCluster = "cluster-042", poolId = "pool-027"} + # sasl-extensions = {} ## Optional SASL: ## one of: OAUTHBEARER, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI ## (defaults to PLAIN) - # sasl_mechanism = "" + # sasl-mechanism = "" ## used if sasl_mechanism is GSSAPI - # sasl_gssapi_service_name = "" + # sasl-gssapi-service-name = "" # ## One of: KRB5_USER_AUTH and KRB5_KEYTAB_AUTH - # sasl_gssapi_auth_type = "KRB5_USER_AUTH" - # sasl_gssapi_kerberos_config_path = "/" - # sasl_gssapi_realm = "realm" - # sasl_gssapi_key_tab_path = "" - # sasl_gssapi_disable_pafxfast = false - ## Access token used if sasl_mechanism is OAUTHBEARER - # sasl_access_token = "" - ## Arbitrary key value string pairs to pass as a TOML table. 
For example: - # {logicalCluster = "cluster-042", poolId = "pool-027"} - # sasl_extensions = {} + # sasl-gssapi-auth-type = "KRB5_USER_AUTH" + # sasl-gssapi-kerberos-config-path = "/" + # sasl-gssapi-realm = "realm" + # sasl-gssapi-key-tab-path = "" + # sasl-gssapi-disable-pafxfast = false + ## Options if sasl-mechanism is OAUTHBEARER + ## The service name to use when authenticating with SASL/OAUTH. + # ## One of: "" or custom, auth0, azuread + # sasl-oauth-service = "" + ## The client ID to use when authenticating with SASL/OAUTH. + # sasl-oauth-client-id = "" + ## The client secret to use when authenticating with SASL/OAUTH. + # sasl-oauth-client-secret = "" + ## The token URL to use when sasl-oauth-service is custom or auth0. Leave empty otherwise. + # sasl-oauth-token-url = "" + ## The margin for the token's expiration time. + # sasl-oauth-token-expiry-margin = "10s" + ## Optional scopes to use when authenticating with SASL/OAUTH. + # sasl-oauth-scopes = "" + ## Tenant ID for the AzureAD service. + # sasl-oauth-tenant-id = "" + ## The optional params for SASL/OAUTH. e.g. audience for AUTH0 + # [kafka.sasl-oauth-parameters] + # audience = "" + ## Static OAUTH token. Use this instead of other OAUTH params. + # sasl-access-token = "" ## SASL protocol version. When connecting to Azure EventHub set to 0. - # sasl_version = 1 + # sasl-version = 1 [alerta] # Configure Alerta. 
diff --git a/go.mod b/go.mod index ef6f5598a..73f1487d6 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,7 @@ require ( cloud.google.com/go/bigquery v1.50.0 // indirect cloud.google.com/go/bigtable v1.10.1 // indirect cloud.google.com/go/compute v1.19.1 // indirect - cloud.google.com/go/compute/metadata v0.2.3 // indirect + cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/iam v0.13.0 // indirect cloud.google.com/go/longrunning v0.4.1 // indirect collectd.org v0.3.0 // indirect @@ -242,7 +242,7 @@ require ( golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.28.0 // indirect - golang.org/x/oauth2 v0.7.0 // indirect + golang.org/x/oauth2 v0.23.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.23.0 // indirect golang.org/x/term v0.23.0 // indirect diff --git a/go.sum b/go.sum index f32e2fe71..d287119de 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvu cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IKd5/kvShxE= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= +cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/datacatalog v1.13.0 h1:4H5IJiyUE0X6ShQBqgFFZvGGcrwGVndTwUSLP4c52gw= cloud.google.com/go/datacatalog v1.13.0/go.mod h1:E4Rj9a5ZtAxcQJlEBTLgMTphfP11/lNaAshpoBgemX8= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= @@ -1623,6 +1625,8 @@ golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod 
h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= +golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= +golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/services/kafka/config.go b/services/kafka/config.go index aad24a74b..a2f270f00 100644 --- a/services/kafka/config.go +++ b/services/kafka/config.go @@ -12,10 +12,11 @@ import ( ) const ( - DefaultTimeout = 10 * time.Second - DefaultBatchSize = 100 - DefaultBatchTimeout = 1 * time.Second - DefaultID = "default" + DefaultTimeout = 10 * time.Second + DefaultBatchSize = 100 + DefaultBatchTimeout = 1 * time.Second + DefaultID = "default" + DefaultSASLOAUTHExpiryMargin = 10 * time.Second ) type Config struct { @@ -49,7 +50,7 @@ type Config struct { } func NewConfig() Config { - return Config{ID: DefaultID} + return Config{ID: DefaultID, SASLAuth: SASLAuth{SASLOAUTHExpiryMargin: DefaultSASLOAUTHExpiryMargin}} } func (c Config) Validate() error { @@ -63,7 +64,7 @@ func (c Config) Validate() error { if len(c.Brokers) == 0 { return errors.New("no brokers specified, must provide at least one broker URL") } - return nil + return c.SASLAuth.Validate() } func (c *Config) ApplyConditionalDefaults() { @@ -78,17 +79,27 @@ func (c *Config) ApplyConditionalDefaults() { } } +type Closer interface { + Close() +} + +type WriterConfig struct { + // additional resource to close + Closer Closer + Config *kafka.Config +} + type WriteTarget struct { Topic string PartitionById bool PartitionAlgorithm 
string } -func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka.Config, error) { +func (c Config) writerConfig(target WriteTarget) (*WriterConfig, error) { cfg := kafka.NewConfig() if target.Topic == "" { - return cfg, errors.New("topic must not be empty") + return &WriterConfig{nil, cfg}, errors.New("topic must not be empty") } var partitioner kafka.PartitionerConstructor if target.PartitionById { @@ -104,7 +115,7 @@ func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka. case "fnv-1a": partitioner = kafka.NewHashPartitioner default: - return cfg, fmt.Errorf("invalid partition algorithm: %q", target.PartitionAlgorithm) + return &WriterConfig{nil, cfg}, fmt.Errorf("invalid partition algorithm: %q", target.PartitionAlgorithm) } cfg.Producer.Partitioner = partitioner } @@ -135,10 +146,11 @@ func (c Config) writerConfig(diagnostic Diagnostic, target WriteTarget) (*kafka. cfg.Producer.Flush.Frequency = time.Duration(c.BatchTimeout) // SASL - if err := c.SASLAuth.SetSASLConfig(cfg); err != nil { + if o, err := c.SASLAuth.SetSASLConfig(cfg); err != nil { return nil, err + } else { + return &WriterConfig{o, cfg}, cfg.Validate() } - return cfg, cfg.Validate() } type Configs []Config diff --git a/services/kafka/sasl.go b/services/kafka/sasl.go index 8d5b82523..15991d028 100644 --- a/services/kafka/sasl.go +++ b/services/kafka/sasl.go @@ -1,9 +1,18 @@ package kafka import ( + "context" "errors" + "net/url" + "strings" + "time" - sarama "github.com/IBM/sarama" + "golang.org/x/oauth2/endpoints" + + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" + + kafka "github.com/IBM/sarama" ) type SASLAuth struct { @@ -21,31 +30,121 @@ type SASLAuth struct { SASLGSSAPIKeyTabPath string `toml:"sasl-gssapi-key-tab-path" override:"sasl-gssapi-key-tab-path"` SASLGSSAPIRealm string `toml:"sasl-gssapi-realm" override:"sasl-gssapi-realm"` - // OAUTHBEARER config. experimental. undoubtedly this is not good enough. 
+ // OAUTHBEARER config + // Service name for OAuth2 token endpoint: empty or custom, auth0, azuread + SASLOAUTHService string `toml:"sasl-oauth-service" override:"sasl-oauth-service"` + SASLOAUTHClientID string `toml:"sasl-oauth-client-id" override:"sasl-oauth-client-id"` + SASLOAUTHClientSecret string `toml:"sasl-oauth-client-secret" override:"sasl-oauth-client-secret"` + SASLOAUTHTokenURL string `toml:"sasl-oauth-token-url" override:"sasl-oauth-token-url"` + SASLOAUTHScopes []string `toml:"sasl-oauth-scopes" override:"sasl-oauth-scopes"` + SASLOAUTHParams map[string]string `toml:"sasl-oauth-parameters" override:"sasl-oauth-parameters"` + SASLOAUTHExpiryMargin time.Duration `toml:"sasl-oauth-token-expiry-margin" override:"sasl-oauth-token-expiry-margin"` + // Static token, if set it will override the token source. SASLAccessToken string `toml:"sasl-access-token" override:"sasl-access-token"` + // Tenant ID for AzureAD + SASLOAUTHTenant string `toml:"sasl-oauth-tenant-id" override:"sasl-oauth-tenant-id"` +} + +func (k *SASLAuth) Validate() error { + switch k.SASLMechanism { + case "", kafka.SASLTypeSCRAMSHA256, kafka.SASLTypeSCRAMSHA512, kafka.SASLTypeGSSAPI, kafka.SASLTypePlaintext: + return nil + case kafka.SASLTypeOAuth: + if k.SASLAccessToken != "" && (k.SASLOAUTHService != "" || k.SASLOAUTHTokenURL != "") { + return errors.New("cannot set 'sasl-access-token' with 'sasl-oauth-service' and 'sasl-oauth-token-url'") + } + if k.SASLOAUTHClientID == "" || k.SASLOAUTHClientSecret == "" { + return errors.New("'sasl-oauth-client-id' and 'sasl-oauth-client-secret' are required") + } + service := strings.ToLower(k.SASLOAUTHService) + switch service { + case "", "custom": + if k.SASLOAUTHTokenURL == "" { + return errors.New("'sasl-oauth-token-url' required for custom service") + } + case "auth0": + if k.SASLOAUTHTokenURL == "" { + return errors.New("'sasl-oauth-token-url' required for Auth0") + } + if audience := k.SASLOAUTHParams["audience"]; audience == "" { + return 
errors.New("'audience' parameter is required for Auth0") + } + case "azuread": + if k.SASLOAUTHTenant == "" { + return errors.New("'sasl-oauth-tenant-id' required for AzureAD") + } + if k.SASLOAUTHTokenURL != "" { + return errors.New("'sasl-oauth-token-url' cannot be set for service " + k.SASLOAUTHService) + } + default: + return errors.New("service " + k.SASLOAUTHService + " not supported") + } + default: + return errors.New("invalid sasl-mechanism") + } + return nil } // SetSASLConfig configures SASL for kafka (sarama) -// We mutate instead of returning the appropriate struct, because sarama.NewConfig() already populates certain defaults +// We mutate instead of returning the appropriate struct, because kafka.NewConfig() already populates certain defaults // that we do not want to disrupt. -func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error { +func (k *SASLAuth) SetSASLConfig(config *kafka.Config) (Closer, error) { + config.Net.SASL.User = k.SASLUsername config.Net.SASL.Password = k.SASLPassword + var c Closer if k.SASLMechanism != "" { - config.Net.SASL.Mechanism = sarama.SASLMechanism(k.SASLMechanism) + config.Net.SASL.Mechanism = kafka.SASLMechanism(k.SASLMechanism) switch config.Net.SASL.Mechanism { - case sarama.SASLTypeSCRAMSHA256: - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + case kafka.SASLTypeSCRAMSHA256: + config.Net.SASL.SCRAMClientGeneratorFunc = func() kafka.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } - case sarama.SASLTypeSCRAMSHA512: - config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + case kafka.SASLTypeSCRAMSHA512: + config.Net.SASL.SCRAMClientGeneratorFunc = func() kafka.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } - case sarama.SASLTypeOAuth: - config.Net.SASL.TokenProvider = k // use self as token provider. 
- case sarama.SASLTypeGSSAPI: + case kafka.SASLTypeOAuth: + if k.SASLAccessToken != "" { + config.Net.SASL.TokenProvider = NewStaticToken(k.SASLAccessToken, k.SASLExtensions) + break + } + var endpoint oauth2.Endpoint + service := strings.ToLower(k.SASLOAUTHService) + switch service { + case "", "custom": + endpoint = oauth2.Endpoint{ + TokenURL: k.SASLOAUTHTokenURL, + AuthStyle: oauth2.AuthStyleAutoDetect, + } + case "auth0": + endpoint = oauth2.Endpoint{ + TokenURL: k.SASLOAUTHTokenURL, + AuthStyle: oauth2.AuthStyleInParams, + } + case "azuread": + endpoint = endpoints.AzureAD(k.SASLOAUTHTenant) + } + cfg := &clientcredentials.Config{ + ClientID: k.SASLOAUTHClientID, + ClientSecret: k.SASLOAUTHClientSecret, + TokenURL: endpoint.TokenURL, + Scopes: k.SASLOAUTHScopes, + AuthStyle: endpoint.AuthStyle, + EndpointParams: url.Values{}, + } + for k, v := range k.SASLOAUTHParams { + cfg.EndpointParams.Add(k, v) + } + ctx, cancel := context.WithCancel(context.Background()) + src := cfg.TokenSource(ctx) + source := oauth2.ReuseTokenSourceWithExpiry(nil, src, k.SASLOAUTHExpiryMargin) + r := NewRefreshingToken(source, cancel, k.SASLExtensions) + config.Net.SASL.TokenProvider = r + c = r + + case kafka.SASLTypeGSSAPI: config.Net.SASL.GSSAPI.ServiceName = k.SASLGSSAPIServiceName config.Net.SASL.GSSAPI.AuthType = gssapiAuthType(k.SASLGSSAPIAuthType) config.Net.SASL.GSSAPI.Username = k.SASLUsername @@ -54,8 +153,7 @@ func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error { config.Net.SASL.GSSAPI.KerberosConfigPath = k.SASLGSSAPIKerberosConfigPath config.Net.SASL.GSSAPI.KeyTabPath = k.SASLGSSAPIKeyTabPath config.Net.SASL.GSSAPI.Realm = k.SASLGSSAPIRealm - - case sarama.SASLTypePlaintext: + case kafka.SASLTypePlaintext: // nothing. 
default: } @@ -66,34 +164,26 @@ func (k *SASLAuth) SetSASLConfig(config *sarama.Config) error { version, err := SASLVersion(config.Version, k.SASLVersion) if err != nil { - return err + return nil, err } config.Net.SASL.Version = version } - return nil -} - -// Token does nothing smart, it just grabs a hard-coded token from config. -func (k *SASLAuth) Token() (*sarama.AccessToken, error) { - return &sarama.AccessToken{ - Token: k.SASLAccessToken, - Extensions: k.SASLExtensions, - }, nil + return c, nil } -func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, error) { +func SASLVersion(kafkaVersion kafka.KafkaVersion, saslVersion *int) (int16, error) { if saslVersion == nil { - if kafkaVersion.IsAtLeast(sarama.V1_0_0_0) { - return sarama.SASLHandshakeV1, nil + if kafkaVersion.IsAtLeast(kafka.V1_0_0_0) { + return kafka.SASLHandshakeV1, nil } - return sarama.SASLHandshakeV0, nil + return kafka.SASLHandshakeV0, nil } switch *saslVersion { case 0: - return sarama.SASLHandshakeV0, nil + return kafka.SASLHandshakeV0, nil case 1: - return sarama.SASLHandshakeV1, nil + return kafka.SASLHandshakeV1, nil default: return 0, errors.New("invalid SASL version") } @@ -102,9 +192,9 @@ func SASLVersion(kafkaVersion sarama.KafkaVersion, saslVersion *int) (int16, err func gssapiAuthType(authType string) int { switch authType { case "KRB5_USER_AUTH": - return sarama.KRB5_USER_AUTH + return kafka.KRB5_USER_AUTH case "KRB5_KEYTAB_AUTH": - return sarama.KRB5_KEYTAB_AUTH + return kafka.KRB5_KEYTAB_AUTH default: return 0 } diff --git a/services/kafka/sasl_test.go b/services/kafka/sasl_test.go new file mode 100644 index 000000000..4860fadb3 --- /dev/null +++ b/services/kafka/sasl_test.go @@ -0,0 +1,94 @@ +package kafka + +import "testing" + +func TestSASLAuth_Validate(t *testing.T) { + tests := []struct { + name string + auth SASLAuth + wantErr bool + }{ + { + name: "Ignore empty SASL mechanism", + auth: SASLAuth{SASLMechanism: ""}, + wantErr: false, + }, + { + name: 
"Invalid SASL mechanism", + auth: SASLAuth{SASLMechanism: "mech"}, + wantErr: true, + }, + { + name: "Missing client ID", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "", SASLOAUTHClientSecret: "secret"}, + wantErr: true, + }, + { + name: "Missing client secret", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: ""}, + wantErr: true, + }, + { + name: "Invalid service", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "auth"}, + wantErr: true, + }, + { + name: "Missing token url custom", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "custom", SASLOAUTHTokenURL: ""}, + wantErr: true, + }, + { + name: "Missing token url empty (custom)", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "", SASLOAUTHTokenURL: ""}, + wantErr: true, + }, + { + name: "Ok custom", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "custom", SASLOAUTHTokenURL: "url"}, + wantErr: false, + }, + { + name: "Ok custom (empty)", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "", SASLOAUTHTokenURL: "url"}, + wantErr: false, + }, + { + name: "Missing token url", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "auth0", SASLOAUTHTokenURL: ""}, + wantErr: true, + }, + { + name: "Missing auth0 audience", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "auth0", SASLOAUTHTokenURL: "url"}, + wantErr: true, + }, + { + name: "Ok auth0 ", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: 
"secret", SASLOAUTHService: "auth0", SASLOAUTHTokenURL: "url", SASLOAUTHParams: map[string]string{"audience": "aud"}}, + wantErr: false, + }, + { + name: "Azure OK", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "azuread", SASLOAUTHTenant: "tenant"}, + wantErr: false, + }, + { + name: "Azure missing tenant", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "azuread"}, + wantErr: true, + }, + { + name: "Azure Redundant token url", + auth: SASLAuth{SASLMechanism: "OAUTHBEARER", SASLOAUTHClientID: "id", SASLOAUTHClientSecret: "secret", SASLOAUTHService: "azuread", SASLOAUTHTokenURL: "url"}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.auth.Validate(); (err != nil) != tt.wantErr { + t.Errorf("SASLAuth.Validate() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/services/kafka/service.go b/services/kafka/service.go index 7a5cec1ea..642969a47 100644 --- a/services/kafka/service.go +++ b/services/kafka/service.go @@ -17,7 +17,7 @@ import ( "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/server/vars" "github.com/pkg/errors" - metrics "github.com/rcrowley/go-metrics" + "github.com/rcrowley/go-metrics" ) const ( @@ -54,6 +54,8 @@ type writer struct { statsKey string done chan struct{} + + closer Closer } func (w *writer) Open() { @@ -83,6 +85,9 @@ func (w *writer) Close() { close(w.done) vars.DeleteStatistic(w.statsKey) + if w.closer != nil { + w.closer.Close() + } err := w.kafka.Close() if err != nil { @@ -162,11 +167,11 @@ func (c *Cluster) writer(target WriteTarget, diagnostic Diagnostic) (*writer, er defer c.mu.Unlock() w, ok = c.writers[topic] if !ok { - wc, err := c.cfg.writerConfig(diagnostic, target) + wc, err := c.cfg.writerConfig(target) if err != nil { return nil, err } - kp, err := 
kafka.NewAsyncProducer(c.cfg.Brokers, wc) + kp, err := kafka.NewAsyncProducer(c.cfg.Brokers, wc.Config) if err != nil { return nil, err @@ -174,11 +179,12 @@ func (c *Cluster) writer(target WriteTarget, diagnostic Diagnostic) (*writer, er // Create new writer w = &writer{ - requestsInFlightMetric: metrics.GetOrRegisterCounter("requests-in-flight", wc.MetricRegistry), + requestsInFlightMetric: metrics.GetOrRegisterCounter("requests-in-flight", wc.Config.MetricRegistry), kafka: kp, cluster: c.cfg.ID, topic: topic, diagnostic: diagnostic, + closer: wc.Closer, } w.Open() c.writers[topic] = w diff --git a/services/kafka/token.go b/services/kafka/token.go new file mode 100644 index 000000000..76e0584d4 --- /dev/null +++ b/services/kafka/token.go @@ -0,0 +1,57 @@ +package kafka + +import ( + "context" + + kafka "github.com/IBM/sarama" + "golang.org/x/oauth2" +) + +type RefreshingToken struct { + source oauth2.TokenSource + cancel context.CancelFunc + extensions map[string]string +} + +type StaticToken struct { + token string + extensions map[string]string +} + +func NewRefreshingToken(source oauth2.TokenSource, cancel context.CancelFunc, extensions map[string]string) *RefreshingToken { + return &RefreshingToken{ + source: source, + cancel: cancel, + extensions: extensions, + } +} + +func (k *RefreshingToken) Token() (*kafka.AccessToken, error) { + token, err := k.source.Token() + if err != nil { + return nil, err + } + return &kafka.AccessToken{ + Token: token.AccessToken, + Extensions: k.extensions, + }, nil +} + +func (k *RefreshingToken) Close() { + // canceling the token refresh + k.cancel() +} + +func NewStaticToken(token string, extensions map[string]string) *StaticToken { + return &StaticToken{ + token: token, + extensions: extensions, + } +} + +func (k *StaticToken) Token() (*kafka.AccessToken, error) { + return &kafka.AccessToken{ + Token: k.token, + Extensions: k.extensions, + }, nil +}