Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split set_auto_ping, set_auto_close issue #87

Open
bigvo opened this issue Sep 6, 2024 · 6 comments
Open

Split set_auto_ping, set_auto_close issue #87

bigvo opened this issue Sep 6, 2024 · 6 comments

Comments

@bigvo
Copy link

bigvo commented Sep 6, 2024

Splitted channels require explicitly set set_auto_ping, set_auto_close to false, otherwise frames are being captured without write, which creates wrong frames in reader channel, i.e. instead of OpCode::Ping, reader reads OpCode::Pong on server Ping.

let (mut ws, _) =
        fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?;
    ws.set_auto_pong(false);
    ws.set_auto_close(false);
    let (rx, tx) = ws.split(tokio::io::split);
    let rx = FragmentCollectorRead::new(rx);
    Ok((rx, tx))
@szguoxz
Copy link

szguoxz commented Dec 9, 2024

There is no set_auto_ping at all? I suppose it's a typo from you, but still, this is not clear, not sure what you are saying:

otherwise frames are being captured without write, which creates wrong frames in reader channel, i.e. instead of OpCode::Ping, reader reads OpCode::Pong on server Ping.

@bigvo
Copy link
Author

bigvo commented Dec 9, 2024

set_auto_ping in the comment is a typo, but in the code it's correctly written set_auto_pong:

let tls_stream = tls_connector.connect(domain, tcp_stream).await?;

    let req = create_request(url, &addr)?;

    let (mut ws, _) =
        fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?;
    ws.set_auto_pong(false);
    ws.set_auto_close(false);
    let (rx, tx) = ws.split(tokio::io::split);
    let rx = FragmentCollectorRead::new(rx);

If set_auto_pong is not explicitly set to false before splitting the channel rx channel reads OpCode::Pong (when ping is received), if set_auto_pong set to false rx channel reads OpCode::Ping on server pings, which is correct behavior.

I'd suggest you run this code and check how reader channel behaves on pings, also server doesn't receive auto pongs (if set_auto_pong is true) when channels are split (but this is not an issue, just OpCode::Close&OpCode::Ping frames handler has to be implemented by those who use split channels)

@szguoxz
Copy link

szguoxz commented Dec 9, 2024

If set_auto_pong is not explicitly set to false before splitting the channel rx channel reads OpCode::Pong (when ping is received),
Can you say this is a bug? Can't believe split is still an unstable feature, makes me nervous to use it in production.

I think the same behavior happens to tokio_tungstenite. I just observed my tokio_tungstenite server received pong while I am not sending out any ping, which is Odd.

Seems like this is a difficult issue.

A side question: Do you know how to send tx channel around, i.e., pass it as a parameter to another function or simply make it a member of another struct? I am asking because the actual type of tx is almost impossible to spit it out.

@bigvo
Copy link
Author

bigvo commented Dec 9, 2024

In my code it doesn't leave function which creates connection, because WS connection (rx, tx in our case) has to be re-created every time when connection is closed.

But communication with rx, tx is static and can be send anywhere, example:

pub struct WebSocketClient {
    url: String,
    reconnect_delay: Duration,
    pending_requests: Mutex<HashMap<Uuid, oneshot::Sender<WsResponse>>>
}

impl WebSocketClient {
    pub fn new(url: String, reconnect_delay: Duration) -> Self {
        let pending_requests = Mutex::new(HashMap::new());
        WebSocketClient {
            url,
            reconnect_delay,
            pending_requests
        }
    }

    pub async fn start_ws_api(&self,
                              req_rx: Receiver<(WsRequest, oneshot::Sender<WsResponse>)>,
                              req_tx: Sender<(WsRequest, oneshot::Sender<WsResponse>)>,
                              public_key: &str,
                              secret_key: &str) {
        loop {
            match connect_sender(&self.url).await {
                Ok((ws_rx, ws_tx)) => {
                    info!("WS API - successfully connected to {}", self.url);
                    let ws_tx = Arc::new(Mutex::new(ws_tx));

                    let reader_fut = self.run_api_reader(ws_rx, ws_tx.clone());
                    let writer_fut = self.run_api_writer(ws_tx.clone(), &req_rx);
                    let auth_fut = self.authenticate(&req_tx, public_key, secret_key);

                    let _ = tokio::try_join!(
                        reader_fut,
                        writer_fut,
                        auth_fut
                    );
                }
                Err(e) => {
                    warn!("WS API - {} - Connection failed: {}.Reconnecting...", self.url, e.to_string());
                    sleep(Duration::from_secs(5)).await;
                    continue;
                }
            }
        }
    }

In my code it's a loop which always re-creates rx, tx on any error or closed connection.

P.S. fastwebsockets produces some errors with incomplete frames but I didn't dive into debugging.

EDIT: It actually passed wrapped in atomic reference counter and mutex to internal function which handles communication.

@szguoxz
Copy link

szguoxz commented Dec 10, 2024

The problem I have here is:
I want to have a hashmap to save all the tx for future lookup. The map is static variable using lazy lock.
In order to declare this map variable, I need the each type to be explicitly spitted out.

Unfortunately, the type for TX is unspeakable. :-) You can see what it is, but you can write it out in your source code.

Your self.run_api_writer might work since you can use generic type variable. But it's not allowed for static.

@bigvo
Copy link
Author

bigvo commented Dec 10, 2024

WebSocketWrite passed as non-generic, but since WebSocketWrite is mutable it has to be wrapped in mutex and arc (if shared between threads)

   async fn run_api_writer(&self,
                            ws_tx: Arc<Mutex<WebSocketWrite<tokio::io::WriteHalf<TokioIo<Upgraded>>>>>,
                            request_receiver: &Receiver<(WsRequest, oneshot::Sender<WsResponse>)>,
    ) -> Result<(), GeneralError> {
        loop {
            let (req, res_tx) = request_receiver.recv_async().await?;
            self.pending_requests.lock().await.insert(req.id, res_tx);
            let serialized_req = serde_json::to_string(&req)?;
            let mut ws_tx_lock = ws_tx.lock().await;
            ws_tx_lock.write_frame(Frame::text(Payload::Owned(serialized_req.into_bytes()))).await?;
        }
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants