
HPCC-33304 Kafka plugin: Support custom client configurations in Helm chart #19472

Open

wants to merge 3 commits into base: candidate-9.8.x

Conversation

dcamper (Contributor) commented Feb 3, 2025

Type of change:

  • This change is a bug fix (non-breaking change which fixes an issue).
  • This change is a new feature (non-breaking change which adds functionality).
  • This change improves the code (refactor or other change that does not change the functionality).
  • This change fixes warnings (the fix does not alter the functionality or the generated code).
  • This change is a breaking change (fix or feature that will cause existing behavior to change).
  • This change alters the query API (existing queries will have to be recompiled)

Checklist:

  • My code follows the code style of this project.
    • My code does not create any new warnings from compiler, build system, or lint.
  • The commit message is properly formatted and free of typos.
    • The commit message title makes sense in a changelog, by itself.
    • The commit is signed.
  • My change requires a change to the documentation.
    • I have updated the documentation accordingly, or...
    • I have created a JIRA ticket to update the documentation.
    • Any new interfaces or exported functions are appropriately commented.
  • I have read the CONTRIBUTORS document.
  • The change has been fully tested:
    • I have added tests to cover my changes.
    • All new and existing tests passed.
    • I have checked that this change does not introduce memory leaks.
    • I have used Valgrind or similar tools to check for potential issues.
  • I have given due consideration to all of the following potential concerns:
    • Scalability
    • Performance
    • Security
    • Thread-safety
    • Cloud-compatibility
    • Premature optimization
    • Existing deployed queries will not be broken
    • This change fixes the problem, not just the symptom
    • The target branch of this pull request is appropriate for such a change.
  • There are no similar instances of the same problem that should be addressed
    • I have addressed them here
    • I have raised JIRA issues to address them separately
  • This is a user interface / front-end modification
    • I have tested my changes in multiple modern browsers
    • The component(s) render as expected

Smoketest:

  • Send notifications about my Pull Request position in Smoketest queue.
  • Test my draft Pull Request.

Testing:

Manual testing to ensure that the Helm chart is configured correctly and that the plugin code can read the Kafka configuration options from it.

@dcamper dcamper requested a review from jakesmith February 3, 2025 17:49

github-actions bot commented Feb 3, 2025

Jira Issue: https://hpccsystems.atlassian.net/browse/HPCC-33304

Jirabot Action Result:
Workflow Transition To: Merge Pending
Updated PR

jakesmith (Member) left a comment:

@dcamper - looks like there are some compilation issues.

@@ -1728,6 +1733,11 @@
"default": false,
"description": "Require SOAPCALL and HTTPCALL URLs are secrets or mapped to secrets"
},
"plugins": {
jakesmith (Member) commented:

this (and others in other engines) should reuse the "definitions.plugins" you've added at line 434
e.g.:

        "plugins": {
          "$ref": "#/definitions/plugins"
        },

dcamper (Contributor Author) replied:

Done, though I admit that I find this syntax confusing.

@@ -345,7 +440,8 @@ long as needed.

### Saved Topic Offsets

- By default, consumers save to a file the offset of the last-read message from a
+ By default, in a bare-metal environment, consumers save to a file the offset of the
+ last-read message from a
jakesmith (Member) commented:

does the newline/wrap need adjusting?

dcamper (Contributor Author) replied:

Fixed here as well as the other one. That's what I get for staring at the Markdown rendered version.

@@ -356,8 +452,11 @@ exact path may be different if you have named an engine differently in your HPCC
configuration). The format of the saved offset filename is
`<TopicName>-<PartitionNum>-<ConsumerGroup>.offset`.

- Note that saving partition offsets is engine-specific. One practical
+ Note that saving partition offsets is engine-specific in a bare-metal environment.
+ One practical
jakesmith (Member) commented:

ditto

@dcamper dcamper requested a review from jakesmith February 6, 2025 16:13
jakesmith (Member) left a comment:

@dcamper - please see comments.

@@ -54,24 +58,80 @@ namespace KafkaPlugin
// Static Methods (internal)
//--------------------------------------------------------------------------

#ifdef _CONTAINERIZED
jakesmith (Member) commented:

we normally try to compile both code alternatives by using isContainerized(), unless we can't because the code depends on parts of the system that are not present in containerized builds.
The hope is that eventually almost all of the code base will follow the 'containerized' approach.

In this case... I guess it's too late for the code to be common?
e.g. could the BM system use yaml config files, and thus make this applyConfigProps common? Probably not.

dcamper (Contributor Author) replied:

Unfortunately, I don't think that would work. We do have active users on the bare metal side of things, so that configuration layout and usage of local files needs to stay in place.

jakesmith (Member) replied:

I thought so, but wanted to check.
Avoiding the #ifdef _CONTAINERIZED is still worthwhile, though, I think. It prevents hitting issues like the compile problem, because both code paths get compiled either way (hopefully there isn't too much code bloat, and BM code will slowly fade away).


if (name && *name)
{
if (stricmp(name, "metadata.broker.list") != 0)
jakesmith (Member) commented:

trivial: clearer to combine the two if statements and use strsame

dcamper (Contributor Author) replied:

Agreed, but the else from the first if() leads to a DBGLOG that is useful. I did change the second if() to use !strisame(), though. I also changed a similar test on line ~146.
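For readers unfamiliar with the jlib helpers being discussed, the sketch below shows the semantics implied here: strsame is a null-safe, case-sensitive comparison, and strisame is its case-insensitive counterpart. These are illustrative reimplementations, not the actual jlib definitions.

```cpp
#include <cstring>
#include <strings.h>  // strcasecmp (POSIX)

// Null-safe, case-sensitive comparison: two null pointers compare equal,
// and a null never gets dereferenced.
inline bool strsame(const char *left, const char *right)
{
    return (left == right) || (left && right && strcmp(left, right) == 0);
}

// Case-insensitive variant, as used for the "metadata.broker.list" check.
inline bool strisame(const char *left, const char *right)
{
    return (left == right) || (left && right && strcasecmp(left, right) == 0);
}
```

The null-safety is what makes these preferable to raw stricmp calls guarded by separate null checks.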

jakesmith (Member) replied:

DBGLOG may be enough, but at some point we aim to turn those off, so should these be OWARNLOG or similar? (Or would that be too verbose?)

dcamper (Contributor Author) replied:

Excellent point. I converted a few DBGLOG calls to OWARNLOG.

{
if (stricmp(name, "metadata.broker.list") != 0)
{
if (value && *value)
jakesmith (Member) commented:

trivial: !isEmptyString ?

dcamper (Contributor Author) replied:

I like it; done (in both places).
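The isEmptyString helper being adopted here replaces the raw "value && *value" idiom. A sketch of what such a helper looks like (jlib's actual definition may differ in detail):

```cpp
// True for a null pointer or a zero-length string; reads more clearly
// than the equivalent "ptr && *ptr" test it replaces.
inline bool isEmptyString(const char *text)
{
    return !text || !*text;
}
```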

if (configName && *configName && configPtr)
{
applyConfigProps(getGlobalConfigSP()->getPropTree("plugins/kafka"), configName, configPtr);
applyConfigProps(getComponentConfigSP()->getPropTree("plugins/kafka"), configName, configPtr);
jakesmith (Member) commented:

is there ever any expense in calling RdKafka::Conf->set? And if so, would it be better to compile (merge) the global/component configs and call applyConfigProps just once?

jakesmith (Member) added:

e.g. could do:

                Owned<IPropertyTree> globalCopy = createPTreeFromIPT(getGlobalConfigSP()->queryPropTree("plugins/kafka"));
                synchronizePTree(globalCopy, getComponentConfigSP()->queryPropTree("plugins/kafka"), false);
                applyConfigProps(globalCopy, configName, configPtr);

dcamper (Contributor Author) replied:

There is very little expense in Conf->set; it just updates some C struct values. I would leave this version as-is, especially since it is not on a hot path (it is used only during setup, not when passing messages).
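The merge-then-apply-once pattern jakesmith sketched with createPTreeFromIPT/synchronizePTree (jlib calls) can be illustrated generically, with std::map standing in for the global and component property trees. This is a sketch of the idea only, not the jlib API:

```cpp
#include <map>
#include <string>

using ConfigProps = std::map<std::string, std::string>;

// Start from a copy of the global settings, let component-level settings
// override on conflict, then the caller applies the merged result in one pass
// instead of calling applyConfigProps twice.
ConfigProps mergeConfigs(const ConfigProps &global, const ConfigProps &component)
{
    ConfigProps merged(global);             // copy of the global defaults
    for (const auto &entry : component)
        merged[entry.first] = entry.second; // component values win
    return merged;
}
```

Whether the merge is worthwhile depends, as noted, on the cost of the repeated set calls; here it was judged not to be.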

{
if (configName && *configName && configPtr)
{
applyConfigProps(getGlobalConfigSP()->getPropTree("plugins/kafka"), configName, configPtr);
jakesmith (Member) commented:

this is leaking the IPT returned from getPropTree; it should be queryPropTree

dcamper (Contributor Author) replied:

Fixed.
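The leak here follows from the common jlib naming convention: a get-style accessor transfers a reference that the caller must release, while a query-style accessor merely borrows. A minimal sketch of that convention using standard smart pointers and a hypothetical Config type (not the real IPropertyTree interface):

```cpp
#include <memory>
#include <string>

// Hypothetical config node, standing in for IPropertyTree.
struct Config
{
    std::string name;
};

struct Owner
{
    std::shared_ptr<Config> node = std::make_shared<Config>(Config{"plugins/kafka"});

    // get*: hands the caller a reference they are responsible for.
    std::shared_ptr<Config> getConfig() const { return node; }

    // query*: borrows; the caller must not free or retain it beyond the owner.
    Config *queryConfig() const { return node.get(); }
};
```

Calling a get-style function and discarding the returned reference, as the original code did with getPropTree, leaks it; a query-style call has no such obligation.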

@@ -54,24 +58,80 @@ namespace KafkaPlugin
// Static Methods (internal)
//--------------------------------------------------------------------------

#ifdef _CONTAINERIZED
static void applyConfigProps(IPropertyTree * props, const char* configName, RdKafka::Conf* configPtr)
jakesmith (Member) commented:

trivial: perhaps make 'props' const for clarity

dcamper (Contributor Author) replied:

Good idea; done.

@@ -495,6 +560,7 @@ namespace KafkaPlugin
consumerPtr = NULL;
topicPtr = NULL;

#ifndef _CONTAINERIZED
char cpath[_MAX_DIR];

GetCurrentDirectory(_MAX_DIR, cpath);
jakesmith (Member) commented:

I think you'll get a compiler warning because the return value of this (getcwd) is not checked.
(We validate the return elsewhere.)

dcamper (Contributor Author) replied:

I added a check with an exception if the cwd couldn't be found.
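A sketch of the kind of check described, using POSIX getcwd and a standard exception; the actual plugin code uses GetCurrentDirectory and would throw an HPCC exception class instead:

```cpp
#include <limits.h>    // PATH_MAX (POSIX)
#include <stdexcept>
#include <string>
#include <unistd.h>    // getcwd

// Fetch the current working directory, throwing if it cannot be determined
// rather than silently continuing with an uninitialized buffer.
std::string currentWorkingDir()
{
    char cpath[PATH_MAX];
    if (!getcwd(cpath, sizeof(cpath)))
        throw std::runtime_error("Unable to determine current working directory");
    return cpath;
}
```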

* @param configPtr A pointer to the configuration object that
* will receive any found parameters
*/
static void applyConfig(const char* configName, RdKafka::Conf* configPtr)
jakesmith (Member) commented:

rather than having two 'static void applyConfig(const char* configName, RdKafka::Conf* configPtr)' definitions, I'd vote for having a single one that branches on isContainerized().
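The suggestion is to replace two #ifdef-selected definitions with one function that branches at runtime, so both paths always compile and a build break in the rarely-built path is caught immediately. A minimal sketch with a stubbed isContainerized() (in HPCC this is a jlib function reflecting the deployment, and the config sources shown are hypothetical):

```cpp
#include <string>

// Stub standing in for jlib's isContainerized(); the real function reports
// how the platform was deployed rather than a compile-time #ifdef.
static bool isContainerized() { return false; }

// Single definition: both branches are compiled in every build, unlike the
// two #ifdef _CONTAINERIZED alternatives it would replace.
static std::string applyConfigSource(const char *configName)
{
    if (isContainerized())
        return std::string("helm:plugins/kafka/") + configName;  // hypothetical source
    else
        return std::string("file:") + configName + ".conf";      // hypothetical source
}
```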

/**
* If the offset file does not exist, create one with a
* default offset
*/
void initFileOffsetIfNotExist() const;
void _initFileOffsetIfNotExist() const;
jakesmith (Member) commented:

looks like this is what's causing the compile error issues (in BM).

dcamper (Contributor Author) replied:

Well that was a rookie mistake. Fixed.

jakesmith (Member) left a comment:

Not for now, but meant to ask.
How long are the Consumer/Publisher instances expected to last?
Max = lifetime of a query?

If beyond the lifetime of a query, then might need to think about refreshing the configuration, when a component configuration is pushed.
We have 'installConfigUpdateHook' (and CConfigUpdateHook) for that purpose, registered and called back to perform update when the config changes.

dcamper (Contributor Author) commented Feb 6, 2025

> Not for now, but meant to ask. How long are the Consumer/Publisher instances expected to last? Max = lifetime of a query?
>
> If beyond the lifetime of a query, then might need to think about refreshing the configuration, when a component configuration is pushed. We have 'installConfigUpdateHook' (and CConfigUpdateHook) for that purpose, registered and called back to perform update when the config changes.

Publishers can hang around until module destruction; the underlying library maintains its own cache with another thread to push messages, so it needs time to drain pending messages. Consumers last only as long as the query. I suspect that this is correct behavior and that pushed (containerized) config changes would get picked up during the next query.
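The installConfigUpdateHook / CConfigUpdateHook facility mentioned above is part of HPCC's jlib; its general shape (register callbacks, invoke them whenever a new configuration is pushed) can be sketched generically. This is an illustration of the pattern, not the real jlib API:

```cpp
#include <functional>
#include <string>
#include <vector>

// Generic config-update hook registry, illustrating the idea behind
// installConfigUpdateHook: long-lived components register a callback
// and are notified when a new component configuration arrives.
class ConfigHookRegistry
{
public:
    using Hook = std::function<void(const std::string &newConfig)>;

    void installHook(Hook hook) { hooks.push_back(std::move(hook)); }

    // Called when a new component configuration is pushed.
    void publish(const std::string &newConfig)
    {
        for (const auto &hook : hooks)
            hook(newConfig);
    }

private:
    std::vector<Hook> hooks;
};
```

As the reply notes, consumers here live no longer than a query, so rereading the config at query start is sufficient and no such hook is needed.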

@dcamper dcamper requested a review from jakesmith February 6, 2025 18:00
jakesmith (Member) left a comment:

@dcamper - looks good. Please squash.

Is 9.8 the right target (needed there?) or is 9.10 more suitable?

@dcamper dcamper force-pushed the hpcc-33304-containerized-kafka branch from 1a72522 to f1dca9b Compare February 6, 2025 18:41
dcamper (Contributor Author) commented Feb 6, 2025

> @dcamper - looks good. Please squash.
>
> Is 9.8 the right target (needed there?) or is 9.10 more suitable?

9.8 if possible, for external customers who have not yet migrated to 9.10 but are trying containerized deployments. There is no new functionality in this change; it just pulls the same configurations from a different source.

I will defer to you however and rebase if 9.10 is more appropriate.

Squashed.

@dcamper dcamper force-pushed the hpcc-33304-containerized-kafka branch from f1dca9b to 9e8b0dd Compare February 6, 2025 19:07
@jakesmith jakesmith self-requested a review February 7, 2025 10:11
@@ -431,6 +431,11 @@
"type": "object",
"additionalProperties": { "type": "string" },
"description": "Global component annotations, generated into all components"
},
"plugins": {
jakesmith (Member) commented:

this definition is under "globals", it should be under "definitions".
And this should be a reference to the definition just as it is under the engine schemas.

jakesmith (Member) left a comment:

@dcamper - two of the failing GitHub Actions checks are relevant; both are related to the values schema.

@@ -3511,6 +3523,11 @@
"expert": {
"description": "Settings for developers, debugging and testing",
"type": "object"
}
},
"plugins": {
jakesmith (Member) commented:

indentation level is off.

dcamper (Contributor Author) commented Feb 7, 2025

Back to you, Jake. There are two failing tests but AFAICS they don't relate to these changes.

@dcamper dcamper requested a review from jakesmith February 7, 2025 16:06