Skip to content

Commit

Permalink
Merge pull request #1570 from strimzi/fix-jvm-options-in-mirror-maker
Browse files Browse the repository at this point in the history
Fix JMV options for Kafka Mirror Maker
  • Loading branch information
scholzj authored Apr 24, 2019
2 parents c48f1ae + a2e87f0 commit f7552e5
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public static KafkaMirrorMakerCluster fromCrd(KafkaMirrorMaker kafkaMirrorMaker,

kafkaMirrorMakerCluster.setLogging(kafkaMirrorMaker.getSpec().getLogging());
kafkaMirrorMakerCluster.setGcLoggingEnabled(kafkaMirrorMaker.getSpec().getJvmOptions() == null ? true : kafkaMirrorMaker.getSpec().getJvmOptions().isGcLoggingEnabled());
kafkaMirrorMakerCluster.setJvmOptions(kafkaMirrorMaker.getSpec().getJvmOptions());

Map<String, Object> metrics = kafkaMirrorMaker.getSpec().getMetrics();
if (metrics != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.SecretKeySelectorBuilder;
import io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.PodSecurityContextBuilder;
Expand Down Expand Up @@ -744,4 +746,54 @@ public void testImagePullPolicy() {
dep = kc.generateDeployment(Collections.EMPTY_MAP, true, ImagePullPolicy.IFNOTPRESENT);
assertEquals(ImagePullPolicy.IFNOTPRESENT.toString(), dep.getSpec().getTemplate().getSpec().getContainers().get(0).getImagePullPolicy());
}

@Test
public void testResources() {
Map<String, Quantity> requests = new HashMap<>(2);
requests.put("cpu", new Quantity("250m"));
requests.put("memory", new Quantity("512Mi"));

Map<String, Quantity> limits = new HashMap<>(2);
limits.put("cpu", new Quantity("500m"));
limits.put("memory", new Quantity("1024Mi"));

KafkaConnect resource = new KafkaConnectBuilder(this.resource)
.editSpec()
.withResources(new ResourceRequirementsBuilder().withLimits(limits).withRequests(requests).build())
.endSpec()
.build();
KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(resource, VERSIONS);

Deployment dep = kc.generateDeployment(Collections.EMPTY_MAP, true, null);
Container cont = dep.getSpec().getTemplate().getSpec().getContainers().get(0);
assertEquals(limits, cont.getResources().getLimits());
assertEquals(requests, cont.getResources().getRequests());
}

@Test
public void testJvmOptions() {
Map<String, String> xx = new HashMap<>(2);
xx.put("UseG1GC", "true");
xx.put("MaxGCPauseMillis", "20");

KafkaConnect resource = new KafkaConnectBuilder(this.resource)
.editSpec()
.withNewJvmOptions()
.withNewXms("512m")
.withNewXmx("1024m")
.withNewServer(true)
.withXx(xx)
.endJvmOptions()
.endSpec()
.build();
KafkaConnectCluster kc = KafkaConnectCluster.fromCrd(resource, VERSIONS);

Deployment dep = kc.generateDeployment(Collections.EMPTY_MAP, true, null);
Container cont = dep.getSpec().getTemplate().getSpec().getContainers().get(0);
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-server"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-XX:+UseG1GC"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-XX:MaxGCPauseMillis=20"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_HEAP_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-Xmx1024m"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_HEAP_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-Xms512m"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.SecretKeySelectorBuilder;
import io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.PodSecurityContextBuilder;
Expand Down Expand Up @@ -58,7 +60,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;


@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public class KafkaConnectS2IClusterTest {
private static final KafkaVersion.Lookup VERSIONS = new KafkaVersion.Lookup(new StringReader(
"2.0.0 default 2.0 2.0 1234567890abcdef"),
Expand Down Expand Up @@ -829,4 +831,54 @@ public void testImagePullPolicy() {
dep = kc.generateDeploymentConfig(Collections.EMPTY_MAP, true, ImagePullPolicy.IFNOTPRESENT);
assertEquals(ImagePullPolicy.IFNOTPRESENT.toString(), dep.getSpec().getTemplate().getSpec().getContainers().get(0).getImagePullPolicy());
}

@Test
public void testResources() {
Map<String, Quantity> requests = new HashMap<>(2);
requests.put("cpu", new Quantity("250m"));
requests.put("memory", new Quantity("512Mi"));

Map<String, Quantity> limits = new HashMap<>(2);
limits.put("cpu", new Quantity("500m"));
limits.put("memory", new Quantity("1024Mi"));

KafkaConnectS2I resource = new KafkaConnectS2IBuilder(this.resource)
.editSpec()
.withResources(new ResourceRequirementsBuilder().withLimits(limits).withRequests(requests).build())
.endSpec()
.build();
KafkaConnectS2ICluster kc = KafkaConnectS2ICluster.fromCrd(resource, VERSIONS);

DeploymentConfig dep = kc.generateDeploymentConfig(Collections.EMPTY_MAP, true, null);
Container cont = dep.getSpec().getTemplate().getSpec().getContainers().get(0);
assertEquals(limits, cont.getResources().getLimits());
assertEquals(requests, cont.getResources().getRequests());
}

@Test
public void testJvmOptions() {
Map<String, String> xx = new HashMap<>(2);
xx.put("UseG1GC", "true");
xx.put("MaxGCPauseMillis", "20");

KafkaConnectS2I resource = new KafkaConnectS2IBuilder(this.resource)
.editSpec()
.withNewJvmOptions()
.withNewXms("512m")
.withNewXmx("1024m")
.withNewServer(true)
.withXx(xx)
.endJvmOptions()
.endSpec()
.build();
KafkaConnectS2ICluster kc = KafkaConnectS2ICluster.fromCrd(resource, VERSIONS);

DeploymentConfig dep = kc.generateDeploymentConfig(Collections.EMPTY_MAP, true, null);
Container cont = dep.getSpec().getTemplate().getSpec().getContainers().get(0);
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-server"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-XX:+UseG1GC"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-XX:MaxGCPauseMillis=20"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_HEAP_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-Xmx1024m"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_HEAP_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-Xms512m"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,31 @@ public void testResources() {
assertEquals(limits, cont.getResources().getLimits());
assertEquals(requests, cont.getResources().getRequests());
}

@Test
public void testJvmOptions() {
Map<String, String> xx = new HashMap<>(2);
xx.put("UseG1GC", "true");
xx.put("MaxGCPauseMillis", "20");

KafkaMirrorMaker resource = new KafkaMirrorMakerBuilder(this.resource)
.editSpec()
.withNewJvmOptions()
.withNewXms("512m")
.withNewXmx("1024m")
.withNewServer(true)
.withXx(xx)
.endJvmOptions()
.endSpec()
.build();
KafkaMirrorMakerCluster mmc = KafkaMirrorMakerCluster.fromCrd(resource, VERSIONS);

Deployment dep = mmc.generateDeployment(Collections.EMPTY_MAP, true, null);
Container cont = dep.getSpec().getTemplate().getSpec().getContainers().get(0);
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-server"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-XX:+UseG1GC"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_JVM_PERFORMANCE_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-XX:MaxGCPauseMillis=20"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_HEAP_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-Xmx1024m"));
assertTrue(cont.getEnv().stream().filter(env -> "KAFKA_HEAP_OPTS".equals(env.getName())).map(EnvVar::getValue).findFirst().orElse("").contains("-Xms512m"));
}
}

0 comments on commit f7552e5

Please sign in to comment.