From e9002c506888379e12fc1b16129f6f7c8faeaac4 Mon Sep 17 00:00:00 2001
From: Rohit Yadav
Date: Wed, 29 May 2024 14:18:31 +0530
Subject: [PATCH 1/2] configure early release of _partitionGroupConsumerSemaphore in RealtimeSegmentDataManager

---
Reviewer note: this patch adds UpsertConfig._allowPartialUpsertConsumptionDuringCommit
(with a server-level default key) and uses it in RealtimeSegmentDataManager to guard the
early closeStreamConsumers() call in buildSegmentInternal() and
downloadSegmentAndReplace(). The @JsonPropertyDescription on the new field was reworded
from "pause" to "allow" so that it matches the flag's polarity (true => consumers are
closed early, as before this patch).

 .../realtime/RealtimeSegmentDataManager.java  | 19 +++++++++++++++++--
 .../TableUpsertMetadataManagerFactory.java    |  9 +++++++++
 .../pinot/spi/config/table/UpsertConfig.java  | 12 ++++++++++++
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 44cea7155d8f..916bbb225777 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -316,6 +316,7 @@ public void deleteSegmentFile() {
   private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
   private final CompletionMode _segmentCompletionMode;
   private final List _filteredMessageOffsets = new ArrayList<>();
+  private final boolean _allowPartialUpsertConsumptionDuringCommit;
   private boolean _trackFilteredMessageOffsets = false;
 
   // TODO each time this method is called, we print reason for stop. Good to print only once.
@@ -978,7 +979,12 @@ AtomicBoolean getAcquiredConsumerSemaphore() {
 
   @VisibleForTesting
   SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
-    closeStreamConsumers();
+    // for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
+    // to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
+    // complete.
+    if (_allowPartialUpsertConsumptionDuringCommit) {
+      closeStreamConsumers();
+    }
     // Do not allow building segment when table data manager is already shut down
     if (_realtimeTableDataManager.isShutDown()) {
       _segmentLogger.warn("Table data manager is already shut down");
@@ -1335,7 +1341,12 @@ public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)
 
   protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata)
       throws Exception {
-    closeStreamConsumers();
+    // for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
+    // to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
+    // complete.
+    if (_allowPartialUpsertConsumptionDuringCommit) {
+      closeStreamConsumers();
+    }
     _realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata);
   }
 
@@ -1611,6 +1622,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
       _segmentLogger
           .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
               llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
+      _allowPartialUpsertConsumptionDuringCommit =
+          _realtimeTableDataManager.isPartialUpsertEnabled() ? _tableConfig.getUpsertConfig() != null
+              && _tableConfig.getUpsertConfig()
+              .isAllowPartialUpsertConsumptionDuringCommit() : true;
     } catch (Exception e) {
       // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index f3a3dff5ab9d..b6cc7480fbe2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -37,6 +37,9 @@ private TableUpsertMetadataManagerFactory() {
   public static final String UPSERT_DEFAULT_ENABLE_SNAPSHOT = "default.enable.snapshot";
   public static final String UPSERT_DEFAULT_ENABLE_PRELOAD = "default.enable.preload";
 
+  public static final String UPSERT_DEFAULT_ALLOW_PARTIAL_UPSERT_CONSUMPTION_DURING_COMMIT =
+      "default.allow.partial.upsert.consumption.during.commit";
+
   public static TableUpsertMetadataManager create(TableConfig tableConfig,
       @Nullable PinotConfiguration instanceUpsertConfig) {
     String tableNameWithType = tableConfig.getTableName();
@@ -61,6 +64,12 @@ public static TableUpsertMetadataManager create(TableConfig tableConfig,
         upsertConfig.setEnablePreload(
             Boolean.parseBoolean(instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD, "false")));
       }
+
+      // server level config honoured only when table level config is not set to true
+      if (!upsertConfig.isAllowPartialUpsertConsumptionDuringCommit()) {
+        upsertConfig.setAllowPartialUpsertConsumptionDuringCommit(Boolean.parseBoolean(
+            instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ALLOW_PARTIAL_UPSERT_CONSUMPTION_DURING_COMMIT, "false")));
+      }
     }
 
     if (StringUtils.isNotEmpty(metadataManagerClass)) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index fcab1ab85808..2355f0a790f2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -94,6 +94,9 @@ public enum ConsistencyMode {
   @JsonPropertyDescription("Whether to drop out-of-order record")
   private boolean _dropOutOfOrderRecord;
 
+  @JsonPropertyDescription("Whether to allow partial upsert table's partition consumption during commit")
+  private boolean _allowPartialUpsertConsumptionDuringCommit;
+
   public UpsertConfig(Mode mode) {
     _mode = mode;
   }
@@ -274,4 +277,13 @@ public void setMetadataManagerClass(String metadataManagerClass) {
   public void setMetadataManagerConfigs(Map metadataManagerConfigs) {
     _metadataManagerConfigs = metadataManagerConfigs;
   }
+
+  public void setAllowPartialUpsertConsumptionDuringCommit(
+      boolean allowPartialUpsertConsumptionDuringCommit) {
+    _allowPartialUpsertConsumptionDuringCommit = allowPartialUpsertConsumptionDuringCommit;
+  }
+
+  public boolean isAllowPartialUpsertConsumptionDuringCommit() {
+    return _allowPartialUpsertConsumptionDuringCommit;
+  }
 }

From 69e28c2ee4ea483dc61cc931811b70493a2c44c8 Mon Sep 17 00:00:00 2001
From: Rohit Yadav
Date: Fri, 14 Jun 2024 00:19:19 +0530
Subject: [PATCH 2/2] review comments resolution

---
Reviewer note: renames the field to _allowConsumptionDuringCommit and simplifies its
initialisation. The redundant "!x ? true : y" ternary is written as "!x || y"; the
resulting value is the same. NOTE(review): this form no longer null-checks
getUpsertConfig() the way PATCH 1/2 did - confirm that isPartialUpsertEnabled() implies
a non-null upsert config.

 .../manager/realtime/RealtimeSegmentDataManager.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 916bbb225777..c29cb5bd51f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -316,7 +316,7 @@ public void deleteSegmentFile() {
   private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
   private final CompletionMode _segmentCompletionMode;
   private final List _filteredMessageOffsets = new ArrayList<>();
-  private final boolean _allowPartialUpsertConsumptionDuringCommit;
+  private final boolean _allowConsumptionDuringCommit;
   private boolean _trackFilteredMessageOffsets = false;
 
   // TODO each time this method is called, we print reason for stop. Good to print only once.
@@ -982,7 +982,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
     // for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
     // to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
     // complete.
-    if (_allowPartialUpsertConsumptionDuringCommit) {
+    if (_allowConsumptionDuringCommit) {
       closeStreamConsumers();
     }
     // Do not allow building segment when table data manager is already shut down
@@ -1344,7 +1344,7 @@ protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata)
     // for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
     // to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
     // complete.
-    if (_allowPartialUpsertConsumptionDuringCommit) {
+    if (_allowConsumptionDuringCommit) {
       closeStreamConsumers();
     }
     _realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata);
@@ -1622,10 +1622,8 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
       _segmentLogger
           .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
               llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
-      _allowPartialUpsertConsumptionDuringCommit =
-          _realtimeTableDataManager.isPartialUpsertEnabled() ? _tableConfig.getUpsertConfig() != null
-              && _tableConfig.getUpsertConfig()
-              .isAllowPartialUpsertConsumptionDuringCommit() : true;
+      _allowConsumptionDuringCommit = !_realtimeTableDataManager.isPartialUpsertEnabled()
+          || _tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit();
     } catch (Exception e) {
       // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.