Skip to content

Commit

Permalink
Globally Unique FATE Transaction Ids - Part 3 (#4247)
Browse files Browse the repository at this point in the history
This addresses several previously deferred changes for issue #4044. Root of most of these new changes were from changing TabletMetadata and TabletUpdates (in Ample) - FateId is now stored in the Metadata table instead of just the formatted long id. Addresses deferred changes to TabletMetadata, TabletUpdates, CompactionConfigStorage, SelectedFiles, and Ample.
  • Loading branch information
kevinrr888 authored Feb 14, 2024
1 parent 5d12597 commit f8ab378
Show file tree
Hide file tree
Showing 39 changed files with 373 additions and 290 deletions.
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateId.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.accumulo.core.fate;

import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.data.AbstractId;
import org.apache.accumulo.core.manager.thrift.TFateId;
Expand All @@ -34,6 +36,9 @@ public class FateId extends AbstractId<FateId> {
private static final long serialVersionUID = 1L;
private static final String PREFIX = "FATE:";
private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$");
private static final Pattern FATEID_PATTERN = Pattern.compile("^" + PREFIX + "("
+ Stream.of(FateInstanceType.values()).map(Enum::name).collect(Collectors.joining("|"))
+ "):[0-9a-fA-F]+$");

private FateId(String canonical) {
super(canonical);
Expand Down Expand Up @@ -86,6 +91,30 @@ public static FateId from(FateInstanceType type, String hexTid) {
}
}

/**
* @param fateIdStr the string representation of the FateId
* @return a new FateId object from the given string
*/
public static FateId from(String fateIdStr) {
if (FATEID_PATTERN.matcher(fateIdStr).matches()) {
return new FateId(fateIdStr);
} else {
throw new IllegalArgumentException("Invalid Fate ID: " + fateIdStr);
}
}

/**
* @param fateIdStr the string representation of the FateId
* @return true if the string is a valid FateId, false otherwise
*/
public static boolean isFormattedTid(String fateIdStr) {
return FATEID_PATTERN.matcher(fateIdStr).matches();
}

/**
* @param tFateId the TFateId
* @return the FateId equivalent of the given TFateId
*/
public static FateId fromThrift(TFateId tFateId) {
FateInstanceType type;
long tid = tFateId.getTid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.gc.GcCandidate;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.lock.ServiceLock;
Expand Down Expand Up @@ -371,7 +372,7 @@ interface TabletUpdates<T> {

T putTime(MetadataTime time);

T putBulkFile(ReferencedTabletFile bulkref, long tid);
T putBulkFile(ReferencedTabletFile bulkref, FateId fateId);

T deleteBulkFile(StoredTabletFile bulkref);

Expand All @@ -383,9 +384,9 @@ interface TabletUpdates<T> {

T deleteExternalCompaction(ExternalCompactionId ecid);

T putCompacted(long fateTxid);
T putCompacted(FateId fateId);

T deleteCompacted(long fateTxid);
T deleteCompacted(FateId fateId);

T putTabletAvailability(TabletAvailability tabletAvailability);

Expand Down Expand Up @@ -659,9 +660,9 @@ default void deleteScanServerFileReferences(String serverAddress, UUID serverSes
* Create a Bulk Load In Progress flag in the metadata table
*
* @param path The bulk directory filepath
* @param fateTxid The id of the Bulk Import Fate operation.
* @param fateId The FateId of the Bulk Import Fate operation.
*/
default void addBulkLoadInProgressFlag(String path, long fateTxid) {
default void addBulkLoadInProgressFlag(String path, FateId fateId) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
Expand All @@ -41,19 +42,19 @@ public class CompactionMetadata {
private final short priority;
private final CompactorGroupId cgid;
private final boolean propagateDeletes;
private final Long fateTxId;
private final FateId fateId;

public CompactionMetadata(Set<StoredTabletFile> jobFiles, ReferencedTabletFile compactTmpName,
String compactorId, CompactionKind kind, short priority, CompactorGroupId ceid,
boolean propagateDeletes, Long fateTxId) {
boolean propagateDeletes, FateId fateId) {
this.jobFiles = Objects.requireNonNull(jobFiles);
this.compactTmpName = Objects.requireNonNull(compactTmpName);
this.compactorId = Objects.requireNonNull(compactorId);
this.kind = Objects.requireNonNull(kind);
this.priority = priority;
this.cgid = Objects.requireNonNull(ceid);
this.propagateDeletes = propagateDeletes;
this.fateTxId = fateTxId;
this.fateId = fateId;
}

public Set<StoredTabletFile> getJobFiles() {
Expand Down Expand Up @@ -84,8 +85,8 @@ public boolean getPropagateDeletes() {
return propagateDeletes;
}

public Long getFateTxId() {
return fateTxId;
public FateId getFateId() {
return fateId;
}

// This class is used to serialize and deserialize this class using GSon. Any changes to this
Expand All @@ -98,7 +99,7 @@ private static class GSonData {
String groupId;
short priority;
boolean propDels;
Long fateTxId;
String fateId;
}

public String toJson() {
Expand All @@ -111,7 +112,7 @@ public String toJson() {
jData.groupId = cgid.toString();
jData.priority = priority;
jData.propDels = propagateDeletes;
jData.fateTxId = fateTxId;
jData.fateId = fateId.canonical();
return GSON.get().toJson(jData);
}

Expand All @@ -121,7 +122,7 @@ public static CompactionMetadata fromJson(String json) {
return new CompactionMetadata(jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
StoredTabletFile.of(jData.tmp).getTabletFile(), jData.compactor,
CompactionKind.valueOf(jData.kind), jData.priority,
CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, jData.fateTxId);
CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, FateId.from(jData.fateId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Pair;
Expand Down Expand Up @@ -334,18 +334,13 @@ public static class BulkFileColumnFamily {
public static final String STR_NAME = "loaded";
public static final Text NAME = new Text(STR_NAME);

public static long getBulkLoadTid(Value v) {
public static FateId getBulkLoadTid(Value v) {
return getBulkLoadTid(v.toString());
}

public static long getBulkLoadTid(String vs) {
if (FateTxId.isFormatedTid(vs)) {
return FateTxId.fromString(vs);
} else {
// a new serialization format was introduce in 2.0. This code support deserializing the
// old format.
return Long.parseLong(vs);
}
public static FateId getBulkLoadTid(String vs) {
// ELASTICITY_TODO issue 4044 - May need to introduce code in upgrade to handle old format.
return FateId.from(vs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.metadata.StoredTabletFile;

import com.google.common.base.Preconditions;
Expand All @@ -43,26 +43,26 @@ public class SelectedFiles {

private final Set<StoredTabletFile> files;
private final boolean initiallySelectedAll;
private final long fateTxId;
private final FateId fateId;

private String metadataValue;

private static final Gson GSON = new GsonBuilder()
.registerTypeAdapter(SelectedFiles.class, new SelectedFilesTypeAdapter()).create();

public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, long fateTxId) {
public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, FateId fateId) {
Preconditions.checkArgument(files != null && !files.isEmpty());
this.files = Set.copyOf(files);
this.initiallySelectedAll = initiallySelectedAll;
this.fateTxId = fateTxId;
this.fateId = fateId;
}

private static class SelectedFilesTypeAdapter extends TypeAdapter<SelectedFiles> {

@Override
public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOException {
out.beginObject();
out.name("txid").value(FateTxId.formatTid(selectedFiles.getFateTxId()));
out.name("fateId").value(selectedFiles.getFateId().canonical());
out.name("selAll").value(selectedFiles.initiallySelectedAll());
out.name("files").beginArray();
// sort the data to make serialized json comparable
Expand All @@ -81,16 +81,16 @@ public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOExceptio

@Override
public SelectedFiles read(JsonReader in) throws IOException {
long fateTxId = 0L;
FateId fateId = null;
boolean selAll = false;
List<String> files = new ArrayList<>();

in.beginObject();
while (in.hasNext()) {
String name = in.nextName();
switch (name) {
case "txid":
fateTxId = FateTxId.fromString(in.nextString());
case "fateId":
fateId = FateId.from(in.nextString());
break;
case "selAll":
selAll = in.nextBoolean();
Expand All @@ -111,7 +111,7 @@ public SelectedFiles read(JsonReader in) throws IOException {
Set<StoredTabletFile> tabletFiles =
files.stream().map(StoredTabletFile::new).collect(Collectors.toSet());

return new SelectedFiles(tabletFiles, selAll, fateTxId);
return new SelectedFiles(tabletFiles, selAll, fateId);
}

}
Expand All @@ -128,8 +128,8 @@ public boolean initiallySelectedAll() {
return initiallySelectedAll;
}

public long getFateTxId() {
return fateTxId;
public FateId getFateId() {
return fateId;
}

public String getMetadataValue() {
Expand All @@ -149,13 +149,13 @@ public boolean equals(Object obj) {
return false;
}
SelectedFiles other = (SelectedFiles) obj;
return fateTxId == other.fateTxId && files.equals(other.files)
return fateId.equals(other.fateId) && files.equals(other.files)
&& initiallySelectedAll == other.initiallySelectedAll;
}

@Override
public int hashCode() {
return Objects.hash(fateTxId, files, initiallySelectedAll);
return Objects.hash(fateId, files, initiallySelectedAll);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand Down Expand Up @@ -97,7 +97,7 @@ public class TabletMetadata {
private Location location;
private Map<StoredTabletFile,DataFileValue> files;
private List<StoredTabletFile> scans;
private Map<StoredTabletFile,Long> loadedFiles;
private Map<StoredTabletFile,FateId> loadedFiles;
private SelectedFiles selectedFiles;
private EnumSet<ColumnType> fetchedCols;
private KeyExtent extent;
Expand All @@ -115,7 +115,7 @@ public class TabletMetadata {
private boolean onDemandHostingRequested = false;
private TabletOperationId operationId;
private boolean futureAndCurrentLocationSet = false;
private Set<Long> compacted;
private Set<FateId> compacted;

public static TabletMetadataBuilder builder(KeyExtent extent) {
return new TabletMetadataBuilder(extent);
Expand Down Expand Up @@ -285,7 +285,7 @@ public boolean hasCurrent() {
return location != null && location.getType() == LocationType.CURRENT;
}

public Map<StoredTabletFile,Long> getLoaded() {
public Map<StoredTabletFile,FateId> getLoaded() {
ensureFetched(ColumnType.LOADED);
return loadedFiles;
}
Expand Down Expand Up @@ -390,7 +390,7 @@ public Map<ExternalCompactionId,CompactionMetadata> getExternalCompactions() {
return extCompactions;
}

public Set<Long> getCompacted() {
public Set<FateId> getCompacted() {
ensureFetched(ColumnType.COMPACTED);
return compacted;
}
Expand Down Expand Up @@ -421,8 +421,8 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
final var scansBuilder = ImmutableList.<StoredTabletFile>builder();
final var logsBuilder = ImmutableList.<LogEntry>builder();
final var extCompBuilder = ImmutableMap.<ExternalCompactionId,CompactionMetadata>builder();
final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,Long>builder();
final var compactedBuilder = ImmutableSet.<Long>builder();
final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,FateId>builder();
final var compactedBuilder = ImmutableSet.<FateId>builder();
ByteSequence row = null;

while (rowIter.hasNext()) {
Expand Down Expand Up @@ -519,7 +519,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
te.merged = true;
break;
case CompactedColumnFamily.STR_NAME:
compactedBuilder.add(FateTxId.fromString(qual));
compactedBuilder.add(FateId.from(qual));
break;
default:
throw new IllegalStateException("Unexpected family " + fam);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
Expand Down Expand Up @@ -165,9 +166,9 @@ public TabletMetadataBuilder putTime(MetadataTime time) {
}

@Override
public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, long tid) {
public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, FateId fateId) {
fetched.add(LOADED);
internalBuilder.putBulkFile(bulkref, tid);
internalBuilder.putBulkFile(bulkref, fateId);
return this;
}

Expand Down Expand Up @@ -202,14 +203,14 @@ public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid)
}

@Override
public TabletMetadataBuilder putCompacted(long fateTxId) {
public TabletMetadataBuilder putCompacted(FateId fateId) {
fetched.add(COMPACTED);
internalBuilder.putCompacted(fateTxId);
internalBuilder.putCompacted(fateId);
return this;
}

@Override
public TabletMetadataBuilder deleteCompacted(long fateTxId) {
public TabletMetadataBuilder deleteCompacted(FateId fateId) {
throw new UnsupportedOperationException();
}

Expand Down
Loading

0 comments on commit f8ab378

Please sign in to comment.