diff --git a/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs b/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs index 3a6a357..7fe0ac4 100644 --- a/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs +++ b/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs @@ -79,6 +79,7 @@ data ConsumerMetrics = ConsumerMetrics , jobsOverdue :: Prom.Vector Prom.Label1 Prom.Gauge , jobsReserved :: Prom.Vector Prom.Label1 Prom.Counter , jobsExecution :: Prom.Vector Prom.Label2 Prom.Histogram + , jobsParsingFailure :: Prom.Vector Prom.Label1 Prom.Counter } registerConsumerMetrics :: MonadBaseControl IO m => ConsumerMetricsConfig -> m ConsumerMetrics @@ -116,6 +117,14 @@ registerConsumerMetrics ConsumerMetricsConfig {..} = liftBase $ do , metricHelp = "Execution time of jobs in seconds, by job_name, includes the job_result" } jobExecutionBuckets + jobsParsingFailure <- + Prom.register + . Prom.vector "job_name" + $ Prom.counter + Prom.Info + { metricName = "consumers_job_parsing_failures" + , metricHelp = "Number of jobs that failed to parse, by job_name" + } pure $ ConsumerMetrics {..} -- | Run a 'ConsumerConfig', but with instrumentation added. @@ -235,7 +244,7 @@ instrumentConsumerConfig -> ConsumerConfig m idx job -> ConsumerConfig m idx job instrumentConsumerConfig ConsumerMetrics {..} ConsumerConfig {..} = - ConsumerConfig {ccProcessJob = ccProcessJob', ..} + ConsumerConfig {ccProcessJob = ccProcessJob', ccOnFailedToFetchJob = ccOnFailedToFetchJob', ..} where jobName = unRawSQL ccJobsTable @@ -263,3 +272,7 @@ instrumentConsumerConfig ConsumerMetrics {..} ConsumerConfig {..} = liftBase $ Prom.withLabel jobsExecution (jobName, resultLabel) (`Prom.observe` duration) handleEx e = logAttention "Exception while instrumenting job" $ object ["exception" .= show e] + + ccOnFailedToFetchJob' errorTxt idx = do + liftBase $ Prom.withLabel jobsParsingFailure jobName Prom.incCounter + ccOnFailedToFetchJob errorTxt idx diff --git a/consumers/CHANGELOG.md b/consumers/CHANGELOG.md index 487016b..48ba251 100644 --- a/consumers/CHANGELOG.md +++ b/consumers/CHANGELOG.md @@ -1,3 +1,6 @@ +# consumers-2.4.0.0 (TBD) +* Add ability to reenqueue unfetchable jobs, let ccJobFetcher fail + # consumers-2.3.4.0 (2025-11-27) * Compatibility with `hpqtypes` >= 1.13.0.0. diff --git a/consumers/consumers.cabal b/consumers/consumers.cabal index 5754092..e2db5df 100644 --- a/consumers/consumers.cabal +++ b/consumers/consumers.cabal @@ -1,6 +1,6 @@ cabal-version: 3.0 name: consumers -version: 2.3.4.0 +version: 2.4.0.0 synopsis: Concurrent PostgreSQL data consumers description: Library for setting up concurrent consumers of data diff --git a/consumers/example/Example.hs b/consumers/example/Example.hs index 47f563a..1400b6b 100644 --- a/consumers/example/Example.hs +++ b/consumers/example/Example.hs @@ -102,12 +102,13 @@ main = do { ccJobsTable = "consumers_example_jobs" , ccConsumersTable = "consumers_example_consumers" , ccJobSelectors = ["id", "message"] - , ccJobFetcher = id + , ccJobFetcher = Right , ccJobIndex = \(i :: Int64, _msg :: T.Text) -> i , ccNotificationChannel = Just "consumers_example_chan" , ccNotificationTimeout = 10 * 1000000 -- 10 sec , ccMaxRunningJobs = 1 , ccProcessJob = processJob + , ccOnFailedToFetchJob = defaultOnFailedToFetchJob , ccOnException = handleException , ccJobLogData = \(i, _) -> ["job_id" .= i] } diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 127e30a..444fc42 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -18,11 +18,13 @@ import Control.Monad.Catch import Control.Monad.Time import Control.Monad.Trans import Control.Monad.Trans.Control +import Data.Either import Data.Foldable qualified as F import Data.Function import Data.Int import Data.Map.Strict qualified as M import Data.Monoid.Utils +import Data.Text qualified as T import Database.PostgreSQL.Consumers.Config import Database.PostgreSQL.Consumers.Consumer import Database.PostgreSQL.Consumers.Utils @@ -152,6 +154,15 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal , "action" .= show action ] pure action + , ccOnFailedToFetchJob = \t i -> + ccOnFailedToFetchJob cc0 t i `ES.catchAny` \handlerEx -> do + let action = RerunAfter $ idays 1 + logAttention "ccOnFailedToFetchJob threw an exception" $ + object + [ "exception" .= show handlerEx + , "action" .= show action + ] + pure action } waitForRunningJobs runningJobsInfo runningJobs = do @@ -271,15 +282,20 @@ spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do stuckJobs <- fetchMany ccJobFetcher unless (null stuckJobs) $ do results <- forM stuckJobs $ \job -> do - action <- lift $ ccOnException (toException ThreadKilled) job - pure (ccJobIndex job, Failed action) + action <- + lift $ + either + (\(idx, t) -> ccOnFailedToFetchJob t idx) + (ccOnException (toException ThreadKilled)) + job + pure (either fst ccJobIndex job, Failed action) runSQL_ $ updateJobsQuery ccJobsTable results now runPreparedSQL_ (preparedSqlName "removeInactive" ccConsumersTable) $ smconcat [ "DELETE FROM" <+> raw ccConsumersTable , "WHERE id = ANY(" Array1 inactive <+> ")" ] - pure (length inactive, map ccJobIndex stuckJobs) + pure (length inactive, fmap (either fst ccJobIndex) stuckJobs) when (inactiveConsumers > 0) $ do logInfo "Unregistered inactive consumers" $ object @@ -328,6 +344,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs loop :: Int -> m Bool loop limit = do (batch, batchSize) <- reserveJobs limit + let (failedJobs, succeededJobs) = partitionEithers batch when (batchSize > 0) $ do logInfo "Processing batch" $ object @@ -346,7 +363,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs . (`finally` subtractJobs) . restore $ do - mapM startJob batch >>= mapM joinJob >>= updateJobs + mapM startJob succeededJobs >>= mapM joinJob >>= updateJobs + mapM (\(idx, txt) -> sequence (idx, Failed <$> ccOnFailedToFetchJob txt idx)) failedJobs >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -357,7 +375,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: Int -> m ([job], Int) + reserveJobs :: Int -> m ([Either (idx, T.Text) job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index fde9636..ac9f623 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -2,16 +2,19 @@ module Database.PostgreSQL.Consumers.Config ( Action (..) , Result (..) , ConsumerConfig (..) + , defaultOnFailedToFetchJob ) where import Control.Exception (SomeException) import Data.Aeson.Types qualified as A +import Data.Text (Text) import Data.Time import Database.PostgreSQL.PQTypes.FromRow import Database.PostgreSQL.PQTypes.Interval import Database.PostgreSQL.PQTypes.Notification import Database.PostgreSQL.PQTypes.SQL import Database.PostgreSQL.PQTypes.SQL.Raw +import Log -- | Action to take after a job was processed. data Action @@ -78,7 +81,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig , ccJobSelectors :: ![SQL] -- ^ Fields needed to be selected from the jobs table in order to assemble a -- job. - , ccJobFetcher :: !(row -> job) + , ccJobFetcher :: !(row -> Either (idx, Text) job) -- ^ Function that transforms the list of fields into a job. , ccJobIndex :: !(job -> idx) -- ^ Selector for taking out job ID from the job object. @@ -110,6 +113,12 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- ^ Function that processes a job. It's recommended to process each job in a -- separate DB transaction, otherwise you'll have to remember to commit your -- changes to the database manually. + , ccOnFailedToFetchJob :: !(Text -> idx -> m Action) + -- ^ Action taken if fetching a job failed. It is advised to reenqueue the + -- job at a later date and emit a warning in such a case. This is mostly + -- to ensure the application using consumers won't fail completely when + -- this happens. See 'defaultOnFailedToFetchJob' for a simple implementation + -- which logs and reenqueue. , ccOnException :: !(SomeException -> job -> m Action) -- ^ Action taken if a job processing function throws an exception. For -- robustness it's best to ensure that it doesn't throw. If it does, the @@ -117,3 +126,11 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig , ccJobLogData :: !(job -> [A.Pair]) -- ^ Data to attach to each log message while processing a job. } + +-- | A default implementation for ccOnFailedToFetchJob, +-- when the parsing of the row should never fail. +-- This will create a logAttention and reenqueue, to be replayed in one day. +defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action +defaultOnFailedToFetchJob msg idx = do + logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "consumers_job_id" A..= show idx] + pure . RerunAfter $ idays 1 diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index fd2b61d..661ed5c 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -145,11 +145,12 @@ test = do { ccJobsTable = "consumers_test_jobs" , ccConsumersTable = "consumers_test_consumers" , ccJobSelectors = ["id", "countdown"] - , ccJobFetcher = id + , ccJobFetcher = Right , ccJobIndex = \(i :: Int64, _ :: Int32) -> i , ccNotificationChannel = Just "consumers_test_chan" , -- select some small timeout ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccOnFailedToFetchJob = defaultOnFailedToFetchJob , ccMaxRunningJobs = 20 , ccProcessJob = processJob , ccOnException = handleException