From 03ae07c898e5fa10c4b4450b8758aa327ff3e6ac Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 25 Sep 2025 13:00:55 +0200 Subject: [PATCH 01/14] WIP --- consumers/example/Example.hs | 6 ++++++ .../src/Database/PostgreSQL/Consumers/Components.hs | 12 ++++++++---- .../src/Database/PostgreSQL/Consumers/Config.hs | 6 ++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/consumers/example/Example.hs b/consumers/example/Example.hs index 47f563a..a59783a 100644 --- a/consumers/example/Example.hs +++ b/consumers/example/Example.hs @@ -108,6 +108,7 @@ main = do , ccNotificationTimeout = 10 * 1000000 -- 10 sec , ccMaxRunningJobs = 1 , ccProcessJob = processJob + , ccOnFailedToFetchJob = handleFailedToFetchJob , ccOnException = handleException , ccJobLogData = \(i, _) -> ["job_id" .= i] } @@ -135,6 +136,11 @@ main = do handleException :: SomeException -> (Int64, T.Text) -> AppM Action handleException _ _ = pure . RerunAfter $ imicroseconds 500000 + handleFailedToFetchJob :: SomeException -> (Int64, T.Text) -> AppM (Either (Int64, Result) (Int64, T.Text)) + handleFailedToFetchJob e (idx, _) = do + logAttention "Failing to fetch job" $ object ["error" .= show e] + pure . Left $ (idx, Failed . RerunAfter $ idays 48) + -- | Table where jobs are stored. See -- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'. jobsTable :: Table diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 127e30a..b6bc100 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -28,6 +28,7 @@ import Database.PostgreSQL.Consumers.Consumer import Database.PostgreSQL.Consumers.Utils import Database.PostgreSQL.PQTypes import Log +import Data.Either (partitionEithers) -- | Run the consumer. The purpose of the returned monadic action is to wait for -- currently processed jobs and clean up. This function is best used in @@ -328,6 +329,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs loop :: Int -> m Bool loop limit = do (batch, batchSize) <- reserveJobs limit + let (failedJobs, succeededJobs) = partitionEithers batch when (batchSize > 0) $ do logInfo "Processing batch" $ object @@ -346,7 +348,8 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs . (`finally` subtractJobs) . restore $ do - mapM startJob batch >>= mapM joinJob >>= updateJobs + mapM startJob succeededJobs >>= mapM joinJob >>= updateJobs + mapM (\failedRow -> sequence (ccRowIndex failedRow, ccOnFailedToFetchJob failedRow)) failedJobs >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -357,7 +360,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: Int -> m ([job], Int) + reserveJobs :: (MonadCatch m) => Int -> m ([Either row job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- @@ -372,8 +375,9 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id IN (" <> reservedJobs now <> ")" , "RETURNING" <+> mintercalate ", " ccJobSelectors ] - -- Decode lazily as we want the transaction to be as short as possible. - (,n) . F.toList . fmap ccJobFetcher <$> queryResult + let tryAndParse :: (row -> job) -> row -> Either row job + tryAndParse p row = ES.handle (\(SomeException _) -> Left row) (Right $ p row) + (,n) . F.toList . fmap (tryAndParse ccJobFetcher) <$> queryResult where reservedJobs :: UTCTime -> SQL reservedJobs now = diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index fde9636..88da747 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -110,6 +110,12 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- ^ Function that processes a job. It's recommended to process each job in a -- separate DB transaction, otherwise you'll have to remember to commit your -- changes to the database manually. + , ccRowIndex :: !(row -> idx) + , ccOnFailedToFetchJob :: !(row -> m Result) + -- ^ Action taken if fetching a job failed. It is advised to reenqueue the + -- job at a later date and emit a warning in such a case. This is mostly + -- to ensure the application using consumers won't fail completely when + -- this happens. , ccOnException :: !(SomeException -> job -> m Action) -- ^ Action taken if a job processing function throws an exception. For -- robustness it's best to ensure that it doesn't throw. If it does, the From 3de2d3725b88d1b8fb85b217f77a5b7fdad6f473 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Fri, 26 Sep 2025 01:14:50 +0200 Subject: [PATCH 02/14] fix --- .../src/Database/PostgreSQL/Consumers/Components.hs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index b6bc100..685c1dd 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -10,7 +10,7 @@ import Control.Concurrent.Lifted import Control.Concurrent.STM hiding (atomically) import Control.Concurrent.STM qualified as STM import Control.Concurrent.Thread.Lifted qualified as T -import Control.Exception (AsyncException (ThreadKilled)) +import Control.Exception (AsyncException (ThreadKilled), evaluate) import Control.Exception.Safe qualified as ES import Control.Monad import Control.Monad.Base @@ -375,9 +375,11 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id IN (" <> reservedJobs now <> ")" , "RETURNING" <+> mintercalate ", " ccJobSelectors ] - let tryAndParse :: (row -> job) -> row -> Either row job - tryAndParse p row = ES.handle (\(SomeException _) -> Left row) (Right $ p row) - (,n) . F.toList . fmap (tryAndParse ccJobFetcher) <$> queryResult + let tryAndParse row = ES.handle + (\(SomeException _) -> pure $ Left row) + (fmap Right . liftBase . evaluate $ ccJobFetcher row) + _ {- ([row], [job]) -} <- fmap partitionEithers . traverse tryAndParse . F.toList =<< queryResult + undefined where reservedJobs :: UTCTime -> SQL reservedJobs now = From cf48850492c15fca455aded78ce17cdf38dc0164 Mon Sep 17 00:00:00 2001 From: Raveline Date: Fri, 26 Sep 2025 12:55:09 +0200 Subject: [PATCH 03/14] Change ccJobFetcher signature --- consumers/example/Example.hs | 12 ++++++------ .../Database/PostgreSQL/Consumers/Components.hs | 14 +++++--------- .../src/Database/PostgreSQL/Consumers/Config.hs | 5 ++--- consumers/test/Test.hs | 3 ++- 4 files changed, 15 insertions(+), 19 deletions(-) diff --git a/consumers/example/Example.hs b/consumers/example/Example.hs index a59783a..d77d535 100644 --- a/consumers/example/Example.hs +++ b/consumers/example/Example.hs @@ -102,13 +102,13 @@ main = do { ccJobsTable = "consumers_example_jobs" , ccConsumersTable = "consumers_example_consumers" , ccJobSelectors = ["id", "message"] - , ccJobFetcher = id + , ccJobFetcher = Right , ccJobIndex = \(i :: Int64, _msg :: T.Text) -> i , ccNotificationChannel = Just "consumers_example_chan" , ccNotificationTimeout = 10 * 1000000 -- 10 sec , ccMaxRunningJobs = 1 , ccProcessJob = processJob - , ccOnFailedToFetchJob = handleFailedToFetchJob + , ccOnFailedToFetchJob = handleFailedToFetchJob , ccOnException = handleException , ccJobLogData = \(i, _) -> ["job_id" .= i] } @@ -136,10 +136,10 @@ main = do handleException :: SomeException -> (Int64, T.Text) -> AppM Action handleException _ _ = pure . RerunAfter $ imicroseconds 500000 - handleFailedToFetchJob :: SomeException -> (Int64, T.Text) -> AppM (Either (Int64, Result) (Int64, T.Text)) - handleFailedToFetchJob e (idx, _) = do - logAttention "Failing to fetch job" $ object ["error" .= show e] - pure . Left $ (idx, Failed . RerunAfter $ idays 48) + handleFailedToFetchJob :: String -> Int64 -> AppM Action + handleFailedToFetchJob msg idx = do + logAttention "Failing to fetch job" $ object ["error" .= msg, "job_index" .= idx] + pure . RerunAfter $ idays 48 -- | Table where jobs are stored. See -- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'. diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 685c1dd..f7ea6b6 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -18,6 +18,7 @@ import Control.Monad.Catch import Control.Monad.Time import Control.Monad.Trans import Control.Monad.Trans.Control +import Data.Either import Data.Foldable qualified as F import Data.Function import Data.Int @@ -28,7 +29,6 @@ import Database.PostgreSQL.Consumers.Consumer import Database.PostgreSQL.Consumers.Utils import Database.PostgreSQL.PQTypes import Log -import Data.Either (partitionEithers) -- | Run the consumer. The purpose of the returned monadic action is to wait for -- currently processed jobs and clean up. This function is best used in @@ -269,7 +269,7 @@ spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do , "WHERE reserved_by = ANY(" Array1 inactive <+> ")" , "FOR UPDATE SKIP LOCKED" ] - stuckJobs <- fetchMany ccJobFetcher + stuckJobs <- rights <$> fetchMany ccJobFetcher unless (null stuckJobs) $ do results <- forM stuckJobs $ \job -> do action <- lift $ ccOnException (toException ThreadKilled) job @@ -349,7 +349,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs . restore $ do mapM startJob succeededJobs >>= mapM joinJob >>= updateJobs - mapM (\failedRow -> sequence (ccRowIndex failedRow, ccOnFailedToFetchJob failedRow)) failedJobs >>= updateJobs + mapM (\(idx, string) -> sequence (idx, Failed <$> ccOnFailedToFetchJob string idx)) failedJobs >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -360,7 +360,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: (MonadCatch m) => Int -> m ([Either row job], Int) + reserveJobs :: MonadCatch m => Int -> m ([Either (idx, String) job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- @@ -375,11 +375,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id IN (" <> reservedJobs now <> ")" , "RETURNING" <+> mintercalate ", " ccJobSelectors ] - let tryAndParse row = ES.handle - (\(SomeException _) -> pure $ Left row) - (fmap Right . liftBase . evaluate $ ccJobFetcher row) - _ {- ([row], [job]) -} <- fmap partitionEithers . traverse tryAndParse . F.toList =<< queryResult - undefined + (,n) . F.toList . fmap ccJobFetcher <$> queryResult where reservedJobs :: UTCTime -> SQL reservedJobs now = diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index 88da747..93b595c 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -78,7 +78,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig , ccJobSelectors :: ![SQL] -- ^ Fields needed to be selected from the jobs table in order to assemble a -- job. - , ccJobFetcher :: !(row -> job) + , ccJobFetcher :: !(row -> Either (idx, String) job) -- ^ Function that transforms the list of fields into a job. , ccJobIndex :: !(job -> idx) -- ^ Selector for taking out job ID from the job object. @@ -110,8 +110,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- ^ Function that processes a job. It's recommended to process each job in a -- separate DB transaction, otherwise you'll have to remember to commit your -- changes to the database manually. - , ccRowIndex :: !(row -> idx) - , ccOnFailedToFetchJob :: !(row -> m Result) + , ccOnFailedToFetchJob :: !(String -> idx -> m Action) -- ^ Action taken if fetching a job failed. It is advised to reenqueue the -- job at a later date and emit a warning in such a case. This is mostly -- to ensure the application using consumers won't fail completely when diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index fd2b61d..f466ce0 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -145,11 +145,12 @@ test = do { ccJobsTable = "consumers_test_jobs" , ccConsumersTable = "consumers_test_consumers" , ccJobSelectors = ["id", "countdown"] - , ccJobFetcher = id + , ccJobFetcher = Right , ccJobIndex = \(i :: Int64, _ :: Int32) -> i , ccNotificationChannel = Just "consumers_test_chan" , -- select some small timeout ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccOnFailedToFetchJob = \_ _ -> pure . RerunAfter $ idays 14 , ccMaxRunningJobs = 20 , ccProcessJob = processJob , ccOnException = handleException From 468af9fbb2a66f052b209284d1f53a484cb94c77 Mon Sep 17 00:00:00 2001 From: Raveline Date: Fri, 3 Oct 2025 09:51:22 +0200 Subject: [PATCH 04/14] Add a default implementation for parsing that should not fail --- consumers/example/Example.hs | 7 +------ .../src/Database/PostgreSQL/Consumers/Components.hs | 2 +- consumers/src/Database/PostgreSQL/Consumers/Config.hs | 10 ++++++++++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/consumers/example/Example.hs b/consumers/example/Example.hs index d77d535..880d888 100644 --- a/consumers/example/Example.hs +++ b/consumers/example/Example.hs @@ -108,7 +108,7 @@ main = do , ccNotificationTimeout = 10 * 1000000 -- 10 sec , ccMaxRunningJobs = 1 , ccProcessJob = processJob - , ccOnFailedToFetchJob = handleFailedToFetchJob + , ccOnFailedToFetchJob = shouldNotFail , ccOnException = handleException , ccJobLogData = \(i, _) -> ["job_id" .= i] } @@ -136,11 +136,6 @@ main = do handleException :: SomeException -> (Int64, T.Text) -> AppM Action handleException _ _ = pure . RerunAfter $ imicroseconds 500000 - handleFailedToFetchJob :: String -> Int64 -> AppM Action - handleFailedToFetchJob msg idx = do - logAttention "Failing to fetch job" $ object ["error" .= msg, "job_index" .= idx] - pure . RerunAfter $ idays 48 - -- | Table where jobs are stored. See -- 'Database.PostgreSQL.Consumers.Config.ConsumerConfig'. jobsTable :: Table diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index f7ea6b6..7e5742d 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -10,7 +10,7 @@ import Control.Concurrent.Lifted import Control.Concurrent.STM hiding (atomically) import Control.Concurrent.STM qualified as STM import Control.Concurrent.Thread.Lifted qualified as T -import Control.Exception (AsyncException (ThreadKilled), evaluate) +import Control.Exception (AsyncException (ThreadKilled)) import Control.Exception.Safe qualified as ES import Control.Monad import Control.Monad.Base diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index 93b595c..dc9ee9f 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -2,6 +2,7 @@ module Database.PostgreSQL.Consumers.Config ( Action (..) , Result (..) , ConsumerConfig (..) + , shouldNotFail ) where import Control.Exception (SomeException) @@ -12,6 +13,7 @@ import Database.PostgreSQL.PQTypes.Interval import Database.PostgreSQL.PQTypes.Notification import Database.PostgreSQL.PQTypes.SQL import Database.PostgreSQL.PQTypes.SQL.Raw +import Log -- | Action to take after a job was processed. data Action @@ -122,3 +124,11 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig , ccJobLogData :: !(job -> [A.Pair]) -- ^ Data to attach to each log message while processing a job. } + +-- | A default implementation for ccOnFailedToFetchJob, +-- when the parsing of the row should never fail. +-- This will create a logAttention and reenqueue for the next day. +shouldNotFail :: MonadLog m => String -> idx -> m Action +shouldNotFail msg _ = do + logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg] + pure . RerunAfter $ idays 48 From 0fd9fdda2ebe70a241bd2ca943ad3149948c92c5 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 27 Nov 2025 13:59:48 +0100 Subject: [PATCH 05/14] Apply PR suggestions --- consumers/example/Example.hs | 2 +- .../src/Database/PostgreSQL/Consumers/Components.hs | 5 +++-- .../src/Database/PostgreSQL/Consumers/Config.hs | 13 +++++++------ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/consumers/example/Example.hs b/consumers/example/Example.hs index 880d888..1400b6b 100644 --- a/consumers/example/Example.hs +++ b/consumers/example/Example.hs @@ -108,7 +108,7 @@ main = do , ccNotificationTimeout = 10 * 1000000 -- 10 sec , ccMaxRunningJobs = 1 , ccProcessJob = processJob - , ccOnFailedToFetchJob = shouldNotFail + , ccOnFailedToFetchJob = defaultOnFailedToFetchJob , ccOnException = handleException , ccJobLogData = \(i, _) -> ["job_id" .= i] } diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 7e5742d..7ad486f 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -24,6 +24,7 @@ import Data.Function import Data.Int import Data.Map.Strict qualified as M import Data.Monoid.Utils +import Data.Text qualified as T import Database.PostgreSQL.Consumers.Config import Database.PostgreSQL.Consumers.Consumer import Database.PostgreSQL.Consumers.Utils @@ -349,7 +350,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs . restore $ do mapM startJob succeededJobs >>= mapM joinJob >>= updateJobs - mapM (\(idx, string) -> sequence (idx, Failed <$> ccOnFailedToFetchJob string idx)) failedJobs >>= updateJobs + mapM (\(idx, txt) -> sequence (idx, Failed <$> ccOnFailedToFetchJob txt idx)) failedJobs >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -360,7 +361,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: MonadCatch m => Int -> m ([Either (idx, String) job], Int) + reserveJobs :: MonadCatch m => Int -> m ([Either (idx, T.Text) job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index dc9ee9f..edb76cd 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -2,7 +2,7 @@ module Database.PostgreSQL.Consumers.Config ( Action (..) , Result (..) , ConsumerConfig (..) - , shouldNotFail + , defaultOnFailedToFetchJob ) where import Control.Exception (SomeException) @@ -14,6 +14,7 @@ import Database.PostgreSQL.PQTypes.Notification import Database.PostgreSQL.PQTypes.SQL import Database.PostgreSQL.PQTypes.SQL.Raw import Log +import Data.Text -- | Action to take after a job was processed. data Action @@ -80,7 +81,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig , ccJobSelectors :: ![SQL] -- ^ Fields needed to be selected from the jobs table in order to assemble a -- job. - , ccJobFetcher :: !(row -> Either (idx, String) job) + , ccJobFetcher :: !(row -> Either (idx, Text) job) -- ^ Function that transforms the list of fields into a job. , ccJobIndex :: !(job -> idx) -- ^ Selector for taking out job ID from the job object. @@ -112,7 +113,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- ^ Function that processes a job. It's recommended to process each job in a -- separate DB transaction, otherwise you'll have to remember to commit your -- changes to the database manually. - , ccOnFailedToFetchJob :: !(String -> idx -> m Action) + , ccOnFailedToFetchJob :: !(Text -> idx -> m Action) -- ^ Action taken if fetching a job failed. It is advised to reenqueue the -- job at a later date and emit a warning in such a case. This is mostly -- to ensure the application using consumers won't fail completely when @@ -128,7 +129,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- | A default implementation for ccOnFailedToFetchJob, -- when the parsing of the row should never fail. -- This will create a logAttention and reenqueue for the next day. -shouldNotFail :: MonadLog m => String -> idx -> m Action -shouldNotFail msg _ = do - logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg] +defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action +defaultOnFailedToFetchJob msg idx = do + logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "idx" A..= show idx] pure . RerunAfter $ idays 48 From ec838d6973cf70b764fe2719a680686bd3198e2d Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 27 Nov 2025 14:19:59 +0100 Subject: [PATCH 06/14] Set a more sensible reenqueue default time --- consumers/src/Database/PostgreSQL/Consumers/Config.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index edb76cd..2554a38 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -128,8 +128,8 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- | A default implementation for ccOnFailedToFetchJob, -- when the parsing of the row should never fail. --- This will create a logAttention and reenqueue for the next day. +-- This will create a logAttention and reenqueue, to be replayed in 2 days. defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action defaultOnFailedToFetchJob msg idx = do logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "idx" A..= show idx] - pure . RerunAfter $ idays 48 + pure . RerunAfter $ ihours 48 From 712e537d153170ea637902ae589ddaf6f9031754 Mon Sep 17 00:00:00 2001 From: Raveline Date: Tue, 2 Dec 2025 17:04:15 +0100 Subject: [PATCH 07/14] Monitor jobs that failed to parse --- .../Database/PostgreSQL/Consumers/Instrumented.hs | 15 ++++++++++++++- .../src/Database/PostgreSQL/Consumers/Config.hs | 3 ++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs b/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs index 3a6a357..b93d074 100644 --- a/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs +++ b/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs @@ -79,6 +79,7 @@ data ConsumerMetrics = ConsumerMetrics , jobsOverdue :: Prom.Vector Prom.Label1 Prom.Gauge , jobsReserved :: Prom.Vector Prom.Label1 Prom.Counter , jobsExecution :: Prom.Vector Prom.Label2 Prom.Histogram + , jobsParsingFailure :: Prom.Vector Prom.Label1 Prom.Counter } registerConsumerMetrics :: MonadBaseControl IO m => ConsumerMetricsConfig -> m ConsumerMetrics @@ -116,6 +117,14 @@ registerConsumerMetrics ConsumerMetricsConfig {..} = liftBase $ do , metricHelp = "Execution time of jobs in seconds, by job_name, includes the job_result" } jobExecutionBuckets + jobsParsingFailure <- + Prom.register + . Prom.vector "job_name" + $ Prom.counter + Prom.Info + { metricName = "consumers_job_parsing_failures", + metricHelp = "Number of jobs that failed to parse, by job_name" + } pure $ ConsumerMetrics {..} -- | Run a 'ConsumerConfig', but with instrumentation added. @@ -235,7 +244,7 @@ instrumentConsumerConfig -> ConsumerConfig m idx job -> ConsumerConfig m idx job instrumentConsumerConfig ConsumerMetrics {..} ConsumerConfig {..} = - ConsumerConfig {ccProcessJob = ccProcessJob', ..} + ConsumerConfig {ccProcessJob = ccProcessJob', ccOnFailedToFetchJob = ccOnFailedToFetchJob', ..} where jobName = unRawSQL ccJobsTable @@ -263,3 +272,7 @@ instrumentConsumerConfig ConsumerMetrics {..} ConsumerConfig {..} = liftBase $ Prom.withLabel jobsExecution (jobName, resultLabel) (`Prom.observe` duration) handleEx e = logAttention "Exception while instrumenting job" $ object ["exception" .= show e] + + ccOnFailedToFetchJob' errorTxt idx = do + liftBase $ Prom.withLabel jobsParsingFailure jobName Prom.incCounter + ccOnFailedToFetchJob errorTxt idx diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index 2554a38..6c98abe 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -117,7 +117,8 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- ^ Action taken if fetching a job failed. It is advised to reenqueue the -- job at a later date and emit a warning in such a case. This is mostly -- to ensure the application using consumers won't fail completely when - -- this happens. + -- this happens. See 'defaultOnFailedToFetchJob' for a simple implementation + -- which logs and reenqueue. , ccOnException :: !(SomeException -> job -> m Action) -- ^ Action taken if a job processing function throws an exception. For -- robustness it's best to ensure that it doesn't throw. If it does, the From 87f4cae546b8ca3c9908824ae01c82e105c61116 Mon Sep 17 00:00:00 2001 From: Raveline Date: Wed, 17 Dec 2025 18:20:06 +0100 Subject: [PATCH 08/14] Fix formatting --- consumers/src/Database/PostgreSQL/Consumers/Config.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index 6c98abe..8badca6 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -7,6 +7,7 @@ module Database.PostgreSQL.Consumers.Config import Control.Exception (SomeException) import Data.Aeson.Types qualified as A +import Data.Text import Data.Time import Database.PostgreSQL.PQTypes.FromRow import Database.PostgreSQL.PQTypes.Interval @@ -14,7 +15,6 @@ import Database.PostgreSQL.PQTypes.Notification import Database.PostgreSQL.PQTypes.SQL import Database.PostgreSQL.PQTypes.SQL.Raw import Log -import Data.Text -- | Action to take after a job was processed. data Action From 0c1570eb4f7996078b15f095bceac15196cef5a7 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 18 Dec 2025 17:20:07 +0100 Subject: [PATCH 09/14] Address PR comments --- .../Database/PostgreSQL/Consumers/Components.hs | 15 ++++++++++----- .../src/Database/PostgreSQL/Consumers/Config.hs | 6 +++--- consumers/test/Test.hs | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 7ad486f..e313b05 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -270,18 +270,22 @@ spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do , "WHERE reserved_by = ANY(" Array1 inactive <+> ")" , "FOR UPDATE SKIP LOCKED" ] - stuckJobs <- rights <$> fetchMany ccJobFetcher + stuckJobs <- fetchMany ccJobFetcher unless (null stuckJobs) $ do results <- forM stuckJobs $ \job -> do - action <- lift $ ccOnException (toException ThreadKilled) job - pure (ccJobIndex job, Failed action) + action <- lift $ + either + (\(idx, t) -> ccOnFailedToFetchJob t idx) + (ccOnException (toException ThreadKilled)) + job + pure (either fst ccJobIndex job, Failed action) runSQL_ $ updateJobsQuery ccJobsTable results now runPreparedSQL_ (preparedSqlName "removeInactive" ccConsumersTable) $ smconcat [ "DELETE FROM" <+> raw ccConsumersTable , "WHERE id = ANY(" Array1 inactive <+> ")" ] - pure (length inactive, map ccJobIndex stuckJobs) + pure (length inactive, fmap (either fst ccJobIndex) stuckJobs) when (inactiveConsumers > 0) $ do logInfo "Unregistered inactive consumers" $ object @@ -361,7 +365,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs pure (batchSize > 0) - reserveJobs :: MonadCatch m => Int -> m ([Either (idx, T.Text) job], Int) + reserveJobs :: Int -> m ([Either (idx, T.Text) job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- @@ -376,6 +380,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id IN (" <> reservedJobs now <> ")" , "RETURNING" <+> mintercalate ", " ccJobSelectors ] + -- Decode lazily as we want the transaction to be as short as possible (,n) . F.toList . fmap ccJobFetcher <$> queryResult where reservedJobs :: UTCTime -> SQL diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index 8badca6..da4a284 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -7,7 +7,7 @@ module Database.PostgreSQL.Consumers.Config import Control.Exception (SomeException) import Data.Aeson.Types qualified as A -import Data.Text +import Data.Text (Text) import Data.Time import Database.PostgreSQL.PQTypes.FromRow import Database.PostgreSQL.PQTypes.Interval @@ -132,5 +132,5 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- This will create a logAttention and reenqueue, to be replayed in 2 days. defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action defaultOnFailedToFetchJob msg idx = do - logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "idx" A..= show idx] - pure . RerunAfter $ ihours 48 + logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "job_id" A..= show idx] + pure . RerunAfter $ idays 1 diff --git a/consumers/test/Test.hs b/consumers/test/Test.hs index f466ce0..661ed5c 100644 --- a/consumers/test/Test.hs +++ b/consumers/test/Test.hs @@ -150,7 +150,7 @@ test = do , ccNotificationChannel = Just "consumers_test_chan" , -- select some small timeout ccNotificationTimeout = 100 * 1000 -- 100 msec - , ccOnFailedToFetchJob = \_ _ -> pure . RerunAfter $ idays 14 + , ccOnFailedToFetchJob = defaultOnFailedToFetchJob , ccMaxRunningJobs = 20 , ccProcessJob = processJob , ccOnException = handleException From 73c244e050f50ab3207fdbdbc52ecaca020b9430 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 18 Dec 2025 17:20:18 +0100 Subject: [PATCH 10/14] Fix formatting again --- .../src/Database/PostgreSQL/Consumers/Components.hs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index e313b05..651f8af 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -273,11 +273,12 @@ spawnMonitor ConsumerConfig {..} cs cid = forkP "monitor" . forever $ do stuckJobs <- fetchMany ccJobFetcher unless (null stuckJobs) $ do results <- forM stuckJobs $ \job -> do - action <- lift $ - either - (\(idx, t) -> ccOnFailedToFetchJob t idx) - (ccOnException (toException ThreadKilled)) - job + action <- + lift $ + either + (\(idx, t) -> ccOnFailedToFetchJob t idx) + (ccOnException (toException ThreadKilled)) + job pure (either fst ccJobIndex job, Failed action) runSQL_ $ updateJobsQuery ccJobsTable results now runPreparedSQL_ (preparedSqlName "removeInactive" ccConsumersTable) $ From 56a705d97e85a6c70870fdec1251f4edcabdf9d7 Mon Sep 17 00:00:00 2001 From: Raveline Date: Thu, 18 Dec 2025 17:28:56 +0100 Subject: [PATCH 11/14] Format also the changes to consumer-metrics --- .../src/Database/PostgreSQL/Consumers/Instrumented.hs | 4 ++-- consumers/src/Database/PostgreSQL/Consumers/Config.hs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs b/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs index b93d074..7fe0ac4 100644 --- a/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs +++ b/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs @@ -122,8 +122,8 @@ registerConsumerMetrics ConsumerMetricsConfig {..} = liftBase $ do . Prom.vector "job_name" $ Prom.counter Prom.Info - { metricName = "consumers_job_parsing_failures", - metricHelp = "Number of jobs that failed to parse, by job_name" + { metricName = "consumers_job_parsing_failures" + , metricHelp = "Number of jobs that failed to parse, by job_name" } pure $ ConsumerMetrics {..} diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index da4a284..c7393fb 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -129,7 +129,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- | A default implementation for ccOnFailedToFetchJob, -- when the parsing of the row should never fail. --- This will create a logAttention and reenqueue, to be replayed in 2 days. +-- This will create a logAttention and reenqueue, to be replayed in one day. defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action defaultOnFailedToFetchJob msg idx = do logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "job_id" A..= show idx] From 6f76b3c4142e5397cd1b3f74ba2a3627bc16588c Mon Sep 17 00:00:00 2001 From: Raveline Date: Fri, 19 Dec 2025 09:47:44 +0100 Subject: [PATCH 12/14] Add protection against crashlooping in ccOnFailedToFetchJob --- .../src/Database/PostgreSQL/Consumers/Components.hs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs index 651f8af..444fc42 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs @@ -154,6 +154,15 @@ runConsumerWithMaybeIdleSignal cc0 cs mIdleSignal , "action" .= show action ] pure action + , ccOnFailedToFetchJob = \t i -> + ccOnFailedToFetchJob cc0 t i `ES.catchAny` \handlerEx -> do + let action = RerunAfter $ idays 1 + logAttention "ccOnFailedToFetchJob threw an exception" $ + object + [ "exception" .= show handlerEx + , "action" .= show action + ] + pure action } waitForRunningJobs runningJobsInfo runningJobs = do @@ -381,7 +390,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs , "WHERE id IN (" <> reservedJobs now <> ")" , "RETURNING" <+> mintercalate ", " ccJobSelectors ] - -- Decode lazily as we want the transaction to be as short as possible + -- Decode lazily as we want the transaction to be as short as possible. (,n) . F.toList . fmap ccJobFetcher <$> queryResult where reservedJobs :: UTCTime -> SQL From d3b4d10e74456ed127886764855e953160bdab46 Mon Sep 17 00:00:00 2001 From: Raveline Date: Fri, 9 Jan 2026 15:46:19 +0100 Subject: [PATCH 13/14] Fix job_id field name conflict in logging --- consumers/src/Database/PostgreSQL/Consumers/Config.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumers/src/Database/PostgreSQL/Consumers/Config.hs b/consumers/src/Database/PostgreSQL/Consumers/Config.hs index c7393fb..ac9f623 100644 --- a/consumers/src/Database/PostgreSQL/Consumers/Config.hs +++ b/consumers/src/Database/PostgreSQL/Consumers/Config.hs @@ -132,5 +132,5 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig -- This will create a logAttention and reenqueue, to be replayed in one day. defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action defaultOnFailedToFetchJob msg idx = do - logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "job_id" A..= show idx] + logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "consumers_job_id" A..= show idx] pure . RerunAfter $ idays 1 From 6ad771a767d3d9ec87310827446093303a82c8f8 Mon Sep 17 00:00:00 2001 From: Raveline Date: Fri, 9 Jan 2026 15:48:41 +0100 Subject: [PATCH 14/14] Bump cabal version and changelog --- consumers/CHANGELOG.md | 3 +++ consumers/consumers.cabal | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/consumers/CHANGELOG.md b/consumers/CHANGELOG.md index 487016b..48ba251 100644 --- a/consumers/CHANGELOG.md +++ b/consumers/CHANGELOG.md @@ -1,3 +1,6 @@ +# consumers-2.4.0.0 (TBD) +* Add ability to reenqueue unfetchable jobs, let ccJobFetcher fail + # consumers-2.3.4.0 (2025-11-27) * Compatibility with `hpqtypes` >= 1.13.0.0. diff --git a/consumers/consumers.cabal b/consumers/consumers.cabal index 5754092..e2db5df 100644 --- a/consumers/consumers.cabal +++ b/consumers/consumers.cabal @@ -1,6 +1,6 @@ cabal-version: 3.0 name: consumers -version: 2.3.4.0 +version: 2.4.0.0 synopsis: Concurrent PostgreSQL data consumers description: Library for setting up concurrent consumers of data