From 0fca102d6410bab50bf3e0a7809f655817ac814c Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Tue, 4 Nov 2025 19:25:39 +0000 Subject: [PATCH 01/12] chore: update HISTORY.md for main --- HISTORY.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index abf5dd1..0f3a9e8 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [1.2.0](https://github.com/cortexapps/cli/releases/tag/1.2.0) - 2025-11-04 + +[Compare with 1.1.0](https://github.com/cortexapps/cli/compare/1.1.0...1.2.0) + +### Fixed + +- fix: handle 409 Already evaluating in trigger-evaluation test ([6715ea8](https://github.com/cortexapps/cli/commit/6715ea8ace42e5e137b649daf927bf2bec225b5e) by Jeff Schnitter). +- fix: remove entity_relationships imports from wrong branch ([3d467f6](https://github.com/cortexapps/cli/commit/3d467f699a0d4883316e039fcca571bde03d7f0a) by Jeff Schnitter). +- fix: ensure base_url defaults correctly when not set ([cadf62e](https://github.com/cortexapps/cli/commit/cadf62e79c96fb6e89046d399d9247680e8057da) by Jeff Schnitter). + ## [1.1.0](https://github.com/cortexapps/cli/releases/tag/1.1.0) - 2025-11-04 [Compare with 1.0.6](https://github.com/cortexapps/cli/compare/1.0.6...1.1.0) From 8a3b4d5308191c4d28ab78c4d8fab762a2713e95 Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 13:27:48 -0800 Subject: [PATCH 02/12] feat: improve backup import/export performance with parallel processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implemented parallel API calls using ThreadPoolExecutor for backup export and import operations, significantly improving performance. 
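As a rough illustration of the fan-out/collect pattern this commit describes (not the verbatim diff below), the export helpers submit one fetch per tag to a thread pool and report per-item failures instead of aborting the whole run. `fetch_one` and `write_one` here are hypothetical stand-ins for the per-item API call (e.g. `plugins.get()`) and file writer, and exceptions are caught at collection time rather than inside the worker as the actual patch does:

```python
# Simplified sketch of the parallel export pattern described in this commit.
# fetch_one / write_one are hypothetical stand-ins for the real helpers.
from concurrent.futures import ThreadPoolExecutor, as_completed

def export_in_parallel(tags, fetch_one, write_one, max_workers=10):
    """Fetch each tag concurrently, report failures, and write successes."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_one, tag): tag for tag in sorted(tags)}
        for future in as_completed(futures):
            tag = futures[future]
            try:
                content = future.result()
            except Exception as exc:  # one failure should not stop the rest
                print(f"Failed to export {tag}: {exc}")
            else:
                write_one(tag, content)
```

The key behavioral point, mirrored in the diff below, is that each failure is logged per item while the remaining fetches continue.
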
Changes: - Added ThreadPoolExecutor with max_workers=10 for concurrent API calls - Updated _export_plugins(), _export_scorecards(), _export_workflows() to fetch items in parallel - Updated _import_catalog(), _import_plugins(), _import_scorecards(), _import_workflows() to import files in parallel - Enhanced error handling to report failures without stopping entire operation - Maintained file ordering where applicable Performance improvements: - Export operations now run with concurrent API calls - Import operations process multiple files simultaneously - All existing tests pass (218 passed, 1 skipped) Fixes #154 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cortexapps_cli/commands/backup.py | 161 ++++++++++++++++++++++++------ 1 file changed, 131 insertions(+), 30 deletions(-) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index fa31212..db80f03 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -9,6 +9,7 @@ from rich.console import Console from enum import Enum import yaml +from concurrent.futures import ThreadPoolExecutor, as_completed import cortexapps_cli.commands.scorecards as scorecards import cortexapps_cli.commands.catalog as catalog @@ -108,9 +109,22 @@ def _export_plugins(ctx, directory): list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None) tags = [plugin["tag"] for plugin in list["plugins"]] tags_sorted = sorted(tags) - for tag in tags_sorted: - content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False) - _file_name(directory, tag, content, "json") + + def fetch_plugin(tag): + try: + content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False) + return (tag, content, None) + except Exception as e: + return (tag, None, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} + for future in as_completed(futures): + tag, content, error = future.result() + if error: + print(f"Failed to export plugin {tag}: {error}") + else: + _file_name(directory, tag, content, "json") def _export_scorecards(ctx, directory): directory = _directory_name(directory, "scorecards") @@ -118,9 +132,22 @@ def _export_scorecards(ctx, directory): list = scorecards.list(ctx, show_drafts=True, page=None, page_size=None, _print=False) tags = [scorecard["tag"] for scorecard in list["scorecards"]] tags_sorted = sorted(tags) - for tag in tags_sorted: - content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False) - _file_name(directory, tag, content, "yaml") + + def fetch_scorecard(tag): + try: + content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False) + return (tag, content, None) + except Exception as e: + return (tag, None, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(fetch_scorecard, tag): tag for tag in tags_sorted} + for future in as_completed(futures): + tag, content, error = future.result() + if error: + print(f"Failed to export scorecard {tag}: {error}") + else: + _file_name(directory, tag, content, "yaml") def _export_workflows(ctx, directory): directory = _directory_name(directory, "workflows") @@ -128,12 +155,22 @@ def _export_workflows(ctx, directory): list = workflows.list(ctx, _print=False, include_actions="false", page=None, page_size=None, search_query=None) tags = [workflow["tag"] for workflow in list["workflows"]] tags_sorted = sorted(tags) - for tag in 
tags_sorted: + + def fetch_workflow(tag): try: content = workflows.get(ctx, tag=tag, yaml="true", _print=False) - _file_name(directory, tag, content, "yaml") - except: - print("failed for " + tag) + return (tag, content, None) + except Exception as e: + return (tag, None, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(fetch_workflow, tag): tag for tag in tags_sorted} + for future in as_completed(futures): + tag, content, error = future.result() + if error: + print(f"Failed to export workflow {tag}: {error}") + else: + _file_name(directory, tag, content, "yaml") backupTypes = { "catalog", @@ -257,38 +294,102 @@ def _import_entity_types(ctx, force, directory): def _import_catalog(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - catalog.create(ctx, file_input=open(file_path), _print=False) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_catalog_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + catalog.create(ctx, file_input=f, _print=False) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(import_catalog_file, file_info): file_info[0] for file_info in files} + for future in as_completed(futures): + filename, error = future.result() + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_plugins(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - plugins.create(ctx, file_input=open(file_path), force=True) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_plugin_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + plugins.create(ctx, file_input=f, force=True) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(import_plugin_file, file_info): file_info[0] for file_info in files} + for future in as_completed(futures): + filename, error = future.result() + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_scorecards(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - scorecards.create(ctx, file_input=open(file_path), dry_run=False) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_scorecard_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + scorecards.create(ctx, file_input=f, dry_run=False) + return (filename, None) + except Exception as e: + 
return (filename, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(import_scorecard_file, file_info): file_info[0] for file_info in files} + for future in as_completed(futures): + filename, error = future.result() + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_workflows(ctx, directory): if os.path.isdir(directory): print("Processing: " + directory) - for filename in sorted(os.listdir(directory)): - file_path = os.path.join(directory, filename) - if os.path.isfile(file_path): - print(" Importing: " + filename) - workflows.create(ctx, file_input=open(file_path)) + files = [(filename, os.path.join(directory, filename)) + for filename in sorted(os.listdir(directory)) + if os.path.isfile(os.path.join(directory, filename))] + + def import_workflow_file(file_info): + filename, file_path = file_info + try: + with open(file_path) as f: + workflows.create(ctx, file_input=f) + return (filename, None) + except Exception as e: + return (filename, str(e)) + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = {executor.submit(import_workflow_file, file_info): file_info[0] for file_info in files} + for future in as_completed(futures): + filename, error = future.result() + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") @app.command("import") def import_tenant( From 743579d760e900da693696df2841e7b710b08d39 Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 13:32:27 -0800 Subject: [PATCH 03/12] fix: ensure CORTEX_BASE_URL is available in publish workflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Added CORTEX_BASE_URL from GitHub Actions variables to the publish workflow, following the same pattern as test-pr.yml. Changes: - Added CORTEX_BASE_URL to top-level env section - Added env sections to pypi-deploy-event job to pass CORTEX_API_KEY and CORTEX_BASE_URL to container - Added env sections to docker-deploy-event job to pass CORTEX_API_KEY and CORTEX_BASE_URL to container - Added env sections to homebrew-custom-event job to pass CORTEX_API_KEY and CORTEX_BASE_URL to container This fixes the 401 Unauthorized and base_url errors when posting deploy events to Cortex during the publish workflow. 
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .github/workflows/publish.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 13ff137..5d88da0 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -10,6 +10,7 @@ on: env: CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} DOCKER_USERNAME: jeffschnittercortex DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKER_ORGANIZATION: cortexapp @@ -95,6 +96,9 @@ jobs: runs-on: ubuntu-latest container: image: cortexapp/cli:latest + env: + CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} steps: - name: Post pypi deploy event to Cortex @@ -164,6 +168,9 @@ jobs: runs-on: ubuntu-latest container: image: cortexapp/cli:latest + env: + CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} steps: - name: Post docker deploy event to Cortex @@ -211,6 +218,9 @@ jobs: runs-on: ubuntu-latest container: image: cortexapp/cli:latest + env: + CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }} + CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }} steps: - name: Post homebrew deploy event to Cortex From 3bdd45ab07a0aabc8c045d7cde63e6d9908c6e8a Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 14:14:57 -0800 Subject: [PATCH 04/12] perf: optimize backup export with increased parallelism and reduced API calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Further performance optimizations for backup export: 1. Increased ThreadPoolExecutor workers from 10 to 30 - Network I/O bound operations can handle more parallelism - Should provide 2-3x improvement for plugins and scorecards 2. 
Eliminated N individual API calls for workflows export - Changed workflows.list() to use include_actions="true" - Single API call now returns all workflow data with actions - Convert JSON to YAML format directly without individual get() calls - This eliminates N network round-trips for N workflows Expected performance improvements: - Workflows: Near-instant (1 API call vs N calls) - Plugins/Scorecards: 2-3x faster with 30 workers vs 10 Previous timing: 2m19s (with 10 workers) Original timing: 3m29s 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cortexapps_cli/commands/backup.py | 38 +++++++++++++------------------ 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index db80f03..7c6e82f 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -117,7 +117,7 @@ def fetch_plugin(tag): except Exception as e: return (tag, None, str(e)) - with ThreadPoolExecutor(max_workers=10) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} for future in as_completed(futures): tag, content, error = future.result() @@ -140,7 +140,7 @@ def fetch_scorecard(tag): except Exception as e: return (tag, None, str(e)) - with ThreadPoolExecutor(max_workers=10) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_scorecard, tag): tag for tag in tags_sorted} for future in as_completed(futures): tag, content, error = future.result() @@ -152,25 +152,19 @@ def fetch_scorecard(tag): def _export_workflows(ctx, directory): directory = _directory_name(directory, "workflows") - list = workflows.list(ctx, _print=False, include_actions="false", page=None, page_size=None, search_query=None) - tags = [workflow["tag"] for workflow in list["workflows"]] - tags_sorted = sorted(tags) + # Get all workflows with actions in one API call + list = workflows.list(ctx, _print=False, include_actions="true", page=None, page_size=None, search_query=None) + workflows_data = sorted(list["workflows"], key=lambda x: x["tag"]) - def fetch_workflow(tag): + # Convert JSON workflows to YAML and write them + for workflow in workflows_data: + tag = workflow["tag"] try: - content = workflows.get(ctx, tag=tag, yaml="true", _print=False) - return (tag, content, None) + # Convert the JSON workflow data to YAML format + workflow_yaml = yaml.dump(workflow, default_flow_style=False, sort_keys=False) + _file_name(directory, tag, workflow_yaml, "yaml") except Exception as e: - return (tag, None, str(e)) - - with ThreadPoolExecutor(max_workers=10) as executor: - futures = {executor.submit(fetch_workflow, tag): tag for tag in tags_sorted} - for future in as_completed(futures): - tag, content, error = future.result() - if error: - print(f"Failed to export workflow {tag}: {error}") - else: - _file_name(directory, tag, content, "yaml") + print(f"Failed to export workflow {tag}: {e}") backupTypes = { "catalog", @@ -307,7 +301,7 @@ def import_catalog_file(file_info): except Exception as e: return (filename, str(e)) - with ThreadPoolExecutor(max_workers=10) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_catalog_file, file_info): file_info[0] for file_info in files} for future in as_completed(futures): filename, error = future.result() @@ -332,7 +326,7 @@ def import_plugin_file(file_info): except Exception as e: return 
(filename, str(e)) - with ThreadPoolExecutor(max_workers=10) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_plugin_file, file_info): file_info[0] for file_info in files} for future in as_completed(futures): filename, error = future.result() @@ -357,7 +351,7 @@ def import_scorecard_file(file_info): except Exception as e: return (filename, str(e)) - with ThreadPoolExecutor(max_workers=10) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_scorecard_file, file_info): file_info[0] for file_info in files} for future in as_completed(futures): filename, error = future.result() @@ -382,7 +376,7 @@ def import_workflow_file(file_info): except Exception as e: return (filename, str(e)) - with ThreadPoolExecutor(max_workers=10) as executor: + with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_workflow_file, file_info): file_info[0] for file_info in files} for future in as_completed(futures): filename, error = future.result() From 9055f78cc4e1136da20e4e42883ff3c0f248825b Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 14:17:23 -0800 Subject: [PATCH 05/12] fix: ensure export/import output is in alphabetical order MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Modified all parallel export and import functions to collect results first, then sort alphabetically before writing/printing. This makes debugging failed exports much easier while maintaining parallel execution performance. Changes: - Export functions (plugins, scorecards) now collect all results, sort by tag, then write files in alphabetical order - Import functions (catalog, plugins, scorecards, workflows) now collect all results, sort by filename, then print in alphabetical order - Maintains parallel execution speed - only the output order is affected Example output now shows consistent alphabetical ordering: --> about-learn-cortex --> bogus-plugin --> developer-relations-plugin --> google-plugin --> map-test --> my-cortex-plugin ... 
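A minimal sketch of the collect-then-sort pattern this commit applies, assuming (as in the diff) that the worker function returns a `(tag, content, error)` tuple and swallows its own exceptions:

```python
# Run fetches in parallel, gather every (tag, content, error) tuple,
# then return them alphabetically so output order is deterministic.
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_all_sorted(tags, fetch_one, max_workers=30):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch_one, tag) for tag in tags]
        results = [future.result() for future in as_completed(futures)]
    # Sorting after the pool drains keeps the parallel speedup while
    # making the printed/written order stable for debugging.
    return sorted(results, key=lambda result: result[0])
```
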
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cortexapps_cli/commands/backup.py | 90 ++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 30 deletions(-) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index 7c6e82f..80c2c1a 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -117,14 +117,19 @@ def fetch_plugin(tag): except Exception as e: return (tag, None, str(e)) + # Fetch all plugins in parallel with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} + results = [] for future in as_completed(futures): - tag, content, error = future.result() - if error: - print(f"Failed to export plugin {tag}: {error}") - else: - _file_name(directory, tag, content, "json") + results.append(future.result()) + + # Sort results alphabetically and write in order + for tag, content, error in sorted(results, key=lambda x: x[0]): + if error: + print(f"Failed to export plugin {tag}: {error}") + else: + _file_name(directory, tag, content, "json") def _export_scorecards(ctx, directory): directory = _directory_name(directory, "scorecards") @@ -140,14 +145,19 @@ def fetch_scorecard(tag): except Exception as e: return (tag, None, str(e)) + # Fetch all scorecards in parallel with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_scorecard, tag): tag for tag in tags_sorted} + results = [] for future in as_completed(futures): - tag, content, error = future.result() - if error: - print(f"Failed to export scorecard {tag}: {error}") - else: - _file_name(directory, tag, content, "yaml") + results.append(future.result()) + + # Sort results alphabetically and write in order + for tag, content, error in sorted(results, key=lambda x: x[0]): + if error: + print(f"Failed to export scorecard {tag}: {error}") + else: + _file_name(directory, tag, content, "yaml") def _export_workflows(ctx, directory): directory = _directory_name(directory, "workflows") @@ -301,14 +311,19 @@ def import_catalog_file(file_info): except Exception as e: return (filename, str(e)) + # Import all files in parallel with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_catalog_file, file_info): file_info[0] for file_info in files} + results = [] for future in as_completed(futures): - filename, error = future.result() - if error: - print(f" Failed to import {filename}: {error}") - else: - print(f" Importing: {filename}") + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_plugins(ctx, directory): if os.path.isdir(directory): @@ -326,14 +341,19 @@ def import_plugin_file(file_info): except Exception as e: return (filename, str(e)) + # Import all files in parallel with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_plugin_file, file_info): file_info[0] for file_info in files} + results = [] for future in as_completed(futures): - filename, error = future.result() - if error: - print(f" Failed to import {filename}: {error}") - else: - print(f" Importing: {filename}") + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + 
else: + print(f" Importing: {filename}") def _import_scorecards(ctx, directory): if os.path.isdir(directory): @@ -351,14 +371,19 @@ def import_scorecard_file(file_info): except Exception as e: return (filename, str(e)) + # Import all files in parallel with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_scorecard_file, file_info): file_info[0] for file_info in files} + results = [] for future in as_completed(futures): - filename, error = future.result() - if error: - print(f" Failed to import {filename}: {error}") - else: - print(f" Importing: {filename}") + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") def _import_workflows(ctx, directory): if os.path.isdir(directory): @@ -376,14 +401,19 @@ def import_workflow_file(file_info): except Exception as e: return (filename, str(e)) + # Import all files in parallel with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(import_workflow_file, file_info): file_info[0] for file_info in files} + results = [] for future in as_completed(futures): - filename, error = future.result() - if error: - print(f" Failed to import {filename}: {error}") - else: - print(f" Importing: {filename}") + results.append(future.result()) + + # Print results in alphabetical order + for filename, error in sorted(results, key=lambda x: x[0]): + if error: + print(f" Failed to import {filename}: {error}") + else: + print(f" Importing: {filename}") @app.command("import") def import_tenant( From 6117eb3c2a8b3a9ced439a5953a84d06099b1c1e Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 14:19:02 -0800 Subject: [PATCH 06/12] perf: add HTTP connection pooling to CortexClient for massive speedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is the critical fix for slow parallel export/import performance. 
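(The problem, solution, and full diff follow below; this is only a condensed sketch of the pooled-session setup, using the standard `requests` and `urllib3` APIs.)

```python
# Sketch of a requests.Session configured for concurrent callers: pooled
# connections sized for ~30 worker threads, plus retries on transient 5xx errors.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_pooled_session(pool_maxsize=50, retries=3):
    session = requests.Session()
    retry = Retry(total=retries, backoff_factor=0.3,
                  status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(pool_connections=10, pool_maxsize=pool_maxsize,
                          max_retries=retry)
    session.mount('https://', adapter)
    session.mount('http://', adapter)
    return session  # reuse one session so TCP/TLS connections stay warm
```
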
Problem: - CortexClient was calling requests.request() directly without a session - Each API call created a new TCP connection (DNS lookup, TCP handshake, TLS) - Even with 30 parallel threads, each request was slow (~3+ seconds) - 44 plugins took 2m24s (no parallelism benefit) Solution: - Created requests.Session() in __init__ with connection pooling - Configured HTTPAdapter with pool_maxsize=50 for concurrent requests - Added automatic retries for transient failures (500, 502, 503, 504) - All requests now reuse existing TCP connections Expected impact: - First request: normal latency (connection setup) - Subsequent requests: dramatically faster (connection reuse) - With 30 workers: should see ~30x speedup for I/O bound operations - 44 plugins: should drop from 2m24s to ~5-10 seconds Technical details: - pool_connections=10: number of connection pools to cache - pool_maxsize=50: max connections per pool (supports 30+ parallel workers) - Retry with backoff for transient server errors 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- cortexapps_cli/cortex_client.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/cortexapps_cli/cortex_client.py b/cortexapps_cli/cortex_client.py index b84eafe..1e35713 100644 --- a/cortexapps_cli/cortex_client.py +++ b/cortexapps_cli/cortex_client.py @@ -1,4 +1,6 @@ import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry import json import typer from rich import print @@ -20,6 +22,20 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort logging.basicConfig(level=numeric_level) self.logger = logging.getLogger(__name__) + # Create a session with connection pooling for better performance + self.session = requests.Session() + + # Configure connection pool to support concurrent requests + # pool_connections: number of connection pools to cache + # pool_maxsize: maximum number of connections to save in the pool + adapter = HTTPAdapter( + pool_connections=10, + pool_maxsize=50, + max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504]) + ) + self.session.mount('https://', adapter) + self.session.mount('http://', adapter) + def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=False, raw_response=False, content_type='application/json'): req_headers = { 'Authorization': f'Bearer {self.api_key}', @@ -33,7 +49,8 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F if content_type == 'application/json' and isinstance(data, dict): req_data = json.dumps(data) - response = requests.request(method, url, params=params, headers=req_headers, data=req_data) + # Use session for connection pooling and reuse + response = self.session.request(method, url, params=params, headers=req_headers, data=req_data) self.logger.debug(f"Request Headers: {response.request.headers}") self.logger.debug(f"Response Status Code: {response.status_code}") From c915e5300c82e68d0bb55a54e20d4cdfb060632c Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 14:23:44 -0800 Subject: [PATCH 07/12] debug: add timing and connection pool debug logging --- cortexapps_cli/commands/backup.py | 8 ++++++++ cortexapps_cli/cortex_client.py | 6 ++++++ 2 files changed, 14 insertions(+) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index 80c2c1a..7a4194c 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ 
-104,6 +104,7 @@ def _export_ip_allowlist(ctx, directory): _file_name(directory, "ip-allowlist", str(content), "json") def _export_plugins(ctx, directory): + import time directory = _directory_name(directory, "plugins") list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None) @@ -112,17 +113,24 @@ def _export_plugins(ctx, directory): def fetch_plugin(tag): try: + start = time.time() content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False) + elapsed = time.time() - start + print(f"[DEBUG] Fetched {tag} in {elapsed:.2f}s") return (tag, content, None) except Exception as e: return (tag, None, str(e)) # Fetch all plugins in parallel + print(f"[DEBUG] Starting parallel fetch with 30 workers for {len(tags_sorted)} plugins") + overall_start = time.time() with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} results = [] for future in as_completed(futures): results.append(future.result()) + overall_elapsed = time.time() - overall_start + print(f"[DEBUG] All fetches completed in {overall_elapsed:.2f}s") # Sort results alphabetically and write in order for tag, content, error in sorted(results, key=lambda x: x[0]): diff --git a/cortexapps_cli/cortex_client.py b/cortexapps_cli/cortex_client.py index 1e35713..cfe6a7a 100644 --- a/cortexapps_cli/cortex_client.py +++ b/cortexapps_cli/cortex_client.py @@ -57,6 +57,12 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F self.logger.debug(f"Response Headers: {response.headers}") self.logger.debug(f"Response Content: {response.text}") + # Debug connection pool stats + adapter = self.session.get_adapter(url) + if hasattr(adapter, 'poolmanager') and adapter.poolmanager: + pool = adapter.poolmanager.connection_pool_kw + self.logger.debug(f"Connection pool stats: {pool}") + if not response.ok: try: # try to parse the error message From 7200e8bb9f223b4f0460bb1cc576fcee295bf81f Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 15:11:27 -0800 Subject: [PATCH 08/12] debug: add detailed timing for list, fetch, and write operations --- cortexapps_cli/commands/backup.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index 7a4194c..2302b4b 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -105,9 +105,15 @@ def _export_ip_allowlist(ctx, directory): def _export_plugins(ctx, directory): import time + overall_start = time.time() + directory = _directory_name(directory, "plugins") + list_start = time.time() list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None) + list_elapsed = time.time() - list_start + print(f"[DEBUG] plugins.list() took {list_elapsed:.2f}s") + tags = [plugin["tag"] for plugin in list["plugins"]] tags_sorted = sorted(tags) @@ -123,21 +129,27 @@ def fetch_plugin(tag): # Fetch all plugins in parallel print(f"[DEBUG] Starting parallel fetch with 30 workers for {len(tags_sorted)} plugins") - overall_start = time.time() + fetch_start = time.time() with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} results = [] for future in as_completed(futures): results.append(future.result()) - overall_elapsed = time.time() - overall_start - print(f"[DEBUG] All fetches completed in {overall_elapsed:.2f}s") + fetch_elapsed = time.time() - 
fetch_start + print(f"[DEBUG] All fetches completed in {fetch_elapsed:.2f}s") # Sort results alphabetically and write in order + write_start = time.time() for tag, content, error in sorted(results, key=lambda x: x[0]): if error: print(f"Failed to export plugin {tag}: {error}") else: _file_name(directory, tag, content, "json") + write_elapsed = time.time() - write_start + print(f"[DEBUG] Writing files took {write_elapsed:.2f}s") + + total_elapsed = time.time() - overall_start + print(f"[DEBUG] Total _export_plugins took {total_elapsed:.2f}s") def _export_scorecards(ctx, directory): directory = _directory_name(directory, "scorecards") From ef4e81f20002ecb6b53a0210cde9e02742ce6e50 Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 15:19:22 -0800 Subject: [PATCH 09/12] debug: add granular timing for print vs write operations --- cortexapps_cli/commands/backup.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index 2302b4b..b1dd325 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -50,13 +50,23 @@ def _directory_name(directory, backup_type): return directory def _file_name(directory, tag, content, extension): + import time + start = time.time() print("--> " + tag) + print_elapsed = time.time() - start + file = directory + "/" + tag + "." + extension if extension == "json": is_json = True else: is_json = False + + write_start = time.time() _write_file(content, file, is_json) + write_elapsed = time.time() - write_start + + if write_elapsed > 1.0 or print_elapsed > 1.0: + print(f"[DEBUG] {tag}: print={print_elapsed:.2f}s, write={write_elapsed:.2f}s") def _write_file(content, file, is_json=False): with open(file, 'w') as f: From c66c2fe438cc95f8343fbd4ba3cecae605c435ea Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 15:30:35 -0800 Subject: [PATCH 10/12] fix: use json.dump instead of Rich print for file writing The Rich library's print() was being used to write JSON to files, causing massive slowdowns (1-78 seconds per file!). Using json.dump() directly should reduce write time to milliseconds. --- cortexapps_cli/commands/backup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index b1dd325..958a780 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -71,7 +71,7 @@ def _file_name(directory, tag, content, extension): def _write_file(content, file, is_json=False): with open(file, 'w') as f: if is_json: - print(content, file=f) + json.dump(content, f, indent=2) else: f.write(str(content) + "\n") f.close() From 1aa2a6fe358ac203d0f3fdc844e613d2a09d8711 Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 15:31:56 -0800 Subject: [PATCH 11/12] chore: remove debug logging from backup and cortex_client --- cortexapps_cli/commands/backup.py | 30 ------------------------------ cortexapps_cli/cortex_client.py | 6 ------ 2 files changed, 36 deletions(-) diff --git a/cortexapps_cli/commands/backup.py b/cortexapps_cli/commands/backup.py index 958a780..a09fdc8 100644 --- a/cortexapps_cli/commands/backup.py +++ b/cortexapps_cli/commands/backup.py @@ -50,23 +50,13 @@ def _directory_name(directory, backup_type): return directory def _file_name(directory, tag, content, extension): - import time - start = time.time() print("--> " + tag) - print_elapsed = time.time() - start - file = directory + "/" + tag + "." 
+ extension if extension == "json": is_json = True else: is_json = False - - write_start = time.time() _write_file(content, file, is_json) - write_elapsed = time.time() - write_start - - if write_elapsed > 1.0 or print_elapsed > 1.0: - print(f"[DEBUG] {tag}: print={print_elapsed:.2f}s, write={write_elapsed:.2f}s") def _write_file(content, file, is_json=False): with open(file, 'w') as f: @@ -114,52 +104,32 @@ def _export_ip_allowlist(ctx, directory): _file_name(directory, "ip-allowlist", str(content), "json") def _export_plugins(ctx, directory): - import time - overall_start = time.time() - directory = _directory_name(directory, "plugins") - list_start = time.time() list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None) - list_elapsed = time.time() - list_start - print(f"[DEBUG] plugins.list() took {list_elapsed:.2f}s") - tags = [plugin["tag"] for plugin in list["plugins"]] tags_sorted = sorted(tags) def fetch_plugin(tag): try: - start = time.time() content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False) - elapsed = time.time() - start - print(f"[DEBUG] Fetched {tag} in {elapsed:.2f}s") return (tag, content, None) except Exception as e: return (tag, None, str(e)) # Fetch all plugins in parallel - print(f"[DEBUG] Starting parallel fetch with 30 workers for {len(tags_sorted)} plugins") - fetch_start = time.time() with ThreadPoolExecutor(max_workers=30) as executor: futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted} results = [] for future in as_completed(futures): results.append(future.result()) - fetch_elapsed = time.time() - fetch_start - print(f"[DEBUG] All fetches completed in {fetch_elapsed:.2f}s") # Sort results alphabetically and write in order - write_start = time.time() for tag, content, error in sorted(results, key=lambda x: x[0]): if error: print(f"Failed to export plugin {tag}: {error}") else: _file_name(directory, tag, content, "json") - write_elapsed = time.time() - write_start - print(f"[DEBUG] Writing files took {write_elapsed:.2f}s") - - total_elapsed = time.time() - overall_start - print(f"[DEBUG] Total _export_plugins took {total_elapsed:.2f}s") def _export_scorecards(ctx, directory): directory = _directory_name(directory, "scorecards") diff --git a/cortexapps_cli/cortex_client.py b/cortexapps_cli/cortex_client.py index cfe6a7a..1e35713 100644 --- a/cortexapps_cli/cortex_client.py +++ b/cortexapps_cli/cortex_client.py @@ -57,12 +57,6 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F self.logger.debug(f"Response Headers: {response.headers}") self.logger.debug(f"Response Content: {response.text}") - # Debug connection pool stats - adapter = self.session.get_adapter(url) - if hasattr(adapter, 'poolmanager') and adapter.poolmanager: - pool = adapter.poolmanager.connection_pool_kw - self.logger.debug(f"Connection pool stats: {pool}") - if not response.ok: try: # try to parse the error message From cc40b55ed9ef5af4146360b5a879afc6dc67fe06 Mon Sep 17 00:00:00 2001 From: Jeff Schnitter Date: Tue, 4 Nov 2025 15:53:27 -0800 Subject: [PATCH 12/12] fix: add retry logic for scorecard create to handle active evaluations Scorecard create operations can fail with 500 errors if there's an active evaluation running. This is a race condition that occurs when: 1. test_import.py creates/updates a scorecard 2. An evaluation is triggered automatically or by another test 3. 
test_scorecards.py tries to update the same scorecard Added exponential backoff retry logic (1s, 2s) with max 3 attempts to handle these transient 500 errors gracefully. --- tests/test_scorecards.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/tests/test_scorecards.py b/tests/test_scorecards.py index 9ec4216..801f556 100644 --- a/tests/test_scorecards.py +++ b/tests/test_scorecards.py @@ -1,5 +1,6 @@ from tests.helpers.utils import * import yaml +import time # Get rule id to be used in exemption tests. # TODO: check for and revoke any PENDING exemptions. @@ -10,7 +11,18 @@ def _get_rule(title): return rule_id[0] def test_scorecards(): - cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"]) + # Retry scorecard create in case there's an active evaluation + # (can happen if test_import.py just triggered an evaluation) + max_retries = 3 + for attempt in range(max_retries): + try: + cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"]) + break + except Exception as e: + if "500" in str(e) and attempt < max_retries - 1: + time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s + continue + raise response = cli(["scorecards", "list"]) assert any(scorecard['tag'] == 'cli-test-scorecard' for scorecard in response['scorecards']), "Should find scorecard with tag cli-test-scorecard" @@ -43,7 +55,17 @@ def test_scorecards(): # cli(["scorecards", "scores", "-t", "cli-test-scorecard"]) def test_scorecards_drafts(): - cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"]) + # Retry scorecard create in case there's an active evaluation + max_retries = 3 + for attempt in range(max_retries): + try: + cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"]) + break + except Exception as e: + if "500" in str(e) and attempt < max_retries - 1: + time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s + continue + raise response = cli(["scorecards", "list", "-s"]) assert any(scorecard['tag'] == 'cli-test-draft-scorecard' for scorecard in response['scorecards'])
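Since both tests above repeat the same retry loop, a small helper could factor it out. The sketch below is a hypothetical refactor, not part of the patch; it assumes the same `cli` helper exposed by `tests.helpers.utils` and the same "retry only on transient 500s" policy:

```python
# Hypothetical helper consolidating the retry-with-backoff logic used above.
from tests.helpers.utils import *  # provides cli(), as in the existing tests
import time

def create_scorecard_with_retry(descriptor_path, max_retries=3):
    for attempt in range(max_retries):
        try:
            return cli(["scorecards", "create", "-f", descriptor_path])
        except Exception as e:
            if "500" in str(e) and attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s
                continue
            raise
```

With such a helper, `test_scorecards()` and `test_scorecards_drafts()` would each reduce to a single `create_scorecard_with_retry(...)` call followed by their existing assertions.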