Esta pregunta está motivada por mi otra pregunta: ¿Cómo esperar en cdef?
Hay toneladas de artículos y publicaciones de blog en la web sobre asyncio
, pero todos son muy superficiales. No pude encontrar ninguna información sobre cómo se implementa realmente asyncio
y qué hace que la E/S sea asíncrona. Estaba tratando de leer el código fuente, pero son miles de líneas de código C que no es del más alto grado, muchas de las cuales tratan con objetos auxiliares, pero lo más importante es que es difícil conectar entre la sintaxis de Python y el código C que traduciría. en.
La propia documentación de Asyncio es aún menos útil. No hay información allí sobre cómo funciona, solo algunas pautas sobre cómo usarlo, que a veces también son engañosas/muy mal escritas.
Estoy familiarizado con la implementación de rutinas de Go, y esperaba que Python hiciera lo mismo. Si ese fuera el caso, el código que encontré en la publicación vinculada anteriormente habría funcionado. Como no lo hizo, ahora estoy tratando de averiguar por qué. Mi mejor conjetura hasta ahora es la siguiente, corríjame donde me equivoque:
async def foo(): ...
en realidad se interpretan como métodos de una clase que hereda coroutine
.async def
en realidad se divide en varios métodos mediante declaraciones de await
, donde el objeto, en el que se llama a estos métodos, puede realizar un seguimiento del progreso realizado a través de la ejecución hasta el momento.await
declaración). En otras palabras, aquí está mi intento de "desazucarar" alguna sintaxis asyncio
en algo más comprensible:
async def coro(name): print('before', name) await asyncio.sleep() print('after', name) asyncio.gather(coro('first'), coro('second')) # translated from async def coro(name) class Coro(coroutine): def before(self, name): print('before', name) def after(self, name): print('after', name) def __init__(self, name): self.name = name self.parts = self.before, self.after self.pos = 0 def __call__(): self.parts[self.pos](self.name) self.pos += 1 def done(self): return self.pos == len(self.parts) # translated from asyncio.gather() class AsyncIOManager: def gather(*coros): while not every(c.done() for c in coros): coro = random.choice(coros) coro()
Si mi conjetura resulta correcta: entonces tengo un problema. ¿Cómo ocurre realmente la E/S en este escenario? en un hilo aparte? ¿Se suspende todo el intérprete y la E/S ocurre fuera del intérprete? ¿Qué se entiende exactamente por E/S? Si mi procedimiento de python llamó al procedimiento C open()
y, a su vez, envió una interrupción al kernel, cediendo el control, ¿cómo sabe esto el intérprete de Python y puede continuar ejecutando otro código, mientras que el código del kernel hace el I/ real? O y hasta que despierte el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, ser consciente de que esto sucede?
Antes de responder a esta pregunta, debemos comprender algunos términos básicos; sáltelos si ya conoce alguno de ellos.
Los generadores son objetos que nos permiten suspender la ejecución de una función de python. Los generadores seleccionados por el usuario se implementan utilizando la palabra clave yield
. Al crear una función normal que contiene la palabra clave yield
, convertimos esa función en un generador:
>>> def test(): ... yield 1 ... yield 2 ... >>> gen = test() >>> next(gen) 1 >>> next(gen) 2 >>> next(gen) Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration
Como puede ver, llamar a next()
en el generador hace que el intérprete cargue el marco de la prueba y yield
el valor producido. Llamar a next()
nuevamente, hace que el marco se cargue nuevamente en la pila del intérprete y continúa yield
otro valor.
Para la tercera vez que se llama a next()
, nuestro generador había terminado y se lanzó StopIteration
.
Una característica menos conocida de los generadores es el hecho de que puedes comunicarte con ellos usando dos métodos: send()
y throw()
.
>>> def test(): ... val = yield 1 ... print(val) ... yield 2 ... yield 3 ... >>> gen = test() >>> next(gen) 1 >>> gen.send("abc") abc 2 >>> gen.throw(Exception()) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 4, in test Exception
Al llamar a gen.send()
, el valor se pasa como valor de retorno de la palabra clave yield
.
gen.throw()
por otro lado, permite lanzar Excepciones dentro de los generadores, con la excepción generada en el mismo lugar en el que se llamó al yield
.
Al devolver un valor de un generador, el valor se coloca dentro de la excepción StopIteration
. Más adelante podemos recuperar el valor de la excepción y usarlo según nuestras necesidades.
>>> def test(): ... yield 1 ... return "abc" ... >>> gen = test() >>> next(gen) 1 >>> try: ... next(gen) ... except StopIteration as exc: ... print(exc.value) ... abc
yield from
Python 3.4 vino con la adición de una nueva palabra clave: yield from
. Lo que esa palabra clave nos permite hacer es pasar cualquier next()
, send()
y throw()
a un generador anidado más interno. Si el generador interno devuelve un valor, también es el valor de retorno de yield from
:
>>> def inner(): ... inner_result = yield 2 ... print('inner', inner_result) ... return 3 ... >>> def outer(): ... yield 1 ... val = yield from inner() ... print('outer', val) ... yield 4 ... >>> gen = outer() >>> next(gen) 1 >>> next(gen) # Goes inside inner() automatically 2 >>> gen.send("abc") inner abc outer 3 4
He escrito un artículo para profundizar en este tema.
Al introducir la nueva palabra clave yield from
en Python 3.4, ahora pudimos crear generadores dentro de generadores que, como un túnel, pasan los datos de un lado a otro desde los generadores más internos a los más externos. Esto ha dado lugar a un nuevo significado para los generadores: corrutinas .
Las rutinas son funciones que se pueden detener y reanudar mientras se ejecutan. En Python, se definen mediante la palabra clave async def
. Al igual que los generadores, también utilizan su propia forma de yield from
la que seawait
. Antes de que async
y await
se introdujeran en Python 3.5, creamos corrutinas exactamente de la misma manera en que se crearon los generadores (con yield from
en lugar de await
).
async def inner(): return 1 async def outer(): await inner()
Al igual que todos los iteradores y generadores implementan el __iter__()
, todas las rutinas implementan __await__()
lo que les permite continuar cada vez que se llama a await coro
.
Hay un buen diagrama de secuencia dentro de los documentos de Python que debería consultar.
En asyncio, además de las funciones coroutine, tenemos 2 objetos importantes: tareas y futuros .
Los futuros son objetos que tienen implementado el método __await__()
, y su trabajo es mantener un determinado estado y resultado. El estado puede ser uno de los siguientes:
fut.cancel()
fut.set_result()
o por un conjunto de excepciones usando fut.set_exception()
El resultado, tal como lo ha adivinado, puede ser un objeto de Python, que se devolverá, o una excepción que se generará.
Otra característica importante de los objetos future
es que contienen un método llamado add_done_callback()
. Este método permite llamar a las funciones tan pronto como se realiza la tarea, ya sea que haya generado una excepción o haya finalizado.
Los objetos de tarea son futuros especiales, que se envuelven alrededor de las corrutinas y se comunican con las corrutinas más internas y más externas. Cada vez que una corrutina await
un futuro, el futuro se pasa de regreso a la tarea (al igual que en el yield from
), y la tarea lo recibe.
A continuación, la tarea se vincula al futuro. Lo hace llamando a add_done_callback()
en el futuro. De ahora en adelante, si el futuro se realiza alguna vez, ya sea cancelándolo, pasando una excepción o pasando un objeto de Python como resultado, se llamará a la devolución de llamada de la tarea y volverá a existir.
La última pregunta candente que debemos responder es: ¿cómo se implementa el IO?
En lo más profundo de asyncio, tenemos un bucle de eventos. Un bucle de eventos de tareas. El trabajo del bucle de eventos es llamar a las tareas cada vez que estén listas y coordinar todo ese esfuerzo en una sola máquina de trabajo.
La parte IO del bucle de eventos se basa en una única función crucial llamada select
. Select es una función de bloqueo, implementada por el sistema operativo subyacente, que permite esperar en los sockets los datos entrantes o salientes. Al recibir datos, se activa y devuelve los sockets que recibieron datos o los sockets que están listos para escribir.
Cuando intenta recibir o enviar datos a través de un socket a través de asyncio, lo que realmente sucede a continuación es que primero se verifica si el socket tiene datos que se puedan leer o enviar de inmediato. Si su .send()
está lleno, o el .recv()
está vacío, el socket se registra en la función de select
(simplemente agregándolo a una de las listas, rlist
para recv
y wlist
para send
) y el correspondiente La función await
un objeto future
recién creado, vinculado a ese zócalo.
Cuando todas las tareas disponibles están esperando futuros, el bucle de eventos llama a select
y espera. Cuando uno de los sockets tiene datos entrantes, o su búfer de send
se agota, asyncio verifica el objeto futuro vinculado a ese socket y lo configura como listo.
Ahora toda la magia sucede. El futuro está listo, la tarea que se agregó antes con add_done_callback()
vuelve a la vida y llama a .send()
en la corrutina que reanuda la corrutina más interna (debido a la cadena de await
) y lee el datos recién recibidos de un búfer cercano al que se derramó.
Cadena de métodos nuevamente, en caso de recv()
:
select.select
espera.future.set_result()
.add_done_callback()
ahora se activa..send()
en la corrutina que va hasta la corrutina más interna y la activa. En resumen, asyncio utiliza capacidades de generador, que permiten pausar y reanudar funciones. Utiliza el yield from
las capacidades que permiten pasar datos de un lado a otro desde el generador más interno al más externo. Utiliza todos esos para detener la ejecución de la función mientras espera que se complete IO (mediante el uso de la función de select
del sistema operativo).
¿Y lo mejor de todo? Mientras una función está en pausa, otra puede ejecutarse e intercalarse con el delicado tejido, que es asíncio.
Hablar de async/await
await y asyncio
no es lo mismo. La primera es una construcción fundamental de bajo nivel (corrutinas), mientras que la última es una biblioteca que utiliza estas construcciones. Por el contrario, no hay una única respuesta definitiva.
La siguiente es una descripción general de cómo funcionan las bibliotecas async/await
await y asyncio
-like. Es decir, puede haber otros trucos encima (los hay...) pero son intrascendentes a menos que los construyas tú mismo. La diferencia debería ser insignificante a menos que ya sepa lo suficiente como para no tener que hacer esa pregunta.
Al igual que las subrutinas (funciones, procedimientos, ...), las corrutinas (generadores, ...) son una abstracción de la pila de llamadas y el puntero de instrucciones: hay una pila de fragmentos de código en ejecución, y cada uno está en una instrucción específica.
La distinción entre def
y definición async def
es simplemente para mayor claridad. La diferencia real es return
versus yield
. A partir de esto, await
o yield from
tomar la diferencia de llamadas individuales a pilas completas.
Una subrutina representa un nuevo nivel de pila para contener variables locales y un solo recorrido de sus instrucciones para llegar a un final. Considere una subrutina como esta:
def subfoo(bar): qux = 3 return qux * bar
Cuando lo ejecutas, eso significa
bar
y qux
return
, empuja su valor a la pila de llamadas En particular, 4. significa que una subrutina siempre comienza en el mismo estado. Todo lo exclusivo de la función en sí se pierde al finalizar. No se puede reanudar una función, incluso si hay instrucciones después de la return
.
root -\ : \- subfoo --\ :/--<---return --/ | V
Una rutina es como una subrutina, pero puede salir sin destruir su estado. Considere una rutina como esta:
def cofoo(bar): qux = yield bar # yield marks a break point return qux
Cuando lo ejecutas, eso significa
bar
y qux
yield
, empuje su valor a la pila de llamadas pero almacene la pila y el puntero de instrucciónyield
, restaure la pila y el puntero de instrucción y empuje los argumentos a qux
return
, empuja su valor a la pila de llamadasTenga en cuenta la adición de 2.1 y 2.2: una rutina puede suspenderse y reanudarse en puntos predefinidos. Esto es similar a cómo se suspende una subrutina durante la llamada a otra subrutina. La diferencia es que la rutina activa no está estrictamente vinculada a su pila de llamadas. En cambio, una rutina suspendida es parte de una pila separada y aislada.
root -\ : \- cofoo --\ :/--<+--yield --/ | : V :
Esto significa que las corrutinas suspendidas se pueden almacenar o mover libremente entre pilas. Cualquier pila de llamadas que tenga acceso a una rutina puede decidir reanudarla.
Hasta ahora, nuestra corrutina solo baja en la pila de llamadas con yield
. Una subrutina puede subir y bajar en la pila de llamadas con return
y ()
. Para completar, las corrutinas también necesitan un mecanismo para subir la pila de llamadas. Considere una rutina como esta:
def wrap(): yield 'before' yield from cofoo() yield 'after'
Cuando lo ejecuta, eso significa que aún asigna la pila y el puntero de instrucción como una subrutina. Cuando se suspende, eso todavía es como almacenar una subrutina.
Sin embargo, yield from
hace ambas cosas . Suspende la pila y el puntero de instrucciones de wrap
y ejecuta cofoo
. Tenga en cuenta que la wrap
permanece suspendida hasta cofoo
termine por completo. Cada vez que cofoo
suspende o se envía algo, cofoo
se conecta directamente a la pila de llamadas.
Según lo establecido, yield from
permite conectar dos ámbitos a través de otro intermedio. Cuando se aplica recursivamente, eso significa que la parte superior de la pila se puede conectar a la parte inferior de la pila.
root -\ : \-> coro_a -yield-from-> coro_b --\ :/ <-+------------------------yield ---/ | : :\ --+-- coro_a.send----------yield ---\ : coro_b <-/
Tenga en cuenta que root
y coro_b
no se conocen. Esto hace que las corrutinas sean mucho más limpias que las devoluciones de llamada: las corrutinas aún se basan en una relación 1: 1 como las subrutinas. Las corrutinas suspenden y reanudan toda su pila de ejecución existente hasta un punto de llamada normal.
En particular, root
podría tener un número arbitrario de rutinas para reanudar. Sin embargo, nunca puede reanudar más de uno al mismo tiempo. ¡Las corrutinas de la misma raíz son concurrentes pero no paralelas!
async
y await
Hasta ahora, la explicación ha utilizado explícitamente el yield
y el yield from
vocabulario de los generadores: la funcionalidad subyacente es la misma. La nueva sintaxis de Python3.5 async
y await
existe principalmente para mayor claridad.
def foo(): # subroutine? return None def foo(): # coroutine? yield from foofoo() # generator? coroutine? async def foo(): # coroutine! await foofoo() # coroutine! return None
Las declaraciones async for
y async with
son necesarias porque rompería la cadena yield from/await
con las declaraciones for
y with
desnudas.
Por sí misma, una corrutina no tiene el concepto de ceder el control a otra corrutina. Solo puede ceder el control a la persona que llama en la parte inferior de una pila de rutinas. Esta persona que llama puede cambiar a otra rutina y ejecutarla.
Este nodo raíz de varias corrutinas es comúnmente un bucle de eventos : en suspensión, una corrutina produce un evento en el que desea reanudar. A su vez, el bucle de eventos es capaz de esperar de manera eficiente a que ocurran estos eventos. Esto le permite decidir qué rutina ejecutar a continuación o cómo esperar antes de reanudar.
Tal diseño implica que hay un conjunto de eventos predefinidos que comprende el ciclo. Varias corrutinas se await
unas a otras, hasta que finalmente se await
un evento. Este evento puede comunicarse directamente con el bucle de eventos yield
el control.
loop -\ : \-> coroutine --await--> event --\ :/ <-+----------------------- yield --/ | : | : # loop waits for event to happen | : :\ --+-- send(reply) -------- yield --\ : coroutine <--yield-- event <-/
La clave es que la suspensión de la rutina permite que el bucle de eventos y los eventos se comuniquen directamente. La pila de rutina intermedia no requiere ningún conocimiento sobre qué ciclo la está ejecutando, ni cómo funcionan los eventos.
El evento más simple de manejar es llegar a un punto en el tiempo. Este también es un bloque fundamental del código de subprocesos: un subproceso sleep
repetidamente hasta que se cumple una condición. Sin embargo, una sleep
regular bloquea la ejecución por sí misma: queremos que otras corrutinas no se bloqueen. En su lugar, queremos decirle al ciclo de eventos cuándo debe reanudar la pila de rutinas actual.
Un evento es simplemente un valor que podemos identificar, ya sea a través de una enumeración, un tipo u otra identidad. Podemos definir esto con una clase simple que almacena nuestro tiempo objetivo. Además de almacenar la información del evento, podemos permitir await
una clase directamente.
class AsyncSleep: """Event to sleep until a point in time""" def __init__(self, until: float): self.until = until # used whenever someone ``await``s an instance of this Event def __await__(self): # yield this Event to the loop yield self def __repr__(self): return '%s(until=%.1f)' % (self.__class__.__name__, self.until)
Esta clase solo almacena el evento; no dice cómo manejarlo realmente.
La única característica especial es __await__
: es lo que busca la palabra clave await
. Prácticamente, es un iterador pero no está disponible para la maquinaria de iteración normal.
Ahora que tenemos un evento, ¿cómo reaccionan las corrutinas? Deberíamos poder expresar el equivalente del sleep
await
nuestro evento. Para ver mejor lo que está pasando, esperamos dos veces durante la mitad del tiempo:
import time async def asleep(duration: float): """await that ``duration`` seconds pass""" await AsyncSleep(time.time() + duration / 2) await AsyncSleep(time.time() + duration / 2)
Podemos instanciar y ejecutar directamente esta rutina. Similar a un generador, usar coroutine.send
ejecuta la corrutina hasta que yield
un resultado.
coroutine = asleep(100) while True: print(coroutine.send(None)) time.sleep(0.1)
Esto nos da dos eventos AsyncSleep
y luego una StopIteration
cuando finaliza la rutina. Tenga en cuenta que el único retraso es de time.sleep
in the loop. Cada AsyncSleep
solo almacena un desplazamiento de la hora actual.
En este punto, tenemos dos mecanismos separados a nuestra disposición:
AsyncSleep
que se pueden producir desde dentro de una corrutinatime.sleep
que puede esperar sin afectar las rutinas En particular, estos dos son ortogonales: ninguno afecta o desencadena al otro. Como resultado, podemos idear nuestra propia estrategia para sleep
para cumplir con el retraso de un AsyncSleep
.
Si tenemos varias corrutinas, cada una puede decirnos cuando quiere ser despertada. Entonces podemos esperar a que se quiera reanudar el primero de ellos, luego al siguiente, y así sucesivamente. Cabe destacar que en cada punto solo nos importa cuál es el siguiente .
Esto hace que la programación sea sencilla:
Una implementación trivial no necesita ningún concepto avanzado. Una list
permite ordenar las corrutinas por fecha. Esperar es un tiempo regular. time.sleep
. Ejecutar coroutines funciona igual que antes con coroutine.send
.
def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" # store wake-up-time and coroutines waiting = [(0, coroutine) for coroutine in coroutines] while waiting: # 2. pick the first coroutine that wants to wake up until, coroutine = waiting.pop(0) # 3. wait until this point in time time.sleep(max(0.0, until - time.time())) # 4. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0])
Por supuesto, esto tiene un amplio margen de mejora. Podemos usar un montón para la cola de espera o una tabla de despacho para eventos. También podríamos obtener valores de retorno de StopIteration
y asignarlos a la rutina. Sin embargo, el principio fundamental sigue siendo el mismo.
El evento AsyncSleep
y el bucle de eventos de run
son una implementación completamente funcional de eventos cronometrados.
async def sleepy(identifier: str = "coroutine", count=5): for i in range(count): print(identifier, 'step', i + 1, 'at %.2f' % time.time()) await asleep(0.1) run(*(sleepy("coroutine %d" % j) for j in range(5)))
Esto cambia cooperativamente entre cada una de las cinco corrutinas, suspendiendo cada una por 0.1 segundos. Aunque el bucle de eventos es síncrono, aún ejecuta el trabajo en 0,5 segundos en lugar de 2,5 segundos. Cada rutina tiene estado y actúa de forma independiente.
Un bucle de eventos que admita sleep
es adecuado para sondeos . Sin embargo, esperar la E/S en un identificador de archivo se puede hacer de manera más eficiente: el sistema operativo implementa la E/S y, por lo tanto, sabe qué identificadores están listos. Idealmente, un bucle de eventos debe admitir un evento "listo para E/S" explícito.
select
Python ya tiene una interfaz para consultar el sistema operativo en busca de identificadores de E/S de lectura. Cuando se llama con identificadores para leer o escribir, devuelve los identificadores listos para leer o escribir:
readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)
Por ejemplo, podemos open
un archivo para escritura y esperar a que esté listo:
write_target = open('/tmp/foo') readable, writeable, _ = select.select([], [write_target], [])
Una vez que seleccione devoluciones, writeable
contiene nuestro archivo abierto.
Similar a la solicitud AsyncSleep
, necesitamos definir un evento para E/S. Con la lógica de select
subyacente, el evento debe hacer referencia a un objeto legible, por ejemplo, un archivo open
. Además, almacenamos la cantidad de datos para leer.
class AsyncRead: def __init__(self, file, amount=1): self.file = file self.amount = amount self._buffer = '' def __await__(self): while len(self._buffer) < self.amount: yield self # we only get here if ``read`` should not block self._buffer += self.file.read(1) return self._buffer def __repr__(self): return '%s(file=%s, amount=%d, progress=%d)' % ( self.__class__.__name__, self.file, self.amount, len(self._buffer) )
Al igual que con AsyncSleep
, en su mayoría solo almacenamos los datos necesarios para la llamada al sistema subyacente. Esta vez, __await__
se puede reanudar varias veces, hasta que se haya leído la amount
deseada. Además, return
el resultado de E/S en lugar de simplemente reanudar.
La base de nuestro bucle de eventos sigue siendo la run
definida anteriormente. Primero, necesitamos rastrear las solicitudes de lectura. Este ya no es un cronograma ordenado, solo asignamos solicitudes de lectura a rutinas.
# new waiting_read = {} # type: Dict[file, coroutine]
Dado que select.select
toma un parámetro de tiempo de espera, podemos usarlo en lugar de time.sleep
.
# old time.sleep(max(0.0, until - time.time())) # new readable, _, _ = select.select(list(reads), [], [])
Esto nos da todos los archivos legibles; si hay alguno, ejecutamos la rutina correspondiente. Si no hay ninguno, hemos esperado lo suficiente para que se ejecute nuestra rutina actual.
# new - reschedule waiting coroutine, run readable coroutine if readable: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read[readable[0]]
Finalmente, tenemos que escuchar las solicitudes de lectura.
# new if isinstance(command, AsyncSleep): ... elif isinstance(command, AsyncRead): ...
Lo anterior fue un poco simplificado. Necesitamos hacer algunos cambios para no matar de hambre a los corutines dormidos si siempre podemos leer. Necesitamos manejar el no tener nada que leer o nada que esperar. Sin embargo, el resultado final aún se ajusta a 30 LOC.
def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" waiting_read = {} # type: Dict[file, coroutine] waiting = [(0, coroutine) for coroutine in coroutines] while waiting or waiting_read: # 2. wait until the next coroutine may run or read ... try: until, coroutine = waiting.pop(0) except IndexError: until, coroutine = float('inf'), None readable, _, _ = select.select(list(waiting_read), [], []) else: readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time())) # ... and select the appropriate one if readable and time.time() < until: if until and coroutine: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read.pop(readable[0]) # 3. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension ... if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0]) # ... or register reads elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine
Las AsyncSleep
, AsyncRead
y run
ahora son completamente funcionales para dormir y/o leer. Al igual que para sleepy
, podemos definir un ayudante para probar la lectura:
async def ready(path, amount=1024*32): print('read', path, 'at', '%d' % time.time()) with open(path, 'rb') as file: result = await AsyncRead(file, amount) print('done', path, 'at', '%d' % time.time()) print('got', len(result), 'B') run(sleepy('background', 5), ready('/dev/urandom'))
Ejecutando esto, podemos ver que nuestra E/S está intercalada con la tarea en espera:
id background round 1 read /dev/urandom at 1530721148 id background round 2 id background round 3 id background round 4 id background round 5 done /dev/urandom at 1530721148 got 1024 B
Si bien la E/S en los archivos transmite el concepto, no es realmente adecuado para una biblioteca como asyncio
: la llamada de select
siempre regresa para los archivos , y tanto open
como la read
pueden bloquearse indefinidamente . Esto bloquea todas las rutinas de un bucle de eventos, lo cual es malo. Las bibliotecas como aiofiles
usan subprocesos y sincronización para falsificar E/S sin bloqueo y eventos en el archivo.
Sin embargo, los sockets permiten E/S sin bloqueo, y su latencia inherente lo hace mucho más crítico. Cuando se usa en un bucle de eventos, la espera de datos y el reintento se pueden ajustar sin bloquear nada.
Similar a nuestro AsyncRead
, podemos definir un evento de suspensión y lectura para sockets. En lugar de tomar un archivo, tomamos un socket, que no debe bloquear. Además, nuestro __await__
usa socket.recv
en lugar de file.read
.
class AsyncRecv: def __init__(self, connection, amount=1, read_buffer=1024): assert not connection.getblocking(), 'connection must be non-blocking for async recv' self.connection = connection self.amount = amount self.read_buffer = read_buffer self._buffer = b'' def __await__(self): while len(self._buffer) < self.amount: try: self._buffer += self.connection.recv(self.read_buffer) except BlockingIOError: yield self return self._buffer def __repr__(self): return '%s(file=%s, amount=%d, progress=%d)' % ( self.__class__.__name__, self.connection, self.amount, len(self._buffer) )
A diferencia de AsyncRead
, __await__
realiza E/S verdaderamente sin bloqueo. Cuando hay datos disponibles, siempre se lee. Cuando no hay datos disponibles, siempre se suspende. Eso significa que el bucle de eventos solo se bloquea mientras realizamos un trabajo útil.
En lo que respecta al ciclo de eventos, nada cambia mucho. El evento para escuchar sigue siendo el mismo que para los archivos: un descriptor de archivo marcado como listo por select
.
# old elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine # new elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine elif isinstance(command, AsyncRecv): waiting_read[command.connection] = coroutine
En este punto, debería ser obvio que AsyncRead
y AsyncRecv
son el mismo tipo de evento. Podríamos refactorizarlos fácilmente para que sean un evento con un componente de E/S intercambiable. En efecto, el bucle de eventos, las corrutinas y los eventos separan claramente un planificador, un código intermedio arbitrario y la E/S real.
En principio, lo que deberías hacer en este punto es replicar la lógica de read
as a recv
para AsyncRecv
. Sin embargo, esto es mucho más feo ahora: debe manejar los retornos tempranos cuando las funciones se bloquean dentro del kernel, pero le ceden el control. Por ejemplo, abrir una conexión versus abrir un archivo es mucho más largo:
# file file = open(path, 'rb') # non-blocking socket connection = socket.socket() connection.setblocking(False) # open without blocking - retry on failure try: connection.connect((url, port)) except BlockingIOError: pass
Para resumir, lo que queda son unas pocas docenas de líneas de manejo de excepciones. Los eventos y el bucle de eventos ya funcionan en este punto.
id background round 1 read localhost:25000 at 1530783569 read /dev/urandom at 1530783569 done localhost:25000 at 1530783569 got 32768 B id background round 2 id background round 3 id background round 4 done /dev/urandom at 1530783569 got 4096 B id background round 5
Asyncio significa entrada y salida asíncrona y se refiere a un paradigma de programación que logra una alta concurrencia utilizando un solo hilo o bucle de eventos. La programación asincrónica es un tipo de programación paralela en la que se permite que una unidad de trabajo se ejecute por separado del subproceso de la aplicación principal. Cuando se completa el trabajo, notifica al subproceso principal sobre la finalización o falla del subproceso de trabajo.
Echemos un vistazo en la imagen de abajo:
Entendamos asyncio con un ejemplo:
Para entender el concepto detrás de asyncio, consideremos un restaurante con un solo mesero. De repente, aparecen tres clientes, A, B y C. Los tres toman una cantidad variable de tiempo para decidir qué comer una vez que reciben el menú del mesero.
Supongamos que A toma 5 minutos, B 10 minutos y C 1 minuto para decidir. Si el camarero soltero comienza primero con B y toma el pedido de B en 10 minutos, luego sirve a A y dedica 5 minutos a anotar su pedido y finalmente dedica 1 minuto a saber qué quiere comer C. Entonces, en total, el mesero gasta 10 + 5 + 1 = 16 minutos para anotar sus pedidos. Sin embargo, fíjate en esta secuencia de eventos, C termina esperando 15 minutos antes de que el mesero llegue a él, A espera 10 minutos y B espera 0 minutos.
Ahora considere si el mesero supiera el tiempo que tomaría cada cliente para decidir. Puede comenzar con C primero, luego ir a A y finalmente a B. De esta manera, cada cliente experimentaría una espera de 0 minutos. Se crea una ilusión de tres camareros, uno dedicado a cada cliente aunque sólo haya uno.
Por último, el tiempo total que tarda el camarero en tomar los tres pedidos es de 10 minutos, mucho menos que los 16 minutos del otro escenario.
Veamos otro ejemplo:
Supongamos que el maestro de ajedrez Magnus Carlsen organiza una exhibición de ajedrez en la que juega con varios jugadores aficionados. Tiene dos formas de conducir la exposición: de forma sincrónica y asincrónica.
Suposiciones:
Sincrónicamente : Magnus Carlsen juega un juego a la vez, nunca dos al mismo tiempo, hasta que se completa el juego. Cada juego toma (55 + 5) * 30 == 1800 segundos o 30 minutos . Toda la exposición dura 24 * 30 == 720 minutos, o 12 horas .
Asincrónicamente : Magnus Carlsen se mueve de mesa en mesa, haciendo un movimiento en cada mesa. Ella deja la mesa y deja que el oponente haga su próximo movimiento durante el tiempo de espera. Un movimiento en los 24 juegos le lleva a Judit 24 * 5 == 120 segundos o 2 minutos . Toda la exposición ahora se reduce a 120 * 30 == 3600 segundos, o solo 1 hora
Solo hay un Magnus Carlsen, que solo tiene dos manos y solo hace un movimiento a la vez. Pero jugar de forma asíncrona reduce el tiempo de exhibición de 12 horas a una.
Ejemplo de codificación:
Intentemos demostrar el tiempo de ejecución síncrono y asíncrono usando un fragmento de código.
Asíncrono - async_count.py
import asyncio import time async def count(): print("One", end=" ") await asyncio.sleep(1) print("Two", end=" ") await asyncio.sleep(2) print("Three", end=" ") async def main(): await asyncio.gather(count(), count(), count(), count(), count()) if __name__ == "__main__": start_time = time.perf_counter() asyncio.run(main()) end_time = time.perf_counter() execution_time = end_time - start_time print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
Asíncrono - Salida :
One One One One One Two Two Two Two Two Three Three Three Three Three Executing - async_count.py Execution Starts: 18453.442160108 Executions Ends: 18456.444719712 Totals Execution Time:3.00 seconds.
Síncrono - sync_count.py
import time def count(): print("One", end=" ") time.sleep(1) print("Two", end=" ") time.sleep(2) print("Three", end=" ") def main(): for _ in range(5): count() if __name__ == "__main__": start_time = time.perf_counter() main() end_time = time.perf_counter() execution_time = end_time - start_time print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")
Síncrono - Salida :
One Two Three One Two Three One Two Three One Two Three One Two Three Executing - sync_count.py Execution Starts: 18875.175965998 Executions Ends: 18890.189930292 Totals Execution Time:15.01 seconds.
Generador de Python:
Las funciones que contienen una declaración de yield
se compilan como generadores. El uso de una expresión de rendimiento en el cuerpo de una función hace que esa función sea un generador. Estas funciones devuelven un objeto que admite los métodos del protocolo de iteración. El objeto generador creado recibe automáticamente un __next()__
. Volviendo al ejemplo de la sección anterior, podemos invocar __next__
directamente en el objeto generador en lugar de usar next()
:
def asynchronous(): yield "Educative" if __name__ == "__main__": gen = asynchronous() str = gen.__next__() print(str)
Recuerde lo siguiente acerca de los generadores:
next()
en el objeto generador para ejecutar el código dentro de la función generadora.Estados de un generador:
Un generador pasa por los siguientes estados:
GEN_CREATED
cuando se devuelve un objeto generador por primera vez desde una función generadora y la iteración no ha comenzado.GEN_RUNNING
cuando se ha invocado next en el objeto generador y el intérprete de python lo está ejecutando.GEN_SUSPENDED
cuando un generador se suspende en un rendimientoGEN_CLOSED
cuando un generador ha completado la ejecución o se ha cerrado.Métodos en objetos generadores:
Un objeto generador expone diferentes métodos que se pueden invocar para manipular el generador. Estos son:
throw()
send()
close()
Las reglas de asyncio:
async def
introduce una rutina nativa o un generador asíncrono . Las expresiones async with
y async for
también son válidas.await
devuelve el control de la función al bucle de eventos. (Suspende la ejecución de la rutina circundante). Si Python encuentra una expresión await await f()
en el ámbito de g()
, así es como await
le dice al bucle de eventos: "Suspender la ejecución de g()
hasta lo que sea que esté esperando". on—se devuelve el resultado de f()
. Mientras tanto, deja que se ejecute otra cosa».En el código, ese segundo punto se ve más o menos así:
async def g(): # Pause here and come back to g() when f() is ready r = await f() return r
También hay un conjunto estricto de reglas sobre cuándo y cómo puede y no puede usar async
/ await
. Estos pueden ser útiles si todavía está recogiendo la sintaxis o si ya está expuesto al uso de async
/ await
:
async def
es una rutina. Puede usar await
, return
o yield
, pero todos estos son opcionales. Declarar async def noop(): pass
es válido:await
y/o return
crea una función de rutina. Para llamar a una función coroutine, debe await
a que obtenga sus resultados.yield
en un bloque de async def
. Esto crea un generador asíncrono , que se itera con async for
. Olvídese de los generadores asíncronos por el momento y concéntrese en obtener la sintaxis de las funciones coroutine, que usan await
y/o return
.async def
puede no usar yield from
, lo que generará un SyntaxError
.SyntaxError
para usar yield
fuera de una función de def
, es un SyntaxError
para usar await
fuera de una corrutina async def
. Solo puede usar await
en el cuerpo de coroutines.Aquí hay algunos ejemplos breves destinados a resumir las reglas anteriores:
async def f(x): y = await z(x) # OK - `await` and `return` allowed in coroutines return y async def g(x): yield x # OK - this is an async generator async def m(x): yield from gen(x) # NO - SyntaxError def m(x): y = await z(x) # NO - SyntaxError (no `async def` here) return y
Python creó una distinción entre los generadores de Python y los generadores destinados a ser utilizados como corrutinas. Estas corrutinas se denominan corrutinas basadas en generador y requieren que se agregue el decorador @asynio.coroutine
a la definición de la función, aunque esto no se aplica estrictamente.
Las corrutinas basadas en generador usan yield from
sintaxis en lugar de yield
. Una rutina puede:
Las corrutinas en Python hacen posible la multitarea cooperativa. La multitarea cooperativa es el enfoque en el que el proceso en ejecución cede voluntariamente la CPU a otros procesos. Un proceso puede hacerlo cuando está lógicamente bloqueado, por ejemplo, mientras espera la entrada del usuario o cuando ha iniciado una solicitud de red y estará inactivo por un tiempo. Una corrutina se puede definir como una función especial que puede ceder el control a su llamador sin perder su estado.
Entonces, ¿cuál es la diferencia entre rutinas y generadores?
Los generadores son esencialmente iteradores aunque parecen funciones. La distinción entre generadores y corrutinas, en general, es que:
La corrutina basada en un generador más simple que podemos escribir es la siguiente:
@asyncio.coroutine def do_something_important(): yield from asyncio.sleep(1)
La rutina duerme por un segundo. Tenga en cuenta el decorador y el uso de yield from
.
Por nativo se entiende que el lenguaje introdujo la sintaxis para definir específicamente las corrutinas, convirtiéndolas en ciudadanos de primera clase en el lenguaje. Las rutinas nativas se pueden definir mediante la sintaxis async/await
. La corrutina nativa más simple que podemos escribir es la siguiente:
async def do_something_important(): await asyncio.sleep(1)
AsyncIO viene con su propio conjunto de posibles diseños de secuencias de comandos, que analizaremos en esta sección.
1. Bucles de eventos
El bucle de eventos es una construcción de programación que espera que sucedan los eventos y luego los envía a un controlador de eventos. Un evento puede ser un usuario que hace clic en un botón de la interfaz de usuario o un proceso que inicia la descarga de un archivo. En el núcleo de la programación asincrónica se encuentra el bucle de eventos.
Código de ejemplo:
import asyncio import random import time from threading import Thread from threading import current_thread # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[34m", # Blue ) async def do_something_important(sleep_for): print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0]) await asyncio.sleep(sleep_for) def launch_event_loops(): # get a new event loop loop = asyncio.new_event_loop() # set the event loop for the current thread asyncio.set_event_loop(loop) # run a coroutine on the event loop loop.run_until_complete(do_something_important(random.randint(1, 5))) # remember to close the loop loop.close() if __name__ == "__main__": thread_1 = Thread(target=launch_event_loops) thread_2 = Thread(target=launch_event_loops) start_time = time.perf_counter() thread_1.start() thread_2.start() print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0]) thread_1.join() thread_2.join() end_time = time.perf_counter() execution_time = end_time - start_time print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])
Comando de ejecución: python async_event_loop.py
Producción:
Pruébelo usted mismo y examine la salida y se dará cuenta de que cada subproceso generado está ejecutando su propio ciclo de eventos.
Tipos de bucles de eventos
Hay dos tipos de bucles de eventos:
2. Futuros
Future representa un cálculo que está en curso o se programará en el futuro. Es un objeto esperable especial de bajo nivel que representa un resultado eventual de una operación asíncrona. No confunda threading.Future
y asyncio.Future
.
Código de ejemplo:
import time import asyncio from asyncio import Future # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[34m", # Blue ) async def bar(future): print(colors[1] + "bar will sleep for 3 seconds" + colors[0]) await asyncio.sleep(3) print(colors[1] + "bar resolving the future" + colors[0]) future.done() future.set_result("future is resolved") async def foo(future): print(colors[2] + "foo will await the future" + colors[0]) await future print(colors[2] + "foo finds the future resolved" + colors[0]) async def main(): future = Future() await asyncio.gather(foo(future), bar(future)) if __name__ == "__main__": start_time = time.perf_counter() asyncio.run(main()) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
Comando de ejecución: python async_futures.py
Producción:
A ambas corrutinas se les pasa un futuro. La rutina foo()
espera que se resuelva el futuro, mientras que la rutina bar()
resuelve el futuro después de tres segundos.
3. Tareas
Las tareas son como futuros, de hecho, Task es una subclase de Future y se puede crear usando los siguientes métodos:
asyncio.create_task()
acepta rutinas y las envuelve como tareas.loop.create_task()
solo acepta rutinas.asyncio.ensure_future()
acepta futuros, rutinas y cualquier objeto aguardable.Las tareas envuelven corrutinas y las ejecutan en bucles de eventos. Si una rutina espera en un futuro, la tarea suspende la ejecución de la rutina y espera a que se complete el futuro. Cuando finaliza el Future, se reanuda la ejecución de la corrutina envuelta.
Código de ejemplo:
import time import asyncio from asyncio import Future # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[34m", # Blue ) async def bar(future): print(colors[1] + "bar will sleep for 3 seconds" + colors[0]) await asyncio.sleep(3) print(colors[1] + "bar resolving the future" + colors[0]) future.done() future.set_result("future is resolved") async def foo(future): print(colors[2] + "foo will await the future" + colors[0]) await future print(colors[2] + "foo finds the future resolved" + colors[0]) async def main(): future = Future() loop = asyncio.get_event_loop() t1 = loop.create_task(bar(future)) t2 = loop.create_task(foo(future)) await t2, t1 if __name__ == "__main__": start_time = time.perf_counter() loop = asyncio.get_event_loop() loop.run_until_complete(main()) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])
Comando de ejecución: python async_tasks.py
Producción:
4. Encadenamiento de rutinas:
Una característica clave de las corrutinas es que se pueden encadenar. Un objeto de corrutina está a la espera, por lo que otra corrutina puede await
. Esto le permite dividir los programas en corrutinas más pequeñas, manejables y reciclables:
Código de ejemplo:
import sys import asyncio import random import time # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[36m", # Cyan "\033[34m", # Blue ) async def function1(n: int) -> str: i = random.randint(0, 10) print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0]) await asyncio.sleep(i) result = f"result{n}-1" print(colors[1] + f"Returning function1({n}) == {result}." + colors[0]) return result async def function2(n: int, arg: str) -> str: i = random.randint(0, 10) print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0]) await asyncio.sleep(i) result = f"result{n}-2 derived from {arg}" print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0]) return result async def chain(n: int) -> None: start = time.perf_counter() p1 = await function1(n) p2 = await function2(n, p1) end = time.perf_counter() - start print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0]) async def main(*args): await asyncio.gather(*(chain(n) for n in args)) if __name__ == "__main__": random.seed(444) args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:]) start_time = time.perf_counter() asyncio.run(main(*args)) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Preste mucha atención a la salida, donde function1()
duerme durante un tiempo variable y function2()
comienza a trabajar con los resultados a medida que están disponibles:
Comando de ejecución: python async_chained.py 11 8 5
Producción:
5. Usando una cola:
En este diseño, no hay encadenamiento de ningún consumidor individual a un productor. Los consumidores no saben de antemano el número de productores, ni siquiera el número acumulativo de elementos que se agregarán a la cola.
A un productor o consumidor individual le toma una cantidad variable de tiempo poner y extraer elementos de la cola, respectivamente. La cola sirve como un rendimiento que puede comunicarse con los productores y consumidores sin que se comuniquen entre sí directamente.
Código de ejemplo:
import asyncio import argparse import itertools as it import os import random import time # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[36m", # Cyan "\033[34m", # Blue ) async def generate_item(size: int = 5) -> str: return os.urandom(size).hex() async def random_sleep(caller=None) -> None: i = random.randint(0, 10) if caller: print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0]) await asyncio.sleep(i) async def produce(name: int, producer_queue: asyncio.Queue) -> None: n = random.randint(0, 10) for _ in it.repeat(None, n): # Synchronous loop for each single producer await random_sleep(caller=f"Producer {name}") i = await generate_item() t = time.perf_counter() await producer_queue.put((i, t)) print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0]) async def consume(name: int, consumer_queue: asyncio.Queue) -> None: while True: await random_sleep(caller=f"Consumer {name}") i, t = await consumer_queue.get() now = time.perf_counter() print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0]) consumer_queue.task_done() async def main(no_producer: int, no_consumer: int): q = asyncio.Queue() producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)] consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)] await asyncio.gather(*producers) await q.join() # Implicitly awaits consumers, too for consumer in consumers: consumer.cancel() if __name__ == "__main__": random.seed(444) parser = argparse.ArgumentParser() parser.add_argument("-p", "--no_producer", type=int, default=10) parser.add_argument("-c", "--no_consumer", type=int, default=15) ns = parser.parse_args() start_time = time.perf_counter() asyncio.run(main(**ns.__dict__)) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Comando de ejecución: python async_queue.py -p 2 -c 4
Producción:
Por último, veamos un ejemplo de cómo asyncio reduce el tiempo de espera: dada una corrutina generate_random_int()
que sigue produciendo números enteros aleatorios en el rango [0, 10], hasta que uno de ellos supera un umbral, desea permitir múltiples llamadas de esta corrutina no necesita esperar a que la otra se complete en sucesión.
Código de ejemplo:
import time import asyncio import random # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[36m", # Cyan "\033[35m", # Magenta "\033[34m", # Blue ) async def generate_random_int(indx: int, threshold: int = 5) -> int: print(colors[indx + 1] + f"Initiated generate_random_int({indx}).") i = random.randint(0, 10) while i <= threshold: print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.") await asyncio.sleep(indx + 1) i = random.randint(0, 10) print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0]) return i async def main(): res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3))) return res if __name__ == "__main__": random.seed(444) start_time = time.perf_counter() r1, r2, r3 = asyncio.run(main()) print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0]) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])
Comando de ejecución: python async_random.py
Producción:
Nota: si está escribiendo algún código usted mismo, prefiera corrutinas nativas por ser explícito en lugar de implícito. Las corrutinas basadas en generador se eliminarán en Python 3.10.
Repositorio de GitHub: https://github.com/tssovi/asynchronous-in-python