Skip to content

Commit

Permalink
skip missing segments while checking freshness during server startup
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Apr 18, 2024
1 parent 263f4f6 commit f1a9673
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private void registerServiceStatusHandler() {
realtimeMinFreshnessMs, idleTimeoutMs);
FreshnessBasedConsumptionStatusChecker freshnessStatusChecker =
new FreshnessBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments,
realtimeMinFreshnessMs, idleTimeoutMs);
this::getConsumingSegments, realtimeMinFreshnessMs, idleTimeoutMs);
Supplier<Integer> getNumConsumingSegmentsNotReachedMinFreshness =
freshnessStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
Expand All @@ -341,7 +341,8 @@ private void registerServiceStatusHandler() {
} else if (isOffsetBasedConsumptionStatusCheckerEnabled) {
LOGGER.info("Setting up offset based status checker");
OffsetBasedConsumptionStatusChecker consumptionStatusChecker =
new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments);
new OffsetBasedConsumptionStatusChecker(_serverInstance.getInstanceDataManager(), consumingSegments,
this::getConsumingSegments);
Supplier<Integer> getNumConsumingSegmentsNotReachedTheirLatestOffset =
consumptionStatusChecker::getNumConsumingSegmentsNotReachedIngestionCriteria;
serviceStatusCallbackListBuilder.add(
Expand All @@ -359,6 +360,27 @@ private void registerServiceStatusHandler() {
new ServiceStatus.MultipleCallbackServiceStatusCallback(serviceStatusCallbackListBuilder.build()));
}

private Set<String> getConsumingSegments() {
Set<String> consumingSegments = new HashSet<>();
for (String resourceName : _helixAdmin.getResourcesInCluster(_helixClusterName)) {
// Only monitor table resources
if (!TableNameBuilder.isTableResource(resourceName)) {
continue;
}
// Only monitor enabled realtime table
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, resourceName);
if (idealState.isEnabled() && TableNameBuilder.isRealtimeTableResource(resourceName)) {
for (String partitionName : idealState.getPartitionSet()) {
if (StateModel.SegmentStateModel.CONSUMING.equals(
idealState.getInstanceStateMap(partitionName).get(_instanceId))) {
consumingSegments.add(partitionName);
}
}
}
}
return consumingSegments;
}

private void updateInstanceConfigIfNeeded(ServerConf serverConf) {
InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, _instanceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.pinot.server.starter.helix;

import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
Expand All @@ -39,7 +41,12 @@ public class FreshnessBasedConsumptionStatusChecker extends IngestionBasedConsum

public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments,
long minFreshnessMs, long idleTimeoutMs) {
super(instanceDataManager, consumingSegments);
this(instanceDataManager, consumingSegments, null, minFreshnessMs, idleTimeoutMs);
}

public FreshnessBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments,
@Nullable Supplier<Set<String>> consumingSegmentsSupplier, long minFreshnessMs, long idleTimeoutMs) {
super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
_minFreshnessMs = minFreshnessMs;
_idleTimeoutMs = idleTimeoutMs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
Expand All @@ -37,25 +40,34 @@ public abstract class IngestionBasedConsumptionStatusChecker {

// constructor parameters
protected final InstanceDataManager _instanceDataManager;
protected final Set<String> _consumingSegments;
protected volatile Set<String> _consumingSegments;
protected final Supplier<Set<String>> _consumingSegmentsSupplier;

// helper variable
private final Set<String> _caughtUpSegments = new HashSet<>();
// helper variable, which is thread safe, as the method might be called from multiple threads when the health check
// endpoint is called by many probes.
private final Set<String> _caughtUpSegments = ConcurrentHashMap.newKeySet();

public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager,
Set<String> consumingSegments) {
/**
* Both consumingSegments and consumingSegmentsSupplier are provided as it can be costly to get consumingSegments
* via the supplier, so only use it when any missing segment is detected.
*/
public IngestionBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments,
@Nullable Supplier<Set<String>> consumingSegmentsSupplier) {
_instanceDataManager = instanceDataManager;
_consumingSegments = consumingSegments;
_consumingSegmentsSupplier = consumingSegmentsSupplier;
}

public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
Set<String> missingSegments = new HashSet<>();
for (String segName : _consumingSegments) {
if (_caughtUpSegments.contains(segName)) {
continue;
}
TableDataManager tableDataManager = getTableDataManager(segName);
if (tableDataManager == null) {
_logger.info("TableDataManager is not yet setup for segment {}. Will check consumption status later", segName);
missingSegments.add(segName);
continue;
}
SegmentDataManager segmentDataManager = null;
Expand All @@ -64,6 +76,7 @@ public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
if (segmentDataManager == null) {
_logger.info("SegmentDataManager is not yet setup for segment {}. Will check consumption status later",
segName);
missingSegments.add(segName);
continue;
}
if (!(segmentDataManager instanceof RealtimeSegmentDataManager)) {
Expand All @@ -84,6 +97,12 @@ public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
}
}
}
if (!missingSegments.isEmpty() && _consumingSegmentsSupplier != null) {
_consumingSegments = _consumingSegmentsSupplier.get();
_caughtUpSegments.retainAll(_consumingSegments);
_logger.warn("Found missing segments: {}. Refreshed consumingSegments: {} and caughtUpSegments: {}",
missingSegments, _consumingSegments, _caughtUpSegments);
}
return _consumingSegments.size() - _caughtUpSegments.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.apache.pinot.server.starter.helix;

import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
Expand All @@ -35,7 +37,12 @@
public class OffsetBasedConsumptionStatusChecker extends IngestionBasedConsumptionStatusChecker {

public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments) {
super(instanceDataManager, consumingSegments);
this(instanceDataManager, consumingSegments, null);
}

public OffsetBasedConsumptionStatusChecker(InstanceDataManager instanceDataManager, Set<String> consumingSegments,
@Nullable Supplier<Set<String>> consumingSegmentsSupplier) {
super(instanceDataManager, consumingSegments, consumingSegmentsSupplier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.pinot.server.starter.helix;

import com.google.common.collect.ImmutableSet;
import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
Expand Down Expand Up @@ -119,6 +120,57 @@ public void regularCaseWithOffsetCatchup() {
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0);
}

@Test
public void testWithDroppedTableAndSegment()
throws InterruptedException {
String segA0 = "tableA__0__0__123Z";
String segA1 = "tableA__1__0__123Z";
String segB0 = "tableB__0__0__123Z";
Set<String> consumingSegments = new HashSet<>();
consumingSegments.add(segA0);
consumingSegments.add(segA1);
consumingSegments.add(segB0);
Set<String> updatedConsumingSegments = new HashSet<>(consumingSegments);
InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
FreshnessBasedConsumptionStatusChecker statusChecker =
new FreshnessBasedConsumptionStatusChecker(instanceDataManager, consumingSegments,
// Create a new Set instance to keep statusChecker._consumingSegments and this Set separate.
() -> new HashSet<>(updatedConsumingSegments), 10L, 0L);

// TableDataManager is not set up yet
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);

// setup TableDataMangers
TableDataManager tableDataManagerA = mock(TableDataManager.class);
when(instanceDataManager.getTableDataManager("tableA_REALTIME")).thenReturn(tableDataManagerA);
when(instanceDataManager.getTableDataManager("tableB_REALTIME")).thenReturn(null);

// setup SegmentDataManagers
RealtimeSegmentDataManager segMngrA0 = mock(RealtimeSegmentDataManager.class);
when(tableDataManagerA.acquireSegment(segA0)).thenReturn(segMngrA0);
when(tableDataManagerA.acquireSegment(segA1)).thenReturn(null);

when(segMngrA0.fetchLatestStreamOffset(5000)).thenReturn(new LongMsgOffset(20));
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(0));
// ensure negative values are ignored
setupLatestIngestionTimestamp(segMngrA0, Long.MIN_VALUE);

// current offset latest stream offset current time last ingestion time
// segA0 0 20 100 Long.MIN_VALUE
// segA1 (segment is absent)
// segB0 (table is absent)
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 3);

// updatedConsumingSegments still provide 3 segments to checker but one has caught up.
when(segMngrA0.getCurrentOffset()).thenReturn(new LongMsgOffset(20));
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 2);
// Remove the missing segments and check again.
updatedConsumingSegments.remove(segA1);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 1);
updatedConsumingSegments.remove(segB0);
assertEquals(statusChecker.getNumConsumingSegmentsNotReachedIngestionCriteria(), 0);
}

private void setupLatestIngestionTimestamp(RealtimeSegmentDataManager segmentDataManager,
long latestIngestionTimestamp) {
MutableSegment mockSegment = mock(MutableSegment.class);
Expand Down

0 comments on commit f1a9673

Please sign in to comment.