diff --git a/integrations/virtiofs/Cargo.toml b/integrations/virtiofs/Cargo.toml index b7de43038bcf..d3eca2361b2c 100644 --- a/integrations/virtiofs/Cargo.toml +++ b/integrations/virtiofs/Cargo.toml @@ -30,6 +30,8 @@ version = "0.0.0" [dependencies] anyhow = { version = "1.0.86", features = ["std"] } libc = "0.2.139" +log = "0.4.22" +opendal = { version = "0.48.0", path = "../../core" } snafu = "0.8.3" vhost = "0.11.0" vhost-user-backend = "0.14.0" diff --git a/integrations/virtiofs/src/filesystem.rs b/integrations/virtiofs/src/filesystem.rs index ae3f1b8550c4..cdaedea6d029 100644 --- a/integrations/virtiofs/src/filesystem.rs +++ b/integrations/virtiofs/src/filesystem.rs @@ -18,6 +18,7 @@ use std::io::Write; use std::mem::size_of; +use opendal::Operator; use vm_memory::ByteValued; use crate::error::*; @@ -37,12 +38,15 @@ const MAX_BUFFER_SIZE: u32 = 1 << 20; /// Filesystem is a filesystem implementation with opendal backend, /// and will decode and process messages from VMs. -pub struct Filesystem {} +pub struct Filesystem { + // FIXME: #[allow(dead_code)] here should be removed in the future. + #[allow(dead_code)] + core: Operator, +} -#[allow(dead_code)] impl Filesystem { - pub fn new() -> Filesystem { - Filesystem {} + pub fn new(core: Operator) -> Filesystem { + Filesystem { core } } pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result { diff --git a/integrations/virtiofs/src/lib.rs b/integrations/virtiofs/src/lib.rs index a3de2697b2da..4abea18c87b5 100644 --- a/integrations/virtiofs/src/lib.rs +++ b/integrations/virtiofs/src/lib.rs @@ -20,3 +20,5 @@ mod filesystem; mod filesystem_message; mod virtiofs; mod virtiofs_util; + +pub use virtiofs::VirtioFs; diff --git a/integrations/virtiofs/src/virtiofs.rs b/integrations/virtiofs/src/virtiofs.rs index 56f0fbd39682..fbacaf1f672e 100644 --- a/integrations/virtiofs/src/virtiofs.rs +++ b/integrations/virtiofs/src/virtiofs.rs @@ -16,33 +16,41 @@ // under the License. use std::io; +use std::sync::Arc; use std::sync::RwLock; +use log::warn; +use opendal::Operator; use vhost::vhost_user::message::VhostUserProtocolFeatures; use vhost::vhost_user::message::VhostUserVirtioFeatures; use vhost::vhost_user::Backend; +use vhost::vhost_user::Listener; use vhost_user_backend::VhostUserBackend; +use vhost_user_backend::VhostUserDaemon; use vhost_user_backend::VringMutex; use vhost_user_backend::VringState; use vhost_user_backend::VringT; use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1; use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC; -use vm_memory::ByteValued; +use virtio_queue::DescriptorChain; +use virtio_queue::QueueOwnedT; +use vm_memory::GuestAddressSpace; use vm_memory::GuestMemoryAtomic; +use vm_memory::GuestMemoryLoadGuard; use vm_memory::GuestMemoryMmap; -use vm_memory::Le32; use vmm_sys_util::epoll::EventSet; use vmm_sys_util::eventfd::EventFd; use crate::error::*; +use crate::filesystem::Filesystem; +use crate::virtiofs_util::Reader; +use crate::virtiofs_util::Writer; /// Marks an event from the high priority queue. const HIPRIO_QUEUE_EVENT: u16 = 0; /// Marks an event from the request queue. const REQ_QUEUE_EVENT: u16 = 1; -/// The maximum number of bytes in VirtioFsConfig tag field. -const MAX_TAG_LEN: usize = 36; /// The maximum queue size supported. const QUEUE_SIZE: usize = 32768; /// The number of request queues supported. @@ -53,6 +61,7 @@ const NUM_QUEUES: usize = REQUEST_QUEUES + 1; /// VhostUserFsThread represents the actual worker process used to handle file system requests from VMs. struct VhostUserFsThread { + core: Filesystem, mem: Option>, vu_req: Option, event_idx: bool, @@ -60,11 +69,12 @@ struct VhostUserFsThread { } impl VhostUserFsThread { - fn new() -> Result { + fn new(core: Filesystem) -> Result { let event_fd = EventFd::new(libc::EFD_NONBLOCK).map_err(|err| { new_unexpected_error("failed to create kill eventfd", Some(err.into())) })?; Ok(VhostUserFsThread { + core, mem: None, vu_req: None, event_idx: false, @@ -72,6 +82,35 @@ impl VhostUserFsThread { }) } + /// This is used when the backend has processed a request and needs to notify the frontend. + fn return_descriptor( + vring_state: &mut VringState, + head_index: u16, + event_idx: bool, + len: usize, + ) { + if vring_state.add_used(head_index, len as u32).is_err() { + warn!("Failed to add used to used queue."); + }; + // Check if the used queue needs to be signaled. + if event_idx { + match vring_state.needs_notification() { + Ok(needs_notification) => { + if needs_notification && vring_state.signal_used_queue().is_err() { + warn!("Failed to signal used queue."); + } + } + Err(_) => { + if vring_state.signal_used_queue().is_err() { + warn!("Failed to signal used queue."); + }; + } + } + } else if vring_state.signal_used_queue().is_err() { + warn!("Failed to signal used queue."); + } + } + /// Process filesystem requests one at a time in a serialized manner. fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> Result<()> { let mut vring_state = match device_event { @@ -83,10 +122,16 @@ impl VhostUserFsThread { // If EVENT_IDX is enabled, we could keep calling process_queue() // until it stops finding new request on the queue. loop { - vring_state.disable_notification().unwrap(); + if vring_state.disable_notification().is_err() { + warn!("Failed to disable used queue notification."); + } self.process_queue_serial(&mut vring_state)?; - if !vring_state.enable_notification().unwrap() { - break; + if let Ok(has_more) = vring_state.enable_notification() { + if !has_more { + break; + } + } else { + warn!("Failed to enable used queue notification."); } } } else { @@ -98,37 +143,61 @@ impl VhostUserFsThread { /// Forwards filesystem messages to specific functions and /// returns the filesystem request execution result. - fn process_queue_serial(&self, _vring_state: &mut VringState) -> Result { - unimplemented!() + fn process_queue_serial(&self, vring_state: &mut VringState) -> Result { + let mut used_any = false; + let mem = match &self.mem { + Some(m) => m.memory(), + None => return Err(new_unexpected_error("no memory configured", None)), + }; + let avail_chains: Vec>> = vring_state + .get_queue_mut() + .iter(mem.clone()) + .map_err(|_| new_unexpected_error("iterating through the queue failed", None))? + .collect(); + for chain in avail_chains { + used_any = true; + let head_index = chain.head_index(); + let reader = Reader::new(&mem, chain.clone()) + .map_err(|_| new_unexpected_error("creating a queue reader failed", None)) + .unwrap(); + let writer = Writer::new(&mem, chain.clone()) + .map_err(|_| new_unexpected_error("creating a queue writer failed", None)) + .unwrap(); + let len = self + .core + .handle_message(reader, writer) + .map_err(|_| new_unexpected_error("processing a queue request failed", None)) + .unwrap(); + VhostUserFsThread::return_descriptor(vring_state, head_index, self.event_idx, len); + } + Ok(used_any) } } /// VhostUserFsBackend is a structure that implements the VhostUserBackend trait /// and implements concrete services for the vhost user backend server. -pub struct VhostUserFsBackend { - tag: Option, +struct VhostUserFsBackend { thread: RwLock, } -#[allow(dead_code)] impl VhostUserFsBackend { - pub fn new(tag: Option) -> Result { - let thread = RwLock::new(VhostUserFsThread::new()?); - Ok(VhostUserFsBackend { thread, tag }) + fn new(core: Filesystem) -> Result { + let thread = RwLock::new(VhostUserFsThread::new(core)?); + Ok(VhostUserFsBackend { thread }) } -} -/// VirtioFsConfig will be serialized and used as -/// the return value of get_config function in the VhostUserBackend trait. -#[repr(C)] -#[derive(Clone, Copy)] -struct VirtioFsConfig { - tag: [u8; MAX_TAG_LEN], - num_request_queues: Le32, + fn kill(&self) -> Result<()> { + self.thread + .read() + .unwrap() + .kill_event_fd + .write(1) + .map_err(|err| { + new_unexpected_error("failed to write to kill eventfd", Some(err.into())) + }) + } } -unsafe impl ByteValued for VirtioFsConfig {} - impl VhostUserBackend for VhostUserFsBackend { type Bitmap = (); type Vring = VringMutex; @@ -155,15 +224,11 @@ impl VhostUserBackend for VhostUserFsBackend { /// Get available vhost protocol features. fn protocol_features(&self) -> VhostUserProtocolFeatures { // Align to the virtiofsd's protocol features here. - let mut protocol_features = VhostUserProtocolFeatures::MQ + VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::BACKEND_REQ | VhostUserProtocolFeatures::BACKEND_SEND_FD | VhostUserProtocolFeatures::REPLY_ACK - | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS; - if self.tag.is_some() { - protocol_features |= VhostUserProtocolFeatures::CONFIG; - } - protocol_features + | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS } /// Enable or disabled the virtio EVENT_IDX feature. @@ -171,29 +236,6 @@ impl VhostUserBackend for VhostUserFsBackend { self.thread.write().unwrap().event_idx = enabled; } - /// Get virtio device configuration. - fn get_config(&self, offset: u32, size: u32) -> Vec { - let tag = self - .tag - .as_ref() - .expect("did not expect read of config if tag is not set."); - let mut fixed_len_tag = [0; MAX_TAG_LEN]; - fixed_len_tag[0..tag.len()].copy_from_slice(tag.as_bytes()); - let config = VirtioFsConfig { - tag: fixed_len_tag, - num_request_queues: Le32::from(REQUEST_QUEUES as u32), - }; - let mut result: Vec<_> = config - .as_slice() - .iter() - .skip(offset as usize) - .take(size as usize) - .copied() - .collect(); - result.resize(size as usize, 0); - result - } - /// Update guest memory regions. fn update_memory(&self, mem: GuestMemoryAtomic) -> io::Result<()> { self.thread.write().unwrap().mem = Some(mem); @@ -238,3 +280,50 @@ impl VhostUserBackend for VhostUserFsBackend { .map_err(|err| err.into()) } } + +/// VirtioFS is a structure that represents the virtiofs service. +/// It is used to run the virtiofs service with the given operator and socket path. +/// The operator is used to interact with the backend storage system. +/// The socket path is used to communicate with the QEMU and VMs. +pub struct VirtioFs { + socket_path: String, + filesystem_backend: Arc, +} + +impl VirtioFs { + pub fn new(core: Operator, socket_path: &str) -> Result { + let filesystem_core = Filesystem::new(core); + let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core).unwrap()); + Ok(VirtioFs { + socket_path: socket_path.to_string(), + filesystem_backend, + }) + } + + // Run the virtiofs service. + pub fn run(&self) -> Result<()> { + let listener = Listener::new(&self.socket_path, true) + .map_err(|_| new_unexpected_error("failed to create listener", None))?; + let mut daemon = VhostUserDaemon::new( + String::from("virtiofs-backend"), + self.filesystem_backend.clone(), + GuestMemoryAtomic::new(GuestMemoryMmap::new()), + ) + .unwrap(); + if daemon.start(listener).is_err() { + return Err(new_unexpected_error("failed to start daemon", None)); + } + if daemon.wait().is_err() { + return Err(new_unexpected_error("failed to wait daemon", None)); + } + Ok(()) + } + + // Kill the virtiofs service. + pub fn kill(&self) -> Result<()> { + if self.filesystem_backend.kill().is_err() { + return Err(new_unexpected_error("failed to kill backend", None)); + } + Ok(()) + } +} diff --git a/integrations/virtiofs/src/virtiofs_util.rs b/integrations/virtiofs/src/virtiofs_util.rs index f797c8dacb4a..bcbdf6ea918b 100644 --- a/integrations/virtiofs/src/virtiofs_util.rs +++ b/integrations/virtiofs/src/virtiofs_util.rs @@ -96,7 +96,6 @@ pub struct Reader<'a, B = ()> { buffer: DescriptorChainConsumer<'a, B>, } -#[allow(dead_code)] impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> { pub fn new( mem: &'a GuestMemoryMmap, @@ -174,7 +173,6 @@ pub struct Writer<'a, B = ()> { buffer: DescriptorChainConsumer<'a, B>, } -#[allow(dead_code)] impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> { pub fn new( mem: &'a GuestMemoryMmap,