Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 161 additions & 85 deletions scrapegraphai/telemetry/telemetry.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
"""
This module contains code that relates to sending ScrapeGraphAI usage telemetry.

To disable sending telemetry there are three ways:

1. Set it to false programmatically in your driver:
>>> from scrapegraphai import telemetry
>>> telemetry.disable_telemetry()
2. Set it to `false` in ~/.scrapegraphai.conf under `DEFAULT`
[DEFAULT]
telemetry_enabled = False
3. Set SCRAPEGRAPHAI_TELEMETRY_ENABLED=false as an environment variable:
SCRAPEGRAPHAI_TELEMETRY_ENABLED=false python run.py
or:
export SCRAPEGRAPHAI_TELEMETRY_ENABLED=false
"""

import configparser
import functools
import importlib.metadata
import json
import logging
import os
import platform
import threading
import uuid
from typing import Callable, Dict
from typing import Any, Callable
from urllib import request
from urllib.error import HTTPError, URLError

# Load version
VERSION = importlib.metadata.version("scrapegraphai")
STR_VERSION = ".".join([str(i) for i in VERSION])

# 🚀 Your proxy service endpoint (instead of PostHog)
PROXY_URL = "https://scrapegraph-proxy.onrender.com/capture/"
from pydantic import BaseModel, Field, ValidationError

VERSION = importlib.metadata.version("scrapegraphai")
TRACK_URL = "https://sgai-oss-tracing.onrender.com/v1/telemetry"
TIMEOUT = 2
DEFAULT_CONFIG_LOCATION = os.path.expanduser("~/.scrapegraphai.conf")

logger = logging.getLogger(__name__)

# Everything below remains mostly same

def _load_config(config_location: str) -> configparser.ConfigParser:
config = configparser.ConfigParser()
try:
with open(config_location) as f:
config.read_file(f)
except Exception:
except (OSError, configparser.Error) as e:
logger.debug(f"Unable to load config file: {e}")
config["DEFAULT"] = {}
else:
if "DEFAULT" not in config:
Expand All @@ -39,27 +53,31 @@ def _load_config(config_location: str) -> configparser.ConfigParser:
try:
with open(config_location, "w") as f:
config.write(f)
except Exception:
pass
except OSError as e:
logger.debug(f"Unable to write config file: {e}")
return config


def _check_config_and_environ_for_telemetry_flag(default_value: bool, config_obj):
telemetry_enabled = default_value
def _check_config_and_environ_for_telemetry_flag(
telemetry_default: bool, config_obj: configparser.ConfigParser
) -> bool:
telemetry_enabled = telemetry_default
if "telemetry_enabled" in config_obj["DEFAULT"]:
try:
telemetry_enabled = config_obj.getboolean("DEFAULT", "telemetry_enabled")
except Exception:
pass

except ValueError as e:
logger.debug(
f"Unable to parse value for `telemetry_enabled` from config. Encountered {e}"
)
if os.environ.get("SCRAPEGRAPHAI_TELEMETRY_ENABLED") is not None:
env_value = os.environ.get("SCRAPEGRAPHAI_TELEMETRY_ENABLED")
config_obj["DEFAULT"]["telemetry_enabled"] = env_value
try:
telemetry_enabled = config_obj.getboolean(
"DEFAULT", "telemetry_enabled"
telemetry_enabled = config_obj.getboolean("DEFAULT", "telemetry_enabled")
except ValueError as e:
logger.debug(
f"Unable to parse value for `SCRAPEGRAPHAI_TELEMETRY_ENABLED` from environment. Encountered {e}"
)
except Exception:
pass

return telemetry_enabled


Expand All @@ -70,69 +88,128 @@ def _check_config_and_environ_for_telemetry_flag(default_value: bool, config_obj
MAX_COUNT_SESSION = 1000


BASE_PROPERTIES = {
"os_type": os.name,
"os_version": platform.platform(),
"python_version": f"{platform.python_version()}/{platform.python_implementation()}",
"distinct_id": g_anonymous_id,
"scrapegraphai_version": VERSION,
"telemetry_version": "0.0.4-proxy",
}


def disable_telemetry():
    """Turn off anonymous usage telemetry for the rest of this process.

    Flips the module-level ``g_telemetry_enabled`` flag; subsequent calls to
    ``is_telemetry_enabled()`` will return ``False``.
    """
    global g_telemetry_enabled
    g_telemetry_enabled = False


def is_telemetry_enabled() -> bool:
    """Return whether a telemetry event should be emitted for this call.

    Returns ``False`` when telemetry is globally disabled, and also once the
    per-session call counter exceeds ``MAX_COUNT_SESSION`` (rate limiting).
    Logs a one-time opt-out notice on the very first telemetry-enabled call.
    """
    global CALL_COUNTER
    if not g_telemetry_enabled:
        return False
    # First enabled call in this session: tell the user how to opt out.
    if CALL_COUNTER == 0:
        logger.debug(
            "Note: ScrapeGraphAI collects anonymous usage data to improve the library. "
            "You can disable telemetry by setting SCRAPEGRAPHAI_TELEMETRY_ENABLED=false or "
            "by editing ~/.scrapegraphai.conf."
        )
    CALL_COUNTER += 1
    # Stop emitting once the session cap is exceeded.
    return CALL_COUNTER <= MAX_COUNT_SESSION


class TelemetryEvent(BaseModel):
    """Validated telemetry payload matching the tracing API schema.

    Each field carries pydantic length constraints; constructing this model
    with out-of-range or non-string values raises ``ValidationError``, which
    callers use to drop invalid events instead of sending them.
    """

    # Prompt the user supplied to the graph.
    user_prompt: str = Field(min_length=1, max_length=4096)
    # JSON-serialized output schema.
    # NOTE(review): min_length=512 looks unusually high for a schema string —
    # short schemas would be rejected; confirm against the tracing API contract.
    json_schema: str = Field(min_length=512, max_length=16384)
    # Raw scraped page content.
    website_content: str = Field(min_length=1, max_length=65536)
    # JSON-serialized (or stringified) model response.
    llm_response: str = Field(min_length=1, max_length=32768)
    # Model identifier; callers default this to "unknown" when missing.
    llm_model: str = Field(min_length=1, max_length=256)
    # First source URL of the scrape.
    url: str = Field(min_length=1, max_length=2048)

# ⭐ UPDATED FOR PROXY — send without API key
def _send_event_json(event_json: dict):

def _build_valid_telemetry_event(
    prompt: str | None,
    schema: dict | None,
    content: str | None,
    response: dict | str | None,
    llm_model: str | None,
    source: list[str] | None,
) -> TelemetryEvent | None:
    """Assemble a validated ``TelemetryEvent`` from raw graph-execution data.

    Dicts are JSON-serialized, other non-None values are stringified, and the
    first entry of ``source`` (when it is a non-empty list) becomes the URL.
    Returns ``None`` whenever serialization or model validation fails.
    """

    def _as_json(value: dict) -> str | None:
        # Best-effort JSON serialization; unserializable payloads become None.
        try:
            return json.dumps(value)
        except (TypeError, ValueError):
            return None

    target_url: str | None = None
    if isinstance(source, list) and source:
        target_url = source[0]

    if isinstance(schema, dict):
        schema_text = _as_json(schema)
    else:
        schema_text = None if schema is None else str(schema)

    if isinstance(response, dict):
        response_text = _as_json(response)
    else:
        response_text = None if response is None else str(response)

    try:
        return TelemetryEvent(
            user_prompt=prompt,
            json_schema=schema_text,
            website_content=content,
            llm_response=response_text,
            llm_model=llm_model or "unknown",
            url=target_url,
        )
    except (ValidationError, TypeError):
        # Missing/oversized fields (or a None where a str is required) mean
        # the event is silently dropped rather than sent malformed.
        return None


def _send_telemetry(event: TelemetryEvent):
"""Send telemetry event to the tracing endpoint."""
headers = {
"Content-Type": "application/json",
"User-Agent": f"scrapegraphai/{STR_VERSION}",
"sgai-oss-version": VERSION,
}
try:
data = json.dumps(event_json).encode()
req = request.Request(PROXY_URL, data=data, headers=headers)
data = json.dumps(event.model_dump()).encode()
except (TypeError, ValueError) as e:
logger.debug(f"Failed to serialize telemetry event: {e}")
return

try:
req = request.Request(TRACK_URL, data=data, headers=headers)
with request.urlopen(req, timeout=TIMEOUT) as f:
response_body = f.read()
if f.code != 200:
raise RuntimeError(response_body)
except Exception as e:
logger.debug(f"Failed to send telemetry data to proxy: {e}")
else:
logger.debug(f"Telemetry payload forwarded to proxy: {data}")
f.read()
if f.code == 201:
logger.debug("Telemetry data sent successfully")
else:
logger.debug(f"Telemetry endpoint returned unexpected status: {f.code}")
except HTTPError as e:
logger.debug(f"Failed to send telemetry data (HTTP {e.code}): {e.reason}")
except URLError as e:
logger.debug(f"Failed to send telemetry data (URL error): {e.reason}")
except OSError as e:
logger.debug(f"Failed to send telemetry data (OS error): {e}")


def send_event_json(event_json: dict):
if not g_telemetry_enabled:
raise RuntimeError("Telemetry tracking is disabled!")
def _send_telemetry_threaded(event: TelemetryEvent):
"""Send telemetry in a background daemon thread."""
try:
th = threading.Thread(target=_send_event_json, args=(event_json,))
th = threading.Thread(target=_send_telemetry, args=(event,))
th.daemon = True
th.start()
except Exception as e:
logger.debug(f"Telemetry dispatch thread failed: {e}")
except RuntimeError as e:
logger.debug(f"Failed to send telemetry data in a thread: {e}")


def log_event(event: str, properties: Dict[str, any]):
if is_telemetry_enabled():
payload = {
"event": event,
"distinct_id": g_anonymous_id,
"properties": {**BASE_PROPERTIES, **properties},
}
send_event_json(payload)
def log_event(event: str, properties: dict[str, Any]):
"""No-op stub kept for backwards compatibility."""
logger.debug(f"log_event called with event={event} (no-op)")


def log_graph_execution(
Expand All @@ -150,31 +227,30 @@ def log_graph_execution(
exception: str = None,
total_tokens: int = None,
):
props = {
"graph_name": graph_name,
"source": source,
"prompt": prompt,
"schema": schema,
"llm_model": llm_model,
"embedder_model": embedder_model,
"source_type": source_type,
"content": content,
"response": response,
"execution_time": execution_time,
"error_node": error_node,
"exception": exception,
"total_tokens": total_tokens,
"type": "community-library",
}
log_event("graph_execution", props)
"""
function for logging the graph execution
"""
if not is_telemetry_enabled():
return

if error_node is not None:
return

event = _build_valid_telemetry_event(
prompt=prompt,
schema=schema,
content=content,
response=response,
llm_model=llm_model,
source=source,
)
if event is None:
logger.debug("Telemetry skipped: event validation failed")
return

_send_telemetry_threaded(event)


def capture_function_usage(call_fn: Callable) -> Callable:
@functools.wraps(call_fn)
def wrapped_fn(*args, **kwargs):
try:
return call_fn(*args, **kwargs)
finally:
if is_telemetry_enabled():
log_event("function_usage", {"function_name": call_fn.__name__})
return wrapped_fn
"""Passthrough decorator kept for backwards compatibility."""
return call_fn
Loading