From 3228eaf0cd353de9d334d90a58363d3bd8041e79 Mon Sep 17 00:00:00 2001 From: Abhi <171412961+iapoorv01@users.noreply.github.com> Date: Wed, 1 Jul 2026 01:25:04 +0530 Subject: [PATCH 1/2] Promote TwelveLabs Video RAG components into core xpacks/llm This commit promotes the TwelveLabs integration from the template directory into the native pathway.xpacks.llm core library, as proposed in #255. Key changes: - pyproject.toml: Introduced a new [twelvelabs] optional dependency extra. - embedders.py: Promoted MarengoEmbedder into pathway.xpacks.llm.embedders. It inherits from BaseEmbedder and leverages an async pathway (_aembed_one) via asyncio.gather for concurrent embedding generation, preventing thread blocking. It implements the lazy ImportError pattern. - parsers.py: Promoted TwelveLabsVideoParser into pathway.xpacks.llm.parsers. Implemented proper resource cleanup logic using try/finally blocks to explicitly delete TwelveLabs assets (when delete_assets=True) preventing asset flooding on the API. Also implements the lazy ImportError pattern. - test_twelvelabs.py: Ported the unit tests into python/pathway/xpacks/llm/tests/. Contains no-network test coverage via stubbed SDK mocks. The live smoke test is maintained but securely gated behind the TWELVELABS_API_KEY existence check. - CHANGELOG.md: Added an entry under [Unreleased] -> Added documenting the promotion. This effectively extends Pathway's live-sync multimodal indexing capabilities to handle full video ingestion over the TwelveLabs network natively. 
--- CHANGELOG.md | 1 + pyproject.toml | 4 + python/pathway/xpacks/llm/embedders.py | 140 +++++++++++ python/pathway/xpacks/llm/parsers.py | 167 +++++++++++++ .../xpacks/llm/tests/test_twelvelabs.py | 220 ++++++++++++++++++ 5 files changed, 532 insertions(+) create mode 100644 python/pathway/xpacks/llm/tests/test_twelvelabs.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c200cd81..9bb52d931 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `pw.io.weaviate.write` writes a Pathway table to a [Weaviate](https://weaviate.io/) collection, keeping it in sync with the table: additions and updates upsert objects (keyed by a UUID derived from the required `primary_key`), and deletions remove them. An optional `vector` column is stored as the object's embedding; the remaining columns become object properties. The target collection must already exist. The connector writes in parallel across workers (`pathway spawn -n`), with `batch_size` and `concurrency` to tune throughput, and supports `api_key`/`headers` authentication for self-hosted and Weaviate Cloud deployments. - `pw.io.pinecone.write` writes a Pathway table to a [Pinecone](https://www.pinecone.io/) index for use as a vector store in RAG pipelines. It keeps the index in sync with the current state of the table: a row is upserted under its record id and a row removed from the table is deleted from the index. By default the id is the table's internal row key (unique, and written in parallel across workers); pass a `primary_key` column to use your own ids instead, in which case the values must uniquely identify rows (a collision raises an error) and writing runs on a single worker. The `vector` column holds the dense embedding (`list[float]` or a 1-D `numpy.ndarray`); the remaining columns — or just those listed in `metadata_columns` — are stored as record metadata. 
Targets both Pinecone cloud and the local Pinecone emulator via the `host` parameter; the index must already exist with a matching dimension. - `pw.io.s3.read`, `pw.io.s3.read_from_digital_ocean`, `pw.io.s3.read_from_wasabi`, `pw.io.minio.read`, `pw.io.pyfilesystem.read`, and `pw.xpacks.connectors.sharepoint.read` now support the `format="only_metadata"` option (already available in `pw.io.fs.read` and `pw.io.gdrive.read`). In this mode the connector tracks additions, modifications, and deletions of objects in the source but does not download their contents — the resulting table contains only the `_metadata` column. This is useful for monitoring changes in large buckets or directories without spending time and traffic on fetching the objects themselves. For S3 and MinIO this also skips the object downloads entirely at the engine level, not just the parsing of their contents. +- Promoted `TwelveLabsVideoParser` and `MarengoEmbedder` out of the Video RAG example template and into the native `pathway.xpacks.llm` core library. You can now build Video RAG applications directly in Pathway after installing the optional extra with `pip install pathway[twelvelabs]`. ### Changed - `pw.xpacks.connectors.sharepoint.read` now supports the `max_backlog_size` parameter, already available in the other input connectors. It limits the number of entries read from SharePoint and kept in processing at any moment, which helps to avoid memory spikes when the source emits a large initial burst of data. 
diff --git a/pyproject.toml b/pyproject.toml index 9cb6d99b2..85e8d440b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,6 +99,9 @@ xpack-llm-docs = [ xpack-sharepoint = [ "Office365-REST-Python-Client >= 2.5.3", ] +twelvelabs = [ + "twelvelabs >= 0.3.0", +] all = [ "pathway[pyfilesystem]", "pathway[sql]", @@ -108,6 +111,7 @@ all = [ "pathway[xpack-sharepoint]", "pathway[airbyte]", "pathway[milvus]", + "pathway[twelvelabs]", ] tests = [ "pytest >= 8.0.0, < 9.0.0", diff --git a/python/pathway/xpacks/llm/embedders.py b/python/pathway/xpacks/llm/embedders.py index 504d5eaa2..58a25c622 100644 --- a/python/pathway/xpacks/llm/embedders.py +++ b/python/pathway/xpacks/llm/embedders.py @@ -21,6 +21,7 @@ "SentenceTransformerEmbedder", "GeminiEmbedder", "BedrockEmbedder", + "MarengoEmbedder", ] @@ -753,3 +754,142 @@ async def __wrapped__(self, input: str, **kwargs) -> np.ndarray: embedding = result.get("embedding", result.get("embeddings", [[]])[0]) return np.array(embedding) + + +DEFAULT_MARENGO_MODEL = "marengo3.0" + + +def _resolve_twelvelabs_api_key(api_key: str | None) -> str: + import os + + key = api_key or os.environ.get("TWELVELABS_API_KEY") + if not key: + raise ValueError( + "TwelveLabs API key is missing. Pass `api_key=...` or set the " + "`TWELVELABS_API_KEY` environment variable." + ) + return key + + +def _build_twelvelabs_client(api_key: str | None): + try: + from twelvelabs import TwelveLabs + except ImportError as e: + raise ImportError( + "The `twelvelabs` package is required to use the TwelveLabs components. " + "Install it with `pip install pathway[twelvelabs]`." + ) from e + return TwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) + + +def _build_async_twelvelabs_client(api_key: str | None): + try: + from twelvelabs import AsyncTwelveLabs + except ImportError as e: + raise ImportError( + "The `twelvelabs` package is required to use the TwelveLabs components. " + "Install it with `pip install pathway[twelvelabs]`." 
+ ) from e + return AsyncTwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) + + +class MarengoEmbedder(BaseEmbedder): + """Embed text using the TwelveLabs Marengo multimodal embedding model. + + Marengo returns 512-dimensional embeddings in a shared multimodal space, so the + text it produces is directly comparable with image, audio and video embeddings + from the same model. This makes it a natural retriever embedder for pipelines + that index video with :class:`TwelveLabsVideoParser`. + + Args: + model: Marengo model name. Defaults to ``"marengo3.0"``. + api_key: TwelveLabs API key. If ``None``, the SDK reads it from the + ``TWELVELABS_API_KEY`` environment variable. + capacity: Maximum number of concurrent operations. Defaults to ``None`` + (no specific limit). + retry_strategy: Strategy for handling retries. Defaults to + :py:class:`~pathway.udfs.ExponentialBackoffRetryStrategy`. + cache_strategy: Pathway caching strategy. Defaults to ``None``. + + Example: + + >>> import pathway as pw # doctest: +SKIP + >>> from pathway.xpacks.llm.embedders import MarengoEmbedder # doctest: +SKIP + >>> embedder = MarengoEmbedder() # doctest: +SKIP + """ + + def __init__( + self, + *, + model: str = DEFAULT_MARENGO_MODEL, + api_key: str | None = None, + capacity: int | None = None, + retry_strategy: ( + udfs.AsyncRetryStrategy | None + ) = pw.udfs.ExponentialBackoffRetryStrategy(), + cache_strategy: udfs.CacheStrategy | None = None, + ): + executor = udfs.async_executor(capacity=capacity, retry_strategy=retry_strategy) + # Marengo embeds one text per request, so keep batches at size 1. 
+ super().__init__( + executor=executor, cache_strategy=cache_strategy, max_batch_size=1 + ) + self.model = model + self._api_key = api_key + self._client = None + self._aclient = None + + @property + def client(self): + if self._client is None: + self._client = _build_twelvelabs_client(self._api_key) + return self._client + + @property + def aclient(self): + if self._aclient is None: + self._aclient = _build_async_twelvelabs_client(self._api_key) + return self._aclient + + def get_embedding_dimension(self, **kwargs) -> int: + """Return the embedding dimension (512 for Marengo). + + This is a one-time, setup-time probe: Pathway calls it once while + building the index, not on the per-document hot path. The single + synchronous request issued here is therefore intentional and acceptable + (the actual embedding hot path runs asynchronously via ``__wrapped__``). + + The base implementation probes ``__wrapped__`` with a single string and + takes ``len`` of the result; since this embedder always returns a list of + vectors, probe with a one-element list and measure the first vector + instead (mirroring Pathway's ``SentenceTransformerEmbedder``). + """ + return len(self._embed_one(".")) + + def _embed_one(self, text: str) -> np.ndarray: + """Synchronous single-text embed, used only for the setup-time probe.""" + response = self.client.embed.create(model_name=self.model, text=text) + vector = response.text_embedding.segments[0].float_ + return np.array(vector, dtype=np.float32) + + async def _aembed_one(self, text: str) -> np.ndarray: + resp = await self.aclient.embed.create(model_name=self.model, text=text) + vector = resp.text_embedding.segments[0].float_ + return np.array(vector, dtype=np.float32) + + async def __wrapped__(self, inputs: list[str], **kwargs) -> list[np.ndarray]: + """Embed the given texts with Marengo. 
+ + Marengo embeds one text per request, so the requests are issued + concurrently on the async TwelveLabs client (``AsyncTwelveLabs``) rather + than serially, keeping the embedding hot path non-blocking. + + Args: + inputs: the strings to embed. + + Returns: + A list of 512-dimensional ``numpy`` arrays, one per input string. + """ + import asyncio + + return list(await asyncio.gather(*[self._aembed_one(t) for t in inputs])) diff --git a/python/pathway/xpacks/llm/parsers.py b/python/pathway/xpacks/llm/parsers.py index 338611e65..608faa3ca 100644 --- a/python/pathway/xpacks/llm/parsers.py +++ b/python/pathway/xpacks/llm/parsers.py @@ -1363,3 +1363,170 @@ async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]: ) text = getattr(transcript, "text", str(transcript)) return [(text, {})] + + +DEFAULT_PEGASUS_MODEL = "pegasus1.5" +DEFAULT_PROMPT = ( + "Describe this video in detail. Summarize what happens, who and what appears, " + "the setting, any spoken or on-screen text, and the overall topic. " + "Write the description so it can be used to answer questions about the video." +) + + +def _resolve_twelvelabs_api_key(api_key: str | None) -> str: + import os + + key = api_key or os.environ.get("TWELVELABS_API_KEY") + if not key: + raise ValueError( + "TwelveLabs API key is missing. Pass `api_key=...` or set the " + "`TWELVELABS_API_KEY` environment variable." + ) + return key + + +def _build_twelvelabs_client(api_key: str | None): + try: + from twelvelabs import TwelveLabs + except ImportError as e: + raise ImportError( + "The `twelvelabs` package is required to use the TwelveLabs components. " + "Install it with `pip install pathway[twelvelabs]`." + ) from e + return TwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) + + +class TwelveLabsVideoParser(pw.UDF): + """Parse videos into text using the TwelveLabs Pegasus model. 
+ + The parser uploads the incoming video bytes to TwelveLabs as an asset, waits + for the asset to be ready, and then asks Pegasus to produce a textual + description of the video using ``prompt``. The returned text is suitable for + chunking, embedding and indexing by the standard Pathway RAG components. + + By default the uploaded asset is deleted once the analysis finishes (even if + the analysis fails), so repeated runs do not flood the TwelveLabs asset list. + Set ``delete_assets=False`` to keep the assets around for reuse or + inspection; in that case the emitted ``twelvelabs_asset_id`` metadata refers + to a live, retrievable asset. + + Args: + prompt: Instruction sent to Pegasus describing what to extract from the + video. Defaults to a generic, RAG-oriented description prompt. + model: Pegasus model name. Defaults to ``"pegasus1.5"``. + api_key: TwelveLabs API key. If ``None``, it is read from the + ``TWELVELABS_API_KEY`` environment variable. + max_tokens: Maximum number of tokens Pegasus may generate. Defaults to 2048. + temperature: Sampling temperature for Pegasus. Defaults to ``None`` (SDK default). + asset_poll_interval: Seconds between asset-readiness checks. Defaults to 5. + asset_timeout: Maximum number of seconds to wait for an uploaded asset to + become ready before raising. Defaults to 600. + delete_assets: If ``True`` (the default), the uploaded asset is deleted + after the analysis completes, so repeated runs do not accumulate + assets in your TwelveLabs account. When ``True``, the emitted + ``twelvelabs_asset_id`` metadata is omitted because the asset no + longer exists. Set to ``False`` to keep assets (e.g. for reuse or + debugging), in which case the id is included in the metadata. + cache_strategy: Pathway caching strategy. To enable caching, pass a valid + :py:class:`~pathway.udfs.CacheStrategy`. Defaults to ``None``. 
+ + Example: + + >>> import pathway as pw # doctest: +SKIP + >>> from pathway.xpacks.llm.parsers import TwelveLabsVideoParser # doctest: +SKIP + >>> parser = TwelveLabsVideoParser() # doctest: +SKIP + """ + + def __init__( + self, + prompt: str = DEFAULT_PROMPT, + model: str = DEFAULT_PEGASUS_MODEL, + api_key: str | None = None, + max_tokens: int = 2048, + temperature: float | None = None, + asset_poll_interval: float = 5.0, + asset_timeout: float = 600.0, + delete_assets: bool = True, + cache_strategy: udfs.CacheStrategy | None = None, + ): + super().__init__(cache_strategy=cache_strategy) + self.prompt = prompt + self.model = model + self.max_tokens = max_tokens + self.temperature = temperature + self.asset_poll_interval = asset_poll_interval + self.asset_timeout = asset_timeout + self.delete_assets = delete_assets + self._api_key = api_key + self._client = None + + @property + def client(self): + if self._client is None: + self._client = _build_twelvelabs_client(self._api_key) + return self._client + + def _upload_asset(self, contents: bytes) -> str: + """Upload video bytes and return the asset id once it is ready.""" + import time + + asset = self.client.assets.create( + method="direct", file=("video.mp4", contents), filename="video.mp4" + ) + deadline = time.monotonic() + self.asset_timeout + while asset.status not in ("ready", "failed"): + if time.monotonic() > deadline: + raise TimeoutError( + f"TwelveLabs asset {asset.id} was not ready after " + f"{self.asset_timeout}s (last status: {asset.status})." 
+ ) + time.sleep(self.asset_poll_interval) + asset = self.client.assets.retrieve(asset.id) + if asset.status == "failed": + raise RuntimeError(f"TwelveLabs asset {asset.id} failed to process.") + return asset.id + + def __wrapped__(self, contents: bytes, **kwargs) -> list[tuple[str, dict]]: + from twelvelabs.types.video_context import VideoContext_AssetId + + asset_id = self._upload_asset(contents) + try: + logger.info("Analyzing TwelveLabs asset %s with Pegasus...", asset_id) + analyze_kwargs: dict = dict( + model_name=self.model, + video=VideoContext_AssetId(asset_id=asset_id), + prompt=self.prompt, + max_tokens=self.max_tokens, + ) + if self.temperature is not None: + analyze_kwargs["temperature"] = self.temperature + response = self.client.analyze(**analyze_kwargs) + text = response.data or "" + finally: + if self.delete_assets: + # Remove the per-run asset so repeated runs do not flood the + # TwelveLabs asset list. Best-effort: a cleanup failure must not + # mask the analysis result (or an analysis error above). + try: + self.client.assets.delete(asset_id) + except Exception: # noqa: BLE001 + logger.warning("Failed to delete TwelveLabs asset %s.", asset_id) + # When the asset has been deleted the id no longer resolves, so only + # surface it in the metadata when the asset is kept around. + metadata = {} if self.delete_assets else {"twelvelabs_asset_id": asset_id} + return [(text, metadata)] + + def __call__(self, contents: pw.ColumnExpression, **kwargs) -> pw.ColumnExpression: + """Parse the video document. + + Args: + contents: Column with the raw bytes of each video. + + Returns: + A column with a list of ``(text, metadata)`` pairs for each video. + When ``delete_assets=False`` the metadata records the TwelveLabs + ``twelvelabs_asset_id`` used for the analysis; with the default + ``delete_assets=True`` the asset is removed afterwards and the id is + omitted (it would no longer resolve). 
+ """ + return super().__call__(contents, **kwargs) diff --git a/python/pathway/xpacks/llm/tests/test_twelvelabs.py b/python/pathway/xpacks/llm/tests/test_twelvelabs.py new file mode 100644 index 000000000..40e69eddd --- /dev/null +++ b/python/pathway/xpacks/llm/tests/test_twelvelabs.py @@ -0,0 +1,220 @@ +# Copyright © 2026 Pathway + +"""Tests for the TwelveLabs video-RAG components. + +The no-network tests stub the TwelveLabs SDK and run without any credentials. +The live test is skipped unless ``TWELVELABS_API_KEY`` is set in the environment. + +Run with:: + + pytest python/pathway/xpacks/llm/tests/test_twelvelabs.py +""" + +import os + +import numpy as np +import pytest + +from pathway.xpacks.llm.embedders import DEFAULT_MARENGO_MODEL, MarengoEmbedder +from pathway.xpacks.llm.parsers import DEFAULT_PEGASUS_MODEL, TwelveLabsVideoParser + + +class _FakeSegment: + def __init__(self, vector): + self.float_ = vector + + +class _FakeTextEmbedding: + def __init__(self, vector): + self.segments = [_FakeSegment(vector)] + + +class _FakeEmbeddingResponse: + def __init__(self, vector): + self.text_embedding = _FakeTextEmbedding(vector) + + +class _FakeEmbed: + def __init__(self, vector): + self._vector = vector + self.calls = [] + + def create(self, *, model_name, text): + self.calls.append((model_name, text)) + return _FakeEmbeddingResponse(self._vector) + + +class _FakeAsset: + def __init__(self, id, status): + self.id = id + self.status = status + + +class _FakeAssets: + def __init__(self): + self.uploaded = None + self.deleted = [] + + def create(self, *, method, file, filename): + self.uploaded = (method, filename) + return _FakeAsset("asset-123", "ready") + + def retrieve(self, asset_id): + return _FakeAsset(asset_id, "ready") + + def delete(self, asset_id): + self.deleted.append(asset_id) + + +class _FakeAnalyzeResponse: + def __init__(self, data): + self.data = data + + +class _FakeClient: + def __init__(self, *, vector=None, analyze_text="a description"): + 
self.embed = _FakeEmbed(vector or [0.0] * 512) + self.assets = _FakeAssets() + self._analyze_text = analyze_text + self.analyze_calls = [] + + def analyze(self, **kwargs): + self.analyze_calls.append(kwargs) + return _FakeAnalyzeResponse(self._analyze_text) + + +# --- No-network unit tests ------------------------------------------------- + + +class _FakeAsyncEmbed: + def __init__(self, vector): + self._vector = vector + self.calls = [] + + async def create(self, *, model_name, text): + self.calls.append((model_name, text)) + return _FakeEmbeddingResponse(self._vector) + + +class _FakeAsyncClient: + def __init__(self, *, vector=None): + self.embed = _FakeAsyncEmbed(vector or [0.0] * 512) + + +def test_embedder_returns_vector_array(): + embedder = MarengoEmbedder() + embedder._client = _FakeClient(vector=list(range(512))) + + out = embedder._embed_one("a red car") + + assert isinstance(out, np.ndarray) + assert out.shape == (512,) + assert out.dtype == np.float32 + assert embedder._client.embed.calls == [(DEFAULT_MARENGO_MODEL, "a red car")] + + +def test_embedder_defaults(): + embedder = MarengoEmbedder() + assert embedder.model == DEFAULT_MARENGO_MODEL + # Marengo embeds a single text per request. + assert embedder.max_batch_size == 1 + + +def test_embedder_wrapped_is_async_and_concurrent(): + # The hot path runs on the async client and returns one array per input. 
+ import asyncio + + embedder = MarengoEmbedder() + embedder._aclient = _FakeAsyncClient(vector=list(range(512))) + + out = asyncio.run(embedder.__wrapped__(["a red car", "a blue boat"])) + + assert len(out) == 2 + for arr in out: + assert isinstance(arr, np.ndarray) + assert arr.shape == (512,) + assert arr.dtype == np.float32 + assert embedder._aclient.embed.calls == [ + (DEFAULT_MARENGO_MODEL, "a red car"), + (DEFAULT_MARENGO_MODEL, "a blue boat"), + ] + + +def test_embedding_dimension_probe_returns_512(): + # `BaseEmbedder.get_embedding_dimension` probes `__wrapped__` with a single + # string (not a list); the index factory relies on this returning the true + # vector size, so `__wrapped__` must handle a bare string input. + embedder = MarengoEmbedder() + embedder._client = _FakeClient(vector=[0.0] * 512) + assert embedder.get_embedding_dimension() == 512 + + +def test_video_parser_uploads_then_analyzes_and_deletes_asset(): + # Default: delete_assets=True -> asset is removed and id omitted from metadata. + parser = TwelveLabsVideoParser(prompt="What happens?") + parser._client = _FakeClient(analyze_text="A red car drives on a highway.") + + out = parser.__wrapped__(b"fake-video-bytes") + + assert out == [("A red car drives on a highway.", {})] + # Asset was uploaded via the direct method... + assert parser._client.assets.uploaded is not None + assert parser._client.assets.uploaded[0] == "direct" + # ...deleted afterwards so runs don't flood the asset list... + assert parser._client.assets.deleted == ["asset-123"] + # ...and Pegasus was called with the right model, prompt and asset. + (call,) = parser._client.analyze_calls + assert call["model_name"] == DEFAULT_PEGASUS_MODEL + assert call["prompt"] == "What happens?" 
+ assert call["video"].asset_id == "asset-123" + + +def test_video_parser_keeps_asset_when_disabled(): + parser = TwelveLabsVideoParser(delete_assets=False) + parser._client = _FakeClient(analyze_text="desc") + + out = parser.__wrapped__(b"bytes") + + assert out == [("desc", {"twelvelabs_asset_id": "asset-123"})] + assert parser._client.assets.deleted == [] + + +def test_video_parser_failed_asset_raises(): + parser = TwelveLabsVideoParser() + client = _FakeClient() + client.assets.create = lambda **kw: _FakeAsset("a", "failed") + parser._client = client + + with pytest.raises(RuntimeError): + parser.__wrapped__(b"bytes") + # Failure happens during upload (before analyze); nothing was deleted. + assert client.assets.deleted == [] + + +def test_video_parser_deletes_asset_even_when_analyze_raises(): + parser = TwelveLabsVideoParser() + client = _FakeClient() + + def _boom(**kwargs): + raise RuntimeError("pegasus exploded") + + client.analyze = _boom + parser._client = client + + with pytest.raises(RuntimeError): + parser.__wrapped__(b"bytes") + # try/finally still cleaned up the uploaded asset. 
+ assert client.assets.deleted == ["asset-123"] + + +# --- Live smoke test (requires TWELVELABS_API_KEY) ------------------------- + + +@pytest.mark.skipif( + not os.environ.get("TWELVELABS_API_KEY"), + reason="TWELVELABS_API_KEY not set; skipping live TwelveLabs call", +) +def test_marengo_live_embedding_is_512_dim(): + embedder = MarengoEmbedder() + vector = embedder._embed_one("a red car driving on a highway") + assert vector.shape == (512,) From e863cdd4face509764a354f2f16fbc71d0ec74b4 Mon Sep 17 00:00:00 2001 From: Abhi <171412961+iapoorv01@users.noreply.github.com> Date: Sat, 4 Jul 2026 10:50:09 +0530 Subject: [PATCH 2/2] refactor(llm): address reviewer feedback for twelvelabs integration --- pyproject.toml | 2 +- python/pathway/xpacks/llm/_utils.py | 28 +++++++++++++++++++ python/pathway/xpacks/llm/embedders.py | 37 ++------------------------ python/pathway/xpacks/llm/parsers.py | 27 ++----------------- 4 files changed, 33 insertions(+), 61 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 85e8d440b..40728c1d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,7 +100,7 @@ xpack-sharepoint = [ "Office365-REST-Python-Client >= 2.5.3", ] twelvelabs = [ - "twelvelabs >= 0.3.0", + "twelvelabs >= 1.2.8", ] all = [ "pathway[pyfilesystem]", diff --git a/python/pathway/xpacks/llm/_utils.py b/python/pathway/xpacks/llm/_utils.py index c7b251ca8..bcd78a035 100644 --- a/python/pathway/xpacks/llm/_utils.py +++ b/python/pathway/xpacks/llm/_utils.py @@ -146,3 +146,31 @@ def _prepare_executor( raise ValueError( "`async_mode` should be set to either `batch_async` or `fully_async`" ) + + +def _resolve_twelvelabs_api_key(api_key: str | None) -> str: + import os + + key = api_key or os.environ.get("TWELVELABS_API_KEY") + if not key: + raise ValueError( + "TwelveLabs API key is missing. Pass `api_key=...` or set the " + "`TWELVELABS_API_KEY` environment variable." 
+ ) + return key + + +def _build_twelvelabs_client(api_key: str | None): + from pathway.optional_import import optional_imports + + with optional_imports("twelvelabs"): + from twelvelabs import TwelveLabs + return TwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) + + +def _build_async_twelvelabs_client(api_key: str | None): + from pathway.optional_import import optional_imports + + with optional_imports("twelvelabs"): + from twelvelabs import AsyncTwelveLabs + return AsyncTwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) diff --git a/python/pathway/xpacks/llm/embedders.py b/python/pathway/xpacks/llm/embedders.py index 58a25c622..e8cefb86e 100644 --- a/python/pathway/xpacks/llm/embedders.py +++ b/python/pathway/xpacks/llm/embedders.py @@ -15,6 +15,8 @@ from pathway.xpacks.llm._utils import _coerce_sync, _extract_value_inside_dict from pathway.xpacks.llm.constants import OPENAI_EMBEDDERS_MAX_TOKENS +from ._utils import _build_async_twelvelabs_client, _build_twelvelabs_client + __all__ = [ "OpenAIEmbedder", "LiteLLMEmbedder", @@ -759,40 +761,6 @@ async def __wrapped__(self, input: str, **kwargs) -> np.ndarray: DEFAULT_MARENGO_MODEL = "marengo3.0" -def _resolve_twelvelabs_api_key(api_key: str | None) -> str: - import os - - key = api_key or os.environ.get("TWELVELABS_API_KEY") - if not key: - raise ValueError( - "TwelveLabs API key is missing. Pass `api_key=...` or set the " - "`TWELVELABS_API_KEY` environment variable." - ) - return key - - -def _build_twelvelabs_client(api_key: str | None): - try: - from twelvelabs import TwelveLabs - except ImportError as e: - raise ImportError( - "The `twelvelabs` package is required to use the TwelveLabs components. " - "Install it with `pip install pathway[twelvelabs]`." 
- ) from e - return TwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) - - -def _build_async_twelvelabs_client(api_key: str | None): - try: - from twelvelabs import AsyncTwelveLabs - except ImportError as e: - raise ImportError( - "The `twelvelabs` package is required to use the TwelveLabs components. " - "Install it with `pip install pathway[twelvelabs]`." - ) from e - return AsyncTwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) - - class MarengoEmbedder(BaseEmbedder): """Embed text using the TwelveLabs Marengo multimodal embedding model. @@ -890,6 +858,5 @@ async def __wrapped__(self, inputs: list[str], **kwargs) -> list[np.ndarray]: Returns: A list of 512-dimensional ``numpy`` arrays, one per input string. """ - import asyncio return list(await asyncio.gather(*[self._aembed_one(t) for t in inputs])) diff --git a/python/pathway/xpacks/llm/parsers.py b/python/pathway/xpacks/llm/parsers.py index 608faa3ca..4abb4333e 100644 --- a/python/pathway/xpacks/llm/parsers.py +++ b/python/pathway/xpacks/llm/parsers.py @@ -10,6 +10,7 @@ import io import logging import re +import time import warnings from abc import ABC, abstractmethod from collections import defaultdict @@ -29,7 +30,7 @@ from pathway.internals.config import _check_entitlements from pathway.optional_import import optional_imports from pathway.xpacks.llm import _parser_utils, llms, prompts -from pathway.xpacks.llm._utils import _prepare_executor +from pathway.xpacks.llm._utils import _build_twelvelabs_client, _prepare_executor from pathway.xpacks.llm.constants import DEFAULT_VISION_MODEL if TYPE_CHECKING: @@ -1373,29 +1374,6 @@ async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]: ) -def _resolve_twelvelabs_api_key(api_key: str | None) -> str: - import os - - key = api_key or os.environ.get("TWELVELABS_API_KEY") - if not key: - raise ValueError( - "TwelveLabs API key is missing. Pass `api_key=...` or set the " - "`TWELVELABS_API_KEY` environment variable." 
- ) - return key - - -def _build_twelvelabs_client(api_key: str | None): - try: - from twelvelabs import TwelveLabs - except ImportError as e: - raise ImportError( - "The `twelvelabs` package is required to use the TwelveLabs components. " - "Install it with `pip install pathway[twelvelabs]`." - ) from e - return TwelveLabs(api_key=_resolve_twelvelabs_api_key(api_key)) - - class TwelveLabsVideoParser(pw.UDF): """Parse videos into text using the TwelveLabs Pegasus model. @@ -1468,7 +1446,6 @@ def client(self): def _upload_asset(self, contents: bytes) -> str: """Upload video bytes and return the asset id once it is ready.""" - import time asset = self.client.assets.create( method="direct", file=("video.mp4", contents), filename="video.mp4"