diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 13b7f2c7ed8..8519937f203 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -31,7 +31,6 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -44,6 +43,9 @@ import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.time.NanoTime; @@ -54,6 +56,10 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +// TODO 4131 should probably add support to AbstractFateStore, MetaFateStore, +// and UserFateStore to accept null lockID and zooCache (maybe make these fields +// Optional<>). This could replace the current createDummyLockID(). This support +// is needed since MFS and UFS aren't always created in the context of a Manager. public abstract class AbstractFateStore implements FateStore { private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); @@ -70,6 +76,9 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { } }; + // The ZooKeeper lock for the process that's running this store instance + protected final ZooUtil.LockID lockID; + protected final ZooCache zooCache; protected final Map deferred; private final int maxDeferred; private final AtomicBoolean deferredOverflow = new AtomicBoolean(); @@ -82,13 +91,20 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0); public AbstractFateStore() { - this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + this(null, null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } - public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) { + public AbstractFateStore(ZooUtil.LockID lockID, ZooCache zooCache) { + this(lockID, zooCache, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + } + + public AbstractFateStore(ZooUtil.LockID lockID, ZooCache zooCache, int maxDeferred, + FateIdGenerator fateIdGenerator) { this.maxDeferred = maxDeferred; this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); this.deferred = Collections.synchronizedMap(new HashMap<>()); + this.lockID = lockID; + this.zooCache = zooCache; } public static byte[] serialize(Object o) { @@ -317,6 +333,11 @@ public Optional> createAndReserve(FateKey fateKey) { return txStore; } + @Override + public boolean isDeadReservation(FateReservation reservation) { + return !ServiceLock.isLockHeld(zooCache, reservation.getLockID()); + } + protected abstract void create(FateId fateId, FateKey fateKey); protected abstract Pair> getStatusAndKey(FateId fateId); @@ -329,23 +350,28 @@ public Optional> createAndReserve(FateKey fateKey) { protected abstract FateTxStore newUnreservedFateTxStore(FateId fateId); - protected abstract boolean isReserved(FateId fateId); - - // TODO 4131 is public fine for this? Public for tests - public abstract List getReservedTxns(); - protected abstract class AbstractFateTxStoreImpl implements FateTxStore { protected final FateId fateId; protected boolean deleted; + protected FateReservation reservation; protected TStatus observedStatus = null; protected AbstractFateTxStoreImpl(FateId fateId) { this.fateId = fateId; this.deleted = false; + this.reservation = null; } - protected abstract boolean isReserved(); + protected AbstractFateTxStoreImpl(FateId fateId, FateReservation reservation) { + this.fateId = fateId; + this.deleted = false; + this.reservation = Objects.requireNonNull(reservation); + } + + protected boolean isReserved() { + return this.reservation != null; + } @Override public TStatus waitForStatusChange(EnumSet expected) { @@ -482,4 +508,14 @@ protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { throw new IllegalStateException("Bad node data " + txInfo); } } + + /** + * TODO 4131 this is a temporary method used to create a dummy lock when using a FateStore outside + * of the context of a Manager (one example is testing). + * + * @return a dummy {@link ZooUtil.LockID} + */ + public static ZooUtil.LockID createDummyLockID() { + return new ZooUtil.LockID("/path", "node", 123); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index eed785b39b2..b3d84b67c1f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -74,10 +74,14 @@ public class Fate { private final ExecutorService executor; private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); + private static boolean userDeadReservationCleanerRunning = false; + private static boolean metaDeadReservationCleanerRunning = false; private final AtomicBoolean keepRunning = new AtomicBoolean(true); private final TransferQueue workQueue; private final Thread workFinder; + // Will be null if this Fate instance is not running a DeadReservationCleaner + private Thread deadReservationCleaner; public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE @@ -289,6 +293,26 @@ private void undo(FateId fateId, Repo op) { } + /** + * A thread that finds reservations held by dead processes and unreserves them. Only one thread + * runs per store type across all Fate instances (one to clean up dead reservations for + * {@link org.apache.accumulo.core.fate.user.UserFateStore UserFateStore} and one to clean up dead + * reservations for {@link MetaFateStore}). + */ + private class DeadReservationCleaner implements Runnable { + // TODO 4131 periodic check runs every 30 seconds + // Should this be longer? Shorter? A configurable Property? A function of something? + private static final long INTERVAL_MILLIS = 30_000; + + @Override + public void run() { + while (keepRunning.get()) { + store.deleteDeadReservations(); + UtilWaitThread.sleep(INTERVAL_MILLIS); + } + } + } + /** * Creates a Fault-tolerant executor. * @@ -332,6 +356,33 @@ public Fate(T environment, FateStore store, Function,String> toLogStr this.workFinder.start(); } + /** + * Starts a thread that periodically cleans up "dead reservations" (see + * {@link FateStore#deleteDeadReservations()}). Only one thread is started per store type + * ({@link FateInstanceType}) for subsequent calls to this method. + */ + public void startDeadReservationCleaner() { + // TODO 4131 this is not ideal starting this thread in its own start method, but the other + // threads in the constructor. However, starting this thread in the constructor causes + // a Maven build failure, and do not want to move starting the other threads into a + // method in this PR... should be done standalone (see issue#4609). + + if ((store.type().equals(FateInstanceType.USER) && !userDeadReservationCleanerRunning) + || store.type().equals(FateInstanceType.META) && !metaDeadReservationCleanerRunning) { + if (store.type().equals(FateInstanceType.USER)) { + this.deadReservationCleaner = + Threads.createThread("USER dead reservation cleaner", new DeadReservationCleaner()); + userDeadReservationCleanerRunning = true; + } else if (store.type().equals(FateInstanceType.META)) { + this.deadReservationCleaner = + Threads.createThread("META dead reservation cleaner", new DeadReservationCleaner()); + metaDeadReservationCleanerRunning = true; + } + this.deadReservationCleaner.start(); + } + + } + // get a transaction id back to the requester before doing any work public FateId startTransaction() { return store.create(); @@ -498,13 +549,17 @@ public void shutdown(long timeout, TimeUnit timeUnit) { fatePoolWatcher.shutdown(); executor.shutdown(); workFinder.interrupt(); + if (deadReservationCleaner != null) { + deadReservationCleaner.interrupt(); + } } if (timeout > 0) { long start = System.nanoTime(); - while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) - && (workFinder.isAlive() || !executor.isTerminated())) { + while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) && (workFinder.isAlive() + || (deadReservationCleaner != null && deadReservationCleaner.isAlive()) + || !executor.isTerminated())) { try { if (!executor.awaitTermination(1, SECONDS)) { log.debug("Fate {} is waiting for worker threads to terminate", store.type()); @@ -516,19 +571,40 @@ public void shutdown(long timeout, TimeUnit timeUnit) { log.debug("Fate {} is waiting for work finder thread to terminate", store.type()); workFinder.interrupt(); } + + if (deadReservationCleaner != null) { + deadReservationCleaner.join(1_000); + } + if (deadReservationCleaner != null && deadReservationCleaner.isAlive()) { + log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate", + store.type()); + deadReservationCleaner.interrupt(); + } } catch (InterruptedException e) { throw new RuntimeException(e); } } - if (workFinder.isAlive() || !executor.isTerminated()) { + if (workFinder.isAlive() + || (deadReservationCleaner != null && deadReservationCleaner.isAlive()) + || !executor.isTerminated()) { log.warn( - "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} executor:{}", + "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} deadReservationCleaner:{} executor:{}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(), - workFinder.isAlive(), !executor.isTerminated()); + workFinder.isAlive(), + (deadReservationCleaner != null && deadReservationCleaner.isAlive()), + !executor.isTerminated()); } } + // Update that USER/META dead reservation cleaner is no longer running + if (deadReservationCleaner != null && !deadReservationCleaner.isAlive()) { + if (store.type().equals(FateInstanceType.USER)) { + userDeadReservationCleanerRunning = false; + } else if (store.type().equals(FateInstanceType.META)) { + metaDeadReservationCleanerRunning = false; + } + } // interrupt the background threads executor.shutdownNow(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index f93113112a3..1161f4ca2c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -18,9 +18,20 @@ */ package org.apache.accumulo.core.fate; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.hadoop.io.DataInputBuffer; /** * Transaction Store: a place to save transactions @@ -107,6 +118,157 @@ interface FateTxStore extends ReadOnlyFateTxStore { void unreserve(long deferTime, TimeUnit timeUnit); } + /** + * The value stored to indicate a FATE transaction ID ({@link FateId}) has been reserved + */ + class FateReservation { + + // The LockID (provided by the Manager running the FATE which uses this store) which is used for + // identifying dead Managers, so their reservations can be deleted and picked up again since + // they can no longer be worked on. + private final ZooUtil.LockID lockID; // TODO 4131 not sure if this is the best type for this + // The UUID generated on a reservation attempt (tryReserve()) used to uniquely identify that + // attempt. This is useful for the edge case where the reservation is sent to the server + // (Tablet Server for UserFateStore and the ZooKeeper Server for MetaFateStore), but the server + // dies before the store receives the response. It allows us to determine if the reservation + // was successful and was written by this reservation attempt (could have been successfully + // reserved by another attempt or not reserved at all, in which case, we wouldn't want to + // expose a FateTxStore). + private final UUID reservationUUID; + private final byte[] serialized; + private static final Pattern UUID_PATTERN = + Pattern.compile("^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"); + private static final Pattern LOCKID_PATTERN = Pattern.compile("^.+/.+\\$[0-9a-fA-F]+$"); + + private FateReservation(ZooUtil.LockID lockID, UUID reservationUUID) { + this.lockID = Objects.requireNonNull(lockID); + this.reservationUUID = Objects.requireNonNull(reservationUUID); + this.serialized = serialize(lockID, reservationUUID); + } + + public static FateReservation from(ZooUtil.LockID lockID, UUID reservationUUID) { + return new FateReservation(lockID, reservationUUID); + } + + public static FateReservation from(byte[] serialized) { + try (DataInputBuffer buffer = new DataInputBuffer()) { + buffer.reset(serialized, serialized.length); + ZooUtil.LockID lockID = new ZooUtil.LockID("", buffer.readUTF()); + UUID reservationUUID = UUID.fromString(buffer.readUTF()); + return new FateReservation(lockID, reservationUUID); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static FateReservation from(String fateReservationStr) { + if (isFateReservation(fateReservationStr)) { + String[] fields = fateReservationStr.split(":"); + ZooUtil.LockID lockId = new ZooUtil.LockID("", fields[0]); + UUID reservationUUID = UUID.fromString(fields[1]); + return new FateReservation(lockId, reservationUUID); + } else { + throw new IllegalArgumentException( + "Tried to create a FateReservation from an invalid string: " + fateReservationStr); + } + } + + /** + * + * @param fateReservationStr the string from a call to FateReservations toString() + * @return true if the string represents a valid FateReservation object, false otherwise + */ + public static boolean isFateReservation(String fateReservationStr) { + if (fateReservationStr != null) { + String[] fields = fateReservationStr.split(":"); + if (fields.length == 2) { + return LOCKID_PATTERN.matcher(fields[0]).matches() + && UUID_PATTERN.matcher(fields[1]).matches(); + } + } + return false; + } + + public ZooUtil.LockID getLockID() { + return lockID; + } + + public UUID getReservationUUID() { + return reservationUUID; + } + + public byte[] getSerialized() { + return serialized; + } + + private static byte[] serialize(ZooUtil.LockID lockID, UUID reservationUUID) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(lockID.serialize("/")); + dos.writeUTF(reservationUUID.toString()); + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static FateReservation deserialize(byte[] serialized) { + return FateReservation.from(serialized); + } + + public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID lockID2) { + return lockID1.serialize("/").equals(lockID2.serialize("/")); + } + + @Override + public String toString() { + return lockID.serialize("/") + ":" + reservationUUID; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FateReservation) { + FateReservation other = (FateReservation) obj; + return this.lockID.serialize("/").equals(other.lockID.serialize("/")) + && this.reservationUUID.equals(other.reservationUUID); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(lockID, reservationUUID); + } + } + + /** + * @param fateId the fateId to check + * @return true if the given fate id is reserved, false otherwise + */ + boolean isReserved(FateId fateId); + + /** + * @return a map of the current active reservations with the keys being the transaction that is + * reserved and the value being the value stored to indicate the transaction is reserved. + */ + Map getActiveReservations(); + + /** + * Deletes the current reservations which were reserved by a now dead Manager. These reservations + * can no longer be worked on so their reservation should be deleted, so they can be picked up and + * worked on again. + */ + void deleteDeadReservations(); + + /** + * The way dead reservations are determined for {@link #deleteDeadReservations()} + * + * @param reservation the fate reservation + * @return true if reservation held by a dead Manager, false otherwise + */ + boolean isDeadReservation(FateReservation reservation); + /** * Attempt to reserve the fate transaction. * diff --git a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java index 612a60d6fc8..d6f34d343c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java @@ -28,16 +28,18 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -63,25 +65,23 @@ public class MetaFateStore extends AbstractFateStore { private static final FateInstanceType fateInstanceType = FateInstanceType.META; private String path; private ZooReaderWriter zk; - // The ZooKeeper lock for the process that's running this store instance - private ZooUtil.LockID lockID; private String getTXPath(FateId fateId) { return path + "/tx_" + fateId.getTxUUIDStr(); } - public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID) + public MetaFateStore(String path, ZooReaderWriter zk, ZooCache zooCache, ZooUtil.LockID lockID) throws KeeperException, InterruptedException { - this(path, zk, lockID, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + this(path, zk, zooCache, lockID, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @VisibleForTesting - public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID, int maxDeferred, - FateIdGenerator fateIdGenerator) throws KeeperException, InterruptedException { - super(maxDeferred, fateIdGenerator); + public MetaFateStore(String path, ZooReaderWriter zk, ZooCache zooCache, ZooUtil.LockID lockID, + int maxDeferred, FateIdGenerator fateIdGenerator) + throws KeeperException, InterruptedException { + super(lockID, zooCache, maxDeferred, fateIdGenerator); this.path = path; this.zk = zk; - this.lockID = lockID; zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } @@ -96,7 +96,7 @@ public FateId create() { while (true) { try { FateId fateId = FateId.from(fateInstanceType, UUID.randomUUID()); - zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, "", "").serialize(), + zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null).serialize(), NodeExistsPolicy.FAIL); return fateId; } catch (NodeExistsException nee) { @@ -110,7 +110,7 @@ public FateId create() { @Override protected void create(FateId fateId, FateKey key) { try { - zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, "", "", key).serialize(), + zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null, key).serialize(), NodeExistsPolicy.FAIL); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); @@ -119,8 +119,12 @@ protected void create(FateId fateId, FateKey key) { @Override public Optional> tryReserve(FateId fateId) { + // return an empty option if the FateId doesn't exist + if (_getStatus(fateId).equals(TStatus.UNKNOWN)) { + return Optional.empty(); + } // uniquely identify this attempt to reserve the fate operation data - UUID uuid = UUID.randomUUID(); + FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); try { byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { @@ -128,18 +132,18 @@ public Optional> tryReserve(FateId fateId) { // The uuid handles the case where there was a ZK server fault and the write for this thread // went through but that was not acknowledged, and we are reading our own write for 2nd // time. - if (!currNodeVal.isReserved() || currNodeVal.uuid.equals(uuid.toString())) { + if (!currNodeVal.isReserved() || (currNodeVal.isReserved() + && currNodeVal.reservation.orElseThrow().equals(reservation))) { FateKey currFateKey = currNodeVal.fateKey.orElse(null); - // Add the lock and uuid to the node to reserve - return new NodeValue(currNodeVal.status, lockID.serialize(""), uuid.toString(), - currFateKey).serialize(); + // Add the FateReservation to the node to reserve + return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize(); } else { // This will not change the value to null but will return null return null; } }); if (newSerNodeVal != null) { - return Optional.of(new FateTxStoreImpl(fateId, uuid)); + return Optional.of(new FateTxStoreImpl(fateId, reservation)); } else { return Optional.empty(); } @@ -149,7 +153,7 @@ public Optional> tryReserve(FateId fateId) { } @Override - protected boolean isReserved(FateId fateId) { + public boolean isReserved(FateId fateId) { boolean isReserved; try { isReserved = getNode(fateId).isReserved(); @@ -160,20 +164,50 @@ protected boolean isReserved(FateId fateId) { return isReserved; } - // TODO 4131 is public fine for this? Public for tests @Override - public List getReservedTxns() { + public Map getActiveReservations() { + Map activeReservations = new HashMap<>(); + try { - return zk.getChildren(path).stream().filter(strTxId -> { - String txUUIDStr = strTxId.split("_")[1]; - return isReserved(FateId.from(fateInstanceType, txUUIDStr)); - }).map(strTxId -> { + for (String strTxId : zk.getChildren(path)) { String txUUIDStr = strTxId.split("_")[1]; - return FateId.from(fateInstanceType, txUUIDStr); - }).collect(Collectors.toList()); + FateId fateId = FateId.from(fateInstanceType, txUUIDStr); + if (isReserved(fateId)) { + FateReservation reservation = getNode(fateId).reservation.orElseThrow(); + activeReservations.put(fateId, reservation); + } + } } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); } + + return activeReservations; + } + + @Override + public void deleteDeadReservations() { + for (Map.Entry entry : getActiveReservations().entrySet()) { + FateId fateId = entry.getKey(); + FateReservation reservation = entry.getValue(); + try { + zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { + NodeValue currNodeVal = new NodeValue(currSerNodeVal); + // Make sure the current node is still reserved and reserved with the expected reservation + // and it is dead + if (currNodeVal.isReserved() && currNodeVal.reservation.orElseThrow().equals(reservation) + && isDeadReservation(currNodeVal.reservation.orElseThrow())) { + // Delete the reservation + return new NodeValue(currNodeVal.status, null, currNodeVal.fateKey.orElse(null)) + .serialize(); + } else { + // No change + return null; + } + }); + } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { + throw new RuntimeException(e); + } + } } @Override @@ -188,21 +222,13 @@ public FateInstanceType type() { } private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private UUID reservationUUID; private FateTxStoreImpl(FateId fateId) { super(fateId); - this.reservationUUID = null; } - private FateTxStoreImpl(FateId fateId, UUID reservationUUID) { - super(fateId); - this.reservationUUID = Objects.requireNonNull(reservationUUID); - } - - @Override - protected boolean isReserved() { - return reservationUUID != null; + private FateTxStoreImpl(FateId fateId, FateReservation reservation) { + super(fateId, reservation); } private static final int RETRIES = 10; @@ -301,9 +327,9 @@ public void setStatus(TStatus status) { try { zk.mutateExisting(getTXPath(fateId), currSerializedData -> { NodeValue currNodeVal = new NodeValue(currSerializedData); + FateReservation currFateReservation = currNodeVal.reservation.orElse(null); FateKey currFateKey = currNodeVal.fateKey.orElse(null); - NodeValue newNodeValue = - new NodeValue(status, currNodeVal.lockID, currNodeVal.uuid, currFateKey); + NodeValue newNodeValue = new NodeValue(status, currFateReservation, currFateKey); return newNodeValue.serialize(); }); } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { @@ -404,9 +430,10 @@ protected void unreserve() { zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { NodeValue currNodeVal = new NodeValue(currSerNodeVal); FateKey currFateKey = currNodeVal.fateKey.orElse(null); - if (currNodeVal.uuid.equals(reservationUUID.toString())) { - // Remove the lock and uuid from the NodeValue to unreserve - return new NodeValue(currNodeVal.status, "", "", currFateKey).serialize(); + if ((currNodeVal.isReserved() + && currNodeVal.reservation.orElseThrow().equals(this.reservation))) { + // Remove the FateReservation from the NodeValue to unreserve + return new NodeValue(currNodeVal.status, null, currFateKey).serialize(); } else { // possible this is running a 2nd time in zk server fault conditions and its first // write went through @@ -414,7 +441,7 @@ protected void unreserve() { } }); } - this.reservationUUID = null; + this.reservation = null; } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) { throw new IllegalStateException(e); } @@ -445,7 +472,7 @@ private NodeValue getNode(FateId fateId) { try { return new NodeValue(zk.getData(getTXPath(fateId))); } catch (NoNodeException nne) { - return new NodeValue(TStatus.UNKNOWN, "", ""); + return new NodeValue(TStatus.UNKNOWN, null); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -486,34 +513,26 @@ public Stream list(FateKey.FateKeyType type) { protected static class NodeValue { final TStatus status; final Optional fateKey; - final String lockID; - final String uuid; + final Optional reservation; private NodeValue(byte[] serializedData) { try (DataInputBuffer buffer = new DataInputBuffer()) { buffer.reset(serializedData, serializedData.length); - TStatus tempStatus = TStatus.valueOf(buffer.readUTF()); - String tempLockID = buffer.readUTF(); - String tempUUID = buffer.readUTF(); - validateLockAndUUID(tempLockID, tempUUID); - this.status = tempStatus; - this.lockID = tempLockID; - this.uuid = tempUUID; + this.status = TStatus.valueOf(buffer.readUTF()); + this.reservation = deserializeFateReservation(buffer); this.fateKey = deserializeFateKey(buffer); } catch (IOException e) { throw new UncheckedIOException(e); } } - private NodeValue(TStatus status, String lockID, String uuid) { - this(status, lockID, uuid, null); + private NodeValue(TStatus status, FateReservation reservation) { + this(status, reservation, null); } - private NodeValue(TStatus status, String lockID, String uuid, FateKey fateKey) { - validateLockAndUUID(lockID, uuid); - this.status = status; - this.lockID = lockID; - this.uuid = uuid; + private NodeValue(TStatus status, FateReservation reservation, FateKey fateKey) { + this.status = Objects.requireNonNull(status); + this.reservation = Optional.ofNullable(reservation); this.fateKey = Optional.ofNullable(fateKey); } @@ -525,12 +544,26 @@ private Optional deserializeFateKey(DataInputBuffer buffer) throws IOEx return Optional.empty(); } + private Optional deserializeFateReservation(DataInputBuffer buffer) + throws IOException { + int length = buffer.readInt(); + if (length > 0) { + return Optional.of(FateReservation.deserialize(buffer.readNBytes(length))); + } + return Optional.empty(); + } + byte[] serialize() { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) { dos.writeUTF(status.name()); - dos.writeUTF(lockID); - dos.writeUTF(uuid); + if (isReserved()) { + byte[] serializedFateReservation = reservation.orElseThrow().getSerialized(); + dos.writeInt(serializedFateReservation.length); + dos.write(serializedFateReservation); + } else { + dos.writeInt(0); + } if (fateKey.isPresent()) { byte[] serializedFateKey = fateKey.orElseThrow().getSerialized(); dos.writeInt(serializedFateKey.length); @@ -546,15 +579,7 @@ byte[] serialize() { } public boolean isReserved() { - return !lockID.isEmpty(); - } - - private void validateLockAndUUID(String lockID, String uuid) { - // TODO 4131 potentially need further validation? - if (!((lockID.isEmpty() && uuid.isEmpty()) || (!lockID.isEmpty() && !uuid.isEmpty()))) { - throw new IllegalArgumentException( - "One but not both of lock = '" + lockID + "' and uuid = '" + uuid + "' are empty"); - } + return reservation.isPresent(); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/ColumnValueMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/ColumnValueMappingIterator.java new file mode 100644 index 00000000000..1d73608350a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/ColumnValueMappingIterator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.user; + +import java.io.IOException; +import java.util.Collection; +import java.util.Objects; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +public abstract class ColumnValueMappingIterator implements SortedKeyValueIterator { + + protected SortedKeyValueIterator source; + protected Value mappedValue; + + protected abstract void mapValue(); + + @Override + public boolean hasTop() { + return source.hasTop(); + } + + @Override + public void next() throws IOException { + source.next(); + mapValue(); + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + source.seek(range, columnFamilies, inclusive); + mapValue(); + } + + @Override + public Key getTopKey() { + return source.getTopKey(); + } + + @Override + public Value getTopValue() { + return Objects.requireNonNull(mappedValue); + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 8042df117cb..e06d8638eac 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -19,10 +19,11 @@ package org.apache.accumulo.core.fate.user; import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.user.schema.FateSchema; public interface FateMutator { @@ -32,13 +33,54 @@ public interface FateMutator { FateMutator putCreateTime(long ctime); - FateMutator putReservedTx(FateId fateId); - - FateMutator putUnreserveTx(FateId fateId); - - FateMutator putInitReserveColVal(FateId fateId); - - FateMutator requireReserved(FateId fateId); + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * put the reservation if there is not already a reservation present + * + * @param reservation the reservation to attempt to put + * @return the FateMutator with this added mutation + */ + FateMutator putReservedTx(FateStore.FateReservation reservation); + + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * remove the given reservation if it matches what is present in the column. + * + * @param reservation the reservation to attempt to remove + * @return the FateMutator with this added mutation + */ + FateMutator putUnreserveTx(FateStore.FateReservation reservation); + + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * put the initial column value if it has not already been set yet + * + * @return the FateMutator with this added mutation + */ + FateMutator putInitReserveColVal(); + + /** + * Require that the transaction is reserved with a specific {@link FateStore.FateReservation} + * + * @param reservation the reservation + * @return the FateMutator with the added condition + */ + FateMutator requireReserved(FateStore.FateReservation reservation); + + /** + * Require that the transaction is reserved (can be reserved with any + * {@link FateStore.FateReservation}) + * + * @return the FateMutator with the added condition + */ + FateMutator requireReserved(); + + /** + * Require that the transaction is unreserved + * + * @return the FateMutator with the added condition + */ + FateMutator requireUnreserved(); FateMutator putName(byte[] data); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index 28ad17144ab..550e21755ab 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; @@ -49,9 +50,7 @@ public class FateMutatorImpl implements FateMutator { - // TODO 4131 these can be changed/simplified to take up less space/memory in the table - protected static final String IS_RESERVED = "isreserved"; - protected static final String NOT_RESERVED = "notreserved"; + protected static final String NOT_RESERVED = ""; private final ClientContext context; private final String tableName; @@ -84,39 +83,50 @@ public FateMutator putCreateTime(long ctime) { } @Override - public FateMutator putReservedTx(FateId fateId) { - // Require that the column value is NOT_RESERVED (the FateId is not reserved) - Condition condition = new Condition(TxColumnFamily.RESERVED_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVED_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED); + public FateMutator putReservedTx(FateStore.FateReservation reservation) { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED); mutation.addCondition(condition); - TxColumnFamily.RESERVED_COLUMN.put(mutation, new Value(IS_RESERVED)); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.toString())); return this; } @Override - public FateMutator putUnreserveTx(FateId fateId) { - // Require that the column value is IS_RESERVED (the FateId is reserved) - Condition condition = new Condition(TxColumnFamily.RESERVED_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVED_COLUMN.getColumnQualifier()).setValue(IS_RESERVED); + public FateMutator putUnreserveTx(FateStore.FateReservation reservation) { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(reservation.toString()); mutation.addCondition(condition); - TxColumnFamily.RESERVED_COLUMN.put(mutation, new Value(NOT_RESERVED)); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); return this; } @Override - public FateMutator putInitReserveColVal(FateId fateId) { - // Require that the column does not have a set value yet - Condition condition = new Condition(TxColumnFamily.RESERVED_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVED_COLUMN.getColumnQualifier()); + public FateMutator putInitReserveColVal() { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); mutation.addCondition(condition); - TxColumnFamily.RESERVED_COLUMN.put(mutation, new Value(NOT_RESERVED)); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); return this; } @Override - public FateMutator requireReserved(FateId fateId) { - Condition condition = new Condition(TxColumnFamily.RESERVED_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVED_COLUMN.getColumnQualifier()).setValue(IS_RESERVED); + public FateMutator requireReserved(FateStore.FateReservation reservation) { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(reservation.toString()); + mutation.addCondition(condition); + return this; + } + + @Override + public FateMutator requireReserved() { + Condition condition = ReservationMappingIterator.createRequireReservedCondition(); + mutation.addCondition(condition); + return this; + } + + @Override + public FateMutator requireUnreserved() { + Condition condition = ReservationMappingIterator.createRequireUnreservedCondition(); mutation.addCondition(condition); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/ReservationMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/ReservationMappingIterator.java new file mode 100644 index 00000000000..8a04c435f14 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/ReservationMappingIterator.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.user; + +import static org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily.RESERVATION_COLUMN; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +/** + * An iterator used for determining if the reservation column for a FateId has a FateReservation set + * or not. Maps the value of the column to "isreserved" or "notreserved" if the column has a + * FateReservation value set or not. + */ +public class ReservationMappingIterator extends ColumnValueMappingIterator { + + private static final String IS_RESERVED = "isreserved"; + private static final String NOT_RESERVED = "notreserved"; + + @Override + public void init(SortedKeyValueIterator source, Map options, + IteratorEnvironment env) throws IOException { + this.source = source; + // No need for options or env + } + + @Override + protected void mapValue() { + if (hasTop()) { + String currVal = source.getTopValue().toString(); + mappedValue = FateStore.FateReservation.isFateReservation(currVal) ? new Value(IS_RESERVED) + : new Value(NOT_RESERVED); + } else { + mappedValue = null; + } + } + + public static Condition createRequireReservedCondition() { + Condition condition = new Condition(RESERVATION_COLUMN.getColumnFamily(), + RESERVATION_COLUMN.getColumnQualifier()); + IteratorSetting is = new IteratorSetting(100, ReservationMappingIterator.class); + + return condition.setValue(IS_RESERVED).setIterators(is); + } + + public static Condition createRequireUnreservedCondition() { + Condition condition = new Condition(RESERVATION_COLUMN.getColumnFamily(), + RESERVATION_COLUMN.getColumnQualifier()); + IteratorSetting is = new IteratorSetting(100, ReservationMappingIterator.class); + + return condition.setValue(NOT_RESERVED).setIterators(is); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java index 1a0fae5aa3e..d9c667b25b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/StatusMappingIterator.java @@ -22,18 +22,14 @@ import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashSet; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Condition; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.iterators.IteratorEnvironment; @@ -44,15 +40,13 @@ * iterator allows for checking of the status column's value against a set of acceptable statuses * within a conditional mutation. */ -public class StatusMappingIterator implements SortedKeyValueIterator { +public class StatusMappingIterator extends ColumnValueMappingIterator { private static final String PRESENT = "present"; private static final String ABSENT = "absent"; private static final String STATUS_SET_KEY = "statusSet"; - private SortedKeyValueIterator source; private final Set acceptableStatuses = new HashSet<>(); - private Value mappedValue; /** * The set of acceptable must be provided as an option to the iterator using the @@ -70,29 +64,12 @@ public void init(SortedKeyValueIterator source, Map op } } - @Override - public boolean hasTop() { - return source.hasTop(); - } - - @Override - public void next() throws IOException { - source.next(); - mapValue(); - } - - @Override - public void seek(Range range, Collection columnFamilies, boolean inclusive) - throws IOException { - source.seek(range, columnFamilies, inclusive); - mapValue(); - } - /** * Maps the value of the status column to "present" or "absent" based on its presence within the * set of statuses. */ - private void mapValue() { + @Override + protected void mapValue() { if (source.hasTop()) { String currentValue = source.getTopValue().toString(); mappedValue = @@ -102,21 +79,6 @@ private void mapValue() { } } - @Override - public Key getTopKey() { - return source.getTopKey(); - } - - @Override - public Value getTopValue() { - return Objects.requireNonNull(mappedValue); - } - - @Override - public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - throw new UnsupportedOperationException(); - } - /** * Creates a condition that checks if the status column's value is one of the given acceptable * statuses. diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index f0769b55e75..98385a5ffc1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -19,7 +19,9 @@ package org.apache.accumulo.core.fate.user; import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; @@ -45,6 +47,7 @@ import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; @@ -69,20 +72,20 @@ public class UserFateStore extends AbstractFateStore { private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, maxRepos); - public UserFateStore(ClientContext context, String tableName) { - this(context, tableName, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID) { + this(context, tableName, lockID, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @VisibleForTesting - public UserFateStore(ClientContext context, String tableName, int maxDeferred, - FateIdGenerator fateIdGenerator) { - super(maxDeferred, fateIdGenerator); + public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, + int maxDeferred, FateIdGenerator fateIdGenerator) { + super(lockID, context.getZooCache(), maxDeferred, fateIdGenerator); this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); } - public UserFateStore(ClientContext context) { - this(context, AccumuloTable.FATE.tableName()); + public UserFateStore(ClientContext context, ZooUtil.LockID lockID) { + this(context, AccumuloTable.FATE.tableName(), lockID); } @Override @@ -98,7 +101,7 @@ public FateId create() { } var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) - .putCreateTime(System.currentTimeMillis()).putInitReserveColVal(fateId).tryMutate(); + .putCreateTime(System.currentTimeMillis()).putInitReserveColVal().tryMutate(); switch (status) { case ACCEPTED: @@ -131,7 +134,7 @@ protected void create(FateId fateId, FateKey fateKey) { } var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) - .putCreateTime(System.currentTimeMillis()).putInitReserveColVal(fateId).tryMutate(); + .putCreateTime(System.currentTimeMillis()).putInitReserveColVal().tryMutate(); switch (status) { case ACCEPTED: @@ -152,35 +155,72 @@ protected void create(FateId fateId, FateKey fateKey) { @Override public Optional> tryReserve(FateId fateId) { - // TODO 4131 should this throw an exception if the id doesn't exist (status = UNKNOWN)? - FateMutator.Status status = newMutator(fateId).putReservedTx(fateId).tryMutate(); + // Create a unique FateReservation for this reservation attempt + FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); + + FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate(); if (status.equals(FateMutator.Status.ACCEPTED)) { - return Optional.of(new FateTxStoreImpl(fateId, true)); - } else { - return Optional.empty(); + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else if (status.equals(FateMutator.Status.UNKNOWN)) { + // If the status is UNKNOWN, this means an error occurred after the mutation was + // sent to the TabletServer, and it is unknown if the mutation was written. We + // need to check if the mutation was written and if it was written by this + // attempt at reservation. If it was written by this reservation attempt, + // we can return the FateTxStore since it was successfully reserved in this + // attempt, otherwise we return empty (was written by another reservation + // attempt or was not written at all). + status = newMutator(fateId).requireReserved(reservation).tryMutate(); + if (status.equals(FateMutator.Status.ACCEPTED)) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } } + return Optional.empty(); } @Override - protected boolean isReserved(FateId fateId) { - return newMutator(fateId).requireReserved(fateId).tryMutate() - .equals(FateMutator.Status.ACCEPTED); + public boolean isReserved(FateId fateId) { + return newMutator(fateId).requireReserved().tryMutate().equals(FateMutator.Status.ACCEPTED); } - // TODO 4131 is public fine for this? Public for tests @Override - public List getReservedTxns() { + public Map getActiveReservations() { + Map activeReservations = new HashMap<>(); + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(new Range()); - scanner.fetchColumn(TxColumnFamily.RESERVED_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVED_COLUMN.getColumnQualifier()); - return scanner.stream() - .filter(e -> e.getValue().toString().equals(FateMutatorImpl.IS_RESERVED)) - .map(e -> FateId.from(fateInstanceType, e.getKey().getRow().toString())) - .collect(Collectors.toList()); + scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + scanner.stream() + .filter(entry -> FateReservation.isFateReservation(entry.getValue().toString())) + .forEach(entry -> { + String reservationColVal = entry.getValue().toString(); + FateId fateId = FateId.from(fateInstanceType, entry.getKey().getRow().toString()); + FateReservation reservation = FateReservation.from(reservationColVal); + activeReservations.put(fateId, reservation); + }); } catch (TableNotFoundException e) { throw new IllegalStateException(tableName + " not found!", e); } + + return activeReservations; + } + + @Override + public void deleteDeadReservations() { + for (Entry entry : getActiveReservations().entrySet()) { + FateId fateId = entry.getKey(); + FateReservation reservation = entry.getValue(); + if (isDeadReservation(reservation)) { + newMutator(fateId).putUnreserveTx(reservation).tryMutate(); + // No need to check the status... If it is ACCEPTED, we have successfully unreserved + // the dead transaction. If it is REJECTED, the reservation has changed (i.e., + // has been unreserved so no need to do anything, or has been unreserved and reserved + // again in which case we don't want to change it). If it is UNKNOWN, the mutation + // may or may not have been written. If it was written, we have successfully unreserved + // the dead transaction. If it was not written, the next cycle/call to + // deleteDeadReservations() will try again. + } + } } @Override @@ -268,7 +308,7 @@ protected Pair> getStatusAndKey(FateId fateId) { @Override protected FateTxStore newUnreservedFateTxStore(FateId fateId) { - return new FateTxStoreImpl(fateId, false); + return new FateTxStoreImpl(fateId); } static Range getRow(FateId fateId) { @@ -297,16 +337,13 @@ public FateInstanceType type() { } private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private boolean isReserved; - private FateTxStoreImpl(FateId fateId, boolean isReserved) { + private FateTxStoreImpl(FateId fateId) { super(fateId); - this.isReserved = isReserved; } - @Override - protected boolean isReserved() { - return isReserved; + private FateTxStoreImpl(FateId fateId, FateReservation reservation) { + super(fateId, reservation); } @Override @@ -450,13 +487,13 @@ private Optional findTop() { @Override protected void unreserve() { - if (!this.deleted) { - FateMutator.Status status = newMutator(fateId).putUnreserveTx(fateId).tryMutate(); - if (!status.equals(FateMutator.Status.ACCEPTED)) { - throw new IllegalStateException("Failed to unreserve " + fateId); - } + if (!deleted) { + FateMutator.Status status; + do { + status = newMutator(fateId).putUnreserveTx(reservation).tryMutate(); + } while (status.equals(FateMutator.Status.UNKNOWN)); } - this.isReserved = false; + reservation = null; } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java index 07777687574..012e2853ff2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java @@ -36,8 +36,8 @@ public static class TxColumnFamily { public static final String CREATE_TIME = "ctime"; public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME)); - public static final String RESERVED = "reserved"; - public static final ColumnFQ RESERVED_COLUMN = new ColumnFQ(NAME, new Text(RESERVED)); + public static final String RESERVATION = "reservation"; + public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION)); } public static class TxInfoColumnFamily { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index a3c1b9cfd90..f6c32caa8e5 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.logging; import java.io.Serializable; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -169,6 +170,26 @@ public Optional> createAndReserve(FateKey fateKey) { } return txStore; } + + @Override + public boolean isReserved(FateId fateId) { + return store.isReserved(fateId); + } + + @Override + public Map getActiveReservations() { + return store.getActiveReservations(); + } + + @Override + public void deleteDeadReservations() { + store.deleteDeadReservations(); + } + + @Override + public boolean isDeadReservation(FateReservation reservation) { + return store.isDeadReservation(reservation); + } }; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index db2d7da7701..3c2c6461626 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -82,6 +82,28 @@ public Optional> tryReserve(FateId fateId) { } } + public boolean isReserved(FateId fateId) { + return reserved.contains(fateId); + } + + @Override + public Map getActiveReservations() { + // This method only makes sense for the FateStores that don't store their reservations in memory + throw new UnsupportedOperationException(); + } + + @Override + public void deleteDeadReservations() { + // This method only makes sense for the FateStores that don't store their reservations in memory + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDeadReservation(FateReservation reservation) { + // This method only makes sense for the FateStores that don't store their reservations in memory + throw new UnsupportedOperationException(); + } + private class TestFateTxStore implements FateTxStore { private final FateId fateId; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 7dbf36ab522..2e6487439d7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import java.io.BufferedWriter; import java.io.File; @@ -62,7 +63,6 @@ import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.TFateId; @@ -772,10 +772,9 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps var zTableLocksPath = ServiceLock.path(zkRoot + Constants.ZTABLE_LOCKS); String fateZkPath = zkRoot + Constants.ZFATE; ZooReaderWriter zk = context.getZooReaderWriter(); - // TODO 4131 dummy lock for now MetaFateStore mfs = - new MetaFateStore<>(fateZkPath, zk, new ZooUtil.LockID("path", "node", 1234)); - UserFateStore ufs = new UserFateStore<>(context); + new MetaFateStore<>(fateZkPath, zk, context.getZooCache(), createDummyLockID()); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID()); Map> fateStores = Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); Map> readOnlyFateStores = diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 866cb81c70d..7424d873c7b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1085,9 +1085,9 @@ boolean canSuspendTablets() { try { var metaInstance = initializeFateInstance(context, new MetaFateStore<>(getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter(), managerLock.getLockID())); + context.getZooReaderWriter(), context.getZooCache(), managerLock.getLockID())); var userInstance = initializeFateInstance(context, - new UserFateStore<>(context, AccumuloTable.FATE.tableName())); + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), managerLock.getLockID())); if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) { @@ -1199,6 +1199,7 @@ private Fate initializeFateInstance(ServerContext context, FateStore fateInstance = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); + fateInstance.startDeadReservationCleaner(); var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), System::nanoTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 3240aef0b6a..0ade4b71cfd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.manager.metrics.fate; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; + import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -27,7 +29,6 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -72,9 +73,8 @@ public FateMetrics(final ServerContext context, final long minimumRefreshDelay) this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); try { - // TODO 4131 dummy lock for now this.fateStore = new MetaFateStore<>(fateRootPath, context.getZooReaderWriter(), - new ZooUtil.LockID("path", "node", 1234)); + context.getZooCache(), createDummyLockID()); } catch (KeeperException ex) { throw new IllegalStateException( "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index a36266e6c9c..3c5f035f943 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -19,6 +19,7 @@ package org.apache.accumulo.manager.upgrade; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING; import static org.apache.accumulo.server.AccumuloDataVersion.REMOVE_DEPRECATIONS_FOR_VERSION_3; import static org.apache.accumulo.server.AccumuloDataVersion.ROOT_TABLET_META_CHANGES; @@ -40,7 +41,6 @@ import org.apache.accumulo.core.conf.ConfigCheckUtil; import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.volume.Volume; @@ -308,10 +308,9 @@ public UpgradeStatus getStatus() { justification = "Want to immediately stop all manager threads on upgrade error") private void abortIfFateTransactions(ServerContext context) { try { - // TODO 4131 dummy lock for now final ReadOnlyFateStore fate = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter(), new ZooUtil.LockID("path", "node", 1234)); + context.getZooReaderWriter(), context.getZooCache(), createDummyLockID()); try (var idStream = fate.list()) { if (idStream.findFirst().isPresent()) { throw new AccumuloException("Aborting upgrade because there are" diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 3f1fe9c55a4..635e810e90f 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.compaction; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; @@ -31,7 +32,6 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; -import static org.apache.accumulo.test.fate.meta.MetaFateIT.createTestLockID; import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles; import static org.apache.accumulo.test.util.FileMetadataUtil.splitFilesIntoRanges; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -234,7 +234,7 @@ public void testExternalCompaction() throws Exception { public void testCompactionCommitAndDeadDetectionRoot() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooReaderWriter(), createTestLockID()); + ctx.getZooReaderWriter(), ctx.getZooCache(), createDummyLockID()); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); @@ -253,7 +253,7 @@ public void testCompactionCommitAndDeadDetectionRoot() throws Exception { public void testCompactionCommitAndDeadDetectionMeta() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooReaderWriter(), createTestLockID()); + ctx.getZooReaderWriter(), ctx.getZooCache(), createDummyLockID()); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { // Metadata table by default already has 2 tablets @@ -275,7 +275,7 @@ public void testCompactionCommitAndDeadDetectionUser() throws Exception { final String tableName = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx); + UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID()); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); @@ -298,9 +298,10 @@ public void testCompactionCommitAndDeadDetectionAll() throws Exception { final String userTable = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx); - FateStore metaFateStore = new MetaFateStore<>( - ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter(), createTestLockID()); + UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID()); + FateStore metaFateStore = + new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter(), + ctx.getZooCache(), createDummyLockID()); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index 00847ad5f78..2bd54477e41 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -492,7 +492,9 @@ protected Fate initializeFate(FateStore store) { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new TestEnv(), store, r -> r + "", config); + Fate fate = new Fate<>(new TestEnv(), store, r -> r + "", config); + fate.startDeadReservationCleaner(); + return fate; } protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 5bddd4f35c1..39fda418243 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -552,7 +552,9 @@ private Fate initializeFate(FateStore store) { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new TestEnv(), store, Object::toString, config); + Fate fate = new Fate<>(new TestEnv(), store, Object::toString, config); + fate.startDeadReservationCleaner(); + return fate; } private boolean wordIsTStatus(String word) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index f5f88fdf05a..378c7bb896e 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -21,21 +21,25 @@ import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.DefaultConfiguration; -import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; @@ -47,19 +51,27 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.util.Wait; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.hadoop.shaded.org.mockito.Mockito; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; // TODO 4131 could potentially have separate classes for testing MetaFateStore and UserFateStore +// similar to how FateTestRunner is used, however that interface doesn't work as nicely here +// since we are using multiple stores instead of just one. Can do something similar to +// FateTestRunner here if desired public class MultipleStoresIT extends SharedMiniClusterBase { + private static final Logger LOG = LoggerFactory.getLogger(MultipleStoresIT.class); @TempDir private static File tempDir; private static ZooKeeperTestingServer szk = null; @@ -96,28 +108,25 @@ public void testReserveUnreserve() throws Exception { testReserveUnreserve(FateInstanceType.USER); } - protected void testReserveUnreserve(FateInstanceType storeType) throws Exception { + private void testReserveUnreserve(FateInstanceType storeType) throws Exception { // reserving/unreserving a FateId should be reflected across instances of the stores - String tableName = getUniqueNames(1)[0]; - List expReservedList = new ArrayList<>(); + final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - List> reservations = new ArrayList<>(); - boolean isUserStore = storeType.equals(FateInstanceType.USER); - Set allIds = new HashSet<>(); - final AbstractFateStore store1, store2; + final List> reservations = new ArrayList<>(); + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final FateStore store1, store2; + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + Map activeReservations; if (isUserStore) { createFateTable(client, tableName); - } - - if (isUserStore) { - store1 = new UserFateStore<>(client, tableName); - store2 = new UserFateStore<>(client, tableName); + store1 = new UserFateStore<>(client, tableName, lock1); + store2 = new UserFateStore<>(client, tableName, lock2); } else { - ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); - ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); - store1 = new MetaFateStore<>(FATE_DIR, zk, lock1); - store2 = new MetaFateStore<>(FATE_DIR, zk, lock2); + store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1); + store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2); } // Create the fate ids using store1 @@ -131,7 +140,6 @@ protected void testReserveUnreserve(FateInstanceType storeType) throws Exception // ids were created using store1 int count = 0; for (FateId fateId : allIds) { - expReservedList.add(fateId); if (count % 2 == 0) { reservations.add(store1.reserve(fateId)); assertTrue(store2.tryReserve(fateId).isEmpty()); @@ -141,11 +149,11 @@ protected void testReserveUnreserve(FateInstanceType storeType) throws Exception } count++; } - // Both stores should return the same list of reserved transactions - assertTrue(expReservedList.containsAll(store1.getReservedTxns()) - && expReservedList.size() == store1.getReservedTxns().size()); - assertTrue(expReservedList.containsAll(store2.getReservedTxns()) - && expReservedList.size() == store2.getReservedTxns().size()); + // Both stores should return the same reserved transactions + activeReservations = store1.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + activeReservations = store2.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); // Test setting/getting the TStatus and unreserving the transactions for (int i = 0; i < allIds.size(); i++) { @@ -156,15 +164,38 @@ protected void testReserveUnreserve(FateInstanceType storeType) throws Exception reservation.delete(); reservation.unreserve(0, TimeUnit.MILLISECONDS); // Attempt to set a status on a tx that has been unreserved (should throw exception) - try { - reservation.setStatus(ReadOnlyFateStore.TStatus.NEW); - fail(); - } catch (Exception e) { - // Expected - } + assertThrows(IllegalStateException.class, + () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW)); } - assertEquals(List.of(), store1.getReservedTxns()); - assertEquals(List.of(), store2.getReservedTxns()); + assertTrue(store1.getActiveReservations().isEmpty()); + assertTrue(store2.getActiveReservations().isEmpty()); + } + + @Test + public void testReserveNonExistentTxn() throws Exception { + testReserveNonExistentTxn(FateInstanceType.META); + testReserveNonExistentTxn(FateInstanceType.USER); + } + + private void testReserveNonExistentTxn(FateInstanceType storeType) throws Exception { + // Tests that reserve() doesn't hang indefinitely and instead throws an error + // on reserve() a non-existent transaction. Tests that tryReserve() will return + // an empty optional on non-existent transaction. + final FateStore store; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final String tableName = getUniqueNames(1)[0]; + final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + + if (isUserStore) { + createFateTable(client, tableName); + store = new UserFateStore<>(client, tableName, lock); + } else { + store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock); + } + + assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId)); + assertTrue(store.tryReserve(fakeFateId).isEmpty()); } @Test @@ -173,24 +204,21 @@ public void testReserveReservedAndUnreserveUnreserved() throws Exception { testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER); } - public void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType) + private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType) throws Exception { - String tableName = getUniqueNames(1)[0]; + final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - boolean isUserStore = storeType.equals(FateInstanceType.USER); - Set allIds = new HashSet<>(); - List> reservations = new ArrayList<>(); - final AbstractFateStore store; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final List> reservations = new ArrayList<>(); + final FateStore store; + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); if (isUserStore) { createFateTable(client, tableName); - } - - if (isUserStore) { - store = new UserFateStore<>(client, tableName); + store = new UserFateStore<>(client, tableName, lock); } else { - ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); - store = new MetaFateStore<>(FATE_DIR, zk, lock); + store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock); } // Create some FateIds and ensure that they can be reserved @@ -214,12 +242,8 @@ public void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType } // Try to unreserve again (should throw exception) for (var reservation : reservations) { - try { - reservation.unreserve(0, TimeUnit.MILLISECONDS); - fail(); - } catch (Exception e) { - // Expected - } + assertThrows(IllegalStateException.class, + () -> reservation.unreserve(0, TimeUnit.MILLISECONDS)); } } @@ -229,24 +253,21 @@ public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception { testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER); } - public void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType) + private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType) throws Exception { - String tableName = getUniqueNames(1)[0]; + final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - boolean isUserStore = storeType.equals(FateInstanceType.USER); - Set allIds = new HashSet<>(); - List> reservations = new ArrayList<>(); - final AbstractFateStore store; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final List> reservations = new ArrayList<>(); + final FateStore store; + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); if (isUserStore) { createFateTable(client, tableName); - } - - if (isUserStore) { - store = new UserFateStore<>(client, tableName); + store = new UserFateStore<>(client, tableName, lock); } else { - ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); - store = new MetaFateStore<>(FATE_DIR, zk, lock); + store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock); } // Create some FateIds and ensure that they can be reserved @@ -278,12 +299,7 @@ public void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType sto // Verify that the tx is now unknown since it has been deleted assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus()); // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely - try { - store.reserve(fateId); - fail(); - } catch (Exception e) { - // Expected - } + assertThrows(IllegalStateException.class, () -> store.reserve(fateId)); } } @@ -293,43 +309,42 @@ public void testMultipleFateInstances() throws Exception { testMultipleFateInstances(FateInstanceType.USER); } - public void testMultipleFateInstances(FateInstanceType storeType) throws Exception { - String tableName = getUniqueNames(1)[0]; + private void testMultipleFateInstances(FateInstanceType storeType) throws Exception { + final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - boolean isUserStore = storeType.equals(FateInstanceType.USER); - Set allIds = new HashSet<>(); - final AbstractFateStore store1, store2; - final TestEnv testEnv1 = new TestEnv(); - final TestEnv testEnv2 = new TestEnv(); + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final FateStore store1, store2; + final SleepingTestEnv testEnv1 = new SleepingTestEnv(50); + final SleepingTestEnv testEnv2 = new SleepingTestEnv(50); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); if (isUserStore) { createFateTable(client, tableName); - } - - if (isUserStore) { - store1 = new UserFateStore<>(client, tableName); - store2 = new UserFateStore<>(client, tableName); + store1 = new UserFateStore<>(client, tableName, lock1); + store2 = new UserFateStore<>(client, tableName, lock2); } else { - ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); - ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); - store1 = new MetaFateStore<>(FATE_DIR, zk, lock1); - store2 = new MetaFateStore<>(FATE_DIR, zk, lock2); + store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1); + store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2); } - Fate fate1 = + Fate fate1 = new Fate<>(testEnv1, store1, Object::toString, DefaultConfiguration.getInstance()); - Fate fate2 = + fate1.startDeadReservationCleaner(); + Fate fate2 = new Fate<>(testEnv2, store2, Object::toString, DefaultConfiguration.getInstance()); + fate2.startDeadReservationCleaner(); for (int i = 0; i < numFateIds; i++) { FateId fateId; // Start half the txns using fate1, and the other half using fate2 if (i % 2 == 0) { fateId = fate1.startTransaction(); - fate1.seedTransaction("op" + i, fateId, new TestRepo(), true, "test"); + fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test"); } else { fateId = fate2.startTransaction(); - fate2.seedTransaction("op" + i, fateId, new TestRepo(), true, "test"); + fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test"); } allIds.add(fateId); } @@ -347,11 +362,124 @@ public void testMultipleFateInstances(FateInstanceType storeType) throws Excepti fate2.shutdown(1, TimeUnit.MINUTES); } - public static class TestRepo implements Repo { + @Test + public void testDeadReservationsCleanup() throws Exception { + testDeadReservationsCleanup(FateInstanceType.META); + testDeadReservationsCleanup(FateInstanceType.USER); + } + + private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exception { + // Tests reserving some transactions, then simulating that the Manager died by creating + // a new Fate instance and store with a new LockID. The transactions which were + // reserved using the old LockID should be cleaned up by Fate's DeadReservationCleaner, + // then picked up by the new Fate/store. + + final String tableName = getUniqueNames(1)[0]; + // One transaction for each FATE worker thread + final int numFateIds = + Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue()); + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final FateStore store1, store2, spyStore1; + final LatchTestEnv testEnv1 = new LatchTestEnv(); + final LatchTestEnv testEnv2 = new LatchTestEnv(); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + Map reservations; + + if (isUserStore) { + createFateTable(client, tableName); + store1 = new UserFateStore<>(client, tableName, lock1); + } else { + store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1); + } + + // Redefine isDeadReservation() for store1 as always being false. We don't want fate1/store1 to + // delete any reservations yet (we are simulating that the Manager is alive right now) + spyStore1 = Mockito.spy(store1); + Mockito.doAnswer(invocation -> false).when(spyStore1) + .isDeadReservation(Mockito.any(FateStore.FateReservation.class)); + + Fate fate1 = + new Fate<>(testEnv1, spyStore1, Object::toString, DefaultConfiguration.getInstance()); + fate1.startDeadReservationCleaner(); + + // Ensure nothing is reserved yet + assertTrue(spyStore1.getActiveReservations().isEmpty()); + + // Create transactions + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + fateId = fate1.startTransaction(); + fate1.seedTransaction("op" + i, fateId, new LatchTestRepo(), true, "test"); + allIds.add(fateId); + } + assertEquals(numFateIds, allIds.size()); + + // Wait for all the fate worker threads to start working on the transactions + Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds); + // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction + + // Verify spyStore1 has the transactions reserved and that they were reserved with lock1 + reservations = spyStore1.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach( + res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + + if (isUserStore) { + store2 = new UserFateStore<>(client, tableName, lock2); + } else { + store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2); + } + + // Simulate what would happen if the Manager using the Fate object (fate1) died. + // ServerLock.isLockHeld(...) would return false for the LockId of the Manager that died + // (in this case, lock1). + + // Redefine what is considered "dead" as those whose locks match lock1 + Mockito.doAnswer(invocation -> { + FateStore.FateReservation reservation = invocation.getArgument(0); + return FateStore.FateReservation.locksAreEqual(reservation.getLockID(), lock1); + }).when(spyStore1).isDeadReservation(Mockito.any(FateStore.FateReservation.class)); + + // Verify store2 can see the reserved transactions even though they were reserved using + // spyStore1 + reservations = store2.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach( + res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + + // Create the new Fate/start the Fate threads (the work finder and the workers). + // The DeadReservationCleaner for fate2 should not run/have no effect since we + // already have a DeadReservationCleaner for storeType running from fate1. + Fate fate2 = + new Fate<>(testEnv2, store2, Object::toString, DefaultConfiguration.getInstance()); + fate2.startDeadReservationCleaner(); + + // Wait for the "dead" reservations to be deleted and picked up again (reserved using + // fate2/store2/lock2 now). + // They are considered "dead" if they are held by lock1 in this test. We don't have to worry + // about fate1/spyStore1/lock1 being used to reserve the transactions again since all + // the workers for fate1 are hung up + Wait.waitFor(() -> { + Map store2Reservations = store2.getActiveReservations(); + boolean allReservedWithLock2 = store2Reservations.values().stream() + .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); + return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; + }, 60_000); + + // Finish work and shutdown + testEnv1.workersLatch.countDown(); + testEnv2.workersLatch.countDown(); + fate1.shutdown(1, TimeUnit.MINUTES); + fate2.shutdown(1, TimeUnit.MINUTES); + } + + public static class SleepingTestRepo implements Repo { private static final long serialVersionUID = 1L; @Override - public long isReady(FateId fateId, TestEnv environment) { + public long isReady(FateId fateId, SleepingTestEnv environment) { return 0; } @@ -361,14 +489,17 @@ public String getName() { } @Override - public Repo call(FateId fateId, TestEnv environment) throws Exception { + public Repo call(FateId fateId, SleepingTestEnv environment) throws Exception { environment.executedOps.add(fateId); - Thread.sleep(50); // Simulate some work + LOG.debug("Thread " + Thread.currentThread() + " in SleepingTestRepo.call() sleeping for " + + environment.sleepTimeMs + " millis"); + Thread.sleep(environment.sleepTimeMs); // Simulate some work + LOG.debug("Thread " + Thread.currentThread() + " finished SleepingTestRepo.call()"); return null; } @Override - public void undo(FateId fateId, TestEnv environment) { + public void undo(FateId fateId, SleepingTestEnv environment) { } @@ -378,7 +509,51 @@ public String getReturn() { } } - public static class TestEnv { + public static class SleepingTestEnv { public final Set executedOps = Collections.synchronizedSet(new HashSet<>()); + public final int sleepTimeMs; + + public SleepingTestEnv(int sleepTimeMs) { + this.sleepTimeMs = sleepTimeMs; + } + } + + public static class LatchTestRepo implements Repo { + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId fateId, LatchTestEnv environment) { + return 0; + } + + @Override + public String getName() { + return null; + } + + @Override + public Repo call(FateId fateId, LatchTestEnv environment) throws Exception { + LOG.debug("Thread " + Thread.currentThread() + " in LatchTestRepo.call()"); + environment.numWorkers.incrementAndGet(); + environment.workersLatch.await(); + LOG.debug("Thread " + Thread.currentThread() + " finished LatchTestRepo.call()"); + environment.numWorkers.decrementAndGet(); + return null; + } + + @Override + public void undo(FateId fateId, LatchTestEnv environment) { + + } + + @Override + public String getReturn() { + return null; + } + } + + public static class LatchTestEnv { + public final AtomicInteger numWorkers = new AtomicInteger(0); + public final CountDownLatch workersLatch = new CountDownLatch(1); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index ddb30d253a5..37aecd358dd 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; @@ -34,7 +35,6 @@ import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; @@ -76,14 +76,10 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute(new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, createTestLockID(), + testMethod.execute(new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, null, createDummyLockID(), maxDeferred, fateIdGenerator), sctx); } - public static ZooUtil.LockID createTestLockID() { - return new ZooUtil.LockID("S1", "N1", 1234); - } - @Override protected TStatus getTxStatus(ServerContext sctx, FateId fateId) { try { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index 70c9bdccc6b..84cfbb7d1f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.test.fate.meta.MetaFateIT.createTestLockID; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; @@ -34,6 +34,7 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, ServerContext sctx = getCluster().getServerContext(); String path = sctx.getZooKeeperRoot() + Constants.ZFATE; ZooReaderWriter zk = sctx.getZooReaderWriter(); - testMethod.execute(new MetaFateStore<>(path, zk, createTestLockID()), sctx); + testMethod.execute(new MetaFateStore<>(path, zk, sctx.getZooCache(), createDummyLockID()), + sctx); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index c21c387275d..9ad75f84911 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; -import static org.apache.accumulo.test.fate.meta.MetaFateIT.createTestLockID; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -29,11 +29,13 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Optional; import java.util.UUID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -76,8 +78,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - MetaFateStore store = new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, - createTestLockID(), maxDeferred, fateIdGenerator); + MetaFateStore store = new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, null, + createDummyLockID(), maxDeferred, fateIdGenerator); // Check that the store has no transactions before and after each test assertEquals(0, store.list().count()); @@ -90,39 +92,53 @@ protected void deleteKey(FateId fateId, ServerContext sctx) { try { // We have to use reflection since the NodeValue is internal to the store - // Grab both the constructors that use the serialized bytes and status, lock, uuid + // Grab both the constructors that use the serialized bytes and status, reservation Class nodeClass = Class.forName(MetaFateStore.class.getName() + "$NodeValue"); - Constructor statusLockUUIDCons = - nodeClass.getDeclaredConstructor(TStatus.class, String.class, String.class); + Constructor statusReservationCons = + nodeClass.getDeclaredConstructor(TStatus.class, FateStore.FateReservation.class); Constructor serializedCons = nodeClass.getDeclaredConstructor(byte[].class); - statusLockUUIDCons.setAccessible(true); + statusReservationCons.setAccessible(true); serializedCons.setAccessible(true); - // Get the status, lock, and uuid fields so they can be read and the serialize method + // Get the status and reservation fields so they can be read and get the serialize method Field nodeStatus = nodeClass.getDeclaredField("status"); - Field nodeLock = nodeClass.getDeclaredField("lockID"); - Field nodeUUID = nodeClass.getDeclaredField("uuid"); + Field nodeReservation = nodeClass.getDeclaredField("reservation"); Method nodeSerialize = nodeClass.getDeclaredMethod("serialize"); nodeStatus.setAccessible(true); - nodeLock.setAccessible(true); - nodeUUID.setAccessible(true); + nodeReservation.setAccessible(true); nodeSerialize.setAccessible(true); - // Get the existing status, lock, and uuid for the node and build a new node with an empty key + // Get the existing status and reservation for the node and build a new node with an empty key // but uses the existing tid String txPath = ZK_ROOT + Constants.ZFATE + "/tx_" + fateId.getTxUUIDStr(); Object currentNode = serializedCons.newInstance(new Object[] {zk.getData(txPath)}); TStatus currentStatus = (TStatus) nodeStatus.get(currentNode); - String currentLock = (String) nodeLock.get(currentNode); - String currentUUID = (String) nodeUUID.get(currentNode); - // replace the node with no key and just a tid and existing status, lock, and uuid - Object newNode = statusLockUUIDCons.newInstance(currentStatus, currentLock, currentUUID); + Optional currentReservation = + getCurrentReservation(nodeReservation, currentNode); + // replace the node with no key and just a tid and existing status and reservation + Object newNode = + statusReservationCons.newInstance(currentStatus, currentReservation.orElse(null)); - // Replace the transaction with the same status, lock, and uuid and no key + // Replace the transaction with the same status and reservation but no key zk.putPersistentData(txPath, (byte[]) nodeSerialize.invoke(newNode), NodeExistsPolicy.OVERWRITE); } catch (Exception e) { throw new IllegalStateException(e); } } + + private Optional getCurrentReservation(Field nodeReservation, + Object currentNode) throws Exception { + Object currentResAsObject = nodeReservation.get(currentNode); + Optional currentReservation = Optional.empty(); + if (currentResAsObject instanceof Optional) { + Optional currentResAsOptional = (Optional) currentResAsObject; + if (currentResAsOptional.isPresent() + && currentResAsOptional.orElseThrow() instanceof FateStore.FateReservation) { + currentReservation = + Optional.of((FateStore.FateReservation) currentResAsOptional.orElseThrow()); + } + } + return currentReservation; + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java index aaa9be01f16..0a1e6ae4d03 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java @@ -33,8 +33,11 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.user.FateMutator; import org.apache.accumulo.core.fate.user.FateMutatorImpl; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.test.fate.FateIT; import org.junit.jupiter.api.AfterAll; @@ -169,6 +172,100 @@ public void requireStatus() throws Exception { } + @Test + public void testReservations() throws Exception { + final String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table, ntc); + + ClientContext context = (ClientContext) client; + + FateId fateId = + FateId.from(FateInstanceType.fromNamespaceOrTableName(table), UUID.randomUUID()); + ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50); + FateStore.FateReservation reservation = + FateStore.FateReservation.from(lockID, UUID.randomUUID()); + FateStore.FateReservation wrongReservation = + FateStore.FateReservation.from(lockID, UUID.randomUUID()); + + // Ensure that we cannot do anything in the column until it is initialized + FateMutator.Status status = + new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).requireReserved(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireReserved().tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireUnreserved().tryMutate(); + assertEquals(REJECTED, status); + + // Initialize the column and ensure we can't do it twice + status = new FateMutatorImpl<>(context, table, fateId).putInitReserveColVal().tryMutate(); + assertEquals(ACCEPTED, status); + status = new FateMutatorImpl<>(context, table, fateId).putInitReserveColVal().tryMutate(); + assertEquals(REJECTED, status); + + // Ensure that reserving is the only thing we can do + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).requireReserved(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireReserved().tryMutate(); + assertEquals(REJECTED, status); + // It is considered unreserved since it has been initialized + status = new FateMutatorImpl<>(context, table, fateId).requireUnreserved().tryMutate(); + assertEquals(ACCEPTED, status); + + // Should be able to reserve + status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + assertEquals(ACCEPTED, status); + + // Ensure that it is reserved + status = + new FateMutatorImpl<>(context, table, fateId).requireReserved(reservation).tryMutate(); + assertEquals(ACCEPTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireReserved(wrongReservation) + .tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireReserved().tryMutate(); + assertEquals(ACCEPTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireUnreserved().tryMutate(); + assertEquals(REJECTED, status); + // Should not be able to reserve when it is already reserved + status = + new FateMutatorImpl<>(context, table, fateId).putReservedTx(wrongReservation).tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + + // Should be able to unreserve + status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(wrongReservation) + .tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(ACCEPTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + + // Ensure that it is unreserved + status = + new FateMutatorImpl<>(context, table, fateId).requireReserved(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireReserved().tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).requireUnreserved().tryMutate(); + assertEquals(ACCEPTED, status); + } + } + void logAllEntriesInTable(String tableName, AccumuloClient client) throws Exception { client.createScanner(tableName).forEach(e -> log.info(e.getKey() + " " + e.getValue())); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index 4d6a8da1188..49d310cc1e5 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import java.util.stream.StreamSupport; @@ -60,7 +61,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, maxDeferred, fateIdGenerator), + testMethod.execute( + new UserFateStore<>(client, table, createDummyLockID(), maxDeferred, fateIdGenerator), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java index fde93de3460..db3341ecfa3 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; + import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.test.fate.FateOpsCommandsIT; @@ -26,7 +28,7 @@ public class UserFateOpsCommandsIT extends FateOpsCommandsIT { @Override public void executeTest(FateTestExecutor testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { - testMethod.execute(new UserFateStore<>(getCluster().getServerContext()), + testMethod.execute(new UserFateStore<>(getCluster().getServerContext(), createDummyLockID()), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java index 2a3248f7671..8cb66cb43b8 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; @@ -56,7 +57,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, maxDeferred, fateIdGenerator), + testMethod.execute( + new UserFateStore<>(client, table, createDummyLockID(), maxDeferred, fateIdGenerator), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index 129d5632568..7efbe676b2a 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -86,7 +87,7 @@ private static class TestUserFateStore extends UserFateStore { // use the list of fateIds to simulate collisions on fateIds public TestUserFateStore(ClientContext context, String tableName, List fateIds) { - super(context, tableName); + super(context, tableName, createDummyLockID()); this.fateIdIterator = fateIds.iterator(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 210220b8cb1..7153d2a1248 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -21,7 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.accumulo.test.fate.meta.MetaFateIT.createTestLockID; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -263,11 +263,12 @@ public void getFateStatus() { InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - MetaFateStore mfs = new MetaFateStore<>( - ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createTestLockID()); + MetaFateStore mfs = + new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, + context.getZooCache(), createDummyLockID()); var lockPath = ServiceLock.path(ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId); - UserFateStore ufs = new UserFateStore<>(context); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID()); Map> fateStores = Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); @@ -358,7 +359,7 @@ private boolean lookupFateInZookeeper(final String tableName) throws KeeperExcep InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); MetaFateStore mfs = new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, - zk, createTestLockID()); + zk, context.getZooCache(), createDummyLockID()); var lockPath = ServiceLock.path(ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId); AdminUtil.FateStatus fateStatus = admin.getStatus(mfs, zk, lockPath, null, null, null); @@ -388,8 +389,8 @@ private boolean lookupFateInAccumulo(final String tableName) throws KeeperExcept log.trace("tid: {}", tableId); - UserFateStore as = new UserFateStore<>(context); - AdminUtil.FateStatus fateStatus = admin.getStatus(as, null, null, null); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID()); + AdminUtil.FateStatus fateStatus = admin.getStatus(ufs, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 825d12ed3d9..bb3d29fc6bf 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -19,8 +19,8 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; -import static org.apache.accumulo.test.fate.meta.MetaFateIT.createTestLockID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -232,8 +232,8 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) { AdminUtil admin = new AdminUtil<>(false); ServerContext context = cluster.getServerContext(); ZooReaderWriter zk = context.getZooReaderWriter(); - MetaFateStore mfs = - new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk, createTestLockID()); + MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, + zk, context.getZooCache(), createDummyLockID()); var lockPath = ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS); return admin.getStatus(mfs, zk, lockPath, null, null, null); } catch (KeeperException | InterruptedException e) {