Skip to content

BaseClient

from pulsefire.clients import BaseClient

Bases: ABC

Base client class.

Inherit from this class to implement a client.

Source code in pulsefire/clients.py
class BaseClient(abc.ABC):
    """Base client class.

    Inherit from this class to implement a client.

    `invoke` builds an `Invocation` from the calling method's local
    variables and sends it through the middleware chain assembled in
    `__init__`. Used as an async context manager, the client keeps one
    `aiohttp.ClientSession` open for all in-context invocations.
    """

    base_url: str
    """Base URL, can be extended by `invoke`."""
    default_headers: dict[str, str]
    """Default header params, can be overwritten by `invoke`."""
    default_params: dict[str, Any]
    """Default params (ignores `...`), can be overwritten by `invoke`."""
    default_queries: dict[str, str]
    """Default query params, can be overwritten by `invoke`."""
    middlewares: list[Middleware]
    """Pre and post processors during `invoke`."""
    session: aiohttp.ClientSession | None = None
    """Context manager client session."""

    def __init__(
        self,
        *,
        base_url: str,
        default_params: dict[str, Any] = {},
        default_headers: dict[str, str] = {},
        default_queries: dict[str, str] = {},
        middlewares: list[Middleware] = [],
    ) -> None:
        """Store the client configuration and build the middleware chain.

        Args:
            base_url: Prefix prepended to the path given to `invoke`.
            default_params: Params merged into every invocation.
            default_headers: Headers merged into every invocation.
            default_queries: Query params merged into every invocation.
            middlewares: Middleware constructors, outermost first. Each is
                called with the next handler and must return a coroutine
                function.

        Raises:
            TypeError: When a middleware does not return a coroutine function.
        """
        self.base_url = base_url
        # The `{}` / `[]` defaults are created once at definition time. Copy
        # every container so instances never share (or mutate) the default
        # objects, nor the objects owned by the caller.
        self.default_headers = dict(default_headers)
        self.default_params = dict(default_params)
        self.default_queries = dict(default_queries)
        self.middlewares = list(middlewares)

        async def run_invocation(invocation: Invocation):
            # Innermost handler: actually performs the invocation.
            return await invocation()

        self.middleware_begin = run_invocation
        # Wrap from the last middleware to the first so that the first one
        # in the list ends up outermost.
        for middleware in reversed(self.middlewares):
            self.middleware_begin = middleware(self.middleware_begin)
            if not inspect.iscoroutinefunction(self.middleware_begin):
                raise TypeError(f"{self.middleware_begin} is not a coroutine function")

    def __repr__(self) -> str:
        """Return `<ClassName id=...>` identifying this client instance."""
        return f"<{self.__class__.__name__} id={id(self)}>"

    async def __aenter__(self):
        """Context manager, in-context invocations will reuse a single `aiohttp.ClientSession`
        improving performance and memory footprint.

        Raises:
            RuntimeError: When entering an already entered client.
        """
        if self.session:
            raise RuntimeError(f"{self!r} has been already entered")
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *_) -> None:
        """Close the session and wait until its SSL transports are gone.

        Every pooled connection whose transport exposes `_ssl_protocol` is
        counted and its protocol callbacks are wrapped, so this method only
        returns after each of them has reported `connection_lost`.
        """
        transports = 0
        transports_closed = asyncio.Event()

        def connection_lost(exc, orig_lost):
            # Run the original callback, then record that one more
            # transport is closed; the last one releases the waiter below.
            nonlocal transports
            try:
                orig_lost(exc)
            finally:
                transports -= 1
                if transports == 0:
                    transports_closed.set()

        def eof_received(orig_eof_received):
            # NOTE(review): presumably the original callback can hit an
            # already torn-down protocol during shutdown - confirm.
            try:
                orig_eof_received()
            except AttributeError:
                pass

        # `_conns` is a private aiohttp connector attribute holding the
        # pooled connections.
        for conn in self.session.connector._conns.values():
            for handler, _ in conn:
                proto: asyncio.Protocol = getattr(handler.transport, "_ssl_protocol", None)
                if proto is None:
                    continue
                transports += 1
                orig_lost = proto.connection_lost
                orig_eof_received = proto.eof_received
                proto.connection_lost = functools.partial(
                    connection_lost, orig_lost=orig_lost
                )
                proto.eof_received = functools.partial(
                    eof_received, orig_eof_received=orig_eof_received
                )
        if transports == 0:
            # Nothing to wait for: release the waiter immediately.
            transports_closed.set()

        await self.session.close()
        await transports_closed.wait()
        self.session = None

    async def invoke(self, method: HttpMethod, path_or_url: str):
        """Build an Invocation and send through the middlewares.

        Params are automatically grabbed from the outer frame (ignores `...`).
        The invoker client method is automatically grabbed from the outer frame
        and passed to the instantiation of Invocation.

        Args:
            method: HTTP method passed to the `Invocation`.
            path_or_url: Appended to `base_url` to form the target URL.

        Returns:
            Whatever the middleware chain returns for the invocation.
        """
        # Must be taken directly in this function body: frame 1 is the
        # client method that called `invoke`.
        caller = sys._getframe(1)
        params = {}
        # Caller locals come second, so they override `default_params`.
        for key, value in itertools.chain(self.default_params.items(), caller.f_locals.items()):
            # Identity test: `!=` would invoke arbitrary `__ne__`
            # implementations and fails for array-like values.
            if key != "self" and value is not ...:
                params[key] = value
        params["queries"] = {**self.default_queries, **params.get("queries", {})}
        params["headers"] = {**self.default_headers, **params.get("headers", {})}
        invoker: MethodType | None = getattr(self, caller.f_code.co_name, None)
        invocation = Invocation(method, self.base_url + path_or_url, params, self.session, invoker=invoker)
        return await self.middleware_begin(invocation)
Attributes
session class-attribute instance-attribute
session: ClientSession | None = None

Context manager client session.

base_url instance-attribute
base_url: str = base_url

Base URL, can be extended by invoke.

default_headers instance-attribute
default_headers: dict[str, str] = default_headers

Default header params, can be overwritten by invoke.

default_params instance-attribute
default_params: dict[str, Any] = default_params

Default params (ignores ...), can be overwritten by invoke.

default_queries instance-attribute
default_queries: dict[str, str] = default_queries

Default query params, can be overwritten by invoke.

middlewares instance-attribute
middlewares: list[Middleware] = middlewares

Pre and post processors during invoke.

middleware_begin instance-attribute
middleware_begin = middleware(middleware_begin)
Functions
__init__
__init__(
    *,
    base_url: str,
    default_params: dict[str, Any] = {},
    default_headers: dict[str, str] = {},
    default_queries: dict[str, str] = {},
    middlewares: list[Middleware] = []
) -> None
Source code in pulsefire/clients.py
def __init__(
    self,
    *,
    base_url: str,
    default_params: dict[str, Any] = {},
    default_headers: dict[str, str] = {},
    default_queries: dict[str, str] = {},
    middlewares: list[Middleware] = [],
) -> None:
    """Store the client configuration and build the middleware chain.

    Args:
        base_url: Prefix prepended to the path given to `invoke`.
        default_params: Params merged into every invocation.
        default_headers: Headers merged into every invocation.
        default_queries: Query params merged into every invocation.
        middlewares: Middleware constructors, outermost first. Each is
            called with the next handler and must return a coroutine
            function.

    Raises:
        TypeError: When a middleware does not return a coroutine function.
    """
    self.base_url = base_url
    # The `{}` / `[]` defaults are created once at definition time. Copy
    # every container so instances never share (or mutate) the default
    # objects, nor the objects owned by the caller.
    self.default_headers = dict(default_headers)
    self.default_params = dict(default_params)
    self.default_queries = dict(default_queries)
    self.middlewares = list(middlewares)

    async def run_invocation(invocation: Invocation):
        # Innermost handler: actually performs the invocation.
        return await invocation()

    self.middleware_begin = run_invocation
    # Wrap from the last middleware to the first so that the first one in
    # the list ends up outermost.
    for middleware in reversed(self.middlewares):
        self.middleware_begin = middleware(self.middleware_begin)
        if not inspect.iscoroutinefunction(self.middleware_begin):
            raise TypeError(f"{self.middleware_begin} is not a coroutine function")
__repr__
__repr__() -> str
Source code in pulsefire/clients.py
def __repr__(self) -> str:
    """Return `<ClassName id=...>` identifying this client instance."""
    class_name = self.__class__.__name__
    identity = id(self)
    return f"<{class_name} id={identity}>"
__aenter__ async
__aenter__()

Context manager, in-context invocations will reuse a single aiohttp.ClientSession improving performance and memory footprint.

Raises:

Type Description
RuntimeError

When entering an already entered client.

Source code in pulsefire/clients.py
async def __aenter__(self):
    """Enter the client context and open a shared `aiohttp.ClientSession`.

    Invocations made inside the context reuse this single session,
    improving performance and memory footprint.

    Raises:
        RuntimeError: When entering an already entered client.
    """
    if not self.session:
        # Not entered yet: open the session that in-context calls share.
        self.session = aiohttp.ClientSession()
        return self
    raise RuntimeError(f"{self!r} has been already entered")
__aexit__ async
__aexit__(*_) -> None
Source code in pulsefire/clients.py
async def __aexit__(self, *_) -> None:
    """Close the session and wait until its SSL transports are gone.

    Every pooled connection whose transport exposes `_ssl_protocol` is
    counted and its protocol callbacks are wrapped, so this coroutine only
    returns after each of them has reported `connection_lost`.
    """
    # Number of SSL transports still open, and the event set once it hits 0.
    transports = 0
    transports_closed = asyncio.Event()

    def connection_lost(exc, orig_lost):
        # Run the original callback, then record that one more transport
        # is closed; the last one releases the waiter at the bottom.
        nonlocal transports
        try:
            orig_lost(exc)
        finally:
            transports -= 1
            if transports == 0:
                transports_closed.set()

    def eof_received(orig_eof_received):
        # NOTE(review): presumably the original callback can hit an already
        # torn-down protocol during shutdown - confirm.
        try:
            orig_eof_received()
        except AttributeError:
            pass

    # `_conns` is a private attribute of the session's connector holding
    # the pooled connections.
    for conn in self.session.connector._conns.values():
        for handler, _ in conn:
            proto: asyncio.Protocol = getattr(handler.transport, "_ssl_protocol", None)
            if proto is None:
                # Transport without an SSL protocol: nothing to wait for.
                continue
            transports += 1
            orig_lost = proto.connection_lost
            orig_eof_received = proto.eof_received
            # Replace the protocol callbacks with the wrappers above,
            # keeping the originals bound as keyword arguments.
            proto.connection_lost = functools.partial(
                connection_lost, orig_lost=orig_lost
            )
            proto.eof_received = functools.partial(
                eof_received, orig_eof_received=orig_eof_received
            )
    if transports == 0:
        # No SSL transports were found: release the waiter immediately.
        transports_closed.set()

    await self.session.close()
    await transports_closed.wait()
    # Clear the session so the client can be entered again.
    self.session = None
invoke async
invoke(method: HttpMethod, path_or_url: str)

Build an Invocation and send through the middlewares.

Params are automatically grabbed from the outer frame (ignores ...). The invoker client method is automatically grabbed from the outer frame and passed to the instantiation of Invocation.

Source code in pulsefire/clients.py
async def invoke(self, method: HttpMethod, path_or_url: str):
    """Build an Invocation and send through the middlewares.

    Params are automatically grabbed from the outer frame (ignores `...`).
    The invoker client method is automatically grabbed from the outer frame
    and passed to the instantiation of Invocation.

    Args:
        method: HTTP method passed to the `Invocation`.
        path_or_url: Appended to `base_url` to form the target URL.

    Returns:
        Whatever the middleware chain returns for the invocation.
    """
    # Must be taken directly in this function body: frame 1 is the client
    # method that called `invoke`.
    caller = sys._getframe(1)
    params = {}
    # Caller locals come second, so they override `default_params`.
    for key, value in itertools.chain(self.default_params.items(), caller.f_locals.items()):
        # Identity test: `!=` would invoke arbitrary `__ne__`
        # implementations and fails for array-like values.
        if key != "self" and value is not ...:
            params[key] = value
    params["queries"] = {**self.default_queries, **params.get("queries", {})}
    params["headers"] = {**self.default_headers, **params.get("headers", {})}
    invoker: MethodType | None = getattr(self, caller.f_code.co_name, None)
    invocation = Invocation(method, self.base_url + path_or_url, params, self.session, invoker=invoker)
    return await self.middleware_begin(invocation)