Merge pull request #159 from janhq/fix/#143-direct-communication-between-client-llm
fix: #143 - Direct communication between client and llm inference service
This commit is contained in:
commit
83d2e34bd7
@ -84,14 +84,3 @@ delete_permissions:
|
||||
user_id:
|
||||
_eq: X-Hasura-User-Id
|
||||
comment: ""
|
||||
event_triggers:
|
||||
- name: new_llm_message
|
||||
definition:
|
||||
enable_manual: false
|
||||
insert:
|
||||
columns: '*'
|
||||
retry_conf:
|
||||
interval_sec: 10
|
||||
num_retries: 0
|
||||
timeout_sec: 60
|
||||
webhook: '{{HASURA_EVENTS_HOOK_URL}}'
|
||||
|
||||
@ -1,16 +0,0 @@
|
||||
# Dockerfile
# Builds an image that runs the Cloudflare Worker locally through `wrangler dev`.

# alpine does not work with wrangler
# NOTE(review): the base image tag and the wrangler version are both unpinned,
# so rebuilds can pick up different versions — consider pinning them.
FROM node

# WORKDIR creates /worker when it does not exist, so no separate `mkdir` layer is needed.
WORKDIR /worker

RUN npm install -g wrangler

COPY . /worker

# Port published for the dev server.
EXPOSE 8787

CMD ["wrangler", "dev"]
|
||||
@ -1,196 +0,0 @@
|
||||
/**
 * Bindings the worker reads from its environment (declared under `[vars]`
 * in the wrangler configuration).
 */
export interface Env {
  /** Sent as the `x-hasura-admin-secret` header on every GraphQL call. */
  HASURA_ADMIN_API_KEY: string;
  /** Full URL of the chat-completions endpoint the prompt is POSTed to. */
  LLM_INFERENCE_ENDPOINT: string;
  /** Sent as the `api-key` header on the inference request. */
  INFERENCE_API_KEY: string;
  /** Base URL of the Hasura GraphQL engine; `/v1/graphql` is appended to it. */
  HASURA_GRAPHQL_ENGINE_ENDPOINT: string;
}
|
||||
|
||||
/**
 * Worker entry point: every incoming HTTP request is handed to
 * `handleRequest`, and the handler is guaranteed to answer with a `Response`.
 */
export default {
  /**
   * @param request - Incoming HTTP request.
   * @param env - Environment bindings of the worker.
   * @returns The response produced by `handleRequest`, a 502 when it yields
   *   nothing, or a 500 when it throws.
   */
  async fetch(request: Request, env: Env): Promise<Response> {
    try {
      const response = await handleRequest(env, request);
      // A fetch handler must resolve to a Response; never pass `undefined` on.
      return (
        response ??
        new Response(JSON.stringify({ status: "error" }), {
          status: 502,
          statusText: "Bad Gateway",
        })
      );
    } catch (error: unknown) {
      // Log with context and answer explicitly instead of leaking an unhandled rejection.
      console.error("Unhandled error while processing request", error);
      return new Response(JSON.stringify({ status: "error" }), {
        status: 500,
        statusText: "Internal Server Error",
      });
    }
  },
};
|
||||
|
||||
/** Minimum spacing, in milliseconds, between streamed progress writes to Hasura. */
const PROGRESS_UPDATE_INTERVAL_MS = 300;

/** Builds a JSON response with the given status line. */
function jsonResponse(body: unknown, status: number, statusText: string): Response {
  return new Response(JSON.stringify(body), { status, statusText });
}

/** Narrows an unknown value to a plain key/value record, or `undefined` when it is not an object. */
function asRecord(value: unknown): Record<string, unknown> | undefined {
  return typeof value === "object" && value !== null
    ? (value as Record<string, unknown>)
    : undefined;
}

/** Best-effort GraphQL POST to Hasura: failures are logged, never thrown. */
async function postGraphql(env: Env, query: string, variables: unknown): Promise<void> {
  try {
    const res = await fetch(env.HASURA_GRAPHQL_ENGINE_ENDPOINT + "/v1/graphql", {
      method: "POST",
      body: JSON.stringify({ query, variables }),
      headers: {
        "Content-Type": "application/json",
        "x-hasura-admin-secret": env.HASURA_ADMIN_API_KEY,
      },
    });
    if (!res.ok) {
      console.error(`Error: GraphQL request failed with HTTP ${res.status}`);
    }
  } catch (error: unknown) {
    console.error(error);
  }
}

/**
 * Extracts the `choices[0].delta.content` text carried by one server-sent-event
 * line. Returns "" for blank lines, non-`data:` lines, the `[DONE]` sentinel,
 * payloads without text content, and payloads that are not valid JSON.
 */
function parseSseLine(line: string): string {
  const trimmed = line.trim();
  if (!trimmed.startsWith("data:")) {
    return "";
  }
  const payload = trimmed.slice("data:".length).trim();
  if (payload === "" || payload === "[DONE]") {
    return "";
  }
  try {
    const parsed = JSON.parse(payload) as {
      choices?: Array<{ delta?: { content?: unknown } }>;
    } | null;
    const content = parsed?.choices?.[0]?.delta?.content;
    return typeof content === "string" ? content : "";
  } catch (error: unknown) {
    console.warn("Skipping malformed stream line", error);
    return "";
  }
}

/**
 * Creates a writer that forwards the most recent text to `send` at most once
 * per `intervalMs`, never running two sends at the same time. Intermediate
 * values are dropped in favour of the latest one.
 */
function createProgressWriter(
  send: (content: string) => Promise<void>,
  intervalMs: number
): { push(content: string): void; stop(): Promise<void> } {
  let latest: string | undefined;
  let lastStartedAt = 0;
  let inFlight: Promise<void> | undefined;
  let timer: ReturnType<typeof setTimeout> | undefined;

  function flushNow(): void {
    if (latest === undefined) {
      return;
    }
    const content = latest;
    latest = undefined;
    lastStartedAt = Date.now();
    inFlight = send(content).finally(() => {
      inFlight = undefined;
      // Text that arrived while this send was running still needs to go out.
      if (latest !== undefined) {
        schedule();
      }
    });
  }

  function schedule(): void {
    if (inFlight !== undefined || timer !== undefined) {
      return;
    }
    const wait = intervalMs - (Date.now() - lastStartedAt);
    if (wait <= 0) {
      flushNow();
      return;
    }
    timer = setTimeout(() => {
      timer = undefined;
      flushNow();
    }, wait);
  }

  return {
    push(content: string): void {
      latest = content;
      schedule();
    },
    async stop(): Promise<void> {
      if (timer !== undefined) {
        clearTimeout(timer);
        timer = undefined;
      }
      latest = undefined;
      await inFlight;
    },
  };
}

/**
 * Handles one event-trigger delivery for a newly inserted message.
 *
 * The inserted row is read from `event.data.new` of the JSON body. Rows whose
 * `status` is not "pending" are acknowledged without further work. For a
 * pending row, its `prompt_cache` is sent to the inference endpoint as a
 * streaming chat completion; the growing answer is written back to the message
 * (throttled), and when the stream ends the message is marked "ready" with the
 * complete text and the conversation's last text message is updated.
 *
 * @param env - Worker bindings (endpoints and credentials).
 * @param request - Incoming request carrying the event payload as JSON.
 * @returns 200 on success or when there is nothing to do, 400 when the body is
 *   not JSON or lacks `event.data.new`, 502 when the inference response has no
 *   readable body.
 * @throws Propagates network errors from the inference request or its stream.
 */
async function handleRequest(env: Env, request: Request): Promise<Response> {
  let payload: unknown;
  try {
    payload = await request.json();
  } catch (error: unknown) {
    console.error("Error: request body is not valid JSON", error);
    return jsonResponse({ status: "error", message: "invalid JSON body" }, 400, "Bad Request");
  }

  const event = asRecord(asRecord(payload)?.event);
  const row = asRecord(asRecord(event?.data)?.new);
  if (!row) {
    console.error("Error: payload has no event.data.new");
    return jsonResponse({ status: "error", message: "missing event.data.new" }, 400, "Bad Request");
  }

  if (row.status !== "pending") {
    return jsonResponse({ status: "success" }, 200, "success");
  }

  // The previous "Authorization: Access-Control-Allow-Origin: *" header was a
  // CORS response header pasted into a credential field; it is not sent anymore.
  const llmResponse = await fetch(env.LLM_INFERENCE_ENDPOINT, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Accept: "text/event-stream",
      "api-key": env.INFERENCE_API_KEY,
    },
    body: JSON.stringify({
      messages: row.prompt_cache,
      stream: true,
      model: "gpt-3.5-turbo",
      max_tokens: 500,
    }),
  });
  if (!llmResponse.ok) {
    console.error(`Error: inference endpoint answered with HTTP ${llmResponse.status}`);
  }

  const reader = llmResponse.body?.getReader();
  if (!reader) {
    console.error("Error: fail to read data from response");
    return jsonResponse({ status: "error", message: "inference response has no body" }, 502, "Bad Gateway");
  }

  const progress = createProgressWriter(
    (content) => postGraphql(env, updateMessageQuery, { id: row.id, data: { content } }),
    PROGRESS_UPDATE_INTERVAL_MS
  );
  // One decoder for the whole stream so multi-byte characters that straddle a
  // chunk boundary are decoded correctly.
  const decoder = new TextDecoder("utf-8");
  let answer = "";
  let unfinishedLine = "";

  // Appends decoded text, parses every complete line and returns the new text.
  // The trailing partial line is kept for the next chunk unless `isFinal`.
  const consume = (text: string, isFinal: boolean): string => {
    const lines = (unfinishedLine + text).split("\n");
    unfinishedLine = isFinal ? "" : lines.pop() ?? "";
    return lines.map(parseSseLine).join("");
  };

  try {
    for (;;) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }
      const delta = consume(decoder.decode(value, { stream: true }), false);
      if (delta) {
        answer += delta;
        progress.push(answer.trim());
      }
    }
    answer += consume(decoder.decode(), true);
  } finally {
    // Cancel any pending progress write and wait for the one in flight so it
    // cannot land after (and overwrite) the final write below.
    await progress.stop();
    reader.releaseLock();
  }

  // The final message write carries the complete text, so the stored content
  // no longer depends on the last throttled progress write having been sent.
  await Promise.all([
    postGraphql(env, updateMessageQuery, {
      id: row.id,
      data: { content: answer.trim(), status: "ready", prompt_cache: null },
    }),
    postGraphql(env, updateConversationquery, {
      id: row.conversation_id,
      content: answer,
    }),
  ]);

  return jsonResponse({ status: "success" }, 200, "success");
}
|
||||
|
||||
/**
 * GraphQL mutation that applies `$data` (a `messages_set_input`) to the
 * message whose primary key is `$id` and returns its `id` and `content`.
 */
const updateMessageQuery = `
  mutation chatCompletions($id: uuid = "", $data: messages_set_input) {
    update_messages_by_pk(pk_columns: {id: $id}, _set: $data) {
      id
      content
    }
  }
`;
|
||||
|
||||
/**
 * GraphQL mutation that stores `$content` as `last_text_message` on the
 * conversation whose primary key is `$id` and returns its `id`.
 */
const updateConversationquery = `
  mutation updateConversation($id: uuid = "", $content: String = "") {
    update_conversations_by_pk(pk_columns: {id: $id}, _set: {last_text_message: $content}) {
      id
    }
  }
`;
|
||||
@ -1,11 +0,0 @@
|
||||
# Wrangler configuration for the worker whose entry point is worker.ts.

# NOTE(review): "cloudlfare" looks like a typo for "cloudflare". Left unchanged
# because renaming would change the worker's name.
name = "cloudlfare_worker"
main = "worker.ts"
compatibility_date = "2023-06-08"
workers_dev = true

# Values exposed to the worker as environment bindings.
[vars]
HASURA_GRAPHQL_ENGINE_ENDPOINT = "http://graphql-engine:8080"
# NOTE(review): admin secret committed in plain text — acceptable only as a
# local development default; confirm it is overridden anywhere else.
HASURA_ADMIN_API_KEY = "myadminsecretkey"
LLM_INFERENCE_ENDPOINT = "http://llm:8000/v1/chat/completions"
INFERENCE_API_KEY = ""
PROJECT_ID = ""
|
||||
@ -5,6 +5,7 @@ NEXT_PUBLIC_DOWNLOAD_APP_IOS=#
|
||||
NEXT_PUBLIC_DOWNLOAD_APP_ANDROID=#
|
||||
NEXT_PUBLIC_GRAPHQL_ENGINE_URL=http://localhost:8080/v1/graphql
|
||||
NEXT_PUBLIC_GRAPHQL_ENGINE_WEB_SOCKET_URL=ws://localhost:8080/v1/graphql
|
||||
NEXT_PUBLIC_OPENAPI_ENDPOINT=http://localhost:8000/v1/completions
|
||||
KEYCLOAK_CLIENT_ID=hasura
|
||||
KEYCLOAK_CLIENT_SECRET=oMtCPAV7diKpE564SBspgKj4HqlKM4Hy
|
||||
AUTH_ISSUER=http://localhost:8088/realms/$KEYCLOAK_CLIENT_ID
|
||||
@ -12,4 +13,3 @@ NEXTAUTH_URL=http://localhost:3000
|
||||
NEXTAUTH_SECRET=my-secret
|
||||
END_SESSION_URL=http://localhost:8088/realms/$KEYCLOAK_CLIENT_ID/protocol/openid-connect/logout
|
||||
REFRESH_TOKEN_URL=http://localhost:8088/realms/$KEYCLOAK_CLIENT_ID/protocol/openid-connect/token
|
||||
HASURA_ADMIN_TOKEN=myadminsecretkey
|
||||
|
||||
@ -68,21 +68,6 @@ services:
|
||||
jan_community:
|
||||
ipv4_address: 172.20.0.12
|
||||
|
||||
worker:
|
||||
build:
|
||||
context: ./app-backend/worker
|
||||
dockerfile: ./Dockerfile
|
||||
restart: always
|
||||
environment:
|
||||
- "NODE_ENV=development"
|
||||
volumes:
|
||||
- ./app-backend/worker:/worker
|
||||
ports:
|
||||
- 8787:8787
|
||||
networks:
|
||||
jan_community:
|
||||
ipv4_address: 172.20.0.13
|
||||
|
||||
data-connector-agent:
|
||||
image: hasura/graphql-data-connector:v2.31.0
|
||||
restart: always
|
||||
|
||||
@ -33,8 +33,13 @@ For using our complete solution, check [this](https://github.com/janhq/jan)
|
||||
```
|
||||
yarn dev
|
||||
```
|
||||
4. **Regenerate Graphql:**
|
||||
|
||||
4. **Access Jan Web:**
|
||||
```
|
||||
HASURA_ADMIN_TOKEN="[hasura_admin_secret_key]" yarn generate
|
||||
```
|
||||
|
||||
5. **Access Jan Web:**
|
||||
|
||||
Open your web browser and navigate to `http://localhost:3000` to access the Jan Web application.
|
||||
|
||||
|
||||
@ -128,7 +128,6 @@ export const ChatBody: React.FC<Props> = observer(({ onPromptSelected }) => {
|
||||
const renderItem = (
|
||||
index: number,
|
||||
{
|
||||
id,
|
||||
messageType,
|
||||
senderAvatarUrl,
|
||||
senderName,
|
||||
@ -173,11 +172,9 @@ const renderItem = (
|
||||
) : (
|
||||
<StreamTextMessage
|
||||
key={index}
|
||||
id={id}
|
||||
avatarUrl={senderAvatarUrl ?? "/icons/app_icon.svg"}
|
||||
senderName={senderName}
|
||||
createdAt={createdAt}
|
||||
text={text}
|
||||
/>
|
||||
);
|
||||
default:
|
||||
|
||||
@ -1,13 +1,9 @@
|
||||
import React, { useEffect } from "react";
|
||||
import React from "react";
|
||||
import { displayDate } from "@/_utils/datetime";
|
||||
import { TextCode } from "../TextCode";
|
||||
import { getMessageCode } from "@/_utils/message";
|
||||
import { useSubscription } from "@apollo/client";
|
||||
import {
|
||||
SubscribeMessageDocument,
|
||||
SubscribeMessageSubscription,
|
||||
} from "@/graphql";
|
||||
import { useStore } from "@/_models/RootStore";
|
||||
import { StreamingText, StreamingTextURL, useTextBuffer } from "nextjs-openai";
|
||||
import { MessageSenderType } from "@/_models/ChatMessage";
|
||||
import { Role } from "@/_models/History";
|
||||
|
||||
type Props = {
|
||||
id?: string;
|
||||
@ -18,54 +14,52 @@ type Props = {
|
||||
};
|
||||
|
||||
const StreamTextMessage: React.FC<Props> = ({
|
||||
id,
|
||||
senderName,
|
||||
createdAt,
|
||||
avatarUrl = "",
|
||||
text = "",
|
||||
}) => {
|
||||
const [textMessage, setTextMessage] = React.useState(text);
|
||||
const [completedTyping, setCompletedTyping] = React.useState(false);
|
||||
const tokenIndex = React.useRef(0);
|
||||
const [data, setData] = React.useState<any | undefined>();
|
||||
const { historyStore } = useStore();
|
||||
const { data } = useSubscription<SubscribeMessageSubscription>(
|
||||
SubscribeMessageDocument,
|
||||
{
|
||||
variables: {
|
||||
id,
|
||||
const conversation = historyStore?.getActiveConversation();
|
||||
|
||||
React.useEffect(() => {
|
||||
const messages = conversation?.chatMessages.slice(-5).map((e) => ({
|
||||
role:
|
||||
e.messageSenderType === MessageSenderType.User
|
||||
? Role.User
|
||||
: Role.Assistant,
|
||||
content: e.text,
|
||||
}));
|
||||
setData({
|
||||
messages,
|
||||
stream: true,
|
||||
model: "gpt-3.5-turbo",
|
||||
max_tokens: 500,
|
||||
});
|
||||
}, [conversation]);
|
||||
|
||||
const { buffer, refresh, cancel } = useTextBuffer({
|
||||
url: `${process.env.NEXT_PUBLIC_OPENAPI_ENDPOINT}`,
|
||||
throttle: 100,
|
||||
data,
|
||||
|
||||
options: {
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const parsedBuffer = (buffer: String) => {
|
||||
try {
|
||||
const json = buffer.replace("data: ", "");
|
||||
return JSON.parse(json).choices[0].text;
|
||||
} catch (e) {
|
||||
return "";
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
useEffect(() => {
|
||||
if (
|
||||
data?.messages_by_pk?.content &&
|
||||
data.messages_by_pk.content.length > text.length
|
||||
) {
|
||||
historyStore.finishActiveConversationWaiting();
|
||||
}
|
||||
}, [data, text]); // eslint-disable-line react-hooks/exhaustive-deps
|
||||
|
||||
useEffect(() => {
|
||||
setCompletedTyping(false);
|
||||
|
||||
const stringResponse = data?.messages_by_pk?.content ?? text;
|
||||
|
||||
const intervalId = setInterval(() => {
|
||||
setTextMessage(stringResponse.slice(0, tokenIndex.current));
|
||||
|
||||
tokenIndex.current++;
|
||||
|
||||
if (tokenIndex.current > stringResponse.length) {
|
||||
clearInterval(intervalId);
|
||||
setCompletedTyping(true);
|
||||
}
|
||||
}, 20);
|
||||
|
||||
return () => clearInterval(intervalId);
|
||||
}, [data?.messages_by_pk?.content, text]);
|
||||
|
||||
return textMessage.length > 0 ? (
|
||||
return data ? (
|
||||
<div className="flex items-start gap-2">
|
||||
<img
|
||||
className="rounded-full"
|
||||
@ -83,20 +77,11 @@ const StreamTextMessage: React.FC<Props> = ({
|
||||
{displayDate(createdAt)}
|
||||
</div>
|
||||
</div>
|
||||
{textMessage.includes("```") ? (
|
||||
getMessageCode(textMessage).map((item, i) => (
|
||||
<div className="flex gap-1 flex-col" key={i}>
|
||||
<p className="leading-[20px] whitespace-break-spaces text-[14px] font-normal dark:text-[#d1d5db]">
|
||||
{item.text}
|
||||
</p>
|
||||
{item.code.trim().length > 0 && <TextCode text={item.code} />}
|
||||
<div className="leading-[20px] whitespace-break-spaces text-[14px] font-normal dark:text-[#d1d5db]">
|
||||
<StreamingText
|
||||
buffer={buffer.map((b) => parsedBuffer(b))}
|
||||
></StreamingText>
|
||||
</div>
|
||||
))
|
||||
) : (
|
||||
<p className="leading-[20px] whitespace-break-spaces text-[14px] font-normal dark:text-[#d1d5db]">
|
||||
{textMessage}
|
||||
</p>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
) : (
|
||||
|
||||
@ -8,7 +8,6 @@
|
||||
"start": "next start",
|
||||
"lint": "next lint",
|
||||
"compile": "tsc --noEmit -p . --pretty",
|
||||
"configure": "op read op://Shared/WebDevelopmentEnv/.env > .env && op read op://Shared/WebDevelopmentFirebaseConfig/firebase_configs.json > app/_services/firebase/firebase_configs.json",
|
||||
"generate": "graphql-codegen --config codegen.ts",
|
||||
"watch": "graphql-codegen -w"
|
||||
},
|
||||
@ -36,6 +35,8 @@
|
||||
"next": "13.4.10",
|
||||
"next-auth": "^4.23.1",
|
||||
"next-themes": "^0.2.1",
|
||||
"nextjs-openai": "^7.2.0",
|
||||
"openai-streams": "^6.2.0",
|
||||
"postcss": "8.4.26",
|
||||
"react": "18.2.0",
|
||||
"react-dom": "18.2.0",
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user