// // AsyncStream+Pipe.swift // // // Created by Thibault Wittemberg on 24/09/2022. // public extension AsyncStream { /// Factory function that creates an AsyncStream and returns a tuple standing for its inputs and outputs. /// It easy the usage of an AsyncStream in a imperative code context. /// - Parameter bufferingPolicy: A `Continuation.BufferingPolicy` value to /// set the stream's buffering behavior. By default, the stream buffers an /// unlimited number of elements. You can also set the policy to buffer a /// specified number of oldest or newest elements. /// - Returns: the tuple (input, output). The input can be yielded with values, the output can be iterated over static func pipe( bufferingPolicy: AsyncStream.Continuation.BufferingPolicy = .unbounded ) -> (AsyncStream.Continuation, AsyncStream) { var continuation: AsyncStream.Continuation! let stream = AsyncStream(bufferingPolicy: bufferingPolicy) { continuation = $0 } return (continuation, stream) } } public extension AsyncThrowingStream { /// Factory function that creates an AsyncthrowingStream and returns a tuple standing for its inputs and outputs. /// It easy the usage of an AsyncthrowingStream in a imperative code context. /// - Parameter bufferingPolicy: A `Continuation.BufferingPolicy` value to /// set the stream's buffering behavior. By default, the stream buffers an /// unlimited number of elements. You can also set the policy to buffer a /// specified number of oldest or newest elements. /// - Returns: the tuple (input, output). The input can be yielded with values/errors, the output can be iterated over static func pipe( bufferingPolicy: AsyncThrowingStream.Continuation.BufferingPolicy = .unbounded ) -> (AsyncThrowingStream.Continuation, AsyncThrowingStream) { var continuation: AsyncThrowingStream.Continuation! let stream = AsyncThrowingStream(bufferingPolicy: bufferingPolicy) { continuation = $0 } return (continuation, stream) } }