diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 51670aacfda..055d721bcbe 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate.TxInfo; @@ -55,8 +55,8 @@ public abstract class AbstractFateStore implements FateStore { // all existing transactions are processed immediately again protected static final int DEFAULT_MAX_DEFERRED = 100_000; - protected final Set reserved; - protected final Map deferred; + protected final Set reserved; + protected final Map deferred; private final int maxDeferred; private final AtomicBoolean deferredOverflow = new AtomicBoolean(); @@ -101,26 +101,26 @@ public static Object deserialize(byte[] ser) { } /** - * Attempt to reserve transaction + * Attempt to reserve the fate transaction. * - * @param tid transaction id + * @param fateId The FateId * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or * an empty Optional if the transaction was already reserved. */ @Override - public Optional> tryReserve(long tid) { - synchronized (AbstractFateStore.this) { - if (!reserved.contains(tid)) { - return Optional.of(reserve(tid)); + public Optional> tryReserve(FateId fateId) { + synchronized (this) { + if (!reserved.contains(fateId)) { + return Optional.of(reserve(fateId)); } return Optional.empty(); } } @Override - public FateTxStore reserve(long tid) { + public FateTxStore reserve(FateId fateId) { synchronized (AbstractFateStore.this) { - while (reserved.contains(tid)) { + while (reserved.contains(fateId)) { try { AbstractFateStore.this.wait(100); } catch (InterruptedException e) { @@ -129,13 +129,13 @@ public FateTxStore reserve(long tid) { } } - reserved.add(tid); - return newFateTxStore(tid, true); + reserved.add(fateId); + return newFateTxStore(fateId, true); } } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { AtomicLong seen = new AtomicLong(0); @@ -145,21 +145,21 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { try (Stream transactions = getTransactions()) { transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus())) - .mapToLong(FateIdStatus::getTxid).filter(txid -> { + .map(FateIdStatus::getFateId).filter(fateId -> { synchronized (AbstractFateStore.this) { - var deferredTime = deferred.get(txid); + var deferredTime = deferred.get(fateId); if (deferredTime != null) { if ((deferredTime - System.nanoTime()) >= 0) { return false; } else { - deferred.remove(txid); + deferred.remove(fateId); } } - return !reserved.contains(txid); + return !reserved.contains(fateId); } - }).forEach(txid -> { + }).forEach(fateId -> { seen.incrementAndGet(); - idConsumer.accept(txid); + idConsumer.accept(fateId); }); } @@ -202,8 +202,8 @@ public Stream list() { } @Override - public ReadOnlyFateTxStore read(long tid) { - return newFateTxStore(tid, false); + public ReadOnlyFateTxStore read(FateId fateId) { + return newFateTxStore(fateId, false); } protected boolean isRunnable(TStatus status) { @@ -211,20 +211,16 @@ protected boolean isRunnable(TStatus status) { || status == TStatus.SUBMITTED; } - protected long parseTid(String txdir) { - return Long.parseLong(txdir.split("_")[1], 16); - } - public static abstract class FateIdStatusBase implements FateIdStatus { - private final long txid; + private final FateId fateId; - public FateIdStatusBase(long txid) { - this.txid = txid; + public FateIdStatusBase(FateId fateId) { + this.fateId = fateId; } @Override - public long getTxid() { - return txid; + public FateId getFateId() { + return fateId; } } @@ -245,30 +241,30 @@ public int getDeferredCount() { protected abstract Stream getTransactions(); - protected abstract TStatus _getStatus(long tid); + protected abstract TStatus _getStatus(FateId fateId); - protected abstract FateTxStore newFateTxStore(long tid, boolean isReserved); + protected abstract FateTxStore newFateTxStore(FateId fateId, boolean isReserved); protected abstract class AbstractFateTxStoreImpl implements FateTxStore { - protected final long tid; + protected final FateId fateId; protected final boolean isReserved; protected TStatus observedStatus = null; - protected AbstractFateTxStoreImpl(long tid, boolean isReserved) { - this.tid = tid; + protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) { + this.fateId = fateId; this.isReserved = isReserved; } @Override public TStatus waitForStatusChange(EnumSet expected) { Preconditions.checkState(!isReserved, - "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); + "Attempted to wait for status change while reserved " + fateId); while (true) { long countBefore = unreservedNonNewCount.getCount(); - TStatus status = _getStatus(tid); + TStatus status = _getStatus(fateId); if (expected.contains(status)) { return status; } @@ -286,9 +282,8 @@ public void unreserve(long deferTime, TimeUnit timeUnit) { } synchronized (AbstractFateStore.this) { - if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); + if (!reserved.remove(fateId)) { + throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId); } // notify any threads waiting to reserve @@ -306,7 +301,7 @@ public void unreserve(long deferTime, TimeUnit timeUnit) { deferredOverflow.set(true); deferred.clear(); } else { - deferred.put(tid, System.nanoTime() + deferTime); + deferred.put(fateId, System.nanoTime() + deferTime); } } } @@ -327,9 +322,8 @@ protected void verifyReserved(boolean isWrite) { if (isReserved) { synchronized (AbstractFateStore.this) { - if (!reserved.contains(tid)) { - throw new IllegalStateException( - "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); + if (!reserved.contains(fateId)) { + throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId); } } } @@ -338,14 +332,14 @@ protected void verifyReserved(boolean isWrite) { @Override public TStatus getStatus() { verifyReserved(false); - var status = _getStatus(tid); + var status = _getStatus(fateId); observedStatus = status; return status; } @Override - public long getID() { - return tid; + public FateId getID() { + return fateId; } protected byte[] serializeTxInfo(Serializable so) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 85bc34141ce..2a436b34445 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -369,20 +369,22 @@ private FateStatus getTransactionStatus(Map statuses = new ArrayList<>(); fateStores.forEach((type, store) -> { - try (Stream tids = store.list().map(FateIdStatus::getTxid)) { - tids.forEach(tid -> { + try (Stream fateIds = store.list().map(FateIdStatus::getFateId)) { + fateIds.forEach(fateId -> { - ReadOnlyFateTxStore txStore = store.read(tid); + ReadOnlyFateTxStore txStore = store.read(fateId); String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); - List hlocks = heldLocks.remove(tid); + // ELASTICITY_TODO DEFERRED - ISSUE 4044 + List hlocks = heldLocks.remove(fateId.getTid()); if (hlocks == null) { hlocks = Collections.emptyList(); } - List wlocks = waitingLocks.remove(tid); + // ELASTICITY_TODO DEFERRED - ISSUE 4044 + List wlocks = waitingLocks.remove(fateId.getTid()); if (wlocks == null) { wlocks = Collections.emptyList(); @@ -398,9 +400,10 @@ private FateStatus getTransactionStatus(Map zs, ZooReaderWriter zk, ServiceLockPath p return false; } boolean state = false; - FateTxStore txStore = zs.reserve(txid); + // ELASTICITY_TODO DEFERRED - ISSUE 4044 + FateId fateId = FateId.from(FateInstanceType.META, txid); + FateTxStore txStore = zs.reserve(fateId); try { TStatus ts = txStore.getStatus(); switch (ts) { @@ -500,7 +505,9 @@ public boolean prepFail(FateStore zs, ZooReaderWriter zk, ServiceLockPath zLo return false; } boolean state = false; - FateTxStore txStore = zs.reserve(txid); + // ELASTICITY_TODO DEFERRED - ISSUE 4044 + FateId fateId = FateId.from(FateInstanceType.META, txid); + FateTxStore txStore = zs.reserve(fateId); try { TStatus ts = txStore.getStatus(); switch (ts) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 55845078475..938b76ef4c2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -73,7 +73,7 @@ public class Fate { private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); private final AtomicBoolean keepRunning = new AtomicBoolean(true); - private final TransferQueue workQueue; + private final TransferQueue workQueue; private final Thread workFinder; public enum TxInfo { @@ -90,7 +90,7 @@ private class WorkFinder implements Runnable { public void run() { while (keepRunning.get()) { try { - store.runnable(keepRunning, txid -> { + store.runnable(keepRunning, fateId -> { while (keepRunning.get()) { try { // The reason for calling transfer instead of queueing is avoid rescanning the @@ -98,7 +98,7 @@ public void run() { // were busy, the queue size was 100, and there are three runnable things in the // store. Do not want to keep scanning the store adding those same 3 runnable things // until the queue is full. - if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) { + if (workQueue.tryTransfer(fateId, 100, MILLISECONDS)) { break; } } catch (InterruptedException e) { @@ -124,12 +124,12 @@ private class TransactionRunner implements Runnable { private Optional> reserveFateTx() throws InterruptedException { while (keepRunning.get()) { - var unreservedTid = workQueue.poll(100, MILLISECONDS); + FateId unreservedFateId = workQueue.poll(100, MILLISECONDS); - if (unreservedTid == null) { + if (unreservedFateId == null) { continue; } - var optionalopStore = store.tryReserve(unreservedTid); + var optionalopStore = store.tryReserve(unreservedFateId); if (optionalopStore.isPresent()) { return optionalopStore; } @@ -157,7 +157,7 @@ public void run() { } else if (status == SUBMITTED || status == IN_PROGRESS) { Repo prevOp = null; try { - deferTime = op.isReady(txStore.getID(), environment); + deferTime = op.isReady(txStore.getID().getTid(), environment); // Here, deferTime is only used to determine success (zero) or failure (non-zero), // proceeding on success and returning to the while loop on failure. @@ -167,7 +167,7 @@ public void run() { if (status == SUBMITTED) { txStore.setStatus(IN_PROGRESS); } - op = op.call(txStore.getID(), environment); + op = op.call(txStore.getID().getTid(), environment); } else { continue; } @@ -214,18 +214,17 @@ public void run() { * transaction just wait for process to die. When the manager start elsewhere the FATE * transaction can resume. */ - private void blockIfHadoopShutdown(long tid, Exception e) { + private void blockIfHadoopShutdown(FateId fateId, Exception e) { if (ShutdownUtil.isShutdownInProgress()) { - String tidStr = FateTxId.formatTid(tid); if (e instanceof AcceptableException) { - log.debug("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", tidStr, e); + log.debug("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e); } else if (isIOException(e)) { - log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", tidStr, e); + log.info("Ignoring exception likely caused by Hadoop Shutdown hook. {} ", fateId, e); } else { // sometimes code will catch an IOException caused by the hadoop shutdown hook and throw // another exception without setting the cause. - log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", tidStr, e); + log.warn("Ignoring exception possibly caused by Hadoop Shutdown hook. {} ", fateId, e); } while (true) { @@ -237,8 +236,7 @@ private void blockIfHadoopShutdown(long tid, Exception e) { } private void transitionToFailed(FateTxStore txStore, Exception e) { - String tidStr = FateTxId.formatTid(txStore.getID()); - final String msg = "Failed to execute Repo " + tidStr; + final String msg = "Failed to execute Repo " + txStore.getID(); // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor // as a warning. They're a normal, handled failure condition. if (e instanceof AcceptableException) { @@ -250,7 +248,7 @@ private void transitionToFailed(FateTxStore txStore, Exception e) { } txStore.setTransactionInfo(TxInfo.EXCEPTION, e); txStore.setStatus(FAILED_IN_PROGRESS); - log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", tidStr); + log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", txStore.getID()); } private void processFailed(FateTxStore txStore, Repo op) { @@ -278,11 +276,11 @@ private void doCleanUp(FateTxStore txStore) { } } - private void undo(long tid, Repo op) { + private void undo(FateId fateId, Repo op) { try { - op.undo(tid, environment); + op.undo(fateId.getTid(), environment); } catch (Exception e) { - log.warn("Failed to undo Repo, " + FateTxId.formatTid(tid), e); + log.warn("Failed to undo Repo, " + fateId, e); } } @@ -332,20 +330,20 @@ public Fate(T environment, FateStore store, Function,String> toLogStr } // get a transaction id back to the requester before doing any work - public long startTransaction() { + public FateId startTransaction() { return store.create(); } // start work in the transaction.. it is safe to call this // multiple times for a transaction... but it will only seed once - public void seedTransaction(String txName, long tid, Repo repo, boolean autoCleanUp, + public void seedTransaction(String txName, FateId fateId, Repo repo, boolean autoCleanUp, String goalMessage) { - FateTxStore txStore = store.reserve(tid); + FateTxStore txStore = store.reserve(fateId); try { if (txStore.getStatus() == NEW) { if (txStore.top() == null) { try { - log.info("Seeding {} {}", FateTxId.formatTid(tid), goalMessage); + log.info("Seeding {} {}", fateId, goalMessage); txStore.push(repo); } catch (StackOverflowException e) { // this should not happen @@ -368,21 +366,20 @@ public void seedTransaction(String txName, long tid, Repo repo, boolean autoC } // check on the transaction - public TStatus waitForCompletion(long tid) { - return store.read(tid).waitForStatusChange(FINISHED_STATES); + public TStatus waitForCompletion(FateId fateId) { + return store.read(fateId).waitForStatusChange(FINISHED_STATES); } /** * Attempts to cancel a running Fate transaction * - * @param tid transaction id + * @param fateId fate transaction id * @return true if transaction transitioned to a failed state or already in a completed state, * false otherwise */ - public boolean cancel(long tid) { - String tidStr = FateTxId.formatTid(tid); + public boolean cancel(FateId fateId) { for (int retries = 0; retries < 5; retries++) { - Optional> optionalTxStore = store.tryReserve(tid); + Optional> optionalTxStore = store.tryReserve(fateId); if (optionalTxStore.isPresent()) { var txStore = optionalTxStore.orElseThrow(); try { @@ -393,10 +390,10 @@ public boolean cancel(long tid) { TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user")); txStore.setStatus(FAILED_IN_PROGRESS); log.info("Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", - tidStr); + fateId); return true; } else { - log.info("{} cancelled by user but already in progress or finished state", tidStr); + log.info("{} cancelled by user but already in progress or finished state", fateId); return false; } } finally { @@ -407,13 +404,13 @@ public boolean cancel(long tid) { UtilWaitThread.sleep(500); } } - log.info("Unable to reserve transaction {} to cancel it", tid); + log.info("Unable to reserve transaction {} to cancel it", fateId); return false; } // resource cleanup - public void delete(long tid) { - FateTxStore txStore = store.reserve(tid); + public void delete(FateId fateId) { + FateTxStore txStore = store.reserve(fateId); try { switch (txStore.getStatus()) { case NEW: @@ -424,8 +421,7 @@ public void delete(long tid) { break; case FAILED_IN_PROGRESS: case IN_PROGRESS: - throw new IllegalStateException( - "Can not delete in progress transaction " + FateTxId.formatTid(tid)); + throw new IllegalStateException("Can not delete in progress transaction " + fateId); case UNKNOWN: // nothing to do, it does not exist break; @@ -435,12 +431,12 @@ public void delete(long tid) { } } - public String getReturn(long tid) { - FateTxStore txStore = store.reserve(tid); + public String getReturn(FateId fateId) { + FateTxStore txStore = store.reserve(fateId); try { if (txStore.getStatus() != SUCCESSFUL) { - throw new IllegalStateException("Tried to get exception when transaction " - + FateTxId.formatTid(tid) + " not in successful state"); + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in successful state"); } return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE); } finally { @@ -449,12 +445,12 @@ public String getReturn(long tid) { } // get reportable failures - public Exception getException(long tid) { - FateTxStore txStore = store.reserve(tid); + public Exception getException(FateId fateId) { + FateTxStore txStore = store.reserve(fateId); try { if (txStore.getStatus() != FAILED) { - throw new IllegalStateException("Tried to get exception when transaction " - + FateTxId.formatTid(tid) + " not in failed state"); + throw new IllegalStateException( + "Tried to get exception when transaction " + fateId + " not in failed state"); } return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION); } finally { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java index 54b349858d1..4e1beb1b9b2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java @@ -105,7 +105,7 @@ private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) { public void ageOff() { store.list().filter(ids -> AGE_OFF_STATUSES.contains(ids.getStatus())) - .forEach(idStatus -> store.tryReserve(idStatus.getTxid()).ifPresent(txStore -> { + .forEach(idStatus -> store.tryReserve(idStatus.getFateId()).ifPresent(txStore -> { try { AgeOffInfo ageOffInfo = readAgeOffInfo(txStore); TStatus currStatus = txStore.getStatus(); @@ -116,11 +116,10 @@ public void ageOff() { var newAgeOffInfo = new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus); txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString()); - log.trace("Set age off data {} {}", FateTxId.formatTid(idStatus.getTxid()), - newAgeOffInfo); + log.trace("Set age off data {} {}", idStatus.getFateId(), newAgeOffInfo); } else if (shouldAgeOff(currStatus, ageOffInfo)) { txStore.delete(); - log.debug("Aged off FATE tx {}", FateTxId.formatTid(idStatus.getTxid())); + log.debug("Aged off FATE tx {}", idStatus.getFateId()); } } finally { txStore.unreserve(0, TimeUnit.MILLISECONDS); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java new file mode 100644 index 00000000000..90e87c67d56 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.util.regex.Pattern; + +import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.manager.thrift.TFateId; +import org.apache.accumulo.core.util.FastFormat; + +/** + * A strongly typed FATE Transaction ID. This is used to uniquely identify a FATE transaction. + * Consists of its {@link FateInstanceType} and its transaction id (long). The canonical string is + * of the form "FATE:[FateInstanceType]:[hex long tid]" (without the brackets). + */ +public class FateId extends AbstractId { + + private static final long serialVersionUID = 1L; + private static final String PREFIX = "FATE:"; + private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$"); + + private FateId(String canonical) { + super(canonical); + } + + /** + * @return the {@link FateInstanceType} + */ + public FateInstanceType getType() { + return FateInstanceType.valueOf(canonical().split(":")[1]); + } + + /** + * @return the decimal value of the transaction id + */ + public long getTid() { + return Long.parseLong(getHexTid(), 16); + } + + /** + * @return the hexadecimal value of the transaction id + */ + public String getHexTid() { + return canonical().split(":")[2]; + } + + /** + * Creates a new FateId object from the given parameters + * + * @param type the {@link FateInstanceType} + * @param tid the decimal transaction id + * @return a new FateId object + */ + public static FateId from(FateInstanceType type, long tid) { + return new FateId(PREFIX + type + ":" + formatTid(tid)); + } + + /** + * Creates a new FateId object from the given parameters + * + * @param type the {@link FateInstanceType} + * @param hexTid the hexadecimal transaction id + * @return a new FateId object + */ + public static FateId from(FateInstanceType type, String hexTid) { + if (HEX_PATTERN.matcher(hexTid).matches()) { + return new FateId(PREFIX + type + ":" + hexTid); + } else { + throw new IllegalArgumentException("Invalid Hex Transaction ID: " + hexTid); + } + } + + public static FateId fromThrift(TFateId tFateId) { + FateInstanceType type; + long tid = tFateId.getTid(); + + switch (tFateId.getType()) { + case USER: + type = FateInstanceType.USER; + break; + case META: + type = FateInstanceType.META; + break; + default: + throw new IllegalArgumentException("Invalid TFateInstanceType: " + tFateId.getType()); + } + + return new FateId(PREFIX + type + ":" + formatTid(tid)); + } + + /** + * Formats transaction ids in a consistent way that is useful for logging and persisting. + */ + public static String formatTid(long tid) { + // do not change how this formats without considering implications for persistence + return FastFormat.toHexString(tid); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index b5ccae52684..d8495906e3a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -25,19 +25,19 @@ /** * Transaction Store: a place to save transactions * - * A transaction consists of a number of operations. To use, first create a transaction id, and then - * seed the transaction with an initial operation. An executor service can then execute the + * A transaction consists of a number of operations. To use, first create a fate transaction id, and + * then seed the transaction with an initial operation. An executor service can then execute the * transaction's operation, possibly pushing more operations onto the transaction as each step * successfully completes. If a step fails, the stack can be unwound, undoing each operation. */ public interface FateStore extends ReadOnlyFateStore { /** - * Create a new transaction id + * Create a new fate transaction id * - * @return a transaction id + * @return a new FateId */ - long create(); + FateId create(); /** * An interface that allows read/write access to the data related to a single fate operation. @@ -86,27 +86,27 @@ interface FateTxStore extends ReadOnlyFateTxStore { * longer interact with it. * * @param deferTime time in millis to keep this transaction from being returned by - * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.LongConsumer)}. + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer)}. * Must be non-negative. */ void unreserve(long deferTime, TimeUnit timeUnit); } /** - * Attempt to reserve transaction + * Attempt to reserve the fate transaction. * - * @param tid transaction id + * @param fateId The FateId * @return true if reserved by this call, false if already reserved */ - Optional> tryReserve(long tid); + Optional> tryReserve(FateId fateId); /** - * Reserve the specific tid. + * Reserve the fate transaction. * - * Reserving a transaction id ensures that nothing else in-process interacting via the same - * instance will be operating on that transaction id. + * Reserving a fate transaction ensures that nothing else in-process interacting via the same + * instance will be operating on that fate transaction. * */ - FateTxStore reserve(long tid); + FateTxStore reserve(FateId fateId); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index 4ddf9afae5c..6be4e76506b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -22,7 +22,7 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Stream; /** @@ -56,7 +56,7 @@ enum TStatus { /** * Reads the data related to fate transaction without reserving it. */ - ReadOnlyFateTxStore read(long tid); + ReadOnlyFateTxStore read(FateId fateId); /** * Storage for an individual fate transaction @@ -115,11 +115,11 @@ interface ReadOnlyFateTxStore { /** * @return the id of the FATE transaction */ - long getID(); + FateId getID(); } interface FateIdStatus { - long getTxid(); + FateId getFateId(); TStatus getStatus(); } @@ -137,7 +137,7 @@ interface FateIdStatus { * is found or until the keepWaiting parameter is false. It will return once all runnable ids * found were passed to the consumer. */ - void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer); + void runnable(AtomicBoolean keepWaiting, Consumer idConsumer); /** * Returns true if the deferred map was cleared and if deferred executions are currently disabled diff --git a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index 1d8c7126c22..de103c7902b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@ -86,7 +86,7 @@ public long timeCreated() { } @Override - public long getID() { + public FateId getID() { return wrapped.getID(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index fb8f7ee7ed4..99c47a5624d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.util.FastFormat; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -49,11 +48,12 @@ public class ZooStore extends AbstractFateStore { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); + private static final FateInstanceType fateInstanceType = FateInstanceType.META; private String path; private ZooReaderWriter zk; - private String getTXPath(long tid) { - return FastFormat.toHexString(path + "/tx_", tid, ""); + private String getTXPath(FateId fateId) { + return path + "/tx_" + fateId.getHexTid(); } public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, InterruptedException { @@ -75,14 +75,15 @@ public ZooStore(String path, ZooReaderWriter zk, int maxDeferred) ZooStore() {} @Override - public long create() { + public FateId create() { while (true) { try { // looking at the code for SecureRandom, it appears to be thread safe long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; - zk.putPersistentData(getTXPath(tid), TStatus.NEW.name().getBytes(UTF_8), + FateId fateId = FateId.from(fateInstanceType, tid); + zk.putPersistentData(getTXPath(fateId), TStatus.NEW.name().getBytes(UTF_8), NodeExistsPolicy.FAIL); - return tid; + return fateId; } catch (NodeExistsException nee) { // exist, so just try another random # } catch (KeeperException | InterruptedException e) { @@ -93,8 +94,8 @@ public long create() { private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private FateTxStoreImpl(long tid, boolean isReserved) { - super(tid, isReserved); + private FateTxStoreImpl(FateId fateId, boolean isReserved) { + super(fateId, isReserved); } private static final int RETRIES = 10; @@ -104,7 +105,7 @@ public Repo top() { verifyReserved(false); for (int i = 0; i < RETRIES; i++) { - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); try { String top; try { @@ -155,7 +156,7 @@ private String findTop(String txpath) throws KeeperException, InterruptedExcepti public void push(Repo repo) throws StackOverflowException { verifyReserved(true); - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); try { String top = findTop(txpath); if (top != null && Long.parseLong(top.split("_")[1]) > 100) { @@ -175,10 +176,10 @@ public void pop() { verifyReserved(true); try { - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); String top = findTop(txpath); if (top == null) { - throw new IllegalStateException("Tried to pop when empty " + FateTxId.formatTid(tid)); + throw new IllegalStateException("Tried to pop when empty " + fateId); } zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP); } catch (KeeperException | InterruptedException e) { @@ -191,7 +192,7 @@ public void setStatus(TStatus status) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), + zk.putPersistentData(getTXPath(fateId), status.name().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); @@ -205,7 +206,7 @@ public void delete() { verifyReserved(true); try { - zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP); + zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -216,7 +217,7 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(tid) + "/" + txInfo, serializeTxInfo(so), + zk.putPersistentData(getTXPath(fateId) + "/" + txInfo, serializeTxInfo(so), NodeExistsPolicy.OVERWRITE); } catch (KeeperException | InterruptedException e2) { throw new IllegalStateException(e2); @@ -228,7 +229,7 @@ public Serializable getTransactionInfo(Fate.TxInfo txInfo) { verifyReserved(false); try { - return deserializeTxInfo(txInfo, zk.getData(getTXPath(tid) + "/" + txInfo)); + return deserializeTxInfo(txInfo, zk.getData(getTXPath(fateId) + "/" + txInfo)); } catch (NoNodeException nne) { return null; } catch (KeeperException | InterruptedException e) { @@ -241,7 +242,7 @@ public long timeCreated() { verifyReserved(false); try { - Stat stat = zk.getZooKeeper().exists(getTXPath(tid), false); + Stat stat = zk.getZooKeeper().exists(getTXPath(fateId), false); return stat.getCtime(); } catch (Exception e) { return 0; @@ -251,7 +252,7 @@ public long timeCreated() { @Override public List> getStack() { verifyReserved(false); - String txpath = getTXPath(tid); + String txpath = getTXPath(fateId); outer: while (true) { List ops; @@ -291,9 +292,9 @@ public List> getStack() { } @Override - protected TStatus _getStatus(long tid) { + protected TStatus _getStatus(FateId fateId) { try { - return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); + return TStatus.valueOf(new String(zk.getData(getTXPath(fateId)), UTF_8)); } catch (NoNodeException nne) { return TStatus.UNKNOWN; } catch (KeeperException | InterruptedException e) { @@ -302,18 +303,20 @@ protected TStatus _getStatus(long tid) { } @Override - protected FateTxStore newFateTxStore(long tid, boolean isReserved) { - return new FateTxStoreImpl(tid, isReserved); + protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { + return new FateTxStoreImpl(fateId, isReserved); } @Override protected Stream getTransactions() { try { return zk.getChildren(path).stream().map(strTxid -> { + String hexTid = strTxid.split("_")[1]; + FateId fateId = FateId.from(fateInstanceType, hexTid); // Memoizing for two reasons. First the status may never be requested, so in that case avoid // the lookup. Second, if its requested multiple times the result will always be consistent. - Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(parseTid(strTxid))); - return new FateIdStatusBase(parseTid(strTxid)) { + Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(fateId)); + return new FateIdStatusBase(fateId) { @Override public TStatus getStatus() { return statusSupplier.get(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index e4c36fd63a3..9ae596bb83e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -34,6 +34,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; @@ -43,7 +45,6 @@ import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -58,6 +59,7 @@ public class AccumuloStore extends AbstractFateStore { private final ClientContext context; private final String tableName; + private static final FateInstanceType fateInstanceType = FateInstanceType.USER; private static final int maxRepos = 100; private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, maxRepos); @@ -77,23 +79,23 @@ public AccumuloStore(ClientContext context) { } @Override - public long create() { + public FateId create() { final int maxAttempts = 5; - long tid = 0L; for (int attempt = 0; attempt < maxAttempts; attempt++) { + FateId fateId = getFateId(); + if (attempt >= 1) { - log.debug("Failed to create new id: {}, trying again", tid); + log.debug("Failed to create new id: {}, trying again", fateId); UtilWaitThread.sleep(100); } - tid = getTid(); - var status = newMutator(tid).requireStatus().putStatus(TStatus.NEW) + var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) .putCreateTime(System.currentTimeMillis()).tryMutate(); switch (status) { case ACCEPTED: - return tid; + return fateId; case UNKNOWN: case REJECTED: continue; @@ -105,8 +107,9 @@ public long create() { throw new IllegalStateException("Failed to create new id after " + maxAttempts + " attempts"); } - public long getTid() { - return RANDOM.get().nextLong() & 0x7fffffffffffffffL; + public FateId getFateId() { + long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + return FateId.from(fateInstanceType, tid); } @Override @@ -116,7 +119,9 @@ protected Stream getTransactions() { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { - return new FateIdStatusBase(parseTid(e.getKey().getRow().toString())) { + String hexTid = e.getKey().getRow().toString().split("_")[1]; + FateId fateId = FateId.from(fateInstanceType, hexTid); + return new FateIdStatusBase(fateId) { @Override public TStatus getStatus() { return TStatus.valueOf(e.getValue().toString()); @@ -129,9 +134,9 @@ public TStatus getStatus() { } @Override - protected TStatus _getStatus(long tid) { + protected TStatus _getStatus(FateId fateId) { return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst() .orElse(TStatus.UNKNOWN); @@ -139,16 +144,16 @@ protected TStatus _getStatus(long tid) { } @Override - protected FateTxStore newFateTxStore(long tid, boolean isReserved) { - return new FateTxStoreImpl(tid, isReserved); + protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { + return new FateTxStoreImpl(fateId, isReserved); } - static Range getRow(long tid) { - return new Range("tx_" + FastFormat.toHexString(tid)); + static Range getRow(FateId fateId) { + return new Range("tx_" + fateId.getHexTid()); } - private FateMutatorImpl newMutator(long tid) { - return new FateMutatorImpl<>(context, tableName, tid); + private FateMutatorImpl newMutator(FateId fateId) { + return new FateMutatorImpl<>(context, tableName, fateId); } private R scanTx(Function func) { @@ -161,8 +166,8 @@ private R scanTx(Function func) { private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private FateTxStoreImpl(long tid, boolean isReserved) { - super(tid, isReserved); + private FateTxStoreImpl(FateId fateId, boolean isReserved) { + super(fateId, isReserved); } @Override @@ -170,7 +175,7 @@ public Repo top() { verifyReserved(false); return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.setBatchSize(1); scanner.fetchColumnFamily(RepoColumnFamily.NAME); return scanner.stream().map(e -> { @@ -186,7 +191,7 @@ public List> getStack() { verifyReserved(false); return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.fetchColumnFamily(RepoColumnFamily.NAME); return scanner.stream().map(e -> { @SuppressWarnings("unchecked") @@ -201,7 +206,7 @@ public Serializable getTransactionInfo(TxInfo txInfo) { verifyReserved(false); try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); final ColumnFQ cq; switch (txInfo) { @@ -237,7 +242,7 @@ public long timeCreated() { verifyReserved(false); return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner); return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst() .orElse(0L); @@ -254,7 +259,7 @@ public void push(Repo repo) throws StackOverflowException { throw new StackOverflowException("Repo stack size too large"); } - FateMutator fateMutator = newMutator(tid); + FateMutator fateMutator = newMutator(fateId); fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); } @@ -263,14 +268,14 @@ public void pop() { verifyReserved(true); Optional top = findTop(); - top.ifPresent(t -> newMutator(tid).deleteRepo(t).mutate()); + top.ifPresent(t -> newMutator(fateId).deleteRepo(t).mutate()); } @Override public void setStatus(TStatus status) { verifyReserved(true); - newMutator(tid).putStatus(status).mutate(); + newMutator(fateId).putStatus(status).mutate(); observedStatus = status; } @@ -280,19 +285,19 @@ public void setTransactionInfo(TxInfo txInfo, Serializable so) { final byte[] serialized = serializeTxInfo(so); - newMutator(tid).putTxInfo(txInfo, serialized).mutate(); + newMutator(fateId).putTxInfo(txInfo, serialized).mutate(); } @Override public void delete() { verifyReserved(true); - newMutator(tid).delete().mutate(); + newMutator(fateId).delete().mutate(); } private Optional findTop() { return scanTx(scanner -> { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.setBatchSize(1); scanner.fetchColumnFamily(RepoColumnFamily.NAME); return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java index 7056438d213..c373190487d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java @@ -36,27 +36,27 @@ import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.FastFormat; import org.apache.hadoop.io.Text; public class FateMutatorImpl implements FateMutator { private final ClientContext context; private final String tableName; - private final long tid; + private final FateId fateId; private final ConditionalMutation mutation; - public FateMutatorImpl(ClientContext context, String tableName, long tid) { + public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) { this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); - this.tid = tid; - this.mutation = new ConditionalMutation(new Text("tx_" + FastFormat.toHexString(tid))); + this.fateId = fateId; + this.mutation = new ConditionalMutation(new Text("tx_" + fateId.getHexTid())); } @Override @@ -142,7 +142,7 @@ public FateMutator deleteRepo(int position) { public FateMutator delete() { try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(getRow(tid)); + scanner.setRange(getRow(fateId)); scanner.forEach( (key, value) -> mutation.putDelete(key.getColumnFamily(), key.getColumnQualifier())); } catch (TableNotFoundException e) { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 189df12362c..fe525bf37d3 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -18,16 +18,15 @@ */ package org.apache.accumulo.core.logging; -import static org.apache.accumulo.core.fate.FateTxId.formatTid; - import java.io.Serializable; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.LongConsumer; import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; @@ -57,7 +56,7 @@ private LoggingFateTxStore(FateTxStore wrapped, Function,String> toLo public void push(Repo repo) throws StackOverflowException { super.push(repo); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} pushed {}", formatTid(getID()), toLogString.apply(repo)); + storeLog.trace("{} pushed {}", getID(), toLogString.apply(repo)); } } @@ -65,7 +64,7 @@ public void push(Repo repo) throws StackOverflowException { public void pop() { super.pop(); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} popped", formatTid(getID())); + storeLog.trace("{} popped", getID()); } } @@ -73,7 +72,7 @@ public void pop() { public void setStatus(ReadOnlyFateStore.TStatus status) { super.setStatus(status); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} setStatus to {}", formatTid(getID()), status); + storeLog.trace("{} setStatus to {}", getID(), status); } } @@ -81,7 +80,7 @@ public void setStatus(ReadOnlyFateStore.TStatus status) { public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { super.setTransactionInfo(txInfo, val); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} setting {} to {}", formatTid(getID()), txInfo, val); + storeLog.trace("{} setting {} to {}", getID(), txInfo, val); } } @@ -89,7 +88,7 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { public void delete() { super.delete(); if (storeLog.isTraceEnabled()) { - storeLog.trace("{} deleted fate transaction", formatTid(getID())); + storeLog.trace("{} deleted fate transaction", getID()); } } } @@ -100,18 +99,18 @@ public static FateStore wrap(FateStore store, Function,String> return new FateStore<>() { @Override - public FateTxStore reserve(long tid) { - return new LoggingFateTxStore<>(store.reserve(tid), toLogString); + public FateTxStore reserve(FateId fateId) { + return new LoggingFateTxStore<>(store.reserve(fateId), toLogString); } @Override - public Optional> tryReserve(long tid) { - return store.tryReserve(tid).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); + public Optional> tryReserve(FateId fateId) { + return store.tryReserve(fateId).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); } @Override - public ReadOnlyFateTxStore read(long tid) { - return store.read(tid); + public ReadOnlyFateTxStore read(FateId fateId) { + return store.read(fateId); } @Override @@ -120,10 +119,19 @@ public Stream list() { } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { store.runnable(keepWaiting, idConsumer); } + @Override + public FateId create() { + FateId fateId = store.create(); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} created fate transaction", fateId); + } + return fateId; + } + @Override public int getDeferredCount() { return store.getDeferredCount(); @@ -133,15 +141,6 @@ public int getDeferredCount() { public boolean isDeferredOverflow() { return store.isDeferredOverflow(); } - - @Override - public long create() { - long tid = store.create(); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} created fate transaction", formatTid(tid)); - } - return tid; - } }; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java index ed851917e70..1a5a4fb7086 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java @@ -52,15 +52,15 @@ public void testBasic() throws InterruptedException, KeeperException { cleaner.ageOff(); - long txid1 = testStore.create(); - var txStore1 = testStore.reserve(txid1); + FateId fateId1 = testStore.create(); + var txStore1 = testStore.reserve(fateId1); txStore1.setStatus(TStatus.IN_PROGRESS); txStore1.unreserve(0, TimeUnit.MILLISECONDS); cleaner.ageOff(); - long txid2 = testStore.create(); - var txStore2 = testStore.reserve(txid2); + FateId fateId2 = testStore.create(); + var txStore2 = testStore.reserve(fateId2); txStore2.setStatus(TStatus.IN_PROGRESS); txStore2.setStatus(TStatus.FAILED); txStore2.unreserve(0, TimeUnit.MILLISECONDS); @@ -69,33 +69,33 @@ public void testBasic() throws InterruptedException, KeeperException { tts.time = 6; - long txid3 = testStore.create(); - var txStore3 = testStore.reserve(txid3); + FateId fateId3 = testStore.create(); + var txStore3 = testStore.reserve(fateId3); txStore3.setStatus(TStatus.IN_PROGRESS); txStore3.setStatus(TStatus.SUCCESSFUL); txStore3.unreserve(0, TimeUnit.MILLISECONDS); cleaner.ageOff(); - Long txid4 = testStore.create(); + FateId fateId4 = testStore.create(); cleaner.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); tts.time = 15; cleaner.ageOff(); - assertEquals(Set.of(txid1, txid3, txid4), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); tts.time = 30; cleaner.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); } @Test @@ -104,43 +104,43 @@ public void testNonEmpty() { TestTimeSource tts = new TestTimeSource(); TestStore testStore = new TestStore(); - long txid1 = testStore.create(); - var txStore1 = testStore.reserve(txid1); + FateId fateId1 = testStore.create(); + var txStore1 = testStore.reserve(fateId1); txStore1.setStatus(TStatus.IN_PROGRESS); txStore1.unreserve(0, TimeUnit.MILLISECONDS); - long txid2 = testStore.create(); - var txStore2 = testStore.reserve(txid2); + FateId fateId2 = testStore.create(); + var txStore2 = testStore.reserve(fateId2); txStore2.setStatus(TStatus.IN_PROGRESS); txStore2.setStatus(TStatus.FAILED); txStore2.unreserve(0, TimeUnit.MILLISECONDS); - long txid3 = testStore.create(); - var txStore3 = testStore.reserve(txid3); + FateId fateId3 = testStore.create(); + var txStore3 = testStore.reserve(fateId3); txStore3.setStatus(TStatus.IN_PROGRESS); txStore3.setStatus(TStatus.SUCCESSFUL); txStore3.unreserve(0, TimeUnit.MILLISECONDS); - Long txid4 = testStore.create(); + FateId fateId4 = testStore.create(); FateCleaner cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts); cleaner.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); cleaner.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3, txid4), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); tts.time = 15; cleaner.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); - txStore1 = testStore.reserve(txid1); + txStore1 = testStore.reserve(fateId1); txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); txStore1.unreserve(0, TimeUnit.MILLISECONDS); @@ -148,15 +148,15 @@ public void testNonEmpty() { cleaner.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); - txStore1 = testStore.reserve(txid1); + txStore1 = testStore.reserve(fateId1); txStore1.setStatus(TStatus.FAILED); txStore1.unreserve(0, TimeUnit.MILLISECONDS); cleaner.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); tts.time = 42; @@ -177,31 +177,31 @@ public void testStatusChange() { cleaner.ageOff(); // create a something in the NEW state - long txid1 = testStore.create(); + FateId fateId1 = testStore.create(); // create another that is complete - long txid2 = testStore.create(); - var txStore2 = testStore.reserve(txid2); + FateId fateId2 = testStore.create(); + var txStore2 = testStore.reserve(fateId2); txStore2.setStatus(TStatus.IN_PROGRESS); txStore2.setStatus(TStatus.FAILED); txStore2.unreserve(0, TimeUnit.MILLISECONDS); // create another in the NEW state - long txid3 = testStore.create(); + FateId fateId3 = testStore.create(); // start tracking what can age off, both should be candidates cleaner.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // advance time by 9 hours, nothing should age off tts.time += Duration.ofHours(9).toNanos(); cleaner.ageOff(); - assertEquals(Set.of(txid1, txid2, txid3), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); - var txStore1 = testStore.reserve(txid1); + var txStore1 = testStore.reserve(fateId1); txStore1.setStatus(TStatus.IN_PROGRESS); txStore1.setStatus(TStatus.FAILED); txStore1.unreserve(0, TimeUnit.MILLISECONDS); @@ -211,17 +211,17 @@ public void testStatusChange() { tts.time += Duration.ofHours(2).toNanos(); cleaner.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // advance time by 9 hours, nothing should age off tts.time += Duration.ofHours(9).toNanos(); cleaner.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // advance time by 2 hours, should age off everything tts.time += Duration.ofHours(2).toNanos(); cleaner.ageOff(); - assertEquals(Set.of(), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); } @Test @@ -232,24 +232,24 @@ public void testNewCleaner() { TestStore testStore = new TestStore(); FateCleaner cleaner1 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); - long txid1 = testStore.create(); + FateId fateId1 = testStore.create(); cleaner1.ageOff(); - assertEquals(Set.of(txid1), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); tts.time += Duration.ofHours(5).toNanos(); - long txid2 = testStore.create(); + FateId fateId2 = testStore.create(); cleaner1.ageOff(); - assertEquals(Set.of(txid1, txid2), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId1, fateId2), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); tts.time += Duration.ofHours(6).toNanos(); - long txid3 = testStore.create(); + FateId fateId3 = testStore.create(); cleaner1.ageOff(); - assertEquals(Set.of(txid2, txid3), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // create a new cleaner, it should ignore any data stored by previous cleaner FateCleaner cleaner2 = new FateCleaner<>(testStore, Duration.ofHours(10), tts); @@ -257,18 +257,18 @@ public void testNewCleaner() { tts.time += Duration.ofHours(5).toNanos(); // since this is a new cleaner instance, it should reset the clock cleaner2.ageOff(); - assertEquals(Set.of(txid2, txid3), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // since the clock was reset, advancing time should not age anything off tts.time += Duration.ofHours(9).toNanos(); cleaner2.ageOff(); - assertEquals(Set.of(txid2, txid3), - testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(fateId2, fateId3), + testStore.list().map(FateIdStatus::getFateId).collect(toSet())); // this should advance time enough to age everything off tts.time += Duration.ofHours(2).toNanos(); cleaner2.ageOff(); - assertEquals(Set.of(), testStore.list().map(FateIdStatus::getTxid).collect(toSet())); + assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet())); } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 5df2e0fa0a0..3c81318c54a 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -29,7 +29,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongConsumer; +import java.util.function.Consumer; import java.util.stream.Stream; /** @@ -38,32 +38,35 @@ public class TestStore implements FateStore { private long nextId = 1; - private Map statuses = new HashMap<>(); - private Map> txInfos = new HashMap<>(); - private Set reserved = new HashSet<>(); + private Map statuses = new HashMap<>(); + private Set reserved = new HashSet<>(); + + private static final FateInstanceType fateInstanceType = FateInstanceType.USER; + private Map> txInfos = new HashMap<>(); @Override - public long create() { - statuses.put(nextId, TStatus.NEW); - return nextId++; + public FateId create() { + FateId fateId = FateId.from(fateInstanceType, nextId++); + statuses.put(fateId, TStatus.NEW); + return fateId; } @Override - public FateTxStore reserve(long tid) { - if (reserved.contains(tid)) { + public FateTxStore reserve(FateId fateId) { + if (reserved.contains(fateId)) { throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve } // twice... if test change, then change this - reserved.add(tid); - return new TestFateTxStore(tid); + reserved.add(fateId); + return new TestFateTxStore(fateId); } @Override - public Optional> tryReserve(long tid) { + public Optional> tryReserve(FateId fateId) { synchronized (this) { - if (!reserved.contains(tid)) { - reserve(tid); - return Optional.of(new TestFateTxStore(tid)); + if (!reserved.contains(fateId)) { + reserve(fateId); + return Optional.of(new TestFateTxStore(fateId)); } return Optional.empty(); } @@ -71,10 +74,10 @@ public Optional> tryReserve(long tid) { private class TestFateTxStore implements FateTxStore { - private final long tid; + private final FateId fateId; - TestFateTxStore(long tid) { - this.tid = tid; + TestFateTxStore(FateId fateId) { + this.fateId = fateId; } @Override @@ -89,11 +92,11 @@ public List> getStack() { @Override public TStatus getStatus() { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - TStatus status = statuses.get(tid); + TStatus status = statuses.get(fateId); if (status == null) { return TStatus.UNKNOWN; } @@ -107,7 +110,7 @@ public TStatus waitForStatusChange(EnumSet expected) { @Override public Serializable getTransactionInfo(Fate.TxInfo txInfo) { - var submap = txInfos.get(tid); + var submap = txInfos.get(fateId); if (submap == null) { return null; } @@ -121,8 +124,8 @@ public long timeCreated() { } @Override - public long getID() { - return tid; + public FateId getID() { + return fateId; } @Override @@ -137,42 +140,42 @@ public void pop() { @Override public void setStatus(TStatus status) { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - if (!statuses.containsKey(tid)) { + if (!statuses.containsKey(fateId)) { throw new IllegalStateException(); } - statuses.put(tid, status); + statuses.put(fateId, status); } @Override public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - txInfos.computeIfAbsent(tid, t -> new HashMap<>()).put(txInfo, val); + txInfos.computeIfAbsent(fateId, t -> new HashMap<>()).put(txInfo, val); } @Override public void delete() { - if (!reserved.contains(tid)) { + if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - statuses.remove(tid); + statuses.remove(fateId); } @Override public void unreserve(long deferTime, TimeUnit timeUnit) { - if (!reserved.remove(tid)) { + if (!reserved.remove(fateId)) { throw new IllegalStateException(); } } } @Override - public ReadOnlyFateTxStore read(long tid) { + public ReadOnlyFateTxStore read(FateId fateId) { throw new UnsupportedOperationException(); } @@ -180,7 +183,7 @@ public ReadOnlyFateTxStore read(long tid) { public Stream list() { return new ArrayList<>(statuses.entrySet()).stream().map(e -> new FateIdStatus() { @Override - public long getTxid() { + public FateId getFateId() { return e.getKey(); } @@ -192,7 +195,7 @@ public TStatus getStatus() { } @Override - public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) { + public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { throw new UnsupportedOperationException(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index abef59d7132..e35088bd10f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.manager.state.tables.TableState; @@ -80,7 +81,6 @@ import org.apache.accumulo.core.manager.thrift.ThriftPropertyException; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.Validator; import org.apache.accumulo.core.util.tables.TableNameUtil; @@ -126,7 +126,8 @@ public FateServiceHandler(Manager manager) { public TFateId beginFateOperation(TInfo tinfo, TCredentials credentials, TFateInstanceType type) throws ThriftSecurityException { authenticate(credentials); - return new TFateId(type, manager.fate(FateInstanceType.fromThrift(type)).startTransaction()); + return new TFateId(type, + manager.fate(FateInstanceType.fromThrift(type)).startTransaction().getTid()); } @Override @@ -137,6 +138,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate String goalMessage = op.toString() + " "; long tid = opid.getTid(); FateInstanceType type = FateInstanceType.fromThrift(opid.getType()); + FateId fateId = FateId.from(type, tid); switch (op) { case NAMESPACE_CREATE: { @@ -149,7 +151,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Create " + namespace + " namespace."; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace, options)), autoCleanup, goalMessage); break; @@ -168,7 +170,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Rename " + oldName + " namespace to " + newName; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new RenameNamespace(namespaceId, oldName, newName)), autoCleanup, goalMessage); break; @@ -186,7 +188,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Delete namespace Id: " + namespaceId; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup, goalMessage); break; } @@ -248,8 +250,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Create table " + tableName + " " + initialTableState + " with " + splitCount + " splits and initial hosting goal of " + initialHostingGoal; - - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName, timeType, options, splitsPath, splitCount, splitsDirsPath, initialTableState, initialHostingGoal, namespaceId)), @@ -285,7 +286,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to " + oldTableName; try { - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new RenameTable(namespaceId, tableId, oldTableName, newTableName)), autoCleanup, goalMessage); } catch (NamespaceNotFoundException e) { @@ -357,7 +358,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } manager.fate(type).seedTransaction( - op.toString(), tid, new TraceRepo<>(new CloneTable(c.getPrincipal(), namespaceId, + op.toString(), fateId, new TraceRepo<>(new CloneTable(c.getPrincipal(), namespaceId, srcTableId, tableName, propertiesToSet, propertiesToExclude, keepOffline)), autoCleanup, goalMessage); @@ -386,7 +387,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Delete table " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -411,7 +412,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Online table " + tableId; final EnumSet expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -439,7 +440,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Offline table " + tableId; final EnumSet expectedCurrStates = EnumSet.of(TableState.ONLINE, TableState.OFFLINE); - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new ChangeTableState(namespaceId, tableId, tableOp, expectedCurrStates)), autoCleanup, goalMessage); @@ -475,7 +476,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate startRowStr, endRowStr); goalMessage += "Merge table " + tableName + "(" + tableId + ") splits from " + startRowStr + " to " + endRowStr; - manager.fate(type).seedTransaction(op.toString(), tid, new TraceRepo<>( + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -507,7 +508,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Delete table " + tableName + "(" + tableId + ") range " + startRow + " to " + endRow; - manager.fate(type).seedTransaction(op.toString(), tid, new TraceRepo<>( + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>( new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId, startRow, endRow)), autoCleanup, goalMessage); break; @@ -533,7 +534,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Compact table (" + tableId + ") with config " + compactionConfig; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CompactRange(namespaceId, tableId, compactionConfig)), autoCleanup, goalMessage); break; @@ -557,7 +558,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Cancel compaction of table (" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new CancelCompactions(namespaceId, tableId)), autoCleanup, goalMessage); break; } @@ -598,10 +599,10 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Import table with new name: " + tableName + " from " + exportDirs; - manager.fate(type) - .seedTransaction(op.toString(), tid, new TraceRepo<>(new ImportTable(c.getPrincipal(), - tableName, exportDirs, namespaceId, keepMappings, keepOffline)), autoCleanup, - goalMessage); + manager.fate(type).seedTransaction(op.toString(), fateId, + new TraceRepo<>(new ImportTable(c.getPrincipal(), tableName, exportDirs, namespaceId, + keepMappings, keepOffline)), + autoCleanup, goalMessage); break; } case TABLE_EXPORT: { @@ -628,7 +629,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage += "Export table " + tableName + "(" + tableId + ") to " + exportDir; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId, exportDir)), autoCleanup, goalMessage); break; @@ -665,7 +666,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate manager.updateBulkImportStatus(dir, BulkImportState.INITIAL); goalMessage += "Bulk import (v2) " + dir + " to " + tableName + "(" + tableId + ")"; - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new PrepBulkImport(tableId, dir, setTime)), autoCleanup, goalMessage); break; } @@ -707,7 +708,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate goalMessage += "Set Hosting Goal for table: " + tableName + "(" + tableId + ") range: " + tRange + " to: " + goal.name(); - manager.fate(type).seedTransaction(op.toString(), tid, + manager.fate(type).seedTransaction(op.toString(), fateId, new TraceRepo<>(new SetHostingGoal(tableId, namespaceId, tRange, goal)), autoCleanup, goalMessage); break; @@ -784,7 +785,7 @@ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid, Fate } goalMessage = "Splitting " + extent + " for user into " + (splits.size() + 1) + " tablets"; - manager.fate(type).seedTransaction(op.toString(), tid, new PreSplit(extent, splits), + manager.fate(type).seedTransaction(op.toString(), fateId, new PreSplit(extent, splits), autoCleanup, goalMessage); break; } @@ -835,10 +836,11 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI throws ThriftSecurityException, ThriftTableOperationException { authenticate(credentials); - FateInstanceType type = FateInstanceType.fromThrift(opid.getType()); - TStatus status = manager.fate(type).waitForCompletion(opid.getTid()); + FateId fateId = FateId.fromThrift(opid); + FateInstanceType type = fateId.getType(); + TStatus status = manager.fate(type).waitForCompletion(fateId); if (status == TStatus.FAILED) { - Exception e = manager.fate(type).getException(opid.getTid()); + Exception e = manager.fate(type).getException(fateId); if (e instanceof ThriftTableOperationException) { throw (ThriftTableOperationException) e; } else if (e instanceof ThriftSecurityException) { @@ -850,7 +852,7 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI } } - String ret = manager.fate(type).getReturn(opid.getTid()); + String ret = manager.fate(type).getReturn(fateId); if (ret == null) { ret = ""; // thrift does not like returning null } @@ -861,7 +863,8 @@ public String waitForFateOperation(TInfo tinfo, TCredentials credentials, TFateI public void finishFateOperation(TInfo tinfo, TCredentials credentials, TFateId opid) throws ThriftSecurityException { authenticate(credentials); - manager.fate(FateInstanceType.fromThrift(opid.getType())).delete(opid.getTid()); + FateId fateId = FateId.fromThrift(opid); + manager.fate(fateId.getType()).delete(fateId); } protected void authenticate(TCredentials credentials) throws ThriftSecurityException { @@ -942,8 +945,8 @@ private void writeSplitsToFile(Path splitsPath, final List arguments */ public Path mkTempDir(TFateId opid) throws IOException { Volume vol = manager.getVolumeManager().getFirst(); - Path p = vol - .prefixChild("/tmp/fate-" + opid.getType() + "-" + FastFormat.toHexString(opid.getTid())); + FateId fateId = FateId.fromThrift(opid); + Path p = vol.prefixChild("/tmp/fate-" + fateId.getType() + "-" + fateId.getHexTid()); FileSystem fs = vol.getFileSystem(); if (fs.exists(p)) { fs.delete(p, true); @@ -955,12 +958,13 @@ public Path mkTempDir(TFateId opid) throws IOException { @Override public boolean cancelFateOperation(TInfo tinfo, TCredentials credentials, TFateId opid) throws ThriftSecurityException, ThriftNotActiveServiceException { + FateId fateId = FateId.fromThrift(opid); if (!manager.security.canPerformSystemActions(credentials)) { throw new ThriftSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED); } - return manager.fate(FateInstanceType.fromThrift(opid.getType())).cancel(opid.getTid()); + return manager.fate(fateId.getType()).cancel(fateId); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index de1328571c4..0c568862e84 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -57,6 +57,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.manager.thrift.ManagerClientService; @@ -330,14 +331,14 @@ public void shutdownTabletServer(TInfo info, TCredentials c, String tabletServer } Fate fate = manager.fate(FateInstanceType.META); - long tid = fate.startTransaction(); + FateId fateId = fate.startTransaction(); String msg = "Shutdown tserver " + tabletServer; - fate.seedTransaction("ShutdownTServer", tid, + fate.seedTransaction("ShutdownTServer", fateId, new TraceRepo<>(new ShutdownTServer(doomed, force)), false, msg); - fate.waitForCompletion(tid); - fate.delete(tid); + fate.waitForCompletion(fateId); + fate.delete(fateId); log.debug("FATE op shutting down " + tabletServer + " finished"); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java index 18958c92be0..999de0f7e99 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SplitTask.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.util.SortedSet; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.manager.Manager; @@ -85,9 +86,9 @@ public void run() { } var fateInstanceType = FateInstanceType.fromTableId((tablet.getTableId())); - long fateTxId = manager.fate(fateInstanceType).startTransaction(); + FateId fateId = manager.fate(fateInstanceType).startTransaction(); - manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", fateTxId, + manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT", fateId, new PreSplit(extent, splits), true, "System initiated split of tablet " + extent + " into " + splits.size() + " splits"); } catch (Exception e) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index 380876aee07..ee9635b7e58 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; @@ -166,18 +167,18 @@ protected void testTransactionStatus(FateStore store, ServerContext sct callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - assertEquals(TStatus.NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new TestRepo("testTransactionStatus"), true, + FateId fateId = fate.startTransaction(); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testTransactionStatus"), true, "Test Op"); - assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid)); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); // wait for call() to be called callStarted.await(); - assertEquals(IN_PROGRESS, getTxStatus(sctx, txid)); + assertEquals(IN_PROGRESS, getTxStatus(sctx, fateId)); // tell the op to exit the method finishCall.countDown(); - Wait.waitFor(() -> getTxStatus(sctx, txid) == UNKNOWN); + Wait.waitFor(() -> getTxStatus(sctx, fateId) == UNKNOWN); } finally { fate.shutdown(); } @@ -198,20 +199,20 @@ protected void testCancelWhileNew(FateStore store, ServerContext sctx) callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileNew with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(sctx, txid)); + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileNew with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); // cancel the transaction - assertTrue(fate.cancel(txid)); + assertTrue(fate.cancel(fateId)); assertTrue( - FAILED_IN_PROGRESS == getTxStatus(sctx, txid) || FAILED == getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new TestRepo("testCancelWhileNew"), true, + FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED == getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileNew"), true, "Test Op"); - Wait.waitFor(() -> FAILED == getTxStatus(sctx, txid)); + Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId)); // nothing should have run assertEquals(1, callStarted.getCount()); - fate.delete(txid); - assertEquals(UNKNOWN, getTxStatus(sctx, txid)); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); } finally { fate.shutdown(); } @@ -233,20 +234,20 @@ protected void testCancelWhileSubmittedAndRunning(FateStore store, Serv callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileSubmitted with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileSubmittedAndRunning"), false, "Test Op"); - Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, txid)); + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId)); // This is false because the transaction runner has reserved the FaTe // transaction. - assertFalse(fate.cancel(txid)); + assertFalse(fate.cancel(fateId)); callStarted.await(); finishCall.countDown(); - Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, txid)); - fate.delete(txid); - assertEquals(UNKNOWN, getTxStatus(sctx, txid)); + Wait.waitFor(() -> IN_PROGRESS != getTxStatus(sctx, fateId)); + fate.delete(fateId); + assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); } finally { fate.shutdown(); } @@ -268,16 +269,16 @@ protected void testCancelWhileInCall(FateStore store, ServerContext sct callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); - long txid = fate.startTransaction(); - LOG.debug("Starting test testCancelWhileInCall with {}", FateTxId.formatTid(txid)); - assertEquals(NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new TestRepo("testCancelWhileInCall"), true, + FateId fateId = fate.startTransaction(); + LOG.debug("Starting test testCancelWhileInCall with {}", fateId); + assertEquals(NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new TestRepo("testCancelWhileInCall"), true, "Test Op"); - assertEquals(SUBMITTED, getTxStatus(sctx, txid)); + assertEquals(SUBMITTED, getTxStatus(sctx, fateId)); // wait for call() to be called callStarted.await(); // cancel the transaction - assertFalse(fate.cancel(txid)); + assertFalse(fate.cancel(fateId)); } finally { fate.shutdown(); } @@ -306,7 +307,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx // so it will be deferred when submitted DeferredTestRepo.delay.set(30000); - Set transactions = new HashSet<>(); + Set transactions = new HashSet<>(); // Start by creating 10 transactions that are all deferred which should // fill up the deferred map with all 10 as we set the max deferred limit @@ -346,7 +347,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx // Verify all 20 unique transactions finished Wait.waitFor(() -> { - transactions.removeIf(txid -> getTxStatus(sctx, txid) == UNKNOWN); + transactions.removeIf(fateId -> getTxStatus(sctx, fateId) == UNKNOWN); return transactions.isEmpty(); }); @@ -355,13 +356,13 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx } } - private void submitDeferred(Fate fate, ServerContext sctx, Set transactions) { - long txid = fate.startTransaction(); - transactions.add(txid); - assertEquals(TStatus.NEW, getTxStatus(sctx, txid)); - fate.seedTransaction("TestOperation", txid, new DeferredTestRepo("testDeferredOverflow"), true, - "Test Op"); - assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, txid)); + private void submitDeferred(Fate fate, ServerContext sctx, Set transactions) { + FateId fateId = fate.startTransaction(); + transactions.add(fateId); + assertEquals(TStatus.NEW, getTxStatus(sctx, fateId)); + fate.seedTransaction("TestOperation", fateId, new DeferredTestRepo("testDeferredOverflow"), + true, "Test Op"); + assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId)); } protected Fate initializeFate(FateStore store) { @@ -371,7 +372,7 @@ protected Fate initializeFate(FateStore store) { return new Fate<>(new TestEnv(), store, r -> r + "", config); } - protected abstract TStatus getTxStatus(ServerContext sctx, long txid); + protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); private static void inCall() throws InterruptedException { // signal that call started diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java index 0dec7e442b2..0da99ada403 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java @@ -25,11 +25,11 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; @@ -62,9 +62,9 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exc } @Override - protected TStatus getTxStatus(ServerContext context, long txid) { + protected TStatus getTxStatus(ServerContext context, FateId fateId) { try (Scanner scanner = context.createScanner(table, Authorizations.EMPTY)) { - scanner.setRange(getRow(txid)); + scanner.setRange(getRow(fateId)); TxColumnFamily.STATUS_COLUMN.fetch(scanner); return StreamSupport.stream(scanner.spliterator(), false) .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); @@ -73,7 +73,7 @@ protected TStatus getTxStatus(ServerContext context, long txid) { } } - private static Range getRow(long tid) { - return new Range("tx_" + FastFormat.toHexString(tid)); + private static Range getRow(FateId fateId) { + return new Range("tx_" + fateId.getHexTid()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java index 88c2ac48844..af9280f8500 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java @@ -25,9 +25,12 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.junit.jupiter.api.AfterAll; @@ -39,6 +42,7 @@ public class AccumuloStoreIT extends SharedMiniClusterBase { private static final Logger log = LoggerFactory.getLogger(AccumuloStore.class); + private static final FateInstanceType fateInstanceType = FateInstanceType.USER; @BeforeAll public static void setup() throws Exception { @@ -51,20 +55,20 @@ public static void teardown() { } private static class TestAccumuloStore extends AccumuloStore { - private final Iterator tidIterator; + private final Iterator fateIdIterator; - // use the list of txids to simulate collisions on txids - public TestAccumuloStore(ClientContext context, String tableName, List txids) { + // use the list of fateIds to simulate collisions on fateIds + public TestAccumuloStore(ClientContext context, String tableName, List fateIds) { super(context, tableName); - this.tidIterator = txids.iterator(); + this.fateIdIterator = fateIds.iterator(); } @Override - public long getTid() { - if (tidIterator.hasNext()) { - return tidIterator.next(); + public FateId getFateId() { + if (fateIdIterator.hasNext()) { + return fateIdIterator.next(); } else { - return -1L; + return FateId.from(fateInstanceType, -1L); } } } @@ -77,14 +81,16 @@ public void testCreateWithCollisionAndExceedRetryLimit() throws Exception { client.tableOperations().create(table); List txids = List.of(1L, 1L, 1L, 2L, 3L, 3L, 3L, 3L, 4L, 4L, 5L, 5L, 5L, 5L, 5L, 5L); - Set expectedTids = new TreeSet<>(txids); - TestAccumuloStore store = new TestAccumuloStore(client, table, txids); + List fateIds = txids.stream().map(txid -> FateId.from(fateInstanceType, txid)) + .collect(Collectors.toList()); + Set expectedFateIds = new TreeSet<>(fateIds); + TestAccumuloStore store = new TestAccumuloStore(client, table, fateIds); // call create and expect we get the unique txids - for (Long expectedTid : expectedTids) { - long tid = store.create(); - log.info("Created tid: " + tid); - assertEquals(expectedTid, tid, "Expected " + expectedTid + " but got " + tid); + for (FateId expectedFateId : expectedFateIds) { + FateId fateId = store.create(); + log.info("Created fate id: " + fateId); + assertEquals(expectedFateId, fateId, "Expected " + expectedFateId + " but got " + fateId); } // Calling create again on 5L should throw an exception since we've exceeded the max retries diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java index 27e6dd650ba..98b7da72c9c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java @@ -31,6 +31,8 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.accumulo.FateMutatorImpl; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -71,21 +73,22 @@ public void putRepo() throws Exception { ClientContext context = (ClientContext) client; final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + FateId fateId = FateId.from(FateInstanceType.USER, tid); // add some repos in order - FateMutatorImpl fateMutator = new FateMutatorImpl<>(context, table, tid); + FateMutatorImpl fateMutator = new FateMutatorImpl<>(context, table, fateId); fateMutator.putRepo(100, new FateIT.TestRepo("test")).mutate(); - FateMutatorImpl fateMutator1 = new FateMutatorImpl<>(context, table, tid); + FateMutatorImpl fateMutator1 = new FateMutatorImpl<>(context, table, fateId); fateMutator1.putRepo(99, new FateIT.TestRepo("test")).mutate(); - FateMutatorImpl fateMutator2 = new FateMutatorImpl<>(context, table, tid); + FateMutatorImpl fateMutator2 = new FateMutatorImpl<>(context, table, fateId); fateMutator2.putRepo(98, new FateIT.TestRepo("test")).mutate(); // make sure we cant add a repo that has already been added - FateMutatorImpl fateMutator3 = new FateMutatorImpl<>(context, table, tid); + FateMutatorImpl fateMutator3 = new FateMutatorImpl<>(context, table, fateId); assertThrows(IllegalStateException.class, () -> fateMutator3.putRepo(98, new FateIT.TestRepo("test")).mutate(), "Repo in position 98 already exists. Expected to not be able to add it again."); - FateMutatorImpl fateMutator4 = new FateMutatorImpl<>(context, table, tid); + FateMutatorImpl fateMutator4 = new FateMutatorImpl<>(context, table, fateId); assertThrows(IllegalStateException.class, () -> fateMutator4.putRepo(99, new FateIT.TestRepo("test")).mutate(), "Repo in position 99 already exists. Expected to not be able to add it again."); @@ -101,62 +104,63 @@ public void requireStatus() throws Exception { ClientContext context = (ClientContext) client; final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + FateId fateId = FateId.from(FateInstanceType.USER, tid); // use require status passing all statuses. without the status column present this should fail assertThrows(IllegalStateException.class, - () -> new FateMutatorImpl<>(context, table, tid) + () -> new FateMutatorImpl<>(context, table, fateId) .requireStatus(ReadOnlyFateStore.TStatus.values()) .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate()); assertEquals(0, client.createScanner(table).stream().count()); - var status = new FateMutatorImpl<>(context, table, tid) + var status = new FateMutatorImpl<>(context, table, fateId) .requireStatus(ReadOnlyFateStore.TStatus.values()) .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); assertEquals(REJECTED, status); assertEquals(0, client.createScanner(table).stream().count()); // use require status without passing any statuses to require that the status column is absent - status = new FateMutatorImpl<>(context, table, tid).requireStatus() + status = new FateMutatorImpl<>(context, table, fateId).requireStatus() .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); assertEquals(ACCEPTED, status); // try again with requiring an absent status column. this time it should fail because we just // put status NEW assertThrows(IllegalStateException.class, - () -> new FateMutatorImpl<>(context, table, tid).requireStatus() + () -> new FateMutatorImpl<>(context, table, fateId).requireStatus() .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate(), "Expected to not be able to use requireStatus() without passing any statuses"); - status = new FateMutatorImpl<>(context, table, tid).requireStatus() + status = new FateMutatorImpl<>(context, table, fateId).requireStatus() .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); assertEquals(REJECTED, status, "Expected to not be able to use requireStatus() without passing any statuses"); // now use require same with the current status, NEW passed in status = - new FateMutatorImpl<>(context, table, tid).requireStatus(ReadOnlyFateStore.TStatus.NEW) + new FateMutatorImpl<>(context, table, fateId).requireStatus(ReadOnlyFateStore.TStatus.NEW) .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); assertEquals(ACCEPTED, status); // use require same with an array of statuses, none of which are the current status // (SUBMITTED) assertThrows(IllegalStateException.class, - () -> new FateMutatorImpl<>(context, table, tid) + () -> new FateMutatorImpl<>(context, table, fateId) .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN) .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).mutate(), "Expected to not be able to use requireStatus() with statuses that do not match the current status"); - status = new FateMutatorImpl<>(context, table, tid) + status = new FateMutatorImpl<>(context, table, fateId) .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN) .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); assertEquals(REJECTED, status, "Expected to not be able to use requireStatus() with statuses that do not match the current status"); // use require same with an array of statuses, one of which is the current status (SUBMITTED) - status = new FateMutatorImpl<>(context, table, tid) + status = new FateMutatorImpl<>(context, table, fateId) .requireStatus(ReadOnlyFateStore.TStatus.UNKNOWN, ReadOnlyFateStore.TStatus.SUBMITTED) .putStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS).tryMutate(); assertEquals(ACCEPTED, status); // one more time check that we can use require same with the current status (IN_PROGRESS) - status = new FateMutatorImpl<>(context, table, tid) + status = new FateMutatorImpl<>(context, table, fateId) .requireStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS) .putStatus(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS).tryMutate(); assertEquals(ACCEPTED, status); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java index 8ddd3b81b0e..3c41fd0705f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; @@ -66,8 +67,8 @@ protected void testReadWrite(FateStore store, ServerContext sctx) assertEquals(0, store.list().count()); // Create a new transaction and get the store for it - long tid = store.create(); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + FateTxStore txStore = store.reserve(fateId); assertTrue(txStore.timeCreated() > 0); assertEquals(1, store.list().count()); @@ -121,8 +122,8 @@ public void testReadWriteTxInfo() throws Exception { } protected void testReadWriteTxInfo(FateStore store, ServerContext sctx) { - long tid = store.create(); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + FateTxStore txStore = store.reserve(fateId); try { // Go through all enum values to verify each TxInfo type will be properly @@ -150,11 +151,11 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx assertFalse(store.isDeferredOverflow()); // Store 10 transactions that are all deferred - final Set transactions = new HashSet<>(); + final Set transactions = new HashSet<>(); for (int i = 0; i < 10; i++) { - long tid = store.create(); - transactions.add(tid); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + transactions.add(fateId); + FateTxStore txStore = store.reserve(fateId); txStore.setStatus(TStatus.SUBMITTED); assertTrue(txStore.timeCreated() > 0); txStore.unreserve(10, TimeUnit.SECONDS); @@ -183,9 +184,9 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx // Store one more that should go over the max deferred of 10 // and should clear the map and set the overflow flag - long tid = store.create(); - transactions.add(tid); - FateTxStore txStore = store.reserve(tid); + FateId fateId = store.create(); + transactions.add(fateId); + FateTxStore txStore = store.reserve(fateId); txStore.setStatus(TStatus.SUBMITTED); txStore.unreserve(30, TimeUnit.SECONDS); @@ -216,7 +217,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx } finally { executor.shutdownNow(); // Cleanup so we don't interfere with other tests - store.list().forEach(fateIdStatus -> store.reserve(fateIdStatus.getTxid()).delete()); + store.list().forEach(fateIdStatus -> store.reserve(fateIdStatus.getFateId()).delete()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java index 64a18d38a1c..bd58df57009 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -74,9 +75,9 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exc } @Override - protected TStatus getTxStatus(ServerContext sctx, long txid) { + protected TStatus getTxStatus(ServerContext sctx, FateId fateId) { try { - return getTxStatus(sctx.getZooReaderWriter(), txid); + return getTxStatus(sctx.getZooReaderWriter(), fateId); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -86,10 +87,10 @@ protected TStatus getTxStatus(ServerContext sctx, long txid) { * Get the status of the TX from ZK directly. Unable to call ZooStore.getStatus because this test * thread does not have the reservation (the FaTE thread does) */ - private static TStatus getTxStatus(ZooReaderWriter zrw, long txid) + private static TStatus getTxStatus(ZooReaderWriter zrw, FateId fateId) throws KeeperException, InterruptedException { zrw.sync(ZK_ROOT); - String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid); + String txdir = String.format("%s%s/tx_%s", ZK_ROOT, Constants.ZFATE, fateId.getHexTid()); try { return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); } catch (KeeperException.NoNodeException e) {