Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16116: Use apache curator to manage the Solr Zookeeper interactions #760

Merged
merged 47 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
b4fbc8c
Use Curator - Step 1
HoustonPutman Mar 18, 2022
e31dea0
It compiles
HoustonPutman Mar 21, 2022
df93241
SolrJ Tests all pass
HoustonPutman Mar 22, 2022
3923bf6
Fix a lot of errors
HoustonPutman Mar 22, 2022
fbcce63
Everything works. Just some leader election issues when the client cn…
HoustonPutman Mar 23, 2022
d3949b1
Small changes.
HoustonPutman Mar 24, 2022
bb7d04a
Add-back-in path start index for leader election
HoustonPutman Mar 24, 2022
5137cb8
Change test to do stuff correctly. Add in sessionTimeout
HoustonPutman Mar 24, 2022
fc773af
Fix unrelated issue found in PR
HoustonPutman Mar 24, 2022
9cea7af
Merge branch 'main' into curator
risdenk Oct 8, 2023
539ca79
Fixes after merging main
risdenk Oct 8, 2023
b9ec384
Merge branch 'main' into curator
risdenk Oct 11, 2023
44399c7
revert retryOnConnLoss changes
risdenk Oct 11, 2023
784a9b2
Fix ZkStateWriterTest
risdenk Oct 11, 2023
5d20221
Fix VMParamsZkACLAndCredentialsProvidersTest
risdenk Oct 11, 2023
b874042
Add back solrClassLoader to solrzkclient
risdenk Oct 12, 2023
28759c9
Merge branch 'main' into curator
risdenk Oct 12, 2023
a12176b
Merge branch 'main' into curator
risdenk Oct 24, 2023
a0b449c
Merge branch 'main' into curator
risdenk Jan 9, 2024
cb33a75
Fix SolrClientCacheTest
risdenk Jan 9, 2024
b71f0c5
Merge branch 'main' into curator
risdenk Mar 11, 2024
d3bbefa
handle SOLR-16699
risdenk Mar 11, 2024
eed8450
updateLicenses
risdenk Mar 11, 2024
a419a14
Merge remote-tracking branch 'apache/main' into curator
HoustonPutman Oct 22, 2024
08804b5
Fix small bug
HoustonPutman Oct 22, 2024
1122dcc
Upgrade curator to 5.7.1
HoustonPutman Oct 22, 2024
4029224
Spotless
HoustonPutman Oct 22, 2024
93583d8
Fix some build issues
HoustonPutman Oct 22, 2024
73cd016
Some review comments
HoustonPutman Oct 22, 2024
09fc7bf
Fix makePath race condition
HoustonPutman Oct 22, 2024
e3bd66f
Fix tests
HoustonPutman Oct 23, 2024
5754bf5
Spotless
HoustonPutman Oct 23, 2024
a7bccd1
Remove method that exposes zk
HoustonPutman Oct 23, 2024
0ef5424
Some gradle dependency stuff
HoustonPutman Oct 23, 2024
6785d1d
Add a changelog entry
HoustonPutman Oct 23, 2024
927e261
Curator zk operations will wait until the connection is ready
HoustonPutman Oct 23, 2024
ea1a82a
Try treating suspended and lost the same
HoustonPutman Oct 23, 2024
99e17e3
Spotless
HoustonPutman Oct 23, 2024
5a76cd0
Fix overseer test
HoustonPutman Oct 24, 2024
31ebd14
Make thread a daemon thread, and fix test until curator fix
HoustonPutman Oct 25, 2024
a59779d
Review comments
HoustonPutman Oct 25, 2024
f92b4d1
Review comments
HoustonPutman Oct 28, 2024
6a1a41f
Some last review comments
HoustonPutman Oct 29, 2024
b084640
Make curator an api dependency again
HoustonPutman Oct 29, 2024
72d9623
Fix PRS defaulting
HoustonPutman Oct 30, 2024
c71105d
Revert "Fix PRS defaulting"
HoustonPutman Oct 30, 2024
7a12fe5
Add NoPRS to two tests
HoustonPutman Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ public static String unifiedResourcePath(SolrResourceLoader loader) {
public static Map<String, byte[]> getTrustedKeys(SolrZkClient zk, String dir) {
Map<String, byte[]> result = new HashMap<>();
try {
List<String> children = zk.getChildren("/keys/" + dir, null, true);
List<String> children = zk.getChildren("/keys/" + dir, null);
for (String key : children) {
if (key.endsWith(".der"))
result.put(key, zk.getData("/keys/" + dir + "/" + key, null, null, true));
result.put(key, zk.getData("/keys/" + dir + "/" + key, null, null));
}
} catch (KeeperException.NoNodeException e) {
log.info("Error fetching key names");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ enum State {
this.rootNodePath = rootNodePath;

try {
if (!zkClient.exists(rootNodePath, true)) {
zkClient.makePath(rootNodePath, new byte[0], CreateMode.PERSISTENT, true);
if (!zkClient.exists(rootNodePath)) {
zkClient.makePath(rootNodePath, new byte[0], CreateMode.PERSISTENT);
}
} catch (KeeperException.NodeExistsException nee) {
// Some other thread (on this or another JVM) beat us to create the node, that's ok, the
Expand All @@ -329,27 +329,27 @@ void createNewInFlightTask(String asyncId) throws KeeperException, InterruptedEx
zkClient.create(
getPath(asyncId),
State.SUBMITTED.shorthand.getBytes(StandardCharsets.UTF_8),
CreateMode.EPHEMERAL,
true);
CreateMode.EPHEMERAL
);
}

/**
 * Overwrites the data of the node at {@code getPath(asyncId)} with the UTF-8 bytes of
 * {@code State.RUNNING.shorthand}.
 *
 * <p>NOTE(review): this block is a diff hunk captured without +/- markers. The two argument lines
 * below appear to be the removed and added versions of the same line; only one exists in the
 * real file.
 *
 * @param asyncId id of the task; mapped to a node path by {@code getPath}
 * @throws KeeperException propagated from {@code zkClient.setData}
 * @throws InterruptedException propagated from {@code zkClient.setData}
 */
void setTaskRunning(String asyncId) throws KeeperException, InterruptedException {
zkClient.setData(
// removed side of the diff: the call carried a trailing boolean argument
getPath(asyncId), State.RUNNING.shorthand.getBytes(StandardCharsets.UTF_8), true);
// added side of the diff: same call without the trailing boolean
getPath(asyncId), State.RUNNING.shorthand.getBytes(StandardCharsets.UTF_8));
}

/**
 * Deletes the node at {@code getPath(asyncId)}, passing {@code -1} as the version argument.
 *
 * <p>NOTE(review): diff hunk without +/- markers; the two {@code delete} lines appear to be the
 * removed and added versions of one statement.
 *
 * @param asyncId id of the task; mapped to a node path by {@code getPath}
 * @throws KeeperException propagated from {@code zkClient.delete}
 * @throws InterruptedException propagated from {@code zkClient.delete}
 */
void deleteInFlightTask(String asyncId) throws KeeperException, InterruptedException {
// removed side of the diff: trailing boolean argument present
zkClient.delete(getPath(asyncId), -1, true);
// added side of the diff: trailing boolean argument dropped
zkClient.delete(getPath(asyncId), -1);
}

State getInFlightState(String asyncId) throws KeeperException, InterruptedException {
if (!zkClient.exists(getPath(asyncId), true)) {
if (!zkClient.exists(getPath(asyncId))) {
return State.NOT_FOUND;
}

final byte[] bytes;
try {
bytes = zkClient.getData(getPath(asyncId), null, null, true);
bytes = zkClient.getData(getPath(asyncId), null, null);
} catch (KeeperException.NoNodeException nne) {
// Unlikely race, but not impossible...
if (log.isInfoEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ private void doStateDotJsonCasUpdate(ClusterState updatedState)
if (updater.isCollectionCreation()) {
// The state.json file does not exist yet (more precisely it is assumed not to exist)
log.debug("going to create collection {}", jsonPath);
zkStateReader.getZkClient().create(jsonPath, stateJson, CreateMode.PERSISTENT, true);
zkStateReader.getZkClient().create(jsonPath, stateJson, CreateMode.PERSISTENT);
} else {
// We're updating an existing state.json
if (log.isDebugEnabled()) {
Expand All @@ -589,7 +589,7 @@ private void doStateDotJsonCasUpdate(ClusterState updatedState)
}
zkStateReader
.getZkClient()
.setData(jsonPath, stateJson, collection.getZNodeVersion(), true);
.setData(jsonPath, stateJson, collection.getZNodeVersion());
}
}
}
Expand All @@ -603,7 +603,7 @@ private void doStateDotJsonCasUpdate(ClusterState updatedState)
private ClusterState fetchStateForCollection() throws KeeperException, InterruptedException {
String collectionStatePath = ZkStateReader.getCollectionPath(updater.getCollectionName());
Stat stat = new Stat();
byte[] data = zkStateReader.getZkClient().getData(collectionStatePath, null, stat, true);
byte[] data = zkStateReader.getZkClient().getData(collectionStatePath, null, stat);

// This factory method can detect a missing configName and supply it by reading it from the
// old ZK location.
Expand Down Expand Up @@ -910,7 +910,7 @@ public static void executeNodeDownStateUpdate(String nodeName, ZkStateReader zkS

try {
final List<String> collectionNames =
zkStateReader.getZkClient().getChildren(COLLECTIONS_ZKNODE, null, true);
zkStateReader.getZkClient().getChildren(COLLECTIONS_ZKNODE, null);

// Collections are totally independent of each other. Multiple threads could share the load
// here (need a ZK connection for each though).
Expand Down
22 changes: 10 additions & 12 deletions solr/core/src/java/org/apache/solr/cloud/DistributedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
Expand All @@ -42,9 +41,8 @@ public class DistributedMap {
public DistributedMap(SolrZkClient zookeeper, String dir) {
this.dir = dir;

ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout());
try {
cmdExecutor.ensureExists(dir, zookeeper);
zookeeper.ensureExists(dir);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
Expand All @@ -57,7 +55,7 @@ public DistributedMap(SolrZkClient zookeeper, String dir) {

/**
 * Writes {@code data} to the node {@code dir + "/" + PREFIX + trackingId} via
 * {@code zookeeper.makePath}, using {@code CreateMode.PERSISTENT}.
 *
 * <p>The {@code false} argument is the one that {@code putIfAbsent} passes as {@code true};
 * presumably it selects whether an existing node is an error -- confirm against
 * {@code SolrZkClient.makePath}.
 *
 * <p>NOTE(review): diff hunk without +/- markers; the two argument lines appear to be the removed
 * and added versions of the same line.
 *
 * @param trackingId key of the entry; prefixed with {@code PREFIX} to form the node name
 * @param data bytes handed to {@code makePath}
 * @throws KeeperException propagated from {@code zookeeper.makePath}
 * @throws InterruptedException propagated from {@code zookeeper.makePath}
 */
public void put(String trackingId, byte[] data) throws KeeperException, InterruptedException {
zookeeper.makePath(
// removed side of the diff: extra trailing boolean argument
dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, false, true);
// added side of the diff: trailing boolean argument dropped
dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, false);
}

/**
Expand All @@ -69,24 +67,24 @@ public boolean putIfAbsent(String trackingId, byte[] data)
throws KeeperException, InterruptedException {
try {
zookeeper.makePath(
dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, true, true);
dir + "/" + PREFIX + trackingId, data, CreateMode.PERSISTENT, null, true);
return true;
} catch (NodeExistsException e) {
return false;
}
}

/**
 * Returns the bytes read from the node {@code dir + "/" + PREFIX + trackingId}. No watcher and no
 * {@code Stat} are supplied (both arguments are {@code null}).
 *
 * <p>NOTE(review): diff hunk without +/- markers; the two {@code return} lines appear to be the
 * removed and added versions of one statement.
 *
 * @param trackingId key of the entry; prefixed with {@code PREFIX} to form the node name
 * @return whatever {@code zookeeper.getData} returns for that path
 * @throws KeeperException propagated from {@code zookeeper.getData}
 * @throws InterruptedException propagated from {@code zookeeper.getData}
 */
public byte[] get(String trackingId) throws KeeperException, InterruptedException {
// removed side of the diff: trailing boolean argument present
return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null, true);
// added side of the diff: trailing boolean argument dropped
return zookeeper.getData(dir + "/" + PREFIX + trackingId, null, null);
}

/**
 * Reports whether the node {@code dir + "/" + PREFIX + trackingId} exists, as answered by
 * {@code zookeeper.exists}.
 *
 * <p>NOTE(review): diff hunk without +/- markers; the two {@code return} lines appear to be the
 * removed and added versions of one statement.
 *
 * @param trackingId key of the entry; prefixed with {@code PREFIX} to form the node name
 * @return the result of {@code zookeeper.exists} for that path
 * @throws KeeperException propagated from {@code zookeeper.exists}
 * @throws InterruptedException propagated from {@code zookeeper.exists}
 */
public boolean contains(String trackingId) throws KeeperException, InterruptedException {
// removed side of the diff: trailing boolean argument present
return zookeeper.exists(dir + "/" + PREFIX + trackingId, true);
// added side of the diff: trailing boolean argument dropped
return zookeeper.exists(dir + "/" + PREFIX + trackingId);
}

/**
 * Returns the child count of {@code dir}, read from a {@code Stat} that is filled in by a
 * {@code getData} call on {@code dir}; the data bytes themselves are discarded.
 *
 * <p>NOTE(review): diff hunk without +/- markers; the two {@code getData} lines appear to be the
 * removed and added versions of one statement.
 *
 * @return {@code stat.getNumChildren()} for {@code dir}
 * @throws KeeperException propagated from {@code zookeeper.getData}
 * @throws InterruptedException propagated from {@code zookeeper.getData}
 */
public int size() throws KeeperException, InterruptedException {
Stat stat = new Stat();
// removed side of the diff: trailing boolean argument present
zookeeper.getData(dir, null, stat, true);
// added side of the diff: trailing boolean argument dropped
zookeeper.getData(dir, null, stat);
return stat.getNumChildren();
}

Expand All @@ -96,7 +94,7 @@ public int size() throws KeeperException, InterruptedException {
*/
public boolean remove(String trackingId) throws KeeperException, InterruptedException {
try {
zookeeper.delete(dir + "/" + PREFIX + trackingId, -1, true);
zookeeper.delete(dir + "/" + PREFIX + trackingId, -1);
} catch (KeeperException.NoNodeException e) {
return false;
}
Expand All @@ -105,15 +103,15 @@ public boolean remove(String trackingId) throws KeeperException, InterruptedExce

/**
 * Helper method to clear all child nodes for a parent node: lists the children of {@code dir} and
 * deletes each one in turn, passing {@code -1} as the version argument.
 *
 * <p>The listing and the deletes are separate calls, so nothing visible here makes the operation
 * atomic.
 *
 * <p>NOTE(review): diff hunk without +/- markers; each doubled line below appears to be the
 * removed and added version of one statement.
 *
 * @throws KeeperException propagated from {@code getChildren} or {@code delete}
 * @throws InterruptedException propagated from {@code getChildren} or {@code delete}
 */
public void clear() throws KeeperException, InterruptedException {
// removed side of the diff: trailing boolean argument present
List<String> childNames = zookeeper.getChildren(dir, null, true);
// added side of the diff: trailing boolean argument dropped
List<String> childNames = zookeeper.getChildren(dir, null);
for (String childName : childNames) {
// removed side of the diff
zookeeper.delete(dir + "/" + childName, -1, true);
// added side of the diff
zookeeper.delete(dir + "/" + childName, -1);
}
}

/** Returns the keys of all the elements in the map */
public Collection<String> keys() throws KeeperException, InterruptedException {
List<String> childs = zookeeper.getChildren(dir, null, true);
List<String> childs = zookeeper.getChildren(dir, null);
final List<String> ids = new ArrayList<>(childs.size());
childs.stream().forEach((child) -> ids.add(child.substring(PREFIX.length())));
return ids;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void cancelElection() throws InterruptedException, KeeperException {
if (leaderSeqPath != null) {
try {
log.debug("Canceling election {}", leaderSeqPath);
zkClient.delete(leaderSeqPath, -1, true);
zkClient.delete(leaderSeqPath, -1);
} catch (NoNodeException e) {
// fine
log.debug("cancelElection did not find election node to remove {}", leaderSeqPath);
Expand Down
37 changes: 16 additions & 21 deletions solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -62,8 +61,6 @@ public class LeaderElector {

protected SolrZkClient zkClient;

private ZkCmdExecutor zkCmdExecutor;

private volatile ElectionContext context;

private ElectionWatcher watcher;
Expand All @@ -73,13 +70,11 @@ public class LeaderElector {

/**
 * Creates an elector that uses the given client; {@code electionContexts} and {@code contextKey}
 * are not assigned by this constructor.
 *
 * <p>NOTE(review): diff hunk without +/- markers. The {@code zkCmdExecutor} assignment appears to
 * be a removed line, since the field declaration and the {@code ZkCmdExecutor} import are also
 * dropped in this change.
 *
 * @param zkClient client stored in the {@code zkClient} field
 */
public LeaderElector(SolrZkClient zkClient) {
this.zkClient = zkClient;
// removed side of the diff (no added counterpart)
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
}

/**
 * Creates an elector that uses the given client and records the supplied context key and
 * election-context map in the corresponding fields.
 *
 * <p>NOTE(review): diff hunk without +/- markers. The {@code zkCmdExecutor} assignment appears to
 * be a removed line, since the field declaration and the {@code ZkCmdExecutor} import are also
 * dropped in this change.
 *
 * @param zkClient client stored in the {@code zkClient} field
 * @param key stored in {@code contextKey}
 * @param electionContexts stored in {@code electionContexts}; the map is kept by reference, not
 *     copied
 */
public LeaderElector(
SolrZkClient zkClient, ContextKey key, Map<ContextKey, ElectionContext> electionContexts) {
this.zkClient = zkClient;
// removed side of the diff (no added counterpart)
zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
this.electionContexts = electionContexts;
this.contextKey = key;
}
Expand All @@ -100,7 +95,7 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement
context.checkIfIamLeaderFired();
// get all other numbers...
final String holdElectionPath = context.electionPath + ELECTION_NODE;
List<String> seqs = zkClient.getChildren(holdElectionPath, null, true);
List<String> seqs = zkClient.getChildren(holdElectionPath, null);
sortSeqs(seqs);

String leaderSeqNodeName =
Expand All @@ -112,15 +107,15 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement

// If any double-registrations exist for me, remove all but this latest one!
// TODO: can we even get into this state?
String prefix = zkClient.getSolrZooKeeper().getSessionId() + "-" + context.id + "-";
String prefix = zkClient.getZkSessionId() + "-" + context.id + "-";
Iterator<String> it = seqs.iterator();
while (it.hasNext()) {
String elec = it.next();
if (!elec.equals(leaderSeqNodeName) && elec.startsWith(prefix)) {
try {
String toDelete = holdElectionPath + "/" + elec;
log.warn("Deleting duplicate registration: {}", toDelete);
zkClient.delete(toDelete, -1, true);
zkClient.delete(toDelete, -1);
} catch (KeeperException.NoNodeException e) {
// ignore
}
Expand Down Expand Up @@ -154,8 +149,8 @@ private void checkIfIamLeader(final ElectionContext context, boolean replacement
watcher =
new ElectionWatcher(
context.leaderSeqPath, watchedNode, getSeq(context.leaderSeqPath), context),
null,
true);
null
);
log.debug("Watching path {} to know if I could be the leader", watchedNode);
} catch (KeeperException.SessionExpiredException e) {
throw e;
Expand Down Expand Up @@ -232,7 +227,7 @@ public int joinElection(ElectionContext context, boolean replacement, boolean jo

final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;

long sessionId = zkClient.getSolrZooKeeper().getSessionId();
long sessionId = zkClient.getZkSessionId();
String id = sessionId + "-" + context.id;
String leaderSeqPath = null;
boolean cont = true;
Expand All @@ -248,8 +243,8 @@ public int joinElection(ElectionContext context, boolean replacement, boolean jo
zkClient.create(
shardsElectZkPath + "/" + id + "-n_",
null,
CreateMode.EPHEMERAL_SEQUENTIAL,
false);
CreateMode.EPHEMERAL_SEQUENTIAL
);
} else {
String firstInLine = nodes.get(1);
log.debug("The current head: {}", firstInLine);
Expand All @@ -258,23 +253,23 @@ public int joinElection(ElectionContext context, boolean replacement, boolean jo
throw new IllegalStateException("Could not find regex match in:" + firstInLine);
}
leaderSeqPath = shardsElectZkPath + "/" + id + "-n_" + m.group(1);
zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL);
}
} else {
leaderSeqPath =
zkClient.create(
shardsElectZkPath + "/" + id + "-n_",
null,
CreateMode.EPHEMERAL_SEQUENTIAL,
false);
CreateMode.EPHEMERAL_SEQUENTIAL
);
}

log.debug("Joined leadership election with path: {}", leaderSeqPath);
context.leaderSeqPath = leaderSeqPath;
cont = false;
} catch (ConnectionLossException e) {
// we don't know if we made our node or not...
List<String> entries = zkClient.getChildren(shardsElectZkPath, null, true);
List<String> entries = zkClient.getChildren(shardsElectZkPath, null);

boolean foundId = false;
for (String entry : entries) {
Expand Down Expand Up @@ -342,7 +337,7 @@ public void process(WatchedEvent event) {
if (canceled) {
log.debug("This watcher is not active anymore {}", myNode);
try {
zkClient.delete(myNode, -1, true);
zkClient.delete(myNode, -1);
} catch (KeeperException.NoNodeException nne) {
// expected . don't do anything
} catch (Exception e) {
Expand All @@ -353,7 +348,7 @@ public void process(WatchedEvent event) {
try {
// am I the next leader?
checkIfIamLeader(context, true);
} catch (AlreadyClosedException e) {
} catch (IllegalStateException e) {

} catch (Exception e) {
if (!zkClient.isClosed()) {
Expand All @@ -367,10 +362,10 @@ public void process(WatchedEvent event) {
public void setup(final ElectionContext context) throws InterruptedException, KeeperException {
String electZKPath = context.electionPath + LeaderElector.ELECTION_NODE;
if (context instanceof OverseerElectionContext) {
zkCmdExecutor.ensureExists(electZKPath, zkClient);
zkClient.ensureExists(electZKPath, null, CreateMode.PERSISTENT);
} else {
// we use 2 param so that replica won't create /collection/{collection} if it doesn't exist
zkCmdExecutor.ensureExists(electZKPath, (byte[]) null, CreateMode.PERSISTENT, zkClient, 2);
zkClient.ensureExists(electZKPath, null, CreateMode.PERSISTENT, 2);
}

this.context = context;
Expand Down
Loading