Skip to content

Commit

Permalink
Rename DeadlineTracker to TimeoutTracker
Browse files Browse the repository at this point in the history
A deadline is the latest time by which something should be completed.
It's an instant, like "next Tuesday at noon". A timeout is an interval
of time allowed for some event to occur or be completed. It's a delta,
like "15 minutes". The DeadlineTracker tracked a "relative deadline",
relative to when the instance's "deadline" was set. That's actually a
timeout. This patch harmonizes the names to the concepts.

Change-Id: I3f465c925856390ecf4747e84bdd5a67c51c69eb
Reviewed-on: http://gerrit.cloudera.org:8080/12373
Reviewed-by: Alexey Serbin <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Will Berkeley <[email protected]>
  • Loading branch information
wdberkeley committed Feb 28, 2019
1 parent 8f85019 commit 8fbf1cc
Show file tree
Hide file tree
Showing 26 changed files with 232 additions and 233 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Pair<AlterTableResponse, Object> deserialize(final CallResponse callResponse,
final AlterTableResponsePB.Builder respBuilder = AlterTableResponsePB.newBuilder();
readProtobuf(callResponse.getPBMessage(), respBuilder);
AlterTableResponse response = new AlterTableResponse(
deadlineTracker.getElapsedMillis(),
timeoutTracker.getElapsedMillis(),
tsUUID,
respBuilder.hasTableId() ? respBuilder.getTableId().toStringUtf8() : null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ public Void call(Exception arg) {
RecoverableException ex = (RecoverableException)arg;
long sleepTime = getSleepTimeForRpcMillis(fakeRpc);
if (cannotRetryRequest(fakeRpc) ||
fakeRpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) {
fakeRpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) {
tooManyAttemptsOrTimeout(fakeRpc, ex); // invokes fakeRpc.Deferred
return null;
}
Expand Down Expand Up @@ -1579,7 +1579,7 @@ public void run(final Timeout timeout) {
}

long sleepTimeMillis = getSleepTimeForRpcMillis(rpc);
if (rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
if (rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
tooManyAttemptsOrTimeout(rpc, null);
return;
}
Expand Down Expand Up @@ -1609,7 +1609,7 @@ public void run(final Timeout timeout) {
}

long sleepTimeMillis = getSleepTimeForRpcMillis(rpc);
if (rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
if (rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTimeMillis)) {
tooManyAttemptsOrTimeout(rpc, null);
return;
}
Expand Down Expand Up @@ -1664,7 +1664,7 @@ void emptyTabletsCacheForTable(String tableId) {
* {@code false} otherwise (in which case it's OK to retry once more)
*/
private static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
return rpc.timeoutTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
}

/**
Expand Down Expand Up @@ -1722,7 +1722,7 @@ private Deferred<Master.GetTableLocationsResponsePB> locateTablet(KuduTable tabl
d = getMasterTableLocationsPB(parentRpc);
} else {
long timeoutMillis = parentRpc == null ? defaultAdminOperationTimeoutMs :
parentRpc.deadlineTracker.getMillisBeforeDeadline();
parentRpc.timeoutTracker.getMillisBeforeTimeout();
// Leave the end of the partition key range empty in order to pre-fetch tablet locations.
GetTableLocationsRequest rpc =
new GetTableLocationsRequest(masterTable,
Expand Down Expand Up @@ -1818,7 +1818,7 @@ private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table,
final byte[] endPartitionKey,
final int fetchBatchSize,
final List<LocatedTablet> ret,
final DeadlineTracker deadlineTracker) {
final TimeoutTracker timeoutTracker) {
// We rely on the keys initially not being empty.
Preconditions.checkArgument(startPartitionKey == null || startPartitionKey.length > 0,
"use null for unbounded start partition key");
Expand Down Expand Up @@ -1846,9 +1846,9 @@ private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table,
continue;
}

if (deadlineTracker.timedOut()) {
if (timeoutTracker.timedOut()) {
Status statusTimedOut = Status.TimedOut("Took too long getting the list of tablets, " +
deadlineTracker);
timeoutTracker);
return Deferred.fromError(new NonRecoverableException(statusTimedOut));
}

Expand All @@ -1861,7 +1861,7 @@ private Deferred<List<LocatedTablet>> loopLocateTable(final KuduTable table,
// Build a fake RPC to encapsulate and propagate the timeout. There's no actual "RPC" to send.
KuduRpc fakeRpc = buildFakeRpc("loopLocateTable",
null,
deadlineTracker.getMillisBeforeDeadline());
timeoutTracker.getMillisBeforeTimeout());

return locateTablet(table, key, fetchBatchSize, fakeRpc).addCallbackDeferring(
new Callback<Deferred<List<LocatedTablet>>, GetTableLocationsResponsePB>() {
Expand All @@ -1872,7 +1872,7 @@ public Deferred<List<LocatedTablet>> call(GetTableLocationsResponsePB resp) {
endPartitionKey,
fetchBatchSize,
ret,
deadlineTracker);
timeoutTracker);
}

@Override
Expand Down Expand Up @@ -1904,14 +1904,14 @@ Deferred<List<LocatedTablet>> locateTable(final KuduTable table,
int fetchBatchSize,
long deadline) {
final List<LocatedTablet> ret = Lists.newArrayList();
final DeadlineTracker deadlineTracker = new DeadlineTracker();
deadlineTracker.setDeadline(deadline);
final TimeoutTracker timeoutTracker = new TimeoutTracker();
timeoutTracker.setTimeout(deadline);
return loopLocateTable(table,
startPartitionKey,
endPartitionKey,
fetchBatchSize,
ret,
deadlineTracker);
timeoutTracker);
}

/**
Expand Down Expand Up @@ -1998,7 +1998,7 @@ public void run(final Timeout timeout) {

long sleepTime = getSleepTimeForRpcMillis(rpc);
if (cannotRetryRequest(rpc) ||
rpc.deadlineTracker.wouldSleepingTimeoutMillis(sleepTime)) {
rpc.timeoutTracker.wouldSleepingTimeoutMillis(sleepTime)) {
// Don't let it retry.
return tooManyAttemptsOrTimeout(rpc, ex);
}
Expand Down Expand Up @@ -2215,7 +2215,7 @@ enum LookupType {
Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
final byte[] partitionKey,
final LookupType lookupType,
long deadline) {
long timeout) {

// Locate the tablet at the partition key by locating tablets between
// the partition key (inclusive), and the incremented partition key (exclusive).
Expand All @@ -2230,10 +2230,10 @@ Deferred<LocatedTablet> getTabletLocation(final KuduTable table,
endPartitionKey = Arrays.copyOf(partitionKey, partitionKey.length + 1);
}

final DeadlineTracker deadlineTracker = new DeadlineTracker();
deadlineTracker.setDeadline(deadline);
final TimeoutTracker timeoutTracker = new TimeoutTracker();
timeoutTracker.setTimeout(timeout);
Deferred<List<LocatedTablet>> locatedTablets = locateTable(
table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, deadline);
table, startPartitionKey, endPartitionKey, FETCH_TABLETS_PER_POINT_LOOKUP, timeout);

// Then pick out the single tablet result from the list.
return locatedTablets.addCallbackDeferring(
Expand Down Expand Up @@ -2265,7 +2265,7 @@ public Deferred<LocatedTablet> call(List<LocatedTablet> tablets) {
// This is a LOWER_BOUND lookup, get the tablet location from the upper bound key
// of the non-covered range to return the next valid tablet location.
return getTabletLocation(table, entry.getUpperBoundPartitionKey(),
LookupType.POINT, deadlineTracker.getMillisBeforeDeadline());
LookupType.POINT, timeoutTracker.getMillisBeforeTimeout());
}
return Deferred.fromResult(new LocatedTablet(entry.getTablet()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ Pair<Response, Object> deserialize(final CallResponse callResponse,
}
}
RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
deadlineTracker.getElapsedMillis(), tsUUID, schema, resp.getData(), callResponse);
timeoutTracker.getElapsedMillis(), tsUUID, schema, resp.getData(), callResponse);

boolean hasMore = resp.getHasMoreResults();
if (id.length != 0 && scannerId != null && !Bytes.equals(scannerId, id)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class Batch extends KuduRpc<BatchResponse> {
* @param timeoutMillis the new timeout of the batch in milliseconds
*/
void resetTimeoutMillis(Timer timer, long timeoutMillis) {
deadlineTracker.reset();
deadlineTracker.setDeadline(timeoutMillis);
timeoutTracker.reset();
timeoutTracker.setTimeout(timeoutMillis);
if (timeoutTask != null) {
timeoutTask.cancel();
}
Expand Down Expand Up @@ -145,7 +145,7 @@ Pair<BatchResponse, Object> deserialize(CallResponse callResponse,
}
}

BatchResponse response = new BatchResponse(deadlineTracker.getElapsedMillis(),
BatchResponse response = new BatchResponse(timeoutTracker.getElapsedMillis(),
tsUUID,
builder.getTimestamp(),
errorsPB,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class BatchResponse extends KuduRpcResponse {
currentErrorIndex++;
}
individualResponses.add(
new OperationResponse(currentOperation.deadlineTracker.getElapsedMillis(),
new OperationResponse(currentOperation.timeoutTracker.getElapsedMillis(),
tsUUID,
writeTimestamp,
currentOperation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private static Deferred<ConnectToMasterResponsePB> connectToMaster(
// TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
// basically reuse in some way the master permits.
long timeoutMillis = parentRpc == null ? defaultTimeoutMs :
parentRpc.deadlineTracker.getMillisBeforeDeadline();
parentRpc.timeoutTracker.getMillisBeforeTimeout();
final ConnectToMasterRequest rpc = new ConnectToMasterRequest(masterTable, timer, timeoutMillis);
rpc.setParentRpc(parentRpc);
Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Pair<CreateTableResponse, Object> deserialize(final CallResponse callResponse,
readProtobuf(callResponse.getPBMessage(), builder);
CreateTableResponse response =
new CreateTableResponse(
deadlineTracker.getElapsedMillis(),
timeoutTracker.getElapsedMillis(),
tsUUID,
builder.getTableId().toStringUtf8());
return new Pair<CreateTableResponse, Object>(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Pair<DeleteTableResponse, Object> deserialize(CallResponse callResponse,
final Master.DeleteTableResponsePB.Builder builder = Master.DeleteTableResponsePB.newBuilder();
readProtobuf(callResponse.getPBMessage(), builder);
DeleteTableResponse response =
new DeleteTableResponse(deadlineTracker.getElapsedMillis(), tsUUID);
new DeleteTableResponse(timeoutTracker.getElapsedMillis(), tsUUID);
return new Pair<DeleteTableResponse, Object>(
response, builder.hasError() ? builder.getError() : null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Pair<GetTableSchemaResponse, Object> deserialize(CallResponse callResponse,
readProtobuf(callResponse.getPBMessage(), respBuilder);
Schema schema = ProtobufHelper.pbToSchema(respBuilder.getSchema());
GetTableSchemaResponse response = new GetTableSchemaResponse(
deadlineTracker.getElapsedMillis(),
timeoutTracker.getElapsedMillis(),
tsUUID,
schema,
respBuilder.getTableId().toStringUtf8(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Pair<IsAlterTableDoneResponse, Object> deserialize(final CallResponse callRespon
String tsUUID) throws KuduException {
final IsAlterTableDoneResponsePB.Builder respBuilder = IsAlterTableDoneResponsePB.newBuilder();
readProtobuf(callResponse.getPBMessage(), respBuilder);
IsAlterTableDoneResponse resp = new IsAlterTableDoneResponse(deadlineTracker.getElapsedMillis(),
IsAlterTableDoneResponse resp = new IsAlterTableDoneResponse(timeoutTracker.getElapsedMillis(),
tsUUID,
respBuilder.getDone());
return new Pair<IsAlterTableDoneResponse, Object>(
Expand Down
Loading

0 comments on commit 8fbf1cc

Please sign in to comment.