From 523d2ebed4902c85a94ef463d2cc0eeff9954723 Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" Date: Sun, 25 Jan 2026 22:12:57 +0100 Subject: [PATCH 1/5] Add tests to cover exception and edge cases in server routes Expands unit test coverage for server endpoints by adding tests for malformed inputs, Redis and queue exceptions, and various lifespan and requeue scenarios. Improves reliability by ensuring robust handling of error paths and edge cases. --- fq_server/server.py | 2 +- tests/test_routes.py | 376 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 376 insertions(+), 2 deletions(-) diff --git a/fq_server/server.py b/fq_server/server.py index 2a06e7e..a8d122f 100644 --- a/fq_server/server.py +++ b/fq_server/server.py @@ -83,7 +83,7 @@ async def requeue_with_lock(self): @asynccontextmanager async def _lifespan(self, app: Starlette): # --- startup --- - await self.queue._initialize() + await self.queue.initialize() # mimic original behavior: use requeue_with_lock loop self._requeue_task = asyncio.create_task(self.requeue_with_lock()) diff --git a/tests/test_routes.py b/tests/test_routes.py index 1fc8d72..e41a5d0 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -5,9 +5,12 @@ import os import unittest +import asyncio import ujson as json +from unittest.mock import AsyncMock, MagicMock, patch from httpx import AsyncClient, ASGITransport from starlette.types import ASGIApp +from redis.exceptions import LockError from fq_server import setup_server from fq_server.server import FQServer @@ -49,7 +52,7 @@ async def asyncSetUp(self): # queue + redis client (async) self.queue = server.queue - await self.queue._initialize() # important: same loop as tests + await self.queue.initialize() # important: same loop as tests self.r = self.queue._r # flush redis before each test @@ -215,6 +218,377 @@ async def test_metrics_with_queue_type_and_queue_id(self): self.assertIn("enqueue_counts", data) self.assertIn("dequeue_counts", data) + # ===== NEW TESTS FOR UNCOVERED EXCEPTION PATHS ===== + + async def test_enqueue_malformed_json(self): + """Test enqueue with malformed JSON body.""" + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=b"invalid json {", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("message", response.json()) + + async def test_enqueue_empty_body(self): + """Test enqueue with empty body - fails because required fields missing.""" + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=b"", + headers={"Content-Type": "application/json"}, + ) + # Empty body becomes {}, but FQ requires payload, interval, job_id + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_enqueue_with_max_queued_length_not_exceeded(self): + """Test enqueue with max_queued_length when queue is below limit.""" + request_params = { + "job_id": "job-1", + "payload": {"message": "Test 1", "max_queued_length": 10}, + "interval": 1000, + } + response = await self.client.post( + "/enqueue/sms/test_queue_1/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()["status"], "queued") + self.assertEqual(response.json()["current_queue_length"], 0) + + async def test_enqueue_with_max_queued_length_exceeded(self): + """Test enqueue when max_queued_length is exceeded (429 response).""" + # First, enqueue some jobs to fill queue + for i in range(3): + request_params = { + "job_id": f"job-{i}", + "payload": {"message": f"Test {i}"}, + "interval": 1000, + } + await self.client.post( + "/enqueue/sms/test_queue_2/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + + # Now try to enqueue with max_queued_length=2 (should fail with 429) + request_params = { + "job_id": "job-overflow", + "payload": {"message": "Overflow", "max_queued_length": 2}, + "interval": 1000, + } + response = await self.client.post( + "/enqueue/sms/test_queue_2/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 429) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Max queue length reached", response.json()["message"]) + self.assertGreaterEqual(response.json()["current_queue_length"], 2) + + async def test_enqueue_get_queue_length_exception(self): + """Test enqueue when get_queue_length() raises an exception.""" + request_params = { + "job_id": "job-error", + "payload": {"message": "Error test", "max_queued_length": 5}, + "interval": 1000, + } + + with patch.object(self.queue, "get_queue_length", side_effect=Exception("Redis error")): + response = await self.client.post( + "/enqueue/sms/test_queue_3/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + # Even if get_queue_length fails, enqueue proceeds (prints error) + # The exception is caught and printed; enqueue still attempts + # Check if response indicates the error + self.assertIn(response.status_code, [201, 400]) + + async def test_enqueue_queue_enqueue_exception(self): + """Test enqueue when queue.enqueue() raises an exception.""" + request_params = { + "job_id": "job-queue-error", + "payload": {"message": "Queue error"}, + "interval": 1000, + } + + with patch.object(self.queue, "enqueue", side_effect=Exception("Queue error")): + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Queue error", response.json()["message"]) + + async def test_dequeue_get_queue_length_exception(self): + """Test dequeue when get_queue_length() raises an exception.""" + # First enqueue a job + request_params = { + "job_id": "job-for-dequeue", + "payload": {"message": "Dequeue test"}, + "interval": 1000, + } + await self.client.post( + "/enqueue/sms/dequeue_error_queue/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + + # Now dequeue but mock get_queue_length to fail + with patch.object(self.queue, "get_queue_length", side_effect=Exception("Redis error")): + response = await self.client.get("/dequeue/sms/") + # Should still return 200 but without current_queue_length + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["status"], "success") + self.assertEqual(response.json()["current_queue_length"], 0) + + async def test_dequeue_exception_general(self): + """Test dequeue when queue.dequeue() raises a general exception.""" + with patch.object(self.queue, "dequeue", side_effect=Exception("Dequeue failed")): + response = await self.client.get("/dequeue/sms/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Dequeue failed", response.json()["message"]) + + async def test_finish_exception(self): + """Test finish when queue.finish() raises an exception.""" + with patch.object(self.queue, "finish", side_effect=Exception("Finish error")): + response = await self.client.post( + "/finish/sms/johndoe/job-123/" + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Finish error", response.json()["message"]) + + async def test_interval_malformed_json(self): + """Test interval with malformed JSON body.""" + response = await self.client.post( + "/interval/sms/johndoe/", + content=b"invalid json", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_interval_missing_interval_key(self): + """Test interval request without 'interval' key.""" + request_params = {"some_other_key": 5000} + response = await self.client.post( + "/interval/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_interval_exception(self): + """Test interval when queue.interval() raises an exception.""" + request_params = {"interval": 5000} + + with patch.object(self.queue, "interval", side_effect=Exception("Interval error")): + response = await self.client.post( + "/interval/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Interval error", response.json()["message"]) + + async def test_metrics_exception(self): + """Test metrics when queue.metrics() raises an exception.""" + with patch.object(self.queue, "metrics", side_effect=Exception("Metrics error")): + response = await self.client.get("/metrics/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Metrics error", response.json()["message"]) + + async def test_metrics_with_queue_type_exception(self): + """Test metrics with queue_type when exception occurs.""" + with patch.object(self.queue, "metrics", side_effect=Exception("Metrics error")): + response = await self.client.get("/metrics/sms/") + self.assertEqual(response.status_code, 400) + + async def test_clear_queue_malformed_json(self): + """Test clear_queue - testing through the server's request body parsing.""" + # Note: httpx doesn't easily let us send raw body with DELETE, + # so we test the exception path via mocking instead + with patch.object(self.queue, "clear_queue", side_effect=Exception("Clear error")): + # The server will still try to parse a body even if empty + response = await self.client.delete("/deletequeue/sms/johndoe/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_clear_queue_exception(self): + """Test clear_queue when queue.clear_queue() raises an exception.""" + with patch.object(self.queue, "clear_queue", side_effect=Exception("Clear error")): + response = await self.client.delete("/deletequeue/sms/johndoe/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Clear error", response.json()["message"]) + + async def test_enqueue_max_length_with_queue_exception(self): + """Test enqueue max_queued_length when enqueue itself throws.""" + request_params = { + "job_id": "job-with-max", + "payload": {"message": "Test", "max_queued_length": 10}, + "interval": 1000, + } + + with patch.object(self.queue, "enqueue", side_effect=Exception("Enqueue failed")): + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Enqueue failed", response.json()["message"]) + + async def test_deep_status_exception(self): + """Test deep_status when queue.deep_status() raises an exception.""" + with patch.object(self.queue, "deep_status", side_effect=Exception("Status check failed")): + with self.assertRaises(Exception): + response = await self.client.get("/deepstatus/") + + async def test_deep_status_success(self): + """Test deep_status successful response.""" + response = await self.client.get("/deepstatus/") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["status"], "success") + + # ===== TESTS FOR REQUEUE AND LIFESPAN ===== + + async def test_requeue_exception_handling(self): + """Test requeue loop catches and continues on exception.""" + server = self.server + + # Mock the queue.requeue to raise an exception + with patch.object(server.queue, "requeue", side_effect=Exception("Requeue failed")): + # Create a requeue coroutine and run it briefly + requeue_task = asyncio.create_task(server.requeue()) + + # Let it run for a short moment + await asyncio.sleep(0.1) + + # Cancel the task + requeue_task.cancel() + + with self.assertRaises(asyncio.CancelledError): + await requeue_task + + async def test_requeue_with_lock_disabled(self): + """Test requeue_with_lock when requeue is disabled.""" + server = self.server + + # Mock config to disable requeue + with patch.object(server.config, "get", return_value="false"): + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Should return immediately (task completes) + await asyncio.sleep(0.1) + + # Task should be done (returned, not cancelled) + self.assertTrue(requeue_task.done()) + + async def test_requeue_with_lock_lock_error(self): + """Test requeue_with_lock when lock acquisition fails with LockError.""" + server = self.server + + # Use a real redis lock that times out + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Let it try to acquire lock and timeout (the default behavior when another process holds it) + await asyncio.sleep(0.15) + + # Cancel it + requeue_task.cancel() + + try: + await requeue_task + except asyncio.CancelledError: + pass # Expected behavior + + async def test_requeue_with_lock_inner_exception(self): + """Test requeue_with_lock when requeue() inside lock context fails.""" + server = self.server + + # First request succeeds to get past initial try, second fails + call_count = [0] + + async def mock_requeue_with_failure(): + call_count[0] += 1 + if call_count[0] >= 1: # Fail on first and subsequent calls + raise Exception("Inner requeue error") + return None + + with patch.object(server.queue, "requeue", side_effect=mock_requeue_with_failure): + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Let it run enough times to hit the exception in lock + await asyncio.sleep(0.15) + requeue_task.cancel() + + try: + await requeue_task + except asyncio.CancelledError: + pass # Expected - task was cancelled after executing exception code path + + +class FQServerLifespanTestCase(unittest.IsolatedAsyncioTestCase): + """Test FQServer lifespan (startup/shutdown).""" + + async def test_lifespan_startup_shutdown(self): + """Test lifespan startup and graceful shutdown.""" + config_path = os.path.join(os.path.dirname(__file__), "test.conf") + server = setup_server(config_path) + + # Simulate startup + app = server.app + lifespan_cm = server._lifespan(app) + + # Enter lifespan (startup) + await lifespan_cm.__aenter__() + + # Check that requeue task was created + self.assertIsNotNone(server._requeue_task) + self.assertFalse(server._requeue_task.done()) + + # Exit lifespan (shutdown) + try: + await lifespan_cm.__aexit__(None, None, None) + except Exception: + pass # May raise if task is cancelled + + # Task should be cancelled or done + await asyncio.sleep(0.05) + self.assertTrue(server._requeue_task.done() or server._requeue_task.cancelled()) + + async def test_lifespaninitializes_queue(self): + """Test that lifespan calls queue.initialize().""" + config_path = os.path.join(os.path.dirname(__file__), "test.conf") + server = setup_server(config_path) + + with patch.object(server.queue, "initialize", new_callable=AsyncMock) as mock_init: + lifespan_cm = server._lifespan(server.app) + await lifespan_cm.__aenter__() + + mock_init.assert_called_once() + + # Cleanup + server._requeue_task.cancel() + try: + await lifespan_cm.__aexit__(None, None, None) + except: + pass + + if __name__ == "__main__": unittest.main() From 907b515844b1a57b3126658b076df2b14a400a6b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 25 Jan 2026 21:26:25 +0000 Subject: [PATCH 2/5] Initial plan From 2d71bfe9af7fb310888ee54d8a53875e92493cb7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 25 Jan 2026 21:30:40 +0000 Subject: [PATCH 3/5] Address review comments: fix unused imports, improve test assertions and exception handling Co-authored-by: ochui <21917688+ochui@users.noreply.github.com> --- tests/test_routes.py | 81 +++++++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 31 deletions(-) diff --git a/tests/test_routes.py b/tests/test_routes.py index e41a5d0..784d2be 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -7,10 +7,9 @@ import unittest import asyncio import ujson as json -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, patch from httpx import AsyncClient, ASGITransport from starlette.types import ASGIApp -from redis.exceptions import LockError from fq_server import setup_server from fq_server.server import FQServer @@ -297,16 +296,17 @@ async def test_enqueue_get_queue_length_exception(self): "interval": 1000, } + # Mock get_queue_length to fail, but let enqueue succeed normally with patch.object(self.queue, "get_queue_length", side_effect=Exception("Redis error")): response = await self.client.post( "/enqueue/sms/test_queue_3/", content=json.dumps(request_params), headers={"Content-Type": "application/json"}, ) - # Even if get_queue_length fails, enqueue proceeds (prints error) - # The exception is caught and printed; enqueue still attempts - # Check if response indicates the error - self.assertIn(response.status_code, [201, 400]) + # When get_queue_length fails, enqueue still succeeds with current_queue_length=0 + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()["status"], "queued") + self.assertEqual(response.json()["current_queue_length"], 0) async def test_enqueue_queue_enqueue_exception(self): """Test enqueue when queue.enqueue() raises an exception.""" @@ -416,14 +416,15 @@ async def test_metrics_with_queue_type_exception(self): self.assertEqual(response.status_code, 400) async def test_clear_queue_malformed_json(self): - """Test clear_queue - testing through the server's request body parsing.""" - # Note: httpx doesn't easily let us send raw body with DELETE, - # so we test the exception path via mocking instead - with patch.object(self.queue, "clear_queue", side_effect=Exception("Clear error")): - # The server will still try to parse a body even if empty - response = await self.client.delete("/deletequeue/sms/johndoe/") - self.assertEqual(response.status_code, 400) - self.assertEqual(response.json()["status"], "failure") + """Test clear_queue with malformed JSON body.""" + response = await self.client.request( + "DELETE", + "/deletequeue/sms/johndoe/", + content=b"invalid json", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") async def test_clear_queue_exception(self): """Test clear_queue when queue.clear_queue() raises an exception.""" @@ -455,7 +456,7 @@ async def test_deep_status_exception(self): """Test deep_status when queue.deep_status() raises an exception.""" with patch.object(self.queue, "deep_status", side_effect=Exception("Status check failed")): with self.assertRaises(Exception): - response = await self.client.get("/deepstatus/") + await self.client.get("/deepstatus/") async def test_deep_status_success(self): """Test deep_status successful response.""" @@ -499,21 +500,33 @@ async def test_requeue_with_lock_disabled(self): async def test_requeue_with_lock_lock_error(self): """Test requeue_with_lock when lock acquisition fails with LockError.""" + from redis.exceptions import LockError server = self.server - # Use a real redis lock that times out - requeue_task = asyncio.create_task(server.requeue_with_lock()) + # Create a mock lock that raises LockError when used as async context manager + mock_lock = AsyncMock() + mock_lock.__aenter__.side_effect = LockError("Failed to acquire lock") - # Let it try to acquire lock and timeout (the default behavior when another process holds it) - await asyncio.sleep(0.15) + # Mock redis_client to return a mock with lock method that returns the failing lock + mock_redis = AsyncMock() + mock_redis.lock.return_value = mock_lock - # Cancel it - requeue_task.cancel() - - try: - await requeue_task - except asyncio.CancelledError: - pass # Expected behavior + with patch.object(server.queue, "redis_client", return_value=mock_redis): + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Let it try to acquire lock and handle LockError (sleeps and continues) + await asyncio.sleep(0.15) + + # Verify the lock was attempted + self.assertTrue(mock_redis.lock.called) + + # Cancel it + requeue_task.cancel() + + try: + await requeue_task + except asyncio.CancelledError: + pass # Expected - loop continues after LockError, then cancelled async def test_requeue_with_lock_inner_exception(self): """Test requeue_with_lock when requeue() inside lock context fails.""" @@ -563,8 +576,9 @@ async def test_lifespan_startup_shutdown(self): # Exit lifespan (shutdown) try: await lifespan_cm.__aexit__(None, None, None) - except Exception: - pass # May raise if task is cancelled + except asyncio.CancelledError: + # Expected if the requeue task is cancelled during shutdown + pass # Task should be cancelled or done await asyncio.sleep(0.05) @@ -575,17 +589,22 @@ async def test_lifespaninitializes_queue(self): config_path = os.path.join(os.path.dirname(__file__), "test.conf") server = setup_server(config_path) - with patch.object(server.queue, "initialize", new_callable=AsyncMock) as mock_init: + # Stub out both queue.initialize and the background requeue task to make + # startup/shutdown deterministic and avoid hitting an uninitialized queue. + with patch.object(server.queue, "initialize", new_callable=AsyncMock) as mock_init, \ + patch.object(server, "requeue_with_lock", new_callable=AsyncMock): lifespan_cm = server._lifespan(server.app) await lifespan_cm.__aenter__() mock_init.assert_called_once() # Cleanup - server._requeue_task.cancel() + if server._requeue_task is not None and not server._requeue_task.done(): + server._requeue_task.cancel() try: await lifespan_cm.__aexit__(None, None, None) - except: + except asyncio.CancelledError: + # Expected if the requeue task is cancelled during shutdown pass From cac9d56d40a2a13804c2c18129961efff5683679 Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" Date: Sun, 25 Jan 2026 22:41:21 +0100 Subject: [PATCH 4/5] Improves lock error simulation in async test Refactors the test to use a custom async context manager that raises the lock error, providing more accurate simulation of lock acquisition failure. Removes unnecessary assertions on mock calls for clarity. --- tests/test_routes.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/test_routes.py b/tests/test_routes.py index 784d2be..6b08d2c 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -1,7 +1,7 @@ # tests/test_routes.py # -*- coding: utf-8 -*- -# Copyright ... +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. import os import unittest @@ -503,13 +503,18 @@ async def test_requeue_with_lock_lock_error(self): from redis.exceptions import LockError server = self.server - # Create a mock lock that raises LockError when used as async context manager - mock_lock = AsyncMock() - mock_lock.__aenter__.side_effect = LockError("Failed to acquire lock") + # Create an async context manager that raises LockError on enter + class FailingLock: + async def __aenter__(self): + raise LockError("Failed to acquire lock") + + async def __aexit__(self, *args): + pass - # Mock redis_client to return a mock with lock method that returns the failing lock + # Mock redis_client with a lock method that returns the failing lock mock_redis = AsyncMock() - mock_redis.lock.return_value = mock_lock + # Make lock a regular (non-async) function that returns the context manager + mock_redis.lock = lambda *args, **kwargs: FailingLock() with patch.object(server.queue, "redis_client", return_value=mock_redis): requeue_task = asyncio.create_task(server.requeue_with_lock()) @@ -517,9 +522,6 @@ async def test_requeue_with_lock_lock_error(self): # Let it try to acquire lock and handle LockError (sleeps and continues) await asyncio.sleep(0.15) - # Verify the lock was attempted - self.assertTrue(mock_redis.lock.called) - # Cancel it requeue_task.cancel() From ccf164ef95048327dc6e892d5dcf84d87ee6bb62 Mon Sep 17 00:00:00 2001 From: "Ochui, Princewill Patrick" Date: Sun, 25 Jan 2026 22:51:57 +0100 Subject: [PATCH 5/5] Corrects test method naming for consistency --- tests/test_routes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_routes.py b/tests/test_routes.py index 6b08d2c..e8aa9bf 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -586,7 +586,7 @@ async def test_lifespan_startup_shutdown(self): await asyncio.sleep(0.05) self.assertTrue(server._requeue_task.done() or server._requeue_task.cancelled()) - async def test_lifespaninitializes_queue(self): + async def test_lifespan_initializes_queue(self): """Test that lifespan calls queue.initialize().""" config_path = os.path.join(os.path.dirname(__file__), "test.conf") server = setup_server(config_path)