/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use std::{ collections::HashMap, mem::MaybeUninit, os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle, RawHandle}, ptr::null_mut, rc::Rc, }; use windows_sys::Win32::{ Foundation::{ERROR_BROKEN_PIPE, FALSE, HANDLE, INVALID_HANDLE_VALUE}, System::{ Threading::INFINITE, IO::{CreateIoCompletionPort, GetQueuedCompletionStatus, OVERLAPPED}, }, }; use crate::{ errors::IPCError, ipc_queue::IPCQueueError, messages::Header, platform::{ windows::{get_last_error, OverlappedOperation}, PlatformError, }, IPCConnector, IPCConnectorKey, IPCEvent, IPCListener, }; const CONCURRENT_THREADS: u32 = 1; struct IPCQueueElement { connector: Rc, operation: Option, } pub struct IPCQueue { connectors: HashMap, listener: IPCListener, listen_operation: Option, port: OwnedHandle, } impl IPCQueue { pub fn new(listener: IPCListener) -> Result { let listener_port = listener.as_raw(); // Create a new completion port that allows only one active thread. let port = unsafe { CreateIoCompletionPort( /* FileHandle */ INVALID_HANDLE_VALUE, /* ExistingCompletionPort */ 0, /* CompletionKey */ 0, CONCURRENT_THREADS, ) as RawHandle }; if port.is_null() { return Err(IPCQueueError::CreationFailure(get_last_error())); } let mut queue = IPCQueue { connectors: HashMap::with_capacity(10), listener, listen_operation: None, port: unsafe { OwnedHandle::from_raw_handle(port) }, }; queue.add_handle(listener_port)?; Ok(queue) } pub fn add_connector(&mut self, connector: &Rc) -> Result<(), IPCQueueError> { self.add_handle(connector.as_raw())?; self.insert_connector(connector); Ok(()) } fn insert_connector(&mut self, connector: &Rc) { let res = self.connectors.insert( connector.key(), IPCQueueElement { connector: connector.clone(), operation: None, }, ); debug_assert!(res.is_none()); } fn add_handle(&mut self, handle: HANDLE) -> Result<(), IPCQueueError> { let port = unsafe { CreateIoCompletionPort( handle, self.port.as_raw_handle() as HANDLE, // Use the connector's handle as the events' key handle as usize, CONCURRENT_THREADS, ) as RawHandle }; if port.is_null() { return Err(IPCQueueError::RegistrationFailure(get_last_error())); } Ok(()) } pub fn wait_for_events(&mut self) -> Result, IPCQueueError> { let mut events = Vec::with_capacity(1); for element in self.connectors.values_mut() { if element.operation.is_none() { match element.connector.sched_recv_header() { Ok(operation) => element.operation = Some(operation), Err(_error @ IPCError::ReceptionFailure(PlatformError::BrokenPipe)) => { events.push(IPCEvent::Disconnect(element.connector.key())); } Err(error) => return Err(IPCQueueError::from(error)), } } } for event in &events { if let IPCEvent::Disconnect(key) = event { self.connectors.remove(key); } } if self.connectors.len() == 0 { // The last client disconnected. return Ok(events); } if self.listen_operation.is_none() { self.listen_operation = Some(self.listener.sched_listen()?); } let mut number_of_bytes_transferred = MaybeUninit::::uninit(); let mut completion_key = MaybeUninit::::uninit(); let mut overlapped = MaybeUninit::<*mut OVERLAPPED>::uninit(); let res = unsafe { GetQueuedCompletionStatus( self.port.as_raw_handle() as HANDLE, number_of_bytes_transferred.as_mut_ptr(), completion_key.as_mut_ptr(), overlapped.as_mut_ptr(), INFINITE, ) }; // SAFETY: `overlapped` will always be populated by // `GetQueueCompletionStatus()` so it's safe to assume initialization. let overlapped = unsafe { overlapped.assume_init() }; if res == FALSE { let err = get_last_error(); // If `overlapped` is non-null then the completion packet contained // the result of a failed I/O operation. We only handle failures // caused by a broken pipes, all others are considered fatal. if !overlapped.is_null() && (err == ERROR_BROKEN_PIPE) { // SAFETY: `overlapped` was non-null, so `completion_key` has // also been populated by `GetQueuedCompletionStatus()`. let completion_key = unsafe { completion_key.assume_init() }; let element = self.connectors.remove(&completion_key); debug_assert!(element.is_some(), "Completion on missing connector"); events.push(IPCEvent::Disconnect(completion_key)); } else { return Err(IPCQueueError::WaitError(err)); } } else { // SAFETY: `GetQueueCompletionStatus()` successfully retrieved a // completed I/O operation, all parameters have been populated. let (number_of_bytes_transferred, completion_key) = unsafe { ( number_of_bytes_transferred.assume_init(), completion_key.assume_init(), ) }; if number_of_bytes_transferred == 0 { // This is an event on the listener debug_assert!( self.listener.as_raw() as IPCConnectorKey == completion_key, "Completion event doesn't match the listener" ); let operation = self.listen_operation.take(); if let Some(operation) = operation { operation .accept() .map_err(|_e| IPCQueueError::RegistrationFailure(0))?; } let connector = Rc::new(self.listener.replace_pipe()?); self.insert_connector(&connector); // After the pipe is connected the listener handle will have been // replaced with a new one, so associate the new handle with the // completion queue. self.add_handle(self.listener.as_raw())?; events.push(IPCEvent::Connect(connector)); } else { let element = self .connectors .get_mut(&completion_key) .expect("Event did not match a known connector"); let operation = element .operation .take() .expect("No pending receive operation"); let buffer = &operation.collect_recv(); let header = Header::decode(buffer)?; let payload = element.connector.recv(header.size); match payload { Ok(payload) => { events.push(IPCEvent::Message( completion_key, header, payload.0, payload.1, )); } Err(_error @ IPCError::ReceptionFailure(PlatformError::BrokenPipe)) => { // This connector will generate a disconnection event // when `wait_for_events()` is called again. Do nothing // for the time being. } Err(error) => return Err(IPCQueueError::from(error)), } } } Ok(events) } } impl Drop for IPCQueue { fn drop(&mut self) { // Cancel all the pending operations. for element in self.connectors.values_mut() { if let Some(operation) = &mut element.operation { if !operation.cancel() { operation.leak(); } } } if let Some(operation) = &mut self.listen_operation { if !operation.cancel() { operation.leak(); } } // Drain the queue, once no more events are left we can safely drop it. loop { let mut number_of_bytes_transferred: u32 = 0; let mut completion_key: IPCConnectorKey = 0; let mut overlapped: *mut OVERLAPPED = null_mut(); let res = unsafe { GetQueuedCompletionStatus( self.port.as_raw_handle() as HANDLE, &mut number_of_bytes_transferred, &mut completion_key, &mut overlapped, 0, ) }; // TODO: Check that we got enough completion events? if res == FALSE && overlapped.is_null() { // TODO: Maybe check the error and report odd ones? break; } } } }