diff --git a/src-tauri/src/core/mcp.rs b/src-tauri/src/core/mcp.rs index f9509c8e5..e340a9aaa 100644 --- a/src-tauri/src/core/mcp.rs +++ b/src-tauri/src/core/mcp.rs @@ -3,8 +3,12 @@ use rmcp::{service::RunningService, transport::TokioChildProcess, RoleClient, Se use serde_json::{Map, Value}; use std::fs; use std::{collections::HashMap, env, sync::Arc, time::Duration}; -use tauri::{AppHandle, Emitter, Runtime, State}; -use tokio::{process::Command, sync::Mutex, time::timeout}; +use tauri::{AppHandle, Emitter, Manager, Runtime, State}; +use tokio::{ + process::Command, + sync::Mutex, + time::{sleep, timeout}, +}; use super::{cmd::get_jan_data_folder_path, state::AppState}; @@ -51,6 +55,58 @@ const DEFAULT_MCP_CONFIG: &str = r#"{ // Timeout for MCP tool calls (30 seconds) const MCP_TOOL_CALL_TIMEOUT: Duration = Duration::from_secs(30); +// MCP server restart configuration with exponential backoff +const MCP_BASE_RESTART_DELAY_MS: u64 = 1000; // Start with 1 second +const MCP_MAX_RESTART_DELAY_MS: u64 = 30000; // Cap at 30 seconds +const MCP_BACKOFF_MULTIPLIER: f64 = 2.0; // Double the delay each time + +/// Calculate exponential backoff delay with jitter +/// +/// # Arguments +/// * `attempt` - The current restart attempt number (1-based) +/// +/// # Returns +/// * `u64` - Delay in milliseconds, capped at MCP_MAX_RESTART_DELAY_MS +fn calculate_exponential_backoff_delay(attempt: u32) -> u64 { + use std::cmp; + + // Calculate base exponential delay: base_delay * multiplier^(attempt-1) + let exponential_delay = (MCP_BASE_RESTART_DELAY_MS as f64) + * MCP_BACKOFF_MULTIPLIER.powi((attempt - 1) as i32); + + // Cap the delay at maximum + let capped_delay = cmp::min(exponential_delay as u64, MCP_MAX_RESTART_DELAY_MS); + + // Add jitter (±25% randomness) to prevent thundering herd + let jitter_range = (capped_delay as f64 * 0.25) as u64; + let jitter = if jitter_range > 0 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + // Use attempt number as seed for deterministic but varied jitter + let mut hasher = DefaultHasher::new(); + attempt.hash(&mut hasher); + let hash = hasher.finish(); + + // Convert hash to jitter value in range [-jitter_range, +jitter_range] + let jitter_offset = (hash % (jitter_range * 2)) as i64 - jitter_range as i64; + jitter_offset + } else { + 0 + }; + + // Apply jitter while ensuring delay stays positive and within bounds + let final_delay = cmp::max( + 100, // Minimum 100ms delay + cmp::min( + MCP_MAX_RESTART_DELAY_MS, + (capped_delay as i64 + jitter) as u64 + ) + ); + + final_delay +} + /// Runs MCP commands by reading configuration from a JSON file and initializing servers /// /// # Arguments @@ -70,46 +126,361 @@ pub async fn run_mcp_commands( "Load MCP configs from {}", app_path_str.clone() + "/mcp_config.json" ); - let config_content = std::fs::read_to_string(app_path_str.clone() + "/mcp_config.json") - .map_err(|e| format!("Failed to read config file: {}", e))?; + let config_content = std::fs::read_to_string(app_path_str + "/mcp_config.json") + .map_err(|e| format!("Failed to read config file: {e}"))?; let mcp_servers: serde_json::Value = serde_json::from_str(&config_content) - .map_err(|e| format!("Failed to parse config: {}", e))?; + .map_err(|e| format!("Failed to parse config: {e}"))?; - if let Some(server_map) = mcp_servers.get("mcpServers").and_then(Value::as_object) { - log::trace!("MCP Servers: {server_map:#?}"); + let server_map = mcp_servers + .get("mcpServers") + .and_then(Value::as_object) + .ok_or("No mcpServers found in 
config")?; - for (name, config) in server_map { - if let Some(false) = extract_active_status(config) { - log::trace!("Server {name} is not active, skipping."); - continue; + log::trace!("MCP Servers: {server_map:#?}"); + + // Collect handles for initial server startup + let mut startup_handles = Vec::new(); + + for (name, config) in server_map { + if extract_active_status(config) == Some(false) { + log::trace!("Server {name} is not active, skipping."); + continue; + } + + let app_clone = app.clone(); + let servers_clone = servers_state.clone(); + let name_clone = name.clone(); + let config_clone = config.clone(); + + // Spawn task for initial startup attempt + let handle = tokio::spawn(async move { + // Only wait for the initial startup attempt, not the monitoring + let result = start_mcp_server_with_restart( + app_clone.clone(), + servers_clone.clone(), + name_clone.clone(), + config_clone.clone(), + Some(3), // Default max restarts for startup + ).await; + + // If initial startup failed, we still want to continue with other servers + if let Err(e) = &result { + log::error!("Initial startup failed for MCP server {}: {}", name_clone, e); } - match start_mcp_server( - app.clone(), - servers_state.clone(), - name.clone(), - config.clone(), - ) - .await - { - Ok(_) => { - log::info!("Server {name} activated successfully."); - } - Err(e) => { - let _ = app.emit( - "mcp-error", - format!("Failed to activate MCP server {name}: {e}"), - ); - log::error!("Failed to activate server {name}: {e}"); - continue; // Skip to the next server + + (name_clone, result) + }); + + startup_handles.push(handle); + } + + // Wait for all initial startup attempts to complete + let mut successful_count = 0; + let mut failed_count = 0; + + for handle in startup_handles { + match handle.await { + Ok((name, result)) => { + match result { + Ok(_) => { + log::info!("MCP server {} initialized successfully", name); + successful_count += 1; + } + Err(e) => { + log::error!("MCP server {} failed to initialize: {}", name, e); + failed_count += 1; + } } } + Err(e) => { + log::error!("Failed to join startup task: {}", e); + failed_count += 1; + } } } + + log::info!( + "MCP server initialization complete: {} successful, {} failed", + successful_count, + failed_count + ); Ok(()) } +/// Monitor MCP server health without removing it from the HashMap +async fn monitor_mcp_server_handle( + servers_state: Arc>>>, + name: String, +) -> Option { + log::info!("Monitoring MCP server {} health", name); + + // Monitor server health with periodic checks + loop { + // Small delay between health checks + sleep(Duration::from_secs(5)).await; + + // Check if server is still healthy by trying to list tools + let health_check_result = { + let servers = servers_state.lock().await; + if let Some(service) = servers.get(&name) { + // Try to list tools as a health check with a short timeout + match timeout(Duration::from_secs(2), service.list_all_tools()).await { + Ok(Ok(_)) => { + // Server responded successfully + true + } + Ok(Err(e)) => { + log::warn!("MCP server {} health check failed: {}", name, e); + false + } + Err(_) => { + log::warn!("MCP server {} health check timed out", name); + false + } + } + } else { + // Server was removed from HashMap (e.g., by deactivate_mcp_server) + log::info!("MCP server {} no longer in running services", name); + return Some(rmcp::service::QuitReason::Closed); + } + }; + + if !health_check_result { + // Server failed health check - remove it and return + log::error!("MCP server {} failed health check, removing 
from active servers", name); + let mut servers = servers_state.lock().await; + if let Some(service) = servers.remove(&name) { + // Try to cancel the service gracefully + let _ = service.cancel().await; + } + return Some(rmcp::service::QuitReason::Closed); + } + } +} + +/// Starts an MCP server with restart monitoring (similar to cortex restart) +/// Returns the result of the first start attempt, then continues with restart monitoring +async fn start_mcp_server_with_restart( + app: AppHandle, + servers_state: Arc>>>, + name: String, + config: Value, + max_restarts: Option, +) -> Result<(), String> { + let app_state = app.state::(); + let restart_counts = app_state.mcp_restart_counts.clone(); + let active_servers_state = app_state.mcp_active_servers.clone(); + let successfully_connected = app_state.mcp_successfully_connected.clone(); + + // Store active server config for restart purposes + store_active_server_config(&active_servers_state, &name, &config).await; + + let max_restarts = max_restarts.unwrap_or(5); + + // Try the first start attempt and return its result + log::info!("Starting MCP server {} (Initial attempt)", name); + let first_start_result = schedule_mcp_start_task( + app.clone(), + servers_state.clone(), + name.clone(), + config.clone(), + ).await; + + match first_start_result { + Ok(_) => { + log::info!("MCP server {} started successfully on first attempt", name); + reset_restart_count(&restart_counts, &name).await; + + // Check if server was marked as successfully connected (passed verification) + let was_verified = { + let connected = successfully_connected.lock().await; + connected.get(&name).copied().unwrap_or(false) + }; + + if was_verified { + // Only spawn monitoring task if server passed verification + spawn_server_monitoring_task( + app, + servers_state, + name, + config, + max_restarts, + restart_counts, + successfully_connected, + ).await; + + Ok(()) + } else { + // Server failed verification, don't monitor for restarts + log::error!("MCP server {} failed verification after startup", name); + Err(format!("MCP server {} failed verification after startup", name)) + } + } + Err(e) => { + log::error!("Failed to start MCP server {} on first attempt: {}", name, e); + Err(e) + } + } +} + +/// Helper function to handle the restart loop logic +async fn start_restart_loop( + app: AppHandle, + servers_state: Arc>>>, + name: String, + config: Value, + max_restarts: u32, + restart_counts: Arc>>, + successfully_connected: Arc>>, +) { + loop { + let current_restart_count = { + let mut counts = restart_counts.lock().await; + let count = counts.entry(name.clone()).or_insert(0); + *count += 1; + *count + }; + + if current_restart_count > max_restarts { + log::error!( + "MCP server {} reached maximum restart attempts ({}). 
Giving up.", + name, + max_restarts + ); + if let Err(e) = app.emit("mcp_max_restarts_reached", + serde_json::json!({ + "server": name, + "max_restarts": max_restarts + }) + ) { + log::error!("Failed to emit mcp_max_restarts_reached event: {e}"); + } + break; + } + + log::info!( + "Restarting MCP server {} (Attempt {}/{})", + name, + current_restart_count, + max_restarts + ); + + // Calculate exponential backoff delay + let delay_ms = calculate_exponential_backoff_delay(current_restart_count); + log::info!( + "Waiting {}ms before restart attempt {} for MCP server {}", + delay_ms, + current_restart_count, + name + ); + sleep(Duration::from_millis(delay_ms)).await; + + // Attempt to restart the server + let start_result = schedule_mcp_start_task( + app.clone(), + servers_state.clone(), + name.clone(), + config.clone(), + ).await; + + match start_result { + Ok(_) => { + log::info!("MCP server {} restarted successfully.", name); + + // Check if server passed verification (was marked as successfully connected) + let passed_verification = { + let connected = successfully_connected.lock().await; + connected.get(&name).copied().unwrap_or(false) + }; + + if !passed_verification { + log::error!( + "MCP server {} failed verification after restart - stopping permanently", + name + ); + break; + } + + // Reset restart count on successful restart with verification + { + let mut counts = restart_counts.lock().await; + if let Some(count) = counts.get_mut(&name) { + if *count > 0 { + log::info!( + "MCP server {} restarted successfully, resetting restart count from {} to 0.", + name, + *count + ); + *count = 0; + } + } + } + + // Monitor the server again + let quit_reason = monitor_mcp_server_handle( + servers_state.clone(), + name.clone(), + ).await; + + log::info!("MCP server {} quit with reason: {:?}", name, quit_reason); + + // Check if server was marked as successfully connected + let was_connected = { + let connected = successfully_connected.lock().await; + connected.get(&name).copied().unwrap_or(false) + }; + + // Only continue restart loop if server was previously connected + if !was_connected { + log::error!( + "MCP server {} failed before establishing successful connection - stopping permanently", + name + ); + break; + } + + // Determine if we should restart based on quit reason + let should_restart = match quit_reason { + Some(reason) => { + log::warn!("MCP server {} terminated unexpectedly: {:?}", name, reason); + true + } + None => { + log::info!("MCP server {} was manually stopped - not restarting", name); + false + } + }; + + if !should_restart { + break; + } + // Continue the loop for another restart attempt + } + Err(e) => { + log::error!("Failed to restart MCP server {}: {}", name, e); + + // Check if server was marked as successfully connected before + let was_connected = { + let connected = successfully_connected.lock().await; + connected.get(&name).copied().unwrap_or(false) + }; + + // Only continue restart attempts if server was previously connected + if !was_connected { + log::error!( + "MCP server {} failed restart and was never successfully connected - stopping permanently", + name + ); + break; + } + // Continue the loop for another restart attempt + } + } + } +} + #[tauri::command] pub async fn activate_mcp_server( app: tauri::AppHandle, @@ -119,10 +490,12 @@ pub async fn activate_mcp_server( ) -> Result<(), String> { let servers: Arc>>> = state.mcp_servers.clone(); - start_mcp_server(app, servers, name, config).await + + // Use the modified start_mcp_server_with_restart that 
returns first attempt result + start_mcp_server_with_restart(app, servers, name, config, Some(3)).await } -async fn start_mcp_server( +async fn schedule_mcp_start_task( app: tauri::AppHandle, servers: Arc>>>, name: String, @@ -134,113 +507,159 @@ async fn start_mcp_server( .parent() .expect("Executable must have a parent directory"); let bin_path = exe_parent_path.to_path_buf(); - if let Some((command, args, envs)) = extract_command_args(&config) { - let mut cmd = Command::new(command.clone()); - if command.clone() == "npx" { - let mut cache_dir = app_path.clone(); - cache_dir.push(".npx"); - let bun_x_path = format!("{}/bun", bin_path.display()); - cmd = Command::new(bun_x_path); - cmd.arg("x"); - cmd.env("BUN_INSTALL", cache_dir.to_str().unwrap().to_string()); - } + + let (command, args, envs) = extract_command_args(&config) + .ok_or_else(|| format!("Failed to extract command args from config for {name}"))?; - if command.clone() == "uvx" { - let mut cache_dir = app_path.clone(); - cache_dir.push(".uvx"); - let bun_x_path = format!("{}/uv", bin_path.display()); - cmd = Command::new(bun_x_path); - cmd.arg("tool"); - cmd.arg("run"); - cmd.env("UV_CACHE_DIR", cache_dir.to_str().unwrap().to_string()); - } - #[cfg(windows)] - { - cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW: prevents shell window on Windows - } - let app_path_str = app_path.to_str().unwrap().to_string(); - let log_file_path = format!("{}/logs/app.log", app_path_str); - match std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(log_file_path) - { - Ok(file) => { - cmd.stderr(std::process::Stdio::from(file)); - } - Err(err) => { - log::error!("Failed to open log file: {}", err); - } - }; - - cmd.kill_on_drop(true); - - log::trace!("Command: {cmd:#?}"); - - args.iter().filter_map(Value::as_str).for_each(|arg| { - cmd.arg(arg); - }); - envs.iter().for_each(|(k, v)| { - if let Some(v_str) = v.as_str() { - cmd.env(k, v_str); - } - }); - - let process = TokioChildProcess::new(cmd); - match process { - Ok(p) => { - let service = ().serve(p).await; - - match service { - Ok(running_service) => { - // Get peer info and clone the needed values before moving the service - let (server_name, server_version) = { - let server_info = running_service.peer_info(); - log::trace!("Connected to server: {server_info:#?}"); - ( - server_info.server_info.name.clone(), - server_info.server_info.version.clone(), - ) - }; - - // Now move the service into the HashMap - servers.lock().await.insert(name.clone(), running_service); - log::info!("Server {name} started successfully."); - - // Emit event to the frontend - let event = format!("mcp-connected"); - let payload = serde_json::json!({ - "name": server_name, - "version": server_version, - }); - app.emit(&event, payload) - .map_err(|e| format!("Failed to emit event: {}", e))?; - } - Err(e) => { - return Err(format!("Failed to start MCP server {name}: {e}")); - } - } - } - Err(e) => { - log::error!("Failed to run command {name}: {e}"); - return Err(format!("Failed to run command {name}: {e}")); - } - } + let mut cmd = Command::new(command.clone()); + + if command == "npx" { + let mut cache_dir = app_path.clone(); + cache_dir.push(".npx"); + let bun_x_path = format!("{}/bun", bin_path.display()); + cmd = Command::new(bun_x_path); + cmd.arg("x"); + cmd.env("BUN_INSTALL", cache_dir.to_str().unwrap().to_string()); } + + if command == "uvx" { + let mut cache_dir = app_path.clone(); + cache_dir.push(".uvx"); + let bun_x_path = format!("{}/uv", bin_path.display()); + cmd = 
Command::new(bun_x_path); + cmd.arg("tool"); + cmd.arg("run"); + cmd.env("UV_CACHE_DIR", cache_dir.to_str().unwrap().to_string()); + } + + #[cfg(windows)] + { + cmd.creation_flags(0x08000000); // CREATE_NO_WINDOW: prevents shell window on Windows + } + + let app_path_str = app_path.to_str().unwrap().to_string(); + let log_file_path = format!("{}/logs/app.log", app_path_str); + match std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(log_file_path) + { + Ok(file) => { + cmd.stderr(std::process::Stdio::from(file)); + } + Err(err) => { + log::error!("Failed to open log file: {}", err); + } + }; + + cmd.kill_on_drop(true); + log::trace!("Command: {cmd:#?}"); + + args.iter().filter_map(Value::as_str).for_each(|arg| { + cmd.arg(arg); + }); + envs.iter().for_each(|(k, v)| { + if let Some(v_str) = v.as_str() { + cmd.env(k, v_str); + } + }); + + let process = TokioChildProcess::new(cmd) + .map_err(|e| { + log::error!("Failed to run command {name}: {e}"); + format!("Failed to run command {name}: {e}") + })?; + + let service = ().serve(process).await + .map_err(|e| format!("Failed to start MCP server {name}: {e}"))?; + + // Get peer info and clone the needed values before moving the service + let (server_name, server_version) = { + let server_info = service.peer_info(); + log::trace!("Connected to server: {server_info:#?}"); + ( + server_info.server_info.name.clone(), + server_info.server_info.version.clone(), + ) + }; + + // Now move the service into the HashMap + servers.lock().await.insert(name.clone(), service); + log::info!("Server {name} started successfully."); + + // Wait a short time to verify the server is stable before marking as connected + // This prevents race conditions where the server quits immediately + let verification_delay = Duration::from_millis(500); + sleep(verification_delay).await; + + // Check if server is still running after the verification delay + let server_still_running = { + let servers_map = servers.lock().await; + servers_map.contains_key(&name) + }; + + if !server_still_running { + return Err(format!("MCP server {} quit immediately after starting", name)); + } + + // Mark server as successfully connected (for restart policy) + { + let app_state = app.state::(); + let mut connected = app_state.mcp_successfully_connected.lock().await; + connected.insert(name.clone(), true); + log::info!("Marked MCP server {} as successfully connected", name); + } + + // Emit event to the frontend + let event = format!("mcp-connected"); + let payload = serde_json::json!({ + "name": server_name, + "version": server_version, + }); + app.emit(&event, payload) + .map_err(|e| format!("Failed to emit event: {}", e))?; + Ok(()) } #[tauri::command] pub async fn deactivate_mcp_server(state: State<'_, AppState>, name: String) -> Result<(), String> { + log::info!("Deactivating MCP server: {}", name); + + // First, mark server as manually deactivated to prevent restart + // Remove from active servers list to prevent restart + { + let mut active_servers = state.mcp_active_servers.lock().await; + active_servers.remove(&name); + log::info!("Removed MCP server {} from active servers list", name); + } + + // Mark as not successfully connected to prevent restart logic + { + let mut connected = state.mcp_successfully_connected.lock().await; + connected.insert(name.clone(), false); + log::info!("Marked MCP server {} as not successfully connected", name); + } + + // Reset restart count + { + let mut counts = state.mcp_restart_counts.lock().await; + counts.remove(&name); + 
log::info!("Reset restart count for MCP server {}", name); + } + + // Now remove and stop the server let servers = state.mcp_servers.clone(); let mut servers_map = servers.lock().await; - if let Some(service) = servers_map.remove(&name) { - service.cancel().await.map_err(|e| e.to_string())?; - log::info!("Server {name} stopped successfully."); - } else { - return Err(format!("Server {} not found", name)); - } + let service = servers_map.remove(&name) + .ok_or_else(|| format!("Server {} not found", name))?; + + // Release the lock before calling cancel + drop(servers_map); + + service.cancel().await.map_err(|e| e.to_string())?; + log::info!("Server {name} stopped successfully and marked as deactivated."); Ok(()) } @@ -270,11 +689,83 @@ pub async fn restart_mcp_servers(app: AppHandle, state: State<'_, AppState>) -> // Stop the servers stop_mcp_servers(state.mcp_servers.clone()).await?; - // Restart the servers - run_mcp_commands(&app, servers).await?; + // Restart only previously active servers (like cortex) + restart_active_mcp_servers(&app, servers).await?; app.emit("mcp-update", "MCP servers updated") - .map_err(|e| format!("Failed to emit event: {}", e)) + .map_err(|e| format!("Failed to emit event: {}", e))?; + + Ok(()) +} + +/// Restart only servers that were previously active (like cortex restart behavior) +pub async fn restart_active_mcp_servers( + app: &AppHandle, + servers_state: Arc>>>, +) -> Result<(), String> { + let app_state = app.state::(); + let active_servers = app_state.mcp_active_servers.lock().await; + + log::info!("Restarting {} previously active MCP servers", active_servers.len()); + + for (name, config) in active_servers.iter() { + log::info!("Restarting MCP server: {}", name); + + // Start server with restart monitoring - spawn async task + let app_clone = app.clone(); + let servers_clone = servers_state.clone(); + let name_clone = name.clone(); + let config_clone = config.clone(); + + tauri::async_runtime::spawn(async move { + let _ = start_mcp_server_with_restart( + app_clone, + servers_clone, + name_clone, + config_clone, + Some(3), // Default max restarts for startup + ).await; + }); + } + + Ok(()) +} + +/// Handle app quit - stop all MCP servers cleanly (like cortex cleanup) +pub async fn handle_app_quit(state: &AppState) -> Result<(), String> { + log::info!("App quitting - stopping all MCP servers cleanly"); + + // Stop all running MCP servers + stop_mcp_servers(state.mcp_servers.clone()).await?; + + // Clear active servers and restart counts + { + let mut active_servers = state.mcp_active_servers.lock().await; + active_servers.clear(); + } + { + let mut restart_counts = state.mcp_restart_counts.lock().await; + restart_counts.clear(); + } + + log::info!("All MCP servers stopped cleanly"); + Ok(()) +} + +/// Reset MCP restart count for a specific server (like cortex reset) +#[tauri::command] +pub async fn reset_mcp_restart_count(state: State<'_, AppState>, server_name: String) -> Result<(), String> { + let mut counts = state.mcp_restart_counts.lock().await; + + let count = match counts.get_mut(&server_name) { + Some(count) => count, + None => return Ok(()), // Server not found, nothing to reset + }; + + let old_count = *count; + *count = 0; + log::info!("MCP server {} restart count reset from {} to 0.", server_name, old_count); + Ok(()) } pub async fn stop_mcp_servers( @@ -290,6 +781,7 @@ pub async fn stop_mcp_servers( drop(servers_map); // Release the lock after stopping Ok(()) } + #[tauri::command] pub async fn get_connected_servers( _app: AppHandle, @@ 
-366,31 +858,31 @@ pub async fn call_tool( // Iterate through servers and find the first one that contains the tool for (_, service) in servers.iter() { - if let Ok(tools) = service.list_all_tools().await { - if tools.iter().any(|t| t.name == tool_name) { - println!("Found tool {} in server", tool_name); + let tools = match service.list_all_tools().await { + Ok(tools) => tools, + Err(_) => continue, // Skip this server if we can't list tools + }; - // Call the tool with timeout - let tool_call = service.call_tool(CallToolRequestParam { - name: tool_name.clone().into(), - arguments, - }); - - return match timeout(MCP_TOOL_CALL_TIMEOUT, tool_call).await { - Ok(result) => { - match result { - Ok(ok_result) => Ok(ok_result), - Err(e) => Err(e.to_string()), - } - } - Err(_) => Err(format!( - "Tool call '{}' timed out after {} seconds", - tool_name, - MCP_TOOL_CALL_TIMEOUT.as_secs() - )), - }; - } + if !tools.iter().any(|t| t.name == tool_name) { + continue; // Tool not found in this server, try next } + + println!("Found tool {} in server", tool_name); + + // Call the tool with timeout + let tool_call = service.call_tool(CallToolRequestParam { + name: tool_name.clone().into(), + arguments, + }); + + return match timeout(MCP_TOOL_CALL_TIMEOUT, tool_call).await { + Ok(result) => result.map_err(|e| e.to_string()), + Err(_) => Err(format!( + "Tool call '{}' timed out after {} seconds", + tool_name, + MCP_TOOL_CALL_TIMEOUT.as_secs() + )), + }; } Err(format!("Tool {} not found", tool_name)) @@ -409,8 +901,7 @@ pub async fn get_mcp_configs(app: AppHandle) -> Result { .map_err(|e| format!("Failed to create default MCP config: {}", e))?; } - let contents = fs::read_to_string(path).map_err(|e| e.to_string())?; - return Ok(contents); + fs::read_to_string(path).map_err(|e| e.to_string()) } #[tauri::command] @@ -422,6 +913,100 @@ pub async fn save_mcp_configs(app: AppHandle, configs: String) -> Result<(), Str fs::write(path, configs).map_err(|e| e.to_string()) } +/// Store active server configuration for restart purposes +async fn store_active_server_config( + active_servers_state: &Arc>>, + name: &str, + config: &Value, +) { + let mut active_servers = active_servers_state.lock().await; + active_servers.insert(name.to_string(), config.clone()); +} + + +/// Reset restart count for a server +async fn reset_restart_count( + restart_counts: &Arc>>, + name: &str, +) { + let mut counts = restart_counts.lock().await; + counts.insert(name.to_string(), 0); +} + +/// Spawn the server monitoring task for handling restarts +async fn spawn_server_monitoring_task( + app: AppHandle, + servers_state: Arc>>>, + name: String, + config: Value, + max_restarts: u32, + restart_counts: Arc>>, + successfully_connected: Arc>>, +) { + let app_clone = app.clone(); + let servers_clone = servers_state.clone(); + let name_clone = name.clone(); + let config_clone = config.clone(); + + tauri::async_runtime::spawn(async move { + // Monitor the server using RunningService's JoinHandle + let quit_reason = monitor_mcp_server_handle( + servers_clone.clone(), + name_clone.clone(), + ).await; + + log::info!("MCP server {} quit with reason: {:?}", name_clone, quit_reason); + + // Check if we should restart based on connection status and quit reason + if should_restart_server(&successfully_connected, &name_clone, &quit_reason).await { + // Start the restart loop + start_restart_loop( + app_clone, + servers_clone, + name_clone, + config_clone, + max_restarts, + restart_counts, + successfully_connected, + ).await; + } + }); +} + +/// Determine if a 
server should be restarted based on its connection status and quit reason
+async fn should_restart_server(
+    successfully_connected: &Arc<Mutex<HashMap<String, bool>>>,
+    name: &str,
+    quit_reason: &Option<rmcp::service::QuitReason>,
+) -> bool {
+    // Check if server was marked as successfully connected
+    let was_connected = {
+        let connected = successfully_connected.lock().await;
+        connected.get(name).copied().unwrap_or(false)
+    };
+
+    // Only restart if server was previously connected
+    if !was_connected {
+        log::error!(
+            "MCP server {} failed before establishing successful connection - stopping permanently",
+            name
+        );
+        return false;
+    }
+
+    // Determine if we should restart based on quit reason
+    match quit_reason {
+        Some(reason) => {
+            log::warn!("MCP server {} terminated unexpectedly: {:?}", name, reason);
+            true
+        }
+        None => {
+            log::info!("MCP server {} was manually stopped - not restarting", name);
+            false
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src-tauri/src/core/setup.rs b/src-tauri/src/core/setup.rs
index c2d3499f3..8d8a3d557 100644
--- a/src-tauri/src/core/setup.rs
+++ b/src-tauri/src/core/setup.rs
@@ -197,9 +197,39 @@ fn extract_extension_manifest(
 }
 
 pub fn setup_mcp(app: &App) {
-    let state = app.state::<AppState>().inner();
+    let state = app.state::<AppState>();
     let servers = state.mcp_servers.clone();
     let app_handle: tauri::AppHandle = app.handle().clone();
+
+    // Setup kill-mcp-servers event listener (similar to cortex kill-sidecar)
+    let app_handle_for_kill = app_handle.clone();
+    app_handle.listen("kill-mcp-servers", move |_event| {
+        let app_handle = app_handle_for_kill.clone();
+        tauri::async_runtime::spawn(async move {
+            log::info!("Received kill-mcp-servers event - cleaning up MCP servers");
+
+            let app_state = app_handle.state::<AppState>();
+
+            // Stop all running MCP servers
+            if let Err(e) = super::mcp::stop_mcp_servers(app_state.mcp_servers.clone()).await {
+                log::error!("Failed to stop MCP servers: {}", e);
+                return;
+            }
+
+            // Clear active servers and restart counts
+            {
+                let mut active_servers = app_state.mcp_active_servers.lock().await;
+                active_servers.clear();
+            }
+            {
+                let mut restart_counts = app_state.mcp_restart_counts.lock().await;
+                restart_counts.clear();
+            }
+
+            log::info!("MCP servers cleaned up successfully");
+        });
+    });
+
     tauri::async_runtime::spawn(async move {
         if let Err(e) = run_mcp_commands(&app_handle, servers).await {
             log::error!("Failed to run mcp commands: {}", e);
diff --git a/src-tauri/src/core/state.rs b/src-tauri/src/core/state.rs
index 9957ba92e..dab29aa85 100644
--- a/src-tauri/src/core/state.rs
+++ b/src-tauri/src/core/state.rs
@@ -16,6 +16,9 @@ pub struct AppState {
     pub download_manager: Arc<Mutex<DownloadManagerState>>,
     pub cortex_restart_count: Arc<Mutex<u32>>,
     pub cortex_killed_intentionally: Arc<Mutex<bool>>,
+    pub mcp_restart_counts: Arc<Mutex<HashMap<String, u32>>>,
+    pub mcp_active_servers: Arc<Mutex<HashMap<String, serde_json::Value>>>,
+    pub mcp_successfully_connected: Arc<Mutex<HashMap<String, bool>>>,
     pub server_handle: Arc>>,
 }
 pub fn generate_app_token() -> String {
diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs
index 4ed6ecee7..0ef6f059a 100644
--- a/src-tauri/src/lib.rs
+++ b/src-tauri/src/lib.rs
@@ -68,6 +68,7 @@ pub fn run() {
             core::mcp::get_mcp_configs,
             core::mcp::activate_mcp_server,
             core::mcp::deactivate_mcp_server,
+            core::mcp::reset_mcp_restart_count,
             // Threads
             core::threads::list_threads,
             core::threads::create_thread,
@@ -93,6 +94,9 @@ pub fn run() {
            download_manager: Arc::new(Mutex::new(DownloadManagerState::default())),
            cortex_restart_count: Arc::new(Mutex::new(0)),
            cortex_killed_intentionally: Arc::new(Mutex::new(false)),
+           mcp_restart_counts: Arc::new(Mutex::new(HashMap::new())),
+           mcp_active_servers: Arc::new(Mutex::new(HashMap::new())),
+           mcp_successfully_connected: Arc::new(Mutex::new(HashMap::new())),
            server_handle: Arc::new(Mutex::new(None)),
        })
        .setup(|app| {
@@ -124,6 +128,7 @@ pub fn run() {
            tauri::WindowEvent::CloseRequested { .. } => {
                if window.label() == "main" {
                    window.emit("kill-sidecar", ()).unwrap();
+                   window.emit("kill-mcp-servers", ()).unwrap();
                    clean_up();
                }
            }
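
Below is a minimal sketch of a bounds test for the new backoff helper. It assumes it would live in the existing `mod tests` block of `mcp.rs` (so `use super::*;` brings the private `calculate_exponential_backoff_delay` and the `MCP_*` constants into scope); the test name and attempt range are illustrative. The assertions rely only on behavior visible in the patch above: a 100 ms floor, the `MCP_MAX_RESTART_DELAY_MS` cap, and jitter derived from hashing the attempt number, which makes repeated calls for the same attempt deterministic.

```rust
// Hypothetical addition to the existing `mod tests` in mcp.rs.
#[test]
fn backoff_delay_stays_within_bounds() {
    for attempt in 1..=10u32 {
        let delay = calculate_exponential_backoff_delay(attempt);

        // The helper clamps the jittered delay to [100 ms, MCP_MAX_RESTART_DELAY_MS].
        assert!(delay >= 100, "attempt {attempt}: delay {delay} is below the 100 ms floor");
        assert!(
            delay <= MCP_MAX_RESTART_DELAY_MS,
            "attempt {attempt}: delay {delay} exceeds the cap"
        );

        // Jitter is seeded by the attempt number, so repeated calls must agree.
        assert_eq!(delay, calculate_exponential_backoff_delay(attempt));
    }
}
```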