From 902196079f77f9547f2ad65d3b9cf3777653c329 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Thu, 20 Jun 2024 09:22:36 -0700 Subject: [PATCH] Allow stop to interrupt the consumer thread and safely release the resource (#13418) --- .../realtime/RealtimeSegmentDataManager.java | 107 +++++++++--------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 8dc7842f8156..fc3903012339 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -760,38 +760,38 @@ public void run() { // Keep this in memory, but wait for the online transition, and download when it comes in. _state = State.DISCARDED; break; - case KEEP: + case KEEP: { + if (_segmentCompletionMode == CompletionMode.DOWNLOAD) { + _state = State.DISCARDED; + break; + } _state = State.RETAINING; - switch (_segmentCompletionMode) { - case DOWNLOAD: - _state = State.DISCARDED; - break; - case DEFAULT: - // Lock the segment to avoid multiple threads touching the same segment. - Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr); - segmentLock.lock(); - try { - if (buildSegmentAndReplace()) { - _state = State.RETAINED; - break; - } - } finally { - segmentLock.unlock(); - } + // Lock the segment to avoid multiple threads touching the same segment. + Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr); + // NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the + // CONSUMING -> ONLINE state transition. + segmentLock.lockInterruptibly(); + try { + if (buildSegmentAndReplace()) { + _state = State.RETAINED; + } else { // Could not build segment for some reason. We can only download it. _state = State.ERROR; _segmentLogger.error("Could not build segment for {}", _segmentNameStr); - break; - default: - break; + } + } finally { + segmentLock.unlock(); } break; - case COMMIT: + } + case COMMIT: { _state = State.COMMITTING; _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset); // Lock the segment to avoid multiple threads touching the same segment. Lock segmentLock = _realtimeTableDataManager.getSegmentLock(_segmentNameStr); - segmentLock.lock(); + // NOTE: We need to lock interruptibly because the lock might already be held by the Helix thread for the + // CONSUMING -> ONLINE state transition. + segmentLock.lockInterruptibly(); try { long buildTimeSeconds = response.getBuildTimeSeconds(); buildSegmentForCommit(buildTimeSeconds * 1000L); @@ -814,6 +814,7 @@ public void run() { _segmentLogger.info("Could not commit segment. Retrying after hold"); hold(); break; + } default: _segmentLogger.error("Holding after response from Controller: {}", response.toJsonString()); hold(); @@ -821,13 +822,18 @@ public void run() { } } } catch (Exception e) { - String errorMessage = "Exception while in work"; - _segmentLogger.error(errorMessage, e); - postStopConsumedMsg(e.getClass().getName()); - _state = State.ERROR; - _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); - _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0); - return; + if (_shouldStop) { + _segmentLogger.info("Caught exception in consumer thread after stop() is invoked: {}, ignoring the exception", + e.toString()); + } else { + String errorMessage = "Exception while in work"; + _segmentLogger.error(errorMessage, e); + postStopConsumedMsg(e.getClass().getName()); + _state = State.ERROR; + _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e)); + _serverMetrics.setValueOfTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING, 0); + return; + } } removeSegmentFile(); @@ -1174,12 +1180,9 @@ private void cleanupMetrics() { _serverMetrics.removeTableGauge(_clientId, ServerGauge.LLC_PARTITION_CONSUMING); } - protected void hold() { - try { - Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS); - } catch (InterruptedException e) { - _segmentLogger.warn("Interrupted while holding"); - } + protected void hold() + throws InterruptedException { + Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS); } private static class ConsumptionStopIndicator { @@ -1250,13 +1253,11 @@ public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata) // Remove the segment file before we do anything else. removeSegmentFile(); _leaseExtender.removeSegment(_segmentNameStr); - final StreamPartitionMsgOffset endOffset = - _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset()); - _segmentLogger - .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(), - _startOffset, endOffset); + StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset()); + _segmentLogger.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state, + _startOffset, endOffset); stop(); - _segmentLogger.info("Consumer thread stopped in state {}", _state.toString()); + _segmentLogger.info("Consumer thread stopped in state {}", _state); switch (_state) { case COMMITTED: @@ -1357,8 +1358,8 @@ private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long ti public void doOffload() { try { stop(); - } catch (InterruptedException e) { - _segmentLogger.error("Could not stop consumer thread"); + } catch (Exception e) { + _segmentLogger.error("Caught exception while stopping the consumer thread", e); } closeStreamConsumers(); cleanupMetrics(); @@ -1378,19 +1379,21 @@ public void startConsumption() { /** * Stop the consuming thread. + * + * This method is invoked in 2 places: + * 1. By the Helix thread when handling the segment state transition from CONSUMING to ONLINE. When this method is + * invoked, Helix thread should already hold the segment lock, and the consumer thread is not building the segment. + * We can safely interrupt the consumer thread and wait for it to join. + * 2. By either the Helix thread or consumer thread to offload the segment. In this case, we can also safely interrupt + * the consumer thread because there is no need to build the segment. */ public void stop() throws InterruptedException { _shouldStop = true; - // This method could be called either when we get an ONLINE transition or - // when we commit a segment and replace the realtime segment with a committed - // one. In the latter case, we don't want to call join. - if (Thread.currentThread() != _consumerThread) { - Uninterruptibles.joinUninterruptibly(_consumerThread, 10, TimeUnit.MINUTES); - - if (_consumerThread.isAlive()) { - _segmentLogger.warn("Failed to stop consumer thread within 10 minutes"); - } + if (Thread.currentThread() != _consumerThread && _consumerThread.isAlive()) { + // Interrupt the consumer thread and wait for it to join. + _consumerThread.interrupt(); + _consumerThread.join(); } }