From dc05bca563ffb26cb93901b535693ebd712e39e5 Mon Sep 17 00:00:00 2001 From: Carlos Date: Wed, 24 Dec 2025 15:29:30 +0100 Subject: [PATCH 1/2] feat: Add Celery task queue for event-driven test processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace cron-based polling with an optional Celery task queue system for faster, more reliable test execution. This architectural improvement enables event-driven processing, parallel test execution, and better retry handling. New files: - celery_app.py: Celery application factory with Flask context - mod_ci/tasks.py: Task definitions (start_test, cleanup, pending check) - install/celery-worker.service: Systemd service for Celery worker - install/celery-beat.service: Systemd service for Celery beat scheduler - tests/test_ci/test_tasks.py: Unit tests for Celery functionality Modified files: - requirements.txt: Added celery[redis] and redis packages - config_sample.py: Added Celery configuration options - mod_ci/controllers.py: Added trigger_test_tasks(), updated add_test_entry() - install/installation.md: Added Celery setup documentation Key features: - USE_CELERY_TASKS feature flag for gradual migration (default: False) - Parallel mode: cron continues as fallback during transition - Three task queues: default, test_execution, maintenance - Periodic tasks via Celery Beat for cleanup and pending test discovery - Graceful degradation if Celery unavailable 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- celery_app.py | 81 ++++++++++++ config_sample.py | 18 +++ install/celery-beat.service | 23 ++++ install/celery-worker.service | 30 +++++ install/installation.md | 101 ++++++++++++++ mod_ci/controllers.py | 61 ++++++++- mod_ci/tasks.py | 241 ++++++++++++++++++++++++++++++++++ requirements.txt | 2 + tests/test_ci/test_tasks.py | 128 ++++++++++++++++++ 9 files changed, 679 insertions(+), 6 deletions(-) create mode 100644 celery_app.py create mode 100644 install/celery-beat.service create mode 100644 install/celery-worker.service create mode 100644 mod_ci/tasks.py create mode 100644 tests/test_ci/test_tasks.py diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 00000000..1d232165 --- /dev/null +++ b/celery_app.py @@ -0,0 +1,81 @@ +"""Celery application factory and configuration for Sample Platform.""" + +import os + +from celery import Celery +from celery.schedules import crontab + +# Load configuration - use empty dict for testing when config.py doesn't exist +try: + from config_parser import parse_config + config = parse_config('config') +except Exception: + # In test environment, config.py may not exist + config = {} + + +def make_celery(app=None): + """ + Create a Celery application configured for the Sample Platform. + + :param app: Optional Flask application for context binding + :return: Configured Celery application + """ + celery_app = Celery( + 'sample_platform', + broker=config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), + backend=config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), + include=['mod_ci.tasks'] + ) + + # Apply configuration from config.py + celery_app.conf.update( + task_serializer=config.get('CELERY_TASK_SERIALIZER', 'json'), + result_serializer=config.get('CELERY_RESULT_SERIALIZER', 'json'), + accept_content=config.get('CELERY_ACCEPT_CONTENT', ['json']), + timezone=config.get('CELERY_TIMEZONE', 'UTC'), + enable_utc=config.get('CELERY_ENABLE_UTC', True), + task_acks_late=config.get('CELERY_TASK_ACKS_LATE', True), + worker_prefetch_multiplier=config.get('CELERY_WORKER_PREFETCH_MULTIPLIER', 1), + task_reject_on_worker_lost=config.get('CELERY_TASK_REJECT_ON_WORKER_LOST', True), + task_soft_time_limit=config.get('CELERY_TASK_SOFT_TIME_LIMIT', 3600), + task_time_limit=config.get('CELERY_TASK_TIME_LIMIT', 3900), + ) + + # Beat schedule for periodic tasks + celery_app.conf.beat_schedule = { + 'check-expired-instances-every-5-minutes': { + 'task': 'mod_ci.tasks.check_expired_instances_task', + 'schedule': crontab(minute='*/5'), + 'options': {'queue': 'maintenance'} + }, + 'process-pending-tests-every-minute': { + 'task': 'mod_ci.tasks.process_pending_tests_task', + 'schedule': crontab(minute='*'), + 'options': {'queue': 'default'} + }, + } + + # Queue routing + celery_app.conf.task_routes = { + 'mod_ci.tasks.start_test_task': {'queue': 'test_execution'}, + 'mod_ci.tasks.check_expired_instances_task': {'queue': 'maintenance'}, + 'mod_ci.tasks.process_pending_tests_task': {'queue': 'default'}, + } + + # If Flask app is provided, bind tasks to its context + if app is not None: + class ContextTask(celery_app.Task): + """Task base class that maintains Flask application context.""" + + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app.Task = ContextTask + + return celery_app + + +# Create the default celery instance (used by worker when started standalone) +celery = make_celery() diff --git a/config_sample.py b/config_sample.py index 34f30a46..2b2fd64f 100755 --- a/config_sample.py +++ b/config_sample.py @@ -37,3 +37,21 @@ GCP_INSTANCE_MAX_RUNTIME = 120 # In minutes GCS_BUCKET_NAME = 'spdev' GCS_SIGNED_URL_EXPIRY_LIMIT = 720 # In minutes + + +# CELERY TASK QUEUE CONFIG +CELERY_BROKER_URL = 'redis://localhost:6379/0' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TIMEZONE = 'UTC' +CELERY_ENABLE_UTC = True +CELERY_TASK_ACKS_LATE = True # Task acknowledged after completion +CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # One task at a time per worker +CELERY_TASK_REJECT_ON_WORKER_LOST = True # Requeue tasks if worker dies +CELERY_TASK_SOFT_TIME_LIMIT = 3600 # 1 hour soft limit +CELERY_TASK_TIME_LIMIT = 3900 # 1 hour 5 minutes hard limit + +# Feature flag for gradual migration (set to True to enable Celery, False for cron fallback) +USE_CELERY_TASKS = False diff --git a/install/celery-beat.service b/install/celery-beat.service new file mode 100644 index 00000000..81fce68b --- /dev/null +++ b/install/celery-beat.service @@ -0,0 +1,23 @@ +[Unit] +Description=Sample Platform Celery Beat Scheduler +After=network.target redis.service celery-worker.service +Requires=redis.service + +[Service] +Type=simple +User=www-data +Group=www-data +WorkingDirectory=/var/www/sample-platform +Environment="PATH=/var/www/sample-platform/venv/bin" +ExecStart=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery beat \ + --pidfile=/var/run/celery/beat.pid \ + --logfile=/var/www/sample-platform/logs/celery/beat.log \ + --loglevel=INFO \ + --schedule=/var/www/sample-platform/celerybeat-schedule +RuntimeDirectory=celery +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target diff --git a/install/celery-worker.service b/install/celery-worker.service new file mode 100644 index 00000000..d9c1cfaa --- /dev/null +++ b/install/celery-worker.service @@ -0,0 +1,30 @@ +[Unit] +Description=Sample Platform Celery Worker +After=network.target redis.service mysql.service +Requires=redis.service + +[Service] +Type=forking +User=www-data +Group=www-data +WorkingDirectory=/var/www/sample-platform +Environment="PATH=/var/www/sample-platform/venv/bin" +ExecStart=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery multi start worker \ + --pidfile=/var/run/celery/%n.pid \ + --logfile=/var/www/sample-platform/logs/celery/%n%I.log \ + --loglevel=INFO \ + -Q default,test_execution,maintenance \ + --concurrency=2 +ExecStop=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery multi stopwait worker \ + --pidfile=/var/run/celery/%n.pid +ExecReload=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery multi restart worker \ + --pidfile=/var/run/celery/%n.pid +RuntimeDirectory=celery +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target diff --git a/install/installation.md b/install/installation.md index 0b577cec..839bcac0 100644 --- a/install/installation.md +++ b/install/installation.md @@ -217,6 +217,107 @@ The file `mod_ci/cron.py` is to be run in periodic intervals. To setup a cron jo ``` Change the `/var/www/sample-plaform` directory, if you have installed the platform in a different directory. +## Optional: Setting up Celery Task Queue + +As an alternative to cron-based polling, you can use Celery with Redis for event-driven test processing. This provides faster test execution, better retry handling, and parallel processing. + +### Installing Redis + +```bash +sudo apt update +sudo apt install redis-server + +# Configure Redis +sudo nano /etc/redis/redis.conf +# Set: supervised systemd +# Set: bind 127.0.0.1 ::1 + +# Enable and start Redis +sudo systemctl enable redis-server +sudo systemctl start redis-server + +# Verify Redis is running +redis-cli ping # Should return PONG +``` + +### Configuring Celery + +Add the following to your `config.py`: + +```python +# Celery Configuration +CELERY_BROKER_URL = 'redis://localhost:6379/0' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' +USE_CELERY_TASKS = True # Set to False to use cron instead +``` + +### Installing Celery Services + +```bash +# Create log directory +sudo mkdir -p /var/www/sample-platform/logs/celery +sudo chown -R www-data:www-data /var/www/sample-platform/logs/celery + +# Create runtime directory +sudo mkdir -p /var/run/celery +sudo chown www-data:www-data /var/run/celery + +# Install systemd services +sudo cp /var/www/sample-platform/install/celery-worker.service /etc/systemd/system/ +sudo cp /var/www/sample-platform/install/celery-beat.service /etc/systemd/system/ + +# Reload systemd and enable services +sudo systemctl daemon-reload +sudo systemctl enable celery-worker celery-beat + +# Start the services +sudo systemctl start celery-worker +sudo systemctl start celery-beat +``` + +### Monitoring Celery + +```bash +# Check worker status +celery -A celery_app.celery inspect active + +# Check queue depth +redis-cli LLEN celery + +# View logs +tail -f /var/www/sample-platform/logs/celery/*.log + +# Optional: Install Flower for web-based monitoring +pip install flower +celery -A celery_app.celery flower --port=5555 +``` + +### Gradual Migration + +For a safe transition from cron to Celery: + +1. **Stage 1**: Set `USE_CELERY_TASKS = False` and keep cron running. Start Celery services and verify they work correctly in logs. + +2. **Stage 2**: Set `USE_CELERY_TASKS = True`. Reduce cron frequency to every 30 minutes as a fallback. + +3. **Stage 3**: Disable cron entirely once you're confident Celery is working correctly. + +### Rollback to Cron + +If you need to disable Celery: + +```bash +# Stop Celery services +sudo systemctl stop celery-beat celery-worker + +# Edit config.py and set USE_CELERY_TASKS = False + +# Restart platform +sudo systemctl restart platform + +# Ensure cron is running every 10 minutes +``` + ## GCS configuration to serve file downloads using Signed URLs To serve file downloads directly from the private GCS bucket, Signed download URLs have been used. diff --git a/mod_ci/controllers.py b/mod_ci/controllers.py index 2b5b1169..96ad3193 100755 --- a/mod_ci/controllers.py +++ b/mod_ci/controllers.py @@ -1124,7 +1124,7 @@ def save_xml_to_file(xml_node, folder_name, file_name) -> None: ) -def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: +def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list: """ Add test details entry into Test model for each platform. @@ -1140,8 +1140,8 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: :type branch: str :param pr_nr: Pull Request number, if applicable. :type pr_nr: int - :return: Nothing - :rtype: None + :return: List of created test IDs + :rtype: list """ from run import log @@ -1149,7 +1149,7 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: # Based on issue identified by NexionisJake in PR #937 if not is_valid_commit_hash(commit): log.error(f"Invalid commit hash '{commit}' - skipping test entry creation") - return + return [] fork_url = f"%/{g.github['repository_owner']}/{g.github['repository']}.git" fork = Fork.query.filter(Fork.github.like(fork_url)).first() @@ -1158,12 +1158,59 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: log.debug('pull request test type detected') branch = "pull_request" + test_ids = [] linux_test = Test(TestPlatform.linux, test_type, fork.id, branch, commit, pr_nr) db.add(linux_test) + db.flush() # Get ID before commit + test_ids.append(linux_test.id) + windows_test = Test(TestPlatform.windows, test_type, fork.id, branch, commit, pr_nr) db.add(windows_test) + db.flush() # Get ID before commit + test_ids.append(windows_test.id) + if not safe_db_commit(db, f"adding test entries for commit {commit[:7]}"): log.error(f"Failed to add test entries for commit {commit}") + return [] + + return test_ids + + +def trigger_test_tasks(test_ids: list, bot_token: str) -> None: + """ + Optionally trigger Celery tasks for newly created tests. + + Only triggers if USE_CELERY_TASKS is True in config. + Falls back to waiting for cron/periodic task otherwise. + + :param test_ids: List of Test IDs to queue + :type test_ids: list + :param bot_token: GitHub bot token + :type bot_token: str + """ + from run import config, log + + if not config.get('USE_CELERY_TASKS', False): + log.debug("Celery tasks disabled, tests will be picked up by cron/periodic task") + return + + if not test_ids: + return + + try: + from mod_ci.tasks import start_test_task + + for test_id in test_ids: + start_test_task.apply_async( + args=[test_id, bot_token], + queue='test_execution', + countdown=30 # 30 second delay for artifact upload to complete + ) + log.info(f"Queued test {test_id} via Celery") + except ImportError: + log.warning("Celery tasks module not available, falling back to cron") + except Exception as e: + log.error(f"Failed to queue Celery tasks: {e}, tests will be picked up by cron") def schedule_test(gh_commit: Commit.Commit) -> None: @@ -1429,7 +1476,8 @@ def start_ci(): last_commit.value = ref.object.sha if not safe_db_commit(g.db, "updating last commit"): return 'ERROR' - add_test_entry(g.db, commit_hash, TestType.commit) + test_ids = add_test_entry(g.db, commit_hash, TestType.commit) + trigger_test_tasks(test_ids, g.github['bot_token']) else: g.log.warning('Unknown push type! Dumping payload for analysis') g.log.warning(payload) @@ -1459,7 +1507,8 @@ def start_ci(): try: pr = retry_with_backoff(lambda: repository.get_pull(number=pr_nr)) if pr.mergeable is not False: - add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr) + test_ids = add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr) + trigger_test_tasks(test_ids, g.github['bot_token']) except GithubException as e: g.log.error(f"Failed to get PR {pr_nr} after retries: {e}") diff --git a/mod_ci/tasks.py b/mod_ci/tasks.py new file mode 100644 index 00000000..d1c140aa --- /dev/null +++ b/mod_ci/tasks.py @@ -0,0 +1,241 @@ +"""Celery tasks for CI platform operations.""" + +from celery import shared_task +from celery.exceptions import SoftTimeLimitExceeded +from celery.utils.log import get_task_logger +from github import Auth, Github, GithubException + +from celery_app import celery + +logger = get_task_logger(__name__) + + +@celery.task( + bind=True, + max_retries=3, + default_retry_delay=60, + autoretry_for=(GithubException,), + retry_backoff=True, + retry_backoff_max=300, + acks_late=True +) +def start_test_task(self, test_id: int, bot_token: str): + """ + Execute a single test by creating a GCP VM instance. + + This task wraps the existing start_test() function with Celery's + retry mechanisms and proper error handling. + + :param test_id: The ID of the Test to execute + :param bot_token: GitHub bot token for artifact download + :return: Dict with status and message + """ + # Import inside task to avoid circular imports and ensure fresh Flask context + from run import app, config + + from database import create_session + from mod_ci.controllers import ( + get_compute_service_object, + mark_test_failed, + start_test, + ) + from mod_ci.models import GcpInstance + from mod_test.models import Test + + with app.app_context(): + db = create_session(config['DATABASE_URI']) + + try: + # Fetch the test + test = Test.query.get(test_id) + if test is None: + logger.error(f"Test {test_id} not found") + return {'status': 'error', 'message': 'Test not found'} + + # Check if test is already finished + if test.finished: + logger.info(f"Test {test_id} already finished, skipping") + return {'status': 'skipped', 'message': 'Test already finished'} + + # Check if test already has a GCP instance (prevent duplicates) + existing_instance = GcpInstance.query.filter( + GcpInstance.test_id == test_id + ).first() + if existing_instance is not None: + logger.info(f"Test {test_id} already has GCP instance, skipping") + return {'status': 'skipped', 'message': 'Test already has instance'} + + # Get GitHub repository + gh = Github(auth=Auth.Token(bot_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + + # Execute the test + compute = get_compute_service_object() + start_test(compute, app, db, repository, test, bot_token) + + logger.info(f"Test {test_id} started successfully") + return {'status': 'success', 'test_id': test_id} + + except SoftTimeLimitExceeded: + logger.error(f"Test {test_id} exceeded time limit") + try: + test = Test.query.get(test_id) + if test and not test.finished: + gh = Github(auth=Auth.Token(bot_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + mark_test_failed(db, test, repository, "Task timed out") + except Exception as mark_error: + logger.error(f"Failed to mark test {test_id} as failed: {mark_error}") + raise + + except Exception as e: + logger.exception(f"Error starting test {test_id}: {e}") + # Retry on transient failures + if self.request.retries < self.max_retries: + raise self.retry(exc=e) + # Final failure - mark test as failed + try: + test = Test.query.get(test_id) + if test and not test.finished: + gh = Github(auth=Auth.Token(bot_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + mark_test_failed(db, test, repository, f"Task failed: {str(e)[:100]}") + except Exception as mark_error: + logger.error(f"Failed to mark test {test_id} as failed: {mark_error}") + raise + + finally: + db.remove() + + +@celery.task(bind=True, acks_late=True) +def check_expired_instances_task(self): + """ + Periodic task to clean up expired GCP instances. + + This wraps delete_expired_instances() for Celery scheduling. + + :return: Dict with status and message + """ + from run import app, config + + from database import create_session + from github import Auth, Github + from mod_ci.controllers import delete_expired_instances, get_compute_service_object + + with app.app_context(): + db = create_session(config['DATABASE_URI']) + + try: + vm_max_runtime = config.get('GCP_INSTANCE_MAX_RUNTIME', 120) + zone = config.get('ZONE', '') + project = config.get('PROJECT_NAME', '') + + if not zone or not project: + logger.error('GCP zone or project not configured') + return {'status': 'error', 'message': 'GCP not configured'} + + # Get GitHub repository + github_token = config.get('GITHUB_TOKEN', '') + if not github_token: + logger.error('GitHub token not configured') + return {'status': 'error', 'message': 'GitHub token missing'} + + gh = Github(auth=Auth.Token(github_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + + compute = get_compute_service_object() + delete_expired_instances( + compute, vm_max_runtime, project, zone, db, repository + ) + + logger.info('Expired instances check completed') + return {'status': 'success'} + + except Exception as e: + logger.exception(f"Error checking expired instances: {e}") + return {'status': 'error', 'message': str(e)} + + finally: + db.remove() + + +@celery.task(bind=True, acks_late=True) +def process_pending_tests_task(self): + """ + Periodic task to find and queue pending tests for execution. + + This replaces the cron-based approach by finding pending tests + and dispatching individual start_test_task for each. + + :return: Dict with status and count of queued tests + """ + from run import app, config + + from database import create_session + from mod_ci.models import GcpInstance, MaintenanceMode + from mod_test.models import Test, TestPlatform, TestProgress, TestStatus + + with app.app_context(): + db = create_session(config['DATABASE_URI']) + + try: + github_token = config.get('GITHUB_TOKEN', '') + if not github_token: + logger.error('GitHub token not configured') + return {'status': 'error', 'message': 'GitHub token missing'} + + bot_token = github_token + queued_count = 0 + + # Find pending tests for each platform + for platform in [TestPlatform.linux, TestPlatform.windows]: + # Check maintenance mode + maintenance_mode = MaintenanceMode.query.filter( + MaintenanceMode.platform == platform + ).first() + if maintenance_mode is not None and maintenance_mode.disabled: + logger.debug(f'[{platform.value}] In maintenance mode, skipping') + continue + + # Get tests with progress (finished or in progress) + finished_tests = db.query(TestProgress.test_id).filter( + TestProgress.status.in_([TestStatus.canceled, TestStatus.completed]) + ) + + # Get tests with GCP instances (currently running) + running_tests = db.query(GcpInstance.test_id) + + # Find pending tests (limit to 5 per platform per run) + pending_tests = Test.query.filter( + Test.id.notin_(finished_tests), + Test.id.notin_(running_tests), + Test.platform == platform + ).order_by(Test.id.asc()).limit(5).all() + + for test in pending_tests: + # Queue each test as a separate task + start_test_task.apply_async( + args=[test.id, bot_token], + queue='test_execution', + countdown=1 # Small delay between tasks + ) + queued_count += 1 + logger.info(f'Queued test {test.id} for {platform.value}') + + return {'status': 'success', 'queued_count': queued_count} + + except Exception as e: + logger.exception(f"Error processing pending tests: {e}") + return {'status': 'error', 'message': str(e)} + + finally: + db.remove() diff --git a/requirements.txt b/requirements.txt index db49fccb..10112768 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ cffi==2.0.0 PyGithub==2.8.1 blinker==1.9.0 click==8.1.7 +celery[redis]==5.3.6 +redis==5.0.1 diff --git a/tests/test_ci/test_tasks.py b/tests/test_ci/test_tasks.py new file mode 100644 index 00000000..7190e227 --- /dev/null +++ b/tests/test_ci/test_tasks.py @@ -0,0 +1,128 @@ +"""Unit tests for Celery tasks and related controller functions.""" + +import unittest +from unittest import mock +from unittest.mock import MagicMock, patch + +from flask import g + +from mod_test.models import TestType +from tests.base import BaseTestCase + + +class TestTriggerTestTasks(BaseTestCase): + """Test cases for trigger_test_tasks function.""" + + def test_trigger_test_tasks_disabled_by_default(self): + """Test that tasks are not triggered when USE_CELERY_TASKS is False (default).""" + from mod_ci.controllers import trigger_test_tasks + + # By default, USE_CELERY_TASKS is False + with self.app.app_context(): + # Should not raise exception and should log debug message + trigger_test_tasks([1, 2], 'fake_token') + # Function returns silently when disabled + + def test_trigger_test_tasks_empty_list(self): + """Test that empty list doesn't trigger any tasks.""" + from mod_ci.controllers import trigger_test_tasks + + with patch.dict(self.app.config, {'USE_CELERY_TASKS': True}): + with self.app.app_context(): + # Should return early without error + trigger_test_tasks([], 'fake_token') + + def test_trigger_test_tasks_handles_import_error(self): + """Test graceful handling when Celery module is not available.""" + from mod_ci.controllers import trigger_test_tasks + + with patch.dict(self.app.config, {'USE_CELERY_TASKS': True}): + # Simulate import error by patching the import + with patch.dict('sys.modules', {'mod_ci.tasks': None}): + with self.app.app_context(): + # Should not raise exception + trigger_test_tasks([1, 2], 'fake_token') + + +class TestAddTestEntryReturnsIds(BaseTestCase): + """Test that add_test_entry returns test IDs correctly.""" + + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.safe_db_commit') + @mock.patch('mod_ci.controllers.Fork') + @mock.patch('mod_ci.controllers.Test') + def test_add_test_entry_returns_test_ids(self, mock_test, mock_fork, mock_commit, mock_g): + """Test that add_test_entry returns list of created test IDs.""" + from mod_ci.controllers import add_test_entry + + # Setup mocks + mock_g.github = {'repository_owner': 'test', 'repository': 'test'} + mock_fork_obj = MagicMock() + mock_fork_obj.id = 1 + mock_fork.query.filter.return_value.first.return_value = mock_fork_obj + mock_commit.return_value = True + + # Setup mock Test objects to return IDs + mock_linux_test = MagicMock() + mock_linux_test.id = 100 + mock_windows_test = MagicMock() + mock_windows_test.id = 101 + mock_test.side_effect = [mock_linux_test, mock_windows_test] + + mock_db = MagicMock() + + # Call the function with a valid commit hash (40 hex chars) + test_ids = add_test_entry(mock_db, 'a' * 40, TestType.commit) + + # Verify we got a list with 2 IDs + self.assertIsInstance(test_ids, list) + self.assertEqual(len(test_ids), 2) + self.assertEqual(test_ids[0], 100) + self.assertEqual(test_ids[1], 101) + + def test_add_test_entry_invalid_commit_returns_empty(self): + """Test that invalid commit hash returns empty list.""" + from mod_ci.controllers import add_test_entry + + mock_db = MagicMock() + + # Call with invalid commit hash + test_ids = add_test_entry(mock_db, 'invalid_hash', TestType.commit) + + # Verify empty list for invalid commit + self.assertIsInstance(test_ids, list) + self.assertEqual(len(test_ids), 0) + + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.safe_db_commit') + @mock.patch('mod_ci.controllers.Fork') + @mock.patch('mod_ci.controllers.Test') + def test_add_test_entry_db_failure_returns_empty(self, mock_test, mock_fork, mock_commit, mock_g): + """Test that db commit failure returns empty list.""" + from mod_ci.controllers import add_test_entry + + # Setup mocks + mock_g.github = {'repository_owner': 'test', 'repository': 'test'} + mock_fork_obj = MagicMock() + mock_fork_obj.id = 1 + mock_fork.query.filter.return_value.first.return_value = mock_fork_obj + mock_commit.return_value = False # Simulate commit failure + + mock_linux_test = MagicMock() + mock_linux_test.id = 100 + mock_windows_test = MagicMock() + mock_windows_test.id = 101 + mock_test.side_effect = [mock_linux_test, mock_windows_test] + + mock_db = MagicMock() + + # Call the function + test_ids = add_test_entry(mock_db, 'a' * 40, TestType.commit) + + # Verify empty list when commit fails + self.assertIsInstance(test_ids, list) + self.assertEqual(len(test_ids), 0) + + +if __name__ == '__main__': + unittest.main() From a0920b099317e170355212d9d444f1df54e81b05 Mon Sep 17 00:00:00 2001 From: Carlos Date: Fri, 26 Dec 2025 10:53:11 +0100 Subject: [PATCH 2/2] fix: Fix isort import ordering and remove unused import MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove unused 'shared_task' import from celery - Fix import ordering to satisfy isort checks 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- mod_ci/tasks.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/mod_ci/tasks.py b/mod_ci/tasks.py index d1c140aa..2b9a079c 100644 --- a/mod_ci/tasks.py +++ b/mod_ci/tasks.py @@ -1,6 +1,5 @@ """Celery tasks for CI platform operations.""" -from celery import shared_task from celery.exceptions import SoftTimeLimitExceeded from celery.utils.log import get_task_logger from github import Auth, Github, GithubException @@ -31,16 +30,12 @@ def start_test_task(self, test_id: int, bot_token: str): :return: Dict with status and message """ # Import inside task to avoid circular imports and ensure fresh Flask context - from run import app, config - from database import create_session - from mod_ci.controllers import ( - get_compute_service_object, - mark_test_failed, - start_test, - ) + from mod_ci.controllers import (get_compute_service_object, + mark_test_failed, start_test) from mod_ci.models import GcpInstance from mod_test.models import Test + from run import app, config with app.app_context(): db = create_session(config['DATABASE_URI']) @@ -123,11 +118,12 @@ def check_expired_instances_task(self): :return: Dict with status and message """ - from run import app, config + from github import Auth, Github from database import create_session - from github import Auth, Github - from mod_ci.controllers import delete_expired_instances, get_compute_service_object + from mod_ci.controllers import (delete_expired_instances, + get_compute_service_object) + from run import app, config with app.app_context(): db = create_session(config['DATABASE_URI']) @@ -178,11 +174,10 @@ def process_pending_tests_task(self): :return: Dict with status and count of queued tests """ - from run import app, config - from database import create_session from mod_ci.models import GcpInstance, MaintenanceMode from mod_test.models import Test, TestPlatform, TestProgress, TestStatus + from run import app, config with app.app_context(): db = create_session(config['DATABASE_URI'])