Skip to content

Commit

Permalink
configure early release of _partitionGroupConsumerSemaphore in Realti…
Browse files Browse the repository at this point in the history
…meSegmentDataManager
  • Loading branch information
rohityadav1993 committed Jun 13, 2024
1 parent 6303658 commit e9002c5
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ public void deleteSegmentFile() {
private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
private final CompletionMode _segmentCompletionMode;
private final List<String> _filteredMessageOffsets = new ArrayList<>();
private final boolean _allowPartialUpsertConsumptionDuringCommit;
private boolean _trackFilteredMessageOffsets = false;

// TODO each time this method is called, we print reason for stop. Good to print only once.
Expand Down Expand Up @@ -978,7 +979,12 @@ AtomicBoolean getAcquiredConsumerSemaphore() {

@VisibleForTesting
SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
closeStreamConsumers();
// for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
// to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
// complete.
if (_allowPartialUpsertConsumptionDuringCommit) {
closeStreamConsumers();
}
// Do not allow building segment when table data manager is already shut down
if (_realtimeTableDataManager.isShutDown()) {
_segmentLogger.warn("Table data manager is already shut down");
Expand Down Expand Up @@ -1335,7 +1341,12 @@ public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)

protected void downloadSegmentAndReplace(SegmentZKMetadata segmentZKMetadata)
throws Exception {
closeStreamConsumers();
// for partial upsert tables, do not release _partitionGroupConsumerSemaphore proactively and rely on offload()
// to release the semaphore. This ensures new consuming segment is not consuming until the segment replacement is
// complete.
if (_allowPartialUpsertConsumptionDuringCommit) {
closeStreamConsumers();
}
_realtimeTableDataManager.downloadAndReplaceConsumingSegment(segmentZKMetadata);
}

Expand Down Expand Up @@ -1611,6 +1622,10 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_segmentLogger
.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", llcSegmentName,
_segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
_allowPartialUpsertConsumptionDuringCommit =
_realtimeTableDataManager.isPartialUpsertEnabled() ? _tableConfig.getUpsertConfig() != null
&& _tableConfig.getUpsertConfig()
.isAllowPartialUpsertConsumptionDuringCommit() : true;
} catch (Exception e) {
// In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ private TableUpsertMetadataManagerFactory() {
public static final String UPSERT_DEFAULT_ENABLE_SNAPSHOT = "default.enable.snapshot";
public static final String UPSERT_DEFAULT_ENABLE_PRELOAD = "default.enable.preload";

public static final String UPSERT_DEFAULT_ALLOW_PARTIAL_UPSERT_CONSUMPTION_DURING_COMMIT =
"default.allow.partial.upsert.consumption.during.commit";

public static TableUpsertMetadataManager create(TableConfig tableConfig,
@Nullable PinotConfiguration instanceUpsertConfig) {
String tableNameWithType = tableConfig.getTableName();
Expand All @@ -61,6 +64,12 @@ public static TableUpsertMetadataManager create(TableConfig tableConfig,
upsertConfig.setEnablePreload(
Boolean.parseBoolean(instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ENABLE_PRELOAD, "false")));
}

// server level config honoured only when table level config is not set to true
if (!upsertConfig.isAllowPartialUpsertConsumptionDuringCommit()) {
upsertConfig.setAllowPartialUpsertConsumptionDuringCommit(Boolean.parseBoolean(
instanceUpsertConfig.getProperty(UPSERT_DEFAULT_ALLOW_PARTIAL_UPSERT_CONSUMPTION_DURING_COMMIT, "false")));
}
}

if (StringUtils.isNotEmpty(metadataManagerClass)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public enum ConsistencyMode {
@JsonPropertyDescription("Whether to drop out-of-order record")
private boolean _dropOutOfOrderRecord;

@JsonPropertyDescription("Whether to pause partial upsert table's partition consumption during commit")
private boolean _allowPartialUpsertConsumptionDuringCommit;

public UpsertConfig(Mode mode) {
_mode = mode;
}
Expand Down Expand Up @@ -274,4 +277,13 @@ public void setMetadataManagerClass(String metadataManagerClass) {
public void setMetadataManagerConfigs(Map<String, String> metadataManagerConfigs) {
_metadataManagerConfigs = metadataManagerConfigs;
}

public void setAllowPartialUpsertConsumptionDuringCommit(
boolean allowPartialUpsertConsumptionDuringCommit) {
_allowPartialUpsertConsumptionDuringCommit = allowPartialUpsertConsumptionDuringCommit;
}

public boolean isAllowPartialUpsertConsumptionDuringCommit() {
return _allowPartialUpsertConsumptionDuringCommit;
}
}

0 comments on commit e9002c5

Please sign in to comment.