diff --git a/autoqa/main.py b/autoqa/main.py index c125ced0e..c26f9c3b7 100644 --- a/autoqa/main.py +++ b/autoqa/main.py @@ -12,6 +12,7 @@ from reportportal_client.helpers import timestamp from utils import scan_test_files from test_runner import run_single_test_with_timeout +from individual_migration_runner import run_individual_migration_test, run_all_migration_tests, MIGRATION_TEST_CASES # Configure logging logging.basicConfig( @@ -275,8 +276,8 @@ Examples: ) test_group.add_argument( '--tests-dir', - default=os.getenv('TESTS_DIR', 'tests'), - help='Directory containing test files (env: TESTS_DIR, default: %(default)s)' + default=os.getenv('TESTS_DIR', 'tests/base'), + help='Directory containing test files for current version testing (env: TESTS_DIR, default: %(default)s for base tests, tests/migration for migration tests)' ) test_group.add_argument( '--delay-between-tests', @@ -285,12 +286,73 @@ Examples: help='Delay in seconds between tests (env: DELAY_BETWEEN_TESTS, default: %(default)s)' ) + # Migration testing arguments + migration_group = parser.add_argument_group('Migration Testing Configuration') + migration_group.add_argument( + '--enable-migration-test', + action='store_true', + default=os.getenv('ENABLE_MIGRATION_TEST', 'false').lower() == 'true', + help='Enable migration testing mode (env: ENABLE_MIGRATION_TEST, default: false)' + ) + migration_group.add_argument( + '--old-version', + default=os.getenv('OLD_VERSION'), + help='Path to old version installer for migration testing (env: OLD_VERSION)' + ) + migration_group.add_argument( + '--new-version', + default=os.getenv('NEW_VERSION'), + help='Path to new version installer for migration testing (env: NEW_VERSION)' + ) + migration_group.add_argument( + '--migration-test-case', + default=os.getenv('MIGRATION_TEST_CASE'), + help='Specific migration test case(s) to run. Can be a single case or comma-separated list (e.g., "assistants" or "models,threads"). Available cases: appearance, threads, models, assistants, assistant-chat, assistants-complete, mcp-servers, local-api, proxy-settings, thread-conversations. Note: "assistants-complete" is only supported in batch mode. If not specified, runs all test cases. Use --list-migration-tests to see all available cases. 
(env: MIGRATION_TEST_CASE)'
+    )
+    migration_group.add_argument(
+        '--migration-batch-mode',
+        action='store_true',
+        default=os.getenv('MIGRATION_BATCH_MODE', 'false').lower() == 'true',
+        help='Use batch migration mode: setup all → upgrade → verify all (env: MIGRATION_BATCH_MODE, default: false - uses individual mode)'
+    )
+    migration_group.add_argument(
+        '--list-migration-tests',
+        action='store_true',
+        help='List available migration test cases and exit'
+    )
+
     args = parser.parse_args()
 
+    # Handle list migration tests
+    if args.list_migration_tests:
+        print("Available migration test cases:")
+        print("=" * 50)
+        for key, test_case in MIGRATION_TEST_CASES.items():
+            print(f"  {key}:")
+            print(f"    Name: {test_case['name']}")
+            print(f"    Description: {test_case['description']}")
+            print()
+        exit(0)
+
     # Validate ReportPortal token if ReportPortal is enabled
     if args.enable_reportportal and not args.rp_token:
         parser.error("--rp-token (or RP_TOKEN env var) is required when --enable-reportportal is used")
 
+    # Validate migration test arguments
+    if args.enable_migration_test:
+        if not args.old_version:
+            parser.error("--old-version (or OLD_VERSION env var) is required when --enable-migration-test is used")
+        if not args.new_version:
+            parser.error("--new-version (or NEW_VERSION env var) is required when --enable-migration-test is used")
+        if not os.path.exists(args.old_version):
+            parser.error(f"Old version installer not found: {args.old_version}")
+        if not os.path.exists(args.new_version):
+            parser.error(f"New version installer not found: {args.new_version}")
+
+        # Validate specific test case(s) if provided (supports a comma-separated list)
+        if args.migration_test_case:
+            for test_case in [case.strip() for case in args.migration_test_case.split(',')]:
+                if test_case not in MIGRATION_TEST_CASES:
+                    parser.error(f"Unknown migration test case: {test_case}. 
Use --list-migration-tests to see available test cases.") + return args async def main(): @@ -324,6 +386,7 @@ async def main(): # Log configuration logger.info("=== Configuration ===") + logger.info(f"Testing Mode: {'MIGRATION (old → new version)' if args.enable_migration_test else 'BASE (current version)'}") logger.info(f"Computer server: {'STARTED' if server_thread else 'EXTERNAL'}") logger.info(f"Tests directory: {args.tests_dir}") logger.info(f"Max turns per test: {args.max_turns}") @@ -340,20 +403,12 @@ async def main(): logger.info(f"ReportPortal project: {args.rp_project}") logger.info(f"ReportPortal token: {'SET' if args.rp_token else 'NOT SET'}") logger.info(f"Launch name: {args.launch_name if args.launch_name else 'AUTO-GENERATED'}") + logger.info(f"Migration testing: {'ENABLED' if args.enable_migration_test else 'DISABLED'}") + if args.enable_migration_test: + logger.info(f"Old version installer: {args.old_version}") + logger.info(f"New version installer: {args.new_version}") logger.info("======================") - # Scan all test files - test_files = scan_test_files(args.tests_dir) - - if not test_files: - logger.warning(f"No test files found in directory: {args.tests_dir}") - return - - logger.info(f"Found {len(test_files)} test files") - - # Track test results for final exit code - test_results = {"passed": 0, "failed": 0, "total": len(test_files)} - # Initialize ReportPortal client only if enabled rp_client = None launch_id = None @@ -408,80 +463,229 @@ async def main(): await computer.run() logger.info("Computer environment ready") - # Run each test sequentially with turn monitoring - for i, test_data in enumerate(test_files, 1): - logger.info(f"Running test {i}/{len(test_files)}: {test_data['path']}") + # Check if migration testing is enabled + if args.enable_migration_test: + logger.info("=" * 60) + logger.info("MIGRATION TESTING MODE ENABLED") + logger.info("=" * 60) + logger.info(f"Old version installer: {args.old_version}") + logger.info(f"New version installer: {args.new_version}") + logger.info(f"Migration mode: {'BATCH (all setups → upgrade → all verifies)' if args.migration_batch_mode else 'INDIVIDUAL (setup → upgrade → verify per test)'}") - try: - # Pass all configs to test runner - test_result = await run_single_test_with_timeout( - computer=computer, - test_data=test_data, - rp_client=rp_client, # Can be None - launch_id=launch_id, # Can be None - max_turns=args.max_turns, - jan_app_path=args.jan_app_path, - jan_process_name=args.jan_process_name, - agent_config=agent_config, + if args.migration_test_case: + # Parse comma-separated test cases + test_cases = [case.strip() for case in args.migration_test_case.split(',')] + logger.info(f"Running specific test case(s): {', '.join(test_cases)}") + + # Validate all test cases exist + for test_case in test_cases: + if test_case not in MIGRATION_TEST_CASES: + logger.error(f"Unknown test case: {test_case}") + logger.error(f"Available test cases: {', '.join(MIGRATION_TEST_CASES.keys())}") + final_exit_code = 1 + return final_exit_code + + if args.migration_batch_mode: + # Import and run batch migration with specified test cases + from batch_migration_runner import run_batch_migration_test + + migration_results = await run_batch_migration_test( + computer=computer, + old_version_path=args.old_version, + new_version_path=args.new_version, + rp_client=rp_client, + launch_id=launch_id, + max_turns=args.max_turns, + agent_config=agent_config, + enable_reportportal=args.enable_reportportal, + test_cases=test_cases # Multiple 
test cases in batch mode + ) + + # Handle batch test result + if migration_results and migration_results.get("overall_success", False): + logger.info(f"[SUCCESS] Batch migration test '{', '.join(test_cases)}' completed successfully!") + final_exit_code = 0 + else: + logger.error(f"[FAILED] Batch migration test '{', '.join(test_cases)}' failed!") + if migration_results and migration_results.get("error_message"): + logger.error(f"Error: {migration_results['error_message']}") + final_exit_code = 1 + else: + # Run individual migration tests for each specified test case + all_individual_results = [] + overall_individual_success = True + + for test_case in test_cases: + logger.info(f"Running individual migration test for: {test_case}") + migration_result = await run_individual_migration_test( + computer=computer, + test_case_key=test_case, + old_version_path=args.old_version, + new_version_path=args.new_version, + rp_client=rp_client, + launch_id=launch_id, + max_turns=args.max_turns, + agent_config=agent_config, + enable_reportportal=args.enable_reportportal + ) + + all_individual_results.append(migration_result) + if not (migration_result and migration_result.get("overall_success", False)): + overall_individual_success = False + + # Handle individual test results + if overall_individual_success: + logger.info(f"[SUCCESS] All individual migration tests '{', '.join(test_cases)}' completed successfully!") + final_exit_code = 0 + else: + logger.error(f"[FAILED] One or more individual migration tests '{', '.join(test_cases)}' failed!") + for i, result in enumerate(all_individual_results): + if result and result.get("error_message"): + logger.error(f"Error in {test_cases[i]}: {result['error_message']}") + final_exit_code = 1 + else: + logger.info("Running all migration test cases") + + if args.migration_batch_mode: + # Import and run batch migration runner + from batch_migration_runner import run_batch_migration_test + + migration_results = await run_batch_migration_test( + computer=computer, + old_version_path=args.old_version, + new_version_path=args.new_version, + rp_client=rp_client, + launch_id=launch_id, + max_turns=args.max_turns, + agent_config=agent_config, + enable_reportportal=args.enable_reportportal + ) + else: + # Run all migration tests individually + migration_results = await run_all_migration_tests( + computer=computer, + old_version_path=args.old_version, + new_version_path=args.new_version, + rp_client=rp_client, + launch_id=launch_id, + max_turns=args.max_turns, + agent_config=agent_config, enable_reportportal=args.enable_reportportal ) - # Track test result - properly handle different return formats - test_passed = False - - if test_result: - # Check different possible return formats - if isinstance(test_result, dict): - # Dictionary format: check 'success' key - test_passed = test_result.get('success', False) - elif isinstance(test_result, bool): - # Boolean format: direct boolean value - test_passed = test_result - elif hasattr(test_result, 'success'): - # Object format: check success attribute - test_passed = getattr(test_result, 'success', False) - else: - # Any truthy value is considered success - test_passed = bool(test_result) + # Handle overall results + if migration_results and migration_results.get("overall_success", False): + logger.info("[SUCCESS] All migration tests completed successfully!") + final_exit_code = 0 else: - test_passed = False - - # Update counters and log result - if test_passed: - test_results["passed"] += 1 - logger.info(f"[SUCCESS] Test {i} PASSED: 
{test_data['path']}") - else: - test_results["failed"] += 1 - logger.error(f"[FAILED] Test {i} FAILED: {test_data['path']}") - - # Debug log for troubleshooting - logger.info(f"[INFO] Debug - Test result: type={type(test_result)}, value={test_result}, success_field={test_result.get('success', 'N/A') if isinstance(test_result, dict) else 'N/A'}, final_passed={test_passed}") - - except Exception as e: - test_results["failed"] += 1 - logger.error(f"[FAILED] Test {i} FAILED with exception: {test_data['path']} - {e}") + logger.error("[FAILED] One or more migration tests failed!") + if migration_results: + logger.error(f"Failed {migration_results.get('failed', 0)} out of {migration_results.get('total_tests', 0)} tests") + final_exit_code = 1 + + # Skip regular test execution in migration mode + logger.info("Migration testing completed. Skipping regular test execution.") - # Add delay between tests - if i < len(test_files): - logger.info(f"Waiting {args.delay_between_tests} seconds before next test...") - await asyncio.sleep(args.delay_between_tests) - - # Log final test results summary - logger.info("=" * 50) - logger.info("TEST EXECUTION SUMMARY") - logger.info("=" * 50) - logger.info(f"Total tests: {test_results['total']}") - logger.info(f"Passed: {test_results['passed']}") - logger.info(f"Failed: {test_results['failed']}") - logger.info(f"Success rate: {(test_results['passed']/test_results['total']*100):.1f}%") - logger.info("=" * 50) - - if test_results["failed"] > 0: - logger.error(f"[FAILED] Test execution completed with {test_results['failed']} failures!") - final_exit_code = 1 else: - logger.info("[SUCCESS] All tests completed successfully!") - final_exit_code = 0 + # Regular test execution mode (base/current version testing) + logger.info("Running base test execution mode (current version testing)") + + # Use base tests directory if default tests_dir is being used + base_tests_dir = args.tests_dir + if args.tests_dir == 'tests/base' and not os.path.exists(args.tests_dir): + # Fallback to old structure if base directory doesn't exist + if os.path.exists('tests'): + base_tests_dir = 'tests' + logger.warning("tests/base directory not found, using 'tests' as fallback") + + logger.info(f"Using test directory: {base_tests_dir}") + + # Scan all test files + test_files = scan_test_files(base_tests_dir) + + if not test_files: + logger.warning(f"No test files found in directory: {base_tests_dir}") + return + + logger.info(f"Found {len(test_files)} test files") + + # Track test results for final exit code + test_results = {"passed": 0, "failed": 0, "total": len(test_files)} + + # Run each test sequentially with turn monitoring + for i, test_data in enumerate(test_files, 1): + logger.info(f"Running test {i}/{len(test_files)}: {test_data['path']}") + + try: + # Pass all configs to test runner + test_result = await run_single_test_with_timeout( + computer=computer, + test_data=test_data, + rp_client=rp_client, # Can be None + launch_id=launch_id, # Can be None + max_turns=args.max_turns, + jan_app_path=args.jan_app_path, + jan_process_name=args.jan_process_name, + agent_config=agent_config, + enable_reportportal=args.enable_reportportal + ) + + # Track test result - properly handle different return formats + test_passed = False + + if test_result: + # Check different possible return formats + if isinstance(test_result, dict): + # Dictionary format: check 'success' key + test_passed = test_result.get('success', False) + elif isinstance(test_result, bool): + # Boolean format: direct boolean value + 
test_passed = test_result + elif hasattr(test_result, 'success'): + # Object format: check success attribute + test_passed = getattr(test_result, 'success', False) + else: + # Any truthy value is considered success + test_passed = bool(test_result) + else: + test_passed = False + + # Update counters and log result + if test_passed: + test_results["passed"] += 1 + logger.info(f"[SUCCESS] Test {i} PASSED: {test_data['path']}") + else: + test_results["failed"] += 1 + logger.error(f"[FAILED] Test {i} FAILED: {test_data['path']}") + + # Debug log for troubleshooting + logger.info(f"[INFO] Debug - Test result: type={type(test_result)}, value={test_result}, success_field={test_result.get('success', 'N/A') if isinstance(test_result, dict) else 'N/A'}, final_passed={test_passed}") + + except Exception as e: + test_results["failed"] += 1 + logger.error(f"[FAILED] Test {i} FAILED with exception: {test_data['path']} - {e}") + + # Add delay between tests + if i < len(test_files): + logger.info(f"Waiting {args.delay_between_tests} seconds before next test...") + await asyncio.sleep(args.delay_between_tests) + + # Log final test results summary + logger.info("=" * 50) + logger.info("TEST EXECUTION SUMMARY") + logger.info("=" * 50) + logger.info(f"Total tests: {test_results['total']}") + logger.info(f"Passed: {test_results['passed']}") + logger.info(f"Failed: {test_results['failed']}") + logger.info(f"Success rate: {(test_results['passed']/test_results['total']*100):.1f}%") + logger.info("=" * 50) + + if test_results["failed"] > 0: + logger.error(f"[FAILED] Test execution completed with {test_results['failed']} failures!") + final_exit_code = 1 + else: + logger.info("[SUCCESS] All tests completed successfully!") + final_exit_code = 0 except KeyboardInterrupt: logger.info("Test execution interrupted by user") diff --git a/autoqa/utils.py b/autoqa/utils.py index 611b5e4ad..4a1176720 100644 --- a/autoqa/utils.py +++ b/autoqa/utils.py @@ -286,10 +286,11 @@ def start_jan_app(jan_app_path=None): logger.error(f"Error starting Jan application: {e}") raise -def scan_test_files(tests_dir="tests"): +def scan_test_files(tests_dir="tests/base"): """ Scan tests folder and find all .txt files Returns list with format [{'path': 'relative_path', 'prompt': 'file_content'}] + Note: Default changed to tests/base for current version testing """ test_files = [] tests_path = Path(tests_dir)
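
Note on the result contract assumed above: main.py treats the values returned by run_individual_migration_test, run_all_migration_tests, and run_batch_migration_test as plain dicts and only reads a handful of keys. The sketch below shows that expected shape as inferred from the .get(...) calls in this patch; the MigrationRunResult name is hypothetical, and the actual runners may return additional keys.

from typing import Optional, TypedDict

class MigrationRunResult(TypedDict, total=False):
    # Keys that main.py reads; total=False because not every runner sets all of them.
    overall_success: bool         # True only if every setup/upgrade/verify step passed
    error_message: Optional[str]  # failure detail, logged when overall_success is False
    failed: int                   # number of failed test cases (aggregate runs only)
    total_tests: int              # number of test cases attempted (aggregate runs only)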