diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7889b69d..622c9626 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -49,7 +49,7 @@ jobs:
# 6) Set up Python & install dependencies
- uses: actions/setup-python@v5
- with: { python-version: "3.10" }
+ with: { python-version: "3.13" }
- name: Install Python deps
run: |
pip install -e .
diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index 8074be85..a111e34f 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -19,10 +19,10 @@ jobs:
steps:
- uses: actions/checkout@v4
- - name: Set up Python 3.10
+ - name: Set up Python 3.13
uses: actions/setup-python@v3
with:
- python-version: "3.10"
+ python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
diff --git a/opto/features/priority_search/epsNetPS_plus_summarizer.py b/opto/features/priority_search/epsNetPS_plus_summarizer.py
new file mode 100644
index 00000000..e41c948e
--- /dev/null
+++ b/opto/features/priority_search/epsNetPS_plus_summarizer.py
@@ -0,0 +1,128 @@
+from opto.features.priority_search.priority_search import PrioritySearch, ModuleCandidate
+from opto.features.priority_search.module_regressor import RegressorTemplate
+from opto.features.priority_search.summarizer import Summarizer
+from typing import Union, List, Tuple, Dict, Any, Optional, Callable
+from opto.optimizers.utils import print_color
+import numpy as np
+from opto.features.priority_search.search_template import Samples
+
+
+def calculate_distance_to_memory(memory, new_candidate):
+ """For a new candidate, calculate the distance to the current memory. That's the least L2 distance to any candidate in the memory.
+
+ To use this funciton in PrioritySearch, set memory to be self.memory.memory.
+ """
+ assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute."
+ min_distance = float('inf')
+ for _, candidate in memory:
+ distance = np.linalg.norm(np.array(new_candidate.embedding) - np.array(candidate.embedding))
+ if distance < min_distance:
+ min_distance = distance
+ return min_distance
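+
+# A worked example (hypothetical embeddings): with memory = [(0, a)] where
+# a.embedding == [0.0, 0.0] and new_candidate.embedding == [3.0, 4.0],
+# calculate_distance_to_memory returns 5.0 (the L2 norm of the difference).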
+
+class EpsilonNetPS_plus_Summarizer(PrioritySearch):
+ """
+    A subclass of PrioritySearch that maintains its memory as an epsilon-net: new candidates whose embeddings lie within epsilon (L2 distance) of an existing memory entry are rejected.
+
+    Optionally, a summarizer condenses the memory and the exploration candidates into a summary, which is set as the optimizer's context to guide exploration.
+
+ Args:
+ epsilon: The epsilon value for the epsilon-net. 0 means no filtering, the same as vanilla PrioritySearch.
+ use_summarizer: Whether to use a summarizer to summarize the memory and the exploration candidates.
+ summarizer_model_name: The model name for the summarizer.
+ *args: Additional arguments for the parent class.
+ **kwargs: Additional keyword arguments for the parent class.
+ """
+ def __init__(self,
+ epsilon: float = 0.1,
+ use_summarizer: bool = False,
+ summarizer_model_name: str = "gemini/gemini-2.0-flash",
+ *args,
+ **kwargs):
+ super().__init__(*args, **kwargs)
+ self.epsilon = epsilon
+ self.use_summarizer = use_summarizer
+ self.regressor = RegressorTemplate()
+ self.summarizer = Summarizer(model_name=summarizer_model_name)
+ self.context = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: "
+
+ def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]:
+ """ Filter candidates by their embeddings.
+ """
+ if self.epsilon == 0: # no filtering
+ print_color(f"No filtering of candidates.", "green")
+ return new_candidates
+ exploration_memory = [(0, candidate) for candidate in self._exploration_candidates]
+ current_memory = self.memory.memory + exploration_memory
+ # Add embeddings to all the candidates. The regressor will check if the candidates have embeddings, and if not, it will add them in parallel.
+ current_candidates = [candidate for _, candidate in current_memory]
+ self.regressor.add_embeddings_to_candidates(current_candidates+new_candidates)
+
+ # filter new candidates based on the distance to the current memory.
+ num_new_candidates = len(new_candidates)
+
+ added_candidates = []
+ success_distances = []
+
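+        # Greedy farthest-point insertion: repeatedly admit the surviving candidate
+        # farthest from the (growing) memory, so every admitted candidate ends up
+        # more than epsilon away from the memory entries and from each other.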
+ while len(new_candidates) > 0:
+ # calculate the distance to the memory for each new candidate
+ distances = [calculate_distance_to_memory(current_memory, new_candidate) for new_candidate in new_candidates]
+
+ # filter candidates: keep only those with distance > epsilon
+ filtered_candidates = []
+ filtered_distances = []
+ for i, (candidate, distance) in enumerate(zip(new_candidates, distances)):
+ if distance > self.epsilon:
+ filtered_candidates.append(candidate)
+ filtered_distances.append(distance)
+
+ # if no candidates remain, exit the loop
+ if len(filtered_candidates) == 0:
+ break
+
+ # add the candidate with the largest distance to the memory
+ max_distance_idx = np.argmax(filtered_distances)
+ new_node = filtered_candidates[max_distance_idx]
+ current_memory.append((0, new_node))
+ added_candidates.append(new_node)
+ success_distances.append(float(filtered_distances[max_distance_idx]))
+
+ # remove the added candidate from new_candidates list
+ new_candidates = [c for c in filtered_candidates if c is not new_node]
+
+ print_color(f"Proposed {num_new_candidates} new candidates, {len(added_candidates)} of them are added to the memory.", "green")
+ # print the distances between the added candidates and the memory before adding them.
+ print_color(f"Distances between the added candidates and the memory before adding them: {success_distances}", "green")
+ return added_candidates
+
+ def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate:
+ """ For the summarizer usage, we keep the entire rollout. """
+ if self.use_summarizer:
+ return candidate
+ else:
+ return super().compress_candidate_memory(candidate)
+
+ def propose(self,
+ samples : Samples,
+ verbose : bool = False,
+ **kwargs):
+ """
+ Override the propose method to include a summary into the context of the optimizer.
+ """
+
+        # Use the summarizer to condense the memory and the exploration candidates into guidance for the optimizer.
+        if self.use_summarizer:
+            exploration_memory = [(0, candidate) for candidate in self._exploration_candidates]
+            print_color("Summarizing the history...", "green")
+ try:
+ summary = self.summarizer.summarize(self.memory.memory+exploration_memory)
+ print_color(f"Summary: {summary}", "green")
+ self.context = f"Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: {summary}"
+ except Exception as e:
+ print_color(f"Error: {e}", "red")
+ print_color(f"Using fallback context: {self.context}", "red")
+ # Set the context for the optimizer.
+ for candidate in self._exploration_candidates:
+ candidate.optimizer.set_context(self.context)
+ return super().propose(samples, verbose, **kwargs)
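+
+# Usage sketch (illustrative; the remaining PrioritySearch constructor arguments
+# are omitted here and depend on the caller's setup):
+#   searcher = EpsilonNetPS_plus_Summarizer(epsilon=0.1, use_summarizer=True,
+#                                           summarizer_model_name="gemini/gemini-2.0-flash")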
diff --git a/opto/features/priority_search/module_regressor.py b/opto/features/priority_search/module_regressor.py
index a92793c5..4b9b48d9 100644
--- a/opto/features/priority_search/module_regressor.py
+++ b/opto/features/priority_search/module_regressor.py
@@ -17,6 +17,147 @@
from opto.utils.auto_retry import retry_with_exponential_backoff
import litellm
import time
+from opto.features.priority_search.priority_search import ModuleCandidate
+
+class RegressorTemplate:
+ """Base class template for regression-based predictors for ModuleCandidate objects.
+
+ Provides common functionality for embedding generation and candidate processing.
+ Subclasses should implement update() and predict_scores() methods.
+
+ Regressors can be built on this template by implementing the update() and predict_scores() methods.
+ This class itself is enough for getting embeddings for candidates.
+ """
+
+ def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True):
+        # The regressor does not call an LLM to make predictions, so it can score the entire memory at once.
+ self.max_candidates_to_predict = 500
+ self.embedding_model = embedding_model
+ self.num_threads = num_threads
+ self.regularization_strength = regularization_strength # L2 regularization strength (lambda)
+ self.rich_text = rich_text
+
+ # Default original embedding dimension (from text-embedding-004)
+ self.original_embedding_dim = 768
+
+        # Random projection (GaussianRandomProjection from 768D down to linear_dim)
+        # is currently disabled; the full 768D embedding is used as-is.
+        self.linear_dim = self.original_embedding_dim
+        self.random_projector = None
+
+        # Initialize weights with random values (std 0.1) rather than zeros, for more aggressive early learning.
+ self.weights = np.random.normal(0, 0.1, self.linear_dim)
+ self.bias = 0.0
+
+ def _get_parameter_text(self, candidate):
+ """Get the parameter text for a ModuleCandidate."""
+        assert hasattr(candidate, 'update_dict'), f"ModuleCandidate must have an update_dict, got: {candidate}"
+ # Convert parameter nodes to readable names for deterministic embedding
+ params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()}
+
+        # NOTE: self.rich_text is currently unused: a richer text representation
+        # (problem definition + parameter configuration + a rating question)
+        # was prototyped but is disabled, so only the parameter dict is embedded.
+        return str(params_with_names)
+
+
+ def _get_embedding(self, candidate):
+ """Get the embedding for a ModuleCandidate."""
+ parameter_text = self._get_parameter_text(candidate)
+
+ def single_embedding_call():
+ return litellm.embedding(
+ model=self.embedding_model,
+ input=parameter_text
+ )
+
+ try:
+ response = retry_with_exponential_backoff(
+ single_embedding_call,
+ max_retries=10,
+ base_delay=1.0,
+ operation_name="Embedding API call"
+ )
+ embedding = response.data[0].embedding
+ if self.random_projector is not None:
+ # Convert to numpy array and reshape for transform (expects 2D: n_samples x n_features)
+ embedding_array = np.array(embedding).reshape(1, -1)
+ projected = self.random_projector.transform(embedding_array)
+ # Convert back to list and flatten
+ embedding = projected.flatten().tolist()
+ return embedding
+ except Exception as e:
+ print_color(f"ERROR: Embedding API call failed after retries: {e}", "red")
+ return None
+
+ def add_embeddings_to_candidates(self, candidates: List[ModuleCandidate]):
+ """Add embeddings to a list of candidates. This function could be used outside."""
+ self._update_memory_embeddings_for_batch(candidates)
+
+ def _update_memory_embeddings_for_batch(self, batch):
+ """Update the embeddings for a batch of candidates."""
+ # Separate candidates that need embeddings from those that already have them
+ candidates_needing_embeddings = []
+ for candidate in batch:
+ if not hasattr(candidate, "embedding"):
+ candidates_needing_embeddings.append(candidate)
+
+ # Generate embeddings in parallel for candidates that need them
+ if candidates_needing_embeddings:
+ def get_embedding_for_candidate(candidate):
+ return self._get_embedding(candidate)
+
+ # Create function list for async_run
+ embedding_functions = [lambda c=candidate: get_embedding_for_candidate(c)
+ for candidate in candidates_needing_embeddings]
+
+ # Run embedding generation in parallel
+ new_embeddings = async_run(
+ embedding_functions,
+ max_workers=50,
+ description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates"
+ )
+
+ # Assign embeddings back to candidates
+ for candidate, embedding in zip(candidates_needing_embeddings, new_embeddings):
+ candidate.embedding = embedding
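+            # NOTE: _get_embedding returns None when the API call fails after all
+            # retries, so a candidate's embedding attribute may be None; distance-based
+            # callers may want to guard against that.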
+
+ def update(self, memory: List[Tuple[float, ModuleCandidate]]):
+ """Update the regression model parameters. Should be implemented by subclasses."""
+ raise NotImplementedError("Subclasses must implement the update method")
+
+ def predict_scores(self, memory: List[Tuple[float, ModuleCandidate]]):
+ """Predict scores for candidates. Should be implemented by subclasses."""
+ raise NotImplementedError("Subclasses must implement the predict_scores method")
class ModuleCandidateRegressor:
"""
diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py
index 86bcbd60..a98b370e 100644
--- a/opto/features/priority_search/priority_search.py
+++ b/opto/features/priority_search/priority_search.py
@@ -424,7 +424,14 @@ def update(self,
# samples is None in the first iteration
if samples is not None:
# 1. Propose new parameters based on running LLM optimizers on the collected samples
+
+            # Attach the exploration rollouts to the exploration candidates before proposing,
+            # so these samples are not added again in the validate step.
+            self.add_exploration_rollouts_to_candidates(self._exploration_candidates, samples)
candidates = self.propose(samples, verbose=verbose, **kwargs) # List of ModuleCandidates
+
+            # Filter the new candidates; only the survivors are added to the memory.
+            # The default implementation performs no filtering.
+ candidates = self.filter_candidates(candidates)
+
# 2. Validate the proposed parameters
validate_results = self.validate(candidates, samples, verbose=verbose, **kwargs) # this updates the priority queue
# 3. Update the priority queue with the validation results
@@ -612,7 +619,9 @@ def validate(self,
assert self._exploration_candidates is not None, "exploration_candidates must be set before calling validate."
# The current batch of samples can be used to validate the exploration candidates
- validate_samples = copy.copy(samples)
+        # Exploration samples are attached before proposing (in update), so they are not added again here; start from an empty Samples.
+        validate_samples = Samples([], {'inputs': [], 'infos': []})
# Validate newly proposed candidates
use_prev_batch = self.use_prev_batch # when True, self.validate_sampler == self.train_sampler, and the current batch is used for validation
@@ -629,11 +638,17 @@ def validate(self,
description_prefix='Validating exploration candidates: ')) # sample the exploration agents
validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples
-
- matched_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates + candidates, validate_samples.samples)
+ # Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False)
+ candidates_to_be_matched = exploration_candidates + candidates if (self.validate_exploration_candidates and not use_prev_batch) else candidates
+ matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples)
results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts)
for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts
results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts
+ # Add exploration candidates that weren't included in match_candidates_and_samples
+ # This ensures they get re-added to memory even if they weren't validated again
+ for candidate in exploration_candidates:
+ if candidate not in results:
+ results[candidate] = [] # Add with empty rollouts list
return results
@@ -802,3 +817,25 @@ def _process_rollout(rollout):
for rollout in candidate.rollouts:
_process_rollout(rollout)
return candidate
+
+    # Hook for subclasses.
+ def filter_candidates(self, candidates: List[ModuleCandidate]) -> List[ModuleCandidate]:
+ """ Filter candidates.
+ This function can be overridden by subclasses to filter candidates by other criteria.
+ Args:
+ candidates (List[ModuleCandidate]): A list of candidates to filter.
+ Returns:
+ List[ModuleCandidate]: A list of filtered candidates.
+ """
+ return candidates
+
+    # Exploration rollouts are attached to the exploration candidates before proposing (called from update()).
+ def add_exploration_rollouts_to_candidates(self, exploration_candidates: List[ModuleCandidate], samples: Samples):
+ """ Add the exploration rollouts to the exploration candidates.
+ """
+ matched_exploration_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates, samples.samples)
+        exploration_results = {}  # dict mapping ModuleCandidate -> list of rollout dicts
+        for c, rollouts in matched_exploration_candidates_and_samples.items():  # rollouts is a list of BatchRollouts
+            exploration_results[c] = [r for rr in rollouts for r in rr.to_list()]
+ for candidate, rollouts in exploration_results.items():
+ candidate.add_rollouts(rollouts)
diff --git a/opto/features/priority_search/search_template.py b/opto/features/priority_search/search_template.py
index ec244f74..616dd1ff 100644
--- a/opto/features/priority_search/search_template.py
+++ b/opto/features/priority_search/search_template.py
@@ -230,13 +230,13 @@ def train(self,
train_scores.append(info_sample['mean_score']) # so that mean can be computed
train_num_samples.append(info_sample['num_samples'])
+ self.n_samples += len(samples) # update the number of samples processed
if self.n_iters % log_frequency == 0:
avg_train_score = np.sum(np.array(train_scores) * np.array(train_num_samples)) / np.sum(train_num_samples)
self.logger.log('Algo/Average train score', avg_train_score, self.n_iters, color='blue')
self.log(info_update, prefix="Update/")
self.log(info_sample, prefix="Sample/")
- self.n_samples += len(samples) # update the number of samples processed
self.logger.log('Algo/Number of training samples', self.n_samples, self.n_iters, color='blue')
# Log parameters
for p in self.agent.parameters():
diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py
new file mode 100644
index 00000000..f18eb1ce
--- /dev/null
+++ b/opto/features/priority_search/summarizer.py
@@ -0,0 +1,148 @@
+from opto.optimizers.utils import print_color
+from opto.utils.llm import LLM # For the selector LLM
+import random
+import re
+
+
+def get_trajectory_of_one_rollout(rollout):
+ """
+ Convert a rollout into a structured markdown trajectory for optimization.
+
+ This function extracts the trainable parameters and formats the trajectory
+ to guide the optimizer in improving the module's performance.
+
+ Parameters
+ ----------
+ rollout : dict
+ A rollout dictionary containing:
+ - 'module': trace.Module - the agent module with trainable parameters
+ - 'x': Any - the input data
+ - 'info': Any - additional information about the input
+ - 'target': Any - the generated output
+ - 'score': float - evaluation score (0 = failed, 1 = success)
+ - 'feedback': Any - detailed feedback from the evaluation
+
+ Returns
+ -------
+ str
+ A markdown-formatted trajectory string for optimizer guidance.
+ """
+ assert rollout['module'] is not None, "rollout['module'] is None."
+ assert rollout['x'] is not None, "rollout['x'] is None."
+ assert rollout['target'] is not None, "rollout['target'] is None."
+ assert rollout['score'] is not None, "rollout['score'] is None."
+ assert rollout['feedback'] is not None, "rollout['feedback'] is None."
+
+ # Extract trainable parameters
+ parameters = rollout['module'].parameters()
+ parameters_dict = {p.py_name: p.data for p in parameters}
+
+ # Construct structured markdown trajectory
+ trajectory = f"""## Task Trajectory
+
+## Module Parameters
+{parameters_dict}
+
+## Input
+{rollout['x']}
+
+## Output
+{rollout['target']}
+
+## Result
+- **Score:** {rollout['score']}
+- **Feedback:** {rollout['feedback']}
+
+## Optimization Note
+Analyze what parameter patterns lead to successful vs. failed outputs.
+"""
+ return trajectory
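+
+# Example (hypothetical rollout): given an agent module `m` with one trainable
+# parameter, get_trajectory_of_one_rollout({'module': m, 'x': 'What is 2+2?',
+# 'info': None, 'target': '4', 'score': 1.0, 'feedback': 'correct'}) returns a
+# markdown block listing m's parameters, the input, the output, and the result.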
+
+
+
+
+class Summarizer:
+ """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates.
+ """
+ def __init__(self, model_name: str = "gemini/gemini-2.0-flash"):
+        self.llm = LLM()  # NOTE: model_name is currently unused; the default LLM model is used.
+ self.max_candidates_in_prompt = 50
+
+ def _get_trajecories_for_memory(self, memory):
+ """
+ Get trajectories for the memory. Memory is a list of (neg_score, candidate) tuples.
+        We first collect rollouts from each candidate, and then build a trajectory string for each selected rollout.
+
+        Returns a single string containing all trajectories.
+ """
+ trajectories = []
+ print_color(f"Getting trajectories from {len(memory)} candidates.", "blue")
+        # Randomly sample at most max_candidates_in_prompt candidates that have scored rollouts.
+ memory_with_rollouts = [(neg_score, candidate) for neg_score, candidate in memory if len([rollout for rollout in candidate.rollouts if rollout['score'] is not None]) > 0]
+ temporary_memory = random.sample(memory_with_rollouts, k=min(self.max_candidates_in_prompt, len(memory_with_rollouts)))
+ for _, candidate in temporary_memory:
+ rollouts = [rollout for rollout in candidate.rollouts if rollout['score'] is not None]
+ if len(rollouts) == 0:
+ continue
+            # For each candidate, include one successful rollout and one failed rollout, when available.
+            candidate_update_dict = candidate.update_dict.values()
+            prompt = f"Candidate parameters: {candidate_update_dict}."
+ successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > 0]
+ failed_rollouts = [rollout for rollout in rollouts if rollout['score'] == 0]
+ if len(successful_rollouts) > 0:
+ random_successful_rollout = random.choice(successful_rollouts)
+ prompt += f"\nSuccessful trajectory: {get_trajectory_of_one_rollout(random_successful_rollout)}."
+ if len(failed_rollouts) > 0:
+ random_failed_rollout = random.choice(failed_rollouts)
+ prompt += f"\nFailed trajectory: {get_trajectory_of_one_rollout(random_failed_rollout)}."
+
+ trajectories.append(prompt)
+
+ print_color(f"Generated trajectories from {len(trajectories)} candidates.", "green")
+ return '\n'.join(trajectories)
+
+ def summarize(self, memory) -> str:
+ """Summarize the trajectories using the LLM.
+ Args:
+ memory: The memory containing trajectories to summarize.
+ Returns:
+ str: The summary.
+ """
+
+ history_trajectories = self._get_trajecories_for_memory(memory)
+
+ if len(history_trajectories) == 0:
+ return "No successful trajectories found for the memory."
+
+ system_prompt = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization."
+
+ user_prompt = f"""Analyze the following agent rollout trajectories and extract insights for optimization.
+
+ Trajectories:
+ {history_trajectories}
+
+        Provide your analysis in XML format:
+        <analysis>
+        Analyze the key patterns and strategies that led to success or failure in these trajectories.
+        </analysis>
+        <summary>
+        Concrete recommendations for improving output quality based on successful or failed patterns observed in the trajectories.
+        </summary>
+        """
+
+ prompt_messages = [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_prompt}
+ ]
+
+ response = self.llm(messages=prompt_messages)
+ response = response.choices[0].message.content
+
+        # Extract the summary from the <summary> XML tag.
+        summary_match = re.search(r'<summary>(.*?)</summary>', response, re.DOTALL)
+
+        # Fall back to the raw response if the tag is missing.
+        return summary_match.group(1).strip() if summary_match else response.strip()
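+
+# Usage sketch (illustrative): given a PrioritySearch memory of
+# (neg_score, ModuleCandidate) tuples,
+#   summary = Summarizer().summarize(searcher.memory.memory)
+# returns the recommendations extracted from the <summary> tag.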