81 changes: 81 additions & 0 deletions celery_app.py
@@ -0,0 +1,81 @@
"""Celery application factory and configuration for Sample Platform."""

import os

from celery import Celery
from celery.schedules import crontab

# Load configuration - use empty dict for testing when config.py doesn't exist
try:
from config_parser import parse_config
config = parse_config('config')
except Exception:
# In test environment, config.py may not exist
config = {}


def make_celery(app=None):
"""
Create a Celery application configured for the Sample Platform.

:param app: Optional Flask application for context binding
:return: Configured Celery application
"""
celery_app = Celery(
'sample_platform',
broker=config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
backend=config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
include=['mod_ci.tasks']
)

# Apply configuration from config.py
celery_app.conf.update(
task_serializer=config.get('CELERY_TASK_SERIALIZER', 'json'),
result_serializer=config.get('CELERY_RESULT_SERIALIZER', 'json'),
accept_content=config.get('CELERY_ACCEPT_CONTENT', ['json']),
timezone=config.get('CELERY_TIMEZONE', 'UTC'),
enable_utc=config.get('CELERY_ENABLE_UTC', True),
task_acks_late=config.get('CELERY_TASK_ACKS_LATE', True),
worker_prefetch_multiplier=config.get('CELERY_WORKER_PREFETCH_MULTIPLIER', 1),
task_reject_on_worker_lost=config.get('CELERY_TASK_REJECT_ON_WORKER_LOST', True),
task_soft_time_limit=config.get('CELERY_TASK_SOFT_TIME_LIMIT', 3600),
task_time_limit=config.get('CELERY_TASK_TIME_LIMIT', 3900),
)

# Beat schedule for periodic tasks
celery_app.conf.beat_schedule = {
'check-expired-instances-every-5-minutes': {
'task': 'mod_ci.tasks.check_expired_instances_task',
'schedule': crontab(minute='*/5'),
'options': {'queue': 'maintenance'}
},
'process-pending-tests-every-minute': {
'task': 'mod_ci.tasks.process_pending_tests_task',
'schedule': crontab(minute='*'),
'options': {'queue': 'default'}
},
}

# Queue routing
celery_app.conf.task_routes = {
'mod_ci.tasks.start_test_task': {'queue': 'test_execution'},
'mod_ci.tasks.check_expired_instances_task': {'queue': 'maintenance'},
'mod_ci.tasks.process_pending_tests_task': {'queue': 'default'},
}

    # If a Flask app is provided, bind tasks to its context
if app is not None:
class ContextTask(celery_app.Task):
"""Task base class that maintains Flask application context."""

def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)

celery_app.Task = ContextTask

return celery_app


# Create the default celery instance (used by the worker when started standalone)
celery = make_celery()
18 changes: 18 additions & 0 deletions config_sample.py
@@ -37,3 +37,21 @@
GCP_INSTANCE_MAX_RUNTIME = 120 # In minutes
GCS_BUCKET_NAME = 'spdev'
GCS_SIGNED_URL_EXPIRY_LIMIT = 720 # In minutes


# CELERY TASK QUEUE CONFIG
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'UTC'
CELERY_ENABLE_UTC = True
CELERY_TASK_ACKS_LATE = True # Task acknowledged after completion
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # One task at a time per worker
CELERY_TASK_REJECT_ON_WORKER_LOST = True # Requeue tasks if worker dies
CELERY_TASK_SOFT_TIME_LIMIT = 3600 # 1 hour soft limit
CELERY_TASK_TIME_LIMIT = 3900 # 1 hour 5 minutes hard limit

# Feature flag for gradual migration (set to True to enable Celery, False for cron fallback)
USE_CELERY_TASKS = False
23 changes: 23 additions & 0 deletions install/celery-beat.service
@@ -0,0 +1,23 @@
[Unit]
Description=Sample Platform Celery Beat Scheduler
After=network.target redis.service celery-worker.service
Requires=redis.service

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/var/www/sample-platform
Environment="PATH=/var/www/sample-platform/venv/bin"
ExecStart=/var/www/sample-platform/venv/bin/celery \
-A celery_app.celery beat \
--pidfile=/var/run/celery/beat.pid \
--logfile=/var/www/sample-platform/logs/celery/beat.log \
--loglevel=INFO \
--schedule=/var/www/sample-platform/celerybeat-schedule
RuntimeDirectory=celery
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
30 changes: 30 additions & 0 deletions install/celery-worker.service
@@ -0,0 +1,30 @@
[Unit]
Description=Sample Platform Celery Worker
After=network.target redis.service mysql.service
Requires=redis.service

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/sample-platform
Environment="PATH=/var/www/sample-platform/venv/bin"
ExecStart=/var/www/sample-platform/venv/bin/celery \
-A celery_app.celery multi start worker \
--pidfile=/var/run/celery/%%n.pid \
--logfile=/var/www/sample-platform/logs/celery/%%n%%I.log \
--loglevel=INFO \
-Q default,test_execution,maintenance \
--concurrency=2
ExecStop=/var/www/sample-platform/venv/bin/celery \
-A celery_app.celery multi stopwait worker \
--pidfile=/var/run/celery/%%n.pid
ExecReload=/var/www/sample-platform/venv/bin/celery \
-A celery_app.celery multi restart worker \
--pidfile=/var/run/celery/%%n.pid
RuntimeDirectory=celery
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
101 changes: 101 additions & 0 deletions install/installation.md
@@ -217,6 +217,107 @@ The file `mod_ci/cron.py` is to be run in periodic intervals. To setup a cron jo
```
Adjust the `/var/www/sample-platform` path if you have installed the platform in a different directory.

## Optional: Setting up Celery Task Queue

As an alternative to cron-based polling, you can use Celery with Redis for event-driven test processing. This provides faster test execution, better retry handling, and parallel processing.

### Installing Redis

```bash
sudo apt update
sudo apt install redis-server

# Configure Redis
sudo nano /etc/redis/redis.conf
# Set: supervised systemd
# Set: bind 127.0.0.1 ::1

# Enable and start Redis
sudo systemctl enable redis-server
sudo systemctl start redis-server

# Verify Redis is running
redis-cli ping # Should return PONG
```
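
If you prefer to verify the broker from Python, here is a quick sketch (assuming the `redis` package is installed; the connection values match the defaults in `config_sample.py`):

```python
# Sanity-check the local Redis broker (illustrative, not part of this PR)
import redis

r = redis.Redis(host='localhost', port=6379, db=0)
print(r.ping())  # True if the broker is reachable
```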

### Configuring Celery

Add the following to your `config.py`:

```python
# Celery Configuration
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
USE_CELERY_TASKS = True # Set to False to use cron instead
```
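
To have tasks run inside the Flask application context, pass the app into the factory from `celery_app.py`. A minimal sketch, assuming the Flask object is importable (the `from run import app` path is a guess about this repository's layout):

```python
# Bind Celery to the Flask app so tasks execute under app.app_context()
from celery_app import make_celery
from run import app  # assumption: run.py exposes the Flask app object

celery = make_celery(app)
```

Without an app, `make_celery()` still returns a working Celery instance; that is what the standalone worker started by the systemd unit below uses.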

### Installing Celery Services

```bash
# Create log directory
sudo mkdir -p /var/www/sample-platform/logs/celery
sudo chown -R www-data:www-data /var/www/sample-platform/logs/celery

# Create runtime directory
sudo mkdir -p /var/run/celery
sudo chown www-data:www-data /var/run/celery

# Install systemd services
sudo cp /var/www/sample-platform/install/celery-worker.service /etc/systemd/system/
sudo cp /var/www/sample-platform/install/celery-beat.service /etc/systemd/system/

# Reload systemd and enable services
sudo systemctl daemon-reload
sudo systemctl enable celery-worker celery-beat

# Start the services
sudo systemctl start celery-worker
sudo systemctl start celery-beat
```
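
Two notes on the unit files: `celery-worker.service` uses `Type=forking` because `celery multi` daemonizes the worker, while beat stays in the foreground under `Type=simple`. The `%%n`/`%%I` escapes in the worker unit pass literal `%n`/`%I` through to `celery multi`, which expands them to the node name and process index when building pid and log file paths (an unescaped `%n` would be consumed by systemd as its own specifier).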

### Monitoring Celery

```bash
# Check worker status
celery -A celery_app.celery inspect active

# Check queue depths (this PR routes tasks to named queues, not Celery's default "celery" queue)
redis-cli LLEN default
redis-cli LLEN test_execution
redis-cli LLEN maintenance

# View logs
tail -f /var/www/sample-platform/logs/celery/*.log

# Optional: Install Flower for web-based monitoring
pip install flower
celery -A celery_app.celery flower --port=5555
```
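
The same information is available programmatically through Celery's remote-control inspect API, e.g. for a health check (a sketch using the app object from `celery_app.py`):

```python
# Query running workers via Celery's inspect API
from celery_app import celery

insp = celery.control.inspect(timeout=5)
active = insp.active() or {}  # None when no worker replies in time
for worker, tasks in active.items():
    print(f"{worker}: {len(tasks)} active task(s)")
```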

### Gradual Migration

For a safe transition from cron to Celery:

1. **Stage 1**: Set `USE_CELERY_TASKS = False` and keep cron running. Start the Celery services and verify in the logs that they are running correctly.

2. **Stage 2**: Set `USE_CELERY_TASKS = True` (the flag check is excerpted after this list). Reduce the cron frequency to every 30 minutes as a fallback.

3. **Stage 3**: Disable cron entirely once you're confident Celery is working correctly.
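
The flag check itself is the first thing `trigger_test_tasks()` does (see the `mod_ci/controllers.py` diff below), excerpted here for reference:

```python
# Excerpt from trigger_test_tasks() in mod_ci/controllers.py: with the
# flag off, the function returns and cron picks the tests up as before.
if not config.get('USE_CELERY_TASKS', False):
    log.debug("Celery tasks disabled, tests will be picked up by cron/periodic task")
    return
```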

### Rollback to Cron

If you need to disable Celery:

```bash
# Stop Celery services
sudo systemctl stop celery-beat celery-worker

# Edit config.py and set USE_CELERY_TASKS = False

# Restart platform
sudo systemctl restart platform

# Ensure cron is running every 10 minutes
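# For example, confirm the cron.py entry is present (illustrative check,
# assuming the job was installed for www-data; adjust paths to your install):
#   */10 * * * * /var/www/sample-platform/venv/bin/python /var/www/sample-platform/mod_ci/cron.py
crontab -u www-data -l | grep cron.py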
```

## GCS configuration to serve file downloads using Signed URLs

To serve file downloads directly from the private GCS bucket, signed download URLs are used.
61 changes: 55 additions & 6 deletions mod_ci/controllers.py
@@ -1124,7 +1124,7 @@ def save_xml_to_file(xml_node, folder_name, file_name) -> None:
)


def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None:
def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list:
"""
Add test details entry into Test model for each platform.

@@ -1140,16 +1140,16 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None:
:type branch: str
:param pr_nr: Pull Request number, if applicable.
:type pr_nr: int
:return: Nothing
:rtype: None
:return: List of created test IDs
:rtype: list
"""
from run import log

# Validate commit hash before creating test entries
# Based on issue identified by NexionisJake in PR #937
if not is_valid_commit_hash(commit):
log.error(f"Invalid commit hash '{commit}' - skipping test entry creation")
return
return []

fork_url = f"%/{g.github['repository_owner']}/{g.github['repository']}.git"
fork = Fork.query.filter(Fork.github.like(fork_url)).first()
@@ -1158,12 +1158,59 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None:
log.debug('pull request test type detected')
branch = "pull_request"

test_ids = []
linux_test = Test(TestPlatform.linux, test_type, fork.id, branch, commit, pr_nr)
db.add(linux_test)
db.flush() # Get ID before commit
test_ids.append(linux_test.id)

windows_test = Test(TestPlatform.windows, test_type, fork.id, branch, commit, pr_nr)
db.add(windows_test)
db.flush() # Get ID before commit
test_ids.append(windows_test.id)

if not safe_db_commit(db, f"adding test entries for commit {commit[:7]}"):
log.error(f"Failed to add test entries for commit {commit}")
return []

return test_ids


def trigger_test_tasks(test_ids: list, bot_token: str) -> None:
"""
Optionally trigger Celery tasks for newly created tests.

Only triggers if USE_CELERY_TASKS is True in config.
Otherwise the tests are left for cron or the beat-scheduled periodic task to pick up.

:param test_ids: List of Test IDs to queue
:type test_ids: list
:param bot_token: GitHub bot token
:type bot_token: str
"""
from run import config, log

if not config.get('USE_CELERY_TASKS', False):
log.debug("Celery tasks disabled, tests will be picked up by cron/periodic task")
return

if not test_ids:
return

try:
from mod_ci.tasks import start_test_task

for test_id in test_ids:
start_test_task.apply_async(
args=[test_id, bot_token],
queue='test_execution',
countdown=30  # 30-second delay so artifact uploads can complete
)
log.info(f"Queued test {test_id} via Celery")
except ImportError:
log.warning("Celery tasks module not available, falling back to cron")
except Exception as e:
log.error(f"Failed to queue Celery tasks: {e}, tests will be picked up by cron")


def schedule_test(gh_commit: Commit.Commit) -> None:
@@ -1429,7 +1476,8 @@ def start_ci():
last_commit.value = ref.object.sha
if not safe_db_commit(g.db, "updating last commit"):
return 'ERROR'
add_test_entry(g.db, commit_hash, TestType.commit)
test_ids = add_test_entry(g.db, commit_hash, TestType.commit)
trigger_test_tasks(test_ids, g.github['bot_token'])
else:
g.log.warning('Unknown push type! Dumping payload for analysis')
g.log.warning(payload)
@@ -1459,7 +1507,8 @@ def start_ci():
try:
pr = retry_with_backoff(lambda: repository.get_pull(number=pr_nr))
if pr.mergeable is not False:
add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr)
test_ids = add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr)
trigger_test_tasks(test_ids, g.github['bot_token'])
except GithubException as e:
g.log.error(f"Failed to get PR {pr_nr} after retries: {e}")
