Skip to content

Commit

Permalink
chore: improve KafkaBackend integration test (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
sverrehu authored Nov 12, 2024
1 parent 14fa53f commit 819d565
Showing 1 changed file with 31 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import com.purbon.kafka.topology.integration.containerutils.ContainerTestUtils;
import com.purbon.kafka.topology.integration.containerutils.SaslPlaintextKafkaContainer;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -59,32 +56,47 @@ public void after() {
@Test
public void testExpectedFlow() throws InterruptedException {

TopologyAclBinding binding =
TopologyAclBinding binding1 =
TopologyAclBinding.build(
ResourceType.TOPIC.name(), "foo", "*", "Write", "User:foo", "LITERAL");
TopologyAclBinding binding2 =
TopologyAclBinding.build(
ResourceType.TOPIC.name(), "bar", "*", "Read", "User:bar", "LITERAL");
Collection<TopologyAclBinding> bindings1 = Collections.singleton(binding1);
Collection<TopologyAclBinding> bindings2 = Arrays.asList(binding1, binding2);
Collection<TopologyAclBinding> bindings3 = Collections.singleton(binding2);

BackendState state = new BackendState();
state.addBindings(Collections.singleton(binding));
saveBindings(bindings1);
loadAndVerifyBindings(bindings1);

saveBindings(bindings2);
loadAndVerifyBindings(bindings2);

saveBindings(bindings3);
loadAndVerifyBindings(bindings3);
}

private void saveBindings(Collection<TopologyAclBinding> bindings) {
BackendState state = new BackendState();
state.addBindings(bindings);
KafkaBackend backend = new KafkaBackend();
backend.configure(config);

backend.save(state);
backend.close();
}

KafkaBackend newBackend = new KafkaBackend();

private void loadAndVerifyBindings(Collection<TopologyAclBinding> expectedBindings) {
KafkaBackend backend = new KafkaBackend();
HashMap<String, String> cliOps = new HashMap<>();
cliOps.put(BROKERS_OPTION, container.getBootstrapServers());
props.put(JULIE_KAFKA_STATE_CONSUMER_GROUP_ID, "julieops" + System.currentTimeMillis());
props.put(JULIE_KAFKA_STATE_CONSUMER_GROUP_ID, "julieops-" + KafkaBackendIT.class.getName());
Configuration config = new Configuration(cliOps, props);
newBackend.configure(config);

Thread.sleep(3000);

BackendState newState = newBackend.load();
assertThat(newState.size()).isEqualTo(1);
assertThat(newState.getBindings()).contains(binding);
newBackend.close();
backend.configure(config);
BackendState state = backend.load();
assertThat(state.size()).isEqualTo(expectedBindings.size());
for (TopologyAclBinding binding : expectedBindings) {
assertThat(state.getBindings()).contains(binding);
}
backend.close();
}
}

0 comments on commit 819d565

Please sign in to comment.