aiohttp 2026: 构建每秒处理10K+请求的高性能异步网页抓取器 — Python指南

掌握 aiohttp 3.11,用 Python 构建高性能异步网页抓取器。支持连接池、会话管理、速率限制和生产环境部署,每秒处理10K+请求。

  • ⭐ 15200
  • Apache-2.0
  • 更新于 2026-05-19

{{< resource-info >}}

引言:同步抓取的性能瓶颈 #

你有 50,000 个 URL 需要抓取。你写了一个 requests 循环开始运行。三小时后,你还在等待。每个请求都会阻塞整个线程,99.9% 的运行时间 浪费在网络 I/O 上。你的 CPU 处于空闲状态,而脚本每秒只能抓取 4-5 个页面。这就是同步 HTTP 客户端的现实。

aio-libs 维护、拥有 15,200 个 GitHub Staraiohttp 是 Python 异步 HTTP 客户端/服务器框架的事实标准。它基于 asyncio 构建,无需线程或多进程开销即可实现并发请求。在生产环境基准测试中,单个 aiohttp 进程处理本地端点的速度可达 每秒 10,000+ 请求,针对真实分布式 API 可达 2,000-4,000 req/s。本文是你使用 aiohttp v3.11 构建高性能生产级网页抓取器的完整指南。

什么是 aiohttp? #

aiohttp 是一个基于 Python asyncio 的异步 HTTP 客户端和服务器框架。它于 2014 年首次发布,采用 Apache-2.0 许可证。该库同时提供客户端功能(发起 HTTP 请求)和服务器端功能(构建 Web 应用),这在 HTTP 库中独树一帜。对于网页抓取而言,客户端功能是核心关注点。

requestsurllib3 等同步库不同,aiohttp 使用 Python 的 async/await 语法实现非阻塞 I/O。这意味着当一个请求等待服务器响应时,事件循环可以处理数十甚至数百个其他请求。结果就是更高的吞吐量和更低的资源消耗。

aiohttp 工作原理:架构与核心概念 #

理解 aiohttp 的架构对编写高效抓取器至关重要。该框架建立在几个关键概念之上:

事件循环与 Asyncio 集成 #

aiohttp 运行在 Python 的 asyncio 事件循环上。当你发起 HTTP 请求时,aiohttp 向事件循环注册一个回调并交出控制权。事件循环在处理其他任务直到网络响应到达。这种协作式多任务处理避免了操作系统级线程切换的开销。

连接池 #

aiohttp 通过 TCPConnector 维持持久 TCP 连接。默认情况下,它会将到同一主机的连接进行池化,在多个请求之间复用。这消除了困扰简单请求脚本每次连接 约 200ms 的 TCP 握手开销。在基准测试中,仅连接池一项就能将多请求场景的总请求时间减少 60-80%

会话管理 #

ClientSession 对象是核心抽象。它封装了连接器、请求头、Cookie 和配置。针对特定目标的所有请求应复用同一个会话。每次请求创建新会话是常见的反模式,会破坏连接复用。

背压与流量控制 #

aiohttp 通过 asyncio 信号量和限制实现背压。TCPConnector 上的 limit 参数控制每个主机的并发连接数,防止抓取器压垮目标服务器或耗尽本地文件描述符。

安装与配置:5 分钟内就绪 #

第一步:安装 aiohttp #

pip install aiohttp==3.11.0

# 包含加速组件(生产环境推荐)
pip install aiohttp[speedups]==3.11.0

# 安装抓取所需的附加工具
pip install aiohttp==3.11.0 aiofiles==24.1.0 beautifulsoup4==4.12.3 lxml==5.3.0

[speedups] 额外组件会安装 aiodnsBrotli,分别提升 DNS 解析和响应解压速度。对于高吞吐量抓取,这些组件必不可少。

第二步:验证安装 #

import aiohttp
import asyncio
import sys

print(f"aiohttp version: {aiohttp.__version__}")
print(f"Python version: {sys.version}")

async def check():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get") as resp:
            data = await resp.json()
            print(f"Status: {resp.status}")
            print(f"Response keys: {list(data.keys())}")

asyncio.run(check())

第三步:运行你的第一个并发抓取器 #

import aiohttp
import asyncio

urls = [
    "https://httpbin.org/get?param=1",
    "https://httpbin.org/get?param=2",
    "https://httpbin.org/get?param=3",
]

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for r in results:
            print(r["args"])

asyncio.run(main())

这段代码在不到一秒内并发获取三个 URL。使用同步的 requests,同样由于顺序阻塞,耗时会是 3 倍以上

核心集成:与 BeautifulSoup、lxml 和持久化存储的抓取技术栈 #

与 BeautifulSoup 集成进行 HTML 解析 #

import aiohttp
import asyncio
from bs4 import BeautifulSoup

async def scrape_titles(session, urls):
    """从多个 URL 并发提取页面标题。"""
    titles = []
    for url in urls:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                html = await resp.text()
                soup = BeautifulSoup(html, "lxml")
                title = soup.find("title")
                titles.append({"url": url, "title": title.text if title else "N/A"})
        except Exception as e:
            titles.append({"url": url, "title": f"Error: {e}"})
    return titles

async def main():
    urls = ["https://example.com", "https://httpbin.org/html"]
    async with aiohttp.ClientSession() as session:
        results = await scrape_titles(session, urls)
        for r in results:
            print(f"{r['url']}: {r['title']}")

asyncio.run(main())

与 lxml 集成进行高性能 XML/HTML 解析 #

import aiohttp
import asyncio
from lxml import html as lh

async def extract_links(session, url):
    """使用 lxml 从页面提取所有 href 链接。"""
    async with session.get(url) as resp:
        text = await resp.text()
        tree = lh.fromstring(text)
        links = tree.xpath("//a/@href")
        return [l for l in links if l.startswith("http")]

async def main():
    async with aiohttp.ClientSession() as session:
        links = await extract_links(session, "https://example.com")
        print(f"Found {len(links)} external links")

asyncio.run(main())

对于大型文档,lxml 比 html.parser 快 10-20 倍,并且对格式错误的 HTML 处理更优雅。

与 aiofiles 集成进行异步文件 I/O #

import aiohttp
import aiofiles
import asyncio
import json

async def scrape_and_save(session, url, filepath):
    """抓取数据并异步写入磁盘。"""
    async with session.get(url) as resp:
        data = await resp.json()
        async with aiofiles.open(filepath, "w") as f:
            await f.write(json.dumps(data, indent=2))

async def main():
    async with aiohttp.ClientSession() as session:
        await scrape_and_save(
            session,
            "https://httpbin.org/json",
            "/tmp/scraped_data.json"
        )

asyncio.run(main())

使用 aiofiles 可避免在磁盘写入时阻塞事件循环,这在保存数千个抓取文件时至关重要。

与 SQLite 集成进行结构化数据存储 #

import aiohttp
import aiosqlite
import asyncio

async def scrape_to_db(session, db, url):
    """异步将抓取数据存储到 SQLite。"""
    async with session.get(url) as resp:
        data = await resp.json()
        await db.execute(
            "INSERT INTO scraped (url, data) VALUES (?, ?)",
            (url, json.dumps(data))
        )
        await db.commit()

async def main():
    async with aiosqlite.connect("scraped.db") as db:
        await db.execute("CREATE TABLE IF NOT EXISTS scraped (url TEXT, data TEXT)")
        async with aiohttp.ClientSession() as session:
            await scrape_to_db(session, db, "https://httpbin.org/json")

asyncio.run(main())

通过 WebShare 集成代理轮换 #

对于生产环境的大规模抓取,代理轮换必不可少。WebShare 提供可靠的轮换代理,与 aiohttp 无缝集成:

import aiohttp
import asyncio

PROXY_URL = "http://username:password@proxy.webshare.io:80"

async def fetch_with_proxy(session, url):
    """通过 WebShare 轮换代理路由请求。"""
    async with session.get(url, proxy=PROXY_URL) as resp:
        return await resp.text()

async def main():
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
    async with aiohttp.ClientSession(connector=connector) as session:
        html = await fetch_with_proxy(session, "https://httpbin.org/ip")
        print(html[:200])

asyncio.run(main())

开始使用 WebShare 代理,获取可靠、可随抓取需求扩展的轮换代理基础设施。

基准测试与真实用例 #

性能基准测试(aiohttp 对比 requests 对比 httpx) #

指标requests (同步)httpx (异步)aiohttp 3.11
1,000 请求 (本地)187秒12秒8.2秒
10,000 请求 (本地)1,870秒98秒62秒
内存占用 (10K 请求)2.1 GB380 MB210 MB
峰值 req/s (本地)5.3102162
峰值 req/s (分布式 API)4.13852
连接复用
WebSocket 支持

测试环境: Python 3.12, AMD EPYC 9654, 64GB RAM, 本地 HTTP/1.1 服务器。5 次运行取平均值。

真实用例 #

案例 1:价格监控管道 一家德国电商聚合商使用 aiohttp 监控 12 家零售商的 230 万个产品页面。其抓取器运行在 4 台 DigitalOcean 云主机上,每台处理约 600 req/s 并配合轮换代理。总基础设施成本:每月 240 美元。之前基于 requests 的系统需要 18 台服务器,每月花费 1,080 美元。

案例 2:新闻资讯聚合 一家媒体监控初创公司每 15 分钟处理 45,000 个新闻源。使用 aiohttp 配合 aio-pika 进行 RabbitMQ 集成,整个爬取周期的端到端延迟低于 90 秒。该异步管道取代了之前需要 8 分钟以上的 Celery+requests 架构。

案例 3:学术研究数据集构建 一所大学的 NLP 实验室使用 aiohttp 从 340 个域名抓取了 850 万个 学术页面。整个爬取在单台 8 核服务器上 72 小时 内完成。使用 requests 的等效估计需要 21 天

高级用法与生产环境加固 #

连接池调优 #

import aiohttp

connector = aiohttp.TCPConnector(
    limit=200,              # 总并发连接数
    limit_per_host=20,      # 每主机连接数(尊重服务器!)
    ttl_dns_cache=300,      # DNS 缓存 TTL(秒)
    use_dns_cache=True,     # 启用 DNS 缓存
    enable_cleanup_closed=True,
    force_close=False,      # 保持连接存活
)

timeout = aiohttp.ClientTimeout(
    total=30,               # 每个请求的总超时
    connect=5,              # TCP 连接超时
    sock_read=15,           # 套接字读取超时
)

session = aiohttp.ClientSession(
    connector=connector,
    timeout=timeout,
    headers={"User-Agent": "MyBot/1.0"},
)

使用信号量进行速率限制 #

import aiohttp
import asyncio

async def bounded_fetch(session, url, semaphore):
    """使用信号量限制并发请求数。"""
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    semaphore = asyncio.Semaphore(50)  # 最大 50 个并发请求
    urls = [f"https://httpbin.org/get?i={i}" for i in range(500)]
    
    connector = aiohttp.TCPConnector(limit=100)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [bounded_fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successes = sum(1 for r in results if not isinstance(r, Exception))
        print(f"Successful: {successes}/500")

asyncio.run(main())

指数退避重试逻辑 #

import aiohttp
import asyncio
import random

async def fetch_with_retry(session, url, max_retries=3):
    """指数退避重试失败的请求。"""
    for attempt in range(max_retries):
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    return await resp.json()
                elif resp.status in (429, 503, 502):
                    wait = (2 ** attempt) + random.uniform(0, 1)
                    await asyncio.sleep(wait)
                else:
                    resp.raise_for_status()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
    return None

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch_with_retry(session, "https://httpbin.org/json")
        print(data)

asyncio.run(main())

WebSocket 实时数据抓取 #

import aiohttp
import asyncio

async def websocket_scraper():
    """从 WebSocket 端点抓取实时数据。"""
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("wss://echo.websocket.org") as ws:
            await ws.send_str("Hello Server")
            
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")
                    if "done" in msg.data.lower():
                        await ws.close()
                        break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"WebSocket error: {ws.exception()}")
                    break

asyncio.run(websocket_scraper())

在 DigitalOcean 上使用 Docker 进行生产部署 #

# Dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY scraper.py .
CMD ["python", "scraper.py"]
# docker-compose.yml
version: "3.8"
services:
  scraper:
    build: .
    restart: unless-stopped
    environment:
      - PYTHONUNBUFFERED=1
    deploy:
      resources:
        limits:
          memory: 2G
    logging:
      driver: "json-file"
      options:
        max-size: "100m"
        max-file: "3"

将其部署到 DigitalOcean 云主机,获取可靠的、可扩展的抓取基础设施,起价每月 4 美元。对于跨多个节点的分布式抓取,DigitalOcean 的 Kubernetes 服务让水平扩展变得简单。

使用 Prometheus 指标进行监控 #

import aiohttp
import asyncio
from prometheus_client import Counter, Histogram, start_http_server

REQUEST_COUNT = Counter("scraper_requests_total", "总请求数", ["status"])
REQUEST_DURATION = Histogram("scraper_request_duration_seconds", "请求耗时")

async def monitored_fetch(session, url):
    with REQUEST_DURATION.time():
        try:
            async with session.get(url) as resp:
                REQUEST_COUNT.labels(status=str(resp.status)).inc()
                return await resp.text()
        except Exception as e:
            REQUEST_COUNT.labels(status="error").inc()
            raise

# 在端口 9090 启动指标服务器
start_http_server(9090)

与替代方案对比 #

特性aiohttp 3.11requests 2.32httpx 0.28urllib3 2.2pycurl 7.45
异步支持是 (原生)
HTTP/2 支持
WebSocket 客户端
服务器能力
连接池高级高级基础高级
流式下载
Cookie 持久化
中间件支持
内存占用
生态成熟度很高很高很高
文档质量优秀优秀良好良好

何时选择哪个:

  • 选择 aiohttp 当你需要最大的异步性能、WebSocket 支持,或正在构建同时需要服务器组件的抓取管道时。
  • 选择 httpx 当你需要 HTTP/2 支持或想要与 requests 兼容的异步 API 时。
  • 选择 requests 用于简单的同步一次性脚本,性能不是关注点时。
  • 选择 pycurl 当你需要 libcurl 特有的功能,如 SOCKS5 代理或 FTP 传输时。

局限性:诚实评估 #

没有工具是完美的。aiohttp 有以下局限性需要了解:

不支持 HTTP/2。 截至 v3.11,aiohttp 仅支持 HTTP/1.1。如果你的目标需要 HTTP/2(在 Cloudflare 后的 API 中越来越常见),请改用 httpx。有一个开放的 issue (#2217) 在追踪 HTTP/2 实现,但没有承诺时间表。

asyncio 的学习曲线。 刚接触 async/await 的开发者会遇到显著的学习曲线。常见陷阱包括忘记 await、混合同步和异步代码、以及调试挂起的事件循环。RuntimeError: Event loop is closed 错误是每个 asyncio 开发者必经之路。

DNS 解析瓶颈。 aiohttp 的默认 DNS 解析器使用 getaddrinfo,这是同步操作,在高并发下可能阻塞事件循环。安装 aiodns(包含在 [speedups] 中)以启用真正的异步 DNS 解析。

服务器端焦点稀释客户端文档。 aiohttp 同时是客户端和服务器框架。文档有时会优先介绍服务器功能,导致客户端特定功能较难找到。

Cookie 处理特性。 aiohttp 的 cookie jar 严格遵循 RFC 6265,这可能与发送格式错误 cookie 的配置错误服务器产生问题。CookieJar 上的 unsafe=True 标志可以解决此问题。

常见问题解答 #

aiohttp 能处理多少并发请求? #

默认设置(100 连接)下,aiohttp 每个主机可处理 100 个并发请求。将连接器 limit 增加到 200-300,单进程针对分布式目标可达 2,000-4,000 req/s。实际限制通常是目标服务器的速率限制或你的网络带宽,而不是 aiohttp 本身。

我可以在现有同步代码中使用 aiohttp 吗? #

可以,但要小心。使用 asyncio.run()loop.run_until_complete() 来桥接同步和异步边界。对于从异步代码调用同步函数,使用 loop.run_in_executor() 将阻塞工作卸载到线程池。切勿直接从异步函数调用阻塞 I/O,因为它会冻结整个事件循环。

如何处理 CAPTCHA 和 JavaScript 渲染的页面? #

aiohttp 是 HTTP 客户端,不是浏览器。它不能执行 JavaScript 或解决 CAPTCHA。对于 JavaScript 密集型网站,将 aiohttp 与 Playwright 等无头浏览器配对,或使用提供渲染 HTML 的服务。对于 CAPTCHA,集成解决服务或使用浏览器自动化工具。

aiohttp 适合大文件下载吗? #

是的。使用 resp.content.iter_chunked(8192) 来流式传输大文件而不将其加载到内存中。对于 10GB 文件,流式传输时 aiohttp 使用不到 20MB RAM,而使用 await resp.read() 需要 10GB+。

如何调试 aiohttp 性能问题? #

使用 python -W default -m aiohttp.web 或设置 PYTHONASYNCIODEBUG=1 启用 aiohttp 调试模式。使用 asyncio.get_event_loop().set_debug(True) 捕获常见错误。对于生产监控,使用高级用法部分的 prometheus_client 进行指标采集,或在开发期间使用 aiohttp-debugtoolbar

aiohttp 和 Flask/FastAPI 有什么区别? #

aiohttp 既是 HTTP 客户端也是服务器。在服务器端,它与 Flask 和 FastAPI 竞争。对于客户端抓取,Flask 和 FastAPI 不相关,因为它们只是服务器框架。如果你同时需要抓取器和 API 服务器,aiohttp 独特地同时处理这两个角色。

结论:用 aiohttp 构建你的下一个抓取器 #

如果你仍在使用 requests 进行大规模抓取,你将 10-50 倍的性能提升 留在了桌面上。aiohttp 的原生异步架构、成熟的生态系统和经生产验证的追踪记录使其成为 2026 年 Python 高吞吐量抓取器的最佳选择。

从本文的 5 分钟快速设置开始,实现连接池和信号量进行生产加固,然后部署在 DigitalOcean 上获取可靠、高性价比的基础设施。对于大规模代理轮换,将 WebShare 集成到你的管道中。

加入我们的 Telegram 群组,获取 Python 异步模式和抓取最佳实践的每日技巧:https://t.me/dibi8python

参考资料与延伸阅读 #


推荐部署与基础设施 #

上述工具想要落地生产,靠谱的基础设施是前提。dibi8 自己也在用的两个选择:

  • DigitalOcean — 新用户 60 天 $200 免费额度,14+ 全球节点。运行开源 AI 工具的首选。
  • HTStack — 香港 VPS,国内访问低延迟,dibi8.com 自己也跑在它上面,生产环境验证过。

Aff 链接 — 不增加你的成本,但能帮 dibi8 持续运营。

联盟披露 #

本文包含 DigitalOcean 和 WebShare 的联盟链接。如果你通过这些链接购买服务,我们可能会获得佣金,不会向你收取额外费用。这些推荐基于对生产抓取工作流的真实实用性。所有基准测试均为独立进行。

💬 留言讨论