Skip to content

Commit

Permalink
revert retryOnConnLoss changes
Browse files Browse the repository at this point in the history
  • Loading branch information
risdenk committed Oct 11, 2023
1 parent b9ec384 commit 44399c7
Show file tree
Hide file tree
Showing 136 changed files with 919 additions and 684 deletions.
11 changes: 6 additions & 5 deletions solr/core/src/java/org/apache/solr/cli/AuthTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ private int handleKerberos(CommandLine cli) throws Exception {
.withTimeout(
SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS)
.build()) {
zkClient.setData("/security.json", securityJson.getBytes(StandardCharsets.UTF_8));
zkClient.setData(
"/security.json", securityJson.getBytes(StandardCharsets.UTF_8), true);
} catch (Exception ex) {
CLIO.out(
"Unable to access ZooKeeper. Please add the following security.json to ZooKeeper (in case of SolrCloud):\n"
Expand Down Expand Up @@ -383,7 +384,7 @@ private int handleBasicAuth(CommandLine cli) throws Exception {
.withUrl(zkHost)
.withTimeout(SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS)
.build()) {
zkClient.setData("/security.json", securityJson.getBytes(StandardCharsets.UTF_8));
zkClient.setData("/security.json", securityJson.getBytes(StandardCharsets.UTF_8), true);
}
}

Expand Down Expand Up @@ -451,8 +452,8 @@ private int handleBasicAuth(CommandLine cli) throws Exception {

private void checkSecurityJsonExists(SolrZkClient zkClient)
throws KeeperException, InterruptedException {
if (zkClient.exists("/security.json")) {
byte[] oldSecurityBytes = zkClient.getData("/security.json", null, null);
if (zkClient.exists("/security.json", true)) {
byte[] oldSecurityBytes = zkClient.getData("/security.json", null, null, true);
if (!"{}".equals(new String(oldSecurityBytes, StandardCharsets.UTF_8).trim())) {
CLIO.out(
"Security is already enabled. You can disable it with 'bin/solr auth disable'. Existing security.json: \n"
Expand All @@ -478,7 +479,7 @@ private void clearSecurityJson(CommandLine cli, boolean updateIncludeFileOnly) t
.withUrl(zkHost)
.withTimeout(SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS)
.build()) {
zkClient.setData("/security.json", "{}".getBytes(StandardCharsets.UTF_8));
zkClient.setData("/security.json", "{}".getBytes(StandardCharsets.UTF_8), true);
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion solr/core/src/java/org/apache/solr/cli/CreateTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ protected void createCollection(CloudSolrClient cloudSolrClient, CommandLine cli
boolean configExistsInZk =
confName != null
&& !confName.trim().isEmpty()
&& ZkStateReader.from(cloudSolrClient).getZkClient().exists("/configs/" + confName);
&& ZkStateReader.from(cloudSolrClient)
.getZkClient()
.exists("/configs/" + confName, true);

if (CollectionAdminParams.SYSTEM_COLL.equals(collectionName)) {
// do nothing
Expand Down
2 changes: 1 addition & 1 deletion solr/core/src/java/org/apache/solr/cli/ZkRmTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void runImpl(CommandLine cli) throws Exception {
.withUrl(zkHost)
.withTimeout(SolrZkClientTimeout.DEFAULT_ZK_CLIENT_TIMEOUT, TimeUnit.MILLISECONDS)
.build()) {
if (!recurse && zkClient.getChildren(znode, null).size() != 0) {
if (!recurse && zkClient.getChildren(znode, null, true).size() != 0) {
throw new SolrServerException(
"ZooKeeper node " + znode + " has children and recurse has NOT been specified.");
}
Expand Down
4 changes: 2 additions & 2 deletions solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ public static String unifiedResourcePath(SolrResourceLoader loader) {
public static Map<String, byte[]> getTrustedKeys(SolrZkClient zk, String dir) {
Map<String, byte[]> result = new HashMap<>();
try {
List<String> children = zk.getChildren("/keys/" + dir, null);
List<String> children = zk.getChildren("/keys/" + dir, null, true);
for (String key : children) {
if (key.endsWith(".der"))
result.put(key, zk.getData("/keys/" + dir + "/" + key, null, null));
result.put(key, zk.getData("/keys/" + dir + "/" + key, null, null, true));
}
} catch (KeeperException.NoNodeException e) {
log.info("Error fetching key names");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ enum State {
this.rootNodePath = rootNodePath;

try {
if (!zkClient.exists(rootNodePath)) {
zkClient.makePath(rootNodePath, new byte[0], CreateMode.PERSISTENT);
if (!zkClient.exists(rootNodePath, true)) {
zkClient.makePath(rootNodePath, new byte[0], CreateMode.PERSISTENT, true);
}
} catch (KeeperException.NodeExistsException nee) {
// Some other thread (on this or another JVM) beat us to create the node, that's ok, the
Expand All @@ -329,25 +329,27 @@ void createNewInFlightTask(String asyncId) throws KeeperException, InterruptedEx
zkClient.create(
getPath(asyncId),
State.SUBMITTED.shorthand.getBytes(StandardCharsets.UTF_8),
CreateMode.EPHEMERAL);
CreateMode.EPHEMERAL,
true);
}

void setTaskRunning(String asyncId) throws KeeperException, InterruptedException {
zkClient.setData(getPath(asyncId), State.RUNNING.shorthand.getBytes(StandardCharsets.UTF_8));
zkClient.setData(
getPath(asyncId), State.RUNNING.shorthand.getBytes(StandardCharsets.UTF_8), true);
}

void deleteInFlightTask(String asyncId) throws KeeperException, InterruptedException {
zkClient.delete(getPath(asyncId), -1);
zkClient.delete(getPath(asyncId), -1, true);
}

State getInFlightState(String asyncId) throws KeeperException, InterruptedException {
if (!zkClient.exists(getPath(asyncId))) {
if (!zkClient.exists(getPath(asyncId), true)) {
return State.NOT_FOUND;
}

final byte[] bytes;
try {
bytes = zkClient.getData(getPath(asyncId), null, null);
bytes = zkClient.getData(getPath(asyncId), null, null, true);
} catch (KeeperException.NoNodeException nne) {
// Unlikely race, but not impossible...
if (log.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ private void doStateDotJsonCasUpdate(ClusterState updatedState)
if (updater.isCollectionCreation()) {
// The state.json file does not exist yet (more precisely it is assumed not to exist)
log.debug("going to create collection {}", jsonPath);
zkStateReader.getZkClient().create(jsonPath, stateJson, CreateMode.PERSISTENT);
zkStateReader.getZkClient().create(jsonPath, stateJson, CreateMode.PERSISTENT, true);
} else {
// We're updating an existing state.json
if (log.isDebugEnabled()) {
Expand All @@ -607,7 +607,9 @@ private void doStateDotJsonCasUpdate(ClusterState updatedState)
jsonPath,
collection.getZNodeVersion());
}
zkStateReader.getZkClient().setData(jsonPath, stateJson, collection.getZNodeVersion());
zkStateReader
.getZkClient()
.setData(jsonPath, stateJson, collection.getZNodeVersion(), true);
}
}
}
Expand All @@ -621,7 +623,7 @@ private void doStateDotJsonCasUpdate(ClusterState updatedState)
private ClusterState fetchStateForCollection() throws KeeperException, InterruptedException {
String collectionStatePath = DocCollection.getCollectionPath(updater.getCollectionName());
Stat stat = new Stat();
byte[] data = zkStateReader.getZkClient().getData(collectionStatePath, null, stat);
byte[] data = zkStateReader.getZkClient().getData(collectionStatePath, null, stat, true);

// This factory method can detect a missing configName and supply it by reading it from the
// old ZK location.
Expand Down Expand Up @@ -931,7 +933,7 @@ public static void executeNodeDownStateUpdate(String nodeName, ZkStateReader zkS

try {
final List<String> collectionNames =
zkStateReader.getZkClient().getChildren(COLLECTIONS_ZKNODE, null);
zkStateReader.getZkClient().getChildren(COLLECTIONS_ZKNODE, null, true);

// Collections are totally independent of each other. Multiple threads could share the load
// here (need a ZK connection for each though).
Expand Down
20 changes: 11 additions & 9 deletions solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private void assertKeyFormat(String trackingId) {

public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
assertKeyFormat(trackingId);
zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, false);
zookeeper.makePath(
dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, false, true);
}

/**
Expand All @@ -80,24 +81,25 @@ public boolean putIfAbsent(String trackingId, byte[] data)
throws KeeperException, InterruptedException {
assertKeyFormat(trackingId);
try {
zookeeper.makePath(dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, true);
zookeeper.makePath(
dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, true, true);
return true;
} catch (NodeExistsException e) {
return false;
}
}

public byte[] get(String trackingId) throws KeeperException, InterruptedException {
return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null);
return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null, true);
}

public boolean contains(String trackingId) throws KeeperException, InterruptedException {
return zookeeper.exists(dir + "/" + PREFIX + trackingId);
return zookeeper.exists(dir + "/" + PREFIX + trackingId, true);
}

public int size() throws KeeperException, InterruptedException {
Stat stat = new Stat();
zookeeper.getData(dir, null, stat);
zookeeper.getData(dir, null, stat, true);
return stat.getNumChildren();
}

Expand All @@ -108,7 +110,7 @@ public int size() throws KeeperException, InterruptedException {
public boolean remove(String trackingId) throws KeeperException, InterruptedException {
final var path = dir + "/" + PREFIX + trackingId;
try {
zookeeper.delete(path, -1);
zookeeper.delete(path, -1, true);
} catch (KeeperException.NoNodeException e) {
return false;
} catch (KeeperException.NotEmptyException hack) {
Expand All @@ -121,15 +123,15 @@ public boolean remove(String trackingId) throws KeeperException, InterruptedExce

/** Helper method to clear all child nodes for a parent node. */
public void clear() throws KeeperException, InterruptedException {
List<String> childNames = zookeeper.getChildren(dir, null);
List<String> childNames = zookeeper.getChildren(dir, null, true);
for (String childName : childNames) {
zookeeper.delete(dir + "/" + childName, -1);
zookeeper.delete(dir + "/" + childName, -1, true);
}
}

/** Returns the keys of all the elements in the map */
public Collection<String> keys() throws KeeperException, InterruptedException {
List<String> childs = zookeeper.getChildren(dir, null);
List<String> childs = zookeeper.getChildren(dir, null, true);
final List<String> ids = new ArrayList<>(childs.size());
childs.stream().forEach((child) -> ids.add(child.substring(PREFIX.length())));
return ids;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void cancelElection() throws InterruptedException, KeeperException {
if (leaderSeqPath != null) {
try {
log.debug("Canceling election {}", leaderSeqPath);
zkClient.delete(leaderSeqPath, -1);
zkClient.delete(leaderSeqPath, -1, true);
} catch (NoNodeException e) {
// fine
log.debug("cancelElection did not find election node to remove {}", leaderSeqPath);
Expand Down
26 changes: 17 additions & 9 deletions solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement
context.checkIfIamLeaderFired();
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
List<String> seqs = zkClient.getChildren(holdElectionPath, null);
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
sortSeqs(seqs);

String leaderSeqNodeName =
Expand All @@ -114,7 +114,7 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement
try {
String toDelete = holdElectionPath + "/" + elec;
log.warn("Deleting duplicate registration: {}", toDelete);
zkClient.delete(toDelete, -1);
zkClient.delete(toDelete, -1, true);
} catch (KeeperException.NoNodeException e) {
// ignore
}
Expand Down Expand Up @@ -148,7 +148,8 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement
watcher =
new ElectionWatcher(
context.leaderSeqPath, watchedNode, getSeq(context.leaderSeqPath), context),
null);
null,
true);
log.debug("Watching path {} to know if I could be the leader", watchedNode);
} catch (KeeperException.SessionExpiredException e) {
throw e;
Expand Down Expand Up @@ -239,7 +240,10 @@ public int joinElection(ElectionContext context, boolean replacement, boolean jo
if (nodes.size() < 2) {
leaderSeqPath =
zkClient.create(
shardsElectZkPath + "/" + id + "-n_", null, CreateMode.EPHEMERAL_SEQUENTIAL);
shardsElectZkPath + "/" + id + "-n_",
null,
CreateMode.EPHEMERAL_SEQUENTIAL,
false);
} else {
String firstInLine = nodes.get(1);
log.debug("The current head: {}", firstInLine);
Expand All @@ -248,20 +252,23 @@ public int joinElection(ElectionContext context, boolean replacement, boolean jo
throw new IllegalStateException("Could not find regex match in:" + firstInLine);
}
leaderSeqPath = shardsElectZkPath + "/" + id + "-n_" + m.group(1);
zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL);
zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
}
} else {
leaderSeqPath =
zkClient.create(
shardsElectZkPath + "/" + id + "-n_", null, CreateMode.EPHEMERAL_SEQUENTIAL);
shardsElectZkPath + "/" + id + "-n_",
null,
CreateMode.EPHEMERAL_SEQUENTIAL,
false);
}

log.debug("Joined leadership election with path: {}", leaderSeqPath);
context.leaderSeqPath = leaderSeqPath;
cont = false;
} catch (ConnectionLossException e) {
// we don't know if we made our node or not...
List<String> entries = zkClient.getChildren(shardsElectZkPath, null);
List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);

boolean foundId = false;
for (String entry : entries) {
Expand Down Expand Up @@ -329,7 +336,7 @@ public void process(WatchedEvent event) {
if (canceled) {
log.debug("This watcher is not active anymore {}", myNode);
try {
zkClient.delete(myNode, -1);
zkClient.delete(myNode, -1, true);
} catch (KeeperException.NoNodeException nne) {
// expected . don't do anything
} catch (Exception e) {
Expand Down Expand Up @@ -357,7 +364,8 @@ public void setup(final ElectionContext context) throws InterruptedException, Ke
ZkMaintenanceUtils.ensureExists(electZKPath, zkClient);
} else {
// we use 2 param so that replica won't create /collection/{collection} if it doesn't exist
ZkMaintenanceUtils.ensureExists(electZKPath, null, CreateMode.PERSISTENT, zkClient, 2);
ZkMaintenanceUtils.ensureExists(
electZKPath, (byte[]) null, CreateMode.PERSISTENT, zkClient, 2);
}

this.context = context;
Expand Down
8 changes: 4 additions & 4 deletions solr/core/src/java/org/apache/solr/cloud/Overseer.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ private void checkIfIamStillLeader() {
final String path = OVERSEER_ELECT + "/leader";
byte[] data;
try {
data = zkClient.getData(path, null, stat);
data = zkClient.getData(path, null, stat, true);
} catch (IllegalStateException e) {
return;
} catch (Exception e) {
Expand All @@ -494,7 +494,7 @@ private void checkIfIamStillLeader() {
log.warn(
"I (id={}) am exiting, but I'm still the leader",
overseerCollectionConfigSetProcessor.getId());
zkClient.delete(path, stat.getVersion());
zkClient.delete(path, stat.getVersion(), true);
} catch (KeeperException.BadVersionException e) {
// no problem ignore it some other Overseer has already taken over
} catch (Exception e) {
Expand Down Expand Up @@ -616,7 +616,7 @@ private LeaderStatus amILeader() {
String propsId = null;
try {
ZkNodeProps props =
ZkNodeProps.load(zkClient.getData(OVERSEER_ELECT + "/leader", null, null));
ZkNodeProps.load(zkClient.getData(OVERSEER_ELECT + "/leader", null, null, true));
propsId = props.getStr(ID);
if (myId.equals(propsId)) {
return LeaderStatus.YES;
Expand Down Expand Up @@ -1184,7 +1184,7 @@ OverseerTaskQueue getConfigSetQueue(final SolrZkClient zkClient, Stats zkStats)

private void createOverseerNode(final SolrZkClient zkClient) {
try {
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT);
zkClient.create("/overseer", new byte[0], CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException e) {
// ok
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
final String id = leaderSeqPath.substring(leaderSeqPath.lastIndexOf('/') + 1);
ZkNodeProps myProps = new ZkNodeProps(ID, id);

zkClient.makePath(leaderPath, Utils.toJSON(myProps), CreateMode.EPHEMERAL);
zkClient.makePath(leaderPath, Utils.toJSON(myProps), CreateMode.EPHEMERAL, true);
if (pauseBeforeStartMs > 0) {
try {
Thread.sleep(pauseBeforeStartMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public OverseerNodePrioritizer(
public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
SolrZkClient zk = zkStateReader.getZkClient();
List<String> overseerDesignates = new ArrayList<>();
if (zk.exists(ZkStateReader.ROLES)) {
Map<?, ?> m = (Map<?, ?>) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat()));
if (zk.exists(ZkStateReader.ROLES, true)) {
Map<?, ?> m =
(Map<?, ?>) Utils.fromJSON(zk.getData(ZkStateReader.ROLES, null, new Stat(), true));
@SuppressWarnings("unchecked")
List<String> l = (List<String>) m.get("overseer");
if (l != null) {
Expand Down
Loading

0 comments on commit 44399c7

Please sign in to comment.