diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index ef9091ef4b4..7ffba364328 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -91,6 +91,8 @@ public class Constants { public static final String ZTABLE_LOCKS = "/table_locks"; public static final String ZMINI_LOCK = "/mini"; + public static final String ZADMIN_LOCK = "/admin/lock"; + public static final String ZTEST_LOCK = "/test/lock"; public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index ff5e45d3103..69631ecf4ee 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -85,10 +85,6 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { // Keeps track of the number of concurrent callers to waitForStatusChange() private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0); - public AbstractFateStore() { - this(createDummyLockID(), null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); - } - public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLockHeld) { this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @@ -98,9 +94,7 @@ public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLock this.maxDeferred = maxDeferred; this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); this.deferred = Collections.synchronizedMap(new HashMap<>()); - this.lockID = Objects.requireNonNull(lockID); - // If the store is used for a Fate which runs a dead reservation cleaner, - // this should be non-null, otherwise null is fine + this.lockID = lockID; this.isLockHeld = isLockHeld; } @@ -275,6 +269,10 @@ protected void verifyFateKey(FateId fateId, Optional fateKeySeen, "Collision detected for fate id " + fateId); } + protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) { + Preconditions.checkState(lockID != null, "Tried to reserve " + fateId + " with null lockID"); + } + protected abstract Stream getTransactions(EnumSet statuses); protected abstract TStatus _getStatus(FateId fateId); @@ -428,14 +426,4 @@ protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { throw new IllegalStateException("Bad node data " + txInfo); } } - - /** - * this is a temporary method used to create a dummy lock when using a FateStore outside the - * context of a Manager (one example is testing) so reservations can still be made. - * - * @return a dummy {@link ZooUtil.LockID} - */ - public static ZooUtil.LockID createDummyLockID() { - return new ZooUtil.LockID("/path", "node", 123); - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 97c47b39ca6..1e9689f9801 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -44,14 +45,12 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.util.Retry; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - /** * A utility to administer FATE operations */ @@ -204,18 +203,19 @@ public Map> getDanglingWaitingLocks() { * instance type. This method does not process lock information, if lock information is desired, * use {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet, EnumSet)} * - * @param fateStores read-only fate stores + * @param readOnlyFateStores read-only fate stores * @param fateIdFilter filter results to include only provided fate transaction ids * @param statusFilter filter results to include only provided status types * @param typesFilter filter results to include only provided fate instance types * @return list of FATE transactions that match filter criteria */ public List getTransactionStatus( - Map> fateStores, Set fateIdFilter, + Map> readOnlyFateStores, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) { - FateStatus status = getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, - Collections.>emptyMap(), Collections.>emptyMap()); + FateStatus status = getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter, + typesFilter, Collections.>emptyMap(), + Collections.>emptyMap()); return status.getTransactions(); } @@ -224,7 +224,7 @@ public List getTransactionStatus( * Get the FATE transaction status and lock information stored in zookeeper, optionally filtered * by fate id, status, and fate instance type * - * @param mfs read-only MetaFateStore + * @param readOnlyMFS read-only MetaFateStore * @param zk zookeeper reader. * @param lockPath the zookeeper path for locks * @param fateIdFilter filter results to include only provided fate transaction ids @@ -234,36 +234,37 @@ public List getTransactionStatus( * @throws KeeperException if zookeeper exception occurs * @throws InterruptedException if process is interrupted. */ - public FateStatus getStatus(ReadOnlyFateStore mfs, ZooReader zk, ServiceLockPath lockPath, - Set fateIdFilter, EnumSet statusFilter, + public FateStatus getStatus(ReadOnlyFateStore readOnlyMFS, ZooReader zk, + ServiceLockPath lockPath, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); Map> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(Map.of(FateInstanceType.META, mfs), fateIdFilter, statusFilter, - typesFilter, heldLocks, waitingLocks); + return getTransactionStatus(Map.of(FateInstanceType.META, readOnlyMFS), fateIdFilter, + statusFilter, typesFilter, heldLocks, waitingLocks); } - public FateStatus getStatus(ReadOnlyFateStore ufs, Set fateIdFilter, + public FateStatus getStatus(ReadOnlyFateStore readOnlyUFS, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { - return getTransactionStatus(Map.of(FateInstanceType.USER, ufs), fateIdFilter, statusFilter, - typesFilter, new HashMap<>(), new HashMap<>()); + return getTransactionStatus(Map.of(FateInstanceType.USER, readOnlyUFS), fateIdFilter, + statusFilter, typesFilter, new HashMap<>(), new HashMap<>()); } - public FateStatus getStatus(Map> fateStores, ZooReader zk, - ServiceLockPath lockPath, Set fateIdFilter, EnumSet statusFilter, - EnumSet typesFilter) throws KeeperException, InterruptedException { + public FateStatus getStatus(Map> readOnlyFateStores, + ZooReader zk, ServiceLockPath lockPath, Set fateIdFilter, + EnumSet statusFilter, EnumSet typesFilter) + throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); Map> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, heldLocks, - waitingLocks); + return getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter, typesFilter, + heldLocks, waitingLocks); } /** @@ -340,7 +341,7 @@ private void findLocks(ZooReader zk, final ServiceLockPath lockPath, /** * Returns fate status, possibly filtered * - * @param fateStores read-only access to populated transaction stores. + * @param readOnlyFateStores read-only access to populated transaction stores. * @param fateIdFilter Optional. List of transactions to filter results - if null, all * transactions are returned * @param statusFilter Optional. List of status types to filter results - if null, all @@ -352,12 +353,12 @@ private void findLocks(ZooReader zk, final ServiceLockPath lockPath, * @return current fate and lock status */ public static FateStatus getTransactionStatus( - Map> fateStores, Set fateIdFilter, + Map> readOnlyFateStores, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter, Map> heldLocks, Map> waitingLocks) { final List statuses = new ArrayList<>(); - fateStores.forEach((type, store) -> { + readOnlyFateStores.forEach((type, store) -> { try (Stream fateIds = store.list().map(FateIdStatus::getFateId)) { fateIds.forEach(fateId -> { @@ -410,17 +411,17 @@ private static boolean includeByInstanceType(FateInstanceType type, return typesFilter == null || typesFilter.isEmpty() || typesFilter.contains(type); } - public void printAll(Map> fateStores, ZooReader zk, + public void printAll(Map> readOnlyFateStores, ZooReader zk, ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException { - print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); + print(readOnlyFateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); } - public void print(Map> fateStores, ZooReader zk, + public void print(Map> readOnlyFateStores, ZooReader zk, ServiceLockPath tableLocksPath, Formatter fmt, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { FateStatus fateStatus = - getStatus(fateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter); + getStatus(readOnlyFateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter); for (TransactionStatus txStatus : fateStatus.getTransactions()) { fmt.format( @@ -443,11 +444,7 @@ public void print(Map> fateStores, ZooRead } } - public boolean prepDelete(Map> stores, ZooReaderWriter zk, - ServiceLockPath path, String fateIdStr) { - if (!checkGlobalLock(zk, path)) { - return false; - } + public boolean prepDelete(Map> stores, String fateIdStr) { FateId fateId; try { @@ -461,36 +458,37 @@ public boolean prepDelete(Map> stores, ZooReaderWr // determine which store to use FateStore store = stores.get(fateId.getType()); - FateTxStore txStore = store.reserve(fateId); - try { - TStatus ts = txStore.getStatus(); - switch (ts) { - case UNKNOWN: - System.out.println("Invalid transaction ID: " + fateId); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - case FAILED: - case FAILED_IN_PROGRESS: - case SUCCESSFUL: - System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); - txStore.delete(); - state = true; - break; + Optional> opTxStore = tryReserve(store, fateId, "delete"); + if (opTxStore.isPresent()) { + var txStore = opTxStore.orElseThrow(); + + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid transaction ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + case FAILED: + case FAILED_IN_PROGRESS: + case SUCCESSFUL: + System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); + txStore.delete(); + state = true; + break; + } + } finally { + txStore.unreserve(Duration.ZERO); } - } finally { - txStore.unreserve(Duration.ZERO); } + return state; } - public boolean prepFail(Map> stores, ZooReaderWriter zk, - ServiceLockPath zLockManagerPath, String fateIdStr) { - if (!checkGlobalLock(zk, zLockManagerPath)) { - return false; - } + public boolean prepFail(Map> stores, String fateIdStr) { FateId fateId; try { @@ -504,39 +502,75 @@ public boolean prepFail(Map> stores, ZooReaderWrit // determine which store to use FateStore store = stores.get(fateId.getType()); - FateTxStore txStore = store.reserve(fateId); - try { - TStatus ts = txStore.getStatus(); - switch (ts) { - case UNKNOWN: - System.out.println("Invalid fate ID: " + fateId); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); - txStore.setStatus(TStatus.FAILED_IN_PROGRESS); - state = true; - break; - - case SUCCESSFUL: - System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); - break; - - case FAILED: - case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); - state = true; - break; + Optional> opTxStore = tryReserve(store, fateId, "fail"); + if (opTxStore.isPresent()) { + var txStore = opTxStore.orElseThrow(); + + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid fate ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); + state = true; + break; + + case SUCCESSFUL: + System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); + break; + + case FAILED: + case FAILED_IN_PROGRESS: + System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); + state = true; + break; + } + } finally { + txStore.unreserve(Duration.ZERO); } - } finally { - txStore.unreserve(Duration.ZERO); } return state; } + /** + * Try to reserve the transaction for a minute. If it could not be reserved, return an empty + * optional + */ + private Optional> tryReserve(FateStore store, FateId fateId, String op) { + var retry = Retry.builder().maxRetriesWithinDuration(Duration.ofMinutes(1)) + .retryAfter(Duration.ofMillis(25)).incrementBy(Duration.ofMillis(25)) + .maxWait(Duration.ofSeconds(15)).backOffFactor(1.5).logInterval(Duration.ofSeconds(15)) + .createRetry(); + + Optional> reserveAttempt = store.tryReserve(fateId); + while (reserveAttempt.isEmpty() && retry.canRetry()) { + retry.useRetry(); + try { + retry.waitForNextAttempt(log, "Attempting to reserve " + fateId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalArgumentException(e); + } + reserveAttempt = store.tryReserve(fateId); + } + if (reserveAttempt.isPresent()) { + retry.logCompletion(log, "Attempting to reserve " + fateId); + } else { + log.error("Could not {} {} in a reasonable time. This indicates the Manager is currently " + + "working on {}. If {} {} is still desired, the Manager needs to be stopped and " + + "the command needs to be rerun.", op, fateId, fateId, op, fateId); + } + + return reserveAttempt; + } + public void deleteLocks(ZooReaderWriter zk, ServiceLockPath path, String fateIdStr) throws KeeperException, InterruptedException { // delete any locks assoc w/ fate operation @@ -555,35 +589,4 @@ public void deleteLocks(ZooReaderWriter zk, ServiceLockPath path, String fateIdS } } } - - @SuppressFBWarnings(value = "DM_EXIT", - justification = "TODO - should probably avoid System.exit here; " - + "this code is used by the fate admin shell command") - public boolean checkGlobalLock(ZooReaderWriter zk, ServiceLockPath zLockManagerPath) { - try { - if (ServiceLock.getLockData(zk.getZooKeeper(), zLockManagerPath).isPresent()) { - System.err.println("ERROR: Manager lock is held, not running"); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } - } catch (KeeperException e) { - System.err.println("ERROR: Could not read manager lock, not running " + e.getMessage()); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } catch (InterruptedException e) { - System.err.println("ERROR: Could not read manager lock, not running" + e.getMessage()); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } - return true; - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 7446d1fafe3..6b5e84175f3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; @@ -52,7 +53,6 @@ import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.UtilWaitThread; @@ -75,6 +75,17 @@ public class UserFateStore extends AbstractFateStore { private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, maxRepos); + /** + * Constructs a UserFateStore + * + * @param context the {@link ClientContext} + * @param tableName the name of the table which will store the Fate data + * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be + * null if this store will be used as read-only (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, Predicate isLockHeld) { this(context, tableName, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); @@ -88,11 +99,6 @@ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID loc this.tableName = Objects.requireNonNull(tableName); } - public UserFateStore(ClientContext context, ZooUtil.LockID lockID, - Predicate isLockHeld) { - this(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld); - } - @Override public FateId create() { @@ -128,8 +134,9 @@ public FateId getFateId() { @Override public Optional> createAndReserve(FateKey fateKey) { - final var reservation = FateReservation.from(lockID, UUID.randomUUID()); final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + verifyLock(lockID, fateId); + final var reservation = FateReservation.from(lockID, UUID.randomUUID()); Optional> txStore = Optional.empty(); int maxAttempts = 5; FateMutator.Status status = null; @@ -225,6 +232,7 @@ public Optional> createAndReserve(FateKey fateKey) { @Override public Optional> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); // Create a unique FateReservation for this reservation attempt FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index d19db170048..4be9fad9062 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -75,6 +75,17 @@ private String getTXPath(FateId fateId) { return path + "/tx_" + fateId.getTxUUIDStr(); } + /** + * Constructs a MetaFateStore + * + * @param path the path in ZK where the fate data will reside + * @param zk the {@link ZooReaderWriter} + * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be + * null if this store will be used as read-only (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID, Predicate isLockHeld) throws KeeperException, InterruptedException { this(path, zk, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); @@ -91,11 +102,6 @@ public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID, zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } - /** - * For testing only - */ - MetaFateStore() {} - @Override public FateId create() { while (true) { @@ -114,8 +120,9 @@ public FateId create() { @Override public Optional> createAndReserve(FateKey fateKey) { - final var reservation = FateReservation.from(lockID, UUID.randomUUID()); final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + verifyLock(lockID, fateId); + final var reservation = FateReservation.from(lockID, UUID.randomUUID()); try { byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId), @@ -163,6 +170,7 @@ public Optional> createAndReserve(FateKey fateKey) { @Override public Optional> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); // uniquely identify this attempt to reserve the fate operation data FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 263f17e440c..6ef7c31cf92 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -91,7 +91,7 @@ public String serialize(String root) { @Override public String toString() { - return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); + return "path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index fd3d765a3b7..5989ee392d3 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -64,7 +64,8 @@ private ServiceLockPath(String root, String type) { this.type = Objects.requireNonNull(type); Preconditions.checkArgument(this.type.equals(Constants.ZGC_LOCK) || this.type.equals(Constants.ZMANAGER_LOCK) || this.type.equals(Constants.ZMONITOR_LOCK) - || this.type.equals(Constants.ZTABLE_LOCKS), "Unsupported type: " + type); + || this.type.equals(Constants.ZTABLE_LOCKS) || this.type.equals(Constants.ZADMIN_LOCK) + || this.type.equals(Constants.ZTEST_LOCK), "Unsupported type: " + type); // These server types support only one active instance, so they use a lock at // a known path, not the server's address. this.resourceGroup = null; @@ -189,6 +190,10 @@ private static String determineServerType(final String path) { return Constants.ZMONITOR_LOCK; } else if (path.contains(Constants.ZMINI_LOCK)) { return Constants.ZMINI_LOCK; + } else if (path.contains(Constants.ZADMIN_LOCK)) { + return Constants.ZADMIN_LOCK; + } else if (path.contains(Constants.ZTEST_LOCK)) { + return Constants.ZTEST_LOCK; } else if (path.contains(Constants.ZCOMPACTORS)) { return Constants.ZCOMPACTORS; } else if (path.contains(Constants.ZSSERVERS)) { @@ -216,6 +221,8 @@ public static ServiceLockPath parse(Optional serverType, String path) { case Constants.ZGC_LOCK: case Constants.ZMANAGER_LOCK: case Constants.ZMONITOR_LOCK: + case Constants.ZADMIN_LOCK: + case Constants.ZTEST_LOCK: return new ServiceLockPath(path.substring(0, path.indexOf(type)), type); default: { final String[] pathParts = path.replaceFirst("/", "").split("/"); @@ -285,6 +292,14 @@ public ServiceLockPath createDeadTabletServerPath(String resourceGroup, serverAddress.toString()); } + public ServiceLockPath createAdminLockPath() { + return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZADMIN_LOCK); + } + + public ServiceLockPath createTestLockPath() { + return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTEST_LOCK); + } + public Set getCompactor(ResourceGroupPredicate resourceGroupPredicate, AddressSelector address, boolean withLock) { return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index e8a55081c31..370555ba1f2 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -682,7 +682,7 @@ public void unableToMonitorLockNode(Exception e) { @Override public void acquiredLock() { - log.warn("Acquired ZK lock for MiniAccumuloClusterImpl"); + log.debug("Acquired ZK lock for MiniAccumuloClusterImpl"); lockAcquired.set(true); lockWatcherInvoked.countDown(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index ace5a00dbad..25de7a8f422 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import java.io.BufferedWriter; import java.io.File; @@ -43,6 +42,8 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -77,7 +78,9 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.FateService; @@ -108,7 +111,9 @@ import org.apache.accumulo.server.util.checkCommand.UserFilesCheckRunner; import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,6 +134,7 @@ @AutoService(KeywordExecutable.class) public class Admin implements KeywordExecutable { private static final Logger log = LoggerFactory.getLogger(Admin.class); + final CountDownLatch lockAcquiredLatch = new CountDownLatch(1); static class AdminOpts extends ServerUtilOpts { @Parameter(names = {"-f", "--force"}, @@ -340,11 +346,11 @@ static class FateOpsCommand { boolean cancel; @Parameter(names = {"-f", "--fail"}, - description = "... Transition FaTE transaction status to FAILED_IN_PROGRESS (requires Manager to be down)") + description = "... Transition FaTE transaction status to FAILED_IN_PROGRESS") boolean fail; @Parameter(names = {"-d", "--delete"}, - description = "... Delete locks associated with transactions (Requires Manager to be down)") + description = "... Delete locks associated with transactions") boolean delete; @Parameter(names = {"-p", "--print", "-print", "-l", "--list", "-list"}, @@ -368,6 +374,29 @@ static class FateOpsCommand { List instanceTypes = new ArrayList<>(); } + class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher { + @Override + public void lostLock(ServiceLock.LockLossReason reason) { + log.warn("Lost lock: " + reason.toString()); + } + + @Override + public void unableToMonitorLockNode(Exception e) { + log.warn("Unable to monitor lock: " + e.getMessage()); + } + + @Override + public void acquiredLock() { + lockAcquiredLatch.countDown(); + log.debug("Acquired ZooKeeper lock for Admin"); + } + + @Override + public void failedToAcquireLock(Exception e) { + log.warn("Failed to acquire ZooKeeper lock for Admin, msg: " + e.getMessage()); + } + } + public static void main(String[] args) { new Admin().execute(args); } @@ -910,50 +939,111 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps AdminUtil admin = new AdminUtil<>(true); final String zkRoot = context.getZooKeeperRoot(); - var zLockManagerPath = context.getServerPaths().createManagerPath(); var zTableLocksPath = context.getServerPaths().createTableLocksPath(); String fateZkPath = zkRoot + Constants.ZFATE; ZooReaderWriter zk = context.getZooReaderWriter(); - MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); - Map> readOnlyFateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); - - if (fateOpsCommand.cancel) { - cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); - } else if (fateOpsCommand.fail) { - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) { - throw new AccumuloException("Could not fail transaction: " + fateIdStr); + ServiceLock adminLock = null; + Map> fateStores; + Map> readOnlyFateStores = null; + + try { + if (fateOpsCommand.cancel) { + cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); + } else if (fateOpsCommand.fail) { + adminLock = createAdminLock(context); + fateStores = createFateStores(context, zk, fateZkPath, adminLock); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepFail(fateStores, fateIdStr)) { + throw new AccumuloException("Could not fail transaction: " + fateIdStr); + } } + } else if (fateOpsCommand.delete) { + adminLock = createAdminLock(context); + fateStores = createFateStores(context, zk, fateZkPath, adminLock); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepDelete(fateStores, fateIdStr)) { + throw new AccumuloException("Could not delete transaction: " + fateIdStr); + } + admin.deleteLocks(zk, zTableLocksPath, fateIdStr); + } + } + + if (fateOpsCommand.print) { + final Set fateIdFilter = new TreeSet<>(); + fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); + EnumSet statusFilter = + getCmdLineStatusFilters(fateOpsCommand.states); + EnumSet typesFilter = + getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); + readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath); + admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), + fateIdFilter, statusFilter, typesFilter); + // print line break at the end + System.out.println(); } - } else if (fateOpsCommand.delete) { - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) { - throw new AccumuloException("Could not delete transaction: " + fateIdStr); + + if (fateOpsCommand.summarize) { + if (readOnlyFateStores == null) { + readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath); } - admin.deleteLocks(zk, zTableLocksPath, fateIdStr); + summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); + } + } finally { + if (adminLock != null) { + adminLock.unlock(); } } + } - if (fateOpsCommand.print) { - final Set fateIdFilter = new TreeSet<>(); - fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); - EnumSet statusFilter = - getCmdLineStatusFilters(fateOpsCommand.states); - EnumSet typesFilter = - getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); - admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), fateIdFilter, - statusFilter, typesFilter); - // print line break at the end - System.out.println(); - } + private Map> createFateStores(ServerContext context, + ZooReaderWriter zk, String fateZkPath, ServiceLock adminLock) + throws InterruptedException, KeeperException { + var lockId = adminLock.getLockID(); + MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, lockId, null); + UserFateStore ufs = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId, null); + return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + } - if (fateOpsCommand.summarize) { - summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); + private Map> + createReadOnlyFateStores(ServerContext context, ZooReaderWriter zk, String fateZkPath) + throws InterruptedException, KeeperException { + MetaFateStore readOnlyMFS = new MetaFateStore<>(fateZkPath, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); + } + + private ServiceLock createAdminLock(ServerContext context) throws InterruptedException { + var zk = context.getZooReaderWriter().getZooKeeper(); + UUID uuid = UUID.randomUUID(); + ServiceLockPath slp = context.getServerPaths().createAdminLockPath(); + ServiceLock adminLock = new ServiceLock(context.getZooReaderWriter().getZooKeeper(), slp, uuid); + AdminLockWatcher lw = new AdminLockWatcher(); + ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors(); + descriptors.addService(new ServiceLockData.ServiceDescriptor(uuid, + ServiceLockData.ThriftService.NONE, "localhost", Constants.DEFAULT_RESOURCE_GROUP_NAME)); + ServiceLockData sld = new ServiceLockData(descriptors); + String lockPath = slp.toString(); + String parentLockPath = lockPath.substring(0, lockPath.indexOf("/lock")); + + try { + if (zk.exists(parentLockPath, false) == null) { + zk.create(parentLockPath, new byte[0], ZooUtil.PUBLIC, CreateMode.PERSISTENT); + log.info("Created: {}", parentLockPath); + } + if (zk.exists(lockPath, false) == null) { + zk.create(lockPath, new byte[0], ZooUtil.PUBLIC, CreateMode.PERSISTENT); + log.info("Created: {}", lockPath); + } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); } + + adminLock.lock(lw, sld); + lockAcquiredLatch.await(); + + return adminLock; } private void validateFateUserInput(FateOpsCommand cmd) { @@ -1108,15 +1198,16 @@ private static long printDanglingFateOperations(ServerContext context, } }; - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter(), createDummyLockID(), null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + MetaFateStore readOnlyMFS = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, + context.getZooReaderWriter(), null, null); LoadingCache fateStatusCache = Caffeine.newBuilder() .maximumSize(100_000).expireAfterWrite(10, TimeUnit.SECONDS).build(fateId -> { if (fateId.getType() == FateInstanceType.META) { - return mfs.read(fateId).getStatus(); + return readOnlyMFS.read(fateId).getStatus(); } else { - return ufs.read(fateId).getStatus(); + return readOnlyUFS.read(fateId).getStatus(); } }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 200b3ff2b48..23cc841e9ce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -53,7 +53,7 @@ public abstract class FateMetrics implements Metrics private static final String OP_TYPE_TAG = "op.type"; protected final ServerContext context; - protected final ReadOnlyFateStore> fateStore; + protected final ReadOnlyFateStore> readOnlyFateStore; protected final long refreshDelay; protected final AtomicLong totalCurrentOpsCount = new AtomicLong(0); @@ -62,14 +62,14 @@ public abstract class FateMetrics implements Metrics public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { this.context = context; this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); - this.fateStore = Objects.requireNonNull(buildStore(context)); + this.readOnlyFateStore = Objects.requireNonNull(buildReadOnlyStore(context)); for (TStatus status : TStatus.values()) { txStatusCounters.put(status, new AtomicLong(0)); } } - protected abstract ReadOnlyFateStore> buildStore(ServerContext context); + protected abstract ReadOnlyFateStore> buildReadOnlyStore(ServerContext context); protected abstract T getMetricValues(); @@ -95,7 +95,7 @@ protected void update(T metricValues) { @Override public void registerMetrics(final MeterRegistry registry) { - String type = fateStore.type().name().toLowerCase(); + String type = readOnlyFateStore.type().name().toLowerCase(); Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get) .description(FATE_OPS.getDescription()).register(registry); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java index d26cf259d9b..71033331a03 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; @@ -62,10 +61,11 @@ public void registerMetrics(MeterRegistry registry) { } @Override - protected ReadOnlyFateStore> buildStore(ServerContext context) { + protected ReadOnlyFateStore> + buildReadOnlyStore(ServerContext context) { try { - return new MetaFateStore<>(getFateRootPath(context), context.getZooReaderWriter(), - AbstractFateStore.createDummyLockID(), null); + return new MetaFateStore<>(getFateRootPath(context), context.getZooReaderWriter(), null, + null); } catch (KeeperException ex) { throw new IllegalStateException( "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); @@ -78,7 +78,7 @@ protected ReadOnlyFateStore> buildStore(Server @Override protected MetaFateMetricValues getMetricValues() { - return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, fateStore); + return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, readOnlyFateStore); } private static String getFateRootPath(ServerContext context) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java index 92ac8568810..e4fad01899c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java @@ -18,9 +18,9 @@ */ package org.apache.accumulo.manager.metrics.fate.user; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.manager.metrics.fate.FateMetrics; import org.apache.accumulo.server.ServerContext; @@ -31,12 +31,13 @@ public UserFateMetrics(ServerContext context, long minimumRefreshDelay) { } @Override - protected ReadOnlyFateStore> buildStore(ServerContext context) { - return new UserFateStore<>(context, AbstractFateStore.createDummyLockID(), null); + protected ReadOnlyFateStore> + buildReadOnlyStore(ServerContext context) { + return new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); } @Override protected UserFateMetricValues getMetricValues() { - return UserFateMetricValues.getUserStoreMetrics(fateStore); + return UserFateMetricValues.getUserStoreMetrics(readOnlyFateStore); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index e8955e465ab..425e28ece6c 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.compaction; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; @@ -85,6 +84,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; @@ -99,17 +99,20 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.FindCompactionTmpFiles; +import org.apache.accumulo.test.fate.TestLock; import org.apache.accumulo.test.functional.CompactionIT.ErrorThrowingSelector; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import com.google.common.base.Preconditions; public class ExternalCompaction_1_IT extends SharedMiniClusterBase { + static ServiceLock testLock; public static class ExternalCompaction1Config implements MiniClusterConfigurationCallback { @Override @@ -121,6 +124,14 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS @BeforeAll public static void beforeTests() throws Exception { startMiniClusterWithConfig(new ExternalCompaction1Config()); + testLock = new TestLock().createTestLock(getCluster().getServerContext()); + } + + @AfterAll + public static void afterTests() throws Exception { + if (testLock != null) { + testLock.unlock(); + } } public static class TestFilter extends Filter { @@ -234,7 +245,7 @@ public void testExternalCompaction() throws Exception { public void testCompactionCommitAndDeadDetectionRoot() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooReaderWriter(), createDummyLockID(), null); + ctx.getZooReaderWriter(), testLock.getLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); @@ -253,7 +264,7 @@ public void testCompactionCommitAndDeadDetectionRoot() throws Exception { public void testCompactionCommitAndDeadDetectionMeta() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooReaderWriter(), createDummyLockID(), null); + ctx.getZooReaderWriter(), testLock.getLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { // Metadata table by default already has 2 tablets @@ -275,7 +286,8 @@ public void testCompactionCommitAndDeadDetectionUser() throws Exception { final String tableName = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); + UserFateStore userFateStore = + new UserFateStore<>(ctx, AccumuloTable.FATE.tableName(), testLock.getLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); @@ -298,10 +310,11 @@ public void testCompactionCommitAndDeadDetectionAll() throws Exception { final String userTable = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); + UserFateStore userFateStore = + new UserFateStore<>(ctx, AccumuloTable.FATE.tableName(), testLock.getLockID(), null); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter(), - createDummyLockID(), null); + testLock.getLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index 71b198c0ac9..f15fb6caaa1 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -27,7 +27,8 @@ import org.apache.accumulo.core.fate.Repo; /** - * A FATE which performs the dead reservation cleanup with a much shorter delay between + * A FATE which performs the dead reservation cleanup with a much shorter delay between. Useful for + * shortening test times for tests that are waiting for a cleanup to occur. */ public class FastFate extends Fate { @@ -38,6 +39,6 @@ public FastFate(T environment, FateStore store, boolean runDeadResCleaner, @Override public Duration getDeadResCleanupDelay() { - return Duration.ofSeconds(15); + return Duration.ofSeconds(5); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index e72a5fb4217..697d16faff6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.fate; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; @@ -29,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.lang.reflect.Method; import java.time.Duration; import java.util.ArrayList; @@ -50,7 +50,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.AdminUtil; @@ -65,6 +65,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -72,6 +73,8 @@ import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; import org.apache.accumulo.server.util.fateCommand.FateTxnDetails; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestRepo; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.functional.ReadWriteIT; import org.apache.accumulo.test.functional.SlowIterator; @@ -82,7 +85,7 @@ import org.junit.jupiter.api.Test; public abstract class FateOpsCommandsIT extends ConfigurableMacBase - implements FateTestRunner { + implements FateTestRunner { @Override protected Duration defaultTimeout() { @@ -91,12 +94,12 @@ protected Duration defaultTimeout() { @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - // Used for tests that shutdown the manager so the sleep time after shutdown isn't too long + // Keeps sleep time low after initiating the shutdown for the manager cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); } @BeforeEach - public void shutdownCompactor() throws Exception { + public void beforeEachSetup() throws Exception { // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes // this issue. @@ -110,10 +113,10 @@ public void testFateSummaryCommand() throws Exception { executeTest(this::testFateSummaryCommand); } - protected void testFateSummaryCommand(FateStore store, ServerContext sctx) + protected void testFateSummaryCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // validate blank report, no transactions have started ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); @@ -306,10 +309,10 @@ public void testFateSummaryCommandPlainText() throws Exception { executeTest(this::testFateSummaryCommandPlainText); } - protected void testFateSummaryCommandPlainText(FateStore store, ServerContext sctx) + protected void testFateSummaryCommandPlainText(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -334,10 +337,10 @@ public void testFatePrintCommand() throws Exception { executeTest(this::testFatePrintCommand); } - protected void testFatePrintCommand(FateStore store, ServerContext sctx) + protected void testFatePrintCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // validate no transactions ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print"); @@ -436,7 +439,7 @@ public void testTransactionNameAndStep() throws Exception { executeTest(this::testTransactionNameAndStep); } - protected void testTransactionNameAndStep(FateStore store, ServerContext sctx) + protected void testTransactionNameAndStep(FateStore store, ServerContext sctx) throws Exception { // Since the other tests just use NEW transactions for simplicity, there are some fields of the // summary and print outputs which are null and not tested for (transaction name and transaction @@ -504,10 +507,10 @@ public void testFateCancelCommand() throws Exception { executeTest(this::testFateCancelCommand); } - protected void testFateCancelCommand(FateStore store, ServerContext sctx) + protected void testFateCancelCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -533,14 +536,15 @@ protected void testFateCancelCommand(FateStore store, ServerContext sct } @Test - public void testFateFailCommand() throws Exception { - executeTest(this::testFateFailCommand); + public void testFateFailCommandTimeout() throws Exception { + stopManagerAndExecuteTest(this::testFateFailCommandTimeout); } - protected void testFateFailCommand(FateStore store, ServerContext sctx) + protected void testFateFailCommandTimeout(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + LatchTestEnv env = new LatchTestEnv(); + FastFate fate = initFateWithDeadResCleaner(store, env); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -551,19 +555,50 @@ protected void testFateFailCommand(FateStore store, ServerContext sctx) assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Attempt to --fail the transaction. Should not work as the Manager is still up + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction("op1", fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to fail fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). Admin should try to reserve it for a bit, but should fail and exit ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); - assertEquals(1, p.getProcess().waitFor()); + assertNotEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Could not fail " + fateId1 + " in a reasonable time")); fateIdsFromSummary = getFateIdsFromSummary(); - assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Stop MANAGER so --fail can be called - getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); - Thread.sleep(20_000); + // Finish work and shutdown + env.workersLatch.countDown(); + fate.shutdown(10, TimeUnit.MINUTES); + } + + @Test + public void testFateFailCommandSuccess() throws Exception { + executeTest(this::testFateFailCommandSuccess); + } + + protected void testFateFailCommandSuccess(FateStore store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate fate = initFateNoDeadResCleaner(store); - // Fail the first transaction and ensure that it was failed - p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Try to fail fateId1 + // This should work since nothing has fateId1 reserved (it is NEW) + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); assertEquals(0, p.getProcess().waitFor()); String result = p.readStdOut(); @@ -578,14 +613,15 @@ protected void testFateFailCommand(FateStore store, ServerContext sctx) } @Test - public void testFateDeleteCommand() throws Exception { - executeTest(this::testFateDeleteCommand); + public void testFateDeleteCommandTimeout() throws Exception { + stopManagerAndExecuteTest(this::testFateDeleteCommandTimeout); } - protected void testFateDeleteCommand(FateStore store, ServerContext sctx) + protected void testFateDeleteCommandTimeout(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + LatchTestEnv env = new LatchTestEnv(); + FastFate fate = initFateWithDeadResCleaner(store, env); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -596,19 +632,50 @@ protected void testFateDeleteCommand(FateStore store, ServerContext sct assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Attempt to --delete the transaction. Should not work as the Manager is still up + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction("op1", fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to delete fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). Admin should try to reserve it for a bit, but should fail and exit ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); - assertEquals(1, p.getProcess().waitFor()); + assertNotEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Could not delete " + fateId1 + " in a reasonable time")); fateIdsFromSummary = getFateIdsFromSummary(); - assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Stop MANAGER so --delete can be called - getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); - Thread.sleep(20_000); + // Finish work and shutdown + env.workersLatch.countDown(); + fate.shutdown(10, TimeUnit.MINUTES); + } + + @Test + public void testFateDeleteCommandSuccess() throws Exception { + executeTest(this::testFateDeleteCommandSuccess); + } + + protected void testFateDeleteCommandSuccess(FateStore store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); - // Delete the first transaction and ensure that it was deleted - p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); + // Check that summary output lists both the transactions with a NEW status + Map fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Try to delete fateId1 + // This should work since nothing has fateId1 reserved (it is NEW) + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); assertEquals(0, p.getProcess().waitFor()); String result = p.readStdOut(); @@ -624,7 +691,7 @@ public void testFatePrintAndSummaryCommandsWithInProgressTxns() throws Exception executeTest(this::testFatePrintAndSummaryCommandsWithInProgressTxns); } - protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore store, + protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore store, ServerContext sctx) throws Exception { // This test was written for an issue with the 'admin fate --print' and 'admin fate --summary' // commands where transactions could complete mid-print causing the command to fail. These @@ -632,21 +699,22 @@ protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore mockedStore; + FateStore mockedStore; // This error was occurring in AdminUtil.getTransactionStatus(), so we will test this method. if (store.type().equals(FateInstanceType.USER)) { Method listMethod = UserFateStore.class.getMethod("list"); mockedStore = EasyMock.createMockBuilder(UserFateStore.class) - .withConstructor(ClientContext.class, ZooUtil.LockID.class, Predicate.class) - .withArgs(sctx, createDummyLockID(), null).addMockedMethod(listMethod).createMock(); + .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class, Predicate.class) + .withArgs(sctx, AccumuloTable.FATE.tableName(), null, null).addMockedMethod(listMethod) + .createMock(); } else { Method listMethod = MetaFateStore.class.getMethod("list"); mockedStore = EasyMock.createMockBuilder(MetaFateStore.class) .withConstructor(String.class, ZooReaderWriter.class, ZooUtil.LockID.class, Predicate.class) - .withArgs(sctx.getZooKeeperRoot() + Constants.ZFATE, sctx.getZooReaderWriter(), - createDummyLockID(), null) + .withArgs(sctx.getZooKeeperRoot() + Constants.ZFATE, sctx.getZooReaderWriter(), null, + null) .addMockedMethod(listMethod).createMock(); } @@ -781,11 +849,17 @@ private void validateFateDetails(Set details, int expDetailsSize } } - private Fate initializeFate(FateStore store) { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new TestEnv(), store, false, Object::toString, config); + protected FastFate initFateWithDeadResCleaner(FateStore store, + LatchTestEnv env) { + // Using FastFate so the cleanup will run often. This ensures that the cleanup will run when + // there are reservations present and that the cleanup will not unexpectedly delete these live + // reservations + return new FastFate<>(env, store, true, Object::toString, DefaultConfiguration.getInstance()); + } + + protected Fate initFateNoDeadResCleaner(FateStore store) { + return new Fate<>(new LatchTestEnv(), store, false, Object::toString, + DefaultConfiguration.getInstance()); } private boolean wordIsTStatus(String word) { @@ -796,4 +870,20 @@ private boolean wordIsTStatus(String word) { } return true; } + + /** + * Stop the MANAGER. For some of our tests, we want to be able to seed transactions with our own + * test repos. We want our fate to reserve these transactions (and not the real fates running in + * the Manager as that will lead to exceptions since the real fates wouldn't be able to handle our + * test repos). So, we essentially have the fates created here acting as the real fates: they have + * the same threads running that the real fates would, use a fate store with a ZK lock, use the + * same locations to store fate data that the Manager does, and are running in a separate process + * from the Admin process. Note that we cannot simply use different locations for our fate data + * from Manager to keep our test env separate from Manager. Admin uses the real fate data + * locations, so our test must also use the real locations. + */ + protected void stopManager() throws InterruptedException, IOException { + getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); + Thread.sleep(20_000); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java index 244f7991116..dc9b7d22fc6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java @@ -34,6 +34,11 @@ default void executeTest(FateTestExecutor testMethod) throws Exception { AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); } + default void stopManagerAndExecuteTest(FateTestExecutor testMethod) + throws Exception { + throw new UnsupportedOperationException("Not implemented"); + } + interface FateTestExecutor { void execute(FateStore store, ServerContext sctx) throws Exception; } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index edd6a538597..ede8d463add 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -443,8 +443,8 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce // Create the new Fate/start the Fate threads (the work finder and the workers). // Don't run another dead reservation cleaner since we already have one running from fate1. - FastFate fate2 = new FastFate<>(testEnv2, store2, false, Object::toString, - DefaultConfiguration.getInstance()); + Fate fate2 = + new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); // Wait for the "dead" reservations to be deleted and picked up again (reserved using // fate2/store2/lock2 now). @@ -499,7 +499,7 @@ public String getReturn() { } } - public static class SleepingTestEnv { + public static class SleepingTestEnv extends FateTestRunner.TestEnv { public final Set executedOps = Collections.synchronizedSet(new HashSet<>()); public final int sleepTimeMs; @@ -542,7 +542,7 @@ public String getReturn() { } } - public static class LatchTestEnv { + public static class LatchTestEnv extends FateTestRunner.TestEnv { public final AtomicInteger numWorkers = new AtomicInteger(0); public final CountDownLatch workersLatch = new CountDownLatch(1); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java b/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java new file mode 100644 index 00000000000..e0588b36828 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLock { + private static final Logger log = LoggerFactory.getLogger(TestLock.class); + final CountDownLatch lockAcquiredLatch; + + public TestLock() { + this.lockAcquiredLatch = new CountDownLatch(1); + } + + /** + * Used to create a dummy lock id to be passed in the creation of a {@link UserFateStore} or a + * {@link MetaFateStore}. Useful as a quicker and simpler alternative to + * {@link TestLock#createTestLock(ServerContext)} for tests where reserving transactions is needed + * AND the reservations for the test will be stored in a different location from the Managers fate + * stores. Can always use {@link TestLock#createTestLock(ServerContext)} to be safe if unsure + * which to use. + */ + public static ZooUtil.LockID createDummyLockID() { + return new ZooUtil.LockID("/path", "node", 123); + } + + /** + * Used to create a real lock (one held in ZK) to be passed in the creation of a + * {@link UserFateStore} or a {@link MetaFateStore}. Useful for tests where reserving transactions + * is needed AND the reservations for the test will be stored in the same location as the Managers + * fate stores. This is needed so the Manager will recognize and not delete these reservations. + * See similar {@link TestLock#createDummyLockID()} + */ + public ServiceLock createTestLock(ServerContext context) throws InterruptedException { + var zk = context.getZooReaderWriter().getZooKeeper(); + UUID uuid = UUID.randomUUID(); + ServiceLockPaths.ServiceLockPath slp = context.getServerPaths().createTestLockPath(); + ServiceLock lock = new ServiceLock(context.getZooReaderWriter().getZooKeeper(), slp, uuid); + TestLockWatcher lw = new TestLockWatcher(); + ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors(); + descriptors.addService(new ServiceLockData.ServiceDescriptor(uuid, + ServiceLockData.ThriftService.NONE, "localhost", Constants.DEFAULT_RESOURCE_GROUP_NAME)); + ServiceLockData sld = new ServiceLockData(descriptors); + String lockPath = slp.toString(); + String parentLockPath = lockPath.substring(0, lockPath.indexOf("/lock")); + + try { + if (zk.exists(parentLockPath, false) == null) { + zk.create(parentLockPath, new byte[0], ZooUtil.PUBLIC, CreateMode.PERSISTENT); + log.info("Created: {}", parentLockPath); + } + if (zk.exists(lockPath, false) == null) { + zk.create(lockPath, new byte[0], ZooUtil.PUBLIC, CreateMode.PERSISTENT); + log.info("Created: {}", lockPath); + } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); + } + + lock.lock(lw, sld); + lockAcquiredLatch.await(); + + return lock; + } + + class TestLockWatcher implements ServiceLock.AccumuloLockWatcher { + + @Override + public void lostLock(ServiceLock.LockLossReason reason) { + log.warn("Lost lock: " + reason.toString()); + } + + @Override + public void unableToMonitorLockNode(Exception e) { + log.warn("Unable to monitor lock: " + e.getMessage()); + } + + @Override + public void acquiredLock() { + lockAcquiredLatch.countDown(); + log.debug("Acquired ZooKeeper lock for test"); + } + + @Override + public void failedToAcquireLock(Exception e) { + log.warn("Failed to acquire ZooKeeper lock for test, msg: " + e.getMessage()); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index c5f541b5e9d..596ab46711c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java index d306e0bfefd..86e6ca7d4e2 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import java.util.UUID; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index c4c1e5b24a5..19e1be3e451 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -18,22 +18,59 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import java.util.function.Predicate; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateOpsCommandsIT; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.TestLock; public class MetaFateOpsCommandsIT extends FateOpsCommandsIT { + /** + * This should be used for tests that will not seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateNoDeadResCleaner(FateStore)} + */ @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred, + public void executeTest(FateTestExecutor testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { - ServerContext sctx = getCluster().getServerContext(); - String path = sctx.getZooKeeperRoot() + Constants.ZFATE; - ZooReaderWriter zk = sctx.getZooReaderWriter(); - testMethod.execute(new MetaFateStore<>(path, zk, createDummyLockID(), null), sctx); + ServerContext context = getCluster().getServerContext(); + String path = context.getZooKeeperRoot() + Constants.ZFATE; + ZooReaderWriter zk = context.getZooReaderWriter(); + // test should not be reserving txns or checking reservations, so null lockID and isLockHeld + testMethod.execute(new MetaFateStore<>(path, zk, null, null), context); + } + + /** + * This should be used for tests that will seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateWithDeadResCleaner(FateStore, LatchTestEnv)} + */ + @Override + public void stopManagerAndExecuteTest(FateTestExecutor testMethod) + throws Exception { + stopManager(); + ServerContext context = getCluster().getServerContext(); + String path = context.getZooKeeperRoot() + Constants.ZFATE; + ZooReaderWriter zk = context.getZooReaderWriter(); + ServiceLock testLock = null; + try { + testLock = new TestLock().createTestLock(context); + ZooUtil.LockID lockID = testLock.getLockID(); + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); + testMethod.execute(new MetaFateStore<>(path, zk, lockID, isLockHeld), context); + } finally { + if (testLock != null) { + testLock.unlock(); + } + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index af8b98db0f9..7eca386d9a6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index 7f0383e6f4c..36aaf321d23 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import java.util.stream.StreamSupport; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java index 83a87db975d..781f78e6efa 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import org.apache.accumulo.core.client.Accumulo; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java index 3fe3192e6ba..92f5fae847f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java @@ -18,18 +18,56 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import java.util.function.Predicate; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.test.fate.FateOpsCommandsIT; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.TestLock; public class UserFateOpsCommandsIT extends FateOpsCommandsIT { + /** + * This should be used for tests that will not seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateNoDeadResCleaner(FateStore)} + */ @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred, + public void executeTest(FateTestExecutor testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { - testMethod.execute( - new UserFateStore<>(getCluster().getServerContext(), createDummyLockID(), null), - getCluster().getServerContext()); + var context = getCluster().getServerContext(); + // the test should not be reserving or checking reservations, so null lockID and isLockHeld + testMethod.execute(new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null), + context); + } + + /** + * This should be used for tests that will seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateWithDeadResCleaner(FateStore, LatchTestEnv)} + */ + @Override + public void stopManagerAndExecuteTest(FateTestExecutor testMethod) + throws Exception { + stopManager(); + var context = getCluster().getServerContext(); + ServiceLock testLock = null; + try { + testLock = new TestLock().createTestLock(context); + ZooUtil.LockID lockID = testLock.getLockID(); + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); + testMethod.execute( + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld), + context); + } finally { + if (testLock != null) { + testLock.unlock(); + } + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java index c967fbea5ab..42e4b886c27 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import org.apache.accumulo.core.client.Accumulo; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index c82662182ce..8f852a9e3f2 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -37,9 +38,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.user.UserFateStore; @@ -62,8 +61,6 @@ public class UserFateStoreIT extends SharedMiniClusterBase { - private static final FateInstanceType fateInstanceType = FateInstanceType.USER; - @BeforeAll public static void setup() throws Exception { SharedMiniClusterBase.startMiniCluster(); @@ -134,7 +131,7 @@ public void setup() throws Exception { client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); tableName = getUniqueNames(1)[0]; createFateTable(client, tableName); - store = new UserFateStore<>(client, tableName, AbstractFateStore.createDummyLockID(), null); + store = new UserFateStore<>(client, tableName, createDummyLockID(), null); fateId = store.create(); txStore = store.reserve(fateId); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 5e5775110f9..c14c1a79dd9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -21,7 +21,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -57,6 +56,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; @@ -262,17 +262,18 @@ public void getFateStatus() { InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - MetaFateStore mfs = new MetaFateStore<>( - ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); + MetaFateStore readOnlyMFS = + new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString()); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + Map> readOnlyFateStores = + Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); - withLocks = admin.getStatus(fateStores, zk, lockPath, null, null, null); + withLocks = admin.getStatus(readOnlyFateStores, zk, lockPath, null, null, null); // call method that does not use locks. - noLocks = admin.getTransactionStatus(fateStores, null, null, null); + noLocks = admin.getTransactionStatus(readOnlyFateStores, null, null, null); // no zk exception, no need to retry break; @@ -355,10 +356,11 @@ private boolean lookupFateInZookeeper(final String tableName) throws KeeperExcep InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - MetaFateStore mfs = new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, - zk, createDummyLockID(), null); + MetaFateStore readOnlyMFS = + new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, null, null); var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString()); - AdminUtil.FateStatus fateStatus = admin.getStatus(mfs, zk, lockPath, null, null, null); + AdminUtil.FateStatus fateStatus = + admin.getStatus(readOnlyMFS, zk, lockPath, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); @@ -385,8 +387,9 @@ private boolean lookupFateInAccumulo(final String tableName) throws KeeperExcept log.trace("tid: {}", tableId); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - AdminUtil.FateStatus fateStatus = admin.getStatus(ufs, null, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + AdminUtil.FateStatus fateStatus = admin.getStatus(readOnlyUFS, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 28b08dbbf02..85d5426eea5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -234,13 +233,14 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) { AdminUtil admin = new AdminUtil<>(false); ServerContext context = cluster.getServerContext(); ZooReaderWriter zk = context.getZooReaderWriter(); - MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, - zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + MetaFateStore readOnlyMFS = + new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + Map> readOnlyFateStores = + Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); var lockPath = context.getServerPaths().createTableLocksPath(); - return admin.getStatus(fateStores, zk, lockPath, null, null, null); + return admin.getStatus(readOnlyFateStores, zk, lockPath, null, null, null); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); }