#![doc = include_str!("../README.md")] use std::{ future::Future, sync::{Arc, Condvar, Mutex}, task::{Context, Poll, Wake, Waker}, }; #[cfg(feature = "macro")] pub use pollster_macro::{main, test}; /// An extension trait that allows blocking on a future in suffix position. pub trait FutureExt: Future { /// Block the thread until the future is ready. /// /// # Example /// /// ``` /// use pollster::FutureExt as _; /// /// let my_fut = async {}; /// /// let result = my_fut.block_on(); /// ``` fn block_on(self) -> Self::Output where Self: Sized { block_on(self) } } impl FutureExt for F {} enum SignalState { Empty, Waiting, Notified, } struct Signal { state: Mutex, cond: Condvar, } impl Signal { fn new() -> Self { Self { state: Mutex::new(SignalState::Empty), cond: Condvar::new(), } } fn wait(&self) { let mut state = self.state.lock().unwrap(); match *state { SignalState::Notified => { // Notify() was called before we got here, consume it here without waiting and return immediately. *state = SignalState::Empty; return; } // This should not be possible because our signal is created within a function and never handed out to any // other threads. If this is the case, we have a serious problem so we panic immediately to avoid anything // more problematic happening. SignalState::Waiting => { unreachable!("Multiple threads waiting on the same signal: Open a bug report!"); } SignalState::Empty => { // Nothing has happened yet, and we're the only thread waiting (as should be the case!). Set the state // accordingly and begin polling the condvar in a loop until it's no longer telling us to wait. The // loop prevents incorrect spurious wakeups. *state = SignalState::Waiting; while let SignalState::Waiting = *state { state = self.cond.wait(state).unwrap(); } } } } fn notify(&self) { let mut state = self.state.lock().unwrap(); match *state { // The signal was already notified, no need to do anything because the thread will be waking up anyway SignalState::Notified => {} // The signal wasnt notified but a thread isnt waiting on it, so we can avoid doing unnecessary work by // skipping the condvar and leaving behind a message telling the thread that a notification has already // occurred should it come along in the future. SignalState::Empty => *state = SignalState::Notified, // The signal wasnt notified and there's a waiting thread. Reset the signal so it can be wait()'ed on again // and wake up the thread. Because there should only be a single thread waiting, `notify_all` would also be // valid. SignalState::Waiting => { *state = SignalState::Empty; self.cond.notify_one(); } } } } impl Wake for Signal { fn wake(self: Arc) { self.notify(); } } /// Block the thread until the future is ready. /// /// # Example /// /// ``` /// let my_fut = async {}; /// let result = pollster::block_on(my_fut); /// ``` pub fn block_on(mut fut: F) -> F::Output { // Pin the future so that it can be polled. // SAFETY: We shadow `fut` so that it cannot be used again. The future is now pinned to the stack and will not be // moved until the end of this scope. This is, incidentally, exactly what the `pin_mut!` macro from `pin_utils` // does. let mut fut = unsafe { std::pin::Pin::new_unchecked(&mut fut) }; // Signal used to wake up the thread for polling as the future moves to completion. We need to use an `Arc` // because, although the lifetime of `fut` is limited to this function, the underlying IO abstraction might keep // the signal alive for far longer. `Arc` is a thread-safe way to allow this to happen. // TODO: Investigate ways to reuse this `Arc`... perhaps via a `static`? let signal = Arc::new(Signal::new()); // Create a context that will be passed to the future. let waker = Waker::from(Arc::clone(&signal)); let mut context = Context::from_waker(&waker); // Poll the future to completion loop { match fut.as_mut().poll(&mut context) { Poll::Pending => signal.wait(), Poll::Ready(item) => break item, } } }