| // Copyright 2020 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| use std::io::{self, Read, Write}; |
| use std::os::unix::io::RawFd; |
| use std::sync::mpsc::{channel, Receiver, TryRecvError}; |
| use std::thread; |
| |
| use base::{error, Event, PollContext, PollToken}; |
| use data_model::{DataInit, Le16, Le32}; |
| use vm_memory::GuestMemory; |
| |
| use super::{ |
| base_features, copy_config, Interrupt, Queue, Reader, VirtioDevice, Writer, TYPE_CONSOLE, |
| }; |
| use crate::SerialDevice; |
| |
| const QUEUE_SIZE: u16 = 256; |
| |
| // For now, just implement port 0 (receiveq and transmitq). |
| // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed. |
| const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE]; |
| |
| #[derive(Copy, Clone, Debug, Default)] |
| #[repr(C)] |
| struct virtio_console_config { |
| cols: Le16, |
| rows: Le16, |
| max_nr_ports: Le32, |
| emerg_wr: Le32, |
| } |
| |
| // Safe because it only has data and has no implicit padding. |
| unsafe impl DataInit for virtio_console_config {} |
| |
| struct Worker { |
| mem: GuestMemory, |
| interrupt: Interrupt, |
| input: Option<Box<dyn io::Read + Send>>, |
| output: Option<Box<dyn io::Write + Send>>, |
| } |
| |
| fn write_output(output: &mut Box<dyn io::Write>, data: &[u8]) -> io::Result<()> { |
| output.write_all(&data)?; |
| output.flush() |
| } |
| |
| impl Worker { |
| fn process_transmit_request( |
| mut reader: Reader, |
| output: &mut Box<dyn io::Write>, |
| ) -> io::Result<u32> { |
| let len = reader.available_bytes(); |
| let mut data = vec![0u8; len]; |
| reader.read_exact(&mut data)?; |
| write_output(output, &data)?; |
| Ok(0) |
| } |
| |
| fn process_transmit_queue( |
| &mut self, |
| transmit_queue: &mut Queue, |
| output: &mut Box<dyn io::Write>, |
| ) { |
| let mut needs_interrupt = false; |
| while let Some(avail_desc) = transmit_queue.pop(&self.mem) { |
| let desc_index = avail_desc.index; |
| |
| let reader = match Reader::new(self.mem.clone(), avail_desc) { |
| Ok(r) => r, |
| Err(e) => { |
| error!("console: failed to create reader: {}", e); |
| transmit_queue.add_used(&self.mem, desc_index, 0); |
| needs_interrupt = true; |
| continue; |
| } |
| }; |
| |
| let len = match Self::process_transmit_request(reader, output) { |
| Ok(written) => written, |
| Err(e) => { |
| error!("console: process_transmit_request failed: {}", e); |
| 0 |
| } |
| }; |
| |
| transmit_queue.add_used(&self.mem, desc_index, len); |
| needs_interrupt = true; |
| } |
| |
| if needs_interrupt { |
| self.interrupt.signal_used_queue(transmit_queue.vector); |
| } |
| } |
| |
| // Start a thread that reads self.input and sends the input back via the returned channel. |
| // |
| // `in_avail_evt` will be triggered by the thread when new input is available. |
| fn spawn_input_thread(&mut self, in_avail_evt: &Event) -> Option<Receiver<Vec<u8>>> { |
| let mut rx = match self.input.take() { |
| Some(input) => input, |
| None => return None, |
| }; |
| |
| let (send_channel, recv_channel) = channel(); |
| |
| let thread_in_avail_evt = match in_avail_evt.try_clone() { |
| Ok(evt) => evt, |
| Err(e) => { |
| error!("failed to clone in_avail_evt: {}", e); |
| return None; |
| } |
| }; |
| |
| // The input thread runs in detached mode and will exit when channel is disconnected because |
| // the console device has been dropped. |
| let res = thread::Builder::new() |
| .name(format!("console_input")) |
| .spawn(move || { |
| loop { |
| let mut rx_buf = vec![0u8; 1 << 12]; |
| match rx.read(&mut rx_buf) { |
| Ok(0) => break, // Assume the stream of input has ended. |
| Ok(size) => { |
| rx_buf.truncate(size); |
| if send_channel.send(rx_buf).is_err() { |
| // The receiver has disconnected. |
| break; |
| } |
| thread_in_avail_evt.write(1).unwrap(); |
| } |
| Err(e) => { |
| // Being interrupted is not an error, but everything else is. |
| if e.kind() != io::ErrorKind::Interrupted { |
| error!( |
| "failed to read for bytes to queue into console device: {}", |
| e |
| ); |
| break; |
| } |
| } |
| } |
| } |
| }); |
| if let Err(e) = res { |
| error!("failed to spawn input thread: {}", e); |
| return None; |
| } |
| Some(recv_channel) |
| } |
| |
| // Check for input from `in_channel_opt` and transfer it to the receive queue, if any. |
| fn handle_input( |
| &mut self, |
| in_channel_opt: &mut Option<Receiver<Vec<u8>>>, |
| receive_queue: &mut Queue, |
| ) { |
| let in_channel = match in_channel_opt.as_ref() { |
| Some(v) => v, |
| None => return, |
| }; |
| |
| while let Some(desc) = receive_queue.peek(&self.mem) { |
| let desc_index = desc.index; |
| let mut writer = match Writer::new(self.mem.clone(), desc) { |
| Ok(w) => w, |
| Err(e) => { |
| error!("console: failed to create Writer: {}", e); |
| break; |
| } |
| }; |
| |
| let mut disconnected = false; |
| while writer.available_bytes() > 0 { |
| match in_channel.try_recv() { |
| Ok(data) => { |
| writer.write_all(&data).unwrap(); |
| } |
| Err(TryRecvError::Empty) => break, |
| Err(TryRecvError::Disconnected) => { |
| disconnected = true; |
| break; |
| } |
| } |
| } |
| |
| let bytes_written = writer.bytes_written() as u32; |
| |
| if bytes_written > 0 { |
| receive_queue.pop_peeked(&self.mem); |
| receive_queue.add_used(&self.mem, desc_index, bytes_written); |
| self.interrupt.signal_used_queue(receive_queue.vector); |
| } |
| |
| if disconnected { |
| // Set in_channel to None so that future handle_input calls exit early. |
| in_channel_opt.take(); |
| return; |
| } |
| |
| if bytes_written == 0 { |
| break; |
| } |
| } |
| } |
| |
| fn run(&mut self, mut queues: Vec<Queue>, mut queue_evts: Vec<Event>, kill_evt: Event) { |
| #[derive(PollToken)] |
| enum Token { |
| ReceiveQueueAvailable, |
| TransmitQueueAvailable, |
| InputAvailable, |
| InterruptResample, |
| Kill, |
| } |
| |
| // Device -> driver |
| let (mut receive_queue, receive_evt) = (queues.remove(0), queue_evts.remove(0)); |
| |
| // Driver -> device |
| let (mut transmit_queue, transmit_evt) = (queues.remove(0), queue_evts.remove(0)); |
| |
| let in_avail_evt = match Event::new() { |
| Ok(evt) => evt, |
| Err(e) => { |
| error!("failed creating Event: {}", e); |
| return; |
| } |
| }; |
| |
| // Spawn a separate thread to poll self.input. |
| // A thread is used because io::Read only provides a blocking interface, and there is no |
| // generic way to add an io::Read instance to a poll context (it may not be backed by a file |
| // descriptor). Moving the blocking read call to a separate thread and sending data back to |
| // the main worker thread with an event for notification bridges this gap. |
| let mut in_channel = self.spawn_input_thread(&in_avail_evt); |
| |
| let poll_ctx: PollContext<Token> = match PollContext::build_with(&[ |
| (&transmit_evt, Token::TransmitQueueAvailable), |
| (&receive_evt, Token::ReceiveQueueAvailable), |
| (&in_avail_evt, Token::InputAvailable), |
| (self.interrupt.get_resample_evt(), Token::InterruptResample), |
| (&kill_evt, Token::Kill), |
| ]) { |
| Ok(pc) => pc, |
| Err(e) => { |
| error!("failed creating PollContext: {}", e); |
| return; |
| } |
| }; |
| |
| let mut output: Box<dyn io::Write> = match self.output.take() { |
| Some(o) => o, |
| None => Box::new(io::sink()), |
| }; |
| |
| 'poll: loop { |
| let events = match poll_ctx.wait() { |
| Ok(v) => v, |
| Err(e) => { |
| error!("failed polling for events: {}", e); |
| break; |
| } |
| }; |
| |
| for event in events.iter_readable() { |
| match event.token() { |
| Token::TransmitQueueAvailable => { |
| if let Err(e) = transmit_evt.read() { |
| error!("failed reading transmit queue Event: {}", e); |
| break 'poll; |
| } |
| self.process_transmit_queue(&mut transmit_queue, &mut output); |
| } |
| Token::ReceiveQueueAvailable => { |
| if let Err(e) = receive_evt.read() { |
| error!("failed reading receive queue Event: {}", e); |
| break 'poll; |
| } |
| self.handle_input(&mut in_channel, &mut receive_queue); |
| } |
| Token::InputAvailable => { |
| if let Err(e) = in_avail_evt.read() { |
| error!("failed reading in_avail_evt: {}", e); |
| break 'poll; |
| } |
| self.handle_input(&mut in_channel, &mut receive_queue); |
| } |
| Token::InterruptResample => { |
| self.interrupt.interrupt_resample(); |
| } |
| Token::Kill => break 'poll, |
| } |
| } |
| } |
| } |
| } |
| |
| /// Virtio console device. |
| pub struct Console { |
| base_features: u64, |
| kill_evt: Option<Event>, |
| worker_thread: Option<thread::JoinHandle<Worker>>, |
| input: Option<Box<dyn io::Read + Send>>, |
| output: Option<Box<dyn io::Write + Send>>, |
| keep_fds: Vec<RawFd>, |
| } |
| |
| impl SerialDevice for Console { |
| fn new( |
| _evt: Event, |
| input: Option<Box<dyn io::Read + Send>>, |
| output: Option<Box<dyn io::Write + Send>>, |
| keep_fds: Vec<RawFd>, |
| ) -> Console { |
| Console { |
| base_features: base_features(), |
| kill_evt: None, |
| worker_thread: None, |
| input, |
| output, |
| keep_fds, |
| } |
| } |
| } |
| |
| impl Drop for Console { |
| fn drop(&mut self) { |
| if let Some(kill_evt) = self.kill_evt.take() { |
| // Ignore the result because there is nothing we can do about it. |
| let _ = kill_evt.write(1); |
| } |
| |
| if let Some(worker_thread) = self.worker_thread.take() { |
| let _ = worker_thread.join(); |
| } |
| } |
| } |
| |
| impl VirtioDevice for Console { |
| fn keep_fds(&self) -> Vec<RawFd> { |
| self.keep_fds.clone() |
| } |
| |
| fn features(&self) -> u64 { |
| self.base_features |
| } |
| |
| fn device_type(&self) -> u32 { |
| TYPE_CONSOLE |
| } |
| |
| fn queue_max_sizes(&self) -> &[u16] { |
| QUEUE_SIZES |
| } |
| |
| fn read_config(&self, offset: u64, data: &mut [u8]) { |
| let config = virtio_console_config { |
| max_nr_ports: 1.into(), |
| ..Default::default() |
| }; |
| copy_config(data, 0, config.as_slice(), offset); |
| } |
| |
| fn activate( |
| &mut self, |
| mem: GuestMemory, |
| interrupt: Interrupt, |
| queues: Vec<Queue>, |
| queue_evts: Vec<Event>, |
| ) { |
| if queues.len() < 2 || queue_evts.len() < 2 { |
| return; |
| } |
| |
| let (self_kill_evt, kill_evt) = match Event::new().and_then(|e| Ok((e.try_clone()?, e))) { |
| Ok(v) => v, |
| Err(e) => { |
| error!("failed creating kill Event pair: {}", e); |
| return; |
| } |
| }; |
| self.kill_evt = Some(self_kill_evt); |
| |
| let input = self.input.take(); |
| let output = self.output.take(); |
| |
| let worker_result = thread::Builder::new() |
| .name("virtio_console".to_string()) |
| .spawn(move || { |
| let mut worker = Worker { |
| mem, |
| interrupt, |
| input, |
| output, |
| }; |
| worker.run(queues, queue_evts, kill_evt); |
| worker |
| }); |
| |
| match worker_result { |
| Err(e) => { |
| error!("failed to spawn virtio_console worker: {}", e); |
| return; |
| } |
| Ok(join_handle) => { |
| self.worker_thread = Some(join_handle); |
| } |
| } |
| } |
| |
| fn reset(&mut self) -> bool { |
| if let Some(kill_evt) = self.kill_evt.take() { |
| if kill_evt.write(1).is_err() { |
| error!("{}: failed to notify the kill event", self.debug_label()); |
| return false; |
| } |
| } |
| |
| if let Some(worker_thread) = self.worker_thread.take() { |
| match worker_thread.join() { |
| Err(_) => { |
| error!("{}: failed to get back resources", self.debug_label()); |
| return false; |
| } |
| Ok(worker) => { |
| self.input = worker.input; |
| self.output = worker.output; |
| return true; |
| } |
| } |
| } |
| false |
| } |
| } |