Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 127 additions & 152 deletions examples/streamable_http_client.rb
Original file line number Diff line number Diff line change
@@ -1,57 +1,35 @@
# frozen_string_literal: true

require "mcp"
require "mcp/client"
require "mcp/client/http"
require "mcp/client/tool"
require "net/http"
require "uri"
require "json"
require "logger"

# Logger for client operations
logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, _progname, msg|
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
end
SERVER_URL = "http://localhost:9393"

# Server configuration
SERVER_URL = "http://localhost:9393/mcp"
PROTOCOL_VERSION = "2024-11-05"

# Helper method to make JSON-RPC requests
def make_request(session_id, method, params = {}, id = nil)
uri = URI(SERVER_URL)
http = Net::HTTP.new(uri.host, uri.port)

request = Net::HTTP::Post.new(uri)
request["Content-Type"] = "application/json"
request["Mcp-Session-Id"] = session_id if session_id

body = {
jsonrpc: "2.0",
method: method,
params: params,
id: id || SecureRandom.uuid,
}

request.body = body.to_json
response = http.request(request)

{
status: response.code,
headers: response.to_hash,
body: JSON.parse(response.body),
}
rescue => e
{ error: e.message }
# Logger for client operations
def create_logger
logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, _progname, msg|
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
end
logger
end

# Connect to SSE stream
# Connect to SSE stream for real-time notifications
# Note: The SDK doesn't support SSE streaming yet, so we use raw Net::HTTP
def connect_sse(session_id, logger)
uri = URI(SERVER_URL)

logger.info("Connecting to SSE stream...")

Net::HTTP.start(uri.host, uri.port) do |http|
request = Net::HTTP::Get.new(uri)
request["Mcp-Session-Id"] = session_id
request["MCP-Session-Id"] = session_id
request["Accept"] = "text/event-stream"
request["Cache-Control"] = "no-cache"

Expand All @@ -62,14 +40,10 @@ def connect_sse(session_id, logger)
response.read_body do |chunk|
Copy link
Contributor

@atesgoral atesgoral Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use https://rubygems.org/gems/event_stream_parser for parsing. SSE appears easy to parse but it has weird edge cases.

Thanks for kicking off the effort! I've been planning on porting this standalone client I had written for a project, but never got around to doing it: https://gist.github.com/atesgoral/75172912b5951d9be33497b80aba4397

You can how event_stream_parser can be used for parsing chunks as they come in.

chunk.split("\n").each do |line|
if line.start_with?("data: ")
data = line[6..-1]
begin
logger.info("SSE data: #{data}")
rescue JSON::ParserError
logger.debug("Non-JSON SSE data: #{data}")
end
data = line[6..]
logger.info("SSE event: #{data}")
elsif line.start_with?(": ")
logger.debug("SSE keepalive received: #{line}")
logger.debug("SSE keepalive: #{line}")
end
end
end
Expand All @@ -79,125 +53,126 @@ def connect_sse(session_id, logger)
end
end
rescue Interrupt
logger.info("SSE connection interrupted by user")
logger.info("SSE connection interrupted")
rescue => e
logger.error("SSE connection error: #{e.message}")
end

# Main client flow
def main
logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, _progname, msg|
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
end

puts "=== MCP SSE Test Client ==="

# Step 1: Initialize session
logger.info("Initializing session...")

init_response = make_request(
nil,
"initialize",
{
protocolVersion: PROTOCOL_VERSION,
capabilities: {},
clientInfo: {
name: "sse-test-client",
version: "1.0",
},
},
"init-1",
)

if init_response[:error]
logger.error("Failed to initialize: #{init_response[:error]}")
exit(1)
end

session_id = init_response[:headers]["mcp-session-id"]&.first

if session_id.nil?
logger.error("No session ID received")
exit(1)
end

logger.info("Session initialized: #{session_id}")
logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}")

# Step 2: Start SSE connection in a separate thread
sse_thread = Thread.new { connect_sse(session_id, logger) }

# Give SSE time to connect
sleep(1)

# Step 3: Interactive menu
loop do
puts <<~MESSAGE.chomp

=== Available Actions ===
1. Send custom notification
2. Test echo
3. List tools
0. Exit

Choose an action:#{" "}
MESSAGE

choice = gets.chomp

case choice
when "1"
print("Enter notification message: ")
message = gets.chomp
print("Enter delay in seconds (0 for immediate): ")
delay = gets.chomp.to_f

response = make_request(
session_id,
"tools/call",
{
name: "notification_tool",
arguments: {
message: message,
delay: delay,
},
},
)
if response[:body]["accepted"]
logger.info("Notification sent successfully")
logger = create_logger

puts <<~MESSAGE
MCP Streamable HTTP Client (SDK + SSE)
Make sure the server is running (ruby examples/streamable_http_server.rb)
#{"=" * 60}
MESSAGE

# Initialize SDK client
transport = MCP::Client::HTTP.new(url: SERVER_URL)
client = MCP::Client.new(transport: transport)

begin
# Initialize session using SDK
puts "=== Initializing session ==="
init_response = client.connect(
client_info: { name: "streamable-http-client", version: "1.0" },
)
puts "Session ID: #{client.session_id}"
puts "Protocol Version: #{client.protocol_version}"
puts "Server Info: #{init_response.dig("result", "serverInfo")}"

# Get available tools BEFORE establishing SSE connection
# (Once SSE is active, server sends responses via SSE stream, not POST response)
puts "=== Listing tools ==="
tools = client.tools
tools.each { |t| puts " - #{t.name}: #{t.description}" }

echo_tool = tools.find { |t| t.name == "echo" }
notification_tool = tools.find { |t| t.name == "notification_tool" }

# Start SSE connection in a separate thread (uses raw HTTP)
# Note: After this, server responses will be sent via SSE, not POST
sse_thread = Thread.new { connect_sse(client.session_id, logger) }

# Give SSE time to connect
sleep(1)

# Interactive menu
loop do
puts <<~MENU.chomp

=== Available Actions ===
1. Send notification (triggers SSE event)
2. Echo message
3. List tools
0. Exit

Choose an action:#{" "}
MENU

choice = gets&.chomp

case choice
when "1"
if notification_tool
print("Enter notification message: ")
message = gets&.chomp || "Test"
print("Enter delay in seconds (0 for immediate): ")
delay = (gets&.chomp || "0").to_f

puts "=== Calling tool: notification_tool ==="
response = client.call_tool(
tool: notification_tool,
arguments: { message: message, delay: delay },
)
puts "Response: #{JSON.pretty_generate(response)}"
else
puts "notification_tool not available"
end
when "2"
if echo_tool
print("Enter message to echo: ")
message = gets&.chomp || "Hello"

puts "=== Calling tool: echo ==="
response = client.call_tool(tool: echo_tool, arguments: { message: message })
puts "Response: #{JSON.pretty_generate(response)}"
else
puts "echo tool not available"
end
when "3"
puts "=== Listing tools ==="
puts "(Note: Response will appear in SSE stream when active)"
client.tools.each do |tool|
puts " - #{tool.name}: #{tool.description}"
end
when "0", nil
logger.info("Exiting...")
break
else
logger.error("Error: #{response[:body]["error"]}")
puts "Invalid choice"
end
when "2"
print("Enter message to echo: ")
message = gets.chomp
make_request(session_id, "tools/call", { name: "echo", arguments: { message: message } })
when "3"
make_request(session_id, "tools/list")
when "0"
logger.info("Exiting...")
break
else
puts "Invalid choice"
end
rescue MCP::Client::SessionExpiredError => e
logger.error("Session expired: #{e.message}")
rescue MCP::Client::RequestHandlerError => e
logger.error("Request error: #{e.message}")
rescue Interrupt
logger.info("Client interrupted")
rescue => e
logger.error("Error: #{e.message}")
logger.error(e.backtrace.first(5).join("\n"))
ensure
# Clean up SSE thread
sse_thread&.kill if sse_thread&.alive?

# Close session using SDK
puts "=== Closing session ==="
client.close
puts "Session closed"
end

# Clean up
sse_thread.kill if sse_thread.alive?

# Close session
logger.info("Closing session...")
make_request(session_id, "close")
logger.info("Session closed")
rescue Interrupt
logger.info("Client interrupted by user")
rescue => e
logger.error("Client error: #{e.message}")
logger.error(e.backtrace.join("\n"))
end

# Run the client
if __FILE__ == $PROGRAM_NAME
main
end
2 changes: 1 addition & 1 deletion examples/streamable_http_server.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../lib", __dir__))
# Usage: bundle exec ruby examples/streamable_http_server.rb
require "mcp"
require "rackup"
require "json"
Expand Down
Loading