Skip to content

Conversation

@manavgup
Copy link
Owner

Add a comprehensive agent hook system for the RAG search pipeline with:

Database Layer:

  • AgentConfig model for storing agent configurations
  • CollectionAgent model for collection-agent associations with overrides
  • JSONB columns for flexible default_config and config_override

Agent Execution Service:

  • CircuitBreaker pattern for failure isolation (closed/open/half_open states)
  • Sequential execution for pre-search and post-search stages
  • Parallel execution for response stage agents
  • Dynamic handler loading via importlib
  • Timeout and retry support per agent

Pipeline Integration:

  • PreSearchAgentStage: Query expansion, language detection, intent classification
  • PostSearchAgentStage: Re-ranking, deduplication, PII redaction
  • ResponseAgentStage: PDF, PowerPoint, chart generation
  • Integrated into SearchService pipeline at stages 3, 6, and 9

API Endpoints:

  • CRUD operations for agent configurations
  • Collection-agent association management
  • Batch priority updates
  • Filtering by stage, status, and agent type

Schemas:

  • AgentStage, AgentConfigStatus, AgentExecutionStatus enums
  • Input/Output/Update schemas for configuration
  • AgentContext, AgentResult, AgentArtifact execution schemas
  • PipelineMetadata with agent execution summary

Tests:

  • Unit tests for CircuitBreaker with state transitions
  • Unit tests for AgentExecutorService
  • Schema validation tests for all Pydantic models

Closes #697

Add a comprehensive agent hook system for the RAG search pipeline with:

**Database Layer:**
- AgentConfig model for storing agent configurations
- CollectionAgent model for collection-agent associations with overrides
- JSONB columns for flexible default_config and config_override

**Agent Execution Service:**
- CircuitBreaker pattern for failure isolation (closed/open/half_open states)
- Sequential execution for pre-search and post-search stages
- Parallel execution for response stage agents
- Dynamic handler loading via importlib
- Timeout and retry support per agent

**Pipeline Integration:**
- PreSearchAgentStage: Query expansion, language detection, intent classification
- PostSearchAgentStage: Re-ranking, deduplication, PII redaction
- ResponseAgentStage: PDF, PowerPoint, chart generation
- Integrated into SearchService pipeline at stages 3, 6, and 9

**API Endpoints:**
- CRUD operations for agent configurations
- Collection-agent association management
- Batch priority updates
- Filtering by stage, status, and agent type

**Schemas:**
- AgentStage, AgentConfigStatus, AgentExecutionStatus enums
- Input/Output/Update schemas for configuration
- AgentContext, AgentResult, AgentArtifact execution schemas
- PipelineMetadata with agent execution summary

**Tests:**
- Unit tests for CircuitBreaker with state transitions
- Unit tests for AgentExecutorService
- Schema validation tests for all Pydantic models

Closes #697
@github-actions
Copy link
Contributor

🚀 Development Environment Options

This repository supports Dev Containers for a consistent development environment.

Option 1: GitHub Codespaces (Recommended)

Create a cloud-based development environment:

  1. Click the green Code button above
  2. Select the Codespaces tab
  3. Click Create codespace on claude/implement-issue-697-01NYz1XuVcfwZKHY7F6Hfjyz
  4. Wait 2-3 minutes for environment setup
  5. Start coding with all tools pre-configured!

Option 2: VS Code Dev Containers (Local)

Use Dev Containers on your local machine:

  1. Install Docker Desktop
  2. Install VS Code
  3. Install the Dev Containers extension
  4. Clone this PR branch locally
  5. Open in VS Code and click "Reopen in Container" when prompted

Option 3: Traditional Local Setup

Set up the development environment manually:

# Clone the repository
git clone https://github.com/manavgup/rag_modulo.git
cd rag_modulo
git checkout claude/implement-issue-697-01NYz1XuVcfwZKHY7F6Hfjyz

# Initialize development environment
make dev-init
make dev-build
make dev-up
make dev-validate

Available Commands

Once in your development environment:

make help           # Show all available commands
make dev-validate   # Validate environment setup
make test-atomic    # Run atomic tests
make test-unit      # Run unit tests
make lint          # Run linting

Services Available

When running make dev-up:


This automated message helps reviewers quickly set up the development environment.

@github-actions
Copy link
Contributor

Code Review - PR #702: Implement RAG Modulo Agent Hook System (Issue #697)

Summary

This PR introduces a comprehensive agent hook system for the RAG search pipeline with support for 3 execution stages (pre-search, post-search, and response). The implementation is well-structured with proper separation of concerns and follows the repository's patterns.


Strengths

1. Architecture & Design

  • Clean separation of concerns: Models, repositories, services, and routers properly separated
  • Circuit breaker pattern: Excellent implementation for failure isolation with proper state management (closed → open → half_open)
  • Extensible design: BaseAgentHandler abstract class allows easy addition of new agent types
  • Pipeline integration: Seamlessly integrated into SearchService at appropriate stages

2. Database Layer

  • Well-designed schema: AgentConfig and CollectionAgent models with proper relationships
  • Flexible configuration: JSONB columns for default_config and config_override provide good flexibility
  • Proper indexing: Strategic indexes on stage, agent_type, status, and composite indexes for performance
  • Cascade behavior: Appropriate CASCADE and SET NULL foreign key constraints

3. Code Quality

  • Comprehensive documentation: Excellent docstrings and inline comments throughout
  • Type hints: Proper type annotations for Python 3.12+
  • Error handling: Graceful degradation - agent failures don't break the search pipeline
  • Logging: Good use of structured logging with contextual information

4. Testing

  • Good test coverage: 454 unit tests for agent executor service
  • Schema validation: 426 tests for Pydantic models
  • Circuit breaker testing: Comprehensive state transition tests
  • Test organization: Well-structured with clear test categories

⚠️ Issues & Concerns

1. CRITICAL: Missing Database Migration

Severity: HIGH 🔴

The PR introduces new database tables (agent_configs and collection_agents) but does not include an Alembic migration file. This will cause deployment failures.

Required Action:

# Generate migration
poetry run alembic revision --autogenerate -m "Add agent_config and collection_agent tables for issue 697"

# Review and edit the generated migration
# Then commit it to the PR

Impact: Without this migration, the application will fail to start in any environment where the tables don't exist.


2. Security Concerns

a. Dynamic Code Loading Risk (Line 291 in agent_executor_service.py)

Severity: MEDIUM 🟡

module = importlib.import_module(agent_config.handler_module)
handler_class = getattr(module, agent_config.handler_class)

Issue: Dynamic module loading without validation creates potential for code injection if handler_module or handler_class values are controlled by untrusted users.

Recommendation:

# Add allowlist validation
ALLOWED_HANDLER_MODULES = [
    'rag_solution.services.agents.query_expander',
    'rag_solution.services.agents.reranker',
    # ... other trusted modules
]

if agent_config.handler_module not in ALLOWED_HANDLER_MODULES:
    logger.error("Untrusted handler module: %s", agent_config.handler_module)
    return None
b. Authentication in Router (Line 63-89 in agent_config_router.py)

Severity: LOW 🟢

The get_current_user_id() function properly validates authentication, but consider using a dependency to make it more reusable:

from fastapi import Depends
from rag_solution.auth.dependencies import get_current_user

# Then in endpoints:
async def create_agent_config(
    config_input: AgentConfigInput,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
) -> AgentConfigOutput:

3. Performance Considerations

a. Handler Caching (Line 247 in agent_executor_service.py)

Status: GOOD

The handler class caching (self._handler_cache) is well-implemented and will prevent repeated module imports.

b. Parallel Execution (Line 534 in agent_executor_service.py)

Status: GOOD

Response agents execute in parallel using asyncio.gather, which is appropriate since they're independent.

c. Database Query Optimization (Line 263 in agent_executor_service.py)

Potential Issue: The _get_collection_agents query could benefit from eager loading:

return (
    self.db.query(CollectionAgent)
    .join(AgentConfig)
    .options(joinedload(CollectionAgent.agent_config))  # Add this
    .filter(...)
    .order_by(CollectionAgent.priority)
    .all()
)

This prevents N+1 query issues when accessing collection_agent.agent_config.


4. Code Quality Issues

a. Broad Exception Catching (Lines 392, 621, 645, 669)

Severity: LOW 🟡

Multiple uses of except Exception with broad-except disabled via pylint comments. While justified for pipeline resilience, consider:

# Be more specific where possible
except (ImportError, AttributeError, TimeoutError, RuntimeError) as e:
    logger.error("Agent execution failed: %s", e)
b. Magic Numbers (Line 242 in agent_executor_service.py)
self.circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=60.0,
    half_open_max_calls=3,
)

Consider making these configurable via Settings:

self.circuit_breaker = CircuitBreaker(
    failure_threshold=settings.AGENT_CIRCUIT_FAILURE_THRESHOLD,
    recovery_timeout=settings.AGENT_CIRCUIT_RECOVERY_TIMEOUT,
    half_open_max_calls=settings.AGENT_CIRCUIT_HALF_OPEN_CALLS,
)

5. Integration Issues

a. Missing SearchContext Fields (Line 362 in search_context.py)

The PR adds agent_execution_summary and agent_artifacts to SearchContext but these may need type hints:

from rag_solution.schemas.agent_config_schema import AgentExecutionSummary, AgentArtifact

agent_execution_summary: AgentExecutionSummary | None = None
agent_artifacts: list[AgentArtifact] = field(default_factory=list)
b. SearchService Integration (search_service.py)

The integration looks clean, but verify that the async execution doesn't block the main search flow unnecessarily.


6. Testing Gaps

a. Missing Integration Tests

While unit tests are comprehensive, there are no integration tests that verify:

  • End-to-end agent execution through the actual search pipeline
  • Database operations with real PostgreSQL
  • Multi-stage agent coordination

Recommendation: Add integration tests in tests/integration/test_agent_execution_integration.py.

b. Missing API Tests

No tests for the router endpoints (/api/agent-configs, /api/collections/{id}/agents).

Recommendation: Add API tests in tests/api/test_agent_config_router.py.


📝 Additional Recommendations

1. Documentation

  • Add architectural decision record (ADR) for the agent hook system
  • Update API documentation with agent config examples
  • Add usage guide: docs/features/agent-hooks.md

2. Configuration

Consider adding agent configuration presets for common use cases:

# System-level agent configs to seed at startup
SYSTEM_AGENT_CONFIGS = [
    {
        "name": "Query Expander",
        "agent_type": "query_expander",
        "stage": "pre_search",
        "handler_module": "rag_solution.services.agents.query_expander",
        "handler_class": "QueryExpanderAgent",
    },
    # ... more presets
]

3. Monitoring

Add metrics for agent execution:

  • Success/failure rates per agent
  • Execution time percentiles
  • Circuit breaker state changes
  • Agent configuration changes (audit log)

4. Rate Limiting

Consider adding rate limiting for agent executions to prevent resource exhaustion from aggressive agent configurations.


🎯 Action Items

Before Merge:

  1. REQUIRED: Create Alembic migration for new tables
  2. REQUIRED: Add integration tests for agent execution
  3. REQUIRED: Add API tests for agent config endpoints
  4. 🔧 RECOMMENDED: Add handler module allowlist validation
  5. 🔧 RECOMMENDED: Add eager loading to prevent N+1 queries

Post-Merge (Future Enhancements):

  • Add monitoring and metrics
  • Create system agent config presets
  • Add rate limiting
  • Write comprehensive documentation

📊 Test Coverage

  • Unit Tests: ✅ Excellent (454 + 426 tests)
  • Integration Tests: ❌ Missing
  • API Tests: ❌ Missing
  • E2E Tests: ❌ Missing

Estimated Coverage: ~40% (unit tests only, missing integration and API layers)
Target Coverage: 90%+


🏆 Overall Assessment

Code Quality: ⭐⭐⭐⭐ (4/5)
Architecture: ⭐⭐⭐⭐⭐ (5/5)
Testing: ⭐⭐⭐ (3/5)
Documentation: ⭐⭐⭐⭐ (4/5)
Security: ⭐⭐⭐ (3/5)

Recommendation: APPROVE WITH CHANGES

This is a well-designed feature with solid architecture and good code quality. The main blockers are:

  1. Missing database migration (critical)
  2. Missing integration/API tests (important)
  3. Dynamic code loading security concern (important)

Once these are addressed, this PR is ready to merge. The agent hook system provides excellent extensibility for the RAG pipeline and follows the repository's best practices.


📚 References

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: Implement SearchService 3-stage agent execution hooks

3 participants