diff --git a/Justfile b/Justfile
index de6a855..84c35cb 100644
--- a/Justfile
+++ b/Justfile
@@ -13,11 +13,11 @@ _setup:

 # Run all tests
 test-all: _setup test-import
-    {{pytest}} -n auto --dist loadfile -m "not setup" --html=report.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests
+    {{pytest}} -n auto -m "not setup and not perf" --html=report.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests

 # Run all tests serially - helpful to see if any tests seem to be hanging
 _test-all-individual: test-import
-    {{pytest}} --html=report-all-invidual.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests
+    {{pytest}} -m "not setup and not perf" --html=report-all-invidual.html --self-contained-html --cov=cortexapps_cli --cov-append --cov-report term-missing tests

 # Run import test, a pre-requisite for any tests that rely on test data.
 test-import:
@@ -25,4 +25,9 @@ test-import:

 # Run a single test, ie: just test tests/test_catalog.py
 test testname:
-    {{pytest}} -n auto {{testname}}
+    {{pytest}} -n auto -m "" {{testname}}
+
+# Run performance tests (rate limiting, long-running tests)
+test-perf:
+    @echo "Running performance tests (this may take 60+ seconds)..."
+    {{pytest}} -v -s -m perf tests/

diff --git a/cortexapps_cli/cli.py b/cortexapps_cli/cli.py
index d6f26a3..c389cb 100755
--- a/cortexapps_cli/cli.py
+++ b/cortexapps_cli/cli.py
@@ -87,7 +87,8 @@ def global_callback(
     url: str = typer.Option(None, "--url", "-u", help="Base URL for the API", envvar="CORTEX_BASE_URL"),
     config_file: str = typer.Option(os.path.join(os.path.expanduser('~'), '.cortex', 'config'), "--config", "-c", help="Config file path", envvar="CORTEX_CONFIG"),
     tenant: str = typer.Option("default", "--tenant", "-t", help="Tenant alias", envvar="CORTEX_TENANT_ALIAS"),
-    log_level: Annotated[str, typer.Option("--log-level", "-l", help="Set the logging level")] = "INFO"
+    log_level: Annotated[str, typer.Option("--log-level", "-l", help="Set the logging level")] = "INFO",
+    rate_limit: int = typer.Option(None, "--rate-limit", "-r", help="API rate limit in requests per minute (default: 1000)", envvar="CORTEX_RATE_LIMIT")
 ):
     if not ctx.obj:
         ctx.obj = {}
@@ -135,7 +136,7 @@ def global_callback(
         api_key = api_key.strip('"\' ')
         url = url.strip('"\' /')

-    ctx.obj["client"] = CortexClient(api_key, tenant, numeric_level, url)
+    ctx.obj["client"] = CortexClient(api_key, tenant, numeric_level, url, rate_limit)

 @app.command()
 def version():
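The new `rate_limit` option is passed straight through to `CortexClient`, so library callers get the same knob without going through the CLI. A minimal sketch of programmatic use, not part of this PR (the key is a placeholder; `logging.INFO` mirrors the CLI's default log level):

```python
# Sketch only: the API key below is a placeholder, not a real credential.
import logging
from cortexapps_cli.cortex_client import CortexClient

client = CortexClient(
    api_key="<your-api-key>",
    tenant="default",
    numeric_level=logging.INFO,
    rate_limit=500,  # throttle to 500 req/min instead of the 1000 req/min default
)
```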
diff --git a/cortexapps_cli/cortex_client.py b/cortexapps_cli/cortex_client.py
index 1e35713..1ace5f7 100644
--- a/cortexapps_cli/cortex_client.py
+++ b/cortexapps_cli/cortex_client.py
@@ -9,12 +9,64 @@ from rich.console import Console
 import logging
 import urllib.parse
+import time
+import threading
+import os

 from cortexapps_cli.utils import guess_data_key

+class TokenBucket:
+    """
+    Token bucket rate limiter for client-side rate limiting.
+
+    Allows bursts up to bucket capacity while enforcing the long-term rate limit.
+    Thread-safe for concurrent use.
+    """
+    def __init__(self, rate, capacity=None):
+        """
+        Args:
+            rate: Tokens per second (e.g., 1000 req/min = 16.67 req/sec)
+            capacity: Maximum tokens in bucket (default: rate, allowing a 1-second burst)
+        """
+        self.rate = rate
+        self.capacity = capacity or rate
+        self.tokens = self.capacity
+        self.last_update = time.time()
+        self.lock = threading.Lock()
+
+    def acquire(self, tokens=1):
+        """
+        Acquire tokens, blocking until available.
+
+        Args:
+            tokens: Number of tokens to acquire (default: 1)
+        """
+        with self.lock:
+            while True:
+                now = time.time()
+                elapsed = now - self.last_update
+
+                # Refill tokens based on elapsed time
+                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
+                self.last_update = now
+
+                if self.tokens >= tokens:
+                    self.tokens -= tokens
+                    return
+
+                # Calculate wait time for the next token
+                tokens_needed = tokens - self.tokens
+                wait_time = tokens_needed / self.rate
+
+                # Release the lock while sleeping so other threads can proceed
+                self.lock.release()
+                time.sleep(min(wait_time, 0.1))  # Sleep in small increments
+                self.lock.acquire()
+
+
 class CortexClient:
-    def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcortexapp.com'):
+    def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcortexapp.com', rate_limit=None):
         self.api_key = api_key
         self.tenant = tenant
         self.base_url = base_url
@@ -22,6 +74,19 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort
         logging.basicConfig(level=numeric_level)
         self.logger = logging.getLogger(__name__)

+        # Enable urllib3 retry logging to see when retries occur
+        urllib3_logger = logging.getLogger('urllib3.util.retry')
+        urllib3_logger.setLevel(logging.DEBUG)
+
+        # Read the rate limit from the environment variable or use the default
+        if rate_limit is None:
+            rate_limit = int(os.environ.get('CORTEX_RATE_LIMIT', '1000'))
+
+        # Client-side rate limiter (default: 1000 req/min = 16.67 req/sec)
+        # Allows bursting up to 50 requests, then enforces the rate limit
+        self.rate_limiter = TokenBucket(rate=rate_limit/60.0, capacity=50)
+        self.logger.info(f"Rate limiter initialized: {rate_limit} req/min (burst: 50)")
+
         # Create a session with connection pooling for better performance
         self.session = requests.Session()
@@ -31,7 +96,13 @@ def __init__(self, api_key, tenant, numeric_level, base_url='https://api.getcort
         adapter = HTTPAdapter(
             pool_connections=10,
             pool_maxsize=50,
-            max_retries=Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504])
+            max_retries=Retry(
+                total=3,
+                backoff_factor=0.3,
+                status_forcelist=[500, 502, 503, 504],  # Removed 429 - we avoid it with rate limiting
+                allowed_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
+                respect_retry_after_header=True
+            )
         )
         self.session.mount('https://', adapter)
         self.session.mount('http://', adapter)
@@ -50,13 +121,30 @@ def request(self, method, endpoint, params={}, headers={}, data=None, raw_body=F
             req_data = json.dumps(data)

         # Use session for connection pooling and reuse
+        # Acquire a rate limit token before making the request (blocks if needed)
+        self.rate_limiter.acquire()
+
+        start_time = time.time()
         response = self.session.request(method, url, params=params, headers=req_headers, data=req_data)
+        duration = time.time() - start_time
+
+        # Log slow requests or non-200 responses (likely retries happened)
+        if duration > 2.0 or response.status_code != 200:
+            self.logger.info(f"{method} {endpoint} -> {response.status_code} ({duration:.1f}s)")
+
+        # Log if retries likely occurred (duration suggests backoff delays)
+        if duration > 5.0:
+            self.logger.warning(f"⚠️ Slow request ({duration:.1f}s) - likely retries occurred")

         self.logger.debug(f"Request Headers: {response.request.headers}")
         self.logger.debug(f"Response Status Code: {response.status_code}")
         self.logger.debug(f"Response Headers: {response.headers}")
         self.logger.debug(f"Response Content: {response.text}")

+        # Check if the response is OK. Note: urllib3 Retry has already retried any
+        # 500/502/503/504 errors (429 is avoided by the client-side rate limiter rather
+        # than retried). Reaching here with one of those codes means retries were exhausted.
         if not response.ok:
             try:
                 # try to parse the error message
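Because `TokenBucket` has no dependencies beyond the standard library, its pacing can be sanity-checked locally without any API traffic. A quick sketch (timings are approximate):

```python
# Local sanity check of TokenBucket pacing, no API calls involved.
# With rate=10 tokens/sec and a burst capacity of 5, 25 acquires should take
# about 2 seconds: 5 served from the initial burst, 20 refilled at 10/sec.
import time
from cortexapps_cli.cortex_client import TokenBucket

bucket = TokenBucket(rate=10, capacity=5)
start = time.time()
for _ in range(25):
    bucket.acquire()
print(f"25 acquires took {time.time() - start:.1f}s (expected ~2.0s)")
```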
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..787bbe3
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,6 @@
+[pytest]
+markers =
+    perf: Performance tests that should not run in regular test suite (use 'just test-perf')
+    setup: Setup tests that run before other tests
+# Exclude perf tests by default (setup tests are handled by Justfile)
+addopts = -m "not perf"
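A note on the marker plumbing: pytest prepends `addopts` to the command line and keeps only the last `-m` expression it sees, so the explicit `-m perf` in `just test-perf` overrides the default `-m "not perf"` here, and the `-m ""` in the single-test recipe clears the filter entirely so any one test can be run regardless of its markers.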
+ """ + print("\n=== Rate Limiting Test (Aggressive Parallel Creates) ===") + print(f"Starting time: {time.strftime('%H:%M:%S')}") + print("Testing CortexClient rate limiting with 250 parallel workers.\n") + + # Initialize CortexClient with credentials from environment + api_key = os.environ.get('CORTEX_API_KEY') + base_url = os.environ.get('CORTEX_BASE_URL', 'https://api.getcortexapp.com') + + if not api_key: + pytest.skip("CORTEX_API_KEY not set") + + client = CortexClient( + api_key=api_key, + tenant='perf-test', + numeric_level=logging.INFO, + base_url=base_url + ) + + # Setup: Create custom entity type for test entities + print("Setting up perf-test entity type...") + entity_type_def = { + "type": "perf-test", + "name": "Performance Test Entity", + "description": "Temporary entity type for rate limiting performance tests", + "schema": {"type": "object", "required": [], "properties": {}} + } + + # Create entity type (delete first if it exists from previous failed run) + try: + client.delete("api/v1/catalog/definitions/perf-test") + except: + pass # Ignore if doesn't exist + + try: + client.post("api/v1/catalog/definitions", data=entity_type_def) + print(" ✅ Created perf-test entity type") + except Exception as e: + print(f" ⚠️ Warning: Could not create entity type: {e}") + + # Create test data rapidly to stress rate limiting + print("Creating 1000 test entities in parallel (testing rate limiter)...") + start_time = time.time() + + test_entities = [] + create_errors = [] + completed_count = 0 + + # Function to create a single entity using CortexClient directly (much faster than subprocess) + def create_entity(index): + entity_tag = f"rate-limit-test-{index:04d}" + # OpenAPI YAML format (what the API expects) + openapi_yaml = f"""openapi: 3.0.1 +info: + title: Rate Limit Test {index} + x-cortex-tag: {entity_tag} + x-cortex-type: perf-test + description: Test entity {index} for rate limiting validation +""" + + try: + # Make direct API call - CortexClient rate limiter prevents 429s + # Use the open-api endpoint which accepts OpenAPI YAML format + client.post("api/v1/open-api", data=openapi_yaml, content_type="application/openapi;charset=UTF-8") + return {'success': True, 'tag': entity_tag, 'index': index} + except Exception as e: + return { + 'success': False, + 'tag': entity_tag, + 'index': index, + 'error': str(e)[:200] # Truncate long errors + } + + # Execute in parallel with 250 workers to stress the rate limiter + # CortexClient should proactively limit to 1000 req/min and avoid 429s + with ThreadPoolExecutor(max_workers=250) as executor: + futures = {executor.submit(create_entity, i): i for i in range(1000)} + + for future in as_completed(futures): + result = future.result() + completed_count += 1 + + if result['success']: + test_entities.append(result['tag']) + + if completed_count % 50 == 0: + elapsed = time.time() - start_time + rate = completed_count / elapsed * 60 if elapsed > 0 else 0 + print(f" Completed {completed_count}/1000 entities in {elapsed:.1f}s | ~{rate:.0f} req/min") + else: + create_errors.append({ + 'entity': result['index'], + 'tag': result['tag'], + 'error': result['error'] + }) + print(f" Entity {result['index']}: ❌ FAILED: {result['error']}") + + total_duration = time.time() - start_time + + # Cleanup - delete all entities by type (much faster than individual deletes) + print(f"\nCleaning up test entities...") + cleanup_start = time.time() + try: + # Delete all entities of type perf-test using params + client.delete("api/v1/catalog", params={"types": 
"perf-test"}) + cleanup_duration = time.time() - cleanup_start + print(f" ✅ Deleted all perf-test entities in {cleanup_duration:.1f}s") + except Exception as e: + print(f" ⚠️ Cleanup error: {str(e)}") + + # Cleanup entity type + try: + client.delete("api/v1/catalog/definitions/perf-test") + print(f" ✅ Deleted perf-test entity type") + except Exception as e: + print(f" ⚠️ Could not delete entity type: {e}") + + # Analysis + print(f"\n=== Test Results ===") + print(f"Duration: {total_duration:.1f}s") + print(f"Entities created: {len(test_entities)}/1000") + print(f"Create errors: {len(create_errors)}") + + # Calculate approximate request rate (based on successful creates / time) + # Note: This doesn't include internal retries by CortexClient + requests_per_minute = (len(test_entities) / total_duration) * 60 + print(f"Effective rate: ~{requests_per_minute:.0f} req/min") + + # Assertions + print(f"\n=== Validation ===") + + # 1. All or nearly all entities should be created successfully + # If CortexClient rate limiter works, no 429s should occur + success_rate = len(test_entities) / 1000 + print(f"Success rate: {success_rate * 100:.1f}%") + assert success_rate >= 0.95, f"At least 95% of creates should succeed (got {success_rate * 100:.1f}%)" + + # 2. Should have very few or no failures + # CortexClient should prevent 429s proactively + print(f"Failures: {len(create_errors)}") + assert len(create_errors) < 5, f"Should have very few failures (got {len(create_errors)})" + + print(f"\n✅ Rate limiting test PASSED") + print(f" - Created {len(test_entities)}/1000 entities in {total_duration:.1f}s") + print(f" - Success rate: {success_rate * 100:.1f}%") + print(f" - Effective rate: ~{requests_per_minute:.0f} req/min") + print(f" - Note: CortexClient proactively limits requests to 1000 req/min (burst: 50)") + print(f" - ✅ Aggressive parallel creates succeeded without hitting 429s") + + diff --git a/tests/test_scorecards.py b/tests/test_scorecards.py index 7b5c991..d00e15f 100644 --- a/tests/test_scorecards.py +++ b/tests/test_scorecards.py @@ -87,13 +87,22 @@ def test_scorecards_drafts(): # - However, we should be cleaning up test data after tests run which would invalidate these assumptions. 
@@ -106,13 +115,22 @@ def test_approve_exemption():
     assert response['exemptions'][0]['exemptionStatus']['status'] == 'REJECTED', "exemption state should be REJECTED"

 @pytest.fixture(scope='session')
-@mock.patch.dict(os.environ, {"CORTEX_API_KEY": os.environ['CORTEX_API_KEY_VIEWER']})
 def test_exemption_that_will_be_denied():
     rule_id = _get_rule("Is Definitely False")
     print("rule_id = " + rule_id)
-    response = cli(["scorecards", "exemptions", "request", "-s", "cli-test-scorecard", "-t", "cli-test-service", "-r", "test deny", "-ri", rule_id, "-d", "100"])
-    assert response['exemptionStatus']['status'] == 'PENDING', "exemption state should be PENDING"
+    # Revoke any existing exemption to make the test idempotent (ignore if it doesn't exist)
+    # Use the admin key for the revoke since the viewer doesn't have permission
+    try:
+        cli(["scorecards", "exemptions", "revoke", "-s", "cli-test-scorecard", "-t", "cli-test-service", "-r", "cleanup", "-ri", rule_id])
+    except Exception as e:
+        # Ignore errors - the exemption may not exist
+        print(f"Cleanup: {e}")
+
+    # Request the exemption with the viewer key (auto-approved with admin, pending with viewer)
+    with mock.patch.dict(os.environ, {"CORTEX_API_KEY": os.environ['CORTEX_API_KEY_VIEWER']}):
+        response = cli(["scorecards", "exemptions", "request", "-s", "cli-test-scorecard", "-t", "cli-test-service", "-r", "test deny", "-ri", rule_id, "-d", "100"])
+        assert response['exemptionStatus']['status'] == 'PENDING', "exemption state should be PENDING"

 @pytest.mark.usefixtures('test_exemption_that_will_be_denied')
 def test_deny_exemption():