Skip to content

Commit

Permalink
ZooStore deferral time to use System.nanoTime()
Browse files Browse the repository at this point in the history
- ZooStore() now uses System.nanoTime() instead of
  System.currentTimeMillis()
- Added TimeUnit param to unreserve()
- Renamed 'defered' -> 'deferred'
  • Loading branch information
kevinrr888 committed Jan 3, 2024
1 parent d973179 commit 1b885d8
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.FateLock;
Expand Down Expand Up @@ -367,7 +368,7 @@ private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTx

long timeCreated = zs.timeCreated(tid);

zs.unreserve(tid, 0);
zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
Expand Down Expand Up @@ -450,7 +451,7 @@ public boolean prepDelete(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath path
break;
}

zs.unreserve(txid, 0);
zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
return state;
}

Expand Down Expand Up @@ -494,7 +495,7 @@ public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockM
break;
}

zs.unreserve(txid, 0);
zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void ageOff() {
}

} finally {
store.unreserve(txid, 0);
store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
Expand Down Expand Up @@ -137,7 +138,7 @@ public AgeOffStore(ZooStore<T> store, long ageOffTime, TimeSource timeSource) {
break;
}
} finally {
store.unreserve(txid, 0);
store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
}
}
}
Expand Down Expand Up @@ -165,8 +166,8 @@ public boolean tryReserve(long tid) {
}

@Override
public void unreserve(long tid, long deferTime) {
store.unreserve(tid, deferTime);
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
store.unreserve(tid, deferTime, deferTimeUnit);
}

@Override
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

Expand Down Expand Up @@ -131,7 +132,7 @@ public void run() {
runnerLog.error("Uncaught exception in FATE runner thread.", e);
} finally {
if (tid != null) {
store.unreserve(tid, deferTime);
store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
}
}
}
Expand Down Expand Up @@ -295,7 +296,7 @@ public void seedTransaction(String txName, long tid, Repo<T> repo, boolean autoC
store.setStatus(tid, SUBMITTED);
}
} finally {
store.unreserve(tid, 0);
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
}

}
Expand Down Expand Up @@ -331,7 +332,7 @@ public boolean cancel(long tid) {
return false;
}
} finally {
store.unreserve(tid, 0);
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
}
} else {
// reserved, lets retry.
Expand Down Expand Up @@ -362,7 +363,7 @@ public void delete(long tid) {
break;
}
} finally {
store.unreserve(tid, 0);
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -375,7 +376,7 @@ public String getReturn(long tid) {
}
return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
} finally {
store.unreserve(tid, 0);
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
}
}

Expand All @@ -389,7 +390,7 @@ public Exception getException(long tid) {
}
return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
} finally {
store.unreserve(tid, 0);
store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Read only access to a Transaction Store.
Expand Down Expand Up @@ -76,10 +77,11 @@ enum TStatus {
* longer interact with it.
*
* @param tid transaction id, previously reserved.
* @param deferTime time in millis to keep this transaction out of the pool used in the
* {@link #reserve() reserve} method. must be non-negative.
* @param deferTime time to keep this transaction out of the pool used in the {@link #reserve()
* reserve} method. must be non-negative.
* @param deferTimeUnit the time unit of deferTime
*/
void unreserve(long tid, long deferTime);
void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit);

/**
* Get the current operation for the given transaction id.
Expand Down
23 changes: 13 additions & 10 deletions core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
Expand All @@ -61,7 +62,7 @@ public class ZooStore<T> implements TStore<T> {
private ZooReaderWriter zk;
private String lastReserved = "";
private Set<Long> reserved;
private Map<Long,Long> defered;
private Map<Long,Long> deferred;
private static final SecureRandom random = new SecureRandom();
private long statusChangeEvents = 0;
private int reservationsWaiting = 0;
Expand Down Expand Up @@ -106,7 +107,7 @@ public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, Interru
this.path = path;
this.zk = zk;
this.reserved = new HashSet<>();
this.defered = new HashMap<>();
this.deferred = new HashMap<>();

zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP);
}
Expand Down Expand Up @@ -163,9 +164,9 @@ public long reserve() {
continue;
}

if (defered.containsKey(tid)) {
if (defered.get(tid) < System.currentTimeMillis()) {
defered.remove(tid);
if (deferred.containsKey(tid)) {
if ((deferred.get(tid) - System.nanoTime()) < 0) {
deferred.remove(tid);
} else {
continue;
}
Expand Down Expand Up @@ -200,11 +201,12 @@ public long reserve() {
synchronized (this) {
// suppress lgtm alert - synchronized variable is not always true
if (events == statusChangeEvents) { // lgtm [java/constant-comparison]
if (defered.isEmpty()) {
if (deferred.isEmpty()) {
this.wait(5000);
} else {
Long minTime = Collections.min(defered.values());
long waitTime = minTime - System.currentTimeMillis();
Long minTime = Collections.min(deferred.values());
long waitTime = minTime - System.nanoTime();
waitTime = TimeUnit.MILLISECONDS.convert(waitTime, TimeUnit.NANOSECONDS);
if (waitTime > 0) {
this.wait(Math.min(waitTime, 5000));
}
Expand Down Expand Up @@ -271,7 +273,8 @@ private void unreserve(long tid) {
}

@Override
public void unreserve(long tid, long deferTime) {
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
deferTime = TimeUnit.NANOSECONDS.convert(deferTime, deferTimeUnit);

if (deferTime < 0) {
throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
Expand All @@ -284,7 +287,7 @@ public void unreserve(long tid, long deferTime) {
}

if (deferTime > 0) {
defered.put(tid, System.currentTimeMillis() + deferTime);
deferred.put(tid, System.nanoTime() + deferTime);
}

this.notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.Serializable;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.accumulo.core.fate.Fate;
Expand Down Expand Up @@ -61,8 +62,8 @@ public boolean tryReserve(long tid) {
}

@Override
public void unreserve(long tid, long deferTime) {
store.unreserve(tid, deferTime);
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
store.unreserve(tid, deferTime, deferTimeUnit);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
Expand Down Expand Up @@ -52,23 +53,23 @@ public void testBasic() throws InterruptedException, KeeperException {
long txid1 = aoStore.create();
aoStore.reserve(txid1);
aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
aoStore.unreserve(txid1, 0);
aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);

aoStore.ageOff();

long txid2 = aoStore.create();
aoStore.reserve(txid2);
aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
aoStore.setStatus(txid2, TStatus.FAILED);
aoStore.unreserve(txid2, 0);
aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);

tts.time = 6;

long txid3 = aoStore.create();
aoStore.reserve(txid3);
aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
aoStore.unreserve(txid3, 0);
aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);

Long txid4 = aoStore.create();

Expand Down Expand Up @@ -101,19 +102,19 @@ public void testNonEmpty() throws InterruptedException, KeeperException {
long txid1 = testStore.create();
testStore.reserve(txid1);
testStore.setStatus(txid1, TStatus.IN_PROGRESS);
testStore.unreserve(txid1, 0);
testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);

long txid2 = testStore.create();
testStore.reserve(txid2);
testStore.setStatus(txid2, TStatus.IN_PROGRESS);
testStore.setStatus(txid2, TStatus.FAILED);
testStore.unreserve(txid2, 0);
testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);

long txid3 = testStore.create();
testStore.reserve(txid3);
testStore.setStatus(txid3, TStatus.IN_PROGRESS);
testStore.setStatus(txid3, TStatus.SUCCESSFUL);
testStore.unreserve(txid3, 0);
testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);

Long txid4 = testStore.create();

Expand All @@ -136,7 +137,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException {

aoStore.reserve(txid1);
aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
aoStore.unreserve(txid1, 0);
aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);

tts.time = 30;

Expand All @@ -147,7 +148,7 @@ public void testNonEmpty() throws InterruptedException, KeeperException {

aoStore.reserve(txid1);
aoStore.setStatus(txid1, TStatus.FAILED);
aoStore.unreserve(txid1, 0);
aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);

aoStore.ageOff();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* Transient in memory store for transactions.
Expand Down Expand Up @@ -61,7 +62,7 @@ public boolean tryReserve(long tid) {
}

@Override
public void unreserve(long tid, long deferTime) {
public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
if (!reserved.remove(tid)) {
throw new IllegalStateException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.PrintStream;
import java.nio.file.Files;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
Expand Down Expand Up @@ -144,7 +145,7 @@ public void testFailTx() throws Exception {
expectLastCall().once();
zs.setStatus(tid, ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS);
expectLastCall().once();
zs.unreserve(tid, 0);
zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
expectLastCall().once();

TestHelper helper = new TestHelper(true);
Expand Down

0 comments on commit 1b885d8

Please sign in to comment.