use crate::io::blocking::Buf; use crate::io::uring::open::Open; use crate::io::uring::read::Read; use crate::io::uring::utils::ArcFd; use crate::io::uring::write::Write; use crate::runtime::Handle; use io_uring::cqueue; use io_uring::squeue::Entry; use std::future::Future; use std::io::{self, Error}; use std::mem; use std::os::fd::OwnedFd; use std::pin::Pin; use std::task::{Context, Poll, Waker}; // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] #[derive(Debug)] pub(crate) enum CancelData { Open(Open), Write(Write), ReadVec(Read, OwnedFd>), ReadBuf(Read), } #[derive(Debug)] pub(crate) enum Lifecycle { /// The operation has been submitted to uring and is currently in-flight Submitted, /// The submitter is waiting for the completion of the operation Waiting(Waker), /// The submitter no longer has interest in the operation result. The state /// must be passed to the driver and held until the operation completes. Cancelled( // This field isn't accessed directly, but it holds cancellation data, // so `#[allow(dead_code)]` is needed. #[allow(dead_code)] CancelData, ), /// The operation has completed with a single cqe result Completed(io_uring::cqueue::Entry), } pub(crate) enum State { Initialize(Option), Polled(usize), Complete, } pub(crate) struct Op { // Handle to the runtime handle: Handle, // State of this Op state: State, // Per operation data. data: Option, } impl Op { /// # Safety /// /// Callers must ensure that parameters of the entry (such as buffer) are valid and will /// be valid for the entire duration of the operation, otherwise it may cause memory problems. pub(crate) unsafe fn new(entry: Entry, data: T) -> Self { let handle = Handle::current(); Self { handle, data: Some(data), state: State::Initialize(Some(entry)), } } pub(crate) fn take_data(&mut self) -> Option { self.data.take() } } impl Drop for Op { fn drop(&mut self) { match self.state { // We've already dropped this Op. State::Complete => (), // We will cancel this Op. State::Polled(index) => { let data = self.take_data(); let handle = &mut self.handle; handle.inner.driver().io().cancel_op(index, data); } // This Op has not been polled yet. // We don't need to do anything here. State::Initialize(_) => (), } } } /// A single CQE result pub(crate) struct CqeResult { pub(crate) result: io::Result, } impl From for CqeResult { fn from(cqe: cqueue::Entry) -> Self { let res = cqe.result(); let result = if res >= 0 { Ok(res as u32) } else { Err(io::Error::from_raw_os_error(-res)) }; CqeResult { result } } } /// A trait that converts a CQE result into a usable value for each operation. pub(crate) trait Completable { type Output; fn complete(self, cqe: CqeResult) -> Self::Output; // This is used when you want to terminate an operation with an error. // // The `Op` type that implements this trait can return the passed error // upstream by embedding it in the `Output`. fn complete_with_error(self, error: Error) -> Self::Output; } /// Extracts the `CancelData` needed to safely cancel an in-flight io_uring operation. pub(crate) trait Cancellable { fn cancel(self) -> CancelData; } impl Unpin for Op {} impl Future for Op { type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); let handle = &mut this.handle; let driver = handle.inner.driver().io(); match &mut this.state { State::Initialize(entry_opt) => { let entry = entry_opt.take().expect("Entry must be present"); let waker = cx.waker().clone(); // SAFETY: entry is valid for the entire duration of the operation match unsafe { driver.register_op(entry, waker) } { Ok(idx) => this.state = State::Polled(idx), Err(err) => { let data = this .take_data() .expect("Data must be present on Initialization"); this.state = State::Complete; return Poll::Ready(data.complete_with_error(err)); } }; Poll::Pending } State::Polled(idx) => { let mut ctx = driver.get_uring().lock(); let lifecycle = ctx.ops.get_mut(*idx).expect("Lifecycle must be present"); match mem::replace(lifecycle, Lifecycle::Submitted) { // Only replace the stored waker if it wouldn't wake the new one Lifecycle::Waiting(prev) if !prev.will_wake(cx.waker()) => { let waker = cx.waker().clone(); *lifecycle = Lifecycle::Waiting(waker); Poll::Pending } Lifecycle::Waiting(prev) => { *lifecycle = Lifecycle::Waiting(prev); Poll::Pending } Lifecycle::Completed(cqe) => { // Clean up and complete the future ctx.remove_op(*idx); this.state = State::Complete; drop(ctx); let data = this .take_data() .expect("Data must be present on completion"); Poll::Ready(data.complete(cqe.into())) } Lifecycle::Submitted => { unreachable!("Submitted lifecycle should never be seen here"); } Lifecycle::Cancelled(_) => { unreachable!("Cancelled lifecycle should never be seen here"); } } } State::Complete => { panic!("Future polled after completion"); } } } }