Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
9df1002
feat(messages): add support for injecting cancelled tool results in m…
danyalxahid-askui Dec 16, 2025
f7fc4c3
fix(messages): update tool result content type in MessageService
danyalxahid-askui Dec 16, 2025
6e74dc9
feat(scheduled_jobs): implement scheduled job management with APSched…
danyalxahid-askui Dec 24, 2025
58a42a8
Merge branch 'main' into CL-1935-scheduling-workflows-which-run-on-a-…
danyalxahid-askui Dec 24, 2025
3bc7230
refactor(scheduled_jobs): remove unused query parameter from list_sch…
danyalxahid-askui Dec 24, 2025
3c2a95e
chore: update .gitignore and enhance SQLite configuration
danyalxahid-askui Dec 30, 2025
bd513ca
refactor(scheduled_jobs): update ScheduledJob creation to use params
danyalxahid-askui Dec 30, 2025
8d01503
feat(scheduled_jobs): add name field to MessageRerunnerData and updat…
danyalxahid-askui Dec 30, 2025
83e743f
feat(scheduled_jobs): enhance job execution with ASKUI token management
danyalxahid-askui Dec 30, 2025
2c3a940
feat(scheduled_jobs): add scheduler database support and improve SQLi…
danyalxahid-askui Dec 30, 2025
7137268
refactor(scheduled_jobs): update job execution logic and improve erro…
danyalxahid-askui Dec 30, 2025
b5b5872
refactor: remove functools.cache decorator from create_api_client fun…
danyalxahid-askui Dec 30, 2025
6560bbe
Merge branch 'CL-2013-delivery-chat-unauthorized-chat-api' into CL-19…
danyalxahid-askui Dec 30, 2025
93b1f5f
refactor(scheduled_jobs): enhance job execution and token management
danyalxahid-askui Dec 30, 2025
927e641
refactor(scheduled_jobs): update execute_job to return ScheduledJobEx…
danyalxahid-askui Jan 2, 2026
e015c8b
refactor(settings, engine): optimize SQLite configuration and update …
danyalxahid-askui Jan 5, 2026
61eed2e
refactor(scheduled_jobs): sort scheduled jobs by next fire time
danyalxahid-askui Jan 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ reports/
.DS_Store
/chat
/askui_chat.db
/askui_chat.db-shm
/askui_chat.db-wal
.cache/

bom.json
34 changes: 33 additions & 1 deletion pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"aiofiles>=24.1.0",
"anyio==4.10.0", # We need to pin this version otherwise listing mcp tools using fastmcp within runner fails
"sqlalchemy[mypy]>=2.0.44",
"apscheduler==4.0.0a6",
]
requires-python = ">=3.10,<3.14"
readme = "README.md"
Expand Down
13 changes: 13 additions & 0 deletions src/askui/chat/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from askui.chat.api.mcp_servers.utility import mcp as utility_mcp
from askui.chat.api.messages.router import router as messages_router
from askui.chat.api.runs.router import router as runs_router
from askui.chat.api.scheduled_jobs.router import router as scheduled_jobs_router
from askui.chat.api.scheduled_jobs.scheduler import shutdown_scheduler, start_scheduler
from askui.chat.api.threads.router import router as threads_router
from askui.chat.api.workflows.router import router as workflows_router
from askui.chat.migrations.runner import run_migrations
Expand Down Expand Up @@ -49,7 +51,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001
session = next(get_session())
mcp_config_service = get_mcp_config_service(session=session, settings=settings)
mcp_config_service.seed()

# Start the scheduler for scheduled jobs
logger.info("Starting scheduled job scheduler...")
await start_scheduler()

yield

# Shutdown scheduler
logger.info("Shutting down scheduled job scheduler...")
await shutdown_scheduler()

logger.info("Disconnecting all MCP clients...")
await get_mcp_client_manager_manager(mcp_config_service).disconnect_all(force=True)

Expand All @@ -70,6 +82,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001
v1_router.include_router(mcp_configs_router)
v1_router.include_router(files_router)
v1_router.include_router(workflows_router)
v1_router.include_router(scheduled_jobs_router)
v1_router.include_router(health_router)
app.include_router(v1_router)

Expand Down
38 changes: 29 additions & 9 deletions src/askui/chat/api/db/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,40 @@
from sqlite3 import Connection as SQLite3Connection
from typing import Any

from sqlalchemy import Engine, create_engine, event
from sqlalchemy import create_engine, event

from askui.chat.api.dependencies import get_settings

logger = logging.getLogger(__name__)
_logger = logging.getLogger(__name__)

settings = get_settings()
connect_args = {"check_same_thread": False}
echo = logger.isEnabledFor(logging.DEBUG)
engine = create_engine(settings.db.url, connect_args=connect_args, echo=echo)
_settings = get_settings()
_connect_args = {"check_same_thread": False}
_echo = _logger.isEnabledFor(logging.DEBUG)

# Create engine with optimized settings
engine = create_engine(
_settings.db.url,
connect_args=_connect_args,
echo=_echo,
)

@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001

@event.listens_for(engine, "connect")
def _set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None:  # noqa: ARG001
    """
    Configure SQLite pragmas for optimal web application performance.

    Applied on each new connection:
    - foreign_keys=ON: Enable foreign key constraint enforcement
    - journal_mode=WAL: Write-Ahead Logging for better concurrency (readers don't block writers)
    - synchronous=NORMAL: Sync every 1000 pages instead of every write (faster, still durable with WAL)
    - busy_timeout=30000: Wait up to 30 seconds for locks instead of failing immediately
    """
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")

    # NOTE(review): "PRAGMA foreign_keys" is executed twice on this connection
    # (immediately above and below) — redundant but harmless; confirm one of
    # the two statements can be dropped.
    cursor.execute("PRAGMA foreign_keys = ON")
    cursor.execute("PRAGMA journal_mode = WAL")
    cursor.execute("PRAGMA synchronous = NORMAL")
    cursor.execute("PRAGMA busy_timeout = 30000")

    cursor.close()
1 change: 1 addition & 0 deletions src/askui/chat/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
FileId = Annotated[str, IdField("file")]
MessageId = Annotated[str, IdField("msg")]
RunId = Annotated[str, IdField("run")]
ScheduledJobId = Annotated[str, IdField("schedjob")]
ThreadId = Annotated[str, IdField("thread")]
WorkspaceId = UUID4

Expand Down
69 changes: 65 additions & 4 deletions src/askui/chat/api/runs/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
from fastapi import Depends
from pydantic import UUID4
from sqlalchemy.orm import Session

from askui.chat.api.assistants.dependencies import AssistantServiceDep
from askui.chat.api.assistants.dependencies import (
AssistantServiceDep,
get_assistant_service,
)
from askui.chat.api.assistants.service import AssistantService
from askui.chat.api.db.session import SessionDep
from askui.chat.api.dependencies import SettingsDep
from askui.chat.api.mcp_clients.dependencies import McpClientManagerManagerDep
from askui.chat.api.dependencies import SettingsDep, get_settings
from askui.chat.api.files.dependencies import get_file_service
from askui.chat.api.mcp_clients.dependencies import (
McpClientManagerManagerDep,
get_mcp_client_manager_manager,
)
from askui.chat.api.mcp_clients.manager import McpClientManagerManager
from askui.chat.api.mcp_configs.dependencies import get_mcp_config_service
from askui.chat.api.messages.chat_history_manager import ChatHistoryManager
from askui.chat.api.messages.dependencies import ChatHistoryManagerDep
from askui.chat.api.messages.dependencies import (
ChatHistoryManagerDep,
get_chat_history_manager,
get_message_service,
get_message_translator,
get_truncation_strategy_factory,
)
from askui.chat.api.runs.models import RunListQuery
from askui.chat.api.settings import Settings

Expand All @@ -23,6 +39,12 @@ def get_runs_service(
mcp_client_manager_manager: McpClientManagerManager = McpClientManagerManagerDep,
settings: Settings = SettingsDep,
) -> RunService:
"""
Get RunService instance for FastAPI dependency injection.

This function is designed for use with FastAPI's DI system.
For manual construction outside of a request context, use `create_run_service()`.
"""
return RunService(
session=session,
assistant_service=assistant_service,
Expand All @@ -33,3 +55,42 @@ def get_runs_service(


RunServiceDep = Depends(get_runs_service)


def create_run_service(session: Session, workspace_id: UUID4) -> RunService:
    """
    Build a fully wired RunService outside of FastAPI's dependency injection.

    Intended for contexts where no request is available (e.g. APScheduler
    callbacks); mirrors the wiring performed by `get_runs_service`.

    Args:
        session (Session): Database session.
        workspace_id (UUID4): The workspace ID for the run execution.

    Returns:
        RunService: Configured run service.
    """
    settings = get_settings()

    # Wire the MCP client manager through its config service.
    mcp_client_manager_manager = get_mcp_client_manager_manager(
        get_mcp_config_service(session, settings)
    )

    # Assemble the chat history manager from its message-layer collaborators.
    chat_history_manager = get_chat_history_manager(
        get_message_service(session),
        get_message_translator(get_file_service(session, settings), workspace_id),
        get_truncation_strategy_factory(),
    )

    return RunService(
        session=session,
        assistant_service=get_assistant_service(session),
        mcp_client_manager_manager=mcp_client_manager_manager,
        chat_history_manager=chat_history_manager,
        settings=settings,
    )
Empty file.
14 changes: 14 additions & 0 deletions src/askui/chat/api/scheduled_jobs/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""FastAPI dependencies for scheduled jobs."""

from fastapi import Depends

from askui.chat.api.scheduled_jobs.scheduler import scheduler
from askui.chat.api.scheduled_jobs.service import ScheduledJobService


def get_scheduled_job_service() -> ScheduledJobService:
    """Provide a ScheduledJobService bound to the module-level scheduler singleton."""
    service = ScheduledJobService(scheduler=scheduler)
    return service


ScheduledJobServiceDep = Depends(get_scheduled_job_service)
122 changes: 122 additions & 0 deletions src/askui/chat/api/scheduled_jobs/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Executor for scheduled job callbacks."""

import base64
import logging
import os
from typing import Any

from sqlalchemy.orm import Session

from askui.chat.api.db.engine import engine
from askui.chat.api.messages.dependencies import get_message_service
from askui.chat.api.runs.dependencies import create_run_service
from askui.chat.api.runs.models import RunCreate
from askui.chat.api.scheduled_jobs.models import (
MessageRerunnerData,
ScheduledJobExecutionResult,
scheduled_job_data_adapter,
)

_logger = logging.getLogger(__name__)


async def execute_job(
    **_kwargs: Any,
) -> ScheduledJobExecutionResult:
    """
    APScheduler callback that creates fresh services and executes the job.

    This function is called by APScheduler when a job fires. It creates fresh
    database sessions and service instances to avoid stale connections.

    Args:
        **_kwargs (Any): Keyword arguments containing job data.

    Returns:
        ScheduledJobExecutionResult: The result containing job data and optional error.
    """
    # Validates and returns the correct concrete type based on the `type` discriminator
    job_data = scheduled_job_data_adapter.validate_python(_kwargs)

    _logger.info(
        "Executing scheduled job: workspace=%s, thread=%s",
        job_data.workspace_id,
        job_data.thread_id,
    )

    error: str | None = None

    try:
        # future proofing of new job types
        if isinstance(job_data, MessageRerunnerData):  # pyright: ignore[reportUnnecessaryIsInstance]
            # Save the previous ASKUI__AUTHORIZATION env var so it can be
            # restored (or removed) after the job.
            _previous_authorization = os.environ.get("ASKUI__AUTHORIZATION")

            # remove authorization header since it takes precedence over the token and is set when forwarding bearer token
            # NOTE(review): mutating os.environ is process-global — confirm jobs
            # never run concurrently, otherwise credentials can cross-contaminate.
            os.environ["ASKUI__AUTHORIZATION"] = (
                f"Basic {base64.b64encode(job_data.askui_token.get_secret_value().encode()).decode()}"
            )

            try:
                await _execute_message_rerunner_job(job_data)
            finally:
                # Restore the previous ASKUI__AUTHORIZATION env var
                if _previous_authorization is not None:
                    os.environ["ASKUI__AUTHORIZATION"] = _previous_authorization
                else:
                    # Bug fix: the var was not set before this job ran — remove
                    # it entirely so the job's credential does not leak into
                    # subsequent requests made by this process.
                    os.environ.pop("ASKUI__AUTHORIZATION", None)
    except Exception as e:
        error = f"{type(e).__name__}: {e}"
        _logger.exception("Scheduled job failed: %s", error)

    # Always return job data with optional error
    return ScheduledJobExecutionResult(data=job_data, error=error)


async def _execute_message_rerunner_job(
    job_data: MessageRerunnerData,
) -> None:
    """
    Execute a message rerunner job: post the stored message to the thread,
    start a run with the configured assistant, wait for it to finish, and
    raise if the run ended in failure.

    Args:
        job_data: The job data.

    Raises:
        RuntimeError: If the run finished with status "failed".
    """
    with Session(engine) as session:
        message_service = get_message_service(session)
        run_service = create_run_service(session, job_data.workspace_id)

        # Post the stored message into the target thread.
        message_service.create(
            workspace_id=job_data.workspace_id,
            thread_id=job_data.thread_id,
            params=job_data.message,
        )

        # Kick off a run for the message.
        _logger.debug("Creating run with assistant %s", job_data.assistant_id)
        run, generator = await run_service.create(
            workspace_id=job_data.workspace_id,
            thread_id=job_data.thread_id,
            params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model),
        )

        # Drain the event stream so the run executes to completion.
        _logger.debug("Waiting for run %s to complete", run.id)
        async for _ in generator:
            pass

        # Re-fetch the run to inspect its final status.
        completed_run = run_service.retrieve(
            workspace_id=job_data.workspace_id,
            thread_id=job_data.thread_id,
            run_id=run.id,
        )

        if completed_run.status == "failed":
            last_error = completed_run.last_error
            raise RuntimeError(
                last_error.message if last_error else "Run failed with unknown error"
            )

        _logger.info("Scheduled job completed: run_id=%s", run.id)
Loading