Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions automation_file/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,40 @@
)
from automation_file.core.sqlite_lock import SQLiteLock
from automation_file.core.substitution import SubstitutionException, substitute
from automation_file.core.tracing import action_span, init_tracing
from automation_file.exceptions import (
BoxException,
DataOpsException,
DiffException,
OneDriveException,
TextOpsException,
TracingException,
)
from automation_file.local.archive_ops import (
detect_archive_format,
extract_archive,
list_archive,
supported_formats,
)
from automation_file.local.conditional import if_exists, if_newer, if_size_gt
from automation_file.local.data_ops import (
csv_filter,
csv_to_jsonl,
csv_to_parquet,
jsonl_append,
jsonl_iter,
parquet_read,
parquet_write,
yaml_delete,
yaml_get,
yaml_set,
)
from automation_file.local.diff_ops import (
DirDiff,
apply_dir_diff,
apply_text_patch,
diff_dirs,
diff_dirs_summary,
diff_text_files,
iter_dir_diff,
)
Expand All @@ -108,6 +131,13 @@
from automation_file.local.sync_ops import SyncException, sync_dir
from automation_file.local.tar_ops import TarException, create_tar, extract_tar
from automation_file.local.templates import render_file, render_string
from automation_file.local.text_ops import (
encoding_convert,
file_merge,
file_split,
line_count,
sed_replace,
)
from automation_file.local.trash import (
TrashEntry,
empty_trash,
Expand Down Expand Up @@ -147,6 +177,7 @@
azure_blob_instance,
register_azure_blob_ops,
)
from automation_file.remote.box import BoxClient, box_instance, register_box_ops
from automation_file.remote.cross_backend import CrossBackendException, copy_between
from automation_file.remote.dropbox_api import (
DropboxClient,
Expand Down Expand Up @@ -194,6 +225,11 @@
drive_upload_to_folder,
)
from automation_file.remote.http_download import download_file
from automation_file.remote.onedrive import (
OneDriveClient,
onedrive_instance,
register_onedrive_ops,
)
from automation_file.remote.s3 import S3Client, register_s3_ops, s3_instance
from automation_file.remote.sftp import SFTPClient, register_sftp_ops, sftp_instance
from automation_file.remote.smb import SMBClient, SMBEntry
Expand Down Expand Up @@ -297,9 +333,19 @@ def __getattr__(name: str) -> Any:
"detect_archive_format",
"detect_from_bytes",
"detect_mime",
"action_span",
"apply_text_patch",
"csv_filter",
"csv_to_jsonl",
"csv_to_parquet",
"diff_dirs",
"diff_dirs_summary",
"diff_text_files",
"empty_trash",
"encoding_convert",
"file_merge",
"file_split",
"init_tracing",
"extract_archive",
"iter_dir_diff",
"list_archive",
Expand All @@ -321,7 +367,20 @@ def __getattr__(name: str) -> Any:
"json_get",
"json_set",
"json_delete",
"jsonl_append",
"jsonl_iter",
"JsonEditException",
"line_count",
"parquet_read",
"parquet_write",
"sed_replace",
"yaml_delete",
"yaml_get",
"yaml_set",
"DataOpsException",
"DiffException",
"TextOpsException",
"TracingException",
"zip_dir",
"zip_file",
"zip_info",
Expand Down Expand Up @@ -371,6 +430,14 @@ def __getattr__(name: str) -> Any:
"FTPException",
"ftp_instance",
"register_ftp_ops",
"OneDriveClient",
"onedrive_instance",
"register_onedrive_ops",
"OneDriveException",
"BoxClient",
"box_instance",
"register_box_ops",
"BoxException",
"CrossBackendException",
"copy_between",
"WebDAVClient",
Expand Down
13 changes: 8 additions & 5 deletions automation_file/core/action_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ def __init__(self, registry: ActionRegistry | None = None) -> None:

# Template-method: single action ------------------------------------
def _execute_event(self, action: list) -> Any:
    """Resolve one action and invoke it inside a tracing span.

    ``action`` is handed to ``self._parse_action``, which yields the action
    name, the payload kind and the payload itself. The name is looked up in
    ``self.registry``; the resolved command is then called with no
    arguments (kind ``"none"``), keyword arguments (kind ``"kwargs"``) or
    positional arguments (any other kind).

    Returns:
        Whatever the resolved command returns.

    Raises:
        ExecuteActionException: if the registry has no command for the name.
    """
    # Function-scope import, as in the original — presumably to keep the
    # tracing module off the executor's import path (TODO confirm).
    from automation_file.core.tracing import action_span

    name, payload_kind, payload = self._parse_action(action)
    command = self.registry.resolve(name)
    if command is None:
        raise ExecuteActionException(f"unknown action: {name!r}")
    # Dispatch exactly once, and only inside the span: an untraced dispatch
    # placed before this block would return first and leave the span dead code.
    with action_span(name):
        if payload_kind == "none":
            return command()
        if payload_kind == "kwargs":
            return command(**payload)
        return command(*payload)

@staticmethod
def _parse_action(action: list) -> tuple[str, str, Any]:
Expand Down
33 changes: 32 additions & 1 deletion automation_file/core/action_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ def event_dict(self) -> dict[str, Command]:
def _local_commands() -> dict[str, Command]:
from automation_file.local import (
conditional,
data_ops,
diff_ops,
dir_ops,
file_ops,
json_edit,
shell_ops,
sync_ops,
tar_ops,
text_ops,
zip_ops,
)

Expand Down Expand Up @@ -113,6 +116,29 @@ def _local_commands() -> dict[str, Command]:
"FA_if_exists": conditional.if_exists,
"FA_if_newer": conditional.if_newer,
"FA_if_size_gt": conditional.if_size_gt,
# Text / binary
"FA_file_split": text_ops.file_split,
"FA_file_merge": text_ops.file_merge,
"FA_encoding_convert": text_ops.encoding_convert,
"FA_line_count": text_ops.line_count,
"FA_sed_replace": text_ops.sed_replace,
# Diff / patch
"FA_diff_files": diff_ops.diff_text_files,
"FA_diff_dirs": diff_ops.diff_dirs_summary,
"FA_apply_patch": diff_ops.apply_text_patch,
# Structured data (CSV / JSONL)
"FA_csv_filter": data_ops.csv_filter,
"FA_csv_to_jsonl": data_ops.csv_to_jsonl,
"FA_jsonl_iter": data_ops.jsonl_iter,
"FA_jsonl_append": data_ops.jsonl_append,
# Structured data (YAML)
"FA_yaml_get": data_ops.yaml_get,
"FA_yaml_set": data_ops.yaml_set,
"FA_yaml_delete": data_ops.yaml_delete,
# Structured data (Parquet)
"FA_parquet_read": data_ops.parquet_read,
"FA_parquet_write": data_ops.parquet_write,
"FA_csv_to_parquet": data_ops.csv_to_parquet,
}


Expand Down Expand Up @@ -153,7 +179,7 @@ def _http_commands() -> dict[str, Command]:


def _utils_commands() -> dict[str, Command]:
from automation_file.core import checksum, crypto, manifest
from automation_file.core import checksum, crypto, manifest, tracing
from automation_file.remote import cross_backend
from automation_file.utils import deduplicate, fast_find, grep, rotate

Expand All @@ -170,6 +196,7 @@ def _utils_commands() -> dict[str, Command]:
"FA_copy_between": cross_backend.copy_between,
"FA_encrypt_file": crypto.encrypt_file,
"FA_decrypt_file": crypto.decrypt_file,
"FA_tracing_init": tracing.init_tracing,
}


Expand All @@ -186,8 +213,10 @@ def _lazy_execute_action_dag(

def _register_cloud_backends(registry: ActionRegistry) -> None:
from automation_file.remote.azure_blob import register_azure_blob_ops
from automation_file.remote.box import register_box_ops
from automation_file.remote.dropbox_api import register_dropbox_ops
from automation_file.remote.ftp import register_ftp_ops
from automation_file.remote.onedrive import register_onedrive_ops
from automation_file.remote.s3 import register_s3_ops
from automation_file.remote.sftp import register_sftp_ops

Expand All @@ -196,6 +225,8 @@ def _register_cloud_backends(registry: ActionRegistry) -> None:
register_dropbox_ops(registry)
register_sftp_ops(registry)
register_ftp_ops(registry)
register_onedrive_ops(registry)
register_box_ops(registry)


def _register_trigger_ops(registry: ActionRegistry) -> None:
Expand Down
125 changes: 125 additions & 0 deletions automation_file/core/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""OpenTelemetry tracing bridge for the action executor and DAG runner.

Callers opt in by calling :func:`init_tracing` once at startup (or the
``FA_tracing_init`` action) with a service name. Every subsequent action
dispatch through ``ActionExecutor._execute_event`` and every DAG node run
through ``dag_executor._run_action`` is wrapped in a span named
``automation_file.action`` with the action name on the ``fa.action`` attribute.

If ``init_tracing`` has not been called, :func:`action_span` is a cheap
no-op context manager — the executor pays only for a single flag check
and never calls ``trace.get_tracer``, so tracing is effectively
zero-overhead for callers who never enable it.
"""

from __future__ import annotations

import contextlib
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any

from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter

from automation_file.exceptions import TracingException
from automation_file.logging_config import file_automation_logger

# Instrumentation name passed to ``trace.get_tracer`` when a span is opened.
_TRACER_NAME = "automation_file"
# Mutable container so helpers don't need ``global`` to flip the flag.
# ``initialised`` is set to True by ``init_tracing`` and reset to False by
# ``_shutdown_for_tests``; ``action_span`` reads it to decide whether to trace.
_state: dict[str, bool] = {"initialised": False}


def init_tracing(
    service_name: str = "automation_file",
    *,
    exporter: SpanExporter | None = None,
    resource_attributes: dict[str, Any] | None = None,
) -> bool:
    """Install a global :class:`TracerProvider` and register ``exporter``.

    Returns True on the first call, False if tracing is already initialised.
    ``exporter`` defaults to a :class:`SpanExporter` that discards everything —
    so that spans are created (and tooling can inspect them) without requiring
    the caller to wire up a backend. Pass an OTLP / Jaeger / Zipkin exporter
    from the matching ``opentelemetry-exporter-*`` package when you want spans
    to leave the process.

    Args:
        service_name: Value stored under the ``SERVICE_NAME`` resource key.
        exporter: Span exporter wrapped in a :class:`BatchSpanProcessor`.
        resource_attributes: Extra resource attributes; they are merged over
            the service name, so they may override it.

    Raises:
        TracingException: if installing the tracer provider raises.

    If a global tracer provider was already installed by someone else,
    OpenTelemetry ignores the install request instead of raising. In that
    case the provider built here is shut down again (which also shuts down
    ``exporter``), a warning is logged, and tracing is still marked as
    initialised so action spans are emitted through the existing provider.
    """
    if _state["initialised"]:
        return False
    attributes: dict[str, Any] = {SERVICE_NAME: service_name}
    if resource_attributes:
        attributes.update(resource_attributes)
    resource = Resource.create(attributes)
    provider = TracerProvider(resource=resource)
    active_exporter = exporter if exporter is not None else _NullExporter()
    provider.add_span_processor(BatchSpanProcessor(active_exporter))
    try:
        trace.set_tracer_provider(provider)
    except Exception as err:  # pylint: disable=broad-exception-caught
        # Release the batch processor before surfacing the failure so the
        # never-installed provider does not linger in the background.
        with contextlib.suppress(Exception):
            provider.shutdown()
        raise TracingException(f"cannot install tracer provider: {err}") from err
    if trace.get_tracer_provider() is not provider:
        # The install was silently ignored because a provider already exists:
        # ours would never receive a span, so shut it down instead of leaking it.
        with contextlib.suppress(Exception):
            provider.shutdown()
        file_automation_logger.warning(
            "tracing: a global tracer provider is already installed; "
            "spans will use it and the supplied exporter is ignored"
        )
    _state["initialised"] = True
    file_automation_logger.info("tracing: initialised (service=%s)", service_name)
    return True


def is_initialised() -> bool:
    """Report whether :func:`init_tracing` has completed in this process."""
    initialised = _state["initialised"]
    return initialised


@contextmanager
def action_span(action_name: str, attributes: dict[str, Any] | None = None) -> Iterator[None]:
    """Wrap the managed block in an ``automation_file.action`` span.

    The span carries ``action_name`` on the ``fa.action`` attribute, plus
    every key/value pair from ``attributes`` when that mapping is given.

    While tracing has not been initialised the context manager does
    nothing at all, so callers can wrap every action unconditionally
    without touching the tracer on the hot path.
    """
    if _state["initialised"]:
        tracer = trace.get_tracer(_TRACER_NAME)
        with tracer.start_as_current_span("automation_file.action") as span:
            span.set_attribute("fa.action", action_name)
            # An absent or empty mapping contributes no extra attributes.
            for key, value in (attributes or {}).items():
                span.set_attribute(key, value)
            yield
    else:
        yield


def _shutdown_for_tests() -> None:
    """Reset module state so a fresh :func:`init_tracing` call works again.

    OpenTelemetry's :func:`trace.set_tracer_provider` is a one-shot guarded
    by an internal ``Once`` sentinel — repeated calls are silently ignored.
    Tests need to flip tracing off and back on, so we reach into the
    ``opentelemetry.trace`` module and reset the sentinel. This is the
    conventional pattern used by the opentelemetry-python test suite itself.

    Steps, in order: shut down the current global provider (best-effort),
    replace the ``Once`` sentinel with a fresh instance, clear the stored
    global provider, and clear this module's ``initialised`` flag.
    """
    provider = trace.get_tracer_provider()
    # Looked up dynamically: presumably not every provider object exposes
    # ``shutdown`` (e.g. an API-level default) — TODO confirm.
    shutdown = getattr(provider, "shutdown", None)
    if callable(shutdown):
        # Exporter shutdown is best-effort when a test already tore it down.
        with contextlib.suppress(Exception):
            shutdown()  # pylint: disable=not-callable  # narrowed by callable() above
    # pylint: disable=protected-access  # test-only reset of OTel's Once sentinel
    # Build the replacement from the sentinel's own type so no private class
    # has to be imported by name.
    once_cls = type(trace._TRACER_PROVIDER_SET_ONCE)
    trace._TRACER_PROVIDER_SET_ONCE = once_cls()
    trace._TRACER_PROVIDER = None
    _state["initialised"] = False


class _NullExporter(SpanExporter):
    """Fallback exporter that reports success and throws every span away."""

    def export(self, spans: Any) -> Any:
        """Drop ``spans`` and signal a successful export."""
        from opentelemetry.sdk.trace.export import SpanExportResult

        del spans  # deliberately unused: nothing is ever exported
        return SpanExportResult.SUCCESS

    def shutdown(self) -> None:
        """Do nothing; this exporter holds no resources."""
20 changes: 20 additions & 0 deletions automation_file/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ class FsspecException(FileAutomationException):
"""Raised by the fsspec bridge on missing dependency or backend failures."""


class TextOpsException(FileAutomationException):
    """Raised by text / binary file helpers (split, merge, sed, encoding_convert).

    Subclasses :class:`FileAutomationException`, so handlers for the base
    class also catch it.
    """


class DataOpsException(FileAutomationException):
    """Raised by CSV / JSONL / YAML / Parquet helpers.

    Subclasses :class:`FileAutomationException`, so handlers for the base
    class also catch it.
    """


class OneDriveException(FileAutomationException):
    """Raised by the OneDrive (Microsoft Graph) backend.

    Subclasses :class:`FileAutomationException`, so handlers for the base
    class also catch it.
    """


class BoxException(FileAutomationException):
    """Raised by the Box backend.

    Subclasses :class:`FileAutomationException`, so handlers for the base
    class also catch it.
    """


class TracingException(FileAutomationException):
    """Raised when OpenTelemetry tracing setup cannot be completed.

    Subclasses :class:`FileAutomationException`, so handlers for the base
    class also catch it.
    """


_ARGPARSE_EMPTY_MESSAGE = "argparse received no actionable argument"
_BAD_TRIGGER_FUNCTION = "trigger name is not registered in the executor"
_BAD_CALLBACK_METHOD = "callback_param_method must be 'kwargs' or 'args'"
Expand Down
Loading
Loading