Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop Stream Creation for existing topics if configs don't match #52

Merged
merged 18 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Added
- Schema validation support via `SchemaManager` interface with default Confluent implementation provided (#41)

### Changed
- Stop Stream Creation for existing topics if topic configs don't match

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is being targeted for 0.4.2

## [0.3.2] - 20181216
### Changed
- Updated README to something that outlines this a bit better. (#54)
Expand Down
26 changes: 26 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,32 @@
<artifactId>kafka-schema-registry</artifactId>
<scope>test</scope>
</dependency>
<dependency>
neoword marked this conversation as resolved.
Show resolved Hide resolved
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
// TODO Why do we have imperative operations here in stream registry? Does this limit composability? Can we reduce scope?
public interface KafkaManager {

void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig);
void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties topicConfig, boolean isNewStream);
neoword marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.stream.Collectors;

import kafka.admin.AdminUtils;
import kafka.server.ConfigType;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -42,10 +43,13 @@

import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
import com.homeaway.streamingplatform.db.dao.KafkaManager;
import com.homeaway.streamingplatform.exceptions.StreamCreationException;

@Slf4j
public class KafkaManagerImpl implements KafkaManager {

public List<String> topicsToCreate;

/**
* Create and/or Update Topics using AdminClient and AdminUtils
*
Expand All @@ -54,7 +58,7 @@ public class KafkaManagerImpl implements KafkaManager {
* @param replicationFactor replication for each topic that will be created
* @param properties properties that will be set on each topic in the list
*/
public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties) {
public void upsertTopics(Collection<String> topics, int partitions, int replicationFactor, Properties properties, boolean isNewStream) {
neoword marked this conversation as resolved.
Show resolved Hide resolved
// remove client connection properties to leave only topic configs
Map<String, String> topicConfigMap = new HashMap<>(properties.entrySet()
.stream()
Expand All @@ -69,9 +73,22 @@ public void upsertTopics(Collection<String> topics, int partitions, int replicat
ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);

try {
List<String> topicsToCreate = new ArrayList<>();
topicsToCreate = new ArrayList<>();
neoword marked this conversation as resolved.
Show resolved Hide resolved
for (String topic : topics) {
if (AdminUtils.topicExists(zkUtils, topic)) {
//TODO Pass the Boolean if it is a New Stream
//TODO Read the existing topic's config map and compare, and if it's a new stream throw CreateStreamException if it doesn't match
Properties actualTopicConfig = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic);
Map<String, String> actualTopicConfigMap = actualTopicConfig
.entrySet()
.stream()
.collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString()));

// If a Stream which is created newly in Stream Registry is already present in the underlying streaming infrastructure
// then compare the configs and fail completely if it doesn't match
if(isNewStream && !actualTopicConfigMap.equals(topicConfigMap)) {
neoword marked this conversation as resolved.
Show resolved Hide resolved
throw new StreamCreationException(topic);
}
updateTopic(zkUtils, topic, topicConfigMap);
} else {
topicsToCreate.add(topic);
Expand Down Expand Up @@ -99,7 +116,7 @@ private void updateTopic(ZkUtils zkUtils, String topic, Map<String, String> conf
// TODO need to check if topic exists instead of relying on exception path or just create one since check already occurred above
// TODO Timeout exception needs to propagate and not be handled here
// TODO Interrupted Exception also needs to propagate and not be handled here
private void createTopics(Collection<String> topics, int partitions, int replicationFactor, Properties adminClientProperties, Map<String, String> topicConfigMap) {
protected void createTopics(Collection<String> topics, int partitions, int replicationFactor, Properties adminClientProperties, Map<String, String> topicConfigMap) {
try (AdminClient adminClient = AdminClient.create(adminClientProperties)) {
List<NewTopic> newTopicList = topics.stream()
.map(topic -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void upsertStream(Stream stream) {
SchemaReference keyReference = schemaManager.registerSchema(keySubject, stream.getLatestKeySchema().getSchemaString());
stream.getLatestKeySchema().setId(String.valueOf(keyReference.getId()));
stream.getLatestKeySchema().setVersion(keyReference.getVersion());
boolean isNewStream = false;

String valueSubject = stream.getName() + "-value";
SchemaReference valueReference = schemaManager.registerSchema(valueSubject, stream.getLatestValueSchema().getSchemaString());
Expand Down Expand Up @@ -168,11 +169,12 @@ public void upsertStream(Stream stream) {
avroStream.setS3ConnectorList(value.get().getS3ConnectorList());
} else {
log.info("key NOT available for the stream-name={}", stream.getName());
isNewStream = true;
avroStream.setCreated(System.currentTimeMillis());
key = AvroStreamKey.newBuilder().setStreamName(avroStream.getName()).build();
}

verifyAndUpsertTopics(stream);
verifyAndUpsertTopics(stream, isNewStream);
kafkaProducer.log(key, avroStream);
log.info("Stream upserted for {}", stream.getName());
} catch (Exception e) {
Expand Down Expand Up @@ -208,18 +210,18 @@ private void applyDefaultPartition(Stream stream) {
*
* @param stream the stream that will be used to verify and/or upsert topics to
*/
private void verifyAndUpsertTopics(Stream stream) {
private void verifyAndUpsertTopics(Stream stream, boolean isNewStream) {
List<String> vpcList = stream.getVpcList();
List<String> replicatedVpcList = stream.getReplicatedVpcList();
try {
log.info("creating topics for vpcList: {}", vpcList);
for (String vpc : vpcList) {
upsertTopics(stream, vpc);
upsertTopics(stream, vpc, isNewStream);
}
if (replicatedVpcList != null && !replicatedVpcList.isEmpty()) {
log.info("creating topics for replicatedVpcList: {}", replicatedVpcList);
for (String vpc : replicatedVpcList) {
upsertTopics(stream, vpc);
upsertTopics(stream, vpc, isNewStream);
}
}
} catch (Exception e) {
Expand All @@ -228,7 +230,7 @@ private void verifyAndUpsertTopics(Stream stream) {
}
}

private void upsertTopics(Stream stream, String vpc) {
private void upsertTopics(Stream stream, String vpc, boolean isNewStream) {
ClusterValue clusterValue = getClusterDetails(vpc, env, stream.getTags().getHint(), "producer");
Properties topicConfig = new Properties();
if (stream.getTopicConfig() != null) {
Expand All @@ -240,7 +242,7 @@ private void upsertTopics(Stream stream, String vpc) {
topicConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
topicConfig.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, zkConnect);

kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig);
kafkaManager.upsertTopics(Collections.singleton(stream.getName()), stream.getPartitions(), stream.getReplicationFactor(), topicConfig, isNewStream);
log.info("Topic {} created/updated at {}", stream.getName(), bootstrapServer);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/* Copyright (c) 2018 Expedia Group.
* All rights reserved. http://www.homeaway.com

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at

* http://www.apache.org/licenses/LICENSE-2.0

* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.homeaway.streamingplatform.db.dao.impl;

import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.whenNew;

import java.util.Collections;
import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import com.homeaway.streamingplatform.configuration.KafkaProducerConfig;
import com.homeaway.streamingplatform.exceptions.StreamCreationException;

// Unit tests for KafkaManagerImpl#upsertTopics covering the "stop stream creation
// for existing topics if configs don't match" behavior (#52).
// PowerMockRunner + @PrepareForTest are required so the static AdminUtils calls and
// the ZkClient/ZkConnection/ZkUtils constructors invoked inside upsertTopics can be stubbed.
@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaManagerImpl.class, AdminUtils.class})
public class KafkaManagerImplTest {

    private static final String topic = "kafka-manager-test";
    private static final int partitions = 2;
    private static final int replicationFactor = 3;
    // Serves both as the properties passed into upsertTopics and as the "existing"
    // topic config returned by the mocked AdminUtils.fetchEntityConfig below.
    private static Properties props = new Properties();

    @Mock private ZkUtils zkUtils;
    @Mock private ZkClient zkClient;
    @Mock private ZkConnection zkConnection;

    @InjectMocks
    private KafkaManagerImpl kafkaManager;

    @Rule public MockitoRule mockitoRule = MockitoJUnit.rule();

    @Before
    public void setup() throws Exception{
        props.put("key1", "val1");
        // NOTE(review): Properties is documented to hold String values only; storing an
        // int relies on the raw Hashtable put — confirm this is deliberate for the test.
        props.put("key2", 2);
        props.put(KafkaProducerConfig.ZOOKEEPER_QUORUM, "");

        kafkaManager = new KafkaManagerImpl();

        // Enable stubbing of the static AdminUtils.topicExists/fetchEntityConfig calls.
        mockStatic(AdminUtils.class);

        // upsertTopics constructs these ZK classes internally; intercept the constructors
        // so no real ZooKeeper connection is attempted.
        whenNew(ZkClient.class).withAnyArguments().thenReturn(zkClient);
        whenNew(ZkConnection.class).withArguments(Mockito.anyString()).thenReturn(zkConnection);
        whenNew(ZkUtils.class).withAnyArguments().thenReturn(zkUtils);

        // Report the "existing" topic's config as the props defined above.
        when(AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), topic)).thenReturn(props);
    }

    // A stream flagged as new (isNewStream=true) whose topic already exists must be
    // rejected with StreamCreationException when the configs do not line up.
    @Test(expected = StreamCreationException.class)
    public void testUpsertTopicsForNewStream(){
        // Mock it as an existing topic
        when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(true);

        // New Stream (isNewStream=true) — expected to throw
        kafkaManager.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, true);
    }

    // An existing stream (isNewStream=false) whose topic already exists should be
    // updated in place, so nothing gets queued for creation.
    @Test
    public void testUpsertTopicsForExistingStream() {
        // Mock it as an existing topic
        when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(true);

        // Spy so the protected createTopics can be stubbed: first call is a no-op,
        // any unexpected second call fails loudly.
        KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
        doNothing().doThrow(new RuntimeException()).when(kafkaManagerSpy).createTopics(Mockito.anyCollection(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.anyMap());

        // Existing Stream (isNewStream=false)
        kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false);

        // Assert that no topic was queued for creation
        Assert.assertEquals(0,kafkaManagerSpy.topicsToCreate.size());
    }


    // A topic that does not exist yet should always be queued for creation,
    // regardless of whether the stream itself is new.
    @Test
    public void testUpsertTopicsForNewTopic() {
        // Mock it as a new topic
        when(AdminUtils.topicExists(zkUtils, topic)).thenReturn(false);

        KafkaManagerImpl kafkaManagerSpy = spy(kafkaManager);
        doNothing().doThrow(new RuntimeException()).when(kafkaManagerSpy).createTopics(Mockito.anyCollection(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(), Mockito.anyMap());

        // New topic, existing stream (isNewStream=false) — note: the original
        // "Existing Stream" label was copy-pasted; the scenario here is a NEW topic.
        kafkaManagerSpy.upsertTopics(Collections.singleton(topic), partitions, replicationFactor, props, false);

        // Assert that exactly one topic was queued for creation
        Assert.assertEquals(1, kafkaManagerSpy.topicsToCreate.size());
    }
}
34 changes: 34 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<junit.version>4.12</junit.version>
<hamcrest.version>1.3</hamcrest.version>
<mockito.version>2.18.3</mockito.version>
<powermock.version>2.0.0-beta.5</powermock.version>
<curator-test.version>2.9.0</curator-test.version>

<!-- Utilities -->
Expand Down Expand Up @@ -417,6 +418,37 @@
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-core</artifactId>
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy-agent</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>

</dependencies>

</dependencyManagement>
Expand Down Expand Up @@ -568,6 +600,8 @@
<ignoredUnusedDeclaredDependency>org.hamcrest:hamcrest-core:jar:${hamcrest.version}</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>org.slf4j:slf4j-api:jar:${slf4j.version}</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>io.dropwizard:dropwizard-testing:jar:1.3.5</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>net.bytebuddy:byte-buddy:jar</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>net.bytebuddy:byte-buddy-agent</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
</configuration>
</execution>
Expand Down