diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java index 2e0214782a1..2a0eb1067aa 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateId.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateId.java @@ -22,6 +22,7 @@ import org.apache.accumulo.core.data.AbstractId; import org.apache.accumulo.core.manager.thrift.TFateId; +import org.apache.accumulo.core.manager.thrift.TFateInstanceType; import org.apache.accumulo.core.util.FastFormat; /** @@ -34,6 +35,8 @@ public class FateId extends AbstractId { private static final long serialVersionUID = 1L; private static final String PREFIX = "FATE:"; private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$"); + private static final Pattern FATEID_PATTERN = + Pattern.compile("^" + PREFIX + "[a-zA-Z]+:[0-9a-fA-F]+$"); private FateId(String canonical) { super(canonical); @@ -86,6 +89,46 @@ public static FateId from(FateInstanceType type, String hexTid) { } } + /** + * @param fateIdStr the string representation of the FateId + * @return a new FateId object from the given string + */ + public static FateId from(String fateIdStr) { + if (FATEID_PATTERN.matcher(fateIdStr).matches()) { + String[] fields = fateIdStr.split(":"); + try { + FateInstanceType.valueOf(fields[1]); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid FateInstanceType: " + fields[1], e); + } + return new FateId(fateIdStr); + } else { + throw new IllegalArgumentException("Invalid Fate ID: " + fateIdStr); + } + } + + /** + * @param fateIdStr the string representation of the FateId + * @return true if the string is a valid FateId, false otherwise + */ + public static boolean isFormattedTid(String fateIdStr) { + if (FATEID_PATTERN.matcher(fateIdStr).matches()) { + String[] fields = fateIdStr.split(":"); + try { + FateInstanceType.valueOf(fields[1]); + } catch (IllegalArgumentException e) { + return false; + } + return true; + } else { + return false; + } + } + + /** + * @param tFateId the TFateId + * @return the FateId equivalent of the given TFateId + */ public static FateId fromThrift(TFateId tFateId) { FateInstanceType type; long tid = tFateId.getTid(); @@ -104,6 +147,26 @@ public static FateId fromThrift(TFateId tFateId) { return new FateId(PREFIX + type + ":" + formatTid(tid)); } + /** + * + * @return the TFateId equivalent of the FateId + */ + public TFateId toThrift() { + TFateInstanceType thriftType; + FateInstanceType type = getType(); + switch (type) { + case USER: + thriftType = TFateInstanceType.USER; + break; + case META: + thriftType = TFateInstanceType.META; + break; + default: + throw new IllegalArgumentException("Invalid FateInstanceType: " + type); + } + return new TFateId(thriftType, getTid()); + } + /** * Returns the hex string equivalent of the tid */ diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java index 33714b25812..5cccad2456d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooReservation.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.zookeeper.KeeperException; @@ -29,15 +30,14 @@ public class ZooReservation { - public static boolean attempt(ZooReaderWriter zk, String path, String reservationID, - String debugInfo) throws KeeperException, InterruptedException { - if (reservationID.contains(":")) { - throw new IllegalArgumentException(); - } + private static final String DELIMITER = "-"; + + public static boolean attempt(ZooReaderWriter zk, String path, FateId fateId, String debugInfo) + throws KeeperException, InterruptedException { while (true) { try { - zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(UTF_8), + zk.putPersistentData(path, (fateId + DELIMITER + debugInfo).getBytes(UTF_8), NodeExistsPolicy.FAIL); return true; } catch (NodeExistsException nee) { @@ -48,15 +48,15 @@ public static boolean attempt(ZooReaderWriter zk, String path, String reservatio continue; } - String idInZoo = new String(zooData, UTF_8).split(":")[0]; + FateId idInZoo = FateId.from(new String(zooData, UTF_8).split(DELIMITER)[0]); - return idInZoo.equals(reservationID); + return idInZoo.equals(fateId); } } } - public static void release(ZooReaderWriter zk, String path, String reservationID) + public static void release(ZooReaderWriter zk, String path, FateId fateId) throws KeeperException, InterruptedException { byte[] zooData; @@ -69,11 +69,11 @@ public static void release(ZooReaderWriter zk, String path, String reservationID } String zooDataStr = new String(zooData, UTF_8); - String idInZoo = zooDataStr.split(":")[0]; + FateId idInZoo = FateId.from(zooDataStr.split(DELIMITER)[0]); - if (!idInZoo.equals(reservationID)) { + if (!idInZoo.equals(fateId)) { throw new IllegalStateException("Tried to release reservation " + path - + " with data mismatch " + reservationID + " " + zooDataStr); + + " with data mismatch " + fateId + " " + zooDataStr); } zk.recursiveDelete(path, NodeMissingPolicy.SKIP); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java index 49b7cf169a1..a70ab73a580 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletOperationId.java @@ -19,7 +19,7 @@ package org.apache.accumulo.core.metadata.schema; import org.apache.accumulo.core.data.AbstractId; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import com.google.common.base.Preconditions; @@ -33,14 +33,14 @@ public class TabletOperationId extends AbstractId { public static String validate(String opid) { var fields = opid.split(":"); - Preconditions.checkArgument(fields.length == 2, "Malformed operation id %s", opid); + Preconditions.checkArgument(fields.length == 4, "Malformed operation id %s", opid); try { TabletOperationType.valueOf(fields[0]); } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Malformed operation id " + opid, e); } - if (!FateTxId.isFormatedTid(fields[1])) { + if (!FateId.isFormattedTid(opid.substring(fields[0].length() + 1))) { throw new IllegalArgumentException("Malformed operation id " + opid); } @@ -53,7 +53,7 @@ private TabletOperationId(String canonical) { public TabletOperationType getType() { var fields = canonical().split(":"); - Preconditions.checkState(fields.length == 2); + Preconditions.checkState(fields.length == 4); return TabletOperationType.valueOf(fields[0]); } @@ -61,7 +61,7 @@ public static TabletOperationId from(String opid) { return new TabletOperationId(validate(opid)); } - public static TabletOperationId from(TabletOperationType type, long txid) { - return new TabletOperationId(type + ":" + FateTxId.formatTid(txid)); + public static TabletOperationId from(TabletOperationType type, FateId fateId) { + return new TabletOperationId(type + ":" + fateId); } } diff --git a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java index 6a0523c51b2..ce3a4095c3e 100644 --- a/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java +++ b/core/src/main/thrift-gen-java/org/apache/accumulo/core/tabletserver/thrift/TExternalCompactionJob.java @@ -35,7 +35,7 @@ public class TExternalCompactionJob implements org.apache.thrift.TBase overrides; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ @@ -139,7 +139,6 @@ public java.lang.String getFieldName() { // isset id assignments private static final int __PROPAGATEDELETES_ISSET_ID = 0; - private static final int __FATETXID_ISSET_ID = 1; private byte __isset_bitfield = 0; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { @@ -159,8 +158,8 @@ public java.lang.String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.KIND, new org.apache.thrift.meta_data.FieldMetaData("kind", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.ENUM , "TCompactionKind"))); - tmpMap.put(_Fields.FATE_TX_ID, new org.apache.thrift.meta_data.FieldMetaData("fateTxId", org.apache.thrift.TFieldRequirementType.DEFAULT, - new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.FATE_TX_ID, new org.apache.thrift.meta_data.FieldMetaData("fateTxId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.manager.thrift.TFateId.class))); tmpMap.put(_Fields.OVERRIDES, new org.apache.thrift.meta_data.FieldMetaData("overrides", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), @@ -180,7 +179,7 @@ public TExternalCompactionJob( java.lang.String outputFile, boolean propagateDeletes, TCompactionKind kind, - long fateTxId, + org.apache.accumulo.core.manager.thrift.TFateId fateTxId, java.util.Map overrides) { this(); @@ -225,7 +224,9 @@ public TExternalCompactionJob(TExternalCompactionJob other) { if (other.isSetKind()) { this.kind = other.kind; } - this.fateTxId = other.fateTxId; + if (other.isSetFateTxId()) { + this.fateTxId = new org.apache.accumulo.core.manager.thrift.TFateId(other.fateTxId); + } if (other.isSetOverrides()) { java.util.Map __this__overrides = new java.util.HashMap(other.overrides); this.overrides = __this__overrides; @@ -248,7 +249,7 @@ public void clear() { this.propagateDeletes = false; this.kind = null; setFateTxIdIsSet(false); - this.fateTxId = 0; + this.fateTxId = null; this.overrides = null; } @@ -441,27 +442,29 @@ public void setKindIsSet(boolean value) { } } - public long getFateTxId() { + public org.apache.accumulo.core.manager.thrift.TFateId getFateTxId() { return this.fateTxId; } - public TExternalCompactionJob setFateTxId(long fateTxId) { + public TExternalCompactionJob setFateTxId(org.apache.accumulo.core.manager.thrift.TFateId fateTxId) { this.fateTxId = fateTxId; setFateTxIdIsSet(true); return this; } public void unsetFateTxId() { - __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __FATETXID_ISSET_ID); + this.fateTxId = null; } /** Returns true if field fateTxId is set (has been assigned a value) and false otherwise */ public boolean isSetFateTxId() { - return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __FATETXID_ISSET_ID); + return this.fateTxId != null; } public void setFateTxIdIsSet(boolean value) { - __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __FATETXID_ISSET_ID, value); + if (!value) { + this.fateTxId = null; + } } public int getOverridesSize() { @@ -563,7 +566,7 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable if (value == null) { unsetFateTxId(); } else { - setFateTxId((java.lang.Long)value); + setFateTxId((org.apache.accumulo.core.manager.thrift.TFateId)value); } break; @@ -719,12 +722,12 @@ public boolean equals(TExternalCompactionJob that) { return false; } - boolean this_present_fateTxId = true; - boolean that_present_fateTxId = true; + boolean this_present_fateTxId = true && this.isSetFateTxId(); + boolean that_present_fateTxId = true && that.isSetFateTxId(); if (this_present_fateTxId || that_present_fateTxId) { if (!(this_present_fateTxId && that_present_fateTxId)) return false; - if (this.fateTxId != that.fateTxId) + if (!this.fateTxId.equals(that.fateTxId)) return false; } @@ -770,7 +773,9 @@ public int hashCode() { if (isSetKind()) hashCode = hashCode * 8191 + kind.getValue(); - hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(fateTxId); + hashCode = hashCode * 8191 + ((isSetFateTxId() ? 131071 : 524287)); + if (isSetFateTxId()) + hashCode = hashCode * 8191 + fateTxId.hashCode(); hashCode = hashCode * 8191 + ((isSetOverrides()) ? 131071 : 524287); if (isSetOverrides()) @@ -954,7 +959,11 @@ public java.lang.String toString() { first = false; if (!first) sb.append(", "); sb.append("fateTxId:"); - sb.append(this.fateTxId); + if (this.fateTxId == null) { + sb.append("null"); + } else { + sb.append(this.fateTxId); + } first = false; if (!first) sb.append(", "); sb.append("overrides:"); @@ -974,6 +983,9 @@ public void validate() throws org.apache.thrift.TException { if (extent != null) { extent.validate(); } + if (fateTxId != null) { + fateTxId.validate(); + } if (iteratorSettings != null) { iteratorSettings.validate(); } @@ -1087,8 +1099,9 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TExternalCompaction } break; case 8: // FATE_TX_ID - if (schemeField.type == org.apache.thrift.protocol.TType.I64) { - struct.fateTxId = iprot.readI64(); + if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) { + struct.fateTxId = new org.apache.accumulo.core.manager.thrift.TFateId(); + struct.fateTxId.read(iprot); struct.setFateTxIdIsSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); @@ -1170,9 +1183,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TExternalCompactio oprot.writeI32(struct.kind.getValue()); oprot.writeFieldEnd(); } - oprot.writeFieldBegin(FATE_TX_ID_FIELD_DESC); - oprot.writeI64(struct.fateTxId); - oprot.writeFieldEnd(); + if (struct.fateTxId != null) { + oprot.writeFieldBegin(FATE_TX_ID_FIELD_DESC); + struct.fateTxId.write(oprot); + oprot.writeFieldEnd(); + } if (struct.overrides != null) { oprot.writeFieldBegin(OVERRIDES_FIELD_DESC); { @@ -1261,7 +1276,7 @@ public void write(org.apache.thrift.protocol.TProtocol prot, TExternalCompaction oprot.writeI32(struct.kind.getValue()); } if (struct.isSetFateTxId()) { - oprot.writeI64(struct.fateTxId); + struct.fateTxId.write(oprot); } if (struct.isSetOverrides()) { { @@ -1320,7 +1335,8 @@ public void read(org.apache.thrift.protocol.TProtocol prot, TExternalCompactionJ struct.setKindIsSet(true); } if (incoming.get(7)) { - struct.fateTxId = iprot.readI64(); + struct.fateTxId = new org.apache.accumulo.core.manager.thrift.TFateId(); + struct.extent.read(iprot); struct.setFateTxIdIsSet(true); } if (incoming.get(8)) { diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index a45078edb8d..34bca1e9c3a 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -55,6 +55,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.SuspendingTServer; @@ -324,6 +326,8 @@ public void testBuilder() { KeyExtent extent = new KeyExtent(TableId.of("5"), new Text("df"), new Text("da")); + FateInstanceType type = FateInstanceType.fromTableId(extent.tableId()); + StoredTabletFile sf1 = new ReferencedTabletFile(new Path("hdfs://nn1/acc/tables/1/t-0001/sf1.rf")).insert(); DataFileValue dfv1 = new DataFileValue(89, 67); @@ -364,7 +368,8 @@ public void testBuilder() { assertThrows(IllegalStateException.class, tm::getSuspend); assertThrows(IllegalStateException.class, tm::getTime); - TabletOperationId opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, 55); + TabletOperationId opid1 = + TabletOperationId.from(TabletOperationType.SPLITTING, FateId.from(type, 55)); TabletMetadata tm2 = TabletMetadata.builder(extent).putOperation(opid1).build(LOCATION); assertEquals(extent, tm2.getExtent()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java index 018d473a76d..bac4d3d4cc1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManager.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.hadoop.conf.Configuration; @@ -167,8 +168,8 @@ RemoteIterator listFiles(final Path path, final boolean recur * This operation should be idempotent to allow calling multiple times in the case of a partial * completion. */ - void bulkRename(Map oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException; + void bulkRename(Map oldToNewPathMap, int poolSize, String poolName, FateId fateId) + throws IOException; // forward to the appropriate FileSystem object boolean moveToTrash(Path sourcePath) throws IOException; diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java index b47dedabe0d..1c1e7a4fd56 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeManagerImpl.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.spi.fs.VolumeChooser; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -321,7 +322,7 @@ public boolean rename(Path path, Path newPath) throws IOException { @Override public void bulkRename(Map oldToNewPathMap, int poolSize, String poolName, - String transactionId) throws IOException { + FateId fateId) throws IOException { List> results = new ArrayList<>(); ExecutorService workerPool = ThreadPools.getServerThreadPools().createFixedThreadPool(poolSize, poolName, false); @@ -337,14 +338,14 @@ public void bulkRename(Map oldToNewPathMap, int poolSize, String pool } log.debug( "Ignoring rename exception for {} because destination already exists. orig: {} new: {}", - transactionId, oldPath, newPath, e); + fateId, oldPath, newPath, e); success = true; } if (!success && (!exists(newPath) || exists(oldPath))) { - throw new IOException("Rename operation " + transactionId + " returned false. orig: " - + oldPath + " new: " + newPath); + throw new IOException("Rename operation " + fateId + " returned false. orig: " + oldPath + + " new: " + newPath); } else if (log.isTraceEnabled()) { - log.trace("{} moved {} to {}", transactionId, oldPath, newPath); + log.trace("{} moved {} to {}", fateId, oldPath, newPath); } return null; }))); diff --git a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java index 78f98be69e2..5658a679238 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java @@ -459,7 +459,7 @@ public void testOperationId() { assertViolation(mc, m, (short) 9); m = new Mutation(new Text("0;foo")); - ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE[123abc]")); + ServerColumnFamily.OPID_COLUMN.put(m, new Value("MERGING:FATE:META:123abc")); violations = mc.check(createEnv(), m); assertNull(violations); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 2d93e420682..8e198cc4a80 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -62,7 +62,6 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -210,11 +209,14 @@ protected void checkIfCanceled() { if (job.getKind() == TCompactionKind.USER) { - var cconf = CompactionConfigStorage.getConfig(getContext(), job.getFateTxId()); + // ELASTICITY_TODO DEFERRED - ISSUE 4044 + // deferred for after pull/4247 is merged. Will be able to pass + // FateId.fromThrift(job.getFateTxId()) + var cconf = CompactionConfigStorage.getConfig(getContext(), job.getFateTxId().getTid()); if (cconf == null) { LOG.info("Cancelling compaction {} for user compaction that no longer exists {} {}", - ecid, FateTxId.formatTid(job.getFateTxId()), extent); + ecid, job.getFateTxId(), extent); JOB_HOLDER.cancel(job.getExternalCompactionId()); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index ec6ee9a42d6..946a13c2993 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; @@ -570,10 +571,13 @@ protected TExternalCompactionJob createThriftJob(String externalCompactionId, fateTxid = metaJob.getTabletMetadata().getSelectedFiles().getFateTxId(); } + // ELASTICITY_TODO DEFERRED - ISSUE 4044 : fixable after pull/4247 is merged + FateId tempFateId = FateId + .from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId()), fateTxid); return new TExternalCompactionJob(externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides); + TCompactionKind.valueOf(ecm.getKind().name()), tempFateId.toThrift(), overrides); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index e5ffc8039d1..dce71c231da 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -146,8 +146,7 @@ public static long reserveHdfsDirectory(Manager env, String directory, FateId fa ZooReaderWriter zk = env.getContext().getZooReaderWriter(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 .. should the full FateId be passed below? - if (ZooReservation.attempt(zk, resvPath, fateId.getHexTid(), "")) { + if (ZooReservation.attempt(zk, resvPath, fateId, "")) { return 0; } else { return 50; @@ -158,13 +157,12 @@ public static void unreserveHdfsDirectory(Manager env, String directory, FateId throws KeeperException, InterruptedException { String resvPath = env.getContext().getZooKeeperRoot() + Constants.ZHDFS_RESERVATIONS + "/" + Base64.getEncoder().encodeToString(directory.getBytes(UTF_8)); - ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId.getHexTid()); + ZooReservation.release(env.getContext().getZooReaderWriter(), resvPath, fateId); } private static Lock getLock(ServerContext context, AbstractId id, FateId fateId, boolean writeLock) { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 ... should lock data use full FateId? - byte[] lockData = fateId.getHexTid().getBytes(UTF_8); + byte[] lockData = fateId.canonical().getBytes(UTF_8); var fLockPath = FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical()); FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index eb78e48ce83..7556158a713 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@ -106,8 +106,7 @@ private void moveFiles(FateId fateId, Path sourceDir, Path bulkDir, Manager mana oldToNewMap.put(originalPath, newPath); } try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId.getHexTid()); + fs.bulkRename(oldToNewMap, workerCount, "bulkDir move", fateId); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java index c01e50114ff..4df298793fa 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java @@ -52,8 +52,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception { @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), + TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit, tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId.getTid())); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index 57d36164821..a4f7928060f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@ -39,7 +39,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; @@ -60,12 +60,12 @@ public class TabletRefresher { private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class); public static void refresh(ServerContext context, - Supplier> onlineTserversSupplier, long fateTxid, TableId tableId, + Supplier> onlineTserversSupplier, FateId fateId, TableId tableId, byte[] startRow, byte[] endRow, Predicate needsRefresh) { // ELASTICITY_TODO should this thread pool be configurable? - ThreadPoolExecutor threadPool = context.threadPools().createFixedThreadPool(10, - "Tablet refresh " + FateTxId.formatTid(fateTxid), false); + ThreadPoolExecutor threadPool = + context.threadPools().createFixedThreadPool(10, "Tablet refresh " + fateId, false); try (var tablets = context.getAmple().readTablets().forTable(tableId) .overlapping(startRow, endRow).checkConsistency() @@ -86,7 +86,7 @@ public static void refresh(ServerContext context, var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation, mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList()))); - refreshTablets(threadPool, FateTxId.formatTid(fateTxid), context, onlineTserversSupplier, + refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier, refreshesNeeded); }); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index fa7e4d31e06..dfd0ef9cf59 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -305,9 +305,8 @@ public void undo(FateId fateId, Manager env) throws Exception { // For any compactions that may have happened before this operation failed, attempt to refresh // tablets. - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId.getTid(), tableId, - startRow, endRow, tabletMetadata -> true); + TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId, tableId, startRow, + endRow, tabletMetadata -> true); } /** diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java index aca1242276d..fd4daf0c4c9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java @@ -45,9 +45,8 @@ public RefreshTablets(TableId tableId, NamespaceId namespaceId, byte[] startRow, @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId.getTid(), - tableId, startRow, endRow, tabletMetadata -> true); + TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, tableId, + startRow, endRow, tabletMetadata -> true); return new CleanUp(tableId, namespaceId, startRow, endRow); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java index 91d62752e77..25c2b7a58f6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/ReserveTablets.java @@ -54,8 +54,7 @@ public ReserveTablets(TableId tableId, NamespaceId namespaceId) { @Override public long isReady(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.DELETING, fateId); // The consumer may be called in another thread so use an AtomicLong AtomicLong accepted = new AtomicLong(0); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java index 7df3561e1a3..f071785ece5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java @@ -85,8 +85,7 @@ private Optional deleteTabletFiles(Manager manager, FateId fateId) { // Only delete data within the original extent specified by the user KeyExtent range = data.getOriginalExtent(); log.debug("{} deleting tablet files in range {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); try ( var tabletsMetadata = manager.getContext().getAmple().readTablets() diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java index 53c24959ae3..f48885d5469 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteTablets.java @@ -60,8 +60,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { KeyExtent range = data.getMergeExtent(); log.debug("{} Deleting tablets for {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); AtomicLong acceptedCount = new AtomicLong(); AtomicLong rejectedCount = new AtomicLong(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java index 643a3428a96..65cefef1abe 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/FinishTableRangeOp.java @@ -61,8 +61,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { static void removeOperationIds(Logger log, MergeInfo data, FateId fateId, Manager manager) { KeyExtent range = data.getReserveExtent(); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); log.debug("{} unreserving tablet in range {}", fateId, range); AtomicLong acceptedCount = new AtomicLong(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java index b46f7bd2ef1..2b5b9c969c5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/MergeTablets.java @@ -78,8 +78,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { KeyExtent range = data.getMergeExtent(); log.debug("{} Merging metadata for {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); Set tabletAvailabilities = new HashSet<>(); MetadataTime maxLogicalTime = null; List dirs = new ArrayList<>(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java index df8bd977ba0..b3d246572a3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/ReserveTablets.java @@ -55,8 +55,7 @@ public ReserveTablets(MergeInfo data) { public long isReady(FateId fateId, Manager env) throws Exception { var range = data.getReserveExtent(); log.debug("{} reserving tablets in range {}", fateId, range); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); AtomicLong opsAccepted = new AtomicLong(0); Consumer resultConsumer = result -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java index e3262daa8c9..dbf0d04f428 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/DeleteOperationIds.java @@ -42,8 +42,7 @@ public DeleteOperationIds(SplitInfo splitInfo) { @Override public Repo call(FateId fateId, Manager manager) throws Exception { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java index 813f525692f..a973015ddc0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java @@ -66,8 +66,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception { // ELASTICITY_TODO intentionally not getting the table lock because not sure if its needed, // revist later when more operations are moved out of tablet server - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); // ELASTICITY_TODO write IT that spins up 100 threads that all try to add a diff split to // the same tablet. @@ -132,8 +131,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { TabletMetadata tabletMetadata = manager.getContext().getAmple() .readTablet(splitInfo.getOriginal(), PREV_ROW, LOCATION, OPID); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); if (tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId())) { // the tablet no longer exists or we could not set the operation id, maybe another operation diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index b3d7a85cf78..23556aa41e3 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -62,8 +62,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { TabletMetadata tabletMetadata = manager.getContext().getAmple().readTablet(splitInfo.getOriginal()); - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId.getTid()); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); if (tabletMetadata == null) { // check to see if this operation has already succeeded. diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java index 0bc2e86983e..f0ffe01994a 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java @@ -106,8 +106,7 @@ public Repo call(FateId fateId, Manager manager) throws Exception { } } try { - // ELASTICITY_TODO DEFERRED - ISSUE 4044 - fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId.getHexTid()); + fs.bulkRename(oldToNewPaths, workerCount, "importtable rename", fateId); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null, TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage()); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 973a369c96e..1748cab29b0 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.CompactableFileImpl; @@ -177,7 +178,10 @@ protected TExternalCompactionJob createThriftJob(String externalCompactionId, metaJob.getTabletMetadata().getExtent().toThrift(), List.of(), SystemIteratorUtil.toIteratorConfig(List.of()), ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), - TCompactionKind.valueOf(ecm.getKind().name()), 1L, Map.of()); + TCompactionKind.valueOf(ecm.getKind().name()), + FateId.from(FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId()), 1L) + .toThrift(), + Map.of()); } @Override @@ -270,6 +274,7 @@ public void testGetCompactionJob() throws Exception { TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class); expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); + expect(tm.getTableId()).andReturn(ke.tableId()); EasyMock.replay(tconf, context, creds, tm, security); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 7076bf44ce6..3ecb58bd6ca 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -57,6 +57,8 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -267,7 +269,9 @@ public void testScanTabletsWithOperationIds() throws Exception { 1_000); // Set operationIds on all the table's tablets so that they won't be loaded. - TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, 1234L); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId = FateId.from(type, 1234L); + TabletOperationId opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Ample ample = getCluster().getServerContext().getAmple(); ServerAmpleImpl sai = (ServerAmpleImpl) ample; try (TabletsMutator tm = sai.mutateTablets()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index df5841228de..dcbb04f1adb 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -66,6 +66,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.iterators.user.GcWalsFilter; import org.apache.accumulo.core.iterators.user.HasCurrentFilter; @@ -694,8 +696,11 @@ public void testOperations() { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var context = cluster.getServerContext(); - var opid1 = TabletOperationId.from("SPLITTING:FATE[1234]"); - var opid2 = TabletOperationId.from("MERGING:FATE[5678]"); + FateInstanceType type = FateInstanceType.fromTableId(tid); + FateId fateId1 = FateId.from(type, "1234"); + FateId fateId2 = FateId.from(type, "5678"); + var opid1 = TabletOperationId.from(TabletOperationType.SPLITTING, fateId1); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2); var ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(e1).requireAbsentOperation().putOperation(opid1).submit(tm -> false); @@ -814,7 +819,9 @@ public void testRootTabletUpdate() { assertEquals(LocationType.CURRENT, loc.getType()); assertNull(rootMeta.getOperationId()); - TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, 7); + FateInstanceType type = FateInstanceType.fromTableId(RootTable.EXTENT.tableId()); + FateId fateId = FateId.from(type, 7); + TabletOperationId opid = TabletOperationId.from(TabletOperationType.MERGING, fateId); var ctmi = new ConditionalTabletsMutatorImpl(context); ctmi.mutateTablet(RootTable.EXTENT).requireAbsentOperation().requireAbsentLocation() @@ -1178,7 +1185,9 @@ public void testAsyncMutator() throws Exception { }; // run a test where a subset of tablets are modified, all modifications should be accepted - var opid1 = TabletOperationId.from(TabletOperationType.MERGING, 50); + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId1 = FateId.from(type, 50); + var opid1 = TabletOperationId.from(TabletOperationType.MERGING, fateId1); int expected = 0; try (var tablets = ample.readTablets().forTable(tableId).fetch(OPID, PREV_ROW).build(); @@ -1199,7 +1208,8 @@ public void testAsyncMutator() throws Exception { // run test where some will be accepted and some will be rejected and ensure the counts come // out as expected. - var opid2 = TabletOperationId.from(TabletOperationType.MERGING, 51); + FateId fateId2 = FateId.from(type, 51); + var opid2 = TabletOperationId.from(TabletOperationType.MERGING, fateId2); accepted.set(0); total.set(0); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 1d9096f8198..2e61925e656 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@ -62,6 +62,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -382,6 +384,10 @@ public void testOpidPreventsAssignment() throws Exception { String tableName = super.getUniqueNames(1)[0]; var tableId = TableId.of(prepTableForScanTest(c, tableName)); + + FateInstanceType type = FateInstanceType.fromTableId(tableId); + FateId fateId = FateId.from(type, 42L); + assertEquals(0, countTabletsWithLocation(c, tableId)); assertEquals(Set.of("f", "m", "t"), c.tableOperations().listSplits(tableName).stream() @@ -394,7 +400,7 @@ public void testOpidPreventsAssignment() throws Exception { // to not be assigned try (var writer = c.createBatchWriter(AccumuloTable.METADATA.tableName())) { var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Mutation m = new Mutation(extent.toMetaRow()); TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); writer.addMutation(m); @@ -420,7 +426,7 @@ public void testOpidPreventsAssignment() throws Exception { // to be unhosted try (var writer = c.createBatchWriter(AccumuloTable.METADATA.tableName())) { var extent = new KeyExtent(tableId, new Text("m"), new Text("f")); - var opid = TabletOperationId.from(TabletOperationType.SPLITTING, 42L); + var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId); Mutation m = new Mutation(extent.toMetaRow()); TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, new Value(opid.canonical())); writer.addMutation(m); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index ff6f4d34cb5..c88989d1c48 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -56,6 +56,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.manager.state.TabletManagement; @@ -322,7 +324,9 @@ private void reassignLocation(AccumuloClient client, String table, String tableN // Sets an operation type on all tablets up to the end row private void setOperationId(AccumuloClient client, String table, String tableNameToModify, Text end, TabletOperationType opType) throws TableNotFoundException { - var opid = TabletOperationId.from(opType, 42L); + FateInstanceType type = FateInstanceType.fromNamespaceOrTableName(table); + FateId fateId = FateId.from(type, 42L); + var opid = TabletOperationId.from(opType, fateId); TableId tableIdToModify = TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); try (TabletsMetadata tabletsMetadata =