fix: Prevent spamming /health endpoint and improve startup and resolve compiler warnings (#5784)
* fix: Prevent spamming /health endpoint and improve startup and resolve compiler warnings

This commit introduces a delay and improved logic to the /health endpoint checks in the llamacpp extension, preventing excessive requests during model loading. Additionally, it addresses several Rust compiler warnings by:

- Commenting out an unused `handle_app_quit` function in `src/core/mcp.rs`.
- Explicitly declaring `target_port`, `session_api_key`, and `buffered_body` as mutable in `src/core/server.rs`.
- Commenting out unused `tokio` imports in `src/core/setup.rs`.
- Enhancing the `load_llama_model` function in `src/core/utils/extensions/inference_llamacpp_extension/server.rs` to better monitor stdout/stderr for readiness and errors, and handle timeouts.
- Commenting out an unused `std::path::Prefix` import and adjusting `normalize_path` in `src/core/utils/mod.rs`.
- Updating the application version to 0.6.904 in `tauri.conf.json`.

* fix grammar!

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* fix grammar 2

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>

* reimport prefix but only on Windows

* remove instead of commenting

* remove redundant check

* sync app version in cargo.toml with tauri.conf

---------

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
This commit is contained in:
parent bd3b8bff35
commit b736d09168
@@ -25,7 +25,7 @@ import {
   downloadBackend,
   isBackendInstalled,
   getBackendExePath,
-  getBackendDir
+  getBackendDir,
 } from './backend'
 import { invoke } from '@tauri-apps/api/core'
 
@@ -348,7 +348,11 @@ export default class llamacpp_extension extends AIEngine {
     // --- Remove old backend files ---
     // Get Jan's data folder and build the backends directory path
     const janDataFolderPath = await getJanDataFolderPath()
-    const backendsDir = await joinPath([janDataFolderPath, 'llamacpp', 'backends'])
+    const backendsDir = await joinPath([
+      janDataFolderPath,
+      'llamacpp',
+      'backends',
+    ])
     if (await fs.existsSync(backendsDir)) {
       const versionDirs = await fs.readdirSync(backendsDir)
       for (const versionDir of versionDirs) {
@@ -715,16 +719,34 @@ export default class llamacpp_extension extends AIEngine {
     sInfo: SessionInfo,
     timeoutMs = 240_000
   ): Promise<void> {
+    await this.sleep(500) // Wait before first check
     const start = Date.now()
     while (Date.now() - start < timeoutMs) {
       try {
         const res = await fetch(`http://localhost:${sInfo.port}/health`)
-        if (res.ok) {
-          return
-        }
-      } catch (e) {}
-      await this.sleep(500) // 500 sec interval during rechecks
+        if (res.status === 503) {
+          const body = await res.json()
+          const msg = body?.error?.message ?? 'Model loading'
+          console.log(`waiting for model load... (${msg})`)
+        } else if (res.ok) {
+          const body = await res.json()
+          if (body.status === 'ok') {
+            return
+          } else {
+            console.warn('Unexpected OK response from /health:', body)
+          }
+        } else {
+          console.warn(`Unexpected status ${res.status} from /health`)
+        }
+      } catch (e) {
+        await this.unload(sInfo.model_id)
+        throw new Error(`Model appears to have crashed: ${e}`)
+      }
+
+      await this.sleep(800) // Retry interval
     }
 
     await this.unload(sInfo.model_id)
     throw new Error(
       `Timed out loading model after ${timeoutMs}... killing llamacpp`
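For reference, the polling behaviour added in the hunk above reads roughly as the standalone sketch below once the diff markers are removed. It is an approximation rather than the extension's exact method: `port` and `onCrash` are hypothetical stand-ins for `sInfo.port` and `this.unload(sInfo.model_id)`, and `sleep` replaces the class's own helper.

```typescript
// Standalone sketch of the new /health polling loop (stand-ins noted above).
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function waitForModelLoad(
  port: number,
  onCrash: () => Promise<void>,
  timeoutMs = 240_000
): Promise<void> {
  await sleep(500) // initial delay before the first /health check
  const start = Date.now()
  while (Date.now() - start < timeoutMs) {
    try {
      const res = await fetch(`http://localhost:${port}/health`)
      if (res.status === 503) {
        // llama-server answers 503 with a JSON error body while the model is still loading
        const body = await res.json()
        console.log(`waiting for model load... (${body?.error?.message ?? 'Model loading'})`)
      } else if (res.ok) {
        const body = await res.json()
        if (body.status === 'ok') return // model is ready
        console.warn('Unexpected OK response from /health:', body)
      } else {
        console.warn(`Unexpected status ${res.status} from /health`)
      }
    } catch (e) {
      await onCrash() // the request itself failed; the server process is likely gone
      throw new Error(`Model appears to have crashed: ${e}`)
    }
    await sleep(800) // retry interval between checks
  }
  await onCrash()
  throw new Error(`Timed out loading model after ${timeoutMs}ms`)
}
```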
@@ -808,8 +830,7 @@ export default class llamacpp_extension extends AIEngine {
     args.push('--main-gpu', String(cfg.main_gpu))
 
     // Boolean flags
-    if (!cfg.ctx_shift)
-      args.push('--no-context-shift')
+    if (!cfg.ctx_shift) args.push('--no-context-shift')
     if (cfg.flash_attn) args.push('--flash-attn')
     if (cfg.cont_batching) args.push('--cont-batching')
     args.push('--no-mmap')
@@ -822,7 +843,12 @@ export default class llamacpp_extension extends AIEngine {
     if (cfg.ctx_size > 0) args.push('--ctx-size', String(cfg.ctx_size))
     if (cfg.n_predict > 0) args.push('--n-predict', String(cfg.n_predict))
     args.push('--cache-type-k', cfg.cache_type_k)
-    args.push('--cache-type-v', cfg.cache_type_v)
+    if (
+      (cfg.flash_attn && cfg.cache_type_v != 'f16') ||
+      cfg.cache_type_v != 'f32'
+    ) {
+      args.push('--cache-type-v', cfg.cache_type_v)
+    }
     args.push('--defrag-thold', String(cfg.defrag_thold))
 
     args.push('--rope-scaling', cfg.rope_scaling)
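To illustrate when the new guard actually pushes `--cache-type-v`, here is a minimal sketch that evaluates the predicate for a few hypothetical `cfg` values (the field names follow the flags used above; the sample values are made up):

```typescript
// Minimal sketch: evaluate the new --cache-type-v guard for hypothetical configs.
type CacheCfg = { flash_attn: boolean; cache_type_v: string }

const pushesCacheTypeV = (cfg: CacheCfg): boolean =>
  (cfg.flash_attn && cfg.cache_type_v != 'f16') || cfg.cache_type_v != 'f32'

console.log(pushesCacheTypeV({ flash_attn: true, cache_type_v: 'q8_0' })) // true
console.log(pushesCacheTypeV({ flash_attn: false, cache_type_v: 'f16' })) // true (f16 != f32)
console.log(pushesCacheTypeV({ flash_attn: false, cache_type_v: 'f32' })) // false
```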
@@ -851,7 +877,7 @@ export default class llamacpp_extension extends AIEngine {
 
       return sInfo
     } catch (error) {
-      console.error('Error loading llama-server:', error)
+      console.error('Error loading llama-server:\n', error)
       throw new Error(`Failed to load llama-server: ${error}`)
     }
   }
@@ -974,7 +1000,15 @@ export default class llamacpp_extension extends AIEngine {
     const result = await invoke<boolean>('is_process_running', {
       pid: sessionInfo.pid,
     })
-    if (!result) {
+    console.log(`is_process_running result: ${result}`)
+    if (result) {
+      try {
+        await fetch(`http://localhost:${sessionInfo.port}/health`)
+      } catch (e) {
+        this.unload(sessionInfo.model_id)
+        throw new Error('Model appears to have crashed! Please reload!')
+      }
+    } else {
       this.activeSessions.delete(sessionInfo.pid)
       throw new Error('Model have crashed! Please reload!')
     }
 
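The crash-detection flow above can be summarised as a small standalone sketch; `isProcessRunning` and `unload` are hypothetical stand-ins for the Tauri `is_process_running` command and the extension's own unload path used in the hunk:

```typescript
// Hedged sketch of the crash-detection check (stand-ins noted above).
interface Session {
  pid: number
  port: number
  model_id: string
}

async function assertSessionHealthy(
  s: Session,
  isProcessRunning: (pid: number) => Promise<boolean>,
  unload: (modelId: string) => Promise<void>
): Promise<void> {
  const running = await isProcessRunning(s.pid)
  console.log(`is_process_running result: ${running}`)
  if (running) {
    try {
      // The process exists; make sure the HTTP server inside it still answers.
      await fetch(`http://localhost:${s.port}/health`)
    } catch (e) {
      await unload(s.model_id)
      throw new Error('Model appears to have crashed! Please reload!')
    }
  } else {
    // The process is gone; nothing to unload, just surface the error.
    throw new Error('Model has crashed! Please reload!')
  }
}
```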
@@ -1,6 +1,6 @@
 [package]
 name = "Jan"
-version = "0.5.16"
+version = "0.6.904"
 description = "Use offline LLMs with your own data. Run open source models like Llama2 or Falcon on your internal computers/servers."
 authors = ["Jan <service@jan.ai>"]
 license = "MIT"
@@ -731,27 +731,6 @@ pub async fn restart_active_mcp_servers<R: Runtime>(
     Ok(())
 }
 
-/// Handle app quit - stop all MCP servers cleanly (like cortex cleanup)
-pub async fn handle_app_quit(state: &AppState) -> Result<(), String> {
-    log::info!("App quitting - stopping all MCP servers cleanly");
-
-    // Stop all running MCP servers
-    stop_mcp_servers(state.mcp_servers.clone()).await?;
-
-    // Clear active servers and restart counts
-    {
-        let mut active_servers = state.mcp_active_servers.lock().await;
-        active_servers.clear();
-    }
-    {
-        let mut restart_counts = state.mcp_restart_counts.lock().await;
-        restart_counts.clear();
-    }
-
-    log::info!("All MCP servers stopped cleanly");
-    Ok(())
-}
-
 /// Reset MCP restart count for a specific server (like cortex reset)
 #[tauri::command]
 pub async fn reset_mcp_restart_count(state: State<'_, AppState>, server_name: String) -> Result<(), String> {
@@ -309,9 +309,9 @@ async fn proxy_request(
         return Ok(error_response.body(Body::from("Not Found")).unwrap());
     }
 
-    let mut target_port: Option<i32> = None;
-    let mut session_api_key: Option<String> = None;
-    let mut buffered_body: Option<Bytes> = None;
+    let target_port: Option<i32>;
+    let session_api_key: Option<String>;
+    let buffered_body: Option<Bytes>;
     let original_path = parts.uri.path();
     let destination_path = get_destination_path(original_path, &config.prefix);
 
@@ -7,9 +7,9 @@ use std::{
 use tar::Archive;
 use tauri::{App, Emitter, Listener, Manager};
 use tauri_plugin_store::StoreExt;
-use tokio::sync::Mutex;
-use tokio::time::{sleep, Duration}; // Using tokio::sync::Mutex
-// MCP
+// use tokio::sync::Mutex;
+// use tokio::time::{sleep, Duration}; // Using tokio::sync::Mutex
+// // MCP
 
 // MCP
 use super::{
@@ -10,7 +10,8 @@ use tauri::State; // Import Manager trait
 use thiserror;
 use tokio::io::{AsyncBufReadExt, BufReader};
 use tokio::process::Command;
-use tokio::time::timeout;
+use tokio::sync::mpsc;
+use tokio::time::{timeout, Instant};
 
 use crate::core::state::AppState;
 use crate::core::state::LLamaBackendSession;
@@ -56,24 +57,13 @@ pub struct UnloadResult {
     error: Option<String>,
 }
 
-async fn capture_stderr(stderr: impl tokio::io::AsyncRead + Unpin) -> String {
-    let mut reader = BufReader::new(stderr).lines();
-    let mut buf = String::new();
-    while let Ok(Some(line)) = reader.next_line().await {
-        log::info!("[llamacpp] {}", line); // Don't use log::error!
-        buf.push_str(&line);
-        buf.push('\n');
-    }
-    buf
-}
-
 // --- Load Command ---
 #[tauri::command]
 pub async fn load_llama_model(
-    state: State<'_, AppState>, // Access the shared state
+    state: State<'_, AppState>,
     backend_path: &str,
     library_path: Option<&str>,
-    args: Vec<String>, // Arguments from the frontend
+    args: Vec<String>,
 ) -> ServerResult<SessionInfo> {
     let mut process_map = state.llama_server_process.lock().await;
 
@@ -162,17 +152,52 @@ pub async fn load_llama_model(
     let mut child = command.spawn().map_err(ServerError::Io)?;
 
     let stderr = child.stderr.take().expect("stderr was piped");
-    let stderr_task = tokio::spawn(capture_stderr(stderr));
 
     let stdout = child.stdout.take().expect("stdout was piped");
-    tokio::spawn(async move {
+
+    // Create channels for communication between tasks
+    let (ready_tx, mut ready_rx) = mpsc::channel::<bool>(1);
+    let (error_tx, mut error_rx) = mpsc::channel::<String>(1);
+
+    // Spawn task to monitor stdout for readiness
+    let _stdout_task = tokio::spawn(async move {
         let mut reader = BufReader::new(stdout).lines();
         while let Ok(Some(line)) = reader.next_line().await {
             log::info!("[llamacpp stdout] {}", line);
         }
     });
 
-    tokio::time::sleep(Duration::from_millis(300)).await;
+    // Spawn task to capture stderr and monitor for errors
+    let stderr_task = tokio::spawn(async move {
+        let mut reader = BufReader::new(stderr).lines();
+        let mut stderr_buffer = String::new();
+        while let Ok(Some(line)) = reader.next_line().await {
+            log::info!("[llamacpp] {}", line); // Using your log format
+            stderr_buffer.push_str(&line);
+            stderr_buffer.push('\n');
+            // Check for critical error indicators that should stop the process
+            // TODO: check for different errors
+            if line.to_lowercase().contains("error")
+                || line.to_lowercase().contains("failed")
+                || line.to_lowercase().contains("fatal")
+                || line.contains("CUDA error")
+                || line.contains("out of memory")
+                || line.contains("failed to load")
+            {
+                let _ = error_tx.send(line.clone()).await;
+            }
+            // Check for readiness indicator - llama-server outputs this when ready
+            else if line.contains("server is listening on")
+                || line.contains("starting the main loop")
+                || line.contains("server listening on")
+            {
+                log::info!("Server appears to be ready based on stdout: '{}'", line);
+                let _ = ready_tx.send(true).await;
+            }
+        }
+        stderr_buffer
+    });
 
     // Check if process exited early
     if let Some(status) = child.try_wait()? {
         if !status.success() {
             let stderr_output = stderr_task.await.unwrap_or_default();
@@ -182,10 +207,48 @@ pub async fn load_llama_model(
         }
     }
 
+    // Wait for server to be ready or timeout
+    let timeout_duration = Duration::from_secs(300); // 5 minutes timeout
+    let start_time = Instant::now();
+    log::info!("Waiting for server to be ready...");
+    loop {
+        tokio::select! {
+            // Server is ready
+            Some(true) = ready_rx.recv() => {
+                log::info!("Server is ready to accept requests!");
+                break;
+            }
+            // Error occurred
+            Some(error_msg) = error_rx.recv() => {
+                log::error!("Server encountered an error: {}", error_msg);
+                let _ = child.kill().await;
+                // Get full stderr output
+                let stderr_output = stderr_task.await.unwrap_or_default();
+                return Err(ServerError::LlamacppError(format!("Error: {}\n\nFull stderr:\n{}", error_msg, stderr_output)));
+            }
+            // Timeout
+            _ = tokio::time::sleep(Duration::from_millis(100)) => {
+                if start_time.elapsed() > timeout_duration {
+                    log::error!("Timeout waiting for server to be ready");
+                    let _ = child.kill().await;
+                    return Err(ServerError::LlamacppError("Server startup timeout".to_string()));
+                }
+                // Check if process is still alive
+                if let Some(status) = child.try_wait()? {
+                    if !status.success() {
+                        let stderr_output = stderr_task.await.unwrap_or_default();
+                        log::error!("llama.cpp exited during startup with code {status:?}");
+                        return Err(ServerError::LlamacppError(format!("Process exited with code {status:?}\n\nStderr:\n{}", stderr_output)));
+                    }
+                }
+            }
+        }
+    }
+
     // Get the PID to use as session ID
     let pid = child.id().map(|id| id as i32).unwrap_or(-1);
 
-    log::info!("Server process started with PID: {}", pid);
+    log::info!("Server process started with PID: {} and is ready", pid);
     let session_info = SessionInfo {
         pid: pid.clone(),
         port: port,
@@ -194,7 +257,7 @@ pub async fn load_llama_model(
         api_key: api_key,
     };
 
-    // insert sesinfo to process_map
+    // Insert session info to process_map
    process_map.insert(
        pid.clone(),
        LLamaBackendSession {
@@ -6,6 +6,7 @@ use std::path::{Component, Path, PathBuf};
 use tauri::Runtime;
 
 use super::cmd::get_jan_data_folder_path;
+#[cfg(windows)]
+use std::path::Prefix;
 
 pub const THREADS_DIR: &str = "threads";
@@ -54,11 +55,11 @@ pub fn ensure_thread_dir_exists<R: Runtime>(
 // https://github.com/rust-lang/cargo/blob/rust-1.67.0/crates/cargo-util/src/paths.rs#L82-L107
 pub fn normalize_path(path: &Path) -> PathBuf {
     let mut components = path.components().peekable();
-    let mut ret = if let Some(c @ Component::Prefix(prefix_component)) = components.peek().cloned()
+    let mut ret = if let Some(c @ Component::Prefix(_prefix_component)) = components.peek().cloned()
     {
         #[cfg(windows)]
         // Remove only the Verbatim prefix, but keep the drive letter (e.g., C:\)
-        match prefix_component.kind() {
+        match _prefix_component.kind() {
             Prefix::VerbatimDisk(disk) => {
                 components.next(); // skip this prefix
                 // Re-add the disk prefix (e.g., C:)
@@ -1,7 +1,7 @@
 {
   "$schema": "https://schema.tauri.app/config/2",
   "productName": "Jan",
-  "version": "0.6.901",
+  "version": "0.6.904",
   "identifier": "jan.ai.app",
   "build": {
     "frontendDist": "../web-app/dist",