I want to download/scrape 50 million log records from a site. Instead of downloading all 50 million in one go, I was trying to download them in parts, e.g. 10 million at a time, using the following code, but it can only handle about 20,000 at a time (anything more throws an error), so downloading that much data becomes very time-consuming. Currently it takes 3-4 minutes to download 20,000 records, at a rate of 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]
How can I speed this up?
import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio
nest_asyncio.apply()
async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i
n = 0
q = 10000000
async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that send the requests
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }
    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)
s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except:
print("error")
elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")
Traceback (most recent call last):
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
raise exceptions[0]
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
sock = await self._connect_sock(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
await self.sock_connect(sock, address)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
return await self._proactor.connect(sock, address)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
future.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
value = callback(transferred, key, ov)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 136, in <module>
loop.run_until_complete(fetch())
File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
return f.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
raise self._exception
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
result = coro.send(None)
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 88, in fetch
response = await f
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 37, in _wait_for_one
return f.result()
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
raise self._exception
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
result = coro.throw(exc)
File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 125, in do_get
async with session.get(url + str(x), headers=headers) as response:
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
self._resp = await self._coro
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
conn = await self._connector.connect(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
raise last_exc
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
transp, proto = await self._wrap_create_connection(
File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]
If it's not bandwidth that limits you (but I cannot check this), there is a solution that is less complicated than Celery and RabbitMQ, although it is not as scalable: it is limited by the number of CPU cores on your machine.
Instead of splitting the calls across Celery workers, you split them across multiple processes.
I modified the fetch function like this:
async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that send the requests
        # use the start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once
        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]
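As an aside, the "more than 20,000 throws an error" limit from the question can also be attacked by capping how many connections each process opens at once, either with an asyncio.Semaphore or with aiohttp's TCPConnector, which takes a limit argument. A minimal sketch of the connector variant; the value 50 is only an assumed starting point to tune, not a recommendation:

async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    # cap simultaneous connections for this session (and hence for this process);
    # 50 is an assumed value, tune it against the target server
    connector = aiohttp.TCPConnector(limit=50)
    async with aiohttp.ClientSession(connector=connector) as session:
        post_tasks = []
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]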
I also modified the main process:
import concurrent.futures
from itertools import count
def one_executor(start, end):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except:
        print("error")
if __name__ == '__main__':
    s = time.perf_counter()

    # Change the value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker

    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # duplicated indexes do not matter, because the
            # make_numbers function uses range.
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")
Here are the results I get with this multi-process version (with the value of q set to 10_000):
1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.
I didn't work on the tqdm progress bar; with the current solution, two bars will be displayed (but I think tqdm works well with multiple processes).
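For what it's worth, tqdm has a position parameter that pins each bar to its own terminal line, which should keep the bars of the different processes readable. A possible variant (untested; the worker-index plumbing is my own addition, not part of the solution above):

async def fetch(start, end, position=0):
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks),
                               total=len(post_tasks),
                               position=position,          # one terminal line per worker
                               desc=f"worker {position}")]

def one_executor(start, end, position=0):
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end, position))
    except:
        print("error")

In the main loop you would then pass an incrementing index as the third argument of executor.submit(one_executor, index_min, index_max, worker_index).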