From ab2673296dc3bc6818936e7012919509dfc40e44 Mon Sep 17 00:00:00 2001 From: Matthew Ahrens Date: Fri, 24 Sep 2021 19:51:21 -0700 Subject: [PATCH] cleanup async (#459) --- .../zettaobject/src/kernel_connection.rs | 20 ++++---- cmd/zfs_object_agent/zettaobject/src/pool.rs | 51 +++++++++---------- 2 files changed, 33 insertions(+), 38 deletions(-) diff --git a/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs b/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs index 094a2e52a5f2..b8a3c398dda4 100644 --- a/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs +++ b/cmd/zfs_object_agent/zettaobject/src/kernel_connection.rs @@ -251,17 +251,17 @@ impl KernelConnectionState { .as_ref() .ok_or_else(|| anyhow!("no pool open"))? .clone(); - // Need to write_block() before spawning, so that the Pool knows what's been written before resume_complete() - let fut = pool.write_block(block, slice.to_vec()); + // XXX copying data + let vec = slice.to_vec(); Ok(Box::pin(async move { - fut.await; - let mut nvl = NvList::new_unique_names(); - nvl.insert("Type", "write done").unwrap(); - nvl.insert("block", &block.0).unwrap(); - nvl.insert("request_id", &request_id).unwrap(); - nvl.insert("token", &token).unwrap(); - trace!("sending response: {:?}", nvl); - Ok(Some(nvl)) + pool.write_block(block, vec).await; + let mut response = NvList::new_unique_names(); + response.insert("Type", "write done").unwrap(); + response.insert("block", &block.0).unwrap(); + response.insert("request_id", &request_id).unwrap(); + response.insert("token", &token).unwrap(); + trace!("sending response: {:?}", response); + Ok(Some(response)) })) } diff --git a/cmd/zfs_object_agent/zettaobject/src/pool.rs b/cmd/zfs_object_agent/zettaobject/src/pool.rs index bc93b29431b8..f70383c66f48 100644 --- a/cmd/zfs_object_agent/zettaobject/src/pool.rs +++ b/cmd/zfs_object_agent/zettaobject/src/pool.rs @@ -1441,7 +1441,7 @@ impl Pool { Self::check_pending_flushes(state, syncing_state); } - pub fn write_block(&self, block: BlockId, data: Vec) -> impl Future { + pub async fn write_block(&self, block: BlockId, data: Vec) { let data2 = data.clone(); // XXX copying let receiver = self.state.with_syncing_state(|syncing_state| { // XXX change to return error @@ -1463,39 +1463,34 @@ impl Pool { receiver }); let guid = self.state.shared_state.guid; - // XXX Cloning the zettacache has to clone several Arc's; maybe should - // have one that covers all the members? Or find a way to use the - // Pool's zettacache? let cache = match *WRITES_INGEST_TO_ZETTACACHE { - true => self.state.zettacache.as_ref().cloned(), + true => self.state.zettacache.as_ref(), false => None, }; - async move { - if let Some(cache) = cache { - match cache.lookup(guid, block).await { - LookupResponse::Present(_) => { - // Surprisingly, the BlockId may be in the cache even - // when writing a "new" block, if the system crashed or - // the pool rewound, causing a BlockId that was already - // persisted to the cache to be reused. - // - // XXX Ideally we would force-evict it and then insert - // again. For now, we ignore the insertion request. - // Subsequent lookups will return the wrong data, and we - // rely on the checksum in the blkptr_t to catch it. - // (Lookups without a preceeding insertion will also - // return the wrong data, so this is no worse.) - trace!( - "writing block already in zettacache: {:?} {:?}", - guid, - block - ); - } - LookupResponse::Absent(key) => cache.insert(key, data2).await, + if let Some(cache) = cache { + match cache.lookup(guid, block).await { + LookupResponse::Present(_) => { + // Surprisingly, the BlockId may be in the cache even + // when writing a "new" block, if the system crashed or + // the pool rewound, causing a BlockId that was already + // persisted to the cache to be reused. + // + // XXX Ideally we would force-evict it and then insert + // again. For now, we ignore the insertion request. + // Subsequent lookups will return the wrong data, and we + // rely on the checksum in the blkptr_t to catch it. + // (Lookups without a preceeding insertion will also + // return the wrong data, so this is no worse.) + trace!( + "writing block already in zettacache: {:?} {:?}", + guid, + block + ); } + LookupResponse::Absent(key) => cache.insert(key, data2).await, } - receiver.await.unwrap(); } + receiver.await.unwrap(); } async fn read_block_impl(&self, block: BlockId, bypass_cache: bool) -> Vec {