Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Globally Unique FATE Transaction Ids - Part 3 #4247

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateId.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.accumulo.core.fate;

import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.data.AbstractId;
import org.apache.accumulo.core.manager.thrift.TFateId;
Expand All @@ -34,6 +36,9 @@ public class FateId extends AbstractId<FateId> {
private static final long serialVersionUID = 1L;
private static final String PREFIX = "FATE:";
private static final Pattern HEX_PATTERN = Pattern.compile("^[0-9a-fA-F]+$");
private static final Pattern FATEID_PATTERN = Pattern.compile("^" + PREFIX + "("
+ Stream.of(FateInstanceType.values()).map(Enum::name).collect(Collectors.joining("|"))
+ "):[0-9a-fA-F]+$");

private FateId(String canonical) {
super(canonical);
Expand Down Expand Up @@ -86,6 +91,30 @@ public static FateId from(FateInstanceType type, String hexTid) {
}
}

/**
* @param fateIdStr the string representation of the FateId
* @return a new FateId object from the given string
*/
public static FateId from(String fateIdStr) {
if (FATEID_PATTERN.matcher(fateIdStr).matches()) {
return new FateId(fateIdStr);
} else {
throw new IllegalArgumentException("Invalid Fate ID: " + fateIdStr);
}
}

/**
* @param fateIdStr the string representation of the FateId
* @return true if the string is a valid FateId, false otherwise
*/
public static boolean isFormattedTid(String fateIdStr) {
return FATEID_PATTERN.matcher(fateIdStr).matches();
}

/**
* @param tFateId the TFateId
* @return the FateId equivalent of the given TFateId
*/
public static FateId fromThrift(TFateId tFateId) {
FateInstanceType type;
long tid = tFateId.getTid();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.gc.GcCandidate;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.lock.ServiceLock;
Expand Down Expand Up @@ -371,7 +372,7 @@ interface TabletUpdates<T> {

T putTime(MetadataTime time);

T putBulkFile(ReferencedTabletFile bulkref, long tid);
T putBulkFile(ReferencedTabletFile bulkref, FateId fateId);

T deleteBulkFile(StoredTabletFile bulkref);

Expand All @@ -383,9 +384,9 @@ interface TabletUpdates<T> {

T deleteExternalCompaction(ExternalCompactionId ecid);

T putCompacted(long fateTxid);
T putCompacted(FateId fateId);

T deleteCompacted(long fateTxid);
T deleteCompacted(FateId fateId);

T putTabletAvailability(TabletAvailability tabletAvailability);

Expand Down Expand Up @@ -659,9 +660,9 @@ default void deleteScanServerFileReferences(String serverAddress, UUID serverSes
* Create a Bulk Load In Progress flag in the metadata table
*
* @param path The bulk directory filepath
* @param fateTxid The id of the Bulk Import Fate operation.
* @param fateId The FateId of the Bulk Import Fate operation.
*/
default void addBulkLoadInProgressFlag(String path, long fateTxid) {
default void addBulkLoadInProgressFlag(String path, FateId fateId) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.Set;

import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
Expand All @@ -41,19 +42,19 @@ public class CompactionMetadata {
private final short priority;
private final CompactorGroupId cgid;
private final boolean propagateDeletes;
private final Long fateTxId;
private final FateId fateId;

public CompactionMetadata(Set<StoredTabletFile> jobFiles, ReferencedTabletFile compactTmpName,
String compactorId, CompactionKind kind, short priority, CompactorGroupId ceid,
boolean propagateDeletes, Long fateTxId) {
boolean propagateDeletes, FateId fateId) {
this.jobFiles = Objects.requireNonNull(jobFiles);
this.compactTmpName = Objects.requireNonNull(compactTmpName);
this.compactorId = Objects.requireNonNull(compactorId);
this.kind = Objects.requireNonNull(kind);
this.priority = priority;
this.cgid = Objects.requireNonNull(ceid);
this.propagateDeletes = propagateDeletes;
this.fateTxId = fateTxId;
this.fateId = fateId;
}

public Set<StoredTabletFile> getJobFiles() {
Expand Down Expand Up @@ -84,8 +85,8 @@ public boolean getPropagateDeletes() {
return propagateDeletes;
}

public Long getFateTxId() {
return fateTxId;
public FateId getFateId() {
return fateId;
}

// This class is used to serialize and deserialize this class using GSon. Any changes to this
Expand All @@ -98,7 +99,7 @@ private static class GSonData {
String groupId;
short priority;
boolean propDels;
Long fateTxId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need a follow on update for upgrade

String fateId;
}

public String toJson() {
Expand All @@ -111,7 +112,7 @@ public String toJson() {
jData.groupId = cgid.toString();
jData.priority = priority;
jData.propDels = propagateDeletes;
jData.fateTxId = fateTxId;
jData.fateId = fateId.canonical();
return GSON.get().toJson(jData);
}

Expand All @@ -121,7 +122,7 @@ public static CompactionMetadata fromJson(String json) {
return new CompactionMetadata(jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
StoredTabletFile.of(jData.tmp).getTabletFile(), jData.compactor,
CompactionKind.valueOf(jData.kind), jData.priority,
CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, jData.fateTxId);
CompactorGroupIdImpl.groupId(jData.groupId), jData.propDels, FateId.from(jData.fateId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.schema.Section;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.Pair;
Expand Down Expand Up @@ -334,18 +334,13 @@ public static class BulkFileColumnFamily {
public static final String STR_NAME = "loaded";
public static final Text NAME = new Text(STR_NAME);

public static long getBulkLoadTid(Value v) {
public static FateId getBulkLoadTid(Value v) {
return getBulkLoadTid(v.toString());
}

public static long getBulkLoadTid(String vs) {
if (FateTxId.isFormatedTid(vs)) {
return FateTxId.fromString(vs);
} else {
// a new serialization format was introduce in 2.0. This code support deserializing the
// old format.
return Long.parseLong(vs);
}
public static FateId getBulkLoadTid(String vs) {
// ELASTICITY_TODO issue 4044 - May need to introduce code in upgrade to handle old format.
return FateId.from(vs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.metadata.StoredTabletFile;

import com.google.common.base.Preconditions;
Expand All @@ -43,26 +43,26 @@ public class SelectedFiles {

private final Set<StoredTabletFile> files;
private final boolean initiallySelectedAll;
private final long fateTxId;
private final FateId fateId;

private String metadataValue;

private static final Gson GSON = new GsonBuilder()
.registerTypeAdapter(SelectedFiles.class, new SelectedFilesTypeAdapter()).create();

public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, long fateTxId) {
public SelectedFiles(Set<StoredTabletFile> files, boolean initiallySelectedAll, FateId fateId) {
Preconditions.checkArgument(files != null && !files.isEmpty());
this.files = Set.copyOf(files);
this.initiallySelectedAll = initiallySelectedAll;
this.fateTxId = fateTxId;
this.fateId = fateId;
}

private static class SelectedFilesTypeAdapter extends TypeAdapter<SelectedFiles> {

@Override
public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOException {
out.beginObject();
out.name("txid").value(FateTxId.formatTid(selectedFiles.getFateTxId()));
out.name("fateId").value(selectedFiles.getFateId().canonical());
out.name("selAll").value(selectedFiles.initiallySelectedAll());
out.name("files").beginArray();
// sort the data to make serialized json comparable
Expand All @@ -81,16 +81,16 @@ public void write(JsonWriter out, SelectedFiles selectedFiles) throws IOExceptio

@Override
public SelectedFiles read(JsonReader in) throws IOException {
long fateTxId = 0L;
FateId fateId = null;
boolean selAll = false;
List<String> files = new ArrayList<>();

in.beginObject();
while (in.hasNext()) {
String name = in.nextName();
switch (name) {
case "txid":
fateTxId = FateTxId.fromString(in.nextString());
case "fateId":
fateId = FateId.from(in.nextString());
break;
case "selAll":
selAll = in.nextBoolean();
Expand All @@ -111,7 +111,7 @@ public SelectedFiles read(JsonReader in) throws IOException {
Set<StoredTabletFile> tabletFiles =
files.stream().map(StoredTabletFile::new).collect(Collectors.toSet());

return new SelectedFiles(tabletFiles, selAll, fateTxId);
return new SelectedFiles(tabletFiles, selAll, fateId);
}

}
Expand All @@ -128,8 +128,8 @@ public boolean initiallySelectedAll() {
return initiallySelectedAll;
}

public long getFateTxId() {
return fateTxId;
public FateId getFateId() {
return fateId;
}

public String getMetadataValue() {
Expand All @@ -149,13 +149,13 @@ public boolean equals(Object obj) {
return false;
}
SelectedFiles other = (SelectedFiles) obj;
return fateTxId == other.fateTxId && files.equals(other.files)
return fateId.equals(other.fateId) && files.equals(other.files)
&& initiallySelectedAll == other.initiallySelectedAll;
}

@Override
public int hashCode() {
return Objects.hash(fateTxId, files, initiallySelectedAll);
return Objects.hash(fateId, files, initiallySelectedAll);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
Expand Down Expand Up @@ -97,7 +97,7 @@ public class TabletMetadata {
private Location location;
private Map<StoredTabletFile,DataFileValue> files;
private List<StoredTabletFile> scans;
private Map<StoredTabletFile,Long> loadedFiles;
private Map<StoredTabletFile,FateId> loadedFiles;
private SelectedFiles selectedFiles;
private EnumSet<ColumnType> fetchedCols;
private KeyExtent extent;
Expand All @@ -115,7 +115,7 @@ public class TabletMetadata {
private boolean onDemandHostingRequested = false;
private TabletOperationId operationId;
private boolean futureAndCurrentLocationSet = false;
private Set<Long> compacted;
private Set<FateId> compacted;

public static TabletMetadataBuilder builder(KeyExtent extent) {
return new TabletMetadataBuilder(extent);
Expand Down Expand Up @@ -285,7 +285,7 @@ public boolean hasCurrent() {
return location != null && location.getType() == LocationType.CURRENT;
}

public Map<StoredTabletFile,Long> getLoaded() {
public Map<StoredTabletFile,FateId> getLoaded() {
ensureFetched(ColumnType.LOADED);
return loadedFiles;
}
Expand Down Expand Up @@ -390,7 +390,7 @@ public Map<ExternalCompactionId,CompactionMetadata> getExternalCompactions() {
return extCompactions;
}

public Set<Long> getCompacted() {
public Set<FateId> getCompacted() {
ensureFetched(ColumnType.COMPACTED);
return compacted;
}
Expand Down Expand Up @@ -421,8 +421,8 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
final var scansBuilder = ImmutableList.<StoredTabletFile>builder();
final var logsBuilder = ImmutableList.<LogEntry>builder();
final var extCompBuilder = ImmutableMap.<ExternalCompactionId,CompactionMetadata>builder();
final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,Long>builder();
final var compactedBuilder = ImmutableSet.<Long>builder();
final var loadedFilesBuilder = ImmutableMap.<StoredTabletFile,FateId>builder();
final var compactedBuilder = ImmutableSet.<FateId>builder();
ByteSequence row = null;

while (rowIter.hasNext()) {
Expand Down Expand Up @@ -519,7 +519,7 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
te.merged = true;
break;
case CompactedColumnFamily.STR_NAME:
compactedBuilder.add(FateTxId.fromString(qual));
compactedBuilder.add(FateId.from(qual));
break;
default:
throw new IllegalStateException("Unexpected family " + fam);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
Expand Down Expand Up @@ -165,9 +166,9 @@ public TabletMetadataBuilder putTime(MetadataTime time) {
}

@Override
public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, long tid) {
public TabletMetadataBuilder putBulkFile(ReferencedTabletFile bulkref, FateId fateId) {
fetched.add(LOADED);
internalBuilder.putBulkFile(bulkref, tid);
internalBuilder.putBulkFile(bulkref, fateId);
return this;
}

Expand Down Expand Up @@ -202,14 +203,14 @@ public TabletMetadataBuilder deleteExternalCompaction(ExternalCompactionId ecid)
}

@Override
public TabletMetadataBuilder putCompacted(long fateTxId) {
public TabletMetadataBuilder putCompacted(FateId fateId) {
fetched.add(COMPACTED);
internalBuilder.putCompacted(fateTxId);
internalBuilder.putCompacted(fateId);
return this;
}

@Override
public TabletMetadataBuilder deleteCompacted(long fateTxId) {
public TabletMetadataBuilder deleteCompacted(FateId fateId) {
throw new UnsupportedOperationException();
}

Expand Down
Loading