Skip to content

Commit

Permalink
Globally Unique FATE Transaction Ids - Part 4
Browse files Browse the repository at this point in the history
This addresses several previously deferred changes for issue apache#4044.
Changes:
- ZooReservation now uses FateId (used in Utils)
- TabletOperationId now uses FateId
- TExternalCompactionJob now uses FateId
- VolumeManager and VolumeManagerImpl now use FateId
- Utils.getLock() lockData now uses the full FateId
- TabletRefresher now uses FateId
- Classes which used the above classes updated
- Several test changes to reflect new changes
- Deferred a couple of changes (in Compactor and CompactionCoordinator) (need pull/4247 merged first)
  • Loading branch information
kevinrr888 committed Feb 12, 2024
1 parent 47b54a6 commit b164641
Show file tree
Hide file tree
Showing 31 changed files with 213 additions and 109 deletions.
63 changes: 63 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateId.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.accumulo.core.data.AbstractId;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
import org.apache.accumulo.core.util.FastFormat;

/**
Expand All @@ -34,6 +35,8 @@ public class FateId extends AbstractId<FateId> {
private static final long serialVersionUID = 1L;
private static final String PREFIX = "FATE:";
private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$");
private static final Pattern FATEID_PATTERN =
Pattern.compile("^" + PREFIX + "[a-zA-Z]+:[0-9a-fA-F]+$");

private FateId(String canonical) {
super(canonical);
Expand Down Expand Up @@ -86,6 +89,46 @@ public static FateId from(FateInstanceType type, String hexTid) {
}
}

/**
* @param fateIdStr the string representation of the FateId
* @return a new FateId object from the given string
*/
public static FateId from(String fateIdStr) {
if (FATEID_PATTERN.matcher(fateIdStr).matches()) {
String[] fields = fateIdStr.split(":");
try {
FateInstanceType.valueOf(fields[1]);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid FateInstanceType: " + fields[1], e);
}
return new FateId(fateIdStr);
} else {
throw new IllegalArgumentException("Invalid Fate ID: " + fateIdStr);
}
}

/**
* @param fateIdStr the string representation of the FateId
* @return true if the string is a valid FateId, false otherwise
*/
public static boolean isFormattedTid(String fateIdStr) {
if (FATEID_PATTERN.matcher(fateIdStr).matches()) {
String[] fields = fateIdStr.split(":");
try {
FateInstanceType.valueOf(fields[1]);
} catch (IllegalArgumentException e) {
return false;
}
return true;
} else {
return false;
}
}

/**
* @param tFateId the TFateId
* @return the FateId equivalent of the given TFateId
*/
public static FateId fromThrift(TFateId tFateId) {
FateInstanceType type;
long tid = tFateId.getTid();
Expand All @@ -104,6 +147,26 @@ public static FateId fromThrift(TFateId tFateId) {
return new FateId(PREFIX + type + ":" + formatTid(tid));
}

/**
*
* @return the TFateId equivalent of the FateId
*/
public TFateId toThrift() {
TFateInstanceType thriftType;
FateInstanceType type = getType();
switch (type) {
case USER:
thriftType = TFateInstanceType.USER;
break;
case META:
thriftType = TFateInstanceType.META;
break;
default:
throw new IllegalArgumentException("Invalid FateInstanceType: " + type);
}
return new TFateId(thriftType, getTid());
}

/**
* Returns the hex string equivalent of the tid
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.zookeeper.KeeperException;
Expand All @@ -29,15 +30,14 @@

public class ZooReservation {

public static boolean attempt(ZooReaderWriter zk, String path, String reservationID,
String debugInfo) throws KeeperException, InterruptedException {
if (reservationID.contains(":")) {
throw new IllegalArgumentException();
}
private static final String DELIMITER = "-";

public static boolean attempt(ZooReaderWriter zk, String path, FateId fateId, String debugInfo)
throws KeeperException, InterruptedException {

while (true) {
try {
zk.putPersistentData(path, (reservationID + ":" + debugInfo).getBytes(UTF_8),
zk.putPersistentData(path, (fateId + DELIMITER + debugInfo).getBytes(UTF_8),
NodeExistsPolicy.FAIL);
return true;
} catch (NodeExistsException nee) {
Expand All @@ -48,15 +48,15 @@ public static boolean attempt(ZooReaderWriter zk, String path, String reservatio
continue;
}

String idInZoo = new String(zooData, UTF_8).split(":")[0];
FateId idInZoo = FateId.from(new String(zooData, UTF_8).split(DELIMITER)[0]);

return idInZoo.equals(reservationID);
return idInZoo.equals(fateId);
}
}

}

public static void release(ZooReaderWriter zk, String path, String reservationID)
public static void release(ZooReaderWriter zk, String path, FateId fateId)
throws KeeperException, InterruptedException {
byte[] zooData;

Expand All @@ -69,11 +69,11 @@ public static void release(ZooReaderWriter zk, String path, String reservationID
}

String zooDataStr = new String(zooData, UTF_8);
String idInZoo = zooDataStr.split(":")[0];
FateId idInZoo = FateId.from(zooDataStr.split(DELIMITER)[0]);

if (!idInZoo.equals(reservationID)) {
if (!idInZoo.equals(fateId)) {
throw new IllegalStateException("Tried to release reservation " + path
+ " with data mismatch " + reservationID + " " + zooDataStr);
+ " with data mismatch " + fateId + " " + zooDataStr);
}

zk.recursiveDelete(path, NodeMissingPolicy.SKIP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.accumulo.core.metadata.schema;

import org.apache.accumulo.core.data.AbstractId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;

import com.google.common.base.Preconditions;

Expand All @@ -33,14 +33,14 @@ public class TabletOperationId extends AbstractId<TabletOperationId> {

public static String validate(String opid) {
var fields = opid.split(":");
Preconditions.checkArgument(fields.length == 2, "Malformed operation id %s", opid);
Preconditions.checkArgument(fields.length == 4, "Malformed operation id %s", opid);
try {
TabletOperationType.valueOf(fields[0]);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Malformed operation id " + opid, e);
}

if (!FateTxId.isFormatedTid(fields[1])) {
if (!FateId.isFormattedTid(opid.substring(fields[0].length() + 1))) {
throw new IllegalArgumentException("Malformed operation id " + opid);
}

Expand All @@ -53,15 +53,15 @@ private TabletOperationId(String canonical) {

public TabletOperationType getType() {
var fields = canonical().split(":");
Preconditions.checkState(fields.length == 2);
Preconditions.checkState(fields.length == 4);
return TabletOperationType.valueOf(fields[0]);
}

public static TabletOperationId from(String opid) {
return new TabletOperationId(validate(opid));
}

public static TabletOperationId from(TabletOperationType type, long txid) {
return new TabletOperationId(type + ":" + FateTxId.formatTid(txid));
public static TabletOperationId from(TabletOperationType type, FateId fateId) {
return new TabletOperationId(type + ":" + fateId);
}
}
Loading

0 comments on commit b164641

Please sign in to comment.