From 1fcf8d766a3142a546d7aae74e3ea397eb3f0e31 Mon Sep 17 00:00:00 2001 From: Mark Story Date: Fri, 30 Jan 2026 10:19:32 -0500 Subject: [PATCH] feat: Add application to worker requests I missed porting this from sentry. I did the application attribute changes in sentry after starting the client library prototype and missed these changes. --- clients/python/src/taskbroker_client/app.py | 1 + .../src/taskbroker_client/worker/client.py | 6 +- .../src/taskbroker_client/worker/worker.py | 1 + clients/python/tests/worker/test_client.py | 99 ++++++++++++++----- 4 files changed, 81 insertions(+), 26 deletions(-) diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py index 372640cb..1ff06dc7 100644 --- a/clients/python/src/taskbroker_client/app.py +++ b/clients/python/src/taskbroker_client/app.py @@ -24,6 +24,7 @@ def __init__( metrics_class: str | MetricsBackend = "taskbroker_client.metrics.NoOpMetricsBackend", at_most_once_store: AtMostOnceStore | None = None, ) -> None: + self.name = name self.metrics = self._build_metrics(metrics_class) self._config = { "rpc_secret": None, diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 2329bfcd..91d8fa58 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -131,6 +131,7 @@ class TaskbrokerClient: def __init__( self, hosts: list[str], + application: str, metrics: MetricsBackend, max_tasks_before_rebalance: int = DEFAULT_REBALANCE_AFTER, max_consecutive_unavailable_errors: int = DEFAULT_CONSECUTIVE_UNAVAILABLE_ERRORS, @@ -140,6 +141,7 @@ def __init__( grpc_config: str | None = None, ) -> None: assert len(hosts) > 0, "You must provide at least one RPC host to connect to" + self._application = application self._hosts = hosts self._rpc_secret = rpc_secret self._metrics = metrics @@ -259,7 +261,7 @@ def get_task(self, namespace: str | None = None) -> InflightTaskActivation | Non """ self._emit_health_check() - request = GetTaskRequest(namespace=namespace) + request = GetTaskRequest(application=self._application, namespace=namespace) try: host, stub = self._get_cur_stub() with self._metrics.timer("taskworker.get_task.rpc", tags={"host": host}): @@ -299,6 +301,8 @@ def update_task( The return value is the next task that should be executed. """ self._emit_health_check() + if fetch_next_task is not None: + fetch_next_task.application = self._application self._metrics.incr( "taskworker.client.fetch_next", tags={"next": fetch_next_task is not None} diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 897ad38f..68474ada 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -71,6 +71,7 @@ def __init__( self.client = TaskbrokerClient( hosts=broker_hosts, + application=app.name, metrics=app.metrics, max_tasks_before_rebalance=rebalance_after, health_check_settings=( diff --git a/clients/python/tests/worker/test_client.py b/clients/python/tests/worker/test_client.py index 61d83a83..4297ccfc 100644 --- a/clients/python/tests/worker/test_client.py +++ b/clients/python/tests/worker/test_client.py @@ -15,7 +15,9 @@ TASK_ACTIVATION_STATUS_COMPLETE, TASK_ACTIVATION_STATUS_RETRY, FetchNextTask, + GetTaskRequest, GetTaskResponse, + SetTaskStatusRequest, SetTaskStatusResponse, TaskActivation, ) @@ -61,6 +63,8 @@ def __call__(self, *args: Any, **kwargs: Any) -> Any: if isinstance(res.response, Exception): raise res.response + elif callable(res.response): + return res.response(*args, **kwargs) return res.response def with_call(self, *args: Any, **kwargs: Any) -> Any: @@ -91,7 +95,7 @@ def unary_unary( def add_response( self, path: str, - resp: Message | Exception, + resp: Callable[[Any], Any] | Message | Exception, metadata: tuple[tuple[str, str | bytes], ...] | None = None, ) -> None: self._responses[path].append(MockServiceCall(response=resp, metadata=metadata)) @@ -130,7 +134,7 @@ def test_make_broker_hosts() -> None: def test_init_no_hosts() -> None: with pytest.raises(AssertionError) as err: - TaskbrokerClient(hosts=[], metrics=NoOpMetricsBackend()) + TaskbrokerClient(hosts=[], application="sentry", metrics=NoOpMetricsBackend()) assert "You must provide at least one RPC host" in str(err) @@ -167,6 +171,7 @@ def test_health_check_is_debounced() -> None: health_check_path = Path(f"/tmp/{''.join(random.choices(string.ascii_letters, k=16))}") client = TaskbrokerClient( hosts=["localhost-0:50051"], + application="sentry", metrics=NoOpMetricsBackend(), health_check_settings=HealthCheckSettings(health_check_path, 1), ) @@ -183,10 +188,11 @@ def test_health_check_is_debounced() -> None: def test_get_task_ok() -> None: - channel = MockChannel() - channel.add_response( - "/sentry_protos.taskbroker.v1.ConsumerService/GetTask", - GetTaskResponse( + def get_task_response(request: GetTaskRequest) -> GetTaskResponse: + assert request.application == "sentry" + assert request.namespace == "" + + return GetTaskResponse( task=TaskActivation( id="abc123", namespace="testing", @@ -195,11 +201,18 @@ def test_get_task_ok() -> None: headers={}, processing_deadline_duration=10, ) - ), + ) + + channel = MockChannel() + channel.add_response( + "/sentry_protos.taskbroker.v1.ConsumerService/GetTask", + get_task_response, ) with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel - client = TaskbrokerClient(["localhost-0:50051"], metrics=NoOpMetricsBackend()) + client = TaskbrokerClient( + hosts=["localhost-0:50051"], application="sentry", metrics=NoOpMetricsBackend() + ) result = client.get_task() assert result @@ -228,7 +241,8 @@ def test_get_task_writes_to_health_check_file() -> None: mock_channel.return_value = channel health_check_path = Path(f"/tmp/{''.join(random.choices(string.ascii_letters, k=16))}") client = TaskbrokerClient( - ["localhost-0:50051"], + hosts=["localhost-0:50051"], + application="sentry", metrics=NoOpMetricsBackend(), health_check_settings=HealthCheckSettings(health_check_path, 3), ) @@ -253,7 +267,7 @@ def test_get_task_with_interceptor() -> None: metadata=( ( "sentry-signature", - "3202702605c1b65055c28e7c78a5835e760830cff3e9f995eb7ad5f837130b1f", + "556b2e74f2d5a1d0134b1f803c9bfaa8467bbd8e4cb510a9856c5e2ef2b66a21", ), ), ) @@ -261,7 +275,10 @@ def test_get_task_with_interceptor() -> None: with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel client = TaskbrokerClient( - ["localhost-0:50051"], metrics=NoOpMetricsBackend(), rpc_secret=secret + hosts=["localhost-0:50051"], + application="sentry", + metrics=NoOpMetricsBackend(), + rpc_secret=secret, ) result = client.get_task() @@ -289,7 +306,9 @@ def test_get_task_with_namespace() -> None: with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel client = TaskbrokerClient( - hosts=make_broker_hosts("localhost:50051", num_brokers=1), metrics=NoOpMetricsBackend() + hosts=make_broker_hosts("localhost:50051", num_brokers=1), + application="sentry", + metrics=NoOpMetricsBackend(), ) result = client.get_task(namespace="testing") @@ -307,7 +326,9 @@ def test_get_task_not_found() -> None: ) with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel - client = TaskbrokerClient(["localhost:50051"], metrics=NoOpMetricsBackend()) + client = TaskbrokerClient( + hosts=["localhost:50051"], application="sentry", metrics=NoOpMetricsBackend() + ) result = client.get_task() assert result is None @@ -321,7 +342,9 @@ def test_get_task_failure() -> None: ) with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel - client = TaskbrokerClient(["localhost:50051"], metrics=NoOpMetricsBackend()) + client = TaskbrokerClient( + hosts=["localhost:50051"], application="name", metrics=NoOpMetricsBackend() + ) with pytest.raises(grpc.RpcError): client.get_task() @@ -346,6 +369,7 @@ def test_update_task_writes_to_health_check_file() -> None: health_check_path = Path(f"/tmp/{''.join(random.choices(string.ascii_letters, k=16))}") client = TaskbrokerClient( make_broker_hosts("localhost:50051", num_brokers=1), + application="sentry", metrics=NoOpMetricsBackend(), health_check_settings=HealthCheckSettings( health_check_path, DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH @@ -359,10 +383,12 @@ def test_update_task_writes_to_health_check_file() -> None: def test_update_task_ok_with_next() -> None: - channel = MockChannel() - channel.add_response( - "/sentry_protos.taskbroker.v1.ConsumerService/SetTaskStatus", - SetTaskStatusResponse( + def update_task_response(request: SetTaskStatusRequest) -> SetTaskStatusResponse: + assert request.fetch_next_task + assert request.fetch_next_task.application == "sentry" + assert request.fetch_next_task.namespace == "" + + return SetTaskStatusResponse( task=TaskActivation( id="abc123", namespace="testing", @@ -371,12 +397,19 @@ def test_update_task_ok_with_next() -> None: headers={}, processing_deadline_duration=10, ) - ), + ) + + channel = MockChannel() + channel.add_response( + "/sentry_protos.taskbroker.v1.ConsumerService/SetTaskStatus", + update_task_response, ) with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel client = TaskbrokerClient( - make_broker_hosts("localhost:50051", num_brokers=1), metrics=NoOpMetricsBackend() + make_broker_hosts("localhost:50051", num_brokers=1), + application="sentry", + metrics=NoOpMetricsBackend(), ) assert set(client._host_to_stubs.keys()) == {"localhost-0:50051"} result = client.update_task( @@ -407,7 +440,9 @@ def test_update_task_ok_with_next_namespace() -> None: with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel client = TaskbrokerClient( - make_broker_hosts("localhost:50051", num_brokers=1), metrics=NoOpMetricsBackend() + make_broker_hosts("localhost:50051", num_brokers=1), + application="sentry", + metrics=NoOpMetricsBackend(), ) result = client.update_task( ProcessingResult( @@ -431,7 +466,9 @@ def test_update_task_ok_no_next() -> None: with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel client = TaskbrokerClient( - make_broker_hosts("localhost:50051", num_brokers=1), metrics=NoOpMetricsBackend() + make_broker_hosts("localhost:50051", num_brokers=1), + application="sentry", + metrics=NoOpMetricsBackend(), ) result = client.update_task( ProcessingResult( @@ -453,7 +490,9 @@ def test_update_task_not_found() -> None: ) with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel - client = TaskbrokerClient(["localhost-0:50051"], metrics=NoOpMetricsBackend()) + client = TaskbrokerClient( + ["localhost-0:50051"], application="sentry", metrics=NoOpMetricsBackend() + ) result = client.update_task( ProcessingResult( task_id="abc123", @@ -474,7 +513,9 @@ def test_update_task_unavailable_retain_task_to_host() -> None: ) with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel - client = TaskbrokerClient(["localhost-0:50051"], metrics=NoOpMetricsBackend()) + client = TaskbrokerClient( + ["localhost-0:50051"], application="sentry", metrics=NoOpMetricsBackend() + ) with pytest.raises(MockGrpcError) as err: client.update_task( ProcessingResult( @@ -572,6 +613,7 @@ def test_client_loadbalance() -> None: ] client = TaskbrokerClient( hosts=make_broker_hosts(host_prefix="localhost:50051", num_brokers=4), + application="sentry", metrics=NoOpMetricsBackend(), max_tasks_before_rebalance=1, ) @@ -662,6 +704,7 @@ def test_client_loadbalance_on_notfound() -> None: ] client = TaskbrokerClient( hosts=make_broker_hosts(host_prefix="localhost:50051", num_brokers=3), + application="sentry", metrics=NoOpMetricsBackend(), max_tasks_before_rebalance=30, ) @@ -727,6 +770,7 @@ def test_client_loadbalance_on_unavailable() -> None: ] client = TaskbrokerClient( hosts=make_broker_hosts(host_prefix="localhost:50051", num_brokers=2), + application="sentry", metrics=NoOpMetricsBackend(), max_consecutive_unavailable_errors=3, ) @@ -784,6 +828,7 @@ def test_client_single_host_unavailable() -> None: mock_channel.return_value = channel client = TaskbrokerClient( hosts=["localhost-0:50051"], + application="sentry", metrics=NoOpMetricsBackend(), max_consecutive_unavailable_errors=3, temporary_unavailable_host_timeout=2, @@ -829,7 +874,10 @@ def test_client_reset_errors_after_success() -> None: with patch("taskbroker_client.worker.client.grpc.insecure_channel") as mock_channel: mock_channel.return_value = channel client = TaskbrokerClient( - ["localhost:50051"], metrics=NoOpMetricsBackend(), max_consecutive_unavailable_errors=3 + ["localhost:50051"], + application="sentry", + metrics=NoOpMetricsBackend(), + max_consecutive_unavailable_errors=3, ) with pytest.raises(grpc.RpcError, match="host is unavailable"): @@ -885,6 +933,7 @@ def mock_time() -> float: mock_channel.return_value = channel client = TaskbrokerClient( ["localhost:50051"], + application="sentry", metrics=NoOpMetricsBackend(), max_consecutive_unavailable_errors=3, temporary_unavailable_host_timeout=10,