From e515b94a6d4e7f5477b18a0da0a7a9b20f8f255b Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Mon, 16 Mar 2026 16:57:38 -0700 Subject: [PATCH 1/7] Update result analyzer and remaining changes Update result_analyzer with improved categorization and reporting. Signed-off-by: Yunxuan Shi --- .../compatibility/result_analyzer/analyzer.py | 107 ++++++++++-------- .../compatibility/result_analyzer/cli.py | 12 -- .../result_analyzer/report_generator.py | 47 ++++++++ .../tests/collection/__init__.py | 1 + 4 files changed, 106 insertions(+), 61 deletions(-) create mode 100644 documentdb_tests/compatibility/tests/collection/__init__.py diff --git a/documentdb_tests/compatibility/result_analyzer/analyzer.py b/documentdb_tests/compatibility/result_analyzer/analyzer.py index 3b5fe62..5f21e57 100644 --- a/documentdb_tests/compatibility/result_analyzer/analyzer.py +++ b/documentdb_tests/compatibility/result_analyzer/analyzer.py @@ -13,25 +13,7 @@ # Module-level constants -INFRA_EXCEPTIONS = { - # Python built-in connection errors - "ConnectionError", - "ConnectionRefusedError", - "ConnectionResetError", - "ConnectionAbortedError", - # Python timeout errors - "TimeoutError", - "socket.timeout", - "socket.error", - # PyMongo connection errors - "pymongo.errors.ConnectionFailure", - "pymongo.errors.ServerSelectionTimeoutError", - "pymongo.errors.NetworkTimeout", - "pymongo.errors.AutoReconnect", - "pymongo.errors.ExecutionTimeout", - # Generic network/OS errors - "OSError", -} +from documentdb_tests.framework.infra_exceptions import INFRA_EXCEPTION_NAMES as INFRA_EXCEPTIONS # Mapping from TestOutcome to counter key names @@ -79,10 +61,10 @@ def categorize_outcome(test_result: Dict[str, Any]) -> str: def extract_exception_type(crash_message: str) -> str: """ Extract exception type from pytest crash message. - + Args: crash_message: Message like "module.Exception: error details" - + Returns: Full exception type (e.g., "pymongo.errors.OperationFailure") or empty string if not found @@ -92,21 +74,45 @@ def extract_exception_type(crash_message: str) -> str: match = re.match(r'^([a-zA-Z0-9_.]+):\s', crash_message) if match: return match.group(1) - + + return "" + + +def extract_failure_tag(test_result: Dict[str, Any]) -> str: + """ + Extract failure tag (e.g. RESULT_MISMATCH) from assertion message. + + The framework assertions prefix errors with tags like: + [RESULT_MISMATCH], [UNEXPECTED_ERROR], [UNEXPECTED_SUCCESS], + [ERROR_MISMATCH], [TEST_EXCEPTION] + + Args: + test_result: Full test result dict from pytest JSON + + Returns: + Tag string without brackets, or empty string if not found + """ + call_info = test_result.get("call", {}) + crash_info = call_info.get("crash", {}) + crash_message = crash_info.get("message", "") + + match = re.search(r'\[([A-Z_]+)\]', crash_message) + if match: + return match.group(1) return "" def is_infrastructure_error(test_result: Dict[str, Any]) -> bool: """ Check if error is infrastructure-related based on exception type. - + This checks the actual exception type rather than keywords in error messages, preventing false positives from error messages that happen to contain infrastructure-related words (e.g., "host" in an assertion message). - + Args: test_result: Full test result dict from pytest JSON - + Returns: True if error is infrastructure-related, False otherwise """ @@ -114,16 +120,16 @@ def is_infrastructure_error(test_result: Dict[str, Any]) -> bool: call_info = test_result.get("call", {}) crash_info = call_info.get("crash", {}) crash_message = crash_info.get("message", "") - + if not crash_message: return False - + # Extract exception type from "module.ExceptionClass: message" format exception_type = extract_exception_type(crash_message) - + if not exception_type: return False - + # Check against module-level constant return exception_type in INFRA_EXCEPTIONS @@ -131,86 +137,86 @@ def is_infrastructure_error(test_result: Dict[str, Any]) -> bool: def load_registered_markers(pytest_ini_path: str = "pytest.ini") -> set: """ Load registered markers from pytest.ini. - + Parses the markers section to extract marker names, ensuring we only use markers that are explicitly registered in pytest configuration. - + Args: pytest_ini_path: Path to pytest.ini file (defaults to "pytest.ini") - + Returns: Set of registered marker names """ # Check if pytest.ini exists if not Path(pytest_ini_path).exists(): return set() - + registered_markers = set() - + try: with open(pytest_ini_path, 'r') as f: in_markers_section = False - + for line in f: # Check if we're entering the markers section if line.strip() == "markers =": in_markers_section = True continue - + if in_markers_section: # Marker lines are indented, config keys are not if line and not line[0].isspace(): # Non-indented line means we left the markers section break - + # Parse indented marker lines like " find: Find operation tests" match = re.match(r'^\s+([a-zA-Z0-9_]+):', line) if match: registered_markers.add(match.group(1)) - + except Exception: # If parsing fails, return empty set pass - + return registered_markers class ResultAnalyzer: """ Analyzer for pytest JSON test results. - + This class provides stateful analysis with configurable pytest.ini path, making it easier to test and use in multiple contexts. - + Args: pytest_ini_path: Path to pytest.ini file for marker configuration - + Example: analyzer = ResultAnalyzer("pytest.ini") results = analyzer.analyze_results("report.json") """ - + def __init__(self, pytest_ini_path: str = "pytest.ini"): """ Initialize the result analyzer. - + Args: pytest_ini_path: Path to pytest.ini file (default: "pytest.ini") """ self.pytest_ini_path = pytest_ini_path self._markers_cache: set = None - + def _get_registered_markers(self) -> set: """ Get registered markers (cached per instance). - + Returns: Set of registered marker names """ if self._markers_cache is None: self._markers_cache = load_registered_markers(self.pytest_ini_path) return self._markers_cache - + def extract_markers(self, test_result: Dict[str, Any]) -> List[str]: """ Extract pytest markers (tags) from a test result. @@ -331,11 +337,14 @@ def analyze_results(self, json_report_path: str) -> Dict[str, Any]: "tags": tags, } - # Add error information and infra error flag for failed tests + # Add error information for failed tests if test_outcome == TestOutcome.FAIL: call_info = test.get("call", {}) test_detail["error"] = call_info.get("longrepr", "") - test_detail["is_infra_error"] = is_infrastructure_error(test) + if is_infrastructure_error(test): + test_detail["failure_type"] = "INFRA_ERROR" + else: + test_detail["failure_type"] = extract_failure_tag(test) or "UNKNOWN" tests_details.append(test_detail) diff --git a/documentdb_tests/compatibility/result_analyzer/cli.py b/documentdb_tests/compatibility/result_analyzer/cli.py index f166715..ba0c747 100644 --- a/documentdb_tests/compatibility/result_analyzer/cli.py +++ b/documentdb_tests/compatibility/result_analyzer/cli.py @@ -95,18 +95,6 @@ def main(): if not args.quiet: print(f"\nReport saved to: {args.output}") - # If no output file and quiet mode, print to stdout - elif not args.quiet: - print("\nResults by Tag:") - print("-" * 60) - for tag, stats in sorted( - analysis["by_tag"].items(), key=lambda x: x[1]["pass_rate"], reverse=True - ): - passed = stats["passed"] - total = stats["total"] - rate = stats["pass_rate"] - print(f"{tag:30s} | {passed:3d}/{total:3d} passed ({rate:5.1f}%)") - # Return exit code based on test results if analysis["summary"]["failed"] > 0: return 1 diff --git a/documentdb_tests/compatibility/result_analyzer/report_generator.py b/documentdb_tests/compatibility/result_analyzer/report_generator.py index 37ff172..1d20f73 100644 --- a/documentdb_tests/compatibility/result_analyzer/report_generator.py +++ b/documentdb_tests/compatibility/result_analyzer/report_generator.py @@ -99,12 +99,23 @@ def generate_text_report(analysis: Dict[str, Any], output_path: str): lines.append("-" * 80) for test in failed_tests: lines.append(f"\n{test['name']}") + failure_type = test.get("failure_type", "UNKNOWN") + lines.append(f" Type: {failure_type}") lines.append(f" Tags: {', '.join(test['tags'])}") lines.append(f" Duration: {test['duration']:.2f}s") if "error" in test: error_preview = test["error"][:200] lines.append(f" Error: {error_preview}...") + # Skipped tests + skipped_tests = [t for t in analysis["tests"] if t["outcome"] == "SKIPPED"] + if skipped_tests: + lines.append("") + lines.append("SKIPPED TESTS") + lines.append("-" * 80) + for test in skipped_tests: + lines.append(f" {test['name']}") + lines.append("") lines.append("=" * 80) @@ -128,4 +139,40 @@ def print_summary(analysis: Dict[str, Any]): print(f"Passed: {summary['passed']} ({summary['pass_rate']}%)") print(f"Failed: {summary['failed']}") print(f"Skipped: {summary['skipped']}") + print("=" * 60) + + # By tag + by_tag = analysis.get("by_tag", {}) + if by_tag: + print("\nResults by Tag:") + print("-" * 60) + sorted_tags = sorted(by_tag.items(), key=lambda x: x[1]["pass_rate"]) + for tag, stats in sorted_tags: + print(f" {tag:<30s} | {stats['passed']:>3}/{stats['total']:>3} passed ({stats['pass_rate']:>5.1f}%)") + + # Failed tests + failed_tests = [t for t in analysis["tests"] if t["outcome"] == "FAIL"] + if failed_tests: + # Count by failure_type + from collections import Counter + type_counts = Counter(t.get("failure_type", "UNKNOWN") for t in failed_tests) + + print(f"\nFailed Tests ({len(failed_tests)}):") + print("-" * 60) + for ft, count in sorted(type_counts.items()): + print(f"\n {ft} ({count}):") + for test in failed_tests: + if test.get("failure_type", "UNKNOWN") == ft: + name = test["name"].split("::")[-1] + print(f" {name}") + + # Skipped tests + skipped_tests = [t for t in analysis["tests"] if t["outcome"] == "SKIPPED"] + if skipped_tests: + print(f"\nSkipped Tests ({len(skipped_tests)}):") + print("-" * 60) + for test in skipped_tests: + name = test["name"].split("::")[-1] + print(f" {name}") + print("=" * 60 + "\n") diff --git a/documentdb_tests/compatibility/tests/collection/__init__.py b/documentdb_tests/compatibility/tests/collection/__init__.py new file mode 100644 index 0000000..b73cf50 --- /dev/null +++ b/documentdb_tests/compatibility/tests/collection/__init__.py @@ -0,0 +1 @@ +"""Collection management tests.""" From c0e0bed184b8643c5dcc9e01af343c650ecd7770 Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Thu, 2 Apr 2026 16:08:38 -0700 Subject: [PATCH 2/7] Fix lint: line too long in report_generator.py, formatting in analyzer.py Signed-off-by: Yunxuan Shi --- documentdb_tests/compatibility/result_analyzer/analyzer.py | 3 +-- .../compatibility/result_analyzer/report_generator.py | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/documentdb_tests/compatibility/result_analyzer/analyzer.py b/documentdb_tests/compatibility/result_analyzer/analyzer.py index bfb2521..5bb9569 100644 --- a/documentdb_tests/compatibility/result_analyzer/analyzer.py +++ b/documentdb_tests/compatibility/result_analyzer/analyzer.py @@ -14,7 +14,6 @@ # Module-level constants from documentdb_tests.framework.infra_exceptions import INFRA_EXCEPTION_NAMES as INFRA_EXCEPTIONS - # Mapping from TestOutcome to counter key names OUTCOME_TO_KEY = { "PASS": "passed", @@ -95,7 +94,7 @@ def extract_failure_tag(test_result: Dict[str, Any]) -> str: crash_info = call_info.get("crash", {}) crash_message = crash_info.get("message", "") - match = re.search(r'\[([A-Z_]+)\]', crash_message) + match = re.search(r"\[([A-Z_]+)\]", crash_message) if match: return match.group(1) return "" diff --git a/documentdb_tests/compatibility/result_analyzer/report_generator.py b/documentdb_tests/compatibility/result_analyzer/report_generator.py index 1d20f73..7b1387c 100644 --- a/documentdb_tests/compatibility/result_analyzer/report_generator.py +++ b/documentdb_tests/compatibility/result_analyzer/report_generator.py @@ -148,13 +148,17 @@ def print_summary(analysis: Dict[str, Any]): print("-" * 60) sorted_tags = sorted(by_tag.items(), key=lambda x: x[1]["pass_rate"]) for tag, stats in sorted_tags: - print(f" {tag:<30s} | {stats['passed']:>3}/{stats['total']:>3} passed ({stats['pass_rate']:>5.1f}%)") + passed = stats["passed"] + total = stats["total"] + rate = stats["pass_rate"] + print(f" {tag:<30s} | {passed:>3}/{total:>3} passed ({rate:>5.1f}%)") # Failed tests failed_tests = [t for t in analysis["tests"] if t["outcome"] == "FAIL"] if failed_tests: # Count by failure_type from collections import Counter + type_counts = Counter(t.get("failure_type", "UNKNOWN") for t in failed_tests) print(f"\nFailed Tests ({len(failed_tests)}):") From 7b7850a935a5b87cab6342f271f8787842a39ef3 Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Thu, 2 Apr 2026 16:10:38 -0700 Subject: [PATCH 3/7] Move Counter import to top-level per PEP 8 Signed-off-by: Yunxuan Shi --- .../compatibility/result_analyzer/report_generator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/documentdb_tests/compatibility/result_analyzer/report_generator.py b/documentdb_tests/compatibility/result_analyzer/report_generator.py index 7b1387c..38430e0 100644 --- a/documentdb_tests/compatibility/result_analyzer/report_generator.py +++ b/documentdb_tests/compatibility/result_analyzer/report_generator.py @@ -6,6 +6,7 @@ """ import json +from collections import Counter from datetime import datetime, timezone from typing import Any, Dict @@ -157,8 +158,6 @@ def print_summary(analysis: Dict[str, Any]): failed_tests = [t for t in analysis["tests"] if t["outcome"] == "FAIL"] if failed_tests: # Count by failure_type - from collections import Counter - type_counts = Counter(t.get("failure_type", "UNKNOWN") for t in failed_tests) print(f"\nFailed Tests ({len(failed_tests)}):") From 35af610cd05f5fd848ca9d24c14e47fd8a8449c1 Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Thu, 2 Apr 2026 16:19:04 -0700 Subject: [PATCH 4/7] Fix pytest.ini discovery: resolve relative to package instead of CWD Signed-off-by: Yunxuan Shi --- documentdb_tests/compatibility/result_analyzer/analyzer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/documentdb_tests/compatibility/result_analyzer/analyzer.py b/documentdb_tests/compatibility/result_analyzer/analyzer.py index 5bb9569..39e3add 100644 --- a/documentdb_tests/compatibility/result_analyzer/analyzer.py +++ b/documentdb_tests/compatibility/result_analyzer/analyzer.py @@ -194,12 +194,14 @@ class ResultAnalyzer: results = analyzer.analyze_results("report.json") """ - def __init__(self, pytest_ini_path: str = "pytest.ini"): + _DEFAULT_PYTEST_INI = str(Path(__file__).resolve().parent.parent.parent / "pytest.ini") + + def __init__(self, pytest_ini_path: str = _DEFAULT_PYTEST_INI): """ Initialize the result analyzer. Args: - pytest_ini_path: Path to pytest.ini file (default: "pytest.ini") + pytest_ini_path: Path to pytest.ini file (default: documentdb_tests/pytest.ini) """ self.pytest_ini_path = pytest_ini_path self._markers_cache: set = None From 71f07e1e1b3af250a0d544cffd51c490f65e1cc2 Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Thu, 2 Apr 2026 16:26:03 -0700 Subject: [PATCH 5/7] Merge generate_text_report and print_summary into single function Signed-off-by: Yunxuan Shi --- .../result_analyzer/report_generator.py | 116 +++++++++--------- 1 file changed, 56 insertions(+), 60 deletions(-) diff --git a/documentdb_tests/compatibility/result_analyzer/report_generator.py b/documentdb_tests/compatibility/result_analyzer/report_generator.py index 38430e0..c7d59d6 100644 --- a/documentdb_tests/compatibility/result_analyzer/report_generator.py +++ b/documentdb_tests/compatibility/result_analyzer/report_generator.py @@ -6,7 +6,6 @@ """ import json -from collections import Counter from datetime import datetime, timezone from typing import Any, Dict @@ -47,9 +46,39 @@ def generate_json_report(analysis: Dict[str, Any], output_path: str): json.dump(report, f, indent=2) +def _format_by_tag(analysis: Dict[str, Any]) -> list: + """Format by-tag results as lines. Shared by both report functions.""" + lines = [] + by_tag = analysis.get("by_tag", {}) + if by_tag: + sorted_tags = sorted(by_tag.items(), key=lambda x: x[1]["pass_rate"]) + for tag, stats in sorted_tags: + lines.append( + { + "tag": tag, + "passed": stats["passed"], + "total": stats["total"], + "failed": stats["failed"], + "skipped": stats["skipped"], + "pass_rate": stats["pass_rate"], + } + ) + return lines + + +def _categorize_failures(analysis: Dict[str, Any]) -> Dict[str, list]: + """Group failed tests by failure_type. Shared by both report functions.""" + failed_tests = [t for t in analysis["tests"] if t["outcome"] == "FAIL"] + grouped = {} + for test in failed_tests: + ft = test.get("failure_type", "UNKNOWN") + grouped.setdefault(ft, []).append(test) + return grouped + + def generate_text_report(analysis: Dict[str, Any], output_path: str): """ - Generate a human-readable text report. + Generate a detailed human-readable text report to file. Args: analysis: Analysis results from analyze_results() @@ -77,36 +106,31 @@ def generate_text_report(analysis: Dict[str, Any], output_path: str): # Results by tag lines.append("RESULTS BY TAG") lines.append("-" * 80) - - if analysis["by_tag"]: - # Sort tags by pass rate (ascending) to highlight problematic areas - sorted_tags = sorted(analysis["by_tag"].items(), key=lambda x: x[1]["pass_rate"]) - - for tag, stats in sorted_tags: - lines.append(f"\n{tag}:") - lines.append(f" Total: {stats['total']}") - lines.append(f" Passed: {stats['passed']} ({stats['pass_rate']}%)") - lines.append(f" Failed: {stats['failed']}") - lines.append(f" Skipped: {stats['skipped']}") + tag_data = _format_by_tag(analysis) + if tag_data: + for t in tag_data: + lines.append(f"\n{t['tag']}:") + lines.append(f" Total: {t['total']}") + lines.append(f" Passed: {t['passed']} ({t['pass_rate']}%)") + lines.append(f" Failed: {t['failed']}") + lines.append(f" Skipped: {t['skipped']}") else: lines.append("No tags found in test results.") - lines.append("") # Failed tests details - failed_tests = [t for t in analysis["tests"] if t["outcome"] == "FAIL"] - if failed_tests: + grouped = _categorize_failures(analysis) + if grouped: lines.append("FAILED TESTS") lines.append("-" * 80) - for test in failed_tests: - lines.append(f"\n{test['name']}") - failure_type = test.get("failure_type", "UNKNOWN") - lines.append(f" Type: {failure_type}") - lines.append(f" Tags: {', '.join(test['tags'])}") - lines.append(f" Duration: {test['duration']:.2f}s") - if "error" in test: - error_preview = test["error"][:200] - lines.append(f" Error: {error_preview}...") + for ft in sorted(grouped): + lines.append(f"\n {ft} ({len(grouped[ft])}):") + for test in grouped[ft]: + lines.append(f"\n {test['name']}") + lines.append(f" Tags: {', '.join(test['tags'])}") + lines.append(f" Duration: {test['duration']:.2f}s") + if "error" in test: + lines.append(f" Error: {test['error'][:200]}...") # Skipped tests skipped_tests = [t for t in analysis["tests"] if t["outcome"] == "SKIPPED"] @@ -120,7 +144,6 @@ def generate_text_report(analysis: Dict[str, Any], output_path: str): lines.append("") lines.append("=" * 80) - # Write report with open(output_path, "w") as f: f.write("\n".join(lines)) @@ -142,40 +165,13 @@ def print_summary(analysis: Dict[str, Any]): print(f"Skipped: {summary['skipped']}") print("=" * 60) - # By tag - by_tag = analysis.get("by_tag", {}) - if by_tag: - print("\nResults by Tag:") - print("-" * 60) - sorted_tags = sorted(by_tag.items(), key=lambda x: x[1]["pass_rate"]) - for tag, stats in sorted_tags: - passed = stats["passed"] - total = stats["total"] - rate = stats["pass_rate"] - print(f" {tag:<30s} | {passed:>3}/{total:>3} passed ({rate:>5.1f}%)") - - # Failed tests - failed_tests = [t for t in analysis["tests"] if t["outcome"] == "FAIL"] - if failed_tests: - # Count by failure_type - type_counts = Counter(t.get("failure_type", "UNKNOWN") for t in failed_tests) - - print(f"\nFailed Tests ({len(failed_tests)}):") + # Failed test counts by type + grouped = _categorize_failures(analysis) + if grouped: + total = sum(len(v) for v in grouped.values()) + print(f"\nFailed Tests ({total}):") print("-" * 60) - for ft, count in sorted(type_counts.items()): - print(f"\n {ft} ({count}):") - for test in failed_tests: - if test.get("failure_type", "UNKNOWN") == ft: - name = test["name"].split("::")[-1] - print(f" {name}") - - # Skipped tests - skipped_tests = [t for t in analysis["tests"] if t["outcome"] == "SKIPPED"] - if skipped_tests: - print(f"\nSkipped Tests ({len(skipped_tests)}):") - print("-" * 60) - for test in skipped_tests: - name = test["name"].split("::")[-1] - print(f" {name}") + for ft in sorted(grouped): + print(f" {ft}: {len(grouped[ft])}") print("=" * 60 + "\n") From d7fef3c2780d6069e367b92900be6933b1162468 Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Thu, 2 Apr 2026 16:51:28 -0700 Subject: [PATCH 6/7] Add unit tests for failure extraction and categorization Signed-off-by: Yunxuan Shi --- .../result_analyzer/test_analyzer.py | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 documentdb_tests/compatibility/result_analyzer/test_analyzer.py diff --git a/documentdb_tests/compatibility/result_analyzer/test_analyzer.py b/documentdb_tests/compatibility/result_analyzer/test_analyzer.py new file mode 100644 index 0000000..b65aa17 --- /dev/null +++ b/documentdb_tests/compatibility/result_analyzer/test_analyzer.py @@ -0,0 +1,104 @@ +"""Tests for failure extraction and categorization in the analyzer.""" + +from documentdb_tests.compatibility.result_analyzer.analyzer import ( + extract_exception_type, + extract_failure_tag, + is_infrastructure_error, +) + + +def _make_test_result(crash_message: str) -> dict: + """Helper to build a minimal test result dict with a crash message.""" + return {"call": {"crash": {"message": crash_message}}} + + +# --- extract_failure_tag --- + + +class TestExtractFailureTag: + def test_result_mismatch(self): + result = _make_test_result("[RESULT_MISMATCH] Expected [1,2,3] but got [1,2]") + assert extract_failure_tag(result) == "RESULT_MISMATCH" + + def test_unexpected_error(self): + result = _make_test_result("[UNEXPECTED_ERROR] Expected success but got exception") + assert extract_failure_tag(result) == "UNEXPECTED_ERROR" + + def test_error_mismatch(self): + result = _make_test_result("[ERROR_MISMATCH] Expected code 11000 but got 26") + assert extract_failure_tag(result) == "ERROR_MISMATCH" + + def test_unexpected_success(self): + result = _make_test_result("[UNEXPECTED_SUCCESS] Expected error but got result") + assert extract_failure_tag(result) == "UNEXPECTED_SUCCESS" + + def test_test_exception(self): + result = _make_test_result("[TEST_EXCEPTION] Bad test setup") + assert extract_failure_tag(result) == "TEST_EXCEPTION" + + def test_no_tag(self): + result = _make_test_result("AssertionError: values differ") + assert extract_failure_tag(result) == "" + + def test_empty_message(self): + result = _make_test_result("") + assert extract_failure_tag(result) == "" + + def test_missing_call(self): + assert extract_failure_tag({}) == "" + + +# --- extract_exception_type --- + + +class TestExtractExceptionType: + def test_simple_exception(self): + assert extract_exception_type("ConnectionError: refused") == "ConnectionError" + + def test_dotted_exception(self): + assert ( + extract_exception_type("pymongo.errors.OperationFailure: code 11000") + == "pymongo.errors.OperationFailure" + ) + + def test_no_colon(self): + assert extract_exception_type("just a message") == "" + + def test_empty(self): + assert extract_exception_type("") == "" + + +# --- is_infrastructure_error --- + + +class TestIsInfrastructureError: + def test_connection_error(self): + result = _make_test_result("ConnectionError: Cannot connect") + assert is_infrastructure_error(result) is True + + def test_timeout_error(self): + result = _make_test_result("TimeoutError: timed out") + assert is_infrastructure_error(result) is True + + def test_pymongo_connection_failure(self): + result = _make_test_result("pymongo.errors.ConnectionFailure: connection lost") + assert is_infrastructure_error(result) is True + + def test_pymongo_server_selection(self): + result = _make_test_result("pymongo.errors.ServerSelectionTimeoutError: no servers") + assert is_infrastructure_error(result) is True + + def test_assertion_error_not_infra(self): + result = _make_test_result("AssertionError: [RESULT_MISMATCH] wrong value") + assert is_infrastructure_error(result) is False + + def test_operation_failure_not_infra(self): + result = _make_test_result("pymongo.errors.OperationFailure: code 11000") + assert is_infrastructure_error(result) is False + + def test_empty_message(self): + result = _make_test_result("") + assert is_infrastructure_error(result) is False + + def test_missing_call(self): + assert is_infrastructure_error({}) is False From d76bd80fb4cbb9c6489ee898fc34cbf9fd8237bb Mon Sep 17 00:00:00 2001 From: Yunxuan Shi Date: Thu, 2 Apr 2026 16:56:29 -0700 Subject: [PATCH 7/7] Add unit tests job to PR checks Signed-off-by: Yunxuan Shi --- .github/workflows/lint.yml | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 783de06..dd0bfad 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -1,4 +1,4 @@ -name: Lint +name: Lint & Unit Tests on: pull_request: @@ -27,3 +27,20 @@ jobs: - name: Check import sorting run: isort --check-only . + + unit-tests: + name: Unit Tests + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Run unit tests + run: pytest documentdb_tests/compatibility/result_analyzer/test_analyzer.py -v