diff --git a/helm/hpcc/templates/_helpers.tpl b/helm/hpcc/templates/_helpers.tpl
index 0e2f2d1e661..0f4f354284d 100644
--- a/helm/hpcc/templates/_helpers.tpl
+++ b/helm/hpcc/templates/_helpers.tpl
@@ -190,6 +190,10 @@ Pass in root as .
 {{- $certificates := (.Values.certificates | default dict) -}}
 {{- $issuers := ($certificates.issuers | default dict) -}}
 {{- $security := .Values.security | default dict -}}
+{{- if .Values.global.plugins }}
+plugins:
+{{- toYaml .Values.global.plugins | nindent 2 }}
+{{- end }}
 deploymentName: {{ (include "hpcc.fullname" (dict "root" $)) }}
 mtls: {{ (include "hpcc.isMtlsEnabled" (dict "root" $)) }}
 imageVersion: {{ .Values.global.image.version | default .Chart.Version }}
diff --git a/helm/hpcc/values.schema.json b/helm/hpcc/values.schema.json
index 86d2966d332..2d1299226fd 100644
--- a/helm/hpcc/values.schema.json
+++ b/helm/hpcc/values.schema.json
@@ -431,6 +431,9 @@
       "type": "object",
       "additionalProperties": { "type": "string" },
       "description": "Global component annotations, generated into all components"
+      },
+      "plugins": {
+        "$ref": "#/definitions/plugins"
       }
     },
     "additionalProperties": false
@@ -1728,6 +1731,9 @@
           "default": false,
           "description": "Require SOAPCALL and HTTPCALL URLs are secrets or mapped to secrets"
         },
+        "plugins": {
+          "$ref": "#/definitions/plugins"
+        },
         "expert": {
           "$ref": "#/definitions/expert"
         }
@@ -2451,6 +2457,9 @@
         },
         "hpa": {
           "$ref": "#/definitions/hpa"
+        },
+        "plugins": {
+          "$ref": "#/definitions/plugins"
         }
       }
     },
@@ -2710,6 +2719,9 @@
           "type": "boolean",
           "default": false,
           "description": "Require SOAPCALL and HTTPCALL URLs are secrets or mapped to secrets"
+        },
+        "plugins": {
+          "$ref": "#/definitions/plugins"
+        }
       }
     },
@@ -3511,6 +3523,11 @@
     "expert": {
       "description": "Settings for developers, debugging and testing",
       "type": "object"
+    },
+    "plugins": {
+      "description": "plugin configurations",
+      "type": "object",
+      "additionalProperties": true
     }
   }
 }
diff --git a/plugins/kafka/README.md b/plugins/kafka/README.md
index 645fa52ee40..2cf6b11beff 100644
--- a/plugins/kafka/README.md
+++ b/plugins/kafka/README.md
@@ -28,6 +28,11 @@ your installation by creating a topic and interacting with it.
 
 ## Plugin Configuration
 
+*Expand the section that matches your runtime environment (bare metal vs containerized)*
+
+<details>
+<summary>Configuring in a Bare Metal Environment</summary>
+
 The Apache Kafka plugin uses sensible default configuration values but these can
 be modified via configuration files.
 
@@ -82,6 +87,96 @@ overriding their normal default values:
     queued.max.messages.kbytes=10000000
     fetch.message.max.bytes=10000000
     auto.offset.reset=smallest
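+
+For example (file and topic names here are illustrative only), compression could
+be disabled for messages published to a topic named `MyTopic` by creating a file
+named `kafka_publisher_topic_MyTopic.conf` in the HPCC configuration directory
+containing:
+
+    compression.codec=none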
+</details>
+
+<details>
+<summary>Configuring in a Containerized Environment</summary>
+
+The Apache Kafka plugin uses sensible default configuration values but these can
+be modified via configuration entries in the HPCC Systems Helm chart.
+
+Configuration entries can be placed in the global section of your Helm chart (which
+means that the settings are applied everywhere) or they can be placed within
+a component (meaning, they apply to only that component). The latter is useful
+for setting things differently for differently-configured Thors, for instance.
+
+Note that global and per-component configuration settings are **merged**.
+Per-component settings override global settings. Further details regarding the
+merge are at the end of this section.
+
+There are two types of configurations: global and per-topic. Some
+configuration parameters are applicable only to publishers (producers, in Apache
+Kafka's terminology), others only to consumers, and some to both. Details on
+the supported configuration parameters can be found on the [librdkafka
+configuration
+page](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
+
+A configuration block is a Helm entry with a series of key/value
+parameters, formatted like this:
+
+    plugins:
+      kafka:
+        global:
+        - name: key
+          value: value
+        - name: key
+          value: value
+        ...
+        - name: key
+          value: value
+
+This block can be added to the globals section of the Helm chart or
+to a specific component.
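+
+For example (the component name and parameter values are illustrative only), the
+following values-file fragment applies a socket timeout everywhere and then
+overrides it for a single Thor cluster:
+
+    global:
+      plugins:
+        kafka:
+          global:
+          - name: socket.timeout.ms
+            value: "60000"
+
+    thor:
+    - name: mythor
+      plugins:
+        kafka:
+          global:
+          - name: socket.timeout.ms
+            value: "30000"
+
+Per the merge order described below, the per-component value (30000) wins for
+that Thor.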
+
+Whenever a new connection is created (either publisher or consumer) the plugin
+will scan for configuration entries. The global
+configuration block should be named `global`. Per-topic configuration
+blocks are also supported, and they can be different for a publisher or a
+consumer. For a publisher, the naming convention is
+`publisher_topic_<TopicName>` and for a consumer it is
+`consumer_topic_<TopicName>`. In both cases, `<TopicName>` is the
+name of the topic you are publishing to or consuming from.
+
+Settings that affect the protocol used to connect to the Kafka broker (such as
+using SSL) should be placed only in the global configuration block, not in
+any per-topic configuration block.
+
+Configuration parameters loaded from a Helm chart override those set by the plugin
+with one exception: the `metadata.broker.list` setting, if found in a
+configuration block, is ignored. Apache Kafka brokers are always set in ECL.
+
+If configuration blocks are found in multiple locations, their keys/values are **merged**.
+The order of merging is:
+
+ 1. Kafka `global` settings from the chart's `global.plugins` section
+ 2. Kafka `global` settings from the per-component `plugins` section
+ 3. Kafka consumer/producer settings from the chart's `global.plugins` section
+ 4. Kafka consumer/producer settings from the per-component `plugins` section
+
+The following configuration parameters are set by the plugin for publishers,
+overriding their normal default values:
+
+    - name: queue.buffering.max.messages
+      value: 1000000
+    - name: compression.codec
+      value: snappy
+    - name: message.send.max.retries
+      value: 3
+    - name: retry.backoff.ms
+      value: 500
+
+The following configuration parameters are set by the plugin for consumers,
+overriding their normal default values:
+
+    - name: compression.codec
+      value: snappy
+    - name: queued.max.messages.kbytes
+      value: 10000000
+    - name: fetch.message.max.bytes
+      value: 10000000
+    - name: auto.offset.reset
+      value: smallest
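+
+As a combined example (the topic name and values are illustrative only), the
+following block pairs a `global` entry that applies to every connection with a
+per-topic entry telling consumers of `MyTopic` to start at the newest message:
+
+    plugins:
+      kafka:
+        global:
+        - name: queued.min.messages
+          value: "50000"
+        consumer_topic_MyTopic:
+        - name: auto.offset.reset
+          value: "largest"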
+</details>
 
 ## Publishing messages with the plugin
 
@@ -299,11 +394,11 @@ Topic partitioning is covered in Apache Kafka's
 is a performance relationship between the number of partitions in a topic and
 the size of the HPCC cluster when consuming messages. Ideally, the number of
 partitions will exactly equal the number of HPCC nodes consuming messages. For
-Thor, this means the total number of slaves rather than the number of nodes, as
-that can be different in a multi-slave setup. For Roxie, the number is always
-one. If there are fewer partitions than nodes (slaves) then not all of your
+Thor, this means the total number of workers rather than the number of nodes, as
+that can be different in a multi-worker setup. For Roxie, the number is always
+one. If there are fewer partitions than nodes (workers) then not all of your
 cluster will be utilized when consuming messages; if there are more partitions
-than nodes (slaves) then some nodes will be performing extra work, consuming
+than nodes (workers) then some nodes will be performing extra work, consuming
 from multiple partitions. In either mismatch case, you may want to consider
 using the ECL DISTRIBUTE() function to redistribute your data before
 processing.
@@ -345,19 +440,21 @@ long as needed.
 
 ### Saved Topic Offsets
 
-By default, consumers save to a file the offset of the last-read message from a
-given topic, consumer group, and partition combination. The offset is saved so
-that the next time the consumer is fired up for that particular connection
-combination, the consumption process can pick up where it left off. The file is
-saved to the HPCC engine's data directory which is typically
+By default, in a bare-metal environment, consumers save to a file the offset of the
+last-read message from a given topic, consumer group, and partition combination.
+The offset is saved so that the next time the consumer is fired up for that
+particular connection combination, the consumption process can pick up where it left
+off. The file is saved to the HPCC engine's data directory which is typically
 `/var/lib/HPCCSystems/mythor/`, `/var/lib/HPCCSystems/myroxie/` or
 `/var/lib/HPCCSystems/myeclagent/` depending on the engine you're using (the
 exact path may be different if you have named an engine differently in your
 HPCC configuration). The format of the saved offset filename is
 `<TopicName>-<PartitionNum>-<ConsumerGroup>.offset`.
 
-Note that saving partition offsets is engine-specific. One practical
-consideration of this is that you cannot have one engine (e.g. Thor) consume
-from a given topic and then have another engine (e.g. Roxie) consume the next
-set of messages from that topic. Both engines can consume messages without a
-problem, but they will not track each other's last-read positions.
+Note that saving partition offsets is engine-specific in a bare-metal environment.
+One practical consideration of this is that you cannot have one engine (e.g. Thor)
+consume from a given topic and then have another engine (e.g. Roxie) consume the
+next set of messages from that topic. Both engines can consume messages without a
+problem, but they will not track each other's last-read positions. In a
+containerized environment, by contrast, different engines will use each other's
+offsets provided that they use the same consumer group.
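+
+For example (all names here are illustrative), a bare-metal Thor consumer reading
+partition 0 of topic `MyTopic` in consumer group `hpcc` would track its position
+in a file named:
+
+    MyTopic-0-hpcc.offset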
diff --git a/plugins/kafka/kafka.cpp b/plugins/kafka/kafka.cpp
index f59247aa6cd..fa22f235694 100644
--- a/plugins/kafka/kafka.cpp
+++ b/plugins/kafka/kafka.cpp
@@ -40,7 +40,11 @@ namespace KafkaPlugin
     //--------------------------------------------------------------------------
 
     // Filename of global Kafka configuration file
-    const char* GLOBAL_CONFIG_FILENAME = "kafka_global.conf";
+#ifdef _CONTAINERIZED
+    const char* GLOBAL_CONFIG_NAME = "global";
+#else
+    const char* GLOBAL_CONFIG_NAME = "kafka_global.conf";
+#endif
 
     // The minimum number of seconds that a cached object can live
     // without activity
@@ -54,24 +58,79 @@
     // Static Methods (internal)
     //--------------------------------------------------------------------------
 
+    static void applyConfigProps(const IPropertyTree * props, const char* configName, RdKafka::Conf* configPtr)
+    {
+        if (props)
+        {
+            Owned<IPropertyTreeIterator> iter = props->getElements(configName);
+            std::string errStr;
+
+            ForEach(*iter)
+            {
+                IPropertyTree & entry = iter->query();
+                const char* name = entry.queryProp("@name");
+                const char* value = entry.queryProp("@value");
+
+                if (name && *name)
+                {
+                    if (!strisame(name, "metadata.broker.list"))
+                    {
+                        if (!isEmptyString(value))
+                        {
+                            if (configPtr->set(name, value, errStr) != RdKafka::Conf::CONF_OK)
+                            {
+                                OWARNLOG("Kafka: Failed to set config param from entry %s: '%s' = '%s'; error: '%s'", configName, name, value, errStr.c_str());
+                            }
+                            else if (doTrace(traceKafka))
+                            {
+                                DBGLOG("Kafka: Set config param from entry %s: '%s' = '%s'", configName, name, value);
+                            }
+                        }
+                    }
+                    else
+                    {
+                        OWARNLOG("Kafka: Setting '%s' ignored in config %s", name, configName);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Look for an optional configuration entry and apply any found parameters
+     * to a librdkafka configuration object.
+     *
+     * @param configName    The name of the configuration key within plugins/kafka;
+     *                      it is not required for the key to exist
+     * @param configPtr     A pointer to the configuration object that
+     *                      will receive any found parameters
+     */
+    static void applyYAMLConfig(const char* configName, RdKafka::Conf* configPtr)
+    {
+        if (configName && *configName && configPtr)
+        {
+            applyConfigProps(getGlobalConfigSP()->queryPropTree("plugins/kafka"), configName, configPtr);
+            applyConfigProps(getComponentConfigSP()->queryPropTree("plugins/kafka"), configName, configPtr);
+        }
+    }
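+
+    // For example, applyYAMLConfig("global", conf) applies entries shaped like the
+    // following (the key and value are illustrative; any librdkafka parameter may
+    // appear), first from the global config and then from the component config:
+    //
+    //   plugins:
+    //     kafka:
+    //       global:
+    //       - name: socket.timeout.ms
+    //         value: "60000"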
+
     /**
-     * Look for an optional configuration file and apply any found configuration
-     * parameters to a librdkafka configuration object.
+     * Look for an optional configuration file in a bare metal environment and
+     * apply any found configuration parameters to a librdkafka configuration object.
      *
-     * @param configFilePath    The path to a configuration file; it is not
+     * @param configName        The name of the configuration file; it is not
      *                          necessary for the file to exist
-     * @param globalConfigPtr   A pointer to the configuration object that
+     * @param configPtr         A pointer to the configuration object that
      *                          will receive any found parameters
      */
-    static void applyConfig(const char* configFilePath, RdKafka::Conf* globalConfigPtr)
+    static void applyConfig(const char* configName, RdKafka::Conf* configPtr)
     {
-        if (configFilePath && *configFilePath && globalConfigPtr)
+        if (configName && *configName && configPtr)
         {
             std::string errStr;
             StringBuffer fullConfigPath;
 
-            fullConfigPath.append(hpccBuildInfo.configDir).append(PATHSEPSTR).append(configFilePath);
-
+            fullConfigPath.append(hpccBuildInfo.configDir).append(PATHSEPSTR).append(configName);
 
             Owned<IProperties> properties = createProperties(fullConfigPath.str(), true);
             Owned<IPropertyIterator> props = properties->getIterator();
@@ -83,25 +142,25 @@
 
                 if (key.length() > 0 && key.charAt(0) != '#')
                 {
-                    if (strcmp(key.str(), "metadata.broker.list") != 0)
+                    if (!strisame(key.str(), "metadata.broker.list"))
                     {
                         const char* value = properties->queryProp(key);
 
-                        if (value && *value)
+                        if (!isEmptyString(value))
                         {
-                            if (globalConfigPtr->set(key.str(), value, errStr) != RdKafka::Conf::CONF_OK)
+                            if (configPtr->set(key.str(), value, errStr) != RdKafka::Conf::CONF_OK)
                             {
-                                DBGLOG("Kafka: Failed to set config param from file %s: '%s' = '%s'; error: '%s'", configFilePath, key.str(), value, errStr.c_str());
+                                OWARNLOG("Kafka: Failed to set config param from file %s: '%s' = '%s'; error: '%s'", configName, key.str(), value, errStr.c_str());
                             }
                             else if (doTrace(traceKafka))
                             {
-                                DBGLOG("Kafka: Set config param from file %s: '%s' = '%s'", configFilePath, key.str(), value);
+                                DBGLOG("Kafka: Set config param from file %s: '%s' = '%s'", configName, key.str(), value);
                             }
                         }
                     }
                     else
                     {
-                        DBGLOG("Kafka: Setting '%s' ignored in config file %s", key.str(), configFilePath);
+                        OWARNLOG("Kafka: Setting '%s' ignored in config file %s", key.str(), configName);
                     }
                 }
             }
@@ -367,7 +426,14 @@
 
                 // Set any global configurations from file, allowing
                 // overrides of above settings
-                applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig);
+                if (isContainerized())
+                {
+                    applyYAMLConfig(GLOBAL_CONFIG_NAME, globalConfig);
+                }
+                else
+                {
+                    applyConfig(GLOBAL_CONFIG_NAME, globalConfig);
+                }
 
                 // Set producer callbacks
                 globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
@@ -382,8 +448,17 @@
                     RdKafka::Conf* topicConfPtr = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 
                     // Set any topic configurations from file
-                    std::string confName = "kafka_publisher_topic_" + topic + ".conf";
-                    applyConfig(confName.c_str(), topicConfPtr);
+                    std::string confName;
+                    if (isContainerized())
+                    {
+                        confName = "publisher_topic_" + topic;
+                        applyYAMLConfig(confName.c_str(), topicConfPtr);
+                    }
+                    else
+                    {
+                        confName = "kafka_publisher_topic_" + topic + ".conf";
+                        applyConfig(confName.c_str(), topicConfPtr);
+                    }
 
                     // Create the topic
                     topicPtr.store(RdKafka::Topic::create(producerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
@@ -480,7 +555,7 @@
                 else
                     payloadStr.append(message.len(), static_cast<const char*>(message.payload()));
 
-                DBGLOG("Kafka: Error publishing message: %d (%s); message: '%s'", message.err(), message.errstr().c_str(), payloadStr.str());
+                OWARNLOG("Kafka: Error publishing message: %d (%s); message: '%s'", message.err(), message.errstr().c_str(), payloadStr.str());
             }
         }
@@ -495,21 +570,25 @@
             consumerPtr = NULL;
             topicPtr = NULL;
 
-            char cpath[_MAX_DIR];
+            if (!isContainerized())
+            {
+                char cpath[_MAX_DIR];
 
-            GetCurrentDirectory(_MAX_DIR, cpath);
-            offsetPath.append(cpath);
-            addPathSepChar(offsetPath);
+                if (!GetCurrentDirectory(_MAX_DIR, cpath))
+                    throw MakeStringException(-1, "Unable to determine current directory in order to save Kafka consumer offset file");
+                offsetPath.append(cpath);
+                addPathSepChar(offsetPath);
 
-            offsetPath.append(topic.c_str());
-            offsetPath.append("-");
-            offsetPath.append(partitionNum);
-            if (!consumerGroup.empty())
-            {
+                offsetPath.append(topic.c_str());
                 offsetPath.append("-");
-                offsetPath.append(consumerGroup.c_str());
+                offsetPath.append(partitionNum);
+                if (!consumerGroup.empty())
+                {
+                    offsetPath.append("-");
+                    offsetPath.append(consumerGroup.c_str());
+                }
+                offsetPath.append(".offset");
             }
-            offsetPath.append(".offset");
         }
 
         Consumer::~Consumer()
@@ -536,7 +615,8 @@
 
             if (!topicPtr.load(std::memory_order_relaxed))
             {
-                initFileOffsetIfNotExist();
+                if (!isContainerized())
+                    initFileOffsetIfNotExist();
 
                 std::string errStr;
                 RdKafka::Conf* globalConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
@@ -551,7 +631,14 @@
 
                 // Set any global configurations from file, allowing
                 // overrides of above settings
-                applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig);
+                if (isContainerized())
+                {
+                    applyYAMLConfig(GLOBAL_CONFIG_NAME, globalConfig);
+                }
+                else
+                {
+                    applyConfig(GLOBAL_CONFIG_NAME, globalConfig);
+                }
 
                 // Set consumer callbacks
                 globalConfig->set("event_cb", static_cast<RdKafka::EventCb*>(this), errStr);
@@ -570,16 +657,28 @@
 
                     // Set any topic configurations from file, allowing
                     // overrides of above settings
-                    std::string confName = "kafka_consumer_topic_" + topic + ".conf";
-                    applyConfig(confName.c_str(), topicConfPtr);
+                    std::string confName;
+                    if (isContainerized())
+                    {
+                        confName = "consumer_topic_" + topic;
+                        applyYAMLConfig(confName.c_str(), topicConfPtr);
+                    }
+                    else
+                    {
+                        confName = "kafka_consumer_topic_" + topic + ".conf";
+                        applyConfig(confName.c_str(), topicConfPtr);
+                    }
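+                    // For a topic named MyTopic (hypothetical), the containerized lookup key
+                    // is consumer_topic_MyTopic under plugins/kafka in the Helm values, while
+                    // bare metal reads the file kafka_consumer_topic_MyTopic.conf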
 
                     // Ensure that some items are set a certain way
                     // by setting them after loading the external conf
                     topicConfPtr->set("auto.commit.enable", "false", errStr);
-                    // Additional settings for updated librdkafka
                     topicConfPtr->set("enable.auto.commit", "false", errStr);
-                    topicConfPtr->set("offset.store.method", "file", errStr);
-                    topicConfPtr->set("offset.store.path", offsetPath.str(), errStr);
+
+                    if (!isContainerized())
+                    {
+                        topicConfPtr->set("offset.store.method", "file", errStr);
+                        topicConfPtr->set("offset.store.path", offsetPath.str(), errStr);
+                    }
 
                     // Create the topic
                     topicPtr.store(RdKafka::Topic::create(consumerPtr, topic, topicConfPtr, errStr), std::memory_order_release);
@@ -633,20 +732,29 @@
         {
             if (offset >= 0)
             {
-                // Not using librdkafka's offset_store because it seems to be broken
-                // topicPtr->offset_store(partitionNum, offset);
-
-                // Create/overwrite a file using the same naming convention and
-                // file contents that librdkafka uses so it can pick up where
-                // we left off; NOTE: librdkafka does not clean the topic name
-                // or consumer group name when constructing this path
-                // (which is actually a security concern), so we can't clean, either
-                std::ofstream outFile(offsetPath.str(), std::ofstream::trunc);
-                outFile << offset;
+                if (isContainerized())
+                {
+                    topicPtr.load()->offset_store(partitionNum, offset);
 
-                if (doTrace(traceKafka))
+                    if (doTrace(traceKafka))
+                    {
+                        DBGLOG("Kafka: Saved offset %lld", offset);
+                    }
+                }
+                else
                 {
-                    DBGLOG("Kafka: Saved offset %lld to %s", offset, offsetPath.str());
+                    // Create/overwrite a file using the same naming convention and
+                    // file contents that librdkafka uses so it can pick up where
+                    // we left off; NOTE: librdkafka does not clean the topic name
+                    // or consumer group name when constructing this path
+                    // (which is actually a security concern), so we can't clean, either
+                    std::ofstream outFile(offsetPath.str(), std::ofstream::trunc);
+                    outFile << offset;
+
+                    if (doTrace(traceKafka))
+                    {
+                        DBGLOG("Kafka: Saved offset %lld to %s", offset, offsetPath.str());
+                    }
                 }
             }
         }
@@ -889,7 +997,14 @@
         if (globalConfig)
         {
             // Load global config to pick up any protocol modifications
-            applyConfig(GLOBAL_CONFIG_FILENAME, globalConfig);
+            if (isContainerized())
+            {
+                applyYAMLConfig(GLOBAL_CONFIG_NAME, globalConfig);
+            }
+            else
+            {
+                applyConfig(GLOBAL_CONFIG_NAME, globalConfig);
+            }
 
             // rd_kafka_new() takes ownership of the lower-level conf object, which in this case is a
             // pointer currently owned by globalConfig; we need to pass a duplicate