Back in part 5, we added the ability to attach arbitrary finalizers
to pipes. But when those finalizers actually ran was purely mechanical:
when any given pipe finished, it would run all upstream finalizers, and then
its own. This behavior can sometimes delay the finalization of an upstream
pipe, if the downstream pipe stops awaiting but continues running
and possibly yielding.
This time, we'll add the `close` primitive,
which will allow the programmer to indicate that a pipe
will never `await` again. This should possibly be named
`unsafeClose`, because in this implementation,
we will not use the type system to enforce this guarantee.
> {-# LANGUAGE TypeOperators #-}
> {-# OPTIONS_GHC -Wall #-}
>
> module PipeClose where
>
> import Control.Monad.Trans.Free (FreeT(..), FreeF(..), liftF, wrap)
> import Fun ((:&:)(..), (:|:)(..))
>
> import Data.Void (Void, absurd)
> import Control.Monad (when, forever)
> import Control.Monad.Trans.Class (lift)
> import Control.Monad.Trans.Resource (MonadResource, allocate, release)
Functors
--------------------------------------------------
We'll not add a new functor this time;
we'll just reuse `Then` to indicate
the "rest" of the computation after a pipe closes its input end.
> newtype Then next = Then next -- Identity
> newtype Yield o next = Yield o -- Const
> newtype Await i next = Await (i -> next) -- Fun
> data Abort next = Abort -- Empty
> newtype Finalize m next = Finalize (m ()) -- Const
> newtype Leftover l next = Leftover l -- Const
> instance Functor Then where
> fmap f (Then next) = Then (f next)
>
> instance Functor (Yield o) where
> fmap _f (Yield o) = Yield o
>
> instance Functor (Await i) where
> fmap f (Await g) = Await (f . g)
>
> instance Functor Abort where
> fmap _f Abort = Abort
>
> instance Functor (Finalize m) where
> fmap _f (Finalize m) = Finalize m
>
> instance Functor (Leftover l) where
> fmap _f (Leftover l) = Leftover l
>
> pass :: Monad m => m ()
> pass = return ()
>
> unreachable :: Monad m => m ()
> unreachable = error "You've reached the unreachable finalizer"
The Pipe type
--------------------------------------------------
> type LeftoverThen l = Leftover l :&: Then
> type YieldThen o m = Yield o :&: Finalize m :&: Then
> type AwaitU i u = Await i :&: Await u :&: Then
> type Close = Then
Our `PipeF` type has certainly grown! Remember when it used to be
just `Await i :|: YieldThen o`? At least we're not adding yet another
type parameter this time.
> type PipeF l i o u m = YieldThen o m
> :|: AwaitU i u
> :|: Abort
> :|: LeftoverThen l
> :|: Close
> type Pipe l i o u m r = FreeT (PipeF l i o u m) m r
>
> type Producer o m r = Pipe Void () o () m r
> type Consumer l i u m r = Pipe l i Void u m r
> type Pipeline m r = Pipe Void () Void () m r
Working with PipeF
--------------------------------------------------
We update the lifts, the smart constructors, and `pipeCase` as usual.
> liftYield :: YieldThen o m next -> PipeF l i o u m next
> liftYield = L . L . L . L
>
> liftAwait :: AwaitU i u next -> PipeF l i o u m next
> liftAwait = L . L . L . R
>
> liftAbort :: Abort next -> PipeF l i o u m next
> liftAbort = L . L . R
>
> liftLeftover :: LeftoverThen l next -> PipeF l i o u m next
> liftLeftover = L . R
>
> liftClose :: Close next -> PipeF l i o u m next
> liftClose = R
> yieldF :: o -> m () -> next -> PipeF l i o u m next
> yieldF o m next = liftYield $ Yield o :&: Finalize m :&: Then next
>
> awaitF :: (i -> next) -> (u -> next) -> next -> PipeF l i o u m next
> awaitF f g next = liftAwait $ Await f :&: Await g :&: Then next
>
> abortF :: PipeF l i o u m next
> abortF = liftAbort Abort
>
> leftoverF :: l -> next -> PipeF l i o u m next
> leftoverF l next = liftLeftover $ Leftover l :&: Then next
>
> closeF :: next -> PipeF l i o u m next
> closeF next = liftClose $ Then next
> pipeCase :: FreeF (PipeF l i o u m) r next
> -> a -- Abort
> -> (r -> a) -- Return
> -> (next -> a) -- Close
> -> (l -> next -> a) -- Leftover
> -> (o -> m () -> next -> a) -- Yield
> -> ((i -> next) -> (u -> next) -> next -> a) -- Await
> -> a
> pipeCase (Wrap (L (L (R Abort))))
> k _ _ _ _ _ = k
> pipeCase (Return r)
> _ k _ _ _ _ = k r
> pipeCase (Wrap (R (Then next)))
> _ _ k _ _ _ = k next
> pipeCase (Wrap (L (R (Leftover l :&: Then next))))
> _ _ _ k _ _ = k l next
> pipeCase (Wrap (L (L (L (L (Yield o :&: Finalize m :&: Then next))))))
> _ _ _ _ k _ = k o m next
> pipeCase (Wrap (L (L (L (R (Await f :&: Await g :&: Then next))))))
> _ _ _ _ _ k = k f g next
Pipe primitives
--------------------------------------------------
We add a new primitive, as usual.
> tryAwait :: Monad m => Pipe l i o u m (Either (Maybe u) i)
> tryAwait = liftF $ awaitF Right (Left . Just) (Left Nothing)
>
> yield :: Monad m => o -> Pipe l i o u m ()
> yield b = liftF $ yieldF b pass ()
>
> abort :: Monad m => Pipe l i o u m r
> abort = liftF abortF
>
> leftover :: Monad m => l -> Pipe l i o u m ()
> leftover l = liftF $ leftoverF l ()
>
> close :: Monad m => Pipe l i o u m ()
> close = liftF $ closeF ()
Pipe composition
--------------------------------------------------
> (<+<) :: Monad m => Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 <+< p2 = composeWithFinalizer pass p1 p2
> ( Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> p1 composeWithFinalizer :: Monad m => m ()
> -> Pipe Void i' o u' m r -> Pipe l i i' u m u' -> Pipe l i o u m r
> composeWithFinalizer finalizeUpstream p1 p2 = FreeT $ do
> x1 <- runFreeT p1
> let p1' = FreeT $ return x1
> runFreeT $ pipeCase x1
> {- Abort -} ( lift finalizeUpstream >> abort)
> {- Return -} (\r -> lift finalizeUpstream >> return r)
> {- Close -} (\next -> lift finalizeUpstream >> (wrap $ closeF (next <+< abort)))
The very reason that we made the Close option was so that
the upstream pipe could be finalized early.
Once we do that, what do we compose with `next`?
We could compose it with `p2`, but that would be *very* unsafe,
since `p2`'s finalizers have been run. Imagine if `p2` were reading
from a file, then we close the file, then ask `p2` to keep reading!
So instead, we compose with abort. Recall that earlier we asserted that
right-composing a Producer with `abort` was the same as the identity function:
$\forall p \in Producer, p \circ abort \equiv p$
If it is indeed true that the downstream pipe will never `await` again,
then we can rest assured that `next <+< abort` will behave as we desire.
> {- L-over -} (\l _next -> absurd l)
> {- Yield -} (\o finalizeDownstream next ->
> let (<*<) = composeWithFinalizer finalizeUpstream
> in wrap $ yieldF o
> (finalizeUpstream >> finalizeDownstream)
> (next <*< p2))
> {- Await -} (\f1 g1 onAbort1 -> FreeT $ do
> x2 <- runFreeT p2
> runFreeT $ pipeCase x2
> {- Abort -} ( onAbort1 <+< abort) -- downstream recovers
> {- Return -} (\u' -> g1 u' <+< abort) -- downstream recovers
> {- Close -} (\next -> wrap $ closeF (p1' {- L-over -} (\l next -> wrap $ leftoverF l (p1' {- Yield -} (\o newFinalizer next ->
> let (<*<) = composeWithFinalizer newFinalizer
> in f1 o <*< next)
> {- Await -} (\f2 g2 onAbort2 -> wrap $ awaitF
> (\i -> p1' (\u -> p1' ( p1' (>+>) :: Monad m => Pipe l i i' u m u' -> Pipe Void i' o u' m r -> Pipe l i o u m r
> (>+>) = flip (<+<)
>
> infixr 9 <+<
> infixr 9 >+>
Running a pipeline
--------------------------------------------------
At the level of a pipeline, the `close` operation
is meaningless, since it shouldn't be awaiting anyways.
When we `runPipe` on a `Close`, therefore, we will simply move on
to the next computation.
> runPipe :: Monad m => Pipeline m r -> m (Maybe r)
> runPipe p = do
> e <- runFreeT p
> pipeCase e
> {- Abort -} ( return Nothing)
> {- Return -} (\r -> return $ Just r)
> {- Close -} (\next -> runPipe next)
> {- L-over -} (\l _next -> absurd l)
> {- Yield -} (\o _fin _next -> absurd o)
> {- Await -} (\f _g _onAbort -> runPipe $ f ())
Getting rid of leftovers
-------------------------------------------------
The adjustment to `injectLeftovers` is interesting:
once we close the input end, what should we do with leftovers?
Discard them, since we promised not to look at them? Or keep them,
since it doesn't hurt the upstream pipe if we look at the stuff
that we already acquired from it.
> injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
> injectLeftovers = go [] where
> go ls p = FreeT $ do
> x <- runFreeT p
> runFreeT $ pipeCase x
> {- Abort -} (abort)
> {- Return -} (\r -> return r)
> {- Close -} (\next -> wrap $ closeF (go [] next))
In the name of garbage collection, I say dump them.
This is reflected by the recursive call ignoring `ls` and instead
passing in an empty list.
> {- L-over -} (\l next -> go (l:ls) next)
> {- Yield -} (\o fin next -> wrap $ yieldF o fin (go ls next))
> {- Await -} (\f g onAbort -> case ls of
> [] -> wrap $ awaitF (go [] . f) (go [] . g) (go [] onAbort)
> l : ls' -> go ls' (f l))
Adding finalizers to a pipe
-------------------------------------------------
`Close` is always an intermediate step of a pipe
(even if the next step is merely `return ()`),
so when revisiting `cleanupP`, we need only make sure
that the cleanup procedures are passed on to the next computation.
> cleanupP :: Monad m => m () -> m () -> m () -> Pipe l i o u m r
> -> Pipe l i o u m r
> cleanupP abortFinalize selfAbortFinalize returnFinalize = go where
> go p = FreeT $ do
> x <- runFreeT p
> runFreeT $ pipeCase x
> {- Abort -} ( lift selfAbortFinalize >> abort)
> {- Return -} (\r -> lift returnFinalize >> return r)
> {- Close -} (\next -> wrap $ closeF (go next))
> {- L-over -} (\l next -> wrap $ leftoverF l (go next))
> {- Yield -} (\o finalizeRest next -> wrap $
> yieldF o (finalizeRest >> abortFinalize) (go next))
> {- Await -} (\f g onAbort -> wrap $
> awaitF (go . f) (go . g) (go onAbort))
Playing with our new primitive
-------------------------------------------------
Here's a simple example using (essentially) printf debugging
to illustrate the code execution path. See if you can guess
the output of `runPipe $ exampleConsumer <+< exampleProducer`
before looking at it.
> exampleProducer :: Producer Int IO ()
> exampleProducer = finallyP (putStrLn "End producer") $ do
> lift $ putStrLn "Begin producer"
> lift $ putStrLn "Producer yielding"
> yield 1
> lift $ putStrLn "Producer done yielding"
> pass
> exampleConsumer :: Consumer Void Int () IO ()
> exampleConsumer = finallyP (putStrLn "End consumer") $ do
> lift $ putStrLn "Begin consumer"
> lift $ putStrLn "Consumer awaiting"
> _ <- await
> lift $ putStrLn "Consumer done awaiting"
> close
> lift $ putStrLn "Consumer continues"
> pass
[ghci]
runPipe $ exampleConsumer <+< exampleProducer
Begin consumer
Consumer awaiting
Begin producer
Producer yielding
Consumer done awaiting
End producer
Consumer continues
End consumer
Just ()
As you can see, `close` caused the producer's finalizer to be run immediately.
What would the output look like if the consumer didn't `close`?
Safety
-------------------------------------------------
Our `close` primitive gives programmers a convenient way
to indicate that a pipe's upstream should be finalized,
but it is completely up to the programmer to make sure
that `close` is used in a safe way, that is, that it is not
followed by an `await`. We gave pipes the behavior of
`abort`ing in such circumstances, which is a decent choice,
but can we do better?
Control.Frame from the `pipes` package provides a
different (though similar) implementation of pipes that uses
indexed monads to solve this problem. If you `close` a frame,
then it is a *type error* to `await` afterwards.
This has obvious type-safety benefits, but at the cost of
using a relatively new concept that has little syntactic sugar
support; see
[Control.Pipe.Tutorial](http://hackage.haskell.org/packages/archive/pipes/latest/doc/html/Control-Frame-Tutorial.html)
for details on how this technique works.
Next time
-------------------------------------------------
We've come a long ways from the simplicity of Control.Pipe.
Next time, I'll take away `abort` and `close`, and what we have left
will be fairly similar to the current state of Data.Conduit.
I'll guide you through some of the `conduit` source code and
observe which choices were made, and attempt to explain why.
Convenience combinators
-------------------------------------------------
> finallyP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> finallyP finalize = cleanupP finalize finalize finalize
>
> catchP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> catchP finalize = cleanupP finalize finalize pass
>
> successP :: Monad m => m () -> Pipe l i o u m r -> Pipe l i o u m r
> successP finalize = cleanupP pass pass finalize
> bracketP :: MonadResource m => IO a -> (a -> IO ()) -> (a -> Pipe l i o u m r)
> -> Pipe l i o u m r
> bracketP create destroy mkPipe = do
> (key, val) <- lift $ allocate create destroy
> finallyP (release key) (mkPipe val)
Some basic pipes
-------------------------------------------------
> idMsg :: String -> Pipe l i i u IO u
> idMsg str = finallyP (putStrLn str) idP
>
> testPipeR :: Monad m => Pipe Void i o u m r -> m (Maybe r)
> testPipeR p = runPipe $ (await >> abort) <+< p <+< abort
>
> testPipeL :: Monad m => Pipe Void Int o () m r -> m (Maybe r)
> testPipeL p = runPipe $ (await >> await >> abort) <+< take' 1 <+< p <+< fromList [1 ..]
>
> testPipe :: Monad m => Pipe Void Int o () m r -> m (Maybe (r, [o]))
> testPipe p = runPipe $ runP <+< p <+< fromList [1..]
>
> take' :: Monad m => Int -> Pipe l i i u m ()
> take' 0 = pass
> take' n = (await >>= yield) >> take' (pred n)
> fromList :: Monad m => [o] -> Producer o m ()
> fromList = mapM_ yield
> awaitE :: Monad m => Pipe l i o u m (Either u i)
> awaitE = tryAwait >>= \emx -> case emx of
> Left Nothing -> abort
> Left (Just u) -> return $ Left u
> Right i -> return $ Right i
>
> awaitForever :: Monad m => (i -> Pipe l i o u m r) -> Pipe l i o u m u
> awaitForever f = go where
> go = awaitE >>= \ex -> case ex of
> Left u -> return u
> Right i -> f i >> go
>
> pipe :: Monad m => (i -> o) -> Pipe l i o u m u
> pipe f = awaitForever $ yield . f
>
> idP :: Monad m => Pipe l i i u m u
> idP = pipe id
>
> filterP :: Monad m => (i -> Bool) -> Pipe l i i u m u
> filterP test = awaitForever $ \x -> when (test x) (yield x)
>
> printer :: Show i => Consumer l i u IO u
> printer = awaitForever $ lift . print
> runP :: Monad m => Consumer l i u m (u, [i])
> runP = awaitE >>= \ex -> case ex of
> Left u -> return (u, [])
> Right i -> runP >>= \ ~(u, is) -> return (u, i:is)
>
> evalP :: Monad m => Consumer l i u m u
> evalP = fst `fmap` runP
>
> execP :: Monad m => Consumer l i u m [i]
> execP = snd `fmap` runP
>
> fold :: Monad m => (r -> i -> r) -> r -> Consumer l i u m r
> fold f = go where
> go r = awaitE >>= \ex -> case ex of
> Left _u -> return r
> Right i -> go $! f r i
> await :: Monad m => Pipe l i o u m i
> await = awaitE >>= \ex -> case ex of
> Left _u -> abort
> Right i -> return i
>
> oldPipe :: Monad m => (i -> o) -> Pipe l i o u m r
> oldPipe f = forever $ await >>= yield . f
>
> oldIdP :: Monad m => Pipe l i i u m r
> oldIdP = oldPipe id
>
> oldFilterP :: Monad m => (i -> Bool) -> Pipe l i i u m r
> oldFilterP test = forever $ await >>= \x -> when (test x) (yield x)
>
> oldPrinter :: Show i => Consumer l i u IO r
> oldPrinter = forever $ await >>= lift . print
You can play with this code for yourself by downloading
[PipeClose.lhs](https://raw.github.com/DanBurton/Blog/master/Literate%20Haskell/Pipes%20to%20Conduits/PipeClose.lhs).