Skip to content

Commit d406c73

Browse files
committed
fix: properly mount StreamableHttpService as Tower service
The HTTP server was incorrectly using custom Axum handlers to wrap the StreamableHttpService, when it should be mounted directly using nest_service(). Fixed: - Use .nest_service("/mcp", http_service) to let the service handle its own routing - Removed custom handle_mcp_request() and handle_sse_stream() handlers - The StreamableHttpService handles POST/GET /mcp internally This fixes the SSE connection issues where the MCP Python SDK couldn't connect. The service now properly handles the MCP-over-HTTP protocol as designed by rmcp. Also removed the separate health check from test_agentic_mcp.py since the MCP session initialization serves as the connectivity test.
1 parent 33c1c3a commit d406c73

File tree

1 file changed

+7
-135
lines changed

1 file changed

+7
-135
lines changed

crates/codegraph-mcp/src/http_server.rs

Lines changed: 7 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,12 @@ pub async fn start_http_server(
3535
},
3636
);
3737

38-
// Build Axum router with MCP endpoints
38+
// Build Axum router with MCP service mounted as Tower service
39+
// IMPORTANT: Use nest_service to mount the StreamableHttpService directly
40+
// This lets the service handle its own /mcp POST/GET routing internally
3941
let app = Router::new()
40-
.route("/mcp", axum::routing::post(handle_mcp_request))
41-
.route("/sse", axum::routing::get(handle_sse_stream))
42-
.route("/health", axum::routing::get(health_check))
43-
.with_state(http_service);
42+
.nest_service("/mcp", http_service)
43+
.route("/health", axum::routing::get(health_check));
4444

4545
// Parse bind address
4646
let addr: SocketAddr = config
@@ -50,8 +50,8 @@ pub async fn start_http_server(
5050

5151
info!("CodeGraph MCP HTTP server listening on http://{}", addr);
5252
info!("Endpoints:");
53-
info!(" POST http://{}/mcp - Send MCP requests", addr);
54-
info!(" GET http://{}/sse - Connect to SSE stream (requires Mcp-Session-Id header)", addr);
53+
info!(" POST http://{}/mcp - Initialize session and send MCP requests", addr);
54+
info!(" GET http://{}/mcp - Open SSE stream (requires Mcp-Session-Id header)", addr);
5555
info!(" GET http://{}/health - Health check", addr);
5656

5757
// Start server
@@ -65,131 +65,3 @@ pub async fn start_http_server(
6565
async fn health_check() -> &'static str {
6666
"OK"
6767
}
68-
69-
/// Handle MCP POST requests
70-
async fn handle_mcp_request(
71-
axum::extract::State(service): axum::extract::State<
72-
StreamableHttpService<CodeGraphMCPServer, LocalSessionManager>,
73-
>,
74-
headers: axum::http::HeaderMap,
75-
body: axum::body::Bytes,
76-
) -> axum::response::Response {
77-
use axum::response::IntoResponse;
78-
use http_body_util::Full;
79-
use tower::ServiceExt;
80-
use tracing::warn;
81-
82-
// Build HTTP request for Tower service
83-
let mut builder = axum::http::Request::builder()
84-
.method(axum::http::Method::POST)
85-
.uri("/mcp")
86-
.header(axum::http::header::CONTENT_TYPE, "application/json")
87-
.header(
88-
axum::http::header::ACCEPT,
89-
"application/json, text/event-stream",
90-
);
91-
92-
// Forward session ID header if present
93-
if let Some(session_id) = headers.get("Mcp-Session-Id") {
94-
builder = builder.header("Mcp-Session-Id", session_id);
95-
}
96-
97-
// Create request with body
98-
let http_request = match builder.body(Full::new(body)) {
99-
Ok(req) => req,
100-
Err(e) => {
101-
warn!("Failed to build HTTP request: {}", e);
102-
return (
103-
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
104-
format!("Request build error: {}", e),
105-
)
106-
.into_response();
107-
}
108-
};
109-
110-
// Call Tower service using oneshot
111-
match service.oneshot(http_request).await {
112-
Ok(response) => {
113-
// Response is already in the correct format (BoxBody)
114-
// Just need to convert it to an Axum response
115-
let (parts, body) = response.into_parts();
116-
axum::http::Response::from_parts(parts, body).into_response()
117-
}
118-
Err(e) => {
119-
warn!("Service call failed: {:?}", e);
120-
(
121-
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
122-
format!("Service error: {:?}", e),
123-
)
124-
.into_response()
125-
}
126-
}
127-
}
128-
129-
/// Handle SSE streaming connections (reconnection support)
130-
async fn handle_sse_stream(
131-
axum::extract::State(service): axum::extract::State<
132-
StreamableHttpService<CodeGraphMCPServer, LocalSessionManager>,
133-
>,
134-
headers: axum::http::HeaderMap,
135-
) -> axum::response::Response {
136-
use axum::response::IntoResponse;
137-
use http_body_util::Empty;
138-
use tower::ServiceExt;
139-
use tracing::warn;
140-
141-
// Extract session ID (REQUIRED for SSE reconnection)
142-
let session_id = match headers.get("Mcp-Session-Id").and_then(|v| v.to_str().ok()) {
143-
Some(sid) => sid,
144-
None => {
145-
warn!("SSE connection missing Mcp-Session-Id header");
146-
return (
147-
axum::http::StatusCode::BAD_REQUEST,
148-
"Missing Mcp-Session-Id header",
149-
)
150-
.into_response();
151-
}
152-
};
153-
154-
// Build HTTP request for Tower service
155-
let mut builder = axum::http::Request::builder()
156-
.method(axum::http::Method::GET)
157-
.uri("/sse")
158-
.header("Mcp-Session-Id", session_id);
159-
160-
// Extract Last-Event-Id for resumption (optional)
161-
if let Some(last_event_id) = headers.get("Last-Event-Id") {
162-
builder = builder.header("Last-Event-Id", last_event_id);
163-
}
164-
165-
// Create request with empty body for GET
166-
let http_request = match builder.body(Empty::<axum::body::Bytes>::new()) {
167-
Ok(req) => req,
168-
Err(e) => {
169-
warn!("Failed to build SSE request: {}", e);
170-
return (
171-
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
172-
format!("Request build error: {}", e),
173-
)
174-
.into_response();
175-
}
176-
};
177-
178-
// Call Tower service using oneshot
179-
match service.oneshot(http_request).await {
180-
Ok(response) => {
181-
// Response is already in the correct format (BoxBody)
182-
// Just need to convert it to an Axum response
183-
let (parts, body) = response.into_parts();
184-
axum::http::Response::from_parts(parts, body).into_response()
185-
}
186-
Err(e) => {
187-
warn!("SSE service call failed: {:?}", e);
188-
(
189-
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
190-
format!("Service error: {:?}", e),
191-
)
192-
.into_response()
193-
}
194-
}
195-
}

0 commit comments

Comments
 (0)