Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Apr 8, 2024
1 parent 88557ee commit 9fa02d6
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 339 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.pinot.controller.helix;

import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand All @@ -43,33 +40,31 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;


public class RealtimeConsumerMonitorTest {

@Test
public void realtimeBasicTest()
throws Exception {
final String tableName = "myTable_REALTIME";
final String rawTableName = TableNameBuilder.extractRawTableName(tableName);
List<String> allTableNames = new ArrayList<String>();
allTableNames.add(tableName);
String rawTableName = "myTable";
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn")
new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn")
.setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build();

LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis());
LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis());
LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2, 0, System.currentTimeMillis());
IdealState idealState = new IdealState(tableName);
IdealState idealState = new IdealState(realtimeTableName);
idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE");
idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE");
idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING");
Expand All @@ -79,7 +74,7 @@ public void realtimeBasicTest()
idealState.setReplicas("3");
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);

ExternalView externalView = new ExternalView(tableName);
ExternalView externalView = new ExternalView(realtimeTableName);
externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE");
externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE");
externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING");
Expand All @@ -91,13 +86,11 @@ public void realtimeBasicTest()
{
helixResourceManager = mock(PinotHelixResourceManager.class);
ZkHelixPropertyStore<ZNRecord> helixPropertyStore = mock(ZkHelixPropertyStore.class);
when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
when(helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig);
when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore);
when(helixResourceManager.getDatabaseNames())
.thenReturn(Collections.singletonList(DEFAULT_DATABASE));
when(helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames);
when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState);
when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView);
when(helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName));
when(helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState);
when(helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView);
ZNRecord znRecord = new ZNRecord("0");
znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000");
when(helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord);
Expand All @@ -121,61 +114,53 @@ public void realtimeBasicTest()
// So, the consumer monitor should show: 1. partition-1 has 0 lag; partition-2 has some non-zero lag.
// Segment 1 in replicas:
TreeMap<String, List<ConsumingSegmentInfoReader.ConsumingSegmentInfo>> response = new TreeMap<>();
List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part1ServerConsumingSegmentInfo = new ArrayList<>(2);
part1ServerConsumingSegmentInfo.add(
getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"));
part1ServerConsumingSegmentInfo.add(
getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0"));

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part1ServerConsumingSegmentInfo =
List.of(getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"),
getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0"));
response.put(segmentPartition1Seq1.getSegmentName(), part1ServerConsumingSegmentInfo);

// Segment 2 in replicas
List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part2ServerConsumingSegmentInfo = new ArrayList<>(2);
part2ServerConsumingSegmentInfo.add(
getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"));
part2ServerConsumingSegmentInfo.add(
getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000"));

List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> part2ServerConsumingSegmentInfo =
List.of(getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"),
getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000"));
response.put(segmentPartition2Seq0.getSegmentName(), part2ServerConsumingSegmentInfo);

ConsumingSegmentInfoReader consumingSegmentReader = mock(ConsumingSegmentInfoReader.class);
when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
.thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0));
when(consumingSegmentReader.getConsumingSegmentsInfo(realtimeTableName, 10000)).thenReturn(
new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0));
RealtimeConsumerMonitor realtimeConsumerMonitor =
new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager,
controllerMetrics, consumingSegmentReader);
new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, controllerMetrics,
consumingSegmentReader);
realtimeConsumerMonitor.start();
realtimeConsumerMonitor.run();
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1,

assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 1,
ControllerGauge.MAX_RECORDS_LAG), 0);
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2,
assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 2,
ControllerGauge.MAX_RECORDS_LAG), 40);
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1,
ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0);
Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2,
assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 1,
ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0);
assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 2,
ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000);
}

ConsumingSegmentInfoReader.ConsumingSegmentInfo getConsumingSegmentInfoForServer(String serverName,
String partitionId, String currentOffset, String upstreamLatestOffset, String availabilityLagMs) {
Map<String, String> currentOffsetMap = Collections.singletonMap(partitionId, currentOffset);
Map<String, String> latestUpstreamOffsetMap = Collections.singletonMap(partitionId, upstreamLatestOffset);
Map<String, String> recordsLagMap = Collections.singletonMap(partitionId, String.valueOf(
Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset)));
Map<String, String> availabilityLagMsMap = Collections.singletonMap(partitionId, availabilityLagMs);
Map<String, String> currentOffsetMap = Map.of(partitionId, currentOffset);
Map<String, String> latestUpstreamOffsetMap = Map.of(partitionId, upstreamLatestOffset);
Map<String, String> recordsLagMap =
Map.of(partitionId, String.valueOf(Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset)));
Map<String, String> availabilityLagMsMap = Map.of(partitionId, availabilityLagMs);

ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo =
new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap, latestUpstreamOffsetMap, recordsLagMap,
availabilityLagMsMap);
return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1,
currentOffsetMap, partitionOffsetInfo);
return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1, currentOffsetMap,
partitionOffsetInfo);
}

Map<String, String> getStreamConfigMap() {
return ImmutableMap.of(
"streamType", "kafka",
"stream.kafka.consumer.type", "simple",
"stream.kafka.topic.name", "test",
return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test",
"stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name",
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
Expand Down
Loading

0 comments on commit 9fa02d6

Please sign in to comment.