Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/server/src/handlers/task/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
export * from "./task.create.handler";
export * from "./task.update.handler";
export * from "./task.assign.handler"; // Backward compat wrapper
export * from "./task.unassign.handler"; // NEW unassign handler
export * from "./task.claim.handler"; // NEW pull model
export * from "./task.complete.handler";
export * from "./task.list.handler"; // NEW list/query handler
Expand Down
114 changes: 114 additions & 0 deletions apps/server/src/handlers/task/task.unassign.handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { EventHandler, Instrumented, Resilient } from "@/core/decorator";
import type { EventContext } from "@/core/context";
import { taskUnassignInput, taskUnassignOutput } from "@/schemas/task.schema";
import type { TaskUnassignInput, TaskUnassignOutput } from "@/schemas/task.schema";
import { redisKey } from "@/core/redis";

@EventHandler({
event: "task.unassign",
inputSchema: taskUnassignInput,
outputSchema: taskUnassignOutput,
persist: true,
rateLimit: 20,
description: "Remove assignment from a task",
})
export class TaskUnassignHandler {
@Instrumented(0) // No caching - this operation changes state
@Resilient({
rateLimit: { limit: 20, windowMs: 60000 }, // 20 requests per minute
timeout: 5000, // 5 second timeout
circuitBreaker: {
threshold: 5,
timeout: 30000,
fallback: () => {
throw new Error("Task unassignment service temporarily unavailable");
}
}
})
async handle(input: TaskUnassignInput, ctx: EventContext): Promise<TaskUnassignOutput> {
const taskKey = redisKey("task", input.taskId);

// Verify task exists
const taskData = await ctx.redis.stream.hgetall(taskKey);
if (!taskData || Object.keys(taskData).length === 0) {
throw new Error(`Task not found: ${input.taskId}`);
}

// Get previous assignment
const previousAssignment = taskData.assignedTo || null;

if (!previousAssignment) {
throw new Error(`Task ${input.taskId} is not currently assigned`);
}

// Check task status - don't unassign completed or failed tasks
const currentStatus = taskData.status;
if (currentStatus === "completed" || currentStatus === "failed") {
throw new Error(`Cannot unassign task ${input.taskId} with status ${currentStatus}`);
}

const now = new Date().toISOString();

// Remove assignment from task
await ctx.redis.stream.hdel(taskKey, "assignedTo", "assignedAt");

// Update task status back to pending and update timestamp
await ctx.redis.stream.hset(taskKey, {
status: "pending",
updatedAt: now,
});

// Add back to pending queue with original priority
const pendingQueueKey = redisKey("queue", "tasks", "pending");
const priority = Number(taskData.priority) || 50;
const score = Date.now() - (priority * 1000); // Higher priority = lower score
await ctx.redis.stream.zadd(pendingQueueKey, score, input.taskId);

// Remove from instance queue if present
const instanceQueueKey = redisKey("queue", "instance", previousAssignment);
await ctx.redis.stream.lrem(instanceQueueKey, 0, input.taskId);

// Track unassignment in history
const historyKey = redisKey("history", "task", input.taskId, "assignments");
await ctx.redis.stream.rpush(historyKey, JSON.stringify({
instanceId: previousAssignment,
action: "unassigned",
unassignedAt: now,
unassignedBy: ctx.instanceId
}));

// Update instance metrics
const instanceTasksKey = redisKey("metrics", "instance", previousAssignment);
await ctx.redis.stream.hincrby(instanceTasksKey, "assignedTasks", -1);

// Persist to PostgreSQL if configured
if (ctx.persist) {
await ctx.prisma.task.update({
where: { id: input.taskId },
data: {
assignedTo: null,
status: "pending",
},
});
}

// Publish event
await ctx.publish({
type: "task.unassigned",
payload: {
taskId: input.taskId,
previousAssignment,
},
metadata: {
unassignedBy: ctx.instanceId,
unassignedAt: now,
},
});

return {
taskId: input.taskId,
previousAssignment,
unassignedAt: now,
};
}
}
14 changes: 14 additions & 0 deletions apps/server/src/schemas/task.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,20 @@ export const taskDeleteOutput = z.object({
export type TaskDeleteInput = z.infer<typeof taskDeleteInput>;
export type TaskDeleteOutput = z.infer<typeof taskDeleteOutput>;

// task.unassign
export const taskUnassignInput = z.object({
taskId: z.string().min(1),
});

export const taskUnassignOutput = z.object({
taskId: z.string(),
previousAssignment: z.string().nullable(),
unassignedAt: z.string().datetime(),
});

export type TaskUnassignInput = z.infer<typeof taskUnassignInput>;
export type TaskUnassignOutput = z.infer<typeof taskUnassignOutput>;

// task.context
export const taskContextInput = z.object({
taskId: z.string().min(1),
Expand Down
13 changes: 13 additions & 0 deletions apps/web/src/components/TaskCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ interface TaskCardProps {
onUpdate?: (taskId: string, updates: any) => void;
onComplete?: (taskId: string) => void;
onAssign?: (taskId: string, instanceId: string) => void;
onUnassign?: (taskId: string) => void;
onDelete?: (taskId: string) => void;
onGenerateContext?: (taskId: string) => void;
onClick?: (task: Task) => void;
Expand All @@ -69,6 +70,7 @@ export function TaskCard({
onUpdate,
onComplete,
onAssign,
onUnassign,
onDelete,
onGenerateContext,
onClick,
Expand Down Expand Up @@ -210,6 +212,17 @@ export function TaskCard({
Generate Context
</DropdownMenuItem>
)}
{task.assignedTo && onUnassign && (
<DropdownMenuItem
onClick={(e) => {
e.stopPropagation();
onUnassign(task.id);
}}
>
<User className="h-4 w-4 mr-2 text-orange-500" />
Unassign from {task.assignedTo}
</DropdownMenuItem>
)}
{instances.length > 0 && onAssign && (
<>
<DropdownMenuSeparator />
Expand Down
2 changes: 2 additions & 0 deletions apps/web/src/components/TaskDetailModal.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ interface TaskDetailModalProps {
onComplete?: (taskId: string) => void;
onDelete?: (taskId: string) => void;
onAssign?: (taskId: string, instanceId: string) => void;
onUnassign?: (taskId: string) => void;
instances?: Array<{ id: string; roles: string[]; status?: string; health?: string }>;
}

Expand All @@ -74,6 +75,7 @@ export function TaskDetailModal({
onComplete,
onDelete,
onAssign,
onUnassign,
instances = [],
}: TaskDetailModalProps) {
if (!task) return null;
Expand Down
14 changes: 14 additions & 0 deletions apps/web/src/components/TaskKanban.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ export function TaskKanban({ className }: TaskKanbanProps) {
const completeTaskMutation = useCompleteTask();
const deleteTaskMutation = useDeleteTask();
const assignTaskMutation = useEventMutation("task.assign");
const unassignTaskMutation = useEventMutation("task.unassign");
const generateContextMutation = useGenerateContext();

// WebSocket connection for real-time updates
Expand Down Expand Up @@ -587,6 +588,17 @@ export function TaskKanban({ className }: TaskKanbanProps) {
}
};

const handleTaskUnassign = async (taskId: string) => {
try {
await unassignTaskMutation.mutateAsync({
taskId,
});
await refetchState();
} catch (error) {
console.error("Failed to unassign task:", error);
}
};

const handleGenerateContext = (taskId: string) => {
const task = tasks.find(t => t.id === taskId);
if (task) {
Expand Down Expand Up @@ -900,6 +912,7 @@ export function TaskKanban({ className }: TaskKanbanProps) {
onDelete={handleTaskDelete}
onGenerateContext={handleGenerateContext}
onAssign={handleTaskAssign}
onUnassign={handleTaskUnassign}
onClick={handleTaskClick}
instances={instances}
/>
Expand Down Expand Up @@ -976,6 +989,7 @@ export function TaskKanban({ className }: TaskKanbanProps) {
onComplete={handleTaskComplete}
onDelete={handleTaskDelete}
onAssign={handleTaskAssign}
onUnassign={handleTaskUnassign}
instances={instances}
/>

Expand Down
2 changes: 2 additions & 0 deletions docs/docs/api/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ ClaudeBench organizes events into domains:
- [task.create](./task/create) - Create new task
- [task.update](./task/update) - Update task
- [task.complete](./task/complete) - Complete task
- [task.assign](./task/assign) - Assign task to instance (backward compatibility)
- [task.unassign](./task/unassign) - Remove assignment from task
- [task.claim](./task/claim) - Claim tasks for processing
- [task.list](./task/list) - List tasks with filters
- [task.decompose](./task/decompose) - Decompose complex tasks into subtasks (replaces swarm.decompose)
Expand Down
131 changes: 131 additions & 0 deletions docs/docs/api/task/unassign.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
---
sidebar_position: 7
title: task.unassign
description: Remove assignment from a task
---

# task.unassign

Remove the current assignment from a task and return it to the pending queue. This allows the task to be claimed by another worker or reassigned manually.

## Request

### Method
`task.unassign`

### Parameters

| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `taskId` | string | ✓ | ID of the task to unassign |

### Example Request

```json
{
"jsonrpc": "2.0",
"method": "task.unassign",
"params": {
"taskId": "t-1234567890"
},
"id": "req-009"
}
```

## Response

### Success Response

```json
{
"jsonrpc": "2.0",
"result": {
"taskId": "t-1234567890",
"previousAssignment": "worker-001",
"unassignedAt": "2025-01-19T10:50:00Z"
},
"id": "req-009"
}
```

### Error Response

```json
{
"jsonrpc": "2.0",
"error": {
"code": -32602,
"message": "Task t-1234567890 is not currently assigned",
"data": {
"taskId": "t-1234567890"
}
},
"id": "req-009"
}
```

## Event Emission

When a task is successfully unassigned, the following event is emitted:

```json
{
"type": "task.unassigned",
"payload": {
"taskId": "t-1234567890",
"previousAssignment": "worker-001"
},
"metadata": {
"unassignedBy": "instance-1",
"unassignedAt": "2025-01-19T10:50:00Z"
},
"timestamp": 1758273000000
}
```

## Notes

### Prerequisites
- Task must exist in the system
- Task must be currently assigned
- Task status cannot be "completed" or "failed"

### Behavior
- Removes assignment fields from task data
- Updates task status back to "pending"
- Returns task to pending queue with original priority
- Removes task from instance-specific queue
- Tracks unassignment in history
- Decrements instance task count metrics

### Priority Handling
Tasks are returned to the pending queue with their original priority value. The queue score is calculated as:
```
score = current_timestamp - (priority * 1000)
```
This ensures higher priority tasks (higher numeric value) get lower scores and are processed first.

### Limitations
- Rate limited to 20 requests per minute
- Cannot unassign completed or failed tasks
- Cannot unassign tasks that are not currently assigned

### Data Storage
- Assignment removal stored in both Redis and PostgreSQL (if configured)
- History tracking in Redis for audit purposes
- Instance metrics updated for monitoring

## Use Cases

1. **Worker Failure Recovery**: When a worker goes offline, unassign its tasks so they can be claimed by healthy workers
2. **Task Rebalancing**: Redistribute tasks across workers for load balancing
3. **Manual Intervention**: Admin needs to reassign a stuck or incorrectly assigned task
4. **Priority Changes**: Unassign and update priority when task urgency changes

## Related

- [task.assign](./assign) - Assign a task to a specific instance
- [task.claim](./claim) - Claim a pending task (preferred method)
- [task.update](./update) - Update task details
- [task.complete](./complete) - Mark task as completed
- [task.list](./list) - List and filter tasks