blob: 71fa80a6514c2038911e4734c675aa755e6348af [file] [log] [blame]
// Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! This module implements the virtio wayland device used by the guest to access the host's wayland server.
//!
//! The virtio wayland protocol is done over two queues: `in` and `out`. The `in` queue is used for
//! sending commands to the guest that are generated by the host, usually messages from the wayland
//! server. The `out` queue is for commands from the guest, usually requests to allocate shared
//! memory, open a wayland server connection, or send data over an existing connection.
//!
//! Each `WlVfd` represents one virtual file descriptor created by either the guest or the host.
//! Virtual file descriptors contain actual file descriptors, either a shared memory file descriptor
//! or a unix domain socket to the wayland server. In the shared memory case, there is also an
//! associated slot that indicates which KVM memory slot the memory is installed into, as well as a
//! page frame number that the guest can access the memory from.
//!
//! The types starting with `Ctrl` are structures representing the virtio wayland protocol "on the
//! wire." They are decoded/encoded as some variant of `WlOp` for requests and `WlResp` for
//! responses.
//!
//! There is one `WlState` instance that contains every known vfd and the current state of the `in`
//! queue. The `in` queue requires extra state to buffer messages to the guest in case the `in`
//! queue is already full. The `WlState` also has a control socket necessary to fulfill certain
//! requests, such as those registering guest memory.
//!
//! The `Worker` is responsible for the poll loop over all possible events, encoding/decoding from
//! the virtio queue, and routing messages in and out of `WlState`. Possible events include the kill
//! event, available descriptors on the `in` or `out` queue, and incoming data on any vfd's socket.
use std::cell::RefCell;
use std::collections::btree_map::Entry;
use std::collections::{BTreeSet as Set, BTreeMap as Map, VecDeque};
use std::convert::From;
use std::ffi::CStr;
use std::fmt;
use std::fs::File;
use std::io;
use std::mem::size_of;
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{UnixDatagram, UnixStream};
use std::path::{PathBuf, Path};
use std::rc::Rc;
use std::result;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use data_model::*;
use data_model::VolatileMemoryError;
use sys_util::{Error, Result, EventFd, Poller, Pollable, Scm, SharedMemory, GuestAddress,
GuestMemory, GuestMemoryError};
use vm_control::{VmControlError, VmRequest, VmResponse, MaybeOwnedFd};
use super::{VirtioDevice, Queue, DescriptorChain, INTERRUPT_STATUS_USED_RING, TYPE_WL};
// Capacity used for the `Scm` that carries vfd traffic and for the fixed-size id/fd arrays built
// when the guest sends vfds over a connection.
const VIRTWL_SEND_MAX_ALLOCS: usize = 28;
// Type codes of commands, as found in the `type_` field of a control header.
const VIRTIO_WL_CMD_VFD_NEW: u32 = 256;
const VIRTIO_WL_CMD_VFD_CLOSE: u32 = 257;
const VIRTIO_WL_CMD_VFD_SEND: u32 = 258;
const VIRTIO_WL_CMD_VFD_RECV: u32 = 259;
const VIRTIO_WL_CMD_VFD_NEW_CTX: u32 = 260;
// Type codes of responses written back to the guest.
const VIRTIO_WL_RESP_OK: u32 = 4096;
const VIRTIO_WL_RESP_VFD_NEW: u32 = 4097;
const VIRTIO_WL_RESP_ERR: u32 = 4352;
const VIRTIO_WL_RESP_OUT_OF_MEMORY: u32 = 4353;
const VIRTIO_WL_RESP_INVALID_ID: u32 = 4354;
const VIRTIO_WL_RESP_INVALID_TYPE: u32 = 4355;
// Flag bits describing a vfd. In this file, socket-backed vfds report CONTROL and vfds with a
// registered memory slot report WRITE | MAP.
const VIRTIO_WL_VFD_WRITE: u32 = 0x1;
const VIRTIO_WL_VFD_MAP: u32 = 0x2;
const VIRTIO_WL_VFD_CONTROL: u32 = 0x4;
// Poll tokens for the `in` queue eventfd, the `out` queue eventfd, and the kill eventfd.
const Q_IN: u32 = 0;
const Q_OUT: u32 = 1;
const KILL: u32 = 2;
// First poll token handed to a vfd socket; each further socket gets the next consecutive token.
const VFD_BASE_TOKEN: u32 = 0x100;
// Maximum size of each of the device's two queues, as reported by `queue_max_sizes`.
const QUEUE_SIZE: u16 = 16;
const QUEUE_SIZES: &'static [u16] = &[QUEUE_SIZE, QUEUE_SIZE];
// First id assigned to host-created vfds. Guest-supplied ids that have this bit set are rejected
// with an invalid-id response.
const NEXT_VFD_ID_BASE: u32 = 0x40000000;
const VFD_ID_HOST_MASK: u32 = NEXT_VFD_ID_BASE;
// Size in bytes of the buffer used for a single read from a vfd's socket.
const IN_BUFFER_LEN: usize = 4080;
const PAGE_MASK: u64 = 0x0fff;

/// Rounds `v` up to the nearest multiple of the page size implied by `PAGE_MASK`.
///
/// Values that are already page aligned are returned unchanged.
fn round_to_page_size(v: u64) -> u64 {
    let bumped = v + PAGE_MASK;
    bumped & !PAGE_MASK
}
/// Decodes the "new vfd" command located at `addr` in guest memory into a `WlOp::NewAlloc`.
///
/// The id, flags and size fields are read from fixed byte offsets relative to `addr`. Fails with
/// `WlError::CheckedOffset` if an offset cannot be computed, or with a guest memory error if a
/// field cannot be read.
fn parse_new(addr: GuestAddress, mem: &GuestMemory) -> WlResult<WlOp> {
    const ID_OFFSET: usize = 8;
    const FLAGS_OFFSET: usize = 12;
    const SIZE_OFFSET: usize = 24;

    let id_addr = mem.checked_offset(addr, ID_OFFSET)
        .ok_or(WlError::CheckedOffset)?;
    let raw_id: Le32 = mem.read_obj_from_addr(id_addr)?;

    let flags_addr = mem.checked_offset(addr, FLAGS_OFFSET)
        .ok_or(WlError::CheckedOffset)?;
    let raw_flags: Le32 = mem.read_obj_from_addr(flags_addr)?;

    let size_addr = mem.checked_offset(addr, SIZE_OFFSET)
        .ok_or(WlError::CheckedOffset)?;
    let raw_size: Le32 = mem.read_obj_from_addr(size_addr)?;

    let op = WlOp::NewAlloc {
        id: raw_id.into(),
        flags: raw_flags.into(),
        size: raw_size.into(),
    };
    Ok(op)
}
fn parse_send(addr: GuestAddress, len: u32, mem: &GuestMemory) -> WlResult<WlOp> {
const ID_OFFSET: usize = 8;
const VFD_COUNT_OFFSET: usize = 12;
const VFDS_OFFSET: usize = 16;
let id: Le32 = mem.read_obj_from_addr(mem.checked_offset(addr, ID_OFFSET)
.ok_or(WlError::CheckedOffset)?)?;
let vfd_count: Le32 =
mem.read_obj_from_addr(mem.checked_offset(addr, VFD_COUNT_OFFSET)
.ok_or(WlError::CheckedOffset)?)?;
let vfd_count: u32 = vfd_count.into();
let vfds_addr = mem.checked_offset(addr, VFDS_OFFSET)
.ok_or(WlError::CheckedOffset)?;
let data_addr = mem.checked_offset(vfds_addr, (vfd_count * 4) as usize)
.ok_or(WlError::CheckedOffset)?;
Ok(WlOp::Send {
id: id.into(),
vfds_addr: vfds_addr,
vfd_count: vfd_count,
data_addr: data_addr,
data_len: len - (VFDS_OFFSET as u32) - vfd_count * 4,
})
}
/// Reads the little-endian vfd id stored 8 bytes past `addr` in guest memory.
///
/// Fails with `WlError::CheckedOffset` if the offset cannot be computed, or with a guest memory
/// error if the read fails.
fn parse_id(addr: GuestAddress, mem: &GuestMemory) -> WlResult<u32> {
    const ID_OFFSET: usize = 8;

    let id_addr = mem.checked_offset(addr, ID_OFFSET)
        .ok_or(WlError::CheckedOffset)?;
    let raw_id: Le32 = mem.read_obj_from_addr(id_addr)?;
    Ok(raw_id.into())
}
/// Decodes the command stored in the descriptor `desc` into a `WlOp`.
///
/// The command type is the little-endian `u32` at the very start of the descriptor. Types that
/// are not recognized are reported as `WlOp::Unsupported` rather than as an error.
fn parse_desc(desc: &DescriptorChain, mem: &GuestMemory) -> WlResult<WlOp> {
    let raw_type: Le32 = mem.read_obj_from_addr(desc.addr)?;
    let op_type: u32 = raw_type.into();

    let op = match op_type {
        VIRTIO_WL_CMD_VFD_NEW => parse_new(desc.addr, mem)?,
        VIRTIO_WL_CMD_VFD_CLOSE => WlOp::Close { id: parse_id(desc.addr, mem)? },
        VIRTIO_WL_CMD_VFD_SEND => parse_send(desc.addr, desc.len, mem)?,
        VIRTIO_WL_CMD_VFD_NEW_CTX => WlOp::NewCtx { id: parse_id(desc.addr, mem)? },
        unknown => WlOp::Unsupported { op_type: unknown },
    };
    Ok(op)
}
/// Writes a `CtrlVfdNew` message describing a vfd to the start of `desc_mem`.
///
/// When `resp` is true the message is typed as a response to a guest request
/// (`VIRTIO_WL_RESP_VFD_NEW`); otherwise it is typed as a host-initiated command
/// (`VIRTIO_WL_CMD_VFD_NEW`). Returns the number of bytes written.
fn encode_vfd_new(desc_mem: VolatileSlice,
                  resp: bool,
                  vfd_id: u32,
                  flags: u32,
                  pfn: u64,
                  size: u32)
                  -> WlResult<u32> {
    let type_code = if resp {
        VIRTIO_WL_RESP_VFD_NEW
    } else {
        VIRTIO_WL_CMD_VFD_NEW
    };
    let header = CtrlHeader {
        type_: Le32::from(type_code),
        flags: Le32::from(0),
    };
    let message = CtrlVfdNew {
        hdr: header,
        id: Le32::from(vfd_id),
        flags: Le32::from(flags),
        pfn: Le64::from(pfn),
        size: Le32::from(size),
    };

    desc_mem.get_ref(0)?.store(message);

    let written = size_of::<CtrlVfdNew>();
    Ok(written as u32)
}
/// Writes a "receive" command to `desc_mem`: a `CtrlVfdRecv` header, followed by the ids in
/// `vfd_ids`, followed by the bytes of `data`.
///
/// Every integer, including each vfd id, is written in little-endian form so that the encoding
/// matches the `Le32` ids decoded on the send path regardless of host endianness.
///
/// Returns the total number of bytes written, or a volatile memory error if `desc_mem` is too
/// small to hold the message.
fn encode_vfd_recv(desc_mem: VolatileSlice,
                   vfd_id: u32,
                   data: &[u8],
                   vfd_ids: &[u32])
                   -> WlResult<u32> {
    let header_len = size_of::<CtrlVfdRecv>();
    let vfds_len = vfd_ids.len() * size_of::<Le32>();

    let ctrl_vfd_recv = CtrlVfdRecv {
        hdr: CtrlHeader {
            type_: Le32::from(VIRTIO_WL_CMD_VFD_RECV),
            flags: Le32::from(0),
        },
        id: Le32::from(vfd_id),
        vfd_count: Le32::from(vfd_ids.len() as u32),
    };
    desc_mem.get_ref(0)?.store(ctrl_vfd_recv);

    // The id array sits directly after the fixed header.
    let vfd_slice = desc_mem.get_slice(header_len, vfds_len)?;
    for (i, &recv_vfd_id) in vfd_ids.iter().enumerate() {
        vfd_slice
            .get_ref(size_of::<Le32>() * i)?
            .store(Le32::from(recv_vfd_id));
    }

    // The payload follows the id array.
    let data_slice = desc_mem.get_slice(header_len + vfds_len, data.len())?;
    data_slice.copy_from(data);

    Ok((header_len + vfds_len + data.len()) as u32)
}
/// Serializes `resp` into `desc_mem` and returns the number of bytes written.
///
/// `VfdNew` and `VfdRecv` have dedicated encoders; every other response is written as just its
/// little-endian type code.
fn encode_resp(desc_mem: VolatileSlice, resp: WlResp) -> WlResult<u32> {
    match resp {
        WlResp::VfdNew {
            id: vfd_id,
            flags,
            pfn,
            size,
            resp: is_resp,
        } => encode_vfd_new(desc_mem, is_resp, vfd_id, flags, pfn, size),
        WlResp::VfdRecv {
            id: vfd_id,
            data,
            vfds: vfd_ids,
        } => encode_vfd_recv(desc_mem, vfd_id, data, vfd_ids),
        other => {
            let code = Le32::from(other.get_code());
            desc_mem.get_ref(0)?.store(code);
            Ok(size_of::<Le32>() as u32)
        }
    }
}
/// Errors that can occur while decoding guest commands or operating on vfds.
#[derive(Debug)]
enum WlError {
/// Creating the shared memory object for a new allocation failed.
NewAlloc(Error),
/// Setting the size of a newly created shared memory object failed.
AllocSetSize(Error),
/// Wrapping a received file as shared memory failed.
AllocFromFile(Error),
/// Connecting to the wayland server socket failed.
SocketConnect(io::Error),
/// Switching the wayland socket to non-blocking mode failed.
SocketNonBlock(io::Error),
/// Sending a request to, or receiving a response from, the VM control socket failed.
VmControl(VmControlError),
/// The VM control socket answered with a response of an unexpected kind.
VmBadResponse,
/// Computing an offset into guest memory failed.
CheckedOffset,
/// Accessing guest memory failed.
GuestMemory(GuestMemoryError),
/// Accessing a volatile memory slice failed.
VolatileMemory(VolatileMemoryError),
/// Sending data or file descriptors over a vfd's socket failed.
SendVfd(Error),
/// Receiving data or file descriptors from a vfd's socket failed.
RecvVfd(Error),
}
/// Result type used throughout this module, with `WlError` as the error.
type WlResult<T> = result::Result<T, WlError>;
/// Lets `?` turn guest memory errors into `WlError::GuestMemory`.
impl From<GuestMemoryError> for WlError {
    fn from(err: GuestMemoryError) -> WlError {
        let wrapped = WlError::GuestMemory(err);
        wrapped
    }
}
/// Lets `?` turn volatile memory errors into `WlError::VolatileMemory`.
impl From<VolatileMemoryError> for WlError {
    fn from(err: VolatileMemoryError) -> WlError {
        let wrapped = WlError::VolatileMemory(err);
        wrapped
    }
}
/// Cheaply clonable handle used to send requests over the VM control socket.
///
/// All clones share the same socket and `Scm` through a reference-counted `RefCell`.
#[derive(Clone)]
struct VmRequester {
// The `Scm` used for the control messages together with the control socket itself.
inner: Rc<RefCell<(Scm, UnixDatagram)>>,
}
impl VmRequester {
    /// Wraps `vm_socket` in a new requester, paired with an `Scm` created with a capacity of 1.
    fn new(vm_socket: UnixDatagram) -> VmRequester {
        let shared = Rc::new(RefCell::new((Scm::new(1), vm_socket)));
        VmRequester { inner: shared }
    }

    /// Sends `request` over the control socket and returns the response received for it.
    ///
    /// Failures while sending or receiving are reported as `WlError::VmControl`.
    fn request(&self, request: VmRequest) -> WlResult<VmResponse> {
        let mut guard = self.inner.borrow_mut();
        let (ref mut scm, ref mut vm_socket) = *guard;

        if let Err(e) = request.send(scm, vm_socket) {
            return Err(WlError::VmControl(e));
        }
        match VmResponse::recv(scm, vm_socket) {
            Ok(response) => Ok(response),
            Err(e) => Err(WlError::VmControl(e)),
        }
    }
}
/// Header placed at the start of the control messages encoded by this module: a little-endian
/// type code followed by little-endian flags.
#[repr(C)]
#[derive(Copy, Clone)]
struct CtrlHeader {
type_: Le32,
flags: Le32,
}
/// Wire representation of a "new vfd" message, used both as the response to a guest request and
/// as a host-initiated command.
#[repr(C)]
#[derive(Copy, Clone)]
struct CtrlVfdNew {
hdr: CtrlHeader,
// Id of the vfd being described.
id: Le32,
// Flag bits of the vfd.
flags: Le32,
// Page frame number of the vfd's memory; written as 0 for vfds without registered memory.
pfn: Le64,
// Size of the vfd's memory; written as 0 for vfds without shared memory.
size: Le32,
}
// NOTE(review): the justification for this `unsafe impl` is not stated in the file. Presumably
// the struct qualifies because it is built only from little-endian integer fields; confirm that
// any padding in the `repr(C)` layout is acceptable for `DataInit`.
unsafe impl DataInit for CtrlVfdNew {}
/// Fixed-size prefix of a "receive" command. When encoded it is followed by `vfd_count` vfd ids
/// and then by the received data.
#[repr(C)]
#[derive(Copy, Clone)]
struct CtrlVfdRecv {
hdr: CtrlHeader,
// Id of the vfd the data was received on.
id: Le32,
// Number of vfd ids that follow this header.
vfd_count: Le32,
}
// NOTE(review): the justification for this `unsafe impl` is not stated in the file. Presumably
// the struct qualifies because it is built only from little-endian integer fields; confirm.
unsafe impl DataInit for CtrlVfdRecv {}
/// A request from the guest, decoded from a descriptor on the `out` queue.
#[derive(Debug)]
enum WlOp {
// Allocate shared memory of `size` bytes and bind it to the guest-chosen `id`.
NewAlloc { id: u32, flags: u32, size: u32 },
// Close the vfd `id`.
Close { id: u32 },
// Send data, optionally accompanied by other vfds, over the vfd `id`.
Send {
id: u32,
// Guest address of the array of vfd ids to send along.
vfds_addr: GuestAddress,
// Number of entries in the array at `vfds_addr`.
vfd_count: u32,
// Guest address of the payload.
data_addr: GuestAddress,
// Length of the payload in bytes.
data_len: u32,
},
// Open a new connection to the wayland server and bind it to the guest-chosen `id`.
NewCtx { id: u32 },
// A command whose type code was not recognized.
Unsupported { op_type: u32 },
}
/// A message for the guest: either the response to a guest request or a host-initiated command
/// delivered through the `in` queue.
#[derive(Debug)]
#[allow(dead_code)]
enum WlResp<'a> {
// The request succeeded and there is nothing further to report.
Ok,
// Describes a vfd to the guest.
VfdNew {
id: u32,
flags: u32,
pfn: u64,
size: u32,
// The VfdNew variant can be either a response or a command depending on this `resp`. This
// is important for the `get_code` method.
resp: bool,
},
// Data and/or vfds received on the vfd `id`.
VfdRecv {
id: u32,
data: &'a [u8],
vfds: &'a [u32],
},
// Generic failure.
Err,
OutOfMemory,
// The vfd id in the request was unknown, already in use, or not usable by the guest.
InvalidId,
// The vfd does not support the requested operation.
InvalidType,
}
impl<'a> WlResp<'a> {
    /// Returns the wire type code for this message.
    ///
    /// `VfdNew` maps to the response code when its `resp` field is set and to the command code
    /// otherwise.
    fn get_code(&self) -> u32 {
        match *self {
            WlResp::Ok => VIRTIO_WL_RESP_OK,
            WlResp::VfdNew { resp: true, .. } => VIRTIO_WL_RESP_VFD_NEW,
            WlResp::VfdNew { resp: false, .. } => VIRTIO_WL_CMD_VFD_NEW,
            WlResp::VfdRecv { .. } => VIRTIO_WL_CMD_VFD_RECV,
            WlResp::Err => VIRTIO_WL_RESP_ERR,
            WlResp::OutOfMemory => VIRTIO_WL_RESP_OUT_OF_MEMORY,
            WlResp::InvalidId => VIRTIO_WL_RESP_INVALID_ID,
            WlResp::InvalidType => VIRTIO_WL_RESP_INVALID_TYPE,
        }
    }
}
/// A virtual file descriptor: either a connection to the wayland server or a piece of shared
/// memory registered with the VM.
#[derive(Default)]
struct WlVfd {
// Connection to the wayland server, for vfds created by connecting to it.
socket: Option<UnixStream>,
// Shared memory backing the vfd, for vfds created by allocation or from a received file.
guest_shared_memory: Option<SharedMemory>,
// Registration of the shared memory with the VM, plus the requester needed to undo it on close.
slot: Option<(u32 /* slot */, u64 /* pfn */, VmRequester)>,
}
/// Compact debug output that lists only the parts of the vfd that are present.
impl fmt::Debug for WlVfd {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "WlVfd {{")?;
        match self.socket {
            Some(ref socket) => write!(f, " socket: {}", socket.as_raw_fd())?,
            None => {}
        }
        match self.slot {
            Some((slot, pfn, _)) => write!(f, " slot: {} pfn: {}", slot, pfn)?,
            None => {}
        }
        write!(f, " }}")
    }
}
impl WlVfd {
fn connect<P: AsRef<Path>>(path: P) -> WlResult<WlVfd> {
let socket = UnixStream::connect(path)
.map_err(WlError::SocketConnect)?;
socket
.set_nonblocking(true)
.map_err(WlError::SocketNonBlock)?;
Ok(WlVfd {
socket: Some(socket),
guest_shared_memory: None,
slot: None,
})
}
fn allocate(vm: VmRequester, size: u64) -> WlResult<WlVfd> {
let size_page_aligned = round_to_page_size(size);
let mut vfd_shm = SharedMemory::new(Some(CStr::from_bytes_with_nul(b"virtwl_alloc\0")
.unwrap()))
.map_err(WlError::NewAlloc)?;
vfd_shm
.set_size(size_page_aligned)
.map_err(WlError::AllocSetSize)?;
let register_response =
vm.request(VmRequest::RegisterMemory(MaybeOwnedFd::Borrowed(vfd_shm.as_raw_fd()),
vfd_shm.size() as usize))?;
match register_response {
VmResponse::RegisterMemory { pfn, slot } => {
Ok(WlVfd {
socket: None,
guest_shared_memory: Some(vfd_shm),
slot: Some((slot, pfn, vm)),
})
}
_ => Err(WlError::VmBadResponse),
}
}
fn from_file(vm: VmRequester, fd: File) -> WlResult<WlVfd> {
let vfd_shm = SharedMemory::from_raw_fd(fd)
.map_err(WlError::AllocFromFile)?;
let size = round_to_page_size(vfd_shm.size());
let register_response =
vm.request(VmRequest::RegisterMemory(MaybeOwnedFd::Borrowed(vfd_shm.as_raw_fd()),
size as usize))?;
match register_response {
VmResponse::RegisterMemory { pfn, slot } => {
Ok(WlVfd {
socket: None,
guest_shared_memory: Some(vfd_shm),
slot: Some((slot, pfn, vm)),
})
}
_ => Err(WlError::VmBadResponse),
}
}
fn flags(&self) -> u32 {
let mut flags = 0;
if self.socket.is_some() {
flags |= VIRTIO_WL_VFD_CONTROL;
}
if self.slot.is_some() {
flags |= VIRTIO_WL_VFD_WRITE | VIRTIO_WL_VFD_MAP
}
flags
}
fn pfn(&self) -> Option<u64> {
self.slot.as_ref().map(|s| s.1)
}
fn size(&self) -> Option<u64> {
self.guest_shared_memory.as_ref().map(|m| m.size())
}
fn fd(&self) -> Option<RawFd> {
self.guest_shared_memory
.as_ref()
.map(|m| m.as_raw_fd())
.or(self.socket.as_ref().map(|s| s.as_raw_fd()))
}
fn send(&mut self, scm: &mut Scm, fds: &[RawFd], data: VolatileSlice) -> WlResult<WlResp> {
match self.socket {
Some(ref socket) => {
scm.send(socket, &[data], fds)
.map_err(WlError::SendVfd)?;
Ok(WlResp::Ok)
}
None => Ok(WlResp::InvalidType),
}
}
fn recv(&mut self, scm: &mut Scm, in_file_queue: &mut Vec<File>) -> WlResult<Vec<u8>> {
// This awkward looking scope is to allow us to remove self.socket if we discover after
// borrowing it that the socket is disconnected.
{
let socket = match self.socket {
Some(ref s) => s,
None => return Ok(Vec::new()),
};
let mut buf = Vec::new();
buf.resize(IN_BUFFER_LEN, 0);
let old_len = in_file_queue.len();
let len = scm.recv(socket, &mut [&mut buf[..]], in_file_queue)
.map_err(WlError::RecvVfd)?;
// If any data gets read, the return statement avoids removing the socket.
if len != 0 || in_file_queue.len() != old_len {
buf.truncate(len);
buf.shrink_to_fit();
return Ok(buf);
}
}
self.socket = None;
Ok(Vec::new())
}
fn close(&mut self) -> WlResult<()> {
self.socket = None;
if let Some((slot, _, vm)) = self.slot.take() {
vm.request(VmRequest::UnregisterMemory(slot))?;
}
Ok(())
}
}
/// Closes the vfd when it goes out of scope.
impl Drop for WlVfd {
    fn drop(&mut self) {
        // A destructor has no way to report failure, so the outcome of the close is discarded.
        self.close().ok();
    }
}
/// An item received from a vfd's socket that is waiting to be delivered to the guest.
#[derive(Debug)]
enum WlRecv {
// A newly created host vfd, identified by `id`, that was received as a file.
Vfd { id: u32 },
// Bytes read from the socket.
Data { buf: Vec<u8> },
}
/// All device state that is independent of the virtio queues: the known vfds and the messages
/// waiting to be delivered to the guest.
struct WlState {
// Path of the wayland server socket that new contexts connect to.
wayland_path: PathBuf,
// Handle for issuing memory (un)registration requests to the VM.
vm: VmRequester,
// Every known vfd, keyed by vfd id.
vfds: Map<u32, WlVfd>,
// Id that will be given to the next host-created vfd.
next_vfd_id: u32,
// Used for sending and receiving data and file descriptors on vfd sockets.
scm: Scm,
// Files received from a socket that have not yet been turned into vfds.
in_file_queue: Vec<File>,
// Received items waiting to be delivered to the guest, each tagged with the vfd it arrived on.
in_queue: VecDeque<(u32 /* vfd_id */, WlRecv)>,
// The vfd whose received vfds are currently being collected in `recv_vfds`, if any.
current_recv_vfd: Option<u32>,
// Ids of received vfds already announced to the guest but not yet attached to a receive message.
recv_vfds: Vec<u32>,
}
impl WlState {
    /// Creates an empty state that connects new contexts to `wayland_path` and talks to the VM
    /// over `vm_socket`.
    fn new(wayland_path: PathBuf, vm_socket: UnixDatagram) -> WlState {
        WlState {
            wayland_path: wayland_path,
            vm: VmRequester::new(vm_socket),
            scm: Scm::new(VIRTWL_SEND_MAX_ALLOCS),
            vfds: Map::new(),
            next_vfd_id: NEXT_VFD_ID_BASE,
            in_file_queue: Vec::new(),
            in_queue: VecDeque::new(),
            current_recv_vfd: None,
            recv_vfds: Vec::new(),
        }
    }

    /// Handles a guest request to allocate shared memory under the guest-chosen `id`.
    ///
    /// Responds with `InvalidId` if `id` lies in the host id range or is already in use, and with
    /// `Err` if `flags` contains bits other than `WRITE` and `MAP`.
    fn new_alloc(&mut self, id: u32, flags: u32, size: u32) -> WlResult<WlResp> {
        if id & VFD_ID_HOST_MASK != 0 {
            return Ok(WlResp::InvalidId);
        }
        if flags & !(VIRTIO_WL_VFD_WRITE | VIRTIO_WL_VFD_MAP) != 0 {
            return Ok(WlResp::Err);
        }
        match self.vfds.entry(id) {
            Entry::Vacant(entry) => {
                let vfd = WlVfd::allocate(self.vm.clone(), size as u64)?;
                let resp = WlResp::VfdNew {
                    id: id,
                    flags: flags,
                    pfn: vfd.pfn().unwrap_or_default(),
                    size: vfd.size().unwrap_or_default() as u32,
                    resp: true,
                };
                entry.insert(vfd);
                Ok(resp)
            }
            Entry::Occupied(_) => Ok(WlResp::InvalidId),
        }
    }

    /// Handles a guest request to open a new wayland connection under the guest-chosen `id`.
    ///
    /// Responds with `InvalidId` if `id` lies in the host id range or is already in use.
    fn new_context(&mut self, id: u32) -> WlResult<WlResp> {
        if id & VFD_ID_HOST_MASK != 0 {
            return Ok(WlResp::InvalidId);
        }
        match self.vfds.entry(id) {
            Entry::Vacant(entry) => {
                entry.insert(WlVfd::connect(&self.wayland_path)?);
                Ok(WlResp::VfdNew {
                       id: id,
                       flags: VIRTIO_WL_VFD_CONTROL,
                       pfn: 0,
                       size: 0,
                       resp: true,
                   })
            }
            Entry::Occupied(_) => Ok(WlResp::InvalidId),
        }
    }

    /// Closes the vfd `vfd_id` and discards everything still queued for the guest on its behalf.
    ///
    /// Vfds that were received on `vfd_id` but not yet delivered to the guest are closed as well,
    /// recursively. Responds with `InvalidId` if `vfd_id` is unknown.
    fn close(&mut self, vfd_id: u32) -> WlResult<WlResp> {
        let mut to_delete = Set::new();
        for &(dest_vfd_id, ref q) in self.in_queue.iter() {
            if dest_vfd_id == vfd_id {
                if let &WlRecv::Vfd { id } = q {
                    to_delete.insert(id);
                }
            }
        }
        for vfd_id in to_delete {
            // Sorry sub-error, we can't have cascading errors leaving us in an inconsistent state.
            let _ = self.close(vfd_id);
        }
        match self.vfds.remove(&vfd_id) {
            Some(mut vfd) => {
                self.in_queue.retain(|&(id, _)| id != vfd_id);
                vfd.close()?;
                Ok(WlResp::Ok)
            }
            None => Ok(WlResp::InvalidId),
        }
    }

    /// Sends `data`, accompanied by the vfds whose little-endian ids are stored in `vfds`, over
    /// the vfd `vfd_id`.
    ///
    /// Responds with `Err` if more than `VIRTWL_SEND_MAX_ALLOCS` vfds are attached, with
    /// `InvalidId` if any referenced vfd is unknown, and with `InvalidType` if an attached vfd
    /// has no file descriptor or the destination vfd has no socket.
    fn send(&mut self, vfd_id: u32, vfds: VolatileSlice, data: VolatileSlice) -> WlResult<WlResp> {
        let vfd_count = vfds.size() / size_of::<Le32>();
        // The count is supplied by the guest. The fixed-size arrays below cannot hold more than
        // VIRTWL_SEND_MAX_ALLOCS entries, so larger requests are refused rather than being
        // allowed to index out of bounds.
        if vfd_count > VIRTWL_SEND_MAX_ALLOCS {
            return Ok(WlResp::Err);
        }
        let mut vfd_ids = [Le32::from(0); VIRTWL_SEND_MAX_ALLOCS];
        vfds.copy_to(&mut vfd_ids[..]);
        let mut fds = [0; VIRTWL_SEND_MAX_ALLOCS];
        for (&id, fd) in vfd_ids[..vfd_count].iter().zip(fds.iter_mut()) {
            match self.vfds.get(&id.into()) {
                Some(vfd) => {
                    match vfd.fd() {
                        Some(vfd_fd) => *fd = vfd_fd,
                        None => return Ok(WlResp::InvalidType),
                    }
                }
                None => return Ok(WlResp::InvalidId),
            }
        }
        match self.vfds.get_mut(&vfd_id) {
            Some(vfd) => vfd.send(&mut self.scm, &fds[..vfd_count], data),
            None => Ok(WlResp::InvalidId),
        }
    }

    /// Reads whatever is pending on the socket of `vfd_id` and queues it for the guest.
    ///
    /// Each received file becomes a new host vfd that is queued ahead of the data it arrived
    /// with. Unknown vfd ids are ignored.
    fn recv(&mut self, vfd_id: u32) -> WlResult<()> {
        let buf = match self.vfds.get_mut(&vfd_id) {
            Some(vfd) => vfd.recv(&mut self.scm, &mut self.in_file_queue)?,
            None => return Ok(()),
        };
        for file in self.in_file_queue.drain(..) {
            self.vfds
                .insert(self.next_vfd_id, WlVfd::from_file(self.vm.clone(), file)?);
            self.in_queue
                .push_back((vfd_id, WlRecv::Vfd { id: self.next_vfd_id }));
            self.next_vfd_id += 1;
        }
        self.in_queue
            .push_back((vfd_id, WlRecv::Data { buf: buf }));
        Ok(())
    }

    /// Carries out a decoded guest request and returns the response for it.
    ///
    /// `mem` is needed to resolve the guest addresses carried by `WlOp::Send`.
    fn execute(&mut self, mem: &GuestMemory, op: WlOp) -> WlResult<WlResp> {
        match op {
            WlOp::NewAlloc { id, flags, size } => self.new_alloc(id, flags, size),
            WlOp::Close { id } => self.close(id),
            WlOp::Send {
                id,
                vfds_addr,
                vfd_count,
                data_addr,
                data_len,
            } => {
                let vfd_mem = mem.get_slice(vfds_addr.0, (vfd_count as usize) * size_of::<Le32>())?;
                let data_mem = mem.get_slice(data_addr.0, data_len as usize)?;
                self.send(id, vfd_mem, data_mem)
            }
            WlOp::NewCtx { id } => self.new_context(id),
            WlOp::Unsupported { .. } => Ok(WlResp::Err),
        }
    }

    /// Returns the next message to deliver to the guest without consuming it, or `None` if
    /// nothing is queued.
    ///
    /// While vfds received on one vfd are being collected, an item queued for a *different* vfd
    /// first produces a data-less `VfdRecv` that flushes the collected vfds. `pop_recv` must be
    /// called after the returned message has been delivered.
    fn next_recv(&self) -> Option<WlResp> {
        if let Some(q) = self.in_queue.front() {
            match q {
                &(vfd_id, WlRecv::Vfd { id }) => {
                    if self.current_recv_vfd.is_none() || self.current_recv_vfd == Some(vfd_id) {
                        match self.vfds.get(&id) {
                            Some(vfd) => {
                                Some(WlResp::VfdNew {
                                         id: id,
                                         flags: vfd.flags(),
                                         pfn: vfd.pfn().unwrap_or_default(),
                                         size: vfd.size().unwrap_or_default() as u32,
                                         resp: false,
                                     })
                            }
                            _ => {
                                Some(WlResp::VfdNew {
                                         id: id,
                                         flags: 0,
                                         pfn: 0,
                                         size: 0,
                                         resp: false,
                                     })
                            }
                        }
                    } else {
                        Some(WlResp::VfdRecv {
                                 id: self.current_recv_vfd.unwrap(),
                                 data: &[],
                                 vfds: &self.recv_vfds[..],
                             })
                    }
                }
                &(vfd_id, WlRecv::Data { ref buf }) => {
                    if self.current_recv_vfd.is_none() || self.current_recv_vfd == Some(vfd_id) {
                        Some(WlResp::VfdRecv {
                                 id: vfd_id,
                                 data: &buf[..],
                                 vfds: &self.recv_vfds[..],
                             })
                    } else {
                        Some(WlResp::VfdRecv {
                                 id: self.current_recv_vfd.unwrap(),
                                 data: &[],
                                 vfds: &self.recv_vfds[..],
                             })
                    }
                }
            }
        } else {
            None
        }
    }

    /// Records that the message last returned by `next_recv` was delivered to the guest.
    ///
    /// If that message was only a flush of vfds collected for a different vfd, the collection
    /// state is reset and the front item stays queued so that it is delivered next. Otherwise the
    /// front item is removed.
    fn pop_recv(&mut self) {
        if let Some(q) = self.in_queue.front() {
            match q {
                &(vfd_id, WlRecv::Vfd { id }) => {
                    if self.current_recv_vfd.is_none() || self.current_recv_vfd == Some(vfd_id) {
                        self.recv_vfds.push(id);
                        self.current_recv_vfd = Some(vfd_id);
                    } else {
                        self.recv_vfds.clear();
                        self.current_recv_vfd = None;
                        return;
                    }
                }
                &(vfd_id, WlRecv::Data { .. }) => {
                    // Whether this data was really delivered has to be decided before the
                    // collection state is reset. Testing `current_recv_vfd` after clearing it
                    // always reports a match and would discard data that `next_recv` has not
                    // handed out yet.
                    let delivered = self.current_recv_vfd.is_none() ||
                                    self.current_recv_vfd == Some(vfd_id);
                    self.recv_vfds.clear();
                    self.current_recv_vfd = None;
                    if !delivered {
                        return;
                    }
                }
            }
        }
        self.in_queue.pop_front();
    }

    /// Calls `f` with the id and socket of every vfd that currently has a socket.
    fn iter_sockets<'a, F>(&'a self, mut f: F)
        where F: FnMut(u32, &'a UnixStream)
    {
        for (id, socket) in self.vfds
                .iter()
                .filter_map(|(&k, v)| v.socket.as_ref().map(|s| (k, s))) {
            f(id, socket);
        }
    }
}
/// Owns the queues and device state and runs the device's poll loop.
struct Worker {
// Guest memory that the descriptors point into.
mem: GuestMemory,
// Written to after descriptors have been added to a used ring.
interrupt_evt: EventFd,
// Interrupt status bits; the used-ring bit is set before `interrupt_evt` is written.
interrupt_status: Arc<AtomicUsize>,
// Queue carrying host-generated messages to the guest.
in_queue: Queue,
// Queue carrying requests from the guest.
out_queue: Queue,
state: WlState,
// Index, address and length of descriptors taken from `in_queue` that are ready to be filled.
in_desc_chains: VecDeque<(u16, GuestAddress, u32)>,
}
impl Worker {
fn new(mem: GuestMemory,
interrupt_evt: EventFd,
interrupt_status: Arc<AtomicUsize>,
in_queue: Queue,
out_queue: Queue,
wayland_path: PathBuf,
vm_socket: UnixDatagram)
-> Worker {
Worker {
mem: mem,
interrupt_evt: interrupt_evt,
interrupt_status: interrupt_status,
in_queue: in_queue,
out_queue: out_queue,
state: WlState::new(wayland_path, vm_socket),
in_desc_chains: VecDeque::with_capacity(QUEUE_SIZE as usize),
}
}
fn signal_used_queue(&self) {
self.interrupt_status
.fetch_or(INTERRUPT_STATUS_USED_RING as usize, Ordering::SeqCst);
let _ = self.interrupt_evt.write(1);
}
fn run(&mut self, mut queue_evts: Vec<EventFd>, kill_evt: EventFd) {
let in_queue_evt = queue_evts.remove(0);
let out_queue_evt = queue_evts.remove(0);
let mut token_vfd_id_map = Map::new();
let mut poller = Poller::new(3);
'poll: loop {
let tokens = {
// TODO(zachr): somehow keep pollables from allocating every loop
// The capacity is always the 3 static eventfds plus the number of vfd sockets. To
// estimate the number of vfd sockets, we use the previous poll's vfd id map size,
// which was equal to the number of vfd sockets.
let mut pollables = Vec::with_capacity(3 + token_vfd_id_map.len());
pollables.push((Q_IN, &in_queue_evt as &Pollable));
pollables.push((Q_OUT, &out_queue_evt as &Pollable));
pollables.push((KILL, &kill_evt as &Pollable));
token_vfd_id_map.clear();
// TODO(zachr): leave these out if there is no Q_IN to use
self.state
.iter_sockets(|id, socket| {
let token = VFD_BASE_TOKEN + token_vfd_id_map.len() as u32;
token_vfd_id_map.insert(token, id);
pollables.push((token, socket));
});
poller.poll(&pollables[..]).expect("error: failed poll")
};
let mut signal_used = false;
for &token in tokens {
match token {
Q_IN => {
let _ = in_queue_evt.read();
// Used to buffer descriptor indexes that are invalid for our uses.
let mut rejects = [0u16; QUEUE_SIZE as usize];
let mut rejects_len = 0;
let min_in_desc_len = (size_of::<CtrlVfdRecv>() +
size_of::<Le32>() * VIRTWL_SEND_MAX_ALLOCS) as
u32;
self.in_desc_chains.extend(self.in_queue.iter(&self.mem).filter_map(|d| {
if d.len >= min_in_desc_len && d.is_write_only() {
Some((d.index, d.addr, d.len))
} else {
// Can not use queue.add_used directly because it's being borrowed
// for the iterator chain, so we buffer the descriptor index in
// rejects.
rejects[rejects_len] = d.index;
rejects_len += 1;
None
}
}));
for &reject in &rejects[..rejects_len] {
signal_used = true;
self.in_queue.add_used(&self.mem, reject, 0);
}
}
Q_OUT => {
let _ = out_queue_evt.read();
// Used to buffer filled in descriptors that will be added to the used queue
// after iterating the available queue.
let mut used_descs = [(0u16, 0u32); QUEUE_SIZE as usize];
let mut used_descs_len = 0;
let min_resp_desc_len = size_of::<CtrlHeader>() as u32;
for desc in self.out_queue.iter(&self.mem) {
// Expects that each descriptor chain is made of one "in" followed by
// one "out" descriptor.
if !desc.is_write_only() {
if let Some(resp_desc) = desc.next_descriptor() {
if resp_desc.is_write_only() &&
resp_desc.len >= min_resp_desc_len {
let resp = match parse_desc(&desc, &self.mem) {
Ok(op) => {
match self.state.execute(&self.mem, op) {
Ok(r) => r,
_ => WlResp::Err,
}
}
_ => WlResp::Err,
};
let resp_mem = self.mem
.get_slice(resp_desc.addr.0, resp_desc.len as usize)
.unwrap();
let used_len = encode_resp(resp_mem, resp)
.unwrap_or_default();
used_descs[used_descs_len] = (desc.index, used_len);
}
}
} else {
// Chains that are unusable get sent straight back to the used
// queue.
used_descs[used_descs_len] = (desc.index, 0);
}
used_descs_len += 1;
}
for &(index, len) in &used_descs[..used_descs_len] {
signal_used = true;
self.out_queue.add_used(&self.mem, index, len);
}
}
KILL => break 'poll,
v => {
if let Some(&id) = token_vfd_id_map.get(&v) {
let res = self.state.recv(id);
if let Err(e) = res {
error!("failed to receive vfd {}: {:?}", id, e);
}
}
}
}
}
// Because this loop should be retried after the in queue is usable or after one of the
// VFDs was read, we do it after the poll event responses.
while !self.in_desc_chains.is_empty() {
let mut should_pop = false;
if let Some(in_resp) = self.state.next_recv() {
// self.in_desc_chains is not empty (checked by loop condition) so unwrap is
// safe.
let (index, addr, desc_len) = self.in_desc_chains.pop_front().unwrap();
// This memory location is valid because it came from a queue which always
// checks the descriptor memory locations.
let desc_mem = self.mem.get_slice(addr.0, desc_len as usize).unwrap();
let len = match encode_resp(desc_mem, in_resp) {
Ok(len) => {
should_pop = true;
len
}
Err(e) => {
error!("failed to encode response to descriptor chain: {:?}", e);
0
}
};
signal_used = true;
self.in_queue.add_used(&self.mem, index, len);
} else {
break;
}
if should_pop {
self.state.pop_recv();
}
}
if signal_used {
self.signal_used_queue();
}
}
}
}
/// The virtio wayland device.
pub struct Wl {
// Written to on drop to stop the worker thread; set once the device has been activated.
kill_evt: Option<EventFd>,
// Path of the wayland server socket.
wayland_path: PathBuf,
// Control socket to the VM; handed to the worker thread on activation.
vm_socket: Option<UnixDatagram>,
}
impl Wl {
    /// Creates a wayland device that will connect guests to the wayland server socket at
    /// `wayland_path` and use `vm_socket` to make requests to the VM.
    ///
    /// No worker thread is started and no kill eventfd exists until the device is activated.
    pub fn new<P: AsRef<Path>>(wayland_path: P, vm_socket: UnixDatagram) -> Result<Wl> {
        let wayland_path = wayland_path.as_ref().to_path_buf();
        let device = Wl {
            kill_evt: None,
            wayland_path: wayland_path,
            vm_socket: Some(vm_socket),
        };
        Ok(device)
    }
}
/// Signals the kill eventfd, if one was created, when the device is dropped.
impl Drop for Wl {
    fn drop(&mut self) {
        match self.kill_evt.take() {
            Some(kill_evt) => {
                // Nothing useful can be done if the write fails, so the result is discarded.
                kill_evt.write(1).ok();
            }
            None => {}
        }
    }
}
impl VirtioDevice for Wl {
    /// Returns the raw fd of the VM control socket, if the device still holds it.
    fn keep_fds(&self) -> Vec<RawFd> {
        self.vm_socket
            .iter()
            .map(|vm_socket| vm_socket.as_raw_fd())
            .collect()
    }

    /// Returns the virtio device type of the wayland device.
    fn device_type(&self) -> u32 {
        TYPE_WL
    }

    /// Returns the maximum sizes of the device's two queues.
    fn queue_max_sizes(&self) -> &[u16] {
        QUEUE_SIZES
    }

    /// Starts the worker thread that services the device.
    ///
    /// Does nothing if the number of queues or queue eventfds is not what the device expects. A
    /// kill eventfd pair is created first; if that fails the error is logged and the device is
    /// not started. The worker is only spawned while the device still holds its VM socket.
    fn activate(&mut self,
                mem: GuestMemory,
                interrupt_evt: EventFd,
                status: Arc<AtomicUsize>,
                mut queues: Vec<Queue>,
                queue_evts: Vec<EventFd>) {
        let expected = QUEUE_SIZES.len();
        if queues.len() != expected || queue_evts.len() != expected {
            return;
        }

        let kill_evt = match EventFd::new() {
            Ok(evt) => evt,
            Err(e) => {
                error!("failed creating kill EventFd pair: {:?}", e);
                return;
            }
        };
        let self_kill_evt = match kill_evt.try_clone() {
            Ok(evt) => evt,
            Err(e) => {
                error!("failed creating kill EventFd pair: {:?}", e);
                return;
            }
        };
        self.kill_evt = Some(self_kill_evt);

        let vm_socket = match self.vm_socket.take() {
            Some(vm_socket) => vm_socket,
            None => return,
        };
        let wayland_path = self.wayland_path.clone();
        let spawn_result = thread::Builder::new()
            .name("virtio_wl".to_string())
            .spawn(move || {
                // The first queue is the `in` queue and the second is the `out` queue.
                let in_queue = queues.remove(0);
                let out_queue = queues.remove(0);
                let mut worker = Worker::new(mem,
                                             interrupt_evt,
                                             status,
                                             in_queue,
                                             out_queue,
                                             wayland_path,
                                             vm_socket);
                worker.run(queue_evts, kill_evt);
            });
        if let Err(e) = spawn_result {
            error!("failed to spawn virtio_wl worker: {}", e);
        }
    }
}