diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index 2e0214782a1..5be742d2fd3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -19,6 +19,8 @@ package org.apache.accumulo.core.fate; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.data.AbstractId; import org.apache.accumulo.core.manager.thrift.TFateId; @@ -34,6 +36,9 @@ public class FateId extends AbstractId { private static final long serialVersionUID = 1L; private static final String PREFIX = "FATE:"; private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$"); + private static final Pattern FATEID_PATTERN = Pattern.compile("^" + PREFIX + "(" + + Stream.of(FateInstanceType.values()).map(Enum::name).collect(Collectors.joining("|")) + + "):[0-9a-fA-F]+$"); private FateId(String canonical) { super(canonical); @@ -86,6 +91,30 @@ public static FateId from(FateInstanceType type, String hexTid) { } } + /** + * @param fateIdStr the string representation of the FateId + * @return a new FateId object from the given string + */ + public static FateId from(String fateIdStr) { + if (FATEID_PATTERN.matcher(fateIdStr).matches()) { + return new FateId(fateIdStr); + } else { + throw new IllegalArgumentException("Invalid Fate ID: " + fateIdStr); + } + } + + /** + * @param fateIdStr the string representation of the FateId + * @return true if the string is a valid FateId, false otherwise + */ + public static boolean isFormattedTid(String fateIdStr) { + return FATEID_PATTERN.matcher(fateIdStr).matches(); + } + + /** + * @param tFateId the TFateId + * @return the FateId equivalent of the given TFateId + */ public static FateId fromThrift(TFateId tFateId) { FateInstanceType type; long tid = tFateId.getTid(); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index f86248dab18..ce8f32ceb92 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.gc.GcCandidate; import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.lock.ServiceLock; @@ -371,7 +372,7 @@ interface TabletUpdates { T putTime(MetadataTime time); - T putBulkFile(ReferencedTabletFile bulkref, long tid); + T putBulkFile(ReferencedTabletFile bulkref, FateId fateId); T deleteBulkFile(StoredTabletFile bulkref); @@ -383,9 +384,9 @@ interface TabletUpdates { T deleteExternalCompaction(ExternalCompactionId ecid); - T putCompacted(long fateTxid); + T putCompacted(FateId fateId); - T deleteCompacted(long fateTxid); + T deleteCompacted(FateId fateId); T putTabletAvailability(TabletAvailability tabletAvailability); @@ -659,9 +660,9 @@ default void deleteScanServerFileReferences(String serverAddress, UUID serverSes * Create a Bulk Load In Progress flag in the metadata table * * @param path The bulk directory filepath - * @param fateTxid The id of the Bulk Import Fate operation. + * @param fateId The FateId of the Bulk Import Fate operation. */ - default void addBulkLoadInProgressFlag(String path, long fateTxid) { + default void addBulkLoadInProgressFlag(String path, FateId fateId) { throw new UnsupportedOperationException(); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/CompactionMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/CompactionMetadata.java index c790f3698b5..e3cd1f3347c 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/CompactionMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/CompactionMetadata.java @@ -26,6 +26,7 @@ import java.util.Objects; import java.util.Set; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.spi.compaction.CompactionKind; @@ -41,11 +42,11 @@ public class CompactionMetadata { private final short priority; private final CompactorGroupId cgid; private final boolean propagateDeletes; - private final Long fateTxId; + private final FateId fateId; public CompactionMetadata(Set jobFiles, ReferencedTabletFile compactTmpName, String compactorId, CompactionKind kind, short priority, CompactorGroupId ceid, - boolean propagateDeletes, Long fateTxId) { + boolean propagateDeletes, FateId fateId) { this.jobFiles = Objects.requireNonNull(jobFiles); this.compactTmpName = Objects.requireNonNull(compactTmpName); this.compactorId = Objects.requireNonNull(compactorId); @@ -53,7 +54,7 @@ public CompactionMetadata(Set jobFiles, ReferencedTabletFile c this.priority = priority; this.cgid = Objects.requireNonNull(ceid); this.propagateDeletes = propagateDeletes; - this.fateTxId = fateTxId; + this.fateId = fateId; } public Set getJobFiles() { @@ -84,8 +85,8 @@ public boolean getPropagateDeletes() { return propagateDeletes; } - public Long getFateTxId() { - return fateTxId; + public FateId getFateId() { + return fateId; } // This class is used to serialize and deserialize this class using GSon. Any changes to this @@ -98,7 +99,7 @@ private static class GSonData { String groupId; short priority; boolean propDels; - Long fateTxId; + String fateId; } public String toJson() { @@ -111,7 +112,7 @@ public String toJson() { jData.groupId = cgid.toString(); jData.priority = priority; jData.propDels = propagateDeletes; - jData.fateTxId = fateTxId; + jData.fateId = fateId.canonical(); return GSON.get().toJson(jData); } @@ -121,7 +122,7 @@ public static CompactionMetadata fromJson(String json) { return new CompactionMetadata(jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()), StoredTabletFile.of(jData.tmp).getTabletFile(), jData.compactor, CompactionKind.valueOf(jData.kind), jData.priority, - CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, jData.fateTxId); + CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, FateId.from(jData.fateId)); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 6f68fc142ac..d024fbfd59f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -30,7 +30,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.schema.Section; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.Pair; @@ -334,18 +334,13 @@ public static class BulkFileColumnFamily { public static final String STR_NAME = "loaded"; public static final Text NAME = new Text(STR_NAME); - public static long getBulkLoadTid(Value v) { + public static FateId getBulkLoadTid(Value v) { return getBulkLoadTid(v.toString()); } - public static long getBulkLoadTid(String vs) { - if (FateTxId.isFormatedTid(vs)) { - return FateTxId.fromString(vs); - } else { - // a new serialization format was introduce in 2.0. This code support deserializing the - // old format. - return Long.parseLong(vs); - } + public static FateId getBulkLoadTid(String vs) { + // ELASTICITY_TODO issue 4044 - May need to introduce code in upgrade to handle old format. + return FateId.from(vs); } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java index fb50ef10631..f2dd1ad8615 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/SelectedFiles.java @@ -26,7 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.StoredTabletFile; import com.google.common.base.Preconditions; @@ -43,18 +43,18 @@ public class SelectedFiles { private final Set files; private final boolean initiallySelectedAll; - private final long fateTxId; + private final FateId fateId; private String metadataValue; private static final Gson GSON = new GsonBuilder() .registerTypeAdapter(SelectedFiles.class, new SelectedFilesTypeAdapter()).create(); - public SelectedFiles(Set files, boolean initiallySelectedAll, long fateTxId) { + public SelectedFiles(Set files, boolean initiallySelectedAll, FateId fateId) { Preconditions.checkArgument(files != null && !files.isEmpty()); this.files = Set.copyOf(files); this.initiallySelectedAll = initiallySelectedAll; - this.fateTxId = fateTxId; + this.fateId = fateId; } private static class SelectedFilesTypeAdapter extends TypeAdapter { @@ -62,7 +62,7 @@ private static class SelectedFilesTypeAdapter extends TypeAdapter @Override public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOException { out.beginObject(); - out.name("txid").value(FateTxId.formatTid(selectedFiles.getFateTxId())); + out.name("fateId").value(selectedFiles.getFateId().canonical()); out.name("selAll").value(selectedFiles.initiallySelectedAll()); out.name("files").beginArray(); // sort the data to make serialized json comparable @@ -81,7 +81,7 @@ public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOExceptio @Override public SelectedFiles read(JsonReader in) throws IOException { - long fateTxId = 0L; + FateId fateId = null; boolean selAll = false; List files = new ArrayList<>(); @@ -89,8 +89,8 @@ public SelectedFiles read(JsonReader in) throws IOException { while (in.hasNext()) { String name = in.nextName(); switch (name) { - case "txid": - fateTxId = FateTxId.fromString(in.nextString()); + case "fateId": + fateId = FateId.from(in.nextString()); break; case "selAll": selAll = in.nextBoolean(); @@ -111,7 +111,7 @@ public SelectedFiles read(JsonReader in) throws IOException { Set tabletFiles = files.stream().map(StoredTabletFile::new).collect(Collectors.toSet()); - return new SelectedFiles(tabletFiles, selAll, fateTxId); + return new SelectedFiles(tabletFiles, selAll, fateId); } } @@ -128,8 +128,8 @@ public boolean initiallySelectedAll() { return initiallySelectedAll; } - public long getFateTxId() { - return fateTxId; + public FateId getFateId() { + return fateId; } public String getMetadataValue() { @@ -149,13 +149,13 @@ public boolean equals(Object obj) { return false; } SelectedFiles other = (SelectedFiles) obj; - return fateTxId == other.fateTxId && files.equals(other.files) + return fateId.equals(other.fateId) && files.equals(other.files) && initiallySelectedAll == other.initiallySelectedAll; } @Override public int hashCode() { - return Objects.hash(fateTxId, files, initiallySelectedAll); + return Objects.hash(fateId, files, initiallySelectedAll); } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 460a52e22b3..5b426952a48 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -49,7 +49,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; @@ -97,7 +97,7 @@ public class TabletMetadata { private Location location; private Map files; private List scans; - private Map loadedFiles; + private Map loadedFiles; private SelectedFiles selectedFiles; private EnumSet fetchedCols; private KeyExtent extent; @@ -115,7 +115,7 @@ public class TabletMetadata { private boolean onDemandHostingRequested = false; private TabletOperationId operationId; private boolean futureAndCurrentLocationSet = false; - private Set compacted; + private Set compacted; public static TabletMetadataBuilder builder(KeyExtent extent) { return new TabletMetadataBuilder(extent); @@ -285,7 +285,7 @@ public boolean hasCurrent() { return location != null && location.getType() == LocationType.CURRENT; } - public Map getLoaded() { + public Map getLoaded() { ensureFetched(ColumnType.LOADED); return loadedFiles; } @@ -390,7 +390,7 @@ public Map getExternalCompactions() { return extCompactions; } - public Set getCompacted() { + public Set getCompacted() { ensureFetched(ColumnType.COMPACTED); return compacted; } @@ -421,8 +421,8 @@ public static > TabletMetadata convertRow(Iterator final var scansBuilder = ImmutableList.builder(); final var logsBuilder = ImmutableList.builder(); final var extCompBuilder = ImmutableMap.builder(); - final var loadedFilesBuilder = ImmutableMap.builder(); - final var compactedBuilder = ImmutableSet.builder(); + final var loadedFilesBuilder = ImmutableMap.builder(); + final var compactedBuilder = ImmutableSet.builder(); ByteSequence row = null; while (rowIter.hasNext()) { @@ -519,7 +519,7 @@ public static > TabletMetadata convertRow(Iterator te.merged = true; break; case CompactedColumnFamily.STR_NAME: - compactedBuilder.add(FateTxId.fromString(qual)); + compactedBuilder.add(FateId.from(qual)); break; default: throw new IllegalStateException("Unexpected family " + fam); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index e3568a9ef06..75adaabe518 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -165,9 +166,9 @@ public TabletMetadataBuilder putTime(MetadataTime time) { } @Override - public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, long tid) { + public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, FateId fateId) { fetched.add(LOADED); - internalBuilder.putBulkFile(bulkref, tid); + internalBuilder.putBulkFile(bulkref, fateId); return this; } @@ -202,14 +203,14 @@ public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid) } @Override - public TabletMetadataBuilder putCompacted(long fateTxId) { + public TabletMetadataBuilder putCompacted(FateId fateId) { fetched.add(COMPACTED); - internalBuilder.putCompacted(fateTxId); + internalBuilder.putCompacted(fateId); return this; } @Override - public TabletMetadataBuilder deleteCompacted(long fateTxId) { + public TabletMetadataBuilder deleteCompacted(FateId fateId) { throw new UnsupportedOperationException(); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index adcd4537e35..49c88570da7 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -28,7 +28,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -191,10 +191,10 @@ public T deleteWal(LogEntry logEntry) { } @Override - public T putBulkFile(ReferencedTabletFile bulkref, long tid) { + public T putBulkFile(ReferencedTabletFile bulkref, FateId fateId) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); mutation.put(BulkFileColumnFamily.NAME, bulkref.insert().getMetadataText(), - new Value(FateTxId.formatTid(tid))); + new Value(fateId.canonical())); return getThis(); } @@ -252,14 +252,14 @@ public T deleteExternalCompaction(ExternalCompactionId ecid) { } @Override - public T putCompacted(long fateTxId) { - mutation.put(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(fateTxId), ""); + public T putCompacted(FateId fateId) { + mutation.put(CompactedColumnFamily.STR_NAME, fateId.canonical(), ""); return getThis(); } @Override - public T deleteCompacted(long fateTxId) { - mutation.putDelete(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(fateTxId)); + public T deleteCompacted(FateId fateId) { + mutation.putDelete(CompactedColumnFamily.STR_NAME, fateId.canonical()); return getThis(); } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java index 86483f2c913..a0a02f3d1c4 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/SelectedFilesTest.java @@ -32,7 +32,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.hadoop.fs.Path; @@ -52,8 +53,9 @@ public class SelectedFilesTest { @Test public void testSerializationDeserialization() { Set files = getStoredTabletFiles(2); + FateId fateId = FateId.from(FateInstanceType.META, 12345L); - SelectedFiles original = new SelectedFiles(files, true, 12345L); + SelectedFiles original = new SelectedFiles(files, true, fateId); String json = original.getMetadataValue(); SelectedFiles deserialized = SelectedFiles.from(json); @@ -68,9 +70,10 @@ public void testSerializationDeserialization() { @Test public void testEqualSerialization() { Set files = getStoredTabletFiles(16); + FateId fateId = FateId.from(FateInstanceType.META, 12345L); - SelectedFiles sf1 = new SelectedFiles(files, true, 12345L); - SelectedFiles sf2 = new SelectedFiles(files, true, 12345L); + SelectedFiles sf1 = new SelectedFiles(files, true, fateId); + SelectedFiles sf2 = new SelectedFiles(files, true, fateId); assertEquals(sf1.getMetadataValue(), sf2.getMetadataValue()); assertEquals(sf1, sf2); @@ -84,13 +87,14 @@ public void testEqualSerialization() { public void testDifferentFilesOrdering() { Set files = getStoredTabletFiles(16); SortedSet sortedFiles = new TreeSet<>(files); + FateId fateId = FateId.from(FateInstanceType.META, 654123L); assertEquals(files, sortedFiles, "Entries in test file sets should be the same"); assertNotEquals(files.toString(), sortedFiles.toString(), "Order of files set should differ for this test case"); - SelectedFiles sf1 = new SelectedFiles(files, false, 654123L); - SelectedFiles sf2 = new SelectedFiles(sortedFiles, false, 654123L); + SelectedFiles sf1 = new SelectedFiles(files, false, fateId); + SelectedFiles sf2 = new SelectedFiles(sortedFiles, false, fateId); assertEquals(sf1.getMetadataValue(), sf2.getMetadataValue()); assertEquals(sf1, sf2); @@ -104,11 +108,12 @@ public void testDifferentFilesOrdering() { public void testJsonSuperSetSubset() { Set filesSuperSet = getStoredTabletFiles(3); Set filesSubSet = new HashSet<>(filesSuperSet); + FateId fateId = FateId.from(FateInstanceType.META, 123456L); // Remove an element to create a subset filesSubSet.remove(filesSubSet.iterator().next()); - SelectedFiles superSetSelectedFiles = new SelectedFiles(filesSuperSet, true, 123456L); - SelectedFiles subSetSelectedFiles = new SelectedFiles(filesSubSet, true, 123456L); + SelectedFiles superSetSelectedFiles = new SelectedFiles(filesSuperSet, true, fateId); + SelectedFiles subSetSelectedFiles = new SelectedFiles(filesSubSet, true, fateId); String superSetJson = superSetSelectedFiles.getMetadataValue(); String subSetJson = subSetSelectedFiles.getMetadataValue(); @@ -128,9 +133,9 @@ public void testJsonSuperSetSubset() { } private static Stream provideTestJsons() { - return Stream.of(Arguments.of("123456", true, 12), Arguments.of("123456", false, 12), - Arguments.of("123456", false, 23), Arguments.of("654321", false, 23), - Arguments.of("AE56E", false, 23)); + return Stream.of(Arguments.of("FATE:META:123456", true, 12), + Arguments.of("FATE:META:123456", false, 12), Arguments.of("FATE:META:123456", false, 23), + Arguments.of("FATE:META:654321", false, 23), Arguments.of("FATE:META:AE56E", false, 23)); } /** @@ -139,14 +144,14 @@ private static Stream provideTestJsons() { */ @ParameterizedTest @MethodSource("provideTestJsons") - public void testJsonStrings(String txid, boolean selAll, int numPaths) { + public void testJsonStrings(FateId fateId, boolean selAll, int numPaths) { List paths = getFilePaths(numPaths); // should be resilient to unordered file arrays Collections.shuffle(paths, RANDOM.get()); // construct a json from the given parameters - String json = getJson(txid, selAll, paths); + String json = getJson(fateId, selAll, paths); System.out.println(json); @@ -154,13 +159,13 @@ public void testJsonStrings(String txid, boolean selAll, int numPaths) { SelectedFiles selectedFiles = SelectedFiles.from(json); // ensure all parts of the SelectedFiles object are correct - assertEquals(Long.parseLong(txid, 16), selectedFiles.getFateTxId()); + assertEquals(fateId, selectedFiles.getFateId()); assertEquals(selAll, selectedFiles.initiallySelectedAll()); Set expectedStoredTabletFiles = filePathsToStoredTabletFiles(paths); assertEquals(expectedStoredTabletFiles, selectedFiles.getFiles()); Collections.sort(paths); - String jsonWithSortedFiles = getJson(txid, selAll, paths); + String jsonWithSortedFiles = getJson(fateId, selAll, paths); assertEquals(jsonWithSortedFiles, selectedFiles.getMetadataValue()); } @@ -170,19 +175,19 @@ public void testJsonStrings(String txid, boolean selAll, int numPaths) { * *
    * {
-   *   "txid": "FATE[123456]",
+   *   "fateId": "FATE:META:123456",
    *   "selAll": true,
    *   "files": ["/path/to/file1.rf", "/path/to/file2.rf"]
    * }
    * 
*/ - private static String getJson(String txid, boolean selAll, List paths) { + private static String getJson(FateId fateId, boolean selAll, List paths) { String filesJsonArray = paths.stream().map(path -> new ReferencedTabletFile(new Path(path)).insert().getMetadata()) .map(path -> path.replace("\"", "\\\"")).map(path -> "'" + path + "'") .collect(Collectors.joining(",")); - return ("{'txid':'" + FateTxId.formatTid(Long.parseLong(txid, 16)) + "','selAll':" + selAll - + ",'files':[" + filesJsonArray + "]}").replace('\'', '\"'); + return ("{'fateId':'" + fateId + "','selAll':" + selAll + ",'files':[" + filesJsonArray + "]}") + .replace('\'', '\"'); } /** diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index a45078edb8d..e3cfc83fa21 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.metadata.schema; import static java.util.stream.Collectors.toSet; -import static org.apache.accumulo.core.fate.FateTxId.formatTid; import static org.apache.accumulo.core.metadata.StoredTabletFile.serialize; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.MergedColumnFamily.MERGED_VALUE; @@ -55,6 +54,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.SuspendingTServer; @@ -89,14 +90,18 @@ public void testAllColumns() { Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + FateInstanceType type = FateInstanceType.fromTableId(extent.tableId()); + FateId fateId56L = FateId.from(type, 56L); + FateId fateId59L = FateId.from(type, 59L); + DIRECTORY_COLUMN.put(mutation, new Value("t-0001757")); FLUSH_COLUMN.put(mutation, new Value("6")); TIME_COLUMN.put(mutation, new Value("M123456789")); String bf1 = serialize("hdfs://nn1/acc/tables/1/t-0001/bf1"); String bf2 = serialize("hdfs://nn1/acc/tables/1/t-0001/bf2"); - mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1).put(formatTid(56)); - mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2).put(formatTid(59)); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1).put(fateId56L.canonical()); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2).put(fateId59L.canonical()); mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK"); @@ -136,7 +141,7 @@ public void testAllColumns() { assertEquals(Map.of(tf1, dfv1, tf2, dfv2), tm.getFilesMap()); assertEquals(6L, tm.getFlushId().getAsLong()); assertEquals(rowMap, tm.getKeyValues()); - assertEquals(Map.of(new StoredTabletFile(bf1), 56L, new StoredTabletFile(bf2), 59L), + assertEquals(Map.of(new StoredTabletFile(bf1), fateId56L, new StoredTabletFile(bf2), fateId59L), tm.getLoaded()); assertEquals(HostAndPort.fromParts("server1", 8555), tm.getLocation().getHostAndPort()); assertEquals("s001", tm.getLocation().getSession()); @@ -324,6 +329,8 @@ public void testBuilder() { KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + FateInstanceType type = FateInstanceType.fromTableId(extent.tableId()); + StoredTabletFile sf1 = new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert(); DataFileValue dfv1 = new DataFileValue(89, 67); @@ -344,20 +351,22 @@ public void testBuilder() { TabletMetadata tm = TabletMetadata.builder(extent) .putTabletAvailability(TabletAvailability.UNHOSTED).putLocation(Location.future(ser1)) - .putFile(sf1, dfv1).putFile(sf2, dfv2).putBulkFile(rf1, 25).putBulkFile(rf2, 35) - .putFlushId(27).putDirName("dir1").putScan(sf3).putScan(sf4).putCompacted(17) - .putCompacted(23).build(ECOMP, HOSTING_REQUESTED, MERGED); + .putFile(sf1, dfv1).putFile(sf2, dfv2).putBulkFile(rf1, FateId.from(type, 25)) + .putBulkFile(rf2, FateId.from(type, 35)).putFlushId(27).putDirName("dir1").putScan(sf3) + .putScan(sf4).putCompacted(FateId.from(type, 17)).putCompacted(FateId.from(type, 23)) + .build(ECOMP, HOSTING_REQUESTED, MERGED); assertEquals(extent, tm.getExtent()); assertEquals(TabletAvailability.UNHOSTED, tm.getTabletAvailability()); assertEquals(Location.future(ser1), tm.getLocation()); assertEquals(27L, tm.getFlushId().orElse(-1)); assertEquals(Map.of(sf1, dfv1, sf2, dfv2), tm.getFilesMap()); - assertEquals(Map.of(rf1.insert(), 25L, rf2.insert(), 35L), tm.getLoaded()); + assertEquals(Map.of(rf1.insert(), FateId.from(type, 25L), rf2.insert(), FateId.from(type, 35L)), + tm.getLoaded()); assertEquals("dir1", tm.getDirName()); assertEquals(Set.of(sf3, sf4), Set.copyOf(tm.getScans())); assertEquals(Set.of(), tm.getExternalCompactions().keySet()); - assertEquals(Set.of(17L, 23L), tm.getCompacted()); + assertEquals(Set.of(FateId.from(type, 17L), FateId.from(type, 23L)), tm.getCompacted()); assertFalse(tm.getHostingRequested()); assertFalse(tm.hasMerged()); assertThrows(IllegalStateException.class, tm::getOperationId); @@ -384,13 +393,14 @@ public void testBuilder() { assertThrows(IllegalStateException.class, tm2::getCompacted); var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); - CompactionMetadata ecm = new CompactionMetadata(Set.of(sf1, sf2), rf1, "cid1", - CompactionKind.USER, (short) 3, CompactorGroupIdImpl.groupId("Q1"), true, 99L); + CompactionMetadata ecm = + new CompactionMetadata(Set.of(sf1, sf2), rf1, "cid1", CompactionKind.USER, (short) 3, + CompactorGroupIdImpl.groupId("Q1"), true, FateId.from(type, 99L)); LogEntry le1 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); LogEntry le2 = LogEntry.fromPath("localhost+8020/" + UUID.randomUUID()); - SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, 159L); + SelectedFiles selFiles = new SelectedFiles(Set.of(sf1, sf4), false, FateId.from(type, 159L)); TabletMetadata tm3 = TabletMetadata.builder(extent).putExternalCompaction(ecid1, ecm) .putSuspension(ser1, 45L).putTime(new MetadataTime(479, TimeType.LOGICAL)).putWal(le1) @@ -405,7 +415,7 @@ public void testBuilder() { assertEquals(Stream.of(le1, le2).map(LogEntry::toString).collect(toSet()), tm3.getLogs().stream().map(LogEntry::toString).collect(toSet())); assertEquals(Set.of(sf1, sf4), tm3.getSelectedFiles().getFiles()); - assertEquals(159L, tm3.getSelectedFiles().getFateTxId()); + assertEquals(FateId.from(type, 159L), tm3.getSelectedFiles().getFateId()); assertFalse(tm3.getSelectedFiles().initiallySelectedAll()); assertEquals(selFiles.getMetadataValue(), tm3.getSelectedFiles().getMetadataValue()); assertTrue(tm3.hasMerged()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java index 7821b163b37..da0233a2151 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionConfigStorage.java @@ -33,16 +33,20 @@ import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.UserCompactionUtils; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; -import org.apache.accumulo.core.util.FastFormat; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; +import com.google.common.base.Preconditions; + public class CompactionConfigStorage { + static final String DELIMITER = "-"; - private static String createPath(ServerContext context, long fateTxId) { - String txidString = FastFormat.toHexString(fateTxId); - return context.getZooKeeperRoot() + Constants.ZCOMPACTIONS + "/" + txidString; + private static String createPath(ServerContext context, FateId fateId) { + return context.getZooKeeperRoot() + Constants.ZCOMPACTIONS + "/" + fateId.getType() + DELIMITER + + fateId.getHexTid(); } public static byte[] encodeConfig(CompactionConfig config, TableId tableId) { @@ -57,15 +61,15 @@ public static byte[] encodeConfig(CompactionConfig config, TableId tableId) { } } - public static CompactionConfig getConfig(ServerContext context, long fateTxId) + public static CompactionConfig getConfig(ServerContext context, FateId fateId) throws InterruptedException, KeeperException { - return getConfig(context, fateTxId, tableId -> true); + return getConfig(context, fateId, tableId -> true); } - public static CompactionConfig getConfig(ServerContext context, long fateTxId, + public static CompactionConfig getConfig(ServerContext context, FateId fateId, Predicate tableIdPredicate) throws InterruptedException, KeeperException { try { - byte[] data = context.getZooReaderWriter().getData(createPath(context, fateTxId)); + byte[] data = context.getZooReaderWriter().getData(createPath(context, fateId)); try (ByteArrayInputStream bais = new ByteArrayInputStream(data); DataInputStream dis = new DataInputStream(bais)) { var tableId = TableId.of(dis.readUTF()); @@ -83,29 +87,31 @@ public static CompactionConfig getConfig(ServerContext context, long fateTxId, } } - public static void setConfig(ServerContext context, long fateTxId, byte[] encConfig) + public static void setConfig(ServerContext context, FateId fateId, byte[] encConfig) throws InterruptedException, KeeperException { - context.getZooReaderWriter().putPrivatePersistentData(createPath(context, fateTxId), encConfig, + context.getZooReaderWriter().putPrivatePersistentData(createPath(context, fateId), encConfig, ZooUtil.NodeExistsPolicy.SKIP); } - public static void deleteConfig(ServerContext context, long fateTxId) + public static void deleteConfig(ServerContext context, FateId fateId) throws InterruptedException, KeeperException { - context.getZooReaderWriter().delete(createPath(context, fateTxId)); + context.getZooReaderWriter().delete(createPath(context, fateId)); } - public static Map getAllConfig(ServerContext context, + public static Map getAllConfig(ServerContext context, Predicate tableIdPredicate) throws InterruptedException, KeeperException { - Map configs = new HashMap<>(); + Map configs = new HashMap<>(); var children = context.getZooReaderWriter() .getChildren(context.getZooKeeperRoot() + Constants.ZCOMPACTIONS); for (var child : children) { - var fateTxid = Long.parseLong(child, 16); - var cconf = getConfig(context, fateTxid, tableIdPredicate); + String[] fields = child.split(DELIMITER); + Preconditions.checkState(fields.length == 2, "Unexpected child %s", child); + FateId fateId = FateId.from(FateInstanceType.valueOf(fields[0]), fields[1]); + var cconf = getConfig(context, fateId, tableIdPredicate); if (cconf != null) { - configs.put(fateTxid, cconf); + configs.put(fateId, cconf); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index adc013fd0a4..02e3dc2fca1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.common.ServiceEnvironment; @@ -63,11 +64,11 @@ public class CompactionJobGenerator { private final Cache dispatchers; private final Set serviceIds; private final PluginEnvironment env; - private final Map> allExecutionHints; + private final Map> allExecutionHints; private final Cache,Long> unknownCompactionServiceErrorCache; public CompactionJobGenerator(PluginEnvironment env, - Map> executionHints) { + Map> executionHints) { servicesConfig = new CompactionServicesConfig(env.getConfiguration()); serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) .collect(Collectors.toUnmodifiableSet()); @@ -106,7 +107,7 @@ public Collection generateJobs(TabletMetadata tablet, Set userJobs = Set.of(); if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != null) { - var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateTxId()); + var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateId()); if (hints != null) { CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, hints); userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, hints); diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 4ecba98a0c2..c8d6f9dba67 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -32,7 +32,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.constraints.Constraint; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; @@ -267,7 +267,7 @@ public List check(Environment env, Mutation mutation) { violations = addViolation(violations, 11); } } else if (CompactedColumnFamily.NAME.equals(columnFamily)) { - if (!FateTxId.isFormatedTid(columnQualifier.toString())) { + if (!FateId.isFormattedTid(columnQualifier.toString())) { violations = addViolation(violations, 13); } } else if (columnFamily.equals(BulkFileColumnFamily.NAME)) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java index 6dbcee6d035..197b9feca6f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@ -34,10 +34,12 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.accumulo.core.data.AbstractId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; @@ -69,7 +71,7 @@ public class TabletManagementParameters { private final Supplier> resourceGroups; private final Map> tserverGroups; - private final Map> compactionHints; + private final Map> compactionHints; private final Set onlineTservers; private final boolean canSuspendTablets; private final Map volumeReplacements; @@ -78,7 +80,7 @@ public TabletManagementParameters(ManagerState managerState, Map parentUpgradeMap, Set onlineTables, LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot, Set serversToShutdown, Map migrations, - Ample.DataLevel level, Map> compactionHints, + Ample.DataLevel level, Map> compactionHints, boolean canSuspendTablets, Map volumeReplacements) { this.managerState = managerState; this.parentUpgradeMap = Map.copyOf(parentUpgradeMap); @@ -115,7 +117,8 @@ private TabletManagementParameters(JsonData jdata) { .collect(toUnmodifiableMap(entry -> JsonData.strToExtent(entry.getKey()), entry -> new TServerInstance(entry.getValue()))); this.level = jdata.level; - this.compactionHints = makeImmutable(jdata.compactionHints); + this.compactionHints = makeImmutable(jdata.compactionHints.entrySet().stream() + .collect(Collectors.toMap(entry -> FateId.from(entry.getKey()), Map.Entry::getValue))); this.tserverGroups = jdata.tserverGroups.entrySet().stream().collect(toUnmodifiableMap( Map.Entry::getKey, entry -> entry.getValue().stream().map(TServerInstance::new).collect(toUnmodifiableSet()))); @@ -173,7 +176,7 @@ public Set getOnlineTables() { return onlineTables; } - public Map> getCompactionHints() { + public Map> getCompactionHints() { return compactionHints; } @@ -185,9 +188,9 @@ public Map getVolumeReplacements() { return volumeReplacements; } - private static Map> - makeImmutable(Map> compactionHints) { - var copy = new HashMap>(); + private static Map> + makeImmutable(Map> compactionHints) { + var copy = new HashMap>(); compactionHints.forEach((ftxid, hints) -> copy.put(ftxid, Map.copyOf(hints))); return Collections.unmodifiableMap(copy); } @@ -205,7 +208,7 @@ private static class JsonData { final Map> tserverGroups; - final Map> compactionHints; + final Map> compactionHints; final boolean canSuspendTablets; final Map volumeReplacements; @@ -248,7 +251,8 @@ private static KeyExtent strToExtent(String kes) { tserverGroups = params.getGroupedTServers().entrySet().stream() .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().stream() .map(TServerInstance::getHostPortSession).collect(toSet()))); - compactionHints = params.compactionHints; + compactionHints = params.compactionHints.entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().canonical(), Map.Entry::getValue)); canSuspendTablets = params.canSuspendTablets; volumeReplacements = params.volumeReplacements; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index a96a2de58ad..bd35c007133 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Condition; import org.apache.accumulo.core.data.ConditionalMutation; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.iterators.SortedFilesIterator; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; @@ -212,7 +211,7 @@ private void requireSameSingle(TabletMetadata tabletMetadata, ColumnType type) { break; case COMPACTED: { Condition c = SetEqualityIterator.createCondition(tabletMetadata.getCompacted(), - ftid -> FateTxId.formatTid(ftid).getBytes(UTF_8), CompactedColumnFamily.NAME); + fTid -> fTid.canonical().getBytes(UTF_8), CompactedColumnFamily.NAME); mutation.addCondition(c); } break; diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java index 9fe2d7214db..12419be290d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java @@ -42,7 +42,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.gc.GcCandidate; import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -160,12 +160,12 @@ public void putGcFileAndDirCandidates(TableId tableId, Collection } @Override - public void addBulkLoadInProgressFlag(String path, long fateTxid) { + public void addBulkLoadInProgressFlag(String path, FateId fateId) { // Bulk Import operations are not supported on the metadata table, so no entries will ever be // required on the root table. Mutation m = new Mutation(BlipSection.getRowPrefix() + path); - m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(FateTxId.formatTid(fateTxid))); + m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(fateId.canonical())); try (BatchWriter bw = context.createBatchWriter(AccumuloTable.METADATA.tableName())) { bw.addMutation(m); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index eb3f39db54c..f87805d092c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -59,6 +59,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.gc.ReferenceFile; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -135,16 +136,16 @@ private static void logUpdateFailure(Mutation m, KeyExtent extent, Exception e) log.error("Failed to write metadata updates for extent {} {}", extent, m.prettyPrint(), e); } - public static Map updateTabletDataFile(long tid, KeyExtent extent, - Map estSizes, MetadataTime time, ServerContext context, - ServiceLock zooLock) { + public static Map updateTabletDataFile(FateId fateId, + KeyExtent extent, Map estSizes, MetadataTime time, + ServerContext context, ServiceLock zooLock) { TabletMutator tablet = context.getAmple().mutateTablet(extent); tablet.putTime(time); Map newFiles = new HashMap<>(estSizes.size()); estSizes.forEach((tf, dfv) -> { tablet.putFile(tf, dfv); - tablet.putBulkFile(tf, tid); + tablet.putBulkFile(tf, fateId); newFiles.put(tf.insert(), dfv); }); tablet.putZooLock(context.getZooKeeperRoot(), zooLock); diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index 78f98be69e2..a738e2fc3cd 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -31,7 +31,8 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -140,13 +141,15 @@ public void testBulkFileCheck() { MetadataConstraints mc = new MetadataConstraints(); Mutation m; List violations; + FateId fateId5L = FateId.from(FateInstanceType.META, 5L); + FateId fateId7L = FateId.from(FateInstanceType.META, 7L); // loaded marker w/ file m = new Mutation(new Text("0;foo")); m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); m.put( DataFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), @@ -159,7 +162,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 8); // two files w/ same txid @@ -167,7 +170,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); m.put( DataFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), @@ -175,7 +178,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile2")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); m.put( DataFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile2")).getMetadataText(), @@ -188,7 +191,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); m.put( DataFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), @@ -196,7 +199,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile2")).getMetadataText(), - new Value("7")); + new Value(fateId7L.canonical())); m.put( DataFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile2")).getMetadataText(), @@ -208,7 +211,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); m.put( DataFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), @@ -216,7 +219,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile2")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 8); // mutation that looks like split @@ -224,7 +227,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value("/t1")); violations = mc.check(createEnv(), m); assertNull(violations); @@ -234,7 +237,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); m.put(CurrentLocationColumnFamily.NAME, new Text("789"), new Value("127.0.0.1:9997")); violations = mc.check(createEnv(), m); assertNull(violations); @@ -252,7 +255,7 @@ public void testBulkFileCheck() { new Text(StoredTabletFile.of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")) .getMetadata() .replace("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile", "/someFile")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Missing tables directory in path @@ -261,20 +264,20 @@ public void testBulkFileCheck() { new Text(StoredTabletFile.of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")) .getMetadata().replace("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile", "hdfs://1.2.3.4/accumulo/2a/t-0003/someFile")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); m = new Mutation(new Text("0;foo")); m.put( BulkFileColumnFamily.NAME, StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")).getMetadataText(), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 8); // Bad Json - only path (old format) so should fail parsing m = new Mutation(new Text("0;foo")); m.put(BulkFileColumnFamily.NAME, new Text("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile"), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Bad Json - test startRow key is missing so validation should fail @@ -283,7 +286,7 @@ public void testBulkFileCheck() { m.put(BulkFileColumnFamily.NAME, new Text( "{\"path\":\"hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile\",\"endRow\":\"\"}"), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Bad Json - test path key replaced with empty string so validation should fail @@ -292,7 +295,7 @@ public void testBulkFileCheck() { m.put( BulkFileColumnFamily.NAME, new Text(StoredTabletFile .serialize("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile").replace("path", "")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Bad Json - test path value missing @@ -301,7 +304,7 @@ BulkFileColumnFamily.NAME, new Text(StoredTabletFile m.put(BulkFileColumnFamily.NAME, new Text(StoredTabletFile.of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile")) .getMetadata().replaceFirst("\"path\":\".*\",\"startRow", "\"path\":\"\",\"startRow")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Bad Json - test startRow key replaced with empty string so validation should fail @@ -309,7 +312,7 @@ BulkFileColumnFamily.NAME, new Text(StoredTabletFile m = new Mutation(new Text("0;foo")); m.put(BulkFileColumnFamily.NAME, new Text(StoredTabletFile .serialize("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile").replace("startRow", "")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Bad Json - test endRow key missing so validation should fail @@ -317,7 +320,7 @@ BulkFileColumnFamily.NAME, new Text(StoredTabletFile m.put( BulkFileColumnFamily.NAME, new Text(StoredTabletFile .serialize("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile").replace("endRow", "")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); // Bad Json - endRow will be replaced with encoded row without the exclusive byte 0x00 which is @@ -328,7 +331,7 @@ BulkFileColumnFamily.NAME, new Text(StoredTabletFile .of(new Path("hdfs://1.2.3.4/accumulo/tables/2a/t-0003/someFile"), new Range("a", "b")) .getMetadata().replaceFirst("\"endRow\":\".*\"", "\"endRow\":\"" + encodeRowForMetadata("bad") + "\"")), - new Value("5")); + new Value(fateId5L.canonical())); assertViolation(mc, m, (short) 12); } @@ -469,6 +472,7 @@ public void testSelectedFiles() { MetadataConstraints mc = new MetadataConstraints(); Mutation m; List violations; + FateId fateId = FateId.from(FateInstanceType.META, 42L); m = new Mutation(new Text("0;foo")); ServerColumnFamily.SELECTED_COLUMN.put(m, new Value("bad id")); @@ -481,7 +485,7 @@ public void testSelectedFiles() { ServerColumnFamily.SELECTED_COLUMN.put(m, new Value(new SelectedFiles(Set.of(new ReferencedTabletFile( new Path("hdfs://nn.somewhere.com:86753/accumulo/tables/42/t-0000/F00001.rf")) - .insert()), true, 42L).getMetadataValue())); + .insert()), true, fateId).getMetadataValue())); violations = mc.check(createEnv(), m); assertNull(violations); } @@ -491,9 +495,10 @@ public void testCompacted() { MetadataConstraints mc = new MetadataConstraints(); Mutation m; List violations; + FateId fateId = FateId.from(FateInstanceType.META, 45L); m = new Mutation(new Text("0;foo")); - m.put(CompactedColumnFamily.STR_NAME, FateTxId.formatTid(45), ""); + m.put(CompactedColumnFamily.STR_NAME, fateId.canonical(), ""); violations = mc.check(createEnv(), m); assertNull(violations); diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java index 101157c44ba..ff6a2fc940b 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementParametersTest.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; @@ -50,7 +51,7 @@ public void testDeSer() { final Set serversToShutdown = Set.of(); final Map migrations = Map.of(); final Ample.DataLevel dataLevel = Ample.DataLevel.USER; - final Map> compactionHints = Map.of(); + final Map> compactionHints = Map.of(); final boolean canSuspendTablets = true; final Map replacements = Map.of(new Path("file:/vol1/accumulo/inst_id"), new Path("file:/vol2/accumulo/inst_id")); diff --git a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java index caa9c926f8b..49729c6d4b5 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/manager/state/TabletManagementTest.java @@ -35,7 +35,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; @@ -71,6 +72,10 @@ private SortedMap createMetadataEntryKV(KeyExtent extent) { Mutation mutation = TabletColumnFamily.createPrevRowMutation(extent); + FateInstanceType type = FateInstanceType.fromTableId(extent.tableId()); + FateId fateId56L = FateId.from(type, 56L); + FateId fateId59L = FateId.from(type, 59L); + DIRECTORY_COLUMN.put(mutation, new Value("t-0001757")); FLUSH_COLUMN.put(mutation, new Value("6")); TIME_COLUMN.put(mutation, new Value("M123456789")); @@ -80,9 +85,9 @@ private SortedMap createMetadataEntryKV(KeyExtent extent) { StoredTabletFile bf2 = new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/bf2")).insert(); mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf1.getMetadata()) - .put(FateTxId.formatTid(56)); + .put(fateId56L.canonical()); mutation.at().family(BulkFileColumnFamily.NAME).qualifier(bf2.getMetadata()) - .put(FateTxId.formatTid(59)); + .put(fateId59L.canonical()); mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK"); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 6a234824dbd..c626fcb24db 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -62,6 +62,8 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -211,7 +213,13 @@ protected void checkIfCanceled() { if (job.getKind() == TCompactionKind.USER) { - var cconf = CompactionConfigStorage.getConfig(getContext(), job.getFateTxId()); + // ELASTICITY_TODO DEFERRED - ISSUE 4044: TExternalCompactionJob.getFateTxId should be + // changed to + // TExternalCompactionJob.getFateId and return the FateId + FateInstanceType type = + FateInstanceType.fromTableId(KeyExtent.fromThrift(job.getExtent()).tableId()); + FateId fateId = FateId.from(type, job.getFateTxId()); + var cconf = CompactionConfigStorage.getConfig(getContext(), fateId); if (cconf == null) { LOG.info("Cancelling compaction {} for user compaction that no longer exists {} {}", diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 09b40386bb1..8b3d8378713 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -73,6 +73,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateCleaner; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ZooStore; @@ -246,8 +247,8 @@ public synchronized ManagerState getManagerState() { // retrieve information about compactions in that data level. Attempted this and a lot of // refactoring was needed to get that small bit of information to this method. Would be best to // address this after issue. May be best to attempt this after #3576. - public Map> getCompactionHints() { - Map allConfig = null; + public Map> getCompactionHints() { + Map allConfig = null; try { allConfig = CompactionConfigStorage.getAllConfig(getContext(), tableId -> true); } catch (InterruptedException | KeeperException e) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index ec6ee9a42d6..76af05395ca 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; @@ -161,7 +162,7 @@ public class CompactionCoordinator private final ScheduledThreadPoolExecutor schedExecutor; private final Cache completed; - private final LoadingCache compactionConfigCache; + private final LoadingCache compactionConfigCache; private final Cache tabletDirCache; private final DeadCompactionDetector deadCompactionDetector; @@ -183,8 +184,8 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); - CacheLoader loader = - txid -> CompactionConfigStorage.getConfig(ctx, txid); + CacheLoader loader = + fateId -> CompactionConfigStorage.getConfig(ctx, fateId); // Keep a small short lived cache of compaction config. Compaction config never changes, however // when a compaction is canceled it is deleted which is why there is a time limit. It does not @@ -455,7 +456,7 @@ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, ExternalCompactionId externalCompactionId) { boolean propDels; - Long fateTxId = null; + FateId fateId = null; switch (job.getKind()) { case SYSTEM: { @@ -468,7 +469,7 @@ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll() && tablet.getSelectedFiles().getFiles().equals(jobFiles); propDels = !compactingAll; - fateTxId = tablet.getSelectedFiles().getFateTxId(); + fateId = tablet.getSelectedFiles().getFateId(); } break; default: @@ -480,7 +481,7 @@ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, tablet, directoryCreator, externalCompactionId); return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), - job.getPriority(), job.getGroup(), propDels, fateTxId); + job.getPriority(), job.getGroup(), propDels, fateId); } @@ -565,15 +566,17 @@ protected TExternalCompactionJob createThriftJob(String externalCompactionId, dfv.getTime()); }).collect(Collectors.toList()); - long fateTxid = 0; + FateInstanceType type = FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId()); + FateId fateId = FateId.from(type, 0); if (metaJob.getJob().getKind() == CompactionKind.USER) { - fateTxid = metaJob.getTabletMetadata().getSelectedFiles().getFateTxId(); + fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId(); } + // ELASTICITY_TODO DEFERRED - ISSUE 4044 return new TExternalCompactionJob(externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides); + TCompactionKind.valueOf(ecm.getKind().name()), fateId.getTid(), overrides); } @Override @@ -598,7 +601,7 @@ private Optional getCompactionConfig(CompactionJobQueues.MetaJ if (metaJob.getJob().getKind() == CompactionKind.USER && metaJob.getTabletMetadata().getSelectedFiles() != null) { var cconf = - compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateTxId()); + compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId()); return Optional.ofNullable(cconf); } return Optional.empty(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index ca80bb8d2eb..477a0403305 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.ReferencedTabletFile; @@ -164,18 +163,18 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio // all files selected for the user compactions are finished, so the tablet is finish and // its compaction id needs to be updated. - long fateTxId = tablet.getSelectedFiles().getFateTxId(); + FateId fateId = tablet.getSelectedFiles().getFateId(); - Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId), + Preconditions.checkArgument(!tablet.getCompacted().contains(fateId), "Tablet %s unexpected has selected files and compacted columns for %s", - tablet.getExtent(), fateTxId); + tablet.getExtent(), fateId); // TODO set to trace - LOG.debug("All selected files compcated for {} setting compacted for {}", - tablet.getExtent(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + LOG.debug("All selected files compacted for {} setting compacted for {}", + tablet.getExtent(), tablet.getSelectedFiles().getFateId()); tabletMutator.deleteSelectedFiles(); - tabletMutator.putCompacted(fateTxId); + tabletMutator.putCompacted(fateId); } else { // not all of the selected files were finished, so need to add the new file to the @@ -200,7 +199,7 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio tabletMutator.putSelectedFiles( new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(), - tablet.getSelectedFiles().getFateTxId())); + tablet.getSelectedFiles().getFateId())); } } @@ -252,12 +251,11 @@ public static boolean canCommitCompaction(ExternalCompactionId ecid, return false; } - if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) { + if (!ecm.getFateId().equals(tabletMetadata.getSelectedFiles().getFateId())) { // maybe the compaction was cancled and another user compaction was started on the tablet. LOG.debug( "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}", - ecid, extent, FateTxId.formatTid(ecm.getFateTxId()), - FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); + ecid, extent, ecm.getFateId(), tabletMetadata.getSelectedFiles().getFateId()); } if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index eb78e48ce83..33cf61a0cc3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@ -93,9 +93,8 @@ public Repo call(FateId fateId, Manager manager) throws Exception { */ private void moveFiles(FateId fateId, Path sourceDir, Path bulkDir, Manager manager, final VolumeManager fs, Map renames) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 manager.getContext().getAmple().addBulkLoadInProgressFlag( - "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), fateId.getTid()); + "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), fateId); AccumuloConfiguration aConf = manager.getConfiguration(); int workerCount = aConf.getCount(Property.MANAGER_RENAME_THREADS); Map oldToNewMap = new HashMap<>(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java index 88ca476ee30..c1849d25dd5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java @@ -108,13 +108,12 @@ private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId f var tabletsMutator = ample.conditionallyMutateTablets()) { for (var tablet : tablets) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - if (tablet.getLoaded().values().stream().anyMatch(l -> l == fateId.getTid())) { + if (tablet.getLoaded().values().stream() + .anyMatch(loadedFateId -> loadedFateId.equals(fateId))) { var tabletMutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); - tablet.getLoaded().entrySet().stream() - .filter(entry -> entry.getValue() == fateId.getTid()).map(Map.Entry::getKey) - .forEach(tabletMutator::deleteBulkFile); + tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue().equals(fateId)) + .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile); tabletMutator.submit(tm -> false); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 682bbbb3548..4cb42f01cd8 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -197,8 +197,7 @@ void load(List tablets, Files files) { .requireAbsentOperation().requireSame(tablet, LOADED, TIME, LOCATION); filesToLoad.forEach((f, v) -> { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - tabletMutator.putBulkFile(f, fateId.getTid()); + tabletMutator.putBulkFile(f, fateId); tabletMutator.putFile(f, v); }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java index c01e50114ff..3871c3cdf5d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java @@ -55,7 +55,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { // ELASTICITY_TODO DEFERRED - ISSUE 4044 TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit, - tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId.getTid())); + tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId)); return new CleanUpBulkImport(bulkInfo); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java index 0c263605b2e..b74875fda68 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java @@ -80,11 +80,10 @@ public long isReady(FateId fateId, Manager manager) throws Exception { t1 = System.nanoTime(); for (TabletMetadata tablet : tablets) { total++; - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - if (tablet.getCompacted().contains(fateId.getTid())) { + if (tablet.getCompacted().contains(fateId)) { tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, COMPACTED).deleteCompacted(fateId.getTid()) - .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(fateId.getTid())); + .requireSame(tablet, COMPACTED).deleteCompacted(fateId) + .submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(fateId)); submitted++; } } @@ -108,8 +107,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception { @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - CompactionConfigStorage.deleteConfig(manager.getContext(), fateId.getTid()); + CompactionConfigStorage.deleteConfig(manager.getContext(), fateId); Utils.getReadLock(manager, tableId, fateId).unlock(); Utils.getReadLock(manager, namespaceId, fateId).unlock(); return null; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java index f4cb4dbe425..0a813e5014e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactRange.java @@ -80,16 +80,14 @@ public long isReady(FateId fateId, Manager env) throws Exception { @Override public Repo call(final FateId fateId, Manager env) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - CompactionConfigStorage.setConfig(env.getContext(), fateId.getTid(), config); + CompactionConfigStorage.setConfig(env.getContext(), fateId, config); return new CompactionDriver(namespaceId, tableId, startRow, endRow); } @Override public void undo(FateId fateId, Manager env) throws Exception { try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - CompactionConfigStorage.deleteConfig(env.getContext(), fateId.getTid()); + CompactionConfigStorage.deleteConfig(env.getContext(), fateId); } finally { Utils.unreserveNamespace(env, namespaceId, fateId, false); Utils.unreserveTable(env, tableId, fateId, false); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index fa7e4d31e06..9c2ca4c2de4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -43,7 +43,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AbstractTabletFile; @@ -131,8 +130,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception { private boolean isCancelled(FateId fateId, ServerContext context) throws InterruptedException, KeeperException { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - return CompactionConfigStorage.getConfig(context, fateId.getTid()) == null; + return CompactionConfigStorage.getConfig(context, fateId) == null; } public int updateAndCheckTablets(Manager manager, FateId fateId) @@ -168,16 +166,13 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) .fetch(PREV_ROW, COMPACTED, FILES, SELECTED, ECOMP, OPID).checkConsistency().build(); var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - CompactionConfig config = - CompactionConfigStorage.getConfig(manager.getContext(), fateId.getTid()); + CompactionConfig config = CompactionConfigStorage.getConfig(manager.getContext(), fateId); for (TabletMetadata tablet : tablets) { total++; - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - if (tablet.getCompacted().contains(fateId.getTid())) { + if (tablet.getCompacted().contains(fateId)) { // this tablet is already considered done log.trace("{} compaction for {} is complete", fateId, tablet.getExtent()); complete++; @@ -189,10 +184,9 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) log.trace("{} tablet {} has no files, attempting to mark as compacted ", fateId, tablet.getExtent()); // this tablet has no files try to mark it as done - // ELASTICITY_TODO DEFERRED - ISSUE 4044 tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, FILES, COMPACTED).putCompacted(fateId.getTid()) - .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId.getTid())); + .requireSame(tablet, FILES, COMPACTED).putCompacted(fateId) + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId)); noFiles++; } else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) { // there are no selected files @@ -221,19 +215,16 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) } if (filesToCompact.isEmpty()) { // no files were selected so mark the tablet as compacted - // ELASTICITY_TODO DEFERRED - ISSUE 4044 tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() - .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED) - .putCompacted(fateId.getTid()) - .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId.getTid())); + .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(fateId) + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId)); noneSelected++; } else { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var selectedFiles = new SelectedFiles(filesToCompact, - tablet.getFiles().equals(filesToCompact), fateId.getTid()); + var selectedFiles = + new SelectedFiles(filesToCompact, tablet.getFiles().equals(filesToCompact), fateId); mutator.putSelectedFiles(selectedFiles); @@ -253,8 +244,7 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) } } else if (tablet.getSelectedFiles() != null) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - if (tablet.getSelectedFiles().getFateTxId() == fateId.getTid()) { + if (tablet.getSelectedFiles().getFateId().equals(fateId)) { log.trace( "{} tablet {} already has {} selected files for this compaction, waiting for them be processed", fateId, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size()); @@ -263,7 +253,7 @@ public int updateAndCheckTablets(Manager manager, FateId fateId) log.trace( "{} tablet {} already has {} selected files by another compaction {}, waiting for them be processed", fateId, tablet.getExtent(), tablet.getSelectedFiles().getFiles().size(), - FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + tablet.getSelectedFiles().getFateId()); otherSelected++; } } else { @@ -338,11 +328,10 @@ private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Except var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) .fetch(PREV_ROW, COMPACTED, SELECTED).checkConsistency().build(); var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 Predicate needsUpdate = tabletMetadata -> (tabletMetadata.getSelectedFiles() != null - && tabletMetadata.getSelectedFiles().getFateTxId() == fateId.getTid()) - || tabletMetadata.getCompacted().contains(fateId.getTid()); + && tabletMetadata.getSelectedFiles().getFateId().equals(fateId)) + || tabletMetadata.getCompacted().contains(fateId); Predicate needsNoUpdate = needsUpdate.negate(); for (TabletMetadata tablet : tablets) { @@ -350,15 +339,13 @@ private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Except if (needsUpdate.test(tablet)) { var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() .requireSame(tablet, COMPACTED, SELECTED); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 if (tablet.getSelectedFiles() != null - && tablet.getSelectedFiles().getFateTxId() == fateId.getTid()) { + && tablet.getSelectedFiles().getFateId().equals(fateId)) { mutator.deleteSelectedFiles(); } - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - if (tablet.getCompacted().contains(fateId.getTid())) { - mutator.deleteCompacted(fateId.getTid()); + if (tablet.getCompacted().contains(fateId)) { + mutator.deleteCompacted(fateId); } mutator.submit(needsNoUpdate::test); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java index d156545b8ae..4899fec8a5f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/cancel/CancelCompactions.java @@ -22,7 +22,6 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; @@ -53,12 +52,11 @@ public long isReady(FateId fateId, Manager env) throws Exception { @Override public Repo call(FateId fateId, Manager environment) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 var idsToCancel = CompactionConfigStorage.getAllConfig(environment.getContext(), tableId::equals).keySet(); for (var idToCancel : idsToCancel) { - log.debug("{} deleting compaction config {}", fateId, FateTxId.formatTid(idToCancel)); + log.debug("{} deleting compaction config {}", fateId, idToCancel); CompactionConfigStorage.deleteConfig(environment.getContext(), idToCancel); } return new FinishCancelCompaction(namespaceId, tableId); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index b3d7a85cf78..27c5c39709a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; @@ -188,10 +187,8 @@ private void addNewTablets(FateId fateId, Manager manager, TabletMetadata tablet mutator.putPrevEndRow(newExtent.prevEndRow()); tabletMetadata.getCompacted().forEach(mutator::putCompacted); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - tabletMetadata.getCompacted() - .forEach(ctid -> log.debug("{} copying compacted marker to new child tablet {}", fateId, - FateTxId.formatTid(ctid))); + tabletMetadata.getCompacted().forEach(compactedFateId -> log + .debug("{} copying compacted marker to new child tablet {}", fateId, compactedFateId)); mutator.putTabletAvailability(tabletMetadata.getTabletAvailability()); @@ -244,7 +241,7 @@ private void updateExistingTablet(FateId fateId, Manager manager, TabletMetadata if (tabletMetadata.getSelectedFiles() != null) { mutator.deleteSelectedFiles(); log.debug("{} deleting selected files {} because of split", fateId, - FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); + tabletMetadata.getSelectedFiles().getFateId()); } mutator.submit(tm -> false); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/SplitRecovery12to13.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/SplitRecovery12to13.java index 418bf173775..03e4180f8c3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/SplitRecovery12to13.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/SplitRecovery12to13.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -69,7 +70,7 @@ public class SplitRecovery12to13 { public static void addNewTablet(ServerContext context, KeyExtent extent, String dirName, TServerInstance tServerInstance, Map datafileSizes, - Map> bulkLoadedFiles, MetadataTime time, + Map> bulkLoadedFiles, MetadataTime time, long lastFlushID) { TabletMutator tablet = context.getAmple().mutateTablet(extent); @@ -88,7 +89,7 @@ public static void addNewTablet(ServerContext context, KeyExtent extent, String datafileSizes.forEach((key, value) -> tablet.putFile(key, value)); - for (Entry> entry : bulkLoadedFiles + for (Entry> entry : bulkLoadedFiles .entrySet()) { for (ReferencedTabletFile ref : entry.getValue()) { tablet.putBulkFile(ref, entry.getKey()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 973a369c96e..64cc5d03ffb 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.CompactableFileImpl; @@ -165,9 +166,11 @@ protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactor protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, Set jobFiles, TabletMetadata tablet, String compactorAddress, ExternalCompactionId externalCompactionId) { + FateInstanceType type = FateInstanceType.fromTableId(tablet.getTableId()); + FateId fateId = FateId.from(type, 1L); return new CompactionMetadata(jobFiles, new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")), - compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, 1L); + compactorAddress, job.getKind(), job.getPriority(), job.getGroup(), true, fateId); } @Override @@ -270,6 +273,7 @@ public void testGetCompactionJob() throws Exception { TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class); expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); + expect(tm.getTableId()).andReturn(ke.tableId()); EasyMock.replay(tconf, context, creds, tm, security); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java index 9c6624d9e85..d5481bbab87 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateMutatorImplIT.java @@ -73,7 +73,7 @@ public void putRepo() throws Exception { ClientContext context = (ClientContext) client; final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; - FateId fateId = FateId.from(FateInstanceType.USER, tid); + FateId fateId = FateId.from(FateInstanceType.fromNamespaceOrTableName(table), tid); // add some repos in order FateMutatorImpl fateMutator = new FateMutatorImpl<>(context, table, fateId); @@ -104,7 +104,7 @@ public void requireStatus() throws Exception { ClientContext context = (ClientContext) client; final long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; - FateId fateId = FateId.from(FateInstanceType.USER, tid); + FateId fateId = FateId.from(FateInstanceType.fromNamespaceOrTableName(table), tid); // use require status passing all statuses. without the status column present this should fail assertThrows(IllegalStateException.class, diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index df5841228de..892a507530a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -66,7 +66,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iterators.user.GcWalsFilter; import org.apache.accumulo.core.iterators.user.HasCurrentFilter; import org.apache.accumulo.core.iterators.user.TabletMetadataFilter; @@ -316,8 +317,10 @@ public void testFiles() throws Exception { .of(new Path("hdfs://localhost:8020/accumulo/tables/2a/b-0000009/I0000074.rf")); ctmi = new ConditionalTabletsMutatorImpl(context); var tm6 = TabletMetadata.builder(e1).build(LOADED); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId = FateId.from(type, 9L); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED) - .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), 9L) + .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), fateId) .putFile(stf5, new DataFileValue(0, 0)).submit(tm -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); @@ -340,7 +343,7 @@ public void testFiles() throws Exception { // simulate trying to re bulk import file after a compaction ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm6, LOADED) - .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), 9L) + .putFile(stf5, new DataFileValue(0, 0)).putBulkFile(stf5.getTabletFile(), fateId) .submit(tm -> false); results = ctmi.process(); assertEquals(Status.REJECTED, results.get(e1).getStatus()); @@ -459,8 +462,10 @@ public void testSelectedFiles() throws Exception { assertEquals(Set.of(stf1, stf2, stf3), context.getAmple().readTablet(e1).getFiles()); ctmi = new ConditionalTabletsMutatorImpl(context); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId2L = FateId.from(type, 2L); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm1, FILES, SELECTED) - .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 2L)) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2L)) .submit(tm -> false); results = ctmi.process(); assertEquals(Status.REJECTED, results.get(e1).getStatus()); @@ -472,7 +477,7 @@ public void testSelectedFiles() throws Exception { .build(SELECTED); ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, FILES, SELECTED) - .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 2L)) + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2L)) .submit(tm -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); @@ -484,11 +489,12 @@ public void testSelectedFiles() throws Exception { // a list of selected files objects that are not the same as the current tablet and expected to // fail var expectedToFail = new ArrayList(); + FateId fateId3L = FateId.from(type, 3L); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, 2L)); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, 2L)); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, 2L)); - expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 3L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2), true, fateId2L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3, stf4), true, fateId2L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), false, fateId2L)); + expectedToFail.add(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId3L)); for (var selectedFiles : expectedToFail) { var tm3 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) @@ -505,7 +511,7 @@ public void testSelectedFiles() throws Exception { } var tm5 = TabletMetadata.builder(e1).putFile(stf1, dfv).putFile(stf2, dfv).putFile(stf3, dfv) - .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, 2L)).build(); + .putSelectedFiles(new SelectedFiles(Set.of(stf1, stf2, stf3), true, fateId2L)).build(); ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm5, FILES, SELECTED) .deleteSelectedFiles().submit(tm -> false); @@ -532,9 +538,10 @@ public void testSelectedFilesReordering() throws Exception { final Set storedTabletFiles = Set.of(stf1, stf2, stf3); final boolean initiallySelectedAll = true; - final long fateTxId = 2L; + final FateInstanceType type = FateInstanceType.fromTableId(tid); + final FateId fateId = FateId.from(type, 2L); final SelectedFiles selectedFiles = - new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateTxId); + new SelectedFiles(storedTabletFiles, initiallySelectedAll, fateId); ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); @@ -574,13 +581,13 @@ public void testSelectedFilesReordering() throws Exception { .sorted().collect(Collectors.toList()); // verify we have the format of the json correct - String newJson = createSelectedFilesJson(fateTxId, initiallySelectedAll, filesPathList); + String newJson = createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList); assertEquals(actualMetadataValue, newJson, "Test json should be identical to actual metadata at this point"); // reverse the order of the files and create a new json Collections.reverse(filesPathList); - newJson = createSelectedFilesJson(fateTxId, initiallySelectedAll, filesPathList); + newJson = createSelectedFilesJson(fateId, initiallySelectedAll, filesPathList); assertNotEquals(actualMetadataValue, newJson, "Test json should have reverse file order of actual metadata"); @@ -624,18 +631,17 @@ public void testSelectedFilesReordering() throws Exception { * *
    * {
-   *   "txid": "FATE[123456]",
+   *   "txid": "FATE:META:123456",
    *   "selAll": true,
    *   "files": ["/path/to/file1.rf", "/path/to/file2.rf"]
    * }
    * 
*/ - public static String createSelectedFilesJson(Long txid, boolean selAll, + public static String createSelectedFilesJson(FateId fateId, boolean selAll, Collection paths) { String filesJsonArray = GSON.get().toJson(paths); - String formattedTxid = FateTxId.formatTid(Long.parseLong(Long.toString(txid), 16)); - return ("{'txid':'" + formattedTxid + "','selAll':" + selAll + ",'files':" + filesJsonArray - + "}").replace('\'', '\"'); + return ("{'txid':'" + fateId + "','selAll':" + selAll + ",'files':" + filesJsonArray + "}") + .replace('\'', '\"'); } @Test @@ -745,58 +751,70 @@ public void testCompacted() { var ctmi = new ConditionalTabletsMutatorImpl(context); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId45L = FateId.from(type, 45L); + FateId fateId55L = FateId.from(type, 55L); + FateId fateId56L = FateId.from(type, 56L); + FateId fateId65L = FateId.from(type, 65L); + FateId fateId75L = FateId.from(type, 75L); + var tabletMeta1 = TabletMetadata.builder(e1).build(COMPACTED); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) - .putCompacted(55L).submit(tabletMetadata -> tabletMetadata.getCompacted().contains(55L)); - var tabletMeta2 = TabletMetadata.builder(e2).putCompacted(45L).build(COMPACTED); + .putCompacted(fateId55L) + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId55L)); + var tabletMeta2 = TabletMetadata.builder(e2).putCompacted(fateId45L).build(COMPACTED); ctmi.mutateTablet(e2).requireAbsentOperation().requireSame(tabletMeta2, COMPACTED) - .putCompacted(56L).submit(tabletMetadata -> tabletMetadata.getCompacted().contains(56L)); + .putCompacted(fateId56L) + .submit(tabletMetadata -> tabletMetadata.getCompacted().contains(fateId56L)); var results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); assertEquals(Status.REJECTED, results.get(e2).getStatus()); tabletMeta1 = context.getAmple().readTablet(e1); - assertEquals(Set.of(55L), tabletMeta1.getCompacted()); + assertEquals(Set.of(fateId55L), tabletMeta1.getCompacted()); assertEquals(Set.of(), context.getAmple().readTablet(e2).getCompacted()); ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) - .putCompacted(65L).putCompacted(75L).submit(tabletMetadata -> false); + .putCompacted(fateId65L).putCompacted(fateId75L).submit(tabletMetadata -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); tabletMeta1 = context.getAmple().readTablet(e1); - assertEquals(Set.of(55L, 65L, 75L), tabletMeta1.getCompacted()); + assertEquals(Set.of(fateId55L, fateId65L, fateId75L), tabletMeta1.getCompacted()); // test require same with a superset ctmi = new ConditionalTabletsMutatorImpl(context); - tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L) - .putCompacted(45L).build(COMPACTED); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(fateId55L).putCompacted(fateId65L) + .putCompacted(fateId75L).putCompacted(fateId45L).build(COMPACTED); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) - .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .deleteCompacted(fateId55L).deleteCompacted(fateId65L).deleteCompacted(fateId75L) .submit(tabletMetadata -> false); results = ctmi.process(); assertEquals(Status.REJECTED, results.get(e1).getStatus()); - assertEquals(Set.of(55L, 65L, 75L), context.getAmple().readTablet(e1).getCompacted()); + assertEquals(Set.of(fateId55L, fateId65L, fateId75L), + context.getAmple().readTablet(e1).getCompacted()); // test require same with a subset ctmi = new ConditionalTabletsMutatorImpl(context); - tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).build(COMPACTED); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(fateId55L).putCompacted(fateId65L) + .build(COMPACTED); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) - .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .deleteCompacted(fateId55L).deleteCompacted(fateId65L).deleteCompacted(fateId75L) .submit(tabletMetadata -> false); results = ctmi.process(); assertEquals(Status.REJECTED, results.get(e1).getStatus()); - assertEquals(Set.of(55L, 65L, 75L), context.getAmple().readTablet(e1).getCompacted()); + assertEquals(Set.of(fateId55L, fateId65L, fateId75L), + context.getAmple().readTablet(e1).getCompacted()); // now use the exact set the tablet has ctmi = new ConditionalTabletsMutatorImpl(context); - tabletMeta1 = TabletMetadata.builder(e2).putCompacted(55L).putCompacted(65L).putCompacted(75L) - .build(COMPACTED); + tabletMeta1 = TabletMetadata.builder(e2).putCompacted(fateId55L).putCompacted(fateId65L) + .putCompacted(fateId75L).build(COMPACTED); ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tabletMeta1, COMPACTED) - .deleteCompacted(55L).deleteCompacted(65L).deleteCompacted(75L) + .deleteCompacted(fateId55L).deleteCompacted(fateId65L).deleteCompacted(fateId75L) .submit(tabletMetadata -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); @@ -931,8 +949,10 @@ public void multipleFilters() { // add wal compact and flush ID to these tablets final Set tabletsWithWalCompactFlush = Set.of(e1, e2, e3); for (KeyExtent ke : tabletsWithWalCompactFlush) { + FateInstanceType type = FateInstanceType.fromTableId(ke.tableId()); + FateId fateId34L = FateId.from(type, 34L); ctmi = new ConditionalTabletsMutatorImpl(context); - ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(34L) + ctmi.mutateTablet(ke).requireAbsentOperation().putCompacted(fateId34L) .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).putWal(wal) .submit(tabletMetadata -> false); var results = ctmi.process(); @@ -971,13 +991,16 @@ public void testCompactedAndFlushIdFilter() { ServerContext context = cluster.getServerContext(); ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); Set filter = Set.of(new TestTabletMetadataFilter()); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId34L = FateId.from(type, 34L); + FateId fateId987L = FateId.from(type, 987L); // make sure we read all tablets on table initially with no filters testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4), "Initially, all tablets should be present"); // Set compacted on e2 but with no flush ID - ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(34L) + ctmi.mutateTablet(e2).requireAbsentOperation().putCompacted(fateId34L) .submit(tabletMetadata -> false); var results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e2).getStatus()); @@ -1004,7 +1027,7 @@ public void testCompactedAndFlushIdFilter() { // Set compacted and correct flush ID on e3 ctmi = new ConditionalTabletsMutatorImpl(context); - ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(987L) + ctmi.mutateTablet(e3).requireAbsentOperation().putCompacted(fateId987L) .putFlushId(TestTabletMetadataFilter.VALID_FLUSH_ID).submit(tabletMetadata -> false); results = ctmi.process(); assertEquals(Status.ACCEPTED, results.get(e3).getStatus()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java index 453baf40ccb..a4ca9803e7d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MergeIT.java @@ -59,6 +59,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -690,6 +692,8 @@ public void testCompactionMetadata() throws Exception { new KeyExtent(tableId, null, split))) { var tablet = tabletsMutator.mutateTablet(extent); ExternalCompactionId ecid = ExternalCompactionId.generate(UUID.randomUUID()); + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId44L = FateId.from(type, 44L); ReferencedTabletFile tmpFile = ReferencedTabletFile.of(new Path("file:///accumulo/tables/t-0/b-0/c1.rf")); @@ -697,7 +701,7 @@ public void testCompactionMetadata() throws Exception { Set jobFiles = Set.of(StoredTabletFile.of(new Path("file:///accumulo/tables/t-0/b-0/b2.rf"))); CompactionMetadata ecMeta = new CompactionMetadata(jobFiles, tmpFile, "localhost:4444", - CompactionKind.SYSTEM, (short) 2, ceid, false, 44L); + CompactionKind.SYSTEM, (short) 2, ceid, false, fateId44L); tablet.putExternalCompaction(ecid, ecMeta); tablet.mutate(); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index 24b4948a784..082b5de1036 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -46,6 +46,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.rfile.RFile; @@ -169,9 +171,9 @@ private void runSplitRecoveryTest(ServerContext context, int failPoint, String m dataFiles.put(new ReferencedTabletFile(new Path(tdir + "/" + RFile.EXTENSION + "_000_000")), new DataFileValue(1000017 + i, 10000 + i)); - int tid = 0; + FateId fateId = FateId.from(FateInstanceType.fromTableId(extent.tableId()), 0); SortedMap storedFiles = - new TreeMap<>(MetadataTableUtil.updateTabletDataFile(tid, extent, dataFiles, + new TreeMap<>(MetadataTableUtil.updateTabletDataFile(fateId, extent, dataFiles, new MetadataTime(0, TimeType.LOGICAL), context, zl)); if (i == extentToSplit) { splitDataFiles = storedFiles; @@ -187,12 +189,12 @@ private void runSplitRecoveryTest(ServerContext context, int failPoint, String m "localhost:1234", failPoint, zl); } - private static Map> getBulkFilesLoaded(ServerContext context, + private static Map> getBulkFilesLoaded(ServerContext context, KeyExtent extent) { - Map> bulkFiles = new HashMap<>(); + Map> bulkFiles = new HashMap<>(); - context.getAmple().readTablet(extent).getLoaded().forEach((path, txid) -> bulkFiles - .computeIfAbsent(txid, k -> new ArrayList<>()).add(path.getTabletFile())); + context.getAmple().readTablet(extent).getLoaded().forEach((path, fateId) -> bulkFiles + .computeIfAbsent(fateId, k -> new ArrayList<>()).add(path.getTabletFile())); return bulkFiles; } @@ -217,7 +219,7 @@ private void splitPartiallyAndRecover(ServerContext context, KeyExtent extent, K tabletMutator.mutate(); if (steps >= 1) { - Map> bulkFiles = getBulkFilesLoaded(context, high); + Map> bulkFiles = getBulkFilesLoaded(context, high); SplitRecovery12to13.addNewTablet(context, low, "lowDir", instance, lowDatafileSizes, bulkFiles, new MetadataTime(0, TimeType.LOGICAL), -1L); @@ -248,9 +250,9 @@ private void splitPartiallyAndRecover(ServerContext context, KeyExtent extent, K ensureTabletHasNoUnexpectedMetadataEntries(context, low, lowDatafileSizes); ensureTabletHasNoUnexpectedMetadataEntries(context, high, highDatafileSizes); - Map> lowBulkFiles = + Map> lowBulkFiles = getBulkFilesLoaded(context, low); - Map> highBulkFiles = + Map> highBulkFiles = getBulkFilesLoaded(context, high); if (!lowBulkFiles.equals(highBulkFiles)) {