From 449ad5f7601397ed04b4f22c0bdd74b830478170 Mon Sep 17 00:00:00 2001 From: Arun Vasudevan Date: Wed, 12 Dec 2018 16:32:43 -0600 Subject: [PATCH 01/11] Removing update topics functionality in Stream Registry --- .../streamingplatform/db/dao/impl/KafkaManagerImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java index 15d1a9eb7..8b061a56f 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java @@ -72,7 +72,8 @@ public void upsertTopics(Collection topics, int partitions, int replicat List topicsToCreate = new ArrayList<>(); for (String topic : topics) { if (AdminUtils.topicExists(zkUtils, topic)) { - updateTopic(zkUtils, topic, topicConfigMap); + log.info("Stream Registry does not allow updating configs for existing topics."); + //updateTopic(zkUtils, topic, topicConfigMap); } else { topicsToCreate.add(topic); } From 78bf0ea60fcaebde3e0b8dce60dbdc325c8eb0a8 Mon Sep 17 00:00:00 2001 From: Arun Vasudevan Date: Wed, 12 Dec 2018 16:39:39 -0600 Subject: [PATCH 02/11] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ec344309..29417cffc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Removed - Shell script for build commands (#43) +- Topic Config update functionality [0.3.1]: https://github.com/HomeAway/stream-registry/compare/v0.3.0...v0.3.1 [0.3.0]: https://github.com/HomeAway/stream-registry/compare/v0.2.42...v0.3.0 From 0bed4e454830b89a48a23d6ab3d34c69df559e75 Mon Sep 17 00:00:00 2001 From: Arun Vasudevan Date: Wed, 19 Dec 2018 18:55:48 -0600 Subject: [PATCH 03/11] Stop Stream Creation for existing topics if configs don't match --- core/pom.xml | 26 ++++ .../db/dao/KafkaManager.java | 2 +- .../db/dao/impl/KafkaManagerImpl.java | 26 +++- .../db/dao/impl/StreamDaoImpl.java | 14 +- .../db/dao/impl/KafkaManagerImplTest.java | 123 ++++++++++++++++++ pom.xml | 34 +++++ 6 files changed, 213 insertions(+), 12 deletions(-) create mode 100644 core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java diff --git a/core/pom.xml b/core/pom.xml index 5c64c0bc1..48d8851b0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -246,6 +246,32 @@ kafka-schema-registry test + + org.powermock + powermock-module-junit4 + test + + + org.powermock + powermock-api-mockito2 + test + + + org.powermock + powermock-core + test + + + net.bytebuddy + byte-buddy-agent + test + + + net.bytebuddy + byte-buddy + test + + diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java index 069a17b30..2d14a8ea0 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java @@ -22,6 +22,6 @@ // TODO Why do we have imperative operations here in stream registry? Does this limit composability? Can we reduce scope? 
public interface KafkaManager {

-    void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig);
+    void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream);
}
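The interface change threads an `isNewStream` flag through topic upserts so the implementation can tell first-time stream creation apart from updates to already-registered streams. A minimal sketch of the caller-side contract this implies (illustrative only; the topic name and config value are hypothetical):

    Properties topicConfig = new Properties();
    topicConfig.setProperty("retention.ms", "86400000");
    try {
        // a brand-new stream whose topic already exists with a different config is rejected
        kafkaManager.upsertTopics(Collections.singleton("user-events"), 2, 3, topicConfig, true);
    } catch (StreamCreationException e) {
        // surface the config mismatch to the client instead of silently mutating the topic
    }
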
diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
index 8b061a56f..69a6627ec 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;

import kafka.admin.AdminUtils;
+import kafka.server.ConfigType;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import lombok.extern.slf4j.Slf4j;
@@ -42,10 +43,13 @@

import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
import com.homeaway.streamingplatform.db.dao.KafkaManager;
+import com.homeaway.streamingplatform.exceptions.StreamCreationException;

@Slf4j
public class KafkaManagerImpl implements KafkaManager {

+    public List<String> topicsToCreate;
+
    /**
     * Create and/or Update Topics using AdminClient and AdminUtils
     *
     * @param topics list of topics that need to be created
     * @param partitions number of partitions for each of those topics
     * @param replicationFactor replication for each topic that will be created
     * @param properties properties that will be set on each topic in the list
     */
-    public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties) {
+    public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream) {
        // remove client connection properties to leave only topic configs
        Map<String, String> topicConfigMap = new HashMap<>(properties.entrySet()
            .stream()
            .filter(e -> !e.getKey().equals(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
            .filter(e -> !e.getKey().equals(KafkaProducerConfig.ZOOKEEPER_QUORUM))
            .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString())));

        String zkConnect = properties.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
        ZkClient zkClient = new ZkClient(zkConnect);
        zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
        ZkConnection zkConnection = new ZkConnection(zkConnect);
        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

        try {
-            List<String> topicsToCreate = new ArrayList<>();
+            topicsToCreate = new ArrayList<>();
            for (String topic : topics) {
                if (AdminUtils.topicExists(zkUtils, topic)) {
-                    log.info("Stream Registry does not allow updating configs for existing topics.");
-                    //updateTopic(zkUtils, topic, topicConfigMap);
+                    //TODO Pass the Boolean if it is a New Stream
+                    //TODO Read the existing topic's config map, compare, and if it's a new stream throw StreamCreationException on a mismatch
+                    Properties actualTopicConfig = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
+                    Map<String, String> actualTopicConfigMap = actualTopicConfig
+                        .entrySet()
+                        .stream()
+                        .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
+
+                    // If a Stream that was newly created in Stream Registry is already present in the underlying streaming infrastructure,
+                    // then compare the configs and fail completely if they don't match
+                    if(isNewStream && !actualTopicConfigMap.equals(topicConfigMap)) {
+                        throw new StreamCreationException(topic);
+                    }
+                    updateTopic(zkUtils, topic, topicConfigMap);
                } else {
                    topicsToCreate.add(topic);
                }
@@ -100,7 +116,7 @@ private void updateTopic(ZkUtils zkUtils, String topic, Map<String, String> conf
    // TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above
    // TODO Timeout exception needs to propagate and not be handled here
    // TODO Interrupted Exception also needs to propagate and not be handled here
-    private void createTopics(Collection<String> topics, int partitions, int replicationFactor, Properties adminClientProperties, Map<String, String> topicConfigMap) {
+    protected void createTopics(Collection<String> topics, int partitions, int replicationFactor, Properties adminClientProperties, Map<String, String> topicConfigMap) {
        try (AdminClient adminClient = AdminClient.create(adminClientProperties)) {
            List<NewTopic> newTopicList = topics.stream()
                .map(topic -> {
diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java
index fe2cad427..437a514ab 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/StreamDaoImpl.java
@@ -133,6 +133,7 @@ public void upsertStream(Stream stream) {
        SchemaReference keyReference = schemaManager.registerSchema(keySubject, stream.getLatestKeySchema().getSchemaString());
        stream.getLatestKeySchema().setId(String.valueOf(keyReference.getId()));
        stream.getLatestKeySchema().setVersion(keyReference.getVersion());
+        boolean isNewStream = false;

        String valueSubject = stream.getName() + "-value";
        SchemaReference valueReference = schemaManager.registerSchema(valueSubject, stream.getLatestValueSchema().getSchemaString());
@@ -168,11 +169,12 @@
            avroStream.setS3ConnectorList(value.get().getS3ConnectorList());
        } else {
            log.info("key NOT available for the stream-name={}", stream.getName());
+            isNewStream = true;
            avroStream.setCreated(System.currentTimeMillis());
            key = AvroStreamKey.newBuilder().setStreamName(avroStream.getName()).build();
        }

-        verifyAndUpsertTopics(stream);
+        verifyAndUpsertTopics(stream, isNewStream);
        kafkaProducer.log(key, avroStream);
        log.info("Stream upserted for {}", stream.getName());
    } catch (Exception e) {
@@ -208,18 +210,18 @@ private void applyDefaultPartition(Stream stream) {
     *
     * @param stream the stream that will be used to verify and/or upsert topics to
     */
-    private void verifyAndUpsertTopics(Stream stream) {
+    private void verifyAndUpsertTopics(Stream stream, boolean isNewStream) {
        List<String> vpcList = stream.getVpcList();
        List<String> replicatedVpcList = stream.getReplicatedVpcList();
        try {
            log.info("creating topics for vpcList: {}", vpcList);
            for (String vpc : vpcList) {
-                upsertTopics(stream, vpc);
+                upsertTopics(stream, vpc, isNewStream);
            }
            if (replicatedVpcList != null && !replicatedVpcList.isEmpty()) {
                log.info("creating topics for replicatedVpcList: {}", replicatedVpcList);
                for (String vpc : replicatedVpcList) {
-                    upsertTopics(stream, vpc);
+                    upsertTopics(stream, vpc, isNewStream);
                }
            }
        } catch (Exception e) {
@@ -228,7 +230,7 @@
        }
    }

-    private void upsertTopics(Stream stream, String vpc) {
+    private void upsertTopics(Stream stream, String vpc, boolean isNewStream) {
        ClusterValue clusterValue = getClusterDetails(vpc, env, stream.getTags().getHint(), "producer");
        Properties topicConfig = new Properties();
        if (stream.getTopicConfig() != null) {
@@ -240,7 +242,7 @@
        topicConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        topicConfig.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, zkConnect);

-        kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig);
+        kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig,
isNewStream); log.info("Topic {} created/updated at {}", stream.getName(), bootstrapServer); } diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java new file mode 100644 index 000000000..fb48d8a65 --- /dev/null +++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java @@ -0,0 +1,123 @@ +/* Copyright (c) 2018 Expedia Group. + * All rights reserved. http://www.homeaway.com + + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + + * http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.homeaway.streamingplatform.db.dao.impl; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.when; +import static org.powermock.api.mockito.PowerMockito.mockStatic; +import static org.powermock.api.mockito.PowerMockito.spy; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +import java.util.Collections; +import java.util.Properties; + +import kafka.admin.AdminUtils; +import kafka.server.ConfigType; +import kafka.utils.ZkUtils; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import com.homeaway.streamingplatform.configuration.KafkaProducerConfig; +import com.homeaway.streamingplatform.exceptions.StreamCreationException; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({KafkaManagerImpl.class, AdminUtils.class}) +public class KafkaManagerImplTest { + + private static final String topic = "kafka-manager-test"; + private static final int partitions = 2; + private static final int replicationFactor = 3; + private static Properties props = new Properties(); + + @Mock private ZkUtils zkUtils; + @Mock private ZkClient zkClient; + @Mock private ZkConnection zkConnection; + + @InjectMocks + private KafkaManagerImpl kafkaManager; + + @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Before + public void setup() throws Exception{ + props.put("key1", "val1"); + props.put("key2", 2); + props.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, ""); + + kafkaManager = new KafkaManagerImpl(); + + mockStatic(AdminUtils.class); + + whenNew(ZkClient.class).withAnyArguments().thenReturn(zkClient); + whenNew(ZkConnection.class).withArguments(Mockito.anyString()).thenReturn(zkConnection); + whenNew(ZkUtils.class).withAnyArguments().thenReturn(zkUtils); + + when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic)).thenReturn(props); + } + + @Test(expected = StreamCreationException.class) + public void testUpsertTopicsForNewStream(){ + // Mock it as an existing topic + 
when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(true); + + //New Stream + kafkaManager.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, true); + } + + @Test + public void testUpsertTopicsForExistingStream() { + // Mock it as an existing topic + when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(true); + + KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); + doNothing().doThrow(new RuntimeException()).when(kafkaManagerSpy).createTopics(Mockito.anyCollection(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.anyMap()); + + //Existing Stream + kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false); + + //Assert if 0 topic is added to the list to be created + Assert.assertEquals(0,kafkaManagerSpy.topicsToCreate.size()); + } + + + @Test + public void testUpsertTopicsForNewTopic() { + // Mock it as a new topic + when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(false); + + KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); + doNothing().doThrow(new RuntimeException()).when(kafkaManagerSpy).createTopics(Mockito.anyCollection(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.anyMap()); + + //Existing Stream + kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false); + + //Assert if 1 topic is added to the list to be created + Assert.assertEquals(1, kafkaManagerSpy.topicsToCreate.size()); + } +} diff --git a/pom.xml b/pom.xml index ba6f77272..5a17737be 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ 4.12 1.3 2.18.3 + 2.0.0-beta.5 2.9.0 @@ -417,6 +418,37 @@ test test + + org.powermock + powermock-module-junit4 + ${powermock.version} + test + + + org.powermock + powermock-api-mockito2 + ${powermock.version} + test + + + org.powermock + powermock-core + ${powermock.version} + test + + + net.bytebuddy + byte-buddy-agent + 1.8.5 + test + + + net.bytebuddy + byte-buddy + 1.8.5 + test + + @@ -568,6 +600,8 @@ org.hamcrest:hamcrest-core:jar:${hamcrest.version} org.slf4j:slf4j-api:jar:${slf4j.version} io.dropwizard:dropwizard-testing:jar:1.3.5 + net.bytebuddy:byte-buddy:jar + net.bytebuddy:byte-buddy-agent From 44a59ebfe80c6dac8120d0add4487c405704c534 Mon Sep 17 00:00:00 2001 From: Arun Vasudevan Date: Wed, 19 Dec 2018 19:00:30 -0600 Subject: [PATCH 04/11] Updating Changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a8364b7b..a1337e300 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Schema validation support via `SchemaManager` interface with default Confluent implementation provided (#41) +### Changed +- Stop Stream Creation for existing topics if topic configs don't match + ## [0.3.2] - 20181216 ### Changed - Updated README to something that outlines this a bit better. (#54) @@ -28,7 +31,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Removed - Shell script for build commands (#43) -- Topic Config update functionality [0.4.0]: https://github.com/HomeAway/stream-registry/compare/v0.3.2...v0.4.0 [0.3.2]: https://github.com/HomeAway/stream-registry/compare/v0.3.1...v0.3.2 From 4113a0b766eebd2e672436a058da7d0507418a28 Mon Sep 17 00:00:00 2001 From: "Rene X. 
Parra" <552515+neoword@users.noreply.github.com> Date: Fri, 21 Dec 2018 15:40:27 -0600 Subject: [PATCH 05/11] Updating CHANGELOG to 0.4.1 --- CHANGELOG.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05452b7df..329f0706c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed - Adding un-annotated `streamName` path param to streamUpsert HTTP resource (#69) - Updated pom.xml to remove unused retrofit library, and clean up some versioning (#70) +- Stop Stream Creation for existing topics if topic configs don't match (#52) ### Removed - Deleted TODO/Documentation that referenced incorrect `http://localhost:8081/healthcheck` (#64) @@ -16,9 +17,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added - Schema validation support via `SchemaManager` interface with default Confluent implementation provided (#41) -### Changed -- Stop Stream Creation for existing topics if topic configs don't match - ## [0.3.2] - 20181216 ### Changed - Updated README to something that outlines this a bit better. (#54) From f3ecea918ff227c9c7e98936c51b3b56d802221f Mon Sep 17 00:00:00 2001 From: "Rene X. Parra" <552515+neoword@users.noreply.github.com> Date: Fri, 21 Dec 2018 18:27:42 -0600 Subject: [PATCH 06/11] Checkpoint. WIP! DO NOT MERGE - Still need to fix all FIXMEs - Need to get unit test working properly --- .../db/dao/KafkaManager.java | 20 ++- .../db/dao/impl/KafkaManagerImpl.java | 164 +++++++++++------- .../db/dao/impl/KafkaManagerImplTest.java | 7 +- 3 files changed, 123 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java index 2d14a8ea0..877a96dc6 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java @@ -20,8 +20,26 @@ // TODO need javadoc for KafkaManager // TODO Why do we have imperative operations here in stream registry? Does this limit composability? Can we reduce scope? +// TODO Need to consider merging this with StreamInfrastructureManager to keep it stream-platform agnostic. public interface KafkaManager { + /** + * Initializes the concrete implementation of KafkaManager. + * @param config The properties for the given KafkaManager + */ + void init(Properties config); - void upsertTopics(Collection topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream); + /** + * Gracefully shutsdown this implementation of KafkaManager. + */ + void shutdown(); + /** + * Creates topics in underlying implementation of KafkaManager provider. + * @param topics topics to creates + * @param partitions number of partitions for each of those topics + * @param replicationFactor replicationFactor for each of those topics + * @param topicConfig topic config to use for each of these topics + * @param isNewStream whether or not this invocation results from existing or new stream in stream registry. 
diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
index 69a6627ec..608b11a91 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
@@ -17,13 +17,7 @@
import static java.util.stream.Collectors.toList;

-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
+import java.util.*;
import java.util.stream.Collectors;

import kafka.admin.AdminUtils;
@@ -38,8 +32,6 @@
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.TopicExistsException;

import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
import com.homeaway.streamingplatform.db.dao.KafkaManager;
@@ -47,8 +39,17 @@
@Slf4j
public class KafkaManagerImpl implements KafkaManager {

+    private static Map<String, Boolean> TOPIC_CONFIG_KEY_FILTER = new HashMap<String, Boolean>() {
+        private static final long serialVersionUID = -7377105429359314831L; {
-    public List<String> topicsToCreate;
+        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, true);
+        put(KafkaProducerConfig.ZOOKEEPER_QUORUM, true);
+    }};
+
+    // internal state
+
+    private ZkClient zkClient;
+    private ZkUtils zkUtils;

    /**
     * Create and/or Update Topics using AdminClient and AdminUtils
@@ -60,51 +61,51 @@
     * @param properties properties that will be set on each topic in the list
     */
    public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream) {
        // remove client connection properties to leave only topic configs
-        Map<String, String> topicConfigMap = new HashMap<>(properties.entrySet()
-            .stream()
-            .filter(e -> !e.getKey().equals(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))
-            .filter(e -> !e.getKey().equals(KafkaProducerConfig.ZOOKEEPER_QUORUM))
-            .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString())));
-
-        String zkConnect = properties.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
-        ZkClient zkClient = new ZkClient(zkConnect);
-        zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
-        ZkConnection zkConnection = new ZkConnection(zkConnect);
-        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+        Map<String, String> topicConfigMap = filterPropertiesKeys(properties, TOPIC_CONFIG_KEY_FILTER);
+
+        List<String> topicsToCreate = new ArrayList<>();
+        for (String topic : topics) {
+            // check if topic doesn't exist and add to list
+            if (!topicExists(topic)) {
+                // buffer creation request
+                topicsToCreate.add(topic);
+                continue;
+            }

-        try {
-            topicsToCreate = new ArrayList<>();
-            for (String topic : topics) {
-                if (AdminUtils.topicExists(zkUtils, topic)) {
-                    //TODO Pass the Boolean if it is a New Stream
-                    //TODO Read the existing topic's config map, compare, and if it's a new stream throw StreamCreationException on a mismatch
-                    Properties actualTopicConfig = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
-                    Map<String, String> actualTopicConfigMap = actualTopicConfig
-                        .entrySet()
-                        .stream()
-                        .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));
-
-                    // If a Stream that was newly created in Stream Registry is already present in the underlying streaming infrastructure,
-                    // then compare the configs and fail completely if they don't match
-                    if(isNewStream && !actualTopicConfigMap.equals(topicConfigMap)) {
-                        throw new StreamCreationException(topic);
-                    }
-                    updateTopic(zkUtils, topic, topicConfigMap);
-                } else {
-                    topicsToCreate.add(topic);
-                }
+            // update topic
+            Properties actualTopicConfig = getTopicConfig(topic);
+            Map<String, String> actualTopicConfigMap = propertiesToMap(actualTopicConfig);
+            if(actualTopicConfig.equals(topicConfigMap)) {
+                // NOTHING TO DO!
+                log.info("topic config for {} exactly match. Ignoring.", topic);
+                continue;
            }
-            createTopics(topicsToCreate, partitions, replicationFactor, properties, topicConfigMap);
-        } finally {
-            try {
-                zkClient.close();
-                zkConnection.close();
-                zkUtils.close();
-            } catch (InterruptedException e) {
-                log.info("caught an exception while closing ZK clients: {}", e.getMessage());
+            // FIXME!! This reeks of non-declarative code... in a fully declarative world we would log the request and, based on some policy, accept or reject it.
+            // FIXME!! Here we are coding up the policy to reject the request if the config is different and the newStream state was requested.
+
+            // NOTE: If a newly created stream is requested in Stream Registry but it is already present
+            // in the underlying streaming infrastructure... AND we got this far, this means configuration
+            // is different. We want to prevent this from actually changing the underlying infrastructure.
+            // Therefore the operation is failed with an exception.
+            //
+            // This provides a safety mechanism and a migration path by requiring folks
+            // to exactly match the downstream config when the stream registry "onboards" an existing topic
+            // for the first time.
+
+            // TODO: Alternatively we can add an updateMetadataOnly=true flag, and this would not have any side-effects
+            if(isNewStream) {
+                // FIXME!! Fix exception reporting... failing with just the topic name and no message is not a good developer experience. Consider replacing topic with a message (and include the topic name in the message).
+                throw new StreamCreationException(topic);
            }
+
+            // If we got this far, we are not a new stream, and request config is different than
+            // what is in stream registry. Feel free to update.
+            updateTopic(zkUtils, topic, topicConfigMap);
        }
+
+        // now create any topics that were necessary to create this run
+        createTopics(topicsToCreate, partitions, replicationFactor, properties, topicConfigMap);
    }

    private void updateTopic(ZkUtils zkUtils, String topic, Map<String, String> configMap) {
@@ -115,27 +116,60 @@
    // TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above
    // TODO Timeout exception needs to propagate and not be handled here
-    // TODO Interrupted Exception also needs to propagate and not be handled here
+    // TODO Need JavaDoc
    protected void createTopics(Collection<String> topics, int partitions, int replicationFactor, Properties adminClientProperties, Map<String, String> topicConfigMap) {
-        try (AdminClient adminClient = AdminClient.create(adminClientProperties)) {
+        try {
+            AdminClient adminClient = AdminClient.create(adminClientProperties);
            List<NewTopic> newTopicList = topics.stream()
-                .map(topic -> {
-                    NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
-                    newTopic.configs(topicConfigMap);
-                    return newTopic;
-                })
+                .map(topic ->
+                    new NewTopic(topic, partitions, (short) replicationFactor)
+                        .configs(topicConfigMap))
                .collect(toList());

            CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList);
+            // synchronous block
            createTopicsResult.all().get();
-        } catch (ExecutionException e) {
-            if (e.getCause() instanceof TopicExistsException) {
-                log.info("Topic already exists !!");
-            } else if (e.getCause() instanceof TimeoutException) {
-                log.error("Timeout !!", e);
-            }
-        } catch (InterruptedException e) {
-            log.error("Exception in creating topics:", e);
+        } catch (Exception exception) {
+            // Unexpected exception
+            throw new IllegalStateException("Unexpected exception", exception);
        }
    }

+    @Override
+    public void init(Properties config) {
+        String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
+        zkClient = new ZkClient(zkConnect);
+        zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
+        ZkConnection zkConnection = new ZkConnection(zkConnect);
+        zkUtils = new ZkUtils(zkClient, zkConnection, false);
+    }
+
+    @Override
+    public void shutdown() {
+        try {
+            zkUtils.close();
+            zkClient.close();
+        } catch (RuntimeException exception) {
+            log.error("Unexpected exception caught during KafkaManagerImpl shutdown.", exception);
+        }
+    }
+
+    private Map<String, String> filterPropertiesKeys(Properties properties, Map<String, Boolean> keyfilterMap) {
+        return new HashMap<>(properties.entrySet().stream()
+            .filter(entry -> keyfilterMap.containsKey(entry.getKey()))
+            .collect(Collectors.toMap(entry -> (String)entry.getKey(), entry -> (String)entry.getValue())));
+    }
+
+    private Map<String, String> propertiesToMap(Properties properties) {
+        return properties.entrySet().stream()
+            .collect(Collectors.toMap(entry -> (String)entry.getKey(), entry -> (String)entry.getValue()));
+    }
+
+    private boolean topicExists(String topic) {
+        return AdminUtils.topicExists(zkUtils, topic);
+    }
+
+    private Properties getTopicConfig(String topic) {
+        return AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
+    }
}
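Taken together, the reworked method reduces to one of four outcomes per topic. A condensed sketch of that decision table (illustrative pseudocode summarizing the logic above; actualConfig and requestedConfig stand in for the real variables):

    if (!topicExists(topic))                        topicsToCreate.add(topic);                     // batch-created afterwards
    else if (actualConfig.equals(requestedConfig))  { /* configs match exactly: nothing to do */ }
    else if (isNewStream)                           throw new StreamCreationException(topic);      // reject config drift on a "new" stream
    else                                            updateTopic(zkUtils, topic, requestedConfig);  // existing stream: apply the requested config
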
diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
index fb48d8a65..348b2b3d9 100644
--- a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
+++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
@@ -88,6 +88,7 @@ public void testUpsertTopicsForNewStream(){

        //New Stream
        kafkaManager.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, true);
+        // FIXME!
    }

    @Test
@@ -102,7 +103,8 @@ public void testUpsertTopicsForExistingStream() {
        kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false);

        //Assert if 0 topic is added to the list to be created
-        Assert.assertEquals(0,kafkaManagerSpy.topicsToCreate.size());
+        //Assert.assertEquals(0,kafkaManagerSpy.topicsToCreate.size());
+        // FIXME!
    }

@@ -118,6 +120,7 @@ public void testUpsertTopicsForNewTopic() {
        kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false);

        //Assert if 1 topic is added to the list to be created
-        Assert.assertEquals(1, kafkaManagerSpy.topicsToCreate.size());
+        //Assert.assertEquals(1, kafkaManagerSpy.topicsToCreate.size());
+        // FIXME!
    }
}

From da75b2067ee78fc5b199446c45dd00f634512cfa Mon Sep 17 00:00:00 2001
From: "Rene X. Parra" <552515+neoword@users.noreply.github.com>
Date: Sun, 30 Dec 2018 10:11:28 -0600
Subject: [PATCH 07/11] CHECKPOINT. IT Tests failing

---
.../configuration/KafkaProducerConfig.java | 1 +
.../db/dao/KafkaManager.java | 11 --
.../db/dao/impl/KafkaManagerImpl.java | 150 +++++++++---------
.../db/dao/impl/KafkaManagerImplTest.java | 79 +++++----
4 files changed, 124 insertions(+), 117 deletions(-)

diff --git a/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java b/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java
index 06c5031cd..340ab8f16 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/configuration/KafkaProducerConfig.java
@@ -19,6 +19,7 @@

import lombok.Data;

+// TODO Need documentation
@Data
public class KafkaProducerConfig {

diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java
index 877a96dc6..409335326 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/KafkaManager.java
@@ -22,17 +22,6 @@
// TODO Why do we have imperative operations here in stream registry? Does this limit composability? Can we reduce scope?
// TODO Need to consider merging this with StreamInfrastructureManager to keep it stream-platform agnostic.
public interface KafkaManager {
-    /**
-     * Initializes the concrete implementation of KafkaManager.
-     * @param config The properties for the given KafkaManager
-     */
-    void init(Properties config);
-
-    /**
-     * Gracefully shuts down this implementation of KafkaManager.
-     */
-    void shutdown();
-
    /**
     * Creates topics in underlying implementation of KafkaManager provider.
     * @param topics topics to create

diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
index 608b11a91..db1858ce8 100644
--- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
+++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java
@@ -15,12 +15,11 @@
 */
package com.homeaway.streamingplatform.db.dao.impl;

-import static java.util.stream.Collectors.toList;
-
import java.util.*;
import java.util.stream.Collectors;

import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
@@ -28,9 +27,6 @@

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
@@ -39,7 +35,7 @@

@Slf4j
public class KafkaManagerImpl implements KafkaManager {

-    private static Map<String, Boolean> TOPIC_CONFIG_KEY_FILTER = new HashMap<String, Boolean>() {
+    static Map<String, Boolean> TOPIC_CONFIG_KEY_FILTER = new HashMap<String, Boolean>() {
        private static final long serialVersionUID = -7377105429359314831L; {

        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, true);
        put(KafkaProducerConfig.ZOOKEEPER_QUORUM, true);
    }};

    // internal state

-    private ZkClient zkClient;
-    private ZkUtils zkUtils;
+
+    private ZkUtils initZkUtils(Properties config) {
+        String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM);
+        ZkClient zkClient = new ZkClient(zkConnect);
+        zkClient.setZkSerializer(ZKStringSerializer$.MODULE$);
+        ZkConnection zkConnection = new ZkConnection(zkConnect);
+        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
+        return zkUtils;
+    }
+
+    public void shutdownZkUtils(ZkUtils zkUtils) {
+        try {
+            zkUtils.close();
+        } catch (RuntimeException exception) {
+            log.error("Unexpected exception caught during zkUtils shutdown.", exception);
+        }
+    }

    /**
     * Create and/or Update Topics using AdminClient and AdminUtils
@@ -60,30 +71,46 @@
     * @param properties properties that will be set on each topic in the list
     */
    public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream) {
+        // TODO - Can't guarantee against race conditions... should probably move to an event-source paradigm to
+        //        protect against this (and maybe employ optimistic locking for extra safety).
+        //        The issue here is that there is nothing that "locks" the underlying kafka store -- something
+        //        can inevitably change the underlying store while this method is evaluating, always leaving
+        //        some amount of race window.
+
+        // TODO probably need to cache a KafkaManagerImpl per "cluster" to avoid unnecessary creation / destruction of connections
+        ZkUtils zkUtils = initZkUtils(properties);
+        try {
+            // remove client connection properties to leave only topic configs
+            Map<String, String> topicConfigMap = filterPropertiesKeys(properties, TOPIC_CONFIG_KEY_FILTER);
+
+            // partition the list by whether the topic exists or not
+            Map<Boolean, List<String>> partitionMaps = topics.stream().collect(Collectors.partitioningBy(topic -> topicExists(zkUtils, topic)));
+
+            // if it exists, update it. If it doesn't exist, create it
+            List<String> topicsToUpdate = partitionMaps.get(true);
+            List<String> topicsToCreate = partitionMaps.get(false);
+
+            // update any topics that are necessary
+            updateTopics(zkUtils, topicsToUpdate, topicConfigMap, isNewStream);
+
+            // now create any topics that were necessary to create this run
+            createTopics(zkUtils, topicsToCreate, partitions, replicationFactor, topicConfigMap);
+        } finally {
+            shutdownZkUtils(zkUtils);
+        }
+    }
+
+    void updateTopics(ZkUtils zkUtils, List<String> topicsToUpdate, Map<String, String> topicConfigMap, boolean isNewStream) {
+        for (String topic : topicsToUpdate) {
            // update topic
-            Properties actualTopicConfig = getTopicConfig(topic);
+            Properties actualTopicConfig = getTopicConfig(zkUtils, topic);
            Map<String, String> actualTopicConfigMap = propertiesToMap(actualTopicConfig);
-            if(actualTopicConfig.equals(topicConfigMap)) {
+            if(actualTopicConfigMap.equals(topicConfigMap)) {
                // NOTHING TO DO!
                log.info("topic config for {} exactly match. Ignoring.", topic);
                continue;
            }

-            // FIXME!! This reeks of non-declarative code... in a fully declarative world we would log the request and, based on some policy, accept or reject it.
-            // FIXME!! Here we are coding up the policy to reject the request if the config is different and the newStream state was requested.

            // NOTE: If a newly created stream is requested in Stream Registry but it is already present
            // in the underlying streaming infrastructure... AND we got this far, this means configuration
            // is different. We want to prevent this from actually changing the underlying infrastructure.
            // Therefore the operation is failed with an exception.
            //
            // This provides a safety mechanism and a migration path by requiring folks
            // to exactly match the downstream config when the stream registry "onboards" an existing topic
            // for the first time.

-            // TODO: Alternatively we can add an updateMetadataOnly=true flag, and this would not have any side-effects
+            // TODO Alternatively we can add a forceSync=true flag, ignoring any user-provided info, and only updating SR with the underlying settings
+            // We should probably do forceSync=true anyway, as it provides a simple way to keep things in sync
            if(isNewStream) {
                // FIXME!! Fix exception reporting... failing with just the topic name and no message is not a good developer experience. Consider replacing topic with a message (and include the topic name in the message).
+                // throw new StreamCreationException("topic: " + topic + " already exists but config is different than requested!"); // consider using forceSync=true
                throw new StreamCreationException(topic);
            }

-            // If we got this far, we are not a new stream, and request config is different than
-            // what is in stream registry. Feel free to update.
+            // If we got this far, we are "updating" an "existing" stream, and request config is different than
+            // what is in stream registry. Go ahead and update now.
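            // (Aside, illustrative only — the forceSync idea from the TODO above might look roughly like:
            //     if (forceSync) {
            //         topicConfigMap = actualTopicConfigMap; // adopt the broker's actual config as the source of truth
            //     }
            //  where forceSync would be a hypothetical flag plumbed through alongside isNewStream.)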
updateTopic(zkUtils, topic, topicConfigMap); } - - // now create any topics that were necessary to create this run - createTopics(topicsToCreate, partitions, replicationFactor, properties, topicConfigMap); } private void updateTopic(ZkUtils zkUtils, String topic, Map configMap) { @@ -117,59 +143,41 @@ private void updateTopic(ZkUtils zkUtils, String topic, Map conf // TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above // TODO Timeout exception needs to propagate and not be handled here // TODO Need JavaDoc - protected void createTopics(Collection topics, int partitions, int replicationFactor, Properties adminClientProperties, Map topicConfigMap) { - try { - AdminClient adminClient = AdminClient.create(adminClientProperties); - List newTopicList = topics.stream() - .map(topic -> - new NewTopic(topic, partitions, (short) replicationFactor) - .configs(topicConfigMap)) - .collect(toList()); - - CreateTopicsResult createTopicsResult = adminClient.createTopics(newTopicList); - // synchronous block - createTopicsResult.all().get(); - } catch (Exception exception) { - // Unexpected exception - throw new IllegalStateException("Unexpected exception", exception); + void createTopics(ZkUtils zkUtils, Collection topics, int partitions, int replicationFactor, Map topicConfigMap) { + for(String topic : topics) { + createTopic(zkUtils, topic, partitions, replicationFactor, topicConfigMap); } } - @Override - public void init(Properties config) { - String zkConnect = config.getProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM); - zkClient = new ZkClient(zkConnect); - zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); - ZkConnection zkConnection = new ZkConnection(zkConnect); - zkUtils = new ZkUtils(zkClient, zkConnection, false); + private void createTopic(ZkUtils zkUtils, String topic, int partitions, int replicationFactor, Map topicConfigMap) { + Properties topicProperties = new Properties(); + topicProperties.putAll(topicConfigMap); + AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties, RackAwareMode.Enforced$.MODULE$); } - @Override - public void shutdown() { - try { - zkUtils.close(); - zkClient.close(); - } catch (RuntimeException exception) { - log.error("Unexpected exception caught during KafkaManagerImpl shutdown.", exception); - } - } + // utility methods for this class - private Map filterPropertiesKeys(Properties properties, Map keyfilterMap) { - return new HashMap<>(properties.entrySet().stream() - .filter(entry -> keyfilterMap.containsKey(entry.getKey())) - .collect(Collectors.toMap(entry -> (String)entry.getKey(), entry -> (String)entry.getValue()))); + @SuppressWarnings("SuspiciousMethodCalls") + Map filterPropertiesKeys(Properties properties, Map keyFilterMap) { + return new HashMap<>(properties.keySet().stream() + .filter(keyFilterMap::containsKey) + .collect(Collectors.toMap(key -> (String)key, key -> properties.getProperty((String)key)))); } - private Map propertiesToMap(Properties properties) { - return properties.entrySet().stream() - .collect(Collectors.toMap(entry -> (String)entry.getKey(), entry -> (String)entry.getValue())); + Map propertiesToMap(Properties properties) { + return properties.keySet().stream() + .filter(key -> properties.getProperty((String)key) != null) + .collect(Collectors.toMap(key -> (String)key, + key -> properties.getProperty((String)key))); } - private boolean topicExists(String topic) { - return AdminUtils.topicExists(zkUtils, topic); + private 
boolean topicExists(ZkUtils zkUtils, String topic) { + boolean topicExists = AdminUtils.topicExists(zkUtils, topic); + log.debug("topic: {} exists={}", topic, topicExists); + return topicExists; } - private Properties getTopicConfig(String topic) { + private Properties getTopicConfig(ZkUtils zkUtils, String topic) { return AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic); } } diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java index 348b2b3d9..63e0b00e2 100644 --- a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java +++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java @@ -15,22 +15,21 @@ */ package com.homeaway.streamingplatform.db.dao.impl; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mockStatic; -import static org.powermock.api.mockito.PowerMockito.spy; -import static org.powermock.api.mockito.PowerMockito.whenNew; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.times; +import static org.powermock.api.mockito.PowerMockito.*; import java.util.Collections; import java.util.Properties; import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; import kafka.server.ConfigType; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; -import org.junit.Assert; +import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -40,6 +39,7 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -47,80 +47,89 @@ import com.homeaway.streamingplatform.exceptions.StreamCreationException; @RunWith(PowerMockRunner.class) +@PowerMockIgnore("javax.management.*") @PrepareForTest({KafkaManagerImpl.class, AdminUtils.class}) public class KafkaManagerImplTest { - private static final String topic = "kafka-manager-test"; - private static final int partitions = 2; - private static final int replicationFactor = 3; - private static Properties props = new Properties(); + private static final String TOPIC = "kafka-manager-test"; + private static final int PARTITIONS = 2; + private static final int REPLICATION_FACTOR = 3; + private static final Properties PROPS = new Properties(); + private static final Properties FILTERED_PROPS = new Properties(); @Mock private ZkUtils zkUtils; @Mock private ZkClient zkClient; @Mock private ZkConnection zkConnection; @InjectMocks - private KafkaManagerImpl kafkaManager; + private KafkaManagerImpl kafkaManager = new KafkaManagerImpl(); @Rule public MockitoRule mockitoRule = MockitoJUnit.rule(); @Before public void setup() throws Exception{ - props.put("key1", "val1"); - props.put("key2", 2); - props.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, ""); - - kafkaManager = new KafkaManagerImpl(); - - mockStatic(AdminUtils.class); - + PROPS.put("key1", "val1"); + PROPS.put("key2", 2); + PROPS.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181"); + PROPS.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + FILTERED_PROPS.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, 
"127.0.0.1:2181"); + FILTERED_PROPS.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + + // setup kafkaManager + // kafkaManager.init(PROPS); + // un-necessary since powermock has setup mocks for zookeeper in KafkaManagerImpl + + // make sure that the * direct-caller * of these classes are in @PrepareForTest annotation above on this test whenNew(ZkClient.class).withAnyArguments().thenReturn(zkClient); whenNew(ZkConnection.class).withArguments(Mockito.anyString()).thenReturn(zkConnection); whenNew(ZkUtils.class).withAnyArguments().thenReturn(zkUtils); - when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic)).thenReturn(props); + // using power mock to allow for mocking of static classes + mockStatic(AdminUtils.class); + when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), TOPIC)).thenReturn(PROPS); } @Test(expected = StreamCreationException.class) public void testUpsertTopicsForNewStream(){ // Mock it as an existing topic - when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(true); + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true); //New Stream - kafkaManager.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, true); - // FIXME! + kafkaManager.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true); } @Test public void testUpsertTopicsForExistingStream() { // Mock it as an existing topic - when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(true); + when(AdminUtils.topicExists(eq(zkUtils), eq(TOPIC))).thenReturn(true); KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); - doNothing().doThrow(new RuntimeException()).when(kafkaManagerSpy).createTopics(Mockito.anyCollection(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.anyMap()); //Existing Stream - kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false); + kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false); - //Assert if 0 topic is added to the list to be created - //Assert.assertEquals(0,kafkaManagerSpy.topicsToCreate.size()); - // FIXME! + //verify change topic happens only when requested config is exact + verifyStatic(AdminUtils.class, times(1)); + AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS); + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$); } @Test public void testUpsertTopicsForNewTopic() { // Mock it as a new topic - when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(false); + when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(false); KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); - doNothing().doThrow(new RuntimeException()).when(kafkaManagerSpy).createTopics(Mockito.anyCollection(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.anyMap()); //Existing Stream - kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false); + kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false); - //Assert if 1 topic is added to the list to be created - //Assert.assertEquals(1, kafkaManagerSpy.topicsToCreate.size()); - // FIXME! 
+ //verify create topic happens when requested topic does not exist + verifyStatic(AdminUtils.class, times(0)); + AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS); + verifyStatic(AdminUtils.class, times(1)); + AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$); } } From 7ffa595876b405a0fcc3dfdb7adbfeb85eb3a03b Mon Sep 17 00:00:00 2001 From: "Rene X. Parra" <552515+neoword@users.noreply.github.com> Date: Sun, 30 Dec 2018 10:37:38 -0600 Subject: [PATCH 08/11] Tests passing --- .../db/dao/impl/KafkaManagerImpl.java | 13 +++++++++++-- .../db/dao/impl/KafkaManagerImplTest.java | 14 ++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java index db1858ce8..e05e92f49 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java @@ -15,7 +15,11 @@ */ package com.homeaway.streamingplatform.db.dao.impl; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; import kafka.admin.AdminUtils; @@ -100,6 +104,7 @@ public void upsertTopics(Collection topics, int partitions, int replicat } } + // package scope so that PowerMock can verify void updateTopics(ZkUtils zkUtils, List topicsToUpdate, Map topicConfigMap, boolean isNewStream) { for (String topic : topicsToUpdate) { // update topic @@ -143,6 +148,7 @@ private void updateTopic(ZkUtils zkUtils, String topic, Map conf // TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above // TODO Timeout exception needs to propagate and not be handled here // TODO Need JavaDoc + // package scope so that PowerMock can verify void createTopics(ZkUtils zkUtils, Collection topics, int partitions, int replicationFactor, Map topicConfigMap) { for(String topic : topics) { createTopic(zkUtils, topic, partitions, replicationFactor, topicConfigMap); @@ -157,13 +163,16 @@ private void createTopic(ZkUtils zkUtils, String topic, int partitions, int repl // utility methods for this class + // package scope so that PowerMock can leverage @SuppressWarnings("SuspiciousMethodCalls") Map filterPropertiesKeys(Properties properties, Map keyFilterMap) { return new HashMap<>(properties.keySet().stream() - .filter(keyFilterMap::containsKey) + .filter(key -> !keyFilterMap.containsKey(key)) + .filter(key -> properties.getProperty((String)key) != null) .collect(Collectors.toMap(key -> (String)key, key -> properties.getProperty((String)key)))); } + // package scope so that PowerMock can leverage Map propertiesToMap(Properties properties) { return properties.keySet().stream() .filter(key -> properties.getProperty((String)key) != null) diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java index 63e0b00e2..04ef018d8 100644 --- a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java +++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java @@ -68,12 +68,12 @@ public class KafkaManagerImplTest { @Before 
public void setup() throws Exception{ - PROPS.put("key1", "val1"); - PROPS.put("key2", 2); - PROPS.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181"); - PROPS.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); - FILTERED_PROPS.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181"); - FILTERED_PROPS.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + PROPS.setProperty("key1", "val1"); + PROPS.setProperty("key2", "2"); // Properties should be strings only... not ints + PROPS.setProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181"); + PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + FILTERED_PROPS.setProperty("key1", "val1"); + FILTERED_PROPS.setProperty("key2", "2"); // setup kafkaManager // kafkaManager.init(PROPS); @@ -96,6 +96,8 @@ public void testUpsertTopicsForNewStream(){ //New Stream kafkaManager.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true); + + // expecting an exception to be thrown because topic exists but request doesn't match the config } @Test From ae88d049c47ab095937d1d6c3c31759f1867cc24 Mon Sep 17 00:00:00 2001 From: "Rene X. Parra" <552515+neoword@users.noreply.github.com> Date: Sun, 30 Dec 2018 11:02:03 -0600 Subject: [PATCH 09/11] Adding more tests to be comprehensive --- .../db/dao/impl/KafkaManagerImpl.java | 14 +++--- .../db/dao/impl/KafkaManagerImplTest.java | 50 +++++++++++++++++-- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java index e05e92f49..05d1ab5e2 100644 --- a/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java +++ b/core/src/main/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImpl.java @@ -39,7 +39,7 @@ @Slf4j public class KafkaManagerImpl implements KafkaManager { - static Map TOPIC_CONFIG_KEY_FILTER = new HashMap() { + private static Map TOPIC_CONFIG_KEY_FILTER = new HashMap() { private static final long serialVersionUID = -7377105429359314831L; { put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, true); @@ -166,18 +166,18 @@ private void createTopic(ZkUtils zkUtils, String topic, int partitions, int repl // package scope so that PowerMock can leverage @SuppressWarnings("SuspiciousMethodCalls") Map filterPropertiesKeys(Properties properties, Map keyFilterMap) { - return new HashMap<>(properties.keySet().stream() + return new HashMap<>(properties.stringPropertyNames().stream() .filter(key -> !keyFilterMap.containsKey(key)) - .filter(key -> properties.getProperty((String)key) != null) - .collect(Collectors.toMap(key -> (String)key, key -> properties.getProperty((String)key)))); + .filter(key -> properties.getProperty(key) != null) + .collect(Collectors.toMap(key -> (String)key, key -> properties.getProperty(key)))); } // package scope so that PowerMock can leverage Map propertiesToMap(Properties properties) { - return properties.keySet().stream() - .filter(key -> properties.getProperty((String)key) != null) + return properties.stringPropertyNames().stream() + .filter(key -> properties.getProperty(key) != null) .collect(Collectors.toMap(key -> (String)key, - key -> properties.getProperty((String)key))); + key -> properties.getProperty(key))); } private boolean topicExists(ZkUtils zkUtils, String topic) { diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java 
diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
index 04ef018d8..06e3f5967 100644
--- a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
+++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java
@@ -15,7 +15,6 @@
  */
 package com.homeaway.streamingplatform.db.dao.impl;
 
-import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.times;
 import static org.powermock.api.mockito.PowerMockito.*;
 
@@ -56,6 +55,8 @@ public class KafkaManagerImplTest {
     private static final int REPLICATION_FACTOR = 3;
     private static final Properties PROPS = new Properties();
     private static final Properties FILTERED_PROPS = new Properties();
+    private static final Properties TOPIC_PROPS = new Properties();
+    private static final Properties TOPIC_WITH_CNXN_PROPS = new Properties();
 
     @Mock private ZkUtils zkUtils;
     @Mock private ZkClient zkClient;
@@ -74,6 +75,11 @@ public void setup() throws Exception{
         PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
         FILTERED_PROPS.setProperty("key1", "val1");
         FILTERED_PROPS.setProperty("key2", "2");
+        TOPIC_PROPS.setProperty("key1", "actualVal1"); // different from "val1"
+        TOPIC_PROPS.setProperty("key2", "2");
+        TOPIC_WITH_CNXN_PROPS.putAll(TOPIC_PROPS);
+        TOPIC_WITH_CNXN_PROPS.setProperty(KafkaProducerConfig.ZOOKEEPER_QUORUM, "127.0.0.1:2181");
+        TOPIC_WITH_CNXN_PROPS.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
 
         // setup kafkaManager
         // kafkaManager.init(PROPS);
@@ -86,7 +92,7 @@ public void setup() throws Exception{
 
         // using power mock to allow for mocking of static classes
         mockStatic(AdminUtils.class);
-        when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), TOPIC)).thenReturn(PROPS);
+        when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), TOPIC)).thenReturn(TOPIC_PROPS);
     }
 
     @Test(expected = StreamCreationException.class)
@@ -100,17 +106,36 @@ public void testUpsertTopicsForNewStream(){
         // expecting an exception to be thrown because topic exists but request doesn't match the config
     }
 
+    @Test
+    public void testUpsertTopicsForExistingStreamWithMatchingConfig() {
+        // Mock it as an existing topic
+        when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);
+
+        KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
+
+        // Existing Stream, but PROPS match!! should not have an exception
+        kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, TOPIC_WITH_CNXN_PROPS, true);
+
+        // verify change topic DOES NOT HAPPEN because props match
+        verifyStatic(AdminUtils.class, times(0));
+        AdminUtils.changeTopicConfig(zkUtils, TOPIC, TOPIC_PROPS);
+
+        // verify create topic DOES NOT HAPPEN because props match
+        verifyStatic(AdminUtils.class, times(0));
+        AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, TOPIC_PROPS, RackAwareMode.Enforced$.MODULE$);
+    }
+
     @Test
     public void testUpsertTopicsForExistingStream() {
         // Mock it as an existing topic
-        when(AdminUtils.topicExists(eq(zkUtils), eq(TOPIC))).thenReturn(true);
+        when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(true);
 
         KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
 
         //Existing Stream
         kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false);
 
-        //verify change topic happens only when requested config is exact
+        //verify change topic happens because isNewStream=false
         verifyStatic(AdminUtils.class, times(1));
         AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS);
         verifyStatic(AdminUtils.class, times(0));
@@ -125,6 +150,23 @@ public void testUpsertTopicsForNewTopic() {
 
         KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
 
+        //Existing Stream
+        kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true);
+
+        //verify create topic happens when requested topic does not exist
+        verifyStatic(AdminUtils.class, times(0));
+        AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS);
+        verifyStatic(AdminUtils.class, times(1));
+        AdminUtils.createTopic(zkUtils, TOPIC, PARTITIONS, REPLICATION_FACTOR, FILTERED_PROPS, RackAwareMode.Enforced$.MODULE$);
+    }
+
+    @Test
+    public void testUpsertTopicsForNewTopicExistsInSR() {
+        // Mock it as a new topic
+        when(AdminUtils.topicExists(zkUtils, TOPIC)).thenReturn(false);
+
+        KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
+
         //Existing Stream
         kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false);
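Note: the verifyStatic idiom that all of these tests rely on is a two-step PowerMock pattern: verifyStatic(Class, mode) arms the verification, and the immediately following invocation of the static method is replayed against the mock to perform the check rather than executed for real. A minimal, illustrative skeleton of the pattern (the test class name and topic name are hypothetical; it assumes the PowerMock 2.x / Mockito 2 test dependencies this module already uses):

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.times;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
import static org.powermock.api.mockito.PowerMockito.when;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

@RunWith(PowerMockRunner.class)
@PrepareForTest(AdminUtils.class) // PowerMock must rewrite the class that owns the statics
public class VerifyStaticSketch {

    @Test
    public void staticCallsCanBeStubbedAndVerified() {
        mockStatic(AdminUtils.class);
        ZkUtils zkUtils = null; // never dereferenced; every AdminUtils call is mocked

        when(AdminUtils.topicExists(zkUtils, "demo-topic")).thenReturn(true);

        assertTrue(AdminUtils.topicExists(zkUtils, "demo-topic"));

        // Step 1: arm the verification with the expected invocation count...
        verifyStatic(AdminUtils.class, times(1));
        // Step 2: ...then repeat the static call; PowerMock replays it against
        // the mock to check it, instead of contacting ZooKeeper.
        AdminUtils.topicExists(zkUtils, "demo-topic");
    }
}

If the second step is omitted, the armed verification is never consumed, which is why each verifyStatic(...) in the tests above is immediately followed by the exact static call under test.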
Parra" <552515+neoword@users.noreply.github.com> Date: Sun, 30 Dec 2018 11:08:49 -0600 Subject: [PATCH 10/11] Updating comments --- .../streamingplatform/db/dao/impl/KafkaManagerImplTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java index 06e3f5967..255651938 100644 --- a/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java +++ b/core/src/test/java/com/homeaway/streamingplatform/db/dao/impl/KafkaManagerImplTest.java @@ -150,7 +150,7 @@ public void testUpsertTopicsForNewTopic() { KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager); - //Existing Stream + // Not existing Stream kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, true); //verify create topic happens when requested topic does not exist @@ -170,6 +170,10 @@ public void testUpsertTopicsForNewTopicExistsInSR() { //Existing Stream kafkaManagerSpy.upsertTopics(Collections.singleton(TOPIC), PARTITIONS, REPLICATION_FACTOR, PROPS, false); + // note: this is a weird case, because somehow the stream exists in SR, but the underlying topic does NOT + // might want to consider this corner case a bit more. Currently the behavior honors the request and creates the topic + // with the requested config + //verify create topic happens when requested topic does not exist verifyStatic(AdminUtils.class, times(0)); AdminUtils.changeTopicConfig(zkUtils, TOPIC, FILTERED_PROPS); From 59a6d4ecd737850bd57532748b8e6e8e6dc85f49 Mon Sep 17 00:00:00 2001 From: Arun Vasudevan Date: Wed, 2 Jan 2019 16:35:49 -0600 Subject: [PATCH 11/11] Updating changelog to version 0.4.2 --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 329f0706c..9575db1a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [0.4.2] - SNAPSHOT +### Changed +- Stop Stream Creation for existing topics if topic configs don't match (#52) + ## [0.4.1] - SNAPSHOT ### Changed - Adding un-annotated `streamName` path param to streamUpsert HTTP resource (#69) - Updated pom.xml to remove unused retrofit library, and clean up some versioning (#70) -- Stop Stream Creation for existing topics if topic configs don't match (#52) ### Removed - Deleted TODO/Documentation that referenced incorrect `http://localhost:8081/healthcheck` (#64)