Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@ _setup:

# Run all tests
test-all: _setup test-import
{{pytest}} -n auto --dist loadfile -m "not setup" --html=report.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests
{{pytest}} -n auto -m "not setup and not perf" --html=report.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests

# Run all tests serially - helpful to see if any tests seem to be hanging
_test-all-individual: test-import
{{pytest}} --html=report-all-invidual.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests
{{pytest}} -m "not setup and not perf" --html=report-all-invidual.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests

# Run import test, a pre-requisite for any tests that rely on test data.
test-import:
{{pytest}} tests/test_import.py --cov=cortexapps_cli --cov-report=

# Run a single test, ie: just test tests/test_catalog.py
test testname:
{{pytest}} -n auto {{testname}}
{{pytest}} -n auto -m "" {{testname}}

# Run performance tests (rate limiting, long-running tests)
test-perf:
@echo "Running performance tests (this may take 60+ seconds)..."
{{pytest}} -v -s -m perf tests/
5 changes: 3 additions & 2 deletions cortexapps_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ def global_callback(
url: str = typer.Option(None, "--url", "-u", help="Base URL for the API", envvar="CORTEX_BASE_URL"),
config_file: str = typer.Option(os.path.join(os.path.expanduser('~'), '.cortex', 'config'), "--config", "-c", help="Config file path", envvar="CORTEX_CONFIG"),
tenant: str = typer.Option("default", "--tenant", "-t", help="Tenant alias", envvar="CORTEX_TENANT_ALIAS"),
log_level: Annotated[str, typer.Option("--log-level", "-l", help="Set the logging level")] = "INFO"
log_level: Annotated[str, typer.Option("--log-level", "-l", help="Set the logging level")] = "INFO",
rate_limit: int = typer.Option(None, "--rate-limit", "-r", help="API rate limit in requests per minute (default: 1000)", envvar="CORTEX_RATE_LIMIT")
):
if not ctx.obj:
ctx.obj = {}
Expand Down Expand Up @@ -135,7 +136,7 @@ def global_callback(
api_key = api_key.strip('"\' ')
url = url.strip('"\' /')

ctx.obj["client"] = CortexClient(api_key, tenant, numeric_level, url)
ctx.obj["client"] = CortexClient(api_key, tenant, numeric_level, url, rate_limit)

@app.command()
def version():
Expand Down
92 changes: 90 additions & 2 deletions cortexapps_cli/cortex_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,84 @@
from rich.console import Console
import logging
import urllib.parse
import time
import threading
import os

from cortexapps_cli.utils import guess_data_key


class TokenBucket:
    """
    Token bucket rate limiter for client-side rate limiting.

    Allows bursts up to bucket capacity while enforcing the long-term rate
    limit. Thread-safe for concurrent use.
    """

    def __init__(self, rate, capacity=None):
        """
        Args:
            rate: Tokens per second (e.g., 1000 req/min = 16.67 req/sec)
            capacity: Maximum tokens in bucket (default: rate, which allows
                a 1-second burst)
        """
        self.rate = rate
        # Compare with `is None` (not truthiness) so an explicit capacity
        # of 0 is honored rather than silently replaced by `rate`.
        self.capacity = capacity if capacity is not None else rate
        self.tokens = self.capacity
        # time.monotonic() is immune to wall-clock adjustments (NTP, DST);
        # with time.time(), a backwards clock jump would make `elapsed`
        # negative and incorrectly drain tokens.
        self.last_update = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self, tokens=1):
        """
        Acquire tokens, blocking until available.

        Args:
            tokens: Number of tokens to acquire (default: 1)
        """
        while True:
            # Hold the lock only while reading/updating shared state and
            # sleep outside it. (Manually releasing/re-acquiring a lock
            # inside a `with` block is unsafe: an exception during the
            # sleep would make the `with` exit release an unheld lock and
            # raise RuntimeError.)
            with self.lock:
                now = time.monotonic()
                elapsed = now - self.last_update

                # Refill tokens based on elapsed time, capped at capacity.
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last_update = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return

                # Not enough tokens yet; estimate how long until they accrue.
                tokens_needed = tokens - self.tokens
                wait_time = tokens_needed / self.rate

            # Sleep in small increments so other threads can make progress
            # and we re-check shared state frequently.
            time.sleep(min(wait_time, 0.1))


class CortexClient:
def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcortexapp.com'):
def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcortexapp.com', rate_limit=None):
self.api_key = api_key
self.tenant = tenant
self.base_url = base_url

logging.basicConfig(level=numeric_level)
self.logger = logging.getLogger(__name__)

# Enable urllib3 retry logging to see when retries occur
urllib3_logger = logging.getLogger('urllib3.util.retry')
urllib3_logger.setLevel(logging.DEBUG)

# Read rate limit from environment variable or use default
if rate_limit is None:
rate_limit = int(os.environ.get('CORTEX_RATE_LIMIT', '1000'))

# Client-side rate limiter (default: 1000 req/min = 16.67 req/sec)
# Allows bursting up to 50 requests, then enforces rate limit
self.rate_limiter = TokenBucket(rate=rate_limit/60.0, capacity=50)
self.logger.info(f"Rate limiter initialized: {rate_limit} req/min (burst: 50)")

# Create a session with connection pooling for better performance
self.session = requests.Session()

Expand All @@ -31,7 +96,13 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=50,
max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504])
max_retries=Retry(
total=3,
backoff_factor=0.3,
status_forcelist=[500, 502, 503, 504], # Removed 429 - we avoid it with rate limiting
allowed_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
respect_retry_after_header=True
)
)
self.session.mount('https://', adapter)
self.session.mount('http://', adapter)
Expand All @@ -50,13 +121,30 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F
req_data = json.dumps(data)

# Use session for connection pooling and reuse
# Acquire rate limit token before making request (blocks if needed)
self.rate_limiter.acquire()

start_time = time.time()
response = self.session.request(method, url, params=params, headers=req_headers, data=req_data)
duration = time.time() - start_time

# Log slow requests or non-200 responses (likely retries happened)
if duration > 2.0 or response.status_code != 200:
self.logger.info(f"{method} {endpoint} -> {response.status_code} ({duration:.1f}s)")

# Log if retries likely occurred (duration suggests backoff delays)
if duration > 5.0:
self.logger.warning(f"⚠️ Slow request ({duration:.1f}s) - likely retries occurred")

self.logger.debug(f"Request Headers: {response.request.headers}")
self.logger.debug(f"Response Status Code: {response.status_code}")
self.logger.debug(f"Response Headers: {response.headers}")
self.logger.debug(f"Response Content: {response.text}")

# Check if response is OK. Note: urllib3 Retry with status_forcelist should have already
# retried any 429/500/502/503/504 errors. If we're here with one of those status codes,
# it means retries were exhausted.

if not response.ok:
try:
# try to parse the error message
Expand Down
6 changes: 6 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[pytest]
markers =
perf: Performance tests that should not run in regular test suite (use 'just test-perf')
setup: Setup tests that run before other tests
# Exclude perf tests by default (setup tests are handled by Justfile)
addopts = -m "not perf"
186 changes: 186 additions & 0 deletions tests/test_perf_rate_limiting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
"""
Performance test for client-side rate limiting.

This test is excluded from the main test suite because:
1. It intentionally generates high API load (1000+ req/min)
2. It makes many rapid parallel API requests to stress rate limiting
3. It should only be run manually via: just test-perf

Purpose:
- Verify that CortexClient's TokenBucket rate limiter prevents 429 errors
- Use aggressive parallelism (250 workers) with direct API calls
- Validate that even under heavy load, creates succeed without hitting 429s

Strategy:
- Create 1000 test catalog entities with 250 parallel workers
- Use direct CortexClient API calls (no subprocess overhead)
- CortexClient proactively limits requests to 1000 req/min (with 50-token burst)
- Validate that 95%+ of creates succeed without any 429 errors
"""

import time
import pytest
import os
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from cortexapps_cli.cortex_client import CortexClient


@pytest.mark.perf
def test_rate_limiting_with_retry_validation():
    """
    Rate limit test that validates CortexClient's TokenBucket rate limiter.

    Creates 1000 entities with 250 parallel workers to stress the API.
    CortexClient should proactively limit requests to avoid 429 errors.
    Test validates that aggressive parallel creates succeed (95%+ success rate).

    Requires CORTEX_API_KEY in the environment; the test is skipped otherwise.
    """
    print("\n=== Rate Limiting Test (Aggressive Parallel Creates) ===")
    print(f"Starting time: {time.strftime('%H:%M:%S')}")
    print("Testing CortexClient rate limiting with 250 parallel workers.\n")

    # Initialize CortexClient with credentials from environment
    api_key = os.environ.get('CORTEX_API_KEY')
    base_url = os.environ.get('CORTEX_BASE_URL', 'https://api.getcortexapp.com')

    if not api_key:
        pytest.skip("CORTEX_API_KEY not set")

    client = CortexClient(
        api_key=api_key,
        tenant='perf-test',
        numeric_level=logging.INFO,
        base_url=base_url
    )

    # Setup: Create custom entity type for test entities
    print("Setting up perf-test entity type...")
    entity_type_def = {
        "type": "perf-test",
        "name": "Performance Test Entity",
        "description": "Temporary entity type for rate limiting performance tests",
        "schema": {"type": "object", "required": [], "properties": {}}
    }

    # Create entity type (delete first if it exists from previous failed run)
    try:
        client.delete("api/v1/catalog/definitions/perf-test")
    except Exception:
        # Best-effort: ignore if it doesn't exist. Narrowed from a bare
        # `except:` so KeyboardInterrupt/SystemExit still propagate.
        pass

    try:
        client.post("api/v1/catalog/definitions", data=entity_type_def)
        print(" ✅ Created perf-test entity type")
    except Exception as e:
        print(f" ⚠️ Warning: Could not create entity type: {e}")

    # Create test data rapidly to stress rate limiting
    print("Creating 1000 test entities in parallel (testing rate limiter)...")
    start_time = time.time()

    test_entities = []
    create_errors = []
    completed_count = 0

    # Function to create a single entity using CortexClient directly
    # (much faster than spawning a CLI subprocess per request)
    def create_entity(index):
        entity_tag = f"rate-limit-test-{index:04d}"
        # OpenAPI YAML format (what the API expects)
        openapi_yaml = f"""openapi: 3.0.1
info:
  title: Rate Limit Test {index}
  x-cortex-tag: {entity_tag}
  x-cortex-type: perf-test
  description: Test entity {index} for rate limiting validation
"""

        try:
            # Make direct API call - CortexClient rate limiter prevents 429s
            # Use the open-api endpoint which accepts OpenAPI YAML format
            client.post("api/v1/open-api", data=openapi_yaml, content_type="application/openapi;charset=UTF-8")
            return {'success': True, 'tag': entity_tag, 'index': index}
        except Exception as e:
            return {
                'success': False,
                'tag': entity_tag,
                'index': index,
                'error': str(e)[:200]  # Truncate long errors
            }

    # Execute in parallel with 250 workers to stress the rate limiter
    # CortexClient should proactively limit to 1000 req/min and avoid 429s
    with ThreadPoolExecutor(max_workers=250) as executor:
        futures = {executor.submit(create_entity, i): i for i in range(1000)}

        for future in as_completed(futures):
            result = future.result()
            completed_count += 1

            if result['success']:
                test_entities.append(result['tag'])

                if completed_count % 50 == 0:
                    elapsed = time.time() - start_time
                    # Guard against division by zero on very fast completions
                    rate = completed_count / elapsed * 60 if elapsed > 0 else 0
                    print(f" Completed {completed_count}/1000 entities in {elapsed:.1f}s | ~{rate:.0f} req/min")
            else:
                create_errors.append({
                    'entity': result['index'],
                    'tag': result['tag'],
                    'error': result['error']
                })
                print(f" Entity {result['index']}: ❌ FAILED: {result['error']}")

    total_duration = time.time() - start_time

    # Cleanup - delete all entities by type (much faster than individual deletes)
    print(f"\nCleaning up test entities...")
    cleanup_start = time.time()
    try:
        # Delete all entities of type perf-test using params
        client.delete("api/v1/catalog", params={"types": "perf-test"})
        cleanup_duration = time.time() - cleanup_start
        print(f" ✅ Deleted all perf-test entities in {cleanup_duration:.1f}s")
    except Exception as e:
        print(f" ⚠️ Cleanup error: {str(e)}")

    # Cleanup entity type
    try:
        client.delete("api/v1/catalog/definitions/perf-test")
        print(f" ✅ Deleted perf-test entity type")
    except Exception as e:
        print(f" ⚠️ Could not delete entity type: {e}")

    # Analysis
    print(f"\n=== Test Results ===")
    print(f"Duration: {total_duration:.1f}s")
    print(f"Entities created: {len(test_entities)}/1000")
    print(f"Create errors: {len(create_errors)}")

    # Calculate approximate request rate (based on successful creates / time)
    # Note: This doesn't include internal retries by CortexClient
    requests_per_minute = (len(test_entities) / total_duration) * 60
    print(f"Effective rate: ~{requests_per_minute:.0f} req/min")

    # Assertions
    print(f"\n=== Validation ===")

    # 1. All or nearly all entities should be created successfully
    # If CortexClient rate limiter works, no 429s should occur
    success_rate = len(test_entities) / 1000
    print(f"Success rate: {success_rate * 100:.1f}%")
    assert success_rate >= 0.95, f"At least 95% of creates should succeed (got {success_rate * 100:.1f}%)"

    # 2. Should have very few or no failures
    # CortexClient should prevent 429s proactively
    print(f"Failures: {len(create_errors)}")
    assert len(create_errors) < 5, f"Should have very few failures (got {len(create_errors)})"

    print(f"\n✅ Rate limiting test PASSED")
    print(f" - Created {len(test_entities)}/1000 entities in {total_duration:.1f}s")
    print(f" - Success rate: {success_rate * 100:.1f}%")
    print(f" - Effective rate: ~{requests_per_minute:.0f} req/min")
    print(f" - Note: CortexClient proactively limits requests to 1000 req/min (burst: 50)")
    print(f" - ✅ Aggressive parallel creates succeeded without hitting 429s")


Loading