He implementado todas mis rutas usando async. Y he seguido todas las pautas de la documentación de FastAPI.
Cada ruta tiene varias llamadas a la base de datos, que no tienen soporte asíncrono, por lo que son una función normal como esta
def db_fetch(query): # I take a few seconds to respond return
Para evitar bloquear mi ciclo de eventos, uso fastapi.concurrancy.run_in_threadpool
Ahora el problema es que, cuando llega una gran cantidad de solicitudes, mis nuevas solicitudes se bloquean. Incluso si cierro la pestaña del navegador (cancelar solicitud), toda la aplicación se atasca hasta que se procesan las solicitudes anteriores.
¿Qué estoy haciendo mal aquí?
Uso uvicorn
como mi servidor ASGI. Corro en un clúster de kubernetes con 2 réplicas.
Pocos sospechosos: ¿Estoy generando demasiados hilos? ¿Es algún error dentro de uvicron? ¡No muy seguro!
Es como usted ha dicho un problema con demasiados hilos. Debajo del capó, fastapi usa starlette que a su vez usa to_thread.run_sync de to_thread.run_sync
. Como se describe aquí , demasiados subprocesos podrían generar un problema y podría protegerlos con un semáforo para establecer un límite superior en el máximo de subprocesos creados. En código, eso sería más o menos como
# Core Library from typing import TypeVar, Callable from typing_extensions import ParamSpec # Third party from anyio import Semaphore from starlette.concurrency import run_in_threadpool # To not have too many threads running (which could happen on too many concurrent # requests, we limit it with a semaphore. MAX_CONCURRENT_THREADS = 10 MAX_THREADS_GUARD = Semaphore(MAX_CONCURRENT_THREADS) T = TypeVar("T") P = ParamSpec("P") async def run_async(func: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> T: async with MAX_THREADS_GUARD: return await run_in_threadpool(func, args, kwargs)