Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `pw.io.weaviate.write` writes a Pathway table to a [Weaviate](https://weaviate.io/) collection, keeping it in sync with the table: additions and updates upsert objects (keyed by a UUID derived from the required `primary_key`), and deletions remove them. An optional `vector` column is stored as the object's embedding; the remaining columns become object properties. The target collection must already exist. The connector writes in parallel across workers (`pathway spawn -n`), with `batch_size` and `concurrency` to tune throughput, and supports `api_key`/`headers` authentication for self-hosted and Weaviate Cloud deployments.
- `pw.io.pinecone.write` writes a Pathway table to a [Pinecone](https://www.pinecone.io/) index for use as a vector store in RAG pipelines. It keeps the index in sync with the current state of the table: a row is upserted under its record id and a row removed from the table is deleted from the index. By default the id is the table's internal row key (unique, and written in parallel across workers); pass a `primary_key` column to use your own ids instead, in which case the values must uniquely identify rows (a collision raises an error) and writing runs on a single worker. The `vector` column holds the dense embedding (`list[float]` or a 1-D `numpy.ndarray`); the remaining columns — or just those listed in `metadata_columns` — are stored as record metadata. Targets both Pinecone cloud and the local Pinecone emulator via the `host` parameter; the index must already exist with a matching dimension.
- `pw.io.s3.read`, `pw.io.s3.read_from_digital_ocean`, `pw.io.s3.read_from_wasabi`, `pw.io.minio.read`, `pw.io.pyfilesystem.read`, and `pw.xpacks.connectors.sharepoint.read` now support the `format="only_metadata"` option (already available in `pw.io.fs.read` and `pw.io.gdrive.read`). In this mode the connector tracks additions, modifications, and deletions of objects in the source but does not download their contents — the resulting table contains only the `_metadata` column. This is useful for monitoring changes in large buckets or directories without spending time and traffic on fetching the objects themselves. For S3 and MinIO this also skips the object downloads entirely at the engine level, not just the parsing of their contents.
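- Promoted `TwelveLabsVideoParser` and `MarengoEmbedder` out of the Video RAG example template and into the `pathway.xpacks.llm` library (as `pathway.xpacks.llm.parsers.TwelveLabsVideoParser` and `pathway.xpacks.llm.embedders.MarengoEmbedder`). You can now build Video RAG applications directly in Pathway after installing the optional TwelveLabs dependency with `pip install "pathway[twelvelabs]"`.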

### Changed
- `pw.xpacks.connectors.sharepoint.read` now supports the `max_backlog_size` parameter, already available in the other input connectors. It limits the number of entries read from SharePoint and kept in processing at any moment, which helps to avoid memory spikes when the source emits a large initial burst of data.
Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ xpack-llm-docs = [
xpack-sharepoint = [
"Office365-REST-Python-Client >= 2.5.3",
]
twelvelabs = [
"twelvelabs >= 1.2.8",
]
all = [
"pathway[pyfilesystem]",
"pathway[sql]",
Expand All @@ -108,6 +111,7 @@ all = [
"pathway[xpack-sharepoint]",
"pathway[airbyte]",
"pathway[milvus]",
"pathway[twelvelabs]",
]
tests = [
"pytest >= 8.0.0, < 9.0.0",
Expand Down
28 changes: 28 additions & 0 deletions python/pathway/xpacks/llm/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,31 @@ def _prepare_executor(
raise ValueError(
"`async_mode` should be set to either `batch_async` or `fully_async`"
)


def _resolve_twelvelabs_api_key(api_key: str | None) -> str:
import os

key = api_key or os.environ.get("TWELVELABS_API_KEY")
if not key:
raise ValueError(
"TwelveLabs API key is missing. Pass `api_key=...` or set the "
"`TWELVELABS_API_KEY` environment variable."
)
return key


def _build_twelvelabs_client(api_key: str | None):
    """Create a synchronous ``TwelveLabs`` client.

    The SDK is imported lazily, inside this function, so that this module can
    be imported without the ``twelvelabs`` package being installed.

    Args:
        api_key: Explicit API key, or ``None`` to fall back to the
            ``TWELVELABS_API_KEY`` environment variable.

    Returns:
        A ``twelvelabs.TwelveLabs`` instance configured with the resolved key.

    Raises:
        ValueError: If no API key can be resolved.
    """
    from pathway.optional_import import optional_imports

    # NOTE(review): `optional_imports` presumably turns a missing dependency
    # into an install hint for the "twelvelabs" extra - confirm in
    # pathway.optional_import.
    with optional_imports("twelvelabs"):
        from twelvelabs import TwelveLabs

    resolved_key = _resolve_twelvelabs_api_key(api_key)
    return TwelveLabs(api_key=resolved_key)


def _build_async_twelvelabs_client(api_key: str | None):
    """Create an asynchronous ``AsyncTwelveLabs`` client.

    The SDK is imported lazily, inside this function, so that this module can
    be imported without the ``twelvelabs`` package being installed.

    Args:
        api_key: Explicit API key, or ``None`` to fall back to the
            ``TWELVELABS_API_KEY`` environment variable.

    Returns:
        A ``twelvelabs.AsyncTwelveLabs`` instance configured with the
        resolved key.

    Raises:
        ValueError: If no API key can be resolved.
    """
    from pathway.optional_import import optional_imports

    # NOTE(review): `optional_imports` presumably turns a missing dependency
    # into an install hint for the "twelvelabs" extra - confirm in
    # pathway.optional_import.
    with optional_imports("twelvelabs"):
        from twelvelabs import AsyncTwelveLabs

    resolved_key = _resolve_twelvelabs_api_key(api_key)
    return AsyncTwelveLabs(api_key=resolved_key)
107 changes: 107 additions & 0 deletions python/pathway/xpacks/llm/embedders.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
from pathway.xpacks.llm._utils import _coerce_sync, _extract_value_inside_dict
from pathway.xpacks.llm.constants import OPENAI_EMBEDDERS_MAX_TOKENS

from ._utils import _build_async_twelvelabs_client, _build_twelvelabs_client

__all__ = [
"OpenAIEmbedder",
"LiteLLMEmbedder",
"SentenceTransformerEmbedder",
"GeminiEmbedder",
"BedrockEmbedder",
"MarengoEmbedder",
]


Expand Down Expand Up @@ -753,3 +756,107 @@ async def __wrapped__(self, input: str, **kwargs) -> np.ndarray:
embedding = result.get("embedding", result.get("embeddings", [[]])[0])

return np.array(embedding)


# Model name used by ``MarengoEmbedder`` when no explicit ``model`` is passed.
DEFAULT_MARENGO_MODEL = "marengo3.0"


class MarengoEmbedder(BaseEmbedder):
    """Embed text using the TwelveLabs Marengo multimodal embedding model.

    Marengo returns 512-dimensional embeddings in a shared multimodal space, so the
    text it produces is directly comparable with image, audio and video embeddings
    from the same model. This makes it a natural retriever embedder for pipelines
    that index video with :class:`TwelveLabsVideoParser`.

    Args:
        model: Marengo model name. Defaults to ``"marengo3.0"``.
        api_key: TwelveLabs API key. If ``None``, the key is read from the
            ``TWELVELABS_API_KEY`` environment variable when the client is
            first built; a ``ValueError`` is raised if neither is set.
        capacity: Maximum number of concurrent operations. Defaults to ``None``
            (no specific limit).
        retry_strategy: Strategy for handling retries. Defaults to
            :py:class:`~pathway.udfs.ExponentialBackoffRetryStrategy`.
        cache_strategy: Pathway caching strategy. Defaults to ``None``.

    Example:

    >>> import pathway as pw  # doctest: +SKIP
    >>> from pathway.xpacks.llm.embedders import MarengoEmbedder  # doctest: +SKIP
    >>> embedder = MarengoEmbedder()  # doctest: +SKIP
    """

    def __init__(
        self,
        *,
        model: str = DEFAULT_MARENGO_MODEL,
        api_key: str | None = None,
        capacity: int | None = None,
        retry_strategy: (
            udfs.AsyncRetryStrategy | None
        ) = pw.udfs.ExponentialBackoffRetryStrategy(),
        cache_strategy: udfs.CacheStrategy | None = None,
    ):
        executor = udfs.async_executor(capacity=capacity, retry_strategy=retry_strategy)
        # Marengo embeds one text per request, so keep batches at size 1.
        super().__init__(
            executor=executor, cache_strategy=cache_strategy, max_batch_size=1
        )
        self.model = model
        self._api_key = api_key
        # Both clients are created lazily, on first use, so constructing the
        # embedder needs neither the SDK nor an API key.
        self._client = None
        self._aclient = None

    @property
    def client(self):
        """Synchronous TwelveLabs client, built on first access."""
        if self._client is None:
            self._client = _build_twelvelabs_client(self._api_key)
        return self._client

    @property
    def aclient(self):
        """Asynchronous TwelveLabs client, built on first access."""
        if self._aclient is None:
            self._aclient = _build_async_twelvelabs_client(self._api_key)
        return self._aclient

    @staticmethod
    def _vector_from_response(response) -> np.ndarray:
        """Return the first text-embedding segment of an embed response.

        Shared by the sync and async code paths so both unpack the response
        the same way and fail with the same, descriptive error.

        Raises:
            ValueError: If the response carries no text embedding (no
                ``text_embedding``, no segments, or an empty first vector).
        """
        text_embedding = getattr(response, "text_embedding", None)
        segments = getattr(text_embedding, "segments", None)
        vector = segments[0].float_ if segments else None
        if not vector:
            raise ValueError(
                "TwelveLabs returned no text embedding for the request "
                f"(response: {response!r})."
            )
        return np.array(vector, dtype=np.float32)

    def get_embedding_dimension(self, **kwargs) -> int:
        """Return the embedding dimension (512 for Marengo).

        ``__wrapped__`` takes a list of texts and returns a list of vectors, so
        the dimension is measured on a single vector instead: one short text
        (``"."``) is embedded through the synchronous client and the length of
        the resulting vector is returned.

        This is intended as a one-off, setup-time probe rather than part of the
        per-document embedding path (which runs asynchronously via
        ``__wrapped__``), so issuing a single blocking request here is
        acceptable. ``kwargs`` are accepted for interface compatibility and
        ignored.
        """
        return len(self._embed_one("."))

    def _embed_one(self, text: str) -> np.ndarray:
        """Synchronous single-text embed, used only for the setup-time probe."""
        response = self.client.embed.create(model_name=self.model, text=text)
        return self._vector_from_response(response)

    async def _aembed_one(self, text: str) -> np.ndarray:
        """Asynchronous single-text embed, used on the embedding path."""
        response = await self.aclient.embed.create(model_name=self.model, text=text)
        return self._vector_from_response(response)

    async def __wrapped__(self, inputs: list[str], **kwargs) -> list[np.ndarray]:
        """Embed the given texts with Marengo.

        Marengo embeds one text per request, so the requests are issued
        concurrently on the async TwelveLabs client (``AsyncTwelveLabs``) rather
        than serially, keeping the embedding hot path non-blocking.

        Args:
            inputs: the strings to embed.

        Returns:
            A list of ``float32`` ``numpy`` arrays, one per input string and in
            the same order as ``inputs``.

        Raises:
            ValueError: If TwelveLabs returns a response without a text
                embedding for any of the inputs.
        """
        # `gather` preserves the order of its awaitables, so results line up
        # with `inputs`.
        return list(await asyncio.gather(*(self._aembed_one(t) for t in inputs)))
146 changes: 145 additions & 1 deletion python/pathway/xpacks/llm/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io
import logging
import re
import time
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
Expand All @@ -29,7 +30,7 @@
from pathway.internals.config import _check_entitlements
from pathway.optional_import import optional_imports
from pathway.xpacks.llm import _parser_utils, llms, prompts
from pathway.xpacks.llm._utils import _prepare_executor
from pathway.xpacks.llm._utils import _build_twelvelabs_client, _prepare_executor
from pathway.xpacks.llm.constants import DEFAULT_VISION_MODEL

if TYPE_CHECKING:
Expand Down Expand Up @@ -1363,3 +1364,146 @@ async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]:
)
text = getattr(transcript, "text", str(transcript))
return [(text, {})]


# Model name used by ``TwelveLabsVideoParser`` when no explicit ``model`` is passed.
DEFAULT_PEGASUS_MODEL = "pegasus1.5"
# Instruction used by ``TwelveLabsVideoParser`` when no explicit ``prompt`` is
# passed; it asks for a detailed description usable for question answering.
DEFAULT_PROMPT = (
"Describe this video in detail. Summarize what happens, who and what appears, "
"the setting, any spoken or on-screen text, and the overall topic. "
"Write the description so it can be used to answer questions about the video."
)


class TwelveLabsVideoParser(pw.UDF):
    """Parse videos into text using the TwelveLabs Pegasus model.

    The parser uploads the incoming video bytes to TwelveLabs as an asset, waits
    for the asset to be ready, and then asks Pegasus to produce a textual
    description of the video using ``prompt``. The returned text is suitable for
    chunking, embedding and indexing by the standard Pathway RAG components.

    By default the uploaded asset is deleted once the analysis finishes (even if
    the analysis fails, or the asset never becomes ready), so repeated runs do
    not flood the TwelveLabs asset list.
    Set ``delete_assets=False`` to keep the assets around for reuse or
    inspection; in that case the emitted ``twelvelabs_asset_id`` metadata refers
    to a live, retrievable asset.

    Args:
        prompt: Instruction sent to Pegasus describing what to extract from the
            video. Defaults to a generic, RAG-oriented description prompt.
        model: Pegasus model name. Defaults to ``"pegasus1.5"``.
        api_key: TwelveLabs API key. If ``None``, the key is read from the
            ``TWELVELABS_API_KEY`` environment variable when the client is
            first built; a ``ValueError`` is raised if neither is set.
        max_tokens: Maximum number of tokens Pegasus may generate. Defaults to 2048.
        temperature: Sampling temperature for Pegasus. Defaults to ``None`` (SDK default).
        asset_poll_interval: Seconds between asset-readiness checks. Defaults to 5.
        asset_timeout: Maximum number of seconds to wait for an uploaded asset to
            become ready before raising. Defaults to 600.
        delete_assets: If ``True`` (the default), the uploaded asset is deleted
            after the analysis completes, so repeated runs do not accumulate
            assets in your TwelveLabs account. When ``True``, the emitted
            ``twelvelabs_asset_id`` metadata is omitted because the asset no
            longer exists. Set to ``False`` to keep assets (e.g. for reuse or
            debugging), in which case the id is included in the metadata.
        cache_strategy: Pathway caching strategy. To enable caching, pass a valid
            :py:class:`~pathway.udfs.CacheStrategy`. Defaults to ``None``.

    Example:

    >>> import pathway as pw  # doctest: +SKIP
    >>> from pathway.xpacks.llm.parsers import TwelveLabsVideoParser  # doctest: +SKIP
    >>> parser = TwelveLabsVideoParser()  # doctest: +SKIP
    """

    def __init__(
        self,
        prompt: str = DEFAULT_PROMPT,
        model: str = DEFAULT_PEGASUS_MODEL,
        api_key: str | None = None,
        max_tokens: int = 2048,
        temperature: float | None = None,
        asset_poll_interval: float = 5.0,
        asset_timeout: float = 600.0,
        delete_assets: bool = True,
        cache_strategy: udfs.CacheStrategy | None = None,
    ):
        super().__init__(cache_strategy=cache_strategy)
        self.prompt = prompt
        self.model = model
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.asset_poll_interval = asset_poll_interval
        self.asset_timeout = asset_timeout
        self.delete_assets = delete_assets
        self._api_key = api_key
        # Built lazily so constructing the parser needs neither the SDK nor a key.
        self._client = None

    @property
    def client(self):
        """Synchronous TwelveLabs client, built on first access."""
        if self._client is None:
            self._client = _build_twelvelabs_client(self._api_key)
        return self._client

    def _delete_asset(self, asset_id: str) -> None:
        """Best-effort removal of an uploaded asset; never raises.

        A cleanup failure is logged (with the traceback) instead of being
        propagated, so it cannot mask the analysis result or an earlier error.
        """
        try:
            self.client.assets.delete(asset_id)
        except Exception:  # noqa: BLE001
            logger.warning(
                "Failed to delete TwelveLabs asset %s.", asset_id, exc_info=True
            )

    def _upload_asset(self, contents: bytes) -> str:
        """Upload video bytes and return the asset id once it is ready.

        Polls the asset every ``asset_poll_interval`` seconds until its status
        is ``"ready"`` or ``"failed"``, waiting at most ``asset_timeout``
        seconds in total.

        Raises:
            TimeoutError: If the asset is not ready within ``asset_timeout``.
            RuntimeError: If TwelveLabs reports the asset as ``"failed"``.
        """
        asset = self.client.assets.create(
            method="direct", file=("video.mp4", contents), filename="video.mp4"
        )
        asset_id = asset.id
        try:
            deadline = time.monotonic() + self.asset_timeout
            while asset.status not in ("ready", "failed"):
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(
                        f"TwelveLabs asset {asset_id} was not ready after "
                        f"{self.asset_timeout}s (last status: {asset.status})."
                    )
                # Never sleep past the deadline, so the total wait honours
                # `asset_timeout` instead of overshooting by a poll interval.
                time.sleep(min(self.asset_poll_interval, remaining))
                asset = self.client.assets.retrieve(asset_id)
            if asset.status == "failed":
                raise RuntimeError(f"TwelveLabs asset {asset_id} failed to process.")
        except BaseException:
            # The asset already exists remotely at this point; without this
            # cleanup a timed-out or failed upload would be leaked, because the
            # caller's own cleanup only runs once an id has been returned.
            if self.delete_assets:
                self._delete_asset(asset_id)
            raise
        return asset_id

    def __wrapped__(self, contents: bytes, **kwargs) -> list[tuple[str, dict]]:
        """Upload one video, describe it with Pegasus and return the result.

        Args:
            contents: Raw bytes of the video.

        Returns:
            A single-element list holding ``(text, metadata)``. ``text`` is the
            Pegasus output (empty string if the response has no data);
            ``metadata`` contains ``twelvelabs_asset_id`` only when
            ``delete_assets`` is ``False``.
        """
        # Same guard as the client builders use for the SDK import.
        with optional_imports("twelvelabs"):
            from twelvelabs.types.video_context import VideoContext_AssetId

        asset_id = self._upload_asset(contents)
        try:
            logger.info("Analyzing TwelveLabs asset %s with Pegasus...", asset_id)
            analyze_kwargs: dict = dict(
                model_name=self.model,
                video=VideoContext_AssetId(asset_id=asset_id),
                prompt=self.prompt,
                max_tokens=self.max_tokens,
            )
            # Only forward `temperature` when set, so the SDK default applies
            # otherwise.
            if self.temperature is not None:
                analyze_kwargs["temperature"] = self.temperature
            response = self.client.analyze(**analyze_kwargs)
            text = response.data or ""
        finally:
            if self.delete_assets:
                # Remove the per-run asset so repeated runs do not flood the
                # TwelveLabs asset list. Best-effort: a cleanup failure must not
                # mask the analysis result (or an analysis error above).
                self._delete_asset(asset_id)
        # When the asset has been deleted the id no longer resolves, so only
        # surface it in the metadata when the asset is kept around.
        metadata = {} if self.delete_assets else {"twelvelabs_asset_id": asset_id}
        return [(text, metadata)]

    def __call__(self, contents: pw.ColumnExpression, **kwargs) -> pw.ColumnExpression:
        """Parse the video document.

        Args:
            contents: Column with the raw bytes of each video.

        Returns:
            A column with a list of ``(text, metadata)`` pairs for each video.
            When ``delete_assets=False`` the metadata records the TwelveLabs
            ``twelvelabs_asset_id`` used for the analysis; with the default
            ``delete_assets=True`` the asset is removed afterwards and the id is
            omitted (it would no longer resolve).
        """
        return super().__call__(contents, **kwargs)
Loading
Loading