enahancement: mcp server activation response and error handling (#5220)

* fix: mcp server error handling

* fix: custom installation path of MCP package managers

* chore: clean up

* chore: clean up

* chore: append mcp server errors to app logs

* fix: logs reading

* chore: typo
This commit is contained in:
Louis 2025-06-09 19:43:16 +07:00 committed by GitHub
parent 0d0c624e99
commit 919b6671a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 261 additions and 135 deletions

View File

@ -1,9 +1,8 @@
use std::{collections::HashMap, env, sync::Arc, time::Duration};
use rmcp::model::{CallToolRequestParam, CallToolResult, Tool};
use rmcp::{service::RunningService, transport::TokioChildProcess, RoleClient, ServiceExt};
use serde_json::{Map, Value};
use std::fs;
use std::{collections::HashMap, env, sync::Arc, time::Duration};
use tauri::{AppHandle, Emitter, Runtime, State};
use tokio::{process::Command, sync::Mutex, time::timeout};
@ -29,7 +28,7 @@ pub async fn run_mcp_commands<R: Runtime>(
) -> Result<(), String> {
let app_path = get_jan_data_folder_path(app.clone());
let app_path_str = app_path.to_str().unwrap().to_string();
log::info!(
log::trace!(
"Load MCP configs from {}",
app_path_str.clone() + "/mcp_config.json"
);
@ -40,88 +39,165 @@ pub async fn run_mcp_commands<R: Runtime>(
.map_err(|e| format!("Failed to parse config: {}", e))?;
if let Some(server_map) = mcp_servers.get("mcpServers").and_then(Value::as_object) {
log::info!("MCP Servers: {server_map:#?}");
log::trace!("MCP Servers: {server_map:#?}");
let exe_path = env::current_exe().expect("Failed to get current exe path");
let exe_parent_path = exe_path
.parent()
.expect("Executable must have a parent directory");
let bin_path = exe_parent_path.to_path_buf();
for (name, config) in server_map {
if let Some(false) = extract_active_status(config) {
log::info!("Server {name} is not active, skipping.");
log::trace!("Server {name} is not active, skipping.");
continue;
}
if let Some((command, args, envs)) = extract_command_args(config) {
let mut cmd = Command::new(command.clone());
if command.clone() == "npx" {
let bun_x_path = format!("{}/bun", bin_path.display());
cmd = Command::new(bun_x_path);
cmd.arg("x");
match start_mcp_server(
app.clone(),
servers_state.clone(),
name.clone(),
config.clone(),
)
.await
{
Ok(_) => {
log::info!("Server {name} activated successfully.");
}
if command.clone() == "uvx" {
let bun_x_path = format!("{}/uv", bin_path.display());
cmd = Command::new(bun_x_path);
cmd.arg("tool");
cmd.arg("run");
}
println!("Command: {cmd:#?}");
args.iter().filter_map(Value::as_str).for_each(|arg| {
cmd.arg(arg);
});
envs.iter().for_each(|(k, v)| {
if let Some(v_str) = v.as_str() {
cmd.env(k, v_str);
}
});
let process = TokioChildProcess::new(cmd);
match process {
Ok(p) => {
let service = ().serve(p).await;
match service {
Ok(running_service) => {
servers_state
.lock()
.await
.insert(name.clone(), running_service);
log::info!("Server {name} started successfully.");
}
Err(e) => {
log::error!("Failed to start server {name}: {e}");
}
}
}
Err(e) => {
log::error!("Failed to run command {name}: {e}");
}
Err(e) => {
let _ = app.emit(
"mcp-error",
format!("Failed to activate MCP server {name}: {e}"),
);
log::error!("Failed to activate server {name}: {e}");
continue; // Skip to the next server
}
}
}
}
// Collect servers into a Vec to avoid holding the RwLockReadGuard across await points
let servers_map = servers_state.lock().await;
for (_, service) in servers_map.iter() {
// Initialize
let _server_info = service.peer_info();
log::info!("Connected to server: {_server_info:#?}");
// Emit event to the frontend
let event = format!("mcp-connected");
let server_info: &rmcp::model::InitializeResult = service.peer_info();
let name = server_info.server_info.name.clone();
let version = server_info.server_info.version.clone();
let payload = serde_json::json!({
"name": name,
"version": version,
Ok(())
}
#[tauri::command]
pub async fn activate_mcp_server<R: Runtime>(
app: tauri::AppHandle<R>,
state: State<'_, AppState>,
name: String,
config: Value,
) -> Result<(), String> {
let servers: Arc<Mutex<HashMap<String, RunningService<RoleClient, ()>>>> =
state.mcp_servers.clone();
start_mcp_server(app, servers, name, config).await
}
async fn start_mcp_server<R: Runtime>(
app: tauri::AppHandle<R>,
servers: Arc<Mutex<HashMap<String, RunningService<RoleClient, ()>>>>,
name: String,
config: Value,
) -> Result<(), String> {
let app_path = get_jan_data_folder_path(app.clone());
let exe_path = env::current_exe().expect("Failed to get current exe path");
let exe_parent_path = exe_path
.parent()
.expect("Executable must have a parent directory");
let bin_path = exe_parent_path.to_path_buf();
if let Some((command, args, envs)) = extract_command_args(&config) {
let mut cmd = Command::new(command.clone());
if command.clone() == "npx" {
let mut cache_dir = app_path.clone();
cache_dir.push(".npx");
let bun_x_path = format!("{}/bun", bin_path.display());
cmd = Command::new(bun_x_path);
cmd.arg("x");
cmd.env("BUN_INSTALL", cache_dir.to_str().unwrap().to_string());
}
if command.clone() == "uvx" {
let mut cache_dir = app_path.clone();
cache_dir.push(".uvx");
let bun_x_path = format!("{}/uv", bin_path.display());
cmd = Command::new(bun_x_path);
cmd.arg("tool");
cmd.arg("run");
cmd.env("UV_CACHE_DIR", cache_dir.to_str().unwrap().to_string());
}
let app_path_str = app_path.to_str().unwrap().to_string();
let log_file_path = format!("{}/logs/app.log", app_path_str);
match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(log_file_path)
{
Ok(file) => {
cmd.stderr(std::process::Stdio::from(file));
}
Err(err) => {
log::error!("Failed to open log file: {}", err);
}
};
cmd.kill_on_drop(true);
log::trace!("Command: {cmd:#?}");
args.iter().filter_map(Value::as_str).for_each(|arg| {
cmd.arg(arg);
});
// service.peer_info().server_info.name
app.emit(&event, payload)
.map_err(|e| format!("Failed to emit event: {}", e))?;
log::info!("Emitted event: {event}");
envs.iter().for_each(|(k, v)| {
if let Some(v_str) = v.as_str() {
cmd.env(k, v_str);
}
});
let process = TokioChildProcess::new(cmd);
match process {
Ok(p) => {
let service = ().serve(p).await;
match service {
Ok(running_service) => {
// Get peer info and clone the needed values before moving the service
let (server_name, server_version) = {
let server_info = running_service.peer_info();
log::trace!("Connected to server: {server_info:#?}");
(
server_info.server_info.name.clone(),
server_info.server_info.version.clone(),
)
};
// Now move the service into the HashMap
servers.lock().await.insert(name.clone(), running_service);
log::info!("Server {name} started successfully.");
// Emit event to the frontend
let event = format!("mcp-connected");
let payload = serde_json::json!({
"name": server_name,
"version": server_version,
});
app.emit(&event, payload)
.map_err(|e| format!("Failed to emit event: {}", e))?;
}
Err(e) => {
return Err(format!("Failed to start MCP server {name}: {e}"));
}
}
}
Err(e) => {
log::error!("Failed to run command {name}: {e}");
return Err(format!("Failed to run command {name}: {e}"));
}
}
}
Ok(())
}
#[tauri::command]
pub async fn deactivate_mcp_server(state: State<'_, AppState>, name: String) -> Result<(), String> {
let servers = state.mcp_servers.clone();
let mut servers_map = servers.lock().await;
if let Some(service) = servers_map.remove(&name) {
service.cancel().await.map_err(|e| e.to_string())?;
log::info!("Server {name} stopped successfully.");
} else {
return Err(format!("Server {} not found", name));
}
Ok(())
}

View File

@ -65,6 +65,8 @@ pub fn run() {
core::mcp::get_connected_servers,
core::mcp::save_mcp_configs,
core::mcp::get_mcp_configs,
core::mcp::activate_mcp_server,
core::mcp::deactivate_mcp_server,
// Threads
core::threads::list_threads,
core::threads::create_thread,

View File

@ -1,5 +1,5 @@
import { create } from 'zustand'
import { updateMCPConfig } from '@/services/mcp'
import { restartMCPServers, updateMCPConfig } from '@/services/mcp'
// Define the structure of an MCP server configuration
export type MCPServerConfig = {
@ -25,6 +25,7 @@ type MCPServerStoreState = {
deleteServer: (key: string) => void
setServers: (servers: MCPServers) => void
syncServers: () => void
syncServersAndRestart: () => void
}
export const useMCPServers = create<MCPServerStoreState>()((set, get) => ({
@ -78,4 +79,12 @@ export const useMCPServers = create<MCPServerStoreState>()((set, get) => ({
})
)
},
syncServersAndRestart: async () => {
const mcpServers = get().mcpServers
await updateMCPConfig(
JSON.stringify({
mcpServers,
})
).then(() => restartMCPServers())
},
}))

View File

@ -10,14 +10,6 @@ export const Route = createFileRoute(route.localApiServerlogs as any)({
component: LogsViewer,
})
// Define log entry type
interface LogEntry {
timestamp: string
level: 'info' | 'warn' | 'error' | 'debug'
target: string
message: string
}
const SERVER_LOG_TARGET = 'app_lib::core::server'
const LOG_EVENT_NAME = 'log://log'
@ -84,7 +76,7 @@ function LogsViewer() {
}
// Format timestamp to be more readable
const formatTimestamp = (timestamp: string) => {
const formatTimestamp = (timestamp: string | number) => {
const date = new Date(timestamp)
return date.toLocaleTimeString()
}

View File

@ -2,8 +2,7 @@ import { createFileRoute } from '@tanstack/react-router'
import { route } from '@/constants/routes'
import { useEffect, useState, useRef } from 'react'
import { parseLogLine, readLogs } from '@/services/app'
import { listen } from '@tauri-apps/api/event'
import { readLogs } from '@/services/app'
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const Route = createFileRoute(route.appLogs as any)({
@ -11,48 +10,33 @@ export const Route = createFileRoute(route.appLogs as any)({
})
// Define log entry type
interface LogEntry {
timestamp: string
level: 'info' | 'warn' | 'error' | 'debug'
target: string
message: string
}
const LOG_EVENT_NAME = 'log://log'
function LogsViewer() {
const [logs, setLogs] = useState<LogEntry[]>([])
const logsContainerRef = useRef<HTMLDivElement>(null)
useEffect(() => {
readLogs().then((logData) => {
const logs = logData.filter(Boolean) as LogEntry[]
setLogs(logs)
let lastLogsLength = 0
function updateLogs() {
readLogs().then((logData) => {
let needScroll = false
const filteredLogs = logData.filter(Boolean) as LogEntry[]
if (filteredLogs.length > lastLogsLength) needScroll = true
lastLogsLength = filteredLogs.length
setLogs(filteredLogs)
// Scroll to bottom after initial logs are loaded
if (needScroll) setTimeout(() => scrollToBottom(), 100)
})
}
updateLogs()
// repeat action each 3s
const intervalId = setInterval(() => updateLogs(), 3000)
// Scroll to bottom after initial logs are loaded
setTimeout(() => {
scrollToBottom()
}, 100)
})
let unsubscribe = () => {}
listen(LOG_EVENT_NAME, (event) => {
const { message } = event.payload as { message: string }
const log: LogEntry | undefined = parseLogLine(message)
if (log) {
setLogs((prevLogs) => {
const newLogs = [...prevLogs, log]
// Schedule scroll to bottom after state update
setTimeout(() => {
scrollToBottom()
}, 0)
return newLogs
})
}
}).then((unsub) => {
unsubscribe = unsub
})
return () => {
unsubscribe()
clearInterval(intervalId)
}
}, [])
@ -81,7 +65,7 @@ function LogsViewer() {
}
// Format timestamp to be more readable
const formatTimestamp = (timestamp: string) => {
const formatTimestamp = (timestamp: string | number) => {
const date = new Date(timestamp)
return date.toLocaleTimeString()
}

View File

@ -19,6 +19,8 @@ import { Switch } from '@/components/ui/switch'
import { twMerge } from 'tailwind-merge'
import { getConnectedServers } from '@/services/mcp'
import { useToolApproval } from '@/hooks/useToolApproval'
import { toast } from 'sonner'
import { invoke } from '@tauri-apps/api/core'
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const Route = createFileRoute(route.settings.mcp_servers as any)({
@ -26,8 +28,14 @@ export const Route = createFileRoute(route.settings.mcp_servers as any)({
})
function MCPServers() {
const { mcpServers, addServer, editServer, deleteServer, syncServers } =
useMCPServers()
const {
mcpServers,
addServer,
editServer,
deleteServer,
syncServers,
syncServersAndRestart,
} = useMCPServers()
const { allowAllMCPPermissions, setAllowAllMCPPermissions } =
useToolApproval()
@ -77,7 +85,7 @@ function MCPServers() {
// Add new server
addServer(name, config)
}
syncServers()
syncServersAndRestart()
}
const handleEdit = (serverKey: string) => {
@ -93,7 +101,7 @@ function MCPServers() {
if (serverToDelete) {
deleteServer(serverToDelete)
setServerToDelete(null)
syncServers()
syncServersAndRestart()
}
}
@ -130,18 +138,51 @@ function MCPServers() {
}
)
}
syncServers()
syncServersAndRestart()
}
const toggleServer = (serverKey: string, active: boolean) => {
if (serverKey) {
// Save single server
editServer(serverKey, {
...(mcpServers[serverKey] as MCPServerConfig),
active,
})
syncServers()
}
if (serverKey)
if (active)
invoke('activate_mcp_server', {
name: serverKey,
config: {
...(mcpServers[serverKey] as MCPServerConfig),
active,
},
})
.then(() => {
// Save single server
editServer(serverKey, {
...(mcpServers[serverKey] as MCPServerConfig),
active,
})
syncServers()
toast.success(
`Server ${serverKey} is now ${active ? 'active' : 'inactive'}.`
)
getConnectedServers().then(setConnectedServers)
})
.catch((error) => {
toast.error(error, {
description:
'Please check the parameters according to the tutorial.',
})
})
else {
editServer(serverKey, {
...(mcpServers[serverKey] as MCPServerConfig),
active,
})
syncServers()
invoke('deactivate_mcp_server', { name: serverKey })
.catch((error) => {
toast.error(`Failed to deactivate server ${serverKey}: ${error}`)
})
.finally(() => {
getConnectedServers().then(setConnectedServers)
})
}
}
useEffect(() => {

View File

@ -42,7 +42,13 @@ export const parseLogLine = (line: string) => {
const regex = /^\[(.*?)\]\[(.*?)\]\[(.*?)\]\[(.*?)\]\s(.*)$/
const match = line.match(regex)
if (!match) return undefined // Skip invalid lines
if (!match)
return {
timestamp: Date.now(),
level: 'info' as 'info' | 'warn' | 'error' | 'debug',
target: 'info',
message: line ?? '',
} as LogEntry
const [, date, time, target, levelRaw, message] = match

View File

@ -7,6 +7,13 @@ import { MCPTool } from '@/types/completion'
*/
export const updateMCPConfig = async (configs: string) => {
await window.core?.api?.saveMcpConfigs({ configs })
}
/**
* @description This function restarts the MCP servers.
* @param configs
*/
export const restartMCPServers = async () => {
await window.core?.api?.restartMcpServers()
}
@ -15,7 +22,9 @@ export const updateMCPConfig = async (configs: string) => {
* @returns {Promise<object>} The MCP configuration.
*/
export const getMCPConfig = async () => {
const mcpConfig = JSON.parse((await window.core?.api?.getMcpConfigs()) ?? '{}')
const mcpConfig = JSON.parse(
(await window.core?.api?.getMcpConfigs()) ?? '{}'
)
return mcpConfig
}

View File

@ -1 +1,7 @@
type Language = 'en' | 'id' | 'vn'
interface LogEntry {
timestamp: string | number
level: 'info' | 'warn' | 'error' | 'debug'
target: string
message: string
}

View File

@ -1,4 +1,5 @@
export enum SystemEvent {
MCP_UPDATE = 'mcp-update',
KILL_SIDECAR = 'kill-sidecar',
MCP_ERROR = 'mcp-error',
}