JSON response middleware.
Attempts to deserialize every response as JSON, regardless of its content type.
If an exception is raised during deserialization, the raw response bytes are returned instead.
Example:
# Use orjson loads for 3~10x faster deserialization
import orjson
json_response_middleware(orjson.loads)
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `loads` | `Callable[[str \| bytes \| bytearray], Any]` | JSON decoder to be used on deserialization. | `json.loads` |
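Because `loads` only has to be a callable that takes the JSON text and returns the decoded value, a decoder other than `orjson.loads` should also fit. The sketch below is an illustration using only the standard library: it builds a decoder that parses floats as `Decimal`. The name `decimal_loads` is hypothetical and not part of pulsefire.

```python
import json
from decimal import Decimal
from functools import partial

# Hypothetical custom decoder: same call shape as json.loads,
# but floating-point literals are parsed into Decimal instead of float.
decimal_loads = partial(json.loads, parse_float=Decimal)

# The decoder can be exercised on its own before wiring it in.
payload = decimal_loads('{"price": 1.10, "count": 3}')

# Assumed wiring, following the signature shown in this section:
# json_response_middleware(decimal_loads)
```

Here `payload["price"]` is a `Decimal`, while `payload["count"]` stays an `int`, since `parse_float` only affects floating-point literals.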
Source code in pulsefire/middlewares.py
def json_response_middleware(loads: Callable[[str | bytes | bytearray], Any] = json.loads):
    """JSON response middleware.

    Attempts to deserialize JSON responses regardless of content type,
    if an exception is raised during deserialization, bytes are returned instead.

    Example:
    ```python
    # Use orjson loads for 3~10x faster deserialization
    import orjson
    json_response_middleware(orjson.loads)
    ```

    Parameters:
        loads: JSON decoder to be used on deserialization.
    """

    def constructor(next: MiddlewareCallable):

        async def middleware(invocation: Invocation):
            response: aiohttp.ClientResponse = await next(invocation)
            try:
                return await response.json(encoding="utf-8", content_type=None, loads=loads)
            except Exception:
                return await response.read()

        return middleware

    return constructor
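The try/fallback behaviour in the listing above can be exercised without a network. The following is a minimal sketch using only the standard library; `FakeResponse`, `fake_transport`, and `json_or_bytes` are hypothetical stand-ins written for this illustration and are not part of pulsefire or aiohttp. Only the shape of the constructor and middleware closures mirrors the source.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable


class FakeResponse:
    """Hypothetical stand-in for a client response; it just holds a raw body."""

    def __init__(self, body: bytes):
        self._body = body

    async def read(self) -> bytes:
        return self._body

    async def json(self, loads: Callable[[str], Any] = json.loads) -> Any:
        # Decode and parse; raises if the body is not valid UTF-8 JSON.
        return loads(self._body.decode("utf-8"))


Handler = Callable[[str], Awaitable[Any]]


def json_or_bytes(loads: Callable[[str], Any] = json.loads):
    """Simplified imitation of the middleware above (an assumption, not the real one)."""

    def constructor(next: Handler):
        async def middleware(invocation: str):
            response = await next(invocation)
            try:
                return await response.json(loads=loads)
            except Exception:
                # Fall back to the raw bytes when parsing fails.
                return await response.read()

        return middleware

    return constructor


async def fake_transport(invocation: str) -> FakeResponse:
    # Hypothetical innermost handler: maps a path to a canned body.
    bodies = {"/ok": b'{"id": 1}', "/broken": b"<html>oops</html>"}
    return FakeResponse(bodies[invocation])


async def demo():
    handler = json_or_bytes()(fake_transport)
    return await handler("/ok"), await handler("/broken")


parsed, raw = asyncio.run(demo())
```

In this sketch `parsed` holds the decoded dictionary for the valid body and `raw` holds the untouched bytes of the non-JSON body, which is the same split the middleware makes between `response.json(...)` and `response.read()`.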