Initial server commit. #1
Conversation
…es, efficacy measurement and response length visibility
* add channel update delta tallies, and omit updates that have no delta
* Update the server full gossip data once the downloader is caught up with the gossip.
* Cache the warp response type directly.
…emental channel updates
…hether intermediate updates should be included, and preparing an easily traversable serialization set.
Conflicts: src/download.rs, src/main.rs, src/server.rs
…activating dynamic response route.
Two more notes about being robust against a slow DB:
```rust
impl GossipPersister {
	pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
		let (gossip_persistence_sender, gossip_persistence_receiver) =
			mpsc::channel::<DetectedGossipMessage>(10000);
```
This should probably not be so huge - if the DB gets behind we'll end up creating snapshots that are missing db entries that are still in the queue.
We won't actually. The snapshot generation is triggered by the persistence tasks being completed, not the gossip getting caught up.
But isn't the snapshot generation based on an entry in the queue? There can still be updates in the queue after that entry.
Yes, but there will always be updates in the queue after that entry, because you're gonna keep receiving new gossip. The criterion is "sufficiently caught up," and considering that snapshot generation takes a couple of minutes to calculate, the odds of not receiving any gossip during that time are vanishingly small. We can't stop and restart snapshot generation every time we receive a new gossip message, right? We'd never produce any snapshots.
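A rough sketch of that trigger ordering, with simplified task bodies and hypothetical channel names (the real wiring goes through the `server_sync_completion_sender` shown above): the snapshotter waits for the persister's completion signal rather than for the downloader.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
	// Completion channel standing in for `server_sync_completion_sender`.
	let (completion_sender, mut completion_receiver) = mpsc::channel::<()>(1);

	// Persistence task: drains the queued gossip into the DB, then signals.
	tokio::spawn(async move {
		// ... write queued DetectedGossipMessages to Postgres here ...
		completion_sender.send(()).await.unwrap();
	});

	// Snapshot task: blocks on the persister, not the downloader, catching up.
	completion_receiver.recv().await;
	// ... generate snapshots from the now-consistent database ...
}
```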
```rust
let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
let detected_gossip_message = DetectedGossipMessage {
	message: gossip_message,
	timestamp_seen: timestamp_seen as u32,
```
We should create the timestamp when we insert, or we get a race condition: we could have something sitting in the queue with a timestamp from a second ago while we create a snapshot with "now". I think we should instead rely on the DB to add the timestamp.
One issue with letting Postgres generate the timestamp is that the DB's clock may not be in sync with the clock the Rust server is running on (thinking of hosted environments here). I think moving the timestamp to the persistence job, and allowing for sub-second deltas, is the prudent way forward.
I think that's fine? As long as we do the filtering in SQL itself it'll work itself out.
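A sketch of that direction, assuming tokio_postgres and the `channel_updates` columns referenced later in this review (`short_channel_id`, `blob_signed`, `seen`): the persistence task stamps the row at insert time with sub-second precision, and the delta queries filter on the `seen` column in SQL.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical persistence helper: the timestamp is taken when the row is
// written, not when the message was received, so nothing can sit in the
// queue with a `seen` value older than a snapshot cut from persisted rows.
async fn persist_channel_update(
	client: &tokio_postgres::Client,
	short_channel_id: i64,
	update_bytes: Vec<u8>,
) -> Result<(), tokio_postgres::Error> {
	let seen_seconds = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.expect("system time before unix epoch")
		.as_secs_f64();
	client.execute(
		"INSERT INTO channel_updates (short_channel_id, blob_signed, seen) VALUES ($1, $2, to_timestamp($3))",
		&[&short_channel_id, &update_bytes, &seen_seconds],
	).await?;
	Ok(())
}
```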
```rust
	timestamp_seen: timestamp_seen as u32,
};
let sender = self.sender.clone();
tokio::spawn(async move {
```
Rather than spawning here, and in order to block if we get too far ahead of the DB, we should first do a sender.try_send and then, if that fails, do a tokio::block_on.
Is that still applicable considering #1 (comment)? Also, keep in mind that large divergences are only possible during the initial sync, where the snapshot process is only triggered after persistence has caught up with the downloaded gossip.
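For reference, a minimal sketch of the backpressure idea, assuming a multi-threaded tokio runtime and a hypothetical helper in place of the spawn above: try a non-blocking send first, and only block the worker when the persister's queue is actually full.

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

// Hypothetical helper standing in for the `tokio::spawn` above: `sender` is
// the persister's queue and `message` the freshly detected gossip.
fn enqueue_with_backpressure<T>(sender: &mpsc::Sender<T>, message: T) {
	match sender.try_send(message) {
		Ok(()) => {},
		Err(TrySendError::Full(message)) => {
			// The persister is behind: block this worker until it catches up
			// rather than letting spawned sends pile up without bound.
			// (block_in_place requires the multi-threaded runtime.)
			tokio::task::block_in_place(|| {
				tokio::runtime::Handle::current()
					.block_on(sender.send(message))
					.expect("persister receiver dropped");
			});
		},
		Err(TrySendError::Closed(_)) => panic!("persister channel closed"),
	}
}
```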
src/snapshot.rs (outdated)
```rust
	fs::remove_file(&symlink_path).unwrap();
}
println!("Recreating symlink: {} -> {}", symlink_path, snapshot_path);
symlink(&canonical_snapshot_path, &symlink_path).unwrap();
```
Can we make these symlinks relative somehow? I want to be able to rsync the two folders around and have the symlinks still be valid.
They were originally relative, but that wasn't working on my machine. Would you mind experimenting here a little bit?
Will do.
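A sketch of one way to do that, assuming the snapshots and symlinks live in sibling directories under a common root (the actual layout in this PR may differ): point the link at a path relative to the symlink's own directory instead of the canonicalized absolute path, so both folders can be rsynced elsewhere together.

```rust
use std::fs;
use std::io;
use std::os::unix::fs::symlink;
use std::path::Path;

// Hypothetical helper: a symlink target is resolved relative to the directory
// containing the link, so "../snapshots/<file>" stays valid wherever the two
// sibling directories end up after an rsync.
fn recreate_relative_symlink(snapshot_path: &Path, symlink_path: &Path) -> io::Result<()> {
	let snapshot_file_name = snapshot_path.file_name().expect("snapshot must have a file name");
	let relative_target = Path::new("../snapshots").join(snapshot_file_name);
	if symlink_path.exists() {
		fs::remove_file(symlink_path)?;
	}
	println!("Recreating symlink: {} -> {}", symlink_path.display(), relative_target.display());
	symlink(&relative_target, symlink_path)
}
```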
Things left, I think:
```rust
tokio::spawn(async move {
	disconnection_future.await;
	eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
	monitor_peer_connection(current_peer.clone(), peer_manager_clone);
```
This will just silently stop trying to reconnect if the peer reboots and is refusing connections for a while, no? We should always retry.
So we actually have a bigger issue: without an executor, I can't await monitor_peer_connection now that I've made it async, because rustc then needs to recursively check the future's type and detects a cycle.
```
error[E0391]: cycle detected when unsafety-checking `tracking::monitor_peer_connection`
   --> src/tracking.rs:158:1
    |
158 | async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    |
note: ...which requires building MIR for `tracking::monitor_peer_connection`...
   --> src/tracking.rs:158:1
    |
158 | async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...which requires type-checking `tracking::monitor_peer_connection`...
   --> src/tracking.rs:171:3
    |
171 | tokio::spawn(async move {
    | ^^^^^^^^^^^^
    = note: ...which requires evaluating trait selection obligation `for<'r> {std::future::ResumeTy, impl futures::Future, (), &'r (bitcoin::secp256k1::PublicKey, std::net::SocketAddr), (bitcoin::secp256k1::PublicKey, std::net::SocketAddr), std::sync::Arc<lightning::ln::peer_handler::PeerManager<lightning_net_tokio::SocketDescriptor, std::sync::Arc<lightning::ln::peer_handler::ErroringMessageHandler>, std::sync::Arc<downloader::GossipRouter>, std::sync::Arc<types::TestLogger>, std::sync::Arc<lightning::ln::peer_handler::IgnoringMessageHandler>>>, impl futures::Future}: std::marker::Send`...
note: ...which requires computing type of `tracking::monitor_peer_connection::{opaque#0}`...
   --> src/tracking.rs:158:109
    |
158 | async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
    |                                                                                                             ^^^^
    = note: ...which again requires unsafety-checking `tracking::monitor_peer_connection`, completing the cycle
note: cycle used when computing type of `<impl at src/lib.rs:57:1: 132:2>::start_sync::{opaque#0}`
   --> src/lib.rs:86:33
    |
 86 | pub async fn start_sync(&self) {
    |                                 ^
```
Matt, I really need your help here.
Oh, also, rustc complains here now:
```
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]: warning: unused implementer of `futures::Future` that must be used
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:   --> src/tracking.rs:176:4
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:    |
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]: 176 | monitor_peer_connection(current_peer.clone(), peer_manager_clone);
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:    |
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:    = note: `#[warn(unused_must_use)]` on by default
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:    = note: futures do nothing unless you `.await` or poll them
```
Ok, I was wondering why you weren't responding to this thread, where I had already asked about that like two hours ago: #1 (comment). But it turns out GitHub didn't upload my comments, sorry.
Heh, okay, just leave this one and fix the other stuff; we'll land this and I'll do it as a follow-up PR.
src/tracking.rs (outdated)
```rust
tokio::spawn(async move {
	disconnection_future.await;
	eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
	// TODO: figure out how to await this
```
You can just have the failure case in the else Failed block retry.
It would be nice not to keep retrying indefinitely, especially on first connect. If the peer is no longer available, it should ideally crash, instructing the user to find better peers.
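For what it's worth, one way to get both the retry and the `.await` without recursing (sidestepping the E0391 cycle above) is a plain loop. A sketch assuming the crate's `GossipPeerManager` alias and `lightning-net-tokio`'s `connect_outbound`, with a hypothetical bounded behavior on the initial connection:

```rust
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use bitcoin::secp256k1::PublicKey;

use crate::types::GossipPeerManager; // assumed crate-local alias for Arc<PeerManager<...>>

// Sketch only: reconnect in a loop rather than recursively, which avoids the
// async-recursion type cycle. Give up if the peer never accepted a single
// connection, but keep retrying forever once it has.
async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) {
	let mut connected_at_least_once = false;
	loop {
		match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), current_peer.0, current_peer.1).await {
			Some(disconnection_future) => {
				connected_at_least_once = true;
				disconnection_future.await;
				eprintln!("Disconnected from peer {}@{}, reconnecting", current_peer.0, current_peer.1);
			},
			None if connected_at_least_once => {
				eprintln!("Failed to reconnect to peer {}@{}, retrying in 10s", current_peer.0, current_peer.1);
				tokio::time::sleep(Duration::from_secs(10)).await;
			},
			None => panic!("Could not connect to peer {}@{}; please configure a reachable peer", current_peer.0, current_peer.1),
		}
	}
}
```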
src/lookup.rs (outdated)
```rust
for current_announcement_row in announcement_rows {
	let blob: String = current_announcement_row.get("announcement_signed");
	let data = hex_utils::to_vec(&blob).unwrap();
```
Why are we hex-encoding the blobs? Can't we just store them as blobs?
Fixed
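With the column stored as a raw blob, the read side can skip the hex round-trip entirely and deserialize straight from the bytes. A sketch, assuming tokio_postgres rows and the `announcement_signed` column referenced above:

```rust
use std::io::Cursor;
use lightning::ln::msgs::ChannelAnnouncement;
use lightning::util::ser::Readable;

// Sketch: with `announcement_signed` stored as BYTEA rather than hex TEXT,
// each row yields the serialized announcement directly.
fn read_announcement(row: &tokio_postgres::Row) -> ChannelAnnouncement {
	let blob: Vec<u8> = row.get("announcement_signed");
	let mut readable = Cursor::new(blob);
	ChannelAnnouncement::read(&mut readable).expect("announcement blobs should deserialize")
}
```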
```rust
};

println!("Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
```
Why aren't we filtering this by "first seen after our target time"?
Because that would be incorrect. There can be announcements that were first seen before our target time, but whose first channel update, or even whose first channel update in a given direction, was seen after.
Why do we need a channel announcement to be returned if we're only giving users the channel update?
```rust
// here is where the channels whose first update in either direction occurred after
// `last_seen_timestamp` are added to the selection
let unannounced_rows = client.query("SELECT short_channel_id, blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap();
```
I'm confused why we need two queries here at all? We should be able to do this all in a single query, no?
The reason above is why we can't.
Are my comments not getting sent or something?
No, they were not... thanks GitHub