Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIE-2026] Separate in-sync from sync-status listeners #100

Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ public void removeTransactionDroppedListener(final long listenerIdentifier) {

@Override
public long addSyncStatusListener(final SyncStatusListener syncStatusListener) {
return syncState.addSyncStatusListener(syncStatusListener);
return syncState.subscribeSyncStatus(syncStatusListener);
}

@Override
public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.removeSyncStatusListener(listenerIdentifier);
syncState.unsubscribeSyncStatus(listenerIdentifier);
}
}
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ task checkMavenCoordianteCollisions {
def coordinate = it.publishing?.publications[0].coordinates
if (coordinates.containsKey(coordinate)) {
throw new GradleException("Duplicate maven coordinates detected, ${coordinate} is used by " +
"both ${coordinates[coordinate]} and ${it.path}.\n" +
"Please add a `publishing` script block to one or both subprojects.")
"both ${coordinates[coordinate]} and ${it.path}.\n" +
"Please add a `publishing` script block to one or both subprojects.")
}
coordinates[coordinate] = it.path
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class SyncingSubscriptionService {
public SyncingSubscriptionService(
final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) {
this.subscriptionManager = subscriptionManager;
synchronizer.observeSyncStatus(this::sendSyncingToMatchingSubscriptions);
synchronizer.subscribeSyncStatus(this::sendSyncingToMatchingSubscriptions);
}

private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class SyncingSubscriptionServiceTest {
public void before() {
final ArgumentCaptor<SyncStatusListener> captor =
ArgumentCaptor.forClass(SyncStatusListener.class);
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L);
when(synchronizer.subscribeSyncStatus(captor.capture())).thenReturn(1L);
new SyncingSubscriptionService(subscriptionManager, synchronizer);
syncStatusListener = captor.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public AbstractMiningCoordinator(
this.blockchain = blockchain;
this.syncState = syncState;
this.blockchain.observeBlockAdded(this);
syncState.addInSyncListener(this::inSyncChanged);
syncState.subscribeInSync(this::inSyncChanged);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.util.Subscribers.Unsubscriber;

import java.util.Optional;

/** Provides an interface to block synchronization processes. */
public interface Synchronizer {

// Default tolerance used to determine whether or not this node is "in sync"
long DEFAULT_IN_SYNC_TOLERANCE = 5;

void start();

void stop();
Expand All @@ -32,7 +36,35 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();

long observeSyncStatus(final BesuEvents.SyncStatusListener listener);
long subscribeSyncStatus(final BesuEvents.SyncStatusListener listener);

boolean unsubscribeSyncStatus(long observerId);

/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code DEFAULT_IN_SYNC_TOLERANCE} behind the
* highest estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @return An {@code Unsubscriber} that can be used to stop listening for these events
*/
Unsubscriber subscribeInSync(final InSyncListener listener);

/**
* Add a listener that will be notified when this node's sync status changes. A node is considered
* in-sync if the local chain height is no more than {@code syncTolerance} behind the highest
* estimated remote chain height.
*
* @param listener The callback to invoke when the sync status changes
* @param syncTolerance The tolerance used to determine whether this node is in-sync. A value of
* zero means that the node is considered in-sync only when the local chain height is greater
* than or equal to the best estimated remote chain height.
* @return An {@code Unsubscriber} that can be used to stop listening for these events
*/
Unsubscriber subscribeInSync(final InSyncListener listener, final long syncTolerance);

boolean removeObserver(long observerId);
@FunctionalInterface
interface InSyncListener {
void onInSyncStatusChange(boolean newSyncStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,14 @@ public List<Block> blockSequence(
}

public Block genesisBlock() {
final BlockOptions options =
new BlockOptions()
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return genesisBlock(new BlockOptions());
}

public Block genesisBlock(final BlockOptions options) {
options
.setBlockNumber(BlockHeader.GENESIS_BLOCK_NUMBER)
.setStateRoot(Hash.EMPTY_TRIE_HASH)
.setParentHash(Hash.ZERO);
return block(options);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.util.uint.UInt256;

public interface ChainHeadEstimate {

UInt256 getEstimatedTotalDifficulty();

long getEstimatedHeight();

/**
* Returns true if this chain state represents a chain that is "better" than the chain represented
* by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or
* heavier than the supplied {@code chainToCheck}.
*
* @param chainToCheck The chain being compared.
* @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}.
*/
default boolean chainIsBetterThan(final ChainHead chainToCheck) {
return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck);
}

default boolean hasHigherDifficultyThan(final ChainHead chainToCheck) {
return getEstimatedTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0;
}

default boolean hasLongerChainThan(final ChainHead chainToCheck) {
return getEstimatedHeight() > chainToCheck.getHeight();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.chain.ChainHead;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.util.Subscribers;
import org.hyperledger.besu.util.uint.UInt256;

import com.google.common.base.MoreObjects;

public class ChainState {
public class ChainState implements ChainHeadEstimate {
// The best block by total difficulty that we know about
private final BestBlock bestBlock = new BestBlock();
// The highest block that we've seen
Expand All @@ -40,14 +39,20 @@ public void removeEstimatedHeightListener(final long listenerId) {
estimatedHeightListeners.unsubscribe(listenerId);
}

public ChainStateSnapshot getSnapshot() {
return new ChainStateSnapshot(getEstimatedTotalDifficulty(), getEstimatedHeight());
}

public boolean hasEstimatedHeight() {
return estimatedHeightKnown;
}

@Override
public long getEstimatedHeight() {
return estimatedHeight;
}

@Override
public UInt256 getEstimatedTotalDifficulty() {
return bestBlock.getTotalDifficulty();
}
Expand Down Expand Up @@ -106,26 +111,6 @@ public void updateHeightEstimate(final long blockNumber) {
}
}

/**
* Returns true if this chain state represents a chain that is "better" than the chain represented
* by the supplied {@link ChainHead}. "Better" currently means that this chain is longer or
* heavier than the supplied {@code chainToCheck}.
*
* @param chainToCheck The chain being compared.
* @return true if this {@link ChainState} represents a better chain than {@code chainToCheck}.
*/
public boolean chainIsBetterThan(final ChainHead chainToCheck) {
return hasHigherDifficultyThan(chainToCheck) || hasLongerChainThan(chainToCheck);
}

private boolean hasHigherDifficultyThan(final ChainHead chainToCheck) {
return bestBlock.getTotalDifficulty().compareTo(chainToCheck.getTotalDifficulty()) > 0;
}

private boolean hasLongerChainThan(final ChainHead chainToCheck) {
return estimatedHeight > chainToCheck.getHeight();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.util.uint.UInt256;

public class ChainStateSnapshot implements ChainHeadEstimate {
private final UInt256 totalDifficulty;
private final long chainHeight;

public ChainStateSnapshot(final UInt256 totalDifficulty, final long chainHeight) {
this.totalDifficulty = totalDifficulty;
this.chainHeight = chainHeight;
}

@Override
public UInt256 getEstimatedTotalDifficulty() {
return totalDifficulty;
}

@Override
public long getEstimatedHeight() {
return chainHeight;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,16 @@ public boolean hasSeenBlock(final Hash hash) {
return knownBlocks.contains(hash);
}

/** @return This peer's current chain state. */
public ChainState chainState() {
return chainHeadState;
}

/** @return A read-only snapshot of this peer's current {@code chainState} } */
public ChainHeadEstimate chainStateSnapshot() {
return chainHeadState.getSnapshot();
}

public void registerHeight(final Hash blockHash, final long height) {
chainHeadState.update(blockHash, height);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.ExceptionUtils;
import org.hyperledger.besu.util.Subscribers;
import org.hyperledger.besu.util.Subscribers.Unsubscriber;

import java.nio.file.Path;
import java.time.Clock;
Expand All @@ -51,7 +51,6 @@ public class DefaultSynchronizer<C> implements Synchronizer {
private final Optional<Pruner> maybePruner;
private final SyncState syncState;
private final AtomicBoolean running = new AtomicBoolean(false);
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
private final BlockPropagationManager<C> blockPropagationManager;
private final Optional<FastSyncDownloader<C>> fastSyncDownloader;
private final FullSyncDownloader<C> fullSyncDownloader;
Expand Down Expand Up @@ -126,7 +125,6 @@ private TrailingPeerRequirements calculateTrailingPeerRequirements() {
public void start() {
if (running.compareAndSet(false, true)) {
LOG.info("Starting synchronizer.");
syncState.addSyncStatusListener(this::syncStatusCallback);
blockPropagationManager.start();
if (fastSyncDownloader.isPresent()) {
fastSyncDownloader.get().start().whenComplete(this::handleFastSyncResult);
Expand Down Expand Up @@ -187,17 +185,23 @@ public Optional<SyncStatus> getSyncStatus() {
}

@Override
public long observeSyncStatus(final SyncStatusListener listener) {
public long subscribeSyncStatus(final SyncStatusListener listener) {
checkNotNull(listener);
return syncStatusListeners.subscribe(listener);
return syncState.subscribeSyncStatus(listener);
}

@Override
public boolean removeObserver(final long observerId) {
return syncStatusListeners.unsubscribe(observerId);
public boolean unsubscribeSyncStatus(final long subscriberId) {
return syncState.unsubscribeSyncStatus(subscriberId);
}

private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status));
@Override
public Unsubscriber subscribeInSync(final InSyncListener listener) {
return syncState.subscribeInSync(listener);
}

@Override
public Unsubscriber subscribeInSync(final InSyncListener listener, final long syncTolerance) {
return syncState.subscribeInSync(listener, syncTolerance);
}
}
Loading