Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ data ConsumerMetrics = ConsumerMetrics
, jobsOverdue :: Prom.Vector Prom.Label1 Prom.Gauge
, jobsReserved :: Prom.Vector Prom.Label1 Prom.Counter
, jobsExecution :: Prom.Vector Prom.Label2 Prom.Histogram
, jobsParsingFailure :: Prom.Vector Prom.Label1 Prom.Counter
}

registerConsumerMetrics :: MonadBaseControl IO m => ConsumerMetricsConfig -> m ConsumerMetrics
Expand Down Expand Up @@ -116,6 +117,14 @@ registerConsumerMetrics ConsumerMetricsConfig {..} = liftBase $ do
, metricHelp = "Execution time of jobs in seconds, by job_name, includes the job_result"
}
jobExecutionBuckets
jobsParsingFailure <-
Prom.register
. Prom.vector "job_name"
$ Prom.counter
Prom.Info
{ metricName = "consumers_job_parsing_failures"
, metricHelp = "Number of jobs that failed to parse, by job_name"
}
pure $ ConsumerMetrics {..}

-- | Run a 'ConsumerConfig', but with instrumentation added.
Expand Down Expand Up @@ -235,7 +244,7 @@ instrumentConsumerConfig
-> ConsumerConfig m idx job
-> ConsumerConfig m idx job
instrumentConsumerConfig ConsumerMetrics {..} ConsumerConfig {..} =
ConsumerConfig {ccProcessJob = ccProcessJob', ..}
ConsumerConfig {ccProcessJob = ccProcessJob', ccOnFailedToFetchJob = ccOnFailedToFetchJob', ..}
where
jobName = unRawSQL ccJobsTable

Expand Down Expand Up @@ -263,3 +272,7 @@ instrumentConsumerConfig ConsumerMetrics {..} ConsumerConfig {..} =
liftBase $ Prom.withLabel jobsExecution (jobName, resultLabel) (`Prom.observe` duration)

handleEx e = logAttention "Exception while instrumenting job" $ object ["exception" .= show e]

ccOnFailedToFetchJob' errorTxt idx = do
liftBase $ Prom.withLabel jobsParsingFailure jobName Prom.incCounter
ccOnFailedToFetchJob errorTxt idx
3 changes: 2 additions & 1 deletion consumers/example/Example.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ main = do
{ ccJobsTable = "consumers_example_jobs"
, ccConsumersTable = "consumers_example_consumers"
, ccJobSelectors = ["id", "message"]
, ccJobFetcher = id
, ccJobFetcher = Right
, ccJobIndex = \(i :: Int64, _msg :: T.Text) -> i
, ccNotificationChannel = Just "consumers_example_chan"
, ccNotificationTimeout = 10 * 1000000 -- 10 sec
, ccMaxRunningJobs = 1
, ccProcessJob = processJob
, ccOnFailedToFetchJob = defaultOnFailedToFetchJob
, ccOnException = handleException
, ccJobLogData = \(i, _) -> ["job_id" .= i]
}
Expand Down
28 changes: 23 additions & 5 deletions consumers/src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import Control.Monad.Catch
import Control.Monad.Time
import Control.Monad.Trans
import Control.Monad.Trans.Control
import Data.Either
import Data.Foldable qualified as F
import Data.Function
import Data.Int
import Data.Map.Strict qualified as M
import Data.Monoid.Utils
import Data.Text qualified as T
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
import Database.PostgreSQL.Consumers.Utils
Expand Down Expand Up @@ -152,6 +154,15 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal
, "action" .= show action
]
pure action
, ccOnFailedToFetchJob = \t i ->
ccOnFailedToFetchJob cc0 t i `ES.catchAny` \handlerEx -> do
let action = RerunAfter $ idays 1
logAttention "ccOnFailedToFetchJob threw an exception" $
object
[ "exception" .= show handlerEx
, "action" .= show action
]
pure action
}

waitForRunningJobs runningJobsInfo runningJobs = do
Expand Down Expand Up @@ -271,15 +282,20 @@ spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do
stuckJobs <- fetchMany ccJobFetcher
unless (null stuckJobs) $ do
results <- forM stuckJobs $ \job -> do
action <- lift $ ccOnException (toException ThreadKilled) job
pure (ccJobIndex job, Failed action)
action <-
lift $
either
(\(idx, t) -> ccOnFailedToFetchJob t idx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That reminded me that we probably want to give ccOnFailedToFetchJob the same treatment we give to ccOnException here, i.e. have a safety net in case it itself throws due to a bug.

(ccOnException (toException ThreadKilled))
job
pure (either fst ccJobIndex job, Failed action)
runSQL_ $ updateJobsQuery ccJobsTable results now
runPreparedSQL_ (preparedSqlName "removeInactive" ccConsumersTable) $
smconcat
[ "DELETE FROM" <+> raw ccConsumersTable
, "WHERE id = ANY(" <?> Array1 inactive <+> ")"
]
pure (length inactive, map ccJobIndex stuckJobs)
pure (length inactive, fmap (either fst ccJobIndex) stuckJobs)
when (inactiveConsumers > 0) $ do
logInfo "Unregistered inactive consumers" $
object
Expand Down Expand Up @@ -328,6 +344,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
loop :: Int -> m Bool
loop limit = do
(batch, batchSize) <- reserveJobs limit
let (failedJobs, succeededJobs) = partitionEithers batch
when (batchSize > 0) $ do
logInfo "Processing batch" $
object
Expand All @@ -346,7 +363,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
. (`finally` subtractJobs)
. restore
$ do
mapM startJob batch >>= mapM joinJob >>= updateJobs
mapM startJob succeededJobs >>= mapM joinJob >>= updateJobs
mapM (\(idx, txt) -> sequence (idx, Failed <$> ccOnFailedToFetchJob txt idx)) failedJobs >>= updateJobs

when (batchSize == limit) $ do
maxBatchSize <- atomically $ do
Expand All @@ -357,7 +375,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs

pure (batchSize > 0)

reserveJobs :: Int -> m ([job], Int)
reserveJobs :: Int -> m ([Either (idx, T.Text) job], Int)
reserveJobs limit = runDBT cs ts $ do
now <- currentTime
n <-
Expand Down
19 changes: 18 additions & 1 deletion consumers/src/Database/PostgreSQL/Consumers/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ module Database.PostgreSQL.Consumers.Config
( Action (..)
, Result (..)
, ConsumerConfig (..)
, defaultOnFailedToFetchJob
) where

import Control.Exception (SomeException)
import Data.Aeson.Types qualified as A
import Data.Text (Text)
import Data.Time
import Database.PostgreSQL.PQTypes.FromRow
import Database.PostgreSQL.PQTypes.Interval
import Database.PostgreSQL.PQTypes.Notification
import Database.PostgreSQL.PQTypes.SQL
import Database.PostgreSQL.PQTypes.SQL.Raw
import Log

-- | Action to take after a job was processed.
data Action
Expand Down Expand Up @@ -78,7 +81,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig
, ccJobSelectors :: ![SQL]
-- ^ Fields needed to be selected from the jobs table in order to assemble a
-- job.
, ccJobFetcher :: !(row -> job)
, ccJobFetcher :: !(row -> Either (idx, Text) job)
-- ^ Function that transforms the list of fields into a job.
, ccJobIndex :: !(job -> idx)
-- ^ Selector for taking out job ID from the job object.
Expand Down Expand Up @@ -110,10 +113,24 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig
-- ^ Function that processes a job. It's recommended to process each job in a
-- separate DB transaction, otherwise you'll have to remember to commit your
-- changes to the database manually.
, ccOnFailedToFetchJob :: !(Text -> idx -> m Action)
-- ^ Action taken if fetching a job failed. It is advised to reenqueue the
-- job at a later date and emit a warning in such a case. This is mostly
-- to ensure the application using consumers won't fail completely when
-- this happens. See 'defaultOnFailedToFetchJob' for a simple implementation
-- which logs and reenqueue.
, ccOnException :: !(SomeException -> job -> m Action)
-- ^ Action taken if a job processing function throws an exception. For
-- robustness it's best to ensure that it doesn't throw. If it does, the
-- exception will be logged and the job in question postponed by a day.
, ccJobLogData :: !(job -> [A.Pair])
-- ^ Data to attach to each log message while processing a job.
}

-- | A default implementation for ccOnFailedToFetchJob,
-- when the parsing of the row should never fail.
-- This will create a logAttention and reenqueue, to be replayed in one day.
defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action
defaultOnFailedToFetchJob msg idx = do
logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "job_id" A..= show idx]
pure . RerunAfter $ idays 1
3 changes: 2 additions & 1 deletion consumers/test/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,12 @@ test = do
{ ccJobsTable = "consumers_test_jobs"
, ccConsumersTable = "consumers_test_consumers"
, ccJobSelectors = ["id", "countdown"]
, ccJobFetcher = id
, ccJobFetcher = Right
, ccJobIndex = \(i :: Int64, _ :: Int32) -> i
, ccNotificationChannel = Just "consumers_test_chan"
, -- select some small timeout
ccNotificationTimeout = 100 * 1000 -- 100 msec
, ccOnFailedToFetchJob = defaultOnFailedToFetchJob
, ccMaxRunningJobs = 20
, ccProcessJob = processJob
, ccOnException = handleException
Expand Down