Skip to content

Commit

Permalink
Globally Unique FATE Transaction Ids - Part 1 (#4191)
Browse files Browse the repository at this point in the history
The end goal is to have the stronger type FateId replace the current representation of a transaction id (which is just a long). This was brought about from the addition of the AccumuloStore class - there are now two fate instance types associated with a transaction - META (for ZooStore) or USER (for AccumuloStore). FateId is a new class which includes the FateInstanceType and the transaction id. This commit is for #4044
  • Loading branch information
kevinrr888 authored Feb 1, 2024
1 parent 11949de commit 1d51e46
Show file tree
Hide file tree
Showing 23 changed files with 563 additions and 424 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
Expand All @@ -55,8 +55,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
// all existing transactions are processed immediately again
protected static final int DEFAULT_MAX_DEFERRED = 100_000;

protected final Set<Long> reserved;
protected final Map<Long,Long> deferred;
protected final Set<FateId> reserved;
protected final Map<FateId,Long> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();

Expand Down Expand Up @@ -101,26 +101,26 @@ public static Object deserialize(byte[] ser) {
}

/**
* Attempt to reserve transaction
* Attempt to reserve the fate transaction.
*
* @param tid transaction id
* @param fateId The FateId
* @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
* an empty Optional if the transaction was already reserved.
*/
@Override
public Optional<FateTxStore<T>> tryReserve(long tid) {
synchronized (AbstractFateStore.this) {
if (!reserved.contains(tid)) {
return Optional.of(reserve(tid));
public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
synchronized (this) {
if (!reserved.contains(fateId)) {
return Optional.of(reserve(fateId));
}
return Optional.empty();
}
}

@Override
public FateTxStore<T> reserve(long tid) {
public FateTxStore<T> reserve(FateId fateId) {
synchronized (AbstractFateStore.this) {
while (reserved.contains(tid)) {
while (reserved.contains(fateId)) {
try {
AbstractFateStore.this.wait(100);
} catch (InterruptedException e) {
Expand All @@ -129,13 +129,13 @@ public FateTxStore<T> reserve(long tid) {
}
}

reserved.add(tid);
return newFateTxStore(tid, true);
reserved.add(fateId);
return newFateTxStore(fateId, true);
}
}

@Override
public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) {
public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {

AtomicLong seen = new AtomicLong(0);

Expand All @@ -145,21 +145,21 @@ public void runnable(AtomicBoolean keepWaiting, LongConsumer idConsumer) {

try (Stream<FateIdStatus> transactions = getTransactions()) {
transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
.mapToLong(FateIdStatus::getTxid).filter(txid -> {
.map(FateIdStatus::getFateId).filter(fateId -> {
synchronized (AbstractFateStore.this) {
var deferredTime = deferred.get(txid);
var deferredTime = deferred.get(fateId);
if (deferredTime != null) {
if ((deferredTime - System.nanoTime()) >= 0) {
return false;
} else {
deferred.remove(txid);
deferred.remove(fateId);
}
}
return !reserved.contains(txid);
return !reserved.contains(fateId);
}
}).forEach(txid -> {
}).forEach(fateId -> {
seen.incrementAndGet();
idConsumer.accept(txid);
idConsumer.accept(fateId);
});
}

Expand Down Expand Up @@ -202,29 +202,25 @@ public Stream<FateIdStatus> list() {
}

@Override
public ReadOnlyFateTxStore<T> read(long tid) {
return newFateTxStore(tid, false);
public ReadOnlyFateTxStore<T> read(FateId fateId) {
return newFateTxStore(fateId, false);
}

protected boolean isRunnable(TStatus status) {
return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
|| status == TStatus.SUBMITTED;
}

protected long parseTid(String txdir) {
return Long.parseLong(txdir.split("_")[1], 16);
}

public static abstract class FateIdStatusBase implements FateIdStatus {
private final long txid;
private final FateId fateId;

public FateIdStatusBase(long txid) {
this.txid = txid;
public FateIdStatusBase(FateId fateId) {
this.fateId = fateId;
}

@Override
public long getTxid() {
return txid;
public FateId getFateId() {
return fateId;
}
}

Expand All @@ -245,30 +241,30 @@ public int getDeferredCount() {

protected abstract Stream<FateIdStatus> getTransactions();

protected abstract TStatus _getStatus(long tid);
protected abstract TStatus _getStatus(FateId fateId);

protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved);
protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final long tid;
protected final FateId fateId;
protected final boolean isReserved;

protected TStatus observedStatus = null;

protected AbstractFateTxStoreImpl(long tid, boolean isReserved) {
this.tid = tid;
protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
this.fateId = fateId;
this.isReserved = isReserved;
}

@Override
public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
Preconditions.checkState(!isReserved,
"Attempted to wait for status change while reserved " + FateTxId.formatTid(getID()));
"Attempted to wait for status change while reserved " + fateId);
while (true) {

long countBefore = unreservedNonNewCount.getCount();

TStatus status = _getStatus(tid);
TStatus status = _getStatus(fateId);
if (expected.contains(status)) {
return status;
}
Expand All @@ -286,9 +282,8 @@ public void unreserve(long deferTime, TimeUnit timeUnit) {
}

synchronized (AbstractFateStore.this) {
if (!reserved.remove(tid)) {
throw new IllegalStateException(
"Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid));
if (!reserved.remove(fateId)) {
throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId);
}

// notify any threads waiting to reserve
Expand All @@ -306,7 +301,7 @@ public void unreserve(long deferTime, TimeUnit timeUnit) {
deferredOverflow.set(true);
deferred.clear();
} else {
deferred.put(tid, System.nanoTime() + deferTime);
deferred.put(fateId, System.nanoTime() + deferTime);
}
}
}
Expand All @@ -327,9 +322,8 @@ protected void verifyReserved(boolean isWrite) {

if (isReserved) {
synchronized (AbstractFateStore.this) {
if (!reserved.contains(tid)) {
throw new IllegalStateException(
"Tried to operate on unreserved transaction " + FateTxId.formatTid(tid));
if (!reserved.contains(fateId)) {
throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId);
}
}
}
Expand All @@ -338,14 +332,14 @@ protected void verifyReserved(boolean isWrite) {
@Override
public TStatus getStatus() {
verifyReserved(false);
var status = _getStatus(tid);
var status = _getStatus(fateId);
observedStatus = status;
return status;
}

@Override
public long getID() {
return tid;
public FateId getID() {
return fateId;
}

protected byte[] serializeTxInfo(Serializable so) {
Expand Down
27 changes: 17 additions & 10 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,22 @@ private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T
final List<TransactionStatus> statuses = new ArrayList<>();

fateStores.forEach((type, store) -> {
try (Stream<Long> tids = store.list().map(FateIdStatus::getTxid)) {
tids.forEach(tid -> {
try (Stream<FateId> fateIds = store.list().map(FateIdStatus::getFateId)) {
fateIds.forEach(fateId -> {

ReadOnlyFateTxStore<T> txStore = store.read(tid);
ReadOnlyFateTxStore<T> txStore = store.read(fateId);

String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);

List<String> hlocks = heldLocks.remove(tid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
List<String> hlocks = heldLocks.remove(fateId.getTid());

if (hlocks == null) {
hlocks = Collections.emptyList();
}

List<String> wlocks = waitingLocks.remove(tid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
List<String> wlocks = waitingLocks.remove(fateId.getTid());

if (wlocks == null) {
wlocks = Collections.emptyList();
Expand All @@ -398,9 +400,10 @@ private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T

long timeCreated = txStore.timeCreated();

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(
new TransactionStatus(tid, type, status, txName, hlocks, wlocks, top, timeCreated));
// ELASTICITY_TODO DEFERRED - ISSUE 4044
if (includeByStatus(status, filterStatus) && includeByTxid(fateId.getTid(), filterTxid)) {
statuses.add(new TransactionStatus(fateId.getTid(), type, status, txName, hlocks,
wlocks, top, timeCreated));
}
});
}
Expand Down Expand Up @@ -461,7 +464,9 @@ public boolean prepDelete(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath p
return false;
}
boolean state = false;
FateTxStore<T> txStore = zs.reserve(txid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
FateId fateId = FateId.from(FateInstanceType.META, txid);
FateTxStore<T> txStore = zs.reserve(fateId);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
Expand Down Expand Up @@ -500,7 +505,9 @@ public boolean prepFail(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLo
return false;
}
boolean state = false;
FateTxStore<T> txStore = zs.reserve(txid);
// ELASTICITY_TODO DEFERRED - ISSUE 4044
FateId fateId = FateId.from(FateInstanceType.META, txid);
FateTxStore<T> txStore = zs.reserve(fateId);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
Expand Down
Loading

0 comments on commit 1d51e46

Please sign in to comment.