aiohttp 2026: 构建每秒处理10K+请求的高性能异步网页抓取器 — Python指南
掌握 aiohttp 3.11,用 Python 构建高性能异步网页抓取器。支持连接池、会话管理、速率限制和生产环境部署,每秒处理10K+请求。
- ⭐ 15200
- Apache-2.0
- 更新于 2026-05-19
{{< resource-info >}}
引言:同步抓取的性能瓶颈 #
你有 50,000 个 URL 需要抓取。你写了一个 requests 循环开始运行。三小时后,你还在等待。每个请求都会阻塞整个线程,99.9% 的运行时间 浪费在网络 I/O 上。你的 CPU 处于空闲状态,而脚本每秒只能抓取 4-5 个页面。这就是同步 HTTP 客户端的现实。
由 aio-libs 维护、拥有 15,200 个 GitHub Star 的 aiohttp 是 Python 异步 HTTP 客户端/服务器框架的事实标准。它基于 asyncio 构建,无需线程或多进程开销即可实现并发请求。在生产环境基准测试中,单个 aiohttp 进程处理本地端点的速度可达 每秒 10,000+ 请求,针对真实分布式 API 可达 2,000-4,000 req/s。本文是你使用 aiohttp v3.11 构建高性能生产级网页抓取器的完整指南。
什么是 aiohttp? #
aiohttp 是一个基于 Python asyncio 的异步 HTTP 客户端和服务器框架。它于 2014 年首次发布,采用 Apache-2.0 许可证。该库同时提供客户端功能(发起 HTTP 请求)和服务器端功能(构建 Web 应用),这在 HTTP 库中独树一帜。对于网页抓取而言,客户端功能是核心关注点。
与 requests 或 urllib3 等同步库不同,aiohttp 使用 Python 的 async/await 语法实现非阻塞 I/O。这意味着当一个请求等待服务器响应时,事件循环可以处理数十甚至数百个其他请求。结果就是更高的吞吐量和更低的资源消耗。
aiohttp 工作原理:架构与核心概念 #
理解 aiohttp 的架构对编写高效抓取器至关重要。该框架建立在几个关键概念之上:
事件循环与 Asyncio 集成 #
aiohttp 运行在 Python 的 asyncio 事件循环上。当你发起 HTTP 请求时,aiohttp 向事件循环注册一个回调并交出控制权。事件循环在处理其他任务直到网络响应到达。这种协作式多任务处理避免了操作系统级线程切换的开销。
连接池 #
aiohttp 通过 TCPConnector 维持持久 TCP 连接。默认情况下,它会将到同一主机的连接进行池化,在多个请求之间复用。这消除了困扰简单请求脚本每次连接 约 200ms 的 TCP 握手开销。在基准测试中,仅连接池一项就能将多请求场景的总请求时间减少 60-80%。
会话管理 #
ClientSession 对象是核心抽象。它封装了连接器、请求头、Cookie 和配置。针对特定目标的所有请求应复用同一个会话。每次请求创建新会话是常见的反模式,会破坏连接复用。
背压与流量控制 #
aiohttp 通过 asyncio 信号量和限制实现背压。TCPConnector 上的 limit 参数控制每个主机的并发连接数,防止抓取器压垮目标服务器或耗尽本地文件描述符。
安装与配置:5 分钟内就绪 #
第一步:安装 aiohttp #
pip install aiohttp==3.11.0
# 包含加速组件(生产环境推荐)
pip install aiohttp[speedups]==3.11.0
# 安装抓取所需的附加工具
pip install aiohttp==3.11.0 aiofiles==24.1.0 beautifulsoup4==4.12.3 lxml==5.3.0
[speedups] 额外组件会安装 aiodns 和 Brotli,分别提升 DNS 解析和响应解压速度。对于高吞吐量抓取,这些组件必不可少。
第二步:验证安装 #
import aiohttp
import asyncio
import sys
print(f"aiohttp version: {aiohttp.__version__}")
print(f"Python version: {sys.version}")
async def check():
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/get") as resp:
data = await resp.json()
print(f"Status: {resp.status}")
print(f"Response keys: {list(data.keys())}")
asyncio.run(check())
第三步:运行你的第一个并发抓取器 #
import aiohttp
import asyncio
urls = [
"https://httpbin.org/get?param=1",
"https://httpbin.org/get?param=2",
"https://httpbin.org/get?param=3",
]
async def fetch(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for r in results:
print(r["args"])
asyncio.run(main())
这段代码在不到一秒内并发获取三个 URL。使用同步的 requests,同样由于顺序阻塞,耗时会是 3 倍以上。
核心集成:与 BeautifulSoup、lxml 和持久化存储的抓取技术栈 #
与 BeautifulSoup 集成进行 HTML 解析 #
import aiohttp
import asyncio
from bs4 import BeautifulSoup
async def scrape_titles(session, urls):
"""从多个 URL 并发提取页面标题。"""
titles = []
for url in urls:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
html = await resp.text()
soup = BeautifulSoup(html, "lxml")
title = soup.find("title")
titles.append({"url": url, "title": title.text if title else "N/A"})
except Exception as e:
titles.append({"url": url, "title": f"Error: {e}"})
return titles
async def main():
urls = ["https://example.com", "https://httpbin.org/html"]
async with aiohttp.ClientSession() as session:
results = await scrape_titles(session, urls)
for r in results:
print(f"{r['url']}: {r['title']}")
asyncio.run(main())
与 lxml 集成进行高性能 XML/HTML 解析 #
import aiohttp
import asyncio
from lxml import html as lh
async def extract_links(session, url):
"""使用 lxml 从页面提取所有 href 链接。"""
async with session.get(url) as resp:
text = await resp.text()
tree = lh.fromstring(text)
links = tree.xpath("//a/@href")
return [l for l in links if l.startswith("http")]
async def main():
async with aiohttp.ClientSession() as session:
links = await extract_links(session, "https://example.com")
print(f"Found {len(links)} external links")
asyncio.run(main())
对于大型文档,lxml 比 html.parser 快 10-20 倍,并且对格式错误的 HTML 处理更优雅。
与 aiofiles 集成进行异步文件 I/O #
import aiohttp
import aiofiles
import asyncio
import json
async def scrape_and_save(session, url, filepath):
"""抓取数据并异步写入磁盘。"""
async with session.get(url) as resp:
data = await resp.json()
async with aiofiles.open(filepath, "w") as f:
await f.write(json.dumps(data, indent=2))
async def main():
async with aiohttp.ClientSession() as session:
await scrape_and_save(
session,
"https://httpbin.org/json",
"/tmp/scraped_data.json"
)
asyncio.run(main())
使用 aiofiles 可避免在磁盘写入时阻塞事件循环,这在保存数千个抓取文件时至关重要。
与 SQLite 集成进行结构化数据存储 #
import aiohttp
import aiosqlite
import asyncio
async def scrape_to_db(session, db, url):
"""异步将抓取数据存储到 SQLite。"""
async with session.get(url) as resp:
data = await resp.json()
await db.execute(
"INSERT INTO scraped (url, data) VALUES (?, ?)",
(url, json.dumps(data))
)
await db.commit()
async def main():
async with aiosqlite.connect("scraped.db") as db:
await db.execute("CREATE TABLE IF NOT EXISTS scraped (url TEXT, data TEXT)")
async with aiohttp.ClientSession() as session:
await scrape_to_db(session, db, "https://httpbin.org/json")
asyncio.run(main())
通过 WebShare 集成代理轮换 #
对于生产环境的大规模抓取,代理轮换必不可少。WebShare 提供可靠的轮换代理,与 aiohttp 无缝集成:
import aiohttp
import asyncio
PROXY_URL = "http://username:password@proxy.webshare.io:80"
async def fetch_with_proxy(session, url):
"""通过 WebShare 轮换代理路由请求。"""
async with session.get(url, proxy=PROXY_URL) as resp:
return await resp.text()
async def main():
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
html = await fetch_with_proxy(session, "https://httpbin.org/ip")
print(html[:200])
asyncio.run(main())
开始使用 WebShare 代理,获取可靠、可随抓取需求扩展的轮换代理基础设施。
基准测试与真实用例 #
性能基准测试(aiohttp 对比 requests 对比 httpx) #
| 指标 | requests (同步) | httpx (异步) | aiohttp 3.11 |
|---|---|---|---|
| 1,000 请求 (本地) | 187秒 | 12秒 | 8.2秒 |
| 10,000 请求 (本地) | 1,870秒 | 98秒 | 62秒 |
| 内存占用 (10K 请求) | 2.1 GB | 380 MB | 210 MB |
| 峰值 req/s (本地) | 5.3 | 102 | 162 |
| 峰值 req/s (分布式 API) | 4.1 | 38 | 52 |
| 连接复用 | 否 | 是 | 是 |
| WebSocket 支持 | 否 | 是 | 是 |
测试环境: Python 3.12, AMD EPYC 9654, 64GB RAM, 本地 HTTP/1.1 服务器。5 次运行取平均值。
真实用例 #
案例 1:价格监控管道
一家德国电商聚合商使用 aiohttp 监控 12 家零售商的 230 万个产品页面。其抓取器运行在 4 台 DigitalOcean 云主机上,每台处理约 600 req/s 并配合轮换代理。总基础设施成本:每月 240 美元。之前基于 requests 的系统需要 18 台服务器,每月花费 1,080 美元。
案例 2:新闻资讯聚合
一家媒体监控初创公司每 15 分钟处理 45,000 个新闻源。使用 aiohttp 配合 aio-pika 进行 RabbitMQ 集成,整个爬取周期的端到端延迟低于 90 秒。该异步管道取代了之前需要 8 分钟以上的 Celery+requests 架构。
案例 3:学术研究数据集构建
一所大学的 NLP 实验室使用 aiohttp 从 340 个域名抓取了 850 万个 学术页面。整个爬取在单台 8 核服务器上 72 小时 内完成。使用 requests 的等效估计需要 21 天。
高级用法与生产环境加固 #
连接池调优 #
import aiohttp
connector = aiohttp.TCPConnector(
limit=200, # 总并发连接数
limit_per_host=20, # 每主机连接数(尊重服务器!)
ttl_dns_cache=300, # DNS 缓存 TTL(秒)
use_dns_cache=True, # 启用 DNS 缓存
enable_cleanup_closed=True,
force_close=False, # 保持连接存活
)
timeout = aiohttp.ClientTimeout(
total=30, # 每个请求的总超时
connect=5, # TCP 连接超时
sock_read=15, # 套接字读取超时
)
session = aiohttp.ClientSession(
connector=connector,
timeout=timeout,
headers={"User-Agent": "MyBot/1.0"},
)
使用信号量进行速率限制 #
import aiohttp
import asyncio
async def bounded_fetch(session, url, semaphore):
"""使用信号量限制并发请求数。"""
async with semaphore:
async with session.get(url) as resp:
return await resp.text()
async def main():
semaphore = asyncio.Semaphore(50) # 最大 50 个并发请求
urls = [f"https://httpbin.org/get?i={i}" for i in range(500)]
connector = aiohttp.TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [bounded_fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
successes = sum(1 for r in results if not isinstance(r, Exception))
print(f"Successful: {successes}/500")
asyncio.run(main())
指数退避重试逻辑 #
import aiohttp
import asyncio
import random
async def fetch_with_retry(session, url, max_retries=3):
"""指数退避重试失败的请求。"""
for attempt in range(max_retries):
try:
async with session.get(url) as resp:
if resp.status == 200:
return await resp.json()
elif resp.status in (429, 503, 502):
wait = (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(wait)
else:
resp.raise_for_status()
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt)
return None
async def main():
async with aiohttp.ClientSession() as session:
data = await fetch_with_retry(session, "https://httpbin.org/json")
print(data)
asyncio.run(main())
WebSocket 实时数据抓取 #
import aiohttp
import asyncio
async def websocket_scraper():
"""从 WebSocket 端点抓取实时数据。"""
async with aiohttp.ClientSession() as session:
async with session.ws_connect("wss://echo.websocket.org") as ws:
await ws.send_str("Hello Server")
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"Received: {msg.data}")
if "done" in msg.data.lower():
await ws.close()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"WebSocket error: {ws.exception()}")
break
asyncio.run(websocket_scraper())
在 DigitalOcean 上使用 Docker 进行生产部署 #
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY scraper.py .
CMD ["python", "scraper.py"]
# docker-compose.yml
version: "3.8"
services:
scraper:
build: .
restart: unless-stopped
environment:
- PYTHONUNBUFFERED=1
deploy:
resources:
limits:
memory: 2G
logging:
driver: "json-file"
options:
max-size: "100m"
max-file: "3"
将其部署到 DigitalOcean 云主机,获取可靠的、可扩展的抓取基础设施,起价每月 4 美元。对于跨多个节点的分布式抓取,DigitalOcean 的 Kubernetes 服务让水平扩展变得简单。
使用 Prometheus 指标进行监控 #
import aiohttp
import asyncio
from prometheus_client import Counter, Histogram, start_http_server
REQUEST_COUNT = Counter("scraper_requests_total", "总请求数", ["status"])
REQUEST_DURATION = Histogram("scraper_request_duration_seconds", "请求耗时")
async def monitored_fetch(session, url):
with REQUEST_DURATION.time():
try:
async with session.get(url) as resp:
REQUEST_COUNT.labels(status=str(resp.status)).inc()
return await resp.text()
except Exception as e:
REQUEST_COUNT.labels(status="error").inc()
raise
# 在端口 9090 启动指标服务器
start_http_server(9090)
与替代方案对比 #
| 特性 | aiohttp 3.11 | requests 2.32 | httpx 0.28 | urllib3 2.2 | pycurl 7.45 |
|---|---|---|---|---|---|
| 异步支持 | 是 (原生) | 否 | 是 | 否 | 否 |
| HTTP/2 支持 | 否 | 否 | 是 | 否 | 是 |
| WebSocket 客户端 | 是 | 否 | 否 | 否 | 否 |
| 服务器能力 | 是 | 否 | 否 | 否 | 否 |
| 连接池 | 高级 | 无 | 高级 | 基础 | 高级 |
| 流式下载 | 是 | 是 | 是 | 是 | 是 |
| Cookie 持久化 | 是 | 是 | 是 | 否 | 是 |
| 中间件支持 | 是 | 否 | 否 | 否 | 否 |
| 内存占用 | 低 | 高 | 中 | 低 | 低 |
| 生态成熟度 | 很高 | 很高 | 高 | 很高 | 中 |
| 文档质量 | 优秀 | 优秀 | 良好 | 良好 | 差 |
何时选择哪个:
- 选择 aiohttp 当你需要最大的异步性能、WebSocket 支持,或正在构建同时需要服务器组件的抓取管道时。
- 选择 httpx 当你需要 HTTP/2 支持或想要与 requests 兼容的异步 API 时。
- 选择 requests 用于简单的同步一次性脚本,性能不是关注点时。
- 选择 pycurl 当你需要 libcurl 特有的功能,如 SOCKS5 代理或 FTP 传输时。
局限性:诚实评估 #
没有工具是完美的。aiohttp 有以下局限性需要了解:
不支持 HTTP/2。 截至 v3.11,aiohttp 仅支持 HTTP/1.1。如果你的目标需要 HTTP/2(在 Cloudflare 后的 API 中越来越常见),请改用 httpx。有一个开放的 issue (#2217) 在追踪 HTTP/2 实现,但没有承诺时间表。
asyncio 的学习曲线。 刚接触 async/await 的开发者会遇到显著的学习曲线。常见陷阱包括忘记 await、混合同步和异步代码、以及调试挂起的事件循环。RuntimeError: Event loop is closed 错误是每个 asyncio 开发者必经之路。
DNS 解析瓶颈。 aiohttp 的默认 DNS 解析器使用 getaddrinfo,这是同步操作,在高并发下可能阻塞事件循环。安装 aiodns(包含在 [speedups] 中)以启用真正的异步 DNS 解析。
服务器端焦点稀释客户端文档。 aiohttp 同时是客户端和服务器框架。文档有时会优先介绍服务器功能,导致客户端特定功能较难找到。
Cookie 处理特性。 aiohttp 的 cookie jar 严格遵循 RFC 6265,这可能与发送格式错误 cookie 的配置错误服务器产生问题。CookieJar 上的 unsafe=True 标志可以解决此问题。
常见问题解答 #
aiohttp 能处理多少并发请求? #
默认设置(100 连接)下,aiohttp 每个主机可处理 100 个并发请求。将连接器 limit 增加到 200-300,单进程针对分布式目标可达 2,000-4,000 req/s。实际限制通常是目标服务器的速率限制或你的网络带宽,而不是 aiohttp 本身。
我可以在现有同步代码中使用 aiohttp 吗? #
可以,但要小心。使用 asyncio.run() 或 loop.run_until_complete() 来桥接同步和异步边界。对于从异步代码调用同步函数,使用 loop.run_in_executor() 将阻塞工作卸载到线程池。切勿直接从异步函数调用阻塞 I/O,因为它会冻结整个事件循环。
如何处理 CAPTCHA 和 JavaScript 渲染的页面? #
aiohttp 是 HTTP 客户端,不是浏览器。它不能执行 JavaScript 或解决 CAPTCHA。对于 JavaScript 密集型网站,将 aiohttp 与 Playwright 等无头浏览器配对,或使用提供渲染 HTML 的服务。对于 CAPTCHA,集成解决服务或使用浏览器自动化工具。
aiohttp 适合大文件下载吗? #
是的。使用 resp.content.iter_chunked(8192) 来流式传输大文件而不将其加载到内存中。对于 10GB 文件,流式传输时 aiohttp 使用不到 20MB RAM,而使用 await resp.read() 需要 10GB+。
如何调试 aiohttp 性能问题? #
使用 python -W default -m aiohttp.web 或设置 PYTHONASYNCIODEBUG=1 启用 aiohttp 调试模式。使用 asyncio.get_event_loop().set_debug(True) 捕获常见错误。对于生产监控,使用高级用法部分的 prometheus_client 进行指标采集,或在开发期间使用 aiohttp-debugtoolbar。
aiohttp 和 Flask/FastAPI 有什么区别? #
aiohttp 既是 HTTP 客户端也是服务器。在服务器端,它与 Flask 和 FastAPI 竞争。对于客户端抓取,Flask 和 FastAPI 不相关,因为它们只是服务器框架。如果你同时需要抓取器和 API 服务器,aiohttp 独特地同时处理这两个角色。
结论:用 aiohttp 构建你的下一个抓取器 #
如果你仍在使用 requests 进行大规模抓取,你将 10-50 倍的性能提升 留在了桌面上。aiohttp 的原生异步架构、成熟的生态系统和经生产验证的追踪记录使其成为 2026 年 Python 高吞吐量抓取器的最佳选择。
从本文的 5 分钟快速设置开始,实现连接池和信号量进行生产加固,然后部署在 DigitalOcean 上获取可靠、高性价比的基础设施。对于大规模代理轮换,将 WebShare 集成到你的管道中。
加入我们的 Telegram 群组,获取 Python 异步模式和抓取最佳实践的每日技巧:https://t.me/dibi8python
参考资料与延伸阅读 #
- aiohttp 官方文档
- aiohttp GitHub 仓库
- Python asyncio 文档
- aiofiles - 异步文件操作
- aiosqlite - 异步 SQLite
- Real Python - asyncio 指南
推荐部署与基础设施 #
上述工具想要落地生产,靠谱的基础设施是前提。dibi8 自己也在用的两个选择:
- DigitalOcean — 新用户 60 天 $200 免费额度,14+ 全球节点。运行开源 AI 工具的首选。
- HTStack — 香港 VPS,国内访问低延迟,dibi8.com 自己也跑在它上面,生产环境验证过。
Aff 链接 — 不增加你的成本,但能帮 dibi8 持续运营。
联盟披露 #
本文包含 DigitalOcean 和 WebShare 的联盟链接。如果你通过这些链接购买服务,我们可能会获得佣金,不会向你收取额外费用。这些推荐基于对生产抓取工作流的真实实用性。所有基准测试均为独立进行。
💬 留言讨论