Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist-newstyle
71 changes: 65 additions & 6 deletions Data/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ module Data.Pool
(
Pool(idleTime, maxResources, numStripes)
, LocalPool
, Stats(..)
, PoolStats(..)
, createPool
, withResource
, takeResource
Expand All @@ -39,14 +41,14 @@ module Data.Pool
, destroyResource
, putResource
, destroyAllResources
, stats
) where

import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Data.Hashable (hash)
import Control.Monad (forM_, forever, join, unless, when)
import Data.Hashable (hashWithSalt)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime)
Expand Down Expand Up @@ -80,12 +82,47 @@ data Entry a = Entry {
-- ^ Time of last return.
}


-- | Stats for a single 'LocalPool'.
data PoolStats = PoolStats {
highwaterUsage :: Int
-- ^ Highest value of 'currentUsage' since last reset.
, currentUsage :: Int
-- ^ Current number of items (both idle and in use).
, currentIdle :: Int
-- ^ Current number of items idle.
, currentNonIdle :: Int
-- ^ Current number of items that are not idle.
, takes :: Int
-- ^ Number of takes since last reset.
, creates :: Int
-- ^ Number of creates since last reset.
, createFailures :: Int
-- ^ Number of creation failures since last reset.
} deriving (Show)

-- | Pool-wide stats.
data Stats = Stats {
perStripe :: V.Vector PoolStats
-- ^ Stats per 'LocalPool' (stripe).
, poolStats :: PoolStats
-- ^ Aggregate stats across pool.
} deriving (Show)

-- | A single striped pool.
data LocalPool a = LocalPool {
inUse :: TVar Int
-- ^ Count of open entries (both idle and in use).
, entries :: TVar [Entry a]
-- ^ Idle entries.
, highwaterVar :: TVar Int
-- ^ Highest value of 'inUse' since last reset.
, takeVar :: TVar Int
-- ^ Number of takes since last reset.
, createVar :: TVar Int
-- ^ Number of creates since last reset.
, createFailureVar :: TVar Int
-- ^ Number of create failures since last reset.
, lfin :: IORef ()
-- ^ empty value used to attach a finalizer to (internal)
} deriving (Typeable)
Expand Down Expand Up @@ -159,7 +196,7 @@ createPool create destroy numStripes idleTime maxResources = do
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- V.replicateM numStripes $
liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ())
LocalPool <$> newTVarIO 0 <*> newTVarIO [] <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newTVarIO 0 <*> newIORef ()
reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
unmask $ reaper destroy idleTime localPools
fin <- newIORef ()
Expand Down Expand Up @@ -276,15 +313,18 @@ takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool@Pool{..} = do
local@LocalPool{..} <- getLocalPool pool
resource <- liftBase . join . atomically $ do
modifyTVar_ takeVar (+ 1)
ents <- readTVar entries
case ents of
(Entry{..}:es) -> writeTVar entries es >> return (return entry)
[] -> do
used <- readTVar inUse
when (used == maxResources) retry
writeTVar inUse $! used + 1
modifyTVar_ highwaterVar (`max` (used + 1))
modifyTVar_ createVar (+ 1)
return $
create `onException` atomically (modifyTVar_ inUse (subtract 1))
create `onException` atomically (modifyTVar_ createFailureVar (+ 1) >> modifyTVar_ inUse (subtract 1))
return (resource, local)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE takeResource #-}
Expand Down Expand Up @@ -343,7 +383,7 @@ tryTakeResource pool@Pool{..} = do
-- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource'
getLocalPool :: Pool a -> IO (LocalPool a)
getLocalPool Pool{..} = do
i <- liftBase $ ((`mod` numStripes) . hash) <$> myThreadId
i <- liftBase $ ((`mod` numStripes) . hashWithSalt (-3750763034362895579)) <$> myThreadId
return $ localPools V.! i
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE getLocalPool #-}
Expand Down Expand Up @@ -385,6 +425,25 @@ putResource LocalPool{..} resource = do
destroyAllResources :: Pool a -> IO ()
destroyAllResources Pool{..} = V.forM_ localPools $ purgeLocalPool destroy

-- | @stats pool reset@ returns statistics on each 'LocalPool' as well as a summary across the entire Pool.
-- When @reset@ is true, the stats are reset.
stats :: Pool a -> Bool -> IO Stats
stats Pool{..} reset = do
let stripeStats LocalPool{..} = atomically $ do
is <- readTVar inUse
idle <- length <$> readTVar entries
let nonIdle = is - idle
s <- PoolStats <$> readTVar highwaterVar <*> pure is <*> pure idle <*> pure nonIdle <*> readTVar takeVar <*> readTVar createVar <*> readTVar createFailureVar
when reset $ do
mapM_ (\v -> writeTVar v 0) [takeVar, createVar, createFailureVar]
writeTVar highwaterVar $! currentUsage s
return s

per <- V.mapM stripeStats localPools
let poolWide = V.foldr merge (PoolStats 0 0 0 0 0 0 0) per
merge (PoolStats hw1 cu1 i1 ni1 t1 c1 f1) (PoolStats hw2 cu2 i2 ni2 t2 c2 f2) = PoolStats (hw1 + hw2) (cu1 + cu2) (i1 + i2) (ni1 + ni2) (t1 + t2) (c1 + c2) (f1 + f2)
return $ Stats per poolWide

modifyTVar_ :: TVar a -> (a -> a) -> STM ()
modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a

Expand Down