Split set_auto_ping, set_auto_close issue #87
Comments
There is no set_auto_ping at all? I suppose it's a typo on your part, but it's still not clear what you mean by: "otherwise frames are being captured without write, which creates wrong frames in reader channel, i.e. instead of OpCode::Ping, reader reads OpCode::Pong on server Ping."
set_auto_ping in the comment is a typo, but in the code it's correctly written as set_auto_pong:
let tls_stream = tls_connector.connect(domain, tcp_stream).await?;
let req = create_request(url, &addr)?;
let (mut ws, _) =
    fastwebsockets::handshake::client(&SpawnExecutor, req, tls_stream).await?;
// Disable the automatic Pong/Close replies before splitting.
ws.set_auto_pong(false);
ws.set_auto_close(false);
let (rx, tx) = ws.split(tokio::io::split);
let rx = FragmentCollectorRead::new(rx);
If set_auto_pong is not explicitly set to false before splitting, the rx channel reads OpCode::Pong when a ping is received. If set_auto_pong is set to false, the rx channel reads OpCode::Ping on server pings, which is the correct behavior. I'd suggest you run this code and check how the reader channel behaves on pings. Also, the server doesn't receive auto pongs (even if set_auto_pong is true) when the channels are split. That is not an issue in itself; it just means handlers for OpCode::Close and OpCode::Ping frames have to be implemented by those who use split channels.
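The point about handling OpCode::Ping and OpCode::Close by hand on split channels can be sketched roughly as below. This is a minimal model only: `OpCode` here is a local stand-in enum, not the fastwebsockets type, and `Reply` and `reply_for` are hypothetical names introduced for illustration. The assumed idea is that the reader task decides which control frame is needed and hands that decision to whoever owns the write half.

```rust
/// Local stand-in for the opcodes named in this thread.
/// Assumption: this is NOT `fastwebsockets::OpCode`, just a model of it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpCode {
    Text,
    Binary,
    Close,
    Ping,
    Pong,
}

/// Hypothetical description of what the reader asks the writer to send.
#[derive(Debug, PartialEq, Eq)]
pub enum Reply {
    /// Answer a Ping with a Pong that echoes the Ping payload (RFC 6455).
    Pong(Vec<u8>),
    /// Answer a Close with a Close frame that echoes the received payload.
    Close(Vec<u8>),
    /// Data frames and unsolicited Pongs need no control-frame reply.
    Nothing,
}

/// Decide which control frame, if any, must be written for a received frame.
pub fn reply_for(opcode: OpCode, payload: &[u8]) -> Reply {
    match opcode {
        OpCode::Ping => Reply::Pong(payload.to_vec()),
        OpCode::Close => Reply::Close(payload.to_vec()),
        OpCode::Text | OpCode::Binary | OpCode::Pong => Reply::Nothing,
    }
}
```

In a real read loop the returned `Reply` would presumably be turned into a frame and written through the tx half, for example via the shared writer discussed later in this thread.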
I think the same behavior happens with tokio_tungstenite. I just observed that my tokio_tungstenite server received a pong while I was not sending out any ping, which is odd. This seems like a difficult issue. A side question: do you know how to pass the tx channel around, i.e., pass it as a parameter to another function or simply make it a member of another struct? I am asking because the actual type of tx is almost impossible to spell out.
In my code it doesn't leave the function that creates the connection, because the WS connection (rx, tx in our case) has to be re-created every time the connection is closed. But communication with rx, tx is static and can be sent anywhere, for example:
pub struct WebSocketClient {
    url: String,
    reconnect_delay: Duration,
    pending_requests: Mutex<HashMap<Uuid, oneshot::Sender<WsResponse>>>,
}

impl WebSocketClient {
    pub fn new(url: String, reconnect_delay: Duration) -> Self {
        let pending_requests = Mutex::new(HashMap::new());
        WebSocketClient {
            url,
            reconnect_delay,
            pending_requests,
        }
    }

    pub async fn start_ws_api(
        &self,
        req_rx: Receiver<(WsRequest, oneshot::Sender<WsResponse>)>,
        req_tx: Sender<(WsRequest, oneshot::Sender<WsResponse>)>,
        public_key: &str,
        secret_key: &str,
    ) {
        loop {
            match connect_sender(&self.url).await {
                Ok((ws_rx, ws_tx)) => {
                    info!("WS API - successfully connected to {}", self.url);
                    // The write half is shared between the reader and writer tasks.
                    let ws_tx = Arc::new(Mutex::new(ws_tx));
                    let reader_fut = self.run_api_reader(ws_rx, ws_tx.clone());
                    let writer_fut = self.run_api_writer(ws_tx.clone(), &req_rx);
                    let auth_fut = self.authenticate(&req_tx, public_key, secret_key);
                    // Any error ends all three futures and falls through to reconnect.
                    let _ = tokio::try_join!(reader_fut, writer_fut, auth_fut);
                }
                Err(e) => {
                    warn!("WS API - {} - Connection failed: {}. Reconnecting...", self.url, e);
                    sleep(Duration::from_secs(5)).await;
                    continue;
                }
            }
        }
    }
}
In my code it's a loop which always re-creates rx, tx on any error or closed connection. P.S. fastwebsockets produces some errors with incomplete frames, but I didn't dive into debugging. EDIT: It is actually passed, wrapped in an atomic reference counter and a mutex, to the internal function which handles communication.
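The reconnect pattern described above (retry the connection after a delay whenever it fails) can be sketched in isolation as follows. This is a simplified, synchronous model under stated assumptions: `connect_with_retry` and its `max_attempts` bound are hypothetical, whereas the code in this thread is async and loops forever. The excerpt above sleeps for a fixed five seconds although the struct also stores a reconnect_delay; the sketch takes the delay as a parameter instead.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `connect` until it succeeds or `max_attempts` attempts have failed,
/// sleeping `delay` between failed attempts. At least one attempt is made.
/// On success, returns the connection together with the attempt number.
/// Hypothetical helper: the name and the attempt bound are illustrative only.
pub fn connect_with_retry<C, E, F>(
    mut connect: F,
    delay: Duration,
    max_attempts: u32,
) -> Result<(C, u32), E>
where
    F: FnMut() -> Result<C, E>,
{
    let mut attempt = 1;
    loop {
        match connect() {
            Ok(conn) => return Ok((conn, attempt)),
            // Give up and surface the last error once the bound is reached.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                attempt += 1;
            }
        }
    }
}
```

In an async client the same shape would presumably use an async sleep and re-create rx, tx inside the `Ok` branch, as the loop in this comment does.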
The problem I have here is: unfortunately, the type of tx is unspeakable. :-) You can see what it is, but you can't write it out in your source code. Your self.run_api_writer might work since you can use a generic type variable, but that's not allowed for a static.
WebSocketWrite is passed as a non-generic type, but since WebSocketWrite is mutable it has to be wrapped in a mutex and an Arc (if shared between threads):
async fn run_api_writer(
    &self,
    ws_tx: Arc<Mutex<WebSocketWrite<tokio::io::WriteHalf<TokioIo<Upgraded>>>>>,
    request_receiver: &Receiver<(WsRequest, oneshot::Sender<WsResponse>)>,
) -> Result<(), GeneralError> {
    loop {
        // Wait for the next request and remember where to send its response.
        let (req, res_tx) = request_receiver.recv_async().await?;
        self.pending_requests.lock().await.insert(req.id, res_tx);
        let serialized_req = serde_json::to_string(&req)?;
        // Lock the shared write half only for the duration of one frame.
        let mut ws_tx_lock = ws_tx.lock().await;
        ws_tx_lock.write_frame(Frame::text(Payload::Owned(serialized_req.into_bytes()))).await?;
    }
}
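One way to make a long nested type like the one in the signature above easier to pass around is a type alias, which can then be used for struct fields and function parameters alike. The sketch below is a model under stated assumptions: `FakeWsWrite` is a local stand-in for the real write half, `SharedWsTx`, `Client`, and `send_from_elsewhere` are hypothetical names, and `std::sync::Mutex` is used in place of the async mutex the thread's code appears to use.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a generic write half such as `WebSocketWrite<S>`.
/// Assumption: a local model, not the fastwebsockets type.
pub struct FakeWsWrite<S> {
    pub sink: S,
}

impl FakeWsWrite<Vec<String>> {
    /// Records the text instead of writing a real frame.
    pub fn write_text(&mut self, text: &str) {
        self.sink.push(text.to_owned());
    }
}

/// One alias gives the nested, fully concrete type a short name.
pub type SharedWsTx = Arc<Mutex<FakeWsWrite<Vec<String>>>>;

/// The alias can be used as a struct member...
pub struct Client {
    pub ws_tx: SharedWsTx,
}

impl Client {
    pub fn new(ws_tx: SharedWsTx) -> Self {
        Client { ws_tx }
    }

    pub fn send(&self, text: &str) {
        let mut tx = self.ws_tx.lock().expect("writer mutex poisoned");
        tx.write_text(text);
    }
}

/// ...or as a plain function parameter, without any generics.
pub fn send_from_elsewhere(ws_tx: &SharedWsTx, text: &str) {
    let mut tx = ws_tx.lock().expect("writer mutex poisoned");
    tx.write_text(text);
}
```

With the real types, the alias would presumably wrap the same Arc<Mutex<WebSocketWrite<tokio::io::WriteHalf<TokioIo<Upgraded>>>>> spelled out in the signature above.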
Split channels require set_auto_ping and set_auto_close to be explicitly set to false, otherwise frames are being captured without write, which creates wrong frames in the reader channel, i.e. instead of OpCode::Ping, the reader reads OpCode::Pong on a server Ping.