-
Notifications
You must be signed in to change notification settings - Fork 4
/
handle.rs
120 lines (105 loc) · 3.86 KB
/
handle.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use super::{
lowlevel::{Handle, HandleOwned},
{Error, Id, WriteEnd, WriteEndWithCachedId},
};
use std::{
borrow::Cow,
future::Future,
ops::{Deref, DerefMut},
sync::Arc,
};
use derive_destructure2::destructure;
/// Shared, reference-counted owner of a remote handle.
///
/// Cloning an `OwnedHandle` clones the [`WriteEndWithCachedId`] and bumps the
/// reference count of the shared [`HandleOwned`]. A close request for the handle
/// is sent only by the last remaining reference: either explicitly via
/// `OwnedHandle::close`, or as a best-effort fallback from the `Drop` impl.
///
/// `destructure` is derived so that `close` can take the fields apart without
/// running `Drop::drop`.
#[derive(Debug, Clone, destructure)]
pub(super) struct OwnedHandle {
// Write side used to issue requests; also exposed through `Deref`/`DerefMut`.
pub(super) write_end: WriteEndWithCachedId,
// The handle itself, shared between all clones of this `OwnedHandle`.
pub(super) handle: Arc<HandleOwned>,
}
impl Drop for OwnedHandle {
    /// Best-effort close of the handle when the last reference goes away.
    ///
    /// Nothing is awaited here: the close request is queued on the write end,
    /// the flush task is woken up, and the response is collected by a task
    /// spawned through the auxiliary's `tokio_handle()`. Failures are only
    /// reported through `tracing` (when that feature is enabled).
    fn drop(&mut self) {
        // Only the last reference to the arc is responsible for closing the handle.
        // NOTE(review): `Arc::strong_count` is only a snapshot; clones dropped
        // concurrently on other threads could each observe a count > 1 — confirm
        // whether that can happen here.
        if Arc::strong_count(&self.handle) != 1 {
            return;
        }

        let id = self.write_end.get_id_mut();
        let awaitable = match self
            .write_end
            .send_close_request(id, Cow::Borrowed(&self.handle))
        {
            Ok(awaitable) => awaitable,
            Err(_err) => {
                #[cfg(feature = "tracing")]
                tracing::error!(?_err, "failed to send close request");
                return;
            }
        };

        // The request has already been added to the write buffer, so wake up
        // the `flush_task` to get it sent.
        self.get_auxiliary().wakeup_flush_task();

        // The future is created outside of the async block on purpose:
        //  1. `wait()` is basically a no-op that takes the inner value out of the
        //     AwaitableStatus and wraps it in the matching AwaitableStatusFuture.
        //  2. `rustc` is not very good at optimizing moves inside futures, which
        //     often blows the Future up to double its size.
        //  3. The more states a Future has, the harder it is to optimize and to
        //     take advantage of niches.
        let close_fut = awaitable.wait();

        self.get_auxiliary().tokio_handle().spawn(async move {
            let _outcome = close_fut.await;

            #[cfg(feature = "tracing")]
            match _outcome {
                Ok(_) => tracing::debug!("close handle success"),
                Err(err) => tracing::error!(?err, "failed to close handle"),
            }
        });
    }
}
impl OwnedHandle {
    /// Create an [`OwnedHandle`] from `write_end` and a freshly obtained `handle`.
    ///
    /// The handle is placed in a new [`Arc`], so the returned value is its only
    /// reference until it is cloned.
    pub(super) fn new(write_end: WriteEndWithCachedId, handle: HandleOwned) -> Self {
        let handle = Arc::new(handle);
        Self { write_end, handle }
    }

    /// Send a request that operates on this handle.
    ///
    /// `f` is called with the write end, a borrowed view of the handle and the
    /// request id; whatever `self.write_end.send_request` resolves to is returned.
    pub(super) async fn send_request<Func, F, R>(&mut self, f: Func) -> Result<R, Error>
    where
        Func: FnOnce(&mut WriteEnd, Cow<'_, Handle>, Id) -> Result<F, Error> + Send,
        F: Future<Output = Result<(Id, R), Error>> + Send + 'static,
        R: Send,
    {
        let shared_handle = &self.handle;

        let pending = self
            .write_end
            .send_request(|writer, request_id| f(writer, Cow::Borrowed(shared_handle), request_id));

        pending.await
    }

    /// Close the [`OwnedHandle`], send the close request
    /// if this is the last reference.
    ///
    /// # Cancel Safety
    ///
    /// This function is cancel safe.
    pub(super) async fn close(self) -> Result<(), Error> {
        if Arc::strong_count(&self.handle) != 1 {
            // Not the last reference to the arc: nothing to send from here.
            // `self` is dropped normally on return.
            return Ok(());
        }

        // This is the last reference to the arc.
        // Take the fields apart so that `Drop::drop` does not run and issue
        // a second close request.
        let (mut write_end, handle) = self.destructure();

        write_end
            .send_request(|writer, request_id| {
                let awaitable = writer.send_close_request(request_id, Cow::Borrowed(&handle))?;
                Ok(awaitable.wait())
            })
            .await
    }
}
/// Expose the inner [`WriteEndWithCachedId`] so its methods can be called
/// directly on an [`OwnedHandle`].
impl Deref for OwnedHandle {
    type Target = WriteEndWithCachedId;

    fn deref(&self) -> &Self::Target {
        let Self { write_end, .. } = self;
        write_end
    }
}
/// Mutable counterpart of the `Deref` impl: hands out the inner
/// [`WriteEndWithCachedId`] for methods that need `&mut` access.
impl DerefMut for OwnedHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { write_end, .. } = self;
        write_end
    }
}