1 {-# LANGUAGE BangPatterns #-}
    2 {-# LANGUAGE CPP #-}
    3 {-# LANGUAGE DeriveDataTypeable #-}
    4 {-# LANGUAGE OverloadedStrings #-}
    5 {-# LANGUAGE PackageImports #-}
    6 {-# LANGUAGE ScopedTypeVariables #-}
    7 {-# LANGUAGE TypeSynonymInstances #-}
    8 {-# OPTIONS_GHC -fno-warn-orphans #-}
    9 
   10 ------------------------------------------------------------------------------
   11 -- | Snap Framework type aliases and utilities for iteratees. Note that as a
   12 -- convenience, this module also exports everything from @Data.Enumerator@ in
   13 -- the @enumerator@ library.
   14 
   15 module Snap.Iteratee
   16   (
   17     -- * Enumerators
   18     enumBS
   19   , enumLBS
   20   , enumBuilder
   21   , enumFile
   22   , enumFilePartial
   23   , InvalidRangeException
   24 
   25 
   26     -- * Iteratee utilities
   27   , joinI'
   28   , countBytes
   29   , drop'
   30   , mkIterateeBuffer
   31   , unsafeBufferIterateeWithBuffer
   32   , unsafeBufferIteratee
   33   , take
   34   , drop
   35   , takeExactly
   36   , takeNoMoreThan
   37   , skipToEof
   38   , mapEnum
   39   , mapIter
   40   , killIfTooSlow
   41 
   42   , TooManyBytesReadException
   43   , ShortWriteException
   44   , RateTooSlowException
   45 
   46     -- * Re-export types and functions from @Data.Enumerator@
   47   , Stream (..)
   48   , Step (..)
   49   , Iteratee (..)
   50   , Enumerator
   51   , Enumeratee
   52 
   53     -- ** Primitives
   54     -- *** Combinators
   55     -- | These are common patterns which occur whenever iteratees are
   56     -- being defined.
   57   , returnI
   58   , yield
   59   , continue
   60   , throwError
   61   , catchError
   62   , liftI
   63   , (>>==)
   64   , (==<<)
   65   , ($$)
   66   , (>==>)
   67   , (<==<)
   68 
   69     -- *** Iteratees
   70   , run
   71   , run_
   72   , consume
   73   , Data.Enumerator.isEOF
   74   , liftTrans
   75   , liftFoldL
   76   , liftFoldL'
   77   , liftFoldM
   78   , printChunks
   79   , head
   80   , peek
   81 
   82     -- *** Enumerators
   83   , enumEOF
   84   , enumList
   85   , concatEnums
   86     -- *** Enumeratees
   87   , checkDone
   88   , Data.Enumerator.List.map
   89   , Data.Enumerator.sequence
   90   , joinI
   91 
   92 
   93 {-
   94     -- ** Iteratee utilities
   95   , drop'
   96 
   97 -}
   98   ) where
   99 
  100 ------------------------------------------------------------------------------
  101 
  102 import           Blaze.ByteString.Builder
  103 import           Control.DeepSeq
  104 import           Control.Exception (SomeException, assert)
  105 import           Control.Monad
  106 import           Control.Monad.CatchIO
  107 import           Control.Monad.Trans (MonadIO, lift, liftIO)
  108 import           Data.ByteString (ByteString)
  109 import qualified Data.ByteString.Char8 as S
  110 import qualified Data.ByteString.Unsafe as S
  111 import qualified Data.ByteString.Lazy.Char8 as L
  112 import           Data.Enumerator hiding (consume, drop, head)
  113 import qualified Data.Enumerator as I
  114 import           Data.Enumerator.Binary (enumHandle)
  115 import           Data.Enumerator.List hiding (take, drop)
  116 import qualified Data.Enumerator.List as IL
  117 import qualified Data.List as List
  118 import           Data.Monoid (mappend)
  119 import           Data.Time.Clock.POSIX (getPOSIXTime)
  120 import           Data.Typeable
  121 import           Foreign hiding (peek)
  122 import           Foreign.C.Types
  123 import           GHC.ForeignPtr
  124 import           Prelude hiding (catch, drop, head, take)
  125 import           System.IO
  126 
  127 #ifndef PORTABLE
  128 import           System.IO.Posix.MMap
  129 import           System.PosixCompat.Files
  130 import           System.PosixCompat.Types
  131 #endif
  132 
  133 ------------------------------------------------------------------------------
  134 instance (Functor m, MonadCatchIO m) =>
  135          MonadCatchIO (Iteratee s m) where
  136     --catch  :: Exception  e => m a -> (e -> m a) -> m a
  137     catch m handler = insideCatch (m `catchError` h)
  138       where
  139         insideCatch !mm = Iteratee $ do
  140             ee <- try $ runIteratee mm
  141             case ee of
  142                 (Left e)  -> runIteratee $ handler e
  143                 (Right v) -> step v
  144 
  145         step (Continue !k)  = do
  146             return $ Continue (\s -> insideCatch $ k s)
  147         -- don't worry about Error here because the error had to come from the
  148         -- handler (because of 'catchError' above)
  149         step y             = return y
  150 
  151         -- we can only catch iteratee errors if "e" matches "SomeException"
  152         h e = maybe (throwError e)
  153                     (handler)
  154                     (fromException e)
  155 
  156     --block :: m a -> m a
  157     block m = Iteratee $ block $ (runIteratee m >>= step)
  158       where
  159         step (Continue k) = return $ Continue (\s -> block (k s))
  160         step y            = return y
  161 
  162     unblock m = Iteratee $ unblock $ (runIteratee m >>= step)
  163       where
  164         step (Continue k) = return $ Continue (\s -> unblock (k s))
  165         step y            = return y
  166 
  167 
  168 ------------------------------------------------------------------------------
  169 -- | Get the length of a bytestring Stream
  170 streamLength :: Stream ByteString -> Int
  171 streamLength (Chunks xs) = List.foldl' (\c s -> c + S.length s) 0 xs
  172 streamLength EOF         = 0
  173 
  174 
  175 ------------------------------------------------------------------------------
  176 -- | Enumerates a Builder.
  177 enumBuilder :: (Monad m) => Builder -> Enumerator Builder m a
  178 enumBuilder = enumList 1 . (:[])
  179 {-# INLINE enumBuilder #-}
  180 
  181 
  182 ------------------------------------------------------------------------------
  183 -- | Enumerates a strict bytestring.
  184 enumBS :: (Monad m) => ByteString -> Enumerator ByteString m a
  185 enumBS = enumList 1 . (:[])
  186 {-# INLINE enumBS #-}
  187 
  188 
  189 ------------------------------------------------------------------------------
  190 -- | Enumerates a lazy bytestring.
  191 
  192 enumLBS :: (Monad m) => L.ByteString -> Enumerator ByteString m a
  193 enumLBS bs = enumList 1 (L.toChunks bs)
  194 {-# INLINE enumLBS #-}
  195 
  196 
  197 ------------------------------------------------------------------------------
  198 skipToEof :: (Monad m) => Iteratee a m ()
  199 skipToEof = continue k
  200   where
  201     k EOF = return ()
  202     k _   = skipToEof
  203 
  204 
  205 ------------------------------------------------------------------------------
  206 -- | Wraps an 'Iteratee', counting the number of bytes consumed by it.
  207 countBytes :: (Monad m) => forall a .
  208               Iteratee ByteString m a
  209            -> Iteratee ByteString m (a, Int64)
  210 countBytes i = Iteratee $ do
  211     step <- runIteratee i
  212     case step of
  213       (Continue k) -> return (Continue $ go 0 k)
  214       (Yield x s)  -> return $ Yield (x,0) s
  215       (Error e)    -> return $ Error e
  216 
  217   where
  218     go !n k str = Iteratee $ do
  219         let len = toEnum $ streamLength str
  220         step <- runIteratee (k str)
  221         case step of
  222           (Continue k') -> return (Continue $ go (n + len) k')
  223           (Yield x s)   -> let len' = n + len - (toEnum $ streamLength s)
  224                            in return (Yield (x, len') s)
  225           (Error e)     -> return (Error e)
  226 
  227 
  228 ------------------------------------------------------------------------------
  229 bUFSIZ :: Int
  230 bUFSIZ = 8192
  231 
  232 
  233 ------------------------------------------------------------------------------
  234 -- | Creates a buffer to be passed into 'unsafeBufferIterateeWithBuffer'.
  235 mkIterateeBuffer :: IO (ForeignPtr CChar)
  236 mkIterateeBuffer = mallocPlainForeignPtrBytes bUFSIZ
  237 
  238 
  239 ------------------------------------------------------------------------------
  240 -- | Buffers an iteratee, \"unsafely\". Here we use a fixed binary buffer
  241 -- which we'll re-use, meaning that if you hold on to any of the bytestring
  242 -- data passed into your iteratee (instead of, let's say, shoving it right out
  243 -- a socket) it'll get changed out from underneath you, breaking referential
  244 -- transparency. Use with caution!
  245 unsafeBufferIteratee :: Iteratee ByteString IO a
  246                      -> IO (Iteratee ByteString IO a)
  247 unsafeBufferIteratee step = do
  248     buf <- mkIterateeBuffer
  249     return $ unsafeBufferIterateeWithBuffer buf step
  250 
  251 
  252 ------------------------------------------------------------------------------
  253 -- | Buffers an iteratee, \"unsafely\". Here we use a fixed binary buffer
  254 -- which we'll re-use, meaning that if you hold on to any of the bytestring
  255 -- data passed into your iteratee (instead of, let's say, shoving it right out
  256 -- a socket) it'll get changed out from underneath you, breaking referential
  257 -- transparency. Use with caution!
  258 --
  259 -- This version accepts a buffer created by 'mkIterateeBuffer'.
  260 --
  261 unsafeBufferIterateeWithBuffer :: ForeignPtr CChar
  262                                -> Iteratee ByteString IO a
  263                                -> Iteratee ByteString IO a
  264 unsafeBufferIterateeWithBuffer buf iter = Iteratee $ do
  265     step <- runIteratee iter
  266     start step
  267 
  268   where
  269     --------------------------------------------------------------------------
  270     start :: Step ByteString IO a -> IO (Step ByteString IO a)
  271     start (Continue k) = return $ Continue $ go 0 k
  272     start s@_          = return s
  273 
  274 
  275     --------------------------------------------------------------------------
  276     sendBuf :: Int
  277             -> (Stream ByteString -> Iteratee ByteString IO a)
  278             -> IO (Step ByteString IO a)
  279     sendBuf n k =
  280       {-# SCC "unsafeBufferIteratee/sendBuf" #-} do
  281         assert (n > 0)       (return ())
  282         assert (n <= bUFSIZ) (return ())
  283 
  284         withForeignPtr buf $ \ptr -> do
  285             !s <- S.unsafePackCStringLen (ptr, n)
  286             runIteratee $ k $ Chunks [s]
  287 
  288 
  289     --------------------------------------------------------------------------
  290     copy EOF         = EOF
  291     copy (Chunks xs) = zs `deepseq` Chunks ys
  292       where
  293         !ys  = Prelude.map S.copy xs
  294         !zs  = Prelude.map (`seq` ()) ys
  295 
  296     --------------------------------------------------------------------------
  297     go :: Int
  298        -> (Stream ByteString -> Iteratee ByteString IO a)
  299        -> (Stream ByteString -> Iteratee ByteString IO a)
  300     go !n !k EOF = Iteratee $ do
  301         if n == 0
  302           then runIteratee $ k EOF
  303           else do
  304               assert (n > 0)       (return ())
  305               assert (n <= bUFSIZ) (return ())
  306 
  307               step  <- sendBuf n k
  308               step2 <- runIteratee $ enumEOF step
  309               return $ copyStep step2
  310 
  311 
  312     go !n !k (Chunks xs) = Iteratee $ do
  313         assert (n >= 0)      (return ())
  314         assert (n <= bUFSIZ) (return ())
  315 
  316         let s = S.concat xs
  317         let m = S.length s
  318         if m+n >= bUFSIZ
  319           then overflow    n k s m
  320           else copyAndCont n k s m
  321 
  322 
  323     --------------------------------------------------------------------------
  324     copyStep (Yield x r) = let !z = copy r in Yield x z
  325     copyStep x           = x
  326 
  327 
  328     --------------------------------------------------------------------------
  329     copyAndCont :: Int
  330                 -> (Stream ByteString -> Iteratee ByteString IO a)
  331                 -> ByteString
  332                 -> Int
  333                 -> IO (Step ByteString IO a)
  334     copyAndCont !n k !s !m =
  335       {-# SCC "unsafeBufferIteratee/copyAndCont" #-} do
  336         assert (n >= 0) (return ())
  337         assert (n+m < bUFSIZ) (return ())
  338         S.unsafeUseAsCStringLen s $ \(p,sz) -> do
  339           assert (m == sz) (return ())
  340           withForeignPtr buf $ \bufp -> do
  341             let b' = plusPtr bufp n
  342             copyBytes b' p sz
  343 
  344         return $ Continue $ go (n+m) k
  345 
  346 
  347 
  348     --------------------------------------------------------------------------
  349     overflow :: Int
  350              -> (Stream ByteString -> Iteratee ByteString IO a)
  351              -> ByteString
  352              -> Int
  353              -> IO (Step ByteString IO a)
  354     overflow !n k !s !m =
  355       {-# SCC "unsafeBufferIteratee/overflow" #-} do
  356         assert (n+m >= bUFSIZ) (return ())
  357         assert (n < bUFSIZ)    (return ())
  358 
  359         let rest    = bUFSIZ - n
  360         let m2      = m - rest
  361 
  362         let (s1,s2) = S.splitAt rest s
  363 
  364         S.unsafeUseAsCStringLen s1 $ \(p,_) ->
  365           withForeignPtr buf $ \bufp -> do
  366             let b' = plusPtr bufp n
  367             copyBytes b' p rest
  368 
  369             iv <- sendBuf bUFSIZ k
  370             case iv of
  371               (Yield x r)   -> let !z = copy r
  372                                in return $ Yield x $ (z `mappend` Chunks [s2])
  373               (Error e)     -> return $ Error e
  374               (Continue k') -> do
  375                   -- check the size of the remainder; if it's bigger than the
  376                   -- buffer size then just send it
  377                   if m2 >= bUFSIZ
  378                     then do
  379                         step <- runIteratee $ k' $ Chunks [s2]
  380                         case step of
  381                           (Yield x r)    -> let !z = copy r
  382                                             in return $! Yield x z
  383                           (Error e)      -> return $ Error e
  384                           (Continue k'') -> return $ Continue $ go 0 k''
  385 
  386                     else copyAndCont 0 k' s2 m2
  387 
  388 
  389 ------------------------------------------------------------------------------
  390 -- | Skip n elements of the stream, if there are that many
  391 drop :: (Monad m) => Int -> Iteratee ByteString m ()
  392 drop k = drop' (toEnum k)
  393 
  394 ------------------------------------------------------------------------------
  395 -- | Skip n elements of the stream, if there are that many
  396 drop' :: (Monad m) => Int64 -> Iteratee ByteString m ()
  397 drop' 0  = return ()
  398 drop' !n = continue k
  399 
  400   where
  401     k EOF         = return ()
  402     k (Chunks xs) = chunks n xs
  403 
  404     chunks !m []     = drop' m
  405     chunks !m (x:xs) = do
  406         let strlen = toEnum $ S.length x
  407         if strlen <= m
  408           then chunks (m-strlen) xs
  409           else yield () $ Chunks ((S.drop (fromEnum m) x):xs)
  410 
  411 
  412 ------------------------------------------------------------------------------
  413 data ShortWriteException = ShortWriteException deriving (Typeable)
  414 data RateTooSlowException = RateTooSlowException deriving (Typeable)
  415 data TooManyBytesReadException = TooManyBytesReadException deriving (Typeable)
  416 
  417 instance Show ShortWriteException where
  418     show ShortWriteException = "Short write"
  419 
  420 instance Show RateTooSlowException where
  421     show RateTooSlowException = "Input rate too slow"
  422 
  423 instance Show TooManyBytesReadException where
  424     show TooManyBytesReadException = "Too many bytes read"
  425 
  426 instance Exception ShortWriteException
  427 instance Exception RateTooSlowException
  428 instance Exception TooManyBytesReadException
  429 
  430 
  431 ------------------------------------------------------------------------------
  432 take :: (Monad m) => Int -> Enumeratee ByteString ByteString m a
  433 take k = take' (toEnum k)
  434 
  435 
  436 ------------------------------------------------------------------------------
  437 take' :: (Monad m) => Int64 -> Enumeratee ByteString ByteString m a
  438 take' _ y@(Yield _ _   ) = return y
  439 take' _   (Error e     ) = throwError e
  440 take' !n st@(Continue k) = do
  441     if n == 0
  442       then lift $ runIteratee $ k EOF
  443       else do
  444         mbX <- head
  445         maybe (lift $ runIteratee $ k EOF)
  446               check
  447               mbX
  448 
  449   where
  450     check x | S.null x    = take' n st
  451             | strlen <= n = do
  452                   newStep <- lift $ runIteratee $ k $ Chunks [x]
  453                   take' (n-strlen) newStep
  454             | otherwise = do
  455                   step1 <- lift $ runIteratee $ k $ Chunks [s1]
  456                   step2 <- lift $ runIteratee $ enumEOF step1
  457 
  458                   case step2 of
  459                     (Yield v _)    -> yield (Yield v EOF) (Chunks [s2])
  460                     (Error e)      -> throwError e
  461                     (Continue _)   -> error "divergent iteratee"
  462       where
  463         strlen  = toEnum $ S.length x
  464         (s1,s2) = S.splitAt (fromEnum n) x
  465 
  466 
  467 ------------------------------------------------------------------------------
  468 -- | Reads n bytes from a stream and applies the given iteratee to the stream
  469 -- of the read elements. Reads exactly n bytes, and if the stream is short
  470 -- propagates an error.
  471 takeExactly :: (Monad m)
  472             => Int64
  473             -> Enumeratee ByteString ByteString m a
  474 takeExactly !n  y@(Yield _ _ ) = drop' n >> return y
  475 takeExactly _     (Error e   ) = throwError e
  476 takeExactly !n st@(Continue !k) = do
  477     if n == 0
  478       then lift $ runIteratee $ k EOF
  479       else do
  480         mbX <- head
  481         maybe (throwError ShortWriteException)
  482               check
  483               mbX
  484 
  485   where
  486     check !x | S.null x   = takeExactly n st
  487              | strlen < n = do
  488                    newStep <- lift $ runIteratee $ k $ Chunks [x]
  489                    takeExactly (n-strlen) newStep
  490              | otherwise = do
  491                    let (s1,s2) = S.splitAt (fromEnum n) x
  492                    !step1 <- lift $ runIteratee $ k $ Chunks [s1]
  493                    !step2 <- lift $ runIteratee $ enumEOF step1
  494 
  495                    case step2 of
  496                      (Continue _) -> error "divergent iteratee"
  497                      (Error e)    -> throwError e
  498                      (Yield v _)  -> yield (Yield v EOF) (Chunks [s2])
  499 
  500       where
  501         !strlen  = toEnum $ S.length x
  502 
  503 
  504 ------------------------------------------------------------------------------
  505 takeNoMoreThan :: (Monad m) =>
  506                   Int64 -> Enumeratee ByteString ByteString m a
  507 takeNoMoreThan _   y@(Yield _ _)  = return y
  508 takeNoMoreThan _     (Error e  )  = throwError e
  509 takeNoMoreThan !n st@(Continue k) = do
  510     mbX <- head
  511     maybe (lift $ runIteratee $ k EOF)
  512           check
  513           mbX
  514 
  515   where
  516     check x | S.null x    = takeNoMoreThan n st
  517             | strlen <= n = do
  518                   newStep <- lift $ runIteratee $ k $ Chunks [x]
  519                   takeNoMoreThan (n-strlen) newStep
  520             | otherwise = do
  521                   step1 <- lift $ runIteratee $ k $ Chunks [s1]
  522                   case step1 of
  523                     (Yield v rest) -> yield (Yield v EOF)
  524                                             (rest `mappend` Chunks [s2])
  525                     (Error e)      -> throwError e
  526                     (Continue _)   -> throwError TooManyBytesReadException
  527       where
  528         strlen  = toEnum $ S.length x
  529         (s1,s2) = S.splitAt (fromEnum n) x
  530 
  531 
  532 ------------------------------------------------------------------------------
  533 {-# INLINE _enumFile #-}
  534 _enumFile :: FilePath
  535           -> Enumerator ByteString IO a
  536 _enumFile fp iter = do
  537     h  <- liftIO $ openBinaryFile fp ReadMode
  538     enumHandle 32678 h iter `finally` (liftIO $ hClose h)
  539 
  540 
  541 ------------------------------------------------------------------------------
  542 data InvalidRangeException = InvalidRangeException
  543    deriving (Typeable)
  544 
  545 
  546 ------------------------------------------------------------------------------
  547 instance Show InvalidRangeException where
  548     show InvalidRangeException = "Invalid range"
  549 
  550 
  551 ------------------------------------------------------------------------------
  552 instance Exception InvalidRangeException
  553 
  554 
  555 ------------------------------------------------------------------------------
  556 {-# INLINE _enumFilePartial #-}
  557 _enumFilePartial :: FilePath
  558                  -> (Int64,Int64)
  559                  -> Enumerator ByteString IO a
  560 _enumFilePartial fp (start,end) iter = do
  561     let len = end - start
  562 
  563     bracket (liftIO $ openBinaryFile fp ReadMode)
  564             (liftIO . hClose)
  565             (\h -> do
  566                  unless (start == 0) $ liftIO $
  567                         hSeek h AbsoluteSeek $ toInteger start
  568                  step <- lift $ runIteratee $ joinI $ takeExactly len iter
  569                  enumHandle 32678 h step)
  570 
  571 
  572 ------------------------------------------------------------------------------
  573 enumFile :: FilePath -> Enumerator ByteString IO a
  574 enumFilePartial :: FilePath
  575                 -> (Int64,Int64)
  576                 -> Enumerator ByteString IO a
  577 
  578 
  579 #ifdef PORTABLE
  580 
  581 enumFile = _enumFile
  582 enumFilePartial fp rng@(start,end) iter = do
  583     when (end < start) $ throwError InvalidRangeException
  584     _enumFilePartial fp rng iter
  585 
  586 #else
  587 
  588 -- 40MB limit
  589 maxMMapFileSize :: FileOffset
  590 maxMMapFileSize = 41943040
  591 
  592 
  593 ------------------------------------------------------------------------------
  594 tooBigForMMap :: FilePath -> IO Bool
  595 tooBigForMMap fp = do
  596     stat <- getFileStatus fp
  597     return $ fileSize stat > maxMMapFileSize
  598 
  599 
  600 ------------------------------------------------------------------------------
  601 enumFile _  (Error e)    = throwError e
  602 enumFile _  (Yield x _)  = yield x EOF
  603 enumFile fp st@(Continue k) = do
  604     -- for small files we'll use mmap to save ourselves a copy, otherwise
  605     -- we'll stream it
  606     tooBig <- lift $ tooBigForMMap fp
  607 
  608     if tooBig
  609       then _enumFile fp st
  610       else do
  611         es <- try $ lift $ unsafeMMapFile fp
  612         case es of
  613           (Left (e :: SomeException)) -> throwError e
  614           (Right s) -> k $ Chunks [s]
  615 
  616 
  617 ------------------------------------------------------------------------------
  618 enumFilePartial _ _ (Error e)   = throwError e
  619 enumFilePartial _ _ (Yield x _) = yield x EOF
  620 enumFilePartial fp rng@(start,end) st@(Continue k) = do
  621     when (end < start) $ throwError InvalidRangeException
  622 
  623     let len = end - start
  624 
  625     tooBig <- lift $ tooBigForMMap fp
  626 
  627     if tooBig
  628       then _enumFilePartial fp rng st
  629       else do
  630         es <- try $ lift $ unsafeMMapFile fp
  631 
  632         case es of
  633           (Left (e::SomeException)) -> throwError e
  634           (Right s) -> k $ Chunks [ S.take (fromEnum len) $
  635                                     S.drop (fromEnum start) s ]
  636 
  637 #endif
  638 
  639 
  640 ------------------------------------------------------------------------------
  641 mapEnum :: (Monad m) =>
  642            (aOut -> aIn)
  643         -> (aIn -> aOut)
  644         -> Enumerator aIn m a
  645         -> Enumerator aOut m a
  646 mapEnum f g enum outStep = do
  647     let z = IL.map g outStep
  648     let p = joinI z
  649     let q = enum $$ p
  650     (I.joinI . IL.map f) $$ q
  651 
  652 
  653 ------------------------------------------------------------------------------
  654 mapIter :: (Monad m) =>
  655            (aOut -> aIn)
  656         -> (aIn -> aOut)
  657         -> Iteratee aIn m a
  658         -> Iteratee aOut m a
  659 mapIter f g iter = do
  660     step <- lift $ runIteratee iter
  661     mapStep step
  662   where
  663     -- mapStep :: Step aIn m a -> Iteratee aOut m a
  664     mapStep (Continue k)   = continue $ wrapK k
  665     mapStep (Yield x rest) = yield x (fmap g rest)
  666     mapStep (Error e)      = throwError e
  667 
  668     -- wrapK :: (Stream aIn  -> Iteratee aIn m a)
  669     --       -> (Stream aOut -> Iteratee aOut m a)
  670     wrapK k streamOut = mapIter f g iterIn
  671       where
  672         streamIn = fmap f streamOut
  673         iterIn   = k streamIn
  674 
  675 
  676 ------------------------------------------------------------------------------
  677 joinI' :: Monad m => Iteratee a m (Step a m b)
  678        -> Iteratee a m b
  679 joinI' outer = outer >>= check where
  680     check (Continue k) = k EOF >>== \s -> case s of
  681         Continue _ -> error "joinI: divergent iteratee"
  682         _ -> check s
  683     check (Yield x r) = yield x r
  684     check (Error e) = throwError e
  685 
  686 
  687 ------------------------------------------------------------------------------
  688 killIfTooSlow :: (MonadIO m) =>
  689                  m ()                     -- ^ action to bump timeout
  690               -> Double                   -- ^ minimum data rate, in bytes per
  691                                           -- second
  692               -> Int                      -- ^ minimum amount of time to let
  693                                           -- the iteratee run for
  694               -> Iteratee ByteString m a  -- ^ iteratee consumer to wrap
  695               -> Iteratee ByteString m a
  696 killIfTooSlow !bump !minRate !minSeconds' !inputIter = do
  697     !_ <- lift bump
  698     startTime <- liftIO getTime
  699     step <- lift $ runIteratee inputIter
  700     wrap startTime (0::Int64) step
  701 
  702   where
  703     minSeconds = fromIntegral minSeconds'
  704 
  705     wrap !startTime = proc
  706       where
  707         proc !nb (Continue !k) = continue $ cont nb k
  708         proc _ !z              = returnI z
  709 
  710         cont _ !k EOF = k EOF
  711         cont !nBytesRead !k !stream = do
  712             let !slen = toEnum $ streamLength stream
  713             now <- liftIO getTime
  714             let !delta = now - startTime
  715             let !newBytes = nBytesRead + slen
  716             when (delta > minSeconds+1 &&
  717                   fromIntegral newBytes / (delta-minSeconds) < minRate) $
  718               throwError RateTooSlowException
  719 
  720             -- otherwise bump the timeout and continue running the iteratee
  721             !_ <- lift bump
  722             lift (runIteratee $! k stream) >>= proc newBytes
  723 
  724 
  725 ------------------------------------------------------------------------------
  726 getTime :: IO Double
  727 getTime = (fromRational . toRational) `fmap` getPOSIXTime