extern crate std; use { super::Handle, futures::{ channel::oneshot, future::{self, FutureExt}, sink::Sink, stream::Stream, }, std::{ boxed::Box, collections::hash_map::Entry, convert::Infallible, fmt, future::Future, iter, mem::{self, MaybeUninit}, pin::Pin, sync::atomic::{AtomicU32, Ordering::Relaxed}, task::{Context, Poll}, vec::Vec, }, }; fn ceiling(x: usize, y: usize) -> usize { (x / y) + if x % y == 0 { 0 } else { 1 } } #[doc(hidden)] pub struct StreamVtable { pub write: fn(future: u32, values: &[T]) -> Pin + '_>>, pub read: fn( future: u32, values: &mut [MaybeUninit], ) -> Pin> + '_>>, pub cancel_write: fn(future: u32), pub cancel_read: fn(future: u32), pub close_writable: fn(future: u32), pub close_readable: fn(future: u32), } struct CancelWriteOnDrop { handle: Option, vtable: &'static StreamVtable, } impl Drop for CancelWriteOnDrop { fn drop(&mut self) { if let Some(handle) = self.handle.take() { super::with_entry(handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { Handle::LocalOpen | Handle::LocalWaiting(_) | Handle::Read | Handle::LocalClosed => unreachable!(), Handle::LocalReady(..) => { entry.insert(Handle::LocalOpen); } Handle::Write => (self.vtable.cancel_write)(handle), }, }); } } } /// Represents the writable end of a Component Model `stream`. pub struct StreamWriter { handle: u32, future: Option + 'static>>>, vtable: &'static StreamVtable, } impl StreamWriter { #[doc(hidden)] pub fn new(handle: u32, vtable: &'static StreamVtable) -> Self { Self { handle, future: None, vtable, } } /// Cancel the current pending write operation. /// /// This will panic if no such operation is pending. pub fn cancel(&mut self) { assert!(self.future.is_some()); self.future = None; } } impl fmt::Debug for StreamWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StreamWriter") .field("handle", &self.handle) .finish() } } impl Sink> for StreamWriter { type Error = Infallible; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = self.get_mut(); if let Some(future) = &mut me.future { match future.as_mut().poll(cx) { Poll::Ready(_) => { me.future = None; Poll::Ready(Ok(())) } Poll::Pending => Poll::Pending, } } else { Poll::Ready(Ok(())) } } fn start_send(self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { assert!(self.future.is_none()); super::with_entry(self.handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { Handle::LocalOpen => { let handle = self.handle; let mut item = Some(item); let mut cancel_on_drop = Some(CancelWriteOnDrop:: { handle: Some(handle), vtable: self.vtable, }); self.get_mut().future = Some(Box::pin(future::poll_fn(move |cx| { super::with_entry(handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { Handle::LocalOpen => { if let Some(item) = item.take() { entry.insert(Handle::LocalReady( Box::new(item), cx.waker().clone(), )); Poll::Pending } else { cancel_on_drop.take().unwrap().handle = None; Poll::Ready(()) } } Handle::LocalReady(..) => Poll::Pending, Handle::LocalClosed => { cancel_on_drop.take().unwrap().handle = None; Poll::Ready(()) } Handle::LocalWaiting(_) | Handle::Read | Handle::Write => { unreachable!() } }, }) }))); } Handle::LocalWaiting(_) => { let Handle::LocalWaiting(tx) = entry.insert(Handle::LocalOpen) else { unreachable!() }; _ = tx.send(Box::new(item)); } Handle::LocalClosed => (), Handle::Read | Handle::LocalReady(..) => unreachable!(), Handle::Write => { let handle = self.handle; let vtable = self.vtable; let mut cancel_on_drop = CancelWriteOnDrop:: { handle: Some(handle), vtable, }; self.get_mut().future = Some(Box::pin(async move { (vtable.write)(handle, &item).await; cancel_on_drop.handle = None; drop(cancel_on_drop); })); } }, }); Ok(()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll_ready(cx) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.poll_ready(cx) } } impl Drop for StreamWriter { fn drop(&mut self) { self.future = None; super::with_entry(self.handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get_mut() { Handle::LocalOpen | Handle::LocalWaiting(_) | Handle::LocalReady(..) => { entry.insert(Handle::LocalClosed); } Handle::Read => unreachable!(), Handle::Write | Handle::LocalClosed => { entry.remove(); (self.vtable.close_writable)(self.handle); } }, }); } } struct CancelReadOnDrop { handle: Option, vtable: &'static StreamVtable, } impl Drop for CancelReadOnDrop { fn drop(&mut self) { if let Some(handle) = self.handle.take() { super::with_entry(handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { Handle::LocalOpen | Handle::LocalReady(..) | Handle::Write | Handle::LocalClosed => unreachable!(), Handle::LocalWaiting(_) => { entry.insert(Handle::LocalOpen); } Handle::Read => (self.vtable.cancel_read)(handle), }, }); } } } /// Represents the readable end of a Component Model `stream`. pub struct StreamReader { handle: AtomicU32, future: Option>> + 'static>>>, vtable: &'static StreamVtable, } impl StreamReader { /// Cancel the current pending read operation. /// /// This will panic if no such operation is pending. pub fn cancel(&mut self) { assert!(self.future.is_some()); self.future = None; } } impl fmt::Debug for StreamReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StreamReader") .field("handle", &self.handle) .finish() } } impl StreamReader { #[doc(hidden)] pub fn new(handle: u32, vtable: &'static StreamVtable) -> Self { Self { handle: AtomicU32::new(handle), future: None, vtable, } } #[doc(hidden)] pub fn from_handle_and_vtable(handle: u32, vtable: &'static StreamVtable) -> Self { super::with_entry(handle, |entry| match entry { Entry::Vacant(entry) => { entry.insert(Handle::Read); } Entry::Occupied(mut entry) => match entry.get() { Handle::Write => { entry.insert(Handle::LocalOpen); } Handle::Read | Handle::LocalOpen | Handle::LocalReady(..) | Handle::LocalWaiting(_) | Handle::LocalClosed => { unreachable!() } }, }); Self { handle: AtomicU32::new(handle), future: None, vtable, } } #[doc(hidden)] pub fn take_handle(&self) -> u32 { let handle = self.handle.swap(u32::MAX, Relaxed); super::with_entry(handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { Handle::LocalOpen => { entry.insert(Handle::Write); } Handle::Read | Handle::LocalClosed => { entry.remove(); } Handle::LocalReady(..) | Handle::LocalWaiting(_) | Handle::Write => unreachable!(), }, }); handle } } impl Stream for StreamReader { type Item = Vec; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = self.get_mut(); if me.future.is_none() { me.future = Some(super::with_entry( me.handle.load(Relaxed), |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get() { Handle::Write | Handle::LocalWaiting(_) => unreachable!(), Handle::Read => { let handle = me.handle.load(Relaxed); let vtable = me.vtable; let mut cancel_on_drop = CancelReadOnDrop:: { handle: Some(handle), vtable, }; Box::pin(async move { let mut buffer = iter::repeat_with(MaybeUninit::uninit) .take(ceiling(64 * 1024, mem::size_of::().max(1))) .collect::>(); let result = if let Some(count) = (vtable.read)(handle, &mut buffer).await { buffer.truncate(count); Some(unsafe { mem::transmute::>, Vec>(buffer) }) } else { None }; cancel_on_drop.handle = None; drop(cancel_on_drop); result }) as Pin>> } Handle::LocalOpen => { let (tx, rx) = oneshot::channel(); entry.insert(Handle::LocalWaiting(tx)); let mut cancel_on_drop = CancelReadOnDrop:: { handle: Some(me.handle.load(Relaxed)), vtable: me.vtable, }; Box::pin(async move { let result = rx.map(|v| v.ok().map(|v| *v.downcast().unwrap())).await; cancel_on_drop.handle = None; drop(cancel_on_drop); result }) } Handle::LocalClosed => Box::pin(future::ready(None)), Handle::LocalReady(..) => { let Handle::LocalReady(v, waker) = entry.insert(Handle::LocalOpen) else { unreachable!() }; waker.wake(); Box::pin(future::ready(Some(*v.downcast().unwrap()))) } }, }, )); } match me.future.as_mut().unwrap().as_mut().poll(cx) { Poll::Ready(v) => { me.future = None; Poll::Ready(v) } Poll::Pending => Poll::Pending, } } } impl Drop for StreamReader { fn drop(&mut self) { self.future = None; match self.handle.load(Relaxed) { u32::MAX => {} handle => { super::with_entry(handle, |entry| match entry { Entry::Vacant(_) => unreachable!(), Entry::Occupied(mut entry) => match entry.get_mut() { Handle::LocalReady(..) => { let Handle::LocalReady(_, waker) = entry.insert(Handle::LocalClosed) else { unreachable!() }; waker.wake(); } Handle::LocalOpen | Handle::LocalWaiting(_) => { entry.insert(Handle::LocalClosed); } Handle::Read | Handle::LocalClosed => { entry.remove(); (self.vtable.close_readable)(handle); } Handle::Write => unreachable!(), }, }); } } } }