Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
326e09f
{
fblgit Sep 21, 2025
0c2689c
{
fblgit Sep 21, 2025
6ecd371
{
fblgit Sep 21, 2025
ce68ab4
{
fblgit Sep 21, 2025
5542e48
{
fblgit Sep 21, 2025
a3b807e
{
fblgit Sep 21, 2025
cf5ada9
{
fblgit Sep 21, 2025
243aa1c
{
fblgit Sep 21, 2025
0188ef1
{
fblgit Sep 21, 2025
3819771
{
fblgit Sep 21, 2025
c4cb7f7
{
fblgit Sep 21, 2025
44feff3
{
fblgit Sep 21, 2025
a39bbf1
{
fblgit Sep 21, 2025
0db9022
{
fblgit Sep 21, 2025
c97a0d5
{
fblgit Sep 21, 2025
4af0502
{
fblgit Sep 21, 2025
66f063d
{
fblgit Sep 21, 2025
3cf5b59
{
fblgit Sep 21, 2025
40030df
{
fblgit Sep 21, 2025
1c0a056
{
fblgit Sep 21, 2025
3993206
{
fblgit Sep 22, 2025
774829a
{
fblgit Sep 22, 2025
838a93c
{
fblgit Sep 22, 2025
807ed8a
{
fblgit Sep 22, 2025
06e538c
{
fblgit Sep 22, 2025
e6a5bd5
{
fblgit Sep 22, 2025
9914dbe
{
fblgit Sep 22, 2025
caca830
{
fblgit Sep 22, 2025
c8dd211
{
fblgit Sep 22, 2025
5d19573
{
fblgit Sep 22, 2025
bc0e693
{
fblgit Sep 22, 2025
7c3f69c
{
fblgit Sep 22, 2025
f655d77
{
fblgit Sep 22, 2025
e7ac8a2
{
fblgit Sep 22, 2025
b228328
{
fblgit Sep 22, 2025
bfbb29c
{
fblgit Sep 22, 2025
56fb5a6
{
fblgit Sep 22, 2025
c8624aa
{
fblgit Sep 22, 2025
baabb97
{
fblgit Sep 22, 2025
802bbfb
{
fblgit Sep 22, 2025
faced02
{
fblgit Sep 22, 2025
e6b6f9a
{
fblgit Sep 22, 2025
376d43a
{
fblgit Sep 22, 2025
f5a6d35
{
fblgit Sep 22, 2025
8d6b807
{
fblgit Sep 22, 2025
db9db0c
{
fblgit Sep 22, 2025
adcaed1
{
fblgit Sep 22, 2025
51da446
{
fblgit Sep 22, 2025
7633e95
{
fblgit Sep 22, 2025
4bf9987
{
fblgit Sep 22, 2025
e70c04b
{
fblgit Sep 22, 2025
666853b
{
fblgit Sep 22, 2025
d78616a
{
fblgit Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions apps/inference/src/claudebench_inference/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,25 @@ async def decompose_task(request: DecompositionRequest):
logger.info(f"Decomposition request for session {request.sessionId}: {request.task[:50]}...")

try:
# Extract working directory from context if provided
working_directory = None
if request.context and hasattr(request.context, 'workingDirectory'):
working_directory = request.context.workingDirectory
if working_directory:
logger.info(f"Using working directory for decomposition: {working_directory}")

# Build the prompt
prompt = prompt_builder.build_decomposition_prompt(
task=request.task,
context=request.context
)

# Perform sampling
# Perform sampling with working directory
result = await sampling_engine.sample_json(
prompt=prompt,
max_tokens=8192,
temperature=0.7
temperature=0.7,
working_directory=working_directory
)

# Validate and return response
Expand Down Expand Up @@ -222,18 +230,26 @@ async def generate_context(request: ContextRequest):
logger.info(f"Context generation for subtask {request.subtaskId}")

try:
# Extract working directory from subtask metadata if provided
working_directory = None
if request.subtask and isinstance(request.subtask, dict):
working_directory = request.subtask.get('workingDirectory')
if working_directory:
logger.info(f"Using working directory: {working_directory}")

# Build the prompt
prompt = prompt_builder.build_context_prompt(
subtaskId=request.subtaskId,
specialist=request.specialist,
subtask=request.subtask
)

# Perform sampling
# Perform sampling with working directory
result = await sampling_engine.sample_json(
prompt=prompt,
max_tokens=16384,
temperature=0.5 # Lower temperature for more focused context
temperature=0.5, # Lower temperature for more focused context
working_directory=working_directory
)

# Validate and return response
Expand Down
1 change: 1 addition & 0 deletions apps/inference/src/claudebench_inference/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class DecompositionContext(BaseModel):
specialists: List[Specialist]
priority: int = Field(ge=0, le=100)
constraints: Optional[List[str]] = Field(default_factory=list)
workingDirectory: Optional[str] = None # Working directory for codebase exploration


class DecompositionRequest(BaseModel):
Expand Down
17 changes: 13 additions & 4 deletions apps/inference/src/claudebench_inference/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ async def sample(
max_tokens: int = 2000, # Note: not directly used by SDK
temperature: float = 0.7, # Note: not directly used by SDK
system_prompt: Optional[str] = None,
max_turns: int = 50 # Allow extensive exploration
max_turns: int = 50, # Allow extensive exploration
working_directory: Optional[str] = None # Working directory for the context
) -> str:
"""
Perform sampling using claude-code-sdk
Expand All @@ -61,6 +62,7 @@ async def sample(
temperature: Sampling temperature (not used by SDK)
system_prompt: Override the default system prompt
max_turns: Maximum number of turns for tool usage
working_directory: Working directory to run the SDK in

Returns:
The response text from Claude
Expand All @@ -71,11 +73,16 @@ async def sample(
try:
self.stats["total_requests"] += 1

# Log the working directory if provided
if working_directory:
logger.info(f"Using working directory: {working_directory}")

options = ClaudeCodeOptions(
max_turns=max_turns, # Allow multiple turns for exploration
system_prompt=system_prompt or self.default_system_prompt,
allowed_tools=self.allowed_tools,
permission_mode='bypassPermissions' # Allow tool use without prompting
permission_mode='bypassPermissions', # Allow tool use without prompting
cwd=working_directory # Set the working directory for the SDK
)

response_text = ""
Expand Down Expand Up @@ -133,7 +140,8 @@ async def sample_json(
max_tokens: int = 2000,
temperature: float = 0.7,
system_prompt: Optional[str] = None,
max_turns: int = 50 # Allow extensive exploration
max_turns: int = 50, # Allow extensive exploration
working_directory: Optional[str] = None
) -> Dict[str, Any]:
"""
Sample and parse JSON response
Expand All @@ -144,14 +152,15 @@ async def sample_json(
temperature: Sampling temperature
system_prompt: Override the default system prompt
max_turns: Maximum number of turns for tool usage
working_directory: Working directory to run the SDK in

Returns:
Parsed JSON response as dictionary

Raises:
Exception: If sampling or parsing fails
"""
response = await self.sample(prompt, max_tokens, temperature, system_prompt, max_turns)
response = await self.sample(prompt, max_tokens, temperature, system_prompt, max_turns, working_directory)
return self.extract_json(response)

def get_stats(self) -> Dict[str, Any]:
Expand Down
4 changes: 3 additions & 1 deletion apps/server/src/core/lua-scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -842,14 +842,16 @@ local instance_id = ARGV[1]
local roles_json = ARGV[2]
local timestamp = ARGV[3]
local ttl = tonumber(ARGV[4])
local metadata_json = ARGV[5] or '{}'

-- Register instance
redis.call('hset', instance_key,
'id', instance_id,
'roles', roles_json,
'health', 'healthy',
'status', 'ACTIVE',
'lastSeen', timestamp
'lastSeen', timestamp,
'metadata', metadata_json
)
redis.call('expire', instance_key, ttl)

Expand Down
6 changes: 4 additions & 2 deletions apps/server/src/core/redis-scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ export class RedisScriptExecutor {
async registerInstance(
instanceId: string,
roles: string[],
ttl: number
ttl: number,
metadata?: any
): Promise<{ success: boolean; becameLeader: boolean }> {
const result = await this.redis.stream.eval(
scripts.INSTANCE_REGISTER,
Expand All @@ -364,7 +365,8 @@ export class RedisScriptExecutor {
instanceId,
JSON.stringify(roles),
Date.now().toString(),
ttl.toString()
ttl.toString(),
JSON.stringify(metadata || {})
) as [number, number];

return {
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/core/sampling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface DecompositionContext {
}>;
priority: number;
constraints?: string[];
workingDirectory?: string;
}

export interface Decomposition {
Expand Down
21 changes: 20 additions & 1 deletion apps/server/src/handlers/swarm/swarm.decompose.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { EventContext } from "@/core/context";
import { swarmDecomposeInput, swarmDecomposeOutput } from "@/schemas/swarm.schema";
import type { SwarmDecomposeInput, SwarmDecomposeOutput } from "@/schemas/swarm.schema";
import { redisScripts } from "@/core/redis-scripts";
import { getRedis } from "@/core/redis";
import { getSamplingService } from "@/core/sampling";
import { registry } from "@/core/registry";

Expand Down Expand Up @@ -82,14 +83,32 @@ export class SwarmDecomposeHandler {
throw new Error("No session ID available for sampling");
}

// Get worker's working directory from instance metadata
const redis = getRedis();
let workingDirectory: string | undefined;
if (ctx.instanceId) {
const instanceKey = `cb:instance:${ctx.instanceId}`;
const instanceMetadata = await redis.pub.hget(instanceKey, 'metadata');
if (instanceMetadata) {
try {
const metadata = JSON.parse(instanceMetadata);
workingDirectory = metadata.workingDirectory;
console.log(`[SwarmDecompose] Using working directory from instance ${ctx.instanceId}: ${workingDirectory}`);
} catch (e) {
console.warn(`[SwarmDecompose] Failed to parse instance metadata:`, e);
}
}
}

// Request decomposition via sampling
const decomposition = await samplingService.requestDecomposition(
sessionId,
input.task,
{
specialists,
priority: input.priority,
constraints: input.constraints
constraints: input.constraints,
workingDirectory
}
);

Expand Down
3 changes: 2 additions & 1 deletion apps/server/src/handlers/system/system.register.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ export class SystemRegisterHandler {
const result = await redisScripts.registerInstance(
input.id,
input.roles,
ttl
ttl,
input.metadata
);

console.log(`[SystemRegister] Registration result for ${input.id}:`, result);
Expand Down
43 changes: 33 additions & 10 deletions apps/server/src/handlers/task/task.context.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,37 @@ export class TaskContextHandler {
url: att.url || null
}));

// Prepare task info for context generation
// Generate context via sampling service
const samplingService = getSamplingService();
const sessionId = ctx.metadata?.sessionId || ctx.metadata?.clientId || ctx.instanceId;

if (!sessionId) {
throw new Error("No session ID available for sampling");
}

// Get worker's working directory from instance metadata
let workingDirectory: string | undefined;
// First check if a specific worker was requested in the input metadata
const requestedWorkerId = input.metadata?.workerId;
const workerId = requestedWorkerId || ctx.instanceId;

if (workerId) {
const instanceKey = `cb:instance:${workerId}`;
const instanceMetadata = await redis.pub.hget(instanceKey, 'metadata');
if (instanceMetadata) {
try {
const metadata = JSON.parse(instanceMetadata);
workingDirectory = metadata.workingDirectory;
console.log(`[TaskContext] Using working directory from instance ${workerId}: ${workingDirectory}`);
} catch (e) {
console.warn(`[TaskContext] Failed to parse instance metadata for ${workerId}:`, e);
}
} else if (requestedWorkerId) {
console.warn(`[TaskContext] Requested worker ${requestedWorkerId} not found or has no metadata`);
}
}

// Prepare task info for context generation - include workingDirectory
const taskInfo = {
id: input.taskId,
description: input.customDescription || taskData.text || "No description",
Expand All @@ -160,17 +190,10 @@ export class TaskContextHandler {
constraints: input.constraints || [],
requirements: input.requirements || [],
existingFiles: input.existingFiles || [],
additionalContext: input.additionalContext || ""
additionalContext: input.additionalContext || "",
workingDirectory: workingDirectory // Now properly typed in the object
};

// Generate context via sampling service
const samplingService = getSamplingService();
const sessionId = ctx.metadata?.sessionId || ctx.metadata?.clientId || ctx.instanceId;

if (!sessionId) {
throw new Error("No session ID available for sampling");
}

// Call the context generation endpoint with task info
const response = await samplingService.generateContext(
sessionId,
Expand Down
3 changes: 2 additions & 1 deletion apps/server/src/handlers/task/task.create_project.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ export class TaskCreateProjectHandler {
sessionId: sessionId,
metadata: {
projectId: projectId,
source: "task.create_project"
source: "task.create_project",
workerId: input.metadata?.workerId // Pass through the workerId
}
}, ctx.metadata?.clientId);

Expand Down
19 changes: 18 additions & 1 deletion apps/server/src/handlers/task/task.decompose.handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,31 @@ export class TaskDecomposeHandler {
throw new Error("No session ID available for sampling");
}

// Get worker's working directory from instance metadata
let workingDirectory: string | undefined;
if (ctx.instanceId) {
const instanceKey = `cb:instance:${ctx.instanceId}`;
const instanceMetadata = await redis.pub.hget(instanceKey, 'metadata');
if (instanceMetadata) {
try {
const metadata = JSON.parse(instanceMetadata);
workingDirectory = metadata.workingDirectory;
console.log(`[TaskDecompose] Using working directory from instance ${ctx.instanceId}: ${workingDirectory}`);
} catch (e) {
console.warn(`[TaskDecompose] Failed to parse instance metadata:`, e);
}
}
}

// Request decomposition via sampling
const decomposition = await samplingService.requestDecomposition(
sessionId,
input.task,
{
specialists,
priority: input.priority,
constraints: input.constraints
constraints: input.constraints,
workingDirectory
}
);

Expand Down
3 changes: 3 additions & 0 deletions apps/server/src/schemas/system.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export const systemHealthOutput = z.object({
export const systemRegisterInput = z.object({
id: z.string().min(1),
roles: z.array(z.string()),
metadata: z.object({
workingDirectory: z.string().optional(),
}).optional(),
});

export const systemRegisterOutput = z.object({
Expand Down
Loading