IGNITE-24516: Refactor building of idle verify result. #11866

Merged (17 commits) on Feb 20, 2025
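This change replaces the public IdleVerifyResult constructors with a private constructor plus a new Builder, moves the conflict-classification logic out of the constructor into Builder.build(), and migrates the callers in VerifyBackupPartitionsTask and the snapshot check path to the builder. In the diff below, removed lines are prefixed with - and added lines with +; unprefixed lines are unchanged context.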
@@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -53,19 +54,19 @@ public class IdleVerifyResult extends VisorDataTransferObject {

/** Counter conflicts. */
@GridToStringInclude
- private Map<PartitionKey, List<PartitionHashRecord>> cntrConflicts = new HashMap<>();
+ private Map<PartitionKey, List<PartitionHashRecord>> cntrConflicts;

/** Hash conflicts. */
@GridToStringInclude
- private Map<PartitionKey, List<PartitionHashRecord>> hashConflicts = new HashMap<>();
+ private Map<PartitionKey, List<PartitionHashRecord>> hashConflicts;

/** Moving partitions. */
@GridToStringInclude
- private Map<PartitionKey, List<PartitionHashRecord>> movingPartitions = new HashMap<>();
+ private Map<PartitionKey, List<PartitionHashRecord>> movingPartitions;

/** Lost partitions. */
@GridToStringInclude
- private Map<PartitionKey, List<PartitionHashRecord>> lostPartitions = new HashMap<>();
+ private Map<PartitionKey, List<PartitionHashRecord>> lostPartitions;

/** Transaction hashes conflicts. */
@GridToStringInclude
@@ -86,69 +87,23 @@ public IdleVerifyResult() {
}

/**
- * @param exceptions Occurred exceptions.
+ * @see #builder()
*/
- public IdleVerifyResult(Map<ClusterNode, Exception> exceptions) {
- this.exceptions = exceptions;
- }
-
- /**
- * @param txHashConflicts Transaction hashes conflicts.
- */
- public IdleVerifyResult(
- Map<PartitionKey, List<PartitionHashRecord>> clusterHashes,
+ private IdleVerifyResult(
+ Map<PartitionKey, List<PartitionHashRecord>> cntrConflicts,
+ Map<PartitionKey, List<PartitionHashRecord>> hashConflicts,
+ Map<PartitionKey, List<PartitionHashRecord>> movingPartitions,
+ Map<PartitionKey, List<PartitionHashRecord>> lostPartitions,
@Nullable List<List<TransactionsHashRecord>> txHashConflicts,
- @Nullable Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs
+ @Nullable Map<ClusterNode, Collection<GridCacheVersion>> partiallyCommittedTxs,
+ Map<ClusterNode, Exception> exceptions
) {
- this(clusterHashes, Collections.emptyMap());
-
+ this.cntrConflicts = cntrConflicts;
+ this.hashConflicts = hashConflicts;
+ this.movingPartitions = movingPartitions;
+ this.lostPartitions = lostPartitions;
this.txHashConflicts = txHashConflicts;
this.partiallyCommittedTxs = partiallyCommittedTxs;
- }
-
- /**
- * @param clusterHashes Map of cluster partition hashes.
- * @param exceptions Exceptions on each cluster node.
- */
- public IdleVerifyResult(
- Map<PartitionKey, List<PartitionHashRecord>> clusterHashes,
- Map<ClusterNode, Exception> exceptions
- ) {
- for (Map.Entry<PartitionKey, List<PartitionHashRecord>> e : clusterHashes.entrySet()) {
- Integer partHash = null;
- Integer partVerHash = null;
- Object updateCntr = null;
-
- for (PartitionHashRecord record : e.getValue()) {
- if (record.partitionState() == PartitionHashRecord.PartitionState.MOVING) {
- movingPartitions.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
- .add(record);
-
- continue;
- }
-
- if (record.partitionState() == PartitionHashRecord.PartitionState.LOST) {
- lostPartitions.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
- .add(record);
-
- continue;
- }
-
- if (partHash == null) {
- partHash = record.partitionHash();
- partVerHash = record.partitionVersionsHash();
-
- updateCntr = record.updateCounter();
- }
- else {
- if (!Objects.equals(record.updateCounter(), updateCntr))
- cntrConflicts.putIfAbsent(e.getKey(), e.getValue());
-
- if (record.partitionHash() != partHash || record.partitionVersionsHash() != partVerHash)
- hashConflicts.putIfAbsent(e.getKey(), e.getValue());
- }
- }
- }
-
this.exceptions = exceptions;
}
@@ -414,4 +369,183 @@ private void printConflicts(Consumer<String> printer) {
@Override public String toString() {
return S.toString(IdleVerifyResult.class, this);
}

/** @return A fresh result builder. */
public static Builder builder() {
return new Builder();
}

/** Builder of {@link IdleVerifyResult}. Not thread-safe. */
public static final class Builder {
/** */
private @Nullable Map<PartitionKey, List<PartitionHashRecord>> partHashes;

/** */
private @Nullable List<List<TransactionsHashRecord>> txHashConflicts;

/** */
private @Nullable Map<ClusterNode, Collection<GridCacheVersion>> partCommitTxs;

/** Incremental snapshot transactions records per consistent id. */
private @Nullable Map<Object, Map<Object, TransactionsHashRecord>> incrTxHashRecords;

/** */
private @Nullable Map<ClusterNode, Exception> exceptions;

/** */
private Builder() {
// No-op.
}

/** Builds an {@link IdleVerifyResult}. */
public IdleVerifyResult build() {
// Add all unmatched incremental pairs to the conflicts.
if (!F.isEmpty(incrTxHashRecords))
incrTxHashRecords.values().stream().flatMap(e -> e.values().stream()).forEach(e -> addTxConflicts(F.asList(e, null)));

Map<PartitionKey, List<PartitionHashRecord>> cntrConflicts = new HashMap<>();
Map<PartitionKey, List<PartitionHashRecord>> hashConflicts = new HashMap<>();
Map<PartitionKey, List<PartitionHashRecord>> movingPartitions = new HashMap<>();
Map<PartitionKey, List<PartitionHashRecord>> lostPartitions = new HashMap<>();

if (exceptions == null)
exceptions = Collections.emptyMap();

if (F.isEmpty(partHashes)) {
return new IdleVerifyResult(cntrConflicts, hashConflicts, movingPartitions, lostPartitions, txHashConflicts,
partCommitTxs, exceptions);
}
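// Classify the collected records per partition: MOVING and LOST records are
// bucketed separately; the first remaining record fixes the expected hashes
// and update counter, and any later record that deviates marks the partition
// as a counter conflict or a hash conflict.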

for (Map.Entry<PartitionKey, List<PartitionHashRecord>> e : partHashes.entrySet()) {
Integer partHash = null;
Integer partVerHash = null;
Object updateCntr = null;

for (PartitionHashRecord record : e.getValue()) {
if (record.partitionState() == PartitionHashRecord.PartitionState.MOVING) {
movingPartitions.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(record);

continue;
}

if (record.partitionState() == PartitionHashRecord.PartitionState.LOST) {
lostPartitions.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(record);

continue;
}

if (partHash == null) {
partHash = record.partitionHash();
partVerHash = record.partitionVersionsHash();

updateCntr = record.updateCounter();
}
else {
if (!Objects.equals(record.updateCounter(), updateCntr))
cntrConflicts.putIfAbsent(e.getKey(), e.getValue());

if (record.partitionHash() != partHash || record.partitionVersionsHash() != partVerHash)
hashConflicts.putIfAbsent(e.getKey(), e.getValue());
}
}
}

return new IdleVerifyResult(cntrConflicts, hashConflicts, movingPartitions, lostPartitions, txHashConflicts,
partCommitTxs, exceptions);
}

/** Stores an exception if none is yet set for the given node. */
public Builder addException(ClusterNode node, Exception e) {
assert e != null;

if (exceptions == null)
exceptions = new HashMap<>();

exceptions.putIfAbsent(node, e);

return this;
}

/** Sets all the result exceptions. */
public Builder exceptions(Map<ClusterNode, Exception> exceptions) {
assert this.exceptions == null;
assert exceptions != null;

this.exceptions = exceptions;

return this;
}

/** Stores a map of partition hash records per partition key. */
public void addPartitionHashes(Map<PartitionKey, PartitionHashRecord> newHashes) {
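// One record list is accumulated per partition key; records reported by
// different nodes for the same partition are compared against each other
// later, in build().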
newHashes.forEach((key, hash) -> {
if (partHashes == null)
partHashes = new HashMap<>();

partHashes.compute(key, (key0, hashes0) -> {
if (hashes0 == null)
hashes0 = new ArrayList<>();

hashes0.add(hash);

return hashes0;
});
});
}

/** Stores incremental snapshot transaction hash records of a given node. */
public void addIncrementalHashRecords(ClusterNode node, Map<Object, TransactionsHashRecord> res) {
if (incrTxHashRecords == null)
incrTxHashRecords = new HashMap<>();

assert incrTxHashRecords.get(node.consistentId()) == null;

incrTxHashRecords.put(node.consistentId(), res);
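// Each node reports one record per counterpart node, keyed by the
// counterpart's consistent id. Every new record is paired with the mirror
// record previously reported by that counterpart, and both are removed; a
// hash mismatch, or a missing mirror entry, is recorded as a transaction
// conflict. Records that never find a pair are flushed as conflicts in build().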

Iterator<Map.Entry<Object, TransactionsHashRecord>> resIt = res.entrySet().iterator();

while (resIt.hasNext()) {
Map.Entry<Object, TransactionsHashRecord> nodeTxHash = resIt.next();

Map<Object, TransactionsHashRecord> prevNodeTxHash = incrTxHashRecords.get(nodeTxHash.getKey());

if (prevNodeTxHash != null) {
TransactionsHashRecord hash = nodeTxHash.getValue();
TransactionsHashRecord prevHash = prevNodeTxHash.remove(hash.localConsistentId());

if (prevHash == null || prevHash.transactionHash() != hash.transactionHash())
addTxConflicts(F.asList(hash, prevHash));

resIt.remove();
}
}
}

/** Stores transaction conflicts. */
private Builder addTxConflicts(List<TransactionsHashRecord> newTxConflicts) {
if (txHashConflicts == null)
txHashConflicts = new ArrayList<>();

txHashConflicts.add(newTxConflicts);

return this;
}

/** Stores partially committed transactions of a given node. */
public Builder addPartiallyCommited(ClusterNode node, Collection<GridCacheVersion> newVersions) {
if (partCommitTxs == null)
partCommitTxs = new HashMap<>();

partCommitTxs.compute(node, (node0, versions0) -> {
if (versions0 == null)
versions0 = new ArrayList<>();

versions0.addAll(newVersions);

return versions0;
});

return this;
}
}
}
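For orientation, here is a minimal sketch of how callers are expected to drive the new builder, using only methods visible in this diff. The wrapper class and all names in it are hypothetical, and imports of the Ignite-internal types (IdleVerifyResult, PartitionKey, PartitionHashRecord) are elided because their packages are not shown in this diff:

    import java.util.Map;

    import org.apache.ignite.cluster.ClusterNode;

    /** Hypothetical sketch: drives the Builder API added above. */
    final class IdleVerifyUsageSketch {
        /** Failure-only result, mirroring the snapshot error handling further below. */
        static IdleVerifyResult ofFailures(Map<ClusterNode, Exception> exceptionsByNode) {
            return IdleVerifyResult.builder().exceptions(exceptionsByNode).build();
        }

        /** Per-node accumulation, mirroring VerifyBackupPartitionsTask#reduce0 below. */
        static IdleVerifyResult ofNodeResults(
            Map<ClusterNode, Map<PartitionKey, PartitionHashRecord>> hashesByNode,
            Map<ClusterNode, Exception> failures
        ) {
            IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();

            // Only the first exception per node is kept (putIfAbsent semantics).
            failures.forEach(bldr::addException);

            // Each node's map is merged into one record list per partition key.
            hashesByNode.values().forEach(bldr::addPartitionHashes);

            // build() classifies the accumulated records into counter conflicts,
            // hash conflicts, moving partitions and lost partitions.
            return bldr.build();
        }
    }

Note that exceptions(...) asserts that no exception was previously set, so it is an all-at-once alternative to repeated addException(...) calls.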
@@ -151,29 +151,21 @@ public class VerifyBackupPartitionsTask extends ComputeTaskAdapter<CacheIdleVeri
* @return Idle verify job result constructed from results of remote executions.
*/
public static IdleVerifyResult reduce0(List<ComputeJobResult> results) {
- Map<PartitionKey, List<PartitionHashRecord>> clusterHashes = new HashMap<>();
- Map<ClusterNode, Exception> ex = new HashMap<>();
+ IdleVerifyResult.Builder bldr = IdleVerifyResult.builder();

for (ComputeJobResult res : results) {
if (res.getException() != null) {
- ex.put(res.getNode(), res.getException());
+ bldr.addException(res.getNode(), res.getException());

continue;
}

Map<PartitionKey, PartitionHashRecord> nodeHashes = res.getData();

- for (Map.Entry<PartitionKey, PartitionHashRecord> e : nodeHashes.entrySet()) {
- List<PartitionHashRecord> records = clusterHashes.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
-
- records.add(e.getValue());
- }
+ bldr.addPartitionHashes(nodeHashes);
}

- if (results.size() != ex.size())
- return new IdleVerifyResult(clusterHashes, ex);
- else
- return new IdleVerifyResult(ex);
+ return bldr.build();
}
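Note the consolidation: the old reduce0 chose between two constructors, returning an exceptions-only result when every job failed. Builder.build() now covers both cases uniformly, returning a result that carries only the exceptions when no partition hashes were collected.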

@@ -1792,23 +1792,29 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(
kctx0.task().execute(
cls,
new SnapshotPartitionsVerifyTaskArg(grps, metas, snpPath, incIdx, check),
- options(new ArrayList<>(metas.keySet()))
- ).listen(f1 -> {
- if (f1.error() == null)
- res.onDone(f1.result());
- else if (f1.error() instanceof IgniteSnapshotVerifyException)
- res.onDone(new SnapshotPartitionsVerifyTaskResult(metas,
- new IdleVerifyResult(((IgniteSnapshotVerifyException)f1.error()).exceptions())));
- else
- res.onDone(f1.error());
- });
+ options(new ArrayList<>(metas.keySet()))
+ ).listen(f1 -> {
+ if (f1.error() == null)
+ res.onDone(f1.result());
+ else if (f1.error() instanceof IgniteSnapshotVerifyException) {
+ IdleVerifyResult idleRes = IdleVerifyResult.builder()
+ .exceptions(((IgniteSnapshotVerifyException)f1.error()).exceptions()).build();
+
+ res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, idleRes));
+ }
+ else
+ res.onDone(f1.error());
+ });
}
else {
if (f0.error() == null)
res.onDone(new IgniteSnapshotVerifyException(metasRes.exceptions()));
- else if (f0.error() instanceof IgniteSnapshotVerifyException)
- res.onDone(new SnapshotPartitionsVerifyTaskResult(null,
- new IdleVerifyResult(((IgniteSnapshotVerifyException)f0.error()).exceptions())));
+ else if (f0.error() instanceof IgniteSnapshotVerifyException) {
+ IdleVerifyResult idleRes = IdleVerifyResult.builder()
+ .exceptions(((IgniteSnapshotVerifyException)f0.error()).exceptions()).build();
+
+ res.onDone(new SnapshotPartitionsVerifyTaskResult(null, idleRes));
+ }
else
res.onDone(f0.error());
}
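Both snapshot failure branches above now go through IdleVerifyResult.builder().exceptions(...).build(), which replaces the removed exceptions-only constructor.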