diff --git a/core/src/types/message/messageInterface.ts b/core/src/types/message/messageInterface.ts index 1ea04298a..784a3de88 100644 --- a/core/src/types/message/messageInterface.ts +++ b/core/src/types/message/messageInterface.ts @@ -20,6 +20,13 @@ export interface MessageInterface { */ listMessages(threadId: string): Promise + /** + * Updates an existing message in a thread. + * @param {ThreadMessage} message - The message to be updated (must have existing ID). + * @returns {Promise} A promise that resolves to the updated message. + */ + modifyMessage(message: ThreadMessage): Promise + /** * Deletes a specific message from a thread. * @param {string} threadId - The ID of the thread from which the message will be deleted. diff --git a/web-app/src/hooks/useChat.ts b/web-app/src/hooks/useChat.ts index 6f636cbec..a730e41e6 100644 --- a/web-app/src/hooks/useChat.ts +++ b/web-app/src/hooks/useChat.ts @@ -20,7 +20,7 @@ import { import { CompletionMessagesBuilder } from '@/lib/messages' import { renderInstructions } from '@/lib/instructionTemplate' import { ChatCompletionMessageToolCall } from 'openai/resources' -import { MessageStatus } from '@janhq/core' +import { MessageStatus, ContentType } from '@janhq/core' import { useServiceHub } from '@/hooks/useServiceHub' import { useToolApproval } from '@/hooks/useToolApproval' @@ -100,7 +100,8 @@ const processStreamingCompletion = async ( updateTokenSpeed: (message: ThreadMessage, increment?: number) => void, updatePromptProgress: (progress: unknown) => void, continueFromMessageId?: string, - updateMessage?: (message: ThreadMessage) => void + updateMessage?: (message: ThreadMessage) => void, + continueFromMessage?: ThreadMessage ) => { // High-throughput scheduler: batch UI updates on rAF (requestAnimationFrame) let rafScheduled = false @@ -117,9 +118,10 @@ const processStreamingCompletion = async ( ) // When continuing, update the message directly instead of using streamingContent - if (continueFromMessageId && updateMessage) { + if (continueFromMessageId && updateMessage && continueFromMessage) { updateMessage({ - ...currentContent, + ...continueFromMessage, // Preserve original message metadata + content: currentContent.content, // Update content status: MessageStatus.Stopped, // Keep as Stopped while streaming }) } else { @@ -567,7 +569,8 @@ export const useChat = () => { updateTokenSpeed, updatePromptProgress, continueFromMessageId, - updateMessage + updateMessage, + continueFromMessage ) } } catch (error) { @@ -633,7 +636,10 @@ export const useChat = () => { } // Normal completion flow (abort is handled after loop exits) - builder.addAssistantMessage(accumulatedTextRef.value, undefined, toolCalls) + // Don't add assistant message to builder if continuing - it's already there + if (!continueFromMessageId) { + builder.addAssistantMessage(accumulatedTextRef.value, undefined, toolCalls) + } const updatedMessage = await postMessageProcessing( toolCalls, builder, @@ -671,23 +677,40 @@ export const useChat = () => { accumulatedTextRef.value.length > 0 && activeProvider?.provider === 'llamacpp' ) { - // Create final content for the partial message with Stopped status - const partialContent = { - ...newAssistantThreadContent( - activeThread.id, - accumulatedTextRef.value, - { + // If continuing, update the existing message; otherwise add new + if (continueFromMessageId && continueFromMessage) { + // Preserve the original message metadata + updateMessage({ + ...continueFromMessage, + content: [ + { + type: ContentType.Text, + text: { + value: accumulatedTextRef.value, + annotations: [], + }, + }, + ], + status: MessageStatus.Stopped, + metadata: { + ...continueFromMessage.metadata, tokenSpeed: useAppState.getState().tokenSpeed, assistant: currentAssistant, - } - ), - status: MessageStatus.Stopped, - } - - // If continuing, update the existing message; otherwise add new - if (continueFromMessageId) { - updateMessage({ ...partialContent, id: continueFromMessageId }) + }, + }) } else { + // Create final content for the partial message with Stopped status + const partialContent = { + ...newAssistantThreadContent( + activeThread.id, + accumulatedTextRef.value, + { + tokenSpeed: useAppState.getState().tokenSpeed, + assistant: currentAssistant, + } + ), + status: MessageStatus.Stopped, + } addMessage(partialContent) } updatePromptProgress(undefined) @@ -708,22 +731,39 @@ export const useChat = () => { // Use streaming content if available, otherwise use accumulatedTextRef const contentText = streamingContent?.content?.[0]?.text?.value || accumulatedTextRef.value - const partialContent = { - ...newAssistantThreadContent( - activeThread.id, - contentText, - { + // If continuing, update the existing message; otherwise add new + if (continueFromMessageId && continueFromMessage) { + // Preserve the original message metadata + updateMessage({ + ...continueFromMessage, + content: [ + { + type: ContentType.Text, + text: { + value: contentText, + annotations: [], + }, + }, + ], + status: MessageStatus.Stopped, + metadata: { + ...continueFromMessage.metadata, tokenSpeed: useAppState.getState().tokenSpeed, assistant: currentAssistant, - } - ), - status: MessageStatus.Stopped, - } - - // If continuing, update the existing message; otherwise add new - if (continueFromMessageId) { - updateMessage({ ...partialContent, id: continueFromMessageId }) + }, + }) } else { + const partialContent = { + ...newAssistantThreadContent( + activeThread.id, + contentText, + { + tokenSpeed: useAppState.getState().tokenSpeed, + assistant: currentAssistant, + } + ), + status: MessageStatus.Stopped, + } addMessage(partialContent) } updatePromptProgress(undefined) diff --git a/web-app/src/hooks/useMessages.ts b/web-app/src/hooks/useMessages.ts index 7db28ff34..e6f516703 100644 --- a/web-app/src/hooks/useMessages.ts +++ b/web-app/src/hooks/useMessages.ts @@ -83,8 +83,9 @@ export const useMessages = create()((set, get) => ({ }, })) - // Persist to storage asynchronously - getServiceHub().messages().createMessage(updatedMessage).catch((error) => { + // Persist to storage asynchronously using modifyMessage instead of createMessage + // to prevent duplicates when updating existing messages + getServiceHub().messages().modifyMessage(updatedMessage).catch((error) => { console.error('Failed to persist message update:', error) }) }, diff --git a/web-app/src/services/messages/default.ts b/web-app/src/services/messages/default.ts index 8dbd63181..6e7b8f535 100644 --- a/web-app/src/services/messages/default.ts +++ b/web-app/src/services/messages/default.ts @@ -40,6 +40,20 @@ export class DefaultMessagesService implements MessagesService { ) } + async modifyMessage(message: ThreadMessage): Promise { + // Don't modify messages on server for temporary chat - it's local only + if (message.thread_id === TEMPORARY_CHAT_ID) { + return message + } + + return ( + ExtensionManager.getInstance() + .get(ExtensionTypeEnum.Conversational) + ?.modifyMessage(message) + ?.catch(() => message) ?? message + ) + } + async deleteMessage(threadId: string, messageId: string): Promise { // Don't delete messages on server for temporary chat - it's local only if (threadId === TEMPORARY_CHAT_ID) { diff --git a/web-app/src/services/messages/types.ts b/web-app/src/services/messages/types.ts index 731de06d0..090b69904 100644 --- a/web-app/src/services/messages/types.ts +++ b/web-app/src/services/messages/types.ts @@ -7,5 +7,6 @@ import { ThreadMessage } from '@janhq/core' export interface MessagesService { fetchMessages(threadId: string): Promise createMessage(message: ThreadMessage): Promise + modifyMessage(message: ThreadMessage): Promise deleteMessage(threadId: string, messageId: string): Promise }