1 {-# LANGUAGE CPP #-}
    2 {-# LANGUAGE BangPatterns #-}
    3 {-# LANGUAGE DeriveDataTypeable #-}
    4 {-# LANGUAGE ForeignFunctionInterface #-}
    5 {-# LANGUAGE OverloadedStrings #-}
    6 {-# LANGUAGE RankNTypes #-}
    7 {-# LANGUAGE ScopedTypeVariables #-}
    8 {-# LANGUAGE PackageImports #-}
    9 
   10 module Snap.Internal.Http.Server.LibevBackend
   11   ( libEvEventLoop
   12   ) where
   13 
   14 #ifndef LIBEV
   15 
   16 import Control.Exception
   17 import Data.Typeable
   18 import Snap.Internal.Http.Server.Backend
   19 
-- | Exception thrown by the stub 'libEvEventLoop' that is compiled when the
-- @LIBEV@ CPP symbol is not defined. The 'String' is a human-readable
-- description of the problem.
data LibevException = LibevException String
  deriving (Show, Typeable)
instance Exception LibevException
   23 
   24 libEvEventLoop :: EventLoop
   25 libEvEventLoop _ _ _ _ _ = throwIO $
   26     LibevException "libev event loop is not supported"
   27 
   28 #else
   29 
   30 ---------------------------
   31 -- TODO: document module --
   32 ---------------------------
   33 
   34 ------------------------------------------------------------------------------
   35 import             Control.Concurrent hiding (yield)
   36 import             Control.Exception
   37 import             Control.Monad
   38 import             Control.Monad.Trans
   39 import             Data.ByteString (ByteString)
   40 import             Data.ByteString.Internal (c2w)
   41 import qualified   Data.ByteString as S
   42 import             Data.Maybe
   43 import             Data.IORef
   44 import             Data.Typeable
   45 import             Foreign hiding (new)
   46 import             Foreign.C.Types
   47 import             GHC.Conc (forkOnIO)
   48 import             Network.Libev
   49 import             Network.Socket
   50 import             Prelude hiding (catch)
   51 ------------------------------------------------------------------------------
   52 
   53 -- FIXME: should be HashSet, make that later.
   54 import qualified   Data.Concurrent.HashMap as H
   55 import             Data.Concurrent.HashMap (HashMap)
   56 import             Snap.Iteratee hiding (map)
   57 import             Snap.Internal.Debug
   58 import             Snap.Internal.Http.Server.Date
   59 import             Snap.Internal.Http.Server.Backend
   60 import qualified   Snap.Internal.Http.Server.ListenHelpers as Listen
   61 
   62 #if defined(HAS_SENDFILE)
   63 import qualified   System.SendFile as SF
   64 import             System.Posix.IO
   65 import             System.Posix.Types (Fd(..))
   66 #endif
   67 
   68 
   69 ------------------------------------------------------------------------------
-- | State of one libev loop. 'newLoop' creates one 'Backend' per CPU and
-- 'freeBackend' releases everything referenced from it.
data Backend = Backend
    { _acceptSockets     :: [ListenSocket]
      -- ^ listen sockets watched for incoming connections
    , _evLoop            :: !EvLoopPtr
      -- ^ the libev loop object
    , _acceptIOCallbacks :: ![MVar (FunPtr IoCallback)]
      -- ^ one accept callback per listen socket; each MVar is filled by
      -- 'newLoop' once the callback has been created
    , _acceptIOObjs      :: ![EvIoPtr]
      -- ^ one accept io watcher per listen socket
    , _mutexCallbacks    :: !(FunPtr MutexCallback, FunPtr MutexCallback)
      -- ^ the two callbacks returned by @setupLockingForLoop@
    , _loopLock          :: !(MVar ())
      -- ^ lock guarding the loop; taken by the loop thread before running
      -- the loop and by other threads before they touch watchers
    , _asyncCb           :: !(FunPtr AsyncCallback)
      -- ^ callback of the wakeup async watcher
    , _asyncObj          :: !EvAsyncPtr
      -- ^ async watcher used to wake up the loop from other threads
    , _killCb            :: !(FunPtr AsyncCallback)
      -- ^ callback of the kill async watcher (it unloops the loop)
    , _killObj           :: !EvAsyncPtr
      -- ^ async watcher signalled by 'stop' to make the loop exit
    , _connectionThreads :: !(HashMap ThreadId Connection)
      -- ^ active connections, keyed by the thread serving them
    , _backendCPU        :: !Int
      -- ^ CPU number this backend was created for
    , _loopExit          :: !(MVar ())
      -- ^ filled with @()@ by the loop thread once it has cleaned up
    }
   85 
   86 
   87 ------------------------------------------------------------------------------
-- | Per-connection state, built by 'runSession' for every accepted socket.
data Connection = Connection
    { _backend             :: !Backend
      -- ^ backend whose loop owns this connection's watchers
    , _listenSocket        :: !ListenSocket
      -- ^ listen socket the connection was accepted on
    , _rawSocket           :: !CInt
      -- ^ file descriptor of the accepted socket
    , _sessionInfo         :: !SessionInfo
      -- ^ local/remote address information handed to the session handler
    , _readAvailable       :: !(MVar ())
      -- ^ filled by 'ioReadCallback'; 'waitForLock' blocks on it
    , _writeAvailable      :: !(MVar ())
      -- ^ filled by 'ioWriteCallback'; 'waitForLock' blocks on it
    , _timerObj            :: !EvTimerPtr
      -- ^ timeout timer watcher
    , _timerCallback       :: !(FunPtr TimerCallback)
      -- ^ callback attached to the timeout timer
    , _timerTimeoutTime    :: !(IORef CTime)
      -- ^ time at which the connection times out; advanced by
      -- 'tickleTimeout' and checked by 'timerCallback'
    , _readActive          :: !(IORef Bool)
      -- ^ 'True' while the read io watcher is started
    , _writeActive         :: !(IORef Bool)
      -- ^ 'True' while the write io watcher is started
    , _connReadIOObj       :: !EvIoPtr
      -- ^ read io watcher
    , _connReadIOCallback  :: !(FunPtr IoCallback)
      -- ^ callback attached to the read io watcher
    , _connWriteIOObj      :: !EvIoPtr
      -- ^ write io watcher
    , _connWriteIOCallback :: !(FunPtr IoCallback)
      -- ^ callback attached to the write io watcher
    , _connThread          :: !(ThreadId)
      -- ^ thread running the session; target of the timeout exception
    }
  106 
  107 
  108 ------------------------------------------------------------------------------
  109 libEvEventLoop :: EventLoop
  110 libEvEventLoop defaultTimeout sockets cap elog handler = do
  111     backends <- Prelude.mapM (newLoop defaultTimeout sockets handler elog)
  112                              [0..(cap-1)]
  113 
  114     debug "libevEventLoop: waiting for loop exit"
  115     ignoreException (Prelude.mapM_ (takeMVar . _loopExit) backends)
  116     debug "libevEventLoop: stopping all backends"
  117     ignoreException $ mapM stop backends
  118     ignoreException $ mapM Listen.closeSocket sockets
  119 
  120 
  121 ------------------------------------------------------------------------------
  122 newLoop :: Int                   -- ^ default timeout
  123         -> [ListenSocket]        -- ^ value you got from bindIt
  124         -> SessionHandler        -- ^ handler
  125         -> (ByteString -> IO ()) -- ^ error logger
  126         -> Int                   -- ^ cpu
  127         -> IO Backend
  128 newLoop defaultTimeout sockets handler elog cpu = do
  129     -- We'll try kqueue on OSX even though the libev docs complain that it's
  130     -- "broken", in the hope that it works as expected for sockets
  131     f  <- evRecommendedBackends
  132     lp <- evLoopNew $ toEnum . fromEnum $ f .|. evbackend_kqueue
  133 
  134 
  135     -- we'll be working multithreaded so we need to set up locking for the C
  136     -- event loop struct
  137     (mc1,mc2,looplock) <- setupLockingForLoop lp
  138 
  139     -- setup async callbacks -- these allow us to wake up the main loop
  140     -- (normally blocked in c-land) from other threads
  141     asyncObj <- mkEvAsync
  142     asyncCB  <- mkAsyncCallback $ \_ _ _ -> do
  143                             debug "async wakeup"
  144                             return ()
  145 
  146     killObj <- mkEvAsync
  147     killCB  <- mkAsyncCallback $ \_ _ _ -> do
  148                             debug "async kill wakeup"
  149                             evUnloop lp evunloop_all
  150                             return ()
  151 
  152     evAsyncInit asyncObj asyncCB
  153     evAsyncStart lp asyncObj
  154     evAsyncInit killObj killCB
  155     evAsyncStart lp killObj
  156 
  157     -- create the ios for the accept callbacks
  158     accMVars <- forM sockets $ \_ -> newEmptyMVar
  159     accIOs <- forM sockets $ \_ -> mkEvIo
  160 
  161     -- thread set stuff
  162     connSet <- H.new (H.hashString . show)
  163 
  164     -- freed gets stuffed with () when all resources are released.
  165     freed <- newEmptyMVar
  166 
  167     let b = Backend sockets
  168                     lp
  169                     accMVars
  170                     accIOs
  171                     (mc1,mc2)
  172                     looplock
  173                     asyncCB
  174                     asyncObj
  175                     killCB
  176                     killObj
  177                     connSet
  178                     cpu
  179                     freed
  180 
  181     -- setup the accept callback; this watches for read readiness on the
  182     -- listen port
  183     forM_ (zip3 sockets accIOs accMVars) $ \(sock, accIO, x) -> do
  184         accCB <- mkIoCallback $ acceptCallback defaultTimeout b handler elog
  185                                                cpu sock
  186         evIoInit accIO accCB (fdSocket $ Listen.listenSocket sock) ev_read
  187         evIoStart lp accIO
  188         putMVar x accCB
  189 
  190     forkOnIO cpu $ loopThread b
  191 
  192     debug $ "LibEv.newLoop: loop spawned"
  193     return b
  194 
  195 
  196 ------------------------------------------------------------------------------
  197 -- | Run evLoop in a thread
  198 loopThread :: Backend -> IO ()
  199 loopThread backend = do
  200     debug $ "starting loop"
  201     (ignoreException go) `finally` cleanup
  202     debug $ "loop finished"
  203   where
  204     cleanup = block $ do
  205         debug $ "loopThread: cleaning up"
  206         ignoreException $ freeBackend backend
  207         putMVar (_loopExit backend) ()
  208 
  209     lock    = _loopLock backend
  210     loop    = _evLoop backend
  211     go      = takeMVar lock >> block (evLoop loop 0)
  212 
  213 
  214 ------------------------------------------------------------------------------
  215 acceptCallback :: Int
  216                -> Backend
  217                -> SessionHandler
  218                -> (ByteString -> IO ())
  219                -> Int
  220                -> ListenSocket
  221                -> IoCallback
  222 acceptCallback defaultTimeout back handler
  223                elog cpu sock _loopPtr _ioPtr _ = do
  224     debug "inside acceptCallback"
  225     r <- c_accept $ fdSocket $ Listen.listenSocket sock
  226 
  227     case r of
  228       -- this (EWOULDBLOCK) shouldn't happen (we just got told it was ready!),
  229       -- if it does (maybe the request got picked up by another thread) we'll
  230       -- just bail out
  231       -2 -> return ()
  232       -1 -> debugErrno "Libev.acceptCallback:c_accept()"
  233       fd -> do
  234           debug $ "acceptCallback: accept()ed fd, writing to chan " ++ show fd
  235           forkOnIO cpu $ (go r `catches` cleanup)
  236           return ()
  237   where
  238     go = runSession defaultTimeout back handler sock
  239     cleanup = [ Handler $ \(_ :: TimeoutException) -> return ()
  240               , Handler $ \(e :: SomeException) ->
  241                   elog $ S.concat [ "libev.acceptCallback: "
  242                                   , S.pack . map c2w $ show e ]
  243               ]
  244 
  245 
  246 ------------------------------------------------------------------------------
  247 ioReadCallback :: CInt -> IORef Bool -> MVar () -> IoCallback
  248 ioReadCallback fd active ra _loopPtr _ioPtr _ = do
  249     -- send notifications to the worker thread
  250     debug $ "ioReadCallback: notification (" ++ show fd ++ ")"
  251     tryPutMVar ra ()
  252     debug $ "stopping ioReadCallback (" ++ show fd ++ ")"
  253     evIoStop _loopPtr _ioPtr
  254     writeIORef active False
  255 
  256 
  257 ------------------------------------------------------------------------------
  258 ioWriteCallback :: CInt -> IORef Bool -> MVar () -> IoCallback
  259 ioWriteCallback fd active wa _loopPtr _ioPtr _ = do
  260     -- send notifications to the worker thread
  261     debug $ "ioWriteCallback: notification (" ++ show fd ++ ")"
  262     tryPutMVar wa ()
  263     debug $ "stopping ioWriteCallback (" ++ show fd ++ ")"
  264     evIoStop _loopPtr _ioPtr
  265     writeIORef active False
  266 
  267 
  268 ------------------------------------------------------------------------------
  269 stop :: Backend -> IO ()
  270 stop b = ignoreException $ do
  271     debug $ "Libev.stop"
  272 
  273     -- 1. take the loop lock
  274     -- 2. shut down the accept() callback
  275     -- 3. call evUnloop and wake up the loop using evAsyncSend
  276     -- 4. release the loop lock, the main loop thread should then free/clean
  277     --    everything up (threads, connections, io objects, callbacks, etc)
  278 
  279     withMVar lock $ \_ -> do
  280         forM acceptObjs $ evIoStop loop
  281         evUnloop loop evunloop_all
  282         evAsyncSend loop killObj
  283 
  284   where
  285     loop           = _evLoop b
  286     acceptObjs     = _acceptIOObjs b
  287     killObj        = _killObj b
  288     lock           = _loopLock b
  289 
  290 
  291 ------------------------------------------------------------------------------
  292 getAddr :: SockAddr -> IO (ByteString, Int)
  293 getAddr addr =
  294     case addr of
  295       SockAddrInet p ha -> do
  296           s <- liftM (S.pack . map c2w) (inet_ntoa ha)
  297           return (s, fromIntegral p)
  298 
  299       a -> throwIO $ AddressNotSupportedException (show a)
  300 
  301 
  302 ------------------------------------------------------------------------------
  303 -- | Throws a timeout exception to the handling thread.  The thread will clean
  304 -- up everything.
  305 timerCallback :: EvLoopPtr         -- ^ loop obj
  306               -> EvTimerPtr        -- ^ timer obj
  307               -> IORef CTime       -- ^ when to timeout?
  308               -> ThreadId          -- ^ thread to kill
  309               -> TimerCallback
  310 timerCallback loop tmr ioref tid _ _ _ = do
  311     debug "Libev.timerCallback: entered"
  312 
  313     now       <- getCurrentDateTime
  314     whenToDie <- readIORef ioref
  315 
  316     if whenToDie <= now
  317       then do
  318           debug "Libev.timerCallback: killing thread"
  319           throwTo tid TimeoutException
  320 
  321       else do
  322           debug $ "Libev.timerCallback: now=" ++ show now
  323                   ++ ", whenToDie=" ++ show whenToDie
  324           evTimerSetRepeat tmr $ fromRational . toRational $ (whenToDie - now)
  325           evTimerAgain loop tmr
  326 
  327 
  328 ------------------------------------------------------------------------------
  329 -- | If you already hold the loop lock, you are entitled to destroy a
  330 -- connection
  331 destroyConnection :: Connection -> IO ()
  332 destroyConnection conn = do
  333     debug "Libev.destroyConnection: closing socket and killing connection"
  334     c_close fd
  335 
  336     -- stop and free timer object
  337     evTimerStop loop timerObj
  338     freeEvTimer timerObj
  339     freeTimerCallback timerCb
  340 
  341     -- stop and free i/o objects
  342     evIoStop loop ioWrObj
  343     freeEvIo ioWrObj
  344     freeIoCallback ioWrCb
  345 
  346     evIoStop loop ioRdObj
  347     freeEvIo ioRdObj
  348     freeIoCallback ioRdCb
  349 
  350   where
  351     backend    = _backend conn
  352     loop       = _evLoop backend
  353 
  354     fd         = _rawSocket conn
  355     ioWrObj    = _connWriteIOObj conn
  356     ioWrCb     = _connWriteIOCallback conn
  357     ioRdObj    = _connReadIOObj conn
  358     ioRdCb     = _connReadIOCallback conn
  359     timerObj   = _timerObj conn
  360     timerCb    = _timerCallback conn
  361 
  362 
  363 ------------------------------------------------------------------------------
  364 freeConnection :: Connection -> IO ()
  365 freeConnection conn = ignoreException $ do
  366     withMVar loopLock $ \_ -> block $ do
  367         debug $ "freeConnection (" ++ show (_rawSocket conn) ++ ")"
  368         destroyConnection conn
  369         let tid = _connThread conn
  370 
  371         -- remove the thread id from the backend set
  372         H.delete tid $ _connectionThreads backend
  373 
  374         -- wake up the event loop so it can be apprised of the changes
  375         evAsyncSend loop asyncObj
  376 
  377   where
  378     backend    = _backend conn
  379     loop       = _evLoop backend
  380     loopLock   = _loopLock backend
  381     asyncObj   = _asyncObj backend
  382 
  383 
  384 ------------------------------------------------------------------------------
  385 freeBackend :: Backend -> IO ()
  386 freeBackend backend = ignoreException $ block $ do
  387     -- note: we only get here after an unloop, so we have the loop lock
  388     -- here. (?)
  389 
  390     -- kill everything in thread table
  391     tset <- H.toList $ _connectionThreads backend
  392 
  393     let nthreads = Prelude.length tset
  394 
  395     debug $ "Libev.freeBackend: killing active connection threads"
  396 
  397     Prelude.mapM_ (destroyConnection . snd) tset
  398 
  399     -- kill the threads twice, they're probably getting stuck in the
  400     -- freeConnection 'finally' handler
  401     Prelude.mapM_ (killThread . fst) tset
  402     Prelude.mapM_ (killThread . fst) tset
  403 
  404     debug $ "Libev.freeBackend: " ++ show nthreads ++ " thread(s) killed"
  405     debug $ "Libev.freeBackend: destroying libev resources"
  406 
  407     mapM freeEvIo acceptObjs
  408     forM acceptCbs $ \x -> do
  409         acceptCb <- readMVar x
  410         freeIoCallback acceptCb
  411 
  412     evAsyncStop loop asyncObj
  413     freeEvAsync asyncObj
  414     freeAsyncCallback asyncCb
  415 
  416     evAsyncStop loop killObj
  417     freeEvAsync killObj
  418     freeAsyncCallback killCb
  419 
  420     freeMutexCallback mcb1
  421     freeMutexCallback mcb2
  422 
  423     evLoopDestroy loop
  424     debug $ "Libev.freeBackend: resources destroyed"
  425 
  426   where
  427     acceptObjs  = _acceptIOObjs backend
  428     acceptCbs   = _acceptIOCallbacks backend
  429     asyncObj    = _asyncObj backend
  430     asyncCb     = _asyncCb backend
  431     killObj     = _killObj backend
  432     killCb      = _killCb backend
  433     (mcb1,mcb2) = _mutexCallbacks backend
  434     loop        = _evLoop backend
  435 
  436 
  437 ------------------------------------------------------------------------------
  438 -- | Note: proc gets run in the background
  439 runSession :: Int
  440            -> Backend
  441            -> SessionHandler
  442            -> ListenSocket
  443            -> CInt
  444            -> IO ()
  445 runSession defaultTimeout backend handler lsock fd = do
  446     sock <- mkSocket fd AF_INET Stream 0 Connected
  447     peerName <- getPeerName sock
  448     sockName <- getSocketName sock
  449     tid <- myThreadId
  450 
  451     -- set_linger fd
  452     c_setnonblocking fd
  453 
  454     (raddr, rport) <- getAddr peerName
  455     (laddr, lport) <- getAddr sockName
  456 
  457     let lp = _evLoop backend
  458 
  459     -- makes sense to assume the socket is read/write available when
  460     -- opened; worst-case is we get EWOULDBLOCK
  461     ra    <- newMVar ()
  462     wa    <- newMVar ()
  463 
  464 
  465     -----------------
  466     -- setup timer --
  467     -----------------
  468     tmr         <- mkEvTimer
  469     now         <- getCurrentDateTime
  470     timeoutTime <- newIORef $ now + 20
  471     tcb         <- mkTimerCallback $ timerCallback lp
  472                                                   tmr
  473                                                   timeoutTime
  474                                                   tid
  475     -- 20 second timeout
  476     evTimerInit tmr tcb 0 20.0
  477 
  478 
  479     readActive  <- newIORef True
  480     writeActive <- newIORef True
  481 
  482     evioRead <- mkEvIo
  483     ioReadCb <- mkIoCallback $ ioReadCallback fd readActive ra
  484 
  485     evioWrite <- mkEvIo
  486     ioWriteCb <- mkIoCallback $ ioWriteCallback fd writeActive wa
  487 
  488     evIoInit evioRead ioReadCb fd ev_read
  489     evIoInit evioWrite ioWriteCb fd ev_write
  490 
  491     -- take ev_loop lock, start timer and io watchers
  492     withMVar (_loopLock backend) $ \_ -> do
  493          evTimerAgain lp tmr
  494          evIoStart lp evioRead
  495          evIoStart lp evioWrite
  496 
  497          -- wakeup the loop thread so that these new watchers get
  498          -- registered next time through the loop
  499          evAsyncSend lp $ _asyncObj backend
  500 
  501     let sinfo = SessionInfo laddr lport raddr rport $
  502                     Listen.isSecure lsock
  503     let conn = Connection backend
  504                           lsock
  505                           fd
  506                           sinfo
  507                           ra
  508                           wa
  509                           tmr
  510                           tcb
  511                           timeoutTime
  512                           readActive
  513                           writeActive
  514                           evioRead
  515                           ioReadCb
  516                           evioWrite
  517                           ioWriteCb
  518                           tid
  519 
  520     bracket (Listen.createSession lsock bLOCKSIZE fd $
  521                    waitForLock True conn)
  522             (\session -> block $ do
  523                 debug "runSession: thread killed, closing socket"
  524 
  525                 ignoreException $ Listen.endSession lsock session
  526                 ignoreException $ freeConnection conn
  527             )
  528             (\session -> do H.update tid conn (_connectionThreads backend)
  529                             handler sinfo
  530                                     (enumerate conn session)
  531                                     (writeOut defaultTimeout conn session)
  532                                     (sendFile defaultTimeout conn session)
  533                                     (tickleTimeout conn)
  534             )
  535 
  536 
  537 ------------------------------------------------------------------------------
  538 ignoreException :: IO a -> IO ()
  539 ignoreException act =
  540     (act >> return ()) `catch` \(_::SomeException) -> return ()
  541 
  542 
  543 ------------------------------------------------------------------------------
  544 data AddressNotSupportedException = AddressNotSupportedException String
  545    deriving (Typeable)
  546 
  547 instance Show AddressNotSupportedException where
  548     show (AddressNotSupportedException x) = "Address not supported: " ++ x
  549 
  550 instance Exception AddressNotSupportedException
  551 
  552 
  553 ------------------------------------------------------------------------------
  554 
  555 bLOCKSIZE :: Int
  556 bLOCKSIZE = 8192
  557 
--
-- About timeouts
--
-- It's not good enough to restart the timer from io(Read|Write)Callback,
-- because those seem to be edge-triggered. I've definitely seen cases where,
-- after 20 seconds, they still weren't being re-awakened.
--
  565 
  566 
  567 ------------------------------------------------------------------------------
  568 data TimeoutException = TimeoutException
  569    deriving (Typeable)
  570 
  571 instance Show TimeoutException where
  572     show _ = "timeout"
  573 
  574 instance Exception TimeoutException
  575 
  576 
  577 ------------------------------------------------------------------------------
  578 tickleTimeout :: Connection -> Int -> IO ()
  579 tickleTimeout conn tm = do
  580     debug "Libev.tickleTimeout"
  581     now       <- getCurrentDateTime
  582     writeIORef (_timerTimeoutTime conn) (now + toEnum tm)
  583 
  584 
  585 ------------------------------------------------------------------------------
  586 waitForLock :: Bool        -- ^ True = wait for read, False = wait for write
  587             -> Connection
  588             -> IO ()
  589 waitForLock readLock conn = do
  590     dbg "start waitForLock"
  591 
  592     withMVar looplock $ \_ -> do
  593         act <- readIORef active
  594         if act
  595           then dbg "read watcher already active, skipping"
  596           else do
  597             dbg "starting watcher, sending async"
  598             tryTakeMVar lock
  599             evIoStart lp io
  600             writeIORef active True
  601             evAsyncSend lp async
  602 
  603     dbg "waitForLock: waiting for mvar"
  604     takeMVar lock
  605     dbg "waitForLock: took mvar"
  606 
  607   where
  608     dbg s    = debug $ "Libev.recvData(" ++ show (_rawSocket conn) ++ "): "
  609                        ++ s
  610     io       = if readLock
  611                  then (_connReadIOObj conn)
  612                  else (_connWriteIOObj conn)
  613     bk       = _backend conn
  614     active   = if readLock
  615                  then (_readActive conn)
  616                  else (_writeActive conn)
  617     lp       = _evLoop bk
  618     looplock = _loopLock bk
  619     async    = _asyncObj bk
  620     lock     = if readLock
  621                  then (_readAvailable conn)
  622                  else (_writeAvailable conn)
  623 
  624 
  625 ------------------------------------------------------------------------------
  626 sendFile :: Int
  627          -> Connection
  628          -> NetworkSession
  629          -> FilePath
  630          -> Int64
  631          -> Int64
  632          -> IO ()
  633 sendFile defaultTimeout c s fp start sz = do
  634     withMVar lock $ \_ -> do
  635       act <- readIORef $ _writeActive c
  636       when act $ evIoStop loop io
  637       writeIORef (_writeActive c) False
  638       evAsyncSend loop asy
  639 
  640 #if defined(HAS_SENDFILE)
  641     case (_listenSocket c) of
  642         ListenHttp _ -> bracket (openFd fp ReadOnly Nothing defaultFileFlags)
  643                                 (closeFd)
  644                                 (go start sz)
  645         _            -> do
  646             step <- runIteratee $ writeOut defaultTimeout c s
  647             run_ $ enumFilePartial fp (start,start+sz) step
  648 #else
  649     step <- runIteratee $ writeOut defaultTimeout c s
  650 
  651     run_ $ enumFilePartial fp (start,start+sz) step
  652     return ()
  653 #endif
  654 
  655     withMVar lock $ \_ -> do
  656       tryTakeMVar $ _readAvailable c
  657       tryTakeMVar $ _writeAvailable c
  658       evAsyncSend loop asy
  659 
  660   where
  661 #if defined(HAS_SENDFILE)
  662     go off bytes fd
  663       | bytes == 0 = return ()
  664       | otherwise  = do
  665             sent <- SF.sendFile (waitForLock False c) sfd fd off bytes
  666             if sent < bytes
  667               then tickleTimeout c defaultTimeout >>
  668                    go (off+sent) (bytes-sent) fd
  669               else return ()
  670 
  671     sfd  = Fd $ _rawSocket c
  672 #endif
  673     io   = _connWriteIOObj c
  674     b    = _backend c
  675     loop = _evLoop b
  676     lock = _loopLock b
  677     asy  = _asyncObj b
  678 
  679 
  680 ------------------------------------------------------------------------------
  681 enumerate :: (MonadIO m)
  682           => Connection
  683           -> NetworkSession
  684           -> Enumerator ByteString m a
  685 enumerate conn session = loop
  686   where
  687     dbg s = debug $ "Libev.enumerate(" ++ show (_socket session)
  688                     ++ "): " ++ s
  689 
  690     loop (Continue k) = do
  691         m <- liftIO $ recvData
  692         let s = fromMaybe "" m
  693         sendOne k s
  694     loop x = returnI x
  695 
  696     sendOne k s | S.null s  = do
  697         dbg "sending EOF to continuation"
  698         enumEOF $ Continue k
  699 
  700                 | otherwise = do
  701         dbg $ "sending " ++ show s ++ " to continuation"
  702         step <- lift $ runIteratee $ k $ Chunks [s]
  703         case step of
  704           (Yield x st)   -> do
  705                       dbg $ "got yield, remainder is " ++ show st
  706                       yield x st
  707           r@(Continue _) -> do
  708                       dbg $ "got continue"
  709                       loop r
  710           (Error e)      -> throwError e
  711 
  712     recvData = Listen.recv (_listenSocket conn)
  713                            (waitForLock True conn) session
  714 
  715 
  716 ------------------------------------------------------------------------------
  717 writeOut :: (MonadIO m)
  718          => Int
  719          -> Connection
  720          -> NetworkSession
  721          -> Iteratee ByteString m ()
  722 writeOut defaultTimeout conn session = loop
  723   where
  724     loop = continue k
  725 
  726     k EOF = yield () EOF
  727     k (Chunks xs) = do
  728         liftIO $ sendData $ S.concat xs
  729         loop
  730 
  731     sendData = Listen.send (_listenSocket conn)
  732                            (tickleTimeout conn defaultTimeout)
  733                            (waitForLock False conn)
  734                            session
  735 
  736 #endif