Skip to content

Commit

Permalink
Allow stop to interrupt the consumer thread and safely release the re…
Browse files Browse the repository at this point in the history
…source (apache#13418)
  • Loading branch information
Jackie-Jiang authored and suyashpatel98 committed Jul 6, 2024
1 parent 431be12 commit 77d4ace
Showing 1 changed file with 55 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -774,38 +774,38 @@ public void run() {
// Keep this in memory, but wait for the online transition, and download when it comes in.
_state = State.DISCARDED;
break;
case KEEP:
case KEEP: {
if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
_state = State.DISCARDED;
break;
}
_state = State.RETAINING;
switch (_segmentCompletionMode) {
case DOWNLOAD:
_state = State.DISCARDED;
break;
case DEFAULT:
// Lock the segment to avoid multiple threads touching the same segment.
Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
segmentLock.lock();
try {
if (buildSegmentAndReplace()) {
_state = State.RETAINED;
break;
}
} finally {
segmentLock.unlock();
}
// Lock the segment to avoid multiple threads touching the same segment.
Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
// NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the
// CONSUMING -> ONLINE state transition.
segmentLock.lockInterruptibly();
try {
if (buildSegmentAndReplace()) {
_state = State.RETAINED;
} else {
// Could not build segment for some reason. We can only download it.
_state = State.ERROR;
_segmentLogger.error("Could not build segment for {}", _segmentNameStr);
break;
default:
break;
}
} finally {
segmentLock.unlock();
}
break;
case COMMIT:
}
case COMMIT: {
_state = State.COMMITTING;
_currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset);
// Lock the segment to avoid multiple threads touching the same segment.
Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr);
segmentLock.lock();
// NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the
// CONSUMING -> ONLINE state transition.
segmentLock.lockInterruptibly();
try {
long buildTimeSeconds = response.getBuildTimeSeconds();
buildSegmentForCommit(buildTimeSeconds * 1000L);
Expand All @@ -828,20 +828,26 @@ public void run() {
_segmentLogger.info("Could not commit segment. Retrying after hold");
hold();
break;
}
default:
_segmentLogger.error("Holding after response from Controller: {}", response.toJsonString());
hold();
break;
}
}
} catch (Exception e) {
String errorMessage = "Exception while in work";
_segmentLogger.error(errorMessage, e);
postStopConsumedMsg(e.getClass().getName());
_state = State.ERROR;
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
if (_shouldStop) {
_segmentLogger.info("Caught exception in consumer thread after stop() is invoked: {}, ignoring the exception",
e.toString());
} else {
String errorMessage = "Exception while in work";
_segmentLogger.error(errorMessage, e);
postStopConsumedMsg(e.getClass().getName());
_state = State.ERROR;
_realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
}

removeSegmentFile();
Expand Down Expand Up @@ -1193,12 +1199,9 @@ private void cleanupMetrics() {
_serverMetrics.removeTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING);
}

protected void hold() {
try {
Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS);
} catch (InterruptedException e) {
_segmentLogger.warn("Interrupted while holding");
}
protected void hold()
throws InterruptedException {
Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS);
}

private static class ConsumptionStopIndicator {
Expand Down Expand Up @@ -1269,13 +1272,11 @@ public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)
// Remove the segment file before we do anything else.
removeSegmentFile();
_leaseExtender.removeSegment(_segmentNameStr);
final StreamPartitionMsgOffset endOffset =
_streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
_segmentLogger
.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
_startOffset, endOffset);
StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
_segmentLogger.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state,
_startOffset, endOffset);
stop();
_segmentLogger.info("Consumer thread stopped in state {}", _state.toString());
_segmentLogger.info("Consumer thread stopped in state {}", _state);

switch (_state) {
case COMMITTED:
Expand Down Expand Up @@ -1381,8 +1382,8 @@ private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long ti
public void doOffload() {
try {
stop();
} catch (InterruptedException e) {
_segmentLogger.error("Could not stop consumer thread");
} catch (Exception e) {
_segmentLogger.error("Caught exception while stopping the consumer thread", e);
}
closeStreamConsumers();
cleanupMetrics();
Expand All @@ -1402,19 +1403,21 @@ public void startConsumption() {

/**
* Stop the consuming thread.
*
* This method is invoked in 2 places:
* 1. By the Helix thread when handling the segment state transition from CONSUMING to ONLINE. When this method is
* invoked, Helix thread should already hold the segment lock, and the consumer thread is not building the segment.
* We can safely interrupt the consumer thread and wait for it to join.
* 2. By either the Helix thread or consumer thread to offload the segment. In this case, we can also safely interrupt
* the consumer thread because there is no need to build the segment.
*/
public void stop()
throws InterruptedException {
_shouldStop = true;
// This method could be called either when we get an ONLINE transition or
// when we commit a segment and replace the realtime segment with a committed
// one. In the latter case, we don't want to call join.
if (Thread.currentThread() != _consumerThread) {
Uninterruptibles.joinUninterruptibly(_consumerThread, 10, TimeUnit.MINUTES);

if (_consumerThread.isAlive()) {
_segmentLogger.warn("Failed to stop consumer thread within 10 minutes");
}
if (Thread.currentThread() != _consumerThread && _consumerThread.isAlive()) {
// Interrupt the consumer thread and wait for it to join.
_consumerThread.interrupt();
_consumerThread.join();
}
}

Expand Down

0 comments on commit 77d4ace

Please sign in to comment.