Add analysis queue for batch binary processing #7
base: master
Conversation
Introduces `ghidra queue` subcommands (add, list, remove, wait) that let users/agents queue globs of binaries for import and analysis. The daemon processes queued items sequentially through the Ghidra bridge. The `wait` subcommand blocks until all analysis completes, allowing agents to wait hours or days without wasting tokens.

- New CLI: queue add/list/remove/wait with glob expansion (glob crate)
- New daemon module: analysis_queue.rs with background processor
- IPC protocol extended with QueueAdd/QueueList/QueueRemove/QueueStatus
- Daemon handler and IPC server updated to pass queue state

https://claude.ai/code/session_01XthrEQcMmW3AePfHbgeUpm
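For orientation, the new IPC commands might look roughly like the sketch below; the exact field names and types are assumptions inferred from the handler excerpts quoted later on this page, not the PR's literal definitions.

// Hypothetical sketch of the extended IPC Command enum (illustrative only).
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
    // ... existing daemon commands ...
    QueueAdd { paths: Vec<String>, project: Option<String> },
    QueueList,
    QueueRemove { paths: Vec<String> },
    QueueStatus,
}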
Summary of Changes

Hello @akiselev, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a robust analysis queue system designed to streamline the batch processing of binaries using the Ghidra bridge. It provides a structured way to manage multiple analysis tasks, ensuring sequential execution due to the bridge's single-threaded nature, and offers user-friendly CLI commands for interaction and monitoring. The primary goal is to enhance the tool's capability for handling large sets of binaries efficiently.
Code Review
This pull request introduces a robust analysis queue for batch processing binaries, which is a great feature. The implementation is solid, using Tokio for asynchronous processing. My review focuses on a critical race condition in how queue items are processed, some performance improvements for queue manipulation, and general code clarity enhancements. Addressing these points will make the queue system even more reliable and efficient.
async fn process_entry(&self, idx: usize) {
    let (path, project, program) = {
        let entries = self.entries.lock().await;
        let entry = &entries[idx];
        (
            entry.path.clone(),
            entry.project.clone(),
            entry.program.clone().unwrap_or_else(|| "program".to_string()),
        )
    };

    info!(
        "Processing queue entry: {} (project={}, program={})",
        path.display(),
        project,
        program
    );

    let result = self.import_and_analyze(&path, &project, &program).await;

    let mut entries = self.entries.lock().await;
    if idx < entries.len() {
        entries[idx].finished_at = Some(Utc::now());
        match result {
            Ok(()) => {
                entries[idx].status = QueueEntryStatus::Completed;
                info!("Completed analysis of: {}", path.display());
            }
            Err(e) => {
                let error_msg = format!("{:#}", e);
                entries[idx].status = QueueEntryStatus::Failed {
                    error: error_msg.clone(),
                };
                error!("Failed to analyze {}: {}", path.display(), error_msg);
            }
        }
    }
There is a critical race condition here. The idx of a queue entry is not stable if other entries are removed from the Vec by the remove function while this entry is being processed. This could lead to updating the wrong entry or a panic.
To fix this, you should use a stable identifier, like the file path, to identify and update the entry. This requires changes to take_next_pending to return a PathBuf instead of an index, and updating processor_loop to use it.
Here's the suggested change for process_entry. You'll need to adjust take_next_pending and processor_loop accordingly. For example, take_next_pending should be changed to return Option<PathBuf> and processor_loop should handle this.
async fn process_entry(&self, path: PathBuf) {
    let (project, program) = {
        let entries = self.entries.lock().await;
        // The entry should always be found if take_next_pending logic is correct.
        let entry = entries
            .iter()
            .find(|e| e.path == path)
            .expect("Queue entry not found after taking it");
        (
            entry.project.clone(),
            entry.program.clone().unwrap_or_else(|| "program".to_string()),
        )
    };
    info!(
        "Processing queue entry: {} (project={}, program={})",
        path.display(),
        project,
        program
    );
    let result = self.import_and_analyze(&path, &project, &program).await;
    let mut entries = self.entries.lock().await;
    if let Some(entry) = entries.iter_mut().find(|e| e.path == path) {
        entry.finished_at = Some(Utc::now());
        match result {
            Ok(()) => {
                entry.status = QueueEntryStatus::Completed;
                info!("Completed analysis of: {}", path.display());
            }
            Err(e) => {
                let error_msg = format!("{:#}", e);
                entry.status = QueueEntryStatus::Failed {
                    error: error_msg.clone(),
                };
                error!("Failed to analyze {}: {}", path.display(), error_msg);
            }
        }
    }
}

pub async fn add(&self, paths: Vec<PathBuf>, project: String) -> usize {
    let mut entries = self.entries.lock().await;
    let mut added = 0;

    for path in paths {
        // Skip if already in queue (by path)
        let already_exists = entries.iter().any(|e| e.path == path);
        if already_exists {
            warn!("Skipping duplicate: {}", path.display());
            continue;
        }

        let program = path
            .file_stem()
            .and_then(|s| s.to_str())
            .map(|s| s.to_string());

        entries.push(QueueEntry {
            path,
            status: QueueEntryStatus::Pending,
            project: project.clone(),
            program,
            added_at: Utc::now(),
            started_at: None,
            finished_at: None,
        });
        added += 1;
    }

    if added > 0 {
        self.work_notify.notify_one();
    }

    added
}
Checking for duplicates with entries.iter().any() inside a loop has O(N*M) complexity, where N is the number of existing entries and M is the number of paths being added. For better performance, you can first collect existing paths into a HashSet for O(1) average time complexity lookups. This reduces the overall complexity to O(N+M).
pub async fn add(&self, paths: Vec<PathBuf>, project: String) -> usize {
    use std::collections::HashSet;
    let mut entries = self.entries.lock().await;
    // Collect owned paths so `entries` can still be pushed to below, and so
    // duplicates within this batch are also caught.
    let mut existing_paths: HashSet<PathBuf> =
        entries.iter().map(|e| e.path.clone()).collect();
    let mut added = 0;
    for path in paths {
        // `insert` returns false if the path is already present.
        if !existing_paths.insert(path.clone()) {
            warn!("Skipping duplicate: {}", path.display());
            continue;
        }
        let program = path
            .file_stem()
            .and_then(|s| s.to_str())
            .map(|s| s.to_string());
        entries.push(QueueEntry {
            path,
            status: QueueEntryStatus::Pending,
            project: project.clone(),
            program,
            added_at: Utc::now(),
            started_at: None,
            finished_at: None,
        });
        added += 1;
    }
    if added > 0 {
        self.work_notify.notify_one();
    }
    added
}

pub async fn remove(&self, paths: &[PathBuf]) -> usize {
    let mut entries = self.entries.lock().await;
    let before = entries.len();

    entries.retain(|e| {
        if e.status == QueueEntryStatus::Pending && paths.contains(&e.path) {
            false // Remove it
        } else {
            true // Keep it
        }
    });

    before - entries.len()
}
The paths.contains() check inside retain results in O(N*M) complexity, where N is the number of queue entries and M is the number of paths to remove. This can be inefficient for large queues or many paths. You can improve performance to O(N+M) by converting paths to a HashSet before the retain loop.
pub async fn remove(&self, paths: &[PathBuf]) -> usize {
    use std::collections::HashSet;

    let paths_to_remove: HashSet<_> = paths.iter().collect();
    if paths_to_remove.is_empty() {
        return 0;
    }

    let mut entries = self.entries.lock().await;
    let before = entries.len();

    entries.retain(|e| {
        !(e.status == QueueEntryStatus::Pending && paths_to_remove.contains(&e.path))
    });

    before - entries.len()
}
async fn import_and_analyze(
    &self,
    binary_path: &PathBuf,
    project: &str,
    program: &str,
) -> Result<()> {
    use serde_json::json;

    let binary_path_str = binary_path
        .to_str()
        .ok_or_else(|| anyhow::anyhow!("Invalid path: {}", binary_path.display()))?;

    // Import
    {
        let mut bridge_guard = self.bridge.lock().await;
        let bridge = bridge_guard
            .as_mut()
            .ok_or_else(|| anyhow::anyhow!("Bridge not initialized"))?;

        if !bridge.is_running() {
            anyhow::bail!("Bridge is not running");
        }

        info!("Importing: {}", binary_path_str);
        let response = bridge.send_command::<serde_json::Value>(
            "import",
            Some(json!({
                "binary_path": binary_path_str,
                "project": project,
                "program": program,
            })),
        )?;

        if response.status != "success" {
            let msg = response
                .message
                .unwrap_or_else(|| "Import failed".to_string());
            anyhow::bail!("Import failed: {}", msg);
        }
    }

    // Analyze
    {
        let mut bridge_guard = self.bridge.lock().await;
        let bridge = bridge_guard
            .as_mut()
            .ok_or_else(|| anyhow::anyhow!("Bridge not initialized"))?;

        if !bridge.is_running() {
            anyhow::bail!("Bridge is not running");
        }

        info!("Analyzing: {}", program);
        let response = bridge.send_command::<serde_json::Value>(
            "analyze",
            Some(json!({
                "project": project,
                "program": program,
            })),
        )?;

        if response.status != "success" {
            let msg = response
                .message
                .unwrap_or_else(|| "Analysis failed".to_string());
            anyhow::bail!("Analysis failed: {}", msg);
        }
    }

    Ok(())
}
This function acquires the bridge lock twice. Since the analysis queue processes items sequentially, it's safe and more efficient to acquire the lock once and hold it for both the import and analyze commands. This also reduces code duplication for getting the bridge and checking if it's running.
async fn import_and_analyze(
    &self,
    binary_path: &PathBuf,
    project: &str,
    program: &str,
) -> Result<()> {
    use serde_json::json;
    let binary_path_str = binary_path
        .to_str()
        .ok_or_else(|| anyhow::anyhow!("Invalid path: {}", binary_path.display()))?;
    let mut bridge_guard = self.bridge.lock().await;
    let bridge = bridge_guard
        .as_mut()
        .ok_or_else(|| anyhow::anyhow!("Bridge not initialized"))?;
    if !bridge.is_running() {
        anyhow::bail!("Bridge is not running");
    }
    // Import
    info!("Importing: {}", binary_path_str);
    let response = bridge.send_command::<serde_json::Value>(
        "import",
        Some(json!({
            "binary_path": binary_path_str,
            "project": project,
            "program": program,
        })),
    )?;
    if response.status != "success" {
        let msg = response
            .message
            .unwrap_or_else(|| "Import failed".to_string());
        anyhow::bail!("Import failed: {}", msg);
    }
    // Analyze
    info!("Analyzing: {}", program);
    let response = bridge.send_command::<serde_json::Value>(
        "analyze",
        Some(json!({
            "project": project,
            "program": program,
        })),
    )?;
    if response.status != "success" {
        let msg = response
            .message
            .unwrap_or_else(|| "Analysis failed".to_string());
        anyhow::bail!("Analysis failed: {}", msg);
    }
    Ok(())
}

QueueCommands::Wait(args) => {
    println!("Waiting for analysis queue to complete...");
    let interval =
        tokio::time::Duration::from_secs(args.interval);
    loop {
        let status = client.queue_status().await?;
        let all_done = status
            .get("all_done")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);
        let total = status
            .get("total")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        let completed = status
            .get("completed")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        let failed = status
            .get("failed")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        let pending = status
            .get("pending")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);
        let analyzing = status
            .get("analyzing")
            .and_then(|v| v.as_u64())
            .unwrap_or(0);

        if total == 0 {
            println!("Queue is empty.");
            return Ok(String::new());
        }

        eprint!(
            "\r Progress: {}/{} completed, {} failed, {} pending, {} analyzing",
            completed, total, failed, pending, analyzing
        );

        if all_done {
            eprintln!();
            println!(
                "All done! {} completed, {} failed out of {} total.",
                completed, failed, total
            );
            return Ok(String::new());
        }

        tokio::time::sleep(interval).await;
    }
}
The Wait command handler manually parses the JSON response from queue_status. It would be cleaner and safer to deserialize the JSON into the QueueStatusSummary struct, which is already defined and used on the server side. This avoids magic strings for field names and makes the code more robust against changes in the response structure.
QueueCommands::Wait(args) => {
    println!("Waiting for analysis queue to complete...");
    let interval =
        tokio::time::Duration::from_secs(args.interval);
    use crate::daemon::analysis_queue::QueueStatusSummary;
    loop {
        let status_val = client.queue_status().await?;
        let status: QueueStatusSummary = serde_json::from_value(status_val)?;
        if status.total == 0 {
            println!("Queue is empty.");
            return Ok(String::new());
        }
        eprint!(
            "\r Progress: {}/{} completed, {} failed, {} pending, {} analyzing",
            status.completed, status.total, status.failed, status.pending, status.analyzing
        );
        if status.all_done {
            eprintln!();
            println!(
                "All done! {} completed, {} failed out of {} total.",
                status.completed, status.failed, status.total
            );
            return Ok(String::new());
        }
        tokio::time::sleep(interval).await;
    }
}
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0923340388
match glob::glob(pattern) {
    Ok(entries) => {
        for entry in entries.flatten() {
            let abs = if entry.is_absolute() {
                entry
Match queue-remove patterns against queued paths
queue remove expands patterns via glob::glob, which only yields paths that exist on disk. If a binary is queued and then deleted or moved before removal, the expansion returns no matches and the daemon receives an empty list, so the pending entry can never be removed. This makes queue maintenance impossible for missing files; consider sending raw patterns to the daemon and matching against queued paths (or falling back to literal patterns when a glob yields zero matches).
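One possible shape for that daemon-side matching, sketched under the assumption that the protocol is changed to carry raw pattern strings; the `remove_by_patterns` name is hypothetical and would live alongside `remove()` on `AnalysisQueue`:

use glob::Pattern;

/// Hypothetical daemon-side removal that matches raw glob patterns against
/// queued paths, so pending entries can be removed even if the file no longer
/// exists on disk.
pub async fn remove_by_patterns(&self, patterns: &[String]) -> usize {
    let compiled: Vec<Pattern> = patterns
        .iter()
        .filter_map(|p| Pattern::new(p).ok())
        .collect();

    let mut entries = self.entries.lock().await;
    let before = entries.len();
    entries.retain(|e| {
        let matched = compiled.iter().any(|pat| pat.matches_path(&e.path));
        // Keep everything except pending entries that match one of the patterns.
        !(e.status == QueueEntryStatus::Pending && matched)
    });
    before - entries.len()
}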
Pull request overview
This PR introduces a comprehensive analysis queue system that enables batch processing of binaries through the Ghidra bridge. The feature allows users to add multiple binaries via glob patterns, monitor queue status, remove items, and wait for completion with progress tracking. The implementation follows the existing daemon architecture and provides both a command-line interface and IPC protocol support.
Changes:
- Added new `AnalysisQueue` module for managing sequential binary analysis with status tracking and background processing
- Introduced four new CLI subcommands under `queue`: `add`, `list`, `remove`, and `wait`, with glob pattern support
- Extended the IPC protocol with queue-specific commands and integrated it with the daemon lifecycle
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| src/daemon/analysis_queue.rs | Core queue implementation with sequential processor, status tracking, and unit tests |
| src/cli.rs | Added Queue command enum with Add, List, Remove, and Wait subcommands |
| src/ipc/protocol.rs | Added QueueAdd, QueueList, QueueRemove, and QueueStatus IPC commands |
| src/ipc/client.rs | Added client methods for queue operations (queue_add, queue_list, queue_remove, queue_status) |
| src/daemon/mod.rs | Integrated queue creation and processor startup in daemon initialization |
| src/daemon/ipc_server.rs | Updated IPC server to pass queue instance to command handlers |
| src/daemon/handler.rs | Added queue command routing to appropriate queue methods |
| src/main.rs | Implemented client-side glob expansion and command execution for all queue subcommands |
| Cargo.toml | Added glob 0.3 dependency for pattern matching |
| Cargo.lock | Updated with glob 0.3.3 package entry |
QueueCommands::List => {
    client.queue_list().await?
}
Copilot AI (Jan 28, 2026)
The QueueList command returns raw JSON data from the daemon, unlike the Add, Remove, and Wait commands which format their output with custom println! statements before returning Ok(String::new()).
This means queue list will output the raw JSON response that goes through the default formatter (lines 504-520), which may not be user-friendly. The output will be something like:
{"entries": [...]}
For consistency with the other queue commands and better user experience, consider handling the QueueList result in the match statement and formatting it appropriately before returning, similar to how Add (lines 388-395), Remove (line 427), and Wait (lines 466-477) are handled.
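A minimal sketch of such formatting, assuming the daemon's response keeps the {"entries": [...]} shape shown above and that each entry serializes `path` and `status` fields (names inferred from QueueEntry, not verified against the PR):

// Hypothetical helper the List arm could call instead of returning raw JSON.
fn format_queue_list(value: &serde_json::Value) -> String {
    let entries = value
        .get("entries")
        .and_then(|v| v.as_array())
        .map(|v| v.as_slice())
        .unwrap_or(&[]);
    if entries.is_empty() {
        return "Queue is empty.".to_string();
    }
    let mut out = String::new();
    for entry in entries {
        let path = entry.get("path").and_then(|v| v.as_str()).unwrap_or("?");
        // Status may be a bare string ("Pending") or an object for Failed { error }.
        let status = entry
            .get("status")
            .map(|v| v.to_string())
            .unwrap_or_else(|| "unknown".to_string());
        out.push_str(&format!("{:<12} {}\n", status, path));
    }
    out
}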
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_queue_add_and_list() {
        let bridge = Arc::new(Mutex::new(None));
        let queue = AnalysisQueue::new(bridge);

        let added = queue
            .add(
                vec![PathBuf::from("/tmp/binary1"), PathBuf::from("/tmp/binary2")],
                "test-project".to_string(),
            )
            .await;
        assert_eq!(added, 2);

        let entries = queue.list().await;
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].path, PathBuf::from("/tmp/binary1"));
        assert_eq!(entries[1].path, PathBuf::from("/tmp/binary2"));
        assert_eq!(entries[0].status, QueueEntryStatus::Pending);
    }

    #[tokio::test]
    async fn test_queue_skip_duplicates() {
        let bridge = Arc::new(Mutex::new(None));
        let queue = AnalysisQueue::new(bridge);

        queue
            .add(
                vec![PathBuf::from("/tmp/binary1")],
                "test-project".to_string(),
            )
            .await;

        let added = queue
            .add(
                vec![PathBuf::from("/tmp/binary1")],
                "test-project".to_string(),
            )
            .await;
        assert_eq!(added, 0);

        let entries = queue.list().await;
        assert_eq!(entries.len(), 1);
    }

    #[tokio::test]
    async fn test_queue_remove() {
        let bridge = Arc::new(Mutex::new(None));
        let queue = AnalysisQueue::new(bridge);

        queue
            .add(
                vec![PathBuf::from("/tmp/binary1"), PathBuf::from("/tmp/binary2")],
                "test-project".to_string(),
            )
            .await;

        let removed = queue.remove(&[PathBuf::from("/tmp/binary1")]).await;
        assert_eq!(removed, 1);

        let entries = queue.list().await;
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].path, PathBuf::from("/tmp/binary2"));
    }

    #[tokio::test]
    async fn test_queue_status() {
        let bridge = Arc::new(Mutex::new(None));
        let queue = AnalysisQueue::new(bridge);

        let status = queue.status().await;
        assert_eq!(status.total, 0);
        assert!(status.all_done);

        queue
            .add(
                vec![PathBuf::from("/tmp/binary1")],
                "test-project".to_string(),
            )
            .await;

        let status = queue.status().await;
        assert_eq!(status.total, 1);
        assert_eq!(status.pending, 1);
        assert!(!status.all_done);
    }
}
Copilot AI (Jan 28, 2026)
This PR adds a significant new feature (analysis queue) but doesn't include integration tests. Looking at the test suite structure (tests/daemon_tests.rs, tests/batch_tests.rs, etc.), other daemon-related features have comprehensive integration tests.
The unit tests in analysis_queue.rs (lines 355-444) only cover basic queue operations with a `None` bridge, but don't test:
- Actual import/analyze operations through the queue
- Glob expansion and pattern matching
- Error handling when bridge operations fail
- Queue processor behavior under load
- Client-daemon interaction for queue commands
Consider adding integration tests that cover the main user workflows (a rough sketch of one follows after this list), such as:
- Adding files via glob patterns
- Listing queue status with various states
- Removing items from the queue
- Waiting for queue completion
- Error scenarios (invalid paths, bridge failures, etc.)
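A rough sketch of one such test, assuming it sits next to the existing unit tests in analysis_queue.rs and reuses only the APIs those tests already exercise (`AnalysisQueue::new`, `add`, `list`); the temp-dir layout and file names are illustrative:

#[tokio::test]
async fn test_queue_add_via_glob_expansion() {
    use std::fs;

    // Throwaway directory with two fake binaries and a directory whose name
    // also matches the pattern, to exercise the is_file() filter.
    let dir = std::env::temp_dir().join(format!("queue_glob_test_{}", std::process::id()));
    fs::create_dir_all(dir.join("dir.bin")).unwrap();
    fs::write(dir.join("a.bin"), b"\x7fELF").unwrap();
    fs::write(dir.join("b.bin"), b"\x7fELF").unwrap();

    // Expand the pattern the same way the CLI does, keeping regular files only.
    let pattern = dir.join("*.bin");
    let paths: Vec<PathBuf> = glob::glob(pattern.to_str().unwrap())
        .unwrap()
        .flatten()
        .filter(|p| p.is_file())
        .collect();
    assert_eq!(paths.len(), 2);

    // Feed the expanded paths into the queue and confirm both were accepted.
    let queue = AnalysisQueue::new(Arc::new(Mutex::new(None)));
    let added = queue.add(paths, "glob-test".to_string()).await;
    assert_eq!(added, 2);
    assert_eq!(queue.list().await.len(), 2);

    fs::remove_dir_all(&dir).ok();
}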
/// Internal processor loop.
async fn processor_loop(&self) {
    loop {
        // Wait for work notification
        self.work_notify.notified().await;

        // Process all pending items
        loop {
            let next = self.take_next_pending().await;
            match next {
                Some(idx) => {
                    self.process_entry(idx).await;
                    // Notify waiters after each completion
                    self.done_notify.notify_waiters();
                }
                None => break, // No more pending items
            }
        }
    }
}
Copilot AI (Jan 28, 2026)
The processor loop runs indefinitely without any shutdown mechanism. When the daemon shuts down (src/daemon/mod.rs:121), it sends a shutdown signal and stops the IPC server, but the queue processor task continues running forever.
While Tokio will terminate the task when the process exits, this means:
- The processor could be in the middle of importing/analyzing a binary when shutdown occurs
- There's no graceful cleanup or chance to complete in-progress work
- Queue state (especially entries marked as "Analyzing") won't be properly finalized
Consider adding:
- A shutdown channel/flag that the processor loop checks
- Logic to finish the current entry before stopping
- Setting any "Analyzing" entries back to "Pending" or "Failed" on shutdown
- Passing the shutdown signal to the queue in daemon/mod.rs similar to the IPC server
This would ensure graceful shutdown and prevent leaving entries in an inconsistent state.
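One way to wire this in, sketched under the assumption that the daemon hands the queue a tokio::sync::watch shutdown receiver; the `shutdown_rx` parameter is hypothetical and not part of this PR:

// Hypothetical shutdown-aware variant of the processor loop.
async fn processor_loop(&self, mut shutdown_rx: tokio::sync::watch::Receiver<bool>) {
    loop {
        tokio::select! {
            _ = self.work_notify.notified() => {}
            _ = shutdown_rx.changed() => {
                // Before returning, entries still marked Analyzing could be reset
                // to Pending (or Failed) so the queue is not left inconsistent.
                break;
            }
        }

        while let Some(idx) = self.take_next_pending().await {
            self.process_entry(idx).await;
            self.done_notify.notify_waiters();
            if *shutdown_rx.borrow() {
                break;
            }
        }
    }
}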
/// Import and analyze a binary through the Ghidra bridge.
async fn import_and_analyze(
    &self,
    binary_path: &PathBuf,
    project: &str,
    program: &str,
) -> Result<()> {
Copilot AI (Jan 28, 2026)
The bridge lock is acquired and released twice (once for import at lines 285-310, once for analyze at lines 314-338). While this works correctly, it's inefficient and could theoretically allow other operations to interleave between import and analyze.
Consider combining both operations into a single lock acquisition:
let mut bridge_guard = self.bridge.lock().await;
let bridge = bridge_guard.as_mut()...;

// Import
let import_response = bridge.send_command(...)?;

// Analyze
let analyze_response = bridge.send_command(...)?;

This would:
- Reduce lock contention
- Ensure import and analyze are atomic
- Be slightly more efficient
However, this is a minor optimization and not critical since the queue processes sequentially.
#![allow(dead_code)]
Copilot AI (Jan 28, 2026)
The #![allow(dead_code)] attribute is present at the module level. This is typically added during development to suppress warnings, but shouldn't remain in production code.
Looking at the module, most items are actually used:
- The `AnalysisQueue` struct and its methods are used by the daemon
- `QueueEntry` and `QueueEntryStatus` are serialized and returned via IPC
- `QueueStatusSummary` is returned by the status endpoint
The only potentially unused item is the `done_notify()` method (line 192), but even this is likely intended for future use or is part of the public API.
Consider:
- Removing this attribute and addressing any actual dead code warnings individually
- If specific items need to be kept for API completeness, mark only those items with `#[allow(dead_code)]`
Having module-wide suppression makes it harder to notice when code actually becomes unused over time.
QueueCommands::Remove(args) => {
    // Expand globs on the client side
    let mut paths = Vec::new();
    for pattern in &args.patterns {
        match glob::glob(pattern) {
            Ok(entries) => {
                for entry in entries.flatten() {
                    let abs = if entry.is_absolute() {
                        entry
                    } else {
                        std::env::current_dir()?.join(&entry)
                    };
                    paths.push(abs.to_string_lossy().to_string());
                }
            }
            Err(_) => {
                // Treat as literal path if not a valid glob
                paths.push(pattern.clone());
            }
        }
    }
Copilot AI (Jan 28, 2026)
The glob expansion logic in the Remove command is inconsistent with the Add command. Specifically:
- It doesn't check whether entries are files using `is_file()`, so it could include directories
- It silently falls back to treating invalid globs as literal paths (lines 415-418), whereas Add reports warnings for invalid patterns
- It doesn't track or report whether any patterns matched
This inconsistency could lead to confusing behavior where users expect similar error messages and validation between Add and Remove operations. Consider aligning the logic by (a sketch follows after this list):
- Adding an `is_file()` check to filter out directories (as done in Add at line 341)
- Reporting warnings for invalid glob patterns (as done in Add at lines 368-373)
- Warning when no files match a pattern (as done in Add at lines 361-366)
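A sketch of the aligned Remove-side expansion, assuming Add behaves as this comment describes (is_file() filtering plus warnings); the exact warning messages are illustrative:

QueueCommands::Remove(args) => {
    // Expand globs on the client side, mirroring the Add command's behavior.
    let mut paths = Vec::new();
    for pattern in &args.patterns {
        match glob::glob(pattern) {
            Ok(entries) => {
                let mut matched = 0usize;
                for entry in entries.flatten() {
                    if !entry.is_file() {
                        continue; // skip directories, as Add does
                    }
                    let abs = if entry.is_absolute() {
                        entry
                    } else {
                        std::env::current_dir()?.join(&entry)
                    };
                    paths.push(abs.to_string_lossy().to_string());
                    matched += 1;
                }
                if matched == 0 {
                    eprintln!("Warning: no files matched pattern: {}", pattern);
                }
            }
            Err(e) => {
                eprintln!("Warning: invalid glob pattern {}: {}", pattern, e);
            }
        }
    }
    // ... then send `paths` to the daemon as before ...
}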
async fn process_entry(&self, idx: usize) {
    let (path, project, program) = {
        let entries = self.entries.lock().await;
        let entry = &entries[idx];
Copilot AI (Jan 28, 2026)
There's a potential issue where the array index could be out of bounds if entries are removed concurrently. While process_entry is called sequentially by the processor loop, the entries vector could be modified by other operations (like remove()) between when the index is obtained in take_next_pending() and when it's used in process_entry().
The check at line 252 (`if idx < entries.len()`) guards against panics, but if the wrong entry is at that index (because entries shifted), the status update could be applied to the wrong queue entry.
Consider one of these approaches:
- Use a unique ID for each entry instead of relying on indices
- Lock the entries for the entire duration from `take_next_pending()` through `process_entry()`
- Verify the entry at the index is still in the "Analyzing" state before updating it
While this is unlikely to occur in practice since remove() only removes Pending entries (line 126), it's a fragile design that could break if future changes allow removing entries in other states.
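A minimal illustration of the first option, using simplified stand-ins for QueueEntry and AnalysisQueue; the `id` field and `next_id` counter are assumptions for illustration, not code from this PR:

use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;

// Each entry carries a stable, monotonically increasing id assigned at add()
// time; later updates find the entry by id, so concurrent removals can never
// shift which entry gets updated.
struct Entry {
    id: u64,
    path: PathBuf,
    done: bool,
}

struct Queue {
    next_id: AtomicU64,
    entries: Mutex<Vec<Entry>>,
}

impl Queue {
    async fn add(&self, path: PathBuf) -> u64 {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        self.entries.lock().await.push(Entry { id, path, done: false });
        id
    }

    async fn mark_done(&self, id: u64) {
        // Lookup by id instead of by index.
        let mut entries = self.entries.lock().await;
        if let Some(entry) = entries.iter_mut().find(|e| e.id == id) {
            entry.done = true;
        }
    }
}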
Command::QueueAdd { paths, project } => {
    let paths: Vec<PathBuf> = paths.into_iter().map(PathBuf::from).collect();
    let project = project.unwrap_or_else(|| "queue-project".to_string());
Copilot AI (Jan 28, 2026)
The default project name "queue-project" is hardcoded here, but this differs from the established pattern used elsewhere in the codebase. For example:
- Import command uses "quick-analysis" as the default (src/main.rs:200)
- Quick command also uses "quick-analysis" as the default (src/main.rs:230)
- The `resolve_project` function (src/main.rs:1095-1100) generates project names based on the program name with the pattern `{program}-project`
Consider either:
- Using "quick-analysis" for consistency with other import operations
- Using a similar pattern like "batch-project" or "queue-analysis"
- Documenting why a different default is used here
The current hardcoded value could confuse users who expect consistent defaults across import operations.
Summary
This PR introduces an analysis queue system that enables batch processing of binaries through the Ghidra bridge. The queue allows users to add multiple binaries via glob patterns, list queued items, remove items, and wait for all analyses to complete.
Key Changes
- New `analysis_queue` module (src/daemon/analysis_queue.rs): core queue implementation with a sequential background processor and per-entry status tracking
- New `Queue` CLI command with subcommands:
  - `queue add <patterns>`: Add binaries matching glob patterns to the queue
  - `queue list`: Display all queued items and their status
  - `queue remove <patterns>`: Remove pending items from the queue
  - `queue wait`: Block until all queued analyses complete, with progress reporting
- IPC protocol extensions (src/ipc/protocol.rs): `QueueAdd`, `QueueList`, `QueueRemove`, and `QueueStatus` commands
- Daemon integration: queue creation and processor startup wired into daemon initialization, with the IPC server and handler routing queue commands to the queue
- Dependencies: added the `glob = "0.3"` crate for pattern matching
https://claude.ai/code/session_01XthrEQcMmW3AePfHbgeUpm