feat: add autoqa (#5779)

* feat: add autoqa

* chore: add auto start computer_server

* chore: add ci autoqa windows

* chore: add ci support for both windows and linux

* chore: add ci support for macos

* chore: refactor auto qa

* chore: refactor autoqa workflow

* chore: fix upload turn
hiento09 2025-07-18 15:22:31 +07:00 committed by GitHub
parent a56e58f69b
commit 4d44f4324d
28 changed files with 3257 additions and 1 deletion

.github/workflows/autoqa.yml vendored Normal file

@@ -0,0 +1,37 @@
name: Manual trigger AutoQA Test Runner
on:
workflow_dispatch:
inputs:
jan_app_url_windows:
description: 'URL to download Jan app for Windows (.exe)'
required: true
type: string
default: 'https://delta.jan.ai/nightly/Jan-nightly_0.6.5-758_x64-setup.exe'
jan_app_url_ubuntu:
description: 'URL to download Jan app for Ubuntu (.deb)'
required: true
type: string
default: 'https://delta.jan.ai/nightly/Jan-nightly_0.6.5-758_amd64.deb'
jan_app_url_macos:
description: 'URL to download Jan app for macOS (.dmg)'
required: true
type: string
default: 'https://delta.jan.ai/nightly/Jan-nightly_0.6.5-758_universal.dmg'
is_nightly:
description: 'Is this a nightly build?'
required: true
type: boolean
default: true
jobs:
call-autoqa-template:
uses: ./.github/workflows/autoqa-template.yml
with:
jan_app_windows_source: ${{ inputs.jan_app_url_windows }}
jan_app_ubuntu_source: ${{ inputs.jan_app_url_ubuntu }}
jan_app_macos_source: ${{ inputs.jan_app_url_macos }}
is_nightly: ${{ inputs.is_nightly }}
source_type: 'url'
secrets:
RP_TOKEN: ${{ secrets.RP_TOKEN }}

.github/workflows/autoqa-template.yml vendored Normal file

@@ -0,0 +1,396 @@
name: Auto QA Test Runner Template
on:
workflow_call:
inputs:
jan_app_windows_source:
description: 'Windows app source - can be URL or local path'
required: true
type: string
jan_app_ubuntu_source:
description: 'Ubuntu app source - can be URL or local path'
required: true
type: string
jan_app_macos_source:
description: 'macOS app source - can be URL or local path'
required: true
type: string
is_nightly:
description: 'Is this a nightly build?'
required: true
type: boolean
default: true
source_type:
description: 'Source type: url or local'
required: true
type: string
default: 'url'
artifact_name_windows:
description: 'Windows artifact name (only needed for local)'
required: false
type: string
default: ''
artifact_name_ubuntu:
description: 'Ubuntu artifact name (only needed for local)'
required: false
type: string
default: ''
artifact_name_macos:
description: 'macOS artifact name (only needed for local)'
required: false
type: string
default: ''
secrets:
RP_TOKEN:
description: 'ReportPortal API token'
required: true
jobs:
windows:
runs-on: windows-11-nvidia-gpu
timeout-minutes: 60
env:
DEFAULT_JAN_APP_URL: 'https://catalog.jan.ai/windows/Jan-nightly_0.6.5-758_x64-setup.exe'
DEFAULT_IS_NIGHTLY: 'true'
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python 3.13
uses: actions/setup-python@v4
with:
python-version: '3.13'
- name: Download artifact (if source_type is local)
if: inputs.source_type == 'local'
uses: actions/download-artifact@v4
with:
name: ${{ inputs.artifact_name_windows }}
path: ${{ runner.temp }}/windows-artifact
- name: Clean existing Jan installations
shell: powershell
run: |
.\autoqa\scripts\windows_cleanup.ps1 -IsNightly "${{ inputs.is_nightly }}"
- name: Download/Prepare Jan app
shell: powershell
run: |
if ("${{ inputs.source_type }}" -eq "local") {
# Find the exe file in the artifact
$exeFile = Get-ChildItem -Path "${{ runner.temp }}/windows-artifact" -Recurse -Filter "*.exe" | Select-Object -First 1
if ($exeFile) {
Write-Host "✅ Found local installer: $($exeFile.FullName)"
Copy-Item -Path $exeFile.FullName -Destination "$env:TEMP\jan-installer.exe" -Force
Write-Host "✅ Installer copied to: $env:TEMP\jan-installer.exe"
# Don't set JAN_APP_PATH here - let the install script set it to the correct installed app path
echo "IS_NIGHTLY=${{ inputs.is_nightly }}" >> $env:GITHUB_ENV
} else {
Write-Error "❌ No .exe file found in artifact"
exit 1
}
} else {
# Use the existing download script for URLs
.\autoqa\scripts\windows_download.ps1 `
-WorkflowInputUrl "${{ inputs.jan_app_windows_source }}" `
-WorkflowInputIsNightly "${{ inputs.is_nightly }}" `
-RepoVariableUrl "${{ vars.JAN_APP_URL }}" `
-RepoVariableIsNightly "${{ vars.IS_NIGHTLY }}" `
-DefaultUrl "$env:DEFAULT_JAN_APP_URL" `
-DefaultIsNightly "$env:DEFAULT_IS_NIGHTLY"
}
- name: Install Jan app
shell: powershell
run: |
.\autoqa\scripts\windows_install.ps1 -IsNightly "$env:IS_NIGHTLY"
- name: Install Python dependencies
working-directory: autoqa
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run Auto QA Tests
working-directory: autoqa
shell: powershell
env:
RP_TOKEN: ${{ secrets.RP_TOKEN }}
ENABLE_REPORTPORTAL: 'true'
RP_ENDPOINT: 'https://reportportal.menlo.ai'
RP_PROJECT: 'default_personal'
MAX_TURNS: '50'
DELAY_BETWEEN_TESTS: '3'
LAUNCH_NAME: 'CI AutoQA Run Windows - ${{ github.run_number }} - ${{ github.ref_name }}'
run: |
.\scripts\run_tests.ps1 -JanAppPath "$env:JAN_APP_PATH" -ProcessName "$env:JAN_PROCESS_NAME" -RpToken "$env:RP_TOKEN"
- name: Cleanup after tests
if: always()
shell: powershell
run: |
.\autoqa\scripts\windows_post_cleanup.ps1 -IsNightly "${{ inputs.is_nightly }}"
ubuntu:
runs-on: ubuntu-22-04-nvidia-gpu
timeout-minutes: 60
env:
DEFAULT_JAN_APP_URL: 'https://delta.jan.ai/nightly/Jan-nightly_0.6.4-728_amd64.deb'
DEFAULT_IS_NIGHTLY: 'true'
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python 3.13
uses: actions/setup-python@v4
with:
python-version: '3.13'
- name: Download artifact (if source_type is local)
if: inputs.source_type == 'local'
uses: actions/download-artifact@v4
with:
name: ${{ inputs.artifact_name_ubuntu }}
path: ${{ runner.temp }}/ubuntu-artifact
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y \
x11-utils \
python3-tk \
python3-dev \
wmctrl \
xdotool \
libnss3-dev \
libgconf-2-4 \
libxss1 \
libasound2 \
libxtst6 \
libgtk-3-0 \
libgbm-dev \
libxshmfence1 \
libxrandr2 \
libpangocairo-1.0-0 \
libatk1.0-0 \
libcairo-gobject2 \
libgdk-pixbuf2.0-0 \
gnome-screenshot
- name: Setup script permissions
run: |
chmod +x autoqa/scripts/setup_permissions.sh
./autoqa/scripts/setup_permissions.sh
- name: Clean existing Jan installations
run: |
./autoqa/scripts/ubuntu_cleanup.sh
- name: Download/Prepare Jan app
run: |
if [ "${{ inputs.source_type }}" = "local" ]; then
# Find the deb file in the artifact
DEB_FILE=$(find "${{ runner.temp }}/ubuntu-artifact" -name "*.deb" -type f | head -1)
if [ -n "$DEB_FILE" ]; then
echo "✅ Found local installer: $DEB_FILE"
cp "$DEB_FILE" "/tmp/jan-installer.deb"
echo "✅ Installer copied to: /tmp/jan-installer.deb"
echo "JAN_APP_PATH=/tmp/jan-installer.deb" >> $GITHUB_ENV
echo "IS_NIGHTLY=${{ inputs.is_nightly }}" >> $GITHUB_ENV
if [ "${{ inputs.is_nightly }}" = "true" ]; then
echo "JAN_PROCESS_NAME=Jan-nightly" >> $GITHUB_ENV
else
echo "JAN_PROCESS_NAME=Jan" >> $GITHUB_ENV
fi
else
echo "❌ No .deb file found in artifact"
exit 1
fi
else
# Use the existing download script for URLs
./autoqa/scripts/ubuntu_download.sh \
"${{ inputs.jan_app_ubuntu_source }}" \
"${{ inputs.is_nightly }}" \
"${{ vars.JAN_APP_URL_LINUX }}" \
"${{ vars.IS_NIGHTLY }}" \
"$DEFAULT_JAN_APP_URL" \
"$DEFAULT_IS_NIGHTLY"
# Set the correct environment variables for the test runner
echo "JAN_APP_PATH=/tmp/jan-installer.deb" >> $GITHUB_ENV
if [ "${{ inputs.is_nightly }}" = "true" ]; then
echo "JAN_PROCESS_NAME=Jan-nightly" >> $GITHUB_ENV
else
echo "JAN_PROCESS_NAME=Jan" >> $GITHUB_ENV
fi
fi
- name: Install Jan app
run: |
./autoqa/scripts/ubuntu_install.sh "$IS_NIGHTLY"
- name: Install Python dependencies
working-directory: autoqa
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run Auto QA Tests
working-directory: autoqa
env:
RP_TOKEN: ${{ secrets.RP_TOKEN }}
ENABLE_REPORTPORTAL: 'true'
RP_ENDPOINT: 'https://reportportal.menlo.ai'
RP_PROJECT: 'default_personal'
MAX_TURNS: '50'
DELAY_BETWEEN_TESTS: '3'
LAUNCH_NAME: 'CI AutoQA Run Ubuntu - ${{ github.run_number }} - ${{ github.ref_name }}'
run: |
./scripts/run_tests.sh "$JAN_APP_PATH" "$JAN_PROCESS_NAME" "$RP_TOKEN" "ubuntu"
- name: Cleanup after tests
if: always()
run: |
./autoqa/scripts/ubuntu_post_cleanup.sh "$IS_NIGHTLY"
macos:
runs-on: macos-selfhosted-15-arm64
timeout-minutes: 60
env:
DEFAULT_JAN_APP_URL: 'https://delta.jan.ai/nightly/Jan-nightly_0.6.4-728_universal.dmg'
DEFAULT_IS_NIGHTLY: 'true'
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Setup Python 3.13
uses: actions/setup-python@v4
with:
python-version: '3.13'
- name: Download artifact (if source_type is local)
if: inputs.source_type == 'local'
uses: actions/download-artifact@v4
with:
name: ${{ inputs.artifact_name_macos }}
path: ${{ runner.temp }}/macos-artifact
- name: Setup script permissions
run: |
chmod +x autoqa/scripts/setup_permissions.sh
./autoqa/scripts/setup_permissions.sh
- name: Clean existing Jan installations
run: |
./autoqa/scripts/macos_cleanup.sh
- name: Download/Prepare Jan app
run: |
if [ "${{ inputs.source_type }}" = "local" ]; then
# Find the dmg file in the artifact
DMG_FILE=$(find "${{ runner.temp }}/macos-artifact" -name "*.dmg" -type f | head -1)
if [ -n "$DMG_FILE" ]; then
echo "✅ Found local installer: $DMG_FILE"
cp "$DMG_FILE" "/tmp/jan-installer.dmg"
echo "✅ Installer copied to: /tmp/jan-installer.dmg"
echo "JAN_APP_PATH=/tmp/jan-installer.dmg" >> $GITHUB_ENV
echo "IS_NIGHTLY=${{ inputs.is_nightly }}" >> $GITHUB_ENV
if [ "${{ inputs.is_nightly }}" = "true" ]; then
echo "PROCESS_NAME=Jan-nightly" >> $GITHUB_ENV
else
echo "PROCESS_NAME=Jan" >> $GITHUB_ENV
fi
else
echo "❌ No .dmg file found in artifact"
exit 1
fi
else
# Use the existing download script for URLs
./autoqa/scripts/macos_download.sh \
"${{ inputs.jan_app_macos_source }}" \
"${{ inputs.is_nightly }}" \
"${{ vars.JAN_APP_URL }}" \
"${{ vars.IS_NIGHTLY }}" \
"$DEFAULT_JAN_APP_URL" \
"$DEFAULT_IS_NIGHTLY"
# Set the correct environment variables for the test runner
echo "JAN_APP_PATH=/tmp/jan-installer.dmg" >> $GITHUB_ENV
if [ "${{ inputs.is_nightly }}" = "true" ]; then
echo "PROCESS_NAME=Jan-nightly" >> $GITHUB_ENV
else
echo "PROCESS_NAME=Jan" >> $GITHUB_ENV
fi
fi
- name: Install Jan app
run: |
./autoqa/scripts/macos_install.sh
- name: Install system dependencies
run: |
echo "Installing system dependencies for macOS..."
# Check if Homebrew is available
if command -v brew >/dev/null 2>&1; then
echo "Homebrew is available"
# Install python-tk if not available
python3 -c "import tkinter" 2>/dev/null || {
echo "Installing python-tk via Homebrew..."
brew install python-tk || true
}
else
echo "Homebrew not available, checking if tkinter works..."
python3 -c "import tkinter" || {
echo "⚠️ tkinter not available and Homebrew not found"
echo "This may cause issues with mouse control"
}
fi
echo "System dependencies check completed"
- name: Install Python dependencies
run: |
cd autoqa
echo "Installing Python dependencies..."
pip install --upgrade pip
pip install -r requirements.txt
echo "✅ Python dependencies installed"
- name: Setup ReportPortal environment
run: |
echo "Setting up ReportPortal environment..."
echo "RP_TOKEN=${{ secrets.RP_TOKEN }}" >> $GITHUB_ENV
echo "ReportPortal environment configured"
- name: Run E2E tests
env:
RP_TOKEN: ${{ secrets.RP_TOKEN }}
ENABLE_REPORTPORTAL: 'true'
RP_ENDPOINT: 'https://reportportal.menlo.ai'
RP_PROJECT: 'default_personal'
MAX_TURNS: '50'
DELAY_BETWEEN_TESTS: '3'
LAUNCH_NAME: 'CI AutoQA Run Macos - ${{ github.run_number }} - ${{ github.ref_name }}'
run: |
cd autoqa
echo "Starting E2E test execution..."
echo "Environment variables:"
echo "JAN_APP_PATH: $JAN_APP_PATH"
echo "PROCESS_NAME: $PROCESS_NAME"
echo "IS_NIGHTLY: $IS_NIGHTLY"
./scripts/run_tests.sh "$JAN_APP_PATH" "$PROCESS_NAME" "$RP_TOKEN" "macos"
- name: Cleanup after tests
if: always()
run: |
./autoqa/scripts/macos_post_cleanup.sh

@@ -223,3 +223,49 @@ jobs:
RUN_ID=${{ github.run_id }}
COMMENT="This is the build for this pull request. You can download it from the Artifacts section here: [Build URL](https://github.com/${{ github.repository }}/actions/runs/${RUN_ID})."
gh pr comment $PR_URL --body "$COMMENT"
# AutoQA trigger for S3 builds
trigger-autoqa-s3:
needs:
[
build-macos,
build-windows-x64,
build-linux-x64,
get-update-version,
set-public-provider,
sync-temp-to-latest,
]
if: needs.set-public-provider.outputs.public_provider == 'aws-s3'
uses: ./.github/workflows/autoqa-template.yml
with:
jan_app_windows_source: 'https://delta.jan.ai/nightly/Jan-nightly_${{ needs.get-update-version.outputs.new_version }}_x64-setup.exe'
jan_app_ubuntu_source: 'https://delta.jan.ai/nightly/Jan-nightly_${{ needs.get-update-version.outputs.new_version }}_amd64.deb'
jan_app_macos_source: 'https://delta.jan.ai/nightly/Jan-nightly_${{ needs.get-update-version.outputs.new_version }}_universal.dmg'
is_nightly: true
source_type: 'url'
secrets:
RP_TOKEN: ${{ secrets.RP_TOKEN }}
# AutoQA trigger for artifact builds
trigger-autoqa-artifacts:
needs:
[
build-macos,
build-windows-x64,
build-linux-x64,
get-update-version,
set-public-provider,
]
if: needs.set-public-provider.outputs.public_provider == 'none'
uses: ./.github/workflows/autoqa-template.yml
with:
jan_app_windows_source: '' # Not needed for artifacts
jan_app_ubuntu_source: '' # Not needed for artifacts
jan_app_macos_source: '' # Not needed for artifacts
is_nightly: true
source_type: 'local'
artifact_name_windows: 'jan-windows-${{ needs.get-update-version.outputs.new_version }}'
artifact_name_ubuntu: 'jan-linux-amd64-${{ needs.get-update-version.outputs.new_version }}-deb'
artifact_name_macos: 'jan-nightly-mac-universal-${{ needs.get-update-version.outputs.new_version }}.dmg'
secrets:
RP_TOKEN: ${{ secrets.RP_TOKEN }}

.gitignore vendored

@@ -50,4 +50,9 @@ src-tauri/resources/bin
.opencode
OpenCode.md
archive/
.cache/
# auto qa
autoqa/trajectories
autoqa/recordings
autoqa/__pycache__

autoqa/README.md Normal file

@@ -0,0 +1,319 @@
# E2E Test Runner with ReportPortal Integration
🚀 An automated end-to-end test runner for the Jan application with ReportPortal integration, screen recording, and comprehensive test monitoring.
## Features
- ✅ **Automated Jan App Testing**: Automatically starts and stops the Jan application
- 🖥️ **Auto Computer Server**: Automatically starts the computer server in the background
- 📹 **Screen Recording**: Records test execution for debugging
- 📊 **ReportPortal Integration**: Optional upload of test results to ReportPortal
- 🔄 **Turn Monitoring**: Prevents infinite loops with configurable turn limits
- 🎯 **Flexible Configuration**: Command-line arguments and environment variables
- 🌐 **Cross-platform**: Windows, macOS, and Linux support
- 📁 **Test Discovery**: Automatically scans a directory for test files
## Prerequisites
- Python 3.8+
- Jan application installed
- Windows Sandbox (for computer provider)
- Computer server package installed
- Required Python packages (see requirements.txt)
## Installation
1. Clone the repository:
```bash
git clone <repository-url>
cd autoqa
```
2. Install dependencies:
```bash
# For Windows and Linux
pip install -r requirements.txt
```
3. Ensure the Jan application is installed in one of the default locations:
- Windows: `%LOCALAPPDATA%\Programs\jan\Jan.exe`
- macOS: `~/Applications/Jan.app/Contents/MacOS/Jan`
- Linux: `jan` (in PATH)
## Quick Start
### Local Development (No ReportPortal)
```bash
# Run all tests in ./tests directory (auto-starts computer server)
python main.py
# Run with custom test directory
python main.py --tests-dir "my_tests"
# Run with custom Jan app path
python main.py --jan-app-path "C:/Custom/Path/Jan.exe"
# Skip auto computer server start (if already running)
python main.py --skip-server-start
```
### With ReportPortal Integration
```bash
# Enable ReportPortal with token
python main.py --enable-reportportal --rp-token "YOUR_API_TOKEN"
# Full ReportPortal configuration
python main.py \
--enable-reportportal \
--rp-endpoint "https://reportportal.example.com" \
--rp-project "my_project" \
--rp-token "YOUR_API_TOKEN"
```
## Configuration
### Command Line Arguments
| Argument | Environment Variable | Default | Description |
| ----------------------- | --------------------- | ------------------------------- | ------------------------------------------------- |
| **Computer Server** |
| `--skip-server-start` | `SKIP_SERVER_START` | `false` | Skip automatic computer server startup |
| **ReportPortal** |
| `--enable-reportportal` | `ENABLE_REPORTPORTAL` | `false` | Enable ReportPortal integration |
| `--rp-endpoint` | `RP_ENDPOINT` | `https://reportportal.menlo.ai` | ReportPortal endpoint URL |
| `--rp-project` | `RP_PROJECT` | `default_personal` | ReportPortal project name |
| `--rp-token` | `RP_TOKEN` | - | ReportPortal API token (required when RP enabled) |
| **Jan Application** |
| `--jan-app-path` | `JAN_APP_PATH` | _auto-detected_ | Path to Jan application executable |
| `--jan-process-name` | `JAN_PROCESS_NAME` | _platform-specific_ | Jan process name for monitoring |
| **Model Configuration** |
| `--model-name` | `MODEL_NAME` | `ByteDance-Seed/UI-TARS-1.5-7B` | AI model name |
| `--model-base-url` | `MODEL_BASE_URL` | `http://10.200.108.58:1234/v1` | Model API endpoint |
| `--model-provider` | `MODEL_PROVIDER` | `oaicompat` | Model provider type |
| `--model-loop` | `MODEL_LOOP` | `uitars` | Agent loop type |
| **Test Execution** |
| `--max-turns` | `MAX_TURNS` | `30` | Maximum turns per test |
| `--tests-dir` | `TESTS_DIR` | `tests` | Directory containing test files |
| `--delay-between-tests` | `DELAY_BETWEEN_TESTS` | `3` | Delay between tests (seconds) |
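Note that the environment variables only seed the defaults in `parse_arguments()`, so an explicit flag always wins when both are set:

```bash
# MAX_TURNS seeds the default, but the explicit flag takes precedence: runs with 60 turns
MAX_TURNS=50 python main.py --max-turns 60
```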
### Environment Variables
Create a `.env` file or set environment variables:
```bash
# Computer Server
SKIP_SERVER_START=false
# ReportPortal Configuration
ENABLE_REPORTPORTAL=true
RP_ENDPOINT=https://reportportal.example.com
RP_PROJECT=my_project
RP_TOKEN=your_secret_token
# Jan Application
JAN_APP_PATH=C:\Custom\Path\Jan.exe
JAN_PROCESS_NAME=Jan.exe
# Model Configuration
MODEL_NAME=gpt-4
MODEL_BASE_URL=https://api.openai.com/v1
MODEL_PROVIDER=openai
MODEL_LOOP=uitars
# Test Settings
MAX_TURNS=50
TESTS_DIR=e2e_tests
DELAY_BETWEEN_TESTS=5
```
## Test Structure
### Test Files
- Test files should be `.txt` files containing test prompts
- Place test files in the `tests/` directory (or custom directory)
- Support nested directories for organization
Example test file (`tests/basic/login_test.txt`):
```
Test the login functionality of Jan application.
Navigate to login screen, enter valid credentials, and verify successful login.
```
### Directory Structure
```
autoqa/
├── main.py # Main test runner
├── utils.py # Jan app utilities
├── test_runner.py # Test execution logic
├── screen_recorder.py # Screen recording functionality
├── reportportal_handler.py # ReportPortal integration
├── tests/ # Test files directory
│ ├── basic/
│ │ ├── login_test.txt
│ │ └── navigation_test.txt
│ └── advanced/
│ └── complex_workflow.txt
├── recordings/ # Screen recordings (auto-created)
├── trajectories/ # Agent trajectories (auto-created)
└── README.md
```
## Usage Examples
### Basic Usage
```bash
# Run all tests locally (auto-starts computer server)
python main.py
# Get help
python main.py --help
# Run without auto-starting computer server
python main.py --skip-server-start
```
### Advanced Usage
```bash
# Custom configuration
python main.py \
--tests-dir "integration_tests" \
--max-turns 40 \
--delay-between-tests 10 \
--model-name "gpt-4"
# Environment + Arguments
ENABLE_REPORTPORTAL=true RP_TOKEN=secret python main.py --max-turns 50
# Different model provider
python main.py \
--model-provider "openai" \
--model-name "gpt-4" \
--model-base-url "https://api.openai.com/v1"
# External computer server (skip auto-start)
SKIP_SERVER_START=true python main.py
```
### CI/CD Usage
```bash
# GitHub Actions / CI environment
ENABLE_REPORTPORTAL=true \
RP_TOKEN=${{ secrets.RP_TOKEN }} \
MODEL_NAME=production-model \
MAX_TURNS=40 \
SKIP_SERVER_START=false \
python main.py
```
## Computer Server Management
The test runner automatically manages the computer server:
### Automatic Server Management (Default)
- **Auto-start**: Computer server starts automatically in background thread
- **Auto-cleanup**: Server stops when main program exits (daemon thread)
- **Error handling**: Graceful fallback if server fails to start
### Manual Server Management
```bash
# If you prefer to manage computer server manually:
python -m computer_server # In separate terminal
# Then run tests without auto-start:
python main.py --skip-server-start
```
### Server Logs
```
2025-07-15 15:30:45 - INFO - Starting computer server in background...
2025-07-15 15:30:45 - INFO - Calling computer_server.run_cli()...
2025-07-15 15:30:45 - INFO - Computer server thread started
2025-07-15 15:30:50 - INFO - Computer server is running successfully
```
## Output
### Local Development
- **Console logs**: Detailed execution information
- **Screen recordings**: Saved to `recordings/` directory as MP4 files
- **Trajectories**: Agent interaction data in `trajectories/` directory
- **Local results**: Test results logged to console
### ReportPortal Integration
When enabled, results are uploaded to ReportPortal including:
- Test execution status (PASSED/FAILED)
- Screen recordings as attachments
- Detailed turn-by-turn interaction logs
- Error messages and debugging information
## Troubleshooting
### Common Issues
1. **Computer server startup failed**:
```bash
# Install required dependencies
pip install computer_server
# Check if computer_server is available
python -c "import computer_server; print('OK')"
# Use manual server if auto-start fails
python main.py --skip-server-start
```
2. **Jan app not found**:
```bash
# Specify custom path
python main.py --jan-app-path "D:/Apps/Jan/Jan.exe"
```
3. **Windows dependencies missing**:
```bash
# Install Windows-specific packages
pip install pywin32 psutil
```
4. **ReportPortal connection failed**:
- Verify endpoint URL and token
- Check network connectivity
- Ensure project exists
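As a first step, a quick reachability check can rule out network problems (the endpoint below is a placeholder):
```bash
curl -I https://reportportal.example.com
```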
5. **Screen recording issues**:
- Check disk space in `recordings/` directory
- Verify screen recording permissions
6. **Test timeouts**:
```bash
# Increase turn limit
python main.py --max-turns 50
```
### Debug Mode
Enable detailed logging by modifying the logging level in `main.py`:
```python
logging.basicConfig(level=logging.DEBUG)
```

autoqa/main.py Normal file

@@ -0,0 +1,514 @@
import asyncio
import logging
import os
import argparse
import threading
import time
import platform
from datetime import datetime
from computer import Computer
from reportportal_client import RPClient
from reportportal_client.helpers import timestamp
from utils import scan_test_files
from test_runner import run_single_test_with_timeout
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Platform detection
IS_WINDOWS = platform.system() == "Windows"
IS_LINUX = platform.system() == "Linux"
IS_MACOS = platform.system() == "Darwin"
def get_computer_config():
"""Get computer configuration based on platform"""
if IS_WINDOWS:
return {
"os_type": "windows"
}
elif IS_LINUX:
return {
"os_type": "linux"
}
elif IS_MACOS:
return {
"os_type": "macos"
}
else:
# Default fallback
logger.warning(f"Unknown platform {platform.system()}, using Linux config as fallback")
return {
"os_type": "linux"
}
def get_default_jan_path():
"""Get default Jan app path based on OS"""
if IS_WINDOWS:
# Try multiple common locations on Windows
possible_paths = [
os.path.expanduser(r"~\AppData\Local\Programs\jan\Jan.exe"),
os.path.join(os.environ.get('LOCALAPPDATA', ''), 'Programs', 'jan', 'Jan.exe'),
os.path.join(os.environ.get('APPDATA', ''), 'jan', 'Jan.exe'),
r"C:\Program Files\jan\Jan.exe",
r"C:\Program Files (x86)\jan\Jan.exe"
]
# Return first existing path, or first option as default
for path in possible_paths:
if os.path.exists(path):
return path
# If none exist, return the most likely default
return possible_paths[0]
elif IS_LINUX:
# Linux possible locations
possible_paths = [
"/usr/bin/Jan",
"/usr/local/bin/Jan",
os.path.expanduser("~/Applications/Jan/Jan"),
"/opt/Jan/Jan"
]
# Return first existing path, or first option as default
for path in possible_paths:
if os.path.exists(path):
return path
# Fall back to the most common install path
return "/usr/bin/Jan"
elif IS_MACOS:
# macOS defaults
possible_paths = [
"/Applications/Jan.app/Contents/MacOS/Jan",
os.path.expanduser("~/Applications/Jan.app/Contents/MacOS/Jan")
]
for path in possible_paths:
if os.path.exists(path):
return path
return possible_paths[0]
else:
# Unknown platform
return "jan"
def start_computer_server():
"""Start computer server in background thread"""
try:
logger.info("Starting computer server in background...")
# Import computer_server module
import computer_server
import sys
# Start server in a separate thread
def run_server():
try:
# Save original sys.argv to avoid argument conflicts
original_argv = sys.argv.copy()
# Override sys.argv for computer_server to use default args
sys.argv = ['computer_server'] # Reset to minimal args
# Use the proper entry point
logger.info("Calling computer_server.run_cli()...")
computer_server.run_cli()
logger.info("Computer server.run_cli() completed")
except KeyboardInterrupt:
logger.info("Computer server interrupted")
except Exception as e:
logger.error(f"Computer server error: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
finally:
# Restore original sys.argv
try:
sys.argv = original_argv
except:
pass
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
logger.info("Computer server thread started")
# Give server more time to start up
time.sleep(5)
# Check if thread is still alive (server is running)
if server_thread.is_alive():
logger.info("Computer server is running successfully")
return server_thread
else:
logger.error("Computer server thread died unexpectedly")
return None
except ImportError as e:
logger.error(f"Cannot import computer_server module: {e}")
logger.error("Please install computer_server package")
return None
except Exception as e:
logger.error(f"Error starting computer server: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return None
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(
description="E2E Test Runner with ReportPortal integration",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Run locally without ReportPortal
python main.py
# Run with ReportPortal integration
python main.py --enable-reportportal --rp-token YOUR_TOKEN
# Run with custom Jan app path
python main.py --jan-app-path "C:/Custom/Path/Jan.exe"
# Run with different model
python main.py --model-name "gpt-4" --model-base-url "https://api.openai.com/v1"
# Using environment variables
ENABLE_REPORTPORTAL=true RP_TOKEN=xxx MODEL_NAME=gpt-4 python main.py
"""
)
# Get default Jan path
default_jan_path = get_default_jan_path()
# Computer server arguments
server_group = parser.add_argument_group('Computer Server Configuration')
server_group.add_argument(
'--skip-server-start',
action='store_true',
default=os.getenv('SKIP_SERVER_START', 'false').lower() == 'true',
help='Skip automatic computer server startup (env: SKIP_SERVER_START, default: false)'
)
# ReportPortal arguments
rp_group = parser.add_argument_group('ReportPortal Configuration')
rp_group.add_argument(
'--enable-reportportal',
action='store_true',
default=os.getenv('ENABLE_REPORTPORTAL', 'false').lower() == 'true',
help='Enable ReportPortal integration (env: ENABLE_REPORTPORTAL, default: false)'
)
rp_group.add_argument(
'--rp-endpoint',
default=os.getenv('RP_ENDPOINT', 'https://reportportal.menlo.ai'),
help='ReportPortal endpoint URL (env: RP_ENDPOINT, default: %(default)s)'
)
rp_group.add_argument(
'--rp-project',
default=os.getenv('RP_PROJECT', 'default_personal'),
help='ReportPortal project name (env: RP_PROJECT, default: %(default)s)'
)
rp_group.add_argument(
'--rp-token',
default=os.getenv('RP_TOKEN'),
help='ReportPortal API token (env: RP_TOKEN, required when --enable-reportportal is used)'
)
rp_group.add_argument(
'--launch-name',
default=os.getenv('LAUNCH_NAME'),
help='Custom launch name for ReportPortal (env: LAUNCH_NAME, default: auto-generated with timestamp)'
)
# Jan app arguments
jan_group = parser.add_argument_group('Jan Application Configuration')
jan_group.add_argument(
'--jan-app-path',
default=os.getenv('JAN_APP_PATH', default_jan_path),
help=f'Path to Jan application executable (env: JAN_APP_PATH, default: auto-detected or {default_jan_path})'
)
jan_group.add_argument(
'--jan-process-name',
default=os.getenv('JAN_PROCESS_NAME', 'Jan.exe' if IS_WINDOWS else ('Jan' if IS_MACOS else 'Jan-nightly')),
help='Jan process name for monitoring (env: JAN_PROCESS_NAME, default: platform-specific)'
)
# Model/Agent arguments
model_group = parser.add_argument_group('Model Configuration')
model_group.add_argument(
'--model-loop',
default=os.getenv('MODEL_LOOP', 'uitars'),
help='Agent loop type (env: MODEL_LOOP, default: %(default)s)'
)
model_group.add_argument(
'--model-provider',
default=os.getenv('MODEL_PROVIDER', 'oaicompat'),
help='Model provider (env: MODEL_PROVIDER, default: %(default)s)'
)
model_group.add_argument(
'--model-name',
default=os.getenv('MODEL_NAME', 'ByteDance-Seed/UI-TARS-1.5-7B'),
help='Model name (env: MODEL_NAME, default: %(default)s)'
)
model_group.add_argument(
'--model-base-url',
default=os.getenv('MODEL_BASE_URL', 'http://10.200.108.58:1234/v1'),
help='Model base URL (env: MODEL_BASE_URL, default: %(default)s)'
)
# Test execution arguments
test_group = parser.add_argument_group('Test Execution Configuration')
test_group.add_argument(
'--max-turns',
type=int,
default=int(os.getenv('MAX_TURNS', '30')),
help='Maximum number of turns per test (env: MAX_TURNS, default: %(default)s)'
)
test_group.add_argument(
'--tests-dir',
default=os.getenv('TESTS_DIR', 'tests'),
help='Directory containing test files (env: TESTS_DIR, default: %(default)s)'
)
test_group.add_argument(
'--delay-between-tests',
type=int,
default=int(os.getenv('DELAY_BETWEEN_TESTS', '3')),
help='Delay in seconds between tests (env: DELAY_BETWEEN_TESTS, default: %(default)s)'
)
args = parser.parse_args()
# Validate ReportPortal token if ReportPortal is enabled
if args.enable_reportportal and not args.rp_token:
parser.error("--rp-token (or RP_TOKEN env var) is required when --enable-reportportal is used")
return args
async def main():
"""
Main function to scan and run all test files with optional ReportPortal integration
"""
# Parse command line arguments
args = parse_arguments()
# Initialize final exit code
final_exit_code = 0
# Start computer server if not skipped
server_thread = None
if not args.skip_server_start:
server_thread = start_computer_server()
if server_thread is None:
logger.error("Failed to start computer server. Exiting...")
exit(1)
else:
logger.info("Skipping computer server startup (assuming it's already running)")
try:
# Build agent config from arguments
agent_config = {
"loop": args.model_loop,
"model_provider": args.model_provider,
"model_name": args.model_name,
"model_base_url": args.model_base_url
}
# Log configuration
logger.info("=== Configuration ===")
logger.info(f"Computer server: {'STARTED' if server_thread else 'EXTERNAL'}")
logger.info(f"Tests directory: {args.tests_dir}")
logger.info(f"Max turns per test: {args.max_turns}")
logger.info(f"Delay between tests: {args.delay_between_tests}s")
logger.info(f"Jan app path: {args.jan_app_path}")
logger.info(f"Jan app exists: {os.path.exists(args.jan_app_path)}")
logger.info(f"Jan process name: {args.jan_process_name}")
logger.info(f"Model: {args.model_name}")
logger.info(f"Model URL: {args.model_base_url}")
logger.info(f"Model provider: {args.model_provider}")
logger.info(f"ReportPortal integration: {'ENABLED' if args.enable_reportportal else 'DISABLED'}")
if args.enable_reportportal:
logger.info(f"ReportPortal endpoint: {args.rp_endpoint}")
logger.info(f"ReportPortal project: {args.rp_project}")
logger.info(f"ReportPortal token: {'SET' if args.rp_token else 'NOT SET'}")
logger.info(f"Launch name: {args.launch_name if args.launch_name else 'AUTO-GENERATED'}")
logger.info("======================")
# Scan all test files
test_files = scan_test_files(args.tests_dir)
if not test_files:
logger.warning(f"No test files found in directory: {args.tests_dir}")
return
logger.info(f"Found {len(test_files)} test files")
# Track test results for final exit code
test_results = {"passed": 0, "failed": 0, "total": len(test_files)}
# Initialize ReportPortal client only if enabled
rp_client = None
launch_id = None
if args.enable_reportportal:
try:
rp_client = RPClient(
endpoint=args.rp_endpoint,
project=args.rp_project,
api_key=args.rp_token
)
# Start ReportPortal launch
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
# Use custom launch name if provided, otherwise generate default
if args.launch_name:
launch_name = args.launch_name
logger.info(f"Using custom launch name: {launch_name}")
else:
launch_name = f"E2E Test Run - {current_time}"
logger.info(f"Using auto-generated launch name: {launch_name}")
launch_id = rp_client.start_launch(
name=launch_name,
start_time=timestamp(),
description=f"Automated E2E test run with {len(test_files)} test cases\n"
f"Model: {args.model_name}\n"
f"Max turns: {args.max_turns}"
)
logger.info(f"Started ReportPortal launch: {launch_name}")
except Exception as e:
logger.error(f"Failed to initialize ReportPortal: {e}")
logger.warning("Continuing without ReportPortal integration...")
rp_client = None
launch_id = None
else:
logger.info("Running in local development mode - results will not be uploaded to ReportPortal")
# Start computer environment
logger.info("Initializing computer environment...")
# Get platform-specific computer configuration
computer_config = get_computer_config()
logger.info(f"Using computer config: {computer_config}")
computer = Computer(
os_type=computer_config["os_type"],
use_host_computer_server=True
)
await computer.run()
logger.info("Computer environment ready")
# Run each test sequentially with turn monitoring
for i, test_data in enumerate(test_files, 1):
logger.info(f"Running test {i}/{len(test_files)}: {test_data['path']}")
try:
# Pass all configs to test runner
test_result = await run_single_test_with_timeout(
computer=computer,
test_data=test_data,
rp_client=rp_client, # Can be None
launch_id=launch_id, # Can be None
max_turns=args.max_turns,
jan_app_path=args.jan_app_path,
jan_process_name=args.jan_process_name,
agent_config=agent_config,
enable_reportportal=args.enable_reportportal
)
# Track test result - properly handle different return formats
test_passed = False
if test_result:
# Check different possible return formats
if isinstance(test_result, dict):
# Dictionary format: check 'success' key
test_passed = test_result.get('success', False)
elif isinstance(test_result, bool):
# Boolean format: direct boolean value
test_passed = test_result
elif hasattr(test_result, 'success'):
# Object format: check success attribute
test_passed = getattr(test_result, 'success', False)
else:
# Any truthy value is considered success
test_passed = bool(test_result)
else:
test_passed = False
# Update counters and log result
if test_passed:
test_results["passed"] += 1
logger.info(f"✅ Test {i} PASSED: {test_data['path']}")
else:
test_results["failed"] += 1
logger.error(f"❌ Test {i} FAILED: {test_data['path']}")
# Debug log for troubleshooting
logger.info(f"🔍 Debug - Test result: type={type(test_result)}, value={test_result}, success_field={test_result.get('success', 'N/A') if isinstance(test_result, dict) else 'N/A'}, final_passed={test_passed}")
except Exception as e:
test_results["failed"] += 1
logger.error(f"❌ Test {i} FAILED with exception: {test_data['path']} - {e}")
# Add delay between tests
if i < len(test_files):
logger.info(f"Waiting {args.delay_between_tests} seconds before next test...")
await asyncio.sleep(args.delay_between_tests)
# Log final test results summary
logger.info("=" * 50)
logger.info("TEST EXECUTION SUMMARY")
logger.info("=" * 50)
logger.info(f"Total tests: {test_results['total']}")
logger.info(f"Passed: {test_results['passed']}")
logger.info(f"Failed: {test_results['failed']}")
logger.info(f"Success rate: {(test_results['passed']/test_results['total']*100):.1f}%")
logger.info("=" * 50)
if test_results["failed"] > 0:
logger.error(f"❌ Test execution completed with {test_results['failed']} failures!")
final_exit_code = 1
else:
logger.info("✅ All tests completed successfully!")
final_exit_code = 0
except KeyboardInterrupt:
logger.info("Test execution interrupted by user")
final_exit_code = 1
except Exception as e:
logger.error(f"Error in main execution: {e}")
final_exit_code = 1
finally:
# Finish ReportPortal launch only if it was started
if args.enable_reportportal and rp_client and launch_id:
try:
rp_client.finish_launch(
launch_id=launch_id,
end_time=timestamp()
)
rp_client.session.close()
logger.info("ReportPortal launch finished and session closed")
except Exception as e:
logger.error(f"Error finishing ReportPortal launch: {e}")
# Note: daemon thread will automatically terminate when main program ends
if server_thread:
logger.info("Computer server will stop when main program exits (daemon thread)")
# Exit with appropriate code based on test results
logger.info(f"Exiting with code: {final_exit_code}")
exit(final_exit_code)
if __name__ == "__main__":
asyncio.run(main())

autoqa/reportportal_handler.py Normal file

@@ -0,0 +1,307 @@
import os
import json
import mimetypes
import re
import logging
from reportportal_client.helpers import timestamp
logger = logging.getLogger(__name__)
def upload_turn_folder(client, test_item_id, turn_path, turn_name, force_fail=False):
"""
Upload turn folder content to ReportPortal
"""
step_item_id = client.start_test_item(
parent_item_id=test_item_id,
name=turn_name,
start_time=timestamp(),
item_type="STEP"
)
uploaded = False
step_has_errors = False # Track if this step has any errors
for fname in sorted(os.listdir(turn_path)):
fpath = os.path.join(turn_path, fname)
if fname.endswith(".json"):
try:
with open(fpath, "r", encoding="utf-8") as f:
data = json.load(f)
client.log(
time=timestamp(),
level="INFO",
message=f"[{fname}]\n{json.dumps(data, indent=2)}",
item_id=step_item_id
)
uploaded = True
except Exception as e:
client.log(
time=timestamp(),
level="ERROR",
message=f"[ERROR parsing {fname}] {str(e)}",
item_id=step_item_id
)
step_has_errors = True
elif fname.endswith(".png"):
try:
with open(fpath, "rb") as img_file:
client.log(
time=timestamp(),
level="INFO",
message=f"Screenshot: {fname}",
item_id=step_item_id,
attachment={
"name": fname,
"data": img_file.read(),
"mime": mimetypes.guess_type(fname)[0] or "image/png"
}
)
uploaded = True
except Exception as e:
client.log(
time=timestamp(),
level="ERROR",
message=f"[ERROR attaching {fname}] {str(e)}",
item_id=step_item_id
)
step_has_errors = True
if not uploaded:
client.log(
time=timestamp(),
level="WARNING",
message="No data found in this turn.",
item_id=step_item_id
)
# Determine step status based on test case result
if force_fail:
step_status = "FAILED"
else:
step_status = "FAILED" if step_has_errors else "PASSED"
client.finish_test_item(
item_id=step_item_id,
end_time=timestamp(),
status=step_status
)
def extract_test_result_from_trajectory(trajectory_dir):
"""
Extract test result from the last turn's API response
Returns True only if found {"result": True}, False for all other cases including {"result": False}
"""
if not trajectory_dir or not os.path.exists(trajectory_dir):
logger.warning(f"Trajectory directory not found: {trajectory_dir}")
return False
try:
# Get all turn folders and find the last one
turn_folders = [f for f in os.listdir(trajectory_dir)
if os.path.isdir(os.path.join(trajectory_dir, f)) and f.startswith("turn_")]
if not turn_folders:
logger.warning("No turn folders found")
return False
# Sort to get the last turn
last_turn = sorted(turn_folders)[-1]
last_turn_path = os.path.join(trajectory_dir, last_turn)
logger.info(f"Checking result in last turn: {last_turn}")
# Look for API call response files
response_files = [f for f in os.listdir(last_turn_path)
if f.startswith("api_call_") and f.endswith("_response.json")]
if not response_files:
logger.warning("No API response files found in last turn")
return False
# Check the last response file
last_response_file = sorted(response_files)[-1]
response_file_path = os.path.join(last_turn_path, last_response_file)
logger.info(f"Checking response file: {last_response_file}")
with open(response_file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extract content from response
if 'response' in data and 'choices' in data['response'] and data['response']['choices']:
last_choice = data['response']['choices'][-1]
if 'message' in last_choice and 'content' in last_choice['message']:
content = last_choice['message']['content']
logger.info(f"Last response content: {content}")
# Look for result patterns - need to check both True and False
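# Note: these patterns deliberately match Python-style capitalized True/False in the response text, not JSON true/false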
true_pattern = r'\{\s*"result"\s*:\s*True\s*\}'
false_pattern = r'\{\s*"result"\s*:\s*False\s*\}'
true_match = re.search(true_pattern, content)
false_match = re.search(false_pattern, content)
if true_match:
logger.info(f"Found test result: True - PASSED")
return True
elif false_match:
logger.info(f"Found test result: False - FAILED")
return False
else:
logger.warning("No valid result pattern found in response content - marking as FAILED")
return False
logger.warning("Could not extract content from response structure")
return False
except Exception as e:
logger.error(f"Error extracting test result: {e}")
return False
def upload_test_results_to_rp(client, launch_id, test_path, trajectory_dir, force_stopped=False, video_path=None):
"""
Upload test results to ReportPortal with proper status based on test result
"""
if not trajectory_dir or not os.path.exists(trajectory_dir):
logger.warning(f"Trajectory directory not found: {trajectory_dir}")
formatted_test_path = test_path.replace('\\', '/').replace('.txt', '').replace('/', '__')
test_item_id = client.start_test_item(
launch_id=launch_id,
name=formatted_test_path,
start_time=timestamp(),
item_type="TEST",
description=f"Test case from: {test_path}"
)
client.log(
time=timestamp(),
level="ERROR",
message="❌ TEST FAILED ❌\nNo trajectory directory found",
item_id=test_item_id
)
# Upload video if available
if video_path and os.path.exists(video_path):
try:
with open(video_path, "rb") as video_file:
client.log(
time=timestamp(),
level="INFO",
message="Screen recording of test execution",
item_id=test_item_id,
attachment={
"name": f"test_recording_{formatted_test_path}.mp4",
"data": video_file.read(),
"mime": "video/x-msvideo"
}
)
logger.info(f"Uploaded video for failed test: {video_path}")
except Exception as e:
logger.error(f"Error uploading video: {e}")
client.finish_test_item(
item_id=test_item_id,
end_time=timestamp(),
status="FAILED"
)
return
formatted_test_path = test_path.replace('\\', '/').replace('.txt', '').replace('/', '__')
# Determine final status
if force_stopped:
final_status = "FAILED"
status_message = "exceeded maximum turn limit (30 turns)"
else:
test_result = extract_test_result_from_trajectory(trajectory_dir)
if test_result is True:
final_status = "PASSED"
status_message = "completed successfully with positive result"
else:
final_status = "FAILED"
status_message = "no valid success result found"
# Create test item
test_item_id = client.start_test_item(
launch_id=launch_id,
name=formatted_test_path,
start_time=timestamp(),
item_type="TEST",
description=f"Test case from: {test_path}"
)
try:
turn_folders = [f for f in os.listdir(trajectory_dir)
if os.path.isdir(os.path.join(trajectory_dir, f)) and f.startswith("turn_")]
# Add clear status log
status_emoji = "" if final_status == "PASSED" else ""
client.log(
time=timestamp(),
level="INFO" if final_status == "PASSED" else "ERROR",
message=f"{status_emoji} TEST {final_status} {status_emoji}\nReason: {status_message}\nTotal turns: {len(turn_folders)}",
item_id=test_item_id
)
# Upload screen recording video first
if video_path and os.path.exists(video_path):
logger.info(f"Attempting to upload video: {video_path}")
logger.info(f"Video file size: {os.path.getsize(video_path)} bytes")
try:
with open(video_path, "rb") as video_file:
video_data = video_file.read()
logger.info(f"Read video data: {len(video_data)} bytes")
client.log(
time=timestamp(),
level="INFO",
message="🎥 Screen recording of test execution",
item_id=test_item_id,
attachment={
"name": f"test_recording_{formatted_test_path}.mp4",
"data": video_data,
"mime": "video/x-msvideo"
}
)
logger.info(f"Successfully uploaded screen recording: {video_path}")
except Exception as e:
logger.error(f"Error uploading screen recording: {e}")
client.log(
time=timestamp(),
level="WARNING",
message=f"Failed to upload screen recording: {str(e)}",
item_id=test_item_id
)
else:
logger.warning(f"Video upload skipped - video_path: {video_path}, exists: {os.path.exists(video_path) if video_path else 'N/A'}")
client.log(
time=timestamp(),
level="WARNING",
message="No screen recording available for this test",
item_id=test_item_id
)
# Upload all turn data with appropriate status
# If test failed, mark all turns as failed
force_fail_turns = (final_status == "FAILED")
for turn_folder in sorted(turn_folders):
turn_path = os.path.join(trajectory_dir, turn_folder)
upload_turn_folder(client, test_item_id, turn_path, turn_folder, force_fail=force_fail_turns)
# Finish with correct status
client.finish_test_item(
item_id=test_item_id,
end_time=timestamp(),
status=final_status
)
logger.info(f"Uploaded test results for {formatted_test_path}: {final_status}")
except Exception as e:
logger.error(f"Error uploading test results: {e}")
client.finish_test_item(
item_id=test_item_id,
end_time=timestamp(),
status="FAILED"
)

autoqa/requirements.txt Normal file

@@ -0,0 +1,18 @@
# Core dependencies
cua-computer[all]>=0.3.5
# cua-agent[all]>=0.3.0  # superseded by the pinned git build below (listing both would duplicate the requirement)
cua-agent @ git+https://github.com/menloresearch/cua.git@compute-agent-0.3.0-patch#subdirectory=libs/python/agent
# ReportPortal integration
reportportal-client>=5.6.5
# Screen recording and automation
opencv-python>=4.12.0
numpy>=2.2.6
PyAutoGUI>=0.9.54
# System utilities
psutil>=7.0.0
# Server component
cua-computer-server>=0.1.19

autoqa/screen_recorder.py Normal file

@@ -0,0 +1,84 @@
import cv2
import numpy as np
import pyautogui
import threading
import time
import logging
logger = logging.getLogger(__name__)
class ScreenRecorder:
def __init__(self, output_path, fps=10):
self.output_path = output_path
self.fps = fps
self.recording = False
self.writer = None
self.thread = None
def start_recording(self):
"""Start screen recording"""
if self.recording:
logger.warning("Recording already in progress")
return
self.recording = True
self.thread = threading.Thread(target=self._record_screen, daemon=True)
self.thread.start()
logger.info(f"Started screen recording: {self.output_path}")
def stop_recording(self):
"""Stop screen recording"""
if not self.recording:
logger.warning("No recording in progress")
return
self.recording = False
if self.thread:
self.thread.join(timeout=5)
if self.writer:
self.writer.release()
logger.info(f"Stopped screen recording: {self.output_path}")
def _record_screen(self):
"""Internal method to record screen"""
try:
# Get screen dimensions
screen_size = pyautogui.size()
# Try MP4 with H264 codec for better compatibility
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # or 'H264'
output_path_mp4 = self.output_path
self.writer = cv2.VideoWriter(
output_path_mp4,
fourcc,
self.fps,
screen_size
)
while self.recording:
try:
# Capture screen
screenshot = pyautogui.screenshot()
# Convert PIL image to numpy array
frame = np.array(screenshot)
# Convert RGB to BGR (OpenCV uses BGR)
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# Write frame
self.writer.write(frame)
# Control FPS
time.sleep(1.0 / self.fps)
except Exception as e:
logger.error(f"Error capturing frame: {e}")
break
except Exception as e:
logger.error(f"Error in screen recording: {e}")
finally:
if self.writer:
self.writer.release()

autoqa/scripts/README.md Normal file

@@ -0,0 +1,116 @@
# AutoQA Scripts
This directory contains the platform-specific scripts used by the AutoQA GitHub Actions workflow. They keep the workflow file cleaner and more maintainable by moving complex inline scripts into separate files.
## Directory Structure
```text
autoqa/scripts/
├── setup_permissions.sh # Setup executable permissions for all scripts
├── windows_cleanup.ps1 # Windows: Clean existing Jan installations
├── windows_download.ps1 # Windows: Download Jan app installer
├── windows_install.ps1 # Windows: Install Jan app
├── windows_post_cleanup.ps1 # Windows: Post-test cleanup
├── run_tests.ps1 # Windows: Run AutoQA tests
├── ubuntu_cleanup.sh # Ubuntu: Clean existing Jan installations
├── ubuntu_download.sh # Ubuntu: Download Jan app (.deb)
├── ubuntu_install.sh # Ubuntu: Install Jan app
├── ubuntu_post_cleanup.sh # Ubuntu: Post-test cleanup
├── macos_cleanup.sh # macOS: Clean existing Jan installations
├── macos_download.sh # macOS: Download Jan app (.dmg)
├── macos_install.sh # macOS: Install Jan app
├── macos_post_cleanup.sh # macOS: Post-test cleanup
├── run_tests.sh # Unix: Run AutoQA tests (Ubuntu/macOS)
├── README.md # This file
└── PERMISSIONS.md # Permission setup documentation
```
## Script Functions
### Windows Scripts (.ps1)
- **windows_cleanup.ps1**: Removes existing Jan installations and kills running processes
- **windows_download.ps1**: Downloads Jan installer with priority-based URL selection
- **windows_install.ps1**: Installs Jan app and sets environment variables
- **windows_post_cleanup.ps1**: Comprehensive cleanup after tests including uninstallation
- **run_tests.ps1**: Runs the AutoQA Python tests with proper arguments
### Ubuntu Scripts (.sh)
- **ubuntu_cleanup.sh**: Removes existing Jan installations and kills running processes
- **ubuntu_download.sh**: Downloads Jan .deb package with priority-based URL selection
- **ubuntu_install.sh**: Installs Jan .deb package and sets environment variables
- **ubuntu_post_cleanup.sh**: Comprehensive cleanup after tests including package removal
### macOS Scripts (.sh)
- **macos_cleanup.sh**: Removes existing Jan installations and kills running processes
- **macos_download.sh**: Downloads Jan .dmg package with priority-based URL selection
- **macos_install.sh**: Mounts DMG, extracts .app, and installs to Applications
- **macos_post_cleanup.sh**: Comprehensive cleanup after tests
### Common Scripts
- **setup_permissions.sh**: Automatically sets executable permissions for all shell scripts
- **run_tests.sh**: Platform-agnostic test runner for Unix-based systems (Ubuntu/macOS)
## Usage in GitHub Actions
These scripts are called from the `.github/workflows/autoqa-template.yml` workflow file:
```yaml
# Setup permissions first (Ubuntu/macOS)
- name: Setup script permissions
run: |
chmod +x autoqa/scripts/setup_permissions.sh
./autoqa/scripts/setup_permissions.sh
# Then use scripts without chmod
- name: Clean existing Jan installations
run: |
./autoqa/scripts/ubuntu_cleanup.sh
# Windows example (no chmod needed)
- name: Clean existing Jan installations
shell: powershell
run: |
.\autoqa\scripts\windows_cleanup.ps1
```
## Benefits
1. **Maintainability**: Complex scripts are in separate files, easier to read and modify
2. **Reusability**: Scripts can be reused across different workflows or locally
3. **Testing**: Scripts can be tested independently
4. **Version Control**: Better diff tracking for script changes
5. **Platform Consistency**: Similar functionality across platforms in separate files
## Development
When modifying these scripts:
1. Test them locally on the respective platforms
2. Ensure proper error handling and exit codes
3. Follow platform-specific best practices
4. Update this README if new scripts are added
## Script Parameters
### Windows Scripts
- Most scripts accept `-IsNightly` parameter to handle nightly vs stable builds
- Download script accepts multiple URL sources with priority ordering
### Unix Scripts
- Most scripts accept positional parameters for nightly flag and URLs
- Scripts use `$1`, `$2`, etc. for parameter access
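For example, the template workflow invokes the Ubuntu download script with six positional arguments: the workflow-input URL and nightly flag, the repository-variable URL and flag, and the defaults (the URL shown here is a placeholder):

```bash
./autoqa/scripts/ubuntu_download.sh \
  "https://example.com/Jan-nightly_amd64.deb" "true" \
  "" "" \
  "$DEFAULT_JAN_APP_URL" "$DEFAULT_IS_NIGHTLY"
```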
## Environment Variables
Scripts set these environment variables for subsequent workflow steps:
- `JAN_APP_URL`: The selected Jan app download URL
- `IS_NIGHTLY`: Boolean flag indicating if it's a nightly build
- `JAN_APP_PATH`: Path to the installed Jan executable
- `JAN_PROCESS_NAME`: Name of the Jan process for monitoring
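Each variable is exported by appending to `$GITHUB_ENV`, the same pattern the download and install scripts use:

```bash
echo "JAN_APP_PATH=$APP_PATH" >> $GITHUB_ENV
echo "IS_NIGHTLY=$IS_NIGHTLY" >> $GITHUB_ENV
```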

autoqa/scripts/macos_cleanup.sh Normal file

@@ -0,0 +1,34 @@
#!/bin/bash
# macOS cleanup script for Jan app
echo "Cleaning existing Jan installations..."
# Kill any running Jan processes (both regular and nightly)
pkill -f "Jan" || true
pkill -f "jan" || true
pkill -f "Jan-nightly" || true
pkill -f "jan-nightly" || true
# Remove Jan app directories
rm -rf /Applications/Jan.app
rm -rf /Applications/Jan-nightly.app
rm -rf ~/Applications/Jan.app
rm -rf ~/Applications/Jan-nightly.app
# Remove Jan data folders (both regular and nightly)
rm -rf ~/Library/Application\ Support/Jan
rm -rf ~/Library/Application\ Support/Jan-nightly
rm -rf ~/Library/Application\ Support/jan.ai.app
rm -rf ~/Library/Application\ Support/jan-nightly.ai.app
rm -rf ~/Library/Preferences/jan.*
rm -rf ~/Library/Preferences/jan-nightly.*
rm -rf ~/Library/Caches/jan.*
rm -rf ~/Library/Caches/jan-nightly.*
rm -rf ~/Library/Caches/jan.ai.app
rm -rf ~/Library/Caches/jan-nightly.ai.app
rm -rf ~/Library/WebKit/jan.ai.app
rm -rf ~/Library/WebKit/jan-nightly.ai.app
rm -rf ~/Library/Saved\ Application\ State/jan.ai.app
rm -rf ~/Library/Saved\ Application\ State/jan-nightly.ai.app
echo "Jan cleanup completed"

autoqa/scripts/macos_download.sh Normal file

@@ -0,0 +1,49 @@
#!/bin/bash
# macOS download script for Jan app
WORKFLOW_INPUT_URL="$1"
WORKFLOW_INPUT_IS_NIGHTLY="$2"
REPO_VARIABLE_URL="$3"
REPO_VARIABLE_IS_NIGHTLY="$4"
DEFAULT_URL="$5"
DEFAULT_IS_NIGHTLY="$6"
# Determine Jan app URL and nightly flag from multiple sources (priority order):
# 1. Workflow dispatch input (manual trigger)
# 2. Repository variable JAN_APP_URL
# 3. Default URL from env
JAN_APP_URL=""
IS_NIGHTLY="false"
if [ -n "$WORKFLOW_INPUT_URL" ]; then
JAN_APP_URL="$WORKFLOW_INPUT_URL"
IS_NIGHTLY="$WORKFLOW_INPUT_IS_NIGHTLY"
echo "Using Jan app URL from workflow input: $JAN_APP_URL"
echo "Is nightly build: $IS_NIGHTLY"
elif [ -n "$REPO_VARIABLE_URL" ]; then
JAN_APP_URL="$REPO_VARIABLE_URL"
IS_NIGHTLY="$REPO_VARIABLE_IS_NIGHTLY"
echo "Using Jan app URL from repository variable: $JAN_APP_URL"
echo "Is nightly build: $IS_NIGHTLY"
else
JAN_APP_URL="$DEFAULT_URL"
IS_NIGHTLY="$DEFAULT_IS_NIGHTLY"
echo "Using default Jan app URL: $JAN_APP_URL"
echo "Is nightly build: $IS_NIGHTLY"
fi
# Export for later steps
echo "JAN_APP_URL=$JAN_APP_URL" >> $GITHUB_ENV
echo "IS_NIGHTLY=$IS_NIGHTLY" >> $GITHUB_ENV
echo "Downloading Jan app from: $JAN_APP_URL"
curl -L -o "/tmp/jan-installer.dmg" "$JAN_APP_URL"
if [ ! -f "/tmp/jan-installer.dmg" ]; then
echo "❌ Failed to download Jan app"
exit 1
fi
echo "✅ Successfully downloaded Jan app"
ls -la "/tmp/jan-installer.dmg"

autoqa/scripts/macos_install.sh Normal file

@@ -0,0 +1,86 @@
#!/bin/bash
# macOS install script for Jan app
echo "Installing Jan app from DMG..."
# Mount the DMG
hdiutil attach "/tmp/jan-installer.dmg" -mountpoint "/tmp/jan-mount"
# Find the .app file in the mounted DMG
APP_FILE=$(find "/tmp/jan-mount" -name "*.app" -type d | head -1)
if [ -z "$APP_FILE" ]; then
echo "❌ No .app file found in DMG"
hdiutil detach "/tmp/jan-mount" || true
exit 1
fi
echo "Found app file: $APP_FILE"
# Copy to Applications directory
cp -R "$APP_FILE" /Applications/
# Unmount the DMG
hdiutil detach "/tmp/jan-mount"
# Determine app name and executable path
APP_NAME=$(basename "$APP_FILE")
echo "App name: $APP_NAME"
# First, check what's actually in the MacOS folder
echo "Contents of MacOS folder:"
ls -la "/Applications/$APP_NAME/Contents/MacOS/"
# Find all executable files in MacOS folder
echo "Looking for executable files..."
find "/Applications/$APP_NAME/Contents/MacOS/" -type f -perm +111 -ls
# Try to find the main executable - it's usually the one with the same name as the app (without .app)
APP_BASE_NAME=$(basename "$APP_NAME" .app)
POTENTIAL_EXECUTABLES=(
"/Applications/$APP_NAME/Contents/MacOS/$APP_BASE_NAME"
"/Applications/$APP_NAME/Contents/MacOS/Jan"
"/Applications/$APP_NAME/Contents/MacOS/Jan-nightly"
)
APP_PATH=""
for potential_exec in "${POTENTIAL_EXECUTABLES[@]}"; do
echo "Checking: $potential_exec"
if [ -f "$potential_exec" ] && [ -x "$potential_exec" ]; then
APP_PATH="$potential_exec"
echo "Found executable: $APP_PATH"
break
fi
done
# If still not found, get any executable file
if [ -z "$APP_PATH" ]; then
echo "No predefined executable found, searching for any executable..."
APP_PATH=$(find "/Applications/$APP_NAME/Contents/MacOS/" -type f -perm +111 | head -1)
fi
if [ -z "$APP_PATH" ]; then
echo "❌ No executable found in MacOS folder"
ls -la "/Applications/$APP_NAME/Contents/MacOS/"
exit 1
fi
PROCESS_NAME=$(basename "$APP_PATH")
echo "App installed at: /Applications/$APP_NAME"
echo "Executable path: $APP_PATH"
echo "Process name: $PROCESS_NAME"
# Export for test step
echo "JAN_APP_PATH=$APP_PATH" >> $GITHUB_ENV
echo "PROCESS_NAME=$PROCESS_NAME" >> $GITHUB_ENV
# Verify installation
if [ -f "$APP_PATH" ]; then
echo "✅ Jan app installed successfully"
ls -la "/Applications/$APP_NAME"
else
echo "❌ Jan app installation failed - executable not found"
exit 1
fi
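
For reference, the executable-discovery logic above can be expressed compactly in Python; a hedged equivalent (find_app_executable is an illustrative name, not part of this commit):

import os
from pathlib import Path

def find_app_executable(app_bundle: str) -> str:
    """Locate the main binary inside <bundle>/Contents/MacOS, preferring
    one named after the bundle, then falling back to any executable file."""
    macos_dir = Path(app_bundle) / "Contents" / "MacOS"
    base = Path(app_bundle).stem  # "Jan.app" -> "Jan"
    for cand in (macos_dir / base, macos_dir / "Jan", macos_dir / "Jan-nightly"):
        if cand.is_file() and os.access(cand, os.X_OK):
            return str(cand)
    for entry in macos_dir.iterdir():  # last resort: first executable found
        if entry.is_file() and os.access(entry, os.X_OK):
            return str(entry)
    raise FileNotFoundError(f"No executable found in {macos_dir}")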

View File

@ -0,0 +1,38 @@
#!/bin/bash
# macOS post-test cleanup script
echo "Cleaning up after tests..."
# Kill any running Jan processes (both regular and nightly)
pkill -f "Jan" || true
pkill -f "jan" || true
pkill -f "Jan-nightly" || true
pkill -f "jan-nightly" || true
# Remove Jan app directories
rm -rf /Applications/Jan.app
rm -rf /Applications/Jan-nightly.app
rm -rf ~/Applications/Jan.app
rm -rf ~/Applications/Jan-nightly.app
# Remove Jan data folders (both regular and nightly)
rm -rf ~/Library/Application\ Support/Jan
rm -rf ~/Library/Application\ Support/Jan-nightly
rm -rf ~/Library/Application\ Support/jan.ai.app
rm -rf ~/Library/Application\ Support/jan-nightly.ai.app
rm -rf ~/Library/Preferences/jan.*
rm -rf ~/Library/Preferences/jan-nightly.*
rm -rf ~/Library/Caches/jan.*
rm -rf ~/Library/Caches/jan-nightly.*
rm -rf ~/Library/Caches/jan.ai.app
rm -rf ~/Library/Caches/jan-nightly.ai.app
rm -rf ~/Library/WebKit/jan.ai.app
rm -rf ~/Library/WebKit/jan-nightly.ai.app
rm -rf ~/Library/Saved\ Application\ State/jan.ai.app
rm -rf ~/Library/Saved\ Application\ State/jan-nightly.ai.app
# Clean up downloaded installer
rm -f "/tmp/jan-installer.dmg"
rm -rf "/tmp/jan-mount"
echo "Cleanup completed"

View File

@ -0,0 +1,31 @@
#!/usr/bin/env pwsh
# Windows test runner script
param(
[string]$JanAppPath,
[string]$ProcessName,
[string]$RpToken
)
Write-Host "Starting Auto QA Tests..."
Write-Host "Jan app path: $JanAppPath"
Write-Host "Process name: $ProcessName"
Write-Host "Current working directory: $(Get-Location)"
Write-Host "Contents of current directory:"
Get-ChildItem
Write-Host "Contents of trajectories directory (if exists):"
if (Test-Path "trajectories") {
Get-ChildItem "trajectories"
} else {
Write-Host "trajectories directory not found"
}
# Run the main test with proper arguments
if ($JanAppPath -and $ProcessName) {
python main.py --enable-reportportal --rp-token "$RpToken" --jan-app-path "$JanAppPath" --jan-process-name "$ProcessName"
} elseif ($JanAppPath) {
python main.py --enable-reportportal --rp-token "$RpToken" --jan-app-path "$JanAppPath"
} else {
python main.py --enable-reportportal --rp-token "$RpToken"
}

View File

@ -0,0 +1,69 @@
#!/bin/bash
# Common test runner script
JAN_APP_PATH="$1"
PROCESS_NAME="$2"
RP_TOKEN="$3"
PLATFORM="$4"
echo "Starting Auto QA Tests..."
echo "Platform: $PLATFORM"
echo "Jan app path: $JAN_APP_PATH"
echo "Process name: $PROCESS_NAME"
# Platform-specific setup
if [ "$PLATFORM" = "ubuntu" ]; then
# Get the current display session
export DISPLAY=$(w -h | awk 'NR==1 {print $2}')
echo "Display ID: $DISPLAY"
# Verify display is working
if [ -z "$DISPLAY" ]; then
echo "No display session found, falling back to :0"
export DISPLAY=:0
fi
echo "Using display: $DISPLAY"
# Test display connection
xdpyinfo -display $DISPLAY >/dev/null 2>&1 || {
echo "Display $DISPLAY is not available"
exit 1
}
# Make Jan executable if needed
if [ -f "/usr/bin/Jan-nightly" ]; then
sudo chmod +x /usr/bin/Jan-nightly
fi
if [ -f "/usr/bin/Jan" ]; then
sudo chmod +x /usr/bin/Jan
fi
fi
# macOS specific setup
if [ "$PLATFORM" = "macos" ]; then
# Verify Jan app path
if [ ! -f "$JAN_APP_PATH" ]; then
echo "❌ Jan app not found at: $JAN_APP_PATH"
echo "Available files in /Applications:"
ls -la /Applications/ | grep -i jan || echo "No Jan apps found"
exit 1
fi
fi
# Change to autoqa directory to ensure correct working directory
cd "$(dirname "$0")/.."
echo "Current working directory: $(pwd)"
echo "Contents of current directory:"
ls -la
echo "Contents of trajectories directory (if exists):"
ls -la trajectories/ 2>/dev/null || echo "trajectories directory not found"
# Run the main test with proper arguments
if [ -n "$JAN_APP_PATH" ] && [ -n "$PROCESS_NAME" ]; then
python main.py --enable-reportportal --rp-token "$RP_TOKEN" --jan-app-path "$JAN_APP_PATH" --jan-process-name "$PROCESS_NAME"
elif [ -n "$JAN_APP_PATH" ]; then
python main.py --enable-reportportal --rp-token "$RP_TOKEN" --jan-app-path "$JAN_APP_PATH"
else
python main.py --enable-reportportal --rp-token "$RP_TOKEN"
fi

View File

@ -0,0 +1,15 @@
#!/bin/bash
# Setup script permissions for AutoQA scripts
echo "Setting up permissions for AutoQA scripts..."
# Get the directory where this script is located
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
# Make all shell scripts executable
chmod +x "$SCRIPT_DIR"/*.sh
echo "✅ All shell scripts are now executable:"
ls -la "$SCRIPT_DIR"/*.sh
echo "✅ Permission setup completed"

View File

@ -0,0 +1,22 @@
#!/bin/bash
# Ubuntu cleanup script for Jan app
echo "Cleaning existing Jan installations..."
# Remove Jan data folders (both regular and nightly)
rm -rf ~/.config/Jan
rm -rf ~/.config/Jan-nightly
rm -rf ~/.local/share/Jan
rm -rf ~/.local/share/Jan-nightly
rm -rf ~/.cache/jan
rm -rf ~/.cache/jan-nightly
rm -rf ~/.local/share/jan-nightly.ai.app
rm -rf ~/.local/share/jan.ai.app
# Kill any running Jan processes (both regular and nightly)
pkill -f "Jan" || true
pkill -f "jan" || true
pkill -f "Jan-nightly" || true
pkill -f "jan-nightly" || true
echo "Jan cleanup completed"

View File

@ -0,0 +1,57 @@
#!/bin/bash
# Ubuntu download script for Jan app
WORKFLOW_INPUT_URL="$1"
WORKFLOW_INPUT_IS_NIGHTLY="$2"
REPO_VARIABLE_URL="$3"
REPO_VARIABLE_IS_NIGHTLY="$4"
DEFAULT_URL="$5"
DEFAULT_IS_NIGHTLY="$6"
# Determine Jan app URL and nightly flag from multiple sources (priority order):
# 1. Workflow dispatch input (manual trigger)
# 2. Repository variable JAN_APP_URL_LINUX
# 3. Default URL from env
JAN_APP_URL=""
IS_NIGHTLY=false
if [ -n "$WORKFLOW_INPUT_URL" ]; then
JAN_APP_URL="$WORKFLOW_INPUT_URL"
IS_NIGHTLY="$WORKFLOW_INPUT_IS_NIGHTLY"
echo "Using Jan app URL from workflow input: $JAN_APP_URL"
echo "Is nightly build: $IS_NIGHTLY"
elif [ -n "$REPO_VARIABLE_URL" ]; then
JAN_APP_URL="$REPO_VARIABLE_URL"
IS_NIGHTLY="$REPO_VARIABLE_IS_NIGHTLY"
echo "Using Jan app URL from repository variable: $JAN_APP_URL"
echo "Is nightly build: $IS_NIGHTLY"
else
JAN_APP_URL="$DEFAULT_URL"
IS_NIGHTLY="$DEFAULT_IS_NIGHTLY"
echo "Using default Jan app URL: $JAN_APP_URL"
echo "Is nightly build: $IS_NIGHTLY"
fi
# Set environment variables for later steps
echo "JAN_APP_URL=$JAN_APP_URL" >> $GITHUB_ENV
echo "IS_NIGHTLY=$IS_NIGHTLY" >> $GITHUB_ENV
echo "Downloading Jan app from: $JAN_APP_URL"
DOWNLOAD_PATH="/tmp/jan-installer.deb"
# Download the package
if ! wget "$JAN_APP_URL" -O "$DOWNLOAD_PATH"; then
echo "Failed to download Jan app"
exit 1
fi
if [ -f "$DOWNLOAD_PATH" ]; then
FILE_SIZE=$(stat -c%s "$DOWNLOAD_PATH")
echo "Downloaded Jan app successfully. Size: $FILE_SIZE bytes"
echo "File saved to: $DOWNLOAD_PATH"
else
echo "Downloaded file not found"
exit 1
fi

View File

@ -0,0 +1,34 @@
#!/bin/bash
# Ubuntu install script for Jan app
IS_NIGHTLY="$1"
INSTALLER_PATH="/tmp/jan-installer.deb"
echo "Installing Jan app..."
echo "Is nightly build: $IS_NIGHTLY"
# Install the .deb package
sudo apt install "$INSTALLER_PATH" -y
sudo apt-get install -f -y
# Wait for installation to complete
sleep 10
# Verify installation based on nightly flag
if [ "$IS_NIGHTLY" = "true" ]; then
DEFAULT_JAN_PATH="/usr/bin/Jan-nightly"
PROCESS_NAME="Jan-nightly"
else
DEFAULT_JAN_PATH="/usr/bin/Jan"
PROCESS_NAME="Jan"
fi
if [ -f "$DEFAULT_JAN_PATH" ]; then
echo "Jan app installed successfully at: $DEFAULT_JAN_PATH"
echo "JAN_APP_PATH=$DEFAULT_JAN_PATH" >> $GITHUB_ENV
echo "JAN_PROCESS_NAME=$PROCESS_NAME" >> $GITHUB_ENV
else
echo "Jan app not found at expected location: $DEFAULT_JAN_PATH"
echo "Will auto-detect during test run"
fi

View File

@ -0,0 +1,44 @@
#!/bin/bash
# Ubuntu post-test cleanup script
IS_NIGHTLY="$1"
echo "Cleaning up after tests..."
# Kill any running Jan processes (both regular and nightly)
pkill -f "Jan" || true
pkill -f "jan" || true
pkill -f "Jan-nightly" || true
pkill -f "jan-nightly" || true
# Remove Jan data folders (both regular and nightly)
rm -rf ~/.config/Jan
rm -rf ~/.config/Jan-nightly
rm -rf ~/.local/share/Jan
rm -rf ~/.local/share/Jan-nightly
rm -rf ~/.cache/jan
rm -rf ~/.cache/jan-nightly
rm -rf ~/.local/share/jan-nightly.ai.app
rm -rf ~/.local/share/jan.ai.app
# Try to uninstall Jan app
if [ "$IS_NIGHTLY" = "true" ]; then
PACKAGE_NAME="jan-nightly"
else
PACKAGE_NAME="jan"
fi
echo "Attempting to uninstall package: $PACKAGE_NAME"
if dpkg -l | grep -q "$PACKAGE_NAME"; then
echo "Found package $PACKAGE_NAME, uninstalling..."
sudo dpkg -r "$PACKAGE_NAME" || true
sudo apt-get autoremove -y || true
else
echo "Package $PACKAGE_NAME not found in dpkg list"
fi
# Clean up downloaded installer
rm -f "/tmp/jan-installer.deb"
echo "Cleanup completed"

View File

@ -0,0 +1,50 @@
#!/usr/bin/env pwsh
# Windows cleanup script for Jan app
param(
[string]$IsNightly = "false"
)
Write-Host "Cleaning existing Jan installations..."
# Remove Jan data folders (both regular and nightly)
$janAppData = "$env:APPDATA\Jan"
$janNightlyAppData = "$env:APPDATA\Jan-nightly"
$janLocalAppData = "$env:LOCALAPPDATA\jan.ai.app"
$janNightlyLocalAppData = "$env:LOCALAPPDATA\jan-nightly.ai.app"
if (Test-Path $janAppData) {
Write-Host "Removing $janAppData"
Remove-Item -Path $janAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janNightlyAppData) {
Write-Host "Removing $janNightlyAppData"
Remove-Item -Path $janNightlyAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janLocalAppData) {
Write-Host "Removing $janLocalAppData"
Remove-Item -Path $janLocalAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janNightlyLocalAppData) {
Write-Host "Removing $janNightlyLocalAppData"
Remove-Item -Path $janNightlyLocalAppData -Recurse -Force -ErrorAction SilentlyContinue
}
# Kill any running Jan processes (both regular and nightly)
Get-Process -Name "Jan" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
Get-Process -Name "jan" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
Get-Process -Name "Jan-nightly" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
Get-Process -Name "jan-nightly" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
# Remove Jan extensions folder
$janExtensionsPath = "$env:USERPROFILE\jan\extensions"
if (Test-Path $janExtensionsPath) {
Write-Host "Removing $janExtensionsPath"
Remove-Item -Path $janExtensionsPath -Recurse -Force -ErrorAction SilentlyContinue
}
Write-Host "Jan cleanup completed"

View File

@ -0,0 +1,63 @@
#!/usr/bin/env pwsh
# Windows download script for Jan app
param(
[string]$WorkflowInputUrl = "",
[string]$WorkflowInputIsNightly = "",
[string]$RepoVariableUrl = "",
[string]$RepoVariableIsNightly = "",
[string]$DefaultUrl = "",
[string]$DefaultIsNightly = ""
)
# Determine Jan app URL and nightly flag from multiple sources (priority order):
# 1. Workflow dispatch input (manual trigger)
# 2. Repository variable JAN_APP_URL
# 3. Default URL from env
$janAppUrl = ""
$isNightly = $false
if ($WorkflowInputUrl -ne "") {
$janAppUrl = $WorkflowInputUrl
$isNightly = [System.Convert]::ToBoolean($WorkflowInputIsNightly)
Write-Host "Using Jan app URL from workflow input: $janAppUrl"
Write-Host "Is nightly build: $isNightly"
}
elseif ($RepoVariableUrl -ne "") {
$janAppUrl = $RepoVariableUrl
$isNightly = [System.Convert]::ToBoolean($RepoVariableIsNightly)
Write-Host "Using Jan app URL from repository variable: $janAppUrl"
Write-Host "Is nightly build: $isNightly"
}
else {
$janAppUrl = $DefaultUrl
$isNightly = [System.Convert]::ToBoolean($DefaultIsNightly)
Write-Host "Using default Jan app URL: $janAppUrl"
Write-Host "Is nightly build: $isNightly"
}
# Set environment variables for later steps
Write-Output "JAN_APP_URL=$janAppUrl" >> $env:GITHUB_ENV
Write-Output "IS_NIGHTLY=$isNightly" >> $env:GITHUB_ENV
Write-Host "Downloading Jan app from: $janAppUrl"
$downloadPath = "$env:TEMP\jan-installer.exe"
try {
# Use wget for better performance
wget.exe "$janAppUrl" -O "$downloadPath"
if (Test-Path $downloadPath) {
$fileSize = (Get-Item $downloadPath).Length
Write-Host "Downloaded Jan app successfully. Size: $fileSize bytes"
Write-Host "File saved to: $downloadPath"
} else {
throw "Downloaded file not found"
}
}
catch {
Write-Error "Failed to download Jan app: $_"
exit 1
}

View File

@ -0,0 +1,43 @@
#!/usr/bin/env pwsh
# Windows install script for Jan app
param(
[string]$IsNightly = "false"
)
$installerPath = "$env:TEMP\jan-installer.exe"
$isNightly = [System.Convert]::ToBoolean($IsNightly)
Write-Host "Installing Jan app..."
Write-Host "Is nightly build: $isNightly"
# Try silent installation first
try {
Start-Process -FilePath $installerPath -ArgumentList "/S" -Wait -NoNewWindow
Write-Host "Jan app installed silently"
}
catch {
Write-Host "Silent installation failed, trying normal installation..."
Start-Process -FilePath $installerPath -Wait -NoNewWindow
}
# Wait a bit for installation to complete
Start-Sleep -Seconds 10
# Verify installation based on nightly flag
if ($isNightly) {
$defaultJanPath = "$env:LOCALAPPDATA\Programs\jan-nightly\Jan-nightly.exe"
$processName = "Jan-nightly.exe"
} else {
$defaultJanPath = "$env:LOCALAPPDATA\Programs\jan\Jan.exe"
$processName = "Jan.exe"
}
if (Test-Path $defaultJanPath) {
Write-Host "Jan app installed successfully at: $defaultJanPath"
Write-Output "JAN_APP_PATH=$defaultJanPath" >> $env:GITHUB_ENV
Write-Output "JAN_PROCESS_NAME=$processName" >> $env:GITHUB_ENV
} else {
Write-Warning "Jan app not found at expected location: $defaultJanPath"
Write-Host "Will auto-detect during test run"
}

View File

@ -0,0 +1,102 @@
#!/usr/bin/env pwsh
# Windows post-test cleanup script
param(
[string]$IsNightly = "false"
)
Write-Host "Cleaning up after tests..."
# Kill any running Jan processes (both regular and nightly)
Get-Process -Name "Jan" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
Get-Process -Name "jan" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
Get-Process -Name "Jan-nightly" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
Get-Process -Name "jan-nightly" -ErrorAction SilentlyContinue | Stop-Process -Force -ErrorAction SilentlyContinue
# Remove Jan data folders (both regular and nightly)
$janAppData = "$env:APPDATA\Jan"
$janNightlyAppData = "$env:APPDATA\Jan-nightly"
$janLocalAppData = "$env:LOCALAPPDATA\jan.ai.app"
$janNightlyLocalAppData = "$env:LOCALAPPDATA\jan-nightly.ai.app"
$janProgramsPath = "$env:LOCALAPPDATA\Programs\Jan"
$janNightlyProgramsPath = "$env:LOCALAPPDATA\Programs\Jan-nightly"
if (Test-Path $janAppData) {
Write-Host "Removing $janAppData"
Remove-Item -Path $janAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janNightlyAppData) {
Write-Host "Removing $janNightlyAppData"
Remove-Item -Path $janNightlyAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janLocalAppData) {
Write-Host "Removing $janLocalAppData"
Remove-Item -Path $janLocalAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janNightlyLocalAppData) {
Write-Host "Removing $janNightlyLocalAppData"
Remove-Item -Path $janNightlyLocalAppData -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janProgramsPath) {
Write-Host "Removing $janProgramsPath"
Remove-Item -Path $janProgramsPath -Recurse -Force -ErrorAction SilentlyContinue
}
if (Test-Path $janNightlyProgramsPath) {
Write-Host "Removing $janNightlyProgramsPath"
Remove-Item -Path $janNightlyProgramsPath -Recurse -Force -ErrorAction SilentlyContinue
}
# Remove Jan extensions folder
$janExtensionsPath = "$env:USERPROFILE\jan\extensions"
if (Test-Path $janExtensionsPath) {
Write-Host "Removing $janExtensionsPath"
Remove-Item -Path $janExtensionsPath -Recurse -Force -ErrorAction SilentlyContinue
}
# Try to uninstall Jan app silently
try {
$isNightly = [System.Convert]::ToBoolean($IsNightly)
# Determine uninstaller path based on nightly flag
if ($isNightly) {
$uninstallerPath = "$env:LOCALAPPDATA\Programs\jan-nightly\uninstall.exe"
$installPath = "$env:LOCALAPPDATA\Programs\jan-nightly"
} else {
$uninstallerPath = "$env:LOCALAPPDATA\Programs\jan\uninstall.exe"
$installPath = "$env:LOCALAPPDATA\Programs\jan"
}
Write-Host "Looking for uninstaller at: $uninstallerPath"
if (Test-Path $uninstallerPath) {
Write-Host "Found uninstaller, attempting silent uninstall..."
Start-Process -FilePath $uninstallerPath -ArgumentList "/S" -Wait -NoNewWindow -ErrorAction SilentlyContinue
Write-Host "Uninstall completed"
} else {
Write-Host "No uninstaller found, attempting manual cleanup..."
if (Test-Path $installPath) {
Write-Host "Removing installation directory: $installPath"
Remove-Item -Path $installPath -Recurse -Force -ErrorAction SilentlyContinue
}
}
Write-Host "Jan app cleanup completed"
}
catch {
Write-Warning "Failed to uninstall Jan app cleanly: $_"
Write-Host "Manual cleanup may be required"
}
# Clean up downloaded installer
$installerPath = "$env:TEMP\jan-installer.exe"
if (Test-Path $installerPath) {
Remove-Item -Path $installerPath -Force -ErrorAction SilentlyContinue
}
Write-Host "Cleanup completed"

319
autoqa/test_runner.py Normal file
View File

@ -0,0 +1,319 @@
import os
import asyncio
import threading
import time
import logging
from datetime import datetime
from pathlib import Path
# from computer import Computer
from agent import ComputerAgent, LLM
from utils import is_jan_running, force_close_jan, start_jan_app, get_latest_trajectory_folder
from screen_recorder import ScreenRecorder
from reportportal_handler import upload_test_results_to_rp
from reportportal_client.helpers import timestamp
logger = logging.getLogger(__name__)
async def run_single_test_with_timeout(computer, test_data, rp_client, launch_id, max_turns=30,
jan_app_path=None, jan_process_name="Jan.exe", agent_config=None,
enable_reportportal=False):
"""
Run a single test case with turn count monitoring, forced stop, and screen recording
Returns dict with test result: {"success": bool, "status": str, "message": str}
"""
path = test_data['path']
prompt = test_data['prompt']
# Default agent config if not provided
if agent_config is None:
agent_config = {
"loop": "uitars",
"model_provider": "oaicompat",
"model_name": "ByteDance-Seed/UI-TARS-1.5-7B",
"model_base_url": "http://10.200.108.58:1234/v1"
}
# Create trajectory_dir from path (remove .txt extension)
trajectory_name = str(Path(path).with_suffix(''))
trajectory_base_dir = os.path.abspath(f"trajectories/{trajectory_name.replace(os.sep, '/')}")
# Ensure trajectories directory exists
os.makedirs(os.path.dirname(trajectory_base_dir), exist_ok=True)
# Create recordings directory
recordings_dir = "recordings"
os.makedirs(recordings_dir, exist_ok=True)
# Create video filename
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_test_name = trajectory_name.replace('/', '_').replace('\\', '_')
video_filename = f"{safe_test_name}_{current_time}.mp4"
video_path = os.path.abspath(os.path.join(recordings_dir, video_filename))
# Initialize result tracking
test_result_data = {
"success": False,
"status": "UNKNOWN",
"message": "Test execution incomplete",
"trajectory_dir": None,
"video_path": video_path
}
logger.info(f"Starting test: {path}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Trajectory base directory: {trajectory_base_dir}")
logger.info(f"Screen recording will be saved to: {video_path}")
logger.info(f"Using model: {agent_config['model_name']} from {agent_config['model_base_url']}")
logger.info(f"ReportPortal upload: {'ENABLED' if enable_reportportal else 'DISABLED'}")
trajectory_dir = None
agent_task = None
monitor_stop_event = threading.Event()
force_stopped_due_to_turns = False # Track if test was force stopped
# Initialize screen recorder
recorder = ScreenRecorder(video_path, fps=10)
try:
# Step 1: Check and force close Jan app if running
if is_jan_running(jan_process_name):
logger.info("Jan application is running, force closing...")
force_close_jan(jan_process_name)
# Step 2: Start Jan app in maximized mode
if jan_app_path:
start_jan_app(jan_app_path)
else:
start_jan_app() # Use default path
# Step 3: Start screen recording
recorder.start_recording()
# Step 4: Create agent for this test using config
agent = ComputerAgent(
computer=computer,
loop=agent_config["loop"],
model=LLM(
provider=agent_config["model_provider"],
name=agent_config["model_name"],
provider_base_url=agent_config["model_base_url"]
),
trajectory_dir=trajectory_base_dir
)
# Step 5: Start monitoring thread
def monitor_thread():
nonlocal force_stopped_due_to_turns
while not monitor_stop_event.is_set():
try:
if os.path.exists(trajectory_base_dir):
folders = [f for f in os.listdir(trajectory_base_dir)
if os.path.isdir(os.path.join(trajectory_base_dir, f))]
if folders:
latest_folder = sorted(folders)[-1]
latest_folder_path = os.path.join(trajectory_base_dir, latest_folder)
if os.path.exists(latest_folder_path):
turn_folders = [f for f in os.listdir(latest_folder_path)
if os.path.isdir(os.path.join(latest_folder_path, f)) and f.startswith("turn_")]
turn_count = len(turn_folders)
logger.info(f"Current turn count: {turn_count}")
if turn_count >= max_turns:
logger.warning(f"Turn count exceeded {max_turns} for test {path}, forcing stop")
force_stopped_due_to_turns = True # Mark as force stopped
# Cancel the agent task
if agent_task and not agent_task.done():
agent_task.cancel()
monitor_stop_event.set()
return
# Poll every 5 seconds; wait() returns True as soon as the stop event is set
if monitor_stop_event.wait(5):
    break
except Exception as e:
logger.error(f"Error in monitor thread: {e}")
time.sleep(5)
# Start monitoring in background thread
monitor_thread_obj = threading.Thread(target=monitor_thread, daemon=True)
monitor_thread_obj.start()
# Step 6: Run the test with prompt
logger.info(f"Running test case: {path}")
try:
# Create the agent task
async def run_agent():
async for result in agent.run(prompt):
if monitor_stop_event.is_set():
logger.warning(f"Test {path} stopped due to turn limit")
break
logger.info(f"Test result for {path}: {result}")
print(result)
agent_task = asyncio.create_task(run_agent())
# Wait for agent task to complete or timeout
try:
await asyncio.wait_for(agent_task, timeout=600) # 10 minute timeout as backup
if not monitor_stop_event.is_set():
logger.info(f"Successfully completed test execution: {path}")
else:
logger.warning(f"Test {path} was stopped due to turn limit")
except asyncio.TimeoutError:
logger.warning(f"Test {path} timed out after 10 minutes")
agent_task.cancel()
except asyncio.CancelledError:
logger.warning(f"Test {path} was cancelled due to turn limit")
finally:
# Stop monitoring
monitor_stop_event.set()
except Exception as e:
logger.error(f"Error running test {path}: {e}")
monitor_stop_event.set()
# Update result data for exception case
test_result_data.update({
"success": False,
"status": "ERROR",
"message": f"Test execution failed with exception: {str(e)}",
"trajectory_dir": None
})
finally:
# Step 7: Stop screen recording
try:
recorder.stop_recording()
logger.info(f"Screen recording saved to: {video_path}")
except Exception as e:
logger.error(f"Error stopping screen recording: {e}")
# Step 8: Upload results to ReportPortal only if enabled
if enable_reportportal and rp_client and launch_id:
# Get trajectory folder first
trajectory_dir = get_latest_trajectory_folder(trajectory_base_dir)
try:
if trajectory_dir:
logger.info(f"Uploading results to ReportPortal for: {path}")
logger.info(f"Video path for upload: {video_path}")
logger.info(f"Video exists: {os.path.exists(video_path)}")
if os.path.exists(video_path):
logger.info(f"Video file size: {os.path.getsize(video_path)} bytes")
upload_test_results_to_rp(rp_client, launch_id, path, trajectory_dir, force_stopped_due_to_turns, video_path)
else:
logger.warning(f"Test completed but no trajectory found for: {path}")
# Handle case where test completed but no trajectory found
formatted_test_path = path.replace('\\', '/').replace('.txt', '').replace('/', '__')
test_item_id = rp_client.start_test_item(
launch_id=launch_id,
name=formatted_test_path,
start_time=timestamp(),
item_type="TEST"
)
rp_client.log(
time=timestamp(),
level="ERROR",
message="Test execution completed but no trajectory data found",
item_id=test_item_id
)
# Still upload video for failed test
if video_path and os.path.exists(video_path):
try:
with open(video_path, "rb") as video_file:
rp_client.log(
time=timestamp(),
level="INFO",
message="🎥 Screen recording of failed test",
item_id=test_item_id,
attachment={
"name": f"failed_test_recording_{formatted_test_path}.mp4",
"data": video_file.read(),
"mime": "video/x-msvideo"
}
)
except Exception as e:
logger.error(f"Error uploading video for failed test: {e}")
rp_client.finish_test_item(
item_id=test_item_id,
end_time=timestamp(),
status="FAILED"
)
except Exception as upload_error:
logger.error(f"Error uploading results for {path}: {upload_error}")
else:
# For non-ReportPortal mode, still get trajectory for final results
trajectory_dir = get_latest_trajectory_folder(trajectory_base_dir)
# Always process results for consistency (both RP and local mode)
# trajectory_dir is already set above, no need to call get_latest_trajectory_folder again
if trajectory_dir:
# Extract test result for processing
from reportportal_handler import extract_test_result_from_trajectory
if force_stopped_due_to_turns:
final_status = "FAILED"
status_message = "exceeded maximum turn limit ({} turns)".format(max_turns)
test_result_data.update({
"success": False,
"status": final_status,
"message": status_message,
"trajectory_dir": trajectory_dir
})
else:
test_result = extract_test_result_from_trajectory(trajectory_dir)
if test_result is True:
final_status = "PASSED"
status_message = "completed successfully with positive result"
test_result_data.update({
"success": True,
"status": final_status,
"message": status_message,
"trajectory_dir": trajectory_dir
})
else:
final_status = "FAILED"
status_message = "no valid success result found"
test_result_data.update({
"success": False,
"status": final_status,
"message": status_message,
"trajectory_dir": trajectory_dir
})
if not enable_reportportal:
# Local development mode - log results
logger.info(f"🏠 LOCAL RESULT: {path} - {final_status} ({status_message})")
logger.info(f"📹 Video saved: {video_path}")
logger.info(f"📁 Trajectory: {trajectory_dir}")
else:
final_status = "FAILED"
status_message = "no trajectory found"
test_result_data.update({
"success": False,
"status": final_status,
"message": status_message,
"trajectory_dir": None
})
if not enable_reportportal:
logger.warning(f"🏠 LOCAL RESULT: {path} - {final_status} ({status_message})")
# Step 9: Always force close Jan app after test completion
logger.info(f"Cleaning up after test: {path}")
force_close_jan(jan_process_name)
# Return test result
return test_result_data
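
A minimal driver sketch for run_single_test_with_timeout, assuming the autoqa modules are importable and ReportPortal is disabled; the Computer initialization mirrors the commented-out import above and is illustrative only:

import asyncio
from computer import Computer  # cua Computer SDK; real setup may differ
from utils import scan_test_files
from test_runner import run_single_test_with_timeout

async def main():
    computer = Computer()  # illustrative; actual configuration not shown here
    for test in scan_test_files("tests"):
        result = await run_single_test_with_timeout(
            computer=computer,
            test_data=test,
            rp_client=None,   # ReportPortal disabled
            launch_id=None,
            max_turns=30,
            enable_reportportal=False,
        )
        print(test["path"], "->", result["status"], "-", result["message"])

if __name__ == "__main__":
    asyncio.run(main())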

View File

@ -0,0 +1,15 @@
prompt = """
You are going to test the Jan application by downloading and chatting with a model (qwen2.5).
Step-by-step instructions:
1. Given the Jan application is already opened.
2. In the **bottom-left corner**, click the **“Hub”** menu item.
3. Scroll through the model list or use the search bar to find **qwen2.5**.
4. Click **“Use”** on the qwen2.5 model.
5. Wait for the model to finish downloading and become ready.
6. Once redirected to the chat screen, type any message into the input box (e.g. `Hello qwen2.5`).
7. Press **Enter** to send the message.
8. Wait for the model's response.
If the model responds correctly, return: {"result": True}, otherwise return: {"result": False}.
"""

343
autoqa/utils.py Normal file
View File

@ -0,0 +1,343 @@
import os
import logging
import subprocess
import psutil
import time
import pyautogui
import platform
from pathlib import Path
logger = logging.getLogger(__name__)
# Cross-platform window management
IS_LINUX = platform.system() == "Linux"
IS_WINDOWS = platform.system() == "Windows"
IS_MACOS = platform.system() == "Darwin"
if IS_WINDOWS:
try:
import pygetwindow as gw
except ImportError:
gw = None
logger.warning("pygetwindow not available on this system")
def is_jan_running(jan_process_name="Jan.exe"):
"""
Check if Jan application is currently running
"""
for proc in psutil.process_iter(['pid', 'name']):
try:
if proc.info['name'] and jan_process_name.lower() in proc.info['name'].lower():
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return False
def force_close_jan(jan_process_name="Jan.exe"):
"""
Force close Jan application if it's running
"""
logger.info("Checking for running Jan processes...")
closed_any = False
for proc in psutil.process_iter(['pid', 'name']):
try:
if proc.info['name'] and jan_process_name.lower() in proc.info['name'].lower():
logger.info(f"Force closing Jan process (PID: {proc.info['pid']})")
proc.kill()
closed_any = True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
if closed_any:
logger.info("Waiting for Jan processes to terminate...")
time.sleep(3) # Wait for processes to fully terminate
else:
logger.info("No Jan processes found running")
def find_jan_window_linux():
"""
Find Jan window on Linux using wmctrl
"""
try:
result = subprocess.run(['wmctrl', '-l'], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
for line in result.stdout.split('\n'):
if 'jan' in line.lower() or 'Jan' in line:
# Extract window ID (first column)
window_id = line.split()[0]
logger.info(f"Found Jan window with ID: {window_id}")
return window_id
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError) as e:
logger.warning(f"wmctrl command failed: {e}")
return None
def maximize_jan_window_linux():
"""
Maximize Jan window on Linux using wmctrl
"""
window_id = find_jan_window_linux()
if window_id:
try:
# Maximize window using wmctrl
subprocess.run(['wmctrl', '-i', '-r', window_id, '-b', 'add,maximized_vert,maximized_horz'],
timeout=5)
logger.info("Jan window maximized using wmctrl")
return True
except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"Failed to maximize with wmctrl: {e}")
# Fallback: Try xdotool
try:
result = subprocess.run(['xdotool', 'search', '--name', 'Jan'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
window_id = result.stdout.strip().split('\n')[0]
subprocess.run(['xdotool', 'windowactivate', window_id], timeout=5)
subprocess.run(['xdotool', 'key', 'alt+F10'], timeout=5) # Maximize shortcut
logger.info("Jan window maximized using xdotool")
return True
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError) as e:
logger.warning(f"xdotool command failed: {e}")
return False
def find_jan_window_macos():
"""
Find Jan window on macOS using AppleScript
"""
try:
# AppleScript to find Jan window
script = '''
tell application "System Events"
set janApps to (every process whose name contains "Jan")
if length of janApps > 0 then
return name of first item of janApps
else
return ""
end if
end tell
'''
result = subprocess.run(['osascript', '-e', script],
capture_output=True, text=True, timeout=10)
if result.returncode == 0 and result.stdout.strip():
app_name = result.stdout.strip()
logger.info(f"Found Jan app: {app_name}")
return app_name
except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError) as e:
logger.warning(f"AppleScript command failed: {e}")
return None
def maximize_jan_window_macos():
"""
Maximize Jan window on macOS using AppleScript
"""
app_name = find_jan_window_macos()
if app_name:
try:
# AppleScript to maximize window
script = f'''
tell application "System Events"
tell process "{app_name}"
set frontmost to true
tell window 1
set value of attribute "AXFullScreen" to true
end tell
end tell
end tell
'''
result = subprocess.run(['osascript', '-e', script], timeout=10)
if result.returncode == 0:
logger.info("Jan window maximized using AppleScript")
return True
except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
logger.warning(f"Failed to maximize with AppleScript: {e}")
# Fallback: try Cmd+Ctrl+F (the macOS full-screen shortcut)
try:
logger.info("Trying Cmd+Ctrl+F hotkey to maximize")
pyautogui.hotkey('cmd', 'ctrl', 'f')
time.sleep(1)
logger.info("Attempted to maximize using Cmd+Ctrl+F")
return True
except Exception as e:
logger.warning(f"Hotkey maximize failed: {e}")
return False
def maximize_jan_window():
"""
Find and maximize Jan window (cross-platform)
"""
try:
# Wait a bit for window to appear
time.sleep(2)
if IS_LINUX:
return maximize_jan_window_linux()
elif IS_MACOS:
return maximize_jan_window_macos()
elif IS_WINDOWS and gw:
# Method 1: Try to find window by title containing "Jan"
windows = gw.getWindowsWithTitle("Jan")
if windows:
jan_window = windows[0]
logger.info(f"Found Jan window: {jan_window.title}")
jan_window.maximize()
logger.info("Jan window maximized using pygetwindow")
return True
# Fallback when the platform-specific path did not return a result
# Method 2: Try Alt+Space then X (window-menu maximize shortcut, mainly Windows)
logger.info("Trying Alt+Space+X hotkey to maximize")
pyautogui.hotkey('alt', 'space')
time.sleep(0.5)
pyautogui.press('x')
logger.info("Attempted to maximize using Alt+Space+X")
return True
except Exception as e:
logger.warning(f"Could not maximize Jan window: {e}")
# Method 3: Platform-specific fallback
try:
if IS_WINDOWS:
logger.info("Trying Windows+Up arrow to maximize")
pyautogui.hotkey('win', 'up')
elif IS_LINUX:
logger.info("Trying Alt+F10 to maximize")
pyautogui.hotkey('alt', 'F10')
elif IS_MACOS:
logger.info("Trying macOS specific maximize")
pyautogui.hotkey('cmd', 'tab') # Switch to Jan if it's running
time.sleep(0.5)
return True
except Exception as e2:
logger.warning(f"All maximize methods failed: {e2}")
return False
def start_jan_app(jan_app_path=None):
"""
Start Jan application in maximized window (cross-platform)
"""
# Set default path based on platform
if jan_app_path is None:
if IS_WINDOWS:
jan_app_path = os.path.expanduser(r"~\AppData\Local\Programs\jan\Jan.exe")
elif IS_LINUX:
jan_app_path = "/usr/bin/Jan" # or "/usr/bin/Jan" for regular
elif IS_MACOS:
jan_app_path = "/Applications/Jan.app/Contents/MacOS/Jan" # Default macOS path
else:
raise NotImplementedError(f"Platform {platform.system()} not supported")
logger.info(f"Starting Jan application from: {jan_app_path}")
if not os.path.exists(jan_app_path):
logger.error(f"Jan executable not found at: {jan_app_path}")
raise FileNotFoundError(f"Jan app not found at {jan_app_path}")
try:
# Start the Jan application
if IS_WINDOWS:
subprocess.Popen([jan_app_path], shell=True)
elif IS_LINUX:
# On Linux, start with DISPLAY environment variable
env = os.environ.copy()
subprocess.Popen([jan_app_path], env=env)
elif IS_MACOS:
# On macOS, use 'open' command to launch .app bundle properly
if jan_app_path.endswith('.app/Contents/MacOS/Jan'):
# Use the .app bundle path instead
app_bundle = jan_app_path.replace('/Contents/MacOS/Jan', '')
subprocess.Popen(['open', app_bundle])
elif jan_app_path.endswith('.app'):
# Direct .app bundle
subprocess.Popen(['open', jan_app_path])
elif '/Contents/MacOS/' in jan_app_path:
# Extract app bundle from full executable path
app_bundle = jan_app_path.split('/Contents/MacOS/')[0]
subprocess.Popen(['open', app_bundle])
else:
# Fallback: try to execute directly
subprocess.Popen([jan_app_path])
else:
raise NotImplementedError(f"Platform {platform.system()} not supported")
logger.info("Jan application started")
# Wait for app to fully load
logger.info("Waiting for Jan application to initialize...")
time.sleep(5)
# Try to maximize the window
if maximize_jan_window():
logger.info("Jan application maximized successfully")
else:
logger.warning("Could not maximize Jan application window")
# Wait a bit more after maximizing
time.sleep(10)
logger.info("Jan application should be ready")
time.sleep(10) # Additional wait to ensure everything is ready
except Exception as e:
logger.error(f"Error starting Jan application: {e}")
raise
def scan_test_files(tests_dir="tests"):
"""
Scan tests folder and find all .txt files
Returns list with format [{'path': 'relative_path', 'prompt': 'file_content'}]
"""
test_files = []
tests_path = Path(tests_dir)
if not tests_path.exists():
logger.error(f"Tests directory {tests_dir} does not exist!")
return test_files
# Scan all .txt files in folder and subfolders
for txt_file in tests_path.rglob("*.txt"):
try:
# Read file content
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# Get relative path
relative_path = txt_file.relative_to(tests_path)
test_files.append({
'path': str(relative_path),
'prompt': content
})
logger.info(f"Found test file: {relative_path}")
except Exception as e:
logger.error(f"Error reading file {txt_file}: {e}")
return test_files
def get_latest_trajectory_folder(trajectory_base_path):
"""
Get the latest created folder in trajectory base path
"""
if not os.path.exists(trajectory_base_path):
logger.warning(f"Trajectory base path not found: {trajectory_base_path}")
return None
# Get all folders and sort by creation time (latest first)
folders = [f for f in os.listdir(trajectory_base_path)
if os.path.isdir(os.path.join(trajectory_base_path, f))]
if not folders:
logger.warning(f"No trajectory folders found in: {trajectory_base_path}")
return None
# Sort by folder name (assuming timestamp format like 20250715_100443)
folders.sort(reverse=True)
latest_folder = folders[0]
full_path = os.path.join(trajectory_base_path, latest_folder)
logger.info(f"Found latest trajectory folder: {full_path}")
return full_path
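
A quick usage sketch for the two helpers above (the trajectory path is illustrative):

from utils import scan_test_files, get_latest_trajectory_folder

for test in scan_test_files("tests"):
    print("discovered:", test["path"])

latest = get_latest_trajectory_folder("trajectories/sample_test")
print("latest run:", latest or "none yet")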