10 changes: 10 additions & 0 deletions .github/workflows/publish.yml
@@ -10,6 +10,7 @@ on:

env:
CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
DOCKER_USERNAME: jeffschnittercortex
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}
DOCKER_ORGANIZATION: cortexapp
@@ -95,6 +96,9 @@ jobs:
runs-on: ubuntu-latest
container:
image: cortexapp/cli:latest
env:
CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
steps:

- name: Post pypi deploy event to Cortex
@@ -164,6 +168,9 @@ jobs:
runs-on: ubuntu-latest
container:
image: cortexapp/cli:latest
env:
CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
steps:

- name: Post docker deploy event to Cortex
@@ -211,6 +218,9 @@ jobs:
runs-on: ubuntu-latest
container:
image: cortexapp/cli:latest
env:
CORTEX_API_KEY: ${{ secrets.CORTEX_API_KEY_PRODUCTION }}
CORTEX_BASE_URL: ${{ vars.CORTEX_BASE_URL }}
steps:

- name: Post homebrew deploy event to Cortex
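The workflow change above exports CORTEX_API_KEY and CORTEX_BASE_URL into each container-based job so the cortexapp/cli image talks to the intended tenant. As a rough illustration of the runtime side, a hypothetical helper (not code from this PR) could resolve the base URL from the environment and fall back to the production endpoint, assumed here to be https://api.getcortexapp.com, when the variable is unset:

import os

# Hypothetical sketch, not code from this PR: fall back to the assumed
# production endpoint when CORTEX_BASE_URL is unset or blank.
def resolve_base_url(default="https://api.getcortexapp.com"):
    value = os.environ.get("CORTEX_BASE_URL", "").strip()
    return value or default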
10 changes: 10 additions & 0 deletions HISTORY.md
@@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

<!-- insertion marker -->
## [1.2.0](https://github.com/cortexapps/cli/releases/tag/1.2.0) - 2025-11-04

<small>[Compare with 1.1.0](https://github.com/cortexapps/cli/compare/1.1.0...1.2.0)</small>

### Fixed

- fix: handle 409 Already evaluating in trigger-evaluation test ([6715ea8](https://github.com/cortexapps/cli/commit/6715ea8ace42e5e137b649daf927bf2bec225b5e) by Jeff Schnitter).
- fix: remove entity_relationships imports from wrong branch ([3d467f6](https://github.com/cortexapps/cli/commit/3d467f699a0d4883316e039fcca571bde03d7f0a) by Jeff Schnitter).
- fix: ensure base_url defaults correctly when not set ([cadf62e](https://github.com/cortexapps/cli/commit/cadf62e79c96fb6e89046d399d9247680e8057da) by Jeff Schnitter).

## [1.1.0](https://github.com/cortexapps/cli/releases/tag/1.1.0) - 2025-11-04

<small>[Compare with 1.0.6](https://github.com/cortexapps/cli/compare/1.0.6...1.1.0)</small>
195 changes: 160 additions & 35 deletions cortexapps_cli/commands/backup.py
@@ -9,6 +9,7 @@
from rich.console import Console
from enum import Enum
import yaml
from concurrent.futures import ThreadPoolExecutor, as_completed

import cortexapps_cli.commands.scorecards as scorecards
import cortexapps_cli.commands.catalog as catalog
@@ -60,7 +61,7 @@ def _file_name(directory, tag, content, extension):
def _write_file(content, file, is_json=False):
with open(file, 'w') as f:
if is_json:
print(content, file=f)
json.dump(content, f, indent=2)
else:
f.write(str(content) + "\n")
f.close()
@@ -108,32 +109,72 @@ def _export_plugins(ctx, directory):
list = plugins.list(ctx, _print=False, include_drafts="true", page=None, page_size=None)
tags = [plugin["tag"] for plugin in list["plugins"]]
tags_sorted = sorted(tags)
for tag in tags_sorted:
content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False)
_file_name(directory, tag, content, "json")

def fetch_plugin(tag):
try:
content = plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False)
return (tag, content, None)
except Exception as e:
return (tag, None, str(e))

# Fetch all plugins in parallel
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(fetch_plugin, tag): tag for tag in tags_sorted}
results = []
for future in as_completed(futures):
results.append(future.result())

# Sort results alphabetically and write in order
for tag, content, error in sorted(results, key=lambda x: x[0]):
if error:
print(f"Failed to export plugin {tag}: {error}")
else:
_file_name(directory, tag, content, "json")

def _export_scorecards(ctx, directory):
directory = _directory_name(directory, "scorecards")

list = scorecards.list(ctx, show_drafts=True, page=None, page_size=None, _print=False)
tags = [scorecard["tag"] for scorecard in list["scorecards"]]
tags_sorted = sorted(tags)
for tag in tags_sorted:
content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False)
_file_name(directory, tag, content, "yaml")

def fetch_scorecard(tag):
try:
content = scorecards.descriptor(ctx, scorecard_tag=tag, _print=False)
return (tag, content, None)
except Exception as e:
return (tag, None, str(e))

# Fetch all scorecards in parallel
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(fetch_scorecard, tag): tag for tag in tags_sorted}
results = []
for future in as_completed(futures):
results.append(future.result())

# Sort results alphabetically and write in order
for tag, content, error in sorted(results, key=lambda x: x[0]):
if error:
print(f"Failed to export scorecard {tag}: {error}")
else:
_file_name(directory, tag, content, "yaml")

def _export_workflows(ctx, directory):
directory = _directory_name(directory, "workflows")

list = workflows.list(ctx, _print=False, include_actions="false", page=None, page_size=None, search_query=None)
tags = [workflow["tag"] for workflow in list["workflows"]]
tags_sorted = sorted(tags)
for tag in tags_sorted:
# Get all workflows with actions in one API call
list = workflows.list(ctx, _print=False, include_actions="true", page=None, page_size=None, search_query=None)
workflows_data = sorted(list["workflows"], key=lambda x: x["tag"])

# Convert JSON workflows to YAML and write them
for workflow in workflows_data:
tag = workflow["tag"]
try:
content = workflows.get(ctx, tag=tag, yaml="true", _print=False)
_file_name(directory, tag, content, "yaml")
except:
print("failed for " + tag)
# Convert the JSON workflow data to YAML format
workflow_yaml = yaml.dump(workflow, default_flow_style=False, sort_keys=False)
_file_name(directory, tag, workflow_yaml, "yaml")
except Exception as e:
print(f"Failed to export workflow {tag}: {e}")

backupTypes = {
"catalog",
@@ -257,38 +298,122 @@ def _import_entity_types(ctx, force, directory):
def _import_catalog(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
for filename in sorted(os.listdir(directory)):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
print(" Importing: " + filename)
catalog.create(ctx, file_input=open(file_path), _print=False)
files = [(filename, os.path.join(directory, filename))
for filename in sorted(os.listdir(directory))
if os.path.isfile(os.path.join(directory, filename))]

def import_catalog_file(file_info):
filename, file_path = file_info
try:
with open(file_path) as f:
catalog.create(ctx, file_input=f, _print=False)
return (filename, None)
except Exception as e:
return (filename, str(e))

# Import all files in parallel
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(import_catalog_file, file_info): file_info[0] for file_info in files}
results = []
for future in as_completed(futures):
results.append(future.result())

# Print results in alphabetical order
for filename, error in sorted(results, key=lambda x: x[0]):
if error:
print(f" Failed to import {filename}: {error}")
else:
print(f" Importing: {filename}")

def _import_plugins(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
for filename in sorted(os.listdir(directory)):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
print(" Importing: " + filename)
plugins.create(ctx, file_input=open(file_path), force=True)
files = [(filename, os.path.join(directory, filename))
for filename in sorted(os.listdir(directory))
if os.path.isfile(os.path.join(directory, filename))]

def import_plugin_file(file_info):
filename, file_path = file_info
try:
with open(file_path) as f:
plugins.create(ctx, file_input=f, force=True)
return (filename, None)
except Exception as e:
return (filename, str(e))

# Import all files in parallel
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(import_plugin_file, file_info): file_info[0] for file_info in files}
results = []
for future in as_completed(futures):
results.append(future.result())

# Print results in alphabetical order
for filename, error in sorted(results, key=lambda x: x[0]):
if error:
print(f" Failed to import {filename}: {error}")
else:
print(f" Importing: {filename}")

def _import_scorecards(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
for filename in sorted(os.listdir(directory)):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
print(" Importing: " + filename)
scorecards.create(ctx, file_input=open(file_path), dry_run=False)
files = [(filename, os.path.join(directory, filename))
for filename in sorted(os.listdir(directory))
if os.path.isfile(os.path.join(directory, filename))]

def import_scorecard_file(file_info):
filename, file_path = file_info
try:
with open(file_path) as f:
scorecards.create(ctx, file_input=f, dry_run=False)
return (filename, None)
except Exception as e:
return (filename, str(e))

# Import all files in parallel
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(import_scorecard_file, file_info): file_info[0] for file_info in files}
results = []
for future in as_completed(futures):
results.append(future.result())

# Print results in alphabetical order
for filename, error in sorted(results, key=lambda x: x[0]):
if error:
print(f" Failed to import {filename}: {error}")
else:
print(f" Importing: {filename}")

def _import_workflows(ctx, directory):
if os.path.isdir(directory):
print("Processing: " + directory)
for filename in sorted(os.listdir(directory)):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
print(" Importing: " + filename)
workflows.create(ctx, file_input=open(file_path))
files = [(filename, os.path.join(directory, filename))
for filename in sorted(os.listdir(directory))
if os.path.isfile(os.path.join(directory, filename))]

def import_workflow_file(file_info):
filename, file_path = file_info
try:
with open(file_path) as f:
workflows.create(ctx, file_input=f)
return (filename, None)
except Exception as e:
return (filename, str(e))

# Import all files in parallel
with ThreadPoolExecutor(max_workers=30) as executor:
futures = {executor.submit(import_workflow_file, file_info): file_info[0] for file_info in files}
results = []
for future in as_completed(futures):
results.append(future.result())

# Print results in alphabetical order
for filename, error in sorted(results, key=lambda x: x[0]):
if error:
print(f" Failed to import {filename}: {error}")
else:
print(f" Importing: {filename}")

@app.command("import")
def import_tenant(
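The export and import helpers above all repeat the same fan-out/fan-in shape: submit one task per item to a 30-worker ThreadPoolExecutor, collect (name, result, error) tuples as futures complete, then sort by name so output order stays deterministic even though completion order is not. A possible consolidation, shown only as a hypothetical helper that is not part of this PR, could look like:

from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical helper, not part of the PR: run work(item) across a thread
# pool and return (item, result, error) tuples sorted by item.
def run_parallel(items, work, max_workers=30):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(work, item): item for item in items}
        for future in as_completed(futures):
            item = futures[future]
            try:
                results.append((item, future.result(), None))
            except Exception as e:
                results.append((item, None, str(e)))
    return sorted(results, key=lambda x: x[0])

Each exporter would then reduce to a call such as run_parallel(tags_sorted, lambda tag: plugins.get(ctx, tag_or_id=tag, include_blob="true", _print=False)) followed by the existing write-or-report loop.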
19 changes: 18 additions & 1 deletion cortexapps_cli/cortex_client.py
@@ -1,4 +1,6 @@
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import json
import typer
from rich import print
@@ -20,6 +22,20 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort
logging.basicConfig(level=numeric_level)
self.logger = logging.getLogger(__name__)

# Create a session with connection pooling for better performance
self.session = requests.Session()

# Configure connection pool to support concurrent requests
# pool_connections: number of connection pools to cache
# pool_maxsize: maximum number of connections to save in the pool
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=50,
max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504])
)
self.session.mount('https://', adapter)
self.session.mount('http://', adapter)

def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=False, raw_response=False, content_type='application/json'):
req_headers = {
'Authorization': f'Bearer {self.api_key}',
@@ -33,7 +49,8 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F
if content_type == 'application/json' and isinstance(data, dict):
req_data = json.dumps(data)

response = requests.request(method, url, params=params, headers=req_headers, data=req_data)
# Use session for connection pooling and reuse
response = self.session.request(method, url, params=params, headers=req_headers, data=req_data)

self.logger.debug(f"Request Headers: {response.request.headers}")
self.logger.debug(f"Response Status Code: {response.status_code}")
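cortex_client.py now routes every request through a shared requests.Session mounted with an HTTPAdapter, so the backup command's 30-worker thread pools reuse pooled TCP connections (pool_maxsize=50 keeps headroom above the worker count) and requests that fail with the listed 5xx statuses are retried with exponential backoff. A standalone sketch of the same configuration, shown only for illustration — the endpoint and token in the commented call are placeholders, not values from this PR:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Illustrative sketch of the setup the CLI performs in CortexClient.__init__:
# one Session shared by all requests, with pooled connections and retries.
session = requests.Session()
retry = Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=50, max_retries=retry)
session.mount("https://", adapter)
session.mount("http://", adapter)

# Example call (placeholder endpoint and token):
# resp = session.get("https://api.getcortexapp.com/api/v1/catalog",
#                    headers={"Authorization": "Bearer <token>"})

One caveat worth keeping in mind: requests does not formally document Session as thread-safe, although sharing a single session across worker threads for plain request/response traffic is a common pattern.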
26 changes: 24 additions & 2 deletions tests/test_scorecards.py
@@ -1,5 +1,6 @@
from tests.helpers.utils import *
import yaml
import time

# Get rule id to be used in exemption tests.
# TODO: check for and revoke any PENDING exemptions.
@@ -10,7 +11,18 @@ def _get_rule(title):
return rule_id[0]

def test_scorecards():
cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"])
# Retry scorecard create in case there's an active evaluation
# (can happen if test_import.py just triggered an evaluation)
max_retries = 3
for attempt in range(max_retries):
try:
cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"])
break
except Exception as e:
if "500" in str(e) and attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s
continue
raise

response = cli(["scorecards", "list"])
assert any(scorecard['tag'] == 'cli-test-scorecard' for scorecard in response['scorecards']), "Should find scorecard with tag cli-test-scorecard"
@@ -43,7 +55,17 @@ def test_scorecards():
# cli(["scorecards", "scores", "-t", "cli-test-scorecard"])

def test_scorecards_drafts():
cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"])
# Retry scorecard create in case there's an active evaluation
max_retries = 3
for attempt in range(max_retries):
try:
cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-draft-scorecard.yaml"])
break
except Exception as e:
if "500" in str(e) and attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff: 1s, 2s
continue
raise

response = cli(["scorecards", "list", "-s"])
assert any(scorecard['tag'] == 'cli-test-draft-scorecard' for scorecard in response['scorecards'])
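Both tests now wrap scorecard creation in the same retry-on-500 loop with 1s/2s backoff. If more tests end up needing it, the loop could be lifted into a small shared helper — a hypothetical refactor, not part of this PR:

import time

# Hypothetical helper, not part of the PR: retry a callable when its error
# message contains "500", using the same 1s/2s backoff the tests use inline.
def retry_on_500(fn, max_retries=3):
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as e:
            if "500" in str(e) and attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 1s, then 2s
                continue
            raise

Usage would then be a one-liner per test, e.g. retry_on_500(lambda: cli(["scorecards", "create", "-f", "data/import/scorecards/cli-test-scorecard.yaml"])).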