diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index b5dc999d425..e4c36fd63a3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -44,12 +44,17 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.FastFormat; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; public class AccumuloStore extends AbstractFateStore { + private static final Logger log = LoggerFactory.getLogger(AccumuloStore.class); + private final ClientContext context; private final String tableName; @@ -73,11 +78,35 @@ public AccumuloStore(ClientContext context) { @Override public long create() { - long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + final int maxAttempts = 5; + long tid = 0L; - newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); + for (int attempt = 0; attempt < maxAttempts; attempt++) { + if (attempt >= 1) { + log.debug("Failed to create new id: {}, trying again", tid); + UtilWaitThread.sleep(100); + } + tid = getTid(); + + var status = newMutator(tid).requireStatus().putStatus(TStatus.NEW) + .putCreateTime(System.currentTimeMillis()).tryMutate(); + + switch (status) { + case ACCEPTED: + return tid; + case UNKNOWN: + case REJECTED: + continue; + default: + throw new IllegalStateException("Unknown status " + status); + } + } - return tid; + throw new IllegalStateException("Failed to create new id after " + maxAttempts + " attempts"); + } + + public long getTid() { + return RANDOM.get().nextLong() & 0x7fffffffffffffffL; } @Override @@ -249,11 +278,9 @@ public void setStatus(TStatus status) { public void setTransactionInfo(TxInfo txInfo, Serializable so) { verifyReserved(true); - FateMutator fateMutator = newMutator(tid); final byte[] serialized = serializeTxInfo(so); - fateMutator.putTxInfo(txInfo, serialized); - fateMutator.mutate(); + newMutator(tid).putTxInfo(txInfo, serialized).mutate(); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java index 4caf5985bd9..22497006db5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java @@ -46,4 +46,12 @@ public interface FateMutator { void mutate(); + // This exists to represent the subset of statuses from ConditionalWriter.Status that are expected + // and need to be handled. + enum Status { + ACCEPTED, REJECTED, UNKNOWN + } + + Status tryMutate(); + } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java index 90d22008d59..7056438d213 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java @@ -24,11 +24,16 @@ import java.util.Objects; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; @@ -45,13 +50,13 @@ public class FateMutatorImpl implements FateMutator { private final ClientContext context; private final String tableName; private final long tid; - private final Mutation mutation; + private final ConditionalMutation mutation; - FateMutatorImpl(ClientContext context, String tableName, long tid) { + public FateMutatorImpl(ClientContext context, String tableName, long tid) { this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); this.tid = tid; - this.mutation = new Mutation(new Text("tx_" + FastFormat.toHexString(tid))); + this.mutation = new ConditionalMutation(new Text("tx_" + FastFormat.toHexString(tid))); } @Override @@ -122,7 +127,10 @@ public FateMutator putTxInfo(TxInfo txInfo, byte[] data) { @Override public FateMutator putRepo(int position, Repo repo) { - mutation.put(RepoColumnFamily.NAME, invertRepo(position), new Value(serialize(repo))); + final Text cq = invertRepo(position); + // ensure this repo is not already set + mutation.addCondition(new Condition(RepoColumnFamily.NAME, cq)); + mutation.put(RepoColumnFamily.NAME, cq, new Value(serialize(repo))); return this; } @@ -143,12 +151,61 @@ public FateMutator delete() { return this; } + /** + * Require that the transaction status is one of the given statuses. If no statuses are provided, + * require that the status column is absent. + * + * @param statuses The statuses to check against. + */ + public FateMutator requireStatus(TStatus... statuses) { + Condition condition = StatusMappingIterator.createCondition(statuses); + mutation.addCondition(condition); + return this; + } + @Override public void mutate() { - try (BatchWriter writer = context.createBatchWriter(tableName)) { - writer.addMutation(mutation); - } catch (Exception e) { - throw new IllegalStateException(e); + var status = tryMutate(); + if (status != Status.ACCEPTED) { + throw new IllegalStateException("Failed to write mutation " + status + " " + mutation); + } + } + + @Override + public Status tryMutate() { + try { + // if there are no conditions attached, then we can use a batch writer + if (mutation.getConditions().isEmpty()) { + try (BatchWriter writer = context.createBatchWriter(tableName)) { + writer.addMutation(mutation); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + + return Status.ACCEPTED; + } else { + try (ConditionalWriter writer = context.createConditionalWriter(tableName)) { + ConditionalWriter.Result result = writer.write(mutation); + + switch (result.getStatus()) { + case ACCEPTED: + return Status.ACCEPTED; + case REJECTED: + return Status.REJECTED; + case UNKNOWN: + return Status.UNKNOWN; + default: + // do not expect other statuses + throw new IllegalStateException( + "Unhandled status for mutation " + result.getStatus()); + } + + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + } catch (TableNotFoundException e) { + throw new RuntimeException(e); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java new file mode 100644 index 00000000000..d7dc4fa22cd --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import static org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily.STATUS_COLUMN; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Condition; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +/** + * A specialized iterator that maps the value of the status column to "present" or "absent". This + * iterator allows for checking of the status column's value against a set of acceptable statuses + * within a conditional mutation. + */ +public class StatusMappingIterator implements SortedKeyValueIterator { + + private static final String PRESENT = "present"; + private static final String ABSENT = "absent"; + private static final String STATUS_SET_KEY = "statusSet"; + + private SortedKeyValueIterator source; + private final Set acceptableStatuses = new HashSet<>(); + private Value mappedValue; + + /** + * The set of acceptable must be provided as an option to the iterator using the + * {@link #STATUS_SET_KEY} key. + */ + @Override + public void init(SortedKeyValueIterator source, Map options, + IteratorEnvironment env) throws IOException { + this.source = source; + if (options.containsKey(STATUS_SET_KEY)) { + String[] statuses = decodeStatuses(options.get(STATUS_SET_KEY)); + acceptableStatuses.addAll(Arrays.asList(statuses)); + } + } + + @Override + public boolean hasTop() { + return source.hasTop(); + } + + @Override + public void next() throws IOException { + source.next(); + mapValue(); + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + source.seek(range, columnFamilies, inclusive); + mapValue(); + } + + /** + * Maps the value of the status column to "present" or "absent" based on its presence within the + * set of statuses. + */ + private void mapValue() { + if (source.hasTop()) { + String currentValue = source.getTopValue().toString(); + mappedValue = + acceptableStatuses.contains(currentValue) ? new Value(PRESENT) : new Value(ABSENT); + } else { + mappedValue = null; + } + } + + @Override + public Key getTopKey() { + return source.getTopKey(); + } + + @Override + public Value getTopValue() { + return Objects.requireNonNull(mappedValue); + } + + @Override + public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + /** + * Creates a condition that checks if the status column's value is one of the given acceptable + * statuses. + * + * @param statuses The acceptable statuses. + * @return A condition configured to use this iterator. + */ + public static Condition createCondition(ReadOnlyFateStore.TStatus... statuses) { + Condition condition = + new Condition(STATUS_COLUMN.getColumnFamily(), STATUS_COLUMN.getColumnQualifier()); + + if (statuses.length == 0) { + // If no statuses are provided, require the status column to be absent. Return the condition + // with no value set so that the mutation will be rejected if the status column is present. + return condition; + } else { + IteratorSetting is = new IteratorSetting(100, StatusMappingIterator.class); + is.addOption(STATUS_SET_KEY, encodeStatuses(statuses)); + + // If the value of the status column is in the set, it will be mapped to "present", so set the + // value of the condition to "present". + return condition.setValue(PRESENT).setIterators(is); + } + } + + private static String encodeStatuses(ReadOnlyFateStore.TStatus[] statuses) { + return Arrays.stream(statuses).map(Enum::name).collect(Collectors.joining(",")); + } + + private static String[] decodeStatuses(String statuses) { + return statuses.split(","); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java new file mode 100644 index 00000000000..88c2ac48844 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AccumuloStoreIT extends SharedMiniClusterBase { + + private static final Logger log = LoggerFactory.getLogger(AccumuloStore.class); + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + private static class TestAccumuloStore extends AccumuloStore { + private final Iterator tidIterator; + + // use the list of txids to simulate collisions on txids + public TestAccumuloStore(ClientContext context, String tableName, List txids) { + super(context, tableName); + this.tidIterator = txids.iterator(); + } + + @Override + public long getTid() { + if (tidIterator.hasNext()) { + return tidIterator.next(); + } else { + return -1L; + } + } + } + + @Test + public void testCreateWithCollisionAndExceedRetryLimit() throws Exception { + String table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table); + + List txids = List.of(1L, 1L, 1L, 2L, 3L, 3L, 3L, 3L, 4L, 4L, 5L, 5L, 5L, 5L, 5L, 5L); + Set expectedTids = new TreeSet<>(txids); + TestAccumuloStore store = new TestAccumuloStore(client, table, txids); + + // call create and expect we get the unique txids + for (Long expectedTid : expectedTids) { + long tid = store.create(); + log.info("Created tid: " + tid); + assertEquals(expectedTid, tid, "Expected " + expectedTid + " but got " + tid); + } + + // Calling create again on 5L should throw an exception since we've exceeded the max retries + assertThrows(IllegalStateException.class, store::create); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java new file mode 100644 index 00000000000..27e6dd650ba --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import static org.apache.accumulo.core.fate.accumulo.FateMutator.Status.ACCEPTED; +import static org.apache.accumulo.core.fate.accumulo.FateMutator.Status.REJECTED; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Duration; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.accumulo.FateMutatorImpl; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.fate.FateIT; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FateMutatorImplIT extends SharedMiniClusterBase { + + Logger log = LoggerFactory.getLogger(FateMutatorImplIT.class); + final NewTableConfiguration ntc = + new NewTableConfiguration().withInitialHostingGoal(TabletHostingGoal.ALWAYS); + + @BeforeAll + public static void setup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + } + + @AfterAll + public static void tearDown() { + SharedMiniClusterBase.stopMiniCluster(); + } + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + @Test + public void putRepo() throws Exception { + final String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table, ntc); + + ClientContext context = (ClientContext) client; + + final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + + // add some repos in order + FateMutatorImpl fateMutator = new FateMutatorImpl<>(context, table, tid); + fateMutator.putRepo(100, new FateIT.TestRepo("test")).mutate(); + FateMutatorImpl fateMutator1 = new FateMutatorImpl<>(context, table, tid); + fateMutator1.putRepo(99, new FateIT.TestRepo("test")).mutate(); + FateMutatorImpl fateMutator2 = new FateMutatorImpl<>(context, table, tid); + fateMutator2.putRepo(98, new FateIT.TestRepo("test")).mutate(); + + // make sure we cant add a repo that has already been added + FateMutatorImpl fateMutator3 = new FateMutatorImpl<>(context, table, tid); + assertThrows(IllegalStateException.class, + () -> fateMutator3.putRepo(98, new FateIT.TestRepo("test")).mutate(), + "Repo in position 98 already exists. Expected to not be able to add it again."); + FateMutatorImpl fateMutator4 = new FateMutatorImpl<>(context, table, tid); + assertThrows(IllegalStateException.class, + () -> fateMutator4.putRepo(99, new FateIT.TestRepo("test")).mutate(), + "Repo in position 99 already exists. Expected to not be able to add it again."); + } + } + + @Test + public void requireStatus() throws Exception { + final String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table, ntc); + + ClientContext context = (ClientContext) client; + + final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + + // use require status passing all statuses. without the status column present this should fail + assertThrows(IllegalStateException.class, + () -> new FateMutatorImpl<>(context, table, tid) + .requireStatus(ReadOnlyFateStore.TStatus.values()) + .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate()); + assertEquals(0, client.createScanner(table).stream().count()); + var status = new FateMutatorImpl<>(context, table, tid) + .requireStatus(ReadOnlyFateStore.TStatus.values()) + .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); + assertEquals(REJECTED, status); + assertEquals(0, client.createScanner(table).stream().count()); + + // use require status without passing any statuses to require that the status column is absent + status = new FateMutatorImpl<>(context, table, tid).requireStatus() + .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); + assertEquals(ACCEPTED, status); + + // try again with requiring an absent status column. this time it should fail because we just + // put status NEW + assertThrows(IllegalStateException.class, + () -> new FateMutatorImpl<>(context, table, tid).requireStatus() + .putStatus(ReadOnlyFateStore.TStatus.NEW).mutate(), + "Expected to not be able to use requireStatus() without passing any statuses"); + status = new FateMutatorImpl<>(context, table, tid).requireStatus() + .putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate(); + assertEquals(REJECTED, status, + "Expected to not be able to use requireStatus() without passing any statuses"); + + // now use require same with the current status, NEW passed in + status = + new FateMutatorImpl<>(context, table, tid).requireStatus(ReadOnlyFateStore.TStatus.NEW) + .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); + assertEquals(ACCEPTED, status); + + // use require same with an array of statuses, none of which are the current status + // (SUBMITTED) + assertThrows(IllegalStateException.class, + () -> new FateMutatorImpl<>(context, table, tid) + .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN) + .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).mutate(), + "Expected to not be able to use requireStatus() with statuses that do not match the current status"); + status = new FateMutatorImpl<>(context, table, tid) + .requireStatus(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.UNKNOWN) + .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate(); + assertEquals(REJECTED, status, + "Expected to not be able to use requireStatus() with statuses that do not match the current status"); + + // use require same with an array of statuses, one of which is the current status (SUBMITTED) + status = new FateMutatorImpl<>(context, table, tid) + .requireStatus(ReadOnlyFateStore.TStatus.UNKNOWN, ReadOnlyFateStore.TStatus.SUBMITTED) + .putStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS).tryMutate(); + assertEquals(ACCEPTED, status); + + // one more time check that we can use require same with the current status (IN_PROGRESS) + status = new FateMutatorImpl<>(context, table, tid) + .requireStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS) + .putStatus(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS).tryMutate(); + assertEquals(ACCEPTED, status); + + } + + } + + void logAllEntriesInTable(String tableName, AccumuloClient client) throws Exception { + client.createScanner(tableName).forEach(e -> log.info(e.getKey() + " " + e.getValue())); + } +}