From 1e98bd795db4183b9c6add33149e22ea4d80e0a9 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Thu, 8 Jan 2026 13:24:26 -0800 Subject: [PATCH 1/9] jobs initial commit --- .../_internal/low_level_wrappers/__init__.py | 2 + .../_internal/low_level_wrappers/jobs.py | 152 +++++++ python/lib/sift_client/_tests/conftest.py | 1 + .../sift_client/_tests/resources/test_jobs.py | 416 ++++++++++++++++++ .../sift_client/_tests/sift_types/test_job.py | 277 ++++++++++++ python/lib/sift_client/client.py | 7 + python/lib/sift_client/resources/__init__.py | 4 + python/lib/sift_client/resources/jobs.py | 157 +++++++ .../resources/sync_stubs/__init__.py | 3 + python/lib/sift_client/sift_types/__init__.py | 7 + python/lib/sift_client/sift_types/job.py | 274 ++++++++++++ python/lib/sift_client/util/util.py | 4 + 12 files changed, 1304 insertions(+) create mode 100644 python/lib/sift_client/_internal/low_level_wrappers/jobs.py create mode 100644 python/lib/sift_client/_tests/resources/test_jobs.py create mode 100644 python/lib/sift_client/_tests/sift_types/test_job.py create mode 100644 python/lib/sift_client/resources/jobs.py create mode 100644 python/lib/sift_client/sift_types/job.py diff --git a/python/lib/sift_client/_internal/low_level_wrappers/__init__.py b/python/lib/sift_client/_internal/low_level_wrappers/__init__.py index 39caca801..f90a7ff9a 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/__init__.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/__init__.py @@ -4,6 +4,7 @@ ) from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient from sift_client._internal.low_level_wrappers.ingestion import IngestionLowLevelClient +from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient from sift_client._internal.low_level_wrappers.ping import PingLowLevelClient from sift_client._internal.low_level_wrappers.remote_files import RemoteFilesLowLevelClient from sift_client._internal.low_level_wrappers.reports import 
ReportsLowLevelClient @@ -18,6 +19,7 @@ "CalculatedChannelsLowLevelClient", "ChannelsLowLevelClient", "IngestionLowLevelClient", + "JobsLowLevelClient", "PingLowLevelClient", "RemoteFilesLowLevelClient", "ReportsLowLevelClient", diff --git a/python/lib/sift_client/_internal/low_level_wrappers/jobs.py b/python/lib/sift_client/_internal/low_level_wrappers/jobs.py new file mode 100644 index 000000000..59a9b69fd --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/jobs.py @@ -0,0 +1,152 @@ +"""Low-level wrapper for the JobService API. + +This module provides thin wrappers around the autogenerated bindings for the JobService API. +It handles common concerns like error handling and pagination. + +It provides an asynchronous client for the JobService API. +""" + +from __future__ import annotations + +from typing import Any, cast + +from sift.jobs.v1.jobs_pb2 import ( + CancelJobRequest, + ListJobsRequest, + ListJobsResponse, + RetryJobRequest, + RetryJobResponse, +) +from sift.jobs.v1.jobs_pb2_grpc import JobServiceStub + +from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client.sift_types.job import Job +from sift_client.transport import GrpcClient, WithGrpcClient + + +class JobsLowLevelClient(LowLevelClientBase, WithGrpcClient): + """Low-level client for the JobService API. + + This class provides a thin wrapper around the autogenerated bindings for the JobService API. + """ + + def __init__(self, grpc_client: GrpcClient): + """Initialize the JobsLowLevelClient. + + Args: + grpc_client: The gRPC client to use for making API calls. + """ + super().__init__(grpc_client) + + async def list_jobs( + self, + *, + page_size: int | None = None, + page_token: str | None = None, + query_filter: str | None = None, + organization_id: str | None = None, + order_by: str | None = None, + ) -> tuple[list[Job], str]: + """List jobs with pagination. + + Args: + page_size: Maximum number of jobs to return. 
+ page_token: Page token from a previous list call. + query_filter: CEL filter string. + organization_id: Organization ID (required if user belongs to multiple orgs). + order_by: How to order results. + + Returns: + The ListJobsResponse containing jobs and next page token. + """ + request_kwargs: dict[str, Any] = {} + if page_size is not None: + request_kwargs["page_size"] = page_size + if page_token is not None: + request_kwargs["page_token"] = page_token + if query_filter is not None: + request_kwargs["filter"] = query_filter + if organization_id is not None: + request_kwargs["organization_id"] = organization_id + if order_by is not None: + request_kwargs["order_by"] = order_by + + request = ListJobsRequest(**request_kwargs) + response = await self._grpc_client.get_stub(JobServiceStub).ListJobs(request) + response = cast("ListJobsResponse", response) + jobs = [Job._from_proto(job) for job in response.jobs] + return jobs, response.next_page_token + + async def list_all_jobs( + self, + *, + query_filter: str | None = None, + organization_id: str | None = None, + order_by: str | None = None, + max_results: int | None = None, + ) -> list[Job]: + """List all jobs, handling pagination automatically. + + Args: + query_filter: CEL filter string. + organization_id: Organization ID (required if user belongs to multiple orgs). + order_by: How to order results. + max_results: Maximum number of results to return across all pages. + + Returns: + A list of Job objects. + """ + kwargs: dict[str, Any] = {} + if query_filter is not None: + kwargs["query_filter"] = query_filter + if organization_id is not None: + kwargs["organization_id"] = organization_id + + jobs = await self._handle_pagination( + func=self.list_jobs, + kwargs=kwargs, + order_by=order_by, + max_results=max_results, + ) + + return jobs + + async def get_job(self, job_id: str) -> Job: + """Get a job by ID. + + Args: + job_id: The ID of the job to retrieve. + + Returns: + The Job object. 
+ """ + jobs = await self.list_all_jobs( + query_filter=f'job_id == "{job_id}"', + max_results=1, + ) + if not jobs: + raise ValueError(f"Job not found: {job_id}") + return jobs[0] + + async def cancel_job(self, job_id: str) -> None: + """Cancel a job. + + Args: + job_id: The ID of the job to cancel. + """ + request = CancelJobRequest(job_id=job_id) + await self._grpc_client.get_stub(JobServiceStub).CancelJob(request) + + async def retry_job(self, job_id: str) -> Job: + """Retry a job. + + Args: + job_id: The ID of the job to retry. + + Returns: + The updated Job object. + """ + request = RetryJobRequest(job_id=job_id) + response = await self._grpc_client.get_stub(JobServiceStub).RetryJob(request) + response = cast("RetryJobResponse", response) + return Job._from_proto(response.job) diff --git a/python/lib/sift_client/_tests/conftest.py b/python/lib/sift_client/_tests/conftest.py index 3efaf4d93..1d44bb437 100644 --- a/python/lib/sift_client/_tests/conftest.py +++ b/python/lib/sift_client/_tests/conftest.py @@ -48,6 +48,7 @@ def mock_client(): client.tags = MagicMock() client.test_results = MagicMock() client.file_attachments = MagicMock() + client.jobs = MagicMock() client.async_ = MagicMock(spec=AsyncAPIs) client.async_.ingestion = MagicMock() return client diff --git a/python/lib/sift_client/_tests/resources/test_jobs.py b/python/lib/sift_client/_tests/resources/test_jobs.py new file mode 100644 index 000000000..fb6e9cc43 --- /dev/null +++ b/python/lib/sift_client/_tests/resources/test_jobs.py @@ -0,0 +1,416 @@ +"""Pytest tests for the Jobs API. 
+ +These tests demonstrate and validate the usage of the Jobs API including: +- Basic job operations (get, list) +- Job filtering and searching +- Job cancellation and retry +- Error handling and edge cases +""" + +from datetime import datetime, timedelta, timezone + +import pytest +from grpc.aio import AioRpcError + +from sift_client import SiftClient +from sift_client.resources import JobsAPI, JobsAPIAsync +from sift_client.sift_types import Job +from sift_client.sift_types.job import JobStatus, JobType + +pytestmark = pytest.mark.integration + + +def test_client_binding(sift_client): + assert sift_client.jobs + assert isinstance(sift_client.jobs, JobsAPI) + assert sift_client.async_.jobs + assert isinstance(sift_client.async_.jobs, JobsAPIAsync) + + +@pytest.fixture +def jobs_api_async(sift_client: SiftClient): + """Get the async jobs API instance.""" + return sift_client.async_.jobs + + +@pytest.fixture +def jobs_api_sync(sift_client: SiftClient): + """Get the synchronous jobs API instance.""" + return sift_client.jobs + + +@pytest.fixture +def test_job(jobs_api_sync): + """Get a test job to use in tests.""" + jobs = jobs_api_sync.list_(limit=1) + assert jobs + assert len(jobs) >= 1 + return jobs[0] + + +class TestJobsAPIAsync: + """Test suite for the async Jobs API functionality.""" + + class TestGet: + """Tests for the async get method.""" + + @pytest.mark.asyncio + async def test_get_by_id(self, jobs_api_async, test_job): + """Test getting a specific job by ID.""" + retrieved_job = await jobs_api_async.get(test_job.id_) + + assert isinstance(retrieved_job, Job) + assert retrieved_job.id_ == test_job.id_ + assert retrieved_job.job_type is not None + assert retrieved_job.job_status is not None + + class TestList: + """Tests for the async list_ method.""" + + @pytest.mark.asyncio + async def test_basic_list(self, jobs_api_async): + """Test basic job listing functionality.""" + jobs = await jobs_api_async.list_(limit=5) + + assert isinstance(jobs, list) + assert 
len(jobs) <= 5 + + if jobs: + job = jobs[0] + assert isinstance(job, Job) + assert job.id_ is not None + assert job.job_type is not None + assert job.job_status is not None + assert job.organization_id is not None + assert job.created_by_user_id is not None + assert job.modified_by_user_id is not None + assert job.created_date is not None + assert job.modified_date is not None + + @pytest.mark.asyncio + async def test_list_with_job_ids_filter(self, jobs_api_async): + """Test job listing with job IDs filter.""" + all_jobs = await jobs_api_async.list_(limit=3) + + if all_jobs: + job_ids = [j.id_ for j in all_jobs] + filtered_jobs = await jobs_api_async.list_(job_ids=job_ids) + + assert isinstance(filtered_jobs, list) + assert len(filtered_jobs) >= len(all_jobs) + + for job in filtered_jobs: + assert job.id_ in job_ids + + @pytest.mark.asyncio + async def test_list_with_job_type_filter(self, jobs_api_async): + """Test job listing with job type filter.""" + # Test with DATA_IMPORT type + jobs = await jobs_api_async.list_(job_type=JobType.DATA_IMPORT, limit=5) + + assert isinstance(jobs, list) + + for job in jobs: + assert job.job_type == JobType.DATA_IMPORT + + @pytest.mark.asyncio + async def test_list_with_job_status_filter(self, jobs_api_async): + """Test job listing with job status filter.""" + # Test with FINISHED status + jobs = await jobs_api_async.list_(job_status=JobStatus.FINISHED, limit=5) + + assert isinstance(jobs, list) + + for job in jobs: + assert job.job_status == JobStatus.FINISHED + + @pytest.mark.asyncio + async def test_list_with_created_date_filters(self, jobs_api_async): + """Test job listing with created date filters.""" + one_year_ago = datetime.now(timezone.utc) - timedelta(days=365) + jobs = await jobs_api_async.list_(created_after=one_year_ago, limit=5) + + assert isinstance(jobs, list) + + for job in jobs: + assert job.created_date >= one_year_ago + + @pytest.mark.asyncio + async def test_list_with_modified_date_filters(self, 
jobs_api_async): + """Test job listing with modified date filters.""" + one_year_ago = datetime.now(timezone.utc) - timedelta(days=365) + jobs = await jobs_api_async.list_(modified_after=one_year_ago, limit=5) + + assert isinstance(jobs, list) + + for job in jobs: + assert job.modified_date >= one_year_ago + + @pytest.mark.asyncio + async def test_list_with_started_date_filters(self, jobs_api_async): + """Test job listing with started date filters.""" + one_year_ago = datetime.now(timezone.utc) - timedelta(days=365) + jobs = await jobs_api_async.list_(started_date_after=one_year_ago, limit=5) + + assert isinstance(jobs, list) + + for job in jobs: + if job.started_date: + assert job.started_date >= one_year_ago + + @pytest.mark.asyncio + async def test_list_with_completed_date_filters(self, jobs_api_async): + """Test job listing with completed date filters.""" + one_year_ago = datetime.now(timezone.utc) - timedelta(days=365) + jobs = await jobs_api_async.list_(completed_date_after=one_year_ago, limit=5) + + assert isinstance(jobs, list) + + for job in jobs: + if job.completed_date: + assert job.completed_date >= one_year_ago + + @pytest.mark.asyncio + async def test_list_with_limit(self, jobs_api_async): + """Test job listing with different limits.""" + jobs_1 = await jobs_api_async.list_(limit=1) + assert isinstance(jobs_1, list) + assert len(jobs_1) <= 1 + + jobs_3 = await jobs_api_async.list_(limit=3) + assert isinstance(jobs_3, list) + assert len(jobs_3) <= 3 + + @pytest.mark.asyncio + async def test_list_with_multiple_filters(self, jobs_api_async): + """Test job listing with multiple filters combined.""" + one_month_ago = datetime.now(timezone.utc) - timedelta(days=30) + jobs = await jobs_api_async.list_( + job_type=JobType.DATA_IMPORT, + job_status=JobStatus.FINISHED, + created_after=one_month_ago, + limit=5, + ) + + assert isinstance(jobs, list) + + for job in jobs: + assert job.job_type == JobType.DATA_IMPORT + assert job.job_status == JobStatus.FINISHED + 
assert job.created_date >= one_month_ago + + class TestCancel: + """Tests for the async cancel method.""" + + @pytest.mark.asyncio + async def test_cancel_finished_job_no_effect(self, jobs_api_async): + """Test that cancelling a finished job has no effect.""" + # Find a finished job + finished_jobs = await jobs_api_async.list_(job_status=JobStatus.FINISHED, limit=1) + + if finished_jobs: + job = finished_jobs[0] + original_status = job.job_status + + # Cancel should not raise an error but won't change status + await jobs_api_async.cancel(job) + + # Verify status hasn't changed + updated_job = await jobs_api_async.get(job.id_) + assert updated_job.job_status == original_status + + @pytest.mark.asyncio + async def test_cancel_with_job_id_string(self, jobs_api_async): + """Test cancelling a job by passing job ID as string.""" + # Find a finished job to safely test with + finished_jobs = await jobs_api_async.list_(job_status=JobStatus.FINISHED, limit=1) + + if finished_jobs: + job = finished_jobs[0] + # Cancel using job ID string (should not raise error) + await jobs_api_async.cancel(job.id_) + + class TestRetry: + """Tests for the async retry method.""" + + @pytest.mark.asyncio + async def test_retry_failed_job(self, jobs_api_async): + """Test retrying a failed job.""" + # Find a failed job + failed_jobs = await jobs_api_async.list_(job_status=JobStatus.FAILED, limit=1) + + if failed_jobs: + job = failed_jobs[0] + original_id = job.id_ + + # Retry the job + retried_job = await jobs_api_async.retry(job) + + # Verify we got a job back + assert isinstance(retried_job, Job) + assert retried_job.id_ == original_id + # Status might be CREATED or RUNNING after retry + assert retried_job.job_status in [ + JobStatus.CREATED, + JobStatus.RUNNING, + JobStatus.FAILED, # May fail again immediately + ] + + @pytest.mark.asyncio + async def test_retry_with_job_id_string(self, jobs_api_async): + """Test retrying a job by passing job ID as string.""" + # Find a failed job + failed_jobs 
= await jobs_api_async.list_(job_status=JobStatus.FAILED, limit=1) + + if failed_jobs: + job = failed_jobs[0] + + # Retry using job ID string + retried_job = await jobs_api_async.retry(job.id_) + + assert isinstance(retried_job, Job) + assert retried_job.id_ == job.id_ + + @pytest.mark.asyncio + async def test_retry_finished_job_no_effect(self, jobs_api_async): + """Test that retrying a finished job has no effect.""" + # Find a finished job + finished_jobs = await jobs_api_async.list_(job_status=JobStatus.FINISHED, limit=1) + + if finished_jobs: + job = finished_jobs[0] + original_status = job.job_status + + # Retry should not raise an error but won't change status + with pytest.raises(AioRpcError, match="job cannot be retried"): + retried_job = await jobs_api_async.retry(job) + + class TestJobProperties: + """Tests for job property methods.""" + + @pytest.mark.asyncio + async def test_job_status_properties(self, jobs_api_async): + """Test job status property methods.""" + # Test with different job statuses + finished_jobs = await jobs_api_async.list_(job_status=JobStatus.FINISHED, limit=1) + if finished_jobs: + job = finished_jobs[0] + assert job.is_finished is True + assert job.is_failed is False + assert job.is_cancelled is False + assert job.is_in_progress is False + + failed_jobs = await jobs_api_async.list_(job_status=JobStatus.FAILED, limit=1) + if failed_jobs: + job = failed_jobs[0] + assert job.is_failed is True + assert job.is_finished is False + assert job.is_cancelled is False + assert job.is_in_progress is False + + running_jobs = await jobs_api_async.list_(job_status=JobStatus.RUNNING, limit=1) + if running_jobs: + job = running_jobs[0] + assert job.is_in_progress is True + assert job.is_finished is False + assert job.is_failed is False + assert job.is_cancelled is False + + cancelled_jobs = await jobs_api_async.list_(job_status=JobStatus.CANCELLED, limit=1) + if cancelled_jobs: + job = cancelled_jobs[0] + assert job.is_cancelled is True + assert 
job.is_finished is False + assert job.is_failed is False + assert job.is_in_progress is False + + @pytest.mark.asyncio + async def test_job_details_by_type(self, jobs_api_async): + """Test that job details are populated correctly based on job type.""" + # Test DATA_IMPORT jobs + import_jobs = await jobs_api_async.list_(job_type=JobType.DATA_IMPORT, limit=1) + if import_jobs: + job = import_jobs[0] + if job.job_details: + assert job.job_details.data_import_id is not None + + # Test RULE_EVALUATION jobs + rule_eval_jobs = await jobs_api_async.list_(job_type=JobType.RULE_EVALUATION, limit=1) + if rule_eval_jobs: + job = rule_eval_jobs[0] + if job.job_details: + assert job.job_details.report_id is not None + + # Test DATA_EXPORT jobs + export_jobs = await jobs_api_async.list_(job_type=JobType.DATA_EXPORT, limit=1) + if export_jobs: + job = export_jobs[0] + if job.job_details: + assert job.job_details.storage_key is not None + + @pytest.mark.asyncio + async def test_job_status_details_for_data_import(self, jobs_api_async): + """Test that status details are populated for data import jobs.""" + import_jobs = await jobs_api_async.list_( + job_type=JobType.DATA_IMPORT, + job_status=JobStatus.FINISHED, + limit=5, + ) + + if import_jobs: + # Find a job with status details + for job in import_jobs: + if job.job_status_details: + assert job.job_status_details.points_processed is not None + assert job.job_status_details.points_total is not None + break + + class TestJobInstanceMethods: + """Tests for job instance methods.""" + + @pytest.mark.asyncio + async def test_job_refresh(self, jobs_api_async, test_job): + """Test refreshing a job to get latest data.""" + # Get the job + job = await jobs_api_async.get(test_job.id_) + + # Refresh should work without error + # Note: We can't easily test that data actually changes + # but we can verify the method works + original_modified = job.modified_date + refreshed_job = job.refresh() + + assert isinstance(refreshed_job, Job) + assert 
refreshed_job.id_ == job.id_ + # Modified date should be the same or newer + assert refreshed_job.modified_date >= original_modified + + +class TestJobsAPISync: + """Test suite for the synchronous Jobs API functionality. + + Only includes a single test for basic sync generation. No specific sync behavior difference tests are needed. + """ + + class TestGet: + """Tests for the sync get method.""" + + def test_get_by_id(self, jobs_api_sync, test_job): + """Test getting a specific job by ID synchronously.""" + retrieved_job = jobs_api_sync.get(test_job.id_) + + assert isinstance(retrieved_job, Job) + assert retrieved_job.id_ == test_job.id_ + + class TestList: + """Tests for the sync list method.""" + + def test_basic_list(self, jobs_api_sync): + """Test basic synchronous job listing functionality.""" + jobs = jobs_api_sync.list_(limit=5) + + assert isinstance(jobs, list) + assert len(jobs) <= 5 + + if jobs: + assert isinstance(jobs[0], Job) diff --git a/python/lib/sift_client/_tests/sift_types/test_job.py b/python/lib/sift_client/_tests/sift_types/test_job.py new file mode 100644 index 000000000..790f6c3cf --- /dev/null +++ b/python/lib/sift_client/_tests/sift_types/test_job.py @@ -0,0 +1,277 @@ +"""Tests for sift_types.Job model.""" + +from datetime import datetime, timezone +from unittest.mock import MagicMock + +import pytest + +from sift_client.sift_types import Job +from sift_client.sift_types.job import JobDetails, JobStatus, JobStatusDetails, JobType + + +@pytest.fixture +def mock_job(mock_client): + """Create a mock Job instance for testing.""" + job = Job( + proto=MagicMock(), + id_="test_job_id", + organization_id="org1", + created_by_user_id="user1", + modified_by_user_id="user1", + created_date=datetime.now(timezone.utc), + modified_date=datetime.now(timezone.utc), + started_date=datetime.now(timezone.utc), + completed_date=None, + job_type=JobType.DATA_IMPORT, + job_status=JobStatus.RUNNING, + job_status_details=JobStatusDetails(points_processed=100, 
points_total=1000), + job_details=JobDetails(data_import_id="import123"), + ) + job._apply_client_to_instance(mock_client) + return job + + +@pytest.fixture +def mock_finished_job(mock_client): + """Create a mock finished Job instance for testing.""" + job = Job( + proto=MagicMock(), + id_="test_finished_job_id", + organization_id="org1", + created_by_user_id="user1", + modified_by_user_id="user1", + created_date=datetime.now(timezone.utc), + modified_date=datetime.now(timezone.utc), + started_date=datetime.now(timezone.utc), + completed_date=datetime.now(timezone.utc), + job_type=JobType.DATA_IMPORT, + job_status=JobStatus.FINISHED, + job_status_details=JobStatusDetails(points_processed=1000, points_total=1000), + job_details=JobDetails(data_import_id="import123"), + ) + job._apply_client_to_instance(mock_client) + return job + + +@pytest.fixture +def mock_failed_job(mock_client): + """Create a mock failed Job instance for testing.""" + job = Job( + proto=MagicMock(), + id_="test_failed_job_id", + organization_id="org1", + created_by_user_id="user1", + modified_by_user_id="user1", + created_date=datetime.now(timezone.utc), + modified_date=datetime.now(timezone.utc), + started_date=datetime.now(timezone.utc), + completed_date=datetime.now(timezone.utc), + job_type=JobType.DATA_EXPORT, + job_status=JobStatus.FAILED, + job_status_details=JobStatusDetails(error_message="Export failed"), + job_details=JobDetails(storage_key="exports/failed.csv"), + ) + job._apply_client_to_instance(mock_client) + return job + + +@pytest.fixture +def mock_cancelled_job(mock_client): + """Create a mock cancelled Job instance for testing.""" + job = Job( + proto=MagicMock(), + id_="test_cancelled_job_id", + organization_id="org1", + created_by_user_id="user1", + modified_by_user_id="user1", + created_date=datetime.now(timezone.utc), + modified_date=datetime.now(timezone.utc), + started_date=None, + completed_date=None, + job_type=JobType.RULE_EVALUATION, + job_status=JobStatus.CANCELLED, 
+ job_status_details=None, + job_details=JobDetails(report_id="report123"), + ) + job._apply_client_to_instance(mock_client) + return job + + +class TestJobStatusProperties: + """Unit tests for Job status property methods.""" + + def test_is_in_progress_true_for_running_job(self, mock_job): + """Test that is_in_progress returns True for RUNNING status.""" + assert mock_job.job_status == JobStatus.RUNNING + assert mock_job.is_in_progress is True + assert mock_job.is_finished is False + assert mock_job.is_failed is False + assert mock_job.is_cancelled is False + + def test_is_finished_true_for_finished_job(self, mock_finished_job): + """Test that is_finished returns True for FINISHED status.""" + assert mock_finished_job.job_status == JobStatus.FINISHED + assert mock_finished_job.is_finished is True + assert mock_finished_job.is_in_progress is False + assert mock_finished_job.is_failed is False + assert mock_finished_job.is_cancelled is False + + def test_is_failed_true_for_failed_job(self, mock_failed_job): + """Test that is_failed returns True for FAILED status.""" + assert mock_failed_job.job_status == JobStatus.FAILED + assert mock_failed_job.is_failed is True + assert mock_failed_job.is_in_progress is False + assert mock_failed_job.is_finished is False + assert mock_failed_job.is_cancelled is False + + def test_is_cancelled_true_for_cancelled_job(self, mock_cancelled_job): + """Test that is_cancelled returns True for CANCELLED status.""" + assert mock_cancelled_job.job_status == JobStatus.CANCELLED + assert mock_cancelled_job.is_cancelled is True + assert mock_cancelled_job.is_in_progress is False + assert mock_cancelled_job.is_finished is False + assert mock_cancelled_job.is_failed is False + + +class TestJobInstanceMethods: + """Unit tests for Job instance methods.""" + + def test_refresh_calls_client_and_updates_self(self, mock_job, mock_client): + """Test that refresh() calls client.jobs.get and calls _update.""" + refreshed_job = MagicMock() + 
refreshed_job.job_status = JobStatus.FINISHED + refreshed_job.completed_date = datetime.now(timezone.utc) + mock_client.jobs.get.return_value = refreshed_job + + # Mock the _update method to verify it's called + with MagicMock() as mock_update: + mock_job._update = mock_update + + # Call refresh + result = mock_job.refresh() + + # Verify client method was called with correct job ID + mock_client.jobs.get.assert_called_once_with(mock_job._id_or_error) + # Verify _update was called with the returned job + mock_update.assert_called_once_with(refreshed_job) + # Verify it returns self + assert result is mock_job + + def test_cancel_calls_client_and_refreshes(self, mock_job, mock_client): + """Test that cancel() calls client.jobs.cancel and refreshes.""" + refreshed_job = MagicMock() + refreshed_job.job_status = JobStatus.CANCELLED + mock_client.jobs.get.return_value = refreshed_job + + # Mock the _update method + with MagicMock() as mock_update: + mock_job._update = mock_update + + # Call cancel + mock_job.cancel() + + # Verify client.jobs.cancel was called with self + mock_client.jobs.cancel.assert_called_once_with(mock_job) + # Verify refresh was called (which calls get and _update) + mock_client.jobs.get.assert_called_once_with(mock_job._id_or_error) + mock_update.assert_called_once_with(refreshed_job) + + def test_retry_calls_client_and_updates_self(self, mock_failed_job, mock_client): + """Test that retry() calls client.jobs.retry and calls _update.""" + retried_job = MagicMock() + retried_job.job_status = JobStatus.CREATED + mock_client.jobs.retry.return_value = retried_job + + # Mock the _update method to verify it's called + with MagicMock() as mock_update: + mock_failed_job._update = mock_update + + # Call retry + result = mock_failed_job.retry() + + # Verify client method was called with self + mock_client.jobs.retry.assert_called_once_with(mock_failed_job) + # Verify _update was called with the returned job + mock_update.assert_called_once_with(retried_job) + 
# Verify it returns self + assert result is mock_failed_job + + +class TestJobType: + """Unit tests for JobType enum.""" + + def test_to_filter_str(self): + """Test that to_filter_str returns correct format.""" + assert JobType.DATA_IMPORT.to_filter_str() == "JOB_TYPE_DATA_IMPORT" + assert JobType.DATA_EXPORT.to_filter_str() == "JOB_TYPE_DATA_EXPORT" + assert JobType.RULE_EVALUATION.to_filter_str() == "JOB_TYPE_RULE_EVALUATION" + + +class TestJobStatus: + """Unit tests for JobStatus enum.""" + + def test_to_filter_str(self): + """Test that to_filter_str returns correct format.""" + assert JobStatus.CREATED.to_filter_str() == "JOB_STATUS_CREATED" + assert JobStatus.RUNNING.to_filter_str() == "JOB_STATUS_RUNNING" + assert JobStatus.FINISHED.to_filter_str() == "JOB_STATUS_FINISHED" + assert JobStatus.FAILED.to_filter_str() == "JOB_STATUS_FAILED" + assert JobStatus.CANCELLED.to_filter_str() == "JOB_STATUS_CANCELLED" + assert JobStatus.CANCEL_REQUESTED.to_filter_str() == "JOB_STATUS_CANCEL_REQUESTED" + + +class TestJobStatusDetails: + """Unit tests for JobStatusDetails model.""" + + def test_data_import_status_details(self): + """Test JobStatusDetails for data import jobs.""" + details = JobStatusDetails(points_processed=500, points_total=1000) + assert details.points_processed == 500 + assert details.points_total == 1000 + assert details.error_message is None + + def test_data_export_status_details(self): + """Test JobStatusDetails for data export jobs.""" + details = JobStatusDetails(error_message="Export failed due to timeout") + assert details.error_message == "Export failed due to timeout" + assert details.points_processed is None + assert details.points_total is None + + def test_empty_status_details(self): + """Test JobStatusDetails with no fields set.""" + details = JobStatusDetails() + assert details.points_processed is None + assert details.points_total is None + assert details.error_message is None + + +class TestJobDetails: + """Unit tests for JobDetails 
model.""" + + def test_rule_evaluation_details(self): + """Test JobDetails for rule evaluation jobs.""" + details = JobDetails(report_id="report123") + assert details.report_id == "report123" + assert details.data_import_id is None + assert details.storage_key is None + + def test_data_import_details(self): + """Test JobDetails for data import jobs.""" + details = JobDetails(data_import_id="import456") + assert details.data_import_id == "import456" + assert details.report_id is None + assert details.storage_key is None + + def test_data_export_details(self): + """Test JobDetails for data export jobs.""" + details = JobDetails(storage_key="exports/data.csv") + assert details.storage_key == "exports/data.csv" + assert details.report_id is None + assert details.data_import_id is None + + def test_empty_details(self): + """Test JobDetails with no fields set.""" + details = JobDetails() + assert details.report_id is None + assert details.data_import_id is None + assert details.storage_key is None diff --git a/python/lib/sift_client/client.py b/python/lib/sift_client/client.py index acb5bc79b..1642df1a9 100644 --- a/python/lib/sift_client/client.py +++ b/python/lib/sift_client/client.py @@ -11,6 +11,8 @@ FileAttachmentsAPI, FileAttachmentsAPIAsync, IngestionAPIAsync, + JobsAPI, + JobsAPIAsync, PingAPI, PingAPIAsync, ReportsAPI, @@ -93,6 +95,9 @@ class SiftClient( ingestion: IngestionAPIAsync """Instance of the Ingestion API for making synchronous requests.""" + jobs: JobsAPI + """Instance of the Jobs API for making synchronous requests.""" + reports: ReportsAPI """Instance of the Reports API for making synchronous requests.""" @@ -149,6 +154,7 @@ def __init__( self.calculated_channels = CalculatedChannelsAPI(self) self.channels = ChannelsAPI(self) self.file_attachments = FileAttachmentsAPI(self) + self.jobs = JobsAPI(self) self.rules = RulesAPI(self) self.reports = ReportsAPI(self) self.runs = RunsAPI(self) @@ -163,6 +169,7 @@ def __init__( 
channels=ChannelsAPIAsync(self), file_attachments=FileAttachmentsAPIAsync(self), ingestion=IngestionAPIAsync(self), + jobs=JobsAPIAsync(self), reports=ReportsAPIAsync(self), rules=RulesAPIAsync(self), runs=RunsAPIAsync(self), diff --git a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index 5058ac366..5c4dc9fa0 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -155,6 +155,7 @@ async def main(): from sift_client.resources.channels import ChannelsAPIAsync from sift_client.resources.file_attachments import FileAttachmentsAPIAsync from sift_client.resources.ingestion import IngestionAPIAsync +from sift_client.resources.jobs import JobsAPIAsync from sift_client.resources.ping import PingAPIAsync from sift_client.resources.reports import ReportsAPIAsync from sift_client.resources.rules import RulesAPIAsync @@ -167,6 +168,7 @@ async def main(): AssetsAPI, CalculatedChannelsAPI, ChannelsAPI, + JobsAPI, PingAPI, ReportsAPI, RulesAPI, @@ -186,6 +188,8 @@ async def main(): "FileAttachmentsAPI", "FileAttachmentsAPIAsync", "IngestionAPIAsync", + "JobsAPI", + "JobsAPIAsync", "PingAPI", "PingAPIAsync", "ReportsAPI", diff --git a/python/lib/sift_client/resources/jobs.py b/python/lib/sift_client/resources/jobs.py new file mode 100644 index 000000000..2e6c86408 --- /dev/null +++ b/python/lib/sift_client/resources/jobs.py @@ -0,0 +1,157 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient +from sift_client.resources._base import ResourceBase +from sift_client.sift_types.job import Job, JobStatus, JobType +from sift_client.util import cel_utils as cel + +if TYPE_CHECKING: + from datetime import datetime + + from sift_client.client import SiftClient + + +class JobsAPIAsync(ResourceBase): + """High-level API for interacting with jobs. 
+ + This class provides a Pythonic interface for managing jobs in Sift. + Jobs represent long-running operations like data imports, rule evaluations, and data exports. + """ + + def __init__(self, sift_client: SiftClient): + """Initialize the JobsAPI. + + Args: + sift_client: The Sift client to use. + """ + super().__init__(sift_client) + self._low_level_client = JobsLowLevelClient(grpc_client=self.client.grpc_client) + + async def get(self, job_id: str) -> Job: + """Get a job by ID. + + Args: + job_id: The ID of the job to retrieve. + + Returns: + The Job object. + """ + job = await self._low_level_client.get_job(job_id) + return self._apply_client_to_instance(job) + + async def list_( + self, + *, + # Self ids + job_ids: list[str] | None = None, + # Created/modified ranges + created_after: datetime | None = None, + created_before: datetime | None = None, + modified_after: datetime | None = None, + modified_before: datetime | None = None, + # Created/modified users + created_by_user_id: str | None = None, + modified_by_user_id: str | None = None, + # Resource-specific filters + job_type: JobType | None = None, + job_status: JobStatus | None = None, + started_date_after: datetime | None = None, + started_date_before: datetime | None = None, + completed_date_after: datetime | None = None, + completed_date_before: datetime | None = None, + # Common filters + organization_id: str | None = None, + filter_query: str | None = None, + # Ordering and pagination + order_by: str | None = None, + limit: int | None = None, + ) -> list[Job]: + """List jobs with optional filtering. + + Args: + job_ids: Filter to jobs with any of these IDs. + created_after: Filter to jobs created after this datetime. + created_before: Filter to jobs created before this datetime. + modified_after: Filter to jobs modified after this datetime. + modified_before: Filter to jobs modified before this datetime. + created_by_user_id: Filter to jobs created by this user ID. 
+ modified_by_user_id: Filter to jobs last modified by this user ID. + job_type: Filter to jobs with this type. + job_status: Filter to jobs with this status. + started_date_after: Filter to jobs started after this datetime. + started_date_before: Filter to jobs started before this datetime. + completed_date_after: Filter to jobs completed after this datetime. + completed_date_before: Filter to jobs completed before this datetime. + organization_id: Organization ID. Required if your user belongs to multiple organizations. + filter_query: Explicit CEL query to filter jobs. If provided, other filter arguments are ignored. + order_by: Field and direction to order results by. + limit: Maximum number of jobs to return. If None, returns all matches. + + Returns: + A list of Job objects that match the filter criteria. + """ + filter_parts = [ + *self._build_time_cel_filters( + created_after=created_after, + created_before=created_before, + modified_after=modified_after, + modified_before=modified_before, + created_by=created_by_user_id, + modified_by=modified_by_user_id, + ), + *self._build_common_cel_filters(filter_query=filter_query), + ] + if job_ids: + filter_parts.append(cel.in_("job_id", job_ids)) + if job_status: + filter_parts.append(cel.equals("job_status", job_status.to_filter_str())) + if job_type: + filter_parts.append(cel.equals("job_type", job_type.to_filter_str())) + if started_date_after: + filter_parts.append(cel.greater_than("started_date", started_date_after)) + if started_date_before: + filter_parts.append(cel.less_than("started_date", started_date_before)) + if completed_date_after: + filter_parts.append(cel.greater_than("completed_date", completed_date_after)) + if completed_date_before: + filter_parts.append(cel.less_than("completed_date", completed_date_before)) + + query_filter = cel.and_(*filter_parts) + + jobs = await self._low_level_client.list_all_jobs( + query_filter=query_filter or None, + organization_id=organization_id, + 
order_by=order_by, + max_results=limit, + ) + return self._apply_client_to_instances(jobs) + + async def cancel(self, job: Job | str) -> None: + """Cancel a job. + + If the job hasn't started yet, it will be cancelled immediately. + Jobs that are already finished, failed, or cancelled are not affected. + + Args: + job: The Job or ID of the job to cancel. + + """ + job_id = job._id_or_error if isinstance(job, Job) else job + await self._low_level_client.cancel_job(job_id) + + async def retry(self, job: Job | str) -> Job: + """Retry a failed job. + + Jobs that are finished, in progress, or in the process of being cancelled are not affected. + + Args: + job: The Job or ID of the job to retry. + + Returns: + The updated Job object. + """ + job_id = job._id_or_error if isinstance(job, Job) else job + updated_job = await self._low_level_client.retry_job(job_id) + return self._apply_client_to_instance(updated_job) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.py b/python/lib/sift_client/resources/sync_stubs/__init__.py index ab988e7f2..3f6cc427c 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.py +++ b/python/lib/sift_client/resources/sync_stubs/__init__.py @@ -8,6 +8,7 @@ CalculatedChannelsAPIAsync, ChannelsAPIAsync, FileAttachmentsAPIAsync, + JobsAPIAsync, PingAPIAsync, ReportsAPIAsync, RulesAPIAsync, @@ -21,6 +22,7 @@ CalculatedChannelsAPI = generate_sync_api(CalculatedChannelsAPIAsync, "CalculatedChannelsAPI") ChannelsAPI = generate_sync_api(ChannelsAPIAsync, "ChannelsAPI") FileAttachmentsAPI = generate_sync_api(FileAttachmentsAPIAsync, "FileAttachmentsAPI") +JobsAPI = generate_sync_api(JobsAPIAsync, "JobsAPI") RulesAPI = generate_sync_api(RulesAPIAsync, "RulesAPI") RunsAPI = generate_sync_api(RunsAPIAsync, "RunsAPI") ReportsAPI = generate_sync_api(ReportsAPIAsync, "ReportsAPI") @@ -32,6 +34,7 @@ "CalculatedChannelsAPI", "ChannelsAPI", "FileAttachmentsAPI", + "JobsAPI", "PingAPI", "ReportsAPI", "RulesAPI", diff --git 
a/python/lib/sift_client/sift_types/__init__.py b/python/lib/sift_client/sift_types/__init__.py index b55717c60..907b7379e 100644 --- a/python/lib/sift_client/sift_types/__init__.py +++ b/python/lib/sift_client/sift_types/__init__.py @@ -148,6 +148,13 @@ IngestionConfig, IngestionConfigCreate, ) +from sift_client.sift_types.job import ( + Job, + JobDetails, + JobStatus, + JobStatusDetails, + JobType, +) from sift_client.sift_types.report import Report, ReportRuleStatus, ReportRuleSummary, ReportUpdate from sift_client.sift_types.rule import ( Rule, diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py new file mode 100644 index 000000000..8a5ebf477 --- /dev/null +++ b/python/lib/sift_client/sift_types/job.py @@ -0,0 +1,274 @@ +"""Job types for the Sift API.""" + +from __future__ import annotations + +from datetime import datetime, timezone +from enum import Enum +from typing import TYPE_CHECKING + +from pydantic import BaseModel +from sift.jobs.v1.jobs_pb2 import Job as JobProto +from sift.jobs.v1.jobs_pb2 import JobDetails as JobDetailsProto +from sift.jobs.v1.jobs_pb2 import JobStatus as JobStatusProto +from sift.jobs.v1.jobs_pb2 import JobStatusDetails as JobStatusDetailsProto +from sift.jobs.v1.jobs_pb2 import JobType as JobTypeProto + +from sift_client.sift_types._base import BaseType + +if TYPE_CHECKING: + from sift_client.client import SiftClient + + +class JobType(str, Enum): + """Type of job.""" + + RULE_EVALUATION = "RULE_EVALUATION" + DATA_IMPORT = "DATA_IMPORT" + DATA_EXPORT = "DATA_EXPORT" + + def to_filter_str(self) -> str: + """Convert to string representation.""" + return f"JOB_TYPE_{self.value}" + + def to_proto(self) -> int: + """Convert to proto enum value.""" + mapping = { + JobType.RULE_EVALUATION: JobTypeProto.JOB_TYPE_RULE_EVALUATION, + JobType.DATA_IMPORT: JobTypeProto.JOB_TYPE_DATA_IMPORT, + JobType.DATA_EXPORT: JobTypeProto.JOB_TYPE_DATA_EXPORT, + } + return mapping[self] + + @classmethod + def 
from_proto(cls, proto_value: int) -> JobType: + """Create from proto enum value.""" + mapping = { + JobTypeProto.JOB_TYPE_RULE_EVALUATION: JobType.RULE_EVALUATION, + JobTypeProto.JOB_TYPE_DATA_IMPORT: JobType.DATA_IMPORT, + JobTypeProto.JOB_TYPE_DATA_EXPORT: JobType.DATA_EXPORT, + } + if proto_value not in mapping: + raise ValueError(f"Unknown JobType proto value: {proto_value}") + return mapping[proto_value] + + +class JobStatus(str, Enum): + """Status of a job.""" + + CREATED = "CREATED" + RUNNING = "RUNNING" + FINISHED = "FINISHED" + FAILED = "FAILED" + CANCELLED = "CANCELLED" + CANCEL_REQUESTED = "CANCEL_REQUESTED" + + def to_filter_str(self) -> str: + """Convert to string representation.""" + return f"JOB_STATUS_{self.value}" + + @classmethod + def from_proto(cls, proto_value: int) -> JobStatus: + """Create from proto enum value.""" + mapping = { + JobStatusProto.JOB_STATUS_CREATED: JobStatus.CREATED, + JobStatusProto.JOB_STATUS_RUNNING: JobStatus.RUNNING, + JobStatusProto.JOB_STATUS_FINISHED: JobStatus.FINISHED, + JobStatusProto.JOB_STATUS_FAILED: JobStatus.FAILED, + JobStatusProto.JOB_STATUS_CANCELLED: JobStatus.CANCELLED, + JobStatusProto.JOB_STATUS_CANCEL_REQUESTED: JobStatus.CANCEL_REQUESTED, + } + if proto_value not in mapping: + raise ValueError(f"Unknown JobStatus proto value: {proto_value}") + return mapping[proto_value] + + +class JobStatusDetails(BaseModel): + """Status details for a job. 
+ + Fields are populated based on job_type: + - DATA_IMPORT: points_processed, points_total + - DATA_EXPORT: error_message + - RULE_EVALUATION: (no additional fields) + """ + + # Data import fields + points_processed: int | None = None + points_total: int | None = None + + # Data export fields + error_message: str | None = None + + @classmethod + def from_proto(cls, proto: JobStatusDetailsProto) -> JobStatusDetails | None: + """Create from proto.""" + if not proto.HasField("status"): + return None + + status_field = proto.WhichOneof("status") + if status_field == "data_import": + return cls( + points_processed=proto.data_import.points_processed, + points_total=proto.data_import.points_total, + ) + elif status_field == "data_export": + return cls( + error_message=proto.data_export.error_message + if proto.data_export.error_message + else None + ) + elif status_field == "rule_evaluation": + return cls() + return None + + +class JobDetails(BaseModel): + """Job details for a job. + + Fields are populated based on job_type: + - RULE_EVALUATION: report_id + - DATA_IMPORT: data_import_id + - DATA_EXPORT: storage_key + """ + + # Rule evaluation fields + report_id: str | None = None + + # Data import fields + data_import_id: str | None = None + + # Data export fields + storage_key: str | None = None + + @classmethod + def from_proto(cls, proto: JobDetailsProto) -> JobDetails | None: + """Create from proto.""" + if not proto.HasField("details"): + return None + + details_field = proto.WhichOneof("details") + if details_field == "rule_evaluation": + return cls(report_id=proto.rule_evaluation.report_id) + elif details_field == "data_import": + return cls(data_import_id=proto.data_import.data_import_id) + elif details_field == "data_export": + return cls(storage_key=proto.data_export.storage_key) + return None + + +class Job(BaseType[JobProto, "Job"]): + """A job in the Sift system. 
+ + Jobs represent long-running operations like data imports, rule evaluations, and data exports. + """ + + # Required fields + organization_id: str + created_by_user_id: str + modified_by_user_id: str + created_date: datetime + modified_date: datetime + job_type: JobType + job_status: JobStatus + + # Optional fields + started_date: datetime | None + completed_date: datetime | None + job_status_details: JobStatusDetails | None + job_details: JobDetails | None + + @classmethod + def _from_proto(cls, proto: JobProto, sift_client: SiftClient | None = None) -> Job: + """Create from proto.""" + return cls( + proto=proto, + id_=proto.job_id, + organization_id=proto.organization_id, + created_by_user_id=proto.created_by_user_id, + modified_by_user_id=proto.modified_by_user_id, + created_date=proto.created_date.ToDatetime(tzinfo=timezone.utc), + modified_date=proto.modified_date.ToDatetime(tzinfo=timezone.utc), + started_date=( + proto.started_date.ToDatetime(tzinfo=timezone.utc) + if proto.HasField("started_date") + else None + ), + completed_date=( + proto.completed_date.ToDatetime(tzinfo=timezone.utc) + if proto.HasField("completed_date") + else None + ), + job_type=JobType.from_proto(proto.job_type), + job_status=JobStatus.from_proto(proto.job_status), + job_status_details=( + JobStatusDetails.from_proto(proto.job_status_details) + if proto.HasField("job_status_details") + else None + ), + job_details=( + JobDetails.from_proto(proto.job_details) if proto.HasField("job_details") else None + ), + _client=sift_client, + ) + + @property + def is_in_progress(self) -> bool: + """Return True if the job is in progress, False otherwise. + + A job is in progress if its status is RUNNING. + """ + return self.job_status == JobStatus.RUNNING + + @property + def is_failed(self) -> bool: + """Return True if the job has failed, False otherwise. + + A job has failed if its status is FAILED. 
+ """ + return self.job_status == JobStatus.FAILED + + @property + def is_finished(self) -> bool: + """Return True if the job has finished, False otherwise. + + A job has finished if its status is FINISHED. + """ + return self.job_status == JobStatus.FINISHED + + @property + def is_cancelled(self) -> bool: + """Return True if the job has been cancelled, False otherwise. + + A job has been cancelled if its status is CANCELLED. + """ + return self.job_status == JobStatus.CANCELLED + + def refresh(self) -> Job: + """Refresh this job with the latest data from the API. + + Returns: + The updated Job object. + """ + updated_job = self.client.jobs.get(self._id_or_error) + self._update(updated_job) + return self + + def cancel(self) -> None: + """Cancel this job. + + If the job hasn't started yet, it will be cancelled immediately. + Jobs that are already finished, failed, or cancelled are not affected. + """ + self.client.jobs.cancel(self) + self.refresh() + + def retry(self) -> Job: + """Retry this job. + + Jobs that are finished, in progress, or in the process of being cancelled are not affected. + + Returns: + The updated Job object. 
+ """ + updated_job = self.client.jobs.retry(self) + self._update(updated_job) + return self diff --git a/python/lib/sift_client/util/util.py b/python/lib/sift_client/util/util.py index 60b58501b..3800f91a7 100644 --- a/python/lib/sift_client/util/util.py +++ b/python/lib/sift_client/util/util.py @@ -9,6 +9,7 @@ ChannelsAPIAsync, FileAttachmentsAPIAsync, IngestionAPIAsync, + JobsAPIAsync, PingAPIAsync, ReportsAPIAsync, RulesAPIAsync, @@ -39,6 +40,9 @@ class AsyncAPIs(NamedTuple): ingestion: IngestionAPIAsync """Instance of the Ingestion API for making asynchronous requests.""" + jobs: JobsAPIAsync + """Instance of the Jobs API for making asynchronous requests.""" + reports: ReportsAPIAsync """Instance of the Reports API for making asynchronous requests.""" From 861fcc64402b25cd6bdacb0c294f27a206e46dab Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Thu, 8 Jan 2026 13:32:17 -0800 Subject: [PATCH 2/9] linting --- .../sift_client/_tests/resources/test_jobs.py | 3 +- python/lib/sift_client/resources/__init__.py | 2 +- .../resources/sync_stubs/__init__.pyi | 101 ++++++++++++++++++ python/lib/sift_client/sift_types/__init__.py | 5 + python/lib/sift_client/sift_types/job.py | 5 +- python/scripts/dev | 4 +- 6 files changed, 112 insertions(+), 8 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_jobs.py b/python/lib/sift_client/_tests/resources/test_jobs.py index fb6e9cc43..30bfd1924 100644 --- a/python/lib/sift_client/_tests/resources/test_jobs.py +++ b/python/lib/sift_client/_tests/resources/test_jobs.py @@ -279,11 +279,10 @@ async def test_retry_finished_job_no_effect(self, jobs_api_async): if finished_jobs: job = finished_jobs[0] - original_status = job.job_status # Retry should not raise an error but won't change status with pytest.raises(AioRpcError, match="job cannot be retried"): - retried_job = await jobs_api_async.retry(job) + await jobs_api_async.retry(job) class TestJobProperties: """Tests for job property methods.""" diff --git 
a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index 5c4dc9fa0..294f02290 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -154,7 +154,7 @@ async def main(): from sift_client.resources.calculated_channels import CalculatedChannelsAPIAsync from sift_client.resources.channels import ChannelsAPIAsync from sift_client.resources.file_attachments import FileAttachmentsAPIAsync -from sift_client.resources.ingestion import IngestionAPIAsync +from sift_client.resources.ingestion import IngestionAPIAsync, TracingConfig from sift_client.resources.jobs import JobsAPIAsync from sift_client.resources.ping import PingAPIAsync from sift_client.resources.reports import ReportsAPIAsync diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index 97c86f0d6..acfbc0f23 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -26,6 +26,7 @@ if TYPE_CHECKING: FileAttachmentUpdate, RemoteFileEntityType, ) + from sift_client.sift_types.job import Job, JobStatus, JobType from sift_client.sift_types.report import Report, ReportUpdate from sift_client.sift_types.rule import Rule, RuleCreate, RuleUpdate from sift_client.sift_types.run import Run, RunCreate, RunUpdate @@ -644,6 +645,106 @@ class FileAttachmentsAPI: """ ... +class JobsAPI: + """Sync counterpart to `JobsAPIAsync`. + + High-level API for interacting with jobs. + + This class provides a Pythonic interface for managing jobs in Sift. + Jobs represent long-running operations like data imports, rule evaluations, and data exports. + """ + + def __init__(self, sift_client: SiftClient): + """Initialize the JobsAPI. + + Args: + sift_client: The Sift client to use. + """ + ... + + def _run(self, coro): ... + def cancel(self, job: Job | str) -> None: + """Cancel a job. 
+ + If the job hasn't started yet, it will be cancelled immediately. + Jobs that are already finished, failed, or cancelled are not affected. + + Args: + job: The Job or ID of the job to cancel. + """ + ... + + def get(self, job_id: str) -> Job: + """Get a job by ID. + + Args: + job_id: The ID of the job to retrieve. + + Returns: + The Job object. + """ + ... + + def list_( + self, + *, + job_ids: list[str] | None = None, + created_after: datetime | None = None, + created_before: datetime | None = None, + modified_after: datetime | None = None, + modified_before: datetime | None = None, + created_by_user_id: str | None = None, + modified_by_user_id: str | None = None, + job_type: JobType | None = None, + job_status: JobStatus | None = None, + started_date_after: datetime | None = None, + started_date_before: datetime | None = None, + completed_date_after: datetime | None = None, + completed_date_before: datetime | None = None, + organization_id: str | None = None, + filter_query: str | None = None, + order_by: str | None = None, + limit: int | None = None, + ) -> list[Job]: + """List jobs with optional filtering. + + Args: + job_ids: Filter to jobs with any of these IDs. + created_after: Filter to jobs created after this datetime. + created_before: Filter to jobs created before this datetime. + modified_after: Filter to jobs modified after this datetime. + modified_before: Filter to jobs modified before this datetime. + created_by_user_id: Filter to jobs created by this user ID. + modified_by_user_id: Filter to jobs last modified by this user ID. + job_type: Filter to jobs with this type. + job_status: Filter to jobs with this status. + started_date_after: Filter to jobs started after this datetime. + started_date_before: Filter to jobs started before this datetime. + completed_date_after: Filter to jobs completed after this datetime. + completed_date_before: Filter to jobs completed before this datetime. + organization_id: Organization ID. 
Required if your user belongs to multiple organizations. + filter_query: Explicit CEL query to filter jobs. If provided, other filter arguments are ignored. + order_by: Field and direction to order results by. + limit: Maximum number of jobs to return. If None, returns all matches. + + Returns: + A list of Job objects that match the filter criteria. + """ + ... + + def retry(self, job: Job | str) -> Job: + """Retry a failed job. + + Jobs that are finished, in progress, or in the process of being cancelled are not affected. + + Args: + job: The Job or ID of the job to retry. + + Returns: + The updated Job object. + """ + ... + class PingAPI: """Sync counterpart to `PingAPIAsync`. diff --git a/python/lib/sift_client/sift_types/__init__.py b/python/lib/sift_client/sift_types/__init__.py index 907b7379e..172fa6ede 100644 --- a/python/lib/sift_client/sift_types/__init__.py +++ b/python/lib/sift_client/sift_types/__init__.py @@ -195,6 +195,11 @@ "FlowConfig", "IngestionConfig", "IngestionConfigCreate", + "Job", + "JobDetails", + "JobStatus", + "JobStatusDetails", + "JobType", "Report", "ReportRuleStatus", "ReportRuleSummary", diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 8a5ebf477..5bd823087 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -42,7 +42,7 @@ def to_proto(self) -> int: @classmethod def from_proto(cls, proto_value: int) -> JobType: """Create from proto enum value.""" - mapping = { + mapping: dict[int, JobType] = { JobTypeProto.JOB_TYPE_RULE_EVALUATION: JobType.RULE_EVALUATION, JobTypeProto.JOB_TYPE_DATA_IMPORT: JobType.DATA_IMPORT, JobTypeProto.JOB_TYPE_DATA_EXPORT: JobType.DATA_EXPORT, @@ -51,7 +51,6 @@ def from_proto(cls, proto_value: int) -> JobType: raise ValueError(f"Unknown JobType proto value: {proto_value}") return mapping[proto_value] - class JobStatus(str, Enum): """Status of a job.""" @@ -69,7 +68,7 @@ def to_filter_str(self) -> str: 
@classmethod def from_proto(cls, proto_value: int) -> JobStatus: """Create from proto enum value.""" - mapping = { + mapping: dict[int, JobStatus] = { JobStatusProto.JOB_STATUS_CREATED: JobStatus.CREATED, JobStatusProto.JOB_STATUS_RUNNING: JobStatus.RUNNING, JobStatusProto.JOB_STATUS_FINISHED: JobStatus.FINISHED, diff --git a/python/scripts/dev b/python/scripts/dev index 6a610805b..78f85e603 100755 --- a/python/scripts/dev +++ b/python/scripts/dev @@ -84,12 +84,12 @@ lint_fix() { doc() { source venv/bin/activate - pdoc lib/sift_py -p 7777 + mkdocs serve } doc_build() { source venv/bin/activate - pdoc lib/sift_py -o docs/sift_py + mkdocs build } run_tests() { From 76d7fed88663b023811a9687ba312ca204f74762 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Thu, 8 Jan 2026 13:40:15 -0800 Subject: [PATCH 3/9] clean up the statusdetails and job details types --- .../sift_client/_tests/resources/test_jobs.py | 13 +- python/lib/sift_client/sift_types/__init__.py | 12 ++ python/lib/sift_client/sift_types/job.py | 127 ++++++++++-------- 3 files changed, 92 insertions(+), 60 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_jobs.py b/python/lib/sift_client/_tests/resources/test_jobs.py index 30bfd1924..4ac74bec0 100644 --- a/python/lib/sift_client/_tests/resources/test_jobs.py +++ b/python/lib/sift_client/_tests/resources/test_jobs.py @@ -15,7 +15,14 @@ from sift_client import SiftClient from sift_client.resources import JobsAPI, JobsAPIAsync from sift_client.sift_types import Job -from sift_client.sift_types.job import JobStatus, JobType +from sift_client.sift_types.job import ( + DataExportDetails, + DataImportDetails, + DataImportStatusDetails, + JobStatus, + JobType, + RuleEvaluationDetails, +) pytestmark = pytest.mark.integration @@ -331,6 +338,7 @@ async def test_job_details_by_type(self, jobs_api_async): if import_jobs: job = import_jobs[0] if job.job_details: + assert isinstance(job.job_details, DataImportDetails) assert job.job_details.data_import_id 
is not None # Test RULE_EVALUATION jobs @@ -338,6 +346,7 @@ async def test_job_details_by_type(self, jobs_api_async): if rule_eval_jobs: job = rule_eval_jobs[0] if job.job_details: + assert isinstance(job.job_details, RuleEvaluationDetails) assert job.job_details.report_id is not None # Test DATA_EXPORT jobs @@ -345,6 +354,7 @@ async def test_job_details_by_type(self, jobs_api_async): if export_jobs: job = export_jobs[0] if job.job_details: + assert isinstance(job.job_details, DataExportDetails) assert job.job_details.storage_key is not None @pytest.mark.asyncio @@ -360,6 +370,7 @@ async def test_job_status_details_for_data_import(self, jobs_api_async): # Find a job with status details for job in import_jobs: if job.job_status_details: + assert isinstance(job.job_status_details, DataImportStatusDetails) assert job.job_status_details.points_processed is not None assert job.job_status_details.points_total is not None break diff --git a/python/lib/sift_client/sift_types/__init__.py b/python/lib/sift_client/sift_types/__init__.py index 172fa6ede..be885c01c 100644 --- a/python/lib/sift_client/sift_types/__init__.py +++ b/python/lib/sift_client/sift_types/__init__.py @@ -149,11 +149,17 @@ IngestionConfigCreate, ) from sift_client.sift_types.job import ( + DataExportDetails, + DataExportStatusDetails, + DataImportDetails, + DataImportStatusDetails, Job, JobDetails, JobStatus, JobStatusDetails, JobType, + RuleEvaluationDetails, + RuleEvaluationStatusDetails, ) from sift_client.sift_types.report import Report, ReportRuleStatus, ReportRuleSummary, ReportUpdate from sift_client.sift_types.rule import ( @@ -195,11 +201,17 @@ "FlowConfig", "IngestionConfig", "IngestionConfigCreate", + "DataExportDetails", + "DataExportStatusDetails", + "DataImportDetails", + "DataImportStatusDetails", "Job", "JobDetails", "JobStatus", "JobStatusDetails", "JobType", + "RuleEvaluationDetails", + "RuleEvaluationStatusDetails", "Report", "ReportRuleStatus", "ReportRuleSummary", diff --git 
a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 5bd823087..5d066a167 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -81,78 +81,85 @@ def from_proto(cls, proto_value: int) -> JobStatus: return mapping[proto_value] -class JobStatusDetails(BaseModel): - """Status details for a job. +class DataImportStatusDetails(BaseModel): + """Status details for a data import job.""" + + points_processed: int + points_total: int - Fields are populated based on job_type: - - DATA_IMPORT: points_processed, points_total - - DATA_EXPORT: error_message - - RULE_EVALUATION: (no additional fields) - """ - # Data import fields - points_processed: int | None = None - points_total: int | None = None +class DataExportStatusDetails(BaseModel): + """Status details for a data export job.""" - # Data export fields error_message: str | None = None - @classmethod - def from_proto(cls, proto: JobStatusDetailsProto) -> JobStatusDetails | None: - """Create from proto.""" - if not proto.HasField("status"): - return None - - status_field = proto.WhichOneof("status") - if status_field == "data_import": - return cls( - points_processed=proto.data_import.points_processed, - points_total=proto.data_import.points_total, - ) - elif status_field == "data_export": - return cls( - error_message=proto.data_export.error_message - if proto.data_export.error_message - else None - ) - elif status_field == "rule_evaluation": - return cls() + +class RuleEvaluationStatusDetails(BaseModel): + """Status details for a rule evaluation job.""" + + pass + + +JobStatusDetails = DataImportStatusDetails | DataExportStatusDetails | RuleEvaluationStatusDetails + + +def _job_status_details_from_proto( + proto: JobStatusDetailsProto, +) -> JobStatusDetails | None: + """Create JobStatusDetails from proto.""" + if not proto.HasField("status"): return None + status_field = proto.WhichOneof("status") + if status_field == "data_import": + 
return DataImportStatusDetails( + points_processed=proto.data_import.points_processed, + points_total=proto.data_import.points_total, + ) + elif status_field == "data_export": + return DataExportStatusDetails( + error_message=proto.data_export.error_message or None + ) + elif status_field == "rule_evaluation": + return RuleEvaluationStatusDetails() + return None -class JobDetails(BaseModel): - """Job details for a job. - Fields are populated based on job_type: - - RULE_EVALUATION: report_id - - DATA_IMPORT: data_import_id - - DATA_EXPORT: storage_key - """ +class DataImportDetails(BaseModel): + """Details for a data import job.""" - # Rule evaluation fields - report_id: str | None = None + data_import_id: str - # Data import fields - data_import_id: str | None = None - # Data export fields - storage_key: str | None = None +class DataExportDetails(BaseModel): + """Details for a data export job.""" - @classmethod - def from_proto(cls, proto: JobDetailsProto) -> JobDetails | None: - """Create from proto.""" - if not proto.HasField("details"): - return None - - details_field = proto.WhichOneof("details") - if details_field == "rule_evaluation": - return cls(report_id=proto.rule_evaluation.report_id) - elif details_field == "data_import": - return cls(data_import_id=proto.data_import.data_import_id) - elif details_field == "data_export": - return cls(storage_key=proto.data_export.storage_key) + storage_key: str + + +class RuleEvaluationDetails(BaseModel): + """Details for a rule evaluation job.""" + + report_id: str + + +JobDetails = DataImportDetails | DataExportDetails | RuleEvaluationDetails + + +def _job_details_from_proto(proto: JobDetailsProto) -> JobDetails | None: + """Create JobDetails from proto.""" + if not proto.HasField("details"): return None + details_field = proto.WhichOneof("details") + if details_field == "rule_evaluation": + return RuleEvaluationDetails(report_id=proto.rule_evaluation.report_id) + elif details_field == "data_import": + return 
DataImportDetails(data_import_id=proto.data_import.data_import_id) + elif details_field == "data_export": + return DataExportDetails(storage_key=proto.data_export.storage_key) + return None + class Job(BaseType[JobProto, "Job"]): """A job in the Sift system. @@ -199,12 +206,14 @@ def _from_proto(cls, proto: JobProto, sift_client: SiftClient | None = None) -> job_type=JobType.from_proto(proto.job_type), job_status=JobStatus.from_proto(proto.job_status), job_status_details=( - JobStatusDetails.from_proto(proto.job_status_details) + _job_status_details_from_proto(proto.job_status_details) if proto.HasField("job_status_details") else None ), job_details=( - JobDetails.from_proto(proto.job_details) if proto.HasField("job_details") else None + _job_details_from_proto(proto.job_details) + if proto.HasField("job_details") + else None ), _client=sift_client, ) From c57bb2ee410bdcddfe85d0032f533b6bdf434e3a Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Thu, 8 Jan 2026 13:40:37 -0800 Subject: [PATCH 4/9] clean up the statusdetails and job details types --- python/lib/sift_client/sift_types/__init__.py | 12 ++++++------ python/lib/sift_client/sift_types/job.py | 5 ++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/python/lib/sift_client/sift_types/__init__.py b/python/lib/sift_client/sift_types/__init__.py index be885c01c..3f6b3f6c6 100644 --- a/python/lib/sift_client/sift_types/__init__.py +++ b/python/lib/sift_client/sift_types/__init__.py @@ -197,21 +197,19 @@ "ChannelConfig", "ChannelDataType", "ChannelReference", - "Flow", - "FlowConfig", - "IngestionConfig", - "IngestionConfigCreate", "DataExportDetails", "DataExportStatusDetails", "DataImportDetails", "DataImportStatusDetails", + "Flow", + "FlowConfig", + "IngestionConfig", + "IngestionConfigCreate", "Job", "JobDetails", "JobStatus", "JobStatusDetails", "JobType", - "RuleEvaluationDetails", - "RuleEvaluationStatusDetails", "Report", "ReportRuleStatus", "ReportRuleSummary", @@ -221,6 +219,8 @@ 
"RuleActionType", "RuleAnnotationType", "RuleCreate", + "RuleEvaluationDetails", + "RuleEvaluationStatusDetails", "RuleUpdate", "RuleVersion", "Run", diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 5d066a167..8276e7ffb 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -51,6 +51,7 @@ def from_proto(cls, proto_value: int) -> JobType: raise ValueError(f"Unknown JobType proto value: {proto_value}") return mapping[proto_value] + class JobStatus(str, Enum): """Status of a job.""" @@ -117,9 +118,7 @@ def _job_status_details_from_proto( points_total=proto.data_import.points_total, ) elif status_field == "data_export": - return DataExportStatusDetails( - error_message=proto.data_export.error_message or None - ) + return DataExportStatusDetails(error_message=proto.data_export.error_message or None) elif status_field == "rule_evaluation": return RuleEvaluationStatusDetails() return None From d09d6816ced4cb64592f8f375de52d480041e11d Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Thu, 8 Jan 2026 13:44:12 -0800 Subject: [PATCH 5/9] tests --- .../sift_client/_tests/sift_types/test_job.py | 80 +++++++++---------- 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/python/lib/sift_client/_tests/sift_types/test_job.py b/python/lib/sift_client/_tests/sift_types/test_job.py index 790f6c3cf..c88e184d6 100644 --- a/python/lib/sift_client/_tests/sift_types/test_job.py +++ b/python/lib/sift_client/_tests/sift_types/test_job.py @@ -6,7 +6,16 @@ import pytest from sift_client.sift_types import Job -from sift_client.sift_types.job import JobDetails, JobStatus, JobStatusDetails, JobType +from sift_client.sift_types.job import ( + DataExportDetails, + DataExportStatusDetails, + DataImportDetails, + DataImportStatusDetails, + JobStatus, + JobType, + RuleEvaluationDetails, + RuleEvaluationStatusDetails, +) @pytest.fixture @@ -24,8 +33,8 @@ def mock_job(mock_client): 
completed_date=None, job_type=JobType.DATA_IMPORT, job_status=JobStatus.RUNNING, - job_status_details=JobStatusDetails(points_processed=100, points_total=1000), - job_details=JobDetails(data_import_id="import123"), + job_status_details=DataImportStatusDetails(points_processed=100, points_total=1000), + job_details=DataImportDetails(data_import_id="import123"), ) job._apply_client_to_instance(mock_client) return job @@ -46,8 +55,8 @@ def mock_finished_job(mock_client): completed_date=datetime.now(timezone.utc), job_type=JobType.DATA_IMPORT, job_status=JobStatus.FINISHED, - job_status_details=JobStatusDetails(points_processed=1000, points_total=1000), - job_details=JobDetails(data_import_id="import123"), + job_status_details=DataImportStatusDetails(points_processed=1000, points_total=1000), + job_details=DataImportDetails(data_import_id="import123"), ) job._apply_client_to_instance(mock_client) return job @@ -68,8 +77,8 @@ def mock_failed_job(mock_client): completed_date=datetime.now(timezone.utc), job_type=JobType.DATA_EXPORT, job_status=JobStatus.FAILED, - job_status_details=JobStatusDetails(error_message="Export failed"), - job_details=JobDetails(storage_key="exports/failed.csv"), + job_status_details=DataExportStatusDetails(error_message="Export failed"), + job_details=DataExportDetails(storage_key="exports/failed.csv"), ) job._apply_client_to_instance(mock_client) return job @@ -91,7 +100,7 @@ def mock_cancelled_job(mock_client): job_type=JobType.RULE_EVALUATION, job_status=JobStatus.CANCELLED, job_status_details=None, - job_details=JobDetails(report_id="report123"), + job_details=RuleEvaluationDetails(report_id="report123"), ) job._apply_client_to_instance(mock_client) return job @@ -221,57 +230,44 @@ def test_to_filter_str(self): class TestJobStatusDetails: - """Unit tests for JobStatusDetails model.""" + """Unit tests for JobStatusDetails types.""" def test_data_import_status_details(self): - """Test JobStatusDetails for data import jobs.""" - details = 
class TestJobStatusDetails:
    """Unit tests for the JobStatusDetails union member types."""

    def test_data_import_status_details(self):
        """DataImportStatusDetails stores progress counters for data import jobs."""
        details = DataImportStatusDetails(points_processed=500, points_total=1000)
        assert details.points_processed == 500
        assert details.points_total == 1000

    def test_data_export_status_details(self):
        """DataExportStatusDetails carries the error message for data export jobs."""
        details = DataExportStatusDetails(error_message="Export failed due to timeout")
        assert details.error_message == "Export failed due to timeout"

    def test_data_export_status_details_empty_error(self):
        """error_message defaults to None when not supplied."""
        details = DataExportStatusDetails()
        assert details.error_message is None

    def test_rule_evaluation_status_details(self):
        """RuleEvaluationStatusDetails is field-less but must construct and compare equal."""
        details = RuleEvaluationStatusDetails()
        # `assert details is not None` was vacuous — a constructor can never
        # return None. Assert properties the type actually guarantees instead.
        assert isinstance(details, RuleEvaluationStatusDetails)
        assert details == RuleEvaluationStatusDetails()


class TestJobDetails:
    """Unit tests for the JobDetails union member types."""

    def test_rule_evaluation_details(self):
        """RuleEvaluationDetails stores the report id for rule evaluation jobs."""
        details = RuleEvaluationDetails(report_id="report123")
        assert details.report_id == "report123"

    def test_data_import_details(self):
        """DataImportDetails stores the data import id."""
        details = DataImportDetails(data_import_id="import456")
        assert details.data_import_id == "import456"

    def test_data_export_details(self):
        """DataExportDetails stores the export storage key."""
        details = DataExportDetails(storage_key="exports/data.csv")
        assert details.storage_key == "exports/data.csv"
+# While `from __future__ import annotations` allows | in type hints (they're strings), +# module-level type aliases are evaluated at runtime and require Union in Python <3.10. +JobStatusDetails = Union[DataImportStatusDetails, DataExportStatusDetails, RuleEvaluationStatusDetails] +JobDetails = Union[DataImportDetails, DataExportDetails, RuleEvaluationDetails] def _job_details_from_proto(proto: JobDetailsProto) -> JobDetails | None: From d1e1a2f8a7e3ffce950cbedac6b42690695000ea Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Thu, 8 Jan 2026 13:49:44 -0800 Subject: [PATCH 7/9] python 3.8 fix --- python/lib/sift_client/sift_types/job.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 759aae4a1..386564b23 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -101,7 +101,6 @@ class RuleEvaluationStatusDetails(BaseModel): pass - def _job_status_details_from_proto( proto: JobStatusDetailsProto, ) -> JobStatusDetails | None: @@ -143,7 +142,9 @@ class RuleEvaluationDetails(BaseModel): # Note: Using Union instead of | syntax for Python 3.9 compatibility at module level. # While `from __future__ import annotations` allows | in type hints (they're strings), # module-level type aliases are evaluated at runtime and require Union in Python <3.10. 
-JobStatusDetails = Union[DataImportStatusDetails, DataExportStatusDetails, RuleEvaluationStatusDetails] +JobStatusDetails = Union[ + DataImportStatusDetails, DataExportStatusDetails, RuleEvaluationStatusDetails +] JobDetails = Union[DataImportDetails, DataExportDetails, RuleEvaluationDetails] From 369cc9408baf00bc5e5918275ef293488bd1701a Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Mon, 12 Jan 2026 13:30:00 -0800 Subject: [PATCH 8/9] comment --- python/lib/sift_client/sift_types/job.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 386564b23..5b475f83a 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -226,6 +226,7 @@ def is_in_progress(self) -> bool: A job is in progress if its status is RUNNING. """ + self.refresh() return self.job_status == JobStatus.RUNNING @property @@ -234,6 +235,7 @@ def is_failed(self) -> bool: A job has failed if its status is FAILED. """ + self.refresh() return self.job_status == JobStatus.FAILED @property @@ -242,6 +244,7 @@ def is_finished(self) -> bool: A job has finished if its status is FINISHED. """ + self.refresh() return self.job_status == JobStatus.FINISHED @property @@ -250,6 +253,7 @@ def is_cancelled(self) -> bool: A job has been cancelled if its status is CANCELLED. 
""" + self.refresh() return self.job_status == JobStatus.CANCELLED def refresh(self) -> Job: From e2ab67698b9ccca18cdf6f7850a73fae13889a35 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Mon, 12 Jan 2026 13:36:01 -0800 Subject: [PATCH 9/9] fix tests via mocking get --- python/lib/sift_client/_tests/sift_types/test_job.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/lib/sift_client/_tests/sift_types/test_job.py b/python/lib/sift_client/_tests/sift_types/test_job.py index c88e184d6..847514fdf 100644 --- a/python/lib/sift_client/_tests/sift_types/test_job.py +++ b/python/lib/sift_client/_tests/sift_types/test_job.py @@ -37,6 +37,8 @@ def mock_job(mock_client): job_details=DataImportDetails(data_import_id="import123"), ) job._apply_client_to_instance(mock_client) + # Configure mock to return the job itself when refresh is called + mock_client.jobs.get.return_value = job return job @@ -59,6 +61,8 @@ def mock_finished_job(mock_client): job_details=DataImportDetails(data_import_id="import123"), ) job._apply_client_to_instance(mock_client) + # Configure mock to return the job itself when refresh is called + mock_client.jobs.get.return_value = job return job @@ -81,6 +85,8 @@ def mock_failed_job(mock_client): job_details=DataExportDetails(storage_key="exports/failed.csv"), ) job._apply_client_to_instance(mock_client) + # Configure mock to return the job itself when refresh is called + mock_client.jobs.get.return_value = job return job @@ -103,6 +109,8 @@ def mock_cancelled_job(mock_client): job_details=RuleEvaluationDetails(report_id="report123"), ) job._apply_client_to_instance(mock_client) + # Configure mock to return the job itself when refresh is called + mock_client.jobs.get.return_value = job return job