• Empleos
  • Sobre nosotros
  • profesionales
    • Inicio
    • Empleos
    • Cursos y retos
  • empresas
    • Inicio
    • Publicar vacante
    • Nuestro proceso
    • Precios
    • Evaluaciones
    • Nómina
    • Blog
    • Comercial
    • Calculadora de salario

0

247
Vistas
¿Cómo funciona realmente asyncio?

Esta pregunta está motivada por mi otra pregunta: ¿Cómo esperar en cdef?

Hay toneladas de artículos y publicaciones de blog en la web sobre asyncio , pero todos son muy superficiales. No pude encontrar ninguna información sobre cómo se implementa realmente asyncio y qué hace que la E/S sea asíncrona. Estaba tratando de leer el código fuente, pero son miles de líneas de código C que no es del más alto grado, muchas de las cuales tratan con objetos auxiliares, pero lo más importante es que es difícil conectar entre la sintaxis de Python y el código C que traduciría. en.

La propia documentación de Asyncio es aún menos útil. No hay información allí sobre cómo funciona, solo algunas pautas sobre cómo usarlo, que a veces también son engañosas/muy mal escritas.

Estoy familiarizado con la implementación de rutinas de Go, y esperaba que Python hiciera lo mismo. Si ese fuera el caso, el código que encontré en la publicación vinculada anteriormente habría funcionado. Como no lo hizo, ahora estoy tratando de averiguar por qué. Mi mejor conjetura hasta ahora es la siguiente, corríjame donde me equivoque:

  1. Las definiciones de procedimientos de la forma async def foo(): ... en realidad se interpretan como métodos de una clase que hereda coroutine .
  2. Tal vez, async def en realidad se divide en varios métodos mediante declaraciones de await , donde el objeto, en el que se llama a estos métodos, puede realizar un seguimiento del progreso realizado a través de la ejecución hasta el momento.
  3. Si lo anterior es cierto, entonces, esencialmente, la ejecución de una corrutina se reduce a llamar a los métodos del objeto corrutina por parte de algún administrador global (¿bucle?).
  4. El administrador global es de alguna manera (¿cómo?) consciente de cuándo las operaciones de E/S son realizadas por el código de Python (¿solo?) y puede elegir uno de los métodos de rutina pendientes para ejecutar después de que el método de ejecución actual renuncie al control (presione el botón await declaración).

En otras palabras, aquí está mi intento de "desazucarar" alguna sintaxis asyncio en algo más comprensible:

 async def coro(name): print('before', name) await asyncio.sleep() print('after', name) asyncio.gather(coro('first'), coro('second')) # translated from async def coro(name) class Coro(coroutine): def before(self, name): print('before', name) def after(self, name): print('after', name) def __init__(self, name): self.name = name self.parts = self.before, self.after self.pos = 0 def __call__(): self.parts[self.pos](self.name) self.pos += 1 def done(self): return self.pos == len(self.parts) # translated from asyncio.gather() class AsyncIOManager: def gather(*coros): while not every(c.done() for c in coros): coro = random.choice(coros) coro()

Si mi conjetura resulta correcta: entonces tengo un problema. ¿Cómo ocurre realmente la E/S en este escenario? en un hilo aparte? ¿Se suspende todo el intérprete y la E/S ocurre fuera del intérprete? ¿Qué se entiende exactamente por E/S? Si mi procedimiento de python llamó al procedimiento C open() y, a su vez, envió una interrupción al kernel, cediendo el control, ¿cómo sabe esto el intérprete de Python y puede continuar ejecutando otro código, mientras que el código del kernel hace el I/ real? O y hasta que despierte el procedimiento de Python que envió la interrupción originalmente? ¿Cómo puede el intérprete de Python, en principio, ser consciente de que esto sucede?

over 3 years ago · Santiago Trujillo
3 Respuestas
Responde la pregunta

0

¿Cómo funciona asyncio?

Antes de responder a esta pregunta, debemos comprender algunos términos básicos; sáltelos si ya conoce alguno de ellos.

Generadores

Los generadores son objetos que nos permiten suspender la ejecución de una función de python. Los generadores seleccionados por el usuario se implementan utilizando la palabra clave yield . Al crear una función normal que contiene la palabra clave yield , convertimos esa función en un generador:

 >>> def test(): ... yield 1 ... yield 2 ... >>> gen = test() >>> next(gen) 1 >>> next(gen) 2 >>> next(gen) Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration

Como puede ver, llamar a next() en el generador hace que el intérprete cargue el marco de la prueba y yield el valor producido. Llamar a next() nuevamente, hace que el marco se cargue nuevamente en la pila del intérprete y continúa yield otro valor.

Para la tercera vez que se llama a next() , nuestro generador había terminado y se lanzó StopIteration .

Comunicarse con un generador

Una característica menos conocida de los generadores es el hecho de que puedes comunicarte con ellos usando dos métodos: send() y throw() .

 >>> def test(): ... val = yield 1 ... print(val) ... yield 2 ... yield 3 ... >>> gen = test() >>> next(gen) 1 >>> gen.send("abc") abc 2 >>> gen.throw(Exception()) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "<stdin>", line 4, in test Exception

Al llamar a gen.send() , el valor se pasa como valor de retorno de la palabra clave yield .

gen.throw() por otro lado, permite lanzar Excepciones dentro de los generadores, con la excepción generada en el mismo lugar en el que se llamó al yield .

Devolviendo valores de generadores

Al devolver un valor de un generador, el valor se coloca dentro de la excepción StopIteration . Más adelante podemos recuperar el valor de la excepción y usarlo según nuestras necesidades.

 >>> def test(): ... yield 1 ... return "abc" ... >>> gen = test() >>> next(gen) 1 >>> try: ... next(gen) ... except StopIteration as exc: ... print(exc.value) ... abc

He aquí una nueva palabra clave: yield from

Python 3.4 vino con la adición de una nueva palabra clave: yield from . Lo que esa palabra clave nos permite hacer es pasar cualquier next() , send() y throw() a un generador anidado más interno. Si el generador interno devuelve un valor, también es el valor de retorno de yield from :

 >>> def inner(): ... inner_result = yield 2 ... print('inner', inner_result) ... return 3 ... >>> def outer(): ... yield 1 ... val = yield from inner() ... print('outer', val) ... yield 4 ... >>> gen = outer() >>> next(gen) 1 >>> next(gen) # Goes inside inner() automatically 2 >>> gen.send("abc") inner abc outer 3 4

He escrito un artículo para profundizar en este tema.

Poniendolo todo junto

Al introducir la nueva palabra clave yield from en Python 3.4, ahora pudimos crear generadores dentro de generadores que, como un túnel, pasan los datos de un lado a otro desde los generadores más internos a los más externos. Esto ha dado lugar a un nuevo significado para los generadores: corrutinas .

Las rutinas son funciones que se pueden detener y reanudar mientras se ejecutan. En Python, se definen mediante la palabra clave async def . Al igual que los generadores, también utilizan su propia forma de yield from la que seawait . Antes de que async y await se introdujeran en Python 3.5, creamos corrutinas exactamente de la misma manera en que se crearon los generadores (con yield from en lugar de await ).

 async def inner(): return 1 async def outer(): await inner()

Al igual que todos los iteradores y generadores implementan el __iter__() , todas las rutinas implementan __await__() lo que les permite continuar cada vez que se llama a await coro .

Hay un buen diagrama de secuencia dentro de los documentos de Python que debería consultar.

En asyncio, además de las funciones coroutine, tenemos 2 objetos importantes: tareas y futuros .

Futuros

Los futuros son objetos que tienen implementado el método __await__() , y su trabajo es mantener un determinado estado y resultado. El estado puede ser uno de los siguientes:

  1. PENDIENTE: el futuro no tiene ningún resultado o conjunto de excepciones.
  2. CANCELADO: el futuro se canceló usando fut.cancel()
  3. FINALIZADO: el futuro finalizó, ya sea por un conjunto de resultados usando fut.set_result() o por un conjunto de excepciones usando fut.set_exception()

El resultado, tal como lo ha adivinado, puede ser un objeto de Python, que se devolverá, o una excepción que se generará.

Otra característica importante de los objetos future es que contienen un método llamado add_done_callback() . Este método permite llamar a las funciones tan pronto como se realiza la tarea, ya sea que haya generado una excepción o haya finalizado.

Tareas

Los objetos de tarea son futuros especiales, que se envuelven alrededor de las corrutinas y se comunican con las corrutinas más internas y más externas. Cada vez que una corrutina await un futuro, el futuro se pasa de regreso a la tarea (al igual que en el yield from ), y la tarea lo recibe.

A continuación, la tarea se vincula al futuro. Lo hace llamando a add_done_callback() en el futuro. De ahora en adelante, si el futuro se realiza alguna vez, ya sea cancelándolo, pasando una excepción o pasando un objeto de Python como resultado, se llamará a la devolución de llamada de la tarea y volverá a existir.

Asíncio

La última pregunta candente que debemos responder es: ¿cómo se implementa el IO?

En lo más profundo de asyncio, tenemos un bucle de eventos. Un bucle de eventos de tareas. El trabajo del bucle de eventos es llamar a las tareas cada vez que estén listas y coordinar todo ese esfuerzo en una sola máquina de trabajo.

La parte IO del bucle de eventos se basa en una única función crucial llamada select . Select es una función de bloqueo, implementada por el sistema operativo subyacente, que permite esperar en los sockets los datos entrantes o salientes. Al recibir datos, se activa y devuelve los sockets que recibieron datos o los sockets que están listos para escribir.

Cuando intenta recibir o enviar datos a través de un socket a través de asyncio, lo que realmente sucede a continuación es que primero se verifica si el socket tiene datos que se puedan leer o enviar de inmediato. Si su .send() está lleno, o el .recv() está vacío, el socket se registra en la función de select (simplemente agregándolo a una de las listas, rlist para recv y wlist para send ) y el correspondiente La función await un objeto future recién creado, vinculado a ese zócalo.

Cuando todas las tareas disponibles están esperando futuros, el bucle de eventos llama a select y espera. Cuando uno de los sockets tiene datos entrantes, o su búfer de send se agota, asyncio verifica el objeto futuro vinculado a ese socket y lo configura como listo.

Ahora toda la magia sucede. El futuro está listo, la tarea que se agregó antes con add_done_callback() vuelve a la vida y llama a .send() en la corrutina que reanuda la corrutina más interna (debido a la cadena de await ) y lee el datos recién recibidos de un búfer cercano al que se derramó.

Cadena de métodos nuevamente, en caso de recv() :

  1. select.select espera.
  2. Se devuelve un socket listo, con datos.
  3. Los datos del socket se mueven a un búfer.
  4. Se llama future.set_result() .
  5. La tarea que se agregó a sí misma con add_done_callback() ahora se activa.
  6. Task llama a .send() en la corrutina que va hasta la corrutina más interna y la activa.
  7. Los datos se leen del búfer y se devuelven a nuestro humilde usuario.

En resumen, asyncio utiliza capacidades de generador, que permiten pausar y reanudar funciones. Utiliza el yield from las capacidades que permiten pasar datos de un lado a otro desde el generador más interno al más externo. Utiliza todos esos para detener la ejecución de la función mientras espera que se complete IO (mediante el uso de la función de select del sistema operativo).

¿Y lo mejor de todo? Mientras una función está en pausa, otra puede ejecutarse e intercalarse con el delicado tejido, que es asíncio.

over 3 years ago · Santiago Trujillo Denunciar

0

Hablar de async/await await y asyncio no es lo mismo. La primera es una construcción fundamental de bajo nivel (corrutinas), mientras que la última es una biblioteca que utiliza estas construcciones. Por el contrario, no hay una única respuesta definitiva.

La siguiente es una descripción general de cómo funcionan las bibliotecas async/await await y asyncio -like. Es decir, puede haber otros trucos encima (los hay...) pero son intrascendentes a menos que los construyas tú mismo. La diferencia debería ser insignificante a menos que ya sepa lo suficiente como para no tener que hacer esa pregunta.

1. Corrutinas versus subrutinas en pocas palabras

Al igual que las subrutinas (funciones, procedimientos, ...), las corrutinas (generadores, ...) son una abstracción de la pila de llamadas y el puntero de instrucciones: hay una pila de fragmentos de código en ejecución, y cada uno está en una instrucción específica.

La distinción entre def y definición async def es simplemente para mayor claridad. La diferencia real es return versus yield . A partir de esto, await o yield from tomar la diferencia de llamadas individuales a pilas completas.

1.1. subrutinas

Una subrutina representa un nuevo nivel de pila para contener variables locales y un solo recorrido de sus instrucciones para llegar a un final. Considere una subrutina como esta:

 def subfoo(bar): qux = 3 return qux * bar

Cuando lo ejecutas, eso significa

  1. asignar espacio de pila para bar y qux
  2. ejecutar recursivamente la primera declaración y saltar a la siguiente declaración
  3. una vez en un return , empuja su valor a la pila de llamadas
  4. borrar la pila (1.) y el puntero de instrucciones (2.)

En particular, 4. significa que una subrutina siempre comienza en el mismo estado. Todo lo exclusivo de la función en sí se pierde al finalizar. No se puede reanudar una función, incluso si hay instrucciones después de la return .

 root -\ : \- subfoo --\ :/--<---return --/ | V

1.2. Corrutinas como subrutinas persistentes

Una rutina es como una subrutina, pero puede salir sin destruir su estado. Considere una rutina como esta:

 def cofoo(bar): qux = yield bar # yield marks a break point return qux

Cuando lo ejecutas, eso significa

  1. asignar espacio de pila para bar y qux
  2. ejecutar recursivamente la primera declaración y saltar a la siguiente declaración
    1. una vez en yield , empuje su valor a la pila de llamadas pero almacene la pila y el puntero de instrucción
    2. una vez que llame a yield , restaure la pila y el puntero de instrucción y empuje los argumentos a qux
  3. una vez en un return , empuja su valor a la pila de llamadas
  4. borrar la pila (1.) y el puntero de instrucciones (2.)

Tenga en cuenta la adición de 2.1 y 2.2: una rutina puede suspenderse y reanudarse en puntos predefinidos. Esto es similar a cómo se suspende una subrutina durante la llamada a otra subrutina. La diferencia es que la rutina activa no está estrictamente vinculada a su pila de llamadas. En cambio, una rutina suspendida es parte de una pila separada y aislada.

 root -\ : \- cofoo --\ :/--<+--yield --/ | : V :

Esto significa que las corrutinas suspendidas se pueden almacenar o mover libremente entre pilas. Cualquier pila de llamadas que tenga acceso a una rutina puede decidir reanudarla.

1.3. Atravesando la pila de llamadas

Hasta ahora, nuestra corrutina solo baja en la pila de llamadas con yield . Una subrutina puede subir y bajar en la pila de llamadas con return y () . Para completar, las corrutinas también necesitan un mecanismo para subir la pila de llamadas. Considere una rutina como esta:

 def wrap(): yield 'before' yield from cofoo() yield 'after'

Cuando lo ejecuta, eso significa que aún asigna la pila y el puntero de instrucción como una subrutina. Cuando se suspende, eso todavía es como almacenar una subrutina.

Sin embargo, yield from hace ambas cosas . Suspende la pila y el puntero de instrucciones de wrap y ejecuta cofoo . Tenga en cuenta que la wrap permanece suspendida hasta cofoo termine por completo. Cada vez que cofoo suspende o se envía algo, cofoo se conecta directamente a la pila de llamadas.

1.4. Coroutines todo el camino hacia abajo

Según lo establecido, yield from permite conectar dos ámbitos a través de otro intermedio. Cuando se aplica recursivamente, eso significa que la parte superior de la pila se puede conectar a la parte inferior de la pila.

 root -\ : \-> coro_a -yield-from-> coro_b --\ :/ <-+------------------------yield ---/ | : :\ --+-- coro_a.send----------yield ---\ : coro_b <-/

Tenga en cuenta que root y coro_b no se conocen. Esto hace que las corrutinas sean mucho más limpias que las devoluciones de llamada: las corrutinas aún se basan en una relación 1: 1 como las subrutinas. Las corrutinas suspenden y reanudan toda su pila de ejecución existente hasta un punto de llamada normal.

En particular, root podría tener un número arbitrario de rutinas para reanudar. Sin embargo, nunca puede reanudar más de uno al mismo tiempo. ¡Las corrutinas de la misma raíz son concurrentes pero no paralelas!

1.5. Python async y await

Hasta ahora, la explicación ha utilizado explícitamente el yield y el yield from vocabulario de los generadores: la funcionalidad subyacente es la misma. La nueva sintaxis de Python3.5 async y await existe principalmente para mayor claridad.

 def foo(): # subroutine? return None def foo(): # coroutine? yield from foofoo() # generator? coroutine? async def foo(): # coroutine! await foofoo() # coroutine! return None

Las declaraciones async for y async with son necesarias porque rompería la cadena yield from/await con las declaraciones for y with desnudas.

2. Anatomía de un bucle de eventos simple

Por sí misma, una corrutina no tiene el concepto de ceder el control a otra corrutina. Solo puede ceder el control a la persona que llama en la parte inferior de una pila de rutinas. Esta persona que llama puede cambiar a otra rutina y ejecutarla.

Este nodo raíz de varias corrutinas es comúnmente un bucle de eventos : en suspensión, una corrutina produce un evento en el que desea reanudar. A su vez, el bucle de eventos es capaz de esperar de manera eficiente a que ocurran estos eventos. Esto le permite decidir qué rutina ejecutar a continuación o cómo esperar antes de reanudar.

Tal diseño implica que hay un conjunto de eventos predefinidos que comprende el ciclo. Varias corrutinas se await unas a otras, hasta que finalmente se await un evento. Este evento puede comunicarse directamente con el bucle de eventos yield el control.

 loop -\ : \-> coroutine --await--> event --\ :/ <-+----------------------- yield --/ | : | : # loop waits for event to happen | : :\ --+-- send(reply) -------- yield --\ : coroutine <--yield-- event <-/

La clave es que la suspensión de la rutina permite que el bucle de eventos y los eventos se comuniquen directamente. La pila de rutina intermedia no requiere ningún conocimiento sobre qué ciclo la está ejecutando, ni cómo funcionan los eventos.

2.1.1. Eventos en el tiempo

El evento más simple de manejar es llegar a un punto en el tiempo. Este también es un bloque fundamental del código de subprocesos: un subproceso sleep repetidamente hasta que se cumple una condición. Sin embargo, una sleep regular bloquea la ejecución por sí misma: queremos que otras corrutinas no se bloqueen. En su lugar, queremos decirle al ciclo de eventos cuándo debe reanudar la pila de rutinas actual.

2.1.2. Definición de un evento

Un evento es simplemente un valor que podemos identificar, ya sea a través de una enumeración, un tipo u otra identidad. Podemos definir esto con una clase simple que almacena nuestro tiempo objetivo. Además de almacenar la información del evento, podemos permitir await una clase directamente.

 class AsyncSleep: """Event to sleep until a point in time""" def __init__(self, until: float): self.until = until # used whenever someone ``await``s an instance of this Event def __await__(self): # yield this Event to the loop yield self def __repr__(self): return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

Esta clase solo almacena el evento; no dice cómo manejarlo realmente.

La única característica especial es __await__ : es lo que busca la palabra clave await . Prácticamente, es un iterador pero no está disponible para la maquinaria de iteración normal.

2.2.1. En espera de un evento

Ahora que tenemos un evento, ¿cómo reaccionan las corrutinas? Deberíamos poder expresar el equivalente del sleep await nuestro evento. Para ver mejor lo que está pasando, esperamos dos veces durante la mitad del tiempo:

 import time async def asleep(duration: float): """await that ``duration`` seconds pass""" await AsyncSleep(time.time() + duration / 2) await AsyncSleep(time.time() + duration / 2)

Podemos instanciar y ejecutar directamente esta rutina. Similar a un generador, usar coroutine.send ejecuta la corrutina hasta que yield un resultado.

 coroutine = asleep(100) while True: print(coroutine.send(None)) time.sleep(0.1)

Esto nos da dos eventos AsyncSleep y luego una StopIteration cuando finaliza la rutina. Tenga en cuenta que el único retraso es de time.sleep in the loop. Cada AsyncSleep solo almacena un desplazamiento de la hora actual.

2.2.2. Evento + Sueño

En este punto, tenemos dos mecanismos separados a nuestra disposición:

  • Eventos AsyncSleep que se pueden producir desde dentro de una corrutina
  • time.sleep que puede esperar sin afectar las rutinas

En particular, estos dos son ortogonales: ninguno afecta o desencadena al otro. Como resultado, podemos idear nuestra propia estrategia para sleep para cumplir con el retraso de un AsyncSleep .

2.3. Un bucle de eventos ingenuo

Si tenemos varias corrutinas, cada una puede decirnos cuando quiere ser despertada. Entonces podemos esperar a que se quiera reanudar el primero de ellos, luego al siguiente, y así sucesivamente. Cabe destacar que en cada punto solo nos importa cuál es el siguiente .

Esto hace que la programación sea sencilla:

  1. ordena las corrutinas por su hora de despertar deseada
  2. elige el primero que quiera despertar
  3. esperar hasta este punto en el tiempo
  4. ejecuta esta rutina
  5. repetir desde 1.

Una implementación trivial no necesita ningún concepto avanzado. Una list permite ordenar las corrutinas por fecha. Esperar es un tiempo regular. time.sleep . Ejecutar coroutines funciona igual que antes con coroutine.send .

 def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" # store wake-up-time and coroutines waiting = [(0, coroutine) for coroutine in coroutines] while waiting: # 2. pick the first coroutine that wants to wake up until, coroutine = waiting.pop(0) # 3. wait until this point in time time.sleep(max(0.0, until - time.time())) # 4. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0])

Por supuesto, esto tiene un amplio margen de mejora. Podemos usar un montón para la cola de espera o una tabla de despacho para eventos. También podríamos obtener valores de retorno de StopIteration y asignarlos a la rutina. Sin embargo, el principio fundamental sigue siendo el mismo.

2.4. Cooperativa de espera

El evento AsyncSleep y el bucle de eventos de run son una implementación completamente funcional de eventos cronometrados.

 async def sleepy(identifier: str = "coroutine", count=5): for i in range(count): print(identifier, 'step', i + 1, 'at %.2f' % time.time()) await asleep(0.1) run(*(sleepy("coroutine %d" % j) for j in range(5)))

Esto cambia cooperativamente entre cada una de las cinco corrutinas, suspendiendo cada una por 0.1 segundos. Aunque el bucle de eventos es síncrono, aún ejecuta el trabajo en 0,5 segundos en lugar de 2,5 segundos. Cada rutina tiene estado y actúa de forma independiente.

3. Bucle de eventos de E/S

Un bucle de eventos que admita sleep es adecuado para sondeos . Sin embargo, esperar la E/S en un identificador de archivo se puede hacer de manera más eficiente: el sistema operativo implementa la E/S y, por lo tanto, sabe qué identificadores están listos. Idealmente, un bucle de eventos debe admitir un evento "listo para E/S" explícito.

3.1. La llamada select

Python ya tiene una interfaz para consultar el sistema operativo en busca de identificadores de E/S de lectura. Cuando se llama con identificadores para leer o escribir, devuelve los identificadores listos para leer o escribir:

 readable, writeable, _ = select.select(rlist, wlist, xlist, timeout)

Por ejemplo, podemos open un archivo para escritura y esperar a que esté listo:

 write_target = open('/tmp/foo') readable, writeable, _ = select.select([], [write_target], [])

Una vez que seleccione devoluciones, writeable contiene nuestro archivo abierto.

3.2. Evento de E/S básico

Similar a la solicitud AsyncSleep , necesitamos definir un evento para E/S. Con la lógica de select subyacente, el evento debe hacer referencia a un objeto legible, por ejemplo, un archivo open . Además, almacenamos la cantidad de datos para leer.

 class AsyncRead: def __init__(self, file, amount=1): self.file = file self.amount = amount self._buffer = '' def __await__(self): while len(self._buffer) < self.amount: yield self # we only get here if ``read`` should not block self._buffer += self.file.read(1) return self._buffer def __repr__(self): return '%s(file=%s, amount=%d, progress=%d)' % ( self.__class__.__name__, self.file, self.amount, len(self._buffer) )

Al igual que con AsyncSleep , en su mayoría solo almacenamos los datos necesarios para la llamada al sistema subyacente. Esta vez, __await__ se puede reanudar varias veces, hasta que se haya leído la amount deseada. Además, return el resultado de E/S en lugar de simplemente reanudar.

3.3. Aumento de un bucle de eventos con E/S de lectura

La base de nuestro bucle de eventos sigue siendo la run definida anteriormente. Primero, necesitamos rastrear las solicitudes de lectura. Este ya no es un cronograma ordenado, solo asignamos solicitudes de lectura a rutinas.

 # new waiting_read = {} # type: Dict[file, coroutine]

Dado que select.select toma un parámetro de tiempo de espera, podemos usarlo en lugar de time.sleep .

 # old time.sleep(max(0.0, until - time.time())) # new readable, _, _ = select.select(list(reads), [], [])

Esto nos da todos los archivos legibles; si hay alguno, ejecutamos la rutina correspondiente. Si no hay ninguno, hemos esperado lo suficiente para que se ejecute nuestra rutina actual.

 # new - reschedule waiting coroutine, run readable coroutine if readable: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read[readable[0]]

Finalmente, tenemos que escuchar las solicitudes de lectura.

 # new if isinstance(command, AsyncSleep): ... elif isinstance(command, AsyncRead): ...

3.4. Poniendo todo junto

Lo anterior fue un poco simplificado. Necesitamos hacer algunos cambios para no matar de hambre a los corutines dormidos si siempre podemos leer. Necesitamos manejar el no tener nada que leer o nada que esperar. Sin embargo, el resultado final aún se ajusta a 30 LOC.

 def run(*coroutines): """Cooperatively run all ``coroutines`` until completion""" waiting_read = {} # type: Dict[file, coroutine] waiting = [(0, coroutine) for coroutine in coroutines] while waiting or waiting_read: # 2. wait until the next coroutine may run or read ... try: until, coroutine = waiting.pop(0) except IndexError: until, coroutine = float('inf'), None readable, _, _ = select.select(list(waiting_read), [], []) else: readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time())) # ... and select the appropriate one if readable and time.time() < until: if until and coroutine: waiting.append((until, coroutine)) waiting.sort() coroutine = waiting_read.pop(readable[0]) # 3. run this coroutine try: command = coroutine.send(None) except StopIteration: continue # 1. sort coroutines by their desired suspension ... if isinstance(command, AsyncSleep): waiting.append((command.until, coroutine)) waiting.sort(key=lambda item: item[0]) # ... or register reads elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine

3.5. E/S cooperativa

Las AsyncSleep , AsyncRead y run ahora son completamente funcionales para dormir y/o leer. Al igual que para sleepy , podemos definir un ayudante para probar la lectura:

 async def ready(path, amount=1024*32): print('read', path, 'at', '%d' % time.time()) with open(path, 'rb') as file: result = await AsyncRead(file, amount) print('done', path, 'at', '%d' % time.time()) print('got', len(result), 'B') run(sleepy('background', 5), ready('/dev/urandom'))

Ejecutando esto, podemos ver que nuestra E/S está intercalada con la tarea en espera:

 id background round 1 read /dev/urandom at 1530721148 id background round 2 id background round 3 id background round 4 id background round 5 done /dev/urandom at 1530721148 got 1024 B

4. E/S sin bloqueo

Si bien la E/S en los archivos transmite el concepto, no es realmente adecuado para una biblioteca como asyncio : la llamada de select siempre regresa para los archivos , y tanto open como la read pueden bloquearse indefinidamente . Esto bloquea todas las rutinas de un bucle de eventos, lo cual es malo. Las bibliotecas como aiofiles usan subprocesos y sincronización para falsificar E/S sin bloqueo y eventos en el archivo.

Sin embargo, los sockets permiten E/S sin bloqueo, y su latencia inherente lo hace mucho más crítico. Cuando se usa en un bucle de eventos, la espera de datos y el reintento se pueden ajustar sin bloquear nada.

4.1. Evento de E/S sin bloqueo

Similar a nuestro AsyncRead , podemos definir un evento de suspensión y lectura para sockets. En lugar de tomar un archivo, tomamos un socket, que no debe bloquear. Además, nuestro __await__ usa socket.recv en lugar de file.read .

 class AsyncRecv: def __init__(self, connection, amount=1, read_buffer=1024): assert not connection.getblocking(), 'connection must be non-blocking for async recv' self.connection = connection self.amount = amount self.read_buffer = read_buffer self._buffer = b'' def __await__(self): while len(self._buffer) < self.amount: try: self._buffer += self.connection.recv(self.read_buffer) except BlockingIOError: yield self return self._buffer def __repr__(self): return '%s(file=%s, amount=%d, progress=%d)' % ( self.__class__.__name__, self.connection, self.amount, len(self._buffer) )

A diferencia de AsyncRead , __await__ realiza E/S verdaderamente sin bloqueo. Cuando hay datos disponibles, siempre se lee. Cuando no hay datos disponibles, siempre se suspende. Eso significa que el bucle de eventos solo se bloquea mientras realizamos un trabajo útil.

4.2. Desbloqueo del bucle de eventos

En lo que respecta al ciclo de eventos, nada cambia mucho. El evento para escuchar sigue siendo el mismo que para los archivos: un descriptor de archivo marcado como listo por select .

 # old elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine # new elif isinstance(command, AsyncRead): waiting_read[command.file] = coroutine elif isinstance(command, AsyncRecv): waiting_read[command.connection] = coroutine

En este punto, debería ser obvio que AsyncRead y AsyncRecv son el mismo tipo de evento. Podríamos refactorizarlos fácilmente para que sean un evento con un componente de E/S intercambiable. En efecto, el bucle de eventos, las corrutinas y los eventos separan claramente un planificador, un código intermedio arbitrario y la E/S real.

4.3. El lado feo de la E/S sin bloqueo

En principio, lo que deberías hacer en este punto es replicar la lógica de read as a recv para AsyncRecv . Sin embargo, esto es mucho más feo ahora: debe manejar los retornos tempranos cuando las funciones se bloquean dentro del kernel, pero le ceden el control. Por ejemplo, abrir una conexión versus abrir un archivo es mucho más largo:

 # file file = open(path, 'rb') # non-blocking socket connection = socket.socket() connection.setblocking(False) # open without blocking - retry on failure try: connection.connect((url, port)) except BlockingIOError: pass

Para resumir, lo que queda son unas pocas docenas de líneas de manejo de excepciones. Los eventos y el bucle de eventos ya funcionan en este punto.

 id background round 1 read localhost:25000 at 1530783569 read /dev/urandom at 1530783569 done localhost:25000 at 1530783569 got 32768 B id background round 2 id background round 3 id background round 4 done /dev/urandom at 1530783569 got 4096 B id background round 5

Apéndice

Código de ejemplo en github

over 3 years ago · Santiago Trujillo Denunciar

0

¿Qué es asincio?

Asyncio significa entrada y salida asíncrona y se refiere a un paradigma de programación que logra una alta concurrencia utilizando un solo hilo o bucle de eventos. La programación asincrónica es un tipo de programación paralela en la que se permite que una unidad de trabajo se ejecute por separado del subproceso de la aplicación principal. Cuando se completa el trabajo, notifica al subproceso principal sobre la finalización o falla del subproceso de trabajo.

Echemos un vistazo en la imagen de abajo:

flujo_asincrónico

Entendamos asyncio con un ejemplo:

Para entender el concepto detrás de asyncio, consideremos un restaurante con un solo mesero. De repente, aparecen tres clientes, A, B y C. Los tres toman una cantidad variable de tiempo para decidir qué comer una vez que reciben el menú del mesero.

Supongamos que A toma 5 minutos, B 10 minutos y C 1 minuto para decidir. Si el camarero soltero comienza primero con B y toma el pedido de B en 10 minutos, luego sirve a A y dedica 5 minutos a anotar su pedido y finalmente dedica 1 minuto a saber qué quiere comer C. Entonces, en total, el mesero gasta 10 + 5 + 1 = 16 minutos para anotar sus pedidos. Sin embargo, fíjate en esta secuencia de eventos, C termina esperando 15 minutos antes de que el mesero llegue a él, A espera 10 minutos y B espera 0 minutos.

Ahora considere si el mesero supiera el tiempo que tomaría cada cliente para decidir. Puede comenzar con C primero, luego ir a A y finalmente a B. De esta manera, cada cliente experimentaría una espera de 0 minutos. Se crea una ilusión de tres camareros, uno dedicado a cada cliente aunque sólo haya uno.

Por último, el tiempo total que tarda el camarero en tomar los tres pedidos es de 10 minutos, mucho menos que los 16 minutos del otro escenario.

Veamos otro ejemplo:

Supongamos que el maestro de ajedrez Magnus Carlsen organiza una exhibición de ajedrez en la que juega con varios jugadores aficionados. Tiene dos formas de conducir la exposición: de forma sincrónica y asincrónica.

Suposiciones:

  • 24 oponentes
  • Magnus Carlsen hace cada movimiento de ajedrez en 5 segundos
  • Cada oponente tarda 55 segundos en hacer un movimiento.
  • Los juegos promedian 30 pares de movimientos (60 movimientos en total)

Sincrónicamente : Magnus Carlsen juega un juego a la vez, nunca dos al mismo tiempo, hasta que se completa el juego. Cada juego toma (55 + 5) * 30 == 1800 segundos o 30 minutos . Toda la exposición dura 24 * 30 == 720 minutos, o 12 horas .

Asincrónicamente : Magnus Carlsen se mueve de mesa en mesa, haciendo un movimiento en cada mesa. Ella deja la mesa y deja que el oponente haga su próximo movimiento durante el tiempo de espera. Un movimiento en los 24 juegos le lleva a Judit 24 * 5 == 120 segundos o 2 minutos . Toda la exposición ahora se reduce a 120 * 30 == 3600 segundos, o solo 1 hora

Solo hay un Magnus Carlsen, que solo tiene dos manos y solo hace un movimiento a la vez. Pero jugar de forma asíncrona reduce el tiempo de exhibición de 12 horas a una.

Ejemplo de codificación:

Intentemos demostrar el tiempo de ejecución síncrono y asíncrono usando un fragmento de código.

Asíncrono - async_count.py

 import asyncio import time async def count(): print("One", end=" ") await asyncio.sleep(1) print("Two", end=" ") await asyncio.sleep(2) print("Three", end=" ") async def main(): await asyncio.gather(count(), count(), count(), count(), count()) if __name__ == "__main__": start_time = time.perf_counter() asyncio.run(main()) end_time = time.perf_counter() execution_time = end_time - start_time print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Asíncrono - Salida :

 One One One One One Two Two Two Two Two Three Three Three Three Three Executing - async_count.py Execution Starts: 18453.442160108 Executions Ends: 18456.444719712 Totals Execution Time:3.00 seconds.

Síncrono - sync_count.py

 import time def count(): print("One", end=" ") time.sleep(1) print("Two", end=" ") time.sleep(2) print("Three", end=" ") def main(): for _ in range(5): count() if __name__ == "__main__": start_time = time.perf_counter() main() end_time = time.perf_counter() execution_time = end_time - start_time print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Síncrono - Salida :

 One Two Three One Two Three One Two Three One Two Three One Two Three Executing - sync_count.py Execution Starts: 18875.175965998 Executions Ends: 18890.189930292 Totals Execution Time:15.01 seconds.

¿Por qué usar asyncio en lugar de subprocesos múltiples en Python?

  • Es muy difícil escribir código que sea seguro para subprocesos. Con el código asincrónico, sabe exactamente dónde cambiará el código de una tarea a la siguiente y las condiciones de carrera son mucho más difíciles de conseguir.
  • Los subprocesos consumen una buena cantidad de datos, ya que cada subproceso necesita tener su propia pila. Con el código asíncrono, todo el código comparte la misma pila y la pila se mantiene pequeña debido a que se desenrolla continuamente entre tareas.
  • Los subprocesos son estructuras del sistema operativo y, por lo tanto, requieren más memoria para que la plataforma los admita. No existe tal problema con las tareas asincrónicas.

¿Cómo funciona asyncio?

Antes de profundizar, recordemos Python Generator

Generador de Python:

Las funciones que contienen una declaración de yield se compilan como generadores. El uso de una expresión de rendimiento en el cuerpo de una función hace que esa función sea un generador. Estas funciones devuelven un objeto que admite los métodos del protocolo de iteración. El objeto generador creado recibe automáticamente un __next()__ . Volviendo al ejemplo de la sección anterior, podemos invocar __next__ directamente en el objeto generador en lugar de usar next() :

 def asynchronous(): yield "Educative" if __name__ == "__main__": gen = asynchronous() str = gen.__next__() print(str)

Recuerde lo siguiente acerca de los generadores:

  • Las funciones del generador le permiten posponer la computación de valores costosos. Solo calcula el siguiente valor cuando es necesario. Esto hace que la memoria y el cálculo de los generadores sean eficientes; se abstienen de guardar secuencias largas en la memoria o de hacer todos los cálculos costosos por adelantado.
  • Los generadores, cuando están suspendidos, retienen la ubicación del código, que es la última declaración de rendimiento ejecutada, y su alcance local completo. Esto les permite reanudar la ejecución desde donde la dejaron.
  • Los objetos generadores no son más que iteradores.
  • Recuerde hacer una distinción entre una función generadora y el objeto generador asociado, que a menudo se usan indistintamente. Cuando se invoca una función generadora, se devuelve un objeto generador y se invoca next() en el objeto generador para ejecutar el código dentro de la función generadora.

Estados de un generador:

Un generador pasa por los siguientes estados:

  • GEN_CREATED cuando se devuelve un objeto generador por primera vez desde una función generadora y la iteración no ha comenzado.
  • GEN_RUNNING cuando se ha invocado next en el objeto generador y el intérprete de python lo está ejecutando.
  • GEN_SUSPENDED cuando un generador se suspende en un rendimiento
  • GEN_CLOSED cuando un generador ha completado la ejecución o se ha cerrado.

generador_ciclo

Métodos en objetos generadores:

Un objeto generador expone diferentes métodos que se pueden invocar para manipular el generador. Estos son:

  • throw()
  • send()
  • close()

Profundicemos en explicaciones más detalladas

Las reglas de asyncio:

  • La sintaxis async def introduce una rutina nativa o un generador asíncrono . Las expresiones async with y async for también son válidas.
  • La palabra clave await devuelve el control de la función al bucle de eventos. (Suspende la ejecución de la rutina circundante). Si Python encuentra una expresión await await f() en el ámbito de g() , así es como await le dice al bucle de eventos: "Suspender la ejecución de g() hasta lo que sea que esté esperando". on—se devuelve el resultado de f() . Mientras tanto, deja que se ejecute otra cosa».

En el código, ese segundo punto se ve más o menos así:

 async def g(): # Pause here and come back to g() when f() is ready r = await f() return r

También hay un conjunto estricto de reglas sobre cuándo y cómo puede y no puede usar async / await . Estos pueden ser útiles si todavía está recogiendo la sintaxis o si ya está expuesto al uso de async / await :

  • Una función que introduce con async def es una rutina. Puede usar await , return o yield , pero todos estos son opcionales. Declarar async def noop(): pass es válido:
    • El uso de await y/o return crea una función de rutina. Para llamar a una función coroutine, debe await a que obtenga sus resultados.
    • Es menos común usar yield en un bloque de async def . Esto crea un generador asíncrono , que se itera con async for . Olvídese de los generadores asíncronos por el momento y concéntrese en obtener la sintaxis de las funciones coroutine, que usan await y/o return .
    • Cualquier cosa definida con async def puede no usar yield from , lo que generará un SyntaxError .
  • Al igual que es un SyntaxError para usar yield fuera de una función de def , es un SyntaxError para usar await fuera de una corrutina async def . Solo puede usar await en el cuerpo de coroutines.

Aquí hay algunos ejemplos breves destinados a resumir las reglas anteriores:

 async def f(x): y = await z(x) # OK - `await` and `return` allowed in coroutines return y async def g(x): yield x # OK - this is an async generator async def m(x): yield from gen(x) # NO - SyntaxError def m(x): y = await z(x) # NO - SyntaxError (no `async def` here) return y

Rutina basada en generador

Python creó una distinción entre los generadores de Python y los generadores destinados a ser utilizados como corrutinas. Estas corrutinas se denominan corrutinas basadas en generador y requieren que se agregue el decorador @asynio.coroutine a la definición de la función, aunque esto no se aplica estrictamente.

Las corrutinas basadas en generador usan yield from sintaxis en lugar de yield . Una rutina puede:

  • rendimiento de otra rutina
  • rendimiento de un futuro
  • devolver una expresión
  • generar excepción

Las corrutinas en Python hacen posible la multitarea cooperativa. La multitarea cooperativa es el enfoque en el que el proceso en ejecución cede voluntariamente la CPU a otros procesos. Un proceso puede hacerlo cuando está lógicamente bloqueado, por ejemplo, mientras espera la entrada del usuario o cuando ha iniciado una solicitud de red y estará inactivo por un tiempo. Una corrutina se puede definir como una función especial que puede ceder el control a su llamador sin perder su estado.

Entonces, ¿cuál es la diferencia entre rutinas y generadores?

Los generadores son esencialmente iteradores aunque parecen funciones. La distinción entre generadores y corrutinas, en general, es que:

  • Los generadores devuelven un valor al invocador, mientras que una corrutina cede el control a otra corrutina y puede reanudar la ejecución desde el punto en que cede el control.
  • Un generador no puede aceptar argumentos una vez iniciado, mientras que una rutina sí.
  • Los generadores se utilizan principalmente para simplificar la escritura de iteradores. Son un tipo de corrutina y, a veces, también se denominan semicorutinas.

Ejemplo de rutina basada en generador

La corrutina basada en un generador más simple que podemos escribir es la siguiente:

 @asyncio.coroutine def do_something_important(): yield from asyncio.sleep(1)

La rutina duerme por un segundo. Tenga en cuenta el decorador y el uso de yield from .

Ejemplo de rutina basada en nativo

Por nativo se entiende que el lenguaje introdujo la sintaxis para definir específicamente las corrutinas, convirtiéndolas en ciudadanos de primera clase en el lenguaje. Las rutinas nativas se pueden definir mediante la sintaxis async/await . La corrutina nativa más simple que podemos escribir es la siguiente:

 async def do_something_important(): await asyncio.sleep(1)

Patrones de diseño AsyncIO

AsyncIO viene con su propio conjunto de posibles diseños de secuencias de comandos, que analizaremos en esta sección.

1. Bucles de eventos

El bucle de eventos es una construcción de programación que espera que sucedan los eventos y luego los envía a un controlador de eventos. Un evento puede ser un usuario que hace clic en un botón de la interfaz de usuario o un proceso que inicia la descarga de un archivo. En el núcleo de la programación asincrónica se encuentra el bucle de eventos.

Código de ejemplo:

 import asyncio import random import time from threading import Thread from threading import current_thread # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[34m", # Blue ) async def do_something_important(sleep_for): print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0]) await asyncio.sleep(sleep_for) def launch_event_loops(): # get a new event loop loop = asyncio.new_event_loop() # set the event loop for the current thread asyncio.set_event_loop(loop) # run a coroutine on the event loop loop.run_until_complete(do_something_important(random.randint(1, 5))) # remember to close the loop loop.close() if __name__ == "__main__": thread_1 = Thread(target=launch_event_loops) thread_2 = Thread(target=launch_event_loops) start_time = time.perf_counter() thread_1.start() thread_2.start() print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0]) thread_1.join() thread_2.join() end_time = time.perf_counter() execution_time = end_time - start_time print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

Comando de ejecución: python async_event_loop.py

Producción:

async_event_loop

Pruébelo usted mismo y examine la salida y se dará cuenta de que cada subproceso generado está ejecutando su propio ciclo de eventos.

Tipos de bucles de eventos

Hay dos tipos de bucles de eventos:

  • SelectorEventLoop : SelectorEventLoop se basa en el módulo de selectores y es el bucle predeterminado en todas las plataformas.
  • ProactorEventLoop : ProactorEventLoop se basa en los puertos de finalización de E/S de Windows y solo es compatible con Windows.

2. Futuros

Future representa un cálculo que está en curso o se programará en el futuro. Es un objeto esperable especial de bajo nivel que representa un resultado eventual de una operación asíncrona. No confunda threading.Future y asyncio.Future .

Código de ejemplo:

 import time import asyncio from asyncio import Future # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[34m", # Blue ) async def bar(future): print(colors[1] + "bar will sleep for 3 seconds" + colors[0]) await asyncio.sleep(3) print(colors[1] + "bar resolving the future" + colors[0]) future.done() future.set_result("future is resolved") async def foo(future): print(colors[2] + "foo will await the future" + colors[0]) await future print(colors[2] + "foo finds the future resolved" + colors[0]) async def main(): future = Future() await asyncio.gather(foo(future), bar(future)) if __name__ == "__main__": start_time = time.perf_counter() asyncio.run(main()) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

Comando de ejecución: python async_futures.py

Producción:

asíncrono_futuros

A ambas corrutinas se les pasa un futuro. La rutina foo() espera que se resuelva el futuro, mientras que la rutina bar() resuelve el futuro después de tres segundos.

3. Tareas

Las tareas son como futuros, de hecho, Task es una subclase de Future y se puede crear usando los siguientes métodos:

  • asyncio.create_task() acepta rutinas y las envuelve como tareas.
  • loop.create_task() solo acepta rutinas.
  • asyncio.ensure_future() acepta futuros, rutinas y cualquier objeto aguardable.

Las tareas envuelven corrutinas y las ejecutan en bucles de eventos. Si una rutina espera en un futuro, la tarea suspende la ejecución de la rutina y espera a que se complete el futuro. Cuando finaliza el Future, se reanuda la ejecución de la corrutina envuelta.

Código de ejemplo:

 import time import asyncio from asyncio import Future # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[34m", # Blue ) async def bar(future): print(colors[1] + "bar will sleep for 3 seconds" + colors[0]) await asyncio.sleep(3) print(colors[1] + "bar resolving the future" + colors[0]) future.done() future.set_result("future is resolved") async def foo(future): print(colors[2] + "foo will await the future" + colors[0]) await future print(colors[2] + "foo finds the future resolved" + colors[0]) async def main(): future = Future() loop = asyncio.get_event_loop() t1 = loop.create_task(bar(future)) t2 = loop.create_task(foo(future)) await t2, t1 if __name__ == "__main__": start_time = time.perf_counter() loop = asyncio.get_event_loop() loop.run_until_complete(main()) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

Comando de ejecución: python async_tasks.py

Producción:

tareas_asincrónicas

4. Encadenamiento de rutinas:

Una característica clave de las corrutinas es que se pueden encadenar. Un objeto de corrutina está a la espera, por lo que otra corrutina puede await . Esto le permite dividir los programas en corrutinas más pequeñas, manejables y reciclables:

Código de ejemplo:

 import sys import asyncio import random import time # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[36m", # Cyan "\033[34m", # Blue ) async def function1(n: int) -> str: i = random.randint(0, 10) print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0]) await asyncio.sleep(i) result = f"result{n}-1" print(colors[1] + f"Returning function1({n}) == {result}." + colors[0]) return result async def function2(n: int, arg: str) -> str: i = random.randint(0, 10) print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0]) await asyncio.sleep(i) result = f"result{n}-2 derived from {arg}" print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0]) return result async def chain(n: int) -> None: start = time.perf_counter() p1 = await function1(n) p2 = await function2(n, p1) end = time.perf_counter() - start print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0]) async def main(*args): await asyncio.gather(*(chain(n) for n in args)) if __name__ == "__main__": random.seed(444) args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:]) start_time = time.perf_counter() asyncio.run(main(*args)) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Preste mucha atención a la salida, donde function1() duerme durante un tiempo variable y function2() comienza a trabajar con los resultados a medida que están disponibles:

Comando de ejecución: python async_chained.py 11 8 5

Producción:

asíncrono_encadenado

5. Usando una cola:

En este diseño, no hay encadenamiento de ningún consumidor individual a un productor. Los consumidores no saben de antemano el número de productores, ni siquiera el número acumulativo de elementos que se agregarán a la cola.

A un productor o consumidor individual le toma una cantidad variable de tiempo poner y extraer elementos de la cola, respectivamente. La cola sirve como un rendimiento que puede comunicarse con los productores y consumidores sin que se comuniquen entre sí directamente.

Código de ejemplo:

 import asyncio import argparse import itertools as it import os import random import time # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[36m", # Cyan "\033[34m", # Blue ) async def generate_item(size: int = 5) -> str: return os.urandom(size).hex() async def random_sleep(caller=None) -> None: i = random.randint(0, 10) if caller: print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0]) await asyncio.sleep(i) async def produce(name: int, producer_queue: asyncio.Queue) -> None: n = random.randint(0, 10) for _ in it.repeat(None, n): # Synchronous loop for each single producer await random_sleep(caller=f"Producer {name}") i = await generate_item() t = time.perf_counter() await producer_queue.put((i, t)) print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0]) async def consume(name: int, consumer_queue: asyncio.Queue) -> None: while True: await random_sleep(caller=f"Consumer {name}") i, t = await consumer_queue.get() now = time.perf_counter() print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0]) consumer_queue.task_done() async def main(no_producer: int, no_consumer: int): q = asyncio.Queue() producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)] consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)] await asyncio.gather(*producers) await q.join() # Implicitly awaits consumers, too for consumer in consumers: consumer.cancel() if __name__ == "__main__": random.seed(444) parser = argparse.ArgumentParser() parser.add_argument("-p", "--no_producer", type=int, default=10) parser.add_argument("-c", "--no_consumer", type=int, default=15) ns = parser.parse_args() start_time = time.perf_counter() asyncio.run(main(**ns.__dict__)) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Comando de ejecución: python async_queue.py -p 2 -c 4

Producción:

asíncrono_cola

Por último, veamos un ejemplo de cómo asyncio reduce el tiempo de espera: dada una corrutina generate_random_int() que sigue produciendo números enteros aleatorios en el rango [0, 10], hasta que uno de ellos supera un umbral, desea permitir múltiples llamadas de esta corrutina no necesita esperar a que la otra se complete en sucesión.

Código de ejemplo:

 import time import asyncio import random # ANSI colors colors = ( "\033[0m", # End of color "\033[31m", # Red "\033[32m", # Green "\033[36m", # Cyan "\033[35m", # Magenta "\033[34m", # Blue ) async def generate_random_int(indx: int, threshold: int = 5) -> int: print(colors[indx + 1] + f"Initiated generate_random_int({indx}).") i = random.randint(0, 10) while i <= threshold: print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.") await asyncio.sleep(indx + 1) i = random.randint(0, 10) print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0]) return i async def main(): res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3))) return res if __name__ == "__main__": random.seed(444) start_time = time.perf_counter() r1, r2, r3 = asyncio.run(main()) print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0]) end_time = time.perf_counter() execution_time = end_time - start_time print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

Comando de ejecución: python async_random.py

Producción:

asíncrono_aleatorio

Nota: si está escribiendo algún código usted mismo, prefiera corrutinas nativas por ser explícito en lugar de implícito. Las corrutinas basadas en generador se eliminarán en Python 3.10.

Repositorio de GitHub: https://github.com/tssovi/asynchronous-in-python

over 3 years ago · Santiago Trujillo Denunciar
Responde la pregunta
Encuentra empleos remotos

¡Descubre la nueva forma de encontrar empleo!

Top de empleos
Top categorías de empleo
Empresas
Publicar vacante Precios Nuestro proceso Comercial
Legal
Términos y condiciones Política de privacidad
© 2025 PeakU Inc. All Rights Reserved.

Andres GPT

Recomiéndame algunas ofertas
Necesito ayuda