diff --git a/fq_server/server.py b/fq_server/server.py index 2a06e7e..a8d122f 100644 --- a/fq_server/server.py +++ b/fq_server/server.py @@ -83,7 +83,7 @@ async def requeue_with_lock(self): @asynccontextmanager async def _lifespan(self, app: Starlette): # --- startup --- - await self.queue._initialize() + await self.queue.initialize() # mimic original behavior: use requeue_with_lock loop self._requeue_task = asyncio.create_task(self.requeue_with_lock()) diff --git a/tests/test_routes.py b/tests/test_routes.py index 1fc8d72..e8aa9bf 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -1,11 +1,13 @@ # tests/test_routes.py # -*- coding: utf-8 -*- -# Copyright ... +# Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. import os import unittest +import asyncio import ujson as json +from unittest.mock import AsyncMock, patch from httpx import AsyncClient, ASGITransport from starlette.types import ASGIApp from fq_server import setup_server @@ -49,7 +51,7 @@ async def asyncSetUp(self): # queue + redis client (async) self.queue = server.queue - await self.queue._initialize() # important: same loop as tests + await self.queue.initialize() # important: same loop as tests self.r = self.queue._r # flush redis before each test @@ -215,6 +217,399 @@ async def test_metrics_with_queue_type_and_queue_id(self): self.assertIn("enqueue_counts", data) self.assertIn("dequeue_counts", data) + # ===== NEW TESTS FOR UNCOVERED EXCEPTION PATHS ===== + + async def test_enqueue_malformed_json(self): + """Test enqueue with malformed JSON body.""" + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=b"invalid json {", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("message", response.json()) + + async def test_enqueue_empty_body(self): + """Test enqueue with empty body - fails because required fields missing.""" + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=b"", + headers={"Content-Type": "application/json"}, + ) + # Empty body becomes {}, but FQ requires payload, interval, job_id + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_enqueue_with_max_queued_length_not_exceeded(self): + """Test enqueue with max_queued_length when queue is below limit.""" + request_params = { + "job_id": "job-1", + "payload": {"message": "Test 1", "max_queued_length": 10}, + "interval": 1000, + } + response = await self.client.post( + "/enqueue/sms/test_queue_1/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()["status"], "queued") + self.assertEqual(response.json()["current_queue_length"], 0) + + async def test_enqueue_with_max_queued_length_exceeded(self): + """Test enqueue when max_queued_length is exceeded (429 response).""" + # First, enqueue some jobs to fill queue + for i in range(3): + request_params = { + "job_id": f"job-{i}", + "payload": {"message": f"Test {i}"}, + "interval": 1000, + } + await self.client.post( + "/enqueue/sms/test_queue_2/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + + # Now try to enqueue with max_queued_length=2 (should fail with 429) + request_params = { + "job_id": "job-overflow", + "payload": {"message": "Overflow", "max_queued_length": 2}, + "interval": 1000, + } + response = await self.client.post( + "/enqueue/sms/test_queue_2/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 429) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Max queue length reached", response.json()["message"]) + self.assertGreaterEqual(response.json()["current_queue_length"], 2) + + async def test_enqueue_get_queue_length_exception(self): + """Test enqueue when get_queue_length() raises an exception.""" + request_params = { + "job_id": "job-error", + "payload": {"message": "Error test", "max_queued_length": 5}, + "interval": 1000, + } + + # Mock get_queue_length to fail, but let enqueue succeed normally + with patch.object(self.queue, "get_queue_length", side_effect=Exception("Redis error")): + response = await self.client.post( + "/enqueue/sms/test_queue_3/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + # When get_queue_length fails, enqueue still succeeds with current_queue_length=0 + self.assertEqual(response.status_code, 201) + self.assertEqual(response.json()["status"], "queued") + self.assertEqual(response.json()["current_queue_length"], 0) + + async def test_enqueue_queue_enqueue_exception(self): + """Test enqueue when queue.enqueue() raises an exception.""" + request_params = { + "job_id": "job-queue-error", + "payload": {"message": "Queue error"}, + "interval": 1000, + } + + with patch.object(self.queue, "enqueue", side_effect=Exception("Queue error")): + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Queue error", response.json()["message"]) + + async def test_dequeue_get_queue_length_exception(self): + """Test dequeue when get_queue_length() raises an exception.""" + # First enqueue a job + request_params = { + "job_id": "job-for-dequeue", + "payload": {"message": "Dequeue test"}, + "interval": 1000, + } + await self.client.post( + "/enqueue/sms/dequeue_error_queue/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + + # Now dequeue but mock get_queue_length to fail + with patch.object(self.queue, "get_queue_length", side_effect=Exception("Redis error")): + response = await self.client.get("/dequeue/sms/") + # Should still return 200 but without current_queue_length + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["status"], "success") + self.assertEqual(response.json()["current_queue_length"], 0) + + async def test_dequeue_exception_general(self): + """Test dequeue when queue.dequeue() raises a general exception.""" + with patch.object(self.queue, "dequeue", side_effect=Exception("Dequeue failed")): + response = await self.client.get("/dequeue/sms/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Dequeue failed", response.json()["message"]) + + async def test_finish_exception(self): + """Test finish when queue.finish() raises an exception.""" + with patch.object(self.queue, "finish", side_effect=Exception("Finish error")): + response = await self.client.post( + "/finish/sms/johndoe/job-123/" + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Finish error", response.json()["message"]) + + async def test_interval_malformed_json(self): + """Test interval with malformed JSON body.""" + response = await self.client.post( + "/interval/sms/johndoe/", + content=b"invalid json", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_interval_missing_interval_key(self): + """Test interval request without 'interval' key.""" + request_params = {"some_other_key": 5000} + response = await self.client.post( + "/interval/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_interval_exception(self): + """Test interval when queue.interval() raises an exception.""" + request_params = {"interval": 5000} + + with patch.object(self.queue, "interval", side_effect=Exception("Interval error")): + response = await self.client.post( + "/interval/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Interval error", response.json()["message"]) + + async def test_metrics_exception(self): + """Test metrics when queue.metrics() raises an exception.""" + with patch.object(self.queue, "metrics", side_effect=Exception("Metrics error")): + response = await self.client.get("/metrics/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Metrics error", response.json()["message"]) + + async def test_metrics_with_queue_type_exception(self): + """Test metrics with queue_type when exception occurs.""" + with patch.object(self.queue, "metrics", side_effect=Exception("Metrics error")): + response = await self.client.get("/metrics/sms/") + self.assertEqual(response.status_code, 400) + + async def test_clear_queue_malformed_json(self): + """Test clear_queue with malformed JSON body.""" + response = await self.client.request( + "DELETE", + "/deletequeue/sms/johndoe/", + content=b"invalid json", + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + + async def test_clear_queue_exception(self): + """Test clear_queue when queue.clear_queue() raises an exception.""" + with patch.object(self.queue, "clear_queue", side_effect=Exception("Clear error")): + response = await self.client.delete("/deletequeue/sms/johndoe/") + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Clear error", response.json()["message"]) + + async def test_enqueue_max_length_with_queue_exception(self): + """Test enqueue max_queued_length when enqueue itself throws.""" + request_params = { + "job_id": "job-with-max", + "payload": {"message": "Test", "max_queued_length": 10}, + "interval": 1000, + } + + with patch.object(self.queue, "enqueue", side_effect=Exception("Enqueue failed")): + response = await self.client.post( + "/enqueue/sms/johndoe/", + content=json.dumps(request_params), + headers={"Content-Type": "application/json"}, + ) + self.assertEqual(response.status_code, 400) + self.assertEqual(response.json()["status"], "failure") + self.assertIn("Enqueue failed", response.json()["message"]) + + async def test_deep_status_exception(self): + """Test deep_status when queue.deep_status() raises an exception.""" + with patch.object(self.queue, "deep_status", side_effect=Exception("Status check failed")): + with self.assertRaises(Exception): + await self.client.get("/deepstatus/") + + async def test_deep_status_success(self): + """Test deep_status successful response.""" + response = await self.client.get("/deepstatus/") + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["status"], "success") + + # ===== TESTS FOR REQUEUE AND LIFESPAN ===== + + async def test_requeue_exception_handling(self): + """Test requeue loop catches and continues on exception.""" + server = self.server + + # Mock the queue.requeue to raise an exception + with patch.object(server.queue, "requeue", side_effect=Exception("Requeue failed")): + # Create a requeue coroutine and run it briefly + requeue_task = asyncio.create_task(server.requeue()) + + # Let it run for a short moment + await asyncio.sleep(0.1) + + # Cancel the task + requeue_task.cancel() + + with self.assertRaises(asyncio.CancelledError): + await requeue_task + + async def test_requeue_with_lock_disabled(self): + """Test requeue_with_lock when requeue is disabled.""" + server = self.server + + # Mock config to disable requeue + with patch.object(server.config, "get", return_value="false"): + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Should return immediately (task completes) + await asyncio.sleep(0.1) + + # Task should be done (returned, not cancelled) + self.assertTrue(requeue_task.done()) + + async def test_requeue_with_lock_lock_error(self): + """Test requeue_with_lock when lock acquisition fails with LockError.""" + from redis.exceptions import LockError + server = self.server + + # Create an async context manager that raises LockError on enter + class FailingLock: + async def __aenter__(self): + raise LockError("Failed to acquire lock") + + async def __aexit__(self, *args): + pass + + # Mock redis_client with a lock method that returns the failing lock + mock_redis = AsyncMock() + # Make lock a regular (non-async) function that returns the context manager + mock_redis.lock = lambda *args, **kwargs: FailingLock() + + with patch.object(server.queue, "redis_client", return_value=mock_redis): + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Let it try to acquire lock and handle LockError (sleeps and continues) + await asyncio.sleep(0.15) + + # Cancel it + requeue_task.cancel() + + try: + await requeue_task + except asyncio.CancelledError: + pass # Expected - loop continues after LockError, then cancelled + + async def test_requeue_with_lock_inner_exception(self): + """Test requeue_with_lock when requeue() inside lock context fails.""" + server = self.server + + # First request succeeds to get past initial try, second fails + call_count = [0] + + async def mock_requeue_with_failure(): + call_count[0] += 1 + if call_count[0] >= 1: # Fail on first and subsequent calls + raise Exception("Inner requeue error") + return None + + with patch.object(server.queue, "requeue", side_effect=mock_requeue_with_failure): + requeue_task = asyncio.create_task(server.requeue_with_lock()) + + # Let it run enough times to hit the exception in lock + await asyncio.sleep(0.15) + requeue_task.cancel() + + try: + await requeue_task + except asyncio.CancelledError: + pass # Expected - task was cancelled after executing exception code path + + +class FQServerLifespanTestCase(unittest.IsolatedAsyncioTestCase): + """Test FQServer lifespan (startup/shutdown).""" + + async def test_lifespan_startup_shutdown(self): + """Test lifespan startup and graceful shutdown.""" + config_path = os.path.join(os.path.dirname(__file__), "test.conf") + server = setup_server(config_path) + + # Simulate startup + app = server.app + lifespan_cm = server._lifespan(app) + + # Enter lifespan (startup) + await lifespan_cm.__aenter__() + + # Check that requeue task was created + self.assertIsNotNone(server._requeue_task) + self.assertFalse(server._requeue_task.done()) + + # Exit lifespan (shutdown) + try: + await lifespan_cm.__aexit__(None, None, None) + except asyncio.CancelledError: + # Expected if the requeue task is cancelled during shutdown + pass + + # Task should be cancelled or done + await asyncio.sleep(0.05) + self.assertTrue(server._requeue_task.done() or server._requeue_task.cancelled()) + + async def test_lifespan_initializes_queue(self): + """Test that lifespan calls queue.initialize().""" + config_path = os.path.join(os.path.dirname(__file__), "test.conf") + server = setup_server(config_path) + + # Stub out both queue.initialize and the background requeue task to make + # startup/shutdown deterministic and avoid hitting an uninitialized queue. + with patch.object(server.queue, "initialize", new_callable=AsyncMock) as mock_init, \ + patch.object(server, "requeue_with_lock", new_callable=AsyncMock): + lifespan_cm = server._lifespan(server.app) + await lifespan_cm.__aenter__() + + mock_init.assert_called_once() + + # Cleanup + if server._requeue_task is not None and not server._requeue_task.done(): + server._requeue_task.cancel() + try: + await lifespan_cm.__aexit__(None, None, None) + except asyncio.CancelledError: + # Expected if the requeue task is cancelled during shutdown + pass + + if __name__ == "__main__": unittest.main()