use super::Notify; use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::AtomicBool; use std::error::Error; use std::fmt; use std::future::{poll_fn, Future}; use std::mem::MaybeUninit; use std::ops::Drop; use std::ptr; use std::sync::atomic::Ordering; use std::task::Poll; // This file contains an implementation of an SetOnce. The value of SetOnce // can only be modified once during initialization. // // 1. When `value_set` is false, the `value` is not initialized and wait() // future will keep on waiting. // 2. When `value_set` is true, the wait() future completes, get() will return // Some(&T) // // The value cannot be changed after set() is called. Subsequent calls to set() // will return a `SetOnceError`. /// A thread-safe cell that can be written to only once. /// /// A `SetOnce` is inspired from python's [`asyncio.Event`] type. It can be /// used to wait until the value of the `SetOnce` is set like a "Event" mechanism. /// /// # Example /// /// ``` /// use tokio::sync::{SetOnce, SetOnceError}; /// /// static ONCE: SetOnce = SetOnce::const_new(); /// /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), SetOnceError> { /// /// // set the value inside a task somewhere... /// tokio::spawn(async move { ONCE.set(20) }); /// /// // checking with .get doesn't block main thread /// println!("{:?}", ONCE.get()); /// /// // wait until the value is set, blocks the thread /// println!("{:?}", ONCE.wait().await); /// /// Ok(()) /// # } /// ``` /// /// A `SetOnce` is typically used for global variables that need to be /// initialized once on first use, but need no further changes. The `SetOnce` /// in Tokio allows the initialization procedure to be asynchronous. /// /// # Example /// /// ``` /// use tokio::sync::{SetOnce, SetOnceError}; /// use std::sync::Arc; /// /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), SetOnceError> { /// let once = SetOnce::new(); /// /// let arc = Arc::new(once); /// let first_cl = Arc::clone(&arc); /// let second_cl = Arc::clone(&arc); /// /// // set the value inside a task /// tokio::spawn(async move { first_cl.set(20) }).await.unwrap()?; /// /// // wait inside task to not block the main thread /// tokio::spawn(async move { /// // wait inside async context for the value to be set /// assert_eq!(*second_cl.wait().await, 20); /// }).await.unwrap(); /// /// // subsequent set calls will fail /// assert!(arc.set(30).is_err()); /// /// println!("{:?}", arc.get()); /// /// Ok(()) /// # } /// ``` /// /// [`asyncio.Event`]: https://docs.python.org/3/library/asyncio-sync.html#asyncio.Event pub struct SetOnce { value_set: AtomicBool, value: UnsafeCell>, notify: Notify, } impl Default for SetOnce { fn default() -> SetOnce { SetOnce::new() } } impl fmt::Debug for SetOnce { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("SetOnce") .field("value", &self.get()) .finish() } } impl Clone for SetOnce { fn clone(&self) -> SetOnce { SetOnce::new_with(self.get().cloned()) } } impl PartialEq for SetOnce { fn eq(&self, other: &SetOnce) -> bool { self.get() == other.get() } } impl Eq for SetOnce {} impl Drop for SetOnce { fn drop(&mut self) { // TODO: Use get_mut() if self.value_set.load(Ordering::Relaxed) { // SAFETY: If the value_set is true, then the value is initialized // then there is a value to be dropped and this is safe unsafe { self.value.with_mut(|ptr| ptr::drop_in_place(ptr as *mut T)) } } } } impl From for SetOnce { fn from(value: T) -> Self { SetOnce { value_set: AtomicBool::new(true), value: UnsafeCell::new(MaybeUninit::new(value)), notify: Notify::new(), } } } impl SetOnce { /// Creates a new empty `SetOnce` instance. pub fn new() -> Self { Self { value_set: AtomicBool::new(false), value: UnsafeCell::new(MaybeUninit::uninit()), notify: Notify::new(), } } /// Creates a new empty `SetOnce` instance. /// /// Equivalent to `SetOnce::new`, except that it can be used in static /// variables. /// /// When using the `tracing` [unstable feature], a `SetOnce` created with /// `const_new` will not be instrumented. As such, it will not be visible /// in [`tokio-console`]. Instead, [`SetOnce::new`] should be used to /// create an instrumented object if that is needed. /// /// # Example /// /// ``` /// use tokio::sync::{SetOnce, SetOnceError}; /// /// static ONCE: SetOnce = SetOnce::const_new(); /// /// fn get_global_integer() -> Result, SetOnceError> { /// ONCE.set(2)?; /// Ok(ONCE.get()) /// } /// /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() -> Result<(), SetOnceError> { /// let result = get_global_integer()?; /// /// assert_eq!(result, Some(&2)); /// Ok(()) /// # } /// ``` /// /// [`tokio-console`]: https://github.com/tokio-rs/console /// [unstable feature]: crate#unstable-features #[cfg(not(all(loom, test)))] pub const fn const_new() -> Self { Self { value_set: AtomicBool::new(false), value: UnsafeCell::new(MaybeUninit::uninit()), notify: Notify::const_new(), } } /// Creates a new `SetOnce` that contains the provided value, if any. /// /// If the `Option` is `None`, this is equivalent to `SetOnce::new`. /// /// [`SetOnce::new`]: crate::sync::SetOnce::new pub fn new_with(value: Option) -> Self { if let Some(v) = value { SetOnce::from(v) } else { SetOnce::new() } } /// Creates a new `SetOnce` that contains the provided value. /// /// # Example /// /// When using the `tracing` [unstable feature], a `SetOnce` created with /// `const_new_with` will not be instrumented. As such, it will not be /// visible in [`tokio-console`]. Instead, [`SetOnce::new_with`] should be /// used to create an instrumented object if that is needed. /// /// ``` /// use tokio::sync::SetOnce; /// /// static ONCE: SetOnce = SetOnce::const_new_with(1); /// /// fn get_global_integer() -> Option<&'static u32> { /// ONCE.get() /// } /// /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() { /// let result = get_global_integer(); /// /// assert_eq!(result, Some(&1)); /// # } /// ``` /// /// [`tokio-console`]: https://github.com/tokio-rs/console /// [unstable feature]: crate#unstable-features #[cfg(not(all(loom, test)))] pub const fn const_new_with(value: T) -> Self { Self { value_set: AtomicBool::new(true), value: UnsafeCell::new(MaybeUninit::new(value)), notify: Notify::const_new(), } } /// Returns `true` if the `SetOnce` currently contains a value, and `false` /// otherwise. pub fn initialized(&self) -> bool { // Using acquire ordering so we're able to read/catch any writes that // are done with `Ordering::Release` self.value_set.load(Ordering::Acquire) } // SAFETY: The SetOnce must not be empty. unsafe fn get_unchecked(&self) -> &T { unsafe { &*self.value.with(|ptr| (*ptr).as_ptr()) } } /// Returns a reference to the value currently stored in the `SetOnce`, or /// `None` if the `SetOnce` is empty. pub fn get(&self) -> Option<&T> { if self.initialized() { // SAFETY: the SetOnce is initialized, so we can safely // call get_unchecked and return the value Some(unsafe { self.get_unchecked() }) } else { None } } /// Sets the value of the `SetOnce` to the given value if the `SetOnce` is /// empty. /// /// If the `SetOnce` already has a value, this call will fail with an /// [`SetOnceError`]. /// /// [`SetOnceError`]: crate::sync::SetOnceError pub fn set(&self, value: T) -> Result<(), SetOnceError> { if self.initialized() { return Err(SetOnceError(value)); } // SAFETY: lock notify to ensure only one caller of set // can run at a time. let guard = self.notify.lock_waiter_list(); if self.initialized() { return Err(SetOnceError(value)); } // SAFETY: We have locked the mutex and checked if the value is // initialized or not, so we can safely write to the value unsafe { self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); } // Using release ordering so any threads that read a true from this // atomic is able to read the value we just stored. self.value_set.store(true, Ordering::Release); // notify the waiting wakers that the value is set guard.notify_waiters(); Ok(()) } /// Takes the value from the cell, destroying the cell in the process. /// Returns `None` if the cell is empty. pub fn into_inner(self) -> Option { // TODO: Use get_mut() let value_set = self.value_set.load(Ordering::Relaxed); if value_set { // Since we have taken ownership of self, its drop implementation // will be called by the end of this function, to prevent a double // free we will set the value_set to false so that the drop // implementation does not try to drop the value again. self.value_set.store(false, Ordering::Relaxed); // SAFETY: The SetOnce is currently initialized, we can assume the // value is initialized and return that, when we return the value // we give the drop handler to the return scope. Some(unsafe { self.value.with_mut(|ptr| ptr::read(ptr).assume_init()) }) } else { None } } /// Waits until the value is set. /// /// If the `SetOnce` is already initialized, it will return the value /// immediately. /// /// # Cancel safety /// /// This method is cancel safe. pub async fn wait(&self) -> &T { loop { if let Some(val) = self.get() { return val; } let notify_fut = self.notify.notified(); pin!(notify_fut); poll_fn(|cx| { // Register under the notify's internal lock. let ret = notify_fut.as_mut().poll(cx); if self.value_set.load(Ordering::Relaxed) { return Poll::Ready(()); } ret }) .await; } } } // Since `get` gives us access to immutable references of the SetOnce, SetOnce // can only be Sync if T is Sync, otherwise SetOnce would allow sharing // references of !Sync values across threads. We need T to be Send in order for // SetOnce to by Sync because we can use `set` on `&SetOnce` to send values // (of type T) across threads. unsafe impl Sync for SetOnce {} // Access to SetOnce's value is guarded by the Atomic boolean flag // and atomic operations on `value_set`, so as long as T itself is Send // it's safe to send it to another thread unsafe impl Send for SetOnce {} /// Error that can be returned from [`SetOnce::set`]. /// /// This error means that the `SetOnce` was already initialized when /// set was called /// /// [`SetOnce::set`]: crate::sync::SetOnce::set #[derive(Debug, PartialEq, Eq)] pub struct SetOnceError(pub T); impl fmt::Display for SetOnceError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "SetOnceError") } } impl Error for SetOnceError {}