Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:

# 6) Set up Python & install dependencies
- uses: actions/setup-python@v5
with: { python-version: "3.10" }
with: { python-version: "3.13" }
- name: Install Python deps
run: |
pip install -e .
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ jobs:

steps:
- uses: actions/checkout@v4
- name: Set up Python 3.10
- name: Set up Python 3.13
uses: actions/setup-python@v3
with:
python-version: "3.10"
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand Down
128 changes: 128 additions & 0 deletions opto/features/priority_search/epsNetPS_plus_summarizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from opto.features.priority_search.priority_search import PrioritySearch, ModuleCandidate
from opto.features.priority_search.module_regressor import RegressorTemplate
from opto.features.priority_search.summarizer import Summarizer
from typing import Union, List, Tuple, Dict, Any, Optional, Callable
from opto.optimizers.utils import print_color
import numpy as np
from opto.features.priority_search.search_template import Samples


def calculate_distance_to_memory(memory, new_candidate):
    """Return the least L2 distance from ``new_candidate`` to any candidate in ``memory``.

    To use this function in PrioritySearch, set memory to be self.memory.memory.

    Args:
        memory: Sequence of 2-tuples whose second item is a candidate exposing
            an ``embedding`` attribute. The first item of each tuple is ignored.
        new_candidate: Object exposing an ``embedding`` attribute.

    Returns:
        The smallest Euclidean distance between ``new_candidate.embedding`` and
        the embeddings in ``memory``; ``float('inf')`` when ``memory`` is empty.

    Raises:
        AssertionError: If any involved candidate lacks an ``embedding`` attribute.
    """
    assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute."
    # Convert the query embedding once instead of once per memory entry.
    target = np.array(new_candidate.embedding)
    min_distance = float('inf')
    for _, candidate in memory:
        distance = np.linalg.norm(target - np.array(candidate.embedding))
        if distance < min_distance:
            min_distance = distance
    return min_distance

class EpsilonNetPS_plus_Summarizer(PrioritySearch):
    """
    A subclass of PrioritySearch, which keeps an epsilon-net as the memory. Reject new candidates that are in the epsilon-net of the memory.

    This class uses a summarizer to summarize the memory and the exploration candidates. It then sets the context for the optimizer to use the summary to guide the exploration.

    Args:
        epsilon: The epsilon value for the epsilon-net. 0 means no filtering, the same as vanilla PrioritySearch.
        use_summarizer: Whether to use a summarizer to summarize the memory and the exploration candidates.
        summarizer_model_name: The model name for the summarizer.
        *args: Additional arguments for the parent class.
        **kwargs: Additional keyword arguments for the parent class.
    """

    # Single source of truth for the optimizer context prefix; a summary, when
    # available, is appended directly after it.
    _CONTEXT_PREFIX = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: "

    def __init__(self,
                 epsilon: float = 0.1,
                 use_summarizer: bool = False,
                 summarizer_model_name: str = "gemini/gemini-2.0-flash",
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.epsilon = epsilon
        self.use_summarizer = use_summarizer
        # The regressor is used here only to attach embeddings to candidates.
        self.regressor = RegressorTemplate()
        self.summarizer = Summarizer(model_name=summarizer_model_name)
        # Fallback context used until (or unless) a summary is produced.
        self.context = self._CONTEXT_PREFIX

    def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]:
        """Filter candidates by their embeddings.

        Greedily accepts the proposal that is farthest from everything already
        kept (memory, exploration candidates and previously accepted
        proposals), as long as that distance exceeds ``self.epsilon``.

        Args:
            new_candidates: Newly proposed candidates.

        Returns:
            The accepted candidates, in the order they were accepted. When
            ``self.epsilon == 0`` the input list is returned unchanged.
        """
        if self.epsilon == 0:  # no filtering
            print_color("No filtering of candidates.", "green")
            return new_candidates

        exploration_memory = [(0, candidate) for candidate in self._exploration_candidates]
        # Concatenation builds a new list, so self.memory.memory is not modified.
        current_memory = self.memory.memory + exploration_memory
        # Add embeddings to all the candidates. The regressor will check if the candidates have embeddings, and if not, it will add them in parallel.
        current_candidates = [candidate for _, candidate in current_memory]
        self.regressor.add_embeddings_to_candidates(current_candidates + new_candidates)
        # NOTE(review): if an embedding could not be obtained and is left as None,
        # the distance arithmetic below presumably raises -- confirm desired handling.

        num_new_candidates = len(new_candidates)

        # Distance of every proposal to the existing memory, computed once.
        # Afterwards only the distance to each newly accepted node has to be
        # folded in, which gives the same minimum as re-scanning the whole memory.
        remaining = [
            (candidate, calculate_distance_to_memory(current_memory, candidate))
            for candidate in new_candidates
        ]

        added_candidates = []
        success_distances = []

        while True:
            # Keep only proposals that are still farther than epsilon from everything kept.
            remaining = [(candidate, distance) for candidate, distance in remaining if distance > self.epsilon]
            if not remaining:
                break

            # Accept the proposal with the largest distance (first one on ties).
            best_idx = int(np.argmax([distance for _, distance in remaining]))
            new_node, best_distance = remaining[best_idx]
            added_candidates.append(new_node)
            success_distances.append(float(best_distance))

            # Drop the accepted node and refresh the others' running minimum
            # with their distance to it.
            new_embedding = np.array(new_node.embedding)
            remaining = [
                (candidate, min(distance, np.linalg.norm(np.array(candidate.embedding) - new_embedding)))
                for candidate, distance in remaining
                if candidate is not new_node
            ]

        print_color(f"Proposed {num_new_candidates} new candidates, {len(added_candidates)} of them are added to the memory.", "green")
        # print the distances between the added candidates and the memory before adding them.
        print_color(f"Distances between the added candidates and the memory before adding them: {success_distances}", "green")
        return added_candidates

    def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate:
        """Return the candidate untouched when summarizing, otherwise defer to the parent.

        For the summarizer usage, we keep the entire rollout.
        """
        if self.use_summarizer:
            return candidate
        return super().compress_candidate_memory(candidate)

    def propose(self,
                samples: Samples,
                verbose: bool = False,
                **kwargs):
        """
        Override the propose method to include a summary into the context of the optimizer.

        Args:
            samples: Samples forwarded to the parent ``propose``.
            verbose: Forwarded to the parent ``propose``.
            **kwargs: Forwarded to the parent ``propose``.

        Returns:
            Whatever the parent ``propose`` returns.
        """
        # Use the summarizer to summarize the memory and the exploration candidates.
        if self.use_summarizer:
            exploration_memory = [(0, candidate) for candidate in self._exploration_candidates]
            print_color("Summarizing the history...", "green")
            try:
                summary = self.summarizer.summarize(self.memory.memory + exploration_memory)
                print_color(f"Summary: {summary}", "green")
                self.context = f"{self._CONTEXT_PREFIX}{summary}"
            except Exception as e:
                # Best effort: a failed summary must not abort the search, so the
                # previously stored context is reused.
                print_color(f"Error: {e}", "red")
                print_color(f"Using fallback context: {self.context}", "red")
            # Set the context for the optimizer.
            # NOTE(review): nesting reconstructed from a flattened source; this loop is
            # assumed to run only when the summarizer is enabled -- confirm.
            for candidate in self._exploration_candidates:
                candidate.optimizer.set_context(self.context)
        return super().propose(samples, verbose, **kwargs)
141 changes: 141 additions & 0 deletions opto/features/priority_search/module_regressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,147 @@
from opto.utils.auto_retry import retry_with_exponential_backoff
import litellm
import time
from opto.features.priority_search.priority_search import ModuleCandidate

class RegressorTemplate:
    """Base class template for regression-based predictors for ModuleCandidate objects.

    Provides common functionality for embedding generation and candidate processing.
    Subclasses should implement update() and predict_scores() methods.

    Regressors can be built on this template by implementing the update() and predict_scores() methods.
    This class itself is enough for getting embeddings for candidates.
    """

    def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True):
        """Store the configuration and initialize the linear model parameters.

        Args:
            embedding_model: Model name passed to ``litellm.embedding``.
            num_threads: Stored on the instance; not read anywhere in this class.
            regularization_strength: L2 regularization strength (lambda).
            linear_dim: Accepted for compatibility but currently ignored; the
                model dimension is always ``original_embedding_dim``.
            rich_text: Stored on the instance; not read anywhere in this class.
        """
        # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once.
        self.max_candidates_to_predict = 500
        self.embedding_model = embedding_model
        self.num_threads = num_threads
        self.regularization_strength = regularization_strength  # L2 regularization strength (lambda)
        self.rich_text = rich_text

        # Default original embedding dimension (from text-embedding-004)
        self.original_embedding_dim = 768

        # Random projection to a smaller ``linear_dim`` is disabled for now, so
        # embeddings keep their original dimension and no projector is set.
        self.linear_dim = self.original_embedding_dim
        self.random_projector = None

        # Random initial weights drawn from N(0, 0.1), one per embedding dimension.
        self.weights = np.random.normal(0, 0.1, self.linear_dim)
        self.bias = 0.0

    def _get_parameter_text(self, candidate):
        """Get the parameter text for a ModuleCandidate.

        Returns:
            ``str()`` of a dict mapping each parameter's ``py_name`` to its value.

        Raises:
            AssertionError: If ``candidate`` has no ``update_dict`` attribute.
        """
        if not hasattr(candidate, 'update_dict'):
            # Dump the offending object before the assertion below fires.
            print(candidate)
        assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict"
        # Convert parameter nodes to readable names for deterministic embedding
        params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()}
        return str(params_with_names)

    def _get_embedding(self, candidate):
        """Get the embedding for a ModuleCandidate.

        Returns:
            The embedding taken from the API response (projected and flattened
            to a list when ``random_projector`` is set), or ``None`` if the call
            still fails after the retries.
        """
        parameter_text = self._get_parameter_text(candidate)

        def single_embedding_call():
            return litellm.embedding(
                model=self.embedding_model,
                input=parameter_text
            )

        try:
            response = retry_with_exponential_backoff(
                single_embedding_call,
                max_retries=10,
                base_delay=1.0,
                operation_name="Embedding API call"
            )
            embedding = response.data[0].embedding
            if self.random_projector is not None:
                # Convert to numpy array and reshape for transform (expects 2D: n_samples x n_features)
                embedding_array = np.array(embedding).reshape(1, -1)
                projected = self.random_projector.transform(embedding_array)
                # Convert back to list and flatten
                embedding = projected.flatten().tolist()
            return embedding
        except Exception as e:
            # Best effort: report the failure and let the caller see None.
            print_color(f"ERROR: Embedding API call failed after retries: {e}", "red")
            return None

    def add_embeddings_to_candidates(self, candidates: List["ModuleCandidate"]):
        """Add embeddings to a list of candidates. This function could be used outside."""
        self._update_memory_embeddings_for_batch(candidates)

    def _update_memory_embeddings_for_batch(self, batch):
        """Update the embeddings for a batch of candidates.

        Candidates that already carry a non-None ``embedding`` are left alone.
        A candidate whose ``embedding`` is missing or ``None`` (the value a
        failed lookup leaves behind) is embedded again.
        """
        candidates_needing_embeddings = [
            candidate for candidate in batch
            if getattr(candidate, "embedding", None) is None
        ]
        if not candidates_needing_embeddings:
            return

        # One zero-argument callable per candidate; the default argument binds
        # the current candidate so each callable embeds its own object.
        embedding_functions = [lambda c=candidate: self._get_embedding(c)
                               for candidate in candidates_needing_embeddings]

        # Run embedding generation in parallel
        new_embeddings = async_run(
            embedding_functions,
            max_workers=50,
            description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates"
        )

        # Assign embeddings back to candidates
        for candidate, embedding in zip(candidates_needing_embeddings, new_embeddings):
            candidate.embedding = embedding

    def update(self, memory: List[Tuple[float, "ModuleCandidate"]]):
        """Update the regression model parameters. Should be implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement the update method")

    def predict_scores(self, memory: List[Tuple[float, "ModuleCandidate"]]):
        """Predict scores for candidates. Should be implemented by subclasses."""
        raise NotImplementedError("Subclasses must implement the predict_scores method")

class ModuleCandidateRegressor:
"""
Expand Down
43 changes: 40 additions & 3 deletions opto/features/priority_search/priority_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,14 @@ def update(self,
# samples is None in the first iteration
if samples is not None:
# 1. Propose new parameters based on running LLM optimizers on the collected samples

# We add the exploration rollouts to the exploration candidates, before proposing. Then these samples will not be added in the validate step.
self.add_exploration_rollouts_to_candidates(self._exploration_candidates, samples)
candidates = self.propose(samples, verbose=verbose, **kwargs) # List of ModuleCandidates

# Filter the new candidates, only some of them will be added to the memory. Default is no filtering.
candidates = self.filter_candidates(candidates)

# 2. Validate the proposed parameters
validate_results = self.validate(candidates, samples, verbose=verbose, **kwargs) # this updates the priority queue
# 3. Update the priority queue with the validation results
Expand Down Expand Up @@ -612,7 +619,9 @@ def validate(self,
assert self._exploration_candidates is not None, "exploration_candidates must be set before calling validate."

# The current batch of samples can be used to validate the exploration candidates
validate_samples = copy.copy(samples)
# validate_samples = copy.copy(samples)
# Exploration samples are added before proposing, so we don't need to add them again here.
validate_samples = Samples([], {'inputs': [], 'infos': []})

# Validate newly proposed candidates
use_prev_batch = self.use_prev_batch # when True, self.validate_sampler == self.train_sampler, and the current batch is used for validation
Expand All @@ -629,11 +638,17 @@ def validate(self,
description_prefix='Validating exploration candidates: ')) # sample the exploration agents
validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples


matched_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates + candidates, validate_samples.samples)
# Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False)
candidates_to_be_matched = exploration_candidates + candidates if (self.validate_exploration_candidates and not use_prev_batch) else candidates
matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples)
results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts)
for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts
results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts
# Add exploration candidates that weren't included in match_candidates_and_samples
# This ensures they get re-added to memory even if they weren't validated again
for candidate in exploration_candidates:
if candidate not in results:
results[candidate] = [] # Add with empty rollouts list

return results

Expand Down Expand Up @@ -802,3 +817,25 @@ def _process_rollout(rollout):
for rollout in candidate.rollouts:
_process_rollout(rollout)
return candidate

# Extension hook for future use.
def filter_candidates(self, candidates: List[ModuleCandidate]) -> List[ModuleCandidate]:
    """Select which proposed candidates are kept.

    The base implementation is a pass-through: it hands back the very list it
    was given. Subclasses may override it to drop candidates by any criterion.

    Args:
        candidates (List[ModuleCandidate]): The candidates to choose from.
    Returns:
        List[ModuleCandidate]: The candidates that were kept.
    """
    return candidates

# For future use: the exploration rollouts are attached to the exploration candidates before proposing.
def add_exploration_rollouts_to_candidates(self, exploration_candidates: List[ModuleCandidate], samples: Samples):
    """Attach the rollouts found in ``samples`` to their matching exploration candidates.

    Args:
        exploration_candidates: Candidates to match against ``samples.samples``.
        samples: Collected samples; only its ``samples`` attribute is read.
    """
    matched = self.match_candidates_and_samples(exploration_candidates, samples.samples)
    # Flatten every candidate's batches into one list of rollouts before any
    # candidate is modified.
    flattened = {
        cand: [rollout for batch in batches for rollout in batch.to_list()]
        for cand, batches in matched.items()
    }
    for cand, rollout_list in flattened.items():
        cand.add_rollouts(rollout_list)
Loading