// Copyright 2019 Intel Corporation. All Rights Reserved.
// Copyright 2019-2021 Alibaba Cloud. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0
use std::error;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use std::thread;
use log::error;
use vhost::vhost_user::message::{
VhostUserConfigFlags, VhostUserMemoryRegion, VhostUserProtocolFeatures,
VhostUserSingleMemoryRegion, VhostUserVirtioFeatures, VhostUserVringAddrFlags,
VhostUserVringState,
};
use vhost::vhost_user::{
Backend, Error as VhostUserError, Result as VhostUserResult, VhostUserBackendReqHandlerMut,
};
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::{Error as VirtQueError, QueueT};
use vm_memory::mmap::NewBitmap;
use vm_memory::{GuestAddress, GuestAddressSpace, GuestMemoryMmap, GuestRegionMmap};
use vmm_sys_util::epoll::EventSet;
use super::backend::VhostUserBackend;
use super::event_loop::VringEpollHandler;
use super::event_loop::{VringEpollError, VringEpollResult};
use super::vring::VringT;
use super::GM;
// vhost in the kernel usually supports 509 memory slots. 509 used to be
// the KVM limit: KVM supported 512 slots, but 3 were reserved for internal
// purposes (nowadays KVM supports more than that).
const MAX_MEM_SLOTS: u64 = 509;
/// Errors related to vhost-user handler.
#[derive(Debug)]
pub enum VhostUserHandlerError {
/// Failed to create a `Vring`.
CreateVring(VirtQueError),
/// Failed to create vring worker.
CreateEpollHandler(VringEpollError),
/// Failed to spawn vring worker.
SpawnVringWorker(io::Error),
/// Could not find the mapping from memory regions.
MissingMemoryMapping,
}
impl std::fmt::Display for VhostUserHandlerError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
VhostUserHandlerError::CreateVring(e) => {
write!(f, "failed to create vring: {}", e)
}
VhostUserHandlerError::CreateEpollHandler(e) => {
write!(f, "failed to create vring epoll handler: {}", e)
}
VhostUserHandlerError::SpawnVringWorker(e) => {
write!(f, "failed spawning the vring worker: {}", e)
}
            VhostUserHandlerError::MissingMemoryMapping => write!(f, "missing memory mapping"),
}
}
}
impl error::Error for VhostUserHandlerError {}
/// Result of vhost-user handler operations.
pub type VhostUserHandlerResult<T> = std::result::Result<T, VhostUserHandlerError>;
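/// Mapping between a VMM virtual address range and the guest physical
/// address range it corresponds to, as announced in `SET_MEM_TABLE`.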
struct AddrMapping {
vmm_addr: u64,
size: u64,
gpa_base: u64,
}
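/// vhost-user request handler that dispatches frontend messages to a
/// [`VhostUserBackend`] and owns the vrings and their epoll worker threads.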
pub struct VhostUserHandler<T: VhostUserBackend> {
backend: T,
handlers: Vec<Arc<VringEpollHandler<T>>>,
owned: bool,
features_acked: bool,
acked_features: u64,
acked_protocol_features: u64,
num_queues: usize,
max_queue_size: usize,
queues_per_thread: Vec<u64>,
mappings: Vec<AddrMapping>,
atomic_mem: GM<T::Bitmap>,
vrings: Vec<T::Vring>,
worker_threads: Vec<thread::JoinHandle<VringEpollResult<()>>>,
}
// These bounds ensure that the backend, the vrings and the bitmap can be
// cloned into the worker threads spawned in `new()` (hence the
// Clone + Send + Sync + 'static requirements).
impl<T> VhostUserHandler<T>
where
T: VhostUserBackend + Clone + 'static,
T::Vring: Clone + Send + Sync + 'static,
T::Bitmap: Clone + Send + Sync + 'static,
{
pub(crate) fn new(backend: T, atomic_mem: GM<T::Bitmap>) -> VhostUserHandlerResult<Self> {
let num_queues = backend.num_queues();
let max_queue_size = backend.max_queue_size();
let queues_per_thread = backend.queues_per_thread();
let mut vrings = Vec::new();
for _ in 0..num_queues {
let vring = T::Vring::new(atomic_mem.clone(), max_queue_size as u16)
.map_err(VhostUserHandlerError::CreateVring)?;
vrings.push(vring);
}
let mut handlers = Vec::new();
let mut worker_threads = Vec::new();
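        // `queues_per_thread` is a list of bitmasks, one per worker thread,
        // selecting which queues that thread handles. For example (values are
        // illustrative), `vec![0b01, 0b10]` runs queue 0 on thread 0 and
        // queue 1 on thread 1, while `vec![0b11]` runs both on one thread.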
for (thread_id, queues_mask) in queues_per_thread.iter().enumerate() {
let mut thread_vrings = Vec::new();
for (index, vring) in vrings.iter().enumerate() {
if (queues_mask >> index) & 1u64 == 1u64 {
thread_vrings.push(vring.clone());
}
}
let handler = Arc::new(
VringEpollHandler::new(backend.clone(), thread_vrings, thread_id)
.map_err(VhostUserHandlerError::CreateEpollHandler)?,
);
let handler2 = handler.clone();
let worker_thread = thread::Builder::new()
.name("vring_worker".to_string())
.spawn(move || handler2.run())
.map_err(VhostUserHandlerError::SpawnVringWorker)?;
handlers.push(handler);
worker_threads.push(worker_thread);
}
Ok(VhostUserHandler {
backend,
handlers,
owned: false,
features_acked: false,
acked_features: 0,
acked_protocol_features: 0,
num_queues,
max_queue_size,
queues_per_thread,
mappings: Vec::new(),
atomic_mem,
vrings,
worker_threads,
})
}
}
impl<T: VhostUserBackend> VhostUserHandler<T> {
pub(crate) fn send_exit_event(&self) {
for handler in self.handlers.iter() {
handler.send_exit_event();
}
}
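    /// Translate a VMM virtual address into a guest physical address, using
    /// the mappings recorded from `SET_MEM_TABLE`/`ADD_MEM_REG`.
    ///
    /// For example (hypothetical values), a mapping with
    /// `vmm_addr = 0x7f50_0000_0000`, `size = 0x1000_0000` and
    /// `gpa_base = 0x4000_0000` translates VMM VA `0x7f50_0000_1000`
    /// to GPA `0x4000_1000`.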
fn vmm_va_to_gpa(&self, vmm_va: u64) -> VhostUserHandlerResult<u64> {
for mapping in self.mappings.iter() {
if vmm_va >= mapping.vmm_addr && vmm_va < mapping.vmm_addr + mapping.size {
return Ok(vmm_va - mapping.vmm_addr + mapping.gpa_base);
}
}
Err(VhostUserHandlerError::MissingMemoryMapping)
}
}
impl<T> VhostUserHandler<T>
where
T: VhostUserBackend,
{
pub(crate) fn get_epoll_handlers(&self) -> Vec<Arc<VringEpollHandler<T>>> {
self.handlers.clone()
}
fn vring_needs_init(&self, vring: &T::Vring) -> bool {
let vring_state = vring.get_ref();
        // The vring still needs to be initialized if the queue hasn't been
        // marked ready yet but we already received an EventFd for VRING_KICK.
!vring_state.get_queue().ready() && vring_state.get_kick().is_some()
}
fn initialize_vring(&self, vring: &T::Vring, index: u8) -> VhostUserResult<()> {
assert!(vring.get_ref().get_kick().is_some());
if let Some(fd) = vring.get_ref().get_kick() {
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
let shifted_queues_mask = queues_mask >> index;
if shifted_queues_mask & 1u64 == 1u64 {
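                    // The event index is this queue's position among the
                    // queues owned by this thread, i.e. the number of set bits
                    // below bit `index` in `queues_mask`. E.g. (illustrative)
                    // with `queues_mask = 0b1011` and `index = 3`:
                    // 3 - 1 = 2, so queue 3 is this thread's third queue.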
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
self.handlers[thread_index]
.register_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
.map_err(VhostUserError::ReqHandlerError)?;
break;
}
}
}
vring.set_enabled(false);
vring.set_queue_ready(true);
Ok(())
}
    /// Helper to check whether a `VhostUserVirtioFeatures` flag has been
    /// negotiated by the frontend.
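    ///
    /// Illustrative use (see `set_vring_enable` below):
    ///
    /// ```ignore
    /// self.check_feature(VhostUserVirtioFeatures::PROTOCOL_FEATURES)?;
    /// ```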
fn check_feature(&self, feat: VhostUserVirtioFeatures) -> VhostUserResult<()> {
if self.acked_features & feat.bits() != 0 {
Ok(())
} else {
Err(VhostUserError::InactiveFeature(feat))
}
}
}
impl<T: VhostUserBackend> VhostUserBackendReqHandlerMut for VhostUserHandler<T>
where
T::Bitmap: NewBitmap + Clone,
{
fn set_owner(&mut self) -> VhostUserResult<()> {
if self.owned {
return Err(VhostUserError::InvalidOperation("already claimed"));
}
self.owned = true;
Ok(())
}
fn reset_owner(&mut self) -> VhostUserResult<()> {
self.owned = false;
self.features_acked = false;
self.acked_features = 0;
self.acked_protocol_features = 0;
Ok(())
}
fn get_features(&mut self) -> VhostUserResult<u64> {
Ok(self.backend.features())
}
fn set_features(&mut self, features: u64) -> VhostUserResult<()> {
if (features & !self.backend.features()) != 0 {
return Err(VhostUserError::InvalidParam);
}
self.acked_features = features;
self.features_acked = true;
// Upon receiving a `VHOST_USER_SET_FEATURES` message from the front-end without
// `VHOST_USER_F_PROTOCOL_FEATURES` set, the back-end must enable all rings immediately.
// While processing the rings (whether they are enabled or not), the back-end must support
// changing some configuration aspects on the fly.
// (see https://qemu-project.gitlab.io/qemu/interop/vhost-user.html#ring-states)
//
// Note: If `VHOST_USER_F_PROTOCOL_FEATURES` has been negotiated we must leave
// the vrings in their current state.
if self.acked_features & VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() == 0 {
for vring in self.vrings.iter_mut() {
vring.set_enabled(true);
}
}
self.backend.acked_features(self.acked_features);
Ok(())
}
fn set_mem_table(
&mut self,
ctx: &[VhostUserMemoryRegion],
files: Vec<File>,
) -> VhostUserResult<()> {
        // Create a `GuestRegionMmap` for each `VhostUserMemoryRegion` sent by
        // the frontend, and record the VMM-address-to-GPA mapping of each
        // region so vring addresses can be translated later.
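        // For illustration (hypothetical values): a frontend backed by a
        // single 2 GiB RAM slot would send one region such as
        //   { guest_phys_addr: 0x0, memory_size: 0x8000_0000,
        //     user_addr: 0x7f50_0000_0000, mmap_offset: 0 }
        // along with one mmap-able file descriptor per region.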
let mut regions = Vec::new();
let mut mappings: Vec<AddrMapping> = Vec::new();
for (region, file) in ctx.iter().zip(files) {
regions.push(
GuestRegionMmap::new(
region.mmap_region(file)?,
GuestAddress(region.guest_phys_addr),
)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?,
);
mappings.push(AddrMapping {
vmm_addr: region.user_addr,
size: region.memory_size,
gpa_base: region.guest_phys_addr,
});
}
let mem = GuestMemoryMmap::from_regions(regions).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
// Updating the inner GuestMemory object here will cause all our vrings to
// see the new one the next time they call to `atomic_mem.memory()`.
self.atomic_mem.lock().unwrap().replace(mem);
self.backend
.update_memory(self.atomic_mem.clone())
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.mappings = mappings;
Ok(())
}
fn set_vring_num(&mut self, index: u32, num: u32) -> VhostUserResult<()> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
if num == 0 || num as usize > self.max_queue_size {
return Err(VhostUserError::InvalidParam);
}
vring.set_queue_size(num as u16);
Ok(())
}
fn set_vring_addr(
&mut self,
index: u32,
_flags: VhostUserVringAddrFlags,
descriptor: u64,
used: u64,
available: u64,
_log: u64,
) -> VhostUserResult<()> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
if !self.mappings.is_empty() {
let desc_table = self.vmm_va_to_gpa(descriptor).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
let avail_ring = self.vmm_va_to_gpa(available).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
let used_ring = self.vmm_va_to_gpa(used).map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
vring
.set_queue_info(desc_table, avail_ring, used_ring)
.map_err(|_| VhostUserError::InvalidParam)?;
            // SET_VRING_BASE will only restore the 'avail' index; however, when the
            // guest driver changes (for instance, after a reboot), the 'used' index
            // should be reset to 0.
            //
            // So fetch the 'used' index from the vring as set by the guest here, to
            // stay compatible with QEMU's vhost-user library in case any
            // implementation expects the 'used' index to be set when receiving a
            // SET_VRING_ADDR message.
            //
            // Note: it is unclear why QEMU's vhost-user library sets the 'used'
            // index here, _probably_ to make sure that the VQ is already configured.
            // A better solution would be to receive the 'used' index in
            // SET_VRING_BASE, as is done when using packed VQs.
let idx = vring
.queue_used_idx()
.map_err(|_| VhostUserError::BackendInternalError)?;
vring.set_queue_next_used(idx);
Ok(())
} else {
Err(VhostUserError::InvalidParam)
}
}
fn set_vring_base(&mut self, index: u32, base: u32) -> VhostUserResult<()> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0;
vring.set_queue_next_avail(base as u16);
vring.set_queue_event_idx(event_idx);
self.backend.set_event_idx(event_idx);
Ok(())
}
fn get_vring_base(&mut self, index: u32) -> VhostUserResult<VhostUserVringState> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
// Quote from vhost-user specification:
// Client must start ring upon receiving a kick (that is, detecting
// that file descriptor is readable) on the descriptor specified by
// VHOST_USER_SET_VRING_KICK, and stop ring upon receiving
// VHOST_USER_GET_VRING_BASE.
vring.set_queue_ready(false);
if let Some(fd) = vring.get_ref().get_kick() {
for (thread_index, queues_mask) in self.queues_per_thread.iter().enumerate() {
let shifted_queues_mask = queues_mask >> index;
if shifted_queues_mask & 1u64 == 1u64 {
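                    // Same event index computation as in `initialize_vring`:
                    // the queue's position among this thread's queues.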
let evt_idx = queues_mask.count_ones() - shifted_queues_mask.count_ones();
self.handlers[thread_index]
.unregister_event(fd.as_raw_fd(), EventSet::IN, u64::from(evt_idx))
.map_err(VhostUserError::ReqHandlerError)?;
break;
}
}
}
let next_avail = vring.queue_next_avail();
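        // Drop our references to the kick and call EventFds; the frontend is
        // expected to send new ones if it restarts the ring.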
vring.set_kick(None);
vring.set_call(None);
Ok(VhostUserVringState::new(index, u32::from(next_avail)))
}
fn set_vring_kick(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
// SAFETY: EventFd requires that it has sole ownership of its fd. So
// does File, so this is safe.
// Ideally, we'd have a generic way to refer to a uniquely-owned fd,
// such as that proposed by Rust RFC #3128.
vring.set_kick(file);
if self.vring_needs_init(vring) {
self.initialize_vring(vring, index)?;
}
Ok(())
}
fn set_vring_call(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
vring.set_call(file);
if self.vring_needs_init(vring) {
self.initialize_vring(vring, index)?;
}
Ok(())
}
fn set_vring_err(&mut self, index: u8, file: Option<File>) -> VhostUserResult<()> {
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
vring.set_err(file);
Ok(())
}
fn get_protocol_features(&mut self) -> VhostUserResult<VhostUserProtocolFeatures> {
Ok(self.backend.protocol_features())
}
fn set_protocol_features(&mut self, features: u64) -> VhostUserResult<()> {
        // Note: a backend that reported VHOST_USER_F_PROTOCOL_FEATURES must
        // support this message even before VHOST_USER_SET_FEATURES has been
        // called.
self.acked_protocol_features = features;
Ok(())
}
fn get_queue_num(&mut self) -> VhostUserResult<u64> {
Ok(self.num_queues as u64)
}
fn set_vring_enable(&mut self, index: u32, enable: bool) -> VhostUserResult<()> {
// This request should be handled only when VHOST_USER_F_PROTOCOL_FEATURES
// has been negotiated.
self.check_feature(VhostUserVirtioFeatures::PROTOCOL_FEATURES)?;
let vring = self
.vrings
.get(index as usize)
.ok_or_else(|| VhostUserError::InvalidParam)?;
        // The backend must not process the ring until it has been enabled by
        // VHOST_USER_SET_VRING_ENABLE with parameter 1, and must stop
        // processing it after it has been disabled by
        // VHOST_USER_SET_VRING_ENABLE with parameter 0.
vring.set_enabled(enable);
Ok(())
}
fn get_config(
&mut self,
offset: u32,
size: u32,
_flags: VhostUserConfigFlags,
) -> VhostUserResult<Vec<u8>> {
Ok(self.backend.get_config(offset, size))
}
fn set_config(
&mut self,
offset: u32,
buf: &[u8],
_flags: VhostUserConfigFlags,
) -> VhostUserResult<()> {
self.backend
.set_config(offset, buf)
.map_err(VhostUserError::ReqHandlerError)
}
fn set_backend_req_fd(&mut self, backend: Backend) {
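        // If the frontend negotiated REPLY_ACK, also request explicit acks
        // for the messages we send to the frontend over this backend channel.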
if self.acked_protocol_features & VhostUserProtocolFeatures::REPLY_ACK.bits() != 0 {
backend.set_reply_ack_flag(true);
}
self.backend.set_backend_req_fd(backend);
}
fn get_inflight_fd(
&mut self,
_inflight: &vhost::vhost_user::message::VhostUserInflight,
) -> VhostUserResult<(vhost::vhost_user::message::VhostUserInflight, File)> {
// Assume the backend hasn't negotiated the inflight feature; it
// wouldn't be correct for the backend to do so, as we don't (yet)
// provide a way for it to handle such requests.
Err(VhostUserError::InvalidOperation("not supported"))
}
fn set_inflight_fd(
&mut self,
_inflight: &vhost::vhost_user::message::VhostUserInflight,
_file: File,
) -> VhostUserResult<()> {
Err(VhostUserError::InvalidOperation("not supported"))
}
fn get_max_mem_slots(&mut self) -> VhostUserResult<u64> {
Ok(MAX_MEM_SLOTS)
}
fn add_mem_region(
&mut self,
region: &VhostUserSingleMemoryRegion,
file: File,
) -> VhostUserResult<()> {
let guest_region = Arc::new(
GuestRegionMmap::new(
region.mmap_region(file)?,
GuestAddress(region.guest_phys_addr),
)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?,
);
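        // `insert_region` builds a new `GuestMemoryMmap` snapshot containing
        // the existing regions plus the new one; replacing the inner memory
        // object below makes it visible to the vrings on their next
        // `atomic_mem.memory()` call.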
let mem = self
.atomic_mem
.memory()
.insert_region(guest_region)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.atomic_mem.lock().unwrap().replace(mem);
self.backend
.update_memory(self.atomic_mem.clone())
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.mappings.push(AddrMapping {
vmm_addr: region.user_addr,
size: region.memory_size,
gpa_base: region.guest_phys_addr,
});
Ok(())
}
fn remove_mem_region(&mut self, region: &VhostUserSingleMemoryRegion) -> VhostUserResult<()> {
let (mem, _) = self
.atomic_mem
.memory()
.remove_region(GuestAddress(region.guest_phys_addr), region.memory_size)
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.atomic_mem.lock().unwrap().replace(mem);
self.backend
.update_memory(self.atomic_mem.clone())
.map_err(|e| {
VhostUserError::ReqHandlerError(io::Error::new(io::ErrorKind::Other, e))
})?;
self.mappings
.retain(|mapping| mapping.gpa_base != region.guest_phys_addr);
Ok(())
}
}
impl<T: VhostUserBackend> Drop for VhostUserHandler<T> {
fn drop(&mut self) {
// Signal all working threads to exit.
self.send_exit_event();
for thread in self.worker_threads.drain(..) {
if let Err(e) = thread.join() {
error!("Error in vring worker: {:?}", e);
}
}
}
}