
Initial server commit. #1

Closed
wants to merge 90 commits into from

Conversation


@arik-so arik-so commented May 20, 2022

No description provided.

Arik Sosman and others added 30 commits January 11, 2022 20:09
…es, efficacy measurement and response length visibility
* add channel update delta tallies, and omit updates that have no delta

* Update the server full gossip data once the downloader is caught up with the gossip.

* Cache the warp response type directly.
…hether intermediate updates should be included, and preparing an easily traversable serialization set.
Conflicts:
	src/download.rs
	src/main.rs
	src/server.rs
Contributor

@TheBlueMatt TheBlueMatt left a comment

Two more notes about being robust against a slow db

impl GossipPersister {
	pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
		let (gossip_persistence_sender, gossip_persistence_receiver) =
			mpsc::channel::<DetectedGossipMessage>(10000);
Contributor

This should probably not be so huge - if the DB gets behind we'll end up creating snapshots that are missing db entries that are still in the queue.

Contributor Author

We won't actually. The snapshot generation is triggered by the persistence tasks being completed, not the gossip getting caught up.

Contributor

But isn't the snapshot generation based on an entry in the queue? There can still be updates in the queue after that entry.

Contributor Author

Yes, but there will always be updates in the queue after that entry, because you're going to keep receiving new gossip. The criterion is "sufficiently caught up"; considering that snapshot generation takes a couple of minutes to calculate, the odds of not receiving any gossip during that time are vanishingly small. We can't stop and restart snapshot generation every time we receive a new gossip message, right? We'd never produce any snapshots.

let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
let detected_gossip_message = DetectedGossipMessage {
	message: gossip_message,
	timestamp_seen: timestamp_seen as u32,
Contributor

We should create the timestamp when we insert, or we get a race condition: something can sit in the queue with a timestamp from a second ago while we create a snapshot with "now". I think we should instead rely on the DB to add the timestamp.

Contributor Author

One issue with letting Postgres generate the timestamp is that the DB's clock may not be in sync with the clock the Rust server is running on (thinking of hosted environments here). I think moving the timestamp to the persistence job, and allowing for sub-second deltas, is the prudent way forward.
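A sketch of that direction, assuming a tokio_postgres client and the channel_updates column names used elsewhere in this PR (the exact schema may differ): the persistence job creates the seen timestamp itself at insert time and binds it as a query parameter, rather than relying on a database-side default.

use std::time::SystemTime;
use tokio_postgres::Client;

// Sketch only: the persistence task stamps `seen` with the server's clock at
// insert time and passes it as a bind parameter, avoiding any skew between
// the Rust server's clock and the database's clock.
async fn insert_channel_update(client: &Client, short_channel_id: i64, blob_signed: &[u8]) -> Result<(), tokio_postgres::Error> {
	let seen = SystemTime::now();
	client.execute(
		"INSERT INTO channel_updates (short_channel_id, seen, blob_signed) VALUES ($1, $2, $3)",
		&[&short_channel_id, &seen, &blob_signed],
	).await?;
	Ok(())
}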

Contributor

I think that's fine? As long as we do the filtering in SQL itself it'll work itself out.

timestamp_seen: timestamp_seen as u32,
};
let sender = self.sender.clone();
tokio::spawn(async move {
Contributor

Rather than spawning here, and in order to block if we get too far ahead of the DB, we should first do a sender.try_send and then, if that fails, do a tokio::block_on.
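A minimal sketch of that backpressure pattern, reusing the DetectedGossipMessage name from the snippet above (illustrative only, not the PR's eventual implementation): try a non-blocking send first, and only fall back to blocking when the queue is full.

use tokio::sync::mpsc::{error::TrySendError, Sender};

// Hypothetical stand-in for the PR's DetectedGossipMessage type.
struct DetectedGossipMessage { /* fields elided */ }

// Sketch only: enqueue without spawning; if the persister's queue is full,
// block this worker thread until space frees up. block_in_place requires
// tokio's multi-threaded runtime.
fn enqueue_with_backpressure(sender: &Sender<DetectedGossipMessage>, msg: DetectedGossipMessage) {
	match sender.try_send(msg) {
		Ok(()) => {},
		Err(TrySendError::Full(msg)) => {
			tokio::task::block_in_place(|| {
				tokio::runtime::Handle::current().block_on(async {
					sender.send(msg).await.expect("persister receiver dropped");
				});
			});
		},
		Err(TrySendError::Closed(_)) => panic!("persister receiver dropped"),
	}
}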

Contributor Author

Is that still applicable considering #1 (comment)? Also, keep in mind that large divergences are only possible during initial sync, where the snapshot process is only triggered after the persistence is caught up with the downloaded gossip.

src/snapshot.rs Outdated
	fs::remove_file(&symlink_path).unwrap();
}
println!("Recreating symlink: {} -> {}", symlink_path, snapshot_path);
symlink(&canonical_snapshot_path, &symlink_path).unwrap();
Contributor

Can we make these symlinks relative somehow? I want to be able to rsync the two folders around and have the symlinks still be valid.

Contributor Author

They were originally relative, but that wasn't working on my machine. Would you mind experimenting here a little bit?
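One direction to experiment with (a sketch only; it assumes the snapshot directory and the symlink directory are sibling folders named "snapshots" and "symlinks", which may not match the server's actual layout) is to express the link target relative to the symlink's own directory, so rsyncing both folders keeps the links valid:

use std::os::unix::fs::symlink;
use std::path::Path;

// Sketch only: create a symlink whose target is a relative path, so the pair
// of directories can be copied elsewhere together and stay consistent.
fn create_relative_symlink(snapshot_file_name: &str, symlink_dir: &Path) -> std::io::Result<()> {
	// e.g. "../snapshots/<snapshot_file_name>" relative to the symlink directory
	let relative_target = Path::new("../snapshots").join(snapshot_file_name);
	let link_path = symlink_dir.join(snapshot_file_name);
	if link_path.exists() {
		std::fs::remove_file(&link_path)?;
	}
	symlink(&relative_target, &link_path)
}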

@arik-so
Contributor Author

arik-so commented Aug 21, 2022

#1 (comment)

Will do.

@TheBlueMatt
Contributor

TheBlueMatt commented Aug 21, 2022

Things left, I think:

  • Initial server commit. #1 (comment)
  • blocking when db gets behind
  • moving dates into SQL instead of doing them locally
  • CI (incl MSRV)
  • relative symlinks (and always generating, say, 10k symlinks?)
  • peer reconnection
  • better detection of "we're synced" on multiple peers

tokio::spawn(async move {
	disconnection_future.await;
	eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
	monitor_peer_connection(current_peer.clone(), peer_manager_clone);
Contributor

This will just silently stop trying to reconnect if the peer reboots and is refusing connections for a while, no? We should retry always.

Contributor Author

So we actually have a bigger issue: now that I've made monitor_peer_connection async, I can't await it without spawning on the executor, because the compiler needs to recursively type-check it and detects a cycle.

Contributor Author

error[E0391]: cycle detected when unsafety-checking tracking::monitor_peer_connection
--> src/tracking.rs:158:1
|
158 | async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
note: ...which requires building MIR for tracking::monitor_peer_connection...
--> src/tracking.rs:158:1
|
158 | async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: ...which requires type-checking tracking::monitor_peer_connection...
--> src/tracking.rs:171:3
|
171 | tokio::spawn(async move {
| ^^^^^^^^^^^^
= note: ...which requires evaluating trait selection obligation for<'r> {std::future::ResumeTy, impl futures::Future, (), &'r (bitcoin::secp256k1::PublicKey, std::net::SocketAddr), (bitcoin::secp256k1::PublicKey, std::net::SocketAddr), std::sync::Arc<lightning::ln::peer_handler::PeerManager<lightning_net_tokio::SocketDescriptor, std::sync::Arc<lightning::ln::peer_handler::ErroringMessageHandler>, std::sync::Arc<downloader::GossipRouter>, std::sync::Arc<types::TestLogger>, std::sync::Arc<lightning::ln::peer_handler::IgnoringMessageHandler>>>, impl futures::Future}: std::marker::Send...
note: ...which requires computing type of tracking::monitor_peer_connection::{opaque#0}...
--> src/tracking.rs:158:109
|
158 | async fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
| ^^^^
= note: ...which again requires unsafety-checking tracking::monitor_peer_connection, completing the cycle
note: cycle used when computing type of <impl at src/lib.rs:57:1: 132:2>::start_sync::{opaque#0}
--> src/lib.rs:86:33
|
86 | pub async fn start_sync(&self) {
| ^

Contributor Author

Matt, I really need your help here.
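(For reference, one common workaround for this kind of cycle in a recursive async fn is to return a boxed future, so the compiler does not have to compute a self-referential opaque type. A generic sketch, not the code in this PR:)

use std::future::Future;
use std::pin::Pin;

// Sketch only: recursing through a Box<dyn Future> sidesteps the infinitely
// recursive opaque type an async fn would otherwise produce.
fn monitor_with_retries(attempts_left: u32) -> Pin<Box<dyn Future<Output = ()> + Send>> {
	Box::pin(async move {
		if attempts_left == 0 {
			return;
		}
		// ... connect, await the disconnection future, etc. ...
		monitor_with_retries(attempts_left - 1).await;
	})
}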

Contributor

Oh, also, rustc complains here now:

Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]: warning: unused implementer of `futures::Future` that must be used
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:    --> src/tracking.rs:176:4
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:     |
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]: 176 |             monitor_peer_connection(current_peer.clone(), peer_manager_clone);
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:     |             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:     |
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:     = note: `#[warn(unused_must_use)]` on by default
Aug 21 23:54:41 ldk-gossip-sync-server.bitcoin.ninja run.sh[2348]:     = note: futures do nothing unless you `.await` or poll them

Contributor Author

OK, I was wondering why you weren't responding to this thread, where I had already asked about that like two hours ago: #1 (comment)

But it turns out GitHub didn't upload my comments, sorry.

Contributor

Heh, okay, just leave this one and fix the other stuff; we'll land this and I'll do it as a follow-up PR.

src/tracking.rs Outdated
tokio::spawn(async move {
	disconnection_future.await;
	eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
	// TODO: figure out how to await this
Contributor

You can just have the failure case, in the else/Failed block, retry.

Contributor Author

It would be nice not to keep retrying indefinitely, especially on first connect. If the peer is no longer available, it should ideally crash and instruct the user to find better peers.
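A sketch of that policy (the connect helper is a hypothetical stand-in, not one of this PR's actual functions): panic if the very first connection attempt fails, but keep reconnecting after later disconnections.

use std::net::SocketAddr;
use std::time::Duration;
use bitcoin::secp256k1::PublicKey;

// Hypothetical stand-in for the crate's connect-and-monitor logic.
async fn connect_and_wait_for_disconnection(_peer: &(PublicKey, SocketAddr)) -> Result<(), std::io::Error> {
	unimplemented!()
}

// Sketch only: crash if we never managed to connect (so the operator picks a
// better peer), but retry forever once we have connected at least once.
async fn monitor_peer(peer: (PublicKey, SocketAddr)) {
	let mut ever_connected = false;
	loop {
		match connect_and_wait_for_disconnection(&peer).await {
			Ok(()) => {
				ever_connected = true;
				eprintln!("Disconnected from peer {}@{}, reconnecting", peer.0, peer.1);
			},
			Err(e) if !ever_connected => panic!("Could not connect to peer {}@{}: {}", peer.0, peer.1, e),
			Err(e) => eprintln!("Reconnection to {}@{} failed ({}), retrying", peer.0, peer.1, e),
		}
		tokio::time::sleep(Duration::from_secs(10)).await;
	}
}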

src/lookup.rs Outdated

for current_announcement_row in announcement_rows {
	let blob: String = current_announcement_row.get("announcement_signed");
	let data = hex_utils::to_vec(&blob).unwrap();
Contributor

Why are we hex-encoding the blobs? Can't we just store them as blobs?

Contributor Author

Fixed
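(For reference, with the column stored as raw bytes, the hex round-trip goes away; a sketch assuming tokio_postgres rows and the column name from the snippet above:)

for current_announcement_row in announcement_rows {
	// BYTEA columns can be read directly as Vec<u8>, no hex decoding needed.
	let data: Vec<u8> = current_announcement_row.get("announcement_signed");
	// ... deserialize `data` as before ...
}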

};

println!("Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
Contributor

Why aren't we filtering this by "first seen after our target time"?

Contributor Author

Because that would be incorrect. There can be announcements that were first seen before our target time, but whose first channel update, or even whose first channel update in a given direction, was seen after.

Contributor

Why do we need a channel announcement to be returned if we're only giving users the channel update?


// here is where the channels whose first update in either direction occurred after
// `last_seen_timestamp` are added to the selection
let unannounced_rows = client.query("SELECT short_channel_id, blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap();
Contributor

I'm confused why we need two queries here at all? We should be able to do this all in a single query, no?

Contributor Author

The reason above is why we can't.

@arik-so
Contributor Author

arik-so commented Aug 22, 2022

Are my comments not getting sent or something?

@TheBlueMatt
Contributor

TheBlueMatt commented Aug 22, 2022

No, they were not... thanks GitHub

@arik-so arik-so deleted the server_addition branch August 22, 2022 02:22