diff --git a/CHANGELOG.md b/CHANGELOG.md
index ecf46c7b6..ccf90e9fa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [0.4.2] - SNAPSHOT
### Changed
--
+- Stop Stream Creation for existing topics if topic configs don't match (#52)
## [0.4.1] - 20190102
### Added
diff --git a/core/pom.xml b/core/pom.xml
index f66d8dfd2..5d18072bd 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -246,6 +246,32 @@
kafka-schema-registry
test
+
+ org.powermock
+ powermock-module-junit4
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ test
+
+
+ org.powermock
+ powermock-core
+ test
+
+
+ net.bytebuddy
+ byte-buddy-agent
+ test
+
+
+ net.bytebuddy
+ byte-buddy
+ test
+
+
diff --git a/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java b/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java
index 06c5031cd..340ab8f16 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java
@@ -19,6 +19,7 @@
import lombok.Data;
+// TODO Need documentation
@Data
public class KafkaProducerConfig {
diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java
index 069a17b30..409335326 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java
@@ -20,8 +20,15 @@
// TODO need javadoc for KafkaManager
// TODO Why do we have imperative operations here in stream registry? Does this limit composability? Can we reduce scope?
+// TODO Need to consider merging this with StreamInfrastructureManager to keep it stream-platform agnostic.
public interface KafkaManager {
-
- void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties topicConfig);
-
+ /**
+ * Creates topics in underlying implementation of KafkaManager provider.
+ * @param topics topics to creates
+ * @param partitions number of partitions for each of those topics
+ * @param replicationFactor replicationFactor for each of those topics
+ * @param topicConfig topic config to use for each of these topics
+ * @param isNewStream whether or not this invocation results from existing or new stream in stream registry.
+ */
+ void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream);
}
diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
index 15d1a9eb7..05d1ab5e2 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
@@ -15,36 +15,56 @@
*/
package com.homeaway.streamingplatform.db.dao.impl;
-import static java.util.stream.Collectors.toList;
-
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.ConfigType;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;
import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
import com.homeaway.streamingplatform.db.dao.KafkaManager;
+import com.homeaway.streamingplatform.exceptions.StreamCreationException;
@Slf4j
public class KafkaManagerImpl implements KafkaManager {
+ private static Map TOPIC_CONFIG_KEY_FILTER = new HashMap() {
+ private static final long serialVersionUID = -7377105429359314831L; {
+
+ put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, true);
+ put(KafkaProducerConfig.ZOOKEEPER_QUORUM, true);
+ }};
+
+ // internal state
+
+
+ private ZkUtils initZkUtils(Properties config) {
+ String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
+ ZkClient zkClient = new ZkClient(zkConnect);
+ zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
+ ZkConnection zkConnection = new ZkConnection(zkConnect);
+ ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+ return zkUtils;
+ }
+
+ public void shutdownZkUtils(ZkUtils zkUtils) {
+ try {
+ zkUtils.close();
+ } catch (RuntimeException exception) {
+ log.error("Unexpected exception caught during zkUtils shutdown.", exception);
+ }
+ }
/**
* Create and/or Update Topics using AdminClient and AdminUtils
@@ -54,39 +74,68 @@ public class KafkaManagerImpl implements KafkaManager {
* @param replicationFactor replication for each topic that will be created
* @param properties properties that will be set on each topic in the list
*/
- public void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties properties) {
- // remove client connection properties to leave only topic configs
- Map topicConfigMap = new HashMap<>(properties.entrySet()
- .stream()
- .filter(e -> !e.getKey().equals(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
- .filter(e -> !e.getKey().equals(KafkaProducerConfig.ZOOKEEPER_QUORUM))
- .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString())));
-
- String zkConnect = properties.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
- ZkClient zkClient = new ZkClient(zkConnect);
- zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
- ZkConnection zkConnection = new ZkConnection(zkConnect);
- ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-
+ public void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream) {
+ // TODO - Can't guarantee against race conditions... should probably move to event-source paradigm to
+ // protect against this (and maybe employ optimistic locking for extra safety).
+ // The issue here is there is nothing that "locks" the underlying kafka store -- something
+ // can inevitably change the underlying store while this method is evaluating, always accounting for
+ // some amount of race window.
+
+ // TODO probably need to cache a KafkaManagerImpl per "cluster" to avoid un-necessary creation / destruction of connections
+ ZkUtils zkUtils = initZkUtils(properties);
try {
- List topicsToCreate = new ArrayList<>();
- for (String topic : topics) {
- if (AdminUtils.topicExists(zkUtils, topic)) {
- updateTopic(zkUtils, topic, topicConfigMap);
- } else {
- topicsToCreate.add(topic);
- }
- }
+ // remove client connection properties to leave only topic configs
+ Map topicConfigMap = filterPropertiesKeys(properties, TOPIC_CONFIG_KEY_FILTER);
+
+ // partition the list by whether the topic exists or not
+ Map> partitionMaps = topics.stream().collect(Collectors.partitioningBy(topic -> topicExists(zkUtils, topic)));
- createTopics(topicsToCreate, partitions, replicationFactor, properties, topicConfigMap);
+ // if it exists, update it. If it doesn't exist, create it
+ List topicsToUpdate = partitionMaps.get(true);
+ List topicsToCreate = partitionMaps.get(false);
+
+ // update any topics that are necessary
+ updateTopics(zkUtils, topicsToUpdate, topicConfigMap, isNewStream);
+
+ // now create any topics that were necessary to create this run
+ createTopics(zkUtils, topicsToCreate, partitions, replicationFactor, topicConfigMap);
} finally {
- try {
- zkClient.close();
- zkConnection.close();
- zkUtils.close();
- } catch (InterruptedException e) {
- log.info("caught an exception while closing ZK clients: {}", e.getMessage());
+ shutdownZkUtils(zkUtils);
+ }
+ }
+
+ // package scope so that PowerMock can verify
+ void updateTopics(ZkUtils zkUtils, List topicsToUpdate, Map topicConfigMap, boolean isNewStream) {
+ for (String topic : topicsToUpdate) {
+ // update topic
+ Properties actualTopicConfig = getTopicConfig(zkUtils, topic);
+ Map actualTopicConfigMap = propertiesToMap(actualTopicConfig);
+ if(actualTopicConfigMap.equals(topicConfigMap)) {
+ // NOTHING TO DO!
+ log.info("topic config for {} exactly match. Ignoring.", topic);
+ continue;
}
+
+ // NOTE: If a newly created stream is requested in Stream Registry but it is already present
+ // in the underlying streaming infrastructure... AND we got this far, this means configuration
+ // is different. We want to prevent this from actually changing the underlying infrastructure.
+ // Therefore the operation is failed with an exception.
+ //
+ // This provides a safety mechanism and a migration path by requiring folks
+ // to exactly match downstream config when the stream-registry has not "onboarded" existing topic
+ // for the first time.
+
+ // TODO Alternatively we can add a forceSync=true flag, ignoring any user provided info, and only updating SR with the underlying settings
+ // We should probably do forceSync=true anyway, as it provides a simple way to keep things in sync
+ if(isNewStream) {
+ // FIXME!! Fix exception reporting... just reporting the topic failed with no message is not a good developer experience. Consider replacing topic with message. (and include topic name in message).
+ // throw new StreamCreationException("topic: " + topic " already exists but config is different than requested!"); // consider using forceSync=true
+ throw new StreamCreationException(topic);
+ }
+
+ // If we got this far, we are "updating" an "existing" stream, and request config is different than
+ // what is in stream registry. Go ahead and update now.
+ updateTopic(zkUtils, topic, topicConfigMap);
}
}
@@ -98,27 +147,46 @@ private void updateTopic(ZkUtils zkUtils, String topic, Map conf
// TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above
// TODO Timeout exception needs to propagate and not be handled here
- // TODO Interrupted Exception also needs to propagate and not be handled here
- private void createTopics(Collection topics, int partitions, int replicationFactor, Properties adminClientProperties, Map topicConfigMap) {
- try (AdminClient adminClient = AdminClient.create(adminClientProperties)) {
- List newTopicList = topics.stream()
- .map(topic -> {
- NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
- newTopic.configs(topicConfigMap);
- return newTopic;
- })
- .collect(toList());
-
- CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
- createTopicsResult.all().get();
- } catch (ExecutionException e) {
- if (e.getCause() instanceof TopicExistsException) {
- log.info("Topic already exists !!");
- } else if (e.getCause() instanceof TimeoutException) {
- log.error("Timeout !!", e);
- }
- } catch (InterruptedException e) {
- log.error("Exception in creating topics:", e);
+ // TODO Need JavaDoc
+ // package scope so that PowerMock can verify
+ void createTopics(ZkUtils zkUtils, Collection topics, int partitions, int replicationFactor, Map topicConfigMap) {
+ for(String topic : topics) {
+ createTopic(zkUtils, topic, partitions, replicationFactor, topicConfigMap);
}
}
+
+ private void createTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Map topicConfigMap) {
+ Properties topicProperties = new Properties();
+ topicProperties.putAll(topicConfigMap);
+ AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties, RackAwareMode.Enforced$.MODULE$);
+ }
+
+ // utility methods for this class
+
+ // package scope so that PowerMock can leverage
+ @SuppressWarnings("SuspiciousMethodCalls")
+ Map filterPropertiesKeys(Properties properties, Map keyFilterMap) {
+ return new HashMap<>(properties.stringPropertyNames().stream()
+ .filter(key -> !keyFilterMap.containsKey(key))
+ .filter(key -> properties.getProperty(key) != null)
+ .collect(Collectors.toMap(key -> (String)key, key -> properties.getProperty(key))));
+ }
+
+ // package scope so that PowerMock can leverage
+ Map propertiesToMap(Properties properties) {
+ return properties.stringPropertyNames().stream()
+ .filter(key -> properties.getProperty(key) != null)
+ .collect(Collectors.toMap(key -> (String)key,
+ key -> properties.getProperty(key)));
+ }
+
+ private boolean topicExists(ZkUtils zkUtils, String topic) {
+ boolean topicExists = AdminUtils.topicExists(zkUtils, topic);
+ log.debug("topic: {} exists={}", topic, topicExists);
+ return topicExists;
+ }
+
+ private Properties getTopicConfig(ZkUtils zkUtils, String topic) {
+ return AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
+ }
}
diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java
index fe2cad427..437a514ab 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java
@@ -133,6 +133,7 @@ public void upsertStream(Stream stream) {
SchemaReference keyReference = schemaManager.registerSchema(keySubject, stream.getLatestKeySchema().getSchemaString());
stream.getLatestKeySchema().setId(String.valueOf(keyReference.getId()));
stream.getLatestKeySchema().setVersion(keyReference.getVersion());
+ boolean isNewStream = false;
String valueSubject = stream.getName() + "-value";
SchemaReference valueReference = schemaManager.registerSchema(valueSubject, stream.getLatestValueSchema().getSchemaString());
@@ -168,11 +169,12 @@ public void upsertStream(Stream stream) {
avroStream.setS3ConnectorList(value.get().getS3ConnectorList());
} else {
log.info("key NOT available for the stream-name={}", stream.getName());
+ isNewStream = true;
avroStream.setCreated(System.currentTimeMillis());
key = AvroStreamKey.newBuilder().setStreamName(avroStream.getName()).build();
}
- verifyAndUpsertTopics(stream);
+ verifyAndUpsertTopics(stream, isNewStream);
kafkaProducer.log(key, avroStream);
log.info("Stream upserted for {}", stream.getName());
} catch (Exception e) {
@@ -208,18 +210,18 @@ private void applyDefaultPartition(Stream stream) {
*
* @param stream the stream that will be used to verify and/or upsert topics to
*/
- private void verifyAndUpsertTopics(Stream stream) {
+ private void verifyAndUpsertTopics(Stream stream, boolean isNewStream) {
List vpcList = stream.getVpcList();
List replicatedVpcList = stream.getReplicatedVpcList();
try {
log.info("creating topics for vpcList: {}", vpcList);
for (String vpc : vpcList) {
- upsertTopics(stream, vpc);
+ upsertTopics(stream, vpc, isNewStream);
}
if (replicatedVpcList != null && !replicatedVpcList.isEmpty()) {
log.info("creating topics for replicatedVpcList: {}", replicatedVpcList);
for (String vpc : replicatedVpcList) {
- upsertTopics(stream, vpc);
+ upsertTopics(stream, vpc, isNewStream);
}
}
} catch (Exception e) {
@@ -228,7 +230,7 @@ private void verifyAndUpsertTopics(Stream stream) {
}
}
- private void upsertTopics(Stream stream, String vpc) {
+ private void upsertTopics(Stream stream, String vpc, boolean isNewStream) {
ClusterValue clusterValue = getClusterDetails(vpc, env, stream.getTags().getHint(), "producer");
Properties topicConfig = new Properties();
if (stream.getTopicConfig() != null) {
@@ -240,7 +242,7 @@ private void upsertTopics(Stream stream, String vpc) {
topicConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
topicConfig.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, zkConnect);
- kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig);
+ kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig, isNewStream);
log.info("Topic {} created/updated at {}", stream.getName(), bootstrapServer);
}
diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
new file mode 100644
index 000000000..255651938
--- /dev/null
+++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
@@ -0,0 +1,183 @@
+/* Copyright (c) 2018 Expedia Group.
+ * All rights reserved. http://www.homeaway.com
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.homeaway.streamingplatform.db.dao.impl;
+
+import static org.mockito.Mockito.times;
+import static org.powermock.api.mockito.PowerMockito.*;
+
+import java.util.Collections;
+import java.util.Properties;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.ConfigType;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
+import com.homeaway.streamingplatform.exceptions.StreamCreationException;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({KafkaManagerImpl.class, AdminUtils.class})
+public class KafkaManagerImplTest {
+
+ private static final String TOPIC = "kafka-manager-test";
+ private static final int PARTITIONS = 2;
+ private static final int REPLICATION_FACTOR = 3;
+ private static final Properties PROPS = new Properties();
+ private static final Properties FILTERED_PROPS = new Properties();
+ private static final Properties TOPIC_PROPS = new Properties();
+ private static final Properties TOPIC_WITH_CNXN_PROPS = new Properties();
+
+ @Mock private ZkUtils zkUtils;
+ @Mock private ZkClient zkClient;
+ @Mock private ZkConnection zkConnection;
+
+ @InjectMocks
+ private KafkaManagerImpl kafkaManager = new KafkaManagerImpl();
+
+ @Rule public MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Before
+ public void setup() throws Exception{
+ PROPS.setProperty("key1", "val1");
+ PROPS.setProperty("key2", "2"); // Properties should be strings only... not ints
+ PROPS.setProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181");
+ PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+ FILTERED_PROPS.setProperty("key1", "val1");
+ FILTERED_PROPS.setProperty("key2", "2");
+ TOPIC_PROPS.setProperty("key1", "actualVal1"); // different from "val1"
+ TOPIC_PROPS.setProperty("key2", "2");
+ TOPIC_WITH_CNXN_PROPS.putAll(TOPIC_PROPS);
+ TOPIC_WITH_CNXN_PROPS.setProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181");
+ TOPIC_WITH_CNXN_PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+
+ // setup kafkaManager
+ // kafkaManager.init(PROPS);
+ // un-necessary since powermock has setup mocks for zookeeper in KafkaManagerImpl
+
+ // make sure that the * direct-caller * of these classes are in @PrepareForTest annotation above on this test
+ whenNew(ZkClient.class).withAnyArguments().thenReturn(zkClient);
+ whenNew(ZkConnection.class).withArguments(Mockito.anyString()).thenReturn(zkConnection);
+ whenNew(ZkUtils.class).withAnyArguments().thenReturn(zkUtils);
+
+ // using power mock to allow for mocking of static classes
+ mockStatic(AdminUtils.class);
+ when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), TOPIC)).thenReturn(TOPIC_PROPS);
+ }
+
+ @Test(expected = StreamCreationException.class)
+ public void testUpsertTopicsForNewStream(){
+ // Mock it as an existing topic
+ when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);
+
+ //New Stream
+ kafkaManager.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true);
+
+ // expecting an exception to be thrown because topic exists but request doesn't match the config
+ }
+
+ @Test
+ public void testUpsertTopicsForExistingStreamWithMatchingConfig() {
+ // Mock it as an existing topic
+ when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);
+
+ KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
+
+ // Existing Stream, but PROPS match!! should not have an exception
+ kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, TOPIC_WITH_CNXN_PROPS, true);
+
+ // verify change topic DOES NOT HAPPEN because props match
+ verifyStatic(AdminUtils.class, times(0));
+ AdminUtils.changeTopicConfig(zkUtils, TOPIC, TOPIC_PROPS);
+
+ // verify create topic DOES NOT HAPPEN because props match
+ verifyStatic(AdminUtils.class, times(0));
+ AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, TOPIC_PROPS, RackAwareMode.Enforced$.MODULE$);
+ }
+
+ @Test
+ public void testUpsertTopicsForExistingStream() {
+ // Mock it as an existing topic
+ when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);
+
+ KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
+
+ //Existing Stream
+ kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false);
+
+ //verify change topic happens because isNewStream=false
+ verifyStatic(AdminUtils.class, times(1));
+ AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS);
+ verifyStatic(AdminUtils.class, times(0));
+ AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$);
+ }
+
+
+ @Test
+ public void testUpsertTopicsForNewTopic() {
+ // Mock it as a new topic
+ when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(false);
+
+ KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
+
+ // Not existing Stream
+ kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true);
+
+ //verify create topic happens when requested topic does not exist
+ verifyStatic(AdminUtils.class, times(0));
+ AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS);
+ verifyStatic(AdminUtils.class, times(1));
+ AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$);
+ }
+
+ @Test
+ public void testUpsertTopicsForNewTopicExistsInSR() {
+ // Mock it as a new topic
+ when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(false);
+
+ KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
+
+ //Existing Stream
+ kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false);
+
+ // note: this is a weird case, because somehow the stream exists in SR, but the underlying topic does NOT
+ // might want to consider this corner case a bit more. Currently the behavior honors the request and creates the topic
+ // with the requested config
+
+ //verify create topic happens when requested topic does not exist
+ verifyStatic(AdminUtils.class, times(0));
+ AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS);
+ verifyStatic(AdminUtils.class, times(1));
+ AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$);
+ }
+}
diff --git a/pom.xml b/pom.xml
index 6e30bbcbf..a746ee05b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,7 @@
4.12
1.3
2.18.3
+ 2.0.0-beta.5
2.9.0
@@ -412,6 +413,37 @@
test
test
+
+ org.powermock
+ powermock-module-junit4
+ ${powermock.version}
+ test
+
+
+ org.powermock
+ powermock-api-mockito2
+ ${powermock.version}
+ test
+
+
+ org.powermock
+ powermock-core
+ ${powermock.version}
+ test
+
+
+ net.bytebuddy
+ byte-buddy-agent
+ 1.8.5
+ test
+
+
+ net.bytebuddy
+ byte-buddy
+ 1.8.5
+ test
+
+
@@ -563,6 +595,8 @@
org.hamcrest:hamcrest-core:jar:${hamcrest.version}
org.slf4j:slf4j-api:jar:${slf4j.version}
io.dropwizard:dropwizard-testing:jar:1.3.5
+ net.bytebuddy:byte-buddy:jar
+ net.bytebuddy:byte-buddy-agent