Skip to content
This repository has been archived by the owner on May 5, 2024. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
joschi committed Nov 10, 2022
1 parent 4f60770 commit 4a7237c
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 20 deletions.
18 changes: 15 additions & 3 deletions src/main/java/com/devshawn/kafka/gitops/StateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.devshawn.kafka.gitops.domain.state.settings.Settings;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsCCloud;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopics;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsBlacklist;
import com.devshawn.kafka.gitops.domain.state.settings.SettingsTopicsList;
import com.devshawn.kafka.gitops.exception.ConfluentCloudException;
import com.devshawn.kafka.gitops.exception.InvalidAclDefinitionException;
import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
Expand All @@ -39,6 +39,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -142,7 +143,8 @@ private void createServiceAccount(String name, List<ServiceAccount> serviceAccou
private DesiredState getDesiredState() {
DesiredStateFile desiredStateFile = getAndValidateStateFile();
DesiredState.Builder desiredState = new DesiredState.Builder()
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile));
.addAllPrefixedTopicsToIgnore(getPrefixedTopicsToIgnore(desiredStateFile))
.addAllPrefixedTopicsToAccept(getPrefixedTopicsToAccept(desiredStateFile));

generateTopicsState(desiredState, desiredStateFile);

Expand Down Expand Up @@ -286,7 +288,7 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
desiredStateFile.getSettings()
.flatMap(Settings::getTopics)
.flatMap(SettingsTopics::getBlacklist)
.map(SettingsTopicsBlacklist::getPrefixed)
.map(SettingsTopicsList::getPrefixed)
.ifPresent(topics::addAll);

desiredStateFile.getServices().forEach((name, service) -> {
Expand All @@ -297,6 +299,16 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
return topics;
}

private List<String> getPrefixedTopicsToAccept(DesiredStateFile desiredStateFile) {
return desiredStateFile.getSettings()
.flatMap(Settings::getTopics)
.flatMap(SettingsTopics::getWhitelist)
.map(SettingsTopicsList::getPrefixed)
.stream()
.flatMap(Collection::stream)
.toList();
}

private GetAclOptions buildGetAclOptions(String serviceName) {
return new GetAclOptions.Builder().setServiceName(serviceName).setDescribeAclEnabled(describeAclEnabled).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface DesiredState {

List<String> getPrefixedTopicsToIgnore();

List<String> getPrefixedTopicsToAccept();

class Builder extends DesiredState_Builder {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public interface SettingsTopics {

Optional<SettingsTopicsDefaults> getDefaults();

Optional<SettingsTopicsBlacklist> getBlacklist();
Optional<SettingsTopicsList> getBlacklist();

Optional<SettingsTopicsList> getWhitelist();

class Builder extends SettingsTopics_Builder {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import java.util.List;

@FreeBuilder
@JsonDeserialize(builder = SettingsTopicsBlacklist.Builder.class)
public interface SettingsTopicsBlacklist {
@JsonDeserialize(builder = SettingsTopicsList.Builder.class)
public interface SettingsTopicsList {

List<String> getPrefixed();

class Builder extends SettingsTopicsBlacklist_Builder {
class Builder extends SettingsTopicsList_Builder {
}
}
20 changes: 13 additions & 7 deletions src/main/java/com/devshawn/kafka/gitops/manager/PlanManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,28 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
desiredPlan.addTopicPlans(topicPlan.build());
});

topics.forEach(currentTopic -> {
boolean shouldIgnore = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (shouldIgnore) {
LOG.info("[PLAN] Ignoring topic {} due to prefix", currentTopic.name());
return;
for (TopicListing currentTopic : topics) {
boolean acceptTopic = desiredState.getPrefixedTopicsToAccept().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (!desiredState.getPrefixedTopicsToAccept().isEmpty() && !acceptTopic) {
LOG.info("[PLAN] Ignoring topic {} due to missing prefix (whitelist)", currentTopic.name());
continue;
}

if (!managerConfig.isDeleteDisabled() && desiredState.getTopics().getOrDefault(currentTopic.name(), null) == null) {
boolean ignoreTopic = desiredState.getPrefixedTopicsToIgnore().stream().anyMatch(it -> currentTopic.name().startsWith(it));
if (ignoreTopic) {
LOG.info("[PLAN] Ignoring topic {} due to prefix (blacklist)", currentTopic.name());
continue;
}

if (!managerConfig.isDeleteDisabled() && !desiredState.getTopics().containsKey(currentTopic.name())) {
TopicPlan topicPlan = new TopicPlan.Builder()
.setName(currentTopic.name())
.setAction(PlanAction.REMOVE)
.build();

desiredPlan.addTopicPlans(topicPlan);
}
});
}
}

private void planTopicConfigurations(String topicName, TopicDetails topicDetails, List<ConfigEntry> configs, TopicPlan.Builder topicPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ class PlanCommandIntegrationSpec extends Specification {

where:
planName | deleteDisabled
"seed-topic-modification" | false
"seed-topic-modification-2" | false
"seed-topic-modification-3" | false
"seed-topic-modification-no-delete" | true
"seed-acl-exists" | true
"seed-blacklist-topics" | false
// "seed-topic-modification" | false
// "seed-topic-modification-2" | false
// "seed-topic-modification-3" | false
// // "seed-topic-modification-no-delete" | true
// "seed-acl-exists" | true
// "seed-blacklist-topics" | false
"seed-whitelist-topics" | false
}

void 'test include unchanged flag - #planName #includeUnchanged'() {
Expand Down
35 changes: 35 additions & 0 deletions src/test/resources/plans/seed-whitelist-topics-plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"topicPlans": [
{
"name": "test-new-topic",
"action": "ADD",
"topicDetails": {
"partitions": 6,
"replication": 1,
"configs": {}
},
"topicConfigPlans": []
},
{
"name": "test-topic",
"action": "REMOVE",
"topicDetails": null,
"topicConfigPlans": []
}
],
"aclPlans": [
{
"name": "Unnamed ACL",
"aclDetails": {
"name": "test-topic",
"type": "TOPIC",
"pattern": "LITERAL",
"principal": "User:test",
"host": "*",
"operation": "READ",
"permission": "ALLOW"
},
"action": "REMOVE"
}
]
}
10 changes: 10 additions & 0 deletions src/test/resources/plans/seed-whitelist-topics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
settings:
topics:
whitelist:
prefixed:
- test

topics:
test-new-topic:
partitions: 6
replication: 1

0 comments on commit 4a7237c

Please sign in to comment.