Skip to content

Commit

Permalink
Refactors MultipleStoresIT
Browse files Browse the repository at this point in the history
Refactors MultipleStoresIT to function more similarly to how other fate
tests are run (the two store types are tested in their own separate
concrete classes). Created MultipleStoresTestRunner which is very
similar to FateTestRunner. MultipleStoresIT is now an abstract class
implemented by two new classes MetaMultipleStoresIT and
UserMultipleStoresIT.

closes apache#4903
  • Loading branch information
kevinrr888 committed Oct 11, 2024
1 parent 93e4862 commit 340604c
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 135 deletions.
168 changes: 33 additions & 135 deletions test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
*/
package org.apache.accumulo.test.fate;

import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -38,94 +36,44 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.test.util.Wait;
import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

public class MultipleStoresIT extends SharedMiniClusterBase {
public abstract class MultipleStoresIT extends SharedMiniClusterBase
implements MultipleStoresTestRunner {

private static final Logger LOG = LoggerFactory.getLogger(MultipleStoresIT.class);
@TempDir
private static File tempDir;
private static ZooKeeperTestingServer szk = null;
private static ZooReaderWriter zk;
private static final String FATE_DIR = "/fate";
private ClientContext client;

@BeforeEach
public void beforeEachSetup() {
client = (ClientContext) Accumulo.newClient().from(getClientProps()).build();
}

@AfterEach
public void afterEachTeardown() {
client.close();
}

@BeforeAll
public static void beforeAllSetup() throws Exception {
SharedMiniClusterBase.startMiniCluster();
szk = new ZooKeeperTestingServer(tempDir);
zk = szk.getZooReaderWriter();
}

@AfterAll
public static void afterAllTeardown() throws Exception {
SharedMiniClusterBase.stopMiniCluster();
szk.close();
}

@Test
public void testReserveUnreserve() throws Exception {
testReserveUnreserve(FateInstanceType.META);
testReserveUnreserve(FateInstanceType.USER);
executeSleepingEnvTest(this::testReserveUnreserve);
}

private void testReserveUnreserve(FateInstanceType storeType) throws Exception {
private void testReserveUnreserve(TestStoreFactory<SleepingTestEnv> testStoreFactory)
throws Exception {
// reserving/unreserving a FateId should be reflected across instances of the stores
final String tableName = getUniqueNames(1)[0];
final int numFateIds = 500;
final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final FateStore<SleepingTestEnv> store1, store2;
final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
Map<FateId,FateStore.FateReservation> activeReservations;

if (isUserStore) {
createFateTable(client, tableName);
store1 = new UserFateStore<>(client, tableName, lock1, null);
store2 = new UserFateStore<>(client, tableName, lock2, null);
} else {
store1 = new MetaFateStore<>(FATE_DIR, zk, lock1, null);
store2 = new MetaFateStore<>(FATE_DIR, zk, lock2, null);
}
final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, null);
final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, null);
final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID());

// Create the fate ids using store1
for (int i = 0; i < numFateIds; i++) {
Expand Down Expand Up @@ -174,52 +122,33 @@ private void testReserveUnreserve(FateInstanceType storeType) throws Exception {

@Test
public void testReserveNonExistentTxn() throws Exception {
testReserveNonExistentTxn(FateInstanceType.META);
testReserveNonExistentTxn(FateInstanceType.USER);
executeSleepingEnvTest(this::testReserveNonExistentTxn);
}

private void testReserveNonExistentTxn(FateInstanceType storeType) throws Exception {
private void testReserveNonExistentTxn(TestStoreFactory<SleepingTestEnv> testStoreFactory)
throws Exception {
// Tests that reserve() doesn't hang indefinitely and instead throws an error
// on reserve() a non-existent transaction.
final FateStore<SleepingTestEnv> store;
final boolean isUserStore = storeType == FateInstanceType.USER;
final String tableName = getUniqueNames(1)[0];
final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);

if (isUserStore) {
createFateTable(client, tableName);
store = new UserFateStore<>(client, tableName, lock, null);
} else {
store = new MetaFateStore<>(FATE_DIR, zk, lock, null);
}
final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null);
final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());

var err = assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId));
assertTrue(err.getMessage().contains(fakeFateId.canonical()));
}

@Test
public void testReserveReservedAndUnreserveUnreserved() throws Exception {
testReserveReservedAndUnreserveUnreserved(FateInstanceType.META);
testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER);
executeSleepingEnvTest(this::testReserveReservedAndUnreserveUnreserved);
}

private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType)
throws Exception {
final String tableName = getUniqueNames(1)[0];
private void testReserveReservedAndUnreserveUnreserved(
TestStoreFactory<SleepingTestEnv> testStoreFactory) throws Exception {
final int numFateIds = 500;
final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
final FateStore<SleepingTestEnv> store;
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);

if (isUserStore) {
createFateTable(client, tableName);
store = new UserFateStore<>(client, tableName, lock, null);
} else {
store = new MetaFateStore<>(FATE_DIR, zk, lock, null);
}
final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null);

// Create some FateIds and ensure that they can be reserved
for (int i = 0; i < numFateIds; i++) {
Expand Down Expand Up @@ -248,26 +177,16 @@ private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeTyp

@Test
public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception {
testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.META);
testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER);
executeSleepingEnvTest(this::testReserveAfterUnreserveAndReserveAfterDeleted);
}

private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType)
throws Exception {
final String tableName = getUniqueNames(1)[0];
private void testReserveAfterUnreserveAndReserveAfterDeleted(
TestStoreFactory<SleepingTestEnv> testStoreFactory) throws Exception {
final int numFateIds = 500;
final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
final FateStore<SleepingTestEnv> store;
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);

if (isUserStore) {
createFateTable(client, tableName);
store = new UserFateStore<>(client, tableName, lock, null);
} else {
store = new MetaFateStore<>(FATE_DIR, zk, lock, null);
}
final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock, null);

// Create some FateIds and ensure that they can be reserved
for (int i = 0; i < numFateIds; i++) {
Expand Down Expand Up @@ -305,31 +224,22 @@ private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType st

@Test
public void testMultipleFateInstances() throws Exception {
testMultipleFateInstances(FateInstanceType.META);
testMultipleFateInstances(FateInstanceType.USER);
executeSleepingEnvTest(this::testMultipleFateInstances);
}

private void testMultipleFateInstances(FateInstanceType storeType) throws Exception {
final String tableName = getUniqueNames(1)[0];
private void testMultipleFateInstances(TestStoreFactory<SleepingTestEnv> testStoreFactory)
throws Exception {
final int numFateIds = 500;
final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final FateStore<SleepingTestEnv> store1, store2;
final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
final SleepingTestEnv testEnv2 = new SleepingTestEnv(50);
final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
final Set<ZooUtil.LockID> liveLocks = new HashSet<>();
final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains;
final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld);
final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld);

if (isUserStore) {
createFateTable(client, tableName);
store1 = new UserFateStore<>(client, tableName, lock1, isLockHeld);
store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld);
} else {
store1 = new MetaFateStore<>(FATE_DIR, zk, lock1, isLockHeld);
store2 = new MetaFateStore<>(FATE_DIR, zk, lock2, isLockHeld);
}
liveLocks.add(lock1);
liveLocks.add(lock2);

Expand Down Expand Up @@ -366,23 +276,20 @@ private void testMultipleFateInstances(FateInstanceType storeType) throws Except

@Test
public void testDeadReservationsCleanup() throws Exception {
testDeadReservationsCleanup(FateInstanceType.META);
testDeadReservationsCleanup(FateInstanceType.USER);
executeLatchEnvTest(this::testDeadReservationsCleanup);
}

private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exception {
private void testDeadReservationsCleanup(TestStoreFactory<LatchTestEnv> testStoreFactory)
throws Exception {
// Tests reserving some transactions, then simulating that the Manager died by creating
// a new Fate instance and store with a new LockID. The transactions which were
// reserved using the old LockID should be cleaned up by Fate's DeadReservationCleaner,
// then picked up by the new Fate/store.

final String tableName = getUniqueNames(1)[0];
// One transaction for each FATE worker thread
final int numFateIds =
Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
final boolean isUserStore = storeType == FateInstanceType.USER;
final Set<FateId> allIds = new HashSet<>();
final FateStore<LatchTestEnv> store1, store2;
final LatchTestEnv testEnv1 = new LatchTestEnv();
final LatchTestEnv testEnv2 = new LatchTestEnv();
final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
Expand All @@ -391,12 +298,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce
final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains;
Map<FateId,FateStore.FateReservation> reservations;

if (isUserStore) {
createFateTable(client, tableName);
store1 = new UserFateStore<>(client, tableName, lock1, isLockHeld);
} else {
store1 = new MetaFateStore<>(FATE_DIR, zk, lock1, isLockHeld);
}
final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1, isLockHeld);
liveLocks.add(lock1);

FastFate<LatchTestEnv> fate1 = new FastFate<>(testEnv1, store1, true, Object::toString,
Expand Down Expand Up @@ -424,11 +326,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce
reservations.values().forEach(
res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID())));

if (isUserStore) {
store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld);
} else {
store2 = new MetaFateStore<>(FATE_DIR, zk, lock2, isLockHeld);
}
final FateStore<LatchTestEnv> store2 = testStoreFactory.create(lock2, isLockHeld);

// Verify store2 can see the reserved transactions even though they were reserved using
// store1
Expand Down Expand Up @@ -501,7 +399,7 @@ public String getReturn() {
}
}

public static class SleepingTestEnv {
public static class SleepingTestEnv extends MultipleStoresTestEnv {
public final Set<FateId> executedOps = Collections.synchronizedSet(new HashSet<>());
public final int sleepTimeMs;

Expand Down Expand Up @@ -544,7 +442,7 @@ public String getReturn() {
}
}

public static class LatchTestEnv {
public static class LatchTestEnv extends MultipleStoresTestEnv {
public final AtomicInteger numWorkers = new AtomicInteger(0);
public final CountDownLatch workersLatch = new CountDownLatch(1);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.test.fate;

import java.util.function.Predicate;

import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.zookeeper.KeeperException;

public interface MultipleStoresTestRunner {

void executeSleepingEnvTest(
MultipleStoresTestExecutor<MultipleStoresIT.SleepingTestEnv> testMethod) throws Exception;

void executeLatchEnvTest(MultipleStoresTestExecutor<MultipleStoresIT.LatchTestEnv> testMethod)
throws Exception;

interface TestStoreFactory<T extends MultipleStoresTestEnv> {
FateStore<T> create(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> isLockHeld)
throws InterruptedException, KeeperException;
}

@FunctionalInterface
interface MultipleStoresTestExecutor<T extends MultipleStoresTestEnv> {
void execute(TestStoreFactory<T> fateStoreFactory) throws Exception;
}

class MultipleStoresTestEnv {}
}
Loading

0 comments on commit 340604c

Please sign in to comment.