From 6f53f1056a25c9a45fd533cd6b5d1a99cd88bc2b Mon Sep 17 00:00:00 2001 From: Louis Date: Tue, 15 Apr 2025 18:57:43 +0700 Subject: [PATCH] refactor: Jan manages threads for a better performance (#4912) * refactor: Jan manages threads for a better performance * test: add tests --- .../conversational-extension/package.json | 4 +- .../conversational-extension/src/index.ts | 91 +-- src-tauri/Cargo.toml | 2 + src-tauri/src/core/cmd.rs | 4 + src-tauri/src/core/fs.rs | 13 +- src-tauri/src/core/mod.rs | 2 + src-tauri/src/core/threads.rs | 598 ++++++++++++++++++ src-tauri/src/core/utils/mod.rs | 48 ++ src-tauri/src/lib.rs | 14 +- .../ThreadCenterPanel/TextMessage/index.tsx | 5 +- web/services/tauriService.ts | 11 + 11 files changed, 703 insertions(+), 89 deletions(-) create mode 100644 src-tauri/src/core/threads.rs create mode 100644 src-tauri/src/core/utils/mod.rs diff --git a/extensions/conversational-extension/package.json b/extensions/conversational-extension/package.json index a5224b99b..693adf6d6 100644 --- a/extensions/conversational-extension/package.json +++ b/extensions/conversational-extension/package.json @@ -23,9 +23,7 @@ "typescript": "^5.7.2" }, "dependencies": { - "@janhq/core": "../../core/package.tgz", - "ky": "^1.7.2", - "p-queue": "^8.0.1" + "@janhq/core": "../../core/package.tgz" }, "engines": { "node": ">=18.0.0" diff --git a/extensions/conversational-extension/src/index.ts b/extensions/conversational-extension/src/index.ts index e2e068939..720291d88 100644 --- a/extensions/conversational-extension/src/index.ts +++ b/extensions/conversational-extension/src/index.ts @@ -4,40 +4,12 @@ import { ThreadAssistantInfo, ThreadMessage, } from '@janhq/core' -import ky, { KyInstance } from 'ky' - -type ThreadList = { - data: Thread[] -} - -type MessageList = { - data: ThreadMessage[] -} /** * JSONConversationalExtension is a ConversationalExtension implementation that provides * functionality for managing threads. */ export default class CortexConversationalExtension extends ConversationalExtension { - api?: KyInstance - /** - * Get the API instance - * @returns - */ - async apiInstance(): Promise { - if (this.api) return this.api - const apiKey = (await window.core?.api.appToken()) - this.api = ky.extend({ - prefixUrl: API_URL, - headers: apiKey - ? { - Authorization: `Bearer ${apiKey}`, - } - : {}, - retry: 10, - }) - return this.api - } /** * Called when the extension is loaded. */ @@ -54,12 +26,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * Returns a Promise that resolves to an array of Conversation objects. */ async listThreads(): Promise { - return this.apiInstance().then((api) => - api - .get('v1/threads?limit=-1') - .json() - .then((e) => e.data) - ) as Promise + return window.core.api.listThreads() } /** @@ -67,9 +34,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @param thread The Thread object to save. */ async createThread(thread: Thread): Promise { - return this.apiInstance().then((api) => - api.post('v1/threads', { json: thread }).json() - ) as Promise + return window.core.api.createThread({ thread }) } /** @@ -77,10 +42,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @param thread The Thread object to save. */ async modifyThread(thread: Thread): Promise { - return this.apiInstance() - .then((api) => api.patch(`v1/threads/${thread.id}`, { json: thread })) - - .then() + return window.core.api.modifyThread({ thread }) } /** @@ -88,9 +50,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @param threadId The ID of the thread to delete. */ async deleteThread(threadId: string): Promise { - return this.apiInstance() - .then((api) => api.delete(`v1/threads/${threadId}`)) - .then() + return window.core.api.deleteThread({ threadId }) } /** @@ -99,13 +59,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @returns A Promise that resolves when the message has been added. */ async createMessage(message: ThreadMessage): Promise { - return this.apiInstance().then((api) => - api - .post(`v1/threads/${message.thread_id}/messages`, { - json: message, - }) - .json() - ) as Promise + return window.core.api.createMessage({ message }) } /** @@ -114,13 +68,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @returns */ async modifyMessage(message: ThreadMessage): Promise { - return this.apiInstance().then((api) => - api - .patch(`v1/threads/${message.thread_id}/messages/${message.id}`, { - json: message, - }) - .json() - ) as Promise + return window.core.api.modifyMessage({ message }) } /** @@ -130,9 +78,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @returns A Promise that resolves when the message has been successfully deleted. */ async deleteMessage(threadId: string, messageId: string): Promise { - return this.apiInstance() - .then((api) => api.delete(`v1/threads/${threadId}/messages/${messageId}`)) - .then() + return window.core.api.deleteMessage({ threadId, messageId }) } /** @@ -141,12 +87,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * @returns A Promise that resolves to an array of ThreadMessage objects. */ async listMessages(threadId: string): Promise { - return this.apiInstance().then((api) => - api - .get(`v1/threads/${threadId}/messages?order=asc&limit=-1`) - .json() - .then((e) => e.data) - ) as Promise + return window.core.api.listMessages({ threadId }) } /** @@ -156,9 +97,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi * the details of the assistant associated with the specified thread. */ async getThreadAssistant(threadId: string): Promise { - return this.apiInstance().then((api) => - api.get(`v1/assistants/${threadId}?limit=-1`).json() - ) as Promise + return window.core.api.getThreadAssistant({ threadId }) } /** * Creates a new assistant for the specified thread. @@ -170,11 +109,7 @@ export default class CortexConversationalExtension extends ConversationalExtensi threadId: string, assistant: ThreadAssistantInfo ): Promise { - return this.apiInstance().then((api) => - api - .post(`v1/assistants/${threadId}`, { json: assistant }) - .json() - ) as Promise + return window.core.api.createThreadAssistant(threadId, assistant) } /** @@ -187,10 +122,6 @@ export default class CortexConversationalExtension extends ConversationalExtensi threadId: string, assistant: ThreadAssistantInfo ): Promise { - return this.apiInstance().then((api) => - api - .patch(`v1/assistants/${threadId}`, { json: assistant }) - .json() - ) as Promise + return window.core.api.modifyThreadAssistant({ threadId, assistant }) } } diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 5ef4e7a4e..f8444dcba 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -40,6 +40,8 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai "transport-child-process", "tower", ] } +uuid = { version = "1.7", features = ["v4"] } [target.'cfg(not(any(target_os = "android", target_os = "ios")))'.dependencies] tauri-plugin-updater = "2" +once_cell = "1.18" diff --git a/src-tauri/src/core/cmd.rs b/src-tauri/src/core/cmd.rs index 0fe706a1a..ca3d051af 100644 --- a/src-tauri/src/core/cmd.rs +++ b/src-tauri/src/core/cmd.rs @@ -93,6 +93,10 @@ pub fn update_app_configuration( #[tauri::command] pub fn get_jan_data_folder_path(app_handle: tauri::AppHandle) -> PathBuf { + if cfg!(test) { + return PathBuf::from("./data"); + } + let app_configurations = get_app_configurations(app_handle); PathBuf::from(app_configurations.data_folder) } diff --git a/src-tauri/src/core/fs.rs b/src-tauri/src/core/fs.rs index 9e77a812c..c0d7d423d 100644 --- a/src-tauri/src/core/fs.rs +++ b/src-tauri/src/core/fs.rs @@ -107,6 +107,7 @@ mod tests { use super::*; use std::fs::{self, File}; use std::io::Write; + use serde_json::to_string; use tauri::test::mock_app; #[test] @@ -154,9 +155,11 @@ mod tests { fn test_exists_sync() { let app = mock_app(); let path = "file://test_exists_sync_file"; - let file_path = get_jan_data_folder_path(app.handle().clone()).join(path); + let dir_path = get_jan_data_folder_path(app.handle().clone()); + fs::create_dir_all(&dir_path).unwrap(); + let file_path = dir_path.join("test_exists_sync_file"); File::create(&file_path).unwrap(); - let args = vec![path.to_string()]; + let args: Vec = vec![path.to_string()]; let result = exists_sync(app.handle().clone(), args).unwrap(); assert!(result); fs::remove_file(file_path).unwrap(); @@ -166,7 +169,9 @@ mod tests { fn test_read_file_sync() { let app = mock_app(); let path = "file://test_read_file_sync_file"; - let file_path = get_jan_data_folder_path(app.handle().clone()).join(path); + let dir_path = get_jan_data_folder_path(app.handle().clone()); + fs::create_dir_all(&dir_path).unwrap(); + let file_path = dir_path.join("test_read_file_sync_file"); let mut file = File::create(&file_path).unwrap(); file.write_all(b"test content").unwrap(); let args = vec![path.to_string()]; @@ -184,7 +189,7 @@ mod tests { File::create(dir_path.join("file1.txt")).unwrap(); File::create(dir_path.join("file2.txt")).unwrap(); - let args = vec![path.to_string()]; + let args = vec![dir_path.to_string_lossy().to_string()]; let result = readdir_sync(app.handle().clone(), args).unwrap(); assert_eq!(result.len(), 2); diff --git a/src-tauri/src/core/mod.rs b/src-tauri/src/core/mod.rs index e4f0ee6c4..8d4edde3c 100644 --- a/src-tauri/src/core/mod.rs +++ b/src-tauri/src/core/mod.rs @@ -4,3 +4,5 @@ pub mod mcp; pub mod server; pub mod setup; pub mod state; +pub mod threads; +pub mod utils; \ No newline at end of file diff --git a/src-tauri/src/core/threads.rs b/src-tauri/src/core/threads.rs new file mode 100644 index 000000000..e4b8f2e52 --- /dev/null +++ b/src-tauri/src/core/threads.rs @@ -0,0 +1,598 @@ +/*! + Thread and Message Persistence Module + + This module provides all logic for managing threads and their messages, including creation, modification, deletion, and listing. + Messages for each thread are persisted in a JSONL file (messages.jsonl) per thread directory. + + **Concurrency and Consistency Guarantee:** + - All operations that write or modify messages for a thread are protected by a global, per-thread asynchronous lock. + - This design ensures that only one operation can write to a thread's messages.jsonl file at a time, preventing race conditions. + - As a result, the messages.jsonl file for each thread is always consistent and never corrupted, even under concurrent access. +*/ + +use serde::{Deserialize, Serialize}; +use std::fs::{self, File}; +use std::io::{BufRead, BufReader, Write}; +use tauri::command; +use tauri::Runtime; +use uuid::Uuid; + +// For async file write serialization +use once_cell::sync::Lazy; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; + +// Global per-thread locks for message file writes +static MESSAGE_LOCKS: Lazy>>>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +use super::utils::{ + ensure_data_dirs, ensure_thread_dir_exists, get_data_dir, get_messages_path, get_thread_dir, + get_thread_metadata_path, THREADS_FILE, +}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Thread { + pub id: String, + pub object: String, + pub title: String, + pub assistants: Vec, + pub created: i64, + pub updated: i64, + pub metadata: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ThreadMessage { + pub id: String, + pub object: String, + pub thread_id: String, + pub assistant_id: Option, + pub attachments: Option>, + pub role: String, + pub content: Vec, + pub status: String, + pub created_at: i64, + pub completed_at: i64, + pub metadata: Option, + pub type_: Option, + pub error_code: Option, + pub tool_call_id: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Attachment { + pub file_id: Option, + pub tools: Option>, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type")] +pub enum Tool { + #[serde(rename = "file_search")] + FileSearch, + #[serde(rename = "code_interpreter")] + CodeInterpreter, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ThreadContent { + pub type_: String, + pub text: Option, + pub image_url: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ContentValue { + pub value: String, + pub annotations: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ImageContentValue { + pub detail: Option, + pub url: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ThreadAssistantInfo { + pub assistant_id: String, + pub assistant_name: String, + pub model: ModelInfo, + pub instructions: Option, + pub tools: Option>, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ModelInfo { + pub id: String, + pub name: String, + pub settings: serde_json::Value, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(tag = "type")] +pub enum AssistantTool { + #[serde(rename = "code_interpreter")] + CodeInterpreter, + #[serde(rename = "retrieval")] + Retrieval, + #[serde(rename = "function")] + Function { + name: String, + description: Option, + parameters: Option, + }, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ThreadState { + pub has_more: bool, + pub waiting_for_response: bool, + pub error: Option, + pub last_message: Option, +} + +/// Lists all threads by reading their metadata from the threads directory. +/// Returns a vector of thread metadata as JSON values. +#[command] +pub async fn list_threads( + app_handle: tauri::AppHandle, +) -> Result, String> { + ensure_data_dirs(app_handle.clone())?; + let data_dir = get_data_dir(app_handle.clone()); + let mut threads = Vec::new(); + + if !data_dir.exists() { + return Ok(threads); + } + + for entry in fs::read_dir(&data_dir).map_err(|e| e.to_string())? { + let entry = entry.map_err(|e| e.to_string())?; + let path = entry.path(); + if path.is_dir() { + let thread_metadata_path = path.join(THREADS_FILE); + if thread_metadata_path.exists() { + let data = fs::read_to_string(&thread_metadata_path).map_err(|e| e.to_string())?; + match serde_json::from_str(&data) { + Ok(thread) => threads.push(thread), + Err(e) => { + println!("Failed to parse thread file: {}", e); + continue; // skip invalid thread files + } + } + } + } + } + + Ok(threads) +} + +/// Creates a new thread, assigns it a unique ID, and persists its metadata. +/// Ensures the thread directory exists and writes thread.json. +#[command] +pub async fn create_thread( + app_handle: tauri::AppHandle, + mut thread: serde_json::Value, +) -> Result { + ensure_data_dirs(app_handle.clone())?; + let uuid = Uuid::new_v4().to_string(); + thread["id"] = serde_json::Value::String(uuid.clone()); + let thread_dir = get_thread_dir(app_handle.clone(), &uuid); + if !thread_dir.exists() { + fs::create_dir_all(&thread_dir).map_err(|e| e.to_string())?; + } + let path = get_thread_metadata_path(app_handle.clone(), &uuid); + let data = serde_json::to_string_pretty(&thread).map_err(|e| e.to_string())?; + fs::write(path, data).map_err(|e| e.to_string())?; + Ok(thread) +} + +/// Modifies an existing thread's metadata by overwriting its thread.json file. +/// Returns an error if the thread directory does not exist. +#[command] +pub async fn modify_thread( + app_handle: tauri::AppHandle, + thread: serde_json::Value, +) -> Result<(), String> { + let thread_id = thread + .get("id") + .and_then(|id| id.as_str()) + .ok_or("Missing thread id")?; + let thread_dir = get_thread_dir(app_handle.clone(), thread_id); + if !thread_dir.exists() { + return Err("Thread directory does not exist".to_string()); + } + let path = get_thread_metadata_path(app_handle.clone(), thread_id); + let data = serde_json::to_string_pretty(&thread).map_err(|e| e.to_string())?; + fs::write(path, data).map_err(|e| e.to_string())?; + Ok(()) +} + +/// Deletes a thread and all its associated files by removing its directory. +#[command] +pub async fn delete_thread( + app_handle: tauri::AppHandle, + thread_id: String, +) -> Result<(), String> { + let thread_dir = get_thread_dir(app_handle.clone(), &thread_id); + if thread_dir.exists() { + fs::remove_dir_all(thread_dir).map_err(|e| e.to_string())?; + } + Ok(()) +} + +/// Lists all messages for a given thread by reading and parsing its messages.jsonl file. +/// Returns a vector of message JSON values. +#[command] +pub async fn list_messages( + app_handle: tauri::AppHandle, + thread_id: String, +) -> Result, String> { + let path = get_messages_path(app_handle, &thread_id); + if !path.exists() { + return Ok(vec![]); + } + + let file = File::open(&path).map_err(|e| { + eprintln!("Error opening file {}: {}", path.display(), e); + e.to_string() + })?; + let reader = BufReader::new(file); + + let mut messages = Vec::new(); + for line in reader.lines() { + let line = line.map_err(|e| { + eprintln!("Error reading line from file {}: {}", path.display(), e); + e.to_string() + })?; + let message: serde_json::Value = serde_json::from_str(&line).map_err(|e| { + eprintln!( + "Error parsing JSON from line in file {}: {}", + path.display(), + e + ); + e.to_string() + })?; + messages.push(message); + } + + Ok(messages) +} + +/// Appends a new message to a thread's messages.jsonl file. +/// Uses a per-thread async lock to prevent race conditions and ensure file consistency. +#[command] +pub async fn create_message( + app_handle: tauri::AppHandle, + mut message: serde_json::Value, +) -> Result { + let thread_id = { + let id = message + .get("thread_id") + .and_then(|v| v.as_str()) + .ok_or("Missing thread_id")?; + id.to_string() + }; + ensure_thread_dir_exists(app_handle.clone(), &thread_id)?; + let path = get_messages_path(app_handle.clone(), &thread_id); + + let uuid = Uuid::new_v4().to_string(); + message["id"] = serde_json::Value::String(uuid); + + // Acquire per-thread lock before writing + { + let mut locks = MESSAGE_LOCKS.lock().await; + let lock = locks + .entry(thread_id.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + drop(locks); // Release the map lock before awaiting the file lock + + let _guard = lock.lock().await; + + let mut file = fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .map_err(|e| e.to_string())?; + + let data = serde_json::to_string(&message).map_err(|e| e.to_string())?; + writeln!(file, "{}", data).map_err(|e| e.to_string())?; + } + + Ok(message) +} + +/// Modifies an existing message in a thread's messages.jsonl file. +/// Uses a per-thread async lock to prevent race conditions and ensure file consistency. +/// Rewrites the entire messages.jsonl file for the thread. +#[command] +pub async fn modify_message( + app_handle: tauri::AppHandle, + message: serde_json::Value, +) -> Result { + let thread_id = message + .get("thread_id") + .and_then(|v| v.as_str()) + .ok_or("Missing thread_id")?; + let message_id = message + .get("id") + .and_then(|v| v.as_str()) + .ok_or("Missing message id")?; + + // Acquire per-thread lock before modifying + { + let mut locks = MESSAGE_LOCKS.lock().await; + let lock = locks + .entry(thread_id.to_string()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone(); + drop(locks); // Release the map lock before awaiting the file lock + + let _guard = lock.lock().await; + + let mut messages = list_messages(app_handle.clone(), thread_id.to_string()).await?; + if let Some(index) = messages + .iter() + .position(|m| m.get("id").and_then(|v| v.as_str()) == Some(message_id)) + { + messages[index] = message.clone(); + + // Rewrite all messages + let path = get_messages_path(app_handle.clone(), thread_id); + let mut file = File::create(path).map_err(|e| e.to_string())?; + for msg in messages { + let data = serde_json::to_string(&msg).map_err(|e| e.to_string())?; + writeln!(file, "{}", data).map_err(|e| e.to_string())?; + } + } + } + Ok(message) +} + +/// Deletes a message from a thread's messages.jsonl file by message ID. +/// Rewrites the entire messages.jsonl file for the thread. +#[command] +pub async fn delete_message( + app_handle: tauri::AppHandle, + thread_id: String, + message_id: String, +) -> Result<(), String> { + let mut messages = list_messages(app_handle.clone(), thread_id.clone()).await?; + messages.retain(|m| m.get("id").and_then(|v| v.as_str()) != Some(message_id.as_str())); + + // Rewrite remaining messages + let path = get_messages_path(app_handle.clone(), &thread_id); + let mut file = File::create(path).map_err(|e| e.to_string())?; + for msg in messages { + let data = serde_json::to_string(&msg).map_err(|e| e.to_string())?; + writeln!(file, "{}", data).map_err(|e| e.to_string())?; + } + + Ok(()) +} + +/// Retrieves the first assistant associated with a thread. +/// Returns an error if the thread or assistant is not found. +#[command] +pub async fn get_thread_assistant( + app_handle: tauri::AppHandle, + thread_id: String, +) -> Result { + let path = get_thread_metadata_path(app_handle, &thread_id); + if !path.exists() { + return Err("Thread not found".to_string()); + } + let data = fs::read_to_string(&path).map_err(|e| e.to_string())?; + let thread: serde_json::Value = serde_json::from_str(&data).map_err(|e| e.to_string())?; + if let Some(assistants) = thread.get("assistants").and_then(|a| a.as_array()) { + if let Some(first) = assistants.get(0) { + Ok(first.clone()) + } else { + Err("Assistant not found".to_string()) + } + } else { + Err("Assistant not found".to_string()) + } +} + +/// Adds a new assistant to a thread's metadata. +/// Updates thread.json with the new assistant information. +#[command] +pub async fn create_thread_assistant( + app_handle: tauri::AppHandle, + thread_id: String, + assistant: serde_json::Value, +) -> Result { + let path = get_thread_metadata_path(app_handle.clone(), &thread_id); + if !path.exists() { + return Err("Thread not found".to_string()); + } + let mut thread: serde_json::Value = { + let data = fs::read_to_string(&path).map_err(|e| e.to_string())?; + serde_json::from_str(&data).map_err(|e| e.to_string())? + }; + if let Some(assistants) = thread.get_mut("assistants").and_then(|a| a.as_array_mut()) { + assistants.push(assistant.clone()); + } else { + thread["assistants"] = serde_json::Value::Array(vec![assistant.clone()]); + } + let data = serde_json::to_string_pretty(&thread).map_err(|e| e.to_string())?; + fs::write(&path, data).map_err(|e| e.to_string())?; + Ok(assistant) +} + +/// Modifies an existing assistant's information in a thread's metadata. +/// Updates thread.json with the modified assistant data. +#[command] +pub async fn modify_thread_assistant( + app_handle: tauri::AppHandle, + thread_id: String, + assistant: serde_json::Value, +) -> Result { + let path = get_thread_metadata_path(app_handle.clone(), &thread_id); + if !path.exists() { + return Err("Thread not found".to_string()); + } + let mut thread: serde_json::Value = { + let data = fs::read_to_string(&path).map_err(|e| e.to_string())?; + serde_json::from_str(&data).map_err(|e| e.to_string())? + }; + let assistant_id = assistant + .get("id") + .and_then(|v| v.as_str()) + .ok_or("Missing assistant_id")?; + if let Some(assistants) = thread + .get_mut("assistants") + .and_then(|a: &mut serde_json::Value| a.as_array_mut()) + { + if let Some(index) = assistants + .iter() + .position(|a| a.get("id").and_then(|v| v.as_str()) == Some(assistant_id)) + { + assistants[index] = assistant.clone(); + let data = serde_json::to_string_pretty(&thread).map_err(|e| e.to_string())?; + fs::write(&path, data).map_err(|e| e.to_string())?; + } + } + Ok(assistant) +} + +#[cfg(test)] +mod tests { + use crate::core::cmd::get_jan_data_folder_path; + + use super::*; + use serde_json::json; + use std::fs; + use std::path::PathBuf; + use tauri::test::{mock_app, MockRuntime}; + + // Helper to create a mock app handle with a temp data dir + fn mock_app_with_temp_data_dir() -> (tauri::App, PathBuf) { + let app = mock_app(); + let data_dir = get_jan_data_folder_path(app.handle().clone()); + println!("Mock app data dir: {}", data_dir.display()); + // Patch get_data_dir to use temp dir (requires get_data_dir to be overridable or injectable) + // For now, we assume get_data_dir uses tauri::api::path::app_data_dir(&app_handle) + // and that we can set the environment variable to redirect it. + (app, data_dir) + } + + #[tokio::test] + async fn test_create_and_list_threads() { + let (app, data_dir) = mock_app_with_temp_data_dir(); + // Create a thread + let thread = json!({ + "object": "thread", + "title": "Test Thread", + "assistants": [], + "created": 1234567890, + "updated": 1234567890, + "metadata": null + }); + let created = create_thread(app.handle().clone(), thread.clone()) + .await + .unwrap(); + assert_eq!(created["title"], "Test Thread"); + + // List threads + let threads = list_threads(app.handle().clone()).await.unwrap(); + assert!(threads.len() > 0); + + // Clean up + fs::remove_dir_all(data_dir).unwrap(); + } + + #[tokio::test] + async fn test_create_and_list_messages() { + let (app, data_dir) = mock_app_with_temp_data_dir(); + // Create a thread first + let thread = json!({ + "object": "thread", + "title": "Msg Thread", + "assistants": [], + "created": 123, + "updated": 123, + "metadata": null + }); + let created = create_thread(app.handle().clone(), thread.clone()) + .await + .unwrap(); + let thread_id = created["id"].as_str().unwrap().to_string(); + + // Create a message + let message = json!({ + "object": "message", + "thread_id": thread_id, + "assistant_id": null, + "attachments": null, + "role": "user", + "content": [], + "status": "sent", + "created_at": 123, + "completed_at": 123, + "metadata": null, + "type_": null, + "error_code": null, + "tool_call_id": null + }); + let created_msg = create_message(app.handle().clone(), message).await.unwrap(); + assert_eq!(created_msg["role"], "user"); + + // List messages + let messages = list_messages(app.handle().clone(), thread_id.clone()) + .await + .unwrap(); + assert!(messages.len() > 0); + assert_eq!(messages[0]["role"], "user"); + + // Clean up + fs::remove_dir_all(data_dir).unwrap(); + } + + #[tokio::test] + async fn test_create_and_get_thread_assistant() { + let (app, data_dir) = mock_app_with_temp_data_dir(); + // Create a thread + let thread = json!({ + "object": "thread", + "title": "Assistant Thread", + "assistants": [], + "created": 1, + "updated": 1, + "metadata": null + }); + let created = create_thread(app.handle().clone(), thread.clone()) + .await + .unwrap(); + let thread_id = created["id"].as_str().unwrap().to_string(); + + // Add assistant + let assistant = json!({ + "id": "assistant-1", + "assistant_name": "Test Assistant", + "model": { + "id": "model-1", + "name": "Test Model", + "settings": json!({}) + }, + "instructions": null, + "tools": null + }); + let _ = create_thread_assistant(app.handle().clone(), thread_id.clone(), assistant.clone()) + .await + .unwrap(); + + // Get assistant + let got = get_thread_assistant(app.handle().clone(), thread_id.clone()) + .await + .unwrap(); + assert_eq!(got["assistant_name"], "Test Assistant"); + + // Clean up + fs::remove_dir_all(data_dir).unwrap(); + } +} diff --git a/src-tauri/src/core/utils/mod.rs b/src-tauri/src/core/utils/mod.rs new file mode 100644 index 000000000..7f80e6f3a --- /dev/null +++ b/src-tauri/src/core/utils/mod.rs @@ -0,0 +1,48 @@ +use std::fs; +use std::path::PathBuf; +use tauri::Runtime; + +use super::cmd::get_jan_data_folder_path; + +pub const THREADS_DIR: &str = "threads"; +pub const THREADS_FILE: &str = "thread.json"; +pub const MESSAGES_FILE: &str = "messages.jsonl"; + +pub fn get_data_dir(app_handle: tauri::AppHandle) -> PathBuf { + get_jan_data_folder_path(app_handle).join(THREADS_DIR) +} + +pub fn get_thread_dir(app_handle: tauri::AppHandle, thread_id: &str) -> PathBuf { + get_data_dir(app_handle).join(thread_id) +} + +pub fn get_thread_metadata_path( + app_handle: tauri::AppHandle, + thread_id: &str, +) -> PathBuf { + get_thread_dir(app_handle, thread_id).join(THREADS_FILE) +} + +pub fn get_messages_path(app_handle: tauri::AppHandle, thread_id: &str) -> PathBuf { + get_thread_dir(app_handle, thread_id).join(MESSAGES_FILE) +} + +pub fn ensure_data_dirs(app_handle: tauri::AppHandle) -> Result<(), String> { + let data_dir = get_data_dir(app_handle.clone()); + if !data_dir.exists() { + fs::create_dir_all(&data_dir).map_err(|e| e.to_string())?; + } + Ok(()) +} + +pub fn ensure_thread_dir_exists( + app_handle: tauri::AppHandle, + thread_id: &str, +) -> Result<(), String> { + ensure_data_dirs(app_handle.clone())?; + let thread_dir = get_thread_dir(app_handle, thread_id); + if !thread_dir.exists() { + fs::create_dir(&thread_dir).map_err(|e| e.to_string())?; + } + Ok(()) +} diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 40cd83f57..f37d97ae2 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -41,7 +41,19 @@ pub fn run() { core::cmd::stop_server, // MCP commands core::cmd::get_tools, - core::cmd::call_tool + core::cmd::call_tool, + // Threads + core::threads::list_threads, + core::threads::create_thread, + core::threads::modify_thread, + core::threads::delete_thread, + core::threads::list_messages, + core::threads::create_message, + core::threads::modify_message, + core::threads::delete_message, + core::threads::get_thread_assistant, + core::threads::create_thread_assistant, + core::threads::modify_thread_assistant ]) .manage(AppState { app_token: Some(generate_app_token()), diff --git a/web/screens/Thread/ThreadCenterPanel/TextMessage/index.tsx b/web/screens/Thread/ThreadCenterPanel/TextMessage/index.tsx index 522b27a0d..3af675de5 100644 --- a/web/screens/Thread/ThreadCenterPanel/TextMessage/index.tsx +++ b/web/screens/Thread/ThreadCenterPanel/TextMessage/index.tsx @@ -68,7 +68,10 @@ const MessageContainer: React.FC< ) const attachedFile = useMemo( - () => 'attachments' in props && props.attachments?.length, + () => + 'attachments' in props && + !!props.attachments?.length && + props.attachments?.length > 0, [props] ) diff --git a/web/services/tauriService.ts b/web/services/tauriService.ts index 488593658..2c592f951 100644 --- a/web/services/tauriService.ts +++ b/web/services/tauriService.ts @@ -8,6 +8,17 @@ export const Routes = [ 'installExtensions', 'getTools', 'callTool', + 'listThreads', + 'createThread', + 'modifyThread', + 'deleteThread', + 'listMessages', + 'createMessage', + 'modifyMessage', + 'deleteMessage', + 'getThreadAssistant', + 'createThreadAssistant', + 'modifyThreadAssistant', ].map((r) => ({ path: `app`, route: r,