From a3ec20e41a8058106d829f73346f65f1734d1860 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun <43969518+kevinrr888@users.noreply.github.com> Date: Fri, 16 Feb 2024 14:24:58 -0500 Subject: [PATCH] Globally Unique FATE Transaction Ids - Part 4 (#4258) This addresses several previously deferred changes for issue #4044. Changes: - ZooReservation now uses FateId (used in Utils) - TabletOperationId now uses FateId - TExternalCompactionJob now uses FateId - VolumeManager and VolumeManagerImpl now use FateId - Utils.getLock() lockData now uses the full FateId - TabletRefresher now uses FateId - Classes which used the above classes updated - Several test changes to reflect new changes - Deferred a couple of changes (in Compactor and CompactionCoordinator) (need pull/4247 merged first) --- .../org/apache/accumulo/core/fate/FateId.java | 23 +++- .../core/fate/zookeeper/ZooReservation.java | 24 ++-- .../metadata/schema/TabletOperationId.java | 12 +- .../thrift/TExternalCompactionJob.java | 126 ++++++++++-------- core/src/main/thrift/tabletserver.thrift | 2 +- .../metadata/schema/TabletMetadataTest.java | 3 +- .../constraints/MetadataConstraints.java | 2 +- .../accumulo/server/fs/VolumeManager.java | 5 +- .../accumulo/server/fs/VolumeManagerImpl.java | 11 +- .../constraints/MetadataConstraintsTest.java | 2 +- .../apache/accumulo/compactor/Compactor.java | 13 +- .../coordinator/CompactionCoordinator.java | 3 +- .../accumulo/manager/tableOps/Utils.java | 8 +- .../tableOps/bulkVer2/BulkImportMove.java | 3 +- .../tableOps/bulkVer2/RefreshTablets.java | 3 +- .../tableOps/bulkVer2/TabletRefresher.java | 10 +- .../tableOps/compact/CompactionDriver.java | 5 +- .../tableOps/compact/RefreshTablets.java | 5 +- .../tableOps/delete/ReserveTablets.java | 3 +- .../manager/tableOps/merge/DeleteRows.java | 3 +- .../manager/tableOps/merge/DeleteTablets.java | 3 +- .../tableOps/merge/FinishTableRangeOp.java | 3 +- .../manager/tableOps/merge/MergeTablets.java | 3 +- .../tableOps/merge/ReserveTablets.java | 3 +- .../tableOps/split/DeleteOperationIds.java | 3 +- .../manager/tableOps/split/PreSplit.java | 6 +- .../manager/tableOps/split/UpdateTablets.java | 3 +- .../tableImport/MoveExportedFiles.java | 3 +- .../compaction/CompactionCoordinatorTest.java | 10 +- .../apache/accumulo/test/ScanServerIT.java | 6 +- .../functional/AmpleConditionalWriterIT.java | 18 ++- .../test/functional/ManagerAssignmentIT.java | 10 +- .../TabletManagementIteratorIT.java | 6 +- 33 files changed, 191 insertions(+), 152 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index 5be742d2fd3..8907c6879c5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -24,6 +24,7 @@ import org.apache.accumulo.core.data.AbstractId; import org.apache.accumulo.core.manager.thrift.TFateId; +import org.apache.accumulo.core.manager.thrift.TFateInstanceType; import org.apache.accumulo.core.util.FastFormat; /** @@ -107,7 +108,7 @@ public static FateId from(String fateIdStr) { * @param fateIdStr the string representation of the FateId * @return true if the string is a valid FateId, false otherwise */ - public static boolean isFormattedTid(String fateIdStr) { + public static boolean isFateId(String fateIdStr) { return FATEID_PATTERN.matcher(fateIdStr).matches(); } @@ -133,6 +134,26 @@ public static FateId fromThrift(TFateId tFateId) { return new FateId(PREFIX + type + ":" + formatTid(tid)); } + /** + * + * @return the TFateId equivalent of the FateId + */ + public TFateId toThrift() { + TFateInstanceType thriftType; + FateInstanceType type = getType(); + switch (type) { + case USER: + thriftType = TFateInstanceType.USER; + break; + case META: + thriftType = TFateInstanceType.META; + break; + default: + throw new IllegalArgumentException("Invalid FateInstanceType: " + type); + } + return new TFateId(thriftType, getTid()); + } + /** * Returns the hex string equivalent of the tid */ diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java index 33714b25812..8c6a9183012 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.zookeeper.KeeperException; @@ -29,15 +30,14 @@ public class ZooReservation { - public static boolean attempt(ZooReaderWriter zk, String path, String reservationID, - String debugInfo) throws KeeperException, InterruptedException { - if (reservationID.contains(":")) { - throw new IllegalArgumentException(); - } + private static final String DELIMITER = "-"; + + public static boolean attempt(ZooReaderWriter zk, String path, FateId fateId, String debugInfo) + throws KeeperException, InterruptedException { while (true) { try { - zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(UTF_8), + zk.putPersistentData(path, (fateId.canonical() + DELIMITER + debugInfo).getBytes(UTF_8), NodeExistsPolicy.FAIL); return true; } catch (NodeExistsException nee) { @@ -48,15 +48,15 @@ public static boolean attempt(ZooReaderWriter zk, String path, String reservatio continue; } - String idInZoo = new String(zooData, UTF_8).split(":")[0]; + FateId idInZoo = FateId.from(new String(zooData, UTF_8).split(DELIMITER)[0]); - return idInZoo.equals(reservationID); + return idInZoo.equals(fateId); } } } - public static void release(ZooReaderWriter zk, String path, String reservationID) + public static void release(ZooReaderWriter zk, String path, FateId fateId) throws KeeperException, InterruptedException { byte[] zooData; @@ -69,11 +69,11 @@ public static void release(ZooReaderWriter zk, String path, String reservationID } String zooDataStr = new String(zooData, UTF_8); - String idInZoo = zooDataStr.split(":")[0]; + FateId idInZoo = FateId.from(zooDataStr.split(DELIMITER)[0]); - if (!idInZoo.equals(reservationID)) { + if (!idInZoo.equals(fateId)) { throw new IllegalStateException("Tried to release reservation " + path - + " with data mismatch " + reservationID + " " + zooDataStr); + + " with data mismatch " + fateId + " " + zooDataStr); } zk.recursiveDelete(path, NodeMissingPolicy.SKIP); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java index 49b7cf169a1..8da831eacd5 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java @@ -19,7 +19,7 @@ package org.apache.accumulo.core.metadata.schema; import org.apache.accumulo.core.data.AbstractId; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import com.google.common.base.Preconditions; @@ -32,7 +32,7 @@ public class TabletOperationId extends AbstractId { private static final long serialVersionUID = 1L; public static String validate(String opid) { - var fields = opid.split(":"); + var fields = opid.split(":", 2); Preconditions.checkArgument(fields.length == 2, "Malformed operation id %s", opid); try { TabletOperationType.valueOf(fields[0]); @@ -40,7 +40,7 @@ public static String validate(String opid) { throw new IllegalArgumentException("Malformed operation id " + opid, e); } - if (!FateTxId.isFormatedTid(fields[1])) { + if (!FateId.isFateId(fields[1])) { throw new IllegalArgumentException("Malformed operation id " + opid); } @@ -52,7 +52,7 @@ private TabletOperationId(String canonical) { } public TabletOperationType getType() { - var fields = canonical().split(":"); + var fields = canonical().split(":", 2); Preconditions.checkState(fields.length == 2); return TabletOperationType.valueOf(fields[0]); } @@ -61,7 +61,7 @@ public static TabletOperationId from(String opid) { return new TabletOperationId(validate(opid)); } - public static TabletOperationId from(TabletOperationType type, long txid) { - return new TabletOperationId(type + ":" + FateTxId.formatTid(txid)); + public static TabletOperationId from(TabletOperationType type, FateId fateId) { + return new TabletOperationId(type + ":" + fateId); } } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java index 6a0523c51b2..7a3fdf46d17 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java @@ -35,7 +35,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase overrides; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @@ -60,7 +60,7 @@ public enum _Fields implements org.apache.thrift.TFieldIdEnum { OUTPUT_FILE((short)5, "outputFile"), PROPAGATE_DELETES((short)6, "propagateDeletes"), KIND((short)7, "kind"), - FATE_TX_ID((short)8, "fateTxId"), + FATE_ID((short)8, "fateId"), OVERRIDES((short)9, "overrides"); private static final java.util.Map byName = new java.util.HashMap(); @@ -91,8 +91,8 @@ public static _Fields findByThriftId(int fieldId) { return PROPAGATE_DELETES; case 7: // KIND return KIND; - case 8: // FATE_TX_ID - return FATE_TX_ID; + case 8: // FATE_ID + return FATE_ID; case 9: // OVERRIDES return OVERRIDES; default: @@ -139,7 +139,6 @@ public java.lang.String getFieldName() { // isset id assignments private static final int __PROPAGATEDELETES_ISSET_ID = 0; - private static final int __FATETXID_ISSET_ID = 1; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -159,8 +158,8 @@ public java.lang.String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM , "TCompactionKind"))); - tmpMap.put(_Fields.FATE_TX_ID, new org.apache.thrift.meta_data.FieldMetaData("fateTxId", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.FATE_ID, new org.apache.thrift.meta_data.FieldMetaData("fateId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.manager.thrift.TFateId.class))); tmpMap.put(_Fields.OVERRIDES, new org.apache.thrift.meta_data.FieldMetaData("overrides", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), @@ -180,7 +179,7 @@ public TExternalCompactionJob( java.lang.String outputFile, boolean propagateDeletes, TCompactionKind kind, - long fateTxId, + org.apache.accumulo.core.manager.thrift.TFateId fateId, java.util.Map overrides) { this(); @@ -192,8 +191,7 @@ public TExternalCompactionJob( this.propagateDeletes = propagateDeletes; setPropagateDeletesIsSet(true); this.kind = kind; - this.fateTxId = fateTxId; - setFateTxIdIsSet(true); + this.fateId = fateId; this.overrides = overrides; } @@ -225,7 +223,9 @@ public TExternalCompactionJob(TExternalCompactionJob other) { if (other.isSetKind()) { this.kind = other.kind; } - this.fateTxId = other.fateTxId; + if (other.isSetFateId()) { + this.fateId = new org.apache.accumulo.core.manager.thrift.TFateId(other.fateId); + } if (other.isSetOverrides()) { java.util.Map __this__overrides = new java.util.HashMap(other.overrides); this.overrides = __this__overrides; @@ -247,8 +247,7 @@ public void clear() { setPropagateDeletesIsSet(false); this.propagateDeletes = false; this.kind = null; - setFateTxIdIsSet(false); - this.fateTxId = 0; + this.fateId = null; this.overrides = null; } @@ -441,27 +440,29 @@ public void setKindIsSet(boolean value) { } } - public long getFateTxId() { - return this.fateTxId; + @org.apache.thrift.annotation.Nullable + public org.apache.accumulo.core.manager.thrift.TFateId getFateId() { + return this.fateId; } - public TExternalCompactionJob setFateTxId(long fateTxId) { - this.fateTxId = fateTxId; - setFateTxIdIsSet(true); + public TExternalCompactionJob setFateId(@org.apache.thrift.annotation.Nullable org.apache.accumulo.core.manager.thrift.TFateId fateId) { + this.fateId = fateId; return this; } - public void unsetFateTxId() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __FATETXID_ISSET_ID); + public void unsetFateId() { + this.fateId = null; } - /** Returns true if field fateTxId is set (has been assigned a value) and false otherwise */ - public boolean isSetFateTxId() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __FATETXID_ISSET_ID); + /** Returns true if field fateId is set (has been assigned a value) and false otherwise */ + public boolean isSetFateId() { + return this.fateId != null; } - public void setFateTxIdIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __FATETXID_ISSET_ID, value); + public void setFateIdIsSet(boolean value) { + if (!value) { + this.fateId = null; + } } public int getOverridesSize() { @@ -559,11 +560,11 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable } break; - case FATE_TX_ID: + case FATE_ID: if (value == null) { - unsetFateTxId(); + unsetFateId(); } else { - setFateTxId((java.lang.Long)value); + setFateId((org.apache.accumulo.core.manager.thrift.TFateId)value); } break; @@ -603,8 +604,8 @@ public java.lang.Object getFieldValue(_Fields field) { case KIND: return getKind(); - case FATE_TX_ID: - return getFateTxId(); + case FATE_ID: + return getFateId(); case OVERRIDES: return getOverrides(); @@ -635,8 +636,8 @@ public boolean isSet(_Fields field) { return isSetPropagateDeletes(); case KIND: return isSetKind(); - case FATE_TX_ID: - return isSetFateTxId(); + case FATE_ID: + return isSetFateId(); case OVERRIDES: return isSetOverrides(); } @@ -719,12 +720,12 @@ public boolean equals(TExternalCompactionJob that) { return false; } - boolean this_present_fateTxId = true; - boolean that_present_fateTxId = true; - if (this_present_fateTxId || that_present_fateTxId) { - if (!(this_present_fateTxId && that_present_fateTxId)) + boolean this_present_fateId = true && this.isSetFateId(); + boolean that_present_fateId = true && that.isSetFateId(); + if (this_present_fateId || that_present_fateId) { + if (!(this_present_fateId && that_present_fateId)) return false; - if (this.fateTxId != that.fateTxId) + if (!this.fateId.equals(that.fateId)) return false; } @@ -770,7 +771,9 @@ public int hashCode() { if (isSetKind()) hashCode = hashCode * 8191 + kind.getValue(); - hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(fateTxId); + hashCode = hashCode * 8191 + ((isSetFateId()) ? 131071 : 524287); + if (isSetFateId()) + hashCode = hashCode * 8191 + fateId.hashCode(); hashCode = hashCode * 8191 + ((isSetOverrides()) ? 131071 : 524287); if (isSetOverrides()) @@ -857,12 +860,12 @@ public int compareTo(TExternalCompactionJob other) { return lastComparison; } } - lastComparison = java.lang.Boolean.compare(isSetFateTxId(), other.isSetFateTxId()); + lastComparison = java.lang.Boolean.compare(isSetFateId(), other.isSetFateId()); if (lastComparison != 0) { return lastComparison; } - if (isSetFateTxId()) { - lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fateTxId, other.fateTxId); + if (isSetFateId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.fateId, other.fateId); if (lastComparison != 0) { return lastComparison; } @@ -953,8 +956,12 @@ public java.lang.String toString() { } first = false; if (!first) sb.append(", "); - sb.append("fateTxId:"); - sb.append(this.fateTxId); + sb.append("fateId:"); + if (this.fateId == null) { + sb.append("null"); + } else { + sb.append(this.fateId); + } first = false; if (!first) sb.append(", "); sb.append("overrides:"); @@ -977,6 +984,9 @@ public void validate() throws org.apache.thrift.TException { if (iteratorSettings != null) { iteratorSettings.validate(); } + if (fateId != null) { + fateId.validate(); + } } private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { @@ -1086,10 +1096,11 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TExternalCompaction org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 8: // FATE_TX_ID - if (schemeField.type == org.apache.thrift.protocol.TType.I64) { - struct.fateTxId = iprot.readI64(); - struct.setFateTxIdIsSet(true); + case 8: // FATE_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.fateId = new org.apache.accumulo.core.manager.thrift.TFateId(); + struct.fateId.read(iprot); + struct.setFateIdIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1170,9 +1181,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TExternalCompactio oprot.writeI32(struct.kind.getValue()); oprot.writeFieldEnd(); } - oprot.writeFieldBegin(FATE_TX_ID_FIELD_DESC); - oprot.writeI64(struct.fateTxId); - oprot.writeFieldEnd(); + if (struct.fateId != null) { + oprot.writeFieldBegin(FATE_ID_FIELD_DESC); + struct.fateId.write(oprot); + oprot.writeFieldEnd(); + } if (struct.overrides != null) { oprot.writeFieldBegin(OVERRIDES_FIELD_DESC); { @@ -1226,7 +1239,7 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExternalCompaction if (struct.isSetKind()) { optionals.set(6); } - if (struct.isSetFateTxId()) { + if (struct.isSetFateId()) { optionals.set(7); } if (struct.isSetOverrides()) { @@ -1260,8 +1273,8 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExternalCompaction if (struct.isSetKind()) { oprot.writeI32(struct.kind.getValue()); } - if (struct.isSetFateTxId()) { - oprot.writeI64(struct.fateTxId); + if (struct.isSetFateId()) { + struct.fateId.write(oprot); } if (struct.isSetOverrides()) { { @@ -1320,8 +1333,9 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TExternalCompactionJ struct.setKindIsSet(true); } if (incoming.get(7)) { - struct.fateTxId = iprot.readI64(); - struct.setFateTxIdIsSet(true); + struct.fateId = new org.apache.accumulo.core.manager.thrift.TFateId(); + struct.fateId.read(iprot); + struct.setFateIdIsSet(true); } if (incoming.get(8)) { { diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift index e473b4ad814..231e8f58096 100644 --- a/core/src/main/thrift/tabletserver.thrift +++ b/core/src/main/thrift/tabletserver.thrift @@ -111,7 +111,7 @@ struct TExternalCompactionJob { 5:string outputFile 6:bool propagateDeletes 7:TCompactionKind kind - 8:i64 fateTxId + 8:manager.TFateId fateId 9:map overrides } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index e3cfc83fa21..36f2ea06e13 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -373,7 +373,8 @@ public void testBuilder() { assertThrows(IllegalStateException.class, tm::getSuspend); assertThrows(IllegalStateException.class, tm::getTime); - TabletOperationId opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, 55); + TabletOperationId opid1 = + TabletOperationId.from(TabletOperationType.SPLITTING, FateId.from(type, 55)); TabletMetadata tm2 = TabletMetadata.builder(extent).putOperation(opid1).build(LOCATION); assertEquals(extent, tm2.getExtent()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index c8d6f9dba67..5851e7224e6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -267,7 +267,7 @@ public List check(Environment env, Mutation mutation) { violations = addViolation(violations, 11); } } else if (CompactedColumnFamily.NAME.equals(columnFamily)) { - if (!FateId.isFormattedTid(columnQualifier.toString())) { + if (!FateId.isFateId(columnQualifier.toString())) { violations = addViolation(violations, 13); } } else if (columnFamily.equals(BulkFileColumnFamily.NAME)) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 018d473a76d..bac4d3d4cc1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.conf.Configuration; @@ -167,8 +168,8 @@ RemoteIterator listFiles(final Path path, final boolean recur * This operation should be idempotent to allow calling multiple times in the case of a partial * completion. */ - void bulkRename(Map oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException; + void bulkRename(Map oldToNewPathMap, int poolSize, String poolName, FateId fateId) + throws IOException; // forward to the appropriate FileSystem object boolean moveToTrash(Path sourcePath) throws IOException; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index b47dedabe0d..1c1e7a4fd56 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.spi.fs.VolumeChooser; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -321,7 +322,7 @@ public boolean rename(Path path, Path newPath) throws IOException { @Override public void bulkRename(Map oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException { + FateId fateId) throws IOException { List> results = new ArrayList<>(); ExecutorService workerPool = ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, poolName, false); @@ -337,14 +338,14 @@ public void bulkRename(Map oldToNewPathMap, int poolSize, String pool } log.debug( "Ignoring rename exception for {} because destination already exists. orig: {} new: {}", - transactionId, oldPath, newPath, e); + fateId, oldPath, newPath, e); success = true; } if (!success && (!exists(newPath) || exists(oldPath))) { - throw new IOException("Rename operation " + transactionId + " returned false. orig: " - + oldPath + " new: " + newPath); + throw new IOException("Rename operation " + fateId + " returned false. orig: " + oldPath + + " new: " + newPath); } else if (log.isTraceEnabled()) { - log.trace("{} moved {} to {}", transactionId, oldPath, newPath); + log.trace("{} moved {} to {}", fateId, oldPath, newPath); } return null; }))); diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index a738e2fc3cd..5360a54e956 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -462,7 +462,7 @@ public void testOperationId() { assertViolation(mc, m, (short) 9); m = new Mutation(new Text("0;foo")); - ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE[123abc]")); + ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE:META:123abc")); violations = mc.check(createEnv(), m); assertNull(violations); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index c626fcb24db..1a1c2ece589 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -63,8 +63,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -213,17 +211,12 @@ protected void checkIfCanceled() { if (job.getKind() == TCompactionKind.USER) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044: TExternalCompactionJob.getFateTxId should be - // changed to - // TExternalCompactionJob.getFateId and return the FateId - FateInstanceType type = - FateInstanceType.fromTableId(KeyExtent.fromThrift(job.getExtent()).tableId()); - FateId fateId = FateId.from(type, job.getFateTxId()); - var cconf = CompactionConfigStorage.getConfig(getContext(), fateId); + var cconf = + CompactionConfigStorage.getConfig(getContext(), FateId.fromThrift(job.getFateId())); if (cconf == null) { LOG.info("Cancelling compaction {} for user compaction that no longer exists {} {}", - ecid, FateTxId.formatTid(job.getFateTxId()), extent); + ecid, FateId.fromThrift(job.getFateId()), extent); JOB_HOLDER.cancel(job.getExternalCompactionId()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 76af05395ca..fdbf0093523 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -572,11 +572,10 @@ protected TExternalCompactionJob createThriftJob(String externalCompactionId, fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId(); } - // ELASTICITY_TODO DEFERRED - ISSUE 4044 return new TExternalCompactionJob(externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), fateId.getTid(), overrides); + TCompactionKind.valueOf(ecm.getKind().name()), fateId.toThrift(), overrides); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index e5ffc8039d1..dce71c231da 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -146,8 +146,7 @@ public static long reserveHdfsDirectory(Manager env, String directory, FateId fa ZooReaderWriter zk = env.getContext().getZooReaderWriter(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 .. should the full FateId be passed below? - if (ZooReservation.attempt(zk, resvPath, fateId.getHexTid(), "")) { + if (ZooReservation.attempt(zk, resvPath, fateId, "")) { return 0; } else { return 50; @@ -158,13 +157,12 @@ public static void unreserveHdfsDirectory(Manager env, String directory, FateId throws KeeperException, InterruptedException { String resvPath = env.getContext().getZooKeeperRoot() + Constants.ZHDFS_RESERVATIONS + "/" + Base64.getEncoder().encodeToString(directory.getBytes(UTF_8)); - ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId.getHexTid()); + ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId); } private static Lock getLock(ServerContext context, AbstractId id, FateId fateId, boolean writeLock) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 ... should lock data use full FateId? - byte[] lockData = fateId.getHexTid().getBytes(UTF_8); + byte[] lockData = fateId.canonical().getBytes(UTF_8); var fLockPath = FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical()); FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index 33cf61a0cc3..3b5b2fbbee5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@ -105,8 +105,7 @@ private void moveFiles(FateId fateId, Path sourceDir, Path bulkDir, Manager mana oldToNewMap.put(originalPath, newPath); } try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId.getHexTid()); + fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java index 3871c3cdf5d..95816691a66 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java @@ -52,8 +52,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception { @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), + TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit, tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId)); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index 57d36164821..a4f7928060f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -39,7 +39,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -60,12 +60,12 @@ public class TabletRefresher { private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class); public static void refresh(ServerContext context, - Supplier> onlineTserversSupplier, long fateTxid, TableId tableId, + Supplier> onlineTserversSupplier, FateId fateId, TableId tableId, byte[] startRow, byte[] endRow, Predicate needsRefresh) { // ELASTICITY_TODO should this thread pool be configurable? - ThreadPoolExecutor threadPool = context.threadPools().createFixedThreadPool(10, - "Tablet refresh " + FateTxId.formatTid(fateTxid), false); + ThreadPoolExecutor threadPool = + context.threadPools().createFixedThreadPool(10, "Tablet refresh " + fateId, false); try (var tablets = context.getAmple().readTablets().forTable(tableId) .overlapping(startRow, endRow).checkConsistency() @@ -86,7 +86,7 @@ public static void refresh(ServerContext context, var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation, mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList()))); - refreshTablets(threadPool, FateTxId.formatTid(fateTxid), context, onlineTserversSupplier, + refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier, refreshesNeeded); }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 9c2ca4c2de4..3e5c17b6504 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -295,9 +295,8 @@ public void undo(FateId fateId, Manager env) throws Exception { // For any compactions that may have happened before this operation failed, attempt to refresh // tablets. - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId.getTid(), tableId, - startRow, endRow, tabletMetadata -> true); + TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId, tableId, startRow, + endRow, tabletMetadata -> true); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java index aca1242276d..fd4daf0c4c9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java @@ -45,9 +45,8 @@ public RefreshTablets(TableId tableId, NamespaceId namespaceId, byte[] startRow, @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), - tableId, startRow, endRow, tabletMetadata -> true); + TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, tableId, + startRow, endRow, tabletMetadata -> true); return new CleanUp(tableId, namespaceId, startRow, endRow); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index 91d62752e77..25c2b7a58f6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -54,8 +54,7 @@ public ReserveTablets(TableId tableId, NamespaceId namespaceId) { @Override public long isReady(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId); // The consumer may be called in another thread so use an AtomicLong AtomicLong accepted = new AtomicLong(0); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java index 7df3561e1a3..f071785ece5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java @@ -85,8 +85,7 @@ private Optional deleteTabletFiles(Manager manager, FateId fateId) { // Only delete data within the original extent specified by the user KeyExtent range = data.getOriginalExtent(); log.debug("{} deleting tablet files in range {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); try ( var tabletsMetadata = manager.getContext().getAmple().readTablets() diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index 53c24959ae3..f48885d5469 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -60,8 +60,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { KeyExtent range = data.getMergeExtent(); log.debug("{} Deleting tablets for {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 643a3428a96..65cefef1abe 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -61,8 +61,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { static void removeOperationIds(Logger log, MergeInfo data, FateId fateId, Manager manager) { KeyExtent range = data.getReserveExtent(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); log.debug("{} unreserving tablet in range {}", fateId, range); AtomicLong acceptedCount = new AtomicLong(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index b46f7bd2ef1..2b5b9c969c5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -78,8 +78,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { KeyExtent range = data.getMergeExtent(); log.debug("{} Merging metadata for {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); Set tabletAvailabilities = new HashSet<>(); MetadataTime maxLogicalTime = null; List dirs = new ArrayList<>(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index df8bd977ba0..b3d246572a3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -55,8 +55,7 @@ public ReserveTablets(MergeInfo data) { public long isReady(FateId fateId, Manager env) throws Exception { var range = data.getReserveExtent(); log.debug("{} reserving tablets in range {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); AtomicLong opsAccepted = new AtomicLong(0); Consumer resultConsumer = result -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index e3262daa8c9..dbf0d04f428 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -42,8 +42,7 @@ public DeleteOperationIds(SplitInfo splitInfo) { @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 813f525692f..a973015ddc0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -66,8 +66,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception { // ELASTICITY_TODO intentionally not getting the table lock because not sure if its needed, // revist later when more operations are moved out of tablet server - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); // ELASTICITY_TODO write IT that spins up 100 threads that all try to add a diff split to // the same tablet. @@ -132,8 +131,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { TabletMetadata tabletMetadata = manager.getContext().getAmple() .readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) { // the tablet no longer exists or we could not set the operation id, maybe another operation diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index 27c5c39709a..afc1a77b7a1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -61,8 +61,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { TabletMetadata tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal()); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); if (tabletMetadata == null) { // check to see if this operation has already succeeded. diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java index 0bc2e86983e..f0ffe01994a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java @@ -106,8 +106,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { } } try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId.getHexTid()); + fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null, TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 64cc5d03ffb..404e04fccc1 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -166,7 +166,7 @@ protected CompactionMetadata reserveCompaction(MetaJob metaJob, String compactor protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, Set jobFiles, TabletMetadata tablet, String compactorAddress, ExternalCompactionId externalCompactionId) { - FateInstanceType type = FateInstanceType.fromTableId(tablet.getTableId()); + FateInstanceType type = FateInstanceType.fromTableId(tablet.getExtent().tableId()); FateId fateId = FateId.from(type, 1L); return new CompactionMetadata(jobFiles, new ReferencedTabletFile(new Path("file:///accumulo/tables/1/default_tablet/F00001.rf")), @@ -180,7 +180,12 @@ protected TExternalCompactionJob createThriftJob(String externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), List.of(), SystemIteratorUtil.toIteratorConfig(List.of()), ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), 1L, Map.of()); + TCompactionKind.valueOf(ecm.getKind().name()), + FateId + .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getExtent().tableId()), + 1L) + .toThrift(), + Map.of()); } @Override @@ -273,7 +278,6 @@ public void testGetCompactionJob() throws Exception { TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class); expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); - expect(tm.getTableId()).andReturn(ke.tableId()); EasyMock.replay(tconf, context, creds, tm, security); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 7076bf44ce6..3ecb58bd6ca 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -57,6 +57,8 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -267,7 +269,9 @@ public void testScanTabletsWithOperationIds() throws Exception { 1_000); // Set operationIds on all the table's tablets so that they won't be loaded. - TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, 1234L); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId = FateId.from(type, 1234L); + TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Ample ample = getCluster().getServerContext().getAmple(); ServerAmpleImpl sai = (ServerAmpleImpl) ample; try (TabletsMutator tm = sai.mutateTablets()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index a1933575970..2b3a8972f1d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -700,8 +700,11 @@ public void testOperations() { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var context = cluster.getServerContext(); - var opid1 = TabletOperationId.from("SPLITTING:FATE[1234]"); - var opid2 = TabletOperationId.from("MERGING:FATE[5678]"); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId1 = FateId.from(type, "1234"); + FateId fateId2 = FateId.from(type, "5678"); + var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2); var ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false); @@ -832,7 +835,9 @@ public void testRootTabletUpdate() { assertEquals(LocationType.CURRENT, loc.getType()); assertNull(rootMeta.getOperationId()); - TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, 7); + FateInstanceType type = FateInstanceType.fromTableId(RootTable.EXTENT.tableId()); + FateId fateId = FateId.from(type, 7); + TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); var ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation() @@ -1201,7 +1206,9 @@ public void testAsyncMutator() throws Exception { }; // run a test where a subset of tablets are modified, all modifications should be accepted - var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50); + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId1 = FateId.from(type, 50); + var opid1 = TabletOperationId.from(TabletOperationType.MERGING, fateId1); int expected = 0; try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); @@ -1222,7 +1229,8 @@ public void testAsyncMutator() throws Exception { // run test where some will be accepted and some will be rejected and ensure the counts come // out as expected. - var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51); + FateId fateId2 = FateId.from(type, 51); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2); accepted.set(0); total.set(0); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 1d9096f8198..2e61925e656 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@ -62,6 +62,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -382,6 +384,10 @@ public void testOpidPreventsAssignment() throws Exception { String tableName = super.getUniqueNames(1)[0]; var tableId = TableId.of(prepTableForScanTest(c, tableName)); + + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId = FateId.from(type, 42L); + assertEquals(0, countTabletsWithLocation(c, tableId)); assertEquals(Set.of("f", "m", "t"), c.tableOperations().listSplits(tableName).stream() @@ -394,7 +400,7 @@ public void testOpidPreventsAssignment() throws Exception { // to not be assigned try (var writer = c.createBatchWriter(AccumuloTable.METADATA.tableName())) { var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Mutation m = new Mutation(extent.toMetaRow()); TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); writer.addMutation(m); @@ -420,7 +426,7 @@ public void testOpidPreventsAssignment() throws Exception { // to be unhosted try (var writer = c.createBatchWriter(AccumuloTable.METADATA.tableName())) { var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Mutation m = new Mutation(extent.toMetaRow()); TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); writer.addMutation(m); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index ff6f4d34cb5..c88989d1c48 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -56,6 +56,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.state.TabletManagement; @@ -322,7 +324,9 @@ private void reassignLocation(AccumuloClient client, String table, String tableN // Sets an operation type on all tablets up to the end row private void setOperationId(AccumuloClient client, String table, String tableNameToModify, Text end, TabletOperationType opType) throws TableNotFoundException { - var opid = TabletOperationId.from(opType, 42L); + FateInstanceType type = FateInstanceType.fromNamespaceOrTableName(table); + FateId fateId = FateId.from(type, 42L); + var opid = TabletOperationId.from(opType, fateId); TableId tableIdToModify = TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); try (TabletsMetadata tabletsMetadata =