diff --git a/.gitignore b/.gitignore
index d3158143..f2356f53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -166,6 +166,8 @@ reports/
 .DS_Store
 /chat
 /askui_chat.db
+/askui_chat.db-shm
+/askui_chat.db-wal
 
 .cache/
 bom.json
diff --git a/pdm.lock b/pdm.lock
index 6dca5aa4..315f6d85 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "all", "android", "bedrock", "chat", "dev", "pynput", "vertex", "web"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:e550e42ac1cd3060d41e6bc427581badcc9cc332f2b367ceda143dcc2c21b956"
+content_hash = "sha256:ea8a886d3282bd07778ce1bbdc5782490f78deca9c4a50e22fbebb6a8742eabd"
 
 [[metadata.targets]]
 requires_python = ">=3.10,<3.14"
@@ -123,6 +123,24 @@ files = [
     {file = "anyio-4.10.0.tar.gz", hash = "sha256:3f3fae35c96039744587aa5b8371e7e8e603c0702999535961dd336026973ba6"},
 ]
 
+[[package]]
+name = "apscheduler"
+version = "4.0.0a6"
+requires_python = ">=3.9"
+summary = "In-process task scheduler with Cron-like capabilities"
+groups = ["default"]
+dependencies = [
+    "anyio~=4.0",
+    "attrs>=22.1",
+    "tenacity<10.0,>=8.0",
+    "typing-extensions>=4.0; python_version < \"3.11\"",
+    "tzlocal>=3.0",
+]
+files = [
+    {file = "apscheduler-4.0.0a6-py3-none-any.whl", hash = "sha256:87031f7537aaa6ea2d3e69fdd9454b641f18938fc8cd0c589972006eceba8ee9"},
+    {file = "apscheduler-4.0.0a6.tar.gz", hash = "sha256:5134617c028f097de4a09abbeefc42625cb0ce3adcb4ce49d74cc26054084761"},
+]
+
 [[package]]
 name = "argcomplete"
 version = "3.6.2"
@@ -4762,6 +4780,20 @@ files = [
     {file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"},
 ]
 
+[[package]]
+name = "tzlocal"
+version = "5.3.1"
+requires_python = ">=3.9"
+summary = "tzinfo object for the local timezone"
+groups = ["default"]
+dependencies = [
+    "tzdata; platform_system == \"Windows\"",
+]
+files = [
+    {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"},
+    {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"},
+]
+
 [[package]]
 name = "uri-template"
 version = "1.3.0"
diff --git a/pyproject.toml b/pyproject.toml
index 5e48767b..bbec1cb6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,6 +32,7 @@ dependencies = [
     "aiofiles>=24.1.0",
     "anyio==4.10.0", # We need to pin this version otherwise listing mcp tools using fastmcp within runner fails
     "sqlalchemy[mypy]>=2.0.44",
+    "apscheduler==4.0.0a6",
 ]
 requires-python = ">=3.10,<3.14"
 readme = "README.md"
diff --git a/src/askui/chat/api/app.py b/src/askui/chat/api/app.py
index 81d8dcd0..82711ab5 100644
--- a/src/askui/chat/api/app.py
+++ b/src/askui/chat/api/app.py
@@ -22,6 +22,8 @@
 from askui.chat.api.mcp_servers.utility import mcp as utility_mcp
 from askui.chat.api.messages.router import router as messages_router
 from askui.chat.api.runs.router import router as runs_router
+from askui.chat.api.scheduled_jobs.router import router as scheduled_jobs_router
+from askui.chat.api.scheduled_jobs.scheduler import shutdown_scheduler, start_scheduler
 from askui.chat.api.threads.router import router as threads_router
 from askui.chat.api.workflows.router import router as workflows_router
 from askui.chat.migrations.runner import run_migrations
@@ -49,7 +51,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:  # noqa: ARG001
     session = next(get_session())
     mcp_config_service = get_mcp_config_service(session=session, settings=settings)
     mcp_config_service.seed()
+
+    # Start the scheduler for scheduled jobs
+    logger.info("Starting scheduled job scheduler...")
+    await start_scheduler()
+
     yield
+
+    # Shut down the scheduler
+    logger.info("Shutting down scheduled job scheduler...")
+    await shutdown_scheduler()
+
     logger.info("Disconnecting all MCP clients...")
     await get_mcp_client_manager_manager(mcp_config_service).disconnect_all(force=True)
 
@@ -70,6 +82,7 @@
 v1_router.include_router(mcp_configs_router)
 v1_router.include_router(files_router)
 v1_router.include_router(workflows_router)
+v1_router.include_router(scheduled_jobs_router)
 v1_router.include_router(health_router)
 
 app.include_router(v1_router)
diff --git a/src/askui/chat/api/db/engine.py b/src/askui/chat/api/db/engine.py
index 87931455..53d7d3d8 100644
--- a/src/askui/chat/api/db/engine.py
+++ b/src/askui/chat/api/db/engine.py
@@ -2,20 +2,40 @@
 from sqlite3 import Connection as SQLite3Connection
 from typing import Any
 
-from sqlalchemy import Engine, create_engine, event
+from sqlalchemy import create_engine, event
 
 from askui.chat.api.dependencies import get_settings
 
-logger = logging.getLogger(__name__)
+_logger = logging.getLogger(__name__)
 
-settings = get_settings()
-connect_args = {"check_same_thread": False}
-echo = logger.isEnabledFor(logging.DEBUG)
-engine = create_engine(settings.db.url, connect_args=connect_args, echo=echo)
+_settings = get_settings()
+_connect_args = {"check_same_thread": False}
+_echo = _logger.isEnabledFor(logging.DEBUG)
+# Create engine with optimized settings
+engine = create_engine(
+    _settings.db.url,
+    connect_args=_connect_args,
+    echo=_echo,
+)
 
-@event.listens_for(Engine, "connect")
-def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None:  # noqa: ARG001
+
+@event.listens_for(engine, "connect")
+def _set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None:  # noqa: ARG001
+    """
+    Configure SQLite pragmas for optimal web application performance.
+
+    Applied on each new connection:
+    - foreign_keys=ON: Enable foreign key constraint enforcement
+    - journal_mode=WAL: Write-Ahead Logging for better concurrency (readers don't block writers)
+    - synchronous=NORMAL: fsync only at WAL checkpoints instead of after every commit (faster, still safe in WAL mode)
+    - busy_timeout=30000: Wait up to 30 seconds for locks instead of failing immediately
+    """
     cursor = dbapi_conn.cursor()
-    cursor.execute("PRAGMA foreign_keys=ON")
+
+    cursor.execute("PRAGMA foreign_keys = ON")
+    cursor.execute("PRAGMA journal_mode = WAL")
+    cursor.execute("PRAGMA synchronous = NORMAL")
+    cursor.execute("PRAGMA busy_timeout = 30000")
+
     cursor.close()
 
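A quick way to sanity-check the pragmas above from outside the app — a minimal sketch using only the stdlib sqlite3 module (the local askui_chat.db path is an assumption):

    import sqlite3

    conn = sqlite3.connect("askui_chat.db")
    cursor = conn.cursor()
    # journal_mode is persisted in the database file, so WAL survives reconnects
    cursor.execute("PRAGMA journal_mode")
    print(cursor.fetchone())  # ('wal',) once the engine has connected at least once
    # foreign_keys is per-connection, which is exactly why the "connect" event
    # hook above has to re-apply it for every new connection
    cursor.execute("PRAGMA foreign_keys")
    print(cursor.fetchone())  # (0,) on a raw connection that bypassed the hook
    conn.close()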
diff --git a/src/askui/chat/api/models.py b/src/askui/chat/api/models.py
index faf1f622..9028abcc 100644
--- a/src/askui/chat/api/models.py
+++ b/src/askui/chat/api/models.py
@@ -10,6 +10,7 @@
 FileId = Annotated[str, IdField("file")]
 MessageId = Annotated[str, IdField("msg")]
 RunId = Annotated[str, IdField("run")]
+ScheduledJobId = Annotated[str, IdField("schedjob")]
 ThreadId = Annotated[str, IdField("thread")]
 WorkspaceId = UUID4
 
diff --git a/src/askui/chat/api/runs/dependencies.py b/src/askui/chat/api/runs/dependencies.py
index 0fefdaca..7dd5085f 100644
--- a/src/askui/chat/api/runs/dependencies.py
+++ b/src/askui/chat/api/runs/dependencies.py
@@ -1,13 +1,29 @@
 from fastapi import Depends
+from pydantic import UUID4
+from sqlalchemy.orm import Session
 
-from askui.chat.api.assistants.dependencies import AssistantServiceDep
+from askui.chat.api.assistants.dependencies import (
+    AssistantServiceDep,
+    get_assistant_service,
+)
 from askui.chat.api.assistants.service import AssistantService
 from askui.chat.api.db.session import SessionDep
-from askui.chat.api.dependencies import SettingsDep
-from askui.chat.api.mcp_clients.dependencies import McpClientManagerManagerDep
+from askui.chat.api.dependencies import SettingsDep, get_settings
+from askui.chat.api.files.dependencies import get_file_service
+from askui.chat.api.mcp_clients.dependencies import (
+    McpClientManagerManagerDep,
+    get_mcp_client_manager_manager,
+)
 from askui.chat.api.mcp_clients.manager import McpClientManagerManager
+from askui.chat.api.mcp_configs.dependencies import get_mcp_config_service
 from askui.chat.api.messages.chat_history_manager import ChatHistoryManager
-from askui.chat.api.messages.dependencies import ChatHistoryManagerDep
+from askui.chat.api.messages.dependencies import (
+    ChatHistoryManagerDep,
+    get_chat_history_manager,
+    get_message_service,
+    get_message_translator,
+    get_truncation_strategy_factory,
+)
 from askui.chat.api.runs.models import RunListQuery
 from askui.chat.api.settings import Settings
 
@@ -23,6 +39,12 @@ def get_runs_service(
     mcp_client_manager_manager: McpClientManagerManager = McpClientManagerManagerDep,
     settings: Settings = SettingsDep,
 ) -> RunService:
+    """
+    Get a RunService instance for FastAPI dependency injection.
+
+    This function is designed for use with FastAPI's DI system.
+    For manual construction outside of a request context, use `create_run_service()`.
+    """
     return RunService(
         session=session,
         assistant_service=assistant_service,
@@ -33,3 +55,42 @@
     )
 
 RunServiceDep = Depends(get_runs_service)
+
+
+def create_run_service(session: Session, workspace_id: UUID4) -> RunService:
+    """
+    Create a RunService with all required dependencies manually.
+
+    Use this function when you need a `RunService` outside of FastAPI's
+    dependency injection context (e.g. APScheduler callbacks).
+
+    Args:
+        session (Session): Database session.
+        workspace_id (UUID4): The workspace ID for the run execution.
+
+    Returns:
+        RunService: Configured run service.
+    """
+    settings = get_settings()
+
+    assistant_service = get_assistant_service(session)
+    file_service = get_file_service(session, settings)
+    mcp_config_service = get_mcp_config_service(session, settings)
+    mcp_client_manager_manager = get_mcp_client_manager_manager(mcp_config_service)
+
+    message_service = get_message_service(session)
+    message_translator = get_message_translator(file_service, workspace_id)
+    truncation_strategy_factory = get_truncation_strategy_factory()
+    chat_history_manager = get_chat_history_manager(
+        message_service,
+        message_translator,
+        truncation_strategy_factory,
+    )
+
+    return RunService(
+        session=session,
+        assistant_service=assistant_service,
+        mcp_client_manager_manager=mcp_client_manager_manager,
+        chat_history_manager=chat_history_manager,
+        settings=settings,
+    )
diff --git a/src/askui/chat/api/scheduled_jobs/__init__.py b/src/askui/chat/api/scheduled_jobs/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/askui/chat/api/scheduled_jobs/dependencies.py b/src/askui/chat/api/scheduled_jobs/dependencies.py
new file mode 100644
index 00000000..bc835878
--- /dev/null
+++ b/src/askui/chat/api/scheduled_jobs/dependencies.py
@@ -0,0 +1,14 @@
+"""FastAPI dependencies for scheduled jobs."""
+
+from fastapi import Depends
+
+from askui.chat.api.scheduled_jobs.scheduler import scheduler
+from askui.chat.api.scheduled_jobs.service import ScheduledJobService
+
+
+def get_scheduled_job_service() -> ScheduledJobService:
+    """Get a ScheduledJobService instance with the singleton scheduler."""
+    return ScheduledJobService(scheduler=scheduler)
+
+
+ScheduledJobServiceDep = Depends(get_scheduled_job_service)
diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py
new file mode 100644
index 00000000..baff6100
--- /dev/null
+++ b/src/askui/chat/api/scheduled_jobs/executor.py
@@ -0,0 +1,124 @@
+"""Executor for scheduled job callbacks."""
+
+import base64
+import logging
+import os
+from typing import Any
+
+from sqlalchemy.orm import Session
+
+from askui.chat.api.db.engine import engine
+from askui.chat.api.messages.dependencies import get_message_service
+from askui.chat.api.runs.dependencies import create_run_service
+from askui.chat.api.runs.models import RunCreate
+from askui.chat.api.scheduled_jobs.models import (
+    MessageRerunnerData,
+    ScheduledJobExecutionResult,
+    scheduled_job_data_adapter,
+)
+
+_logger = logging.getLogger(__name__)
+
+
+async def execute_job(
+    **_kwargs: Any,
+) -> ScheduledJobExecutionResult:
+    """
+    APScheduler callback that creates fresh services and executes the job.
+
+    This function is called by APScheduler when a job fires. It creates fresh
+    database sessions and service instances to avoid stale connections.
+
+    Args:
+        **_kwargs (Any): Keyword arguments containing the job data.
+
+    Returns:
+        ScheduledJobExecutionResult: The result containing the job data and an optional error.
+ """ + # Validates and returns the correct concrete type based on the `type` discriminator + job_data = scheduled_job_data_adapter.validate_python(_kwargs) + + _logger.info( + "Executing scheduled job: workspace=%s, thread=%s", + job_data.workspace_id, + job_data.thread_id, + ) + + error: str | None = None + + try: + # future proofing of new job types + if isinstance(job_data, MessageRerunnerData): # pyright: ignore[reportUnnecessaryIsInstance] + # Save previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars + _previous_authorization = os.environ.get("ASKUI__AUTHORIZATION") + + # remove authorization header since it takes precedence over the token and is set when forwarding bearer token + os.environ["ASKUI__AUTHORIZATION"] = ( + f"Basic {base64.b64encode(job_data.askui_token.get_secret_value().encode()).decode()}" + ) + + try: + await _execute_message_rerunner_job(job_data) + finally: + # Restore previous AUTHORIZATION_HEADER env var + if _previous_authorization is not None: + os.environ["ASKUI__AUTHORIZATION"] = _previous_authorization + except Exception as e: + error = f"{type(e).__name__}: {e}" + _logger.exception("Scheduled job failed: %s", error) + + # Always return job data with optional error + return ScheduledJobExecutionResult(data=job_data, error=error) + + +async def _execute_message_rerunner_job( + job_data: MessageRerunnerData, +) -> None: + """ + Execute a message rerunner job. + + Args: + job_data: The job data. + """ + with Session(engine) as session: + message_service = get_message_service(session) + run_service = create_run_service(session, job_data.workspace_id) + + # Create message + message_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=job_data.message, + ) + + # Create and execute run + _logger.debug("Creating run with assistant %s", job_data.assistant_id) + run, generator = await run_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model), + ) + + # Consume generator to completion of run + _logger.debug("Waiting for run %s to complete", run.id) + async for _event in generator: + pass + + # Check if run completed with error + completed_run = run_service.retrieve( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + run_id=run.id, + ) + + if completed_run.status == "failed": + error_message = ( + completed_run.last_error.message + if completed_run.last_error + else "Run failed with unknown error" + ) + raise RuntimeError(error_message) + + _logger.info("Scheduled job completed: run_id=%s", run.id) diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py new file mode 100644 index 00000000..c220e9d0 --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -0,0 +1,205 @@ +from typing import Literal, Union + +from apscheduler import Schedule +from apscheduler.triggers.date import DateTrigger +from pydantic import BaseModel, Field, SecretStr, TypeAdapter + +from askui.chat.api.messages.models import ROOT_MESSAGE_PARENT_ID, MessageCreate +from askui.chat.api.models import ( + AssistantId, + MessageId, + ScheduledJobId, + ThreadId, + WorkspaceId, +) +from askui.utils.datetime_utils import UnixDatetime +from askui.utils.id_utils import generate_time_ordered_id + + +class ScheduledMessageCreate(MessageCreate): + """ + Message creation parameters for scheduled jobs. 
diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py
new file mode 100644
index 00000000..c220e9d0
--- /dev/null
+++ b/src/askui/chat/api/scheduled_jobs/models.py
@@ -0,0 +1,208 @@
+from typing import Literal, Union
+
+from apscheduler import Schedule
+from apscheduler.triggers.date import DateTrigger
+from pydantic import BaseModel, Field, SecretStr, TypeAdapter
+
+from askui.chat.api.messages.models import ROOT_MESSAGE_PARENT_ID, MessageCreate
+from askui.chat.api.models import (
+    AssistantId,
+    MessageId,
+    ScheduledJobId,
+    ThreadId,
+    WorkspaceId,
+)
+from askui.utils.datetime_utils import UnixDatetime
+from askui.utils.id_utils import generate_time_ordered_id
+
+
+class ScheduledMessageCreate(MessageCreate):
+    """
+    Message creation parameters for scheduled jobs.
+
+    Extends `MessageCreate` with an explicit `parent_id` so the message is
+    added to the intended branch of the conversation tree, since the thread
+    state may change between scheduling and execution.
+
+    Args:
+        parent_id (MessageId): The parent message ID for branching. Defaults to
+            `ROOT_MESSAGE_PARENT_ID`, which attaches the message as a new root
+            branch.
+    """
+
+    parent_id: MessageId = Field(default=ROOT_MESSAGE_PARENT_ID)  # pyright: ignore
+
+
+# =============================================================================
+# API Input Models (without workspace_id - injected from header)
+# =============================================================================
+
+
+class _BaseMessageRerunnerDataCreate(BaseModel):
+    """
+    API input data for the message_rerunner job type.
+
+    This job type creates a new message in a thread and executes a run
+    at the scheduled time.
+
+    Args:
+        type (Literal["message_rerunner"]): The type of the scheduled job.
+        name (str): Human-readable name of the job.
+        thread_id (ThreadId): The thread to add the message to.
+        assistant_id (AssistantId): The assistant to run.
+        model (str): The model to use for the run.
+        message (ScheduledMessageCreate): The message to create.
+        askui_token (SecretStr): The AskUI token to use for authenticated API calls
+            when the job executes. This is a long-lived credential that doesn't
+            expire like Bearer tokens.
+    """
+
+    type: Literal["message_rerunner"] = "message_rerunner"
+    name: str
+    thread_id: ThreadId
+    assistant_id: AssistantId
+    model: str
+    message: ScheduledMessageCreate
+    askui_token: SecretStr
+
+
+class ScheduledJobCreate(BaseModel):
+    """
+    API input data for scheduled job creation.
+
+    Args:
+        next_fire_time (UnixDatetime): The time when the job should execute.
+        data (_BaseMessageRerunnerDataCreate): The job payload; `workspace_id`
+            is injected from the request header afterwards.
+    """
+
+    next_fire_time: UnixDatetime
+    data: _BaseMessageRerunnerDataCreate
+
+
+# =============================================================================
+# Internal Models (with workspace_id - populated after injection)
+# =============================================================================
+
+
+class MessageRerunnerData(_BaseMessageRerunnerDataCreate):
+    """
+    Internal data for the message_rerunner job type.
+
+    Extends `_BaseMessageRerunnerDataCreate` with the required `workspace_id`
+    that is injected from the request header.
+
+    Args:
+        workspace_id (WorkspaceId): The workspace this job belongs to.
+    """
+
+    workspace_id: WorkspaceId
+
+
+# Discriminated union of all job data types (extensible for future types)
+ScheduledJobData = Union[MessageRerunnerData]
+
+scheduled_job_data_adapter: TypeAdapter[ScheduledJobData] = TypeAdapter(
+    ScheduledJobData
+)
+ """ + + id: ScheduledJobId + object: Literal["scheduled_job"] = "scheduled_job" + next_fire_time: UnixDatetime + data: ScheduledJobData + + @classmethod + def create( + cls, + workspace_id: WorkspaceId, + params: ScheduledJobCreate, + ) -> "ScheduledJob": + """ + Create a new ScheduledJob with a generated ID. + + Args: + workspace_id (WorkspaceId): The workspace this job belongs to. + params (ScheduledJobCreate): The job creation parameters. + + Returns: + ScheduledJob: The created scheduled job. + """ + return cls( + id=generate_time_ordered_id("schedjob"), + next_fire_time=params.next_fire_time, + data=MessageRerunnerData( + workspace_id=workspace_id, + name=params.data.name, + thread_id=params.data.thread_id, + assistant_id=params.data.assistant_id, + model=params.data.model, + message=params.data.message, + askui_token=params.data.askui_token, + ), + ) + + @classmethod + def from_schedule(cls, schedule: Schedule) -> "ScheduledJob": + """ + Create a ScheduledJob from an APScheduler Schedule. + + Args: + schedule (Schedule): The APScheduler schedule to convert. + + Returns: + ScheduledJob: The converted scheduled job. + + Raises: + ValueError: If the schedule has no determinable `next_fire_time`. + """ + # Extract next_fire_time from schedule or trigger + next_fire_time: UnixDatetime + if schedule.next_fire_time is not None: + next_fire_time = schedule.next_fire_time + elif isinstance(schedule.trigger, DateTrigger): + next_fire_time = schedule.trigger.run_time + else: + error_msg = f"Schedule {schedule.id} has no next_fire_time" + raise ValueError(error_msg) + # Reconstruct data from kwargs + data = MessageRerunnerData.model_validate(schedule.kwargs or {}) + + return cls( + id=schedule.id, + next_fire_time=next_fire_time, + data=data, + ) + + +class ScheduledJobExecutionResult(BaseModel): + """ + Return value stored by the job executor in APScheduler's job result. + + This ensures we always have job data available even if the job fails, + since APScheduler clears return_value on exception. + + Args: + data (ScheduledJobData): The job data that was executed. + error (str | None): Error message if the job failed. 
+ """ + + data: ScheduledJobData + error: str | None = None diff --git a/src/askui/chat/api/scheduled_jobs/router.py b/src/askui/chat/api/scheduled_jobs/router.py new file mode 100644 index 00000000..794e9474 --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/router.py @@ -0,0 +1,59 @@ +"""API router for scheduled jobs.""" + +from typing import Annotated + +from fastapi import APIRouter, Header, status + +from askui.chat.api.models import ScheduledJobId, WorkspaceId +from askui.chat.api.scheduled_jobs.dependencies import ScheduledJobServiceDep +from askui.chat.api.scheduled_jobs.models import ScheduledJob, ScheduledJobCreate +from askui.chat.api.scheduled_jobs.service import ScheduledJobService +from askui.utils.api_utils import ListResponse + +router = APIRouter(prefix="/scheduled-jobs", tags=["scheduled-jobs"]) + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_scheduled_job( + askui_workspace: Annotated[WorkspaceId, Header()], + params: ScheduledJobCreate, + scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, +) -> ScheduledJob: + """Create a new scheduled job.""" + + return await scheduled_job_service.create( + workspace_id=askui_workspace, + params=params, + ) + + +@router.get("") +async def list_scheduled_jobs( + askui_workspace: Annotated[WorkspaceId, Header()], + scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, +) -> ListResponse[ScheduledJob]: + """List scheduled jobs with optional status filter.""" + return await scheduled_job_service.list_( + workspace_id=askui_workspace, + ) + + +@router.delete("/{job_id}", status_code=status.HTTP_204_NO_CONTENT) +async def cancel_scheduled_job( + askui_workspace: Annotated[WorkspaceId, Header()], + job_id: ScheduledJobId, + scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, +) -> None: + """ + Cancel a scheduled job. + + Only works for jobs with status 'pending'. Removes the job from the scheduler. + Cancelled jobs have no history (they are simply removed). + + Raises: + NotFoundError: If the job is not found or already executed. + """ + await scheduled_job_service.cancel( + workspace_id=askui_workspace, + job_id=job_id, + ) diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py new file mode 100644 index 00000000..aa3d2afb --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -0,0 +1,53 @@ +""" +Module-level APScheduler singleton management. + +Similar to how `engine.py` manages the database engine, this module manages +the APScheduler instance as a singleton to ensure jobs persist across requests. + +Uses the shared database engine from `engine.py` which is configured with +optimized SQLite pragmas for concurrent access (WAL mode, etc.). 
+""" + +import logging +from datetime import timedelta +from typing import Any + +from apscheduler import AsyncScheduler +from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore + +from askui.chat.api.db.engine import engine + +_logger = logging.getLogger(__name__) + +# Use shared engine from db/engine.py (already configured with SQLite pragmas) +# APScheduler will create its own tables (apscheduler_*) in the same database +_data_store: Any = SQLAlchemyDataStore(engine_or_url=engine) + +# Module-level singleton scheduler instance +# - max_concurrent_jobs=1: only one job runs at a time (sequential execution) +# At module level: just create the scheduler (don't start it) +scheduler: AsyncScheduler = AsyncScheduler( + data_store=_data_store, + max_concurrent_jobs=1, + cleanup_interval=timedelta(minutes=1), # Cleanup every minute +) + + +async def start_scheduler() -> None: + """ + Start the scheduler to begin processing jobs. + + This initializes the scheduler and starts it in the background so it can + poll for and execute scheduled jobs while the FastAPI application handles requests. + """ + # First initialize the scheduler via context manager entry + await scheduler.__aenter__() + # Then start background processing of jobs + await scheduler.start_in_background() + _logger.info("Scheduler started in background") + + +async def shutdown_scheduler() -> None: + """Shut down the scheduler gracefully.""" + await scheduler.__aexit__(None, None, None) + _logger.info("Scheduler shut down") diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py new file mode 100644 index 00000000..37d2e0f2 --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -0,0 +1,145 @@ +"""Service for managing scheduled jobs.""" + +import logging +from datetime import timedelta + +from apscheduler import AsyncScheduler, Schedule +from apscheduler.triggers.date import DateTrigger + +from askui.chat.api.models import ScheduledJobId, WorkspaceId +from askui.chat.api.scheduled_jobs.executor import execute_job +from askui.chat.api.scheduled_jobs.models import ScheduledJob, ScheduledJobCreate +from askui.utils.api_utils import ListResponse, NotFoundError + +logger = logging.getLogger(__name__) + + +class ScheduledJobService: + """ + Service for managing scheduled jobs using APScheduler. + + This service provides methods to create, list, and cancel scheduled jobs. + Job data is stored in APScheduler's SQLAlchemy data store. + + Args: + scheduler (Any): The APScheduler `AsyncScheduler` instance to use. + """ + + def __init__(self, scheduler: AsyncScheduler) -> None: + self._scheduler: AsyncScheduler = scheduler + + async def create( + self, + workspace_id: WorkspaceId, + params: ScheduledJobCreate, + ) -> ScheduledJob: + """ + Create a new scheduled job. + + Args: + workspace_id (WorkspaceId): The workspace this job belongs to. + params (ScheduledJobCreate): The job creation parameters. + + Returns: + ScheduledJob: The created scheduled job. 
+ """ + job = ScheduledJob.create( + workspace_id=workspace_id, + params=params, + ) + + # Prepare kwargs for the job callback + + logger.info( + "Creating scheduled job: id=%s, type=%s, next_fire_time=%s", + job.id, + job.data.type, + job.next_fire_time, + ) + + await self._scheduler.add_schedule( + func_or_task_id=execute_job, + trigger=DateTrigger(run_time=job.next_fire_time), + id=job.id, + kwargs={ + **job.data.model_dump(mode="json"), + "askui_token": job.data.askui_token.get_secret_value(), + }, + misfire_grace_time=timedelta(minutes=10), + job_result_expiration_time=timedelta(weeks=30000), # Never expire + ) + + logger.info("Scheduled job created: %s", job.id) + return job + + async def list_( + self, + workspace_id: WorkspaceId, + ) -> ListResponse[ScheduledJob]: + """ + List pending scheduled jobs. + + Args: + workspace_id (WorkspaceId): Filter by workspace. + query (ListQuery): Query parameters. + + Returns: + ListResponse[ScheduledJob]: Paginated list of pending scheduled jobs. + """ + jobs = await self._get_pending_jobs(workspace_id) + + return ListResponse( + data=jobs, + has_more=False, + first_id=jobs[0].id if jobs else None, + last_id=jobs[-1].id if jobs else None, + ) + + async def cancel( + self, + workspace_id: WorkspaceId, + job_id: ScheduledJobId, + ) -> None: + """ + Cancel a scheduled job. + + This removes the schedule from APScheduler. Only works for pending jobs. + + Args: + workspace_id (WorkspaceId): The workspace the job belongs to. + job_id (ScheduledJobId): The job ID to cancel. + + Raises: + NotFoundError: If the job is not found or already executed. + """ + logger.info("Canceling scheduled job: %s", job_id) + + schedules: list[Schedule] = await self._scheduler.data_store.get_schedules( + {job_id} + ) + + if not schedules: + msg = f"Scheduled job {job_id} not found" + raise NotFoundError(msg) + + scheduled_job = ScheduledJob.from_schedule(schedules[0]) + if scheduled_job.data.workspace_id != workspace_id: + msg = f"Scheduled job {job_id} not found in workspace {workspace_id}" + raise NotFoundError(msg) + + await self._scheduler.data_store.remove_schedules([job_id]) + logger.info("Scheduled job canceled: %s", job_id) + + async def _get_pending_jobs(self, workspace_id: WorkspaceId) -> list[ScheduledJob]: + """Get pending jobs from APScheduler schedules.""" + scheduled_jobs: list[ScheduledJob] = [] + + schedules: list[Schedule] = await self._scheduler.data_store.get_schedules() + + for schedule in schedules: + scheduled_job = ScheduledJob.from_schedule(schedule) + if scheduled_job.data.workspace_id != workspace_id: + continue + scheduled_jobs.append(scheduled_job) + + return sorted(scheduled_jobs, key=lambda x: x.next_fire_time) diff --git a/src/askui/chat/api/settings.py b/src/askui/chat/api/settings.py index 59f7a117..c2823984 100644 --- a/src/askui/chat/api/settings.py +++ b/src/askui/chat/api/settings.py @@ -16,7 +16,7 @@ class DbSettings(BaseModel): url: str = Field( default_factory=lambda: f"sqlite:///{(Path.cwd().absolute() / 'askui_chat.db').as_posix()}", - description="Database URL for SQLAlchemy connection", + description="Database URL for SQLAlchemy connection (used for all data including scheduler)", ) auto_migrate: bool = Field( default=True, diff --git a/src/askui/models/anthropic/factory.py b/src/askui/models/anthropic/factory.py index 4b81102d..27bd516f 100644 --- a/src/askui/models/anthropic/factory.py +++ b/src/askui/models/anthropic/factory.py @@ -1,4 +1,3 @@ -import functools from typing import Literal from anthropic import Anthropic, 
diff --git a/src/askui/chat/api/settings.py b/src/askui/chat/api/settings.py
index 59f7a117..c2823984 100644
--- a/src/askui/chat/api/settings.py
+++ b/src/askui/chat/api/settings.py
@@ -16,7 +16,7 @@
 class DbSettings(BaseModel):
     url: str = Field(
         default_factory=lambda: f"sqlite:///{(Path.cwd().absolute() / 'askui_chat.db').as_posix()}",
-        description="Database URL for SQLAlchemy connection",
+        description="Database URL for SQLAlchemy connection (used for all data including scheduler)",
     )
     auto_migrate: bool = Field(
         default=True,
diff --git a/src/askui/models/anthropic/factory.py b/src/askui/models/anthropic/factory.py
index 4b81102d..27bd516f 100644
--- a/src/askui/models/anthropic/factory.py
+++ b/src/askui/models/anthropic/factory.py
@@ -1,4 +1,3 @@
-import functools
 from typing import Literal
 
 from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex
@@ -9,7 +8,6 @@
 
 AnthropicApiClient = Anthropic | AnthropicBedrock | AnthropicVertex
 
-@functools.cache
 def create_api_client(
     api_provider: AnthropicApiProvider,
 ) -> AnthropicApiClient: