feat: MCP streamable http and sse transports

This commit is contained in:
Louis 2025-08-15 10:12:41 +07:00
parent 13a1969150
commit 25043dda7b
No known key found for this signature in database
GPG Key ID: 44FA9F4D33C37DE2
3 changed files with 251 additions and 113 deletions

View File

@ -1,7 +1,15 @@
use rmcp::{transport::TokioChildProcess, ServiceExt}; use rmcp::{
model::{ClientCapabilities, ClientInfo, Implementation},
transport::{
streamable_http_client::StreamableHttpClientTransportConfig, SseClientTransport,
StreamableHttpClientTransport, TokioChildProcess,
},
ServiceExt,
};
use serde_json::Value; use serde_json::Value;
use std::{collections::HashMap, env, sync::Arc, time::Duration}; use std::{collections::HashMap, env, sync::Arc, time::Duration};
use tauri::{AppHandle, Emitter, Manager, Runtime, State}; use tauri::{AppHandle, Emitter, Manager, Runtime, State};
use tauri_plugin_http::reqwest;
use tokio::{ use tokio::{
process::Command, process::Command,
sync::Mutex, sync::Mutex,
@ -12,8 +20,7 @@ use super::constants::{
MCP_BACKOFF_MULTIPLIER, MCP_BASE_RESTART_DELAY_MS, MCP_MAX_RESTART_DELAY_MS, MCP_BACKOFF_MULTIPLIER, MCP_BASE_RESTART_DELAY_MS, MCP_MAX_RESTART_DELAY_MS,
}; };
use crate::core::{ use crate::core::{
app::commands::get_jan_data_folder_path, app::commands::get_jan_data_folder_path, mcp::models::McpServerConfig, state::{AppState, RunningServiceEnum, SharedMcpServers}
state::{AppState, RunningServiceEnum, SharedMcpServers},
}; };
use jan_utils::can_override_npx; use jan_utils::can_override_npx;
@ -462,7 +469,7 @@ pub async fn start_restart_loop<R: Runtime>(
} }
} }
pub async fn schedule_mcp_start_task<R: Runtime>( async fn schedule_mcp_start_task<R: Runtime>(
app: tauri::AppHandle<R>, app: tauri::AppHandle<R>,
servers: SharedMcpServers, servers: SharedMcpServers,
name: String, name: String,
@ -475,141 +482,260 @@ pub async fn schedule_mcp_start_task<R: Runtime>(
.expect("Executable must have a parent directory"); .expect("Executable must have a parent directory");
let bin_path = exe_parent_path.to_path_buf(); let bin_path = exe_parent_path.to_path_buf();
let (command, args, envs) = extract_command_args(&config) let config_params = extract_command_args(&config)
.ok_or_else(|| format!("Failed to extract command args from config for {name}"))?; .ok_or_else(|| format!("Failed to extract command args from config for {name}"))?;
let mut cmd = Command::new(command.clone()); if config_params.transport_type.as_deref() == Some("http") && config_params.url.is_some() {
let transport = StreamableHttpClientTransport::with_client(
reqwest::Client::builder()
.default_headers({
// Map envs to request headers
let mut headers = reqwest::header::HeaderMap::new();
for (key, value) in config_params.envs.iter() {
if let Some(v_str) = value.as_str() {
// Try to map env keys to HTTP header names (case-insensitive)
// Most HTTP headers are Title-Case, so we try to convert
let header_name =
reqwest::header::HeaderName::from_bytes(key.as_bytes());
if let Ok(header_name) = header_name {
if let Ok(header_value) =
reqwest::header::HeaderValue::from_str(v_str)
{
headers.insert(header_name, header_value);
}
}
}
}
headers
})
.build()
.unwrap(),
StreamableHttpClientTransportConfig {
uri: config_params.url.unwrap().into(),
..Default::default()
},
);
if command == "npx" && can_override_npx() { let client_info = ClientInfo {
let mut cache_dir = app_path.clone(); protocol_version: Default::default(),
cache_dir.push(".npx"); capabilities: ClientCapabilities::default(),
let bun_x_path = format!("{}/bun", bin_path.display()); client_info: Implementation {
cmd = Command::new(bun_x_path); name: "Jan Streamable Client".to_string(),
cmd.arg("x"); version: "0.0.1".to_string(),
cmd.env("BUN_INSTALL", cache_dir.to_str().unwrap().to_string()); },
} };
let client = client_info.serve(transport).await.inspect_err(|e| {
log::error!("client error: {:?}", e);
});
if command == "uvx" { match client {
let mut cache_dir = app_path.clone(); Ok(client) => {
cache_dir.push(".uvx"); log::info!("Connected to server: {:?}", client.peer_info());
let bun_x_path = format!("{}/uv", bin_path.display()); servers
cmd = Command::new(bun_x_path); .lock()
cmd.arg("tool"); .await
cmd.arg("run"); .insert(name.clone(), RunningServiceEnum::WithInit(client));
cmd.env("UV_CACHE_DIR", cache_dir.to_str().unwrap().to_string());
}
#[cfg(windows)] // Mark server as successfully connected (for restart policy)
{ {
cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW: prevents shell window on Windows let app_state = app.state::<AppState>();
} let mut connected = app_state.mcp_successfully_connected.lock().await;
connected.insert(name.clone(), true);
let app_path_str = app_path.to_str().unwrap().to_string(); log::info!("Marked MCP server {} as successfully connected", name);
let log_file_path = format!("{}/logs/app.log", app_path_str); }
match std::fs::OpenOptions::new() }
.create(true) Err(e) => {
.append(true) log::error!("Failed to connect to server: {}", e);
.open(log_file_path) return Err(format!("Failed to connect to server: {}", e));
{ }
Ok(file) => {
cmd.stderr(std::process::Stdio::from(file));
} }
Err(err) => { } else if config_params.transport_type.as_deref() == Some("sse") && config_params.url.is_some() {
log::error!("Failed to open log file: {}", err); let transport = SseClientTransport::start_with_client(
} reqwest::Client::builder()
}; .default_headers({
// Map envs to request headers
cmd.kill_on_drop(true); let mut headers = reqwest::header::HeaderMap::new();
log::trace!("Command: {cmd:#?}"); for (key, value) in config_params.envs.iter() {
if let Some(v_str) = value.as_str() {
args.iter().filter_map(Value::as_str).for_each(|arg| { // Try to map env keys to HTTP header names (case-insensitive)
cmd.arg(arg); // Most HTTP headers are Title-Case, so we try to convert
}); let header_name =
envs.iter().for_each(|(k, v)| { reqwest::header::HeaderName::from_bytes(key.as_bytes());
if let Some(v_str) = v.as_str() { if let Ok(header_name) = header_name {
cmd.env(k, v_str); if let Ok(header_value) =
} reqwest::header::HeaderValue::from_str(v_str)
}); {
headers.insert(header_name, header_value);
let process = TokioChildProcess::new(cmd).map_err(|e| { }
log::error!("Failed to run command {name}: {e}"); }
format!("Failed to run command {name}: {e}") }
})?; }
headers
let service = () })
.serve(process) .build()
.unwrap(),
rmcp::transport::sse_client::SseClientConfig {
sse_endpoint: config_params.url.unwrap().into(),
..Default::default()
},
)
.await .await
.map_err(|e| format!("Failed to start MCP server {name}: {e}"))?; .map_err(|e| {
log::error!("transport error: {:?}", e);
format!("Failed to start SSE transport: {}", e)
})?;
// Get peer info and clone the needed values before moving the service let client_info = ClientInfo {
let (server_name, server_version) = { protocol_version: Default::default(),
capabilities: ClientCapabilities::default(),
client_info: Implementation {
name: "Jan SSE Client".to_string(),
version: "0.0.1".to_string(),
},
};
let client = client_info.serve(transport).await.map_err(|e| {
log::error!("client error: {:?}", e);
e.to_string()
});
match client {
Ok(client) => {
log::info!("Connected to server: {:?}", client.peer_info());
servers
.lock()
.await
.insert(name.clone(), RunningServiceEnum::WithInit(client));
// Mark server as successfully connected (for restart policy)
{
let app_state = app.state::<AppState>();
let mut connected = app_state.mcp_successfully_connected.lock().await;
connected.insert(name.clone(), true);
log::info!("Marked MCP server {} as successfully connected", name);
}
}
Err(e) => {
log::error!("Failed to connect to server: {}", e);
return Err(format!("Failed to connect to server: {}", e));
}
}
} else {
let mut cmd = Command::new(config_params.command.clone());
if config_params.command.clone() == "npx" && can_override_npx() {
let mut cache_dir = app_path.clone();
cache_dir.push(".npx");
let bun_x_path = format!("{}/bun", bin_path.display());
cmd = Command::new(bun_x_path);
cmd.arg("x");
cmd.env("BUN_INSTALL", cache_dir.to_str().unwrap().to_string());
}
if config_params.command.clone() == "uvx" {
let mut cache_dir = app_path.clone();
cache_dir.push(".uvx");
let bun_x_path = format!("{}/uv", bin_path.display());
cmd = Command::new(bun_x_path);
cmd.arg("tool");
cmd.arg("run");
cmd.env("UV_CACHE_DIR", cache_dir.to_str().unwrap().to_string());
}
#[cfg(windows)]
{
cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW: prevents shell window on Windows
}
let app_path_str = app_path.to_str().unwrap().to_string();
let log_file_path = format!("{}/logs/app.log", app_path_str);
match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_file_path)
{
Ok(file) => {
cmd.stderr(std::process::Stdio::from(file));
}
Err(err) => {
log::error!("Failed to open log file: {}", err);
}
};
cmd.kill_on_drop(true);
log::trace!("Command: {cmd:#?}");
config_params.args.iter().filter_map(Value::as_str).for_each(|arg| {
cmd.arg(arg);
});
config_params.envs.iter().for_each(|(k, v)| {
if let Some(v_str) = v.as_str() {
cmd.env(k, v_str);
}
});
let process = TokioChildProcess::new(cmd).map_err(|e| {
log::error!("Failed to run command {name}: {e}");
format!("Failed to run command {name}: {e}")
})?;
let service = ()
.serve(process)
.await
.map_err(|e| format!("Failed to start MCP server {name}: {e}"))?;
// Get peer info and clone the needed values before moving the service
let server_info = service.peer_info(); let server_info = service.peer_info();
log::trace!("Connected to server: {server_info:#?}"); log::trace!("Connected to server: {server_info:#?}");
(
server_info.unwrap().server_info.name.clone(),
server_info.unwrap().server_info.version.clone(),
)
};
// Now move the service into the HashMap // Now move the service into the HashMap
// Now move the service into the HashMap servers
servers .lock()
.lock() .await
.await .insert(name.clone(), RunningServiceEnum::NoInit(service));
.insert(name.clone(), RunningServiceEnum::NoInit(service)); log::info!("Server {name} started successfully.");
log::info!("Server {name} started successfully.");
log::info!("Server {name} started successfully.");
// Wait a short time to verify the server is stable before marking as connected // Wait a short time to verify the server is stable before marking as connected
// This prevents race conditions where the server quits immediately // This prevents race conditions where the server quits immediately
let verification_delay = Duration::from_millis(500); let verification_delay = Duration::from_millis(500);
sleep(verification_delay).await; sleep(verification_delay).await;
// Check if server is still running after the verification delay // Check if server is still running after the verification delay
let server_still_running = { let server_still_running = {
let servers_map = servers.lock().await; let servers_map = servers.lock().await;
servers_map.contains_key(&name) servers_map.contains_key(&name)
}; };
if !server_still_running { if !server_still_running {
return Err(format!( return Err(format!(
"MCP server {} quit immediately after starting", "MCP server {} quit immediately after starting",
name name
)); ));
}
// Mark server as successfully connected (for restart policy)
{
let app_state = app.state::<AppState>();
let mut connected = app_state.mcp_successfully_connected.lock().await;
connected.insert(name.clone(), true);
log::info!("Marked MCP server {} as successfully connected", name);
}
} }
// Mark server as successfully connected (for restart policy)
{
let app_state = app.state::<AppState>();
let mut connected = app_state.mcp_successfully_connected.lock().await;
connected.insert(name.clone(), true);
log::info!("Marked MCP server {} as successfully connected", name);
}
// Emit event to the frontend
let event = format!("mcp-connected");
let payload = serde_json::json!({
"name": server_name,
"version": server_version,
});
app.emit(&event, payload)
.map_err(|e| format!("Failed to emit event: {}", e))?;
Ok(()) Ok(())
} }
pub fn extract_command_args( pub fn extract_command_args(config: &Value) -> Option<McpServerConfig> {
config: &Value,
) -> Option<(String, Vec<Value>, serde_json::Map<String, Value>)> {
let obj = config.as_object()?; let obj = config.as_object()?;
let command = obj.get("command")?.as_str()?.to_string(); let command = obj.get("command")?.as_str()?.to_string();
let args = obj.get("args")?.as_array()?.clone(); let args = obj.get("args")?.as_array()?.clone();
let url = obj.get("url").and_then(|u| u.as_str()).map(String::from);
let transport_type = obj.get("type").and_then(|t| t.as_str()).map(String::from);
let envs = obj let envs = obj
.get("env") .get("env")
.unwrap_or(&Value::Object(serde_json::Map::new())) .unwrap_or(&Value::Object(serde_json::Map::new()))
.as_object()? .as_object()?
.clone(); .clone();
Some((command, args, envs)) Some(McpServerConfig {
transport_type,
url,
command,
args,
envs,
})
} }
pub fn extract_active_status(config: &Value) -> Option<bool> { pub fn extract_active_status(config: &Value) -> Option<bool> {

View File

@ -1,6 +1,7 @@
pub mod commands; pub mod commands;
mod constants; mod constants;
pub mod helpers; pub mod helpers;
pub mod models;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;

View File

@ -0,0 +1,11 @@
use serde_json::Value;
/// Configuration parameters extracted from MCP server config
#[derive(Debug, Clone)]
pub struct McpServerConfig {
pub transport_type: Option<String>,
pub url: Option<String>,
pub command: String,
pub args: Vec<Value>,
pub envs: serde_json::Map<String, Value>,
}