Skip to content

Commit

Permalink
Fix quota manager integration (#581)
Browse files Browse the repository at this point in the history
* Fix: Quota manager is not called in main Julie Ops plan update

* Fix: Bump main docker compose testing file to 7.5.0 and manage Mac M1 testing

* Feat/Fix: use Actions for quotas instead of direct processing for dryRun process and implement quota deletion.
  • Loading branch information
ludovic-boutros authored Feb 1, 2024
1 parent b665808 commit f1fe064
Show file tree
Hide file tree
Showing 14 changed files with 472 additions and 86 deletions.
48 changes: 24 additions & 24 deletions docker/rbac-sasl/create-roles.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,52 +30,52 @@ C3=c3-cluster
################################### SETUP SUPERUSER ###################################
echo "Creating Super User role bindings"

confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $SUPER_USER_PRINCIPAL \
--role SystemAdmin \
--kafka-cluster-id $KAFKA_CLUSTER_ID
--kafka-cluster $KAFKA_CLUSTER_ID

confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $SUPER_USER_PRINCIPAL \
--role SystemAdmin \
--kafka-cluster-id $KAFKA_CLUSTER_ID \
--schema-registry-cluster-id $SR
--kafka-cluster $KAFKA_CLUSTER_ID \
--schema-registry-cluster $SR

confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $SUPER_USER_PRINCIPAL \
--role SystemAdmin \
--kafka-cluster-id $KAFKA_CLUSTER_ID \
--connect-cluster-id $CONNECT
--kafka-cluster $KAFKA_CLUSTER_ID \
--connect-cluster $CONNECT

################################### SCHEMA REGISTRY ###################################
echo "Creating Schema Registry role bindings"

# SecurityAdmin on SR cluster itself
confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $SR_PRINCIPAL \
--role SecurityAdmin \
--kafka-cluster-id $KAFKA_CLUSTER_ID \
--schema-registry-cluster-id $SR
--kafka-cluster $KAFKA_CLUSTER_ID \
--schema-registry-cluster $SR

# ResourceOwner for groups and topics on broker
for resource in Topic:_schemas Group:schema-registry
do
confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $SR_PRINCIPAL \
--role ResourceOwner \
--resource $resource \
--kafka-cluster-id $KAFKA_CLUSTER_ID
--kafka-cluster $KAFKA_CLUSTER_ID
done

################################### CONNECT ###################################
echo "Creating Connect role bindings"

# SecurityAdmin on the connect cluster itself
confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $CONNECT_PRINCIPAL \
--role SecurityAdmin \
--kafka-cluster-id $KAFKA_CLUSTER_ID \
--connect-cluster-id $CONNECT
--kafka-cluster $KAFKA_CLUSTER_ID \
--connect-cluster $CONNECT

# ResourceOwner for groups and topics on broker
declare -a ConnectResources=(
Expand All @@ -88,37 +88,37 @@ declare -a ConnectResources=(
)
for resource in ${ConnectResources[@]}
do
confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $CONNECT_PRINCIPAL \
--role ResourceOwner \
--resource $resource \
--kafka-cluster-id $KAFKA_CLUSTER_ID
--kafka-cluster $KAFKA_CLUSTER_ID
done

################################### C3 ###################################
echo "Creating C3 role bindings"

# C3 only needs SystemAdmin on the kafka cluster itself
confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $C3_PRINCIPAL \
--role SystemAdmin \
--kafka-cluster-id $KAFKA_CLUSTER_ID
--kafka-cluster $KAFKA_CLUSTER_ID

################################### OTHER ROLE ###################################

confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $OTHER_PRINCIPAL \
--role DeveloperWrite \
--resource "Topic:connect-configs" \
--kafka-cluster-id $KAFKA_CLUSTER_ID
--kafka-cluster $KAFKA_CLUSTER_ID


confluent iam rolebinding create \
confluent iam rbac role-binding create \
--principal $OTHER_PRINCIPAL \
--role ResourceOwner \
--resource "Topic:zaragoza." \
--prefix \
--kafka-cluster-id $KAFKA_CLUSTER_ID
--kafka-cluster $KAFKA_CLUSTER_ID

echo "Finished setting up role bindings"
echo " kafka cluster id: $KAFKA_CLUSTER_ID"
Expand Down
11 changes: 6 additions & 5 deletions docker/rbac-sasl/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ services:
image: rroemhild/test-openldap
hostname: openldap
container_name: openldap
platform: linux/amd64
ports:
- "10389:10389"
privileged: true

zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
Expand All @@ -31,7 +32,7 @@ services:
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-server:7.0.1
image: confluentinc/cp-server:7.5.0
hostname: broker
container_name: broker
depends_on:
Expand Down Expand Up @@ -159,7 +160,7 @@ services:
# CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

schema-registry:
image: confluentinc/cp-schema-registry:7.0.1
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
container_name: schema-registry
ports:
Expand Down Expand Up @@ -199,7 +200,7 @@ services:
SCHEMA_REGISTRY_PUBLIC_KEY_PATH: /tmp/conf/public.pem

connect:
image: confluentinc/cp-server-connect:7.0.1
image: confluentinc/cp-server-connect:7.5.0
hostname: connect
container_name: connect
depends_on:
Expand Down Expand Up @@ -292,7 +293,7 @@ services:
metadataServerUrls="http://broker:8090";
control-center:
image: confluentinc/cp-enterprise-control-center:7.0.1
image: confluentinc/cp-enterprise-control-center:7.5.0
hostname: control-center
container_name: control-center
depends_on:
Expand Down
10 changes: 9 additions & 1 deletion docs/handling-delete.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@ The user can control topic deletion by:
- setting the *allow.delete.topics* configuration in the provided file to the tool.
- setting the ENV variable *ALLOW_DELETE_TOPICS* when calling the tool from the CLI.

Bindings deletion flag
Quotas deletion flag
^^^^^^^^^^^^^^^^^^^^
The user can control quota deletion by:

- setting the *allow.delete.quotas* configuration in the provided file to the tool.
- setting the ENV variable *ALLOW_DELETE_QUOTAS* when calling the tool from the CLI.
- **Warning**: no prefix management is implemented, therefore all quotas must be managed in a single file.

Bindings deletion flag
^^^^^^^^^^^^^^^^^^^^^^
The user can control bindings deletion by:

- setting the *allow.delete.bindings* configuration in the provided file to the tool.
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,10 @@ public boolean isAllowDeleteTopics() {
return config.getBoolean(ALLOW_DELETE_TOPICS);
}

/**
 * Whether JulieOps is allowed to delete quotas that are no longer present in the topology.
 *
 * @return the value of the {@code allow.delete.quotas} config key
 *     ({@link Constants#ALLOW_DELETE_QUOTAS}); NOTE(review): {@code config.getBoolean} presumably
 *     requires the key to be present or defaulted elsewhere — confirm against the defaults file.
 */
public boolean isAllowDeleteQuotas() {
return config.getBoolean(ALLOW_DELETE_QUOTAS);
}

/**
 * Whether JulieOps is allowed to delete ACL bindings that are no longer present in the topology.
 *
 * @return the value of the {@code allow.delete.bindings} config key
 *     ({@link Constants#ALLOW_DELETE_BINDINGS})
 */
public boolean isAllowDeleteBindings() {
return config.getBoolean(ALLOW_DELETE_BINDINGS);
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class Constants {
public static final String OPTIMIZED_ACLS_CONFIG = "topology.acls.optimized";

public static final String ALLOW_DELETE_TOPICS = "allow.delete.topics";
public static final String ALLOW_DELETE_QUOTAS = "allow.delete.quotas";
public static final String ALLOW_DELETE_BINDINGS = "allow.delete.bindings";
static final String ALLOW_DELETE_PRINCIPALS = "allow.delete.principals";
public static final String ALLOW_DELETE_CONNECT_ARTEFACTS = "allow.delete.artefacts.connect";
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/com/purbon/kafka/topology/JulieOps.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.purbon.kafka.topology.audit.Auditor;
import com.purbon.kafka.topology.exceptions.ValidationException;
import com.purbon.kafka.topology.model.Topology;
import com.purbon.kafka.topology.quotas.QuotasManager;
import com.purbon.kafka.topology.schemas.SchemaRegistryManager;
import com.purbon.kafka.topology.serviceAccounts.VoidPrincipalProvider;
import io.confluent.kafka.schemaregistry.SchemaProvider;
Expand Down Expand Up @@ -43,6 +44,7 @@ public class JulieOps implements AutoCloseable {
private AccessControlManager accessControlManager;
private KafkaConnectArtefactManager connectorManager;
private KSqlArtefactManager kSqlArtefactManager;
private QuotasManager quotasManager;
private final Map<String, Topology> topologies;
private final Configuration config;
private final PrintStream outputStream;
Expand All @@ -55,7 +57,8 @@ private JulieOps(
PrincipalUpdateManager principalUpdateManager,
PrincipalDeleteManager principalDeleteManager,
KafkaConnectArtefactManager connectorManager,
KSqlArtefactManager kSqlArtefactManager) {
KSqlArtefactManager kSqlArtefactManager,
QuotasManager quotasManager) {
this.topologies = topologies;
this.config = config;
this.topicManager = topicManager;
Expand All @@ -64,6 +67,7 @@ private JulieOps(
this.principalDeleteManager = principalDeleteManager;
this.connectorManager = connectorManager;
this.kSqlArtefactManager = kSqlArtefactManager;
this.quotasManager = quotasManager;
this.outputStream = System.out;
}

Expand Down Expand Up @@ -173,6 +177,8 @@ public static JulieOps build(
KSqlArtefactManager kSqlArtefactManager =
configureKSqlArtefactManager(config, topologyFileOrDir);

QuotasManager quotasManager = new QuotasManager(adminClient, config);

configureLogsInDebugMode(config);

return new JulieOps(
Expand All @@ -183,7 +189,8 @@ public static JulieOps build(
principalUpdateManager,
principalDeleteManager,
connectorManager,
kSqlArtefactManager);
kSqlArtefactManager,
quotasManager);
}

void run(BackendController backendController, PrintStream printStream, Auditor auditor)
Expand All @@ -202,6 +209,7 @@ void run(BackendController backendController, PrintStream printStream, Auditor a
accessControlManager.updatePlan(plan, topologies);
connectorManager.updatePlan(plan, topologies);
kSqlArtefactManager.updatePlan(plan, topologies);
quotasManager.updatePlan(plan, topologies);
// Delete users should always be last,
// avoids any unlinked acls, e.g. if acl delete or something errors then there is a link still
// from the account, and can be re-run or manually fixed more easily
Expand All @@ -217,6 +225,7 @@ void run(BackendController backendController, PrintStream printStream, Auditor a
principalUpdateManager.printCurrentState(System.out);
connectorManager.printCurrentState(System.out);
kSqlArtefactManager.printCurrentState(System.out);
quotasManager.printCurrentState(System.out);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.purbon.kafka.topology.actions.quotas;

import com.purbon.kafka.topology.actions.BaseAction;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.model.users.Quota;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Action that assigns a list of client quotas to their principals through the admin client.
 *
 * <p>Part of the JulieOps execution plan: {@link #run()} performs the actual assignment, while
 * {@link #props()} / {@link #detailedProps()} describe the action for plan/dry-run reporting.
 */
public class CreateQuotasAction extends BaseAction {
  private final TopologyBuilderAdminClient adminClient;
  private final List<Quota> quotas;

  /**
   * @param adminClient client used to apply the quota assignments; must not be null
   * @param quotas quotas to assign; must not be null. A defensive, immutable copy is taken so
   *     later mutation by the caller cannot change what this action applies or reports.
   */
  public CreateQuotasAction(TopologyBuilderAdminClient adminClient, List<Quota> quotas) {
    this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
    this.quotas = List.copyOf(Objects.requireNonNull(quotas, "quotas"));
  }

  /**
   * Applies the quotas to their principals.
   *
   * @throws IOException if the admin client fails to assign the quotas
   */
  @Override
  public void run() throws IOException {
    adminClient.assignQuotasPrincipal(quotas);
  }

  /** Summary of this action for plan/dry-run output. */
  @Override
  protected Map<String, Object> props() {
    // LinkedHashMap keeps the reporting keys in a stable, human-friendly order.
    Map<String, Object> map = new LinkedHashMap<>();
    map.put("Operation", getClass().getName());
    map.put("Quotas", quotas);
    map.put("Action", "create");
    return map;
  }

  /** This action is a single step, so the detailed view is just the summary. */
  @Override
  protected List<Map<String, Object>> detailedProps() {
    return List.of(props());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.purbon.kafka.topology.actions.quotas;

import com.purbon.kafka.topology.actions.BaseAction;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
import com.purbon.kafka.topology.model.User;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
 * Action that removes the quotas assigned to a set of user principals through the admin client.
 *
 * <p>Counterpart of {@code CreateQuotasAction} in the JulieOps execution plan: {@link #run()}
 * performs the deletion, while {@link #props()} / {@link #detailedProps()} describe the action
 * for plan/dry-run reporting.
 */
public class DeleteQuotasAction extends BaseAction {
  private final TopologyBuilderAdminClient adminClient;
  private final List<User> users;

  /**
   * @param adminClient client used to remove the quota assignments; must not be null
   * @param users principal names whose quotas will be removed; must not be null. The names are
   *     mapped to {@link User} instances into an unmodifiable list, so this action's state cannot
   *     be changed after construction.
   */
  public DeleteQuotasAction(TopologyBuilderAdminClient adminClient, List<String> users) {
    this.adminClient = Objects.requireNonNull(adminClient, "adminClient");
    this.users =
        Objects.requireNonNull(users, "users").stream()
            .map(User::new)
            .collect(Collectors.toUnmodifiableList());
  }

  /**
   * Removes the quotas for the configured principals.
   *
   * @throws IOException if the admin client fails to remove the quotas
   */
  @Override
  public void run() throws IOException {
    adminClient.removeQuotasPrincipal(users);
  }

  /** Summary of this action for plan/dry-run output. */
  @Override
  protected Map<String, Object> props() {
    // LinkedHashMap keeps the reporting keys in a stable, human-friendly order.
    Map<String, Object> map = new LinkedHashMap<>();
    map.put("Operation", getClass().getName());
    map.put("Users", users);
    map.put("Action", "delete");
    return map;
  }

  /** This action is a single step, so the detailed view is just the summary. */
  @Override
  protected List<Map<String, Object>> detailedProps() {
    return List.of(props());
  }
}
Loading

0 comments on commit f1fe064

Please sign in to comment.