Convert a function to run asynchronously. Use as a decorator: `@sync_to_async()`.
Example:

```python
@sync_to_async()
def sample_func(number: int):
    ...

async def main():
    await sample_func(0)
```
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `executor` | `Executor \| None` | Executor to be passed to `loop.run_in_executor`. | `None` |
Source code in pulsefire/functools.py
| def sync_to_async(executor: Executor | None = None):
"""Convert a function to run asynchronously. Use as decorator `@sync_to_async()`.
Example:
```python
@sync_to_async()
def sample_func(number: int):
...
async def main():
await sample_func(0)
```
Parameters:
executor: Executor to be passed to `loop.run_in_executor`.
"""
def decorator[**P, R](func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
new_func = functools.partial(func, *args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(executor, new_func)
return wrapper
return decorator
|