I want to download/scrape 50 million records from a site. Instead of pulling all 50 million in one go, I tried to download them in chunks of 10 million at a time using the code below, but it can only handle about 20,000 at a time (anything more raises an error), which makes downloading this much data slow. Right now it takes 3 to 4 minutes to fetch 20,000 records:

```
100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]
```

So, how can I speed this up?
```python
import asyncio
import time

import aiohttp
import nest_asyncio
import tqdm

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i

n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }
    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)


s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except Exception as e:
    print("error:", e)
elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")
```
```
Traceback (most recent call last):
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
    raise exceptions[0]
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
    sock = await self._connect_sock(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
    await self.sock_connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
    return await self._proactor.connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
    future.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
    ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 136, in <module>
    loop.run_until_complete(fetch())
  File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
    result = coro.send(None)
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 88, in fetch
    response = await f
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 37, in _wait_for_one
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 125, in do_get
    async with session.get(url + str(x), headers=headers) as response:
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
    self._resp = await self._coro
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
    conn = await self._connector.connect(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
    raise last_exc
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]
```
If it's not your bandwidth that is the bottleneck (but I can't verify this), there is a solution that is less complicated than Celery and RabbitMQ, though not as scalable: it will be limited by the number of CPUs on your machine.

Instead of splitting the calls across Celery workers, you split them across multiple processes.
I modified the fetch function like this:
```python
async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        # use start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
```
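For a quick standalone check of the modified function you can run it directly over a small range (the range here is a hypothetical test value, not from the answer):

```python
# run one chunk on its own, outside the process pool
asyncio.run(fetch(0, 10_000))
```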
and modified the main process:
```python
import concurrent.futures
from itertools import count


def one_executor(start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except Exception as e:
        print("error:", e)


if __name__ == '__main__':
    s = time.perf_counter()

    # Change this value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # overlapping indexes would be harmless because make_numbers
            # uses range(), whose upper bound is exclusive
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")
```
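To make the chunking concrete, here is what the submission loop produces for the question's q = 10_000_000 with max_worker = 4, extracted into a standalone snippet:

```python
from itertools import count

q = 10_000_000                     # total number of ids, as in the question
max_worker = 4
length_by_executor = q // max_worker

for index_min in count(0, length_by_executor):
    index_max = min(index_min + length_by_executor, q)
    print(index_min, index_max)    # (start, end) range handed to one process
    if index_max == q:
        break

# prints:
# 0 2500000
# 2500000 5000000
# 5000000 7500000
# 7500000 10000000
```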
Here are the results I get (with the value of q set to 10_000):
```
1 worker:  executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.
```
I did not work on the tqdm progress bar: with the current solution, two bars will be displayed (one per process), but I believe tqdm works well with multiprocessing.
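If the overlapping bars are a problem, one option is tqdm's `position` argument, which pins a bar to a fixed terminal line. A minimal standalone sketch (not the answer's code; the worker count and item counts are illustrative):

```python
import time
import concurrent.futures

import tqdm


def worker(worker_index: int, n_items: int) -> None:
    # position= pins this process's bar to its own terminal line,
    # so bars from different processes don't overwrite each other
    for _ in tqdm.tqdm(range(n_items), position=worker_index):
        time.sleep(0.001)  # stand-in for real per-record work


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(3):
            executor.submit(worker, i, 1000)
```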