diff --git a/CHANGELOG.md b/CHANGELOG.md index ecf46c7b6..ccf90e9fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [0.4.2] - SNAPSHOT ### Changed -- +- Stop Stream Creation for existing topics if topic configs don't match (#52) ## [0.4.1] - 20190102 ### Added diff --git a/core/pom.xml b/core/pom.xml index f66d8dfd2..5d18072bd 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -246,6 +246,32 @@ kafka-schema-registry test + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + + + org.powermock + powermock-core + test + + + net.bytebuddy + byte-buddy-agent + test + + + net.bytebuddy + byte-buddy + test + + diff --git a/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java b/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java index 06c5031cd..340ab8f16 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java +++ b/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java @@ -19,6 +19,7 @@ import lombok.Data; +// TODO Need documentation @Data public class KafkaProducerConfig { diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java index 069a17b30..409335326 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java @@ -20,8 +20,15 @@ // TODO need javadoc for KafkaManager // TODO Why do we have imperative operations here in stream registry? Does this limit composability? Can we reduce scope? +// TODO Need to consider merging this with StreamInfrastructureManager to keep it stream-platform agnostic. public interface KafkaManager { - - void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties topicConfig); - + /** + * Creates topics in underlying implementation of KafkaManager provider. + * @param topics topics to creates + * @param partitions number of partitions for each of those topics + * @param replicationFactor replicationFactor for each of those topics + * @param topicConfig topic config to use for each of these topics + * @param isNewStream whether or not this invocation results from existing or new stream in stream registry. + */ + void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream); } diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java index 15d1a9eb7..05d1ab5e2 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java @@ -15,36 +15,56 @@ */ package com.homeaway.streamingplatform.db.dao.impl; -import static java.util.stream.Collectors.toList; - -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.ConfigType; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import lombok.extern.slf4j.Slf4j; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.errors.TopicExistsException; import com.homeaway.streamingplatform.configuration.KafkaProducerConfig; import com.homeaway.streamingplatform.db.dao.KafkaManager; +import com.homeaway.streamingplatform.exceptions.StreamCreationException; @Slf4j public class KafkaManagerImpl implements KafkaManager { + private static Map TOPIC_CONFIG_KEY_FILTER = new HashMap() { + private static final long serialVersionUID = -7377105429359314831L; { + + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, true); + put(KafkaProducerConfig.ZOOKEEPER_QUORUM, true); + }}; + + // internal state + + + private ZkUtils initZkUtils(Properties config) { + String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM); + ZkClient zkClient = new ZkClient(zkConnect); + zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); + ZkConnection zkConnection = new ZkConnection(zkConnect); + ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); + return zkUtils; + } + + public void shutdownZkUtils(ZkUtils zkUtils) { + try { + zkUtils.close(); + } catch (RuntimeException exception) { + log.error("Unexpected exception caught during zkUtils shutdown.", exception); + } + } /** * Create and/or Update Topics using AdminClient and AdminUtils @@ -54,39 +74,68 @@ public class KafkaManagerImpl implements KafkaManager { * @param replicationFactor replication for each topic that will be created * @param properties properties that will be set on each topic in the list */ - public void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties properties) { - // remove client connection properties to leave only topic configs - Map topicConfigMap = new HashMap<>(properties.entrySet() - .stream() - .filter(e -> !e.getKey().equals(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) - .filter(e -> !e.getKey().equals(KafkaProducerConfig.ZOOKEEPER_QUORUM)) - .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()))); - - String zkConnect = properties.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM); - ZkClient zkClient = new ZkClient(zkConnect); - zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); - ZkConnection zkConnection = new ZkConnection(zkConnect); - ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false); - + public void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream) { + // TODO - Can't guarantee against race conditions... should probably move to event-source paradigm to + // protect against this (and maybe employ optimistic locking for extra safety). + // The issue here is there is nothing that "locks" the underlying kafka store -- something + // can inevitably change the underlying store while this method is evaluating, always accounting for + // some amount of race window. + + // TODO probably need to cache a KafkaManagerImpl per "cluster" to avoid un-necessary creation / destruction of connections + ZkUtils zkUtils = initZkUtils(properties); try { - List topicsToCreate = new ArrayList<>(); - for (String topic : topics) { - if (AdminUtils.topicExists(zkUtils, topic)) { - updateTopic(zkUtils, topic, topicConfigMap); - } else { - topicsToCreate.add(topic); - } - } + // remove client connection properties to leave only topic configs + Map topicConfigMap = filterPropertiesKeys(properties, TOPIC_CONFIG_KEY_FILTER); + + // partition the list by whether the topic exists or not + Map> partitionMaps = topics.stream().collect(Collectors.partitioningBy(topic -> topicExists(zkUtils, topic))); - createTopics(topicsToCreate, partitions, replicationFactor, properties, topicConfigMap); + // if it exists, update it. If it doesn't exist, create it + List topicsToUpdate = partitionMaps.get(true); + List topicsToCreate = partitionMaps.get(false); + + // update any topics that are necessary + updateTopics(zkUtils, topicsToUpdate, topicConfigMap, isNewStream); + + // now create any topics that were necessary to create this run + createTopics(zkUtils, topicsToCreate, partitions, replicationFactor, topicConfigMap); } finally { - try { - zkClient.close(); - zkConnection.close(); - zkUtils.close(); - } catch (InterruptedException e) { - log.info("caught an exception while closing ZK clients: {}", e.getMessage()); + shutdownZkUtils(zkUtils); + } + } + + // package scope so that PowerMock can verify + void updateTopics(ZkUtils zkUtils, List topicsToUpdate, Map topicConfigMap, boolean isNewStream) { + for (String topic : topicsToUpdate) { + // update topic + Properties actualTopicConfig = getTopicConfig(zkUtils, topic); + Map actualTopicConfigMap = propertiesToMap(actualTopicConfig); + if(actualTopicConfigMap.equals(topicConfigMap)) { + // NOTHING TO DO! + log.info("topic config for {} exactly match. Ignoring.", topic); + continue; } + + // NOTE: If a newly created stream is requested in Stream Registry but it is already present + // in the underlying streaming infrastructure... AND we got this far, this means configuration + // is different. We want to prevent this from actually changing the underlying infrastructure. + // Therefore the operation is failed with an exception. + // + // This provides a safety mechanism and a migration path by requiring folks + // to exactly match downstream config when the stream-registry has not "onboarded" existing topic + // for the first time. + + // TODO Alternatively we can add a forceSync=true flag, ignoring any user provided info, and only updating SR with the underlying settings + // We should probably do forceSync=true anyway, as it provides a simple way to keep things in sync + if(isNewStream) { + // FIXME!! Fix exception reporting... just reporting the topic failed with no message is not a good developer experience. Consider replacing topic with message. (and include topic name in message). + // throw new StreamCreationException("topic: " + topic " already exists but config is different than requested!"); // consider using forceSync=true + throw new StreamCreationException(topic); + } + + // If we got this far, we are "updating" an "existing" stream, and request config is different than + // what is in stream registry. Go ahead and update now. + updateTopic(zkUtils, topic, topicConfigMap); } } @@ -98,27 +147,46 @@ private void updateTopic(ZkUtils zkUtils, String topic, Map conf // TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above // TODO Timeout exception needs to propagate and not be handled here - // TODO Interrupted Exception also needs to propagate and not be handled here - private void createTopics(Collection topics, int partitions, int replicationFactor, Properties adminClientProperties, Map topicConfigMap) { - try (AdminClient adminClient = AdminClient.create(adminClientProperties)) { - List newTopicList = topics.stream() - .map(topic -> { - NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor); - newTopic.configs(topicConfigMap); - return newTopic; - }) - .collect(toList()); - - CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList); - createTopicsResult.all().get(); - } catch (ExecutionException e) { - if (e.getCause() instanceof TopicExistsException) { - log.info("Topic already exists !!"); - } else if (e.getCause() instanceof TimeoutException) { - log.error("Timeout !!", e); - } - } catch (InterruptedException e) { - log.error("Exception in creating topics:", e); + // TODO Need JavaDoc + // package scope so that PowerMock can verify + void createTopics(ZkUtils zkUtils, Collection topics, int partitions, int replicationFactor, Map topicConfigMap) { + for(String topic : topics) { + createTopic(zkUtils, topic, partitions, replicationFactor, topicConfigMap); } } + + private void createTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Map topicConfigMap) { + Properties topicProperties = new Properties(); + topicProperties.putAll(topicConfigMap); + AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties, RackAwareMode.Enforced$.MODULE$); + } + + // utility methods for this class + + // package scope so that PowerMock can leverage + @SuppressWarnings("SuspiciousMethodCalls") + Map filterPropertiesKeys(Properties properties, Map keyFilterMap) { + return new HashMap<>(properties.stringPropertyNames().stream() + .filter(key -> !keyFilterMap.containsKey(key)) + .filter(key -> properties.getProperty(key) != null) + .collect(Collectors.toMap(key -> (String)key, key -> properties.getProperty(key)))); + } + + // package scope so that PowerMock can leverage + Map propertiesToMap(Properties properties) { + return properties.stringPropertyNames().stream() + .filter(key -> properties.getProperty(key) != null) + .collect(Collectors.toMap(key -> (String)key, + key -> properties.getProperty(key))); + } + + private boolean topicExists(ZkUtils zkUtils, String topic) { + boolean topicExists = AdminUtils.topicExists(zkUtils, topic); + log.debug("topic: {} exists={}", topic, topicExists); + return topicExists; + } + + private Properties getTopicConfig(ZkUtils zkUtils, String topic) { + return AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic); + } } diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java index fe2cad427..437a514ab 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java @@ -133,6 +133,7 @@ public void upsertStream(Stream stream) { SchemaReference keyReference = schemaManager.registerSchema(keySubject, stream.getLatestKeySchema().getSchemaString()); stream.getLatestKeySchema().setId(String.valueOf(keyReference.getId())); stream.getLatestKeySchema().setVersion(keyReference.getVersion()); + boolean isNewStream = false; String valueSubject = stream.getName() + "-value"; SchemaReference valueReference = schemaManager.registerSchema(valueSubject, stream.getLatestValueSchema().getSchemaString()); @@ -168,11 +169,12 @@ public void upsertStream(Stream stream) { avroStream.setS3ConnectorList(value.get().getS3ConnectorList()); } else { log.info("key NOT available for the stream-name={}", stream.getName()); + isNewStream = true; avroStream.setCreated(System.currentTimeMillis()); key = AvroStreamKey.newBuilder().setStreamName(avroStream.getName()).build(); } - verifyAndUpsertTopics(stream); + verifyAndUpsertTopics(stream, isNewStream); kafkaProducer.log(key, avroStream); log.info("Stream upserted for {}", stream.getName()); } catch (Exception e) { @@ -208,18 +210,18 @@ private void applyDefaultPartition(Stream stream) { * * @param stream the stream that will be used to verify and/or upsert topics to */ - private void verifyAndUpsertTopics(Stream stream) { + private void verifyAndUpsertTopics(Stream stream, boolean isNewStream) { List vpcList = stream.getVpcList(); List replicatedVpcList = stream.getReplicatedVpcList(); try { log.info("creating topics for vpcList: {}", vpcList); for (String vpc : vpcList) { - upsertTopics(stream, vpc); + upsertTopics(stream, vpc, isNewStream); } if (replicatedVpcList != null && !replicatedVpcList.isEmpty()) { log.info("creating topics for replicatedVpcList: {}", replicatedVpcList); for (String vpc : replicatedVpcList) { - upsertTopics(stream, vpc); + upsertTopics(stream, vpc, isNewStream); } } } catch (Exception e) { @@ -228,7 +230,7 @@ private void verifyAndUpsertTopics(Stream stream) { } } - private void upsertTopics(Stream stream, String vpc) { + private void upsertTopics(Stream stream, String vpc, boolean isNewStream) { ClusterValue clusterValue = getClusterDetails(vpc, env, stream.getTags().getHint(), "producer"); Properties topicConfig = new Properties(); if (stream.getTopicConfig() != null) { @@ -240,7 +242,7 @@ private void upsertTopics(Stream stream, String vpc) { topicConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); topicConfig.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, zkConnect); - kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig); + kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig, isNewStream); log.info("Topic {} created/updated at {}", stream.getName(), bootstrapServer); } diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java new file mode 100644 index 000000000..255651938 --- /dev/null +++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java @@ -0,0 +1,183 @@ +/* Copyright (c) 2018 Expedia Group. + * All rights reserved. http://www.homeaway.com + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.homeaway.streamingplatform.db.dao.impl; + +import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.*; + +import java.util.Collections; +import java.util.Properties; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.ConfigType; +import kafka.utils.ZkUtils; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import com.homeaway.streamingplatform.configuration.KafkaProducerConfig; +import com.homeaway.streamingplatform.exceptions.StreamCreationException; + +@RunWith(PowerMockRunner.class) +@PowerMockIgnore("javax.management.*") +@PrepareForTest({KafkaManagerImpl.class, AdminUtils.class}) +public class KafkaManagerImplTest { + + private static final String TOPIC = "kafka-manager-test"; + private static final int PARTITIONS = 2; + private static final int REPLICATION_FACTOR = 3; + private static final Properties PROPS = new Properties(); + private static final Properties FILTERED_PROPS = new Properties(); + private static final Properties TOPIC_PROPS = new Properties(); + private static final Properties TOPIC_WITH_CNXN_PROPS = new Properties(); + + @Mock private ZkUtils zkUtils; + @Mock private ZkClient zkClient; + @Mock private ZkConnection zkConnection; + + @InjectMocks + private KafkaManagerImpl kafkaManager = new KafkaManagerImpl(); + + @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Before + public void setup() throws Exception{ + PROPS.setProperty("key1", "val1"); + PROPS.setProperty("key2", "2"); // Properties should be strings only... not ints + PROPS.setProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181"); + PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + FILTERED_PROPS.setProperty("key1", "val1"); + FILTERED_PROPS.setProperty("key2", "2"); + TOPIC_PROPS.setProperty("key1", "actualVal1"); // different from "val1" + TOPIC_PROPS.setProperty("key2", "2"); + TOPIC_WITH_CNXN_PROPS.putAll(TOPIC_PROPS); + TOPIC_WITH_CNXN_PROPS.setProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181"); + TOPIC_WITH_CNXN_PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + + // setup kafkaManager + // kafkaManager.init(PROPS); + // un-necessary since powermock has setup mocks for zookeeper in KafkaManagerImpl + + // make sure that the * direct-caller * of these classes are in @PrepareForTest annotation above on this test + whenNew(ZkClient.class).withAnyArguments().thenReturn(zkClient); + whenNew(ZkConnection.class).withArguments(Mockito.anyString()).thenReturn(zkConnection); + whenNew(ZkUtils.class).withAnyArguments().thenReturn(zkUtils); + + // using power mock to allow for mocking of static classes + mockStatic(AdminUtils.class); + when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), TOPIC)).thenReturn(TOPIC_PROPS); + } + + @Test(expected = StreamCreationException.class) + public void testUpsertTopicsForNewStream(){ + // Mock it as an existing topic + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true); + + //New Stream + kafkaManager.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true); + + // expecting an exception to be thrown because topic exists but request doesn't match the config + } + + @Test + public void testUpsertTopicsForExistingStreamWithMatchingConfig() { + // Mock it as an existing topic + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true); + + KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); + + // Existing Stream, but PROPS match!! should not have an exception + kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, TOPIC_WITH_CNXN_PROPS, true); + + // verify change topic DOES NOT HAPPEN because props match + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.changeTopicConfig(zkUtils, TOPIC, TOPIC_PROPS); + + // verify create topic DOES NOT HAPPEN because props match + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, TOPIC_PROPS, RackAwareMode.Enforced$.MODULE$); + } + + @Test + public void testUpsertTopicsForExistingStream() { + // Mock it as an existing topic + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true); + + KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); + + //Existing Stream + kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false); + + //verify change topic happens because isNewStream=false + verifyStatic(AdminUtils.class, times(1)); + AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS); + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$); + } + + + @Test + public void testUpsertTopicsForNewTopic() { + // Mock it as a new topic + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(false); + + KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); + + // Not existing Stream + kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true); + + //verify create topic happens when requested topic does not exist + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS); + verifyStatic(AdminUtils.class, times(1)); + AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$); + } + + @Test + public void testUpsertTopicsForNewTopicExistsInSR() { + // Mock it as a new topic + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(false); + + KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); + + //Existing Stream + kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false); + + // note: this is a weird case, because somehow the stream exists in SR, but the underlying topic does NOT + // might want to consider this corner case a bit more. Currently the behavior honors the request and creates the topic + // with the requested config + + //verify create topic happens when requested topic does not exist + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS); + verifyStatic(AdminUtils.class, times(1)); + AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$); + } +} diff --git a/pom.xml b/pom.xml index 6e30bbcbf..a746ee05b 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ 4.12 1.3 2.18.3 + 2.0.0-beta.5 2.9.0 @@ -412,6 +413,37 @@ test test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito2 + ${powermock.version} + test + + + org.powermock + powermock-core + ${powermock.version} + test + + + net.bytebuddy + byte-buddy-agent + 1.8.5 + test + + + net.bytebuddy + byte-buddy + 1.8.5 + test + + @@ -563,6 +595,8 @@ org.hamcrest:hamcrest-core:jar:${hamcrest.version} org.slf4j:slf4j-api:jar:${slf4j.version} io.dropwizard:dropwizard-testing:jar:1.3.5 + net.bytebuddy:byte-buddy:jar + net.bytebuddy:byte-buddy-agent