HDDS-12081. TestKeyInputStream repeats tests with default container layout
adoroszlai committed Jan 14, 2025
1 parent 0723902 commit 86840d8
Showing 6 changed files with 115 additions and 80 deletions.

ContainerLayoutVersion.java
@@ -19,8 +19,11 @@
 
 
 import java.io.File;
+import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -54,6 +57,9 @@ public File getChunkFile(File chunkDir, BlockID blockID, String chunkName) {
   private static final List<ContainerLayoutVersion> CONTAINER_LAYOUT_VERSIONS =
       ImmutableList.copyOf(values());
 
+  private static final Set<ContainerLayoutVersion> SUPPORTED_VERSIONS =
+      ImmutableSet.copyOf(EnumSet.complementOf(EnumSet.of(FILE_PER_CHUNK)));
+
   private final int version;
   private final String description;
 
@@ -83,6 +89,13 @@ public static List<ContainerLayoutVersion> getAllVersions() {
     return CONTAINER_LAYOUT_VERSIONS;
   }
 
+  /**
+   * @return supported versions
+   */
+  public static Set<ContainerLayoutVersion> getSupportedVersions() {
+    return SUPPORTED_VERSIONS;
+  }
+
   /**
    * @return the latest version.
   */
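
For reference, EnumSet.complementOf yields every constant of the enum that is not in the argument set, so SUPPORTED_VERSIONS ends up as all layouts except FILE_PER_CHUNK; ImmutableSet.copyOf then freezes the result so callers of getSupportedVersions() cannot mutate it. A minimal sketch of the idiom with a stand-in enum (the constant names mirror the real ones, everything else is illustrative):

import java.util.EnumSet;
import java.util.Set;

class ComplementOfDemo {
  enum Layout { FILE_PER_CHUNK, FILE_PER_BLOCK }

  public static void main(String[] args) {
    // complementOf(of(FILE_PER_CHUNK)) == every other constant of Layout
    Set<Layout> supported = EnumSet.complementOf(EnumSet.of(Layout.FILE_PER_CHUNK));
    System.out.println(supported); // prints [FILE_PER_BLOCK]
  }
}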

ContainerLayoutTestInfo.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
@@ -122,13 +123,24 @@ private static void assertFileCount(File dir, long count) {
   }
 
   /**
-   * Composite annotation for tests parameterized with {@link ContainerLayoutTestInfo}.
+   * Composite annotation for tests parameterized with {@link ContainerLayoutVersion}.
    */
   @Target(ElementType.METHOD)
   @Retention(RetentionPolicy.RUNTIME)
   @ParameterizedTest
-  @MethodSource("org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion#getAllVersions")
+  @MethodSource("org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion#getSupportedVersions")
   public @interface ContainerTest {
     // composite annotation
   }
+
+  /**
+   * Composite annotation for read tests parameterized with {@link ContainerLayoutTestInfo}.
+   */
+  @Target(ElementType.METHOD)
+  @Retention(RetentionPolicy.RUNTIME)
+  @ParameterizedTest
+  @EnumSource(names = {"FILE_PER_BLOCK", "FILE_PER_CHUNK"})
+  public @interface ReadTest {
+    // composite annotation
+  }
 }
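
This hunk leans on two JUnit 5 behaviors: @ParameterizedTest and its sources work as meta-annotations, so @ContainerTest and @ReadTest bundle them into one reusable marker; and an @EnumSource given only names infers the enum type from the test method's parameter, which is why @ReadTest methods must take a ContainerLayoutTestInfo argument. A self-contained sketch of the same pattern (Layout and @TwoLayoutTest are stand-ins, not repo code):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class CompositeAnnotationDemo {

  enum Layout { FILE_PER_CHUNK, FILE_PER_BLOCK }

  // Bundles @ParameterizedTest + @EnumSource into one marker, like
  // @ContainerTest/@ReadTest above. The enum type is inferred from the
  // annotated method's parameter.
  @Target(ElementType.METHOD)
  @Retention(RetentionPolicy.RUNTIME)
  @ParameterizedTest
  @EnumSource(names = {"FILE_PER_CHUNK", "FILE_PER_BLOCK"})
  @interface TwoLayoutTest {
    // composite annotation
  }

  @TwoLayoutTest
  void runsOncePerLayout(Layout layout) {
    // invoked twice, once with each named constant
  }
}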

ContainerTestVersionInfo.java
@@ -69,7 +69,7 @@ public ContainerTestVersionInfo(String schemaVersion,
 
   private static List<ContainerTestVersionInfo> layoutList = new ArrayList<>();
   static {
-    for (ContainerLayoutVersion ch : ContainerLayoutVersion.getAllVersions()) {
+    for (ContainerLayoutVersion ch : ContainerLayoutVersion.getSupportedVersions()) {
       for (String sch : SCHEMA_VERSIONS) {
         layoutList.add(new ContainerTestVersionInfo(sch, ch));
       }
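
ContainerTestVersionInfo builds the layout x schema-version matrix once in a static initializer, so switching the loop to getSupportedVersions() shrinks that matrix for every test backed by this provider. A stand-in sketch of the cross product (Layout and the SCHEMA_VERSIONS values here are illustrative, not the real constants):

import java.util.ArrayList;
import java.util.List;

class LayoutMatrixDemo {
  enum Layout { FILE_PER_BLOCK } // FILE_PER_CHUNK already filtered out

  private static final String[] SCHEMA_VERSIONS = {"1", "2", "3"}; // stand-in values

  public static void main(String[] args) {
    List<String> matrix = new ArrayList<>();
    for (Layout layout : Layout.values()) {   // supported layouts only
      for (String schema : SCHEMA_VERSIONS) { // every schema version
        matrix.add(layout + "/" + schema);
      }
    }
    System.out.println(matrix); // [FILE_PER_BLOCK/1, FILE_PER_BLOCK/2, FILE_PER_BLOCK/3]
  }
}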

TestChunkInputStream.java
@@ -22,12 +22,12 @@
 
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
-import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.om.TestBucket;
+import org.junit.jupiter.api.TestInstance;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -36,24 +36,23 @@
 /**
  * Tests {@link ChunkInputStream}.
  */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class TestChunkInputStream extends TestInputStreamBase {
 
   /**
    * Run the tests as a single test method to avoid needing a new mini-cluster
    * for each test.
    */
-  @ContainerLayoutTestInfo.ContainerTest
-  void testAll(ContainerLayoutVersion layout) throws Exception {
-    try (MiniOzoneCluster cluster = newCluster(layout)) {
-      cluster.waitForClusterToBeReady();
+  @ContainerLayoutTestInfo.ReadTest
+  void testAll(ContainerLayoutTestInfo layout) throws Exception {
+    try (OzoneClient client = getCluster().newClient()) {
+      updateConfig(getCluster(), layout);
 
-      try (OzoneClient client = cluster.newClient()) {
-        TestBucket bucket = TestBucket.newBuilder(client).build();
+      TestBucket bucket = TestBucket.newBuilder(client).build();
 
-        testChunkReadBuffers(bucket);
-        testBufferRelease(bucket);
-        testCloseReleasesBuffers(bucket);
-      }
+      testChunkReadBuffers(bucket);
+      testBufferRelease(bucket);
+      testCloseReleasesBuffers(bucket);
     }
   }
 
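
The @TestInstance(PER_CLASS) switch is what makes the shared cluster possible: with one test instance per class, @BeforeAll and @AfterAll may be non-static instance methods, so the expensive fixture is created once and reused by every test. A self-contained sketch of that lifecycle (the StringBuilder stands in for MiniOzoneCluster):

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class SharedFixtureDemo {

  private StringBuilder resource; // stand-in for MiniOzoneCluster

  @BeforeAll
  void setup() {
    // allowed to be non-static only because of PER_CLASS
    resource = new StringBuilder("cluster up");
  }

  @Test
  void firstTest() {
    resource.append(", test 1"); // same instance in every test
  }

  @Test
  void secondTest() {
    resource.append(", test 2");
  }

  @AfterAll
  void cleanup() {
    System.out.println(resource); // torn down once, after the last test
  }
}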

TestInputStreamBase.java
@@ -25,15 +25,22 @@
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.ClientConfigForTesting;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestInstance;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 // TODO remove this class, set config as default in integration tests
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 abstract class TestInputStreamBase {
 
   static final int CHUNK_SIZE = 1024 * 1024; // 1MB
@@ -42,8 +49,7 @@ abstract class TestInputStreamBase {
   static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; // 8MB
   static final int BYTES_PER_CHECKSUM = 256 * 1024; // 256KB
 
-  protected static MiniOzoneCluster newCluster(
-      ContainerLayoutVersion containerLayout) throws Exception {
+  protected static MiniOzoneCluster newCluster() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
 
     OzoneClientConfig config = conf.getObject(OzoneClientConfig.class);
@@ -57,8 +63,6 @@ protected static MiniOzoneCluster newCluster(
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64,
         StorageUnit.MB);
-    conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY,
-        containerLayout.toString());
 
     ReplicationManagerConfiguration repConf =
         conf.getObject(ReplicationManagerConfiguration.class);
@@ -81,4 +85,38 @@ static String getNewKeyName() {
     return UUID.randomUUID().toString();
   }
 
+  protected static void updateConfig(MiniOzoneCluster cluster, ContainerLayoutTestInfo containerLayout) {
+    cluster.getHddsDatanodes().forEach(dn -> containerLayout.updateConfig(dn.getConf()));
+  }
+
+  private MiniOzoneCluster cluster;
+
+  protected MiniOzoneCluster getCluster() {
+    return cluster;
+  }
+
+  @BeforeAll
+  void setup() throws Exception {
+    cluster = newCluster();
+    cluster.waitForClusterToBeReady();
+  }
+
+  @AfterAll
+  void cleanup() {
+    IOUtils.closeQuietly(cluster);
+  }
+
+  @AfterEach
+  void closeContainers() {
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    scm.getContainerManager().getContainers().forEach(container -> {
+      if (container.isOpen()) {
+        try {
+          TestHelper.waitForContainerClose(getCluster(), container.getContainerID());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
 }
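
Instead of baking the layout into the cluster at startup (the conf.set removed from newCluster()), each test now pushes its layout into every running datanode's configuration through ContainerLayoutTestInfo.updateConfig, and newly created containers pick it up; TestInputStreamBase.updateConfig just fans the call out over cluster.getHddsDatanodes(). A hedged sketch of what the per-datanode update amounts to; the key name and method body are assumptions based on the conf.set line this commit deletes, not the actual ContainerLayoutTestInfo source:

import java.util.Map;

enum LayoutTestInfoSketch {
  FILE_PER_CHUNK, FILE_PER_BLOCK;

  // Assumed key, mirroring ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY
  static final String LAYOUT_KEY = "ozone.scm.container.layout";

  // Hypothetical equivalent of ContainerLayoutTestInfo.updateConfig:
  // overwrite the layout key in a datanode's live configuration.
  void updateConfig(Map<String, String> datanodeConf) {
    datanodeConf.put(LAYOUT_KEY, name());
  }
}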

TestKeyInputStream.java
@@ -34,27 +34,26 @@
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 import org.apache.hadoop.hdds.scm.storage.ChunkInputStream;
-import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.io.KeyInputStream;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.om.TestBucket;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.ozone.container.TestHelper.countReplicas;
-import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
-import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -63,6 +62,8 @@
 /**
  * Tests {@link KeyInputStream}.
  */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 class TestKeyInputStream extends TestInputStreamBase {
 
   /**
@@ -121,22 +122,20 @@ private void validate(TestBucket bucket, KeyInputStream keyInputStream,
    * This test runs the others as a single test, so to avoid creating a new
    * mini-cluster for each test.
    */
-  @ContainerLayoutTestInfo.ContainerTest
-  void testNonReplicationReads(ContainerLayoutVersion layout) throws Exception {
-    try (MiniOzoneCluster cluster = newCluster(layout)) {
-      cluster.waitForClusterToBeReady();
-
-      try (OzoneClient client = cluster.newClient()) {
-        TestBucket bucket = TestBucket.newBuilder(client).build();
-
-        testInputStreams(bucket);
-        testSeekRandomly(bucket);
-        testSeek(bucket);
-        testReadChunkWithByteArray(bucket);
-        testReadChunkWithByteBuffer(bucket);
-        testSkip(bucket);
-        testECSeek(bucket);
-      }
+  @ContainerLayoutTestInfo.ReadTest
+  void testNonReplicationReads(ContainerLayoutTestInfo layout) throws Exception {
+    try (OzoneClient client = getCluster().newClient()) {
+      updateConfig(getCluster(), layout);
+
+      TestBucket bucket = TestBucket.newBuilder(client).build();
+
+      testInputStreams(bucket);
+      testSeekRandomly(bucket);
+      testSeek(bucket);
+      testReadChunkWithByteArray(bucket);
+      testReadChunkWithByteBuffer(bucket);
+      testSkip(bucket);
+      testECSeek(bucket);
     }
   }
 
@@ -379,32 +378,18 @@ private void testSkip(TestBucket bucket) throws Exception {
     }
   }
 
-  private static List<Arguments> readAfterReplicationArgs() {
-    return Arrays.asList(
-        Arguments.arguments(FILE_PER_BLOCK, false),
-        Arguments.arguments(FILE_PER_BLOCK, true),
-        Arguments.arguments(FILE_PER_CHUNK, false),
-        Arguments.arguments(FILE_PER_CHUNK, true)
-    );
-  }
-
   @ParameterizedTest
-  @MethodSource("readAfterReplicationArgs")
-  void readAfterReplication(ContainerLayoutVersion layout,
-      boolean doUnbuffer) throws Exception {
-    try (MiniOzoneCluster cluster = newCluster(layout)) {
-      cluster.waitForClusterToBeReady();
-
-      try (OzoneClient client = cluster.newClient()) {
-        TestBucket bucket = TestBucket.newBuilder(client).build();
+  @ValueSource(booleans = {false, true})
+  @Order(Integer.MAX_VALUE) // shuts down datanodes
+  void readAfterReplication(boolean doUnbuffer) throws Exception {
+    try (OzoneClient client = getCluster().newClient()) {
+      TestBucket bucket = TestBucket.newBuilder(client).build();
 
-        testReadAfterReplication(cluster, bucket, doUnbuffer);
-      }
+      testReadAfterReplication(bucket, doUnbuffer);
     }
   }
 
-  private void testReadAfterReplication(MiniOzoneCluster cluster,
-      TestBucket bucket, boolean doUnbuffer) throws Exception {
+  private void testReadAfterReplication(TestBucket bucket, boolean doUnbuffer) throws Exception {
     int dataLength = 2 * CHUNK_SIZE;
     String keyName = getNewKeyName();
     byte[] data = bucket.writeRandomBytes(keyName, dataLength);
@@ -415,7 +400,7 @@ private void testReadAfterReplication(MiniOzoneCluster cluster,
         .setKeyName(keyName)
         .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
         .build();
-    OmKeyInfo keyInfo = cluster.getOzoneManager()
+    OmKeyInfo keyInfo = getCluster().getOzoneManager()
         .getKeyInfo(keyArgs, false)
         .getKeyInfo();
 
@@ -425,32 +410,20 @@
     assertEquals(1, locationInfoList.size());
     OmKeyLocationInfo loc = locationInfoList.get(0);
     long containerID = loc.getContainerID();
-    assertEquals(3, countReplicas(containerID, cluster));
+    assertEquals(3, countReplicas(containerID, getCluster()));
 
-    TestHelper.waitForContainerClose(cluster, containerID);
+    TestHelper.waitForContainerClose(getCluster(), containerID);
 
     List<DatanodeDetails> pipelineNodes = loc.getPipeline().getNodes();
 
-    // read chunk data
-    try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
-      int b = keyInputStream.read();
-      assertNotEquals(-1, b);
-      if (doUnbuffer) {
-        keyInputStream.unbuffer();
-      }
-      cluster.shutdownHddsDatanode(pipelineNodes.get(0));
-      // check that we can still read it
-      assertReadFully(data, keyInputStream, dataLength - 1, 1);
-    }
-
     // read chunk data with ByteBuffer
     try (KeyInputStream keyInputStream = bucket.getKeyInputStream(keyName)) {
       int b = keyInputStream.read();
      assertNotEquals(-1, b);
       if (doUnbuffer) {
         keyInputStream.unbuffer();
       }
-      cluster.shutdownHddsDatanode(pipelineNodes.get(0));
+      getCluster().shutdownHddsDatanode(pipelineNodes.get(0));
       // check that we can still read it
       assertReadFullyUsingByteBuffer(data, keyInputStream, dataLength - 1, 1);
     }
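
readAfterReplication shuts down a datanode of the now-shared cluster, so it has to run after everything else. With @TestMethodOrder(MethodOrderer.OrderAnnotation.class), methods lacking @Order get a default order value (Order.DEFAULT, Integer.MAX_VALUE / 2, in recent JUnit 5 releases), so @Order(Integer.MAX_VALUE) reliably sorts the destructive test last. A self-contained sketch of that mechanism:

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class DestructiveTestLastDemo {

  @Test
  void readTest() {
    // no @Order: gets the default order value, so it runs first
  }

  @Order(Integer.MAX_VALUE)
  @Test
  void destructiveTest() {
    // sorted last: safe to break shared state here
  }
}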
