From 544f86a1acaa3f1eeea3d4293bb6f2f6905c9f66 Mon Sep 17 00:00:00 2001 From: "Doroszlai, Attila" Date: Wed, 15 Jan 2025 14:23:24 +0100 Subject: [PATCH] HDDS-12088. Speed up TestStorageContainerManager --- .../hdds/scm/TestStorageContainerManager.java | 466 ++++++++---------- 1 file changed, 197 insertions(+), 269 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java index 445298058f9..382b6df9a0a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java @@ -21,9 +21,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.hdds.conf.DefaultConfigManager; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -89,30 +89,25 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServerConfigKeys; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; -import org.mockito.ArgumentMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.net.UnknownHostException; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -161,49 +156,59 @@ */ @Timeout(900) public class TestStorageContainerManager { + private static final int KEY_COUNT = 5; private static final String LOCALHOST_IP = "127.0.0.1"; - private static XceiverClientManager xceiverClientManager; private static final Logger LOG = LoggerFactory.getLogger( TestStorageContainerManager.class); - @BeforeAll - public static void setup() throws IOException { - xceiverClientManager = new XceiverClientManager(new OzoneConfiguration()); - } - - @AfterAll - public static void cleanup() { - if (xceiverClientManager != null) { - xceiverClientManager.close(); - } - } - - @AfterEach - public void cleanupDefaults() { - DefaultConfigManager.clearDefaultConfigs(); - } - + /** This runs most test cases in a single cluster. */ @Test - public void testRpcPermission() throws Exception { + void test(@TempDir Path tempDir) throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); - try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).build()) { - cluster.waitForClusterToBeReady(); + configureTopology(conf); + configureBlockDeletion(conf); + Path scmPath = tempDir.resolve("scm-meta"); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString()); - // Test with default configuration - testRpcPermission(cluster, "anyUser", true); + try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build()) { + cluster.waitForClusterToBeReady(); - // Update ozone.administrators in configuration - cluster.getStorageContainerManager() - .getReconfigurationHandler() - .reconfigureProperty(OzoneConfigKeys.OZONE_ADMINISTRATORS, "adminUser1, adminUser2"); + // non-destructive test cases + testBlockDeletionTransactions(cluster); + testRpcPermission(cluster); + testScmProcessDatanodeHeartbeat(cluster); - // Non-admin user will get permission denied. - testRpcPermission(cluster, "unknownUser", true); - // Admin user will pass the permission check. - testRpcPermission(cluster, "adminUser2", false); + StorageContainerManager scm = cluster.getStorageContainerManager(); + List directories = Arrays.asList( + new File(SCMHAUtils.getRatisStorageDir(scm.getConfiguration())), + scm.getScmMetadataStore().getStore().getDbLocation(), + new File(scm.getScmStorageConfig().getStorageDir()) + ); + + // re-init + testSCMReinitialization(cluster); + + // re-init after delete + directories.forEach(FileUtil::fullyDelete); + testOldDNRegistersToReInitialisedSCM(cluster); } } + private void testRpcPermission(MiniOzoneCluster cluster) throws Exception { + // Test with default configuration + testRpcPermission(cluster, "anyUser", true); + + // Update ozone.administrators in configuration + cluster.getStorageContainerManager() + .getReconfigurationHandler() + .reconfigureProperty(OzoneConfigKeys.OZONE_ADMINISTRATORS, "adminUser1, adminUser2"); + + // Non-admin user will get permission denied. + testRpcPermission(cluster, "unknownUser", true); + // Admin user will pass the permission check. + testRpcPermission(cluster, "adminUser2", false); + } + private void testRpcPermission(MiniOzoneCluster cluster, String fakeRemoteUsername, boolean expectPermissionDenied) { SCMClientProtocolServer mockClientServer = spy( @@ -251,10 +256,64 @@ private void verifyPermissionDeniedException(Exception e, String userName) { assertEquals(expectedErrorMessage, e.getMessage()); } - @Test - public void testBlockDeletionTransactions() throws Exception { - int numKeys = 5; - OzoneConfiguration conf = new OzoneConfiguration(); + private void testBlockDeletionTransactions(MiniOzoneCluster cluster) throws Exception { + DeletedBlockLog delLog = cluster.getStorageContainerManager() + .getScmBlockManager().getDeletedBlockLog(); + assertEquals(0, delLog.getNumOfValidTransactions()); + + Map keyLocations = TestDataUtil.createKeys(cluster, KEY_COUNT); + // Wait for container report + Thread.sleep(1000); + for (OmKeyInfo keyInfo : keyLocations.values()) { + OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(), + cluster.getStorageContainerManager()); + } + Map> containerBlocks = createDeleteTXLog( + cluster.getStorageContainerManager(), + delLog, keyLocations, cluster); + + // Verify a few TX gets created in the TX log. + assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0); + + // Once TXs are written into the log, SCM starts to fetch TX + // entries from the log and schedule block deletions in HB interval, + // after sometime, all the TX should be proceed and by then + // the number of containerBlocks of all known containers will be + // empty again. + OzoneTestUtils.waitBlockDeleted(cluster.getStorageContainerManager()); + assertTrue(verifyBlocksWithTxnTable(cluster, containerBlocks)); + // Continue the work, add some TXs that with known container names, + // but unknown block IDs. + for (Long containerID : containerBlocks.keySet()) { + // Add 2 TXs per container. + Map> deletedBlocks = new HashMap<>(); + List blocks = new ArrayList<>(); + blocks.add(RandomUtils.nextLong()); + blocks.add(RandomUtils.nextLong()); + deletedBlocks.put(containerID, blocks); + addTransactions(cluster.getStorageContainerManager(), delLog, + deletedBlocks); + } + + // Verify a few TX gets created in the TX log. + assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0); + + // These blocks cannot be found in the container, skip deleting them + // eventually these TX will success. + GenericTestUtils.waitFor(() -> { + try { + if (SCMHAUtils.isSCMHAEnabled(cluster.getConf())) { + cluster.getStorageContainerManager().getScmHAManager() + .asSCMHADBTransactionBuffer().flush(); + } + return delLog.getFailedTransactions(-1, 0).size() == 0; + } catch (IOException e) { + return false; + } + }, 1000, 20000); + } + + private static void configureBlockDeletion(OzoneConfiguration conf) { conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS); DatanodeConfiguration datanodeConfiguration = conf.getObject( @@ -280,153 +339,74 @@ public void testBlockDeletionTransactions() throws Exception { conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5); // Reset container provision size, otherwise only one container // is created by default. - conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, - numKeys); - - try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) - .build()) { - cluster.waitForClusterToBeReady(); - DeletedBlockLog delLog = cluster.getStorageContainerManager() - .getScmBlockManager().getDeletedBlockLog(); - assertEquals(0, delLog.getNumOfValidTransactions()); - - Map keyLocations = TestDataUtil.createKeys(cluster, numKeys); - // Wait for container report - Thread.sleep(1000); - for (OmKeyInfo keyInfo : keyLocations.values()) { - OzoneTestUtils.closeContainers(keyInfo.getKeyLocationVersions(), - cluster.getStorageContainerManager()); - } - Map> containerBlocks = createDeleteTXLog( - cluster.getStorageContainerManager(), - delLog, keyLocations, cluster, conf); - - // Verify a few TX gets created in the TX log. - assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0); - - // Once TXs are written into the log, SCM starts to fetch TX - // entries from the log and schedule block deletions in HB interval, - // after sometime, all the TX should be proceed and by then - // the number of containerBlocks of all known containers will be - // empty again. - OzoneTestUtils.waitBlockDeleted(cluster.getStorageContainerManager()); - assertTrue(verifyBlocksWithTxnTable(cluster, conf, containerBlocks)); - // Continue the work, add some TXs that with known container names, - // but unknown block IDs. - for (Long containerID : containerBlocks.keySet()) { - // Add 2 TXs per container. - Map> deletedBlocks = new HashMap<>(); - List blocks = new ArrayList<>(); - blocks.add(RandomUtils.nextLong()); - blocks.add(RandomUtils.nextLong()); - deletedBlocks.put(containerID, blocks); - addTransactions(cluster.getStorageContainerManager(), delLog, - deletedBlocks); - } - - // Verify a few TX gets created in the TX log. - assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0); - - // These blocks cannot be found in the container, skip deleting them - // eventually these TX will success. - GenericTestUtils.waitFor(() -> { - try { - if (SCMHAUtils.isSCMHAEnabled(cluster.getConf())) { - cluster.getStorageContainerManager().getScmHAManager() - .asSCMHADBTransactionBuffer().flush(); - } - return delLog.getFailedTransactions(-1, 0).size() == 0; - } catch (IOException e) { - return false; - } - }, 1000, 20000); - } + conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 10 * KEY_COUNT); } - @Test - public void testOldDNRegistersToReInitialisedSCM() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1000, TimeUnit.MILLISECONDS); - conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 3000, TimeUnit.MILLISECONDS); - - - - try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(1) - .build()) { - cluster.waitForClusterToBeReady(); - HddsDatanodeService datanode = cluster.getHddsDatanodes().get(0); - StorageContainerManager scm = cluster.getStorageContainerManager(); - File dbDir = scm.getScmMetadataStore().getStore().getDbLocation(); - scm.stop(); - - // re-initialise SCM with new clusterID - - GenericTestUtils.deleteDirectory(new File(SCMHAUtils.getRatisStorageDir(conf))); - GenericTestUtils.deleteDirectory(dbDir); - GenericTestUtils.deleteDirectory( - new File(scm.getScmStorageConfig().getStorageDir())); - String newClusterId = UUID.randomUUID().toString(); - StorageContainerManager.scmInit(scm.getConfiguration(), newClusterId); - scm = HddsTestUtils.getScmSimple(scm.getConfiguration()); - - DatanodeStateMachine dsm = datanode.getDatanodeStateMachine(); + // assumes SCM is already stopped + private void testOldDNRegistersToReInitialisedSCM(MiniOzoneCluster cluster) throws Exception { + HddsDatanodeService datanode = cluster.getHddsDatanodes().get(0); + + // re-initialise SCM with new clusterID + String newClusterId = UUID.randomUUID().toString(); + StorageContainerManager.scmInit(cluster.getConf(), newClusterId); + StorageContainerManager scm = HddsTestUtils.getScmSimple(cluster.getConf()); + + DatanodeStateMachine dsm = datanode.getDatanodeStateMachine(); + assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, + dsm.getContext().getState()); + // DN Endpoint State has already gone through GetVersion and Register, + // so it will be in HEARTBEAT state. + for (EndpointStateMachine endpoint : dsm.getConnectionManager() + .getValues()) { + assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, + endpoint.getState()); + } + GenericTestUtils.LogCapturer scmDnHBDispatcherLog = + GenericTestUtils.LogCapturer.captureLogs( + SCMDatanodeHeartbeatDispatcher.LOG); + LogManager.getLogger(HeartbeatEndpointTask.class).setLevel(Level.DEBUG); + GenericTestUtils.LogCapturer heartbeatEndpointTaskLog = + GenericTestUtils.LogCapturer.captureLogs(HeartbeatEndpointTask.LOG); + GenericTestUtils.LogCapturer versionEndPointTaskLog = + GenericTestUtils.LogCapturer.captureLogs(VersionEndpointTask.LOG); + // Initially empty + assertThat(scmDnHBDispatcherLog.getOutput()).isEmpty(); + assertThat(versionEndPointTaskLog.getOutput()).isEmpty(); + // start the new SCM + try { + scm.start(); + // Initially DatanodeStateMachine will be in Running state assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, dsm.getContext().getState()); - // DN Endpoint State has already gone through GetVersion and Register, - // so it will be in HEARTBEAT state. - for (EndpointStateMachine endpoint : dsm.getConnectionManager() - .getValues()) { - assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT, - endpoint.getState()); - } - GenericTestUtils.LogCapturer scmDnHBDispatcherLog = - GenericTestUtils.LogCapturer.captureLogs( - SCMDatanodeHeartbeatDispatcher.LOG); - LogManager.getLogger(HeartbeatEndpointTask.class).setLevel(Level.DEBUG); - GenericTestUtils.LogCapturer heartbeatEndpointTaskLog = - GenericTestUtils.LogCapturer.captureLogs(HeartbeatEndpointTask.LOG); - GenericTestUtils.LogCapturer versionEndPointTaskLog = - GenericTestUtils.LogCapturer.captureLogs(VersionEndpointTask.LOG); - // Initially empty - assertThat(scmDnHBDispatcherLog.getOutput()).isEmpty(); - assertThat(versionEndPointTaskLog.getOutput()).isEmpty(); - // start the new SCM - try { - scm.start(); - // Initially DatanodeStateMachine will be in Running state - assertEquals(DatanodeStateMachine.DatanodeStates.RUNNING, - dsm.getContext().getState()); - // DN heartbeats to new SCM, SCM doesn't recognize the node, sends the - // command to DN to re-register. Wait for SCM to send re-register command - String expectedLog = String.format( - "SCM received heartbeat from an unregistered datanode %s. " - + "Asking datanode to re-register.", - datanode.getDatanodeDetails()); - GenericTestUtils.waitFor( - () -> scmDnHBDispatcherLog.getOutput().contains(expectedLog), 100, - 30000); - ExitUtil.disableSystemExit(); - // As part of processing response for re-register, DN EndpointStateMachine - // goes to GET-VERSION state which checks if there is already existing - // version file on the DN & if the clusterID matches with that of the SCM - // In this case, it won't match and gets InconsistentStorageStateException - // and DN shuts down. - String expectedLog2 = "Received SCM notification to register." - + " Interrupt HEARTBEAT and transit to GETVERSION state."; - GenericTestUtils.waitFor( - () -> heartbeatEndpointTaskLog.getOutput().contains(expectedLog2), - 100, 5000); - GenericTestUtils.waitFor(() -> dsm.getContext().getShutdownOnError(), 100, - 5000); - assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN, - dsm.getContext().getState()); - assertThat(versionEndPointTaskLog.getOutput()).contains( - "org.apache.hadoop.ozone.common" + - ".InconsistentStorageStateException: Mismatched ClusterIDs"); - } finally { - scm.stop(); - } + // DN heartbeats to new SCM, SCM doesn't recognize the node, sends the + // command to DN to re-register. Wait for SCM to send re-register command + String expectedLog = String.format( + "SCM received heartbeat from an unregistered datanode %s. " + + "Asking datanode to re-register.", + datanode.getDatanodeDetails()); + GenericTestUtils.waitFor( + () -> scmDnHBDispatcherLog.getOutput().contains(expectedLog), 100, + 30000); + ExitUtil.disableSystemExit(); + // As part of processing response for re-register, DN EndpointStateMachine + // goes to GET-VERSION state which checks if there is already existing + // version file on the DN & if the clusterID matches with that of the SCM + // In this case, it won't match and gets InconsistentStorageStateException + // and DN shuts down. + String expectedLog2 = "Received SCM notification to register." + + " Interrupt HEARTBEAT and transit to GETVERSION state."; + GenericTestUtils.waitFor( + () -> heartbeatEndpointTaskLog.getOutput().contains(expectedLog2), + 100, 5000); + GenericTestUtils.waitFor(() -> dsm.getContext().getShutdownOnError(), 100, + 5000); + assertEquals(DatanodeStateMachine.DatanodeStates.SHUTDOWN, + dsm.getContext().getState()); + assertThat(versionEndPointTaskLog.getOutput()).contains( + "org.apache.hadoop.ozone.common" + + ".InconsistentStorageStateException: Mismatched ClusterIDs"); + } finally { + scm.stop(); } } @@ -472,7 +452,7 @@ public void testBlockDeletingThrottling() throws Exception { } createDeleteTXLog(cluster.getStorageContainerManager(), - delLog, keyLocations, cluster, conf); + delLog, keyLocations, cluster); // Verify a few TX gets created in the TX log. assertThat(delLog.getNumOfValidTransactions()).isGreaterThan(0); @@ -499,7 +479,7 @@ public void testBlockDeletingThrottling() throws Exception { private Map> createDeleteTXLog( StorageContainerManager scm, DeletedBlockLog delLog, - Map keyLocations, MiniOzoneCluster cluster, OzoneConfiguration conf) + Map keyLocations, MiniOzoneCluster cluster) throws IOException, TimeoutException { // These keys will be written into a bunch of containers, // gets a set of container names, verify container containerBlocks @@ -518,7 +498,7 @@ private Map> createDeleteTXLog( } assertThat(totalCreatedBlocks).isGreaterThan(0); assertEquals(totalCreatedBlocks, - getAllBlocks(cluster, conf, containerNames).size()); + getAllBlocks(cluster, containerNames).size()); // Create a deletion TX for each key. Map> containerBlocks = Maps.newHashMap(); @@ -562,25 +542,15 @@ public void testSCMInitialization(@TempDir Path tempDir) throws Exception { validateRatisGroupExists(conf, clusterId.toString()); } - @Test - public void testSCMReinitialization(@TempDir Path tempDir) throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - Path scmPath = tempDir.resolve("scm-meta"); - conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString()); - //This will set the cluster id in the version file - + private void testSCMReinitialization(MiniOzoneCluster cluster) throws Exception { + cluster.getStorageContainerManager().stop(); - try (MiniOzoneCluster cluster = - MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build()) { - cluster.waitForClusterToBeReady(); - cluster.getStorageContainerManager().stop(); - final UUID clusterId = UUID.randomUUID(); - // This will initialize SCM - StorageContainerManager.scmInit(conf, clusterId.toString()); - SCMStorageConfig scmStore = new SCMStorageConfig(conf); - assertNotEquals(clusterId.toString(), scmStore.getClusterID()); - assertTrue(scmStore.isSCMHAEnabled()); - } + final UUID clusterId = UUID.randomUUID(); + // This will initialize SCM + StorageContainerManager.scmInit(cluster.getConf(), clusterId.toString()); + SCMStorageConfig scmStore = new SCMStorageConfig(cluster.getConf()); + assertNotEquals(clusterId.toString(), scmStore.getClusterID()); + assertTrue(scmStore.isSCMHAEnabled()); } @VisibleForTesting @@ -669,10 +639,22 @@ public void testScmInfo(@TempDir Path tempDir) throws Exception { /** * Test datanode heartbeat well processed with a 4-layer network topology. */ - @Test - public void testScmProcessDatanodeHeartbeat() throws Exception { + private void testScmProcessDatanodeHeartbeat(MiniOzoneCluster cluster) { + NodeManager nodeManager = cluster.getStorageContainerManager().getScmNodeManager(); + List allNodes = nodeManager.getAllNodes(); + assertEquals(cluster.getHddsDatanodes().size(), allNodes.size()); + + for (DatanodeDetails node : allNodes) { + DatanodeInfo datanodeInfo = assertInstanceOf(DatanodeInfo.class, nodeManager.getNodeByUuid(node.getUuid())); + assertNotNull(datanodeInfo); + assertThat(datanodeInfo.getLastHeartbeatTime()).isPositive(); + assertEquals(datanodeInfo.getUuidString(), datanodeInfo.getNetworkName()); + assertEquals("/rack1", datanodeInfo.getNetworkLocation()); + } + } + + private static void configureTopology(OzoneConfiguration conf) throws UnknownHostException { String rackName = "/rack1"; - OzoneConfiguration conf = new OzoneConfiguration(); conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, StaticMapping.class, DNSToSwitchMapping.class); StaticMapping.addNodeToRack(NetUtils.normalizeHostName(HddsUtils.getHostName(conf)), @@ -680,34 +662,6 @@ public void testScmProcessDatanodeHeartbeat() throws Exception { // In case of JDK17, the IP address is resolved to localhost mapped to 127.0.0.1 which is not in sync with JDK8 // and hence need to make following entry under HDDS-10132 StaticMapping.addNodeToRack(LOCALHOST_IP, rackName); - - final int datanodeNum = 3; - - try (MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(datanodeNum) - .build()) { - cluster.waitForClusterToBeReady(); - StorageContainerManager scm = cluster.getStorageContainerManager(); - // first sleep 10s - Thread.sleep(10000); - // verify datanode heartbeats are well processed - long heartbeatCheckerIntervalMs = cluster.getConf() - .getTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1000, - TimeUnit.MILLISECONDS); - long start = Time.monotonicNow(); - Thread.sleep(heartbeatCheckerIntervalMs * 2); - - List allNodes = scm.getScmNodeManager().getAllNodes(); - assertEquals(datanodeNum, allNodes.size()); - for (DatanodeDetails node : allNodes) { - DatanodeInfo datanodeInfo = (DatanodeInfo) scm.getScmNodeManager() - .getNodeByUuid(node.getUuidString()); - assertThat(datanodeInfo.getLastHeartbeatTime()).isGreaterThan(start); - assertEquals(datanodeInfo.getUuidString(), - datanodeInfo.getNetworkName()); - assertEquals("/rack1", datanodeInfo.getNetworkLocation()); - } - } } @Test @@ -942,44 +896,18 @@ private void addTransactions(StorageContainerManager scm, } } - private static class CloseContainerCommandMatcher - implements ArgumentMatcher { - - private final CommandForDatanode cmd; - private final UUID uuid; - - CloseContainerCommandMatcher(UUID uuid, CommandForDatanode cmd) { - this.uuid = uuid; - this.cmd = cmd; - } - - @Override - public boolean matches(CommandForDatanode cmdRight) { - CloseContainerCommand left = (CloseContainerCommand) cmd.getCommand(); - CloseContainerCommand right = - (CloseContainerCommand) cmdRight.getCommand(); - return cmdRight.getDatanodeId().equals(uuid) - && left.getContainerID() == right.getContainerID() - && left.getPipelineID().equals(right.getPipelineID()) - && left.getType() == right.getType() - && left.getProto().equals(right.getProto()); - } - } - - public List getAllBlocks(MiniOzoneCluster cluster, OzoneConfiguration conf, Set containerIDs) - throws IOException { + public List getAllBlocks(MiniOzoneCluster cluster, Set containerIDs) throws IOException { List allBlocks = Lists.newArrayList(); for (Long containerID : containerIDs) { - allBlocks.addAll(getAllBlocks(cluster, conf, containerID)); + allBlocks.addAll(getAllBlocks(cluster, containerID)); } return allBlocks; } - public List getAllBlocks(MiniOzoneCluster cluster, - OzoneConfiguration conf, Long containerID) throws IOException { + public List getAllBlocks(MiniOzoneCluster cluster, Long containerID) throws IOException { List allBlocks = Lists.newArrayList(); KeyValueContainerData cData = getContainerMetadata(cluster, containerID); - try (DBHandle db = BlockUtils.getDB(cData, conf)) { + try (DBHandle db = BlockUtils.getDB(cData, cluster.getConf())) { List> kvs = db.getStore().getBlockDataTable() @@ -994,12 +922,12 @@ public List getAllBlocks(MiniOzoneCluster cluster, return allBlocks; } - public boolean verifyBlocksWithTxnTable(MiniOzoneCluster cluster, OzoneConfiguration conf, + public boolean verifyBlocksWithTxnTable(MiniOzoneCluster cluster, Map> containerBlocks) throws IOException { for (Map.Entry> entry : containerBlocks.entrySet()) { KeyValueContainerData cData = getContainerMetadata(cluster, entry.getKey()); - try (DBHandle db = BlockUtils.getDB(cData, conf)) { + try (DBHandle db = BlockUtils.getDB(cData, cluster.getConf())) { DatanodeStore ds = db.getStore(); DatanodeStoreSchemaThreeImpl dnStoreImpl = (DatanodeStoreSchemaThreeImpl) ds;