Skip to content

Commit

Permalink
Merge branch 'grpc' into accumulo-4664-grpc
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Aug 24, 2024
2 parents fa3a8b6 + 7d85cef commit 32f09d1
Show file tree
Hide file tree
Showing 94 changed files with 1,022 additions and 803 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonService;
import org.apache.accumulo.core.util.Interner;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.time.NanoTime;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -311,7 +311,7 @@ public static class CachedTablet {
private final TabletAvailability availability;
private final boolean hostingRequested;

private final NanoTime creationTime = NanoTime.now();
private final Timer creationTimer = Timer.startNew();

public CachedTablet(KeyExtent tablet_extent, String tablet_location, String session,
TabletAvailability availability, boolean hostingRequested) {
Expand Down Expand Up @@ -392,8 +392,11 @@ public TabletAvailability getAvailability() {
return this.availability;
}

public NanoTime getCreationTime() {
return creationTime;
/**
* @return a timer that was started when this object was created
*/
public Timer getCreationTimer() {
return creationTimer;
}

public boolean wasHostingRequested() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.time.NanoTime;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.slf4j.Logger;
Expand Down Expand Up @@ -238,14 +237,14 @@ public <T extends Mutation> void binMutations(ClientContext context, List<T> mut
// Want to ignore any entries in the cache w/o a location that were created before the
// following time. Entries created after the following time may have been populated by the
// following loop, and we want to use those.
var cacheCutoff = NanoTime.now();
Timer cacheCutoffTimer = Timer.startNew();

for (T mutation : notInCache) {

row.set(mutation.getRow());

CachedTablet tl = _findTablet(context, row, false, false, false, lcSession,
LocationNeed.REQUIRED, cacheCutoff);
LocationNeed.REQUIRED, cacheCutoffTimer);

if (!addMutation(binnedMutations, mutation, tl, lcSession)) {
failures.add(mutation);
Expand Down Expand Up @@ -328,7 +327,7 @@ private List<Range> findTablets(ClientContext context, List<Range> ranges,
// Use anything in the cache w/o a location populated after this point in time. Cache entries
// w/o a location created before the following time should be ignored and the metadata table
// consulted.
var cacheCutoff = NanoTime.now();
Timer cacheCutoffTimer = Timer.startNew();

l1: for (Range range : ranges) {

Expand All @@ -348,7 +347,7 @@ private List<Range> findTablets(ClientContext context, List<Range> ranges,
tl = lcSession.checkLock(findTabletInCache(startRow));
} else {
tl = _findTablet(context, startRow, false, false, false, lcSession, locationNeed,
cacheCutoff);
cacheCutoffTimer);
}

if (tl == null) {
Expand All @@ -367,7 +366,7 @@ private List<Range> findTablets(ClientContext context, List<Range> ranges,
tl = lcSession.checkLock(findTabletInCache(row));
} else {
tl = _findTablet(context, tl.getExtent().endRow(), true, false, false, lcSession,
locationNeed, cacheCutoff);
locationNeed, cacheCutoffTimer);
}

if (tl == null) {
Expand Down Expand Up @@ -561,7 +560,7 @@ public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow,

LockCheckerSession lcSession = new LockCheckerSession();
CachedTablet tl =
_findTablet(context, row, skipRow, false, true, lcSession, locationNeed, NanoTime.now());
_findTablet(context, row, skipRow, false, true, lcSession, locationNeed, Timer.startNew());

if (timer != null) {
log.trace("tid={} Located tablet {} at {} in {}", Thread.currentThread().getId(),
Expand Down Expand Up @@ -613,7 +612,7 @@ private Map<KeyExtent,CachedTablet> findExtentsToHost(ClientContext context, int
// Use anything in the cache w/o a location populated after this point in time. Cache entries
// w/o a location created before the following time should be ignored and the metadata table
// consulted.
var cacheCutoff = NanoTime.now();
Timer cacheCutoffTimer = Timer.startNew();

for (int i = 0; i < hostAheadCount; i++) {
if (currTablet.endRow() == null || hostAheadRange
Expand All @@ -622,7 +621,7 @@ private Map<KeyExtent,CachedTablet> findExtentsToHost(ClientContext context, int
}

CachedTablet followingTablet = _findTablet(context, currTablet.endRow(), true, false, true,
lcSession, locationNeed, cacheCutoff);
lcSession, locationNeed, cacheCutoffTimer);

if (followingTablet == null) {
break;
Expand Down Expand Up @@ -684,14 +683,14 @@ private void requestTabletHosting(ClientContext context,

List<TKeyExtent> extentsToBringOnline = new ArrayList<>();
for (var cachedTablet : tabletsWithNoLocation) {
if (cachedTablet.getCreationTime().elapsed().compareTo(STALE_DURATION) < 0) {
if (cachedTablet.getCreationTimer().elapsed().compareTo(STALE_DURATION) < 0) {
if (cachedTablet.getAvailability() == TabletAvailability.ONDEMAND) {
if (!cachedTablet.wasHostingRequested()) {
extentsToBringOnline.add(cachedTablet.getExtent().toThrift());
log.trace("requesting ondemand tablet to be hosted {}", cachedTablet.getExtent());
} else {
log.trace("ignoring ondemand tablet that already has a hosting request in place {} {}",
cachedTablet.getExtent(), cachedTablet.getCreationTime().elapsed());
cachedTablet.getExtent(), cachedTablet.getCreationTimer().elapsed());
}
} else if (cachedTablet.getAvailability() == TabletAvailability.UNHOSTED) {
throw new InvalidTabletHostingRequestException("Extent " + cachedTablet.getExtent()
Expand Down Expand Up @@ -861,13 +860,13 @@ private CachedTablet findTabletInCache(Text row) {
}

/**
* @param cacheCutoff Tablets w/o locations are cached. When LocationNeed is REQUIRED, this cut
* off is used to determine if cached entries w/o a location should be used or of we should
* instead ignore them and reread the tablet information from the metadata table.
* @param cacheCutoffTimer Tablets w/o locations are cached. When LocationNeed is REQUIRED, this
* Timer value is used to determine if cached entries w/o a location should be used or of
* we should instead ignore them and reread the tablet information from the metadata table.
*/
protected CachedTablet _findTablet(ClientContext context, Text row, boolean skipRow,
boolean retry, boolean lock, LockCheckerSession lcSession, LocationNeed locationNeed,
NanoTime cacheCutoff) throws AccumuloException, AccumuloSecurityException,
Timer cacheCutoffTimer) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException, InvalidTabletHostingRequestException {

if (skipRow) {
Expand All @@ -889,9 +888,10 @@ protected CachedTablet _findTablet(ClientContext context, Text row, boolean skip
}

if (tl == null || (locationNeed == LocationNeed.REQUIRED && tl.getTserverLocation().isEmpty()
&& tl.getCreationTime().compareTo(cacheCutoff) < 0)) {
// not in cache OR the cached entry was created before the cut off time, so obtain info from
// metadata table
&& cacheCutoffTimer.startedAfter(tl.getCreationTimer()))) {

// not in cache OR the cutoff timer was started after when the cached entry timer was started,
// so obtain info from metadata table
if (lock) {
wLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.time.NanoTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,7 +69,7 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
};

protected final Set<FateId> reserved;
protected final Map<FateId,NanoTime> deferred;
protected final Map<FateId,CountDownTimer> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
private final FateIdGenerator fateIdGenerator;
Expand Down Expand Up @@ -171,11 +172,10 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
synchronized (AbstractFateStore.this) {
var deferredTime = deferred.get(fateId);
if (deferredTime != null) {
if (deferredTime.elapsed().isNegative()) {
// negative elapsed time indicates the deferral time is in the future
return false;
} else {
if (deferredTime.isExpired()) {
deferred.remove(fateId);
} else {
return false;
}
}
return !reserved.contains(fateId);
Expand All @@ -194,9 +194,9 @@ public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
long waitTime = 5000;
synchronized (AbstractFateStore.this) {
if (!deferred.isEmpty()) {
var now = NanoTime.now();
waitTime = deferred.values().stream()
.mapToLong(nanoTime -> nanoTime.subtract(now).toMillis()).min().getAsLong();
.mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
.getAsLong();
}
}

Expand Down Expand Up @@ -420,7 +420,7 @@ public void unreserve(Duration deferTime) {
deferredOverflow.set(true);
deferred.clear();
} else {
deferred.put(fateId, NanoTime.nowPlus(deferTime));
deferred.put(fateId, CountDownTimer.startNew(deferTime));
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.core.util.time.NanoTime;
import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -307,18 +307,18 @@ private void undo(FateId fateId, Repo<T> op) {
}

protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
var startTime = NanoTime.now();
var startTime = Timer.startNew();
var deferTime = op.isReady(fateId, environment);
log.debug("Running {}.isReady() {} took {} ms and returned {}", op.getName(), fateId,
startTime.elapsed().toMillis(), deferTime);
startTime.elapsed(MILLISECONDS), deferTime);
return deferTime;
}

protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception {
var startTime = NanoTime.now();
var startTime = Timer.startNew();
var next = op.call(fateId, environment);
log.debug("Running {}.call() {} took {} ms and returned {}", op.getName(), fateId,
startTime.elapsed().toMillis(), next == null ? "null" : next.getName());
startTime.elapsed(MILLISECONDS), next == null ? "null" : next.getName());

return next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,8 +33,8 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UuidUtil;
import org.apache.accumulo.core.util.time.NanoTime;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -559,11 +560,11 @@ public synchronized void unlock() throws InterruptedException, KeeperException {
ZooUtil.recursiveDelete(zooKeeper, pathToDelete, NodeMissingPolicy.SKIP);

// Wait for the delete to happen on the server before exiting method
NanoTime start = NanoTime.now();
Timer start = Timer.startNew();
while (zooKeeper.exists(pathToDelete, null) != null) {
Thread.onSpinWait();
if (NanoTime.now().subtract(start).toSeconds() > 10) {
start = NanoTime.now();
if (start.hasElapsed(10, SECONDS)) {
start.restart();
LOG.debug("[{}] Still waiting for zookeeper to delete all at {}", vmLockPrefix,
pathToDelete);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,13 @@
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@link #METRICS_MANAGER_COMPACTION_SVC_ERRORS}</td>
* <td>Gauge</td>
* <td></td>
* </tr>
* <tr>
* <td>N/A</td>
* <td>N/A</td>
* <td>{@link #METRICS_MANAGER_USER_TGW_ERRORS}</td>
* <td>Gauge</td>
* <td></td>
Expand Down Expand Up @@ -700,6 +707,8 @@ public interface MetricsProducer {
String METRICS_MANAGER_ROOT_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tabletmgmt.root.errors";
String METRICS_MANAGER_META_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tabletmgmt.meta.errors";
String METRICS_MANAGER_USER_TGW_ERRORS = METRICS_MANAGER_PREFIX + "tabletmgmt.user.errors";
String METRICS_MANAGER_COMPACTION_SVC_ERRORS =
METRICS_MANAGER_PREFIX + "compaction.svc.misconfigured";

String METRICS_MAJC_PREFIX = "accumulo.compactions.majc.";
String METRICS_MAJC_QUEUED = METRICS_MAJC_PREFIX + "queued";
Expand Down
Loading

0 comments on commit 32f09d1

Please sign in to comment.