diff --git a/Data/Pool.hs b/Data/Pool.hs index 6764e8b..e04be54 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -1,9 +1,5 @@ {-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-} -#if MIN_VERSION_monad_control(0,3,0) -{-# LANGUAGE FlexibleContexts #-} -#endif - #if !MIN_VERSION_base(4,3,0) {-# LANGUAGE RankNTypes #-} #endif @@ -31,6 +27,8 @@ module Data.Pool ( Pool(idleTime, maxResources, numStripes) , LocalPool + , Stats(..) + , PoolStats(..) , createPool , withResource , takeResource @@ -39,39 +37,23 @@ module Data.Pool , destroyResource , putResource , destroyAllResources + , stats ) where -import Control.Applicative ((<$>)) import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay) import Control.Concurrent.STM import Control.Exception (SomeException, onException, mask_) -import Control.Monad (forM_, forever, join, liftM3, unless, when) +import Control.Monad (forM_, forever, join, liftM5, unless, when) import Data.Hashable (hash) import Data.IORef (IORef, newIORef, mkWeakIORef) import Data.List (partition) +import Data.Pool.WaiterQueue (WaiterQueue, newQueueIO, push, pop) import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) import Data.Typeable (Typeable) import GHC.Conc.Sync (labelThread) import qualified Control.Exception as E import qualified Data.Vector as V - -#if MIN_VERSION_monad_control(0,3,0) -import Control.Monad.Trans.Control (MonadBaseControl, control) -import Control.Monad.Base (liftBase) -#else -import Control.Monad.IO.Control (MonadControlIO, controlIO) -import Control.Monad.IO.Class (liftIO) -#define control controlIO -#define liftBase liftIO -#endif - -#if MIN_VERSION_base(4,3,0) -import Control.Exception (mask) -#else --- Don't do any async exception protection for older GHCs. -mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b -mask f = f id -#endif +import UnliftIO (MonadUnliftIO, mask, withRunInIO) -- | A single resource pool entry. data Entry a = Entry { @@ -80,12 +62,45 @@ data Entry a = Entry { -- ^ Time of last return. } + +-- | Stats for a single 'LocalPool'. +data PoolStats = PoolStats { + highwaterUsage :: Int + -- ^ Highest usage since last reset. + , currentUsage :: Int + -- ^ Current number of items. + , takes :: Int + -- ^ Number of takes since last reset. + , creates :: Int + -- ^ Number of creates since last reset. + , createFailures :: Int + -- ^ Number of creation failures since last reset. +} deriving (Show) + +-- | Pool-wide stats. +data Stats = Stats { + perStripe :: V.Vector PoolStats + -- ^ Stats per 'LocalPool' (stripe). + , poolStats :: PoolStats + -- ^ Aggregate stats across pool. +} deriving (Show) + -- | A single striped pool. data LocalPool a = LocalPool { inUse :: TVar Int -- ^ Count of open entries (both idle and in use). , entries :: TVar [Entry a] -- ^ Idle entries. + , highwaterVar :: TVar Int + -- ^ Highest value of 'inUse' since last reset. + , takeVar :: TVar Int + -- ^ Number of takes since last reset. + , createVar :: TVar Int + -- ^ Number of creates since last reset. + , createFailureVar :: TVar Int + -- ^ Number of create failures since last reset. + , waiters :: WaiterQueue (TMVar (Maybe (Entry a))) + -- ^ threads waiting for a resource , lfin :: IORef () -- ^ empty value used to attach a finalizer to (internal) } deriving (Typeable) @@ -159,7 +174,7 @@ createPool create destroy numStripes idleTime maxResources = do when (maxResources < 1) $ modError "pool " $ "invalid maximum resource count " ++ show maxResources localPools <- V.replicateM numStripes $ - liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ()) + LocalPool <$> newTVarIO 0 <*> newTVarIO [] <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newQueueIO <*> newIORef () reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask -> unmask $ reaper destroy idleTime localPools fin <- newIORef () @@ -247,15 +262,9 @@ purgeLocalPool destroy LocalPool{..} = do -- destroy a pooled resource, as doing so will almost certainly cause -- a subsequent user (who expects the resource to be valid) to throw -- an exception. -withResource :: -#if MIN_VERSION_monad_control(0,3,0) - (MonadBaseControl IO m) -#else - (MonadControlIO m) -#endif - => Pool a -> (a -> m b) -> m b +withResource :: MonadUnliftIO m => Pool a -> (a -> m b) -> m b {-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-} -withResource pool act = control $ \runInIO -> mask $ \restore -> do +withResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do (resource, local) <- takeResource pool ret <- restore (runInIO (act resource)) `onException` destroyResource pool local resource @@ -275,16 +284,35 @@ withResource pool act = control $ \runInIO -> mask $ \restore -> do takeResource :: Pool a -> IO (a, LocalPool a) takeResource pool@Pool{..} = do local@LocalPool{..} <- getLocalPool pool - resource <- liftBase . join . atomically $ do + resource <- join . atomically $ do + modifyTVar_ takeVar (+ 1) ents <- readTVar entries case ents of (Entry{..}:es) -> writeTVar entries es >> return (return entry) [] -> do used <- readTVar inUse - when (used == maxResources) retry - writeTVar inUse $! used + 1 - return $ - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + case used == maxResources of + False -> do + writeTVar inUse $! used + 1 + modifyTVar_ highwaterVar (`max` (used + 1)) + modifyTVar_ createVar (+ 1) + return $ + create `onException` atomically (modifyTVar_ createFailureVar (+ 1) >> destroyResourceSTM local) + True -> do + var <- newEmptyTMVar + removeSelf <- push waiters var + let getResource x = case x of + Just y -> pure (entry y) + Nothing -> create `onException` atomically (destroyResourceSTM local) + let dequeue = do + maybeEntry <- atomically $ do + removeSelf + tryTakeTMVar var + atomically $ case maybeEntry of + Nothing -> pure () + Just Nothing -> destroyResourceSTM local + Just (Just v) -> putResourceSTM local v + return (getResource =<< atomically (takeTMVar var) `onException` dequeue) return (resource, local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE takeResource #-} @@ -295,14 +323,8 @@ takeResource pool@Pool{..} = do -- returns immediately with 'Nothing' (ie. the action function is /not/ called). -- Conversely, if a resource can be borrowed from the pool without blocking, the -- action is performed and it's result is returned, wrapped in a 'Just'. -tryWithResource :: forall m a b. -#if MIN_VERSION_monad_control(0,3,0) - (MonadBaseControl IO m) -#else - (MonadControlIO m) -#endif - => Pool a -> (a -> m b) -> m (Maybe b) -tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do +tryWithResource :: forall m a b. MonadUnliftIO m => Pool a -> (a -> m b) -> m (Maybe b) +tryWithResource pool act = withRunInIO $ \runInIO -> mask $ \restore -> do res <- tryTakeResource pool case res of Just (resource, local) -> do @@ -321,7 +343,7 @@ tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a)) tryTakeResource pool@Pool{..} = do local@LocalPool{..} <- getLocalPool pool - resource <- liftBase . join . atomically $ do + resource <- join . atomically $ do ents <- readTVar entries case ents of (Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry) @@ -332,7 +354,7 @@ tryTakeResource pool@Pool{..} = do else do writeTVar inUse $! used + 1 return $ Just <$> - create `onException` atomically (modifyTVar_ inUse (subtract 1)) + create `onException` atomically (destroyResourceSTM local) return $ (flip (,) local) <$> resource #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE tryTakeResource #-} @@ -343,7 +365,7 @@ tryTakeResource pool@Pool{..} = do -- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource' getLocalPool :: Pool a -> IO (LocalPool a) getLocalPool Pool{..} = do - i <- liftBase $ ((`mod` numStripes) . hash) <$> myThreadId + i <- ((`mod` numStripes) . hash) <$> myThreadId return $ localPools V.! i #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE getLocalPool #-} @@ -352,22 +374,38 @@ getLocalPool Pool{..} = do -- | Destroy a resource. Note that this will ignore any exceptions in the -- destroy function. destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource Pool{..} LocalPool{..} resource = do +destroyResource Pool{..} local resource = do destroy resource `E.catch` \(_::SomeException) -> return () - atomically (modifyTVar_ inUse (subtract 1)) + atomically (destroyResourceSTM local) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE destroyResource #-} #endif -- | Return a resource to the given 'LocalPool'. putResource :: LocalPool a -> a -> IO () -putResource LocalPool{..} resource = do +putResource lp resource = do now <- getCurrentTime - atomically $ modifyTVar_ entries (Entry resource now:) + atomically $ putResourceSTM lp (Entry resource now) #if __GLASGOW_HASKELL__ >= 700 {-# INLINABLE putResource #-} #endif +putResourceSTM :: LocalPool a -> Entry a -> STM () +putResourceSTM LocalPool{..} resourceEntry = do + mWaiters <- pop waiters + case mWaiters of + Nothing -> modifyTVar_ entries (resourceEntry:) + Just w -> putTMVar w (Just resourceEntry) +{-# INLINE putResourceSTM #-} + +destroyResourceSTM :: LocalPool a -> STM () +destroyResourceSTM LocalPool{..} = do + mwaiter <- pop waiters + case mwaiter of + Nothing -> modifyTVar_ inUse (subtract 1) + Just w -> putTMVar w Nothing +{-# INLINE destroyResourceSTM #-} + -- | Destroy all resources in all stripes in the pool. Note that this -- will ignore any exceptions in the destroy function. -- @@ -385,6 +423,22 @@ putResource LocalPool{..} resource = do destroyAllResources :: Pool a -> IO () destroyAllResources Pool{..} = V.forM_ localPools $ purgeLocalPool destroy +-- | @stats pool reset@ returns statistics on each 'LocalPool' as well as a summary across the entire Pool. +-- When @reset@ is true, the stats are reset. +stats :: Pool a -> Bool -> IO Stats +stats Pool{..} reset = do + let stripeStats LocalPool{..} = atomically $ do + s <- liftM5 PoolStats (readTVar highwaterVar) (readTVar inUse) (readTVar takeVar) (readTVar createVar) (readTVar createFailureVar) + when reset $ do + mapM_ (\v -> writeTVar v 0) [takeVar, createVar, createFailureVar] + writeTVar highwaterVar $! currentUsage s + return s + + per <- V.mapM stripeStats localPools + let poolWide = V.foldr merge (PoolStats 0 0 0 0 0) per + merge (PoolStats hw1 cu1 t1 c1 f1) (PoolStats hw2 cu2 t2 c2 f2) = PoolStats (hw1 + hw2) (cu1 + cu2) (t1 + t2) (c1 + c2) (f1 + f2) + return $ Stats per poolWide + modifyTVar_ :: TVar a -> (a -> a) -> STM () modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a diff --git a/Data/Pool/WaiterQueue.hs b/Data/Pool/WaiterQueue.hs new file mode 100644 index 0000000..f91eda8 --- /dev/null +++ b/Data/Pool/WaiterQueue.hs @@ -0,0 +1,87 @@ +module Data.Pool.WaiterQueue + ( WaiterQueue, + newQueueIO, + push, + pop, + ) +where + +import Control.Concurrent.STM + +-- | A FIFO queue that supports removing any element from the queue. +-- +-- We have a pointer to the head of the list, and a pointer to the +-- final forward pointer in the list. +data WaiterQueue a + = WaiterQueue + (TVar (TDList a)) + (TVar (TVar (TDList a))) + +-- | Each element has a pointer to the previous element's forward +-- pointer where "previous element" can be a 'TDList' cons cell or the +-- 'WaiterQueue' head pointer. +data TDList a + = TCons + (TVar (TVar (TDList a))) + a + (TVar (TDList a)) + | TNil + +newQueueIO :: IO (WaiterQueue a) +newQueueIO = do + emptyVarL <- newTVarIO TNil + emptyVarR <- newTVarIO emptyVarL + pure (WaiterQueue emptyVarL emptyVarR) + +removeSelf :: + -- | 'WaiterQueue's final forward pointer pointer + TVar (TVar (TDList a)) -> + -- | Our back pointer + TVar (TVar (TDList a)) -> + -- | Our forward pointer + TVar (TDList a) -> + STM () +removeSelf tv prevPP nextP = do + prevP <- readTVar prevPP + -- If our back pointer points to our forward pointer then we have + -- already been removed from the queue + case prevP == nextP of + True -> pure () + False -> do + next <- readTVar nextP + writeTVar prevP next + case next of + TNil -> writeTVar tv prevP + TCons bp _ _ -> writeTVar bp prevP + writeTVar prevPP nextP +{-# INLINE removeSelf #-} + +-- | Returns an STM action that removes the pushed element from the +-- queue +push :: WaiterQueue a -> a -> STM (STM ()) +push (WaiterQueue _ tv) a = do + fwdPointer <- readTVar tv + backPointer <- newTVar fwdPointer + emptyVar <- newTVar TNil + let cell = TCons backPointer a emptyVar + writeTVar fwdPointer cell + writeTVar tv emptyVar + pure (removeSelf tv backPointer emptyVar) +{-# INLINE push #-} + +pop :: WaiterQueue a -> STM (Maybe a) +pop (WaiterQueue hv tv) = do + firstElem <- readTVar hv + case firstElem of + TNil -> pure Nothing + TCons bp a fp -> do + f <- readTVar fp + writeTVar hv f + case f of + TNil -> writeTVar tv hv + TCons fbp _ _ -> writeTVar fbp hv + -- point the back pointer to the forward pointer as a sign that + -- the cell has been popped (referenced in removeSelf) + writeTVar bp fp + pure (Just a) +{-# INLINE pop #-} diff --git a/resource-pool.cabal b/resource-pool.cabal index 6a9bc09..982855f 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -1,5 +1,5 @@ name: resource-pool -version: 0.2.3.2 +version: 0.2.4.0 synopsis: A high-performance striped resource pooling implementation description: A high-performance striped pooling abstraction for managing @@ -18,7 +18,7 @@ build-type: Simple extra-source-files: README.markdown -cabal-version: >=1.8 +cabal-version: >=1.10 flag developer description: operate in developer mode @@ -26,17 +26,19 @@ flag developer manual: True library + default-language: Haskell2010 exposed-modules: Data.Pool + other-modules: Data.Pool.WaiterQueue build-depends: base >= 4.4 && < 5, hashable, - monad-control >= 0.2.0.1, transformers, transformers-base >= 0.4, stm >= 2.3, time, + unliftio, vector >= 0.7 if flag(developer)