fix: app sometimes failed to load model due to race condition (#1071)

This commit is contained in:
Louis 2023-12-19 11:10:07 +07:00 committed by GitHub
parent 99acaffd2c
commit e3c1787d57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 143 deletions

View File

@ -31,9 +31,7 @@
"dependencies": { "dependencies": {
"@janhq/core": "file:../../core", "@janhq/core": "file:../../core",
"download-cli": "^1.1.1", "download-cli": "^1.1.1",
"electron-log": "^5.0.1",
"fetch-retry": "^5.0.6", "fetch-retry": "^5.0.6",
"kill-port": "^2.0.1",
"path-browserify": "^1.0.1", "path-browserify": "^1.0.1",
"rxjs": "^7.8.1", "rxjs": "^7.8.1",
"systeminformation": "^5.21.20", "systeminformation": "^5.21.20",
@ -51,9 +49,7 @@
], ],
"bundleDependencies": [ "bundleDependencies": [
"tcp-port-used", "tcp-port-used",
"kill-port",
"fetch-retry", "fetch-retry",
"electron-log",
"systeminformation" "systeminformation"
] ]
} }

View File

@ -1,13 +1,10 @@
const fs = require("fs"); const fs = require("fs");
const kill = require("kill-port");
const path = require("path"); const path = require("path");
const { spawn } = require("child_process"); const { spawn } = require("child_process");
const tcpPortUsed = require("tcp-port-used"); const tcpPortUsed = require("tcp-port-used");
const fetchRetry = require("fetch-retry")(global.fetch); const fetchRetry = require("fetch-retry")(global.fetch);
const si = require("systeminformation"); const si = require("systeminformation");
const log = require("electron-log");
// The PORT to use for the Nitro subprocess // The PORT to use for the Nitro subprocess
const PORT = 3928; const PORT = 3928;
const LOCAL_HOST = "127.0.0.1"; const LOCAL_HOST = "127.0.0.1";
@ -18,19 +15,17 @@ const NITRO_HTTP_VALIDATE_MODEL_URL = `${NITRO_HTTP_SERVER_URL}/inferences/llama
const NITRO_HTTP_KILL_URL = `${NITRO_HTTP_SERVER_URL}/processmanager/destroy`; const NITRO_HTTP_KILL_URL = `${NITRO_HTTP_SERVER_URL}/processmanager/destroy`;
// The subprocess instance for Nitro // The subprocess instance for Nitro
let subprocess = null; let subprocess = undefined;
let currentModelFile = null; let currentModelFile = undefined;
let currentSettings = undefined;
/** /**
* Stops a Nitro subprocess. * Stops a Nitro subprocess.
* @param wrapper - The model wrapper. * @param wrapper - The model wrapper.
* @returns A Promise that resolves when the subprocess is terminated successfully, or rejects with an error message if the subprocess fails to terminate. * @returns A Promise that resolves when the subprocess is terminated successfully, or rejects with an error message if the subprocess fails to terminate.
*/ */
function stopModel(): Promise<ModelOperationResponse> { function stopModel(): Promise<void> {
return new Promise((resolve, reject) => { return killSubprocess();
checkAndUnloadNitro();
resolve({ error: undefined });
});
} }
/** /**
@ -45,9 +40,7 @@ async function initModel(wrapper: any): Promise<ModelOperationResponse> {
if (wrapper.model.engine !== "nitro") { if (wrapper.model.engine !== "nitro") {
return Promise.resolve({ error: "Not a nitro model" }); return Promise.resolve({ error: "Not a nitro model" });
} else { } else {
// Gather system information for CPU physical cores and memory
const nitroResourceProbe = await getResourcesInfo(); const nitroResourceProbe = await getResourcesInfo();
// Convert settings.prompt_template to system_prompt, user_prompt, ai_prompt // Convert settings.prompt_template to system_prompt, user_prompt, ai_prompt
if (wrapper.model.settings.prompt_template) { if (wrapper.model.settings.prompt_template) {
const promptTemplate = wrapper.model.settings.prompt_template; const promptTemplate = wrapper.model.settings.prompt_template;
@ -60,29 +53,28 @@ async function initModel(wrapper: any): Promise<ModelOperationResponse> {
wrapper.model.settings.ai_prompt = prompt.ai_prompt; wrapper.model.settings.ai_prompt = prompt.ai_prompt;
} }
const settings = { currentSettings = {
llama_model_path: currentModelFile, llama_model_path: currentModelFile,
...wrapper.model.settings, ...wrapper.model.settings,
// This is critical and requires real system information // This is critical and requires real system information
cpu_threads: nitroResourceProbe.numCpuPhysicalCore, cpu_threads: nitroResourceProbe.numCpuPhysicalCore,
}; };
log.info(`Load model settings: ${JSON.stringify(settings, null, 2)}`); return loadModel(nitroResourceProbe);
return ( }
// 1. Check if the port is used, if used, attempt to unload model / kill nitro process }
validateModelVersion()
.then(checkAndUnloadNitro) async function loadModel(nitroResourceProbe: any | undefined) {
// 2. Spawn the Nitro subprocess // Gather system information for CPU physical cores and memory
.then(await spawnNitroProcess(nitroResourceProbe)) if (!nitroResourceProbe) nitroResourceProbe = await getResourcesInfo();
// 4. Load the model into the Nitro subprocess (HTTP POST request) return killSubprocess()
.then(() => loadLLMModel(settings)) .then(() => spawnNitroProcess(nitroResourceProbe))
// 5. Check if the model is loaded successfully .then(() => loadLLMModel(currentSettings))
.then(validateModelStatus) .then(validateModelStatus)
.catch((err) => { .catch((err) => {
log.error("error: " + JSON.stringify(err)); console.log("error: ", err);
// TODO: Broadcast error so app could display proper error message
return { error: err, currentModelFile }; return { error: err, currentModelFile };
}) });
);
}
} }
function promptTemplateConverter(promptTemplate) { function promptTemplateConverter(promptTemplate) {
@ -138,13 +130,8 @@ function loadLLMModel(settings): Promise<Response> {
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
body: JSON.stringify(settings), body: JSON.stringify(settings),
retries: 5, retries: 3,
retryDelay: 1000, retryDelay: 500,
}).catch((err) => {
console.error(err);
log.error("error: " + JSON.stringify(err));
// Fetch error, Nitro server might not started properly
throw new Error("Model loading failed.");
}); });
} }
@ -164,8 +151,7 @@ async function validateModelStatus(): Promise<ModelOperationResponse> {
}, },
retries: 5, retries: 5,
retryDelay: 500, retryDelay: 500,
}) }).then(async (res: Response) => {
.then(async (res: Response) => {
// If the response is OK, check model_loaded status. // If the response is OK, check model_loaded status.
if (res.ok) { if (res.ok) {
const body = await res.json(); const body = await res.json();
@ -176,10 +162,6 @@ async function validateModelStatus(): Promise<ModelOperationResponse> {
} }
} }
return { error: "Model loading failed" }; return { error: "Model loading failed" };
})
.catch((err) => {
log.error("Model loading failed" + err.toString());
return { error: `Model loading failed.` };
}); });
} }
@ -187,44 +169,29 @@ async function validateModelStatus(): Promise<ModelOperationResponse> {
* Terminates the Nitro subprocess. * Terminates the Nitro subprocess.
* @returns A Promise that resolves when the subprocess is terminated successfully, or rejects with an error message if the subprocess fails to terminate. * @returns A Promise that resolves when the subprocess is terminated successfully, or rejects with an error message if the subprocess fails to terminate.
*/ */
function killSubprocess(): Promise<void> { async function killSubprocess(): Promise<void> {
fetch(NITRO_HTTP_KILL_URL, { const controller = new AbortController();
setTimeout(() => controller.abort(), 5000);
console.log("Start requesting to kill Nitro...");
return fetch(NITRO_HTTP_KILL_URL, {
method: "DELETE", method: "DELETE",
}).catch((err) => { signal: controller.signal,
console.error(err); })
.then(() => {
subprocess?.kill(); subprocess?.kill();
kill(PORT, "tcp").then(console.log).catch(console.log); subprocess = undefined;
subprocess = null; })
}); .catch(() => {})
return .then(() => tcpPortUsed.waitUntilFree(PORT, 300, 5000))
.then(() => console.log("Nitro is killed"));
} }
/**
* Check port is used or not, if used, attempt to unload model
* If unload failed, kill the port
*/
async function checkAndUnloadNitro() {
return tcpPortUsed.check(PORT, LOCAL_HOST).then(async (inUse) => {
// If inUse - try unload or kill process, otherwise do nothing
if (inUse) {
// Attempt to unload model
return fetch(NITRO_HTTP_UNLOAD_MODEL_URL, {
method: "GET",
}).catch((err) => {
console.error(err);
// Fallback to kill the port
return killSubprocess();
});
}
});
}
/** /**
* Look for the Nitro binary and execute it * Look for the Nitro binary and execute it
* Using child-process to spawn the process * Using child-process to spawn the process
* Should run exactly platform specified Nitro binary version * Should run exactly platform specified Nitro binary version
*/ */
async function spawnNitroProcess(nitroResourceProbe: any): Promise<any> { function spawnNitroProcess(nitroResourceProbe: any): Promise<any> {
console.log("Starting Nitro subprocess...");
return new Promise(async (resolve, reject) => { return new Promise(async (resolve, reject) => {
let binaryFolder = path.join(__dirname, "bin"); // Current directory by default let binaryFolder = path.join(__dirname, "bin"); // Current directory by default
let binaryName; let binaryName;
@ -243,7 +210,6 @@ async function spawnNitroProcess(nitroResourceProbe: any): Promise<any> {
} }
const binaryPath = path.join(binaryFolder, binaryName); const binaryPath = path.join(binaryFolder, binaryName);
// Execute the binary // Execute the binary
subprocess = spawn(binaryPath, [1, LOCAL_HOST, PORT], { subprocess = spawn(binaryPath, [1, LOCAL_HOST, PORT], {
cwd: binaryFolder, cwd: binaryFolder,
@ -255,77 +221,27 @@ async function spawnNitroProcess(nitroResourceProbe: any): Promise<any> {
}); });
subprocess.stderr.on("data", (data) => { subprocess.stderr.on("data", (data) => {
log.error("subprocess error:" + data.toString()); console.log("subprocess error:" + data.toString());
console.error(`stderr: ${data}`); console.error(`stderr: ${data}`);
}); });
subprocess.on("close", (code) => { subprocess.on("close", (code) => {
console.debug(`child process exited with code ${code}`); console.debug(`child process exited with code ${code}`);
subprocess = null; subprocess = null;
reject(`Nitro process exited. ${code ?? ""}`); reject(`child process exited with code ${code}`);
}); });
tcpPortUsed.waitUntilUsed(PORT, 300, 30000).then(() => { tcpPortUsed.waitUntilUsed(PORT, 300, 30000).then(() => {
resolve(nitroResourceProbe); resolve(nitroResourceProbe);
}); });
}); });
} }
/**
* Validate the model version, if it is GGUFv1, reject the promise
* @returns A Promise that resolves when the model is loaded successfully, or rejects with an error message if the model is not found or fails to load.
*/
function validateModelVersion(): Promise<void> {
log.info("validateModelVersion");
// Read the file
return new Promise((resolve, reject) => {
fs.open(currentModelFile, "r", (err, fd) => {
if (err) {
log.error("validateModelVersion error" + JSON.stringify(err));
console.error(err.message);
reject(err);
return;
}
// Buffer to store the byte
const buffer = Buffer.alloc(1);
// Model version will be the 5th byte of the file
fs.read(fd, buffer, 0, 1, 4, (err, bytesRead, buffer) => {
if (err) {
log.error("validateModelVersion open error" + JSON.stringify(err));
console.error(err.message);
fs.close(fd, (err) => {
log.error("validateModelVersion close error" + JSON.stringify(err));
if (err) console.error(err.message);
});
reject(err);
} else {
// Interpret the byte as ASCII
if (buffer[0] === 0x01) {
// This is GGUFv1, which is deprecated
reject("GGUFv1 model is deprecated, please try another model.");
}
}
// Close the file descriptor
fs.close(fd, (err) => {
if (err) console.error(err.message);
});
resolve();
});
});
});
}
function dispose() {
// clean other registered resources here
killSubprocess();
}
/** /**
* Get the system resources information * Get the system resources information
* TODO: Move to Core so that it can be reused
*/ */
async function getResourcesInfo(): Promise<ResourcesInfo> { function getResourcesInfo(): Promise<ResourcesInfo> {
return new Promise(async (resolve) => { return new Promise(async (resolve) => {
const cpu = await si.cpu(); const cpu = await si.cpu();
const mem = await si.mem(); const mem = await si.mem();
@ -338,6 +254,11 @@ async function getResourcesInfo(): Promise<ResourcesInfo> {
}); });
} }
function dispose() {
// clean other registered resources here
killSubprocess();
}
module.exports = { module.exports = {
initModel, initModel,
stopModel, stopModel,