Skip to content

Commit

Permalink
Add option to use global openshift kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
lfabriko committed May 30, 2023
1 parent 221928b commit 9a351f8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class TestConfiguration extends Configuration {
public static final String STREAM_LOGS = "stream.logs";
public static final String JIRA_ALLOWED_RESOLUTIONS = "jira.allowed.resolutions";
public static final String PARALLEL = "test.parallel";
public static final String TEST_USE_GLOBAL_OPENSHIFT_KAFKA = "test.use.global.openshift.kafka";

public static final String VARIABLE_PLACEHOLDER_START = "\\$\\{";
public static final String VARIABLE_PLACEHOLDER_END = "\\}";
Expand Down Expand Up @@ -161,4 +162,8 @@ public static Set<String> jiraAllowedResolutions() {
public static boolean parallel() {
return getBoolean(PARALLEL, false);
}

public static boolean useGlobalOpenshiftKafka() {
return getBoolean(TEST_USE_GLOBAL_OPENSHIFT_KAFKA, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ public Predicate<Pod> podSelector() {
@Override
public void undeploy() {
// https://github.com/strimzi/strimzi-kafka-operator/issues/5042
if (!TestConfiguration.skipTearDownOpenshiftAMQStreams()) {
KAFKA_CRD_CLIENT.withName(name()).withPropagationPolicy(DeletionPropagation.BACKGROUND).delete();
OpenShiftWaiters.get(OpenshiftClient.get(), () -> false).areNoPodsPresent("strimzi.io/cluster", name())
.timeout(120_000).waitFor();
deleteSubscription(() -> OpenshiftClient.get().getLabeledPods("strimzi.io/kind", "cluster-operator").isEmpty());
if (!usePreparedGlobalInstallation()) {
if (!TestConfiguration.skipTearDownOpenshiftAMQStreams()) {
KAFKA_CRD_CLIENT.withName(name()).withPropagationPolicy(DeletionPropagation.BACKGROUND).delete();
OpenShiftWaiters.get(OpenshiftClient.get(), () -> false).areNoPodsPresent("strimzi.io/cluster", name())
.timeout(120_000).waitFor();
deleteSubscription(() -> OpenshiftClient.get().getLabeledPods("strimzi.io/kind", "cluster-operator").isEmpty());
}
}
}

Expand All @@ -101,8 +103,10 @@ public void openResources() {

@Override
public void create() {
createSubscription();
deployKafkaCR();
if (!usePreparedGlobalInstallation()) { // could be: if (prepareGlobalKafka || !usePreparedGlobalInstallation)
createSubscription();
deployKafkaCR();
}
}

/**
Expand All @@ -120,6 +124,7 @@ public List<EnvVar> getOperatorEnvVariables() {
public boolean isReady() {
try {
return KAFKA_CRD_CLIENT
.inNamespace(targetNamespace())
.withName(name())
.get()
.getStatus().getConditions()
Expand All @@ -135,8 +140,8 @@ public boolean isReady() {

@Override
public boolean isDeployed() {
return OpenshiftClient.get().getLabeledPods("name", "amq-streams-cluster-operator").size() != 0
&& KAFKA_CRD_CLIENT.withName(name()).get() != null;
return OpenshiftClient.get().inNamespace(targetNamespace(), c -> c.getLabeledPods("name", "amq-streams-cluster-operator").size() != 0
&& KAFKA_CRD_CLIENT.inNamespace(targetNamespace()).withName(name()).get() != null);
}

@Override
Expand Down Expand Up @@ -183,7 +188,7 @@ private void deployKafkaCR() {
.build();
//@formatter:on

KAFKA_CRD_CLIENT.createOrReplace(kafka);
KAFKA_CRD_CLIENT.inNamespace(targetNamespace()).createOrReplace(kafka);
}

@Override
Expand All @@ -197,7 +202,7 @@ public String bootstrapSSLServers() {
}

private String findBootstrapServers(String listnerType) {
return KAFKA_CRD_CLIENT.withName(name()).get().getStatus().getListeners()
return KAFKA_CRD_CLIENT.inNamespace(targetNamespace()).withName(name()).get().getStatus().getListeners()
.stream()
.filter(l -> listnerType.equals(l.getType()))
.findFirst().get().getBootstrapServers();
Expand All @@ -217,7 +222,7 @@ public void createTopic(String name, int partitions, int replicas) {
.endSpec()
.build();
//@formatter:on
KAFKA_TOPIC_CRD_CLIENT.createOrReplace(kafkaTopic);
KAFKA_TOPIC_CRD_CLIENT.inNamespace(targetNamespace()).createOrReplace(kafkaTopic);
}

private void createBasicUser() { // via https://access.redhat.com/documentation/en-us/red_hat_amq/2021
Expand Down Expand Up @@ -249,10 +254,12 @@ public void cleanup() {

public void extractCertificate() {
LOG.debug("Extracting kafka certificate");
String cert = new String(Base64.getDecoder()
.decode(OpenshiftClient.get().secrets().withName(name() + "-cluster-ca-cert").get().getData().get("ca.crt")));
String cert = new String(Base64.getDecoder() // created while installation of kafka in target namespace
.decode(OpenshiftClient.get().inNamespace(targetNamespace(), c -> c.secrets().withName(name() + "-cluster-ca-cert")).get().getData()
.get("ca.crt")));
String password = new String(Base64.getDecoder()
.decode(OpenshiftClient.get().secrets().withName(name() + "-cluster-ca-cert").get().getData().get("ca.password")));
.decode(OpenshiftClient.get().inNamespace(targetNamespace(), c -> c.secrets().withName(name() + "-cluster-ca-cert")).get().getData()
.get("ca.password")));
account().setTrustStorePassword(password);
try {
KeyStore ks = KeyStore.getInstance("PKCS12");
Expand Down Expand Up @@ -282,4 +289,13 @@ private void connectionProperties() {
public String operatorName() {
return "amq-streams";
}

private boolean usePreparedGlobalInstallation() {
return TestConfiguration.useGlobalOpenshiftKafka();
}

@Override
public String targetNamespace() {
return usePreparedGlobalInstallation() ? "openshift-operators" : OpenshiftClient.get().getNamespace();
}
}

0 comments on commit 9a351f8

Please sign in to comment.