diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..51625b6 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,35 @@ +name: CI + +on: + push: + pull_request: + +jobs: + test: + name: Python ${{ matrix.python-version }} + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12", "3.13"] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + + - name: Install dependencies + run: pip install -e ".[dev]" + + - name: Lint (ruff) + run: ruff check src tests + + - name: Type-check (mypy) + run: mypy src + + - name: Test (pytest) + run: pytest diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 0000000..a467fb7 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,39 @@ +name: Integration tests + +on: + push: + branches: + - main + workflow_dispatch: + +jobs: + integration: + name: Real API integration tests + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install dependencies + run: pip install -e ".[dev]" + + - name: Verify SYVEL_API_KEY is configured + run: | + if [ -z "$SYVEL_API_KEY" ]; then + echo "::error::SYVEL_API_KEY secret is not configured in this repository." + echo "Add it under Settings → Secrets and variables → Actions." + exit 1 + fi + env: + SYVEL_API_KEY: ${{ secrets.SYVEL_API_KEY }} + + - name: Run integration tests + run: pytest tests/test_integration.py -v + env: + SYVEL_API_KEY: ${{ secrets.SYVEL_API_KEY }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..aab26c6 --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,32 @@ +name: Publish to PyPI + +on: + push: + tags: + - "v*" + +jobs: + publish: + name: Build and publish + runs-on: ubuntu-latest + environment: pypi + permissions: + id-token: write # Required for PyPI Trusted Publishing (OIDC) + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + + - name: Install build dependencies + run: pip install build + + - name: Build package + run: python -m build + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a9a46da --- /dev/null +++ b/.gitignore @@ -0,0 +1,34 @@ +# IDE +.idea/ +.vscode/ + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +*.egg + +# Build +dist/ +build/ +.eggs/ + +# Virtual environments +.venv/ +venv/ +env/ + +# Testing +.pytest_cache/ +.coverage +.coverage.* +htmlcov/ + +# Type checking +.mypy_cache/ + +# Linting +.ruff_cache/ + +# Environment +.env diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..56b8375 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,35 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +## [0.1.0] — 2026-04-03 + +### Added + +- `Syvel` synchronous client with `check()` and `check_email()` methods. +- `AsyncSyvel` asynchronous client with an identical API (`async`/`await`). +- P1 endpoints on both clients: `usage()`, `logs()`, `stats()`, `list_keys()`, + `create_key()`, `revoke_key()`. +- Full typed exception hierarchy: + - `SyvelError` — base class with `status_code` and `code` attributes. + - `SyvelAuthError` — HTTP 401, invalid or missing API key. + - `SyvelForbiddenError` — HTTP 403, origin not authorised. + - `SyvelValidationError` — HTTP 422, invalid email or domain format. + - `SyvelRateLimitError` — HTTP 429, quota exceeded. Exposes `reset_at: datetime | None`. + - `SyvelTimeoutError` — request exceeded the configured timeout. +- Silent mode (`silent=True`) for automatic fail-open behaviour. +- Context manager support for both sync (`with`) and async (`async with`) clients. +- Response models as frozen dataclasses: `CheckResult`, `UsageResult`, `LogEntry`, + `LogsPage`, `StatsPoint`, `ApiKey`. +- `py.typed` marker — PEP 561 compliant for full type-checker support. +- Integration examples for Django, FastAPI, and Flask. +- GitHub Actions CI matrix across Python 3.10–3.13. +- PyPI Trusted Publishing (OIDC) workflow — no stored tokens. + +[Unreleased]: https://github.com/syvel-io/syvel-python/compare/v0.1.0...HEAD +[0.1.0]: https://github.com/syvel-io/syvel-python/releases/tag/v0.1.0 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..80e6ef3 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 Syvel.io + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..6c07574 --- /dev/null +++ b/README.md @@ -0,0 +1,332 @@ +# syvel + +[![PyPI version](https://img.shields.io/pypi/v/syvel?color=blue)](https://pypi.org/project/syvel/) +[![Python versions](https://img.shields.io/pypi/pyversions/syvel)](https://pypi.org/project/syvel/) +[![License](https://img.shields.io/pypi/l/syvel)](LICENSE) +[![CI](https://github.com/syvel-io/syvel-python/actions/workflows/ci.yml/badge.svg)](https://github.com/syvel-io/syvel-python/actions/workflows/ci.yml) + +Official Python SDK for [Syvel](https://www.syvel.io) — disposable email detection API. + +Detect throwaway addresses, role accounts, and undeliverable domains before they pollute your database. Works with **Python 3.10+**, supports both **sync and async**, and has **zero runtime dependencies** beyond [httpx](https://www.python-httpx.org/). + +--- + +## Install + +```bash +pip install syvel +``` + +--- + +## Quick start + +### Synchronous + +```python +import os +from syvel import Syvel, SyvelError, SyvelTimeoutError + +client = Syvel(api_key=os.environ["SYVEL_API_KEY"]) + +try: + result = client.check_email("user@example.com") + if result and result.is_risky: + print(f"Blocked: {result.reason} (score {result.risk_score})") + else: + print("Email looks good") +except SyvelTimeoutError: + pass # Always fail open on timeouts +except SyvelError as e: + print(f"API error {e.status_code}: {e}") +``` + +### Asynchronous + +```python +import os +from syvel import AsyncSyvel + +async def validate(email: str) -> bool: + async with AsyncSyvel(api_key=os.environ["SYVEL_API_KEY"]) as client: + result = await client.check_email(email) + return result is None or not result.is_risky +``` + +### Fail-open (silent) mode + +When you'd rather accept every user than block one legitimate signup, use `silent=True`. All errors are suppressed and methods return `None`: + +```python +client = Syvel(api_key=os.environ["SYVEL_API_KEY"], silent=True) + +result = client.check_email("user@example.com") +# Returns None on any error — never blocks your signup flow +if result and result.is_risky: + print("Risky email detected") +``` + +--- + +## Configuration + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `api_key` | `str` | — | **Required.** Your Syvel API key (`sv_...`). Get one at [syvel.io/dashboard](https://www.syvel.io/dashboard). | +| `base_url` | `str` | `https://api.syvel.io` | Override the API base URL (useful for testing). | +| `timeout` | `float` | `3.0` | Request timeout in seconds. | +| `silent` | `bool` | `False` | When `True`, all errors return `None` instead of raising. | + +--- + +## API reference + +### `check(domain)` + +Check a bare domain for disposable-email risk. + +```python +result = client.check("yopmail.com") +``` + +### `check_email(email)` + +Check a full email address. Passes the complete address to the API to enable local-part analysis (role account detection, random pattern scoring). + +```python +result = client.check_email("admin@example.com") +``` + +### `usage()` + +Get the current monthly quota and usage for your project. + +```python +usage = client.usage() +print(f"{usage.used}/{usage.limit} checks used this month") +``` + +### `logs(cursor=None, limit=None)` + +Retrieve a paginated page of recent request logs. + +```python +page = client.logs(limit=50) +for entry in page.items: + print(entry.target, entry.risk_score) + +# Fetch next page +if page.next_cursor: + next_page = client.logs(cursor=page.next_cursor, limit=50) +``` + +### `stats()` + +Retrieve time-series analytics for the last 30 days. + +```python +points = client.stats() +for point in points: + print(f"{point.date}: {point.total} checks, {point.risky} risky") +``` + +### `list_keys()` / `create_key(name)` / `revoke_key(key_id)` + +Manage API keys for your project. + +```python +keys = client.list_keys() + +new_key = client.create_key("Staging") +print(new_key.id) + +client.revoke_key(new_key.id) +``` + +All methods are available on `AsyncSyvel` as coroutines — prefix them with `await`. + +--- + +## Response fields + +| Field | Type | Description | +|-------|------|-------------| +| `email` | `str` | Masked email address (`"a9****e5@yopmail.com"`). | +| `is_risky` | `bool` | `True` when `risk_score ≥ 65`. Use as your primary signal. | +| `risk_score` | `int` | 0 (safe) → 100 (confirmed disposable). | +| `reason` | `str` | `"safe"` \| `"disposable"` \| `"undeliverable"` \| `"role_account"` | +| `deliverability_score` | `int` | Likelihood (0–100) that mail will be delivered. | +| `did_you_mean` | `str \| None` | Typo correction suggestion (e.g. `"hotmail.com"` for `"hotmial.com"`). | +| `is_free_provider` | `bool` | `True` for Gmail, Yahoo, etc. | +| `is_corporate_email` | `bool` | `True` for business domains. | +| `is_alias_email` | `bool` | `True` for privacy-relay services (SimpleLogin, AnonAddy, etc.). | +| `mx_provider_label` | `str \| None` | Human-readable name of the mail provider. | + +--- + +## Error handling + +All errors inherit from `SyvelError`. Catch specific subclasses for fine-grained handling: + +```python +from syvel import ( + SyvelError, + SyvelAuthError, + SyvelForbiddenError, + SyvelValidationError, + SyvelRateLimitError, + SyvelTimeoutError, +) + +try: + result = client.check_email("user@example.com") +except SyvelAuthError: + # Invalid or missing API key + print("Check your SYVEL_API_KEY") +except SyvelRateLimitError as e: + # Monthly quota exceeded + print(f"Quota resets at {e.reset_at}") +except SyvelTimeoutError: + # Request took longer than `timeout` seconds + pass # Always fail open +except SyvelValidationError: + # Invalid email or domain format + pass +except SyvelForbiddenError: + # Origin not whitelisted for this API key + pass +except SyvelError as e: + # Catch-all for unexpected API errors + print(f"Syvel error {e.status_code}: {e}") +``` + +| Exception | HTTP | Cause | +|-----------|------|-------| +| `SyvelAuthError` | 401 | Invalid or missing API key | +| `SyvelForbiddenError` | 403 | Origin not authorised for this key | +| `SyvelValidationError` | 422 | Invalid email or domain format | +| `SyvelRateLimitError` | 429 | Monthly quota exceeded | +| `SyvelTimeoutError` | — | Request exceeded the configured timeout | +| `SyvelError` | any | Unexpected API error | + +--- + +## Framework examples + +### Django + +```python +# validators.py +import os +from django.core.exceptions import ValidationError +from syvel import Syvel + +_client = Syvel(api_key=os.environ["SYVEL_API_KEY"], silent=True) + +def validate_not_disposable(value: str) -> None: + result = _client.check_email(value) + if result and result.is_risky: + raise ValidationError( + "Please use a permanent email address.", + code="disposable_email", + ) + +# forms.py +from django import forms +from .validators import validate_not_disposable + +class SignupForm(forms.Form): + email = forms.EmailField(validators=[validate_not_disposable]) +``` + +### FastAPI + +```python +import os +from functools import lru_cache +from typing import Annotated +from fastapi import Depends, FastAPI, HTTPException +from syvel import AsyncSyvel, CheckResult + +app = FastAPI() + +@lru_cache(maxsize=1) +def get_client() -> AsyncSyvel: + return AsyncSyvel(api_key=os.environ["SYVEL_API_KEY"], silent=True) + +async def email_risk( + email: str, + client: AsyncSyvel = Depends(get_client), +) -> CheckResult | None: + return await client.check_email(email) + +EmailRisk = Annotated[CheckResult | None, Depends(email_risk)] + +@app.post("/signup") +async def signup(email: str, risk: EmailRisk): + if risk and risk.is_risky: + raise HTTPException(422, f"Disposable email not allowed ({risk.reason})") + return {"status": "ok"} +``` + +### Flask + +```python +import os +from flask import Flask, abort, request +from syvel import Syvel + +app = Flask(__name__) +_client = Syvel(api_key=os.environ["SYVEL_API_KEY"], silent=True) + +@app.before_request +def check_email(): + if request.endpoint == "register" and request.method == "POST": + email = request.form.get("email", "") + result = _client.check_email(email) + if result and result.is_risky: + abort(422, "Disposable email addresses are not allowed.") +``` + +--- + +## Context manager + +Use `Syvel` or `AsyncSyvel` as a context manager to ensure connections are released when you're done: + +```python +# Sync +with Syvel(api_key="sv_...") as client: + result = client.check("yopmail.com") + +# Async +async with AsyncSyvel(api_key="sv_...") as client: + result = await client.check("yopmail.com") +``` + +For long-lived processes (web servers), create the client once at startup and call `client.close()` / `await client.aclose()` on shutdown instead. + +--- + +## Contributing + +Pull requests are welcome. For major changes, please open an issue first. + +```bash +git clone https://github.com/syvel-io/syvel-python.git +cd syvel-python +pip install -e ".[dev]" + +# Run tests +pytest + +# Lint + type-check +ruff check src tests +mypy src +``` + +--- + +## License + +[MIT](LICENSE) diff --git a/examples/django_validator.py b/examples/django_validator.py new file mode 100644 index 0000000..0004f9c --- /dev/null +++ b/examples/django_validator.py @@ -0,0 +1,79 @@ +""" +Django form field validator using Syvel. + +This example shows two integration patterns: + +1. A standalone validator function that can be attached to any form field. +2. A custom form field subclass with built-in validation. + +Setup +----- +Add your API key to settings.py (or load it from an environment variable):: + + # settings.py + SYVEL_API_KEY = os.environ["SYVEL_API_KEY"] + +Install the SDK:: + + pip install syvel + +Usage +----- +Attach the validator to any EmailField:: + + from django import forms + from myapp.validators import validate_not_disposable + + class SignupForm(forms.Form): + email = forms.EmailField(validators=[validate_not_disposable]) + +Or use the custom field directly:: + + from myapp.validators import SafeEmailField + + class SignupForm(forms.Form): + email = SafeEmailField() +""" + +from __future__ import annotations + +import os + +from django.conf import settings # type: ignore[import-untyped] +from django.core.exceptions import ValidationError # type: ignore[import-untyped] +from django.forms import EmailField # type: ignore[import-untyped] + +from syvel import Syvel + +# Reuse a single client instance across all requests for connection pooling. +_client = Syvel( + api_key=getattr(settings, "SYVEL_API_KEY", os.environ.get("SYVEL_API_KEY", "")), + silent=True, # Fail open: never block a user because of a third-party failure. +) + + +def validate_not_disposable(value: str) -> None: + """Django validator that rejects disposable email addresses. + + Silently passes when Syvel is unavailable (fail-open by design). + + Args: + value: The email address to validate. + + Raises: + ValidationError: When the email is identified as risky. + """ + result = _client.check_email(value) + if result is not None and result.is_risky: + raise ValidationError( + "Please use a permanent email address. Disposable addresses are not allowed.", + code="disposable_email", + ) + + +class SafeEmailField(EmailField): + """An EmailField that automatically rejects disposable addresses.""" + + def validate(self, value: str) -> None: + super().validate(value) + validate_not_disposable(value) diff --git a/examples/fastapi_dependency.py b/examples/fastapi_dependency.py new file mode 100644 index 0000000..281f1dd --- /dev/null +++ b/examples/fastapi_dependency.py @@ -0,0 +1,87 @@ +""" +FastAPI async dependency for email risk checking. + +This example shows how to integrate Syvel into a FastAPI application using +dependency injection. The client is shared across all requests via +``@lru_cache``, ensuring a single ``AsyncClient`` instance and connection pool. + +Setup +----- +:: + + pip install syvel fastapi + +Usage +----- +:: + + from fastapi import FastAPI, HTTPException, Depends + from pydantic import BaseModel + from examples.fastapi_dependency import EmailRisk, check_email_risk + + app = FastAPI() + + class SignupRequest(BaseModel): + email: str + password: str + + @app.post("/signup") + async def signup( + body: SignupRequest, + risk: EmailRisk = Depends(check_email_risk), + ): + if risk is not None and risk.is_risky: + raise HTTPException( + status_code=422, + detail={ + "field": "email", + "message": "Disposable email addresses are not allowed.", + "reason": risk.reason, + }, + ) + # ... create user + return {"status": "ok"} + +Shutdown +-------- +Close the client when the app shuts down:: + + @app.on_event("shutdown") + async def shutdown(): + client = get_syvel_client() + await client.aclose() +""" + +from __future__ import annotations + +import os +from functools import lru_cache +from typing import Annotated + +from fastapi import Depends, Query # type: ignore[import-untyped] + +from syvel import AsyncSyvel, CheckResult + + +@lru_cache(maxsize=1) +def get_syvel_client() -> AsyncSyvel: + """Return a shared ``AsyncSyvel`` client (created once per process).""" + return AsyncSyvel( + api_key=os.environ["SYVEL_API_KEY"], + silent=True, # Fail open: never block a user because of a third-party failure. + ) + + +async def check_email_risk( + email: str = Query(..., description="Email address to check"), + client: AsyncSyvel = Depends(get_syvel_client), +) -> CheckResult | None: + """FastAPI dependency that returns the Syvel risk result for *email*. + + Returns ``None`` when Syvel is unavailable (fail-open). + """ + return await client.check_email(email) + + +# Type alias for use in route signatures. +EmailRisk = Annotated[CheckResult | None, Depends(check_email_risk)] diff --git a/examples/flask_extension.py b/examples/flask_extension.py new file mode 100644 index 0000000..8c609f7 --- /dev/null +++ b/examples/flask_extension.py @@ -0,0 +1,117 @@ +""" +Flask extension for email risk validation using Syvel. + +This example provides a ``SyvelFlask`` extension class and a standalone +``init_syvel`` helper. The client is initialised once at startup and +shared across all requests. + +Setup +----- +:: + + pip install syvel flask + +Usage +----- +Using the extension class:: + + from flask import Flask + from examples.flask_extension import SyvelFlask + + app = Flask(__name__) + app.config["SYVEL_API_KEY"] = os.environ["SYVEL_API_KEY"] + + syvel_ext = SyvelFlask() + syvel_ext.init_app(app) + +Using the helper function:: + + from flask import Flask + from examples.flask_extension import init_syvel + + app = Flask(__name__) + init_syvel(app, protected_endpoints={"register", "signup"}) + +In your view, abort with 422 when the email is risky:: + + from flask import request, abort, g + + @app.route("/register", methods=["POST"]) + def register(): + # g.syvel_result is set by the before_request hook + if g.get("syvel_result") and g.syvel_result.is_risky: + abort(422, "Disposable email addresses are not allowed.") + # ... create user +""" + +from __future__ import annotations + +import os + +from flask import Flask, abort, g, request # type: ignore[import-untyped] + +from syvel import Syvel + + +class SyvelFlask: + """Flask extension that validates email addresses before protected endpoints. + + Attach to a Flask app via :meth:`init_app` (application factory pattern) + or pass the app directly to the constructor. + """ + + def __init__(self, app: Flask | None = None) -> None: + self._client: Syvel | None = None + if app is not None: + self.init_app(app) + + def init_app(self, app: Flask, *, protected_endpoints: set[str] | None = None) -> None: + """Initialise the extension with a Flask application. + + Args: + app: The Flask application instance. + protected_endpoints: Set of endpoint names (view function names) that + trigger email validation. When ``None``, defaults to + ``{"register", "signup"}``. + """ + api_key = app.config.get("SYVEL_API_KEY") or os.environ.get("SYVEL_API_KEY", "") + self._client = Syvel(api_key=api_key, silent=True) + guarded = protected_endpoints or {"register", "signup"} + + @app.before_request + def _check_email() -> None: + if request.endpoint not in guarded: + return + email = (request.form.get("email") or request.json or {}).get("email", "") # type: ignore[union-attr] + if not email: + return + assert self._client is not None + result = self._client.check_email(str(email)) + g.syvel_result = result + if result is not None and result.is_risky: + abort(422, "Disposable email addresses are not allowed.") + + @app.teardown_appcontext + def _close_client(_: BaseException | None) -> None: + pass # httpx.Client is reused across requests; close on app shutdown. + + app.extensions["syvel"] = self + + def close(self) -> None: + """Close the underlying HTTP client.""" + if self._client is not None: + self._client.close() + + +def init_syvel( + app: Flask, + *, + protected_endpoints: set[str] | None = None, +) -> SyvelFlask: + """Convenience helper to attach the Syvel extension to *app*. + + Returns the :class:`SyvelFlask` extension instance. + """ + ext = SyvelFlask() + ext.init_app(app, protected_endpoints=protected_endpoints) + return ext diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..ca75ef1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,85 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "syvel" +dynamic = ["version"] +description = "Disposable email detection SDK for Python" +readme = "README.md" +license = { text = "MIT" } +requires-python = ">=3.10" +keywords = [ + "disposable email", + "email validation", + "email verification", + "spam detection", + "syvel", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Communications :: Email", + "Topic :: Internet", + "Topic :: Software Development :: Libraries :: Python Modules", + "Typing :: Typed", +] +dependencies = ["httpx>=0.27.0"] + +[project.urls] +Homepage = "https://www.syvel.io" +Documentation = "https://www.syvel.io/docs/integrations/python" +Repository = "https://github.com/syvel-io/syvel-python" +"Bug Tracker" = "https://github.com/syvel-io/syvel-python/issues" +Changelog = "https://github.com/syvel-io/syvel-python/blob/main/CHANGELOG.md" + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "respx>=0.21", + "pytest-cov>=5.0", + "mypy>=1.9", + "ruff>=0.4", +] + +[tool.hatch.version] +path = "src/syvel/_version.py" + +[tool.hatch.build.targets.wheel] +packages = ["src/syvel"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] +pythonpath = ["src"] +addopts = "--cov=syvel --cov-report=term-missing --cov-fail-under=100" + +[tool.coverage.run] +source = ["src/syvel"] +omit = ["src/syvel/_version.py"] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "if TYPE_CHECKING:", +] + +[tool.mypy] +strict = true +python_version = "3.10" +files = ["src/syvel"] + +[tool.ruff] +line-length = 100 + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "ANN"] +ignore = ["ANN401"] diff --git a/src/syvel/__init__.py b/src/syvel/__init__.py new file mode 100644 index 0000000..3a3e0a6 --- /dev/null +++ b/src/syvel/__init__.py @@ -0,0 +1,58 @@ +"""Syvel — disposable email detection SDK for Python. + +Quickstart:: + + from syvel import Syvel + + client = Syvel(api_key="sv_...") + result = client.check_email("user@example.com") + if result and result.is_risky: + raise ValueError("Disposable email not allowed") + +Async usage:: + + from syvel import AsyncSyvel + + async with AsyncSyvel(api_key="sv_...") as client: + result = await client.check_email("user@example.com") + +Fail-open (silent) mode:: + + client = Syvel(api_key="sv_...", silent=True) + result = client.check_email("user@example.com") + # Returns None on any error — never blocks your signup flow +""" + +from syvel._version import __version__ +from syvel.async_client import AsyncSyvel +from syvel.client import Syvel +from syvel.exceptions import ( + SyvelAuthError, + SyvelError, + SyvelForbiddenError, + SyvelRateLimitError, + SyvelTimeoutError, + SyvelValidationError, +) +from syvel.models import ApiKey, CheckResult, LogEntry, LogsPage, StatsPoint, UsageResult + +__all__ = [ + "__version__", + # Clients + "Syvel", + "AsyncSyvel", + # Models + "CheckResult", + "UsageResult", + "LogEntry", + "LogsPage", + "StatsPoint", + "ApiKey", + # Exceptions + "SyvelError", + "SyvelAuthError", + "SyvelForbiddenError", + "SyvelValidationError", + "SyvelRateLimitError", + "SyvelTimeoutError", +] diff --git a/src/syvel/_http.py b/src/syvel/_http.py new file mode 100644 index 0000000..03bfec1 --- /dev/null +++ b/src/syvel/_http.py @@ -0,0 +1,98 @@ +"""Internal HTTP utilities shared by the sync and async clients.""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any +from urllib.parse import quote + +import httpx + +from syvel._version import __version__ +from syvel.exceptions import ( + SyvelAuthError, + SyvelError, + SyvelForbiddenError, + SyvelRateLimitError, + SyvelValidationError, +) + +DEFAULT_BASE_URL = "https://api.syvel.io" +DEFAULT_TIMEOUT = 3.0 + + +def _make_headers(api_key: str) -> dict[str, str]: + return { + "Authorization": f"Bearer {api_key}", + "User-Agent": f"syvel-python/{__version__}", + "Accept": "application/json", + } + + +def _encode_target(target: str) -> str: + """Percent-encode an email or domain for use in a URL path segment. + + Equivalent to JavaScript's ``encodeURIComponent``: encodes every character + that is not unreserved, including ``@``, ``+``, and ``/``. + """ + return quote(target, safe="") + + +def _validate_email(email: str, *, silent: bool) -> bool: + """Check that *email* contains an ``@`` character. + + Returns ``True`` when the email is valid. + Raises :class:`~syvel.exceptions.SyvelError` when invalid and ``silent`` is ``False``. + Returns ``False`` when invalid and ``silent`` is ``True``. + """ + if "@" not in email: + if silent: + return False + raise SyvelError( + f"Invalid email address: {email!r}. Expected a string containing '@'.", + code="INVALID_EMAIL", + ) + return True + + +def _try_json(response: httpx.Response) -> dict[str, Any]: + try: + data = response.json() + if isinstance(data, list): + return {"items": data} + return data if isinstance(data, dict) else {} + except Exception: + return {} + + +def _parse_datetime(value: str) -> datetime: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +def _parse_response(response: httpx.Response) -> dict[str, Any]: + """Map an httpx response to a parsed dict or raise a typed Syvel exception.""" + if response.is_success: + return _try_json(response) + + status = response.status_code + + if status == 401: + raise SyvelAuthError() + + if status == 403: + raise SyvelForbiddenError() + + if status == 422: + body = _try_json(response) + detail = body.get("detail") + raise SyvelValidationError(str(detail) if detail else None) + + if status == 429: + body = _try_json(response) + reset_str = body.get("reset_at") + reset_at = _parse_datetime(str(reset_str)) if reset_str else None + raise SyvelRateLimitError(reset_at) + + body = _try_json(response) + message = body.get("message") or body.get("detail") or f"HTTP {status}" + raise SyvelError(str(message), status_code=status) diff --git a/src/syvel/_version.py b/src/syvel/_version.py new file mode 100644 index 0000000..3dc1f76 --- /dev/null +++ b/src/syvel/_version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/src/syvel/async_client.py b/src/syvel/async_client.py new file mode 100644 index 0000000..bebf07a --- /dev/null +++ b/src/syvel/async_client.py @@ -0,0 +1,263 @@ +"""Asynchronous Syvel client.""" + +from __future__ import annotations + +from types import TracebackType +from typing import TYPE_CHECKING, Any, cast + +import httpx + +from syvel._http import ( + DEFAULT_BASE_URL, + DEFAULT_TIMEOUT, + _encode_target, + _make_headers, + _parse_response, + _validate_email, +) +from syvel.exceptions import SyvelError, SyvelTimeoutError +from syvel.models import ApiKey, CheckResult, LogsPage, StatsPoint, UsageResult + +if TYPE_CHECKING: + pass + + +class AsyncSyvel: + """Asynchronous Syvel client. + + Wraps the Syvel API using ``httpx.AsyncClient`` for async/await usage. + Must be used as an async context manager or closed explicitly with + :meth:`aclose`: + + .. code-block:: python + + async with AsyncSyvel(api_key="sv_...") as client: + result = await client.check_email("user@example.com") + if result and result.is_risky: + raise ValueError("Disposable email not allowed") + + This client is ideal for FastAPI, async Django, Starlette, and any other + ``asyncio``-based framework. + + Args: + api_key: Your Syvel API key (format: ``sv_...``). Generate one at + https://www.syvel.io/dashboard. + base_url: API base URL. Defaults to ``https://api.syvel.io``. + timeout: Request timeout in seconds. Defaults to ``3.0``. + Always fail open when you catch :class:`~syvel.exceptions.SyvelTimeoutError`. + silent: When ``True``, all errors are suppressed and methods return ``None`` + instead of raising. Useful for non-critical checks where you prefer to + accept users rather than block on a third-party failure. + """ + + def __init__( + self, + api_key: str, + *, + base_url: str = DEFAULT_BASE_URL, + timeout: float = DEFAULT_TIMEOUT, + silent: bool = False, + ) -> None: + if not api_key: + raise ValueError( + "api_key is required. Generate one at https://www.syvel.io/dashboard." + ) + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._timeout = timeout + self._silent = silent + self._headers = _make_headers(api_key) + self._http = httpx.AsyncClient( + base_url=self._base_url, + headers=self._headers, + timeout=self._timeout, + follow_redirects=False, + ) + + # ------------------------------------------------------------------------- + # Public API + # ------------------------------------------------------------------------- + + async def check(self, domain: str) -> CheckResult | None: + """Check a domain for disposable-email risk. + + Use :meth:`check_email` when you have a full email address — it enables + local-part analysis (role account detection, random pattern scoring). + + Args: + domain: Bare domain to check (e.g. ``"yopmail.com"``). + + Returns: + A :class:`~syvel.models.CheckResult`, or ``None`` if ``silent=True`` + and an error occurred. + + Raises: + :class:`~syvel.exceptions.SyvelAuthError`: Invalid or missing API key. + :class:`~syvel.exceptions.SyvelRateLimitError`: Monthly quota exceeded. + :class:`~syvel.exceptions.SyvelTimeoutError`: Request timed out. + :class:`~syvel.exceptions.SyvelError`: Any other API error. + """ + data = await self._request("GET", f"/v1/check/{_encode_target(domain)}") + if data is None: + return None + return CheckResult.from_dict(data) + + async def check_email(self, email: str) -> CheckResult | None: + """Check a full email address for disposable-email risk. + + Passes the complete address to the API (including the local part) to + enable role-account detection (e.g. ``admin@``, ``info@``) and random + pattern analysis. + + Args: + email: Full email address to check (e.g. ``"user@example.com"``). + + Returns: + A :class:`~syvel.models.CheckResult`, or ``None`` if ``silent=True`` + and an error occurred. + + Raises: + :class:`~syvel.exceptions.SyvelError`: If *email* does not contain ``@`` + and ``silent=False``. + :class:`~syvel.exceptions.SyvelAuthError`: Invalid or missing API key. + :class:`~syvel.exceptions.SyvelRateLimitError`: Monthly quota exceeded. + :class:`~syvel.exceptions.SyvelTimeoutError`: Request timed out. + """ + if not _validate_email(email, silent=self._silent): + return None + return await self.check(email) + + async def usage(self) -> UsageResult | None: + """Retrieve the current monthly quota and usage statistics. + + Returns: + A :class:`~syvel.models.UsageResult`, or ``None`` if ``silent=True`` + and an error occurred. + """ + data = await self._request("GET", "/v1/usage") + if data is None: + return None + return UsageResult.from_dict(data) + + async def logs( + self, + *, + cursor: str | None = None, + limit: int | None = None, + ) -> LogsPage | None: + """Retrieve a paginated page of request logs. + + Args: + cursor: Pagination cursor from a previous :class:`~syvel.models.LogsPage` + response. Omit to start from the most recent entries. + limit: Maximum number of entries to return per page. + + Returns: + A :class:`~syvel.models.LogsPage`, or ``None`` if ``silent=True`` + and an error occurred. + """ + params: dict[str, str | int] = {} + if cursor is not None: + params["cursor"] = cursor + if limit is not None: + params["limit"] = limit + data = await self._request("GET", "/v1/logs", params=params) + if data is None: + return None + return LogsPage.from_dict(data) + + async def stats(self) -> list[StatsPoint] | None: + """Retrieve time-series analytics for the last 30 days. + + Returns: + A list of :class:`~syvel.models.StatsPoint` objects, or ``None`` + if ``silent=True`` and an error occurred. + """ + data = await self._request("GET", "/v1/stats") + if data is None: + return None + chart = cast(list[Any], data.get("chart") or []) + return [StatsPoint.from_dict(point) for point in chart] + + async def list_keys(self) -> list[ApiKey] | None: + """List all API keys for the authenticated project. + + Returns: + A list of :class:`~syvel.models.ApiKey` objects, or ``None`` + if ``silent=True`` and an error occurred. + """ + data = await self._request("GET", "/v1/keys") + if data is None: + return None + items = cast(list[Any], data if isinstance(data, list) else data.get("items") or []) + return [ApiKey.from_dict(item) for item in items] + + async def create_key(self, name: str) -> ApiKey | None: + """Create a new API key for the authenticated project. + + Args: + name: Human-readable label for the new key. + + Returns: + The created :class:`~syvel.models.ApiKey`, or ``None`` if ``silent=True`` + and an error occurred. + """ + data = await self._request("POST", "/v1/keys", json={"label": name}) + if data is None: + return None + return ApiKey.from_dict(data) + + async def revoke_key(self, key_id: str) -> None: + """Revoke an API key by its identifier. + + Args: + key_id: The unique identifier of the key to revoke. + """ + await self._request("DELETE", f"/v1/keys/{key_id}") + + # ------------------------------------------------------------------------- + # Context manager + # ------------------------------------------------------------------------- + + async def aclose(self) -> None: + """Close the underlying HTTP client and release connections.""" + await self._http.aclose() + + async def __aenter__(self) -> AsyncSyvel: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + await self.aclose() + + # ------------------------------------------------------------------------- + # Internal + # ------------------------------------------------------------------------- + + async def _request( + self, + method: str, + path: str, + *, + json: dict[str, object] | None = None, + params: dict[str, str | int] | None = None, + ) -> dict[str, Any] | None: + try: + response = await self._http.request(method, path, json=json, params=params) + return _parse_response(response) + except SyvelError: + if self._silent: + return None + raise + except httpx.TimeoutException: + if self._silent: + return None + raise SyvelTimeoutError(self._timeout) + except Exception: + if self._silent: + return None + raise diff --git a/src/syvel/client.py b/src/syvel/client.py new file mode 100644 index 0000000..322464f --- /dev/null +++ b/src/syvel/client.py @@ -0,0 +1,260 @@ +"""Synchronous Syvel client.""" + +from __future__ import annotations + +from types import TracebackType +from typing import TYPE_CHECKING, Any, cast + +import httpx + +from syvel._http import ( + DEFAULT_BASE_URL, + DEFAULT_TIMEOUT, + _encode_target, + _make_headers, + _parse_response, + _validate_email, +) +from syvel.exceptions import SyvelError, SyvelTimeoutError +from syvel.models import ApiKey, CheckResult, LogsPage, StatsPoint, UsageResult + +if TYPE_CHECKING: + pass + + +class Syvel: + """Synchronous Syvel client. + + Wraps the Syvel API using ``httpx.Client`` for connection pooling. + Can be used as a context manager to ensure the underlying HTTP client + is properly closed: + + .. code-block:: python + + with Syvel(api_key="sv_...") as client: + result = client.check_email("user@example.com") + if result and result.is_risky: + raise ValueError("Disposable email not allowed") + + Args: + api_key: Your Syvel API key (format: ``sv_...``). Generate one at + https://www.syvel.io/dashboard. + base_url: API base URL. Defaults to ``https://api.syvel.io``. + timeout: Request timeout in seconds. Defaults to ``3.0``. + Always fail open when you catch :class:`~syvel.exceptions.SyvelTimeoutError`. + silent: When ``True``, all errors are suppressed and methods return ``None`` + instead of raising. Useful for non-critical checks where you prefer to + accept users rather than block on a third-party failure. + """ + + def __init__( + self, + api_key: str, + *, + base_url: str = DEFAULT_BASE_URL, + timeout: float = DEFAULT_TIMEOUT, + silent: bool = False, + ) -> None: + if not api_key: + raise ValueError( + "api_key is required. Generate one at https://www.syvel.io/dashboard." + ) + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._timeout = timeout + self._silent = silent + self._headers = _make_headers(api_key) + self._http = httpx.Client( + base_url=self._base_url, + headers=self._headers, + timeout=self._timeout, + follow_redirects=False, + ) + + # ------------------------------------------------------------------------- + # Public API + # ------------------------------------------------------------------------- + + def check(self, domain: str) -> CheckResult | None: + """Check a domain for disposable-email risk. + + Use :meth:`check_email` when you have a full email address — it enables + local-part analysis (role account detection, random pattern scoring). + + Args: + domain: Bare domain to check (e.g. ``"yopmail.com"``). + + Returns: + A :class:`~syvel.models.CheckResult`, or ``None`` if ``silent=True`` + and an error occurred. + + Raises: + :class:`~syvel.exceptions.SyvelAuthError`: Invalid or missing API key. + :class:`~syvel.exceptions.SyvelRateLimitError`: Monthly quota exceeded. + :class:`~syvel.exceptions.SyvelTimeoutError`: Request timed out. + :class:`~syvel.exceptions.SyvelError`: Any other API error. + """ + data = self._request("GET", f"/v1/check/{_encode_target(domain)}") + if data is None: + return None + return CheckResult.from_dict(data) + + def check_email(self, email: str) -> CheckResult | None: + """Check a full email address for disposable-email risk. + + Passes the complete address to the API (including the local part) to + enable role-account detection (e.g. ``admin@``, ``info@``) and random + pattern analysis. + + Args: + email: Full email address to check (e.g. ``"user@example.com"``). + + Returns: + A :class:`~syvel.models.CheckResult`, or ``None`` if ``silent=True`` + and an error occurred. + + Raises: + :class:`~syvel.exceptions.SyvelError`: If *email* does not contain ``@`` + and ``silent=False``. + :class:`~syvel.exceptions.SyvelAuthError`: Invalid or missing API key. + :class:`~syvel.exceptions.SyvelRateLimitError`: Monthly quota exceeded. + :class:`~syvel.exceptions.SyvelTimeoutError`: Request timed out. + """ + if not _validate_email(email, silent=self._silent): + return None + return self.check(email) + + def usage(self) -> UsageResult | None: + """Retrieve the current monthly quota and usage statistics. + + Returns: + A :class:`~syvel.models.UsageResult`, or ``None`` if ``silent=True`` + and an error occurred. + """ + data = self._request("GET", "/v1/usage") + if data is None: + return None + return UsageResult.from_dict(data) + + def logs( + self, + *, + cursor: str | None = None, + limit: int | None = None, + ) -> LogsPage | None: + """Retrieve a paginated page of request logs. + + Args: + cursor: Pagination cursor from a previous :class:`~syvel.models.LogsPage` + response. Omit to start from the most recent entries. + limit: Maximum number of entries to return per page. + + Returns: + A :class:`~syvel.models.LogsPage`, or ``None`` if ``silent=True`` + and an error occurred. + """ + params: dict[str, str | int] = {} + if cursor is not None: + params["cursor"] = cursor + if limit is not None: + params["limit"] = limit + data = self._request("GET", "/v1/logs", params=params) + if data is None: + return None + return LogsPage.from_dict(data) + + def stats(self) -> list[StatsPoint] | None: + """Retrieve time-series analytics for the last 30 days. + + Returns: + A list of :class:`~syvel.models.StatsPoint` objects, or ``None`` + if ``silent=True`` and an error occurred. + """ + data = self._request("GET", "/v1/stats") + if data is None: + return None + chart = cast(list[Any], data.get("chart") or []) + return [StatsPoint.from_dict(point) for point in chart] + + def list_keys(self) -> list[ApiKey] | None: + """List all API keys for the authenticated project. + + Returns: + A list of :class:`~syvel.models.ApiKey` objects, or ``None`` + if ``silent=True`` and an error occurred. + """ + data = self._request("GET", "/v1/keys") + if data is None: + return None + items = cast(list[Any], data if isinstance(data, list) else data.get("items") or []) + return [ApiKey.from_dict(item) for item in items] + + def create_key(self, name: str) -> ApiKey | None: + """Create a new API key for the authenticated project. + + Args: + name: Human-readable label for the new key. + + Returns: + The created :class:`~syvel.models.ApiKey`, or ``None`` if ``silent=True`` + and an error occurred. + """ + data = self._request("POST", "/v1/keys", json={"label": name}) + if data is None: + return None + return ApiKey.from_dict(data) + + def revoke_key(self, key_id: str) -> None: + """Revoke an API key by its identifier. + + Args: + key_id: The unique identifier of the key to revoke. + """ + self._request("DELETE", f"/v1/keys/{key_id}") + + # ------------------------------------------------------------------------- + # Context manager + # ------------------------------------------------------------------------- + + def close(self) -> None: + """Close the underlying HTTP client and release connections.""" + self._http.close() + + def __enter__(self) -> Syvel: + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + self.close() + + # ------------------------------------------------------------------------- + # Internal + # ------------------------------------------------------------------------- + + def _request( + self, + method: str, + path: str, + *, + json: dict[str, object] | None = None, + params: dict[str, str | int] | None = None, + ) -> dict[str, Any] | None: + try: + response = self._http.request(method, path, json=json, params=params) + return _parse_response(response) + except SyvelError: + if self._silent: + return None + raise + except httpx.TimeoutException: + if self._silent: + return None + raise SyvelTimeoutError(self._timeout) + except Exception: + if self._silent: + return None + raise diff --git a/src/syvel/exceptions.py b/src/syvel/exceptions.py new file mode 100644 index 0000000..93162b1 --- /dev/null +++ b/src/syvel/exceptions.py @@ -0,0 +1,93 @@ +"""Syvel exception hierarchy.""" + +from __future__ import annotations + +from datetime import datetime + + +class SyvelError(Exception): + """Base class for all Syvel SDK errors. + + Attributes: + status_code: HTTP status code, if the error originated from an API response. + code: Machine-readable error code string. + """ + + def __init__( + self, + message: str, + *, + status_code: int | None = None, + code: str | None = None, + ) -> None: + super().__init__(message) + self.status_code = status_code + self.code = code + + +class SyvelAuthError(SyvelError): + """Raised when the API key is invalid or missing (HTTP 401).""" + + def __init__(self) -> None: + super().__init__( + "Invalid or missing Syvel API key. " + "Check your api_key and visit https://www.syvel.io/dashboard to generate one.", + status_code=401, + code="UNAUTHORIZED", + ) + + +class SyvelForbiddenError(SyvelError): + """Raised when the request origin is not authorised for this API key (HTTP 403).""" + + def __init__(self) -> None: + super().__init__( + "This origin is not authorised for this API key. " + "Add it in your Syvel dashboard under project settings.", + status_code=403, + code="FORBIDDEN", + ) + + +class SyvelValidationError(SyvelError): + """Raised when the email or domain format is invalid (HTTP 422).""" + + def __init__(self, detail: str | None = None) -> None: + super().__init__( + detail or "Invalid email or domain format.", + status_code=422, + code="VALIDATION_ERROR", + ) + + +class SyvelRateLimitError(SyvelError): + """Raised when the monthly request quota has been exceeded (HTTP 429). + + Attributes: + reset_at: UTC datetime at which the quota resets, or ``None`` if not provided + by the API. + """ + + def __init__(self, reset_at: datetime | None = None) -> None: + super().__init__( + "Syvel monthly quota exceeded. " + "Upgrade your plan at https://www.syvel.io/dashboard.", + status_code=429, + code="RATE_LIMIT_EXCEEDED", + ) + self.reset_at = reset_at + + +class SyvelTimeoutError(SyvelError): + """Raised when a request exceeds the configured timeout. + + Always fail open: catch this error and allow the user through rather than + blocking on a third-party service failure. + """ + + def __init__(self, timeout_s: float) -> None: + super().__init__( + f"Syvel request timed out after {timeout_s}s.", + code="TIMEOUT", + ) + self.timeout_s = timeout_s diff --git a/src/syvel/models.py b/src/syvel/models.py new file mode 100644 index 0000000..518aec7 --- /dev/null +++ b/src/syvel/models.py @@ -0,0 +1,197 @@ +"""Syvel response models.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Literal + + +def _parse_dt(value: str) -> datetime: + """Parse an ISO 8601 datetime string, handling both 'Z' and '+00:00' suffixes.""" + return datetime.fromisoformat(value.replace("Z", "+00:00")) + + +@dataclass(frozen=True, slots=True) +class CheckResult: + """Result of an email or domain risk check. + + All fields are populated for every successful response. The ``email`` field + contains a masked version of the address (local part partially redacted). + + Attributes: + email: Masked email address (e.g. ``"a9****e5@yopmail.com"``). + is_risky: ``True`` when the risk score is ≥ 65. Use this as your primary + signal to decide whether to accept or reject an address. + risk_score: Risk score from 0 (safe) to 100 (confirmed disposable). + reason: Human-readable risk classification: + ``"safe"`` | ``"disposable"`` | ``"undeliverable"`` | ``"role_account"``. + deliverability_score: Likelihood (0–100) that mail sent to this address + will be delivered. + did_you_mean: A suggested correction when a common typo is detected + (e.g. ``"hotmail.com"`` for ``"hotmial.com"``), or ``None``. + is_free_provider: ``True`` for consumer webmail services (Gmail, Yahoo, etc.). + is_corporate_email: ``True`` when the domain belongs to a business. + is_alias_email: ``True`` for privacy-relay services (SimpleLogin, AnonAddy, etc.). + mx_provider_label: Human-readable name of the mail provider, or ``None``. + """ + + email: str + is_risky: bool + risk_score: int + reason: Literal["safe", "disposable", "undeliverable", "role_account"] + deliverability_score: int + did_you_mean: str | None + is_free_provider: bool + is_corporate_email: bool + is_alias_email: bool + mx_provider_label: str | None + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> CheckResult: + return cls( + email=str(data["email"]), + is_risky=bool(data["is_risky"]), + risk_score=int(data["risk_score"]), + reason=data["reason"], + deliverability_score=int(data["deliverability_score"]), + did_you_mean=data.get("did_you_mean") or None, + is_free_provider=bool(data.get("is_free_provider", False)), + is_corporate_email=bool(data.get("is_corporate_email", False)), + is_alias_email=bool(data.get("is_alias_email", False)), + mx_provider_label=data.get("mx_provider_label") or None, + ) + + +@dataclass(frozen=True, slots=True) +class UsageResult: + """Current quota usage for the authenticated project. + + Attributes: + used: Number of checks performed this month. + limit: Maximum checks allowed per month on the current plan. + reset_at: UTC datetime when the monthly quota resets. + plan: Current plan name (e.g. ``"free"``, ``"starter"``, ``"pro"``). + """ + + used: int + limit: int + reset_at: datetime + plan: str + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> UsageResult: + return cls( + used=int(data["month_count"]), + limit=int(data["monthly_limit"]), + reset_at=_parse_dt(str(data["reset_at"])) if data.get("reset_at") else datetime.min, + plan=str(data.get("plan", "")), + ) + + +@dataclass(frozen=True, slots=True) +class LogEntry: + """A single entry from the request log. + + Attributes: + id: Unique log entry identifier. + target: The checked email or domain (may be masked). + is_risky: Whether the check returned a risky result. + risk_score: Risk score at the time of the check. + reason: Risk classification at the time of the check. + created_at: UTC datetime when the check was performed. + """ + + id: str + target: str + is_risky: bool + risk_score: int + reason: str + created_at: datetime + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> LogEntry: + score = int(data.get("risk_score", 0)) + return cls( + id=str(data["id"]), + target=str(data.get("email_domain") or data.get("domain_hash") or ""), + is_risky=score >= 65, + risk_score=score, + reason=str(data.get("reason", "")), + created_at=_parse_dt(str(data["created_at"])), + ) + + +@dataclass(frozen=True, slots=True) +class LogsPage: + """A paginated page of log entries. + + Attributes: + items: Log entries for this page. + next_cursor: Cursor value to fetch the next page, or ``None`` if this is + the last page. + """ + + items: tuple[LogEntry, ...] + next_cursor: str | None + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> LogsPage: + raw_items: list[Any] = data.get("items") or [] + return cls( + items=tuple(LogEntry.from_dict(item) for item in raw_items), + next_cursor=data.get("next_cursor") or None, + ) + + +@dataclass(frozen=True, slots=True) +class StatsPoint: + """A single data point from the time-series analytics. + + Attributes: + date: Date string in ``YYYY-MM-DD`` format. + total: Total number of checks on this date. + risky: Number of risky checks on this date. + """ + + date: str + total: int + risky: int + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> StatsPoint: + return cls( + date=str(data.get("date", "")), + total=int(data.get("total_requests", 0)), + risky=int(data.get("blocked_count", 0)), + ) + + +@dataclass(frozen=True, slots=True) +class ApiKey: + """An API key associated with a project. + + Attributes: + id: Unique key identifier. + name: Human-readable label for the key. + prefix: Key prefix shown in the dashboard (e.g. ``"sv_"``). + created_at: UTC datetime when the key was created. + last_used_at: UTC datetime of the most recent use, or ``None`` if never used. + """ + + id: str + name: str + prefix: str + created_at: datetime + last_used_at: datetime | None + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ApiKey: + last_used = data.get("last_used_at") + return cls( + id=str(data["id"]), + name=str(data.get("label") or ""), + prefix=str(data.get("key_prefix") or "sv_"), + created_at=_parse_dt(str(data["created_at"])), + last_used_at=_parse_dt(str(last_used)) if last_used else None, + ) diff --git a/src/syvel/py.typed b/src/syvel/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..57a337a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,39 @@ +"""Shared test fixtures.""" + +import pytest + +SAMPLE_RESULT = { + "email": "a9****e5@yopmail.com", + "is_risky": True, + "risk_score": 100, + "reason": "disposable", + "deliverability_score": 0, + "did_you_mean": None, + "is_free_provider": False, + "is_corporate_email": False, + "is_alias_email": False, + "mx_provider_label": "Yopmail", +} + +SAFE_RESULT = { + "email": "us****r@gmail.com", + "is_risky": False, + "risk_score": 5, + "reason": "safe", + "deliverability_score": 95, + "did_you_mean": None, + "is_free_provider": True, + "is_corporate_email": False, + "is_alias_email": False, + "mx_provider_label": "Google", +} + + +@pytest.fixture +def api_key() -> str: + return "sv_test_key" + + +@pytest.fixture +def base_url() -> str: + return "https://api.syvel.io" diff --git a/tests/test_async_client.py b/tests/test_async_client.py new file mode 100644 index 0000000..3a1d2b7 --- /dev/null +++ b/tests/test_async_client.py @@ -0,0 +1,473 @@ +"""Unit tests for the asynchronous AsyncSyvel client.""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import httpx +import pytest +import respx + +from syvel import AsyncSyvel +from syvel.exceptions import ( + SyvelAuthError, + SyvelError, + SyvelForbiddenError, + SyvelRateLimitError, + SyvelTimeoutError, + SyvelValidationError, +) +from syvel.models import ApiKey, CheckResult, LogsPage, StatsPoint, UsageResult +from tests.conftest import SAFE_RESULT, SAMPLE_RESULT + +# --------------------------------------------------------------------------- +# Constructor +# --------------------------------------------------------------------------- + + +class TestConstructor: + async def test_raises_when_api_key_is_empty(self) -> None: + with pytest.raises(ValueError, match="api_key is required"): + AsyncSyvel(api_key="") + + async def test_strips_trailing_slash_from_base_url(self) -> None: + client = AsyncSyvel(api_key="sv_test", base_url="https://api.syvel.io/") + assert client._base_url == "https://api.syvel.io" + await client.aclose() + + async def test_defaults(self) -> None: + client = AsyncSyvel(api_key="sv_test") + assert client._timeout == 3.0 + assert client._silent is False + await client.aclose() + + +# --------------------------------------------------------------------------- +# check() +# --------------------------------------------------------------------------- + + +class TestCheck: + async def test_returns_check_result(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(200, json=SAMPLE_RESULT) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.check("yopmail.com") + + assert isinstance(result, CheckResult) + assert result.is_risky is True + assert result.risk_score == 100 + assert result.reason == "disposable" + + async def test_sends_authorization_header(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/gmail.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + async with AsyncSyvel(api_key=api_key) as client: + await client.check("gmail.com") + + assert route.called + request = route.calls[0].request + assert request.headers["authorization"] == f"Bearer {api_key}" + + async def test_sends_user_agent_header(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/gmail.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + async with AsyncSyvel(api_key=api_key) as client: + await client.check("gmail.com") + + request = route.calls[0].request + assert request.headers["user-agent"].startswith("syvel-python/") + + async def test_custom_base_url(self) -> None: + with respx.mock: + respx.get("https://custom.example.com/v1/check/yopmail.com").mock( + return_value=httpx.Response(200, json=SAMPLE_RESULT) + ) + async with AsyncSyvel( + api_key="sv_test", base_url="https://custom.example.com" + ) as client: + result = await client.check("yopmail.com") + + assert result is not None + assert result.is_risky is True + + async def test_url_encodes_special_characters( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/test%2Bdomain.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + async with AsyncSyvel(api_key=api_key) as client: + await client.check("test+domain.com") + + assert route.called + + +# --------------------------------------------------------------------------- +# check_email() +# --------------------------------------------------------------------------- + + +class TestCheckEmail: + async def test_url_encodes_at_symbol(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/user%40yopmail.com").mock( + return_value=httpx.Response(200, json=SAMPLE_RESULT) + ) + async with AsyncSyvel(api_key=api_key) as client: + await client.check_email("user@yopmail.com") + + assert route.called + + async def test_raises_on_missing_at_symbol(self, api_key: str) -> None: + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelError, match="@"): + await client.check_email("notanemail") + + async def test_silent_mode_returns_none_on_invalid_email( + self, api_key: str + ) -> None: + async with AsyncSyvel(api_key=api_key, silent=True) as client: + result = await client.check_email("notanemail") + assert result is None + + +# --------------------------------------------------------------------------- +# Error handling +# --------------------------------------------------------------------------- + + +class TestErrorHandling: + async def test_401_raises_auth_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(401, json={"message": "Unauthorized"}) + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelAuthError) as exc_info: + await client.check("yopmail.com") + + assert exc_info.value.status_code == 401 + assert exc_info.value.code == "UNAUTHORIZED" + + async def test_403_raises_forbidden_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(403, json={"message": "Forbidden"}) + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelForbiddenError) as exc_info: + await client.check("yopmail.com") + + assert exc_info.value.status_code == 403 + + async def test_422_raises_validation_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/bad").mock( + return_value=httpx.Response(422, json={"detail": "Invalid domain format"}) + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelValidationError) as exc_info: + await client.check("bad") + + assert exc_info.value.status_code == 422 + assert "Invalid domain format" in str(exc_info.value) + + async def test_429_raises_rate_limit_error_with_reset_at( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response( + 429, json={"reset_at": "2026-05-01T00:00:00Z"} + ) + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelRateLimitError) as exc_info: + await client.check("yopmail.com") + + assert exc_info.value.reset_at == datetime(2026, 5, 1, tzinfo=timezone.utc) + + async def test_429_raises_rate_limit_error_without_reset_at( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(429, json={}) + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelRateLimitError) as exc_info: + await client.check("yopmail.com") + + assert exc_info.value.reset_at is None + + async def test_500_raises_syvel_error_with_message( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(500, json={"message": "Internal server error"}) + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelError) as exc_info: + await client.check("yopmail.com") + + assert exc_info.value.status_code == 500 + assert "Internal server error" in str(exc_info.value) + + async def test_500_with_invalid_json_raises_syvel_error( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(500, content=b"not json") + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(SyvelError) as exc_info: + await client.check("yopmail.com") + + assert "HTTP 500" in str(exc_info.value) + + async def test_timeout_raises_timeout_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.TimeoutException("timed out") + ) + async with AsyncSyvel(api_key=api_key, timeout=1.0) as client: + with pytest.raises(SyvelTimeoutError) as exc_info: + await client.check("yopmail.com") + + assert exc_info.value.timeout_s == 1.0 + + async def test_network_error_raises_in_non_silent_mode( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.NetworkError("connection refused") + ) + async with AsyncSyvel(api_key=api_key) as client: + with pytest.raises(httpx.NetworkError): + await client.check("yopmail.com") + + +# --------------------------------------------------------------------------- +# Silent mode +# --------------------------------------------------------------------------- + + +class TestSilentMode: + async def test_network_error_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.NetworkError("connection refused") + ) + async with AsyncSyvel(api_key=api_key, silent=True) as client: + result = await client.check("yopmail.com") + + assert result is None + + async def test_401_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(401, json={}) + ) + async with AsyncSyvel(api_key=api_key, silent=True) as client: + result = await client.check("yopmail.com") + + assert result is None + + async def test_429_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(429, json={}) + ) + async with AsyncSyvel(api_key=api_key, silent=True) as client: + result = await client.check("yopmail.com") + + assert result is None + + async def test_timeout_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.TimeoutException("timed out") + ) + async with AsyncSyvel(api_key=api_key, silent=True) as client: + result = await client.check("yopmail.com") + + assert result is None + + +# --------------------------------------------------------------------------- +# Context manager +# --------------------------------------------------------------------------- + + +class TestContextManager: + async def test_async_context_manager_closes_client( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/gmail.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.check("gmail.com") + + assert result is not None + assert result.is_risky is False + + +# --------------------------------------------------------------------------- +# P1 methods +# --------------------------------------------------------------------------- + + +class TestP1Methods: + async def test_usage_returns_usage_result(self, api_key: str, base_url: str) -> None: + payload = { + "plan": "pro", + "monthly_limit": 10000, + "month_count": 250, + "remaining_month": 9750, + "reset_at": "2026-05-01T00:00:00Z", + } + with respx.mock: + respx.get(f"{base_url}/v1/usage").mock( + return_value=httpx.Response(200, json=payload) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.usage() + + assert isinstance(result, UsageResult) + assert result.used == 250 + assert result.limit == 10000 + assert result.plan == "pro" + + async def test_logs_returns_logs_page(self, api_key: str, base_url: str) -> None: + payload = { + "items": [ + { + "id": "log_1", + "email_domain": "yopmail.com", + "risk_score": 100, + "reason": "disposable", + "created_at": "2026-04-01T10:00:00Z", + } + ], + "next_cursor": None, + } + with respx.mock: + respx.get(f"{base_url}/v1/logs").mock( + return_value=httpx.Response(200, json=payload) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.logs() + + assert isinstance(result, LogsPage) + assert len(result.items) == 1 + assert result.next_cursor is None + + async def test_logs_passes_cursor_and_limit(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/logs").mock( + return_value=httpx.Response(200, json={"items": [], "next_cursor": None}) + ) + async with AsyncSyvel(api_key=api_key) as client: + await client.logs(cursor="abc", limit=10) + + request = route.calls[0].request + assert "cursor=abc" in str(request.url) + assert "limit=10" in str(request.url) + + async def test_stats_returns_list_of_stats_points( + self, api_key: str, base_url: str + ) -> None: + payload = { + "chart": [ + {"date": "2026-04-01", "total_requests": 50, "blocked_count": 8}, + ] + } + with respx.mock: + respx.get(f"{base_url}/v1/stats").mock( + return_value=httpx.Response(200, json=payload) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.stats() + + assert result is not None + assert isinstance(result[0], StatsPoint) + assert result[0].total == 50 + + async def test_list_keys_returns_api_keys(self, api_key: str, base_url: str) -> None: + payload = [ + { + "id": "key_1", + "label": "Production", + "key_prefix": "sv_", + "created_at": "2026-01-01T00:00:00Z", + "last_used_at": "2026-04-01T00:00:00Z", + } + ] + with respx.mock: + respx.get(f"{base_url}/v1/keys").mock( + return_value=httpx.Response(200, json=payload) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.list_keys() + + assert result is not None + assert isinstance(result[0], ApiKey) + assert result[0].last_used_at is not None + + async def test_create_key_sends_post(self, api_key: str, base_url: str) -> None: + payload = { + "id": "key_3", + "label": "CI", + "key_prefix": "sv_", + "created_at": "2026-04-01T00:00:00Z", + "last_used_at": None, + } + with respx.mock: + route = respx.post(f"{base_url}/v1/keys").mock( + return_value=httpx.Response(200, json=payload) + ) + async with AsyncSyvel(api_key=api_key) as client: + result = await client.create_key("CI") + + assert result is not None + assert result.name == "CI" + assert route.called + + async def test_revoke_key_sends_delete(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.delete(f"{base_url}/v1/keys/key_1").mock( + return_value=httpx.Response(204) + ) + async with AsyncSyvel(api_key=api_key) as client: + await client.revoke_key("key_1") + + assert route.called + + async def test_p1_methods_return_none_in_silent_mode_on_error( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/usage").mock(return_value=httpx.Response(401, json={})) + respx.get(f"{base_url}/v1/logs").mock(return_value=httpx.Response(401, json={})) + respx.get(f"{base_url}/v1/stats").mock(return_value=httpx.Response(401, json={})) + respx.get(f"{base_url}/v1/keys").mock(return_value=httpx.Response(401, json={})) + respx.post(f"{base_url}/v1/keys").mock(return_value=httpx.Response(401, json={})) + async with AsyncSyvel(api_key=api_key, silent=True) as client: + assert await client.usage() is None + assert await client.logs() is None + assert await client.stats() is None + assert await client.list_keys() is None + assert await client.create_key("x") is None diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..123d58b --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,471 @@ +"""Unit tests for the synchronous Syvel client.""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import httpx +import pytest +import respx + +from syvel import Syvel +from syvel.exceptions import ( + SyvelAuthError, + SyvelError, + SyvelForbiddenError, + SyvelRateLimitError, + SyvelTimeoutError, + SyvelValidationError, +) +from syvel.models import ApiKey, CheckResult, LogsPage, StatsPoint, UsageResult +from tests.conftest import SAFE_RESULT, SAMPLE_RESULT + +# --------------------------------------------------------------------------- +# Constructor +# --------------------------------------------------------------------------- + + +class TestConstructor: + def test_raises_when_api_key_is_empty(self) -> None: + with pytest.raises(ValueError, match="api_key is required"): + Syvel(api_key="") + + def test_strips_trailing_slash_from_base_url(self) -> None: + client = Syvel(api_key="sv_test", base_url="https://api.syvel.io/") + assert client._base_url == "https://api.syvel.io" + client.close() + + def test_defaults(self) -> None: + client = Syvel(api_key="sv_test") + assert client._timeout == 3.0 + assert client._silent is False + client.close() + + +# --------------------------------------------------------------------------- +# check() +# --------------------------------------------------------------------------- + + +class TestCheck: + def test_returns_check_result(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(200, json=SAMPLE_RESULT) + ) + client = Syvel(api_key=api_key) + result = client.check("yopmail.com") + + assert isinstance(result, CheckResult) + assert result.is_risky is True + assert result.risk_score == 100 + assert result.reason == "disposable" + assert result.email == "a9****e5@yopmail.com" + + def test_sends_authorization_header(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/gmail.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + client = Syvel(api_key=api_key) + client.check("gmail.com") + + assert route.called + request = route.calls[0].request + assert request.headers["authorization"] == f"Bearer {api_key}" + + def test_sends_user_agent_header(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/gmail.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + client = Syvel(api_key=api_key) + client.check("gmail.com") + + request = route.calls[0].request + assert request.headers["user-agent"].startswith("syvel-python/") + + def test_custom_base_url(self) -> None: + with respx.mock: + respx.get("https://custom.example.com/v1/check/yopmail.com").mock( + return_value=httpx.Response(200, json=SAMPLE_RESULT) + ) + client = Syvel(api_key="sv_test", base_url="https://custom.example.com") + result = client.check("yopmail.com") + + assert result is not None + assert result.is_risky is True + + def test_url_encodes_special_characters(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/test%2Bdomain.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + client = Syvel(api_key=api_key) + client.check("test+domain.com") + + assert route.called + + +# --------------------------------------------------------------------------- +# check_email() +# --------------------------------------------------------------------------- + + +class TestCheckEmail: + def test_url_encodes_at_symbol(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/check/user%40yopmail.com").mock( + return_value=httpx.Response(200, json=SAMPLE_RESULT) + ) + client = Syvel(api_key=api_key) + client.check_email("user@yopmail.com") + + assert route.called + + def test_raises_on_missing_at_symbol(self, api_key: str) -> None: + client = Syvel(api_key=api_key) + with pytest.raises(SyvelError, match="@"): + client.check_email("notanemail") + + def test_silent_mode_returns_none_on_invalid_email(self, api_key: str) -> None: + client = Syvel(api_key=api_key, silent=True) + result = client.check_email("notanemail") + assert result is None + + +# --------------------------------------------------------------------------- +# Error handling +# --------------------------------------------------------------------------- + + +class TestErrorHandling: + def test_401_raises_auth_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(401, json={"message": "Unauthorized"}) + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelAuthError) as exc_info: + client.check("yopmail.com") + + assert exc_info.value.status_code == 401 + assert exc_info.value.code == "UNAUTHORIZED" + + def test_403_raises_forbidden_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(403, json={"message": "Forbidden"}) + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelForbiddenError) as exc_info: + client.check("yopmail.com") + + assert exc_info.value.status_code == 403 + + def test_422_raises_validation_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/bad").mock( + return_value=httpx.Response(422, json={"detail": "Invalid domain format"}) + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelValidationError) as exc_info: + client.check("bad") + + assert exc_info.value.status_code == 422 + assert "Invalid domain format" in str(exc_info.value) + + def test_429_raises_rate_limit_error_with_reset_at( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response( + 429, json={"reset_at": "2026-05-01T00:00:00Z"} + ) + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelRateLimitError) as exc_info: + client.check("yopmail.com") + + assert exc_info.value.status_code == 429 + assert exc_info.value.reset_at == datetime(2026, 5, 1, tzinfo=timezone.utc) + + def test_429_raises_rate_limit_error_without_reset_at( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(429, json={}) + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelRateLimitError) as exc_info: + client.check("yopmail.com") + + assert exc_info.value.reset_at is None + + def test_500_raises_syvel_error_with_message(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(500, json={"message": "Internal server error"}) + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelError) as exc_info: + client.check("yopmail.com") + + assert exc_info.value.status_code == 500 + assert "Internal server error" in str(exc_info.value) + + def test_500_with_invalid_json_raises_syvel_error( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(500, content=b"not json") + ) + client = Syvel(api_key=api_key) + with pytest.raises(SyvelError) as exc_info: + client.check("yopmail.com") + + assert "HTTP 500" in str(exc_info.value) + + def test_timeout_raises_timeout_error(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.TimeoutException("timed out") + ) + client = Syvel(api_key=api_key, timeout=1.0) + with pytest.raises(SyvelTimeoutError) as exc_info: + client.check("yopmail.com") + + assert exc_info.value.timeout_s == 1.0 + assert exc_info.value.code == "TIMEOUT" + + def test_network_error_raises_in_non_silent_mode( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.NetworkError("connection refused") + ) + client = Syvel(api_key=api_key) + with pytest.raises(httpx.NetworkError): + client.check("yopmail.com") + + +# --------------------------------------------------------------------------- +# Silent mode +# --------------------------------------------------------------------------- + + +class TestSilentMode: + def test_network_error_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.NetworkError("connection refused") + ) + client = Syvel(api_key=api_key, silent=True) + result = client.check("yopmail.com") + + assert result is None + + def test_401_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(401, json={}) + ) + client = Syvel(api_key=api_key, silent=True) + result = client.check("yopmail.com") + + assert result is None + + def test_429_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + return_value=httpx.Response(429, json={}) + ) + client = Syvel(api_key=api_key, silent=True) + result = client.check("yopmail.com") + + assert result is None + + def test_timeout_returns_none(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/yopmail.com").mock( + side_effect=httpx.TimeoutException("timed out") + ) + client = Syvel(api_key=api_key, silent=True) + result = client.check("yopmail.com") + + assert result is None + + +# --------------------------------------------------------------------------- +# Context manager +# --------------------------------------------------------------------------- + + +class TestContextManager: + def test_context_manager_closes_client(self, api_key: str, base_url: str) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/check/gmail.com").mock( + return_value=httpx.Response(200, json=SAFE_RESULT) + ) + with Syvel(api_key=api_key) as client: + result = client.check("gmail.com") + + assert result is not None + assert result.is_risky is False + + +# --------------------------------------------------------------------------- +# P1 methods +# --------------------------------------------------------------------------- + + +class TestP1Methods: + def test_usage_returns_usage_result(self, api_key: str, base_url: str) -> None: + payload = { + "plan": "starter", + "monthly_limit": 1000, + "month_count": 42, + "remaining_month": 958, + "reset_at": "2026-05-01T00:00:00Z", + } + with respx.mock: + respx.get(f"{base_url}/v1/usage").mock( + return_value=httpx.Response(200, json=payload) + ) + client = Syvel(api_key=api_key) + result = client.usage() + + assert isinstance(result, UsageResult) + assert result.used == 42 + assert result.limit == 1000 + assert result.plan == "starter" + + def test_logs_returns_logs_page(self, api_key: str, base_url: str) -> None: + payload = { + "items": [ + { + "id": "abc123", + "email_domain": "yopmail.com", + "risk_score": 100, + "reason": "disposable", + "created_at": "2026-04-01T12:00:00Z", + } + ], + "next_cursor": "cursor_xyz", + "total": 1, + } + with respx.mock: + respx.get(f"{base_url}/v1/logs").mock( + return_value=httpx.Response(200, json=payload) + ) + client = Syvel(api_key=api_key) + result = client.logs() + + assert isinstance(result, LogsPage) + assert len(result.items) == 1 + assert result.items[0].id == "abc123" + assert result.next_cursor == "cursor_xyz" + + def test_logs_passes_cursor_and_limit(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.get(f"{base_url}/v1/logs").mock( + return_value=httpx.Response(200, json={"items": [], "next_cursor": None}) + ) + client = Syvel(api_key=api_key) + client.logs(cursor="abc", limit=10) + + request = route.calls[0].request + assert "cursor=abc" in str(request.url) + assert "limit=10" in str(request.url) + + def test_stats_returns_list_of_stats_points(self, api_key: str, base_url: str) -> None: + payload = { + "chart": [ + {"date": "2026-04-01", "total_requests": 100, "blocked_count": 15}, + {"date": "2026-04-02", "total_requests": 80, "blocked_count": 10}, + ] + } + with respx.mock: + respx.get(f"{base_url}/v1/stats").mock( + return_value=httpx.Response(200, json=payload) + ) + client = Syvel(api_key=api_key) + result = client.stats() + + assert result is not None + assert len(result) == 2 + assert isinstance(result[0], StatsPoint) + assert result[0].date == "2026-04-01" + assert result[0].total == 100 + assert result[0].risky == 15 + + def test_list_keys_returns_api_keys(self, api_key: str, base_url: str) -> None: + payload = [ + { + "id": "key_1", + "label": "Production", + "key_prefix": "sv_", + "created_at": "2026-01-01T00:00:00Z", + "last_used_at": None, + } + ] + with respx.mock: + respx.get(f"{base_url}/v1/keys").mock( + return_value=httpx.Response(200, json=payload) + ) + client = Syvel(api_key=api_key) + result = client.list_keys() + + assert result is not None + assert len(result) == 1 + assert isinstance(result[0], ApiKey) + assert result[0].name == "Production" + + def test_create_key_sends_post(self, api_key: str, base_url: str) -> None: + payload = { + "id": "key_2", + "label": "Staging", + "key_prefix": "sv_", + "created_at": "2026-04-01T00:00:00Z", + "last_used_at": None, + } + with respx.mock: + route = respx.post(f"{base_url}/v1/keys").mock( + return_value=httpx.Response(200, json=payload) + ) + client = Syvel(api_key=api_key) + result = client.create_key("Staging") + + assert result is not None + assert result.name == "Staging" + assert route.called + + def test_revoke_key_sends_delete(self, api_key: str, base_url: str) -> None: + with respx.mock: + route = respx.delete(f"{base_url}/v1/keys/key_1").mock( + return_value=httpx.Response(204) + ) + client = Syvel(api_key=api_key) + client.revoke_key("key_1") + + assert route.called + + def test_p1_methods_return_none_in_silent_mode_on_error( + self, api_key: str, base_url: str + ) -> None: + with respx.mock: + respx.get(f"{base_url}/v1/usage").mock(return_value=httpx.Response(401, json={})) + respx.get(f"{base_url}/v1/logs").mock(return_value=httpx.Response(401, json={})) + respx.get(f"{base_url}/v1/stats").mock(return_value=httpx.Response(401, json={})) + respx.get(f"{base_url}/v1/keys").mock(return_value=httpx.Response(401, json={})) + respx.post(f"{base_url}/v1/keys").mock(return_value=httpx.Response(401, json={})) + client = Syvel(api_key=api_key, silent=True) + assert client.usage() is None + assert client.logs() is None + assert client.stats() is None + assert client.list_keys() is None + assert client.create_key("x") is None diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..0c18a2e --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,116 @@ +"""Real-API integration tests. + +These tests make live HTTP requests to api.syvel.io and are skipped automatically +when the ``SYVEL_API_KEY`` environment variable is not set. + +Run them with:: + + SYVEL_API_KEY=sv_... pytest tests/test_integration.py -v +""" + +from __future__ import annotations + +import os + +import pytest + +from syvel import AsyncSyvel, Syvel +from syvel.exceptions import SyvelAuthError, SyvelValidationError + +API_KEY = os.environ.get("SYVEL_API_KEY") + +pytestmark = pytest.mark.skipif( + not API_KEY, + reason="SYVEL_API_KEY environment variable is not set", +) + + +# --------------------------------------------------------------------------- +# Sync +# --------------------------------------------------------------------------- + + +class TestSyncIntegration: + def test_check_known_disposable_domain(self) -> None: + assert API_KEY is not None + client = Syvel(api_key=API_KEY) + result = client.check("yopmail.com") + assert result is not None + assert result.is_risky is True + assert result.risk_score == 100 + assert result.reason == "disposable" + + def test_check_safe_domain(self) -> None: + assert API_KEY is not None + client = Syvel(api_key=API_KEY) + result = client.check("gmail.com") + assert result is not None + assert result.is_risky is False + + def test_check_result_has_all_fields(self) -> None: + assert API_KEY is not None + client = Syvel(api_key=API_KEY) + result = client.check("yopmail.com") + assert result is not None + assert isinstance(result.email, str) + assert isinstance(result.is_risky, bool) + assert isinstance(result.risk_score, int) + assert result.reason in ("safe", "disposable", "undeliverable", "role_account") + assert isinstance(result.deliverability_score, int) + assert isinstance(result.is_free_provider, bool) + assert isinstance(result.is_corporate_email, bool) + assert isinstance(result.is_alias_email, bool) + + def test_check_email_passes_full_address(self) -> None: + assert API_KEY is not None + client = Syvel(api_key=API_KEY) + result = client.check_email("user@yopmail.com") + assert result is not None + assert result.is_risky is True + + def test_invalid_api_key_raises_auth_error(self) -> None: + client = Syvel(api_key="sv_this_key_is_intentionally_invalid_for_testing") + with pytest.raises(SyvelAuthError): + client.check("gmail.com") + + def test_invalid_domain_raises_validation_error(self) -> None: + assert API_KEY is not None + client = Syvel(api_key=API_KEY) + with pytest.raises(SyvelValidationError): + client.check("not a valid domain !!!") + + +# --------------------------------------------------------------------------- +# Async +# --------------------------------------------------------------------------- + + +class TestAsyncIntegration: + async def test_check_known_disposable_domain(self) -> None: + assert API_KEY is not None + async with AsyncSyvel(api_key=API_KEY) as client: + result = await client.check("yopmail.com") + assert result is not None + assert result.is_risky is True + assert result.risk_score == 100 + + async def test_check_safe_domain(self) -> None: + assert API_KEY is not None + async with AsyncSyvel(api_key=API_KEY) as client: + result = await client.check("gmail.com") + assert result is not None + assert result.is_risky is False + + async def test_check_email_passes_full_address(self) -> None: + assert API_KEY is not None + async with AsyncSyvel(api_key=API_KEY) as client: + result = await client.check_email("user@yopmail.com") + assert result is not None + assert result.is_risky is True + + async def test_invalid_api_key_raises_auth_error(self) -> None: + async with AsyncSyvel( + api_key="sv_this_key_is_intentionally_invalid_for_testing" + ) as client: + with pytest.raises(SyvelAuthError): + await client.check("gmail.com")