Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunpandianp committed Jan 24, 2025
1 parent fe8c772 commit da69bfb
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ synchronized ActivateWorkResult activateWorkForKey(ExecutableWork executableWork
if (executableWork.id().workToken() > queuedWork.id().workToken()) {
// Check to see if the queuedWork is active. We only want to remove it if it is NOT
// currently active.
if (!queuedWork.equals(Preconditions.checkNotNull(firstEntry(workQueue)).getValue())) {
if (!queuedWork.equals(Preconditions.checkNotNull(firstValue(workQueue)))) {
workIterator.remove();
decrementActiveWorkBudget(queuedWork.work());
}
Expand Down Expand Up @@ -253,8 +253,7 @@ private synchronized void removeCompletedWorkFromQueue(
@SuppressWarnings("ReferenceEquality")
private synchronized Optional<ExecutableWork> getNextWork(
LinkedHashMap<WorkId, ExecutableWork> workQueue, ShardedKey shardedKey) {
Optional<ExecutableWork> nextWork =
Optional.ofNullable(firstEntry(workQueue)).map(Entry::getValue);
Optional<ExecutableWork> nextWork = Optional.ofNullable(firstValue(workQueue));
if (!nextWork.isPresent()) {
Preconditions.checkState(workQueue == activeWork.remove(shardedKey.shardingKey()));
}
Expand All @@ -276,10 +275,9 @@ synchronized void invalidateStuckCommits(
}
}

private static @Nullable Entry<WorkId, ExecutableWork> firstEntry(
Map<WorkId, ExecutableWork> map) {
private static @Nullable ExecutableWork firstValue(Map<WorkId, ExecutableWork> map) {
Iterator<Entry<WorkId, ExecutableWork>> iterator = map.entrySet().iterator();
return iterator.hasNext() ? iterator.next() : null;
return iterator.hasNext() ? iterator.next().getValue() : null;
}

private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
Expand All @@ -288,9 +286,9 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
// activeWork as completeWork may delete the entry from activeWork.
ImmutableMap.Builder<ShardedKey, WorkId> stuckCommits = ImmutableMap.builder();
for (Entry<Long, LinkedHashMap<WorkId, ExecutableWork>> entry : activeWork.entrySet()) {
@Nullable Entry<WorkId, ExecutableWork> executableWork = firstEntry(entry.getValue());
@Nullable ExecutableWork executableWork = firstValue(entry.getValue());
if (executableWork != null) {
Work work = executableWork.getValue().work();
Work work = executableWork.work();
if (work.isStuckCommittingAt(stuckCommitDeadline)) {
LOG.error(
"Detected key {} stuck in COMMITTING state since {}, completing it with error.",
Expand Down Expand Up @@ -335,7 +333,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
for (Entry<Long, LinkedHashMap<WorkId, ExecutableWork>> entry : activeWork.entrySet()) {
LinkedHashMap<WorkId, ExecutableWork> workQueue =
Preconditions.checkNotNull(entry.getValue());
Work activeWork = Preconditions.checkNotNull(firstEntry(workQueue)).getValue().work();
Work activeWork = Preconditions.checkNotNull(firstValue(workQueue)).work();
WorkItem workItem = activeWork.getWorkItem();
if (activeWork.isCommitPending()) {
if (++commitsPendingCount >= MAX_PRINTABLE_COMMIT_PENDING_KEYS) {
Expand Down

0 comments on commit da69bfb

Please sign in to comment.