From 9fa02d67a69e75014d88d367c3662490d48f2743 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Sun, 7 Apr 2024 19:01:31 -0700 Subject: [PATCH] Fix tests --- .../helix/RealtimeConsumerMonitorTest.java | 85 ++-- .../helix/SegmentStatusCheckerTest.java | 425 ++++++++---------- .../ControllerPeriodicTaskTest.java | 20 +- .../core/retention/RetentionManagerTest.java | 71 +-- 4 files changed, 262 insertions(+), 339 deletions(-) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java index 051fd784b685..26928deb3c0f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java @@ -18,9 +18,6 @@ */ package org.apache.pinot.controller.helix; -import com.google.common.collect.ImmutableMap; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -43,15 +40,14 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; public class RealtimeConsumerMonitorTest { @@ -59,17 +55,16 @@ public class RealtimeConsumerMonitorTest { @Test public void realtimeBasicTest() throws Exception { - final String tableName = "myTable_REALTIME"; - final String rawTableName = TableNameBuilder.extractRawTableName(tableName); - List allTableNames = new ArrayList(); - allTableNames.add(tableName); + String rawTableName = "myTable"; + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn") + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn") .setNumReplicas(2).setStreamConfigs(getStreamConfigMap()).build(); + LLCSegmentName segmentPartition1Seq0 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); LLCSegmentName segmentPartition1Seq1 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); LLCSegmentName segmentPartition2Seq0 = new LLCSegmentName(rawTableName, 2, 0, System.currentTimeMillis()); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE"); idealState.setPartitionState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE"); idealState.setPartitionState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING"); @@ -79,7 +74,7 @@ public void realtimeBasicTest() idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(realtimeTableName); externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot1", "ONLINE"); externalView.setState(segmentPartition1Seq0.getSegmentName(), "pinot2", "ONLINE"); externalView.setState(segmentPartition1Seq1.getSegmentName(), "pinot1", "CONSUMING"); @@ -91,13 +86,11 @@ public void realtimeBasicTest() { helixResourceManager = mock(PinotHelixResourceManager.class); ZkHelixPropertyStore helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); + when(helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig); when(helixResourceManager.getPropertyStore()).thenReturn(helixPropertyStore); - when(helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView); ZNRecord znRecord = new ZNRecord("0"); znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); when(helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); @@ -121,61 +114,53 @@ public void realtimeBasicTest() // So, the consumer monitor should show: 1. partition-1 has 0 lag; partition-2 has some non-zero lag. // Segment 1 in replicas: TreeMap> response = new TreeMap<>(); - List part1ServerConsumingSegmentInfo = new ArrayList<>(2); - part1ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0")); - part1ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0")); - + List part1ServerConsumingSegmentInfo = + List.of(getConsumingSegmentInfoForServer("pinot1", "1", "100", "100", "0"), + getConsumingSegmentInfoForServer("pinot2", "1", "100", "100", "0")); response.put(segmentPartition1Seq1.getSegmentName(), part1ServerConsumingSegmentInfo); // Segment 2 in replicas - List part2ServerConsumingSegmentInfo = new ArrayList<>(2); - part2ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0")); - part2ServerConsumingSegmentInfo.add( - getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000")); - + List part2ServerConsumingSegmentInfo = + List.of(getConsumingSegmentInfoForServer("pinot1", "2", "120", "120", "0"), + getConsumingSegmentInfoForServer("pinot2", "2", "80", "120", "60000")); response.put(segmentPartition2Seq0.getSegmentName(), part2ServerConsumingSegmentInfo); ConsumingSegmentInfoReader consumingSegmentReader = mock(ConsumingSegmentInfoReader.class); - when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000)) - .thenReturn(new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0)); + when(consumingSegmentReader.getConsumingSegmentsInfo(realtimeTableName, 10000)).thenReturn( + new ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0)); RealtimeConsumerMonitor realtimeConsumerMonitor = - new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, - controllerMetrics, consumingSegmentReader); + new RealtimeConsumerMonitor(config, helixResourceManager, leadControllerManager, controllerMetrics, + consumingSegmentReader); realtimeConsumerMonitor.start(); realtimeConsumerMonitor.run(); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1, + + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 1, ControllerGauge.MAX_RECORDS_LAG), 0); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2, + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 2, ControllerGauge.MAX_RECORDS_LAG), 40); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1, - ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); - Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2, + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 1, + ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); + assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, realtimeTableName, 2, ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000); } ConsumingSegmentInfoReader.ConsumingSegmentInfo getConsumingSegmentInfoForServer(String serverName, String partitionId, String currentOffset, String upstreamLatestOffset, String availabilityLagMs) { - Map currentOffsetMap = Collections.singletonMap(partitionId, currentOffset); - Map latestUpstreamOffsetMap = Collections.singletonMap(partitionId, upstreamLatestOffset); - Map recordsLagMap = Collections.singletonMap(partitionId, String.valueOf( - Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset))); - Map availabilityLagMsMap = Collections.singletonMap(partitionId, availabilityLagMs); + Map currentOffsetMap = Map.of(partitionId, currentOffset); + Map latestUpstreamOffsetMap = Map.of(partitionId, upstreamLatestOffset); + Map recordsLagMap = + Map.of(partitionId, String.valueOf(Long.parseLong(upstreamLatestOffset) - Long.parseLong(currentOffset))); + Map availabilityLagMsMap = Map.of(partitionId, availabilityLagMs); ConsumingSegmentInfoReader.PartitionOffsetInfo partitionOffsetInfo = new ConsumingSegmentInfoReader.PartitionOffsetInfo(currentOffsetMap, latestUpstreamOffsetMap, recordsLagMap, availabilityLagMsMap); - return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1, - currentOffsetMap, partitionOffsetInfo); + return new ConsumingSegmentInfoReader.ConsumingSegmentInfo(serverName, "CONSUMING", -1, currentOffsetMap, + partitionOffsetInfo); } Map getStreamConfigMap() { - return ImmutableMap.of( - "streamType", "kafka", - "stream.kafka.consumer.type", "simple", - "stream.kafka.topic.name", "test", + return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name", "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index a1dd8f2697b0..3161c9da200a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -18,10 +18,6 @@ */ package org.apache.pinot.controller.helix; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -52,19 +48,21 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; public class SegmentStatusCheckerTest { + private final ExecutorService _executorService = Executors.newFixedThreadPool(1); + private SegmentStatusChecker _segmentStatusChecker; private PinotHelixResourceManager _helixResourceManager; private ZkHelixPropertyStore _helixPropertyStore; @@ -73,18 +71,15 @@ public class SegmentStatusCheckerTest { private ControllerMetrics _controllerMetrics; private ControllerConf _config; private TableSizeReader _tableSizeReader; - private ExecutorService _executorService = Executors.newFixedThreadPool(1); @Test public void offlineBasicTest() throws Exception { - final String tableName = "myTable_OFFLINE"; - List allTableNames = new ArrayList(); - allTableNames.add(tableName); + String offlineTableName = "myTable_OFFLINE"; TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNumReplicas(2).build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(2).build(); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); @@ -101,7 +96,7 @@ public void offlineBasicTest() idealState.setReplicas("2"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(offlineTableName); externalView.setState("myTable_0", "pinot1", "ONLINE"); externalView.setState("myTable_0", "pinot2", "ONLINE"); externalView.setState("myTable_1", "pinot1", "ERROR"); @@ -114,27 +109,23 @@ public void offlineBasicTest() { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); } { _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); // Based on the lineage entries: {myTable_1 -> myTable_3, COMPLETED}, {myTable_3 -> myTable_4, IN_PROGRESS}, // myTable_1 and myTable_4 will be skipped for the metrics. - SegmentLineage segmentLineage = new SegmentLineage(tableName); + SegmentLineage segmentLineage = new SegmentLineage(offlineTableName); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), - new LineageEntry(Collections.singletonList("myTable_1"), Collections.singletonList("myTable_3"), - LineageEntryState.COMPLETED, 11111L)); + new LineageEntry(List.of("myTable_1"), List.of("myTable_3"), LineageEntryState.COMPLETED, 11111L)); segmentLineage.addLineageEntry(SegmentLineageUtils.generateLineageEntryId(), - new LineageEntry(Collections.singletonList("myTable_3"), Collections.singletonList("myTable_4"), - LineageEntryState.IN_PROGRESS, 11111L)); - when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + tableName), any(), eq(AccessOption.PERSISTENT))) - .thenReturn(segmentLineage.toZNRecord()); + new LineageEntry(List.of("myTable_3"), List.of("myTable_4"), LineageEntryState.IN_PROGRESS, 11111L)); + when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), any(), + eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord()); } { _config = mock(ControllerConf.class); @@ -158,40 +149,41 @@ public void offlineBasicTest() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, offlineTableName, ControllerGauge.REPLICATION_FROM_CONFIG), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENT_COUNT), 3); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENT_COUNT), + 3); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 66); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 2); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 66); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test public void realtimeBasicTest() throws Exception { - final String tableName = "myTable_REALTIME"; - final String rawTableName = TableNameBuilder.extractRawTableName(tableName); - List allTableNames = new ArrayList(); - allTableNames.add(tableName); + String rawTableName = "myTable"; + String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(rawTableName); TableConfig tableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName("timeColumn") + new TableConfigBuilder(TableType.REALTIME).setTableName(rawTableName).setTimeColumnName("timeColumn") .setNumReplicas(3).setStreamConfigs(getStreamConfigMap()).build(); - final LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); - final LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); - final LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis()); - IdealState idealState = new IdealState(tableName); + + LLCSegmentName seg1 = new LLCSegmentName(rawTableName, 1, 0, System.currentTimeMillis()); + LLCSegmentName seg2 = new LLCSegmentName(rawTableName, 1, 1, System.currentTimeMillis()); + LLCSegmentName seg3 = new LLCSegmentName(rawTableName, 2, 1, System.currentTimeMillis()); + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState(seg1.getSegmentName(), "pinot1", "ONLINE"); idealState.setPartitionState(seg1.getSegmentName(), "pinot2", "ONLINE"); idealState.setPartitionState(seg1.getSegmentName(), "pinot3", "ONLINE"); @@ -204,7 +196,7 @@ public void realtimeBasicTest() idealState.setReplicas("3"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(realtimeTableName); externalView.setState(seg1.getSegmentName(), "pinot1", "ONLINE"); externalView.setState(seg1.getSegmentName(), "pinot2", "ONLINE"); externalView.setState(seg1.getSegmentName(), "pinot3", "ONLINE"); @@ -218,13 +210,11 @@ public void realtimeBasicTest() { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); - when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); + when(_helixResourceManager.getTableConfig(realtimeTableName)).thenReturn(tableConfig); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(externalView); ZNRecord znRecord = new ZNRecord("0"); znRecord.setSimpleField(CommonConstants.Segment.Realtime.END_OFFSET, "10000"); when(_helixPropertyStore.get(anyString(), any(), anyInt())).thenReturn(znRecord); @@ -251,27 +241,25 @@ public void realtimeBasicTest() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.REPLICATION_FROM_CONFIG), 3); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.REPLICATION_FROM_CONFIG), 3); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 3); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 3); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } Map getStreamConfigMap() { - return ImmutableMap.of( - "streamType", "kafka", - "stream.kafka.consumer.type", "simple", - "stream.kafka.topic.name", "test", + return Map.of("streamType", "kafka", "stream.kafka.consumer.type", "simple", "stream.kafka.topic.name", "test", "stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder", "stream.kafka.consumer.factory.class.name", "org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory"); @@ -281,8 +269,7 @@ Map getStreamConfigMap() { public void missingEVPartitionTest() throws Exception { String offlineTableName = "myTable_OFFLINE"; - List allTableNames = new ArrayList(); - allTableNames.add(offlineTableName); + IdealState idealState = new IdealState(offlineTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); @@ -317,21 +304,19 @@ public void missingEVPartitionTest() ZkHelixPropertyStore propertyStore; { propertyStore = (ZkHelixPropertyStore) mock(ZkHelixPropertyStore.class); - when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, AccessOption.PERSISTENT)) - .thenReturn(znrecord); + when(propertyStore.get("/SEGMENTS/myTable_OFFLINE/myTable_3", null, AccessOption.PERSISTENT)).thenReturn( + znrecord); } { _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_3")) - .thenReturn(new SegmentZKMetadata(znrecord)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_3")).thenReturn( + new SegmentZKMetadata(znrecord)); } { _config = mock(ControllerConf.class); @@ -355,25 +340,25 @@ public void missingEVPartitionTest() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); } @Test public void missingEVTest() throws Exception { - final String tableName = "myTable_REALTIME"; - List allTableNames = new ArrayList(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + String realtimeTableName = "myTable_REALTIME"; + + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot2", "ONLINE"); idealState.setPartitionState("myTable_0", "pinot3", "ONLINE"); @@ -388,11 +373,9 @@ public void missingEVTest() _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -416,30 +399,28 @@ public void missingEVTest() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), + 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test public void missingIdealTest() throws Exception { - final String tableName = "myTable_REALTIME"; - List allTableNames = new ArrayList<>(); - allTableNames.add(tableName); + String realtimeTableName = "myTable_REALTIME"; { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(null); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(null); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -463,24 +444,24 @@ public void missingIdealTest() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.PERCENT_OF_REPLICAS)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, - ControllerGauge.TABLE_COMPRESSED_SIZE)); + assertFalse( + MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS)); + assertFalse( + MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS)); + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, + ControllerGauge.TABLE_COMPRESSED_SIZE)); } @Test public void missingEVPartitionPushTest() throws Exception { String offlineTableName = "myTable_OFFLINE"; - List allTableNames = new ArrayList(); - allTableNames.add(offlineTableName); + IdealState idealState = new IdealState(offlineTableName); idealState.setPartitionState("myTable_0", "pinot1", "ONLINE"); idealState.setPartitionState("myTable_1", "pinot1", "ONLINE"); @@ -527,15 +508,13 @@ public void missingEVPartitionPushTest() _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_0")) - .thenReturn(new SegmentZKMetadata(znrecord)); - when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_2")) - .thenReturn(new SegmentZKMetadata(znrecord2)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_0")).thenReturn( + new SegmentZKMetadata(znrecord)); + when(_helixResourceManager.getSegmentZKMetadata(offlineTableName, "myTable_2")).thenReturn( + new SegmentZKMetadata(znrecord2)); } { _config = mock(ControllerConf.class); @@ -559,27 +538,27 @@ public void missingEVPartitionPushTest() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.NUMBER_OF_REPLICAS), 2); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 2); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test public void noReplicas() throws Exception { - final String tableName = "myTable_REALTIME"; - List allTableNames = new ArrayList(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + String realtimeTableName = "myTable_REALTIME"; + + IdealState idealState = new IdealState(realtimeTableName); idealState.setPartitionState("myTable_0", "pinot1", "OFFLINE"); idealState.setPartitionState("myTable_0", "pinot2", "OFFLINE"); idealState.setPartitionState("myTable_0", "pinot3", "OFFLINE"); @@ -590,11 +569,9 @@ public void noReplicas() _helixResourceManager = mock(PinotHelixResourceManager.class); _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -618,26 +595,26 @@ public void noReplicas() _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_WITH_LESS_REPLICAS), 0); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS), 1); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), + 1); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS), + 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } @Test - public void disabledTableTest() - throws Exception { + public void disabledTableTest() { + String offlineTableName = "myTable_OFFLINE"; - final String tableName = "myTable_OFFLINE"; - List allTableNames = new ArrayList(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); // disable table in idealstate idealState.enable(false); idealState.setPartitionState("myTable_OFFLINE", "pinot1", "OFFLINE"); @@ -648,11 +625,9 @@ public void disabledTableTest() { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -669,23 +644,21 @@ public void disabledTableTest() _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); + // verify state before test - Assert.assertEquals( - MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 0); + // update metrics _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals( - MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); } @Test - public void disabledEmptyTableTest() - throws Exception { + public void disabledEmptyTableTest() { + String offlineTableName = "myTable_OFFLINE"; - final String tableName = "myTable_OFFLINE"; - List allTableNames = Lists.newArrayList(tableName); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); // disable table in idealstate idealState.enable(false); idealState.setReplicas("1"); @@ -693,11 +666,9 @@ public void disabledEmptyTableTest() { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -714,14 +685,14 @@ public void disabledEmptyTableTest() _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); + // verify state before test - Assert.assertFalse( - MetricValueUtils.globalGaugeExists(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT)); + assertFalse(MetricValueUtils.globalGaugeExists(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT)); + // update metrics _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals( - MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); } @Test @@ -734,22 +705,20 @@ public void noSegments() @Test public void lessThanOnePercentSegmentsUnavailableTest() - throws Exception { - String tableName = "myTable_OFFLINE"; - int numSegments = 200; - List allTableNames = new ArrayList(); - allTableNames.add(tableName); + throws Exception { + String offlineTableName = "myTable_OFFLINE"; TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setNumReplicas(1).build(); + new TableConfigBuilder(TableType.OFFLINE).setTableName(offlineTableName).setNumReplicas(1).build(); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(offlineTableName); + int numSegments = 200; for (int i = 0; i < numSegments; i++) { idealState.setPartitionState("myTable_" + i, "pinot1", "ONLINE"); } idealState.setReplicas("1"); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); - ExternalView externalView = new ExternalView(tableName); + ExternalView externalView = new ExternalView(offlineTableName); externalView.setState("myTable_0", "pinot1", "OFFLINE"); for (int i = 1; i < numSegments; i++) { externalView.setState("myTable_" + i, "pinot1", "ONLINE"); @@ -757,19 +726,17 @@ public void lessThanOnePercentSegmentsUnavailableTest() { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(externalView); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(offlineTableName)); + when(_helixResourceManager.getTableConfig(offlineTableName)).thenReturn(tableConfig); + when(_helixResourceManager.getTableIdealState(offlineTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(offlineTableName)).thenReturn(externalView); } { _helixPropertyStore = mock(ZkHelixPropertyStore.class); when(_helixResourceManager.getPropertyStore()).thenReturn(_helixPropertyStore); - SegmentLineage segmentLineage = new SegmentLineage(tableName); - when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + tableName), any(), eq(AccessOption.PERSISTENT))) - .thenReturn(segmentLineage.toZNRecord()); + SegmentLineage segmentLineage = new SegmentLineage(offlineTableName); + when(_helixPropertyStore.get(eq("/SEGMENT_LINEAGE/" + offlineTableName), any(), + eq(AccessOption.PERSISTENT))).thenReturn(segmentLineage.toZNRecord()); } { _config = mock(ControllerConf.class); @@ -788,37 +755,35 @@ public void lessThanOnePercentSegmentsUnavailableTest() _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = - new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, - _executorService); + new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, + _executorService); _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), - ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99); + + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 99); } public void noSegmentsInternal(final int nReplicas) throws Exception { - final String tableName = "myTable_REALTIME"; + String realtimeTableName = "myTable_REALTIME"; + String nReplicasStr = Integer.toString(nReplicas); int nReplicasExpectedValue = nReplicas; if (nReplicas < 0) { nReplicasStr = "abc"; nReplicasExpectedValue = 1; } - List allTableNames = new ArrayList(); - allTableNames.add(tableName); - IdealState idealState = new IdealState(tableName); + IdealState idealState = new IdealState(realtimeTableName); idealState.setReplicas(nReplicasStr); idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); { _helixResourceManager = mock(PinotHelixResourceManager.class); - when(_helixResourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_helixResourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(allTableNames); - when(_helixResourceManager.getTableIdealState(tableName)).thenReturn(idealState); - when(_helixResourceManager.getTableExternalView(tableName)).thenReturn(null); + when(_helixResourceManager.getAllTables()).thenReturn(List.of(realtimeTableName)); + when(_helixResourceManager.getTableIdealState(realtimeTableName)).thenReturn(idealState); + when(_helixResourceManager.getTableExternalView(realtimeTableName)).thenReturn(null); } { _config = mock(ControllerConf.class); @@ -843,15 +808,17 @@ public void noSegmentsInternal(final int nReplicas) _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, realtimeTableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE)); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.NUMBER_OF_REPLICAS), nReplicasExpectedValue); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, - ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.NUMBER_OF_REPLICAS), + nReplicasExpectedValue); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_OF_REPLICAS), + 100); + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, realtimeTableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index 15c3cf6d8197..f4e0eb46b14f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.helix.core.periodictask; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,7 +34,6 @@ import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -89,9 +87,7 @@ public void processTable(String tableNameWithType) { public void beforeTest() { List tables = new ArrayList<>(_numTables); IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE")); - when(_resourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(_resourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(tables); + when(_resourceManager.getAllTables()).thenReturn(tables); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } @@ -109,7 +105,6 @@ public void testRandomInitialDelay() { _task.getInitialDelayInSeconds() >= ControllerConf.ControllerPeriodicTasksConf.MIN_INITIAL_DELAY_IN_SECONDS); assertTrue( _task.getInitialDelayInSeconds() < ControllerConf.ControllerPeriodicTasksConf.MAX_INITIAL_DELAY_IN_SECONDS); - assertEquals(_task.getIntervalInSeconds(), RUN_FREQUENCY_IN_SECONDS); } @@ -124,7 +119,7 @@ public void testControllerPeriodicTaskCalls() { assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); // Run periodic task with leadership resetState(); @@ -133,8 +128,7 @@ public void testControllerPeriodicTaskCalls() { assertTrue(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), _numTables); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), - _numTables); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); @@ -145,7 +139,7 @@ public void testControllerPeriodicTaskCalls() { assertFalse(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); assertTrue(_stopTaskCalled.get()); assertFalse(_task.isStarted()); @@ -156,7 +150,7 @@ public void testControllerPeriodicTaskCalls() { assertFalse(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), 0); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); assertFalse(_stopTaskCalled.get()); assertFalse(_task.isStarted()); @@ -169,7 +163,7 @@ public void testControllerPeriodicTaskCalls() { assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); // Run periodic task with leadership resetState(); @@ -178,7 +172,7 @@ public void testControllerPeriodicTaskCalls() { assertTrue(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), _numTables); assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, - ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index ce5e31e5ef13..b3e656de9ead 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -19,7 +19,6 @@ package org.apache.pinot.controller.helix.core.retention; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -43,15 +42,13 @@ import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.mockito.ArgumentMatchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.testng.Assert; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.DEFAULT_DATABASE; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class RetentionManagerTest { @@ -60,8 +57,7 @@ public class RetentionManagerTest { private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME); private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME); - private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long dayAfterTomorrowTimeStamp) - throws Exception { + private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long dayAfterTomorrowTimeStamp) { List segmentsZKMetadata = new ArrayList<>(); // Create metadata for 10 segments really old, that will be removed by the retention manager. final int numOlderSegments = 10; @@ -105,8 +101,7 @@ private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long } @Test - public void testRetentionWithMinutes() - throws Exception { + public void testRetentionWithMinutes() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60; final long pastMinutesSinceEpoch = 22383360L; @@ -114,8 +109,7 @@ public void testRetentionWithMinutes() } @Test - public void testRetentionWithSeconds() - throws Exception { + public void testRetentionWithSeconds() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60; final long pastSecondsSinceEpoch = 1343001600L; @@ -123,8 +117,7 @@ public void testRetentionWithSeconds() } @Test - public void testRetentionWithMillis() - throws Exception { + public void testRetentionWithMillis() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 * 60 * 60 * 1000; final long pastMillisSinceEpoch = 1343001600000L; @@ -132,8 +125,7 @@ public void testRetentionWithMillis() } @Test - public void testRetentionWithHours() - throws Exception { + public void testRetentionWithHours() { final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24; final long pastHoursSinceEpoch = 373056L; @@ -141,8 +133,7 @@ public void testRetentionWithHours() } @Test - public void testRetentionWithDays() - throws Exception { + public void testRetentionWithDays() { final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 60 / 60 / 24 + 2; final long pastDaysSinceEpoch = 15544L; testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, daysSinceEpochTimeStamp); @@ -161,10 +152,8 @@ private TableConfig createRealtimeTableConfig1(int replicaCount) { private void setupPinotHelixResourceManager(TableConfig tableConfig, final List removedSegments, PinotHelixResourceManager resourceManager, LeadControllerManager leadControllerManager) { - final String tableNameWithType = tableConfig.getTableName(); - when(resourceManager.getDatabaseNames()) - .thenReturn(Collections.singletonList(DEFAULT_DATABASE)); - when(resourceManager.getAllTables(DEFAULT_DATABASE)).thenReturn(Collections.singletonList(tableNameWithType)); + String tableNameWithType = tableConfig.getTableName(); + when(resourceManager.getAllTables()).thenReturn(List.of(tableNameWithType)); ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); when(resourceManager.getPropertyStore()).thenReturn(propertyStore); @@ -172,38 +161,27 @@ private void setupPinotHelixResourceManager(TableConfig tableConfig, final List< SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class); // Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we only test that the call is made once per // run of the retention manager - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) - throws Throwable { - return null; - } - }).when(deletionManager).removeAgedDeletedSegments(leadControllerManager); + doAnswer(invocationOnMock -> null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager); when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager); // If and when PinotHelixResourceManager.deleteSegments() is invoked, make sure that the segments deleted // are exactly the same as the ones we expect to be deleted. - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) - throws Throwable { - Object[] args = invocationOnMock.getArguments(); - String tableNameArg = (String) args[0]; - Assert.assertEquals(tableNameArg, tableNameWithType); - List segmentListArg = (List) args[1]; - Assert.assertEquals(segmentListArg.size(), removedSegments.size()); - for (String segmentName : removedSegments) { - Assert.assertTrue(segmentListArg.contains(segmentName)); - } - return null; + doAnswer(invocationOnMock -> { + Object[] args = invocationOnMock.getArguments(); + String tableNameArg = (String) args[0]; + assertEquals(tableNameArg, tableNameWithType); + List segmentListArg = (List) args[1]; + assertEquals(segmentListArg.size(), removedSegments.size()); + for (String segmentName : removedSegments) { + assertTrue(segmentListArg.contains(segmentName)); } - }).when(resourceManager).deleteSegments(anyString(), ArgumentMatchers.anyList()); + return null; + }).when(resourceManager).deleteSegments(anyString(), anyList()); } // This test makes sure that we clean up the segments marked OFFLINE in realtime for more than 7 days @Test - public void testRealtimeLLCCleanup() - throws Exception { + public void testRealtimeLLCCleanup() { final int initialNumSegments = 8; final long now = System.currentTimeMillis(); @@ -237,8 +215,7 @@ public void testRealtimeLLCCleanup() // This test makes sure that we do not clean up last llc completed segments @Test - public void testRealtimeLastLLCCleanup() - throws Exception { + public void testRealtimeLastLLCCleanup() { final long now = System.currentTimeMillis(); final int replicaCount = 1;