jan/app-backend/worker/worker.ts
James 4017f0f22b fix: remove parsing json error and trim llm answer
Signed-off-by: James <james@jan.ai>
2023-09-05 21:06:24 -07:00

export interface Env {
  HASURA_ADMIN_API_KEY: string;
  LLM_INFERENCE_ENDPOINT: string;
  INFERENCE_API_KEY: string;
  HASURA_GRAPHQL_ENGINE_ENDPOINT: string;
}
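
// The bindings above are injected by the Workers runtime. A minimal sketch of
// how they could be declared (names match the interface; the endpoint values
// and the wrangler.toml layout are illustrative assumptions, not the project's
// real configuration):
//
//   # wrangler.toml
//   [vars]
//   LLM_INFERENCE_ENDPOINT = "https://llm.example.com/v1/chat/completions"
//   HASURA_GRAPHQL_ENGINE_ENDPOINT = "https://hasura.example.com"
//
//   # secrets, kept out of source control:
//   #   wrangler secret put HASURA_ADMIN_API_KEY
//   #   wrangler secret put INFERENCE_API_KEY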

export default {
  async fetch(request: Request, env: Env) {
    return handleRequest(env, request);
  },
};

async function handleRequest(env: Env, request: Request) {
  const apiurl = env.LLM_INFERENCE_ENDPOINT;
  const requestBody = await request.json();

  // Shared state for the throttle helper below.
  let lastCallTime = 0;
  let timeoutId: any;
  let done = true;

  // Run `fn` at most once per `delay` ms and never while a previous call is
  // still in flight; early calls are deferred with a trailing timeout so the
  // latest update still goes out.
  function throttle(fn: () => Promise<void> | void, delay: number) {
    return async function () {
      const now = new Date().getTime();
      const timeSinceLastCall = now - lastCallTime;
      if (timeSinceLastCall >= delay && done) {
        lastCallTime = now;
        done = false;
        await fn();
        done = true;
      } else {
        clearTimeout(timeoutId);
        timeoutId = setTimeout(async () => {
          lastCallTime = now;
          done = false;
          await fn();
          done = true;
        }, delay - timeSinceLastCall);
      }
    };
  }

  // Pull the relevant columns from the Hasura event trigger payload.
  const messageBody = {
    id: requestBody.event.data.new.id,
    content: requestBody.event.data.new.content,
    messages: requestBody.event.data.new.prompt_cache,
    status: requestBody.event.data.new.status,
  };
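
  // For reference, a Hasura event trigger posts a JSON body shaped roughly
  // like the sketch below (field values are illustrative; only the fields
  // read above are shown, and the real payload also carries metadata such as
  // `op` and `table`):
  //
  //   {
  //     "event": {
  //       "data": {
  //         "new": {
  //           "id": "2f9c…",
  //           "conversation_id": "a41b…",
  //           "content": "",
  //           "prompt_cache": [{ "role": "user", "content": "Hello" }],
  //           "status": "pending"
  //         }
  //       }
  //     }
  //   }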

  // Only process rows that are still pending; anything else is a no-op.
  if (messageBody.status !== "pending") {
    return new Response(JSON.stringify({ status: "success" }), {
      status: 200,
      statusText: "success",
    });
  }

  // OpenAI-style chat-completions request, streamed back as server-sent events.
  const llmRequestBody = {
    messages: messageBody.messages,
    stream: true,
    model: "gpt-3.5-turbo",
    max_tokens: 500,
  };
  const init = {
    body: JSON.stringify(llmRequestBody),
    headers: {
      "Content-Type": "application/json",
      Accept: "text/event-stream",
      Authorization: "Access-Control-Allow-Origin: *",
      "api-key": env.INFERENCE_API_KEY,
    },
    method: "POST",
  };

  return fetch(apiurl, init)
    .then((res) => res.body?.getReader())
    .then(async (reader) => {
      if (!reader) {
        console.error("Error: failed to read data from response");
        return;
      }
      let answer = "";
      let cachedChunk = "";
      while (true) {
        // Note: this `done` is the reader's end-of-stream flag and shadows the
        // throttle flag declared above.
        const { done, value } = await reader.read();
        if (done) {
          break;
        }
        const textDecoder = new TextDecoder("utf-8");
        const chunk = textDecoder.decode(value);
        cachedChunk += chunk;
        // Wait until the buffer holds at least one complete `data: {...}` line.
        const matched = cachedChunk.match(/data: {(.*)}/g);
        if (!matched) {
          continue;
        }
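
        // For reference, a decoded chunk from an OpenAI-compatible stream
        // looks roughly like the sketch below (token text is illustrative),
        // ending with a literal `data: [DONE]` line:
        //
        //   data: {"choices":[{"delta":{"content":"Hel"},"index":0}]}
        //
        //   data: {"choices":[{"delta":{"content":"lo"},"index":0}]}
        //
        //   data: [DONE]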
let deltaText = "";
for (const line of cachedChunk.split("\n")) {
const trimmedLine = line.trim();
if (!trimmedLine || trimmedLine === "data: [DONE]") {
continue;
}
const json = trimmedLine.replace("data: ", "");
try {
const obj = JSON.parse(json);
const content = obj.choices[0].delta.content;
if (content) deltaText = deltaText.concat(content);
} catch (e) {
}
}
cachedChunk = "";
answer = answer + deltaText;
        const variables = {
          id: messageBody.id,
          data: {
            content: answer.trim(),
          },
        };
        // Throttle intermediate writes so Hasura receives at most roughly one
        // update per 300 ms while tokens stream in.
        throttle(async () => {
          await fetch(env.HASURA_GRAPHQL_ENGINE_ENDPOINT + "/v1/graphql", {
            method: "POST",
            body: JSON.stringify({ query: updateMessageQuery, variables }),
            headers: {
              "Content-Type": "application/json",
              "x-hasura-admin-secret": env.HASURA_ADMIN_API_KEY,
            },
          })
            .catch((error) => {
              console.error(error);
            })
            .finally(() => console.log("++-- request sent"));
        }, 300)();
      }

      // Stream finished: mark the message ready and drop the prompt cache.
      const variables = {
        id: messageBody.id,
        data: {
          status: "ready",
          prompt_cache: null,
        },
      };
      await fetch(env.HASURA_GRAPHQL_ENGINE_ENDPOINT + "/v1/graphql", {
        method: "POST",
        body: JSON.stringify({ query: updateMessageQuery, variables }),
        headers: {
          "Content-Type": "application/json",
          "x-hasura-admin-secret": env.HASURA_ADMIN_API_KEY,
        },
      }).catch((error) => {
        console.error(error);
      });

      // Also store the final answer as the conversation's last_text_message.
      const convUpdateVars = {
        id: requestBody.event.data.new.conversation_id,
        content: answer,
      };
      await fetch(env.HASURA_GRAPHQL_ENGINE_ENDPOINT + "/v1/graphql", {
        method: "POST",
        body: JSON.stringify({ query: updateConversationQuery, variables: convUpdateVars }),
        headers: {
          "Content-Type": "application/json",
          "x-hasura-admin-secret": env.HASURA_ADMIN_API_KEY,
        },
      }).catch((error) => {
        console.error(error);
      });
return new Response(JSON.stringify({ status: "success" }), {
status: 200,
statusText: "success",
});
});
}

const updateMessageQuery = `
  mutation chatCompletions($id: uuid = "", $data: messages_set_input) {
    update_messages_by_pk(pk_columns: {id: $id}, _set: $data) {
      id
      content
    }
  }
`;

const updateConversationQuery = `
  mutation updateConversation($id: uuid = "", $content: String = "") {
    update_conversations_by_pk(pk_columns: {id: $id}, _set: {last_text_message: $content}) {
      id
    }
  }
`;