Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ovfs): export VirtioFs struct #4983

Merged
merged 2 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions integrations/virtiofs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ version = "0.0.0"
[dependencies]
anyhow = { version = "1.0.86", features = ["std"] }
libc = "0.2.139"
log = "0.4.22"
opendal = { version = "0.48.0", path = "../../core" }
snafu = "0.8.3"
vhost = "0.11.0"
vhost-user-backend = "0.14.0"
Expand Down
12 changes: 8 additions & 4 deletions integrations/virtiofs/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::io::Write;
use std::mem::size_of;

use opendal::Operator;
use vm_memory::ByteValued;

use crate::error::*;
Expand All @@ -37,12 +38,15 @@ const MAX_BUFFER_SIZE: u32 = 1 << 20;

/// Filesystem is a filesystem implementation with opendal backend,
/// and will decode and process messages from VMs.
pub struct Filesystem {}
pub struct Filesystem {
/// OpenDAL operator handed to `Filesystem::new`.
/// NOTE(review): the `dead_code` allowance below suggests nothing reads it yet — confirm.
// FIXME: #[allow(dead_code)] here should be removed in the future.
#[allow(dead_code)]
core: Operator,
}

#[allow(dead_code)]
impl Filesystem {
pub fn new() -> Filesystem {
Filesystem {}
pub fn new(core: Operator) -> Filesystem {
Filesystem { core }
}

pub fn handle_message(&self, mut r: Reader, w: Writer) -> Result<usize> {
Expand Down
2 changes: 2 additions & 0 deletions integrations/virtiofs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ mod filesystem;
mod filesystem_message;
mod virtiofs;
mod virtiofs_util;

pub use virtiofs::VirtioFs;
199 changes: 144 additions & 55 deletions integrations/virtiofs/src/virtiofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,41 @@
// under the License.

use std::io;
use std::sync::Arc;
use std::sync::RwLock;

use log::warn;
use opendal::Operator;
use vhost::vhost_user::message::VhostUserProtocolFeatures;
use vhost::vhost_user::message::VhostUserVirtioFeatures;
use vhost::vhost_user::Backend;
use vhost::vhost_user::Listener;
use vhost_user_backend::VhostUserBackend;
use vhost_user_backend::VhostUserDaemon;
use vhost_user_backend::VringMutex;
use vhost_user_backend::VringState;
use vhost_user_backend::VringT;
use virtio_bindings::bindings::virtio_config::VIRTIO_F_VERSION_1;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_INDIRECT_DESC;
use vm_memory::ByteValued;
use virtio_queue::DescriptorChain;
use virtio_queue::QueueOwnedT;
use vm_memory::GuestAddressSpace;
use vm_memory::GuestMemoryAtomic;
use vm_memory::GuestMemoryLoadGuard;
use vm_memory::GuestMemoryMmap;
use vm_memory::Le32;
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;

use crate::error::*;
use crate::filesystem::Filesystem;
use crate::virtiofs_util::Reader;
use crate::virtiofs_util::Writer;

/// Marks an event from the high priority queue.
const HIPRIO_QUEUE_EVENT: u16 = 0;
/// Marks an event from the request queue.
const REQ_QUEUE_EVENT: u16 = 1;
/// The maximum number of bytes in VirtioFsConfig tag field.
const MAX_TAG_LEN: usize = 36;
/// The maximum queue size supported.
const QUEUE_SIZE: usize = 32768;
/// The number of request queues supported.
Expand All @@ -53,25 +61,56 @@ const NUM_QUEUES: usize = REQUEST_QUEUES + 1;

/// VhostUserFsThread represents the actual worker process used to handle file system requests from VMs.
struct VhostUserFsThread {
/// Filesystem whose `handle_message` is called for every request taken from a queue.
core: Filesystem,
/// Guest memory; starts as `None` and is replaced through `update_memory`.
/// Queue processing fails with "no memory configured" while it is unset.
mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
/// Initialised to `None`; no visible code assigns it.
/// NOTE(review): presumably the backend request channel to the frontend — confirm.
vu_req: Option<Backend>,
/// Whether EVENT_IDX was enabled through `set_event_idx`; decides how the used
/// queue is signalled after a request completes.
event_idx: bool,
/// Non-blocking eventfd created in `new`; the backend's `kill` writes to it.
kill_event_fd: EventFd,
}

impl VhostUserFsThread {
fn new() -> Result<VhostUserFsThread> {
/// Builds the worker state around `core`.
///
/// Guest memory and the backend request handle start out unset and
/// EVENT_IDX starts disabled. Fails with an unexpected error when the
/// non-blocking kill eventfd cannot be created.
fn new(core: Filesystem) -> Result<VhostUserFsThread> {
    EventFd::new(libc::EFD_NONBLOCK)
        .map(|kill_event_fd| VhostUserFsThread {
            core,
            mem: None,
            vu_req: None,
            event_idx: false,
            kill_event_fd,
        })
        .map_err(|err| new_unexpected_error("failed to create kill eventfd", Some(err.into())))
}

/// Hands a processed request back to the frontend.
///
/// Adds the chain starting at `head_index` to the used ring with `len`
/// written bytes, then signals the used queue when a notification is due.
/// Failures are only logged; nothing is returned to the caller.
fn return_descriptor(
    vring_state: &mut VringState,
    head_index: u16,
    event_idx: bool,
    len: usize,
) {
    let added = vring_state.add_used(head_index, len as u32);
    if added.is_err() {
        warn!("Failed to add used to used queue.");
    }

    // Without EVENT_IDX every completion is signalled. With it, signal only
    // when the ring asks for a notification, or when that query itself fails.
    let must_signal = !event_idx || vring_state.needs_notification().unwrap_or(true);
    if must_signal && vring_state.signal_used_queue().is_err() {
        warn!("Failed to signal used queue.");
    }
}

/// Process filesystem requests one at a time in a serialized manner.
fn handle_event_serial(&self, device_event: u16, vrings: &[VringMutex]) -> Result<()> {
let mut vring_state = match device_event {
Expand All @@ -83,10 +122,16 @@ impl VhostUserFsThread {
// If EVENT_IDX is enabled, we could keep calling process_queue()
// until it stops finding new request on the queue.
loop {
vring_state.disable_notification().unwrap();
if vring_state.disable_notification().is_err() {
warn!("Failed to disable used queue notification.");
}
self.process_queue_serial(&mut vring_state)?;
if !vring_state.enable_notification().unwrap() {
break;
if let Ok(has_more) = vring_state.enable_notification() {
if !has_more {
break;
}
} else {
warn!("Failed to enable used queue notification.");
}
}
} else {
Expand All @@ -98,37 +143,61 @@ impl VhostUserFsThread {

/// Forwards filesystem messages to specific functions and
/// returns the filesystem request execution result.
fn process_queue_serial(&self, _vring_state: &mut VringState) -> Result<bool> {
unimplemented!()
fn process_queue_serial(&self, vring_state: &mut VringState) -> Result<bool> {
    // Returns Ok(true) when at least one descriptor chain was taken from the
    // queue and answered, Ok(false) when the queue was empty.
    //
    // Every failure (no guest memory, queue iteration, reader/writer creation,
    // request handling) is returned as an unexpected error instead of
    // panicking, so one bad request cannot take the worker thread down.
    let mem = self
        .mem
        .as_ref()
        .ok_or_else(|| new_unexpected_error("no memory configured", None))?
        .memory();

    // Collect the chains first: the iterator borrows the queue inside
    // `vring_state`, which `return_descriptor` needs mutably below.
    let avail_chains: Vec<DescriptorChain<GuestMemoryLoadGuard<GuestMemoryMmap>>> = vring_state
        .get_queue_mut()
        .iter(mem.clone())
        .map_err(|_| new_unexpected_error("iterating through the queue failed", None))?
        .collect();
    let used_any = !avail_chains.is_empty();

    for chain in avail_chains {
        let head_index = chain.head_index();
        let reader = Reader::new(&mem, chain.clone())
            .map_err(|_| new_unexpected_error("creating a queue reader failed", None))?;
        // The chain is not needed afterwards, so it is moved into the writer.
        let writer = Writer::new(&mem, chain)
            .map_err(|_| new_unexpected_error("creating a queue writer failed", None))?;
        let len = self
            .core
            .handle_message(reader, writer)
            .map_err(|_| new_unexpected_error("processing a queue request failed", None))?;
        VhostUserFsThread::return_descriptor(vring_state, head_index, self.event_idx, len);
    }
    Ok(used_any)
}
}

/// VhostUserFsBackend is a structure that implements the VhostUserBackend trait
/// and implements concrete services for the vhost user backend server.
pub struct VhostUserFsBackend {
tag: Option<String>,
struct VhostUserFsBackend {
/// Worker state behind a lock: the visible trait methods take the write lock to
/// update `event_idx` and guest memory, and `kill` takes the read lock.
thread: RwLock<VhostUserFsThread>,
}

#[allow(dead_code)]
impl VhostUserFsBackend {
pub fn new(tag: Option<String>) -> Result<VhostUserFsBackend> {
let thread = RwLock::new(VhostUserFsThread::new()?);
Ok(VhostUserFsBackend { thread, tag })
/// Creates a backend whose worker serves requests with `core`.
///
/// Fails when the worker state cannot be created.
fn new(core: Filesystem) -> Result<VhostUserFsBackend> {
    VhostUserFsThread::new(core).map(|worker| VhostUserFsBackend {
        thread: RwLock::new(worker),
    })
}
}

/// VirtioFsConfig will be serialized and used as
/// the return value of get_config function in the VhostUserBackend trait.
#[repr(C)]
#[derive(Clone, Copy)]
struct VirtioFsConfig {
tag: [u8; MAX_TAG_LEN],
num_request_queues: Le32,
/// Writes `1` to the worker's kill eventfd.
///
/// Returns an unexpected error carrying the I/O cause when the write fails.
/// Panics if the worker lock is poisoned.
fn kill(&self) -> Result<()> {
    let worker = self.thread.read().unwrap();
    worker.kill_event_fd.write(1).map_err(|err| {
        new_unexpected_error("failed to write to kill eventfd", Some(err.into()))
    })
}
}

unsafe impl ByteValued for VirtioFsConfig {}

impl VhostUserBackend for VhostUserFsBackend {
type Bitmap = ();
type Vring = VringMutex;
Expand All @@ -155,45 +224,18 @@ impl VhostUserBackend for VhostUserFsBackend {
/// Get available vhost protocol features.
fn protocol_features(&self) -> VhostUserProtocolFeatures {
// Align to the virtiofsd's protocol features here.
let mut protocol_features = VhostUserProtocolFeatures::MQ
VhostUserProtocolFeatures::MQ
| VhostUserProtocolFeatures::BACKEND_REQ
| VhostUserProtocolFeatures::BACKEND_SEND_FD
| VhostUserProtocolFeatures::REPLY_ACK
| VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS;
if self.tag.is_some() {
protocol_features |= VhostUserProtocolFeatures::CONFIG;
}
protocol_features
| VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
}

/// Enable or disable the virtio EVENT_IDX feature.
fn set_event_idx(&self, enabled: bool) {
    let mut worker = self.thread.write().unwrap();
    worker.event_idx = enabled;
}

/// Get virtio device configuration.
fn get_config(&self, offset: u32, size: u32) -> Vec<u8> {
let tag = self
.tag
.as_ref()
.expect("did not expect read of config if tag is not set.");
let mut fixed_len_tag = [0; MAX_TAG_LEN];
fixed_len_tag[0..tag.len()].copy_from_slice(tag.as_bytes());
let config = VirtioFsConfig {
tag: fixed_len_tag,
num_request_queues: Le32::from(REQUEST_QUEUES as u32),
};
let mut result: Vec<_> = config
.as_slice()
.iter()
.skip(offset as usize)
.take(size as usize)
.copied()
.collect();
result.resize(size as usize, 0);
result
}

/// Update guest memory regions.
fn update_memory(&self, mem: GuestMemoryAtomic<GuestMemoryMmap>) -> io::Result<()> {
self.thread.write().unwrap().mem = Some(mem);
Expand Down Expand Up @@ -238,3 +280,50 @@ impl VhostUserBackend for VhostUserFsBackend {
.map_err(|err| err.into())
}
}

/// VirtioFS is a structure that represents the virtiofs service.
/// It is used to run the virtiofs service with the given operator and socket path.
/// The operator is used to interact with the backend storage system.
/// The socket path is used to communicate with QEMU and VMs.
pub struct VirtioFs {
/// Path handed to the vhost-user `Listener` when the service is run.
socket_path: String,
/// Shared backend: a clone of this `Arc` is given to the daemon in `run`,
/// and `kill` is forwarded to it.
filesystem_backend: Arc<VhostUserFsBackend>,
}

impl VirtioFs {
    /// Creates a virtiofs service that serves `core` over the socket at `socket_path`.
    ///
    /// # Errors
    ///
    /// Returns the backend construction error instead of panicking.
    pub fn new(core: Operator, socket_path: &str) -> Result<VirtioFs> {
        let filesystem_core = Filesystem::new(core);
        let filesystem_backend = Arc::new(VhostUserFsBackend::new(filesystem_core)?);
        Ok(VirtioFs {
            socket_path: socket_path.to_string(),
            filesystem_backend,
        })
    }

    /// Runs the virtiofs service.
    ///
    /// Creates the listener on the configured socket path, builds the
    /// vhost-user daemon around the shared backend, starts it and then calls
    /// `wait` on it.
    ///
    /// # Errors
    ///
    /// Returns an unexpected error when the listener or the daemon cannot be
    /// created, or when starting or waiting on the daemon fails.
    pub fn run(&self) -> Result<()> {
        let listener = Listener::new(&self.socket_path, true)
            .map_err(|_| new_unexpected_error("failed to create listener", None))?;
        let mut daemon = VhostUserDaemon::new(
            String::from("virtiofs-backend"),
            self.filesystem_backend.clone(),
            GuestMemoryAtomic::new(GuestMemoryMmap::new()),
        )
        .map_err(|_| new_unexpected_error("failed to create daemon", None))?;
        daemon
            .start(listener)
            .map_err(|_| new_unexpected_error("failed to start daemon", None))?;
        daemon
            .wait()
            .map_err(|_| new_unexpected_error("failed to wait daemon", None))?;
        Ok(())
    }

    /// Kills the virtiofs service by forwarding the request to the backend.
    ///
    /// # Errors
    ///
    /// Returns an unexpected error when the backend reports a failure.
    pub fn kill(&self) -> Result<()> {
        self.filesystem_backend
            .kill()
            .map_err(|_| new_unexpected_error("failed to kill backend", None))
    }
}
2 changes: 0 additions & 2 deletions integrations/virtiofs/src/virtiofs_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ pub struct Reader<'a, B = ()> {
buffer: DescriptorChainConsumer<'a, B>,
}

#[allow(dead_code)]
impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> {
pub fn new<M>(
mem: &'a GuestMemoryMmap<B>,
Expand Down Expand Up @@ -174,7 +173,6 @@ pub struct Writer<'a, B = ()> {
buffer: DescriptorChainConsumer<'a, B>,
}

#[allow(dead_code)]
impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> {
pub fn new<M>(
mem: &'a GuestMemoryMmap<B>,
Expand Down
Loading