Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2.1

orbs:
linter: talkiq/linter@4.0.1
poetry: talkiq/poetry@4.2.0
poetry: talkiq/poetry@4.2.1

executors:
python310:
Expand Down
23 changes: 0 additions & 23 deletions pubsub/gcloud/aio/pubsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,29 +143,6 @@ async def handler(message):
- ``subscriber_messages_received`` - [counter] the number of messages pulled
from pubsub

Metrics Agent (Deprecated)
~~~~~~~~~~~~~~~~~~~~~~~~~~

``subscribe`` also has an optional ``metrics_client`` argument which will be
removed in a future release. You can provide any metrics agent that implements
the same interface as ``MetricsAgent`` (Datadog client will do ;) ) and get the
following metrics:

- ``pubsub.producer.batch`` - [histogram] actual size of a batch retrieved from
pubsub.
- ``pubsub.consumer.failfast`` - [increment] a message was dropped due to its
lease being expired.
- ``pubsub.consumer.latency.receive`` - [histogram] how many seconds it took
for a message to reach handler after it was published.
- ``pubsub.consumer.succeeded`` - [increment] ``handler`` call was successful.
- ``pubsub.consumer.failed`` - [increment] ``handler`` call raised an
exception.
- ``pubsub.consumer.latency.runtime`` - [histogram] ``handler`` execution time
in seconds.
- ``pubsub.acker.batch.failed`` - [increment] ack request failed.
- ``pubsub.acker.batch`` - [histogram] actual number of messages that were
  acked in a single request.

Publisher
---------

Expand Down
19 changes: 0 additions & 19 deletions pubsub/gcloud/aio/pubsub/metrics_agent.py

This file was deleted.

41 changes: 1 addition & 40 deletions pubsub/gcloud/aio/pubsub/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import asyncio
import logging
import time
import warnings
from collections.abc import Awaitable
from collections.abc import Callable
from typing import Optional
Expand All @@ -18,7 +17,6 @@
from . import metrics
from .subscriber_client import SubscriberClient
from .subscriber_message import SubscriberMessage
from .metrics_agent import MetricsAgent

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -97,7 +95,6 @@ async def acker(
ack_queue: 'asyncio.Queue[str]',
subscriber_client: 'SubscriberClient',
ack_window: float,
metrics_client: MetricsAgent,
) -> None:
ack_ids: list[str] = []
while True:
Expand Down Expand Up @@ -160,7 +157,6 @@ async def maybe_ack(ack_id: str) -> None:
exc_info=e,
extra={'exc_message': str(e)},
)
metrics_client.increment('pubsub.acker.batch.failed')
metrics.BATCH_STATUS.labels(
component='acker',
outcome='failed',
Expand All @@ -175,15 +171,13 @@ async def maybe_ack(ack_id: str) -> None:
exc_info=e,
extra={'exc_message': str(e)},
)
metrics_client.increment('pubsub.acker.batch.failed')
metrics.BATCH_STATUS.labels(
component='acker',
outcome='failed',
).inc()

continue

metrics_client.histogram('pubsub.acker.batch', len(ack_ids))
metrics.BATCH_STATUS.labels(
component='acker',
outcome='succeeded',
Expand All @@ -199,7 +193,6 @@ async def nacker(
nack_queue: 'asyncio.Queue[str]',
subscriber_client: 'SubscriberClient',
nack_window: float,
metrics_client: MetricsAgent,
) -> None:
ack_ids: list[str] = []
while True:
Expand Down Expand Up @@ -264,7 +257,6 @@ async def maybe_nack(ack_id: str) -> None:
exc_info=e,
extra={'exc_message': str(e)},
)
metrics_client.increment('pubsub.nacker.batch.failed')
metrics.BATCH_STATUS.labels(
component='nacker', outcome='failed',
).inc()
Expand All @@ -278,14 +270,12 @@ async def maybe_nack(ack_id: str) -> None:
exc_info=e,
extra={'exc_message': str(e)},
)
metrics_client.increment('pubsub.nacker.batch.failed')
metrics.BATCH_STATUS.labels(
component='nacker', outcome='failed',
).inc()

continue

metrics_client.histogram('pubsub.nacker.batch', len(ack_ids))
metrics.BATCH_STATUS.labels(
component='nacker',
outcome='succeeded',
Expand All @@ -302,7 +292,6 @@ async def _execute_callback(
ack_queue: 'asyncio.Queue[str]',
nack_queue: 'Optional[asyncio.Queue[str]]',
insertion_time: float,
metrics_client: MetricsAgent,
) -> None:
try:
start = time.perf_counter()
Expand All @@ -312,19 +301,13 @@ async def _execute_callback(
with metrics.CONSUME_LATENCY.labels(phase='runtime').time():
await callback(message)
await ack_queue.put(message.ack_id)
metrics_client.histogram(
'pubsub.consumer.latency.runtime',
time.perf_counter() - start,
)
metrics_client.increment('pubsub.consumer.succeeded')
metrics.CONSUME.labels(outcome='succeeded').inc()

except asyncio.CancelledError:
if nack_queue:
await nack_queue.put(message.ack_id)

log.warning('application callback was cancelled')
metrics_client.increment('pubsub.consumer.cancelled')
metrics.CONSUME.labels(outcome='cancelled').inc()
except Exception as e:
if nack_queue:
Expand All @@ -335,7 +318,6 @@ async def _execute_callback(
exc_info=e,
extra={'exc_message': str(e)},
)
metrics_client.increment('pubsub.consumer.failed')
metrics.CONSUME.labels(outcome='failed').inc()

async def consumer( # pylint: disable=too-many-locals
Expand All @@ -345,7 +327,6 @@ async def consumer( # pylint: disable=too-many-locals
ack_deadline_cache: AckDeadlineCache,
max_tasks: int,
nack_queue: 'Optional[asyncio.Queue[str]]',
metrics_client: MetricsAgent,
) -> None:
try:
semaphore = asyncio.Semaphore(max_tasks)
Expand All @@ -358,7 +339,6 @@ async def _consume_one(

ack_deadline = await ack_deadline_cache.get()
if (time.perf_counter() - pulled_at) >= ack_deadline:
metrics_client.increment('pubsub.consumer.failfast')
metrics.CONSUME.labels(outcome='failfast').inc()
message_queue.task_done()
semaphore.release()
Expand All @@ -367,9 +347,6 @@ async def _consume_one(
# publish_time is in UTC Zulu
# https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
recv_latency = time.time() - message.publish_time.timestamp()
metrics_client.histogram(
'pubsub.consumer.latency.receive', recv_latency,
)
metrics.CONSUME_LATENCY.labels(phase='receive').observe(
recv_latency,
)
Expand All @@ -381,7 +358,6 @@ async def _consume_one(
ack_queue,
nack_queue,
time.perf_counter(),
metrics_client,
),
)
task.add_done_callback(lambda _f: semaphore.release())
Expand All @@ -407,7 +383,6 @@ async def producer(
message_queue: MessageQueue,
subscriber_client: 'SubscriberClient',
max_messages: int,
metrics_client: MetricsAgent,
) -> None:
try:
while True:
Expand All @@ -429,9 +404,6 @@ async def producer(
except (asyncio.TimeoutError, KeyError):
continue

metrics_client.histogram(
'pubsub.producer.batch', len(new_messages),
)
metrics.MESSAGES_RECEIVED.inc(len(new_messages))
metrics.BATCH_SIZE.observe(len(new_messages))

Expand Down Expand Up @@ -473,7 +445,6 @@ async def subscribe(
num_tasks_per_consumer: int = 1,
enable_nack: bool = True,
nack_window: float = 0.3,
metrics_client: MetricsAgent | None = None,
) -> None:
# pylint: disable=too-many-locals
ack_queue: 'asyncio.Queue[str]' = asyncio.Queue(
Expand All @@ -487,13 +458,6 @@ async def subscribe(
ack_deadline,
)

if metrics_client is not None:
warnings.warn(
'Using MetricsAgent in subscribe() is deprecated. '
'Refer to Prometheus metrics instead.',
DeprecationWarning,
)
metrics_client = metrics_client or MetricsAgent()
acker_tasks = []
consumer_tasks = []
producer_tasks = []
Expand All @@ -502,7 +466,7 @@ async def subscribe(
asyncio.ensure_future(
acker(
subscription, ack_queue, subscriber_client,
ack_window=ack_window, metrics_client=metrics_client,
ack_window=ack_window,
),
),
)
Expand All @@ -515,7 +479,6 @@ async def subscribe(
nacker(
subscription, nack_queue, subscriber_client,
nack_window=nack_window,
metrics_client=metrics_client,
),
),
)
Expand All @@ -532,7 +495,6 @@ async def subscribe(
ack_deadline_cache,
num_tasks_per_consumer,
nack_queue,
metrics_client=metrics_client,
),
),
)
Expand All @@ -543,7 +505,6 @@ async def subscribe(
q,
subscriber_client,
max_messages=max_messages_per_producer,
metrics_client=metrics_client,
),
),
)
Expand Down
Loading