Skip to content

Commit

Permalink
hack co for baseline
Browse files Browse the repository at this point in the history
  • Loading branch information
danielxiangzl committed Jan 9, 2025
1 parent f58f417 commit 28d7444
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 1 deletion.
15 changes: 15 additions & 0 deletions consensus/src/consensus_observer/publisher/consensus_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ use crate::consensus_observer::{
};
use aptos_channels::aptos_channel::Receiver;
use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId};
use aptos_crypto::HashValue;
use aptos_infallible::RwLock;
use aptos_logger::{error, info, warn};
use aptos_network::application::interface::NetworkClient;
use dashmap::DashMap;
use futures::StreamExt;
use futures_channel::mpsc;
use std::{collections::HashSet, sync::Arc, time::Duration};
Expand All @@ -41,6 +43,8 @@ pub struct ConsensusPublisher {

// The sender for outbound network messages
outbound_message_sender: mpsc::Sender<(PeerNetworkId, ConsensusObserverDirectSend)>,

buffered_ordered_block: DashMap<HashValue, ConsensusObserverDirectSend>,
}

impl ConsensusPublisher {
Expand All @@ -64,6 +68,7 @@ impl ConsensusPublisher {
consensus_observer_config,
active_subscribers: Arc::new(RwLock::new(HashSet::new())),
outbound_message_sender,
buffered_ordered_block: DashMap::new(),
};

// Return the publisher and the outbound message receiver
Expand Down Expand Up @@ -231,6 +236,16 @@ impl ConsensusPublisher {
}
}

pub fn buffer_ordered_block(&self, block_id: HashValue, message: ConsensusObserverDirectSend) {
self.buffered_ordered_block.insert(block_id, message);
}

pub fn flush_buffered_ordered_block(&self, block_id: HashValue) {
if let Some((_, message)) = self.buffered_ordered_block.remove(&block_id) {
self.publish_message(message);
}
}

/// Starts the consensus publisher
pub async fn start(
self,
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/pipeline/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ impl BufferManager {
ordered_blocks.clone().into_iter().map(Arc::new).collect(),
ordered_proof.clone(),
);
consensus_publisher.publish_message(message);
// consensus_publisher.publish_message(message);
let block_id = ordered_blocks.last().unwrap().id();
consensus_publisher.buffer_ordered_block(block_id, message);
}
self.execution_schedule_phase_tx
.send(request)
Expand Down Expand Up @@ -526,6 +528,8 @@ impl BufferManager {
self.reset().await;
}
if let Some(consensus_publisher) = &self.consensus_publisher {
let block_id = block.id();
consensus_publisher.flush_buffered_ordered_block(block_id);
let message =
ConsensusObserverMessage::new_commit_decision_message(commit_proof.clone());
consensus_publisher.publish_message(message);
Expand Down

0 comments on commit 28d7444

Please sign in to comment.