HPCC-33304 Kafka plugin: Support custom client configurations in Helm chart #19472
base: candidate-9.8.x
Conversation
Jira Issue: https://hpccsystems.atlassian.net//browse/HPCC-33304
@dcamper - looks like there are some compilation issues.
@@ -1728,6 +1733,11 @@
    "default": false,
    "description": "Require SOAPCALL and HTTPCALL URLs are secrets or mapped to secrets"
},
"plugins": {
this (and others in other engines) should reuse the "definitions.plugins" you've added at line 434
e.g.:
"plugins": {
"$ref": "#/definitions/plugins"
},
Done, though I admit that I find this syntax confusing.
plugins/kafka/README.md
Outdated
@@ -345,7 +440,8 @@ long as needed.

### Saved Topic Offsets

-By default, consumers save to a file the offset of the last-read message from a
+By default, in a bare-metal environment, consumers save to a file the offset of the
+last-read message from a
does the newline/wrap need adjusting?
Fixed here as well as the other one. That's what I get for staring at the Markdown rendered version.
plugins/kafka/README.md
Outdated
@@ -356,8 +452,11 @@ exact path may be different if you have named an engine differently in your HPCC
configuration). The format of the saved offset filename is
`<TopicName>-<PartitionNum>-<ConsumerGroup>.offset`.

-Note that saving partition offsets is engine-specific. One practical
+Note that saving partition offsets is engine-specific in a bare-metal environment.
+One practical
ditto
@dcamper - please see comments.
plugins/kafka/kafka.cpp
Outdated
@@ -54,24 +58,80 @@ namespace KafkaPlugin
// Static Methods (internal)
//--------------------------------------------------------------------------

#ifdef _CONTAINERIZED
We normally try to compile both code alternatives by using isContainerized(), unless we can't because the code depends on bits of the system that are (e.g.) not present in containerized builds.
With some hope that eventually almost all the code base will follow the 'containerized' approach.
In this case, I guess it's too late for the code to be common?
e.g. could the BM system use yaml config files, and thus this applyConfigProps be common? Prob. not.
Unfortunately, I don't think that would work. We do have active users on the bare metal side of things, so that configuration layout and usage of local files needs to stay in place.
I thought so, but wanted to check.
Avoiding the #ifdef _CONTAINERIZED I think still worthwhile though. It prevents hitting issues like the compile problem as both code paths are being compiled either way (hopefully there isn't too much code bloat, and BM code will slowly fade away).
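To illustrate the point about avoiding `#ifdef _CONTAINERIZED`, here is a minimal, self-contained sketch (the helper names and return values are stand-ins, not the actual jlib/plugin code) of how a runtime isContainerized() branch keeps both code paths compiling on every build:

```cpp
#include <cassert>
#include <string>

// Stand-in for jlib's isContainerized(); here it just returns a fixed
// value, whereas the real helper reports the platform's runtime mode.
static bool isContainerized() { return false; }

// Hypothetical containerized path: settings come from component/global YAML.
static std::string applyConfigContainerized(const std::string &name)
{
    return "yaml:" + name;
}

// Hypothetical bare-metal path: settings come from legacy local config files.
static std::string applyConfigBareMetal(const std::string &name)
{
    return "file:" + name;
}

// Both alternatives compile unconditionally; the branch is taken at
// runtime, so a compile error in either path surfaces on every build
// rather than only in one build flavor.
static std::string applyConfig(const std::string &name)
{
    if (isContainerized())
        return applyConfigContainerized(name);
    else
        return applyConfigBareMetal(name);
}
```

The trade-off named in the thread applies: both paths contribute code size, but the bare-metal path can fade away without ever having silently rotted.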
plugins/kafka/kafka.cpp
Outdated
if (name && *name)
{
    if (stricmp(name, "metadata.broker.list") != 0)
trivial: clearer to combine the two if statements and use strsame
Agreed, but the else from the first if() leads to a DBGLOG that is useful. I did change the second if() to use !strisame() though. Also changed similar test on line ~146.
DBGLOG may be enough, but at some point we aim to turn them off, so should they be OWARNLOG or similar? (Or will that be too verbose?)
Excellent point. I converted a few DBGLOG calls to OWARNLOG.
plugins/kafka/kafka.cpp
Outdated
{
    if (stricmp(name, "metadata.broker.list") != 0)
    {
        if (value && *value)
trivial: !isEmptyString ?
I like it; done (in both places).
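For readers following along, the two suggestions above (use an isEmptyString-style check and a case-insensitive strisame comparison, combined into one condition) have roughly this shape. The helpers below are self-contained stand-ins; the real jlib versions differ in detail:

```cpp
#include <cassert>
#include <cctype>

// Stand-in for jlib's isEmptyString(): null or zero-length.
static bool isEmptyString(const char *s) { return !s || !*s; }

// Stand-in for jlib's strisame(): case-insensitive string equality.
static bool strisame(const char *a, const char *b)
{
    if (!a || !b)
        return a == b;
    while (*a && *b)
    {
        if (std::tolower((unsigned char)*a) != std::tolower((unsigned char)*b))
            return false;
        ++a;
        ++b;
    }
    return *a == *b; // true only if both strings ended together
}

// Combined condition: apply a Kafka property only when both name and
// value are non-empty and the name is not the reserved broker list
// (which the plugin manages itself).
static bool shouldApplyProp(const char *name, const char *value)
{
    return !isEmptyString(name) && !isEmptyString(value)
           && !strisame(name, "metadata.broker.list");
}
```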
plugins/kafka/kafka.cpp
Outdated
if (configName && *configName && configPtr)
{
    applyConfigProps(getGlobalConfigSP()->getPropTree("plugins/kafka"), configName, configPtr);
    applyConfigProps(getComponentConfigSP()->getPropTree("plugins/kafka"), configName, configPtr);
is there ever any expense with calling RdKafka::Conf->set ? And if so, would it be better to compile (merge) the global/component configs, and call applyConfigProps just once ?
e.g. could do:
Owned<IPropertyTree> globalCopy = createPTreeFromIPT(getGlobalConfigSP()->queryPropTree("plugins/kafka"));
synchronizePTree(globalCopy, getComponentConfigSP()->queryPropTree("plugins/kafka"), false);
applyConfigProps(globalCopy, configName, configPtr);
There is very little expense in Conf->set; it is updating some C struct values. I would leave this version as-is, especially since it is not on a hot path (used only during setup, not when passing messages).
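The merge-then-apply-once idea from the comment above can be sketched in a self-contained way, with a plain map standing in for IPropertyTree and the jlib synchronizePTree call (the names here are illustrative only):

```cpp
#include <cassert>
#include <map>
#include <string>

// A flat key/value map stands in for IPropertyTree's "plugins/kafka" node.
using PropTree = std::map<std::string, std::string>;

// Start from a copy of the global settings, then overlay the
// component-level settings so that component values win on conflict --
// the same precedence as applying global first, component second.
static PropTree mergeConfigs(const PropTree &global, const PropTree &component)
{
    PropTree merged = global;
    for (const auto &kv : component)
        merged[kv.first] = kv.second;
    return merged;
}
```

As the author notes, with Conf->set being cheap and the call sitting outside any hot path, applying the two trees separately is a defensible simplification.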
plugins/kafka/kafka.cpp
Outdated
{
    if (configName && *configName && configPtr)
    {
        applyConfigProps(getGlobalConfigSP()->getPropTree("plugins/kafka"), configName, configPtr);
this is leaking the IPT from getPropTree, should be queryPropTree
Fixed.
plugins/kafka/kafka.cpp
Outdated
@@ -54,24 +58,80 @@ namespace KafkaPlugin
// Static Methods (internal)
//--------------------------------------------------------------------------

#ifdef _CONTAINERIZED
static void applyConfigProps(IPropertyTree * props, const char* configName, RdKafka::Conf* configPtr)
trivial: perhaps make 'props' const for clarity
Good idea; done.
plugins/kafka/kafka.cpp
Outdated
@@ -495,6 +560,7 @@ namespace KafkaPlugin
consumerPtr = NULL;
topicPtr = NULL;

#ifndef _CONTAINERIZED
char cpath[_MAX_DIR];

GetCurrentDirectory(_MAX_DIR, cpath);
I think you'll get a compiler warning because the return value of this (getcwd) isn't checked.
(We validate the return elsewhere.)
I added a check with an exception if the cwd couldn't be found.
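The check-and-throw described above looks roughly like the following self-contained sketch, using POSIX getcwd and std::runtime_error rather than the plugin's GetCurrentDirectory wrapper and HPCC exception types (those substitutions are assumptions for the sake of a runnable example):

```cpp
#include <cassert>
#include <limits.h>
#include <stdexcept>
#include <string>
#include <unistd.h>

// Return the current working directory, throwing instead of silently
// using an uninitialized buffer when getcwd fails (e.g. the directory
// was removed or the buffer is too small).
static std::string currentDirOrThrow()
{
    char cpath[PATH_MAX];
    if (!getcwd(cpath, sizeof(cpath)))
        throw std::runtime_error("could not determine current working directory");
    return cpath;
}
```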
plugins/kafka/kafka.cpp
Outdated
 * @param configPtr     A pointer to the configuration object that
 *                      will receive any found parameters
 */
static void applyConfig(const char* configName, RdKafka::Conf* configPtr)
rather than having two 'static void applyConfig(const char* configName, RdKafka::Conf* configPtr)' definitions, I'd vote for a single common version that branches on isContainerized().
plugins/kafka/kafka.hpp
Outdated
/**
 * If the offset file does not exist, create one with a
 * default offset
 */
void initFileOffsetIfNotExist() const;
void _initFileOffsetIfNotExist() const;
looks like this is what's causing the compile error issues (in BM).
Well that was a rookie mistake. Fixed.
Not for now, but meant to ask.
How long are the Consumer/Publisher instances expected to last?
Max = lifetime of a query?
If beyond the lifetime of a query, then might need to think about refreshing the configuration, when a component configuration is pushed.
We have 'installConfigUpdateHook' (and CConfigUpdateHook) for that purpose, registered and called back to perform update when the config changes.
Publishers can hang around until module destruction; the underlying library maintains its own cache with another thread to push messages, so it needs time to drain pending messages. Consumers last only as long as the query. I suspect that this is correct behavior and that pushed (containerized) config changes would get picked up during the next query.
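For context, the install-a-hook-and-refresh pattern mentioned above can be sketched in a self-contained way. This is a generic observer, not the actual jlib installConfigUpdateHook/CConfigUpdateHook API (whose signatures differ); it only illustrates the idea of registering a callback that fires when a new configuration is pushed:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Minimal registry of config-update callbacks. A long-lived object
// (such as a cached Publisher) would install a hook at construction
// and rebuild its RdKafka::Conf when the hook fires.
class ConfigUpdateHooks
{
public:
    void install(std::function<void(const std::string &)> hook)
    {
        hooks.push_back(std::move(hook));
    }

    // Called when a new component configuration is pushed; notifies
    // every registered hook with the new configuration text.
    void pushConfig(const std::string &newConfig)
    {
        for (auto &hook : hooks)
            hook(newConfig);
    }

private:
    std::vector<std::function<void(const std::string &)>> hooks;
};
```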
@dcamper - looks good. Please squash.
Is 9.8 the right target (needed there?) or is 9.10 more suitable?
Force-pushed from 1a72522 to f1dca9b
9.8 if possible, for external customers who have not yet migrated to 9.10 but are trying containerized. There is no new functionality in this change, just pulling the same configurations from a different source. I will defer to you however and rebase if 9.10 is more appropriate. Squashed.
Force-pushed from f1dca9b to 9e8b0dd
@@ -431,6 +431,11 @@
"type": "object",
"additionalProperties": { "type": "string" },
"description": "Global component annotations, generated into all components"
},
"plugins": {
this definition is under "globals", it should be under "definitions".
And this should be a reference to the definition just as it is under the engine schemas.
@dcamper - 2 of the github action checks failing are relevant - both are related to the values schema.
helm/hpcc/values.schema.json
Outdated
@@ -3511,6 +3523,11 @@
"expert": {
    "description": "Settings for developers, debugging and testing",
    "type": "object"
}
},
"plugins": {
indentation level is off.
Back to you, Jake. There are two failing tests but AFAICS they don't relate to these changes.
Testing:
Manual testing to ensure that helm chart is configured correctly and the plugin code can read the Kafka configuration options there.