diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index a3aa54a9e1756..6233a835997cf 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -307,7 +307,6 @@ The Apache Software License, Version 2.0
- io.netty-netty-transport-4.1.100.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar
- - io.netty-netty-transport-native-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar
@@ -480,9 +479,9 @@ The Apache Software License, Version 2.0
- io.vertx-vertx-web-common-4.3.8.jar
- io.vertx-vertx-grpc-4.3.5.jar
* Apache ZooKeeper
- - org.apache.zookeeper-zookeeper-3.8.3.jar
- - org.apache.zookeeper-zookeeper-jute-3.8.3.jar
- - org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.3.jar
+ - org.apache.zookeeper-zookeeper-3.9.1.jar
+ - org.apache.zookeeper-zookeeper-jute-3.9.1.jar
+ - org.apache.zookeeper-zookeeper-prometheus-metrics-3.9.1.jar
* Snappy Java
- org.xerial.snappy-snappy-java-1.1.10.5.jar
* Google HTTP Client
diff --git a/pom.xml b/pom.xml
index 1519e8e16d3ba..ea3e1e3dc9ace 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,7 +134,7 @@ flexible messaging model and an intuitive client API.
1.21
4.16.3
- 3.8.3
+ 3.9.1
1.5.0
1.10.0
1.1.10.5
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
index 726f5ae312d19..33034ddb3fe0f 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java
@@ -19,39 +19,33 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertTrue;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.net.InetSocketAddress;
+import java.lang.reflect.Field;
import java.net.Socket;
-
-import java.nio.charset.StandardCharsets;
-
+import java.util.Properties;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.ContainerManager;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.embedded.ExitHandler;
+import org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded;
import org.assertj.core.util.Files;
@Slf4j
public class TestZKServer implements AutoCloseable {
+
public static final int TICK_TIME = 1000;
- protected ZooKeeperServer zks;
- private final File zkDataDir;
- private ServerCnxnFactory serverFactory;
- private ContainerManager containerManager;
- private int zkPort = 0;
+ private final File zkDataDir;
+ private int zkPort; // initially this is zero
+ private ZooKeeperServerEmbedded zooKeeperServerEmbedded;
public TestZKServer() throws Exception {
this.zkDataDir = Files.newTemporaryFolder();
@@ -64,86 +58,86 @@ public TestZKServer() throws Exception {
}
public void start() throws Exception {
- this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, TICK_TIME);
- this.zks.setMaxSessionTimeout(300_000);
- this.serverFactory = new NIOServerCnxnFactory();
- this.serverFactory.configure(new InetSocketAddress(zkPort), 1000);
- this.serverFactory.startup(zks, true);
-
- this.zkPort = serverFactory.getLocalPort();
- log.info("Started test ZK server on port {}", zkPort);
+ final Properties configZookeeper = new Properties();
+ configZookeeper.put("clientPort", zkPort + "");
+ configZookeeper.put("host", "127.0.0.1");
+ configZookeeper.put("ticktime", TICK_TIME + "");
+ zooKeeperServerEmbedded = ZooKeeperServerEmbedded
+ .builder()
+ .baseDir(zkDataDir.toPath())
+ .configuration(configZookeeper)
+ .exitHandler(ExitHandler.LOG_ONLY)
+ .build();
+
+ zooKeeperServerEmbedded.start(60_000);
+ log.info("Started test ZK server on at {}", zooKeeperServerEmbedded.getConnectionString());
+
+ ZooKeeperServerMain zooKeeperServerMain = getZooKeeperServerMain(zooKeeperServerEmbedded);
+ ServerCnxnFactory serverCnxnFactory = getServerCnxnFactory(zooKeeperServerMain);
+ // save the port, in order to allow restarting on the same port
+ zkPort = serverCnxnFactory.getLocalPort();
boolean zkServerReady = waitForServerUp(this.getConnectionString(), 30_000);
assertTrue(zkServerReady);
+ }
- this.containerManager = new ContainerManager(zks.getZKDatabase(), new RequestProcessor() {
- @Override
- public void processRequest(Request request) throws RequestProcessorException {
- String path = StandardCharsets.UTF_8.decode(request.request).toString();
- try {
- zks.getZKDatabase().getDataTree().deleteNode(path, -1);
- } catch (KeeperException.NoNodeException e) {
- // Ok
- }
- }
+ @SneakyThrows
+ private static ZooKeeperServerMain getZooKeeperServerMain(ZooKeeperServerEmbedded zooKeeperServerEmbedded) {
+ ZooKeeperServerMain zooKeeperServerMain = readField(zooKeeperServerEmbedded.getClass(),
+ "mainsingle", zooKeeperServerEmbedded);
+ return zooKeeperServerMain;
+ }
- @Override
- public void shutdown() {
+ @SneakyThrows
+ private static ContainerManager getContainerManager(ZooKeeperServerMain zooKeeperServerMain) {
+ ContainerManager containerManager = readField(ZooKeeperServerMain.class, "containerManager", zooKeeperServerMain);
+ return containerManager;
+ }
- }
- }, 10, 10000, 0L);
+ @SneakyThrows
+ private static ZooKeeperServer getZooKeeperServer(ZooKeeperServerMain zooKeeperServerMain) {
+ ServerCnxnFactory serverCnxnFactory = getServerCnxnFactory(zooKeeperServerMain);
+ ZooKeeperServer zkServer = readField(ServerCnxnFactory.class, "zkServer", serverCnxnFactory);
+ return zkServer;
+ }
+
+ @SneakyThrows
+ private static T readField(Class clazz, String field, Object object) {
+ Field declaredField = clazz.getDeclaredField(field);
+ boolean accessible = declaredField.isAccessible();
+ if (!accessible) {
+ declaredField.setAccessible(true);
+ }
+ try {
+ return (T) declaredField.get(object);
+ } finally {
+ declaredField.setAccessible(accessible);
+ }
+ }
+
+ private static ServerCnxnFactory getServerCnxnFactory(ZooKeeperServerMain zooKeeperServerMain) throws Exception {
+ ServerCnxnFactory serverCnxnFactory = readField(ZooKeeperServerMain.class, "cnxnFactory", zooKeeperServerMain);
+ return serverCnxnFactory;
}
public void checkContainers() throws Exception {
// Make sure the container nodes are actually deleted
Thread.sleep(1000);
+ ContainerManager containerManager = getContainerManager(getZooKeeperServerMain(zooKeeperServerEmbedded));
containerManager.checkContainers();
}
public void stop() throws Exception {
- if (containerManager != null) {
- containerManager.stop();
- containerManager = null;
- }
-
- if (serverFactory != null) {
- serverFactory.shutdown();
- serverFactory = null;
- }
-
- if (zks != null) {
- SessionTracker sessionTracker = zks.getSessionTracker();
- zks.shutdown();
- zks.getZKDatabase().close();
- if (sessionTracker instanceof Thread) {
- Thread sessionTrackerThread = (Thread) sessionTracker;
- sessionTrackerThread.interrupt();
- sessionTrackerThread.join();
- }
- zks = null;
+ if (zooKeeperServerEmbedded != null) {
+ zooKeeperServerEmbedded.close();
}
-
log.info("Stopped test ZK server");
}
public void expireSession(long sessionId) {
- zks.expire(new SessionTracker.Session() {
- @Override
- public long getSessionId() {
- return sessionId;
- }
-
- @Override
- public int getTimeout() {
- return 10_000;
- }
-
- @Override
- public boolean isClosing() {
- return false;
- }
- });
+ getZooKeeperServer(getZooKeeperServerMain(zooKeeperServerEmbedded))
+ .expire(sessionId);
}
@Override
@@ -152,12 +146,9 @@ public void close() throws Exception {
FileUtils.deleteDirectory(zkDataDir);
}
- public int getPort() {
- return zkPort;
- }
-
+ @SneakyThrows
public String getConnectionString() {
- return "127.0.0.1:" + getPort();
+ return zooKeeperServerEmbedded.getConnectionString();
}
public static boolean waitForServerUp(String hp, long timeout) {
diff --git a/pulsar-package-management/bookkeeper-storage/pom.xml b/pulsar-package-management/bookkeeper-storage/pom.xml
index 0beedab88cae2..b6ce6cac887be 100644
--- a/pulsar-package-management/bookkeeper-storage/pom.xml
+++ b/pulsar-package-management/bookkeeper-storage/pom.xml
@@ -71,6 +71,12 @@
+
+ org.hamcrest
+ hamcrest
+ test
+
+
io.dropwizard.metrics
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e9dcabd5bb32c..49f4b38a1361c 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -472,8 +472,8 @@ The Apache Software License, Version 2.0
- memory-0.8.3.jar
- sketches-core-0.8.3.jar
* Apache Zookeeper
- - zookeeper-3.8.3.jar
- - zookeeper-jute-3.8.3.jar
+ - zookeeper-3.9.1.jar
+ - zookeeper-jute-3.9.1.jar
* Apache Yetus Audience Annotations
- audience-annotations-0.12.0.jar
* Perfmark