From 9df1002cf07d0073fb2a87eb3476f2b5d42e5523 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 16 Dec 2025 17:13:03 +0100 Subject: [PATCH 01/15] feat(messages): add support for injecting cancelled tool results in message creation Enhanced the `create_message` method in `MessageService` to include an optional parameter `inject_cancelled_tool_results`. When set to `True`, it creates cancelled tool results if the parent message contains pending tool_use blocks. This change improves the handling of tool execution interruptions by providing relevant feedback in the message flow. --- src/askui/chat/api/messages/router.py | 5 +- src/askui/chat/api/messages/service.py | 78 +++++++++++++++++++++++++- 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/src/askui/chat/api/messages/router.py b/src/askui/chat/api/messages/router.py index 1878a260..79f2eb9e 100644 --- a/src/askui/chat/api/messages/router.py +++ b/src/askui/chat/api/messages/router.py @@ -64,7 +64,10 @@ async def create_message( message_service: MessageService = MessageServiceDep, ) -> Message: return message_service.create( - workspace_id=askui_workspace, thread_id=thread_id, params=params + workspace_id=askui_workspace, + thread_id=thread_id, + params=params, + inject_cancelled_tool_results=True, ) diff --git a/src/askui/chat/api/messages/service.py b/src/askui/chat/api/messages/service.py index 45cfc06b..6c80f2aa 100644 --- a/src/askui/chat/api/messages/service.py +++ b/src/askui/chat/api/messages/service.py @@ -7,6 +7,8 @@ ROOT_MESSAGE_PARENT_ID, Message, MessageCreate, + ToolResultBlockParam, + ToolUseBlockParam, ) from askui.chat.api.messages.orms import MessageOrm from askui.chat.api.models import MessageId, ThreadId, WorkspaceId @@ -19,6 +21,11 @@ NotFoundError, ) +_CANCELLED_TOOL_RESULT_CONTENT = ( + "Tool execution was cancelled because the previous run was interrupted. " + "Please retry the operation if needed." 
+) + class MessageService: """Service for managing Message resources with database persistence.""" @@ -26,6 +33,56 @@ class MessageService: def __init__(self, session: Session) -> None: self._session = session + def _create_cancelled_tool_results( + self, + workspace_id: WorkspaceId, + thread_id: ThreadId, + parent_message: Message, + run_id: str | None, + ) -> MessageId: + """Create cancelled tool results if parent has pending tool_use blocks. + + Args: + workspace_id (WorkspaceId): The workspace ID. + thread_id (ThreadId): The thread ID. + parent_message (Message): The parent message to check for tool_use blocks. + run_id (str | None): The run ID to associate with the tool result message. + + Returns: + MessageId: The ID of the created tool result message, or the parent + message ID if no tool_use blocks were found. + """ + if not isinstance(parent_message.content, list): + return parent_message.id + + tool_use_blocks = [ + block + for block in parent_message.content + if isinstance(block, ToolUseBlockParam) + ] + if not tool_use_blocks: + return parent_message.id + + tool_result_content: list[ToolResultBlockParam] = [ + ToolResultBlockParam( + tool_use_id=block.id, + content=_CANCELLED_TOOL_RESULT_CONTENT, + is_error=True, + ) + for block in tool_use_blocks + ] + tool_result_params = MessageCreate( + role="user", + content=tool_result_content, # type: ignore[arg-type] + parent_id=parent_message.id, + run_id=run_id, + ) + tool_result_message = Message.create( + workspace_id, thread_id, tool_result_params + ) + self._session.add(MessageOrm.from_model(tool_result_message)) + return tool_result_message.id + def _find_by_id( self, workspace_id: WorkspaceId, thread_id: ThreadId, message_id: MessageId ) -> MessageOrm: @@ -216,8 +273,18 @@ def create( workspace_id: WorkspaceId, thread_id: ThreadId, params: MessageCreate, + inject_cancelled_tool_results: bool = False, ) -> Message: - """Create a new message.""" + """Create a new message. 
+ + Args: + workspace_id (WorkspaceId): The workspace ID. + thread_id (ThreadId): The thread ID. + params (MessageCreate): The message creation parameters. + inject_cancelled_tool_results (bool, optional): If `True`, inject cancelled + tool results when the parent message has pending tool_use blocks. + Defaults to `False`. + """ # Validate thread exists thread_orm: ThreadOrm | None = ( self._session.query(ThreadOrm) @@ -258,6 +325,15 @@ def create( ) raise NotFoundError(error_msg) + # If parent has tool_use, create cancelled tool_result first + if inject_cancelled_tool_results: + params.parent_id = self._create_cancelled_tool_results( + workspace_id, + thread_id, + parent_message_orm.to_model(), + params.run_id, + ) + message = Message.create(workspace_id, thread_id, params) message_orm = MessageOrm.from_model(message) self._session.add(message_orm) From f7fc4c3ba4b8349d1c59cddf1760236d5c19491e Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 16 Dec 2025 17:39:28 +0100 Subject: [PATCH 02/15] fix(messages): update tool result content type in MessageService Changed the type of `tool_result_content` in the `create_message` method from `list[ToolResultBlockParam]` to `list[ContentBlockParam]` to ensure mypy compatibility. 
--- src/askui/chat/api/messages/service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/askui/chat/api/messages/service.py b/src/askui/chat/api/messages/service.py index 6c80f2aa..917e22d2 100644 --- a/src/askui/chat/api/messages/service.py +++ b/src/askui/chat/api/messages/service.py @@ -5,6 +5,7 @@ from askui.chat.api.messages.models import ( ROOT_MESSAGE_PARENT_ID, + ContentBlockParam, Message, MessageCreate, ToolResultBlockParam, @@ -63,7 +64,7 @@ def _create_cancelled_tool_results( if not tool_use_blocks: return parent_message.id - tool_result_content: list[ToolResultBlockParam] = [ + tool_result_content: list[ContentBlockParam] = [ ToolResultBlockParam( tool_use_id=block.id, content=_CANCELLED_TOOL_RESULT_CONTENT, @@ -73,7 +74,7 @@ def _create_cancelled_tool_results( ] tool_result_params = MessageCreate( role="user", - content=tool_result_content, # type: ignore[arg-type] + content=tool_result_content, parent_id=parent_message.id, run_id=run_id, ) From 6e74dc91dd3f93d9d2bc244ac861e2889a54eba0 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Wed, 24 Dec 2025 18:08:22 +0100 Subject: [PATCH 03/15] feat(scheduled_jobs): implement scheduled job management with APScheduler Added functionality for managing scheduled jobs, including creation, listing, and cancellation. Introduced `ScheduledJobService` for handling job operations and integrated it into the FastAPI application. Updated the application to start and shutdown the scheduler during lifespan events. Added necessary models and dependencies for scheduled job data handling. 
--- pdm.lock | 34 +++- pyproject.toml | 1 + src/askui/chat/api/app.py | 13 ++ src/askui/chat/api/models.py | 1 + src/askui/chat/api/runs/dependencies.py | 69 +++++++- src/askui/chat/api/scheduled_jobs/__init__.py | 0 .../chat/api/scheduled_jobs/dependencies.py | 16 ++ src/askui/chat/api/scheduled_jobs/executor.py | 72 ++++++++ src/askui/chat/api/scheduled_jobs/models.py | 166 ++++++++++++++++++ src/askui/chat/api/scheduled_jobs/router.py | 75 ++++++++ .../chat/api/scheduled_jobs/scheduler.py | 50 ++++++ src/askui/chat/api/scheduled_jobs/service.py | 151 ++++++++++++++++ 12 files changed, 643 insertions(+), 5 deletions(-) create mode 100644 src/askui/chat/api/scheduled_jobs/__init__.py create mode 100644 src/askui/chat/api/scheduled_jobs/dependencies.py create mode 100644 src/askui/chat/api/scheduled_jobs/executor.py create mode 100644 src/askui/chat/api/scheduled_jobs/models.py create mode 100644 src/askui/chat/api/scheduled_jobs/router.py create mode 100644 src/askui/chat/api/scheduled_jobs/scheduler.py create mode 100644 src/askui/chat/api/scheduled_jobs/service.py diff --git a/pdm.lock b/pdm.lock index 4b929458..6fa386e4 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "all", "android", "bedrock", "chat", "dev", "pynput", "vertex", "web"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:bceafb533a2ba147d58188facea3b328fbe5bf1adf15fa8af86950f7a65a1cf0" +content_hash = "sha256:9f1fc6702e8b01ea917c353abc069e6a310e13bec4b186c9ec86303b0067aef7" [[metadata.targets]] requires_python = ">=3.10,<3.14" @@ -123,6 +123,24 @@ files = [ {file = "anyio-4.10.0.tar.gz", hash = "sha256:3f3fae35c96039744587aa5b8371e7e8e603c0702999535961dd336026973ba6"}, ] +[[package]] +name = "apscheduler" +version = "4.0.0a6" +requires_python = ">=3.9" +summary = "In-process task scheduler with Cron-like capabilities" +groups = ["default"] +dependencies = [ + "anyio~=4.0", + "attrs>=22.1", + "tenacity<10.0,>=8.0", + "typing-extensions>=4.0; 
python_version < \"3.11\"", + "tzlocal>=3.0", +] +files = [ + {file = "apscheduler-4.0.0a6-py3-none-any.whl", hash = "sha256:87031f7537aaa6ea2d3e69fdd9454b641f18938fc8cd0c589972006eceba8ee9"}, + {file = "apscheduler-4.0.0a6.tar.gz", hash = "sha256:5134617c028f097de4a09abbeefc42625cb0ce3adcb4ce49d74cc26054084761"}, +] + [[package]] name = "argcomplete" version = "3.6.2" @@ -4496,6 +4514,20 @@ files = [ {file = "tzdata-2025.2.tar.gz", hash = "sha256:b60a638fcc0daffadf82fe0f57e53d06bdec2f36c4df66280ae79bce6bd6f2b9"}, ] +[[package]] +name = "tzlocal" +version = "5.3.1" +requires_python = ">=3.9" +summary = "tzinfo object for the local timezone" +groups = ["default"] +dependencies = [ + "tzdata; platform_system == \"Windows\"", +] +files = [ + {file = "tzlocal-5.3.1-py3-none-any.whl", hash = "sha256:eb1a66c3ef5847adf7a834f1be0800581b683b5608e74f86ecbcef8ab91bb85d"}, + {file = "tzlocal-5.3.1.tar.gz", hash = "sha256:cceffc7edecefea1f595541dbd6e990cb1ea3d19bf01b2809f362a03dd7921fd"}, +] + [[package]] name = "urllib3" version = "2.5.0" diff --git a/pyproject.toml b/pyproject.toml index 6a21051f..6bb4664e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ "aiofiles>=24.1.0", "anyio==4.10.0", # We need to pin this version otherwise listing mcp tools using fastmcp within runner fails "sqlalchemy[mypy]>=2.0.44", + "apscheduler==4.0.0a6", ] requires-python = ">=3.10,<3.14" readme = "README.md" diff --git a/src/askui/chat/api/app.py b/src/askui/chat/api/app.py index 81d8dcd0..82711ab5 100644 --- a/src/askui/chat/api/app.py +++ b/src/askui/chat/api/app.py @@ -22,6 +22,8 @@ from askui.chat.api.mcp_servers.utility import mcp as utility_mcp from askui.chat.api.messages.router import router as messages_router from askui.chat.api.runs.router import router as runs_router +from askui.chat.api.scheduled_jobs.router import router as scheduled_jobs_router +from askui.chat.api.scheduled_jobs.scheduler import shutdown_scheduler, start_scheduler from 
askui.chat.api.threads.router import router as threads_router from askui.chat.api.workflows.router import router as workflows_router from askui.chat.migrations.runner import run_migrations @@ -49,7 +51,17 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001 session = next(get_session()) mcp_config_service = get_mcp_config_service(session=session, settings=settings) mcp_config_service.seed() + + # Start the scheduler for scheduled jobs + logger.info("Starting scheduled job scheduler...") + await start_scheduler() + yield + + # Shutdown scheduler + logger.info("Shutting down scheduled job scheduler...") + await shutdown_scheduler() + logger.info("Disconnecting all MCP clients...") await get_mcp_client_manager_manager(mcp_config_service).disconnect_all(force=True) @@ -70,6 +82,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # noqa: ARG001 v1_router.include_router(mcp_configs_router) v1_router.include_router(files_router) v1_router.include_router(workflows_router) +v1_router.include_router(scheduled_jobs_router) v1_router.include_router(health_router) app.include_router(v1_router) diff --git a/src/askui/chat/api/models.py b/src/askui/chat/api/models.py index faf1f622..9028abcc 100644 --- a/src/askui/chat/api/models.py +++ b/src/askui/chat/api/models.py @@ -10,6 +10,7 @@ FileId = Annotated[str, IdField("file")] MessageId = Annotated[str, IdField("msg")] RunId = Annotated[str, IdField("run")] +ScheduledJobId = Annotated[str, IdField("schedjob")] ThreadId = Annotated[str, IdField("thread")] WorkspaceId = UUID4 diff --git a/src/askui/chat/api/runs/dependencies.py b/src/askui/chat/api/runs/dependencies.py index 0fefdaca..a03a8710 100644 --- a/src/askui/chat/api/runs/dependencies.py +++ b/src/askui/chat/api/runs/dependencies.py @@ -1,13 +1,29 @@ from fastapi import Depends +from pydantic import UUID4 +from sqlalchemy.orm import Session -from askui.chat.api.assistants.dependencies import AssistantServiceDep +from 
askui.chat.api.assistants.dependencies import ( + AssistantServiceDep, + get_assistant_service, +) from askui.chat.api.assistants.service import AssistantService from askui.chat.api.db.session import SessionDep -from askui.chat.api.dependencies import SettingsDep -from askui.chat.api.mcp_clients.dependencies import McpClientManagerManagerDep +from askui.chat.api.dependencies import SettingsDep, get_settings +from askui.chat.api.files.dependencies import get_file_service +from askui.chat.api.mcp_clients.dependencies import ( + McpClientManagerManagerDep, + get_mcp_client_manager_manager, +) from askui.chat.api.mcp_clients.manager import McpClientManagerManager +from askui.chat.api.mcp_configs.dependencies import get_mcp_config_service from askui.chat.api.messages.chat_history_manager import ChatHistoryManager -from askui.chat.api.messages.dependencies import ChatHistoryManagerDep +from askui.chat.api.messages.dependencies import ( + ChatHistoryManagerDep, + get_chat_history_manager, + get_message_service, + get_message_translator, + get_truncation_strategy_factory, +) from askui.chat.api.runs.models import RunListQuery from askui.chat.api.settings import Settings @@ -23,6 +39,12 @@ def get_runs_service( mcp_client_manager_manager: McpClientManagerManager = McpClientManagerManagerDep, settings: Settings = SettingsDep, ) -> RunService: + """ + Get RunService instance for FastAPI dependency injection. + + This function is designed for use with FastAPI's DI system. + For manual construction outside of a request context, use `create_run_service()`. + """ return RunService( session=session, assistant_service=assistant_service, @@ -33,3 +55,42 @@ def get_runs_service( RunServiceDep = Depends(get_runs_service) + + +def create_run_service(session: Session, workspace_id: UUID4) -> RunService: + """ + Create a RunService with all required dependencies manually. 
+ + Use this function when you need a `RunService` outside of FastAPI's + dependency injection context (e.g., background tasks, APScheduler callbacks). + + Args: + session (Session): Database session. + workspace_id (UUID4): The workspace ID for the run execution. + + Returns: + RunService: Configured run service. + """ + settings = get_settings() + + assistant_service = get_assistant_service(session) + file_service = get_file_service(session, settings) + mcp_config_service = get_mcp_config_service(session, settings) + mcp_client_manager_manager = get_mcp_client_manager_manager(mcp_config_service) + + message_service = get_message_service(session) + message_translator = get_message_translator(file_service, workspace_id) + truncation_strategy_factory = get_truncation_strategy_factory() + chat_history_manager = get_chat_history_manager( + message_service, + message_translator, + truncation_strategy_factory, + ) + + return RunService( + session=session, + assistant_service=assistant_service, + mcp_client_manager_manager=mcp_client_manager_manager, + chat_history_manager=chat_history_manager, + settings=settings, + ) diff --git a/src/askui/chat/api/scheduled_jobs/__init__.py b/src/askui/chat/api/scheduled_jobs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/askui/chat/api/scheduled_jobs/dependencies.py b/src/askui/chat/api/scheduled_jobs/dependencies.py new file mode 100644 index 00000000..9adf7f9d --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/dependencies.py @@ -0,0 +1,16 @@ +"""FastAPI dependencies for scheduled jobs.""" + +from typing import Any + +from fastapi import Depends + +from askui.chat.api.scheduled_jobs.scheduler import scheduler +from askui.chat.api.scheduled_jobs.service import ScheduledJobService + + +def get_scheduled_job_service() -> ScheduledJobService: + """Get ScheduledJobService instance with the singleton scheduler.""" + return ScheduledJobService(scheduler=scheduler) + + +ScheduledJobServiceDep = 
Depends(get_scheduled_job_service) diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py new file mode 100644 index 00000000..75b1425e --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/executor.py @@ -0,0 +1,72 @@ +"""Executor for scheduled job callbacks.""" + +import logging +from typing import Any + +from sqlalchemy.orm import Session + +from askui.chat.api.db.engine import engine +from askui.chat.api.messages.dependencies import get_message_service +from askui.chat.api.runs.dependencies import create_run_service +from askui.chat.api.runs.models import RunCreate +from askui.chat.api.scheduled_jobs.models import scheduled_job_data_adapter + +_logger = logging.getLogger(__name__) + + +async def execute_job( + **_kwargs: Any, +) -> None: + """ + APScheduler callback that creates fresh services and executes the job. + + This function is called by APScheduler when a job fires. It creates fresh + database sessions and service instances to avoid stale connections. + + Args: + workspace_id (str): The workspace ID (as string from JSON serialization). + thread_id (str): The thread ID for the message. + assistant_id (str): The assistant ID to run. + model (str): The model to use for the run. + message (dict[str, Any]): The message data to create. + **_kwargs (Any): Additional keyword arguments (ignored). + + Returns: + dict[str, Any]: Result containing the `run_id`. 
+ """ + + # Validates and returns the correct concrete type based on the `type` discriminator + job_data = scheduled_job_data_adapter.validate_python(_kwargs) + + _logger.info( + "Executing scheduled job: workspace=%s, thread=%s", + job_data.workspace_id, + job_data.thread_id, + ) + + # Create fresh session for this job execution + with Session(engine) as session: + message_service = get_message_service(session) + run_service = create_run_service(session, job_data.workspace_id) + + # Create message + message_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=job_data.message, + ) + + # Create and execute run + _logger.debug("Creating run with assistant %s", job_data.assistant_id) + run, generator = await run_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model), + ) + + # Consume generator to completion + _logger.debug("Waiting for run %s to complete", run.id) + async for _event in generator: + pass + + _logger.info("Scheduled job completed: run_id=%s", run.id) diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py new file mode 100644 index 00000000..04796f1a --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -0,0 +1,166 @@ +from typing import Literal, Union + +from apscheduler import Schedule +from apscheduler.triggers.date import DateTrigger +from pydantic import BaseModel, Field, TypeAdapter + +from askui.chat.api.messages.models import ROOT_MESSAGE_PARENT_ID, MessageCreate +from askui.chat.api.models import ( + AssistantId, + MessageId, + ScheduledJobId, + ThreadId, + WorkspaceId, +) +from askui.utils.datetime_utils import UnixDatetime +from askui.utils.id_utils import generate_time_ordered_id + + +class ScheduledMessageCreate(MessageCreate): + """ + Message creation parameters for scheduled jobs. 
+ + Extends `MessageCreate` with required `parent_id` to ensure the message + is added to the correct branch in the conversation tree, since the thread + state may change between scheduling and execution. + + Args: + parent_id (MessageId): The parent message ID for branching. + Required for scheduled messages. Use `ROOT_MESSAGE_PARENT_ID` to + create a new root branch. + """ + + parent_id: MessageId = Field(default=ROOT_MESSAGE_PARENT_ID) # pyright: ignore + + +# ============================================================================= +# API Input Models (without workspace_id - injected from header) +# ============================================================================= + + +class _BaseMessageRerunnerDataCreate(BaseModel): + """ + API input data for the message_rerunner job type. + + This job type creates a new message in a thread and executes a run + at the scheduled time. + + Args: + type (ScheduledJobType): The type of the scheduled job. + thread_id (ThreadId): The thread to add the message to. + assistant_id (AssistantId): The assistant to run. + model (str): The model to use for the run. + message (ScheduledMessageCreate): The message to create. + """ + + type: Literal["message_rerunner"] = "message_rerunner" + thread_id: ThreadId + assistant_id: AssistantId + model: str + message: ScheduledMessageCreate + + +class ScheduledJobCreate(BaseModel): + """ + API input data for scheduled job creation. + + Args: + next_fire_time (UnixDatetime): The time when the job should execute. + data (ScheduledJobData): The data for the job. + """ + + next_fire_time: UnixDatetime + data: _BaseMessageRerunnerDataCreate + + +# ============================================================================= +# Internal Models (with workspace_id - populated after injection) +# ============================================================================= + + +class MessageRerunnerData(_BaseMessageRerunnerDataCreate): + """ + Internal data for the message_rerunner job type. 
+ + Extends `MessageRerunnerDataCreate` with required `workspace_id` that is + injected from the request header. + + Args: + workspace_id (WorkspaceId): The workspace this job belongs to. + """ + + workspace_id: WorkspaceId + + +# Discriminated union of all job data types (extensible for future types) +ScheduledJobData = Union[MessageRerunnerData] + +scheduled_job_data_adapter: TypeAdapter[ScheduledJobData] = TypeAdapter( + ScheduledJobData +) + + +class ScheduledJob(BaseModel): + """ + A scheduled job that will execute at a specified time. + + Maps to APScheduler's `Schedule` structure for easy conversion. + + Args: + id (ScheduledJobId): Unique identifier for the scheduled job. + Maps to `Schedule.id`. + next_fire_time (UnixDatetime): When the job is scheduled to execute. + Maps to `Schedule.next_fire_time` or `Schedule.trigger.run_time`. + data (ScheduledJobData): Type-specific job data. Always contains `type` and + `workspace_id`. Maps to `Schedule.kwargs`. + object (Literal["scheduled_job"]): Object type identifier. + """ + + id: ScheduledJobId + object: Literal["scheduled_job"] = "scheduled_job" + next_fire_time: UnixDatetime + data: ScheduledJobData + + @classmethod + def create( + cls, next_fire_time: UnixDatetime, data: ScheduledJobData + ) -> "ScheduledJob": + """Create a new ScheduledJob with a generated ID.""" + return cls( + id=generate_time_ordered_id("schedjob"), + next_fire_time=next_fire_time, + data=data, + ) + + @classmethod + def from_schedule(cls, schedule: Schedule) -> "ScheduledJob": + """ + Create a ScheduledJob from an APScheduler Schedule. + + Args: + schedule (Schedule): The APScheduler schedule to convert. + + Returns: + ScheduledJob: The converted scheduled job. + + Raises: + ValueError: If the schedule has no determinable `next_fire_time`. 
+ """ + # Extract next_fire_time from schedule or trigger + next_fire_time: UnixDatetime + if schedule.next_fire_time is not None: + next_fire_time = schedule.next_fire_time + elif isinstance(schedule.trigger, DateTrigger): + next_fire_time = schedule.trigger.run_time + else: + error_msg = f"Schedule {schedule.id} has no next_fire_time" + raise ValueError(error_msg) + + # Reconstruct data from kwargs + data = MessageRerunnerData.model_validate(schedule.kwargs or {}) + + return cls( + id=schedule.id, + next_fire_time=next_fire_time, + data=data, + ) diff --git a/src/askui/chat/api/scheduled_jobs/router.py b/src/askui/chat/api/scheduled_jobs/router.py new file mode 100644 index 00000000..47b95464 --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/router.py @@ -0,0 +1,75 @@ +"""API router for scheduled jobs.""" + +from typing import Annotated + +from fastapi import APIRouter, Header, status + +from askui.chat.api.dependencies import ListQueryDep +from askui.chat.api.models import ScheduledJobId, WorkspaceId +from askui.chat.api.scheduled_jobs.dependencies import ScheduledJobServiceDep +from askui.chat.api.scheduled_jobs.models import ( + MessageRerunnerData, + ScheduledJob, + ScheduledJobCreate, +) +from askui.chat.api.scheduled_jobs.service import ScheduledJobService +from askui.utils.api_utils import ListQuery, ListResponse + +router = APIRouter(prefix="/scheduled-jobs", tags=["scheduled-jobs"]) + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def create_scheduled_job( + askui_workspace: Annotated[WorkspaceId, Header()], + params: ScheduledJobCreate, + scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, +) -> ScheduledJob: + """Create a new scheduled job.""" + # Inject workspace_id into the data + data = MessageRerunnerData( + workspace_id=askui_workspace, + thread_id=params.data.thread_id, + assistant_id=params.data.assistant_id, + model=params.data.model, + message=params.data.message, + ) + + return await 
scheduled_job_service.create( + workspace_id=askui_workspace, + next_fire_time=params.next_fire_time, + data=data, + ) + + +@router.get("") +async def list_scheduled_jobs( + askui_workspace: Annotated[WorkspaceId, Header()], + query: ListQuery = ListQueryDep, + scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, +) -> ListResponse[ScheduledJob]: + """List scheduled jobs with optional status filter.""" + return await scheduled_job_service.list_( + workspace_id=askui_workspace, + query=query, + ) + + +@router.delete("/{job_id}", status_code=status.HTTP_204_NO_CONTENT) +async def cancel_scheduled_job( + askui_workspace: Annotated[WorkspaceId, Header()], + job_id: ScheduledJobId, + scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, +) -> None: + """ + Cancel a scheduled job. + + Only works for jobs with status 'pending'. Removes the job from the scheduler. + Cancelled jobs have no history (they are simply removed). + + Raises: + NotFoundError: If the job is not found or already executed. + """ + await scheduled_job_service.cancel( + workspace_id=askui_workspace, + job_id=job_id, + ) diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py new file mode 100644 index 00000000..53e67ebd --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -0,0 +1,50 @@ +""" +Module-level APScheduler singleton management. + +Similar to how `engine.py` manages the database engine, this module manages +the APScheduler instance as a singleton to ensure jobs persist across requests. 
+""" + +import logging +from typing import Any + +from apscheduler import AsyncScheduler # type: ignore[import-untyped] +from apscheduler.datastores.sqlalchemy import ( + SQLAlchemyDataStore, # type: ignore[import-untyped] +) + +from askui.chat.api.db.engine import engine + +logger = logging.getLogger(__name__) + +# Module-level singleton data store (similar to engine pattern) +_data_store: Any = SQLAlchemyDataStore(engine_or_url=engine) + +# Module-level singleton scheduler instance +# - max_concurrent_jobs=1: only one job runs at a time (sequential execution) +# - lease_duration=600s: 10 minutes grace period for missed jobs +# At module level: just create the scheduler (don't start it) +scheduler: AsyncScheduler = AsyncScheduler( + data_store=_data_store, + max_concurrent_jobs=1, +) + + +async def start_scheduler() -> None: + """ + Start the scheduler to begin processing jobs. + + This initializes the scheduler and starts it in the background so it can + poll for and execute scheduled jobs while the FastAPI application handles requests. 
+ """ + # First initialize the scheduler via context manager entry + await scheduler.__aenter__() + # Then start background processing of jobs + await scheduler.start_in_background() + logger.info("Scheduler started in background") + + +async def shutdown_scheduler() -> None: + """Shut down the scheduler gracefully.""" + await scheduler.__aexit__(None, None, None) + logger.info("Scheduler shut down") diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py new file mode 100644 index 00000000..1ad92196 --- /dev/null +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -0,0 +1,151 @@ +"""Service for managing scheduled jobs.""" + +import logging +from datetime import timedelta +from typing import Any +from uuid import UUID + +from apscheduler import AsyncScheduler, Schedule +from apscheduler.triggers.date import DateTrigger + +from askui.chat.api.models import ScheduledJobId, WorkspaceId +from askui.chat.api.scheduled_jobs.executor import execute_job +from askui.chat.api.scheduled_jobs.models import ScheduledJob, ScheduledJobData +from askui.utils.api_utils import ListQuery, ListResponse, NotFoundError +from askui.utils.datetime_utils import UnixDatetime + +logger = logging.getLogger(__name__) + + +class ScheduledJobService: + """ + Service for managing scheduled jobs using APScheduler. + + This service provides methods to create, list, and cancel scheduled jobs. + Job data is stored in APScheduler's SQLAlchemy data store. + + Args: + scheduler (Any): The APScheduler `AsyncScheduler` instance to use. + """ + + def __init__(self, scheduler: AsyncScheduler) -> None: + self._scheduler: AsyncScheduler = scheduler + + async def create( + self, + workspace_id: WorkspaceId, # noqa: ARG002 + next_fire_time: UnixDatetime, + data: ScheduledJobData, + ) -> ScheduledJob: + """ + Create a new scheduled job. + + Args: + workspace_id (WorkspaceId): The workspace this job belongs to. 
+ next_fire_time (UnixDatetime): When the job should execute. + data (ScheduledJobData): Type-specific job data. + + Returns: + ScheduledJob: The created scheduled job. + """ + job = ScheduledJob.create( + next_fire_time=next_fire_time, + data=data, + ) + + # Prepare kwargs for the job callback + + logger.info( + "Creating scheduled job: id=%s, type=%s, next_fire_time=%s", + job.id, + data.type, + next_fire_time, + ) + + await self._scheduler.add_schedule( + func_or_task_id=execute_job, + trigger=DateTrigger(run_time=next_fire_time), + id=job.id, + kwargs=data.model_dump(mode="json"), + misfire_grace_time=timedelta(minutes=10), + job_result_expiration_time=timedelta(weeks=30000), # Never expire + ) + + logger.info("Scheduled job created: %s", job.id) + return job + + async def list_( + self, + workspace_id: WorkspaceId, + query: ListQuery, # noqa: ARG002 + ) -> ListResponse[ScheduledJob]: + """ + List pending scheduled jobs. + + Args: + workspace_id (WorkspaceId): Filter by workspace. + query (ListQuery): Query parameters. + + Returns: + ListResponse[ScheduledJob]: Paginated list of pending scheduled jobs. + """ + jobs = await self._get_pending_jobs(workspace_id) + + # TODO(scheduled-jobs): Implement pagination + # TODO(scheduled-jobs): Implement sorting + + return ListResponse( + data=jobs, + has_more=False, + first_id=jobs[0].id if jobs else None, + last_id=jobs[-1].id if jobs else None, + ) + + async def cancel( + self, + workspace_id: WorkspaceId, + job_id: ScheduledJobId, + ) -> None: + """ + Cancel a scheduled job. + + This removes the schedule from APScheduler. Only works for pending jobs. + + Args: + workspace_id (WorkspaceId): The workspace the job belongs to. + job_id (ScheduledJobId): The job ID to cancel. + + Raises: + NotFoundError: If the job is not found or already executed. 
+ """ + logger.info("Canceling scheduled job: %s", job_id) + + schedules: list[Any] = await self._scheduler.data_store.get_schedules({job_id}) + + if not schedules: + error_msg = f"Scheduled job {job_id} not found" + raise NotFoundError(error_msg) + + schedule: Any = schedules[0] + kwargs: dict[str, Any] = schedule.kwargs or {} + schedule_workspace_id: str | None = kwargs.get("workspace_id") + if schedule_workspace_id is None or UUID(schedule_workspace_id) != workspace_id: + error_msg = f"Scheduled job {job_id} not found" + raise NotFoundError(error_msg) + + await self._scheduler.data_store.remove_schedules([job_id]) + logger.info("Scheduled job canceled: %s", job_id) + + async def _get_pending_jobs(self, workspace_id: WorkspaceId) -> list[ScheduledJob]: + """Get pending jobs from APScheduler schedules.""" + scheduled_jobs: list[ScheduledJob] = [] + + schedules: list[Schedule] = await self._scheduler.data_store.get_schedules() + + for schedule in schedules: + scheduled_job = ScheduledJob.from_schedule(schedule) + if scheduled_job.data.workspace_id != workspace_id: + continue + scheduled_jobs.append(scheduled_job) + + return scheduled_jobs From 3bc723069afed932eb36009bed8af7c03eda93a8 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Wed, 24 Dec 2025 18:30:06 +0100 Subject: [PATCH 04/15] refactor(scheduled_jobs): remove unused query parameter from list_scheduled_jobs Eliminated the unused `query` parameter from the `list_scheduled_jobs` function in the router. Cleaned up imports in `scheduler.py` by removing type ignore comments for better clarity. Updated the `list_` method in `ScheduledJobService` to reflect the removal of the `query` parameter. 
--- src/askui/chat/api/scheduled_jobs/router.py | 2 -- src/askui/chat/api/scheduled_jobs/scheduler.py | 6 ++---- src/askui/chat/api/scheduled_jobs/service.py | 4 ---- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/askui/chat/api/scheduled_jobs/router.py b/src/askui/chat/api/scheduled_jobs/router.py index 47b95464..d28c826b 100644 --- a/src/askui/chat/api/scheduled_jobs/router.py +++ b/src/askui/chat/api/scheduled_jobs/router.py @@ -44,13 +44,11 @@ async def create_scheduled_job( @router.get("") async def list_scheduled_jobs( askui_workspace: Annotated[WorkspaceId, Header()], - query: ListQuery = ListQueryDep, scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, ) -> ListResponse[ScheduledJob]: """List scheduled jobs with optional status filter.""" return await scheduled_job_service.list_( workspace_id=askui_workspace, - query=query, ) diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py index 53e67ebd..008cb3f7 100644 --- a/src/askui/chat/api/scheduled_jobs/scheduler.py +++ b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -8,10 +8,8 @@ import logging from typing import Any -from apscheduler import AsyncScheduler # type: ignore[import-untyped] -from apscheduler.datastores.sqlalchemy import ( - SQLAlchemyDataStore, # type: ignore[import-untyped] -) +from apscheduler import AsyncScheduler +from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore from askui.chat.api.db.engine import engine diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py index 1ad92196..ca761467 100644 --- a/src/askui/chat/api/scheduled_jobs/service.py +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -77,7 +77,6 @@ async def create( async def list_( self, workspace_id: WorkspaceId, - query: ListQuery, # noqa: ARG002 ) -> ListResponse[ScheduledJob]: """ List pending scheduled jobs. 
@@ -91,9 +90,6 @@ async def list_( """ jobs = await self._get_pending_jobs(workspace_id) - # TODO(scheduled-jobs): Implement pagination - # TODO(scheduled-jobs): Implement sorting - return ListResponse( data=jobs, has_more=False, From 3c2a95e6635934817b0809197429dd034df2da37 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 09:32:52 +0100 Subject: [PATCH 05/15] chore: update .gitignore and enhance SQLite configuration Added entries for SQLite shared memory and write-ahead logging files to `.gitignore`. Enhanced SQLite connection settings in `engine.py` by enabling WAL mode for concurrent access and setting a busy timeout to prevent database locking issues. Removed unused return type documentation in `executor.py` for clarity. --- .gitignore | 2 ++ src/askui/chat/api/db/engine.py | 4 ++++ src/askui/chat/api/scheduled_jobs/executor.py | 4 ---- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index d3158143..f2356f53 100644 --- a/.gitignore +++ b/.gitignore @@ -166,6 +166,8 @@ reports/ .DS_Store /chat /askui_chat.db +/askui_chat.db-shm +/askui_chat.db-wal .cache/ bom.json diff --git a/src/askui/chat/api/db/engine.py b/src/askui/chat/api/db/engine.py index 87931455..c39b8c81 100644 --- a/src/askui/chat/api/db/engine.py +++ b/src/askui/chat/api/db/engine.py @@ -18,4 +18,8 @@ def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001 cursor = dbapi_conn.cursor() cursor.execute("PRAGMA foreign_keys=ON") + # WAL mode - allows concurrent readers + writer + cursor.execute("PRAGMA journal_mode=WAL") + # Busy timeout - prevents SQLite from hanging on locked databases + cursor.execute("PRAGMA busy_timeout=10000") # 10 seconds cursor.close() diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py index 75b1425e..2b6b201f 100644 --- a/src/askui/chat/api/scheduled_jobs/executor.py +++ b/src/askui/chat/api/scheduled_jobs/executor.py 
@@ -30,11 +30,7 @@ async def execute_job( model (str): The model to use for the run. message (dict[str, Any]): The message data to create. **_kwargs (Any): Additional keyword arguments (ignored). - - Returns: - dict[str, Any]: Result containing the `run_id`. """ - # Validates and returns the correct concrete type based on the `type` discriminator job_data = scheduled_job_data_adapter.validate_python(_kwargs) From bd513ca1d3ccf8946ab17b1dfbdda37ccf3412f4 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 11:52:51 +0100 Subject: [PATCH 06/15] refactor(scheduled_jobs): update ScheduledJob creation to use params Refactored the `create` method in `ScheduledJob` to accept `params` of type `ScheduledJobCreate` instead of individual parameters. Updated the `create_scheduled_job` function in the router to pass `params` directly to the service. Cleaned up imports and removed unnecessary data construction in the router for improved clarity. --- src/askui/chat/api/scheduled_jobs/models.py | 12 ++++++++--- src/askui/chat/api/scheduled_jobs/router.py | 20 +++--------------- src/askui/chat/api/scheduled_jobs/service.py | 22 +++++++++----------- 3 files changed, 22 insertions(+), 32 deletions(-) diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py index 04796f1a..021ff821 100644 --- a/src/askui/chat/api/scheduled_jobs/models.py +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -123,13 +123,19 @@ class ScheduledJob(BaseModel): @classmethod def create( - cls, next_fire_time: UnixDatetime, data: ScheduledJobData + cls, workspace_id: WorkspaceId, params: ScheduledJobCreate ) -> "ScheduledJob": """Create a new ScheduledJob with a generated ID.""" return cls( id=generate_time_ordered_id("schedjob"), - next_fire_time=next_fire_time, - data=data, + next_fire_time=params.next_fire_time, + data=MessageRerunnerData( + workspace_id=workspace_id, + thread_id=params.data.thread_id, + assistant_id=params.data.assistant_id, 
+ model=params.data.model, + message=params.data.message, + ), ) @classmethod diff --git a/src/askui/chat/api/scheduled_jobs/router.py b/src/askui/chat/api/scheduled_jobs/router.py index d28c826b..794e9474 100644 --- a/src/askui/chat/api/scheduled_jobs/router.py +++ b/src/askui/chat/api/scheduled_jobs/router.py @@ -4,16 +4,11 @@ from fastapi import APIRouter, Header, status -from askui.chat.api.dependencies import ListQueryDep from askui.chat.api.models import ScheduledJobId, WorkspaceId from askui.chat.api.scheduled_jobs.dependencies import ScheduledJobServiceDep -from askui.chat.api.scheduled_jobs.models import ( - MessageRerunnerData, - ScheduledJob, - ScheduledJobCreate, -) +from askui.chat.api.scheduled_jobs.models import ScheduledJob, ScheduledJobCreate from askui.chat.api.scheduled_jobs.service import ScheduledJobService -from askui.utils.api_utils import ListQuery, ListResponse +from askui.utils.api_utils import ListResponse router = APIRouter(prefix="/scheduled-jobs", tags=["scheduled-jobs"]) @@ -25,19 +20,10 @@ async def create_scheduled_job( scheduled_job_service: ScheduledJobService = ScheduledJobServiceDep, ) -> ScheduledJob: """Create a new scheduled job.""" - # Inject workspace_id into the data - data = MessageRerunnerData( - workspace_id=askui_workspace, - thread_id=params.data.thread_id, - assistant_id=params.data.assistant_id, - model=params.data.model, - message=params.data.message, - ) return await scheduled_job_service.create( workspace_id=askui_workspace, - next_fire_time=params.next_fire_time, - data=data, + params=params, ) diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py index ca761467..02f6b76f 100644 --- a/src/askui/chat/api/scheduled_jobs/service.py +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -10,9 +10,8 @@ from askui.chat.api.models import ScheduledJobId, WorkspaceId from askui.chat.api.scheduled_jobs.executor import execute_job -from askui.chat.api.scheduled_jobs.models 
import ScheduledJob, ScheduledJobData -from askui.utils.api_utils import ListQuery, ListResponse, NotFoundError -from askui.utils.datetime_utils import UnixDatetime +from askui.chat.api.scheduled_jobs.models import ScheduledJob, ScheduledJobCreate +from askui.utils.api_utils import ListResponse, NotFoundError logger = logging.getLogger(__name__) @@ -33,9 +32,8 @@ def __init__(self, scheduler: AsyncScheduler) -> None: async def create( self, - workspace_id: WorkspaceId, # noqa: ARG002 - next_fire_time: UnixDatetime, - data: ScheduledJobData, + workspace_id: WorkspaceId, + params: ScheduledJobCreate, ) -> ScheduledJob: """ Create a new scheduled job. @@ -49,8 +47,8 @@ async def create( ScheduledJob: The created scheduled job. """ job = ScheduledJob.create( - next_fire_time=next_fire_time, - data=data, + workspace_id=workspace_id, + params=params, ) # Prepare kwargs for the job callback @@ -58,15 +56,15 @@ async def create( logger.info( "Creating scheduled job: id=%s, type=%s, next_fire_time=%s", job.id, - data.type, - next_fire_time, + job.data.type, + job.next_fire_time, ) await self._scheduler.add_schedule( func_or_task_id=execute_job, - trigger=DateTrigger(run_time=next_fire_time), + trigger=DateTrigger(run_time=job.next_fire_time), id=job.id, - kwargs=data.model_dump(mode="json"), + kwargs=job.data.model_dump(mode="json"), misfire_grace_time=timedelta(minutes=10), job_result_expiration_time=timedelta(weeks=30000), # Never expire ) From 8d015032bf8de485de5faba3532e4ee550e98f25 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 12:34:45 +0100 Subject: [PATCH 07/15] feat(scheduled_jobs): add name field to MessageRerunnerData and update ScheduledJob creation Introduced a new `name` field in the `MessageRerunnerDataCreate` model to enhance the data structure. Updated the `ScheduledJob` creation process to include the `name` parameter from `params.data`, ensuring that the new field is properly utilized during job creation. 
--- src/askui/chat/api/scheduled_jobs/models.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py index 021ff821..4d25a0a6 100644 --- a/src/askui/chat/api/scheduled_jobs/models.py +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -54,6 +54,7 @@ class _BaseMessageRerunnerDataCreate(BaseModel): """ type: Literal["message_rerunner"] = "message_rerunner" + name: str thread_id: ThreadId assistant_id: AssistantId model: str @@ -131,6 +132,7 @@ def create( next_fire_time=params.next_fire_time, data=MessageRerunnerData( workspace_id=workspace_id, + name=params.data.name, thread_id=params.data.thread_id, assistant_id=params.data.assistant_id, model=params.data.model, From 83e743f0dd1fab33b4a0b12904feb5b02caf1b03 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 18:03:37 +0100 Subject: [PATCH 08/15] feat(scheduled_jobs): enhance job execution with ASKUI token management Updated the `execute_job` function to support the new `askui_token` field in `ScheduledJobData`. Implemented environment variable management for `ASKUI_TOKEN` and `AUTHORIZATION` headers to accommodate different authentication methods. Refactored the job execution logic to ensure proper handling of new job types and restore previous environment states after execution. Enhanced the `ScheduledJobCreate` model to include the `askui_token` parameter for authenticated API calls. 
--- src/askui/chat/api/scheduled_jobs/executor.py | 85 ++++++++++++------- src/askui/chat/api/scheduled_jobs/models.py | 20 ++++- .../chat/api/scheduled_jobs/scheduler.py | 1 - src/askui/chat/api/scheduled_jobs/service.py | 3 +- 4 files changed, 72 insertions(+), 37 deletions(-) diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py index 2b6b201f..c4e85251 100644 --- a/src/askui/chat/api/scheduled_jobs/executor.py +++ b/src/askui/chat/api/scheduled_jobs/executor.py @@ -1,6 +1,7 @@ """Executor for scheduled job callbacks.""" import logging +import os from typing import Any from sqlalchemy.orm import Session @@ -9,10 +10,16 @@ from askui.chat.api.messages.dependencies import get_message_service from askui.chat.api.runs.dependencies import create_run_service from askui.chat.api.runs.models import RunCreate -from askui.chat.api.scheduled_jobs.models import scheduled_job_data_adapter +from askui.chat.api.scheduled_jobs.models import ( + ScheduledJobData, + scheduled_job_data_adapter, +) _logger = logging.getLogger(__name__) +_ASKUI_TOKEN_ENV_VAR = "ASKUI_TOKEN" +_AUTHORIZATION_HEADER_ENV_VAR = "ASKUI__AUTHORIZATION" + async def execute_job( **_kwargs: Any, @@ -24,11 +31,6 @@ async def execute_job( database sessions and service instances to avoid stale connections. Args: - workspace_id (str): The workspace ID (as string from JSON serialization). - thread_id (str): The thread ID for the message. - assistant_id (str): The assistant ID to run. - model (str): The model to use for the run. - message (dict[str, Any]): The message data to create. **_kwargs (Any): Additional keyword arguments (ignored). 
""" # Validates and returns the correct concrete type based on the `type` discriminator @@ -40,29 +42,48 @@ async def execute_job( job_data.thread_id, ) - # Create fresh session for this job execution - with Session(engine) as session: - message_service = get_message_service(session) - run_service = create_run_service(session, job_data.workspace_id) - - # Create message - message_service.create( - workspace_id=job_data.workspace_id, - thread_id=job_data.thread_id, - params=job_data.message, - ) - - # Create and execute run - _logger.debug("Creating run with assistant %s", job_data.assistant_id) - run, generator = await run_service.create( - workspace_id=job_data.workspace_id, - thread_id=job_data.thread_id, - params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model), - ) - - # Consume generator to completion - _logger.debug("Waiting for run %s to complete", run.id) - async for _event in generator: - pass - - _logger.info("Scheduled job completed: run_id=%s", run.id) + # future proofing of new job types + if isinstance(job_data, ScheduledJobData): # pyright: ignore + # Set ASKUI_TOKEN env var for API calls + _previous_token = os.environ.get(_ASKUI_TOKEN_ENV_VAR) + _previous_authorization = os.environ.get(_AUTHORIZATION_HEADER_ENV_VAR) + + # remove authorization header since previously caesr authentication was used which could be invalid now due to bearer token authentication being used instead + del os.environ[_AUTHORIZATION_HEADER_ENV_VAR] + os.environ[_ASKUI_TOKEN_ENV_VAR] = job_data.askui_token + + # Create fresh session for this job execution + try: + with Session(engine) as session: + message_service = get_message_service(session) + run_service = create_run_service(session, job_data.workspace_id) + + # Create message + message_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=job_data.message, + ) + + # Create and execute run + _logger.debug("Creating run with assistant %s", job_data.assistant_id) + 
run, generator = await run_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=RunCreate( + assistant_id=job_data.assistant_id, model=job_data.model + ), + ) + + # Consume generator to completion + _logger.debug("Waiting for run %s to complete", run.id) + async for _event in generator: + pass + + _logger.info("Scheduled job completed: run_id=%s", run.id) + finally: + # Restore previous ASKUI_TOKEN env var state + if _previous_token is not None: + os.environ[_ASKUI_TOKEN_ENV_VAR] = _previous_token + if _previous_authorization is not None: + os.environ[_AUTHORIZATION_HEADER_ENV_VAR] = _previous_authorization diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py index 4d25a0a6..adc3121b 100644 --- a/src/askui/chat/api/scheduled_jobs/models.py +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -51,6 +51,9 @@ class _BaseMessageRerunnerDataCreate(BaseModel): assistant_id (AssistantId): The assistant to run. model (str): The model to use for the run. message (ScheduledMessageCreate): The message to create. + askui_token (str): The AskUI token to use for authenticated API calls + when the job executes. This is a long-lived credential that doesn't + expire like Bearer tokens. """ type: Literal["message_rerunner"] = "message_rerunner" @@ -59,6 +62,7 @@ class _BaseMessageRerunnerDataCreate(BaseModel): assistant_id: AssistantId model: str message: ScheduledMessageCreate + askui_token: str class ScheduledJobCreate(BaseModel): @@ -124,9 +128,20 @@ class ScheduledJob(BaseModel): @classmethod def create( - cls, workspace_id: WorkspaceId, params: ScheduledJobCreate + cls, + workspace_id: WorkspaceId, + params: ScheduledJobCreate, ) -> "ScheduledJob": - """Create a new ScheduledJob with a generated ID.""" + """ + Create a new ScheduledJob with a generated ID. + + Args: + workspace_id (WorkspaceId): The workspace this job belongs to. 
+ params (ScheduledJobCreate): The job creation parameters. + + Returns: + ScheduledJob: The created scheduled job. + """ return cls( id=generate_time_ordered_id("schedjob"), next_fire_time=params.next_fire_time, @@ -137,6 +152,7 @@ def create( assistant_id=params.data.assistant_id, model=params.data.model, message=params.data.message, + askui_token=params.data.askui_token, ), ) diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py index 008cb3f7..2370a0b9 100644 --- a/src/askui/chat/api/scheduled_jobs/scheduler.py +++ b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -20,7 +20,6 @@ # Module-level singleton scheduler instance # - max_concurrent_jobs=1: only one job runs at a time (sequential execution) -# - lease_duration=600s: 10 minutes grace period for missed jobs # At module level: just create the scheduler (don't start it) scheduler: AsyncScheduler = AsyncScheduler( data_store=_data_store, diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py index 02f6b76f..a1fe46ff 100644 --- a/src/askui/chat/api/scheduled_jobs/service.py +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -40,8 +40,7 @@ async def create( Args: workspace_id (WorkspaceId): The workspace this job belongs to. - next_fire_time (UnixDatetime): When the job should execute. - data (ScheduledJobData): Type-specific job data. + params (ScheduledJobCreate): The job creation parameters. Returns: ScheduledJob: The created scheduled job. From 2c3a940a719da1c82b9dc6dbaa12b3bfc7dbdedc Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 18:04:43 +0100 Subject: [PATCH 09/15] feat(scheduled_jobs): add scheduler database support and improve SQLite configuration, get around the write lock when scheduler and Runner wants to write to the same DB Introduced a new `scheduler_url` field in `DbSettings` for APScheduler job storage, allowing separate database management. 
Updated the `engine.py` to utilize the new scheduler database and removed unused SQLite connection settings. Enhanced the `scheduler.py` to create a dedicated engine for the scheduler database and ensure proper SQLite configuration for foreign key support. Updated logging to use a private logger for better encapsulation. --- .gitignore | 3 +- src/askui/chat/api/db/engine.py | 8 ++--- .../chat/api/scheduled_jobs/scheduler.py | 34 +++++++++++++++---- src/askui/chat/api/settings.py | 6 +++- 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index f2356f53..297bdc1c 100644 --- a/.gitignore +++ b/.gitignore @@ -166,8 +166,7 @@ reports/ .DS_Store /chat /askui_chat.db -/askui_chat.db-shm -/askui_chat.db-wal +/askui_scheduler.db .cache/ bom.json diff --git a/src/askui/chat/api/db/engine.py b/src/askui/chat/api/db/engine.py index c39b8c81..8aa4b9be 100644 --- a/src/askui/chat/api/db/engine.py +++ b/src/askui/chat/api/db/engine.py @@ -2,7 +2,7 @@ from sqlite3 import Connection as SQLite3Connection from typing import Any -from sqlalchemy import Engine, create_engine, event +from sqlalchemy import create_engine, event from askui.chat.api.dependencies import get_settings @@ -14,12 +14,8 @@ engine = create_engine(settings.db.url, connect_args=connect_args, echo=echo) -@event.listens_for(Engine, "connect") +@event.listens_for(engine, "connect") def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001 cursor = dbapi_conn.cursor() cursor.execute("PRAGMA foreign_keys=ON") - # WAL mode - allows concurrent readers + writer - cursor.execute("PRAGMA journal_mode=WAL") - # Busy timeout - prevents SQLite from hanging on locked databases - cursor.execute("PRAGMA busy_timeout=10000") # 10 seconds cursor.close() diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py index 2370a0b9..9c8cf94f 100644 --- a/src/askui/chat/api/scheduled_jobs/scheduler.py +++ 
b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -6,17 +6,32 @@ """ import logging +from sqlite3 import Connection as SQLite3Connection from typing import Any from apscheduler import AsyncScheduler from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore +from sqlalchemy import create_engine, event -from askui.chat.api.db.engine import engine +from askui.chat.api.dependencies import get_settings -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) -# Module-level singleton data store (similar to engine pattern) -_data_store: Any = SQLAlchemyDataStore(engine_or_url=engine) +# Module-level settings for scheduler database +_settings = get_settings() +_connect_args = {"check_same_thread": False} +_echo = _logger.isEnabledFor(logging.DEBUG) + +# Separate engine for scheduler database +scheduler_engine = create_engine( + _settings.db.scheduler_url, + connect_args=_connect_args, + echo=_echo, +) + + +# Module-level singleton data store using separate scheduler database +_data_store: Any = SQLAlchemyDataStore(engine_or_url=scheduler_engine) # Module-level singleton scheduler instance # - max_concurrent_jobs=1: only one job runs at a time (sequential execution) @@ -27,6 +42,13 @@ ) +@event.listens_for(scheduler_engine, "connect") +def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001 + cursor = dbapi_conn.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + + async def start_scheduler() -> None: """ Start the scheduler to begin processing jobs. 
@@ -38,10 +60,10 @@ async def start_scheduler() -> None: await scheduler.__aenter__() # Then start background processing of jobs await scheduler.start_in_background() - logger.info("Scheduler started in background") + _logger.info("Scheduler started in background") async def shutdown_scheduler() -> None: """Shut down the scheduler gracefully.""" await scheduler.__aexit__(None, None, None) - logger.info("Scheduler shut down") + _logger.info("Scheduler shut down") diff --git a/src/askui/chat/api/settings.py b/src/askui/chat/api/settings.py index 59f7a117..853b0c38 100644 --- a/src/askui/chat/api/settings.py +++ b/src/askui/chat/api/settings.py @@ -18,12 +18,16 @@ class DbSettings(BaseModel): default_factory=lambda: f"sqlite:///{(Path.cwd().absolute() / 'askui_chat.db').as_posix()}", description="Database URL for SQLAlchemy connection", ) + scheduler_url: str = Field( + default_factory=lambda: f"sqlite:///{(Path.cwd().absolute() / 'askui_scheduler.db').as_posix()}", + description="Database URL for APScheduler job storage", + ) auto_migrate: bool = Field( default=True, description="Whether to run migrations automatically on startup", ) - @field_validator("url") + @field_validator("url", "scheduler_url") @classmethod def validate_sqlite_url(cls, v: str) -> str: """Ensure only synchronous SQLite URLs are allowed.""" From 7137268e22d1a1158f92fc7cf6f7a0db8726defd Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 18:33:07 +0100 Subject: [PATCH 10/15] refactor(scheduled_jobs): update job execution logic and improve error handling Refactored the `execute_job` function to utilize a new `_execute_message_rerunner_job` helper function for better clarity and separation of concerns. Updated the handling of `MessageRerunnerData` and improved error messages for better debugging. Removed unused imports and ensured proper type hints throughout the code. 
Enhanced the `ScheduledJob` model to streamline the extraction of `next_fire_time` and improved the handling of workspace validation in the `ScheduledJobService` class. --- src/askui/chat/api/runs/dependencies.py | 2 +- .../chat/api/scheduled_jobs/dependencies.py | 2 - src/askui/chat/api/scheduled_jobs/executor.py | 86 ++++++++++--------- src/askui/chat/api/scheduled_jobs/models.py | 13 +-- src/askui/chat/api/scheduled_jobs/service.py | 20 ++--- 5 files changed, 62 insertions(+), 61 deletions(-) diff --git a/src/askui/chat/api/runs/dependencies.py b/src/askui/chat/api/runs/dependencies.py index a03a8710..7dd5085f 100644 --- a/src/askui/chat/api/runs/dependencies.py +++ b/src/askui/chat/api/runs/dependencies.py @@ -62,7 +62,7 @@ def create_run_service(session: Session, workspace_id: UUID4) -> RunService: Create a RunService with all required dependencies manually. Use this function when you need a `RunService` outside of FastAPI's - dependency injection context (e.g., background tasks, APScheduler callbacks). + dependency injection context (e.g. APScheduler callbacks). Args: session (Session): Database session. 
diff --git a/src/askui/chat/api/scheduled_jobs/dependencies.py b/src/askui/chat/api/scheduled_jobs/dependencies.py index 9adf7f9d..bc835878 100644 --- a/src/askui/chat/api/scheduled_jobs/dependencies.py +++ b/src/askui/chat/api/scheduled_jobs/dependencies.py @@ -1,7 +1,5 @@ """FastAPI dependencies for scheduled jobs.""" -from typing import Any - from fastapi import Depends from askui.chat.api.scheduled_jobs.scheduler import scheduler diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py index c4e85251..6e6e6c74 100644 --- a/src/askui/chat/api/scheduled_jobs/executor.py +++ b/src/askui/chat/api/scheduled_jobs/executor.py @@ -11,7 +11,7 @@ from askui.chat.api.runs.dependencies import create_run_service from askui.chat.api.runs.models import RunCreate from askui.chat.api.scheduled_jobs.models import ( - ScheduledJobData, + MessageRerunnerData, scheduled_job_data_adapter, ) @@ -43,47 +43,55 @@ async def execute_job( ) # future proofing of new job types - if isinstance(job_data, ScheduledJobData): # pyright: ignore - # Set ASKUI_TOKEN env var for API calls + if isinstance(job_data, MessageRerunnerData): # pyright: ignore[reportUnnecessaryIsInstance] + # Save previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars _previous_token = os.environ.get(_ASKUI_TOKEN_ENV_VAR) _previous_authorization = os.environ.get(_AUTHORIZATION_HEADER_ENV_VAR) - # remove authorization header since previously caesr authentication was used which could be invalid now due to bearer token authentication being used instead + # remove authorization header since it takes precedence over the token and is set when forwarding bearer token del os.environ[_AUTHORIZATION_HEADER_ENV_VAR] os.environ[_ASKUI_TOKEN_ENV_VAR] = job_data.askui_token - # Create fresh session for this job execution - try: - with Session(engine) as session: - message_service = get_message_service(session) - run_service = create_run_service(session, job_data.workspace_id) - - # 
Create message - message_service.create( - workspace_id=job_data.workspace_id, - thread_id=job_data.thread_id, - params=job_data.message, - ) - - # Create and execute run - _logger.debug("Creating run with assistant %s", job_data.assistant_id) - run, generator = await run_service.create( - workspace_id=job_data.workspace_id, - thread_id=job_data.thread_id, - params=RunCreate( - assistant_id=job_data.assistant_id, model=job_data.model - ), - ) - - # Consume generator to completion - _logger.debug("Waiting for run %s to complete", run.id) - async for _event in generator: - pass - - _logger.info("Scheduled job completed: run_id=%s", run.id) - finally: - # Restore previous ASKUI_TOKEN env var state - if _previous_token is not None: - os.environ[_ASKUI_TOKEN_ENV_VAR] = _previous_token - if _previous_authorization is not None: - os.environ[_AUTHORIZATION_HEADER_ENV_VAR] = _previous_authorization + await _execute_message_rerunner_job(job_data) + + # Restore previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars + if _previous_token is not None: + os.environ[_ASKUI_TOKEN_ENV_VAR] = _previous_token + if _previous_authorization is not None: + os.environ[_AUTHORIZATION_HEADER_ENV_VAR] = _previous_authorization + + +async def _execute_message_rerunner_job( + job_data: MessageRerunnerData, +) -> None: + """ + Execute a message rerunner job. + + Args: + job_data: The job data. 
+ """ + with Session(engine) as session: + message_service = get_message_service(session) + run_service = create_run_service(session, job_data.workspace_id) + + # Create message + message_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=job_data.message, + ) + + # Create and execute run + _logger.debug("Creating run with assistant %s", job_data.assistant_id) + run, generator = await run_service.create( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model), + ) + + # Consume generator to completion + _logger.debug("Waiting for run %s to complete", run.id) + async for _event in generator: + pass + + _logger.info("Scheduled job completed: run_id=%s", run.id) diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py index adc3121b..e0bb686a 100644 --- a/src/askui/chat/api/scheduled_jobs/models.py +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -171,20 +171,15 @@ def from_schedule(cls, schedule: Schedule) -> "ScheduledJob": ValueError: If the schedule has no determinable `next_fire_time`. 
""" # Extract next_fire_time from schedule or trigger - next_fire_time: UnixDatetime - if schedule.next_fire_time is not None: - next_fire_time = schedule.next_fire_time - elif isinstance(schedule.trigger, DateTrigger): - next_fire_time = schedule.trigger.run_time - else: - error_msg = f"Schedule {schedule.id} has no next_fire_time" - raise ValueError(error_msg) + if schedule.next_fire_time is None: + msg = f"Schedule {schedule.id} has no next_fire_time" + raise ValueError(msg) # Reconstruct data from kwargs data = MessageRerunnerData.model_validate(schedule.kwargs or {}) return cls( id=schedule.id, - next_fire_time=next_fire_time, + next_fire_time=schedule.next_fire_time, data=data, ) diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py index a1fe46ff..91b0f79a 100644 --- a/src/askui/chat/api/scheduled_jobs/service.py +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -113,18 +113,18 @@ async def cancel( """ logger.info("Canceling scheduled job: %s", job_id) - schedules: list[Any] = await self._scheduler.data_store.get_schedules({job_id}) + schedules: list[Schedule] = await self._scheduler.data_store.get_schedules( + {job_id} + ) if not schedules: - error_msg = f"Scheduled job {job_id} not found" - raise NotFoundError(error_msg) - - schedule: Any = schedules[0] - kwargs: dict[str, Any] = schedule.kwargs or {} - schedule_workspace_id: str | None = kwargs.get("workspace_id") - if schedule_workspace_id is None or UUID(schedule_workspace_id) != workspace_id: - error_msg = f"Scheduled job {job_id} not found" - raise NotFoundError(error_msg) + msg = f"Scheduled job {job_id} not found" + raise NotFoundError(msg) + + scheduled_job = ScheduledJob.from_schedule(schedules[0]) + if scheduled_job.data.workspace_id != workspace_id: + msg = f"Scheduled job {job_id} not found in workspace {workspace_id}" + raise NotFoundError(msg) await self._scheduler.data_store.remove_schedules([job_id]) logger.info("Scheduled job canceled: 
%s", job_id) From b5b5872c1b79df0dab0299c8c9c4b09ac258cd7d Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 18:48:51 +0100 Subject: [PATCH 11/15] refactor: remove functools.cache decorator from create_api_client function for unauthorized issues after token expiry --- src/askui/models/anthropic/factory.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/askui/models/anthropic/factory.py b/src/askui/models/anthropic/factory.py index 4b81102d..27bd516f 100644 --- a/src/askui/models/anthropic/factory.py +++ b/src/askui/models/anthropic/factory.py @@ -1,4 +1,3 @@ -import functools from typing import Literal from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex @@ -9,7 +8,6 @@ AnthropicApiClient = Anthropic | AnthropicBedrock | AnthropicVertex -@functools.cache def create_api_client( api_provider: AnthropicApiProvider, ) -> AnthropicApiClient: From 93b1f5f15f97c20b3667eb73188225d50069cd1e Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Tue, 30 Dec 2025 20:42:13 +0100 Subject: [PATCH 12/15] refactor(scheduled_jobs): enhance job execution and token management Updated the `execute_job` function to improve the handling of `askui_token` by utilizing `SecretStr` for better security. Refactored the logic to manage the `ASKUI__AUTHORIZATION` environment variable, ensuring proper encoding of the token. Enhanced the extraction of `next_fire_time` in the `ScheduledJob` model for clearer error handling. Updated the `ScheduledJobService` to pass the decoded token securely during job execution. This refactor aims to streamline job execution and improve overall code clarity. 
--- src/askui/chat/api/scheduled_jobs/executor.py | 18 +++++++----------- src/askui/chat/api/scheduled_jobs/models.py | 18 +++++++++++------- src/askui/chat/api/scheduled_jobs/scheduler.py | 2 ++ src/askui/chat/api/scheduled_jobs/service.py | 7 ++++--- 4 files changed, 24 insertions(+), 21 deletions(-) diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py index 6e6e6c74..81c13590 100644 --- a/src/askui/chat/api/scheduled_jobs/executor.py +++ b/src/askui/chat/api/scheduled_jobs/executor.py @@ -1,5 +1,6 @@ """Executor for scheduled job callbacks.""" +import base64 import logging import os from typing import Any @@ -17,9 +18,6 @@ _logger = logging.getLogger(__name__) -_ASKUI_TOKEN_ENV_VAR = "ASKUI_TOKEN" -_AUTHORIZATION_HEADER_ENV_VAR = "ASKUI__AUTHORIZATION" - async def execute_job( **_kwargs: Any, @@ -45,20 +43,18 @@ async def execute_job( # future proofing of new job types if isinstance(job_data, MessageRerunnerData): # pyright: ignore[reportUnnecessaryIsInstance] # Save previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars - _previous_token = os.environ.get(_ASKUI_TOKEN_ENV_VAR) - _previous_authorization = os.environ.get(_AUTHORIZATION_HEADER_ENV_VAR) + _previous_authorization = os.environ.get("ASKUI__AUTHORIZATION") # remove authorization header since it takes precedence over the token and is set when forwarding bearer token - del os.environ[_AUTHORIZATION_HEADER_ENV_VAR] - os.environ[_ASKUI_TOKEN_ENV_VAR] = job_data.askui_token + os.environ["ASKUI__AUTHORIZATION"] = ( + f"Basic {base64.b64encode(job_data.askui_token.get_secret_value().encode()).decode()}" + ) await _execute_message_rerunner_job(job_data) - # Restore previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars - if _previous_token is not None: - os.environ[_ASKUI_TOKEN_ENV_VAR] = _previous_token + # Restore previous AUTHORIZATION_HEADER env var if _previous_authorization is not None: - os.environ[_AUTHORIZATION_HEADER_ENV_VAR] = 
_previous_authorization + os.environ["ASKUI__AUTHORIZATION"] = _previous_authorization async def _execute_message_rerunner_job( diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py index e0bb686a..b49ad32a 100644 --- a/src/askui/chat/api/scheduled_jobs/models.py +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -2,7 +2,7 @@ from apscheduler import Schedule from apscheduler.triggers.date import DateTrigger -from pydantic import BaseModel, Field, TypeAdapter +from pydantic import BaseModel, Field, SecretStr, TypeAdapter from askui.chat.api.messages.models import ROOT_MESSAGE_PARENT_ID, MessageCreate from askui.chat.api.models import ( @@ -62,7 +62,7 @@ class _BaseMessageRerunnerDataCreate(BaseModel): assistant_id: AssistantId model: str message: ScheduledMessageCreate - askui_token: str + askui_token: SecretStr class ScheduledJobCreate(BaseModel): @@ -171,15 +171,19 @@ def from_schedule(cls, schedule: Schedule) -> "ScheduledJob": ValueError: If the schedule has no determinable `next_fire_time`. 
""" # Extract next_fire_time from schedule or trigger - if schedule.next_fire_time is None: - msg = f"Schedule {schedule.id} has no next_fire_time" - raise ValueError(msg) - + next_fire_time: UnixDatetime + if schedule.next_fire_time is not None: + next_fire_time = schedule.next_fire_time + elif isinstance(schedule.trigger, DateTrigger): + next_fire_time = schedule.trigger.run_time + else: + error_msg = f"Schedule {schedule.id} has no next_fire_time" + raise ValueError(error_msg) # Reconstruct data from kwargs data = MessageRerunnerData.model_validate(schedule.kwargs or {}) return cls( id=schedule.id, - next_fire_time=schedule.next_fire_time, + next_fire_time=next_fire_time, data=data, ) diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py index 9c8cf94f..242b6bde 100644 --- a/src/askui/chat/api/scheduled_jobs/scheduler.py +++ b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -6,6 +6,7 @@ """ import logging +from datetime import timedelta from sqlite3 import Connection as SQLite3Connection from typing import Any @@ -39,6 +40,7 @@ scheduler: AsyncScheduler = AsyncScheduler( data_store=_data_store, max_concurrent_jobs=1, + cleanup_interval=timedelta(minutes=1), # Cleanup every minute ) diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py index 91b0f79a..6ea3c113 100644 --- a/src/askui/chat/api/scheduled_jobs/service.py +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -2,8 +2,6 @@ import logging from datetime import timedelta -from typing import Any -from uuid import UUID from apscheduler import AsyncScheduler, Schedule from apscheduler.triggers.date import DateTrigger @@ -63,7 +61,10 @@ async def create( func_or_task_id=execute_job, trigger=DateTrigger(run_time=job.next_fire_time), id=job.id, - kwargs=job.data.model_dump(mode="json"), + kwargs={ + **job.data.model_dump(mode="json"), + "askui_token": job.data.askui_token.get_secret_value(), + }, 
misfire_grace_time=timedelta(minutes=10), job_result_expiration_time=timedelta(weeks=30000), # Never expire ) From 927e64151dc1891df06b27b0aa285677b17568a5 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Fri, 2 Jan 2026 10:03:25 +0100 Subject: [PATCH 13/15] refactor(scheduled_jobs): update execute_job to return ScheduledJobExecutionResult Modified the `execute_job` function to return a `ScheduledJobExecutionResult`, encapsulating job data and optional error messages. Enhanced error handling during job execution, ensuring that the previous `ASKUI__AUTHORIZATION` environment variable is restored after execution. Improved documentation for the function's parameters and return type to clarify its behavior and expected output. --- src/askui/chat/api/scheduled_jobs/executor.py | 59 ++++++++++++++----- src/askui/chat/api/scheduled_jobs/models.py | 16 +++++ 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/askui/chat/api/scheduled_jobs/executor.py b/src/askui/chat/api/scheduled_jobs/executor.py index 81c13590..baff6100 100644 --- a/src/askui/chat/api/scheduled_jobs/executor.py +++ b/src/askui/chat/api/scheduled_jobs/executor.py @@ -13,6 +13,7 @@ from askui.chat.api.runs.models import RunCreate from askui.chat.api.scheduled_jobs.models import ( MessageRerunnerData, + ScheduledJobExecutionResult, scheduled_job_data_adapter, ) @@ -21,7 +22,7 @@ async def execute_job( **_kwargs: Any, -) -> None: +) -> ScheduledJobExecutionResult: """ APScheduler callback that creates fresh services and executes the job. @@ -29,7 +30,10 @@ async def execute_job( database sessions and service instances to avoid stale connections. Args: - **_kwargs (Any): Additional keyword arguments (ignored). + **_kwargs (Any): Keyword arguments containing job data. + + Returns: + ScheduledJobExecutionResult: The result containing job data and optional error. 
""" # Validates and returns the correct concrete type based on the `type` discriminator job_data = scheduled_job_data_adapter.validate_python(_kwargs) @@ -40,21 +44,31 @@ async def execute_job( job_data.thread_id, ) - # future proofing of new job types - if isinstance(job_data, MessageRerunnerData): # pyright: ignore[reportUnnecessaryIsInstance] - # Save previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars - _previous_authorization = os.environ.get("ASKUI__AUTHORIZATION") + error: str | None = None - # remove authorization header since it takes precedence over the token and is set when forwarding bearer token - os.environ["ASKUI__AUTHORIZATION"] = ( - f"Basic {base64.b64encode(job_data.askui_token.get_secret_value().encode()).decode()}" - ) + try: + # future proofing of new job types + if isinstance(job_data, MessageRerunnerData): # pyright: ignore[reportUnnecessaryIsInstance] + # Save previous ASKUI_TOKEN and AUTHORIZATION_HEADER env vars + _previous_authorization = os.environ.get("ASKUI__AUTHORIZATION") - await _execute_message_rerunner_job(job_data) + # remove authorization header since it takes precedence over the token and is set when forwarding bearer token + os.environ["ASKUI__AUTHORIZATION"] = ( + f"Basic {base64.b64encode(job_data.askui_token.get_secret_value().encode()).decode()}" + ) - # Restore previous AUTHORIZATION_HEADER env var - if _previous_authorization is not None: - os.environ["ASKUI__AUTHORIZATION"] = _previous_authorization + try: + await _execute_message_rerunner_job(job_data) + finally: + # Restore previous AUTHORIZATION_HEADER env var + if _previous_authorization is not None: + os.environ["ASKUI__AUTHORIZATION"] = _previous_authorization + except Exception as e: + error = f"{type(e).__name__}: {e}" + _logger.exception("Scheduled job failed: %s", error) + + # Always return job data with optional error + return ScheduledJobExecutionResult(data=job_data, error=error) async def _execute_message_rerunner_job( @@ -85,9 +99,24 @@ async def 
_execute_message_rerunner_job( params=RunCreate(assistant_id=job_data.assistant_id, model=job_data.model), ) - # Consume generator to completion + # Consume generator to completion of run _logger.debug("Waiting for run %s to complete", run.id) async for _event in generator: pass + # Check if run completed with error + completed_run = run_service.retrieve( + workspace_id=job_data.workspace_id, + thread_id=job_data.thread_id, + run_id=run.id, + ) + + if completed_run.status == "failed": + error_message = ( + completed_run.last_error.message + if completed_run.last_error + else "Run failed with unknown error" + ) + raise RuntimeError(error_message) + _logger.info("Scheduled job completed: run_id=%s", run.id) diff --git a/src/askui/chat/api/scheduled_jobs/models.py b/src/askui/chat/api/scheduled_jobs/models.py index b49ad32a..c220e9d0 100644 --- a/src/askui/chat/api/scheduled_jobs/models.py +++ b/src/askui/chat/api/scheduled_jobs/models.py @@ -187,3 +187,19 @@ def from_schedule(cls, schedule: Schedule) -> "ScheduledJob": next_fire_time=next_fire_time, data=data, ) + + +class ScheduledJobExecutionResult(BaseModel): + """ + Return value stored by the job executor in APScheduler's job result. + + This ensures we always have job data available even if the job fails, + since APScheduler clears return_value on exception. + + Args: + data (ScheduledJobData): The job data that was executed. + error (str | None): Error message if the job failed. + """ + + data: ScheduledJobData + error: str | None = None From e015c8b8287660065803079adcec9f9fc4780e86 Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Mon, 5 Jan 2026 14:00:14 +0100 Subject: [PATCH 14/15] refactor(settings, engine): optimize SQLite configuration and update database settings Refactored the `DbSettings` class to consolidate the database URL for SQLAlchemy connections, removing the separate `scheduler_url` field. 
Updated the database engine configuration in `engine.py` to utilize optimized SQLite pragmas for better performance and concurrency. Adjusted the `scheduler.py` to use the shared engine, ensuring that APScheduler operates with the same database settings. Enhanced the `.gitignore` to include SQLite shared memory and write-ahead logging files for improved project cleanliness. --- .gitignore | 3 +- src/askui/chat/api/db/engine.py | 34 +++++++++++++++---- .../chat/api/scheduled_jobs/scheduler.py | 32 ++++------------- src/askui/chat/api/settings.py | 8 ++--- 4 files changed, 38 insertions(+), 39 deletions(-) diff --git a/.gitignore b/.gitignore index 297bdc1c..f2356f53 100644 --- a/.gitignore +++ b/.gitignore @@ -166,7 +166,8 @@ reports/ .DS_Store /chat /askui_chat.db -/askui_scheduler.db +/askui_chat.db-shm +/askui_chat.db-wal .cache/ bom.json diff --git a/src/askui/chat/api/db/engine.py b/src/askui/chat/api/db/engine.py index 8aa4b9be..53d7d3d8 100644 --- a/src/askui/chat/api/db/engine.py +++ b/src/askui/chat/api/db/engine.py @@ -6,16 +6,36 @@ from askui.chat.api.dependencies import get_settings -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) -settings = get_settings() -connect_args = {"check_same_thread": False} -echo = logger.isEnabledFor(logging.DEBUG) -engine = create_engine(settings.db.url, connect_args=connect_args, echo=echo) +_settings = get_settings() +_connect_args = {"check_same_thread": False} +_echo = _logger.isEnabledFor(logging.DEBUG) + +# Create engine with optimized settings +engine = create_engine( + _settings.db.url, + connect_args=_connect_args, + echo=_echo, +) @event.listens_for(engine, "connect") -def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001 +def _set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001 + """ + Configure SQLite pragmas for optimal web application performance. 
+ + Applied on each new connection: + - foreign_keys=ON: Enable foreign key constraint enforcement + - journal_mode=WAL: Write-Ahead Logging for better concurrency (readers don't block writers) + - synchronous=NORMAL: Sync every 1000 pages instead of every write (faster, still durable with WAL) + - busy_timeout=30000: Wait up to 30 seconds for locks instead of failing immediately + """ cursor = dbapi_conn.cursor() - cursor.execute("PRAGMA foreign_keys=ON") + + cursor.execute("PRAGMA foreign_keys = ON") + cursor.execute("PRAGMA journal_mode = WAL") + cursor.execute("PRAGMA synchronous = NORMAL") + cursor.execute("PRAGMA busy_timeout = 30000") + cursor.close() diff --git a/src/askui/chat/api/scheduled_jobs/scheduler.py b/src/askui/chat/api/scheduled_jobs/scheduler.py index 242b6bde..aa3d2afb 100644 --- a/src/askui/chat/api/scheduled_jobs/scheduler.py +++ b/src/askui/chat/api/scheduled_jobs/scheduler.py @@ -3,36 +3,25 @@ Similar to how `engine.py` manages the database engine, this module manages the APScheduler instance as a singleton to ensure jobs persist across requests. + +Uses the shared database engine from `engine.py` which is configured with +optimized SQLite pragmas for concurrent access (WAL mode, etc.). 
""" import logging from datetime import timedelta -from sqlite3 import Connection as SQLite3Connection from typing import Any from apscheduler import AsyncScheduler from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore -from sqlalchemy import create_engine, event -from askui.chat.api.dependencies import get_settings +from askui.chat.api.db.engine import engine _logger = logging.getLogger(__name__) -# Module-level settings for scheduler database -_settings = get_settings() -_connect_args = {"check_same_thread": False} -_echo = _logger.isEnabledFor(logging.DEBUG) - -# Separate engine for scheduler database -scheduler_engine = create_engine( - _settings.db.scheduler_url, - connect_args=_connect_args, - echo=_echo, -) - - -# Module-level singleton data store using separate scheduler database -_data_store: Any = SQLAlchemyDataStore(engine_or_url=scheduler_engine) +# Use shared engine from db/engine.py (already configured with SQLite pragmas) +# APScheduler will create its own tables (apscheduler_*) in the same database +_data_store: Any = SQLAlchemyDataStore(engine_or_url=engine) # Module-level singleton scheduler instance # - max_concurrent_jobs=1: only one job runs at a time (sequential execution) @@ -44,13 +33,6 @@ ) -@event.listens_for(scheduler_engine, "connect") -def set_sqlite_pragma(dbapi_conn: SQLite3Connection, connection_record: Any) -> None: # noqa: ARG001 - cursor = dbapi_conn.cursor() - cursor.execute("PRAGMA foreign_keys=ON") - cursor.close() - - async def start_scheduler() -> None: """ Start the scheduler to begin processing jobs. 
diff --git a/src/askui/chat/api/settings.py b/src/askui/chat/api/settings.py index 853b0c38..c2823984 100644 --- a/src/askui/chat/api/settings.py +++ b/src/askui/chat/api/settings.py @@ -16,18 +16,14 @@ class DbSettings(BaseModel): url: str = Field( default_factory=lambda: f"sqlite:///{(Path.cwd().absolute() / 'askui_chat.db').as_posix()}", - description="Database URL for SQLAlchemy connection", - ) - scheduler_url: str = Field( - default_factory=lambda: f"sqlite:///{(Path.cwd().absolute() / 'askui_scheduler.db').as_posix()}", - description="Database URL for APScheduler job storage", + description="Database URL for SQLAlchemy connection (used for all data including scheduler)", ) auto_migrate: bool = Field( default=True, description="Whether to run migrations automatically on startup", ) - @field_validator("url", "scheduler_url") + @field_validator("url") @classmethod def validate_sqlite_url(cls, v: str) -> str: """Ensure only synchronous SQLite URLs are allowed.""" From 61eed2e3e4b8fd8e752e8b3c3555325232b0defc Mon Sep 17 00:00:00 2001 From: danyalxahid-askui Date: Mon, 5 Jan 2026 14:00:32 +0100 Subject: [PATCH 15/15] refactor(scheduled_jobs): sort scheduled jobs by next fire time Updated the `ScheduledJobService` to return a sorted list of scheduled jobs based on their `next_fire_time`. This change ensures that pending jobs are listed in ascending order of their next fire time, making the returned list order predictable for clients. --- src/askui/chat/api/scheduled_jobs/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/askui/chat/api/scheduled_jobs/service.py b/src/askui/chat/api/scheduled_jobs/service.py index 6ea3c113..37d2e0f2 100644 --- a/src/askui/chat/api/scheduled_jobs/service.py +++ b/src/askui/chat/api/scheduled_jobs/service.py @@ -142,4 +142,4 @@ async def _get_pending_jobs(self, workspace_id: WorkspaceId) -> list[ScheduledJo continue scheduled_jobs.append(scheduled_job) - return scheduled_jobs + return sorted(scheduled_jobs, key=lambda x: x.next_fire_time)