Skip to content

Commit

Permalink
[improve][build] Upgrade Apache ZooKeeper to 3.9.1 (#20933)
Browse files Browse the repository at this point in the history
Co-authored-by: Lari Hotari <[email protected]>
Co-authored-by: xiangying <[email protected]>
Co-authored-by: Jiwe Guo <[email protected]>
  • Loading branch information
4 people committed Dec 4, 2023
1 parent d458b80 commit a614aad
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 87 deletions.
7 changes: 3 additions & 4 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ The Apache Software License, Version 2.0
- io.netty-netty-transport-4.1.100.Final.jar
- io.netty-netty-transport-classes-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-transport-native-epoll-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final.jar
- io.netty-netty-transport-native-unix-common-4.1.100.Final-linux-x86_64.jar
- io.netty-netty-tcnative-boringssl-static-2.0.61.Final.jar
Expand Down Expand Up @@ -480,9 +479,9 @@ The Apache Software License, Version 2.0
- io.vertx-vertx-web-common-4.3.8.jar
- io.vertx-vertx-grpc-4.3.5.jar
* Apache ZooKeeper
- org.apache.zookeeper-zookeeper-3.8.3.jar
- org.apache.zookeeper-zookeeper-jute-3.8.3.jar
- org.apache.zookeeper-zookeeper-prometheus-metrics-3.8.3.jar
- org.apache.zookeeper-zookeeper-3.9.1.jar
- org.apache.zookeeper-zookeeper-jute-3.9.1.jar
- org.apache.zookeeper-zookeeper-prometheus-metrics-3.9.1.jar
* Snappy Java
- org.xerial.snappy-snappy-java-1.1.10.5.jar
* Google HTTP Client
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ flexible messaging model and an intuitive client API.</description>
<commons-compress.version>1.21</commons-compress.version>

<bookkeeper.version>4.16.3</bookkeeper.version>
<zookeeper.version>3.8.3</zookeeper.version>
<zookeeper.version>3.9.1</zookeeper.version>
<commons-cli.version>1.5.0</commons-cli.version>
<commons-text.version>1.10.0</commons-text.version>
<snappy.version>1.1.10.5</snappy.version> <!-- ZooKeeper server -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,33 @@
package org.apache.pulsar.metadata;

import static org.testng.Assert.assertTrue;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.lang.reflect.Field;
import java.net.Socket;

import java.nio.charset.StandardCharsets;

import java.util.Properties;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.ContainerManager;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.embedded.ExitHandler;
import org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded;
import org.assertj.core.util.Files;

@Slf4j
public class TestZKServer implements AutoCloseable {

public static final int TICK_TIME = 1000;
protected ZooKeeperServer zks;
private final File zkDataDir;
private ServerCnxnFactory serverFactory;
private ContainerManager containerManager;

private int zkPort = 0;
private final File zkDataDir;
private int zkPort; // initially this is zero
private ZooKeeperServerEmbedded zooKeeperServerEmbedded;

public TestZKServer() throws Exception {
this.zkDataDir = Files.newTemporaryFolder();
Expand All @@ -64,86 +58,86 @@ public TestZKServer() throws Exception {
}

public void start() throws Exception {
this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, TICK_TIME);
this.zks.setMaxSessionTimeout(300_000);
this.serverFactory = new NIOServerCnxnFactory();
this.serverFactory.configure(new InetSocketAddress(zkPort), 1000);
this.serverFactory.startup(zks, true);

this.zkPort = serverFactory.getLocalPort();
log.info("Started test ZK server on port {}", zkPort);
final Properties configZookeeper = new Properties();
configZookeeper.put("clientPort", zkPort + "");
configZookeeper.put("host", "127.0.0.1");
configZookeeper.put("ticktime", TICK_TIME + "");
zooKeeperServerEmbedded = ZooKeeperServerEmbedded
.builder()
.baseDir(zkDataDir.toPath())
.configuration(configZookeeper)
.exitHandler(ExitHandler.LOG_ONLY)
.build();

zooKeeperServerEmbedded.start(60_000);
log.info("Started test ZK server on at {}", zooKeeperServerEmbedded.getConnectionString());

ZooKeeperServerMain zooKeeperServerMain = getZooKeeperServerMain(zooKeeperServerEmbedded);
ServerCnxnFactory serverCnxnFactory = getServerCnxnFactory(zooKeeperServerMain);
// save the port, in order to allow restarting on the same port
zkPort = serverCnxnFactory.getLocalPort();

boolean zkServerReady = waitForServerUp(this.getConnectionString(), 30_000);
assertTrue(zkServerReady);
}

this.containerManager = new ContainerManager(zks.getZKDatabase(), new RequestProcessor() {
@Override
public void processRequest(Request request) throws RequestProcessorException {
String path = StandardCharsets.UTF_8.decode(request.request).toString();
try {
zks.getZKDatabase().getDataTree().deleteNode(path, -1);
} catch (KeeperException.NoNodeException e) {
// Ok
}
}
@SneakyThrows
private static ZooKeeperServerMain getZooKeeperServerMain(ZooKeeperServerEmbedded zooKeeperServerEmbedded) {
ZooKeeperServerMain zooKeeperServerMain = readField(zooKeeperServerEmbedded.getClass(),
"mainsingle", zooKeeperServerEmbedded);
return zooKeeperServerMain;
}

@Override
public void shutdown() {
@SneakyThrows
private static ContainerManager getContainerManager(ZooKeeperServerMain zooKeeperServerMain) {
ContainerManager containerManager = readField(ZooKeeperServerMain.class, "containerManager", zooKeeperServerMain);
return containerManager;
}

}
}, 10, 10000, 0L);
@SneakyThrows
private static ZooKeeperServer getZooKeeperServer(ZooKeeperServerMain zooKeeperServerMain) {
ServerCnxnFactory serverCnxnFactory = getServerCnxnFactory(zooKeeperServerMain);
ZooKeeperServer zkServer = readField(ServerCnxnFactory.class, "zkServer", serverCnxnFactory);
return zkServer;
}

@SneakyThrows
private static <T> T readField(Class clazz, String field, Object object) {
Field declaredField = clazz.getDeclaredField(field);
boolean accessible = declaredField.isAccessible();
if (!accessible) {
declaredField.setAccessible(true);
}
try {
return (T) declaredField.get(object);
} finally {
declaredField.setAccessible(accessible);
}
}

private static ServerCnxnFactory getServerCnxnFactory(ZooKeeperServerMain zooKeeperServerMain) throws Exception {
ServerCnxnFactory serverCnxnFactory = readField(ZooKeeperServerMain.class, "cnxnFactory", zooKeeperServerMain);
return serverCnxnFactory;
}

public void checkContainers() throws Exception {
// Make sure the container nodes are actually deleted
Thread.sleep(1000);

ContainerManager containerManager = getContainerManager(getZooKeeperServerMain(zooKeeperServerEmbedded));
containerManager.checkContainers();
}

public void stop() throws Exception {
if (containerManager != null) {
containerManager.stop();
containerManager = null;
}

if (serverFactory != null) {
serverFactory.shutdown();
serverFactory = null;
}

if (zks != null) {
SessionTracker sessionTracker = zks.getSessionTracker();
zks.shutdown();
zks.getZKDatabase().close();
if (sessionTracker instanceof Thread) {
Thread sessionTrackerThread = (Thread) sessionTracker;
sessionTrackerThread.interrupt();
sessionTrackerThread.join();
}
zks = null;
if (zooKeeperServerEmbedded != null) {
zooKeeperServerEmbedded.close();
}

log.info("Stopped test ZK server");
}

public void expireSession(long sessionId) {
zks.expire(new SessionTracker.Session() {
@Override
public long getSessionId() {
return sessionId;
}

@Override
public int getTimeout() {
return 10_000;
}

@Override
public boolean isClosing() {
return false;
}
});
getZooKeeperServer(getZooKeeperServerMain(zooKeeperServerEmbedded))
.expire(sessionId);
}

@Override
Expand All @@ -152,12 +146,9 @@ public void close() throws Exception {
FileUtils.deleteDirectory(zkDataDir);
}

public int getPort() {
return zkPort;
}

@SneakyThrows
public String getConnectionString() {
return "127.0.0.1:" + getPort();
return zooKeeperServerEmbedded.getConnectionString();
}

public static boolean waitForServerUp(String hp, long timeout) {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-package-management/bookkeeper-storage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
Expand Down
4 changes: 2 additions & 2 deletions pulsar-sql/presto-distribution/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,8 @@ The Apache Software License, Version 2.0
- memory-0.8.3.jar
- sketches-core-0.8.3.jar
* Apache Zookeeper
- zookeeper-3.8.3.jar
- zookeeper-jute-3.8.3.jar
- zookeeper-3.9.1.jar
- zookeeper-jute-3.9.1.jar
* Apache Yetus Audience Annotations
- audience-annotations-0.12.0.jar
* Perfmark
Expand Down

0 comments on commit a614aad

Please sign in to comment.