From 8352f4d91224939a431f2abac122174c37b3dd3f Mon Sep 17 00:00:00 2001 From: Matej Hrica Date: Mon, 12 Jan 2026 11:47:46 +0100 Subject: [PATCH 1/3] devices: Refactor to make virtio queues managed by the transport layer (mmio) Refactor the MmioTransport to fully construct the virtqueues on behalf of the device. Signed-off-by: Matej Hrica --- src/devices/src/virtio/balloon/device.rs | 61 +++--- .../src/virtio/balloon/event_handler.rs | 59 +++--- src/devices/src/virtio/balloon/mod.rs | 5 +- src/devices/src/virtio/block/device.rs | 41 ++-- src/devices/src/virtio/block/mod.rs | 6 +- src/devices/src/virtio/block/worker.rs | 30 +-- src/devices/src/virtio/console/device.rs | 99 +++++---- src/devices/src/virtio/device.rs | 58 ++++-- src/devices/src/virtio/fs/device.rs | 66 ++---- src/devices/src/virtio/fs/mod.rs | 5 +- src/devices/src/virtio/fs/worker.rs | 4 +- src/devices/src/virtio/gpu/device.rs | 99 +++------ src/devices/src/virtio/gpu/event_handler.rs | 37 ++-- src/devices/src/virtio/gpu/mod.rs | 1 - src/devices/src/virtio/input/device.rs | 66 ++---- src/devices/src/virtio/input/mod.rs | 10 +- src/devices/src/virtio/input/worker.rs | 41 ++-- src/devices/src/virtio/mmio.rs | 194 +++++++++++------- src/devices/src/virtio/mod.rs | 4 +- src/devices/src/virtio/net/device.rs | 58 ++---- src/devices/src/virtio/net/mod.rs | 10 +- src/devices/src/virtio/net/worker.rs | 49 ++--- src/devices/src/virtio/queue.rs | 2 +- src/devices/src/virtio/rng/device.rs | 68 +++--- src/devices/src/virtio/rng/event_handler.rs | 11 +- src/devices/src/virtio/rng/mod.rs | 5 +- src/devices/src/virtio/snd/device.rs | 55 ++--- src/devices/src/virtio/snd/mod.rs | 5 +- src/devices/src/virtio/snd/worker.rs | 20 +- src/devices/src/virtio/vsock/device.rs | 103 +++++----- src/devices/src/virtio/vsock/mod.rs | 7 +- src/vmm/src/device_manager/hvf/mmio.rs | 91 +++----- src/vmm/src/device_manager/kvm/mmio.rs | 54 ++--- 33 files changed, 647 insertions(+), 777 deletions(-) diff --git a/src/devices/src/virtio/balloon/device.rs b/src/devices/src/virtio/balloon/device.rs index 345c23c5d..9b34da82b 100644 --- a/src/devices/src/virtio/balloon/device.rs +++ b/src/devices/src/virtio/balloon/device.rs @@ -6,7 +6,8 @@ use utils::eventfd::EventFd; use vm_memory::{ByteValued, GuestMemory, GuestMemoryMmap}; use super::super::{ - ActivateError, ActivateResult, BalloonError, DeviceState, Queue as VirtQueue, VirtioDevice, + ActivateError, ActivateResult, BalloonError, DeviceQueue, DeviceState, QueueConfig, + VirtioDevice, }; use super::{defs, defs::uapi}; use crate::virtio::InterruptTransport; @@ -45,8 +46,7 @@ pub struct VirtioBalloonConfig { unsafe impl ByteValued for VirtioBalloonConfig {} pub struct Balloon { - pub(crate) queues: Vec, - pub(crate) queue_events: Vec, + pub(crate) queues: Option>, pub(crate) avail_features: u64, pub(crate) acked_features: u64, pub(crate) activate_evt: EventFd, @@ -55,35 +55,18 @@ pub struct Balloon { } impl Balloon { - pub(crate) fn with_queues(queues: Vec) -> super::Result { - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(BalloonError::EventFd)?); - } - - let config = VirtioBalloonConfig::default(); - + pub fn new() -> super::Result { Ok(Balloon { - queues, - queue_events, + queues: None, avail_features: AVAIL_FEATURES, acked_features: 0, activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK) .map_err(BalloonError::EventFd)?, device_state: DeviceState::Inactive, - config, + config: VirtioBalloonConfig::default(), }) } - pub 
fn new() -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues(queues) - } - pub fn id(&self) -> &str { defs::BALLOON_DEV_ID } @@ -96,9 +79,13 @@ impl Balloon { DeviceState::Inactive => unreachable!(), }; + let queues = self + .queues + .as_mut() + .expect("queues should exist when activated"); let mut have_used = false; - while let Some(head) = self.queues[FRQ_INDEX].pop(mem) { + while let Some(head) = queues[FRQ_INDEX].queue.pop(mem) { let index = head.index; for desc in head.into_iter() { let host_addr = mem.get_host_address(desc.addr).unwrap(); @@ -116,7 +103,7 @@ impl Balloon { } have_used = true; - if let Err(e) = self.queues[FRQ_INDEX].add_used(mem, index, 0) { + if let Err(e) = queues[FRQ_INDEX].queue.add_used(mem, index, 0) { error!("failed to add used elements to the queue: {e:?}"); } } @@ -146,16 +133,8 @@ impl VirtioDevice for Balloon { "balloon" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &defs::QUEUE_CONFIG } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -180,12 +159,17 @@ impl VirtioDevice for Balloon { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { - if self.queues.len() != defs::NUM_QUEUES { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { + if queues.len() != defs::NUM_QUEUES { error!( "Cannot perform activate. Expected {} queue(s), got {}", defs::NUM_QUEUES, - self.queues.len() + queues.len() ); return Err(ActivateError::BadActivate); } @@ -195,6 +179,7 @@ impl VirtioDevice for Balloon { return Err(ActivateError::BadActivate); } + self.queues = Some(queues); self.device_state = DeviceState::Activated(mem, interrupt); Ok(()) diff --git a/src/devices/src/virtio/balloon/event_handler.rs b/src/devices/src/virtio/balloon/event_handler.rs index 3ac23ff4e..2558d6e14 100644 --- a/src/devices/src/virtio/balloon/event_handler.rs +++ b/src/devices/src/virtio/balloon/event_handler.rs @@ -7,6 +7,10 @@ use super::device::{Balloon, DFQ_INDEX, FRQ_INDEX, IFQ_INDEX, PHQ_INDEX, STQ_IND use crate::virtio::device::VirtioDevice; impl Balloon { + fn queue_event(&self, idx: usize) -> &std::sync::Arc { + &self.queues.as_ref().expect("queues should exist")[idx].event + } + pub(crate) fn handle_ifq_event(&mut self, event: &EpollEvent) { error!("balloon: unsupported inflate queue event"); @@ -16,7 +20,7 @@ impl Balloon { return; } - if let Err(e) = self.queue_events[IFQ_INDEX].read() { + if let Err(e) = self.queue_event(IFQ_INDEX).read() { error!("Failed to read balloon inflate queue event: {e:?}"); } } @@ -30,7 +34,7 @@ impl Balloon { return; } - if let Err(e) = self.queue_events[DFQ_INDEX].read() { + if let Err(e) = self.queue_event(DFQ_INDEX).read() { error!("Failed to read balloon inflate queue event: {e:?}"); } } @@ -44,7 +48,7 @@ impl Balloon { return; } - if let Err(e) = self.queue_events[STQ_INDEX].read() { + if let Err(e) = self.queue_event(STQ_INDEX).read() { error!("Failed to read balloon stats queue event: {e:?}"); } } @@ -58,7 +62,7 @@ impl Balloon { return; } - if let Err(e) = self.queue_events[PHQ_INDEX].read() { + if let Err(e) = self.queue_event(PHQ_INDEX).read() { error!("Failed to read balloon page-hinting queue event: {e:?}"); } } @@ -72,7 +76,7 @@ 
impl Balloon { return; } - if let Err(e) = self.queue_events[FRQ_INDEX].read() { + if let Err(e) = self.queue_event(FRQ_INDEX).read() { error!("Failed to read balloon free-page reporting queue event: {e:?}"); } else if self.process_frq() { self.device_state.signal_used_queue(); @@ -93,11 +97,8 @@ impl Balloon { event_manager .register( - self.queue_events[IFQ_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[IFQ_INDEX].as_raw_fd() as u64, - ), + self.queue_event(IFQ_INDEX).as_raw_fd(), + EpollEvent::new(EventSet::IN, self.queue_event(IFQ_INDEX).as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -106,11 +107,8 @@ impl Balloon { event_manager .register( - self.queue_events[DFQ_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[DFQ_INDEX].as_raw_fd() as u64, - ), + self.queue_event(DFQ_INDEX).as_raw_fd(), + EpollEvent::new(EventSet::IN, self.queue_event(DFQ_INDEX).as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -119,11 +117,8 @@ impl Balloon { event_manager .register( - self.queue_events[STQ_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[STQ_INDEX].as_raw_fd() as u64, - ), + self.queue_event(STQ_INDEX).as_raw_fd(), + EpollEvent::new(EventSet::IN, self.queue_event(STQ_INDEX).as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -132,11 +127,8 @@ impl Balloon { event_manager .register( - self.queue_events[PHQ_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[PHQ_INDEX].as_raw_fd() as u64, - ), + self.queue_event(PHQ_INDEX).as_raw_fd(), + EpollEvent::new(EventSet::IN, self.queue_event(PHQ_INDEX).as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -145,11 +137,8 @@ impl Balloon { event_manager .register( - self.queue_events[FRQ_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[FRQ_INDEX].as_raw_fd() as u64, - ), + self.queue_event(FRQ_INDEX).as_raw_fd(), + EpollEvent::new(EventSet::IN, self.queue_event(FRQ_INDEX).as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -167,11 +156,11 @@ impl Balloon { impl Subscriber for Balloon { fn process(&mut self, event: &EpollEvent, event_manager: &mut EventManager) { let source = event.fd(); - let ifq = self.queue_events[IFQ_INDEX].as_raw_fd(); - let dfq = self.queue_events[DFQ_INDEX].as_raw_fd(); - let stq = self.queue_events[STQ_INDEX].as_raw_fd(); - let phq = self.queue_events[PHQ_INDEX].as_raw_fd(); - let frq = self.queue_events[FRQ_INDEX].as_raw_fd(); + let ifq = self.queue_event(IFQ_INDEX).as_raw_fd(); + let dfq = self.queue_event(DFQ_INDEX).as_raw_fd(); + let stq = self.queue_event(STQ_INDEX).as_raw_fd(); + let phq = self.queue_event(PHQ_INDEX).as_raw_fd(); + let frq = self.queue_event(FRQ_INDEX).as_raw_fd(); let activate_evt = self.activate_evt.as_raw_fd(); if self.is_activated() { diff --git a/src/devices/src/virtio/balloon/mod.rs b/src/devices/src/virtio/balloon/mod.rs index e99880906..6859c79b8 100644 --- a/src/devices/src/virtio/balloon/mod.rs +++ b/src/devices/src/virtio/balloon/mod.rs @@ -5,9 +5,12 @@ pub use self::defs::uapi::VIRTIO_ID_BALLOON as TYPE_BALLOON; pub use self::device::Balloon; mod defs { + use super::super::QueueConfig; + pub const BALLOON_DEV_ID: &str = "virtio_balloon"; pub const NUM_QUEUES: usize = 5; - pub const QUEUE_SIZES: &[u16] = &[256; NUM_QUEUES]; + pub const QUEUE_SIZE: u16 = 256; + pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; pub mod uapi 
{ pub const VIRTIO_F_VERSION_1: u32 = 32; diff --git a/src/devices/src/virtio/block/device.rs b/src/devices/src/virtio/block/device.rs index a7ac26fbc..14c95ac3c 100644 --- a/src/devices/src/virtio/block/device.rs +++ b/src/devices/src/virtio/block/device.rs @@ -31,8 +31,8 @@ use vm_memory::{ByteValued, GuestMemoryMmap}; use super::worker::BlockWorker; use super::{ - super::{ActivateResult, DeviceState, Queue, VirtioDevice, TYPE_BLOCK}, - Error, QUEUE_SIZES, SECTOR_SHIFT, SECTOR_SIZE, + super::{ActivateResult, DeviceQueue, DeviceState, QueueConfig, VirtioDevice, TYPE_BLOCK}, + Error, NUM_QUEUES, QUEUE_CONFIG, SECTOR_SHIFT, SECTOR_SIZE, }; use crate::virtio::{ @@ -216,8 +216,6 @@ pub struct Block { config: VirtioBlkConfig, // Transport related fields. - pub(crate) queues: Vec, - pub(crate) queue_evts: [EventFd; 1], pub(crate) device_state: DeviceState, // Implementation specific fields. @@ -302,10 +300,6 @@ impl Block { avail_features |= 1u64 << VIRTIO_BLK_F_RO; }; - let queue_evts = [EventFd::new(EFD_NONBLOCK)?]; - - let queues = QUEUE_SIZES.iter().map(|&s| Queue::new(s)).collect(); - let config = VirtioBlkConfig { capacity: disk_properties.nsectors(), size_max: 0, @@ -330,8 +324,6 @@ impl Block { disk_image_id, avail_features, acked_features: 0u64, - queue_evts, - queues, device_state: DeviceState::Inactive, worker_thread: None, worker_stopfd: EventFd::new(EFD_NONBLOCK)?, @@ -363,16 +355,8 @@ impl VirtioDevice for Block { "block" } - fn queues(&self) -> &[Queue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [Queue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_evts + fn queue_config(&self) -> &[QueueConfig] { + &QUEUE_CONFIG } fn avail_features(&self) -> u64 { @@ -409,13 +393,23 @@ impl VirtioDevice for Block { self.device_state.is_activated() } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { if self.worker_thread.is_some() { panic!("virtio_blk: worker thread already exists"); } + let [mut blk_q]: [_; NUM_QUEUES] = queues.try_into().map_err(|_| { + error!("Cannot perform activate. 
Expected {} queue(s)", NUM_QUEUES); + ActivateError::BadActivate + })?; + let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - self.queues[0].set_event_idx(event_idx); + blk_q.queue.set_event_idx(event_idx); let disk = match self.disk.take() { Some(d) => d, @@ -428,8 +422,7 @@ impl VirtioDevice for Block { }; let worker = BlockWorker::new( - self.queues[0].clone(), - self.queue_evts[0].try_clone().unwrap(), + blk_q, interrupt.clone(), mem.clone(), disk, diff --git a/src/devices/src/virtio/block/mod.rs b/src/devices/src/virtio/block/mod.rs index 69127e858..c842bf271 100644 --- a/src/devices/src/virtio/block/mod.rs +++ b/src/devices/src/virtio/block/mod.rs @@ -8,12 +8,14 @@ pub use self::device::{Block, CacheType}; use vm_memory::GuestMemoryError; +use super::QueueConfig; + pub const CONFIG_SPACE_SIZE: usize = 8; pub const SECTOR_SHIFT: u8 = 9; pub const SECTOR_SIZE: u64 = (0x01_u64) << SECTOR_SHIFT; -pub const QUEUE_SIZE: u16 = 256; +const QUEUE_SIZE: u16 = 256; pub const NUM_QUEUES: usize = 1; -pub const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE]; +pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE)]; #[derive(Debug)] pub enum Error { diff --git a/src/devices/src/virtio/block/worker.rs b/src/devices/src/virtio/block/worker.rs index 195a7d22b..ad91fc918 100644 --- a/src/devices/src/virtio/block/worker.rs +++ b/src/devices/src/virtio/block/worker.rs @@ -1,6 +1,6 @@ use crate::virtio::descriptor_utils::{Reader, Writer}; -use super::super::Queue; +use super::super::DeviceQueue; use super::device::{CacheType, DiskProperties}; use crate::virtio::InterruptTransport; @@ -56,8 +56,7 @@ pub struct DiscardWriteData { unsafe impl ByteValued for DiscardWriteData {} pub struct BlockWorker { - queue: Queue, - queue_evt: EventFd, + device_queue: DeviceQueue, interrupt: InterruptTransport, mem: GuestMemoryMmap, disk: DiskProperties, @@ -65,18 +64,15 @@ pub struct BlockWorker { } impl BlockWorker { - #[allow(clippy::too_many_arguments)] pub fn new( - queue: Queue, - queue_evt: EventFd, + device_queue: DeviceQueue, interrupt: InterruptTransport, mem: GuestMemoryMmap, disk: DiskProperties, stop_fd: EventFd, ) -> Self { Self { - queue, - queue_evt, + device_queue, interrupt, mem, disk, @@ -92,7 +88,7 @@ impl BlockWorker { } fn work(mut self) { - let virtq_ev_fd = self.queue_evt.as_raw_fd(); + let virtq_ev_fd = self.device_queue.event.as_raw_fd(); let stop_ev_fd = self.stop_fd.as_raw_fd(); let epoll = Epoll::new().unwrap(); @@ -141,7 +137,7 @@ impl BlockWorker { } fn process_queue_event(&mut self) { - if let Err(e) = self.queue_evt.read() { + if let Err(e) = self.device_queue.event.read() { error!("Failed to get queue event: {e:?}"); } else { self.process_virtio_queues(); @@ -152,18 +148,18 @@ impl BlockWorker { fn process_virtio_queues(&mut self) { let mem = self.mem.clone(); loop { - self.queue.disable_notification(&mem).unwrap(); + self.device_queue.queue.disable_notification(&mem).unwrap(); self.process_queue(&mem); - if !self.queue.enable_notification(&mem).unwrap() { + if !self.device_queue.queue.enable_notification(&mem).unwrap() { break; } } } fn process_queue(&mut self, mem: &GuestMemoryMmap) { - while let Some(head) = self.queue.pop(mem) { + while let Some(head) = self.device_queue.queue.pop(mem) { let mut reader = match Reader::new(mem, head.clone()) { Ok(r) => r, Err(e) => { @@ -199,11 +195,15 @@ impl BlockWorker { error!("Failed to write virtio block status: {e:?}") } - if let Err(e) = self.queue.add_used(mem, head.index, len as u32) { + if 
let Err(e) = self + .device_queue + .queue + .add_used(mem, head.index, len as u32) + { error!("failed to add used elements to the queue: {e:?}"); } - if self.queue.needs_notification(mem).unwrap() { + if self.device_queue.queue.needs_notification(mem).unwrap() { if let Err(e) = self.interrupt.try_signal_used_queue() { error!("error signalling queue: {e:?}"); } diff --git a/src/devices/src/virtio/console/device.rs b/src/devices/src/virtio/console/device.rs index 2e4896e9c..b1022a38e 100644 --- a/src/devices/src/virtio/console/device.rs +++ b/src/devices/src/virtio/console/device.rs @@ -9,7 +9,7 @@ use utils::eventfd::EventFd; use vm_memory::{ByteValued, Bytes, GuestMemoryMmap}; use super::super::{ - ActivateError, ActivateResult, ConsoleError, DeviceState, Queue as VirtQueue, VirtioDevice, + ActivateError, ActivateResult, DeviceQueue, DeviceState, QueueConfig, VirtioDevice, }; use super::{defs, defs::control_event, defs::uapi}; use crate::virtio::console::console_control::{ @@ -57,8 +57,11 @@ pub struct Console { pub(crate) control: Arc, pub(crate) ports: Vec, - pub(crate) queues: Vec, - pub(crate) queue_events: Vec, + queue_config: Vec, + // Queues are stored as Option so individual queues can be taken when ports start. + pub(crate) queues: Vec>, + // TODO: move the queue event handling to the correct threads! + pub(crate) queue_events: Vec>, pub(crate) avail_features: u64, pub(crate) acked_features: u64, @@ -74,13 +77,9 @@ impl Console { assert!(!ports.is_empty(), "Expected at least 1 port"); let num_queues = num_queues(ports.len()); - let queues = vec![VirtQueue::new(QUEUE_SIZE); num_queues]; - - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(ConsoleError::EventFd)?); - } + let queue_config: Vec = (0..num_queues) + .map(|_| QueueConfig::new(QUEUE_SIZE)) + .collect(); let ports: Vec = zip(0u32.., ports) .map(|(port_id, description)| Port::new(port_id, description)) @@ -95,14 +94,15 @@ impl Console { Ok(Console { control: ConsoleControl::new(), ports, - queues, - queue_events, + queue_config, + queues: Vec::new(), + queue_events: Vec::new(), avail_features: AVAIL_FEATURES, acked_features: 0, activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK) - .map_err(ConsoleError::EventFd)?, + .map_err(super::ConsoleError::EventFd)?, sigwinch_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK) - .map_err(ConsoleError::EventFd)?, + .map_err(super::ConsoleError::EventFd)?, device_state: DeviceState::Inactive, config, }) @@ -129,7 +129,11 @@ impl Console { }; let mut raise_irq = false; - while let Some(head) = self.queues[CONTROL_RXQ_INDEX].pop(mem) { + let control_rx = self.queues[CONTROL_RXQ_INDEX] + .as_mut() + .expect("control rx queue should exist"); + + while let Some(head) = control_rx.queue.pop(mem) { if let Some(buf) = self.control.queue_pop() { match mem.write(&buf, head.addr) { Ok(n) => { @@ -138,9 +142,7 @@ impl Console { } raise_irq = true; log::trace!("process_control_rx wrote {n}"); - if let Err(e) = - self.queues[CONTROL_RXQ_INDEX].add_used(mem, head.index, n as u32) - { + if let Err(e) = control_rx.queue.add_used(mem, head.index, n as u32) { error!("failed to add used elements to the queue: {e:?}"); } } @@ -149,7 +151,7 @@ impl Console { } } } else { - self.queues[CONTROL_RXQ_INDEX].undo_pop(); + control_rx.queue.undo_pop(); break; } } @@ -162,12 +164,14 @@ impl Console { unreachable!() }; - let tx_queue = &mut self.queues[CONTROL_TXQ_INDEX]; + let control_tx = self.queues[CONTROL_TXQ_INDEX] + 
.as_mut() + .expect("control tx queue should exist"); let mut raise_irq = false; let mut ports_to_start = Vec::new(); - while let Some(head) = tx_queue.pop(mem) { + while let Some(head) = control_tx.queue.pop(mem) { raise_irq = true; let cmd: VirtioConsoleControl = match mem.read_obj(head.addr) { @@ -181,7 +185,10 @@ impl Console { continue; } }; - if let Err(e) = tx_queue.add_used(mem, head.index, size_of_val(&cmd) as u32) { + if let Err(e) = control_tx + .queue + .add_used(mem, head.index, size_of_val(&cmd) as u32) + { error!("failed to add used elements to the queue: {e:?}"); } @@ -247,10 +254,23 @@ impl Console { for port_id in ports_to_start { log::trace!("Starting port io for port {port_id}"); + let rx_idx = port_id_to_queue_idx(QueueDirection::Rx, port_id); + let tx_idx = port_id_to_queue_idx(QueueDirection::Tx, port_id); + + // Take ownership of port queues - they are moved to the port. + let rx_queue = self.queues[rx_idx] + .take() + .expect("port rx queue should exist") + .queue; + let tx_queue = self.queues[tx_idx] + .take() + .expect("port tx queue should exist") + .queue; + self.ports[port_id].start( mem.clone(), - self.queues[port_id_to_queue_idx(QueueDirection::Rx, port_id)].clone(), - self.queues[port_id_to_queue_idx(QueueDirection::Tx, port_id)].clone(), + rx_queue, + tx_queue, interrupt.clone(), self.control.clone(), ); @@ -281,16 +301,8 @@ impl VirtioDevice for Console { "console" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &self.queue_config } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -315,11 +327,19 @@ impl VirtioDevice for Console { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { if self.activate_evt.write(1).is_err() { error!("Cannot write to activate_evt"); return Err(ActivateError::BadActivate); } + + self.queue_events = queues.iter().map(|dq| dq.event.clone()).collect(); + self.queues = queues.into_iter().map(Some).collect(); self.device_state = DeviceState::Activated(mem, interrupt); Ok(()) @@ -330,14 +350,13 @@ impl VirtioDevice for Console { } fn reset(&mut self) -> bool { - // Strictly speaking, we should also unsubscribe the queue - // events, resubscribe the activate eventfd and deactivate - // the device, but we don't support any scenario in which - // neither GuestMemory nor the queue events would change, - // so let's avoid doing any unnecessary work. + // Shutdown ports and clear queues. for port in &mut self.ports { port.shutdown(); } + self.queues.clear(); + self.queue_events.clear(); + self.device_state = DeviceState::Inactive; true } } diff --git a/src/devices/src/virtio/device.rs b/src/devices/src/virtio/device.rs index 355b62a95..bb2c669d6 100644 --- a/src/devices/src/virtio/device.rs +++ b/src/devices/src/virtio/device.rs @@ -5,11 +5,41 @@ // Use of this source code is governed by a BSD-style license that can be // found in the THIRD-PARTY file. +use std::sync::Arc; + use super::{ActivateResult, InterruptTransport, Queue}; use crate::virtio::AsAny; use utils::eventfd::EventFd; use vm_memory::GuestMemoryMmap; +/// Configuration for a single virtqueue. 
+/// This is used by devices to declare their queue requirements, +/// and by the transport to construct the actual queues. +#[derive(Clone, Copy, Debug)] +pub struct QueueConfig { + /// Maximum size of the queue. + pub size: u16, +} + +impl QueueConfig { + pub const fn new(size: u16) -> Self { + Self { size } + } +} + +/// A virtqueue combined with its notification eventfd. +/// This is passed to devices during activation. +pub struct DeviceQueue { + pub queue: Queue, + pub event: Arc<EventFd>, +} + +impl DeviceQueue { + pub fn new(queue: Queue, event: Arc<EventFd>) -> Self { + Self { queue, event } + } +} + /// Enum that indicates if a VirtioDevice is inactive or has been activated /// and memory attached to it. pub enum DeviceState { @@ -44,8 +74,9 @@ pub struct VirtioShmRegion { /// Trait for virtio devices to be driven by a virtio transport. /// /// The lifecycle of a virtio device is to be moved to a virtio transport, which will then query the -/// device. The virtio devices needs to create queues, events and event fds for interrupts and expose -/// them to the transport via get_queues/get_queue_events +/// device. The transport constructs queues based on queue_config() and passes them to the device +/// during activation, transferring ownership. After reset, the transport recreates queues +/// from queue_config() for the next negotiation cycle. pub trait VirtioDevice: AsAny + Send { /// Get the available features offered by device. fn avail_features(&self) -> u64; @@ -64,14 +95,9 @@ pub trait VirtioDevice: AsAny + Send { /// Device name used for logging information about the device at the transport layer fn device_name(&self) -> &str; - /// Returns the device queues. - fn queues(&self) -> &[Queue]; - - /// Returns a mutable reference to the device queues. - fn queues_mut(&mut self) -> &mut [Queue]; - - /// Returns the device queues event fds. - fn queue_events(&self) -> &[EventFd]; + /// Returns the queue configuration for this device. + /// The transport uses this to construct the queues during initialization and after reset. + fn queue_config(&self) -> &[QueueConfig]; /// The set of feature bits shifted by `page * 32`. fn avail_features_by_page(&self, page: u32) -> u32 { @@ -117,13 +143,19 @@ pub trait VirtioDevice: AsAny + Send { fn write_config(&mut self, offset: u64, data: &[u8]); /// Performs the formal activation for a device, which can be verified also with `is_activated`. + /// Ownership of the queues is transferred to the device. - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult; + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec<DeviceQueue>, + ) -> ActivateResult; /// Checks if the resources of this device are activated. fn is_activated(&self) -> bool; - /// Optionally deactivates this device and returns ownership of the guest memory map, interrupt - /// event, and queue events. + /// Optionally deactivates this device. The device should drop its queues. + /// After reset, the transport will recreate queues from queue_config(). 
fn reset(&mut self) -> bool { false } diff --git a/src/devices/src/virtio/fs/device.rs b/src/devices/src/virtio/fs/device.rs index 49fac4548..07a766245 100644 --- a/src/devices/src/virtio/fs/device.rs +++ b/src/devices/src/virtio/fs/device.rs @@ -13,7 +13,7 @@ use virtio_bindings::{virtio_config::VIRTIO_F_VERSION_1, virtio_ring::VIRTIO_RIN use vm_memory::{ByteValued, GuestMemoryMmap}; use super::super::{ - ActivateResult, DeviceState, FsError, Queue as VirtQueue, VirtioDevice, VirtioShmRegion, + ActivateResult, DeviceQueue, DeviceState, FsError, QueueConfig, VirtioDevice, VirtioShmRegion, }; use super::passthrough; use super::worker::FsWorker; @@ -40,8 +40,6 @@ impl Default for VirtioFsConfig { unsafe impl ByteValued for VirtioFsConfig {} pub struct Fs { - queues: Vec, - queue_events: Vec, avail_features: u64, acked_features: u64, device_state: DeviceState, @@ -56,18 +54,7 @@ pub struct Fs { } impl Fs { - pub(crate) fn with_queues( - fs_id: String, - shared_dir: String, - exit_code: Arc, - queues: Vec, - ) -> super::Result { - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(FsError::EventFd)?); - } - + pub fn new(fs_id: String, shared_dir: String, exit_code: Arc) -> super::Result { let avail_features = (1u64 << VIRTIO_F_VERSION_1) | (1u64 << VIRTIO_RING_F_EVENT_IDX); let tag = fs_id.into_bytes(); @@ -81,8 +68,6 @@ impl Fs { }; Ok(Fs { - queues, - queue_events, avail_features, acked_features: 0, device_state: DeviceState::Inactive, @@ -97,14 +82,6 @@ impl Fs { }) } - pub fn new(fs_id: String, shared_dir: String, exit_code: Arc) -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues(fs_id, shared_dir, exit_code, queues) - } - pub fn id(&self) -> &str { defs::FS_DEV_ID } @@ -149,16 +126,8 @@ impl VirtioDevice for Fs { "fs" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &defs::QUEUE_CONFIG } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -183,22 +152,29 @@ impl VirtioDevice for Fs { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { if self.worker_thread.is_some() { panic!("virtio_fs: worker thread already exists"); } let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - self.queues[defs::HPQ_INDEX].set_event_idx(event_idx); - self.queues[defs::REQ_INDEX].set_event_idx(event_idx); - - let queue_evts = self - .queue_events - .iter() - .map(|e| e.try_clone().unwrap()) - .collect(); + + // Extract queues and eventfds from DeviceQueues. 
+ let mut worker_queues = Vec::with_capacity(queues.len()); + let mut queue_evts = Vec::with_capacity(queues.len()); + for mut dq in queues { + dq.queue.set_event_idx(event_idx); + worker_queues.push(dq.queue); + queue_evts.push(dq.event); + } + let worker = FsWorker::new( - self.queues.clone(), + worker_queues, queue_evts, interrupt.clone(), mem.clone(), diff --git a/src/devices/src/virtio/fs/mod.rs b/src/devices/src/virtio/fs/mod.rs index ea475a5c1..dfd0be651 100644 --- a/src/devices/src/virtio/fs/mod.rs +++ b/src/devices/src/virtio/fs/mod.rs @@ -28,9 +28,12 @@ pub use self::device::Fs; pub use self::filesystem::ExportTable; mod defs { + use super::super::QueueConfig; + pub const FS_DEV_ID: &str = "virtio_fs"; pub const NUM_QUEUES: usize = 2; - pub const QUEUE_SIZES: &[u16] = &[1024; NUM_QUEUES]; + pub const QUEUE_SIZE: u16 = 1024; + pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; // High priority queue. pub const HPQ_INDEX: usize = 0; // Request queue. diff --git a/src/devices/src/virtio/fs/worker.rs b/src/devices/src/virtio/fs/worker.rs index 8ae8eb6c4..a5d2d377f 100644 --- a/src/devices/src/virtio/fs/worker.rs +++ b/src/devices/src/virtio/fs/worker.rs @@ -21,7 +21,7 @@ use crate::virtio::{InterruptTransport, VirtioShmRegion}; pub struct FsWorker { queues: Vec, - queue_evts: Vec, + queue_evts: Vec>, interrupt: InterruptTransport, mem: GuestMemoryMmap, shm_region: Option, @@ -36,7 +36,7 @@ impl FsWorker { #[allow(clippy::too_many_arguments)] pub fn new( queues: Vec, - queue_evts: Vec, + queue_evts: Vec>, interrupt: InterruptTransport, mem: GuestMemoryMmap, shm_region: Option, diff --git a/src/devices/src/virtio/gpu/device.rs b/src/devices/src/virtio/gpu/device.rs index 272f8a369..097197139 100644 --- a/src/devices/src/virtio/gpu/device.rs +++ b/src/devices/src/virtio/gpu/device.rs @@ -6,8 +6,8 @@ use utils::eventfd::EventFd; use vm_memory::{ByteValued, GuestMemoryMmap}; use super::super::{ - fs::ExportTable, ActivateError, ActivateResult, DeviceState, GpuError, Queue as VirtQueue, - VirtioDevice, VirtioShmRegion, + fs::ExportTable, ActivateError, ActivateResult, DeviceQueue, DeviceState, GpuError, + Queue as VirtQueue, QueueConfig, VirtioDevice, VirtioShmRegion, }; use super::defs; use super::defs::uapi; @@ -19,11 +19,6 @@ use krun_display::DisplayBackend; #[cfg(target_os = "macos")] use utils::worker_message::WorkerMessage; -// Control queue. -pub(crate) const CTL_INDEX: usize = 0; -// Cursor queue. -pub(crate) const CUR_INDEX: usize = 1; - // Supported features. 
pub(crate) const AVAIL_FEATURES: u64 = (1u64 << uapi::VIRTIO_F_VERSION_1) | (1u64 << uapi::VIRTIO_GPU_F_VIRGL) @@ -32,11 +27,15 @@ pub(crate) const AVAIL_FEATURES: u64 = (1u64 << uapi::VIRTIO_F_VERSION_1) | (1u64 << uapi::VIRTIO_GPU_F_RESOURCE_BLOB) | (1u64 << uapi::VIRTIO_GPU_F_CONTEXT_INIT); +const QUEUE_SIZE: u16 = 256; +static QUEUE_CONFIG: [QueueConfig; defs::NUM_QUEUES] = + [QueueConfig::new(QUEUE_SIZE); defs::NUM_QUEUES]; + pub struct Gpu { - pub(crate) queue_ctl: Arc>, - pub(crate) queue_cur: Arc>, - pub(crate) queues: Vec, - pub(crate) queue_events: Vec, + pub(crate) queue_ctl: Option>>, + // Queue events stored for event handler - these are shared with DeviceQueue + pub(crate) ctl_event: Option>, + pub(crate) cur_event: Option>, pub(crate) avail_features: u64, pub(crate) acked_features: u64, pub(crate) activate_evt: EventFd, @@ -52,27 +51,16 @@ pub struct Gpu { } impl Gpu { - pub(crate) fn with_queues( - queues: Vec, + pub fn new( virgl_flags: u32, displays: Box<[DisplayInfo]>, display_backend: DisplayBackend<'static>, #[cfg(target_os = "macos")] map_sender: Sender, ) -> super::Result { - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(GpuError::EventFd)?); - } - - let queue_ctl = Arc::new(Mutex::new(queues[CTL_INDEX].clone())); - let queue_cur = Arc::new(Mutex::new(queues[CUR_INDEX].clone())); - Ok(Gpu { - queue_ctl, - queue_cur, - queues, - queue_events, + queue_ctl: None, + ctl_event: None, + cur_event: None, avail_features: AVAIL_FEATURES, acked_features: 0, activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(GpuError::EventFd)?, @@ -88,26 +76,6 @@ impl Gpu { }) } - pub fn new( - virgl_flags: u32, - displays: Box<[DisplayInfo]>, - display_backend: DisplayBackend<'static>, - #[cfg(target_os = "macos")] map_sender: Sender, - ) -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues( - queues, - virgl_flags, - displays, - display_backend, - #[cfg(target_os = "macos")] - map_sender, - ) - } - pub fn id(&self) -> &str { defs::GPU_DEV_ID } @@ -198,16 +166,8 @@ impl VirtioDevice for Gpu { "gpu" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &QUEUE_CONFIG } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -239,29 +199,36 @@ impl VirtioDevice for Gpu { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { - if self.queues.len() != defs::NUM_QUEUES { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { + let [control_q, cursor_q]: [_; defs::NUM_QUEUES] = queues.try_into().map_err(|_| { error!( - "Cannot perform activate. Expected {} queue(s), got {}", - defs::NUM_QUEUES, - self.queues.len() + "Cannot perform activate. 
Expected {} queue(s)", + defs::NUM_QUEUES ); - return Err(ActivateError::BadActivate); - } + ActivateError::BadActivate + })?; let shm_region = match self.shm_region.as_ref() { Some(s) => s.clone(), None => panic!("virtio_gpu: missing SHM region"), }; - self.queue_ctl = Arc::new(Mutex::new(self.queues[CTL_INDEX].clone())); - self.queue_cur = Arc::new(Mutex::new(self.queues[CUR_INDEX].clone())); + self.ctl_event = Some(control_q.event.clone()); + self.cur_event = Some(cursor_q.event.clone()); + let queue_ctl = Arc::new(Mutex::new(control_q.queue)); + self.queue_ctl = Some(queue_ctl.clone()); + // cursor queue not used by worker let (sender, receiver) = unbounded(); let worker = Worker::new( receiver, mem.clone(), - self.queue_ctl.clone(), + queue_ctl, interrupt.clone(), shm_region, self.virgl_flags, diff --git a/src/devices/src/virtio/gpu/event_handler.rs b/src/devices/src/virtio/gpu/event_handler.rs index daeae9fea..f11ba33bb 100644 --- a/src/devices/src/virtio/gpu/event_handler.rs +++ b/src/devices/src/virtio/gpu/event_handler.rs @@ -3,7 +3,7 @@ use std::os::unix::io::AsRawFd; use polly::event_manager::{EventManager, Subscriber}; use utils::epoll::{EpollEvent, EventSet}; -use super::device::{Gpu, CTL_INDEX, CUR_INDEX}; +use super::device::Gpu; use crate::virtio::device::VirtioDevice; impl Gpu { @@ -16,10 +16,11 @@ impl Gpu { return; } - if let Err(e) = self.queue_events[CTL_INDEX].read() { + let ctl_event = self.ctl_event.as_ref().unwrap(); + if let Err(e) = ctl_event.read() { error!("Failed to read request queue event: {e:?}"); - } else if let Err(e) = self.sender.as_ref().unwrap().send(CTL_INDEX as u64) { - error!("Failed to signal worker for queue {CTL_INDEX}: {e:?}"); + } else if let Err(e) = self.sender.as_ref().unwrap().send(0) { + error!("Failed to signal worker for ctl queue: {e:?}"); } } @@ -32,10 +33,11 @@ impl Gpu { return; } - if let Err(e) = self.queue_events[CUR_INDEX].read() { + let cur_event = self.cur_event.as_ref().unwrap(); + if let Err(e) = cur_event.read() { error!("Failed to read request queue event: {e:?}"); - } else if let Err(e) = self.sender.as_ref().unwrap().send(CUR_INDEX as u64) { - error!("Failed to signal worker for queue {CUR_INDEX}: {e:?}"); + } else if let Err(e) = self.sender.as_ref().unwrap().send(1) { + error!("Failed to signal worker for cur queue: {e:?}"); } } @@ -51,13 +53,13 @@ impl Gpu { .subscriber(self.activate_evt.as_raw_fd()) .unwrap(); + let ctl_event = self.ctl_event.as_ref().unwrap(); + let cur_event = self.cur_event.as_ref().unwrap(); + event_manager .register( - self.queue_events[CTL_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[CTL_INDEX].as_raw_fd() as u64, - ), + ctl_event.as_raw_fd(), + EpollEvent::new(EventSet::IN, ctl_event.as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -66,11 +68,8 @@ impl Gpu { event_manager .register( - self.queue_events[CUR_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[CUR_INDEX].as_raw_fd() as u64, - ), + cur_event.as_raw_fd(), + EpollEvent::new(EventSet::IN, cur_event.as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -88,11 +87,11 @@ impl Gpu { impl Subscriber for Gpu { fn process(&mut self, event: &EpollEvent, event_manager: &mut EventManager) { let source = event.fd(); - let ctl = self.queue_events[CTL_INDEX].as_raw_fd(); - let cur = self.queue_events[CUR_INDEX].as_raw_fd(); let activate_evt = self.activate_evt.as_raw_fd(); if self.is_activated() { + let ctl = 
self.ctl_event.as_ref().unwrap().as_raw_fd(); + let cur = self.cur_event.as_ref().unwrap().as_raw_fd(); match source { _ if source == ctl => self.handle_ctl_event(event), _ if source == cur => self.handle_cur_event(event), diff --git a/src/devices/src/virtio/gpu/mod.rs b/src/devices/src/virtio/gpu/mod.rs index 25314afc1..143d7a67e 100644 --- a/src/devices/src/virtio/gpu/mod.rs +++ b/src/devices/src/virtio/gpu/mod.rs @@ -14,7 +14,6 @@ pub use self::device::Gpu; mod defs { pub const GPU_DEV_ID: &str = "virtio_gpu"; pub const NUM_QUEUES: usize = 2; - pub const QUEUE_SIZES: &[u16] = &[256; NUM_QUEUES]; #[allow(dead_code)] pub mod uapi { diff --git a/src/devices/src/virtio/input/device.rs b/src/devices/src/virtio/input/device.rs index 261bd9c69..c62a1c6e8 100644 --- a/src/devices/src/virtio/input/device.rs +++ b/src/devices/src/virtio/input/device.rs @@ -6,12 +6,14 @@ use log::{debug, error}; use utils::eventfd::{EventFd, EFD_NONBLOCK}; use vm_memory::GuestMemoryMmap; -use super::super::{ActivateError, ActivateResult, DeviceState, Queue as VirtQueue, VirtioDevice}; +use super::super::{ + ActivateError, ActivateResult, DeviceQueue, DeviceState, QueueConfig, VirtioDevice, +}; use super::worker::InputWorker; use super::{defs, defs::uapi, InputError}; +use crate::virtio::input::defs::config_select; use crate::virtio::input::defs::config_select::VIRTIO_INPUT_CFG_UNSET; -use crate::virtio::input::defs::{config_select, EVENTQ_IDX, STATUSQ_IDX}; use crate::virtio::InterruptTransport; use krun_input::{ InputAbsInfo, InputConfigBackend, InputConfigInstance, InputDeviceIds, @@ -118,8 +120,6 @@ union ConfigPayload { /// VirtIO Input device state pub struct Input { - queues: Vec, - queue_events: Vec, avail_features: u64, acked_features: u64, device_state: DeviceState, @@ -132,20 +132,11 @@ pub struct Input { } impl Input { - pub(crate) fn with_queues( - queues: Vec, + pub fn new( config_backend: InputConfigBackend<'static>, events_backend: InputEventProviderBackend<'static>, ) -> super::Result { - debug!("input: with_queues"); - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events.push(EventFd::new(EFD_NONBLOCK).map_err(InputError::EventFd)?); - } - Ok(Input { - queues, - queue_events, avail_features: AVAIL_FEATURES, acked_features: 0, event_provider_backend: events_backend, @@ -157,17 +148,6 @@ impl Input { }) } - pub fn new( - config_backend: InputConfigBackend<'static>, - events_backend: InputEventProviderBackend<'static>, - ) -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues(queues, config_backend, events_backend) - } - pub fn id(&self) -> &str { defs::INPUT_DEV_ID } @@ -196,16 +176,8 @@ impl VirtioDevice for Input { "input" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &defs::QUEUE_CONFIG } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -242,21 +214,23 @@ impl VirtioDevice for Input { .update_select(&self.config_instance, select, subsel); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { - if self.queues.len() != defs::NUM_QUEUES { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { + let [event_q, status_q]: [_; defs::NUM_QUEUES] = 
queues.try_into().map_err(|_| { error!( - "Cannot perform activate. Expected {} queue(s), got {}", - defs::NUM_QUEUES, - self.queues.len() + "Cannot perform activate. Expected {} queue(s)", + defs::NUM_QUEUES ); - return Err(ActivateError::BadActivate); - } + ActivateError::BadActivate + })?; let worker = InputWorker::new( - self.queues[EVENTQ_IDX].clone(), - self.queue_events[EVENTQ_IDX].try_clone().unwrap(), - self.queues[STATUSQ_IDX].clone(), - self.queue_events[STATUSQ_IDX].try_clone().unwrap(), + event_q, + status_q, interrupt.clone(), mem.clone(), self.event_provider_backend, diff --git a/src/devices/src/virtio/input/mod.rs b/src/devices/src/virtio/input/mod.rs index 1b1af3f06..8f0571a6d 100644 --- a/src/devices/src/virtio/input/mod.rs +++ b/src/devices/src/virtio/input/mod.rs @@ -5,12 +5,16 @@ mod worker; pub use self::defs::uapi::VIRTIO_ID_INPUT as TYPE_INPUT; pub use self::device::Input; +use super::QueueConfig; + mod defs { + use super::QueueConfig; + pub const INPUT_DEV_ID: &str = "virtio_input"; pub const NUM_QUEUES: usize = 2; - pub const EVENTQ_IDX: usize = 0; // Event queue (device -> guest) - pub const STATUSQ_IDX: usize = 1; // Status queue (guest -> device) - pub const QUEUE_SIZES: &[u16] = &[256; NUM_QUEUES]; + + const QUEUE_SIZE: u16 = 256; + pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; pub mod uapi { pub const VIRTIO_F_VERSION_1: u32 = 32; diff --git a/src/devices/src/virtio/input/worker.rs b/src/devices/src/virtio/input/worker.rs index 522e1a90e..bcde12ea6 100644 --- a/src/devices/src/virtio/input/worker.rs +++ b/src/devices/src/virtio/input/worker.rs @@ -8,7 +8,7 @@ use utils::eventfd::EventFd; use virtio_bindings::virtio_input; use vm_memory::{ByteValued, GuestMemoryMmap}; -use super::super::Queue; +use super::super::DeviceQueue; use crate::virtio::descriptor_utils::{Reader, Writer}; use crate::virtio::InterruptTransport; use krun_input::{InputEventProviderBackend, InputEventProviderInstance, InputEventsImpl}; @@ -25,33 +25,26 @@ struct VirtioInputEvent { unsafe impl ByteValued for VirtioInputEvent {} pub struct InputWorker { - event_queue: Queue, // Device -> Guest events - status_queue: Queue, // Guest -> Device events + event_q: DeviceQueue, // Device -> Guest events + status_q: DeviceQueue, // Guest -> Device events interrupt: InterruptTransport, mem: GuestMemoryMmap, backend_wrapper: InputEventProviderBackend<'static>, stop_fd: EventFd, - pub event_queue_efd: EventFd, - pub status_queue_efd: EventFd, } impl InputWorker { - #[allow(clippy::too_many_arguments)] pub fn new( - event_queue: Queue, - event_queue_efd: EventFd, - status_queue: Queue, - status_queue_efd: EventFd, + event_q: DeviceQueue, + status_q: DeviceQueue, interrupt: InterruptTransport, mem: GuestMemoryMmap, backend: InputEventProviderBackend<'static>, stop_fd: EventFd, ) -> Self { Self { - event_queue, - event_queue_efd, - status_queue, - status_queue_efd, + event_q, + status_q, interrupt, mem, backend_wrapper: backend, @@ -103,14 +96,14 @@ impl InputWorker { epoll .ctl( ControlOperation::Add, - self.event_queue_efd.as_raw_fd(), + self.event_q.event.as_raw_fd(), &EpollEvent::new(EventSet::IN, EVENTQ), ) .expect("Failed to add ready fd to epoll"); epoll .ctl( ControlOperation::Add, - self.status_queue_efd.as_raw_fd(), + self.status_q.event.as_raw_fd(), &EpollEvent::new(EventSet::IN, STATUSQ), ) .expect("Failed to add ready fd to epoll"); @@ -142,12 +135,12 @@ impl InputWorker { needs_interrupt |= self.process_event_queue(&mut events_instance); } 
EVENTQ => { - self.event_queue_efd.read().unwrap(); + self.event_q.event.read().unwrap(); trace!("EVENTQ"); needs_interrupt |= self.process_event_queue(&mut events_instance); } STATUSQ => { - self.status_queue_efd.read().unwrap(); + self.status_q.event.read().unwrap(); needs_interrupt |= self.process_status_queue(); } QUIT => { @@ -208,7 +201,7 @@ impl InputWorker { let mut needs_interrupt = false; let mem = self.mem.clone(); - while let Some(desc_chain) = self.event_queue.pop(&mem) { + while let Some(desc_chain) = self.event_q.queue.pop(&mem) { let mut writer = match Writer::new(&mem, desc_chain.clone()) { Ok(w) => w, Err(e) => { @@ -222,14 +215,15 @@ impl InputWorker { .unwrap(); if bytes_written != 0 { - self.event_queue + self.event_q + .queue .add_used(&mem, desc_chain.index, bytes_written as u32) .expect("TODO"); needs_interrupt = true; } if bytes_written == 0 { - self.event_queue.undo_pop(); + self.event_q.queue.undo_pop(); break; } @@ -258,7 +252,7 @@ impl InputWorker { let mut needs_interrupt = false; let mem = self.mem.clone(); - while let Some(desc_chain) = self.status_queue.pop(&mem) { + while let Some(desc_chain) = self.status_q.queue.pop(&mem) { let mut reader = match Reader::new(&mem, desc_chain.clone()) { Ok(r) => r, Err(e) => { @@ -268,7 +262,8 @@ impl InputWorker { }; match self.read_status_virtqueue(&mut reader) { Ok(bytes_read) => { - self.status_queue + self.status_q + .queue .add_used(&mem, desc_chain.index, bytes_read as u32) .unwrap(); } diff --git a/src/devices/src/virtio/mmio.rs b/src/devices/src/virtio/mmio.rs index 237d762a9..234d17601 100644 --- a/src/devices/src/virtio/mmio.rs +++ b/src/devices/src/virtio/mmio.rs @@ -5,12 +5,14 @@ // Use of this source code is governed by a BSD-style license that can be // found in the THIRD-PARTY file. -use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; +use utils::eventfd::EFD_NONBLOCK; +use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; + use super::device_status; use super::*; use crate::bus::BusDevice; @@ -66,7 +68,14 @@ pub struct MmioTransport { pub(crate) device_status: u32, pub(crate) config_generation: u32, mem: GuestMemoryMmap, - queue_evts: HashMap, + // Queues owned by the transport during negotiation. + // These are moved to the device on activation. + queues: Option>, + // Queue eventfds - kept by transport to send notifications. + // Arc clones are passed to the device on activation. + queue_evts: Vec>, + // Stored queue config from device for recreating queues after reset. 
+ queue_config: Vec, shm_region_select: u32, interrupt: InterruptTransport, } @@ -160,16 +169,16 @@ impl MmioTransport { intc: IrqChip, device: Arc>, ) -> Result { - let debug_log_target = format!( - "{}[{}]", - module_path!(), - device - .try_lock() - .expect( - "Mutex of VirtioDevice should not be locked when calling MmioTransport::new" - ) - .device_name() - ); + let locked = device + .try_lock() + .expect("Mutex of VirtioDevice should not be locked when calling MmioTransport::new"); + + let debug_log_target = format!("{}[{}]", module_path!(), locked.device_name()); + let queue_config: Vec = locked.queue_config().to_vec(); + drop(locked); + + let queues = Self::create_queues(&queue_config); + let queue_evts = Self::create_queue_evts(queue_config.len())?; Ok(MmioTransport { interrupt: InterruptTransport::new(intc, debug_log_target)?, @@ -180,11 +189,30 @@ impl MmioTransport { device_status: device_status::INIT, config_generation: 0, mem, - queue_evts: HashMap::new(), + queues: Some(queues), + queue_evts, + queue_config, shm_region_select: 0, }) } + /// Create queues from queue configuration. + fn create_queues(queue_config: &[QueueConfig]) -> Vec { + queue_config.iter().map(|c| Queue::new(c.size)).collect() + } + + /// Create eventfds for queue notifications. + fn create_queue_evts(count: usize) -> Result>, CreateMmioTransportError> { + let mut queue_evts = Vec::with_capacity(count); + for _ in 0..count { + queue_evts.push(Arc::new( + EventFd::new(EFD_NONBLOCK) + .map_err(CreateMmioTransportError::CreateInterruptEventFd)?, + )); + } + Ok(queue_evts) + } + /// Set the irq line for the device. /// NOTE: Can only be called when the device is not activated pub fn set_irq_line(&mut self, irq_line: u32) { @@ -204,8 +232,10 @@ impl MmioTransport { self.device.clone() } - pub fn register_queue_evt(&mut self, queue_evt: EventFd, id: u32) { - self.queue_evts.insert(id, queue_evt); + /// Returns a reference to the queue eventfds. Used by the VMM to register + /// queue notifications with KVM. + pub fn queue_evts(&self) -> &[Arc] { + &self.queue_evts } fn check_device_status(&self, set: u32, clr: u32) -> bool { @@ -216,31 +246,32 @@ impl MmioTransport { where F: FnOnce(&Queue) -> U, { - match self - .locked_device() - .queues() - .get(self.queue_select as usize) - { - Some(queue) => f(queue), + match &self.queues { + Some(queues) => match queues.get(self.queue_select as usize) { + Some(queue) => f(queue), + None => d, + }, None => d, } } fn with_queue_mut(&mut self, f: F) -> bool { - if let Some(queue) = self - .locked_device() - .queues_mut() - .get_mut(self.queue_select as usize) - { - f(queue); - true - } else { - false + match &mut self.queues { + Some(queues) => { + if let Some(queue) = queues.get_mut(self.queue_select as usize) { + f(queue); + true + } else { + false + } + } + None => false, } } fn update_queue_field(&mut self, f: F) { if self.check_device_status(device_status::FEATURES_OK, device_status::FAILED) { + // FIXME: check if activated! self.with_queue_mut(f); } else { warn!( @@ -259,13 +290,29 @@ impl MmioTransport { self.queue_select = 0; self.interrupt.0.status.store(0, Ordering::SeqCst); self.device_status = device_status::INIT; - // . Keep interrupt_evt and queue_evts as is. There may be pending - // notifications in those eventfds, but nothing will happen other - // than supurious wakeups. + // Do not reset config_generation and keep it monotonically increasing. + // Recreate queues from queue_config for the next negotiation cycle. 
+ // Keep queue_evts as is - they are reused across reset cycles. + // TODO: consider resetting the events when we refactor event handling + self.queues = Some(Self::create_queues(&self.queue_config)); // . Do not reset config_generation and keep it monotonically increasing - for queue in self.locked_device().queues_mut() { - *queue = Queue::new(queue.get_max_size()); - } + } + + fn activate(&mut self) { + let Some(queues) = self.queues.take() else { + return; + }; + + let mut device_queues: Vec<DeviceQueue> = queues + .into_iter() + .zip(self.queue_evts.iter().cloned()) + .map(|(queue, event)| DeviceQueue::new(queue, event)) + .collect(); + + let mut locked_device = self.locked_device(); + locked_device + .activate(self.mem.clone(), self.interrupt.clone(), device_queues) + .expect("Failed to activate device"); } /// Update device status according to the state machine defined by VirtIO Spec 1.0. @@ -293,9 +340,7 @@ impl MmioTransport { self.device_status = status; let device_activated = self.locked_device().is_activated(); if !device_activated { - self.locked_device() - .activate(self.mem.clone(), self.interrupt.clone()) - .expect("Failed to activate device"); + self.activate(); } } _ if (status & FAILED) != 0 => { @@ -418,8 +463,11 @@ impl BusDevice for MmioTransport { 0x38 => self.update_queue_field(|q| q.size = v as u16), 0x44 => self.update_queue_field(|q| q.ready = v == 1), 0x50 => { - if let Some(eventfd) = self.queue_evts.get(&v) { - eventfd.write(v as u64).unwrap(); + // Queue notification - write to the eventfd for the specified queue. + if let Some(eventfd) = self.queue_evts.get(v as usize) { + eventfd.write(1).unwrap(); + } else { + warn!("invalid queue index for notification: {v}"); } } 0x64 => { @@ -477,14 +525,12 @@ pub(crate) mod tests { use super::*; use crate::legacy::DummyIrqChip; - use utils::eventfd::EventFd; use vm_memory::GuestMemoryMmap; pub(crate) struct DummyDevice { acked_features: u64, avail_features: u64, - queue_evts: Vec<EventFd>, - queues: Vec<Queue>, + queue_config: Vec<QueueConfig>, device_activated: bool, config_bytes: [u8; 0xeff], } @@ -494,11 +540,7 @@ pub(crate) mod tests { DummyDevice { acked_features: 0, avail_features: 0, - queue_evts: vec![ - EventFd::new(utils::eventfd::EFD_NONBLOCK).unwrap(), - EventFd::new(utils::eventfd::EFD_NONBLOCK).unwrap(), - ], - queues: vec![Queue::new(16), Queue::new(32)], + queue_config: vec![QueueConfig::new(16), QueueConfig::new(32)], device_activated: false, config_bytes: [0; 0xeff], } @@ -540,27 +582,20 @@ pub(crate) mod tests { self.acked_features = acked_features; } + fn queue_config(&self) -> &[QueueConfig] { + &self.queue_config + } + fn activate( &mut self, _mem: GuestMemoryMmap, _interrupt: InterruptTransport, + _queues: Vec<DeviceQueue>, ) -> ActivateResult { self.device_activated = true; Ok(()) } - fn queues(&self) -> &[Queue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [Queue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_evts - } - fn is_activated(&self) -> bool { self.device_activated } @@ -582,17 +617,18 @@ pub(crate) mod tests { // We just make sure here that the implementation of a mmio device behaves as we expect, // given a known virtio device implementation (the dummy device). - assert_eq!(d.locked_device().queue_events().len(), 2); + // Transport now owns the queue_evts. 
+ assert_eq!(d.queue_evts().len(), 2); d.queue_select = 0; assert_eq!(d.with_queue(0, Queue::get_max_size), 16); assert!(d.with_queue_mut(|q| q.size = 16)); - assert_eq!(d.locked_device().queues()[d.queue_select as usize].size, 16); + assert_eq!(d.queues.as_ref().unwrap()[d.queue_select as usize].size, 16); d.queue_select = 1; assert_eq!(d.with_queue(0, Queue::get_max_size), 32); assert!(d.with_queue_mut(|q| q.size = 16)); - assert_eq!(d.locked_device().queues()[d.queue_select as usize].size, 16); + assert_eq!(d.queues.as_ref().unwrap()[d.queue_select as usize].size, 16); d.queue_select = 2; assert_eq!(d.with_queue(0, Queue::get_max_size), 0); @@ -764,42 +800,42 @@ pub(crate) mod tests { assert_eq!(d.queue_select, 3); d.queue_select = 0; - assert_eq!(d.locked_device().queues()[0].size, 0); + assert_eq!(d.queues.as_ref().unwrap()[0].size, 0); write_le_u32(&mut buf[..], 16); d.write(0, 0x38, &buf[..]); - assert_eq!(d.locked_device().queues()[0].size, 16); + assert_eq!(d.queues.as_ref().unwrap()[0].size, 16); - assert!(!d.locked_device().queues()[0].ready); + assert!(!d.queues.as_ref().unwrap()[0].ready); write_le_u32(&mut buf[..], 1); d.write(0, 0x44, &buf[..]); - assert!(d.locked_device().queues()[0].ready); + assert!(d.queues.as_ref().unwrap()[0].ready); - assert_eq!(d.locked_device().queues()[0].desc_table.0, 0); + assert_eq!(d.queues.as_ref().unwrap()[0].desc_table.0, 0); write_le_u32(&mut buf[..], 123); d.write(0, 0x80, &buf[..]); - assert_eq!(d.locked_device().queues()[0].desc_table.0, 123); + assert_eq!(d.queues.as_ref().unwrap()[0].desc_table.0, 123); d.write(0, 0x84, &buf[..]); assert_eq!( - d.locked_device().queues()[0].desc_table.0, + d.queues.as_ref().unwrap()[0].desc_table.0, 123 + (123 << 32) ); - assert_eq!(d.locked_device().queues()[0].avail_ring.0, 0); + assert_eq!(d.queues.as_ref().unwrap()[0].avail_ring.0, 0); write_le_u32(&mut buf[..], 124); d.write(0, 0x90, &buf[..]); - assert_eq!(d.locked_device().queues()[0].avail_ring.0, 124); + assert_eq!(d.queues.as_ref().unwrap()[0].avail_ring.0, 124); d.write(0, 0x94, &buf[..]); assert_eq!( - d.locked_device().queues()[0].avail_ring.0, + d.queues.as_ref().unwrap()[0].avail_ring.0, 124 + (124 << 32) ); - assert_eq!(d.locked_device().queues()[0].used_ring.0, 0); + assert_eq!(d.queues.as_ref().unwrap()[0].used_ring.0, 0); write_le_u32(&mut buf[..], 125); d.write(0, 0xa0, &buf[..]); - assert_eq!(d.locked_device().queues()[0].used_ring.0, 125); + assert_eq!(d.queues.as_ref().unwrap()[0].used_ring.0, 125); d.write(0, 0xa4, &buf[..]); - assert_eq!(d.locked_device().queues()[0].used_ring.0, 125 + (125 << 32)); + assert_eq!(d.queues.as_ref().unwrap()[0].used_ring.0, 125 + (125 << 32)); set_device_status( &mut d, @@ -880,7 +916,7 @@ pub(crate) mod tests { ); let mut buf = [0; 4]; - let queue_len = d.locked_device().queues().len(); + let queue_len = d.queues.as_ref().unwrap().len(); for q in 0..queue_len { d.queue_select = q as u32; write_le_u32(&mut buf[..], 16); @@ -924,7 +960,7 @@ pub(crate) mod tests { // Setup queue data structures let mut buf = [0; 4]; - let queues_count = d.locked_device().queues().len(); + let queues_count = d.queues.as_ref().unwrap().len(); for q in 0..queues_count { d.queue_select = q as u32; write_le_u32(&mut buf[..], 16); diff --git a/src/devices/src/virtio/mod.rs b/src/devices/src/virtio/mod.rs index 4f9258383..bb142700e 100644 --- a/src/devices/src/virtio/mod.rs +++ b/src/devices/src/virtio/mod.rs @@ -43,7 +43,9 @@ pub use self::balloon::*; #[cfg(feature = "blk")] pub use self::block::{Block, CacheType}; pub 
use self::console::*; -pub use self::device::*; +pub use self::device::{ + DeviceQueue, DeviceState, QueueConfig, VirtioDevice, VirtioShmRegion, VmmExitObserver, +}; #[cfg(not(any(feature = "tee", feature = "nitro")))] pub use self::fs::*; #[cfg(feature = "gpu")] diff --git a/src/devices/src/virtio/net/device.rs b/src/devices/src/virtio/net/device.rs index fc0e27c4f..2f7fd143e 100644 --- a/src/devices/src/virtio/net/device.rs +++ b/src/devices/src/virtio/net/device.rs @@ -4,11 +4,12 @@ // Portions Copyright 2017 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the THIRD-PARTY file. -use crate::virtio::net::{Error, Result}; -use crate::virtio::net::{QUEUE_SIZES, RX_INDEX, TX_INDEX}; +use crate::virtio::net::Result; +use crate::virtio::net::{NUM_QUEUES, QUEUE_CONFIG}; use crate::virtio::queue::Error as QueueError; use crate::virtio::{ - ActivateError, ActivateResult, DeviceState, InterruptTransport, Queue, VirtioDevice, TYPE_NET, + ActivateError, ActivateResult, DeviceQueue, DeviceState, InterruptTransport, QueueConfig, + VirtioDevice, TYPE_NET, }; use crate::Error as DeviceError; @@ -19,7 +20,6 @@ use std::cmp; use std::io::Write; use std::os::fd::RawFd; use std::path::PathBuf; -use utils::eventfd::{EventFd, EFD_NONBLOCK}; use virtio_bindings::virtio_net::VIRTIO_NET_F_MAC; use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{ByteValued, GuestMemoryError, GuestMemoryMmap}; @@ -76,9 +76,6 @@ pub struct Net { avail_features: u64, acked_features: u64, - queues: Vec, - queue_evts: Vec, - pub(crate) device_state: DeviceState, config: VirtioNetConfig, @@ -97,13 +94,6 @@ impl Net { | (1 << VIRTIO_RING_F_EVENT_IDX) | (1 << VIRTIO_F_VERSION_1); - let mut queue_evts = Vec::new(); - for _ in QUEUE_SIZES.iter() { - queue_evts.push(EventFd::new(EFD_NONBLOCK).map_err(Error::EventFd)?); - } - - let queues = QUEUE_SIZES.iter().map(|&s| Queue::new(s)).collect(); - let config = VirtioNetConfig { mac, status: 0, @@ -117,8 +107,6 @@ impl Net { avail_features, acked_features: 0u64, - queues, - queue_evts, device_state: DeviceState::Inactive, config, }) @@ -156,16 +144,8 @@ impl VirtioDevice for Net { "net" } - fn queues(&self) -> &[Queue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [Queue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_evts + fn queue_config(&self) -> &[QueueConfig] { + &QUEUE_CONFIG } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -190,20 +170,24 @@ impl VirtioDevice for Net { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { - let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - self.queues[RX_INDEX].set_event_idx(event_idx); - self.queues[TX_INDEX].set_event_idx(event_idx); + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { + let [mut rx_q, mut tx_q]: [_; NUM_QUEUES] = queues.try_into().map_err(|_| { + error!("Cannot perform activate. 
Expected {} queue(s)", NUM_QUEUES); + ActivateError::BadActivate + })?; - let queue_evts = self - .queue_evts - .iter() - .map(|e| e.try_clone().unwrap()) - .collect(); + let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; + rx_q.queue.set_event_idx(event_idx); + tx_q.queue.set_event_idx(event_idx); match NetWorker::new( - self.queues.clone(), - queue_evts, + rx_q, + tx_q, interrupt.clone(), mem.clone(), self.acked_features, diff --git a/src/devices/src/virtio/net/mod.rs b/src/devices/src/virtio/net/mod.rs index 367b6013d..d3e7e9739 100644 --- a/src/devices/src/virtio/net/mod.rs +++ b/src/devices/src/virtio/net/mod.rs @@ -4,14 +4,12 @@ use std::{io, mem, result}; use virtio_bindings::virtio_net::virtio_net_hdr_v1; +use super::QueueConfig; + pub const MAX_BUFFER_SIZE: usize = 65562; -pub const QUEUE_SIZE: u16 = 1024; +const QUEUE_SIZE: u16 = 1024; pub const NUM_QUEUES: usize = 2; -pub const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE; NUM_QUEUES]; -// The index of the rx queue from Net device queues/queues_evts vector. -pub const RX_INDEX: usize = 0; -// The index of the tx queue from Net device queues/queues_evts vector. -pub const TX_INDEX: usize = 1; +pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; mod backend; pub mod device; diff --git a/src/devices/src/virtio/net/worker.rs b/src/devices/src/virtio/net/worker.rs index 9ab122fe5..222b781a0 100644 --- a/src/devices/src/virtio/net/worker.rs +++ b/src/devices/src/virtio/net/worker.rs @@ -3,8 +3,8 @@ use crate::virtio::net::backend::ConnectError; use crate::virtio::net::tap::Tap; use crate::virtio::net::unixgram::Unixgram; use crate::virtio::net::unixstream::Unixstream; -use crate::virtio::net::{MAX_BUFFER_SIZE, QUEUE_SIZE, RX_INDEX, TX_INDEX}; -use crate::virtio::{InterruptTransport, Queue}; +use crate::virtio::net::{MAX_BUFFER_SIZE, QUEUE_SIZE}; +use crate::virtio::{DeviceQueue, InterruptTransport}; use super::backend::{NetBackend, ReadError, WriteError}; use super::device::{FrontendError, RxError, TxError, VirtioNetBackend}; @@ -14,12 +14,11 @@ use std::os::fd::{AsRawFd, FromRawFd, OwnedFd}; use std::thread; use std::{cmp, result}; use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; -use utils::eventfd::EventFd; use vm_memory::{Bytes, GuestAddress, GuestMemoryMmap}; pub struct NetWorker { - queues: Vec, - queue_evts: Vec, + rx_q: DeviceQueue, + tx_q: DeviceQueue, interrupt: InterruptTransport, mem: GuestMemoryMmap, @@ -35,10 +34,9 @@ pub struct NetWorker { } impl NetWorker { - #[allow(clippy::too_many_arguments)] pub fn new( - queues: Vec, - queue_evts: Vec, + rx_q: DeviceQueue, + tx_q: DeviceQueue, interrupt: InterruptTransport, mem: GuestMemoryMmap, _vnet_features: u64, @@ -70,8 +68,8 @@ impl NetWorker { }; Ok(Self { - queues, - queue_evts, + rx_q, + tx_q, mem, backend, @@ -95,8 +93,8 @@ impl NetWorker { } fn work(mut self) { - let virtq_rx_ev_fd = self.queue_evts[RX_INDEX].as_raw_fd(); - let virtq_tx_ev_fd = self.queue_evts[TX_INDEX].as_raw_fd(); + let virtq_rx_ev_fd = self.rx_q.event.as_raw_fd(); + let virtq_tx_ev_fd = self.tx_q.event.as_raw_fd(); let backend_socket = self.backend.raw_socket_fd(); let epoll = Epoll::new().unwrap(); @@ -166,22 +164,22 @@ impl NetWorker { } pub(crate) fn process_rx_queue_event(&mut self) { - if let Err(e) = self.queue_evts[RX_INDEX].read() { + if let Err(e) = self.rx_q.event.read() { log::error!("Failed to get rx event from queue: {e:?}"); } - if let Err(e) = self.queues[RX_INDEX].disable_notification(&self.mem) { + 
if let Err(e) = self.rx_q.queue.disable_notification(&self.mem) { error!("error disabling queue notifications: {e:?}"); } if let Err(e) = self.process_rx() { log::error!("Failed to process rx: {e:?} (triggered by queue event)") }; - if let Err(e) = self.queues[RX_INDEX].enable_notification(&self.mem) { + if let Err(e) = self.rx_q.queue.enable_notification(&self.mem) { error!("error disabling queue notifications: {e:?}"); } } pub(crate) fn process_tx_queue_event(&mut self) { - match self.queue_evts[TX_INDEX].read() { + match self.tx_q.event.read() { Ok(_) => self.process_tx_loop(), Err(e) => { log::error!("Failed to get tx queue event from queue: {e:?}"); @@ -190,13 +188,13 @@ impl NetWorker { } pub(crate) fn process_backend_socket_readable(&mut self) { - if let Err(e) = self.queues[RX_INDEX].enable_notification(&self.mem) { + if let Err(e) = self.rx_q.queue.enable_notification(&self.mem) { error!("error disabling queue notifications: {e:?}"); } if let Err(e) = self.process_rx() { log::error!("Failed to process rx: {e:?} (triggered by backend socket readable)"); }; - if let Err(e) = self.queues[RX_INDEX].disable_notification(&self.mem) { + if let Err(e) = self.rx_q.queue.disable_notification(&self.mem) { error!("error disabling queue notifications: {e:?}"); } } @@ -259,25 +257,20 @@ impl NetWorker { fn process_tx_loop(&mut self) { loop { - self.queues[TX_INDEX] - .disable_notification(&self.mem) - .unwrap(); + self.tx_q.queue.disable_notification(&self.mem).unwrap(); if let Err(e) = self.process_tx() { log::error!("Failed to process rx: {e:?} (triggered by backend socket readable)"); }; - if !self.queues[TX_INDEX] - .enable_notification(&self.mem) - .unwrap() - { + if !self.tx_q.queue.enable_notification(&self.mem).unwrap() { break; } } } fn process_tx(&mut self) -> result::Result<(), TxError> { - let tx_queue = &mut self.queues[TX_INDEX]; + let tx_queue = &mut self.tx_q.queue; if self.backend.has_unfinished_write() && self @@ -379,7 +372,7 @@ impl NetWorker { fn write_frame_to_guest_impl(&mut self) -> result::Result<(), FrontendError> { let mut result: std::result::Result<(), FrontendError> = Ok(()); - let queue = &mut self.queues[RX_INDEX]; + let queue = &mut self.rx_q.queue; let head_descriptor = queue.pop(&self.mem).ok_or(FrontendError::EmptyQueue)?; let head_index = head_descriptor.index; @@ -427,7 +420,7 @@ impl NetWorker { // Copies a single frame from `self.rx_frame_buf` into the guest. In case of an error retries // the operation if possible. Returns true if the operation was successfull. fn write_frame_to_guest(&mut self) -> bool { - let max_iterations = self.queues[RX_INDEX].actual_size(); + let max_iterations = self.rx_q.queue.actual_size(); for _ in 0..max_iterations { match self.write_frame_to_guest_impl() { Ok(()) => return true, diff --git a/src/devices/src/virtio/queue.rs b/src/devices/src/virtio/queue.rs index c539c7198..2fb74289d 100644 --- a/src/devices/src/virtio/queue.rs +++ b/src/devices/src/virtio/queue.rs @@ -319,7 +319,7 @@ impl<'a> DescriptorChain<'a> { } } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq)] /// A virtio queue's parameters. 
pub struct Queue { /// The maximal size in elements offered by the device diff --git a/src/devices/src/virtio/rng/device.rs b/src/devices/src/virtio/rng/device.rs index 1191e4032..c008c6189 100644 --- a/src/devices/src/virtio/rng/device.rs +++ b/src/devices/src/virtio/rng/device.rs @@ -3,7 +3,7 @@ use utils::eventfd::EventFd; use vm_memory::{Bytes, GuestMemoryMmap}; use super::super::{ - ActivateError, ActivateResult, DeviceState, Queue as VirtQueue, RngError, VirtioDevice, + ActivateError, ActivateResult, DeviceQueue, DeviceState, QueueConfig, RngError, VirtioDevice, }; use super::{defs, defs::uapi}; use crate::virtio::InterruptTransport; @@ -15,8 +15,7 @@ pub(crate) const REQ_INDEX: usize = 0; pub(crate) const AVAIL_FEATURES: u64 = 1 << uapi::VIRTIO_F_VERSION_1 as u64; pub struct Rng { - pub(crate) queues: Vec, - pub(crate) queue_events: Vec, + pub(crate) queues: Option>, pub(crate) avail_features: u64, pub(crate) acked_features: u64, pub(crate) activate_evt: EventFd, @@ -24,16 +23,13 @@ pub struct Rng { } impl Rng { - pub(crate) fn with_queues(queues: Vec) -> super::Result { - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(RngError::EventFd)?); - } + pub(crate) fn queue_event(&self, idx: usize) -> &std::sync::Arc { + &self.queues.as_ref().expect("queues should exist")[idx].event + } + pub fn new() -> super::Result { Ok(Rng { - queues, - queue_events, + queues: None, avail_features: AVAIL_FEATURES, acked_features: 0, activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(RngError::EventFd)?, @@ -41,14 +37,6 @@ impl Rng { }) } - pub fn new() -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues(queues) - } - pub fn id(&self) -> &str { defs::RNG_DEV_ID } @@ -61,28 +49,32 @@ impl Rng { DeviceState::Inactive => unreachable!(), }; + let queues = self + .queues + .as_mut() + .expect("queues should exist when activated"); let mut have_used = false; - while let Some(head) = self.queues[REQ_INDEX].pop(mem) { + while let Some(head) = queues[REQ_INDEX].queue.pop(mem) { let index = head.index; let mut written = 0; for desc in head.into_iter() { let mut rand_bytes = vec![0u8; desc.len as usize]; if let Err(e) = OsRng.try_fill_bytes(&mut rand_bytes) { error!("Failed to fill buffer with random data: {e:?}"); - self.queues[REQ_INDEX].go_to_previous_position(); + queues[REQ_INDEX].queue.go_to_previous_position(); break; } if let Err(e) = mem.write_slice(&rand_bytes[..], desc.addr) { error!("Failed to write slice: {e:?}"); - self.queues[REQ_INDEX].go_to_previous_position(); + queues[REQ_INDEX].queue.go_to_previous_position(); break; } written += desc.len; } have_used = true; - if let Err(e) = self.queues[REQ_INDEX].add_used(mem, index, written) { + if let Err(e) = queues[REQ_INDEX].queue.add_used(mem, index, written) { error!("failed to add used elements to the queue: {e:?}"); } } @@ -112,16 +104,8 @@ impl VirtioDevice for Rng { "rng" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &defs::QUEUE_CONFIG } fn read_config(&self, _offset: u64, _data: &mut [u8]) { @@ -136,12 +120,17 @@ impl VirtioDevice for Rng { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { - if 
self.queues.len() != defs::NUM_QUEUES { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { + if queues.len() != defs::NUM_QUEUES { error!( "Cannot perform activate. Expected {} queue(s), got {}", defs::NUM_QUEUES, - self.queues.len() + queues.len() ); return Err(ActivateError::BadActivate); } @@ -151,6 +140,7 @@ impl VirtioDevice for Rng { return Err(ActivateError::BadActivate); } + self.queues = Some(queues); self.device_state = DeviceState::Activated(mem, interrupt); Ok(()) @@ -161,10 +151,8 @@ impl VirtioDevice for Rng { } fn reset(&mut self) -> bool { - // Strictly speaking, we should unsubscribe the queue events resubscribe - // the activate eventfd and deactivate the device, but we don't support - // any scenario in which neither GuestMemory nor the queue events would - // change, so let's avoid doing any unnecessary work. + self.queues = None; + self.device_state = DeviceState::Inactive; true } } diff --git a/src/devices/src/virtio/rng/event_handler.rs b/src/devices/src/virtio/rng/event_handler.rs index c31c841ad..cf32f2343 100644 --- a/src/devices/src/virtio/rng/event_handler.rs +++ b/src/devices/src/virtio/rng/event_handler.rs @@ -16,7 +16,7 @@ impl Rng { return; } - if let Err(e) = self.queue_events[REQ_INDEX].read() { + if let Err(e) = self.queue_event(REQ_INDEX).read() { error!("Failed to read request queue event: {e:?}"); } else if self.process_req() { self.device_state.signal_used_queue(); @@ -37,11 +37,8 @@ impl Rng { event_manager .register( - self.queue_events[REQ_INDEX].as_raw_fd(), - EpollEvent::new( - EventSet::IN, - self.queue_events[REQ_INDEX].as_raw_fd() as u64, - ), + self.queue_event(REQ_INDEX).as_raw_fd(), + EpollEvent::new(EventSet::IN, self.queue_event(REQ_INDEX).as_raw_fd() as u64), self_subscriber.clone(), ) .unwrap_or_else(|e| { @@ -59,7 +56,7 @@ impl Rng { impl Subscriber for Rng { fn process(&mut self, event: &EpollEvent, event_manager: &mut EventManager) { let source = event.fd(); - let req = self.queue_events[REQ_INDEX].as_raw_fd(); + let req = self.queue_event(REQ_INDEX).as_raw_fd(); let activate_evt = self.activate_evt.as_raw_fd(); if self.is_activated() { diff --git a/src/devices/src/virtio/rng/mod.rs b/src/devices/src/virtio/rng/mod.rs index 3788073cb..864effd63 100644 --- a/src/devices/src/virtio/rng/mod.rs +++ b/src/devices/src/virtio/rng/mod.rs @@ -5,9 +5,12 @@ pub use self::defs::uapi::VIRTIO_ID_RNG as TYPE_RNG; pub use self::device::Rng; mod defs { + use crate::virtio::QueueConfig; + pub const RNG_DEV_ID: &str = "virtio_rng"; pub const NUM_QUEUES: usize = 1; - pub const QUEUE_SIZES: &[u16] = &[256; NUM_QUEUES]; + const QUEUE_SIZE: u16 = 256; + pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; pub mod uapi { pub const VIRTIO_F_VERSION_1: u32 = 32; diff --git a/src/devices/src/virtio/snd/device.rs b/src/devices/src/virtio/snd/device.rs index 1226d8491..4c3fa7ce5 100644 --- a/src/devices/src/virtio/snd/device.rs +++ b/src/devices/src/virtio/snd/device.rs @@ -5,7 +5,7 @@ use utils::eventfd::EventFd; use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{ByteValued, GuestMemoryMmap}; -use super::super::{ActivateError, ActivateResult, Queue as VirtQueue, VirtioDevice}; +use super::super::{ActivateError, ActivateResult, DeviceQueue, QueueConfig, VirtioDevice}; use super::virtio_sound::VirtioSoundConfig; use super::worker::SndWorker; use super::{defs, defs::uapi, defs::QUEUE_INDEXES, Error}; @@ -16,8 +16,6 @@ use 
crate::virtio::{DeviceState, InterruptTransport}; pub(crate) const AVAIL_FEATURES: u64 = 1 << uapi::VIRTIO_F_VERSION_1 as u64; pub struct Snd { - pub(crate) queues: Vec, - pub(crate) queue_events: Vec, pub(crate) avail_features: u64, pub(crate) acked_features: u64, pub(crate) activate_evt: EventFd, @@ -27,16 +25,8 @@ pub struct Snd { } impl Snd { - pub(crate) fn with_queues(queues: Vec) -> super::Result { - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(Error::EventFdCreate)?); - } - + pub fn new() -> super::Result { Ok(Snd { - queues, - queue_events, avail_features: AVAIL_FEATURES, acked_features: 0, activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK) @@ -48,14 +38,6 @@ impl Snd { }) } - pub fn new() -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues(queues) - } - pub fn id(&self) -> &str { defs::SND_DEV_ID } @@ -82,16 +64,8 @@ impl VirtioDevice for Snd { "snd" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &defs::QUEUE_CONFIG } fn read_config(&self, offset: u64, mut data: &mut [u8]) { @@ -122,33 +96,32 @@ impl VirtioDevice for Snd { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + mut queues: Vec, + ) -> ActivateResult { if self.worker_thread.is_some() { panic!("virtio_snd: worker thread already exists"); } - if self.queues.len() != defs::NUM_QUEUES { + if queues.len() != defs::NUM_QUEUES { error!( "Cannot perform activate. 
Expected {} queue(s), got {}", defs::NUM_QUEUES, - self.queues.len() + queues.len() ); return Err(ActivateError::BadActivate); } let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; for idx in QUEUE_INDEXES { - self.queues[idx].set_event_idx(event_idx); + queues[idx].queue.set_event_idx(event_idx); } - let queue_evts = self - .queue_events - .iter() - .map(|e| e.try_clone().unwrap()) - .collect(); let worker = SndWorker::new( - self.queues.clone(), - queue_evts, + queues, interrupt.clone(), mem.clone(), self.worker_stopfd.try_clone().unwrap(), diff --git a/src/devices/src/virtio/snd/mod.rs b/src/devices/src/virtio/snd/mod.rs index 17c31aff5..ea2f31f4a 100644 --- a/src/devices/src/virtio/snd/mod.rs +++ b/src/devices/src/virtio/snd/mod.rs @@ -22,17 +22,20 @@ use super::{Descriptor, InterruptTransport, Queue}; use crate::virtio::snd::virtio_sound::{VirtioSoundHeader, VirtioSoundPcmStatus}; mod defs { + use super::super::QueueConfig; use super::virtio_sound::*; pub const SND_DEV_ID: &str = "virtio_snd"; pub const NUM_QUEUES: usize = 4; - pub const QUEUE_SIZES: &[u16] = &[256; NUM_QUEUES]; pub const CTL_INDEX: usize = 0; pub const EVT_INDEX: usize = 1; pub const TXQ_INDEX: usize = 2; pub const RXQ_INDEX: usize = 3; pub const QUEUE_INDEXES: [usize; 4] = [CTL_INDEX, EVT_INDEX, TXQ_INDEX, RXQ_INDEX]; + const QUEUE_SIZE: u16 = 256; + pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; + pub const SUPPORTED_FORMATS: u64 = (1 << VIRTIO_SND_PCM_FMT_U8) | (1 << VIRTIO_SND_PCM_FMT_S16) | (1 << VIRTIO_SND_PCM_FMT_S24) diff --git a/src/devices/src/virtio/snd/worker.rs b/src/devices/src/virtio/snd/worker.rs index a32282f88..b269c9503 100644 --- a/src/devices/src/virtio/snd/worker.rs +++ b/src/devices/src/virtio/snd/worker.rs @@ -8,7 +8,7 @@ use utils::epoll::{ControlOperation, Epoll, EpollEvent, EventSet}; use utils::eventfd::EventFd; use vm_memory::{ByteValued, Bytes, GuestMemoryMmap}; -use super::super::Queue; +use super::super::DeviceQueue; use super::audio_backends::{alloc_audio_backend, AudioBackend}; use super::defs::{CTL_INDEX, EVT_INDEX, QUEUE_INDEXES, RXQ_INDEX, TXQ_INDEX}; use super::stream::{Error as StreamError, Stream}; @@ -28,7 +28,7 @@ use crate::virtio::{DescriptorChain, InterruptTransport}; pub struct SndWorker { vrings: Vec>>, - queue_evts: Vec, + queue_events: Vec>, interrupt: InterruptTransport, mem: GuestMemoryMmap, streams: Arc>>, @@ -40,10 +40,8 @@ pub struct SndWorker { } impl SndWorker { - #[allow(clippy::too_many_arguments)] pub fn new( - queues: Vec, - queue_evts: Vec, + queues: Vec, interrupt: InterruptTransport, mem: GuestMemoryMmap, stop_fd: EventFd, @@ -86,18 +84,20 @@ impl SndWorker { RwLock::new(alloc_audio_backend(BackendType::Pipewire, streams.clone()).unwrap()); let mut vrings: Vec>> = Vec::new(); + let mut queue_events: Vec> = Vec::new(); - for idx in QUEUE_INDEXES { + for dq in queues { vrings.push(Arc::new(Mutex::new(Vring { mem: mem.clone(), - queue: queues[idx].clone(), + queue: dq.queue, interrupt: interrupt.clone(), }))); + queue_events.push(dq.event); } Self { vrings, - queue_evts, + queue_events, interrupt, mem, streams, @@ -120,7 +120,7 @@ impl SndWorker { let epoll = Epoll::new().unwrap(); for idx in QUEUE_INDEXES { - let fd = self.queue_evts[idx].as_raw_fd(); + let fd = self.queue_events[idx].as_raw_fd(); epoll .ctl( ControlOperation::Add, @@ -173,7 +173,7 @@ impl SndWorker { fn handle_event(&mut self, queue_index: usize) { debug!("Fs: queue event: {queue_index}"); - if let Err(e) = 
self.queue_evts[queue_index].read() { + if let Err(e) = self.queue_events[queue_index].read() { error!("Failed to get queue event: {e:?}"); } diff --git a/src/devices/src/virtio/vsock/device.rs b/src/devices/src/virtio/vsock/device.rs index 2708327ca..a61043b4c 100644 --- a/src/devices/src/virtio/vsock/device.rs +++ b/src/devices/src/virtio/vsock/device.rs @@ -14,7 +14,8 @@ use utils::eventfd::EventFd; use vm_memory::GuestMemoryMmap; use super::super::{ - ActivateError, ActivateResult, DeviceState, Queue as VirtQueue, VirtioDevice, VsockError, + ActivateError, ActivateResult, DeviceQueue, DeviceState, Queue as VirtQueue, QueueConfig, + VirtioDevice, }; use super::muxer::VsockMuxer; use super::packet::VsockPacket; @@ -37,10 +38,10 @@ pub(crate) const AVAIL_FEATURES: u64 = (1 << uapi::VIRTIO_F_VERSION_1 as u64) pub struct Vsock { cid: u64, pub(crate) muxer: VsockMuxer, - pub(crate) queue_rx: Arc>, - pub(crate) queue_tx: Arc>, - pub(crate) queues: Vec, - pub(crate) queue_events: Vec, + pub(crate) queue_rx: Option>>, + pub(crate) queue_tx: Option>>, + // Queue events are stored separately for event handling. + pub(crate) queue_events: Vec>, pub(crate) avail_features: u64, pub(crate) acked_features: u64, pub(crate) activate_evt: EventFd, @@ -48,51 +49,27 @@ pub struct Vsock { } impl Vsock { - pub(crate) fn with_queues( + /// Create a new virtio-vsock device with the given VM CID. + pub fn new( cid: u64, host_port_map: Option>, - queues: Vec, unix_ipc_port_map: Option>, tsi_flags: TsiFlags, ) -> super::Result { - let mut queue_events = Vec::new(); - for _ in 0..queues.len() { - queue_events - .push(EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(VsockError::EventFd)?); - } - - let queue_tx = Arc::new(Mutex::new(queues[TXQ_INDEX].clone())); - let queue_rx = Arc::new(Mutex::new(queues[RXQ_INDEX].clone())); - Ok(Vsock { cid, muxer: VsockMuxer::new(cid, host_port_map, unix_ipc_port_map, tsi_flags), - queue_rx, - queue_tx, - queues, - queue_events, + queue_rx: None, + queue_tx: None, + queue_events: Vec::new(), avail_features: AVAIL_FEATURES, acked_features: 0, activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK) - .map_err(VsockError::EventFd)?, + .map_err(super::VsockError::EventFd)?, device_state: DeviceState::Inactive, }) } - /// Create a new virtio-vsock device with the given VM CID. 
- pub fn new( - cid: u64, - host_port_map: Option>, - unix_ipc_port_map: Option>, - tsi_flags: TsiFlags, - ) -> super::Result { - let queues: Vec = defs::QUEUE_SIZES - .iter() - .map(|&max_size| VirtQueue::new(max_size)) - .collect(); - Self::with_queues(cid, host_port_map, queues, unix_ipc_port_map, tsi_flags) - } - pub fn id(&self) -> &str { defs::VSOCK_DEV_ID } @@ -115,7 +92,11 @@ impl Vsock { let mut have_used = false; debug!("process_rx before while"); - let mut queue_rx = self.queue_rx.lock().unwrap(); + let queue_rx = self + .queue_rx + .as_ref() + .expect("queue_rx should exist when activated"); + let mut queue_rx = queue_rx.lock().unwrap(); while let Some(head) = queue_rx.pop(mem) { debug!("process_rx inside while"); let used_len = match VsockPacket::from_rx_virtq_head(&head) { @@ -157,7 +138,11 @@ impl Vsock { let mut have_used = false; - let mut queue_tx = self.queue_tx.lock().unwrap(); + let queue_tx = self + .queue_tx + .as_ref() + .expect("queue_tx should exist when activated"); + let mut queue_tx = queue_tx.lock().unwrap(); while let Some(head) = queue_tx.pop(mem) { let pkt = match VsockPacket::from_tx_virtq_head(&head) { Ok(pkt) => pkt, @@ -216,16 +201,8 @@ impl VirtioDevice for Vsock { "vsock" } - fn queues(&self) -> &[VirtQueue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [VirtQueue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_events + fn queue_config(&self) -> &[QueueConfig] { + &defs::QUEUE_CONFIG } fn read_config(&self, offset: u64, data: &mut [u8]) { @@ -253,12 +230,17 @@ impl VirtioDevice for Vsock { ); } - fn activate(&mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport) -> ActivateResult { - if self.queues.len() != defs::NUM_QUEUES { + fn activate( + &mut self, + mem: GuestMemoryMmap, + interrupt: InterruptTransport, + queues: Vec, + ) -> ActivateResult { + if queues.len() != defs::NUM_QUEUES { error!( "Cannot perform activate. Expected {} queue(s), got {}", defs::NUM_QUEUES, - self.queues.len() + queues.len() ); return Err(ActivateError::BadActivate); } @@ -268,10 +250,23 @@ impl VirtioDevice for Vsock { return Err(ActivateError::BadActivate); } - self.queue_tx = Arc::new(Mutex::new(self.queues[TXQ_INDEX].clone())); - self.queue_rx = Arc::new(Mutex::new(self.queues[RXQ_INDEX].clone())); - self.muxer - .activate(mem.clone(), self.queue_rx.clone(), interrupt.clone()); + // Store queue events for event handling. + self.queue_events = queues.iter().map(|dq| dq.event.clone()).collect(); + + // Extract queues from DeviceQueues and wrap in Arc>. + let mut queues_vec: Vec = queues.into_iter().map(|dq| dq.queue).collect(); + // Note: EVQ (index 2) is currently unused, we just take it to maintain the vec. + let _evq = queues_vec.pop().unwrap(); + let tx_queue = queues_vec.pop().unwrap(); + let rx_queue = queues_vec.pop().unwrap(); + + self.queue_tx = Some(Arc::new(Mutex::new(tx_queue))); + self.queue_rx = Some(Arc::new(Mutex::new(rx_queue))); + self.muxer.activate( + mem.clone(), + self.queue_rx.clone().unwrap(), + interrupt.clone(), + ); self.device_state = DeviceState::Activated(mem, interrupt); diff --git a/src/devices/src/virtio/vsock/mod.rs b/src/devices/src/virtio/vsock/mod.rs index 7288de0bd..c307c5443 100644 --- a/src/devices/src/virtio/vsock/mod.rs +++ b/src/devices/src/virtio/vsock/mod.rs @@ -58,11 +58,14 @@ mod defs { /// Because Vsock is unique per-vm, this ID can be hardcoded. pub const VSOCK_DEV_ID: &str = "vsock"; + use crate::virtio::QueueConfig; + /// Number of virtio queues. 
pub const NUM_QUEUES: usize = 3; - /// Virtio queue sizes, in number of descriptor chain heads. + const QUEUE_SIZE: u16 = 256; + /// Virtio queue config. /// There are 3 queues for a virtio device (in this order): RX, TX, Event - pub const QUEUE_SIZES: &[u16] = &[256; NUM_QUEUES]; + pub static QUEUE_CONFIG: [QueueConfig; NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); NUM_QUEUES]; /// Max vsock packet data/buffer size. pub const MAX_PKT_BUF_SIZE: usize = 64 * 1024; diff --git a/src/vmm/src/device_manager/hvf/mmio.rs b/src/vmm/src/device_manager/hvf/mmio.rs index 3c909283a..57b5f0385 100644 --- a/src/vmm/src/device_manager/hvf/mmio.rs +++ b/src/vmm/src/device_manager/hvf/mmio.rs @@ -113,16 +113,6 @@ impl MMIODeviceManager { return Err(Error::IrqsExhausted); } - let mut queue_evts: Vec = Vec::new(); - - for queue_evt in mmio_device.locked_device().queue_events().iter() { - queue_evts.push(queue_evt.try_clone().unwrap()); - } - - for (i, queue_evt) in queue_evts.drain(0..).enumerate() { - mmio_device.register_queue_evt(queue_evt, i as u32); - } - mmio_device.set_irq_line(self.irq); self.bus @@ -338,11 +328,10 @@ mod tests { use super::*; use arch; use devices::legacy::DummyIrqChip; - use devices::virtio::{ActivateResult, InterruptTransport, Queue, VirtioDevice}; - use std::sync::atomic::AtomicUsize; + use devices::virtio::{ + ActivateResult, DeviceQueue, InterruptTransport, QueueConfig, VirtioDevice, + }; use std::sync::Arc; - use utils::errno; - use utils::eventfd::EventFd; use vm_memory::{GuestAddress, GuestMemoryMmap}; const QUEUE_SIZES: &[u16] = &[64]; @@ -350,19 +339,18 @@ mod tests { impl MMIODeviceManager { fn register_virtio_device( &mut self, - vm: &VmFd, + _vm: &Vm, guest_mem: GuestMemoryMmap, device: Arc>, - cmdline: &mut kernel_cmdline::Cmdline, + _cmdline: &mut kernel_cmdline::Cmdline, type_id: u32, device_id: &str, ) -> Result { let mmio_device = - devices::virtio::MmioTransport::new(guest_mem, DummyIrqChip::new().into(), device); + devices::virtio::MmioTransport::new(guest_mem, DummyIrqChip::new().into(), device) + .unwrap(); let (mmio_base, _irq) = - self.register_mmio_device(vm, mmio_device, type_id, device_id.to_string())?; - #[cfg(target_arch = "x86_64")] - self.add_device_to_cmdline(cmdline, mmio_base, _irq)?; + self.register_mmio_device(mmio_device, type_id, device_id.to_string())?; Ok(mmio_base) } } @@ -370,21 +358,16 @@ mod tests { #[allow(dead_code)] struct DummyDevice { dummy: u32, - queues: Vec, - queue_evts: [EventFd; 1], - interrupt_evt: EventFd, + queue_config: Vec, + device_activated: bool, } impl DummyDevice { pub fn new() -> Self { DummyDevice { dummy: 0, - queues: QUEUE_SIZES.iter().map(|&s| Queue::new(s)).collect(), - queue_evts: [ - EventFd::new(utils::eventfd::EFD_NONBLOCK).expect("cannot create eventFD") - ], - interrupt_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK) - .expect("cannot create eventFD"), + queue_config: QUEUE_SIZES.iter().map(|&s| QueueConfig::new(s)).collect(), + device_activated: false, } } } @@ -404,21 +387,12 @@ mod tests { 0 } - fn queues(&self) -> &[Queue] { - &self.queues + fn device_name(&self) -> &str { + "dummy" } - fn queues_mut(&mut self) -> &mut [Queue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_evts - } - - fn ack_features_by_page(&mut self, page: u32, value: u32) { - let _ = page; - let _ = value; + fn queue_config(&self) -> &[QueueConfig] { + &self.queue_config } fn read_config(&self, offset: u64, data: &mut [u8]) { @@ -433,14 +407,16 @@ mod tests { fn activate( &mut self, - mem: 
GuestMemoryMmap, + _mem: GuestMemoryMmap, _interrupt: InterruptTransport, + _queues: Vec, ) -> ActivateResult { + self.device_activated = true; Ok(()) } fn is_activated(&self) -> bool { - false + self.device_activated } } @@ -462,7 +438,7 @@ mod tests { assert!(builder::setup_interrupt_controller(&mut vm, 1).is_ok()); assert!(device_manager - .register_virtio_device(vm.fd(), guest_mem, dummy, &mut cmdline, 0, "dummy") + .register_virtio_device(&vm, guest_mem, dummy, &mut cmdline, 0, "dummy") .is_ok()); } @@ -485,7 +461,7 @@ mod tests { for _i in arch::IRQ_BASE..=arch::IRQ_MAX { device_manager .register_virtio_device( - vm.fd(), + &vm, guest_mem.clone(), Arc::new(Mutex::new(DummyDevice::new())), &mut cmdline, @@ -499,7 +475,7 @@ mod tests { "{}", device_manager .register_virtio_device( - vm.fd(), + &vm, guest_mem, Arc::new(Mutex::new(DummyDevice::new())), &mut cmdline, @@ -516,7 +492,7 @@ mod tests { fn test_dummy_device() { let dummy = DummyDevice::new(); assert_eq!(dummy.device_type(), 0); - assert_eq!(dummy.queues().len(), QUEUE_SIZES.len()); + assert_eq!(dummy.queue_config().len(), QUEUE_SIZES.len()); } #[test] @@ -560,12 +536,12 @@ mod tests { "no more IRQs are available" ); assert_eq!( - format!("{}", Error::RegisterIoEvent(errno::Error::new(0))), - format!("failed to register IO event: {}", errno::Error::new(0)) + format!("{}", Error::RegisterIoEvent), + "failed to register IO event" ); assert_eq!( - format!("{}", Error::RegisterIrqFd(errno::Error::new(0))), - format!("failed to register irqfd: {}", errno::Error::new(0)) + format!("{}", Error::RegisterIrqFd), + "failed to register irqfd" ); } @@ -583,14 +559,9 @@ mod tests { let type_id = 0; let id = String::from("foo"); - if let Ok(addr) = device_manager.register_virtio_device( - vm.fd(), - guest_mem, - dummy, - &mut cmdline, - type_id, - &id, - ) { + if let Ok(addr) = + device_manager.register_virtio_device(&vm, guest_mem, dummy, &mut cmdline, type_id, &id) + { assert!(device_manager .get_device(DeviceType::Virtio(type_id), &id) .is_some()); diff --git a/src/vmm/src/device_manager/kvm/mmio.rs b/src/vmm/src/device_manager/kvm/mmio.rs index 73acc3a4f..25c6ad280 100644 --- a/src/vmm/src/device_manager/kvm/mmio.rs +++ b/src/vmm/src/device_manager/kvm/mmio.rs @@ -130,12 +130,7 @@ impl MMIODeviceManager { return Err(Error::IrqsExhausted); } - for (i, queue_evt) in mmio_device - .locked_device() - .queue_events() - .iter() - .enumerate() - { + for (i, queue_evt) in mmio_device.queue_evts().iter().enumerate() { let io_addr = IoEventAddress::Mmio( self.mmio_base + u64::from(devices::virtio::NOTIFY_REG_OFFSET), ); @@ -324,10 +319,11 @@ mod tests { use devices::legacy::KvmGicV3; #[cfg(target_arch = "x86_64")] use devices::legacy::KvmIoapic; - use devices::virtio::{ActivateResult, InterruptTransport, Queue, VirtioDevice}; + use devices::virtio::{ + ActivateResult, DeviceQueue, InterruptTransport, QueueConfig, VirtioDevice, + }; use std::sync::Arc; use utils::errno; - use utils::eventfd::EventFd; use vm_memory::{GuestAddress, GuestMemoryMmap}; const QUEUE_SIZES: &[u16] = &[64]; @@ -356,21 +352,16 @@ mod tests { #[allow(dead_code)] struct DummyDevice { dummy: u32, - queues: Vec, - queue_evts: [EventFd; 1], - interrupt_evt: EventFd, + queue_config: Vec, + device_activated: bool, } impl DummyDevice { pub fn new() -> Self { DummyDevice { dummy: 0, - queues: QUEUE_SIZES.iter().map(|&s| Queue::new(s)).collect(), - queue_evts: [ - EventFd::new(utils::eventfd::EFD_NONBLOCK).expect("cannot create eventFD") - ], - interrupt_evt: 
EventFd::new(utils::eventfd::EFD_NONBLOCK) - .expect("cannot create eventFD"), + queue_config: QUEUE_SIZES.iter().map(|&s| QueueConfig::new(s)).collect(), + device_activated: false, } } } @@ -394,21 +385,8 @@ mod tests { "dummy" } - fn queues(&self) -> &[Queue] { - &self.queues - } - - fn queues_mut(&mut self) -> &mut [Queue] { - &mut self.queues - } - - fn queue_events(&self) -> &[EventFd] { - &self.queue_evts - } - - fn ack_features_by_page(&mut self, page: u32, value: u32) { - let _ = page; - let _ = value; + fn queue_config(&self) -> &[QueueConfig] { + &self.queue_config } fn read_config(&self, offset: u64, data: &mut [u8]) { @@ -421,12 +399,18 @@ mod tests { let _ = data; } - fn activate(&mut self, _mem: GuestMemoryMmap, _intc: InterruptTransport) -> ActivateResult { + fn activate( + &mut self, + _mem: GuestMemoryMmap, + _intc: InterruptTransport, + _queues: Vec, + ) -> ActivateResult { + self.device_activated = true; Ok(()) } fn is_activated(&self) -> bool { - false + self.device_activated } } @@ -502,7 +486,7 @@ mod tests { fn test_dummy_device() { let dummy = DummyDevice::new(); assert_eq!(dummy.device_type(), 0); - assert_eq!(dummy.queues().len(), QUEUE_SIZES.len()); + assert_eq!(dummy.queue_config().len(), QUEUE_SIZES.len()); } #[test] From 6c495596c81820533e2db23daf87c8f7bddba62e Mon Sep 17 00:00:00 2001 From: Matej Hrica Date: Wed, 14 Jan 2026 13:11:26 +0100 Subject: [PATCH 2/3] device/gpu: Refactor queue event handling Make the worker thread wait on the queue events directly instead of having an event_handler which would forward the event. Signed-off-by: Matej Hrica --- src/devices/src/virtio/gpu/device.rs | 38 ++----- src/devices/src/virtio/gpu/event_handler.rs | 114 -------------------- src/devices/src/virtio/gpu/mod.rs | 1 - src/devices/src/virtio/gpu/worker.rs | 45 +++++--- src/vmm/src/builder.rs | 6 -- 5 files changed, 36 insertions(+), 168 deletions(-) delete mode 100644 src/devices/src/virtio/gpu/event_handler.rs diff --git a/src/devices/src/virtio/gpu/device.rs b/src/devices/src/virtio/gpu/device.rs index 097197139..04f619c27 100644 --- a/src/devices/src/virtio/gpu/device.rs +++ b/src/devices/src/virtio/gpu/device.rs @@ -1,13 +1,12 @@ use std::io::Write; -use std::sync::{Arc, Mutex}; -use crossbeam_channel::{unbounded, Sender}; -use utils::eventfd::EventFd; +#[cfg(target_os = "macos")] +use crossbeam_channel::Sender; use vm_memory::{ByteValued, GuestMemoryMmap}; use super::super::{ - fs::ExportTable, ActivateError, ActivateResult, DeviceQueue, DeviceState, GpuError, - Queue as VirtQueue, QueueConfig, VirtioDevice, VirtioShmRegion, + fs::ExportTable, ActivateError, ActivateResult, DeviceQueue, DeviceState, QueueConfig, + VirtioDevice, VirtioShmRegion, }; use super::defs; use super::defs::uapi; @@ -32,16 +31,10 @@ static QUEUE_CONFIG: [QueueConfig; defs::NUM_QUEUES] = [QueueConfig::new(QUEUE_SIZE); defs::NUM_QUEUES]; pub struct Gpu { - pub(crate) queue_ctl: Option>>, - // Queue events stored for event handler - these are shared with DeviceQueue - pub(crate) ctl_event: Option>, - pub(crate) cur_event: Option>, pub(crate) avail_features: u64, pub(crate) acked_features: u64, - pub(crate) activate_evt: EventFd, pub(crate) device_state: DeviceState, shm_region: Option, - pub(crate) sender: Option>, virgl_flags: u32, #[cfg(target_os = "macos")] map_sender: Sender, @@ -58,15 +51,10 @@ impl Gpu { #[cfg(target_os = "macos")] map_sender: Sender, ) -> super::Result { Ok(Gpu { - queue_ctl: None, - ctl_event: None, - cur_event: None, avail_features: AVAIL_FEATURES, acked_features: 0, 
- activate_evt: EventFd::new(utils::eventfd::EFD_NONBLOCK).map_err(GpuError::EventFd)?, device_state: DeviceState::Inactive, shm_region: None, - sender: None, virgl_flags, #[cfg(target_os = "macos")] map_sender, @@ -205,7 +193,7 @@ impl VirtioDevice for Gpu { interrupt: InterruptTransport, queues: Vec, ) -> ActivateResult { - let [control_q, cursor_q]: [_; defs::NUM_QUEUES] = queues.try_into().map_err(|_| { + let [control_q, _cursor_q]: [_; defs::NUM_QUEUES] = queues.try_into().map_err(|_| { error!( "Cannot perform activate. Expected {} queue(s)", defs::NUM_QUEUES @@ -218,17 +206,10 @@ impl VirtioDevice for Gpu { None => panic!("virtio_gpu: missing SHM region"), }; - self.ctl_event = Some(control_q.event.clone()); - self.cur_event = Some(cursor_q.event.clone()); - let queue_ctl = Arc::new(Mutex::new(control_q.queue)); - self.queue_ctl = Some(queue_ctl.clone()); // cursor queue not used by worker - - let (sender, receiver) = unbounded(); let worker = Worker::new( - receiver, + control_q, mem.clone(), - queue_ctl, interrupt.clone(), shm_region, self.virgl_flags, @@ -240,13 +221,6 @@ impl VirtioDevice for Gpu { ); worker.run(); - self.sender = Some(sender); - - if self.activate_evt.write(1).is_err() { - error!("Cannot write to activate_evt",); - return Err(ActivateError::BadActivate); - } - self.device_state = DeviceState::Activated(mem, interrupt); Ok(()) diff --git a/src/devices/src/virtio/gpu/event_handler.rs b/src/devices/src/virtio/gpu/event_handler.rs deleted file mode 100644 index f11ba33bb..000000000 --- a/src/devices/src/virtio/gpu/event_handler.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::os::unix::io::AsRawFd; - -use polly::event_manager::{EventManager, Subscriber}; -use utils::epoll::{EpollEvent, EventSet}; - -use super::device::Gpu; -use crate::virtio::device::VirtioDevice; - -impl Gpu { - pub(crate) fn handle_ctl_event(&mut self, event: &EpollEvent) { - debug!("gpu: request queue event"); - - let event_set = event.event_set(); - if event_set != EventSet::IN { - warn!("gpu: request queue unexpected event {event_set:?}"); - return; - } - - let ctl_event = self.ctl_event.as_ref().unwrap(); - if let Err(e) = ctl_event.read() { - error!("Failed to read request queue event: {e:?}"); - } else if let Err(e) = self.sender.as_ref().unwrap().send(0) { - error!("Failed to signal worker for ctl queue: {e:?}"); - } - } - - pub(crate) fn handle_cur_event(&mut self, event: &EpollEvent) { - debug!("gpu: request queue event"); - - let event_set = event.event_set(); - if event_set != EventSet::IN { - warn!("gpu: request queue unexpected event {event_set:?}"); - return; - } - - let cur_event = self.cur_event.as_ref().unwrap(); - if let Err(e) = cur_event.read() { - error!("Failed to read request queue event: {e:?}"); - } else if let Err(e) = self.sender.as_ref().unwrap().send(1) { - error!("Failed to signal worker for cur queue: {e:?}"); - } - } - - fn handle_activate_event(&self, event_manager: &mut EventManager) { - debug!("gpu: activate event"); - if let Err(e) = self.activate_evt.read() { - error!("Failed to consume gpu activate event: {e:?}"); - } - - // The subscriber must exist as we previously registered activate_evt via - // `interest_list()`. 
- let self_subscriber = event_manager - .subscriber(self.activate_evt.as_raw_fd()) - .unwrap(); - - let ctl_event = self.ctl_event.as_ref().unwrap(); - let cur_event = self.cur_event.as_ref().unwrap(); - - event_manager - .register( - ctl_event.as_raw_fd(), - EpollEvent::new(EventSet::IN, ctl_event.as_raw_fd() as u64), - self_subscriber.clone(), - ) - .unwrap_or_else(|e| { - error!("Failed to register gpu ctl with event manager: {e:?}"); - }); - - event_manager - .register( - cur_event.as_raw_fd(), - EpollEvent::new(EventSet::IN, cur_event.as_raw_fd() as u64), - self_subscriber.clone(), - ) - .unwrap_or_else(|e| { - error!("Failed to register gpu cur with event manager: {e:?}"); - }); - - event_manager - .unregister(self.activate_evt.as_raw_fd()) - .unwrap_or_else(|e| { - error!("Failed to unregister gpu activate evt: {e:?}"); - }) - } -} - -impl Subscriber for Gpu { - fn process(&mut self, event: &EpollEvent, event_manager: &mut EventManager) { - let source = event.fd(); - let activate_evt = self.activate_evt.as_raw_fd(); - - if self.is_activated() { - let ctl = self.ctl_event.as_ref().unwrap().as_raw_fd(); - let cur = self.cur_event.as_ref().unwrap().as_raw_fd(); - match source { - _ if source == ctl => self.handle_ctl_event(event), - _ if source == cur => self.handle_cur_event(event), - _ if source == activate_evt => { - self.handle_activate_event(event_manager); - } - _ => warn!("Unexpected gpu event received: {source:?}"), - } - } else { - warn!("gpu: The device is not yet activated. Spurious event received: {source:?}"); - } - } - - fn interest_list(&self) -> Vec { - vec![EpollEvent::new( - EventSet::IN, - self.activate_evt.as_raw_fd() as u64, - )] - } -} diff --git a/src/devices/src/virtio/gpu/mod.rs b/src/devices/src/virtio/gpu/mod.rs index 143d7a67e..64f42de8f 100644 --- a/src/devices/src/virtio/gpu/mod.rs +++ b/src/devices/src/virtio/gpu/mod.rs @@ -1,7 +1,6 @@ mod device; pub mod display; mod edid; -mod event_handler; mod protocol; mod virtio_gpu; mod worker; diff --git a/src/devices/src/virtio/gpu/worker.rs b/src/devices/src/virtio/gpu/worker.rs index d5a4c32ca..cd13df14b 100644 --- a/src/devices/src/virtio/gpu/worker.rs +++ b/src/devices/src/virtio/gpu/worker.rs @@ -1,8 +1,11 @@ use std::io::Read; +use std::os::fd::{AsRawFd, BorrowedFd}; use std::sync::{Arc, Mutex}; use std::thread; -use crossbeam_channel::Receiver; +use nix::fcntl::{fcntl, FcntlArg, OFlag}; +use utils::eventfd::EventFd; + #[cfg(target_os = "macos")] use crossbeam_channel::Sender; use rutabaga_gfx::{ @@ -14,7 +17,7 @@ use utils::worker_message::WorkerMessage; use vm_memory::{GuestAddress, GuestMemoryMmap}; use super::super::descriptor_utils::{Reader, Writer}; -use super::super::{GpuError, Queue as VirtQueue}; +use super::super::{DeviceQueue, GpuError, Queue as VirtQueue}; use super::protocol::{ virtio_gpu_ctrl_hdr, virtio_gpu_mem_entry, GpuCommand, GpuResponse, VirtioGpuResult, }; @@ -28,9 +31,9 @@ use krun_display::DisplayBackend; use krun_display::Rect; pub struct Worker { - receiver: Receiver, + control_evt: EventFd, + control_queue: Arc>, mem: GuestMemoryMmap, - queue_ctl: Arc>, interrupt: InterruptTransport, shm_region: VirtioShmRegion, virgl_flags: u32, @@ -44,9 +47,8 @@ pub struct Worker { impl Worker { #[allow(clippy::too_many_arguments)] pub fn new( - receiver: Receiver, + control_q: DeviceQueue, mem: GuestMemoryMmap, - queue_ctl: Arc>, interrupt: InterruptTransport, shm_region: VirtioShmRegion, virgl_flags: u32, @@ -55,10 +57,17 @@ impl Worker { displays: Box<[DisplayInfo]>, display_backend: 
DisplayBackend<'static>, ) -> Self { + // Clone the eventfd so we have our own file description, then set it to blocking mode. + let control_evt = control_q.event.try_clone().unwrap(); + // SAFETY: control_evt is valid for the duration of the fcntl calls. + let fd = unsafe { BorrowedFd::borrow_raw(control_evt.as_raw_fd()) }; + let flags = OFlag::from_bits_retain(fcntl(fd, FcntlArg::F_GETFL).unwrap()) & !OFlag::O_NONBLOCK; + fcntl(fd, FcntlArg::F_SETFL(flags)).unwrap(); + Self { - receiver, + control_evt, + control_queue: Arc::new(Mutex::new(control_q.queue)), mem, - queue_ctl, interrupt, shm_region, virgl_flags, @@ -80,7 +89,7 @@ impl Worker { fn work(mut self) { let mut virtio_gpu = VirtioGpu::new( self.mem.clone(), - self.queue_ctl.clone(), + self.control_queue.clone(), self.interrupt.clone(), self.virgl_flags, #[cfg(target_os = "macos")] @@ -91,8 +100,11 @@ impl Worker { ); loop { - let _ = self.receiver.recv().unwrap(); - if self.process_queue(&mut virtio_gpu, 0) { + if let Err(e) = self.control_evt.read() { + error!("Failed to read control_evt: {e:?}"); + continue; + } + if self.process_queue(&mut virtio_gpu, &self.control_queue.clone()) { if let Err(e) = self.interrupt.try_signal_used_queue() { error!("Error signaling queue: {e:?}"); } @@ -333,12 +345,16 @@ impl Worker { } } - pub fn process_queue(&mut self, virtio_gpu: &mut VirtioGpu, _queue_index: usize) -> bool { + fn process_queue( + &mut self, + virtio_gpu: &mut VirtioGpu, + control_queue: &Arc>, + ) -> bool { let mut used_any = false; let mem = self.mem.clone(); loop { - let head = self.queue_ctl.lock().unwrap().pop(&mem); + let head = control_queue.lock().unwrap().pop(&mem); if let Some(head) = head { let mut reader = Reader::new(&mem, head.clone()) @@ -419,8 +435,7 @@ impl Worker { } if add_to_queue { - if let Err(e) = self - .queue_ctl + if let Err(e) = control_queue .lock() .unwrap() .add_used(&mem, head.index, len) diff --git a/src/vmm/src/builder.rs b/src/vmm/src/builder.rs index 92ac87079..03573485f 100644 --- a/src/vmm/src/builder.rs +++ b/src/vmm/src/builder.rs @@ -1013,7 +1013,6 @@ pub fn build_microvm( attach_gpu_device( &mut vmm, - event_manager, &mut _shm_manager, #[cfg(not(feature = "tee"))] export_table.clone(), @@ -2251,7 +2250,6 @@ fn attach_rng_device( #[allow(clippy::too_many_arguments)] fn attach_gpu_device( vmm: &mut Vmm, - event_manager: &mut EventManager, shm_manager: &mut ShmManager, #[cfg(not(feature = "tee"))] mut export_table: Option, intc: IrqChip, @@ -2273,10 +2271,6 @@ fn attach_gpu_device( .unwrap(), )); - event_manager - .add_subscriber(gpu.clone()) - .map_err(RegisterEvent)?; - let id = String::from(gpu.lock().unwrap().id()); if let Some(shm_region) = shm_manager.gpu_region() { From 2d211356f33a93af95f309c07ded825be614aea5 Mon Sep 17 00:00:00 2001 From: Matej Hrica Date: Thu, 15 Jan 2026 16:01:34 +0100 Subject: [PATCH 3/3] devices/mmio: Automatically enable device_idx feature for queues Instead of each device manually enabling the event_idx for each queue, let's make the transport layer configure the queues properly based on the acknowledged features. If it is necessary to disable this for some specific queue in the future (unlikely) we could an option for this in QueueConfig or the device could override this. 
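In other words, activation now does roughly the following on the transport
side (illustrative sketch only, not part of the diff; Queue and DeviceQueue
stand in for the crate's real types, and this mirrors the
MmioTransport::activate() change in src/devices/src/virtio/mmio.rs below):

    const VIRTIO_RING_F_EVENT_IDX: u64 = 29; // feature bit defined by the virtio spec

    struct Queue { event_idx: bool }
    impl Queue {
        fn set_event_idx(&mut self, enabled: bool) { self.event_idx = enabled; }
    }
    struct DeviceQueue { queue: Queue }

    // The transport knows the negotiated features, so it flips event_idx on
    // every queue before handing the queues to the device in activate().
    fn configure_event_idx(acked_features: u64, queues: &mut [DeviceQueue]) {
        let enabled = (acked_features & (1u64 << VIRTIO_RING_F_EVENT_IDX)) != 0;
        for dq in queues.iter_mut() {
            dq.queue.set_event_idx(enabled);
        }
    }
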
Signed-off-by: Matej Hrica --- src/devices/src/virtio/block/device.rs | 5 +---- src/devices/src/virtio/fs/device.rs | 5 +---- src/devices/src/virtio/gpu/worker.rs | 3 ++- src/devices/src/virtio/mmio.rs | 5 +++++ src/devices/src/virtio/net/device.rs | 6 +----- src/devices/src/virtio/snd/device.rs | 10 ++-------- 6 files changed, 12 insertions(+), 22 deletions(-) diff --git a/src/devices/src/virtio/block/device.rs b/src/devices/src/virtio/block/device.rs index 14c95ac3c..c943b10e0 100644 --- a/src/devices/src/virtio/block/device.rs +++ b/src/devices/src/virtio/block/device.rs @@ -403,14 +403,11 @@ impl VirtioDevice for Block { panic!("virtio_blk: worker thread already exists"); } - let [mut blk_q]: [_; NUM_QUEUES] = queues.try_into().map_err(|_| { + let [blk_q]: [_; NUM_QUEUES] = queues.try_into().map_err(|_| { error!("Cannot perform activate. Expected {} queue(s)", NUM_QUEUES); ActivateError::BadActivate })?; - let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - blk_q.queue.set_event_idx(event_idx); - let disk = match self.disk.take() { Some(d) => d, None => DiskProperties::new( diff --git a/src/devices/src/virtio/fs/device.rs b/src/devices/src/virtio/fs/device.rs index 07a766245..5a0aca08b 100644 --- a/src/devices/src/virtio/fs/device.rs +++ b/src/devices/src/virtio/fs/device.rs @@ -162,13 +162,10 @@ impl VirtioDevice for Fs { panic!("virtio_fs: worker thread already exists"); } - let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - // Extract queues and eventfds from DeviceQueues. let mut worker_queues = Vec::with_capacity(queues.len()); let mut queue_evts = Vec::with_capacity(queues.len()); - for mut dq in queues { - dq.queue.set_event_idx(event_idx); + for dq in queues { worker_queues.push(dq.queue); queue_evts.push(dq.event); } diff --git a/src/devices/src/virtio/gpu/worker.rs b/src/devices/src/virtio/gpu/worker.rs index cd13df14b..e00186c46 100644 --- a/src/devices/src/virtio/gpu/worker.rs +++ b/src/devices/src/virtio/gpu/worker.rs @@ -61,7 +61,8 @@ impl Worker { let control_evt = control_q.event.try_clone().unwrap(); // SAFETY: control_evt is valid for the duration of the fcntl calls. 
let fd = unsafe { BorrowedFd::borrow_raw(control_evt.as_raw_fd()) }; - let flags = OFlag::from_bits_retain(fcntl(fd, FcntlArg::F_GETFL).unwrap()) & !OFlag::O_NONBLOCK; + let flags = + OFlag::from_bits_retain(fcntl(fd, FcntlArg::F_GETFL).unwrap()) & !OFlag::O_NONBLOCK; fcntl(fd, FcntlArg::F_SETFL(flags)).unwrap(); Self { diff --git a/src/devices/src/virtio/mmio.rs b/src/devices/src/virtio/mmio.rs index 234d17601..f386c2ab8 100644 --- a/src/devices/src/virtio/mmio.rs +++ b/src/devices/src/virtio/mmio.rs @@ -310,6 +310,11 @@ impl MmioTransport { .collect(); let mut locked_device = self.locked_device(); + let event_idx_enabled = + (locked_device.acked_features() & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; + for dq in &mut device_queues { + dq.queue.set_event_idx(event_idx_enabled); + } locked_device .activate(self.mem.clone(), self.interrupt.clone(), device_queues) .expect("Failed to activate device"); diff --git a/src/devices/src/virtio/net/device.rs b/src/devices/src/virtio/net/device.rs index 2f7fd143e..9d4b4a1fc 100644 --- a/src/devices/src/virtio/net/device.rs +++ b/src/devices/src/virtio/net/device.rs @@ -176,15 +176,11 @@ impl VirtioDevice for Net { interrupt: InterruptTransport, queues: Vec, ) -> ActivateResult { - let [mut rx_q, mut tx_q]: [_; NUM_QUEUES] = queues.try_into().map_err(|_| { + let [rx_q, tx_q]: [_; NUM_QUEUES] = queues.try_into().map_err(|_| { error!("Cannot perform activate. Expected {} queue(s)", NUM_QUEUES); ActivateError::BadActivate })?; - let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - rx_q.queue.set_event_idx(event_idx); - tx_q.queue.set_event_idx(event_idx); - match NetWorker::new( rx_q, tx_q, diff --git a/src/devices/src/virtio/snd/device.rs b/src/devices/src/virtio/snd/device.rs index 4c3fa7ce5..4607790d7 100644 --- a/src/devices/src/virtio/snd/device.rs +++ b/src/devices/src/virtio/snd/device.rs @@ -2,13 +2,12 @@ use std::io::Write; use std::thread::JoinHandle; use utils::eventfd::EventFd; -use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; use vm_memory::{ByteValued, GuestMemoryMmap}; use super::super::{ActivateError, ActivateResult, DeviceQueue, QueueConfig, VirtioDevice}; use super::virtio_sound::VirtioSoundConfig; use super::worker::SndWorker; -use super::{defs, defs::uapi, defs::QUEUE_INDEXES, Error}; +use super::{defs, defs::uapi, Error}; use crate::virtio::{DeviceState, InterruptTransport}; @@ -100,7 +99,7 @@ impl VirtioDevice for Snd { &mut self, mem: GuestMemoryMmap, interrupt: InterruptTransport, - mut queues: Vec, + queues: Vec, ) -> ActivateResult { if self.worker_thread.is_some() { panic!("virtio_snd: worker thread already exists"); @@ -115,11 +114,6 @@ impl VirtioDevice for Snd { return Err(ActivateError::BadActivate); } - let event_idx: bool = (self.acked_features & (1 << VIRTIO_RING_F_EVENT_IDX)) != 0; - for idx in QUEUE_INDEXES { - queues[idx].queue.set_event_idx(event_idx); - } - let worker = SndWorker::new( queues, interrupt.clone(),