feat: Modify ongoing response instead of creating new message to avoid message ID duplication

This commit is contained in:
Vanalite 2025-10-01 17:14:59 +07:00
parent f4b187ba11
commit ccca331d6c
5 changed files with 98 additions and 35 deletions

View File

@ -20,6 +20,13 @@ export interface MessageInterface {
*/
listMessages(threadId: string): Promise<ThreadMessage[]>
/**
* Updates an existing message in a thread.
* @param {ThreadMessage} message - The message to be updated (must have existing ID).
* @returns {Promise<ThreadMessage>} A promise that resolves to the updated message.
*/
modifyMessage(message: ThreadMessage): Promise<ThreadMessage>
/**
* Deletes a specific message from a thread.
* @param {string} threadId - The ID of the thread from which the message will be deleted.

View File

@ -20,7 +20,7 @@ import {
import { CompletionMessagesBuilder } from '@/lib/messages'
import { renderInstructions } from '@/lib/instructionTemplate'
import { ChatCompletionMessageToolCall } from 'openai/resources'
import { MessageStatus } from '@janhq/core'
import { MessageStatus, ContentType } from '@janhq/core'
import { useServiceHub } from '@/hooks/useServiceHub'
import { useToolApproval } from '@/hooks/useToolApproval'
@ -100,7 +100,8 @@ const processStreamingCompletion = async (
updateTokenSpeed: (message: ThreadMessage, increment?: number) => void,
updatePromptProgress: (progress: unknown) => void,
continueFromMessageId?: string,
updateMessage?: (message: ThreadMessage) => void
updateMessage?: (message: ThreadMessage) => void,
continueFromMessage?: ThreadMessage
) => {
// High-throughput scheduler: batch UI updates on rAF (requestAnimationFrame)
let rafScheduled = false
@ -117,9 +118,10 @@ const processStreamingCompletion = async (
)
// When continuing, update the message directly instead of using streamingContent
if (continueFromMessageId && updateMessage) {
if (continueFromMessageId && updateMessage && continueFromMessage) {
updateMessage({
...currentContent,
...continueFromMessage, // Preserve original message metadata
content: currentContent.content, // Update content
status: MessageStatus.Stopped, // Keep as Stopped while streaming
})
} else {
@ -567,7 +569,8 @@ export const useChat = () => {
updateTokenSpeed,
updatePromptProgress,
continueFromMessageId,
updateMessage
updateMessage,
continueFromMessage
)
}
} catch (error) {
@ -633,7 +636,10 @@ export const useChat = () => {
}
// Normal completion flow (abort is handled after loop exits)
builder.addAssistantMessage(accumulatedTextRef.value, undefined, toolCalls)
// Don't add assistant message to builder if continuing - it's already there
if (!continueFromMessageId) {
builder.addAssistantMessage(accumulatedTextRef.value, undefined, toolCalls)
}
const updatedMessage = await postMessageProcessing(
toolCalls,
builder,
@ -671,23 +677,40 @@ export const useChat = () => {
accumulatedTextRef.value.length > 0 &&
activeProvider?.provider === 'llamacpp'
) {
// Create final content for the partial message with Stopped status
const partialContent = {
...newAssistantThreadContent(
activeThread.id,
accumulatedTextRef.value,
{
// If continuing, update the existing message; otherwise add new
if (continueFromMessageId && continueFromMessage) {
// Preserve the original message metadata
updateMessage({
...continueFromMessage,
content: [
{
type: ContentType.Text,
text: {
value: accumulatedTextRef.value,
annotations: [],
},
},
],
status: MessageStatus.Stopped,
metadata: {
...continueFromMessage.metadata,
tokenSpeed: useAppState.getState().tokenSpeed,
assistant: currentAssistant,
}
),
status: MessageStatus.Stopped,
}
// If continuing, update the existing message; otherwise add new
if (continueFromMessageId) {
updateMessage({ ...partialContent, id: continueFromMessageId })
},
})
} else {
// Create final content for the partial message with Stopped status
const partialContent = {
...newAssistantThreadContent(
activeThread.id,
accumulatedTextRef.value,
{
tokenSpeed: useAppState.getState().tokenSpeed,
assistant: currentAssistant,
}
),
status: MessageStatus.Stopped,
}
addMessage(partialContent)
}
updatePromptProgress(undefined)
@ -708,22 +731,39 @@ export const useChat = () => {
// Use streaming content if available, otherwise use accumulatedTextRef
const contentText = streamingContent?.content?.[0]?.text?.value || accumulatedTextRef.value
const partialContent = {
...newAssistantThreadContent(
activeThread.id,
contentText,
{
// If continuing, update the existing message; otherwise add new
if (continueFromMessageId && continueFromMessage) {
// Preserve the original message metadata
updateMessage({
...continueFromMessage,
content: [
{
type: ContentType.Text,
text: {
value: contentText,
annotations: [],
},
},
],
status: MessageStatus.Stopped,
metadata: {
...continueFromMessage.metadata,
tokenSpeed: useAppState.getState().tokenSpeed,
assistant: currentAssistant,
}
),
status: MessageStatus.Stopped,
}
// If continuing, update the existing message; otherwise add new
if (continueFromMessageId) {
updateMessage({ ...partialContent, id: continueFromMessageId })
},
})
} else {
const partialContent = {
...newAssistantThreadContent(
activeThread.id,
contentText,
{
tokenSpeed: useAppState.getState().tokenSpeed,
assistant: currentAssistant,
}
),
status: MessageStatus.Stopped,
}
addMessage(partialContent)
}
updatePromptProgress(undefined)

View File

@ -83,8 +83,9 @@ export const useMessages = create<MessageState>()((set, get) => ({
},
}))
// Persist to storage asynchronously
getServiceHub().messages().createMessage(updatedMessage).catch((error) => {
// Persist to storage asynchronously using modifyMessage instead of createMessage
// to prevent duplicates when updating existing messages
getServiceHub().messages().modifyMessage(updatedMessage).catch((error) => {
console.error('Failed to persist message update:', error)
})
},

View File

@ -40,6 +40,20 @@ export class DefaultMessagesService implements MessagesService {
)
}
async modifyMessage(message: ThreadMessage): Promise<ThreadMessage> {
  // Temporary chats are purely local — never persist their edits to the server.
  if (message.thread_id === TEMPORARY_CHAT_ID) {
    return message
  }
  const conversational = ExtensionManager.getInstance().get<ConversationalExtension>(
    ExtensionTypeEnum.Conversational
  )
  // Best-effort persistence: when the extension is unavailable, fall back to
  // returning the input unchanged; when the call rejects, swallow the error
  // and resolve with the input as well.
  const pending = conversational?.modifyMessage(message)?.catch(() => message)
  return pending ?? message
}
async deleteMessage(threadId: string, messageId: string): Promise<void> {
// Don't delete messages on server for temporary chat - it's local only
if (threadId === TEMPORARY_CHAT_ID) {

View File

@ -7,5 +7,6 @@ import { ThreadMessage } from '@janhq/core'
/**
 * Persistence contract for thread messages. Implementations (e.g.
 * DefaultMessagesService) back these operations with storage.
 */
export interface MessagesService {
/** Loads all messages belonging to the given thread. */
fetchMessages(threadId: string): Promise<ThreadMessage[]>
/** Persists a new message and resolves to the stored message. */
createMessage(message: ThreadMessage): Promise<ThreadMessage>
/**
 * Updates an already-persisted message in place (the message must carry an
 * existing ID) and resolves to the updated message. Used instead of
 * createMessage when streaming updates an ongoing response, to avoid
 * duplicate message IDs.
 */
modifyMessage(message: ThreadMessage): Promise<ThreadMessage>
/** Removes the identified message from the identified thread. */
deleteMessage(threadId: string, messageId: string): Promise<void>
}