From 62f8306d3ffb2bc95ca07003e8816deb740d6154 Mon Sep 17 00:00:00 2001 From: Nicolai Cornelis Date: Fri, 25 Oct 2024 04:03:07 +0200 Subject: [PATCH 1/2] Adjust JSON schema; Add missing sasl_options Add missing defaults for timeouts Correctly reference cert/key from https package Deduplicate Offset type Add missing additionalProperties Make group options group ID required Remove "none" compression codec (just leave blank or omit) Fix redundant TLS enable check Improve error messages --- kafkajobs/config.go | 16 +-- schema.json | 255 ++++++++++++++++++++++++-------------------- 2 files changed, 145 insertions(+), 126 deletions(-) diff --git a/kafkajobs/config.go b/kafkajobs/config.go index 896f702..1072413 100644 --- a/kafkajobs/config.go +++ b/kafkajobs/config.go @@ -45,7 +45,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { // check for the key and cert files if _, err := os.Stat(c.TLS.Key); err != nil { if os.IsNotExist(err) { - return nil, errors.Errorf("key file '%s' does not exists", c.TLS.Key) + return nil, errors.Errorf("private key file '%s' does not exist", c.TLS.Key) } return nil, err @@ -53,7 +53,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { if _, err := os.Stat(c.TLS.Cert); err != nil { if os.IsNotExist(err) { - return nil, errors.Errorf("cert file '%s' does not exists", c.TLS.Cert) + return nil, errors.Errorf("public certificate file '%s' does not exist", c.TLS.Cert) } return nil, err @@ -63,7 +63,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { if c.TLS.RootCA != "" { if _, err := os.Stat(c.TLS.RootCA); err != nil { if os.IsNotExist(err) { - return nil, errors.Errorf("rootCA file '%s' does not exists", c.TLS.RootCA) + return nil, errors.Errorf("root CA file '%s' does not exist", c.TLS.RootCA) } return nil, err @@ -116,7 +116,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { case awsMskIam: sess, err := session.NewSession() if err != nil { - return nil, errors.Errorf("unable to initialize aws session: %v", err) + return nil, errors.Errorf("unable to initialize AWS session: %v", err) } opts = append(opts, kgo.SASL(aws.ManagedStreamingIAM(func(ctx context.Context) (aws.Auth, error) { @@ -144,7 +144,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { if c.GroupOpts != nil { if c.GroupOpts.GroupID == "" { - return nil, errors.Str("no group for the group options") + return nil, errors.Str("no group ID defined for group options") } opts = append(opts, kgo.ConsumerGroup(c.GroupOpts.GroupID)) @@ -210,7 +210,7 @@ func (c *config) InitDefault() ([]kgo.Opt, error) { opts = append(opts, kgo.ConsumeTopics(c.ConsumerOpts.Topics...)) case len(c.ConsumerOpts.ConsumePartitions) > 0: default: - return nil, errors.Str("topics should not be empty for the consumer") + return nil, errors.Str("topics and consume partitions should not be empty for the consumer") } if c.ConsumerOpts.ConsumerOffset != nil { @@ -334,7 +334,7 @@ func (c *config) tlsConfig() (*tls.Config, error) { } if ok := certPool.AppendCertsFromPEM(rca); !ok { - return nil, errors.Errorf("could not append Certs from PEM") + return nil, errors.Errorf("could not append certificates from Root CA file '%s'", c.TLS.RootCA) } tlsDialerConfig.Certificates = []tls.Certificate{cert} @@ -353,7 +353,7 @@ func (c *config) tlsConfig() (*tls.Config, error) { func (c *config) enableTLS() bool { if c.TLS != nil { - return (c.TLS.RootCA != "" && c.TLS.Key != "" && c.TLS.Cert != "") || (c.TLS.Key != "" && c.TLS.Cert != "") + return c.TLS.Key != "" && c.TLS.Cert != "" } return false } diff --git a/schema.json b/schema.json index cb01605..f87be5d 100644 --- a/schema.json +++ b/schema.json @@ -31,11 +31,12 @@ "default": false }, "producer_options": { - "description": "Kafka producer options", + "description": "Kafka producer options.", "type": "object", + "additionalProperties": false, "properties": { "disable_idempotent": { - "description": "Disable_idempotent disables idempotent produce requests, opting out of Kafka server-side deduplication in the face of reissued requests due to transient network problems. Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER (pre Kafka 3.0), and not all clients can have that permission.", + "description": "Disables idempotent produce requests, opting out of Kafka server-side deduplication in the face of reissued requests due to transient network problems. Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER (pre Kafka 3.0), and not all clients can have that permission.", "type": "boolean", "default": false }, @@ -56,13 +57,13 @@ }, "request_timeout": { "description": "The maximum duration in seconds the broker will wait the receipt of the number of required_acks.", - "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/Duration" + "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/Duration", + "default": "10s" }, "compression_codec": { + "description": "The compression codec to use, if any.", "type": "string", - "default": "none", "enum": [ - "none", "gzip", "snappy", "lz4", @@ -75,20 +76,26 @@ }, "transaction_timeout": { "description": "The allowed duration for a transaction. It is a good idea to keep this less than a group's session timeout.", - "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/Duration" + "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/Duration", + "default": "40s" } } }, "group_options": { "type": "object", + "required": [ + "group_id" + ], "description": "group_options sets the consumer group for the client to join and consume in. This option is required if using any other group options.", + "additionalProperties": false, "properties": { "group_id": { - "description": "Kafka group ID", - "type": "string" + "description": "Kafka Group ID", + "type": "string", + "minLength": 1 }, "block_rebalance_on_poll": { - "description": "Switches the client to block rebalances whenever you poll", + "description": "Switches the client to block rebalances whenever you poll.", "type": "boolean", "default": false } @@ -97,16 +104,18 @@ "consumer_options": { "description": "Kafka consumer options", "type": "object", + "additionalProperties": false, "properties": { "topics": { - "description": "List of the topics to consume. Regex also supported.", + "description": "List of the topics to consume. Regex also supported. At least a `topic` or a partition in `consume_partition` should be provided.", "type": "array", "items": { - "type": "string" + "type": "string", + "minLength": 1 } }, "consume_regexp": { - "description": "consume_regexp sets the client to parse all topics passed to `topics` as regular expressions. When consuming via regex, every metadata request loads *all* topics, so that all topics can be passed to any regular expressions. Every topic is evaluated only once ever across all regular expressions; either it permanently is known to match, or is permanently known to not match.", + "description": "Sets the client to parse all topics passed to `topics` as regular expressions. When consuming via regex, every metadata request loads *all* topics, so that all topics can be passed to any regular expressions. Every topic is evaluated only once ever across all regular expressions; either it permanently is known to match, or is permanently known to not match.", "type": "boolean", "default": false }, @@ -118,72 +127,31 @@ "type": "integer", "default": 1 }, - "consume_offset": { - "description": "consumer_offset sets the offset to start consuming from, or if OffsetOutOfRange is seen while fetching, to restart consuming from.", - "type": "object", - "required": [ - "type" - ], - "properties": { - "type": { - "description": "Partition offset type", - "type": "string", - "enum": [ - "AtEnd", - "At", - "AfterMilli", - "AtStart", - "Relative", - "WithEpoch" - ] - }, - "value": { - "description": "Value for the: At, AfterMilli, Relative and WithEpoch offsets", - "type": "integer", - "default": 0 - } - } + "consumer_offset": { + "$ref": "#/definitions/Offset" }, "consume_partitions": { "type": "object", "minProperties": 1, + "additionalProperties": false, "patternProperties": { "^[a-zA-Z0-9._-]+$": { - "description": "Topic to consume", + "description": "Topic to consume.", "type": "object", "minProperties": 1, + "additionalProperties": false, "patternProperties": { "^[0-9]+$": { - "type": "object", - "description": "Partition number.", - "required": [ - "type" - ], - "properties": { - "type": { - "description": "Partition offset type", - "type": "string", - "enum": [ - "AtEnd", - "At", - "AfterMilli", - "AtStart", - "Relative", - "WithEpoch" - ] - }, - "value": { - "description": "Value for the: At, AfterMilli, Relative and WithEpoch offsets", - "type": "integer", - "default": 0 - } - } + "$ref": "#/definitions/Offset" } } } } } } + }, + "sasl_options": { + "$ref": "#/definitions/SASL" } } } @@ -211,18 +179,24 @@ "title": "TLS Configuration", "description": "TLS configuration for TLS for Kafka.", "type": "object", + "additionalProperties": false, "properties": { + "timeout": { + "description": "Timeout of TLS connections.", + "default": "10s", + "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/Duration" + }, "key": { - "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/TLSKeyFile" + "$ref": "https://raw.githubusercontent.com/roadrunner-server/http/refs/heads/master/schema.json#/$defs/SSL/properties/key" }, "cert": { - "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/TLSCertFile" + "$ref": "https://raw.githubusercontent.com/roadrunner-server/http/refs/heads/master/schema.json#/$defs/SSL/properties/cert" }, "root_ca": { - "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/TLSCAFile" + "$ref": "https://raw.githubusercontent.com/roadrunner-server/http/refs/heads/master/schema.json#/$defs/SSL/properties/root_ca" }, "client_auth_type": { - "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/TLSClientAuthType" + "$ref": "https://raw.githubusercontent.com/roadrunner-server/http/refs/heads/master/schema.json#/$defs/ClientAuthType" } }, "required": [ @@ -231,58 +205,103 @@ ] }, "sasl": { - "title": "SASL Authentication", - "type": "object", - "description": "SASL configuration for Kafka.", - "properties": { - "mechanism": { - "description": "Mechanism used for the authentication.", - "type": "string", - "enum": [ - "aws_msk_iam", - "plain", - "SCRAM-SHA-256", - "SCRAM-SHA-512" - ] - }, - "username": { - "description": "Username for authentication.", - "type": "string" - }, - "password": { - "description": "Password for authentication.", - "type": "string" - }, - "nonce": { - "description": "Optional for the SHA auth types. Empty by default.", - "type": "string" - }, - "is_token": { - "description": "If true, suffixes the tokenauth=true extra attribute to the initial authentication message. Set this to true if the user and pass are from a delegation token. Optional for the SHA auth types. Defaults to false.", - "type": "boolean", - "default": false - }, - "zid": { - "description": "Zid is an optional authorization ID to use in authenticating", - "type": "string" - }, - "access_key": { - "description": "AWS Access key ID", - "type": "string" - }, - "secret_key": { - "description": "AWS Secret Access key ID", - "type": "string" - }, - "session_token": { - "description": "SessionToken, if non-empty, is a session / security token to use for authentication. See the following link for more details: https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html", - "type": "string" - }, - "user_agent": { - "description": "UserAgent is the user agent to for the client to use when connecting to Kafka, overriding the default franz-go//. Setting a UserAgent allows authorizing based on the aws:UserAgent condition key; see the following link for more details: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent", - "type": "string" - } - } + "$ref": "#/definitions/SASL" + }, + "ping": { + "$ref": "#/definitions/Ping" + } + } + }, + "Offset": { + "type": "object", + "description": "Sets the offset to start consuming from, or if OffsetOutOfRange is seen while fetching, to restart consuming from.", + "required": [ + "type" + ], + "additionalProperties": false, + "properties": { + "type": { + "description": "Partition offset type.", + "type": "string", + "enum": [ + "AtEnd", + "At", + "AfterMilli", + "AtStart", + "Relative", + "WithEpoch" + ] + }, + "value": { + "description": "Value for the: At, AfterMilli, Relative and WithEpoch offsets.", + "type": "integer", + "default": 0 + } + } + }, + "SASL": { + "title": "SASL Authentication", + "type": "object", + "description": "SASL configuration for Kafka.", + "additionalProperties": false, + "properties": { + "mechanism": { + "description": "Mechanism used for the authentication.", + "type": "string", + "enum": [ + "aws_msk_iam", + "plain", + "SCRAM-SHA-256", + "SCRAM-SHA-512" + ] + }, + "username": { + "description": "Username for authentication.", + "type": "string" + }, + "password": { + "description": "Password for authentication.", + "type": "string" + }, + "nonce": { + "description": "Optional for the SHA auth types. Empty by default.", + "type": "string" + }, + "is_token": { + "description": "If true, suffixes the tokenauth=true extra attribute to the initial authentication message. Set this to true if the user and pass are from a delegation token. Optional for the SHA auth types. Defaults to false.", + "type": "boolean", + "default": false + }, + "zid": { + "description": "Zid is an optional authorization ID to use in authentication.", + "type": "string" + }, + "access_key": { + "description": "AWS Access Key ID", + "type": "string" + }, + "secret_key": { + "description": "AWS Access Key Secret", + "type": "string" + }, + "session_token": { + "description": "SessionToken, if non-empty, is a session / security token to use for authentication. See the following link for more details: https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html", + "type": "string" + }, + "user_agent": { + "description": "UserAgent is the user agent to for the client to use when connecting to Kafka, overriding the default franz-go//. Setting a UserAgent allows authorizing based on the aws:UserAgent condition key; see the following link for more details: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-useragent", + "type": "string" + } + } + }, + "Ping": { + "type": "object", + "additionalProperties": false, + "properties": { + "timeout": { + "description": "Timeout when pinging Kafka.", + "default": "10s", + "$ref": "https://raw.githubusercontent.com/roadrunner-server/roadrunner/refs/heads/master/schemas/config/3.0.schema.json#/definitions/Duration" } } } From 399579485f7eb024cf0e73cb7f197c8baa8ee43d Mon Sep 17 00:00:00 2001 From: Nicolai Cornelis Date: Fri, 25 Oct 2024 17:13:40 +0200 Subject: [PATCH 2/2] typo --- schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/schema.json b/schema.json index f87be5d..95bac57 100644 --- a/schema.json +++ b/schema.json @@ -107,7 +107,7 @@ "additionalProperties": false, "properties": { "topics": { - "description": "List of the topics to consume. Regex also supported. At least a `topic` or a partition in `consume_partition` should be provided.", + "description": "List of the topics to consume. Regex also supported. At least a `topic` or a partition in `consume_partitions` should be provided.", "type": "array", "items": { "type": "string",