From b736d09168560b074da1de9cbefca8755ff3f203 Mon Sep 17 00:00:00 2001 From: Akarshan Biswas Date: Wed, 16 Jul 2025 18:18:11 +0530 Subject: [PATCH] fix: Prevent spamming /health endpoint and improve startup and resolve compiler warnings (#5784) * fix: Prevent spamming /health endpoint and improve startup and resolve compiler warnings This commit introduces a delay and improved logic to the /health endpoint checks in the llamacpp extension, preventing excessive requests during model loading. Additionally, it addresses several Rust compiler warnings by: - Commenting out an unused `handle_app_quit` function in `src/core/mcp.rs`. - Explicitly declaring `target_port`, `session_api_key`, and `buffered_body` as mutable in `src/core/server.rs`. - Commenting out unused `tokio` imports in `src/core/setup.rs`. - Enhancing the `load_llama_model` function in `src/core/utils/extensions/inference_llamacpp_extension/server.rs` to better monitor stdout/stderr for readiness and errors, and handle timeouts. - Commenting out an unused `std::path::Prefix` import and adjusting `normalize_path` in `src/core/utils/mod.rs`. - Updating the application version to 0.6.904 in `tauri.conf.json`. * fix grammar! Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * fix grammar 2 Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> * reimport prefix but only on Windows * remove instead of commenting * remove redundant check * sync app version in cargo.toml with tauri.conf --------- Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com> --- extensions/llamacpp-extension/src/index.ts | 56 ++++++++-- src-tauri/Cargo.toml | 2 +- src-tauri/src/core/mcp.rs | 21 ---- src-tauri/src/core/server.rs | 6 +- src-tauri/src/core/setup.rs | 6 +- .../inference_llamacpp_extension/server.rs | 103 ++++++++++++++---- src-tauri/src/core/utils/mod.rs | 5 +- src-tauri/tauri.conf.json | 2 +- 8 files changed, 139 insertions(+), 62 deletions(-) diff --git a/extensions/llamacpp-extension/src/index.ts b/extensions/llamacpp-extension/src/index.ts index f7f7cf240..5377b29b8 100644 --- a/extensions/llamacpp-extension/src/index.ts +++ b/extensions/llamacpp-extension/src/index.ts @@ -25,7 +25,7 @@ import { downloadBackend, isBackendInstalled, getBackendExePath, - getBackendDir + getBackendDir, } from './backend' import { invoke } from '@tauri-apps/api/core' @@ -348,7 +348,11 @@ export default class llamacpp_extension extends AIEngine { // --- Remove old backend files --- // Get Jan's data folder and build the backends directory path const janDataFolderPath = await getJanDataFolderPath() - const backendsDir = await joinPath([janDataFolderPath, 'llamacpp', 'backends']) + const backendsDir = await joinPath([ + janDataFolderPath, + 'llamacpp', + 'backends', + ]) if (await fs.existsSync(backendsDir)) { const versionDirs = await fs.readdirSync(backendsDir) for (const versionDir of versionDirs) { @@ -715,16 +719,34 @@ export default class llamacpp_extension extends AIEngine { sInfo: SessionInfo, timeoutMs = 240_000 ): Promise { + await this.sleep(500) // Wait before first check const start = Date.now() while (Date.now() - start < timeoutMs) { try { const res = await fetch(`http://localhost:${sInfo.port}/health`) - if (res.ok) { - return + + if (res.status === 503) { + const body = await res.json() + const msg = body?.error?.message ?? 'Model loading' + console.log(`waiting for model load... 
(${msg})`) + } else if (res.ok) { + const body = await res.json() + if (body.status === 'ok') { + return + } else { + console.warn('Unexpected OK response from /health:', body) + } + } else { + console.warn(`Unexpected status ${res.status} from /health`) } - } catch (e) {} - await this.sleep(500) // 500 sec interval during rechecks + } catch (e) { + await this.unload(sInfo.model_id) + throw new Error(`Model appears to have crashed: ${e}`) + } + + await this.sleep(800) // Retry interval } + await this.unload(sInfo.model_id) throw new Error( `Timed out loading model after ${timeoutMs}... killing llamacpp` @@ -808,8 +830,7 @@ export default class llamacpp_extension extends AIEngine { args.push('--main-gpu', String(cfg.main_gpu)) // Boolean flags - if (!cfg.ctx_shift) - args.push('--no-context-shift') + if (!cfg.ctx_shift) args.push('--no-context-shift') if (cfg.flash_attn) args.push('--flash-attn') if (cfg.cont_batching) args.push('--cont-batching') args.push('--no-mmap') @@ -822,7 +843,12 @@ export default class llamacpp_extension extends AIEngine { if (cfg.ctx_size > 0) args.push('--ctx-size', String(cfg.ctx_size)) if (cfg.n_predict > 0) args.push('--n-predict', String(cfg.n_predict)) args.push('--cache-type-k', cfg.cache_type_k) - args.push('--cache-type-v', cfg.cache_type_v) + if ( + (cfg.flash_attn && cfg.cache_type_v != 'f16') || + cfg.cache_type_v != 'f32' + ) { + args.push('--cache-type-v', cfg.cache_type_v) + } args.push('--defrag-thold', String(cfg.defrag_thold)) args.push('--rope-scaling', cfg.rope_scaling) @@ -851,7 +877,7 @@ export default class llamacpp_extension extends AIEngine { return sInfo } catch (error) { - console.error('Error loading llama-server:', error) + console.error('Error loading llama-server:\n', error) throw new Error(`Failed to load llama-server: ${error}`) } } @@ -974,7 +1000,15 @@ export default class llamacpp_extension extends AIEngine { const result = await invoke('is_process_running', { pid: sessionInfo.pid, }) - if (!result) { + console.log(`is_process_running result: ${result}`) + if (result) { + try { + await fetch(`http://localhost:${sessionInfo.port}/health`) + } catch (e) { + this.unload(sessionInfo.model_id) + throw new Error('Model appears to have crashed! Please reload!') + } + } else { this.activeSessions.delete(sessionInfo.pid) throw new Error('Model have crashed! Please reload!') } diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index ee49705b2..87d493d07 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "Jan" -version = "0.5.16" +version = "0.6.904" description = "Use offline LLMs with your own data. Run open source models like Llama2 or Falcon on your internal computers/servers." 
authors = ["Jan "] license = "MIT" diff --git a/src-tauri/src/core/mcp.rs b/src-tauri/src/core/mcp.rs index ada18a4a4..a5fa91463 100644 --- a/src-tauri/src/core/mcp.rs +++ b/src-tauri/src/core/mcp.rs @@ -731,27 +731,6 @@ pub async fn restart_active_mcp_servers( Ok(()) } -/// Handle app quit - stop all MCP servers cleanly (like cortex cleanup) -pub async fn handle_app_quit(state: &AppState) -> Result<(), String> { - log::info!("App quitting - stopping all MCP servers cleanly"); - - // Stop all running MCP servers - stop_mcp_servers(state.mcp_servers.clone()).await?; - - // Clear active servers and restart counts - { - let mut active_servers = state.mcp_active_servers.lock().await; - active_servers.clear(); - } - { - let mut restart_counts = state.mcp_restart_counts.lock().await; - restart_counts.clear(); - } - - log::info!("All MCP servers stopped cleanly"); - Ok(()) -} - /// Reset MCP restart count for a specific server (like cortex reset) #[tauri::command] pub async fn reset_mcp_restart_count(state: State<'_, AppState>, server_name: String) -> Result<(), String> { diff --git a/src-tauri/src/core/server.rs b/src-tauri/src/core/server.rs index d934ef9f9..734d4aa3a 100644 --- a/src-tauri/src/core/server.rs +++ b/src-tauri/src/core/server.rs @@ -309,9 +309,9 @@ async fn proxy_request( return Ok(error_response.body(Body::from("Not Found")).unwrap()); } - let mut target_port: Option = None; - let mut session_api_key: Option = None; - let mut buffered_body: Option = None; + let target_port: Option; + let session_api_key: Option; + let buffered_body: Option; let original_path = parts.uri.path(); let destination_path = get_destination_path(original_path, &config.prefix); diff --git a/src-tauri/src/core/setup.rs b/src-tauri/src/core/setup.rs index c04abc3f7..a2875072c 100644 --- a/src-tauri/src/core/setup.rs +++ b/src-tauri/src/core/setup.rs @@ -7,9 +7,9 @@ use std::{ use tar::Archive; use tauri::{App, Emitter, Listener, Manager}; use tauri_plugin_store::StoreExt; -use tokio::sync::Mutex; -use tokio::time::{sleep, Duration}; // Using tokio::sync::Mutex - // MCP +// use tokio::sync::Mutex; +// use tokio::time::{sleep, Duration}; // Using tokio::sync::Mutex +// // MCP // MCP use super::{ diff --git a/src-tauri/src/core/utils/extensions/inference_llamacpp_extension/server.rs b/src-tauri/src/core/utils/extensions/inference_llamacpp_extension/server.rs index 269e22800..449e0e543 100644 --- a/src-tauri/src/core/utils/extensions/inference_llamacpp_extension/server.rs +++ b/src-tauri/src/core/utils/extensions/inference_llamacpp_extension/server.rs @@ -10,7 +10,8 @@ use tauri::State; // Import Manager trait use thiserror; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; -use tokio::time::timeout; +use tokio::sync::mpsc; +use tokio::time::{timeout, Instant}; use crate::core::state::AppState; use crate::core::state::LLamaBackendSession; @@ -56,24 +57,13 @@ pub struct UnloadResult { error: Option, } -async fn capture_stderr(stderr: impl tokio::io::AsyncRead + Unpin) -> String { - let mut reader = BufReader::new(stderr).lines(); - let mut buf = String::new(); - while let Ok(Some(line)) = reader.next_line().await { - log::info!("[llamacpp] {}", line); // Don't use log::error! 
- buf.push_str(&line); - buf.push('\n'); - } - buf -} - // --- Load Command --- #[tauri::command] pub async fn load_llama_model( - state: State<'_, AppState>, // Access the shared state + state: State<'_, AppState>, backend_path: &str, library_path: Option<&str>, - args: Vec, // Arguments from the frontend + args: Vec, ) -> ServerResult { let mut process_map = state.llama_server_process.lock().await; @@ -162,17 +152,52 @@ pub async fn load_llama_model( let mut child = command.spawn().map_err(ServerError::Io)?; let stderr = child.stderr.take().expect("stderr was piped"); - let stderr_task = tokio::spawn(capture_stderr(stderr)); - let stdout = child.stdout.take().expect("stdout was piped"); - tokio::spawn(async move { + + // Create channels for communication between tasks + let (ready_tx, mut ready_rx) = mpsc::channel::(1); + let (error_tx, mut error_rx) = mpsc::channel::(1); + + // Spawn task to monitor stdout for readiness + let _stdout_task = tokio::spawn(async move { let mut reader = BufReader::new(stdout).lines(); while let Ok(Some(line)) = reader.next_line().await { log::info!("[llamacpp stdout] {}", line); } }); - tokio::time::sleep(Duration::from_millis(300)).await; + // Spawn task to capture stderr and monitor for errors + let stderr_task = tokio::spawn(async move { + let mut reader = BufReader::new(stderr).lines(); + let mut stderr_buffer = String::new(); + while let Ok(Some(line)) = reader.next_line().await { + log::info!("[llamacpp] {}", line); // Using your log format + stderr_buffer.push_str(&line); + stderr_buffer.push('\n'); + // Check for critical error indicators that should stop the process + // TODO: check for different errors + if line.to_lowercase().contains("error") + || line.to_lowercase().contains("failed") + || line.to_lowercase().contains("fatal") + || line.contains("CUDA error") + || line.contains("out of memory") + || line.contains("failed to load") + { + let _ = error_tx.send(line.clone()).await; + } + // Check for readiness indicator - llama-server outputs this when ready + else if line.contains("server is listening on") + || line.contains("starting the main loop") + || line.contains("server listening on") + { + log::info!("Server appears to be ready based on stdout: '{}'", line); + let _ = ready_tx.send(true).await; + } + } + stderr_buffer + }); + + // Check if process exited early if let Some(status) = child.try_wait()? { if !status.success() { let stderr_output = stderr_task.await.unwrap_or_default(); @@ -182,10 +207,48 @@ pub async fn load_llama_model( } } + // Wait for server to be ready or timeout + let timeout_duration = Duration::from_secs(300); // 5 minutes timeout + let start_time = Instant::now(); + log::info!("Waiting for server to be ready..."); + loop { + tokio::select! 
{ + // Server is ready + Some(true) = ready_rx.recv() => { + log::info!("Server is ready to accept requests!"); + break; + } + // Error occurred + Some(error_msg) = error_rx.recv() => { + log::error!("Server encountered an error: {}", error_msg); + let _ = child.kill().await; + // Get full stderr output + let stderr_output = stderr_task.await.unwrap_or_default(); + return Err(ServerError::LlamacppError(format!("Error: {}\n\nFull stderr:\n{}", error_msg, stderr_output))); + } + // Timeout + _ = tokio::time::sleep(Duration::from_millis(100)) => { + if start_time.elapsed() > timeout_duration { + log::error!("Timeout waiting for server to be ready"); + let _ = child.kill().await; + return Err(ServerError::LlamacppError("Server startup timeout".to_string())); + } + // Check if process is still alive + if let Some(status) = child.try_wait()? { + if !status.success() { + let stderr_output = stderr_task.await.unwrap_or_default(); + log::error!("llama.cpp exited during startup with code {status:?}"); + return Err(ServerError::LlamacppError(format!("Process exited with code {status:?}\n\nStderr:\n{}", stderr_output))); + } + } + } + } + } + // Get the PID to use as session ID let pid = child.id().map(|id| id as i32).unwrap_or(-1); - log::info!("Server process started with PID: {}", pid); + log::info!("Server process started with PID: {} and is ready", pid); let session_info = SessionInfo { pid: pid.clone(), port: port, @@ -194,7 +257,7 @@ pub async fn load_llama_model( api_key: api_key, }; - // insert sesinfo to process_map + // Insert session info to process_map process_map.insert( pid.clone(), LLamaBackendSession { diff --git a/src-tauri/src/core/utils/mod.rs b/src-tauri/src/core/utils/mod.rs index 1df4e5231..43620fa2b 100644 --- a/src-tauri/src/core/utils/mod.rs +++ b/src-tauri/src/core/utils/mod.rs @@ -6,6 +6,7 @@ use std::path::{Component, Path, PathBuf}; use tauri::Runtime; use super::cmd::get_jan_data_folder_path; +#[cfg(windows)] use std::path::Prefix; pub const THREADS_DIR: &str = "threads"; @@ -54,11 +55,11 @@ pub fn ensure_thread_dir_exists( // https://github.com/rust-lang/cargo/blob/rust-1.67.0/crates/cargo-util/src/paths.rs#L82-L107 pub fn normalize_path(path: &Path) -> PathBuf { let mut components = path.components().peekable(); - let mut ret = if let Some(c @ Component::Prefix(prefix_component)) = components.peek().cloned() + let mut ret = if let Some(c @ Component::Prefix(_prefix_component)) = components.peek().cloned() { #[cfg(windows)] // Remove only the Verbatim prefix, but keep the drive letter (e.g., C:\) - match prefix_component.kind() { + match _prefix_component.kind() { Prefix::VerbatimDisk(disk) => { components.next(); // skip this prefix // Re-add the disk prefix (e.g., C:) diff --git a/src-tauri/tauri.conf.json b/src-tauri/tauri.conf.json index 64166ec82..c0429bf97 100644 --- a/src-tauri/tauri.conf.json +++ b/src-tauri/tauri.conf.json @@ -1,7 +1,7 @@ { "$schema": "https://schema.tauri.app/config/2", "productName": "Jan", - "version": "0.6.901", + "version": "0.6.904", "identifier": "jan.ai.app", "build": { "frontendDist": "../web-app/dist",
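
Note on the /health polling change above: the strategy the extension adopts (a short delay before the first probe, treating 503 as "model still loading", a fixed retry interval, unloading on connection errors, and a hard timeout) can be summarized as one small routine. The sketch below is illustrative only and re-expresses the TypeScript loop in Rust; it assumes the reqwest crate (with its json feature), serde_json, and log, and wait_for_healthy is a hypothetical name, not a helper that exists in this codebase.

use tokio::time::{sleep, Duration, Instant};

/// Hypothetical re-expression of the extension's health-wait loop:
/// poll GET /health until the server reports {"status":"ok"}, treat 503 as
/// "model still loading", and give up after `timeout`.
async fn wait_for_healthy(port: u16, timeout: Duration) -> Result<(), String> {
    sleep(Duration::from_millis(500)).await; // give the server a moment before the first probe
    let url = format!("http://localhost:{port}/health");
    let start = Instant::now();
    while start.elapsed() < timeout {
        match reqwest::get(&url).await {
            // llama-server answers 503 while the model is still loading
            Ok(res) if res.status().as_u16() == 503 => {
                log::info!("waiting for model load...");
            }
            Ok(res) if res.status().is_success() => {
                let body: serde_json::Value = res.json().await.map_err(|e| e.to_string())?;
                if body.get("status").and_then(|s| s.as_str()) == Some("ok") {
                    return Ok(());
                }
                log::warn!("unexpected OK response from /health: {body}");
            }
            Ok(res) => log::warn!("unexpected status {} from /health", res.status()),
            // A connection error means the server process is likely gone
            Err(e) => return Err(format!("model appears to have crashed: {e}")),
        }
        sleep(Duration::from_millis(800)).await; // retry interval between probes
    }
    Err(format!("timed out waiting for /health after {timeout:?}"))
}

The key point, as in the patch, is that a 503 is expected while loading and should only be logged, whereas a failed connection is treated as a crash and triggers an unload rather than further polling.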
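
Similarly, the readiness detection added to load_llama_model boils down to one task tailing the child's stderr, forwarding error-looking lines on one channel and readiness markers on another, while the caller selects over readiness, error, and a timeout. Below is a condensed, self-contained sketch of that pattern, assuming only the tokio crate (io, sync, time, and macros features); watch_stderr and await_startup are illustrative names, not the actual functions in server.rs.

use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration, Instant};

/// Tail a child process's stderr, forwarding readiness and error signals
/// and returning the captured output for later error reporting.
async fn watch_stderr(
    stderr: impl AsyncRead + Unpin,
    ready_tx: mpsc::Sender<()>,
    error_tx: mpsc::Sender<String>,
) -> String {
    let mut lines = BufReader::new(stderr).lines();
    let mut captured = String::new();
    while let Ok(Some(line)) = lines.next_line().await {
        captured.push_str(&line);
        captured.push('\n');
        let lower = line.to_lowercase();
        if lower.contains("error") || lower.contains("failed") || lower.contains("fatal") {
            let _ = error_tx.send(line.clone()).await;
        } else if line.contains("server is listening on") || line.contains("starting the main loop") {
            let _ = ready_tx.send(()).await;
        }
    }
    captured
}

/// Wait for a readiness signal, an error line, or a startup timeout.
async fn await_startup(
    mut ready_rx: mpsc::Receiver<()>,
    mut error_rx: mpsc::Receiver<String>,
    timeout: Duration,
) -> Result<(), String> {
    let start = Instant::now();
    loop {
        tokio::select! {
            Some(()) = ready_rx.recv() => return Ok(()),
            Some(err) = error_rx.recv() => return Err(format!("server error during startup: {err}")),
            // Periodic tick; if the watcher exits without signaling,
            // this branch eventually reports a timeout.
            _ = sleep(Duration::from_millis(100)) => {
                if start.elapsed() > timeout {
                    return Err("server startup timeout".to_string());
                }
            }
        }
    }
}

The patch wires the equivalent logic inline in load_llama_model, pairing it with a periodic child.try_wait() check so an early process exit is reported with the captured stderr; together with the slower /health polling on the extension side, this is what prevents the endpoint from being hammered during model loading.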