jan/autoqa/test_runner.py
Nguyen Ngoc Minh 5cbd79b525
fix: charmap encoding (#5865)
* fix: handle charmap encoding error

* enhancement: prompt template for new user flow
2025-07-22 23:33:12 +07:00

322 lines
14 KiB
Python

import os
import asyncio
import threading
import time
import logging
from datetime import datetime
from pathlib import Path
# from computer import Computer
from agent import ComputerAgent, LLM
from utils import is_jan_running, force_close_jan, start_jan_app, get_latest_trajectory_folder
from screen_recorder import ScreenRecorder
from reportportal_handler import upload_test_results_to_rp
from reportportal_client.helpers import timestamp
logger = logging.getLogger(__name__)
async def run_single_test_with_timeout(computer, test_data, rp_client, launch_id, max_turns=30,
jan_app_path=None, jan_process_name="Jan.exe", agent_config=None,
enable_reportportal=False):
"""
Run a single test case with turn count monitoring, forced stop, and screen recording
Returns dict with test result: {"success": bool, "status": str, "message": str}
"""
path = test_data['path']
prompt = test_data['prompt']
# Detect if using nightly version based on process name
is_nightly = "nightly" in jan_process_name.lower() if jan_process_name else False
# Default agent config if not provided
if agent_config is None:
agent_config = {
"loop": "uitars",
"model_provider": "oaicompat",
"model_name": "ByteDance-Seed/UI-TARS-1.5-7B",
"model_base_url": "http://10.200.108.58:1234/v1"
}
# Create trajectory_dir from path (remove .txt extension)
trajectory_name = str(Path(path).with_suffix(''))
trajectory_base_dir = os.path.abspath(f"trajectories/{trajectory_name.replace(os.sep, '/')}")
# Ensure trajectories directory exists
os.makedirs(os.path.dirname(trajectory_base_dir), exist_ok=True)
# Create recordings directory
recordings_dir = "recordings"
os.makedirs(recordings_dir, exist_ok=True)
# Create video filename
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_test_name = trajectory_name.replace('/', '_').replace('\\', '_')
video_filename = f"{safe_test_name}_{current_time}.mp4"
video_path = os.path.abspath(os.path.join(recordings_dir, video_filename))
# Initialize result tracking
test_result_data = {
"success": False,
"status": "UNKNOWN",
"message": "Test execution incomplete",
"trajectory_dir": None,
"video_path": video_path
}
logger.info(f"Starting test: {path}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Trajectory base directory: {trajectory_base_dir}")
logger.info(f"Screen recording will be saved to: {video_path}")
logger.info(f"Using model: {agent_config['model_name']} from {agent_config['model_base_url']}")
logger.info(f"ReportPortal upload: {'ENABLED' if enable_reportportal else 'DISABLED'}")
trajectory_dir = None
agent_task = None
monitor_stop_event = threading.Event()
force_stopped_due_to_turns = False # Track if test was force stopped
# Initialize screen recorder
recorder = ScreenRecorder(video_path, fps=10)
try:
# Step 1: Check and force close Jan app if running
if is_jan_running(jan_process_name):
logger.info("Jan application is running, force closing...")
force_close_jan(jan_process_name)
# Step 2: Start Jan app in maximized mode
if jan_app_path:
start_jan_app(jan_app_path)
else:
start_jan_app() # Use default path
# Step 3: Start screen recording
recorder.start_recording()
# Step 4: Create agent for this test using config
agent = ComputerAgent(
computer=computer,
loop=agent_config["loop"],
model=LLM(
provider=agent_config["model_provider"],
name=agent_config["model_name"],
provider_base_url=agent_config["model_base_url"]
),
trajectory_dir=trajectory_base_dir
)
# Step 5: Start monitoring thread
def monitor_thread():
nonlocal force_stopped_due_to_turns
while not monitor_stop_event.is_set():
try:
if os.path.exists(trajectory_base_dir):
folders = [f for f in os.listdir(trajectory_base_dir)
if os.path.isdir(os.path.join(trajectory_base_dir, f))]
if folders:
latest_folder = sorted(folders)[-1]
latest_folder_path = os.path.join(trajectory_base_dir, latest_folder)
if os.path.exists(latest_folder_path):
turn_folders = [f for f in os.listdir(latest_folder_path)
if os.path.isdir(os.path.join(latest_folder_path, f)) and f.startswith("turn_")]
turn_count = len(turn_folders)
logger.info(f"Current turn count: {turn_count}")
if turn_count >= max_turns:
logger.warning(f"Turn count exceeded {max_turns} for test {path}, forcing stop")
force_stopped_due_to_turns = True # Mark as force stopped
# Cancel the agent task
if agent_task and not agent_task.done():
agent_task.cancel()
monitor_stop_event.set()
return
# Check every 5 seconds
if not monitor_stop_event.wait(5):
continue
else:
break
except Exception as e:
logger.error(f"Error in monitor thread: {e}")
time.sleep(5)
# Start monitoring in background thread
monitor_thread_obj = threading.Thread(target=monitor_thread, daemon=True)
monitor_thread_obj.start()
# Step 6: Run the test with prompt
logger.info(f"Running test case: {path}")
try:
# Create the agent task
async def run_agent():
async for result in agent.run(prompt):
if monitor_stop_event.is_set():
logger.warning(f"Test {path} stopped due to turn limit")
break
logger.info(f"Test result for {path}: {result}")
print(result)
agent_task = asyncio.create_task(run_agent())
# Wait for agent task to complete or timeout
try:
await asyncio.wait_for(agent_task, timeout=600) # 10 minute timeout as backup
if not monitor_stop_event.is_set():
logger.info(f"Successfully completed test execution: {path}")
else:
logger.warning(f"Test {path} was stopped due to turn limit")
except asyncio.TimeoutError:
logger.warning(f"Test {path} timed out after 10 minutes")
agent_task.cancel()
except asyncio.CancelledError:
logger.warning(f"Test {path} was cancelled due to turn limit")
finally:
# Stop monitoring
monitor_stop_event.set()
except Exception as e:
logger.error(f"Error running test {path}: {e}")
monitor_stop_event.set()
# Update result data for exception case
test_result_data.update({
"success": False,
"status": "ERROR",
"message": f"Test execution failed with exception: {str(e)}",
"trajectory_dir": None
})
finally:
# Step 7: Stop screen recording
try:
recorder.stop_recording()
logger.info(f"Screen recording saved to: {video_path}")
except Exception as e:
logger.error(f"Error stopping screen recording: {e}")
# Step 8: Upload results to ReportPortal only if enabled
if enable_reportportal and rp_client and launch_id:
# Get trajectory folder first
trajectory_dir = get_latest_trajectory_folder(trajectory_base_dir)
try:
if trajectory_dir:
logger.info(f"Uploading results to ReportPortal for: {path}")
logger.info(f"Video path for upload: {video_path}")
logger.info(f"Video exists: {os.path.exists(video_path)}")
if os.path.exists(video_path):
logger.info(f"Video file size: {os.path.getsize(video_path)} bytes")
upload_test_results_to_rp(rp_client, launch_id, path, trajectory_dir, force_stopped_due_to_turns, video_path, is_nightly)
else:
logger.warning(f"Test completed but no trajectory found for: {path}")
# Handle case where test completed but no trajectory found
formatted_test_path = path.replace('\\', '/').replace('.txt', '').replace('/', '__')
test_item_id = rp_client.start_test_item(
launch_id=launch_id,
name=formatted_test_path,
start_time=timestamp(),
item_type="TEST"
)
rp_client.log(
time=timestamp(),
level="ERROR",
message="Test execution completed but no trajectory data found",
item_id=test_item_id
)
# Still upload video for failed test
if video_path and os.path.exists(video_path):
try:
with open(video_path, "rb") as video_file:
rp_client.log(
time=timestamp(),
level="INFO",
message="[INFO] Screen recording of failed test",
item_id=test_item_id,
attachment={
"name": f"failed_test_recording_{formatted_test_path}.mp4",
"data": video_file.read(),
"mime": "video/x-msvideo"
}
)
except Exception as e:
logger.error(f"Error uploading video for failed test: {e}")
rp_client.finish_test_item(
item_id=test_item_id,
end_time=timestamp(),
status="FAILED"
)
except Exception as upload_error:
logger.error(f"Error uploading results for {path}: {upload_error}")
else:
# For non-ReportPortal mode, still get trajectory for final results
trajectory_dir = get_latest_trajectory_folder(trajectory_base_dir)
# Always process results for consistency (both RP and local mode)
# trajectory_dir is already set above, no need to call get_latest_trajectory_folder again
if trajectory_dir:
# Extract test result for processing
from reportportal_handler import extract_test_result_from_trajectory
if force_stopped_due_to_turns:
final_status = "FAILED"
status_message = "exceeded maximum turn limit ({} turns)".format(max_turns)
test_result_data.update({
"success": False,
"status": final_status,
"message": status_message,
"trajectory_dir": trajectory_dir
})
else:
test_result = extract_test_result_from_trajectory(trajectory_dir)
if test_result is True:
final_status = "PASSED"
status_message = "completed successfully with positive result"
test_result_data.update({
"success": True,
"status": final_status,
"message": status_message,
"trajectory_dir": trajectory_dir
})
else:
final_status = "FAILED"
status_message = "no valid success result found"
test_result_data.update({
"success": False,
"status": final_status,
"message": status_message,
"trajectory_dir": trajectory_dir
})
if not enable_reportportal:
# Local development mode - log results
logger.info(f"[INFO] LOCAL RESULT: {path} - {final_status} ({status_message})")
logger.info(f"[INFO] Video saved: {video_path}")
logger.info(f"[INFO] Trajectory: {trajectory_dir}")
else:
final_status = "FAILED"
status_message = "no trajectory found"
test_result_data.update({
"success": False,
"status": final_status,
"message": status_message,
"trajectory_dir": None
})
if not enable_reportportal:
logger.warning(f"[INFO] LOCAL RESULT: {path} - {final_status} ({status_message})")
# Step 9: Always force close Jan app after test completion
logger.info(f"Cleaning up after test: {path}")
force_close_jan(jan_process_name)
# Return test result
return test_result_data