Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements/test.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pytest
pytest-cov
pytest-xdist
coverage
pytest-asyncio
3 changes: 3 additions & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ pluggy==1.5.0
pytest==8.2.2
# via
# -r requirements/test.in
# pytest-asyncio
# pytest-cov
# pytest-xdist
pytest-asyncio==0.23.7
# via -r requirements/test.in
pytest-cov==5.0.0
# via -r requirements/test.in
pytest-xdist==3.6.1
Expand Down
5 changes: 5 additions & 0 deletions src/statsd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
Version: v\ |version|.
"""

from .async_client import BaseAsyncStatsdClient, DebugAsyncStatsdClient
from .base import Sample
from .client import (
BaseStatsdClient,
DebugStatsdClient,
Expand All @@ -14,8 +16,11 @@


__all__ = (
"BaseAsyncStatsdClient",
"BaseStatsdClient",
"DebugAsyncStatsdClient",
"DebugStatsdClient",
"Sample",
"StatsdClient",
"UDPStatsdClient",
"__version__",
Expand Down
169 changes: 169 additions & 0 deletions src/statsd/async_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from __future__ import annotations

import abc
import contextlib
import functools
import logging
import time
from typing import Any, AsyncIterator, Awaitable, Callable, Mapping, TypeVar
from typing_extensions import ParamSpec

from statsd.base import AbstractStatsdClient


P = ParamSpec("P")
T = TypeVar("T")
U = TypeVar("U")

logger = logging.getLogger("statsd")


class BaseAsyncStatsdClient(AbstractStatsdClient[Awaitable[None]]):
"""
Base async client.

This class exposes the public interface and takes care of packet formatting
as well as sampling. It does not actually send packets anywhere, which is
left to concrete subclasses implementing :meth:`_emit`.
"""

@abc.abstractmethod
async def _emit(self, packets: list[str]) -> None:
"""
Async send implementation.

This method is responsible for actually sending the formatted packets
and should be implemented by all subclasses.

It may batch or buffer packets but should not modify them in any way. It
should be agnostic to the Statsd format.
"""
raise NotImplementedError()

def timed(
self,
name: str | None = None,
*,
tags: Mapping[str, str] | None = None,
sample_rate: float | None = None,
use_distribution: bool = False,
) -> Callable[[Callable[P, Awaitable[U]]], Callable[P, Awaitable[U]]]:
"""
Wrap a function to record its execution time.

This just wraps the function call with a :meth:`timer` context manager.

If a name is not provided, the function name will be used.

Passing ``use_distribution=True`` will report the value as a globally
aggregated :meth:`distribution` metric instead of a :meth:`timing`
metric.

>>> client = AsyncStatsdClient()
>>> @client.timed()
... async def do_something():
... pass
"""

def decorator(
fn: Callable[P, Awaitable[U]],
) -> Callable[P, Awaitable[U]]:
# TODO: Should the fallback include the module? Class (for methods)?
# or func.__name__
metric_name = name or fn.__name__

@functools.wraps(fn)
async def wrapped(*args: P.args, **kwargs: P.kwargs) -> U:
async with self.timer(
metric_name,
tags=tags,
use_distribution=use_distribution,
sample_rate=sample_rate,
):
return await fn(*args, **kwargs)

return wrapped

return decorator

@contextlib.asynccontextmanager
async def timer(
self,
name: str,
*,
tags: Mapping[str, str] | None = None,
sample_rate: float | None = None,
use_distribution: bool = False,
) -> AsyncIterator[None]:
"""
Context manager to measure the execution time of an async block.

Passing ``use_distribution=True`` will report the value as a globally
aggregated :meth:`distribution` metric instead of a :meth:`timing`
metric.

>>> client = AsyncStatsdClient()
>>> async def operation():
... async with client.timer("download_duration"):
... pass
"""
start = time.perf_counter()
try:
yield
finally:
duration_ms = int(1000 * (time.perf_counter() - start))
if use_distribution:
await self.distribution(
name,
duration_ms,
tags=tags,
sample_rate=sample_rate,
)
else:
await self.timing(
name,
duration_ms,
tags=tags,
sample_rate=sample_rate,
)


class DebugAsyncStatsdClient(BaseAsyncStatsdClient):
"""
Verbose client for development or debugging purposes.

All Statsd packets will be logged and optionally forwarded to a wrapped
client.
"""

def __init__(
self,
level: int = logging.INFO,
logger: logging.Logger = logger,
inner: BaseAsyncStatsdClient | None = None,
**kwargs: Any,
) -> None:
r"""
Initialize DebugStatsdClient.

:param level: Log level to use, defaults to ``INFO``.

:param logger: Logger instance to use, defaults to ``statsd``.

:param inner: Wrapped client.

:param \**kwargs: Extra arguments forwarded to :class:`BaseAsyncStatsdClient`.
"""
super().__init__(**kwargs)
self.level = level
self.logger = logger
self.inner = inner

async def _emit(self, packets: list[str]) -> None:
for packet in packets:
self.logger.log(self.level, "> %s", packet)
if self.inner:
await self.inner._emit(packets)


AsyncStatsdClient = DebugAsyncStatsdClient
Loading