Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
)
from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient
from sift_client._internal.low_level_wrappers.ingestion import IngestionLowLevelClient
from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient
from sift_client._internal.low_level_wrappers.ping import PingLowLevelClient
from sift_client._internal.low_level_wrappers.remote_files import RemoteFilesLowLevelClient
from sift_client._internal.low_level_wrappers.reports import ReportsLowLevelClient
Expand All @@ -18,6 +19,7 @@
"CalculatedChannelsLowLevelClient",
"ChannelsLowLevelClient",
"IngestionLowLevelClient",
"JobsLowLevelClient",
"PingLowLevelClient",
"RemoteFilesLowLevelClient",
"ReportsLowLevelClient",
Expand Down
152 changes: 152 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""Low-level wrapper for the JobService API.

This module provides thin wrappers around the autogenerated bindings for the JobService API.
It handles common concerns like error handling and pagination.

It provides an asynchronous client for the JobService API.
"""

from __future__ import annotations

from typing import Any, cast

from sift.jobs.v1.jobs_pb2 import (
CancelJobRequest,
ListJobsRequest,
ListJobsResponse,
RetryJobRequest,
RetryJobResponse,
)
from sift.jobs.v1.jobs_pb2_grpc import JobServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.sift_types.job import Job
from sift_client.transport import GrpcClient, WithGrpcClient


class JobsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""Low-level client for the JobService API.

This class provides a thin wrapper around the autogenerated bindings for the JobService API.
"""

def __init__(self, grpc_client: GrpcClient):
"""Initialize the JobsLowLevelClient.

Args:
grpc_client: The gRPC client to use for making API calls.
"""
super().__init__(grpc_client)

async def list_jobs(
self,
*,
page_size: int | None = None,
page_token: str | None = None,
query_filter: str | None = None,
organization_id: str | None = None,
order_by: str | None = None,
) -> tuple[list[Job], str]:
"""List jobs with pagination.

Args:
page_size: Maximum number of jobs to return.
page_token: Page token from a previous list call.
query_filter: CEL filter string.
organization_id: Organization ID (required if user belongs to multiple orgs).
order_by: How to order results.

Returns:
The ListJobsResponse containing jobs and next page token.
"""
request_kwargs: dict[str, Any] = {}
if page_size is not None:
request_kwargs["page_size"] = page_size
if page_token is not None:
request_kwargs["page_token"] = page_token
if query_filter is not None:
request_kwargs["filter"] = query_filter
if organization_id is not None:
request_kwargs["organization_id"] = organization_id
if order_by is not None:
request_kwargs["order_by"] = order_by

request = ListJobsRequest(**request_kwargs)
response = await self._grpc_client.get_stub(JobServiceStub).ListJobs(request)
response = cast("ListJobsResponse", response)
jobs = [Job._from_proto(job) for job in response.jobs]
return jobs, response.next_page_token

async def list_all_jobs(
self,
*,
query_filter: str | None = None,
organization_id: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
) -> list[Job]:
"""List all jobs, handling pagination automatically.

Args:
query_filter: CEL filter string.
organization_id: Organization ID (required if user belongs to multiple orgs).
order_by: How to order results.
max_results: Maximum number of results to return across all pages.

Returns:
A list of Job objects.
"""
kwargs: dict[str, Any] = {}
if query_filter is not None:
kwargs["query_filter"] = query_filter
if organization_id is not None:
kwargs["organization_id"] = organization_id

jobs = await self._handle_pagination(
func=self.list_jobs,
kwargs=kwargs,
order_by=order_by,
max_results=max_results,
)

return jobs

async def get_job(self, job_id: str) -> Job:
"""Get a job by ID.

Args:
job_id: The ID of the job to retrieve.

Returns:
The Job object.
"""
jobs = await self.list_all_jobs(
query_filter=f'job_id == "{job_id}"',
max_results=1,
)
if not jobs:
raise ValueError(f"Job not found: {job_id}")
return jobs[0]

async def cancel_job(self, job_id: str) -> None:
"""Cancel a job.

Args:
job_id: The ID of the job to cancel.
"""
request = CancelJobRequest(job_id=job_id)
await self._grpc_client.get_stub(JobServiceStub).CancelJob(request)

async def retry_job(self, job_id: str) -> Job:
"""Retry a job.

Args:
job_id: The ID of the job to retry.

Returns:
The updated Job object.
"""
request = RetryJobRequest(job_id=job_id)
response = await self._grpc_client.get_stub(JobServiceStub).RetryJob(request)
response = cast("RetryJobResponse", response)
return Job._from_proto(response.job)
1 change: 1 addition & 0 deletions python/lib/sift_client/_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def mock_client():
client.tags = MagicMock()
client.test_results = MagicMock()
client.file_attachments = MagicMock()
client.jobs = MagicMock()
client.async_ = MagicMock(spec=AsyncAPIs)
client.async_.ingestion = MagicMock()
return client
Expand Down
Loading
Loading