From 0ccab826bb11e6e047cc7da4a73e21c2d7ab6ff3 Mon Sep 17 00:00:00 2001 From: FBLGit Date: Tue, 23 Sep 2025 16:45:55 +0800 Subject: [PATCH] feat: unassign tasks --- apps/server/src/handlers/task/index.ts | 1 + .../handlers/task/task.unassign.handler.ts | 114 +++++++++++++++ apps/server/src/schemas/task.schema.ts | 14 ++ apps/web/src/components/TaskCard.tsx | 13 ++ apps/web/src/components/TaskDetailModal.tsx | 2 + apps/web/src/components/TaskKanban.tsx | 14 ++ docs/docs/api/index.md | 2 + docs/docs/api/task/unassign.md | 131 ++++++++++++++++++ 8 files changed, 291 insertions(+) create mode 100644 apps/server/src/handlers/task/task.unassign.handler.ts create mode 100644 docs/docs/api/task/unassign.md diff --git a/apps/server/src/handlers/task/index.ts b/apps/server/src/handlers/task/index.ts index 9dc87cf..75edf68 100644 --- a/apps/server/src/handlers/task/index.ts +++ b/apps/server/src/handlers/task/index.ts @@ -2,6 +2,7 @@ export * from "./task.create.handler"; export * from "./task.update.handler"; export * from "./task.assign.handler"; // Backward compat wrapper +export * from "./task.unassign.handler"; // NEW unassign handler export * from "./task.claim.handler"; // NEW pull model export * from "./task.complete.handler"; export * from "./task.list.handler"; // NEW list/query handler diff --git a/apps/server/src/handlers/task/task.unassign.handler.ts b/apps/server/src/handlers/task/task.unassign.handler.ts new file mode 100644 index 0000000..aab949f --- /dev/null +++ b/apps/server/src/handlers/task/task.unassign.handler.ts @@ -0,0 +1,114 @@ +import { EventHandler, Instrumented, Resilient } from "@/core/decorator"; +import type { EventContext } from "@/core/context"; +import { taskUnassignInput, taskUnassignOutput } from "@/schemas/task.schema"; +import type { TaskUnassignInput, TaskUnassignOutput } from "@/schemas/task.schema"; +import { redisKey } from "@/core/redis"; + +@EventHandler({ + event: "task.unassign", + inputSchema: taskUnassignInput, + outputSchema: taskUnassignOutput, + persist: true, + rateLimit: 20, + description: "Remove assignment from a task", +}) +export class TaskUnassignHandler { + @Instrumented(0) // No caching - this operation changes state + @Resilient({ + rateLimit: { limit: 20, windowMs: 60000 }, // 20 requests per minute + timeout: 5000, // 5 second timeout + circuitBreaker: { + threshold: 5, + timeout: 30000, + fallback: () => { + throw new Error("Task unassignment service temporarily unavailable"); + } + } + }) + async handle(input: TaskUnassignInput, ctx: EventContext): Promise { + const taskKey = redisKey("task", input.taskId); + + // Verify task exists + const taskData = await ctx.redis.stream.hgetall(taskKey); + if (!taskData || Object.keys(taskData).length === 0) { + throw new Error(`Task not found: ${input.taskId}`); + } + + // Get previous assignment + const previousAssignment = taskData.assignedTo || null; + + if (!previousAssignment) { + throw new Error(`Task ${input.taskId} is not currently assigned`); + } + + // Check task status - don't unassign completed or failed tasks + const currentStatus = taskData.status; + if (currentStatus === "completed" || currentStatus === "failed") { + throw new Error(`Cannot unassign task ${input.taskId} with status ${currentStatus}`); + } + + const now = new Date().toISOString(); + + // Remove assignment from task + await ctx.redis.stream.hdel(taskKey, "assignedTo", "assignedAt"); + + // Update task status back to pending and update timestamp + await ctx.redis.stream.hset(taskKey, { + status: "pending", + updatedAt: now, + }); + + // Add back to pending queue with original priority + const pendingQueueKey = redisKey("queue", "tasks", "pending"); + const priority = Number(taskData.priority) || 50; + const score = Date.now() - (priority * 1000); // Higher priority = lower score + await ctx.redis.stream.zadd(pendingQueueKey, score, input.taskId); + + // Remove from instance queue if present + const instanceQueueKey = redisKey("queue", "instance", previousAssignment); + await ctx.redis.stream.lrem(instanceQueueKey, 0, input.taskId); + + // Track unassignment in history + const historyKey = redisKey("history", "task", input.taskId, "assignments"); + await ctx.redis.stream.rpush(historyKey, JSON.stringify({ + instanceId: previousAssignment, + action: "unassigned", + unassignedAt: now, + unassignedBy: ctx.instanceId + })); + + // Update instance metrics + const instanceTasksKey = redisKey("metrics", "instance", previousAssignment); + await ctx.redis.stream.hincrby(instanceTasksKey, "assignedTasks", -1); + + // Persist to PostgreSQL if configured + if (ctx.persist) { + await ctx.prisma.task.update({ + where: { id: input.taskId }, + data: { + assignedTo: null, + status: "pending", + }, + }); + } + + // Publish event + await ctx.publish({ + type: "task.unassigned", + payload: { + taskId: input.taskId, + previousAssignment, + }, + metadata: { + unassignedBy: ctx.instanceId, + unassignedAt: now, + }, + }); + + return { + taskId: input.taskId, + previousAssignment, + unassignedAt: now, + }; + } +} \ No newline at end of file diff --git a/apps/server/src/schemas/task.schema.ts b/apps/server/src/schemas/task.schema.ts index 83a8a1c..829e1f6 100644 --- a/apps/server/src/schemas/task.schema.ts +++ b/apps/server/src/schemas/task.schema.ts @@ -271,6 +271,20 @@ export const taskDeleteOutput = z.object({ export type TaskDeleteInput = z.infer; export type TaskDeleteOutput = z.infer; +// task.unassign +export const taskUnassignInput = z.object({ + taskId: z.string().min(1), +}); + +export const taskUnassignOutput = z.object({ + taskId: z.string(), + previousAssignment: z.string().nullable(), + unassignedAt: z.string().datetime(), +}); + +export type TaskUnassignInput = z.infer; +export type TaskUnassignOutput = z.infer; + // task.context export const taskContextInput = z.object({ taskId: z.string().min(1), diff --git a/apps/web/src/components/TaskCard.tsx b/apps/web/src/components/TaskCard.tsx index 24c44de..6b921a0 100644 --- a/apps/web/src/components/TaskCard.tsx +++ b/apps/web/src/components/TaskCard.tsx @@ -57,6 +57,7 @@ interface TaskCardProps { onUpdate?: (taskId: string, updates: any) => void; onComplete?: (taskId: string) => void; onAssign?: (taskId: string, instanceId: string) => void; + onUnassign?: (taskId: string) => void; onDelete?: (taskId: string) => void; onGenerateContext?: (taskId: string) => void; onClick?: (task: Task) => void; @@ -69,6 +70,7 @@ export function TaskCard({ onUpdate, onComplete, onAssign, + onUnassign, onDelete, onGenerateContext, onClick, @@ -210,6 +212,17 @@ export function TaskCard({ Generate Context )} + {task.assignedTo && onUnassign && ( + { + e.stopPropagation(); + onUnassign(task.id); + }} + > + + Unassign from {task.assignedTo} + + )} {instances.length > 0 && onAssign && ( <> diff --git a/apps/web/src/components/TaskDetailModal.tsx b/apps/web/src/components/TaskDetailModal.tsx index 6ab57df..6ff5b6e 100644 --- a/apps/web/src/components/TaskDetailModal.tsx +++ b/apps/web/src/components/TaskDetailModal.tsx @@ -63,6 +63,7 @@ interface TaskDetailModalProps { onComplete?: (taskId: string) => void; onDelete?: (taskId: string) => void; onAssign?: (taskId: string, instanceId: string) => void; + onUnassign?: (taskId: string) => void; instances?: Array<{ id: string; roles: string[]; status?: string; health?: string }>; } @@ -74,6 +75,7 @@ export function TaskDetailModal({ onComplete, onDelete, onAssign, + onUnassign, instances = [], }: TaskDetailModalProps) { if (!task) return null; diff --git a/apps/web/src/components/TaskKanban.tsx b/apps/web/src/components/TaskKanban.tsx index 5fcb672..584f7db 100644 --- a/apps/web/src/components/TaskKanban.tsx +++ b/apps/web/src/components/TaskKanban.tsx @@ -221,6 +221,7 @@ export function TaskKanban({ className }: TaskKanbanProps) { const completeTaskMutation = useCompleteTask(); const deleteTaskMutation = useDeleteTask(); const assignTaskMutation = useEventMutation("task.assign"); + const unassignTaskMutation = useEventMutation("task.unassign"); const generateContextMutation = useGenerateContext(); // WebSocket connection for real-time updates @@ -587,6 +588,17 @@ export function TaskKanban({ className }: TaskKanbanProps) { } }; + const handleTaskUnassign = async (taskId: string) => { + try { + await unassignTaskMutation.mutateAsync({ + taskId, + }); + await refetchState(); + } catch (error) { + console.error("Failed to unassign task:", error); + } + }; + const handleGenerateContext = (taskId: string) => { const task = tasks.find(t => t.id === taskId); if (task) { @@ -900,6 +912,7 @@ export function TaskKanban({ className }: TaskKanbanProps) { onDelete={handleTaskDelete} onGenerateContext={handleGenerateContext} onAssign={handleTaskAssign} + onUnassign={handleTaskUnassign} onClick={handleTaskClick} instances={instances} /> @@ -976,6 +989,7 @@ export function TaskKanban({ className }: TaskKanbanProps) { onComplete={handleTaskComplete} onDelete={handleTaskDelete} onAssign={handleTaskAssign} + onUnassign={handleTaskUnassign} instances={instances} /> diff --git a/docs/docs/api/index.md b/docs/docs/api/index.md index 477f17a..237e53f 100644 --- a/docs/docs/api/index.md +++ b/docs/docs/api/index.md @@ -37,6 +37,8 @@ ClaudeBench organizes events into domains: - [task.create](./task/create) - Create new task - [task.update](./task/update) - Update task - [task.complete](./task/complete) - Complete task +- [task.assign](./task/assign) - Assign task to instance (backward compatibility) +- [task.unassign](./task/unassign) - Remove assignment from task - [task.claim](./task/claim) - Claim tasks for processing - [task.list](./task/list) - List tasks with filters - [task.decompose](./task/decompose) - Decompose complex tasks into subtasks (replaces swarm.decompose) diff --git a/docs/docs/api/task/unassign.md b/docs/docs/api/task/unassign.md new file mode 100644 index 0000000..daa689c --- /dev/null +++ b/docs/docs/api/task/unassign.md @@ -0,0 +1,131 @@ +--- +sidebar_position: 7 +title: task.unassign +description: Remove assignment from a task +--- + +# task.unassign + +Remove the current assignment from a task and return it to the pending queue. This allows the task to be claimed by another worker or reassigned manually. + +## Request + +### Method +`task.unassign` + +### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `taskId` | string | ✓ | ID of the task to unassign | + +### Example Request + +```json +{ + "jsonrpc": "2.0", + "method": "task.unassign", + "params": { + "taskId": "t-1234567890" + }, + "id": "req-009" +} +``` + +## Response + +### Success Response + +```json +{ + "jsonrpc": "2.0", + "result": { + "taskId": "t-1234567890", + "previousAssignment": "worker-001", + "unassignedAt": "2025-01-19T10:50:00Z" + }, + "id": "req-009" +} +``` + +### Error Response + +```json +{ + "jsonrpc": "2.0", + "error": { + "code": -32602, + "message": "Task t-1234567890 is not currently assigned", + "data": { + "taskId": "t-1234567890" + } + }, + "id": "req-009" +} +``` + +## Event Emission + +When a task is successfully unassigned, the following event is emitted: + +```json +{ + "type": "task.unassigned", + "payload": { + "taskId": "t-1234567890", + "previousAssignment": "worker-001" + }, + "metadata": { + "unassignedBy": "instance-1", + "unassignedAt": "2025-01-19T10:50:00Z" + }, + "timestamp": 1758273000000 +} +``` + +## Notes + +### Prerequisites +- Task must exist in the system +- Task must be currently assigned +- Task status cannot be "completed" or "failed" + +### Behavior +- Removes assignment fields from task data +- Updates task status back to "pending" +- Returns task to pending queue with original priority +- Removes task from instance-specific queue +- Tracks unassignment in history +- Decrements instance task count metrics + +### Priority Handling +Tasks are returned to the pending queue with their original priority value. The queue score is calculated as: +``` +score = current_timestamp - (priority * 1000) +``` +This ensures higher priority tasks (higher numeric value) get lower scores and are processed first. + +### Limitations +- Rate limited to 20 requests per minute +- Cannot unassign completed or failed tasks +- Cannot unassign tasks that are not currently assigned + +### Data Storage +- Assignment removal stored in both Redis and PostgreSQL (if configured) +- History tracking in Redis for audit purposes +- Instance metrics updated for monitoring + +## Use Cases + +1. **Worker Failure Recovery**: When a worker goes offline, unassign its tasks so they can be claimed by healthy workers +2. **Task Rebalancing**: Redistribute tasks across workers for load balancing +3. **Manual Intervention**: Admin needs to reassign a stuck or incorrectly assigned task +4. **Priority Changes**: Unassign and update priority when task urgency changes + +## Related + +- [task.assign](./assign) - Assign a task to a specific instance +- [task.claim](./claim) - Claim a pending task (preferred method) +- [task.update](./update) - Update task details +- [task.complete](./complete) - Mark task as completed +- [task.list](./list) - List and filter tasks \ No newline at end of file