Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Allow KafkaRoller talk to controller directly (#10016)" from 0.45.x release #10944

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,7 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
protected static final String REPLICATION_PORT_NAME = "tcp-replication";
protected static final int KAFKA_AGENT_PORT = 8443;
protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent";
/**
* Port number used for control plane
*/
public static final int CONTROLPLANE_PORT = 9090;
protected static final int CONTROLPLANE_PORT = 9090;
protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,10 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return mockAdminClient;
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return mockAdminClient;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
Expand Down Expand Up @@ -160,43 +159,6 @@ private static AdminClientProvider givenControllerFutureFailsWithTimeout() {
return mock;
}

@Test
public void testTalkingToControllersLatestVersion(VertxTestContext testContext) {
PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), KafkaVersionTestUtils.getLatestVersion().version());
AdminClientProvider mock = mock(AdminClientProvider.class);
when(mock.createControllerAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap controllers"));

TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps,
noException(), null, noException(), noException(), noException(),
brokerId -> succeededFuture(true),
true, mock, mockKafkaAgentClientProvider(), true, null, -1);

// When admin client cannot be created for a controller node, we expect it to be force restarted.
doSuccessfulRollingRestart(testContext, kafkaRoller,
asList(0),
asList(0));
}

@Test
public void testTalkingToControllersWithOldVersion(VertxTestContext testContext) throws InterruptedException {
PodOperator podOps = mockPodOpsWithVersion(podId -> succeededFuture(), "3.8.0");

AdminClientProvider mock = mock(AdminClientProvider.class);
when(mock.createAdminClient(anyString(), any(), any())).thenThrow(new RuntimeException("An error while try to create an admin client with bootstrap brokers"));

TestingKafkaRoller kafkaRoller = new TestingKafkaRoller(addKraftPodNames(0, 0, 1), podOps,
noException(), null, noException(), noException(), noException(),
brokerId -> succeededFuture(true),
true, mock, mockKafkaAgentClientProvider(), true, null, -1);

// If the controller has older version (< 3.9.0), we should only be creating admin client for brokers
// and when the operator cannot connect to brokers, we expect to fail initialising KafkaQuorumCheck
doFailingRollingRestart(testContext, kafkaRoller,
asList(0),
KafkaRoller.UnforceableProblem.class, "KafkaQuorumCheck cannot be initialised for c-kafka-0/0 because none of the brokers do not seem to responding to connection attempts",
emptyList());
}

private static KafkaAgentClientProvider mockKafkaAgentClientProvider() {
return mock(KafkaAgentClientProvider.class);
}
Expand Down Expand Up @@ -835,17 +797,12 @@ public void clearRestarted() {
}

private PodOperator mockPodOps(Function<Integer, Future<Void>> readiness) {
return mockPodOpsWithVersion(readiness, KafkaVersionTestUtils.getLatestVersion().version());
}

private PodOperator mockPodOpsWithVersion(Function<Integer, Future<Void>> readiness, String version) {
PodOperator podOps = mock(PodOperator.class);
when(podOps.get(any(), any())).thenAnswer(
invocation -> new PodBuilder()
.withNewMetadata()
.withNamespace(invocation.getArgument(0))
.withName(invocation.getArgument(1))
.addToAnnotations(KafkaCluster.ANNO_STRIMZI_IO_KAFKA_VERSION, version)
.withNamespace(invocation.getArgument(0))
.withName(invocation.getArgument(1))
.endMetadata()
.build()
);
Expand Down Expand Up @@ -944,33 +901,9 @@ KafkaAgentClient initKafkaAgentClient() {
}

@Override
protected Admin brokerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
if (delegateAdminClientCall) {
return super.brokerAdminClient(nodes);
}
RuntimeException exception = acOpenException.apply(nodes);
if (exception != null) {
throw new ForceableProblem("An error while try to create the admin client", exception);
}
Admin ac = mock(AdminClient.class, invocation -> {
if ("close".equals(invocation.getMethod().getName())) {
Admin mock = (Admin) invocation.getMock();
unclosedAdminClients.remove(mock);
if (acCloseException != null) {
throw acCloseException;
}
return null;
}
throw new RuntimeException("Not mocked " + invocation.getMethod());
});
unclosedAdminClients.put(ac, new Throwable("Pod " + nodes));
return ac;
}

@Override
protected Admin controllerAdminClient(Set<NodeRef> nodes) throws ForceableProblem, FatalProblem {
protected Admin adminClient(Set<NodeRef> nodes, boolean b) throws ForceableProblem, FatalProblem {
if (delegateAdminClientCall) {
return super.controllerAdminClient(nodes);
return super.adminClient(nodes, b);
}
RuntimeException exception = acOpenException.apply(nodes);
if (exception != null) {
Expand Down Expand Up @@ -1012,7 +945,7 @@ Future<Boolean> canRoll(int podId) {
}

@Override
protected KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) {
protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
Admin admin = mock(Admin.class);
DescribeMetadataQuorumResult qrmResult = mock(DescribeMetadataQuorumResult.class);
when(admin.describeMetadataQuorum()).thenReturn(qrmResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,20 +616,10 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return adminClientSupplier.get();
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return adminClientSupplier.get();
}

@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return adminClientSupplier.get();
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
return adminClientSupplier.get();
}
};

return new ResourceOperatorSupplier(vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public interface AdminClientProvider {

/**
* Create a Kafka Admin interface instance for brokers
* Create a Kafka Admin interface instance
*
* @param bootstrapHostnames Kafka hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
Expand All @@ -26,17 +26,7 @@ public interface AdminClientProvider {
Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity);

/**
* Create a Kafka Admin interface instance for controllers
*
* @param controllerBootstrapHostnames Kafka controller hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
* @return Instance of Kafka Admin interface
*/
Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity);

/**
* Create a Kafka Admin interface instance for brokers
* Create a Kafka Admin interface instance
*
* @param bootstrapHostnames Kafka hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
Expand All @@ -46,16 +36,4 @@ public interface AdminClientProvider {
* @return Instance of Kafka Admin interface
*/
Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config);

/**
* Create a Kafka Admin interface instance for controllers
*
* @param controllerBootstrapHostnames Kafka hostname to connect to for administration operations
* @param kafkaCaTrustSet Trust set for connecting to Kafka
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
* @param config Additional configuration for the Kafka Admin Client
*
* @return Instance of Kafka Admin interface
*/
Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
}

/**
* Create a Kafka Admin interface instance handling the following different scenarios:
*
Expand All @@ -49,30 +44,26 @@ public Admin createControllerAdminClient(String controllerBootstrapHostnames, Pe
*/
@Override
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames);
return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config));
}

@Override
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
config.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, controllerBootstrapHostnames);
return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config));
return Admin.create(adminClientConfiguration(bootstrapHostnames, kafkaCaTrustSet, authIdentity, config));
}

/**
* Utility method for preparing the Admin client configuration
*
* @param bootstrapHostnames Kafka bootstrap address
* @param kafkaCaTrustSet Trust set for connecting to Kafka
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
* @param config Custom Admin client configuration or empty properties instance
*
* @return Admin client configuration
*/
/* test */ Properties adminClientConfiguration(PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
/* test */ static Properties adminClientConfiguration(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
if (config == null) {
throw new InvalidConfigurationException("The config parameter should not be null");
}

config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames);

// configuring TLS encryption if requested
if (kafkaCaTrustSet != null) {
config.putIfAbsent(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");
Expand Down
Loading
Loading