How to speed up async requests in Python

I want to download/scrape 50 million log records from a site. Instead of downloading all 50 million in one go, I was trying to download them in parts, e.g. 10 million at a time, using the following code. But it only handles about 20,000 at a time (more than that throws an error), so downloading that much data becomes very time-consuming. Currently it takes 3-4 minutes to download 20,000 records, at a rate of 100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]. How can I speed it up?

import asyncio
import aiohttp
import time
import tqdm
import nest_asyncio

nest_asyncio.apply()


async def make_numbers(numbers, _numbers):
    for i in range(numbers, _numbers):
        yield i


n = 0
q = 10000000


async def fetch():
    # example
    url = "https://httpbin.org/anything/log?id="

    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        async for x in make_numbers(n, q):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]


async def do_get(session, url, x):
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        print(data)


s = time.perf_counter()
try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(fetch())
except Exception as exc:
    print(f"error: {exc}")

elapsed = time.perf_counter() - s
# print(f"{__file__} executed in {elapsed:0.2f} seconds.")

Traceback (most recent call last):
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 986, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore[return-value]  # noqa
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1056, in create_connection
    raise exceptions[0]
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 1041, in create_connection
    sock = await self._connect_sock(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 955, in _connect_sock
    await self.sock_connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\proactor_events.py", line 702, in sock_connect
    return await self._proactor.connect(sock, address)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 328, in __wakeup
    future.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\windows_events.py", line 599, in finish_connect
    ov.getresult()
OSError: [WinError 121] The semaphore timeout period has expired

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 136, in <module>
    loop.run_until_complete(fetch())
  File "C:\Users\SGM\AppData\Roaming\Python\Python39\site-packages\nest_asyncio.py", line 81, in run_until_complete
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 256, in __step
    result = coro.send(None)
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 88, in fetch
    response = await f
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 37, in _wait_for_one
    return f.result()
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\futures.py", line 201, in result
    raise self._exception
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\asyncio\tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "C:\Users\SGM\Desktop\xnet\x3stackoverflow.py", line 125, in do_get
    async with session.get(url + str(x), headers=headers) as response:
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 1138, in __aenter__
    self._resp = await self._coro
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\client.py", line 535, in _request
    conn = await self._connector.connect(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 542, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1206, in _create_direct_connection
    raise last_exc
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 1175, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "C:\Users\SGM\AppData\Local\Programs\Python\Python39\lib\site-packages\aiohttp\connector.py", line 992, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]
over 3 years ago · Santiago Trujillo
1 answer

If it's not bandwidth that limits you (I cannot check this), there is a solution less complicated than Celery and RabbitMQ. It is not as scalable, though: it is limited by the number of CPU cores on your machine.

Instead of splitting the calls across Celery workers, you split them across multiple processes.

I modified the fetch function like this:

async def fetch(start, end):
    # example
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = []
        # prepare the coroutines that post
        # use start and end arguments here!
        async for x in make_numbers(start, end):
            post_tasks.append(do_get(session, url, x))
        # now execute them all at once

        responses = [await f for f in
                     tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]

and I modified the main process:

import concurrent.futures
from itertools import count

def one_executor(start, end):
    # Each process runs its own event loop over its own slice of ids.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(fetch(start, end))
    except Exception as exc:
        print(f"error: {exc}")
    finally:
        loop.close()


if __name__ == '__main__':

    s = time.perf_counter()
    # Change the value to the number of cores you want to use.
    max_worker = 4
    length_by_executor = q // max_worker
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_worker) as executor:
        for index_min in count(0, length_by_executor):
            # overlapping boundary indexes are harmless: range() in
            # make_numbers excludes the stop value, so chunks do not overlap.
            index_max = min(index_min + length_by_executor, q)
            executor.submit(one_executor, index_min, index_max)
            if index_max == q:
                break

    elapsed = time.perf_counter() - s
    print(f"executed in {elapsed:0.2f} seconds.")

Here are the results I get (with q set to 10_000):

1 worker: executed in 13.90 seconds.
2 workers: executed in 7.24 seconds.
3 workers: executed in 6.82 seconds.
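
Independently of the process split, the original WinError 121 at very high task counts is typical of opening too many simultaneous connections. As a complementary idea (not part of the solution above), each process could also cap its in-flight requests. A minimal sketch, reusing do_get from the question and assuming aiohttp's TCPConnector connection limit plus an asyncio.Semaphore (both limit values are illustrative):

import asyncio
import aiohttp

async def fetch_capped(start, end):
    url = "https://httpbin.org/anything/log?id="
    # TCPConnector(limit=...) caps the simultaneous TCP connections of the session.
    connector = aiohttp.TCPConnector(limit=200)  # illustrative value
    # The semaphore additionally bounds how many requests are awaited at once.
    sem = asyncio.Semaphore(500)  # illustrative value

    async with aiohttp.ClientSession(connector=connector) as session:

        async def guarded_get(x):
            async with sem:
                await do_get(session, url, x)

        await asyncio.gather(*(guarded_get(x) for x in range(start, end)))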

I didn't work on the tqdm progress bar; with the current solution, multiple bars are displayed at once, one per process (but I think tqdm can be made to work well with multiple processes).
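
If the overlapping bars are a problem, tqdm's position argument can pin each worker's bar to its own terminal line. A minimal sketch (forwarding a position through fetch is my assumption, not part of the original code):

async def fetch(start, end, position=0):
    url = "https://httpbin.org/anything/log?id="
    async with aiohttp.ClientSession() as session:
        post_tasks = [do_get(session, url, x) for x in range(start, end)]
        # position pins this process's bar to one terminal line so the
        # bars from different workers do not overwrite each other.
        for f in tqdm.tqdm(asyncio.as_completed(post_tasks),
                           total=len(post_tasks),
                           position=position,
                           desc=f"worker {position}"):
            await f

Each submit call would then pass a distinct position, e.g. executor.submit(one_executor, index_min, index_max, index_min // length_by_executor).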

over 3 years ago · Santiago Trujillo