Comprehensive guide to Hyperliquid, the fully on-chain perpetual DEX processing $2B+ daily volume with 100+ trading pairs, up to 50x leverage, HyperEVM smart contracts, and Python SDK for bot integration.
- Proprietary
- 更新于 2026-05-20
{{< resource-info >}}
**Date: ** 2026-05-19
**Category: ** AI Trading
**Tags: ** Hyperliquid, perpetual DEX, on-chain trading, leverage trading, trading bot, DeFi, HyperEVM
**Read Time: ** 18 minutes
简介:为什么 Hyperliquid 主导 Perp DEX 格局自 2023 年以来,去中心化的永续期货交易格局发生了翻天覆地的变化,而这一转变的中心是 Hyperliquid,这是一款完全链上订单簿的永续 DEX,在 2026 年全年处理的日交易量超过20 亿美元。与依赖流动性池并遭受滑点和无常损失的基于传统自动做市商 (AMM) 的 DEX 不同,Hyperliquid 带来了熟悉的中央限价订单簿 (CLOB)经验融入区块链,将中心化交易所的执行质量与 DeFi 的自我托管和透明度优势结合起来。Hyperliquid 的推出旨在消除交易者在集中式和去中心化交易场所之间进行选择时面临的妥协,现已发展到支持主要加密货币的 100 多个交易对,在特定市场上提供高达 50 倍的杠杆。 该平台与 HyperEVM(其专门构建的 EVM 兼容执行层)的本机集成为复杂的交易策略、MEV 抵抗执行和无缝智能合约可组合性打开了大门,这在传统的永久 DEX 上是不可能实现的。对于算法交易者和机器人开发人员来说,Hyperliquid 代表了一种范式转变。 该平台全面的 Python SDK 与高性能 REST 和 WebSocket API 相结合,可实现亚秒级下单、实时头寸管理和全自动策略执行。 无论您是构建做市机器人、趋势跟踪系统还是复杂的多交易所套利策略,Hyperliquid 都能提供基础设施骨干,可以在不牺牲去中心化的情况下处理机构级交易量。在这份综合指南中,我们将探讨 2026 年 Hyperliquid 交易的各个方面——从了解核心架构到构建可投入生产的交易机器人。 最后,您将拥有将 Hyperliquid 集成到算法交易堆栈中所需的知识和代码模板。— #
了解 Hyperliquid 的核心架构### CLOB 优势:为什么订单簿对于永续合约很重要基于 vAMM(虚拟 AMM)等 AMM 模型构建的传统永续 DEX 存在固有的局限性:流动性集中在当前价格附近,大额订单滑点严重,流动性提供者面临无常损失。 Hyperliquid 的 中央限价订单簿 (CLOB) 架构完全消除了这些问题。在 CLOB 系统中,交易者以特定价格下限价单,创建可见的深度图表,准确显示每个价格水平存在多少流动性。 这使得:- 点差收窄 — 做市商直接竞争,将买卖价差缩小至与币安和 Bybit 相当的水平 #
- 零滑点限价订单 — 大额订单在匹配时完全按照指定价格(或更好)执行
- 透明的价格发现 — 所有订单在链上可见,防止隐藏操纵
- 高效的资金利用——无需将资金锁定在流动性池中; 仅当订单被执行时才使用资金Hyperliquid的订单簿完全结算在其专有的L1链上,实现区块时间低于1秒并支持每秒10,000+笔交易。 这种性能消除了历史上困扰链上交易场所的延迟损失。### HyperEVM:智能合约满足高频交易2024 年底 HyperEVM 的推出对于 Hyperliquid 来说是一个分水岭。 这个与 EVM 兼容的执行层支持:- Smart contract wallets — Programmable account logic for advanced access control and automated execution
- Composable DeFi strategies — Integration with lending protocols, yield optimizers, and other on-chain primitives
- Custom order types — Conditional orders, trailing stops, and TWAP execution via smart contract automation
- Gasless transactions — Meta-transaction support where fees can be paid in any token or sponsored by third parties对于机器人开发人员来说,HyperEVM 意味着您可以部署直接与交易所基础设施交互的智能合约,从而实现在中心化交易所或传统 DEX 上不可能实现的策略。—
设置您的超流动性交易环境### 先决条件和 API 密钥注册在编写任何代码之前,您需要设置您的交易环境。 Hyperliquid 使用基于钱包的身份验证 - 无需注册 API 密钥。 您的以太坊钱包(通过 EIP-712 签名)充当您的身份和身份验证机制。所需工具: #
- Python 3.10 或更高版本
- 带有私钥的以太坊钱包
- USDC 在 Arbitrum 上用于存款(Hyperliquid 的存款层)### 安装 Hyperliquid Python SDK官方Python SDK提供了对所有交换功能的全面访问。 安装很简单:```` bas h
python -m venv hyperliquid-env 源 hyperliquid-env/bin/activate
安装官方SDK #
pip 安装 hyperliquid-python-sdk
为机器人开发安装额外的依赖项 #
pip install websockets aiohttp pandas numpy python-dotenv
`.env` 文件来安全地存储您的配置:````
bas
h
# .env - 永远不要将其提交给版本控制
PRIVATE_KEY=your_ethereum_private_key_here
WALLET_ADDRESS=0x_your_wallet_address
测试网=true
````### Basic Connection and Authentication这是建立与 Hyperliquid 的经过身份验证的连接的基础代码````
bas
h
# .env - 永远不要将其提交给版本控制
PRIVATE_KEY=your_ethereum_private_key_here
WALLET_ADDRESS=0x_your_wallet_address
测试网=true
```
yperliquid
.utils 导入常量# 加载环境变量
加载_dotenv()超级液体交易者类:
"""生产就绪的超液体交易客户端。"""
def __init__(self, use_testnet=True):
self.private_key = os.getenv('PRIVATE_KEY')
self.wal````
pytho
n
导入操作系统
导入异步
从 dotenv 导入 load_dotenv
从 hyperliquid.exchange 导入 Exchange
从 hyperliquid.info 导入信息
从 hyperliquid.utils 导入常量
# 加载环境变量
加载_dotenv()
超级液体交易者类:
"""生产就绪的超液体交易客户端。"""
def __init__(self, use_testnet=True):
self.private_key = os.getenv('PRIVATE_KEY')
self.wallet_address = os.getenv('WALLET_ADDRESS')
# 根据环境选择端点
如果使用_testnet:
self.base_url = 常量.TESTNET_API_URL
其他:
self.base_url = 常量.MAINNET_API_URL
# 初始化交换和信息客户端
self.交换 = 交换(
self.钱包地址,
self.private_key,
self.base_url
)
self.info = 信息(self.base_url)
print(f"已连接到 Hyperliquid {'Testnet' if use_testnet else 'Mainnet'}")
print(f"钱包:{self.wallet_address}")
def get_account_summary(self):
"""获取全面的帐户信息。"""
user_state = self.info.user_state(self.wallet_address)
account_value = float(user_state['marginSummary']['accountValue'])
Total_margin_used = float(user_state['marginSummary']['totalMarginUsed'])
可提款 = float(user_state['可提款'])
print(f"账户价值: ${account_value: ,.2f}")
print(f"已用保证金: ${total_margin_used: ,.2f}")
print(f"可提取: ${可提取: ,.2f}")
返回用户状态
# 初始化交易者
交易者 = HyperliquidTrader(use_testnet=True)
trader.get_account_summary()
```'名称':资产['名称'],
'max_leverage': 资产['maxLeverage'],
'sz_decimals':资产['szDecimals'],
'only_isolated': asset.get('onlyIsolated', False)
})
print(f"\n可用市场:{len(资产)}")
for asset in assets[:10]: # 显示前 10 个
print(f" {asset['name']}: {asset['max_leverage']}x 最大杠杆")
返还资产# 用法
资产 = trader.get_all_assets()
````---
## 构建一个可投入生产的交易机器人### 通过 WebSocket 获取实时市场数据对于算法交易来说,低延迟的市场数据至关重要。 Hyperliquid 的 WebSocket API 提供实时订单簿更新、交易和用户特定的填充:````蟒蛇
导入 json
导入网络套接字HyperliquidWebSocketFeed 类:
"""Hyperliquid 的高性能 WebSocket 数据源。"""
def __init__(自身):
self.ws_url = "wss: //api.hyperliquid.xyz/ws"
自我订阅 = {}
self.orderbook_cache = {}
自运行 = False
异步定义连接(自身):
"""建立具有自动重新连接的 WebSocket 连接。"""
而true实:
尝试:
与 websockets.connect(self.ws_url) 异步作为 ws:
print("WebSocket 已连接")
self.ws = ws
自我运行= True
# 重新连接时重新订阅以前的频道
对于 self.subscriptions.values() 中的 sub:
等待 ws.send(json.dumps(sub))
等待 self._listen()
除了异常 e:
print(f"WebSocket 错误:{e}。5 秒后重新连接...")
等待 asyncio.sleep(5)
异步 def _listen(self):
"""处理传入消息。"""
异步 f```
pytho
n
def get_all_assets(自身):
"""检索所有可用的永久市场。"""
元 = self.info.meta()
宇宙=元['宇宙']
资产=[]
对于宇宙中的资产:
资产.追加({
'名称':资产['名称'],
'max_leverage': 资产['maxLeverage'],
'sz_decimals':资产['szDecimals'],
'only_isolated': asset.get('onlyIsolated', False)
})
print(f"\n可用市场:{len(资产)}")
for asset in assets[:10]: # 显示前 10 个
print(f" {asset['name']}: {asset['max_leverage']}x 最大杠杆")
返还资产
# 用法
资产 = trader.get_all_assets()
```
r
a in level[1]],
'时间戳': data.get('时间', 0)
}
异步 def subscribe_orderbook(self, coin):
"""订阅特定市场的实时订单簿。"""
子 = {
"方法": "订阅",
"订阅":{"类型":"l2Book","硬币":硬币}
}
self.subscriptions[f"book_{coin}"] = sub
如果自运行:
等待 self.ws.send(json.dumps(sub))
print(f"订阅了 {coin} 订单簿")
异步 def subscribe_trades(self, coin):
"""订阅实时贸易动态。"""
子 = {
"方法": "订阅",
"订阅":{"类型":"交易","硬币":硬币}
}
self.subscriptions[f"trades_{coin}"] = sub
如果自运行:
等待 self.ws.send(json.dumps(sub))# 用法
异步 def main():
feed = HyperliquidWebSocketFeed()
等待 feed.subscribe_orderbook("BTC")
等待 feed.subscribe_trades("BTC")
等待提要。```
pytho
n
导入 json
导入网络套接字
HyperliquidWebSocketFeed 类:
"""Hyperliquid 的高性能 WebSocket 数据源。"""
def __init__(自身):
self.ws_url = "wss: //api.hyperliquid.xyz/ws"
自我订阅 = {}
self.orderbook_cache = {}
自运行 = False
异步定义连接(自身):
"""建立具有自动重新连接的 WebSocket 连接。"""
而true实:
尝试:
与 websockets.connect(self.ws_url) 异步作为 ws:
print("WebSocket 已连接")
self.ws = ws
自我运行= True
# 重新连接时重新订阅以前的频道
对于 self.subscriptions.values() 中的 sub:
等待 ws.send(json.dumps(sub))
等待 self._listen()
除了异常 e:
print(f"WebSocket 错误:{e}。5 秒后重新连接...")
等待 asyncio.sleep(5)
异步 def _listen(self):
"""处理传入消息。"""
self.ws 中的消息异步:
msg = json.loads(消息)
if msg.get("channel") == "l2Book":
等待 self._handle_orderbook(msg['data'])
elif msg.get("channel") == "交易":
等待 self._handle_trades(msg['data'])
elif msg.get("channel") == "userFills":
等待 self._handle_fills(msg['data'])
异步 def _handle_orderbook(self, 数据):
"""处理 L2 订单簿更新。"""
硬币 = 数据['硬币']
级别 = 数据['级别']
self.orderbook_cache[硬币] = {
'bids': [{'px': float(b['px']), 'sz': float(b['sz'])} 对于级别 [0] 中的 b],
'asks': [{'px': float(a['px']), 'sz': float(a['sz'])} 对于级别 [1]] 中的 a,
'时间戳': data.get('时间', 0)
}
异步 def subscribe_orderbook(self, coin):
"""订阅特定市场的实时订单簿。"""
子 = {
"方法": "订阅",
"订阅":{"类型":"l2Book","硬币":硬币}
}
self.subscriptions[f"book_{coin}"] = sub
如果自运行:
等待 self.ws.send(json.dumps(sub))
print(f"订阅了 {coin} 订单簿")
异步 def subscribe_trades(self, coin):
"""订阅实时贸易动态。"""
子 = {
"方法": "订阅",
"订阅":{"类型":"交易","硬币":硬币}
}
self.subscriptions[f"trades_{coin}"] = sub
如果自运行:
等待 self.ws.send(json.dumps(sub))
# 用法
异步 def main():
feed = HyperliquidWebSocketFeed()
等待 feed.subscribe_orderbook("BTC")
等待 feed.subscribe_trades("BTC")
等待 feed.connect()
# asyncio.run(main())
```
p
x
= float(位置['entryPx'])
current_px = float(位置['markPx'])
大小 = 浮动(位置['szi'])
pnl = (current_px -entry_px) * 大小,如果大小 > 0,否则 (entry_px - current_px) * abs(大小)
pnl_pct = ((当前像素 / 条目像素) - 1) * 100 * (如果大小 > 0 则为 1,否则为 -1)
active_positions.append({
'硬币':位置['硬币'],
"尺寸":尺寸,
'entry_price':entry_px,
'mark_price': current_px,
'unrealized_pnl':pnl,
'pnl_百分比':pnl_pct,
'杠杆': float(位置['杠杆']['价值']),
'margin_used': float(位置['marginUsed']),
'liquidation_price': float(position.get('liquidationPx', 0))
})
返回活跃位置
def set_leverage(self, coin: str, 杠杆: int):
"""为特定市场设置杠杆。"""
结果 = self.exchange.update_leverage(杠杆, 币)
print(f"{coin} 的杠杆设置为 {杠杆}x")
返回结果
def close_position(self, coin: str):
"""关闭特定市场的全部头寸。"""
位置 = self.get_positions()
对于职位中的 pos:
如果 pos['coin'] == coin 并且 pos['size'] != 0:
is_buy = pos['size'] < 0 # 买入平空,卖出平多
sz = abs(pos['尺寸'])
返回 self.place_market_order(coin, is_buy, sz)
print(f"{coin} 没有未平仓头寸")
返回无# 使用示例
# trader.set_leverage("BTC", 10)
# trader.place_limit_order("BTC", True, 0.1, 65000.0, "Gtc")
# 仓位 = trader.get_positions()
````### 完整的趋势跟踪机器人示例这是一个完整的、可投入生产的趋势跟踪机器人,它结合了所有组件:````蟒蛇
导入时间
将 pandas 导入为 pd
将 numpy 导入为 np
从日期时间导入日期时间,时间增量TrendFollowingBot 类:
"""Hyperliquid 的 EMA 交叉趋势跟踪机器人。"""
def __init__(self, 交易者: HyperliquidTrader, 硬币: str = "BTC"):
自我交易者 = 交易者
self.coin = 硬币
self.fast_ema_period = 9
self.slow_ema_period = 21
self.risk_per_trade = 0.02 # 账户的 2%
self.in_position = False
self.position_side = 无
defcalculate_ema(self, 价格: pd.Series, period: int) -> float:
"""计算指数移动平均线。"""
返回prices.ewm(span=period, adjustment=False).mean().iloc[-1]
def fetch_recent_prices(self, Lookback: int = 100) -> pd.Series:
"""从蜡烛数据中获取最近的市场价格。"""
# 对历史蜡烛使用 REST API
蜡烛 = self.trader.info.candles(
硬币=自我.硬币,
间隔="5m",
startTime=int((datetime.now() - timedelta(小时=12)).timestamp() * 1000),
endTime=int(datetime.now().timestamp() * 1000)
)
````蟒蛇
def place_market_order(self, coin: str, is_buy: bool, sz: float):
"""执行具有滑点保护的市价订单。"""
order_type = {"limit": {"tif": "Ioc"}} # 立即或取消
结果 = self.exchange.order(
硬币,
是_购买,
深圳,
0, # 市场IOC价格0
订单类型,
减少_仅=false
)
print(f"市场{'买入' if is_buy else '卖出'} {sz} {coin}")
print(f"状态:{结果['状态']}")
如果结果中有'response'并且结果['response']中有'data':
状态=结果['响应']['数据']['状态']
对于状态中的状态:
如果状态为"已填写":
填充=状态['已填充']
print(f"填充:{fill['totalSz']} @ avg {fill['avgPx']}")
返回结果
def place_limit_order(self, coin: str, is_buy: bool, sz: float,
px:浮动,tif:str ="Gtc"):
"""下达指定有效期的限价订单。
TIF 选项:
- Gtc:取消前有效
- Ioc:立即或取消
- 霍克:填充或杀死
”“”
order_type = {"limit": {"tif": tif}}
结果 = self.exchange.order(
硬币,
是_购买,
深圳,
像素,
订单类型,
减少_仅=false
)
print(f"限制{'买入' if is_buy else '卖出'} {sz} {coin} @ {px}")
返回结果
def place_stop_loss_order(self, coin: str, is_buy: bool, sz: float,
触发像素:浮动,限制像素:浮动):
"""以触发价格下止损单。"""
订单类型 = {
"触发":{
"triggerPx":str(trigger_px),
"isMarket":确实,
"tpsl":"sl"
}
}
结果 = self.exchange.order(
硬币,
是_购买,
深圳,
限制像素,
订单类型,
减少_仅=true
)
print(f"止损{'买入' if is_buy else '卖出'} {sz} {coin}")
print(f"触发器:{trigger_px},限制:{limit_px}")
返回结果
```
ent
_px
= float(mids.get(self.coin, 0))
如果当前_px == 0:
print("无法获取当前价格")
返回
# 计算仓位大小
sz = self.calculate_position_size(current_px)
如果信号=="买入":
self.trader.place_market_order(self.coin, True, sz)
self.position_side = "长"
elif 信号 =="卖出":
self.trader.place_market_order(self.coin, False, sz)
self.position_side = "短"
self.in_position = True
def run(self, check_interval: int = 60):
"""带有信号检查的主机器人循环。"""
print(f"启动 {self.coin} 的趋势机器人")
print(f"快速 EMA: {self.fast_ema_period}, 慢速 EMA: {self.slow_ema_period}")
而true实:
尝试:
信号 = self.check_signals()
print(f"[{datetime.now()}] 信号:{signal}")
self.execute_signal(信号)
# 打印当前位置
持仓 = self.trader.get_positions()
对于职位中的 pos:
print(f" {pos['coin']}: {pos['size']} | "
f"盈亏:${pos['unrealized_pnl']:+.2f} ({pos['pnl_percent']:+.2f}%)")
时间.睡眠(检查间隔)
除了异常 e:
print(f"机器人循环错误:{e}")
时间.睡眠(10)# 启动机器人
# bot = TrendFollowingBot(交易者, "BTC")
# 机器人.run()
````---
## 高级超液体 API 集成### REST API:批量操作和历史数据对于需要历史分析的策略,REST API 提供全面的数据访问:````蟒蛇
def get_funding_rates(self, coin: str = None):
"""获取所有或特定市场的当前融资利率。"""
元 = self.info.meta()
资产=元['宇宙']
资金数据 = []
for asset in assets[:20]: # 检查前 20 个
名称 = 资产['名称']
ctx = self.info.funding_history(名称, 1)
如果ctx:
资金数据.append({
'硬币':名称,
'funding_rate': float(ctx[0].get('fundingRate', 0)),
'predicted_rate': float(ctx[0].get('predictedFundingRate'```
pytho
n
def get_positions(自身):
"""获取所有未平仓头寸以及盈亏详细信息。"""
user_state = self.info.user_state(self.wallet_address)
位置 = user_state.get('assetPositions', [])
活跃位置 = []
对于职位中的 pos:
位置 = pos['位置']
Entry_px = float(位置['entryPx'])
current_px = float(位置['markPx'])
大小 = 浮动(位置['szi'])
pnl = (current_px -entry_px) * 大小,如果大小 > 0,否则 (entry_px - current_px) * abs(大小)
pnl_pct = ((当前像素 / 条目像素) - 1) * 100 * (如果大小 > 0 则为 1,否则为 -1)
active_positions.append({
'硬币':位置['硬币'],
"尺寸":尺寸,
'entry_price':entry_px,
'mark_price': current_px,
'unrealized_pnl':pnl,
'pnl_百分比':pnl_pct,
'杠杆': float(位置['杠杆']['价值']),
'margin_used': float(位置['marginUsed']),
'liquidation_price': float(position.get('liquidationPx', 0))
})
返回活跃位置
def set_leverage(self, coin: str, 杠杆: int):
"""为特定市场设置杠杆。"""
结果 = self.exchange.update_leverage(杠杆, 币)
print(f"{coin} 的杠杆设置为 {杠杆}x")
返回结果
def close_position(self, coin: str):
"""关闭特定市场的全部头寸。"""
位置 = self.get_positions()
对于职位中的 pos:
如果 pos['coin'] == coin 并且 pos['size'] != 0:
is_buy = pos['size'] < 0 # 买入平空,卖出平多
sz = abs(pos['尺寸'])
返回 self.place_market_order(coin, is_buy, sz)
print(f"{coin} 没有未平仓头寸")
返回无
# 使用示例
# trader.set_leverage("BTC", 10)
# trader.place_limit_order("BTC", True, 0.1, 65000.0, "Gtc")
# 仓位 = trader.get_positions()
``资产:
子 = {
"方法": "订阅",
"订阅":{"类型":"allMids"}
}
等待 ws.send(json.dumps(sub))
子 = {
"方法": "订阅",
"订阅":{"类型":"l2Book","硬币":资产}
}
等待 ws.send(json.dumps(sub))
print(f"订阅了 {asset}")
异步def运行(自我):
"""具有多资产管理的主循环。"""
与 websockets.connect(self.ws_url) 异步作为 ws:
等待 self.subscribe_all(ws)
ws 中的消息异步:
msg = json.loads(消息)
频道 = msg.get("频道")
如果通道=="allMids":
对于资产,价格在 msg['data'].items() 中:
如果资产在 self.data_cache 中:
self.data_cache[资产]['mid'] = float(价格)
elif 频道 == "l2Book":
硬币 = msg['数据']['硬币']
如果 self.data_cache 中有硬币:
出价 = msg['数据']['级别'][0]
询问 = msg['数据']['级别'][1]
如果出价和询问:
best_bid = float(出价[0]['px'])
best_ask = float(询问[0]['px'])
self.data_cache[coin]['spread'] = best_ask - best_bid
# 调用注册的处理程序
if self.handlers 中的通道:
等待 self.handlers[通道](msg)# 用法
# 资产 = ["BTC", "ETH", "SOL", "AVAX", "ARB"]
# 管理器 = MultiAssetWebSocketManager(资产)
# asyncio.run(manager.run())
````---
## 超流动性的风险管理### 逐仓保证金与全仓保证金Hyperliquid 支持逐仓和全仓模式:```
pytho
n
def set_cross_margin(self, coin: str):
"""Enable cross-margin mode for a market."""
result = self.exchange.update_isolated_margin(coin, False, None)
print(f"Cross margin enabled for {coin}")
return result
def set_isolated_margin(self, coin: str, leverag```
pytho
n
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class TrendFollowingBot:
"""EMA crossover trend-following bot for Hyperliquid."""
def __init__(self, trader: HyperliquidTrader, coin: str = "BTC"):
self.trader = trader
self.coin = coin
self.fast_ema_period = 9
self.slow_ema_period = 21
self.risk_per_trade = 0.02 # 2% of account
self.in_position = False
self.position_side = None
def calculate_ema(self, prices: pd.Series, period: int) -> float:
"""Calculate Exponential Moving Average."""
return prices.ewm(span=period, adjust=False).mean().iloc[-1]
def fetch_recent_prices(self, lookback: int = 100) -> pd.Series:
"""Fetch recent market prices from candle data."""
# Using REST API for historical candles
candles = self.trader.info.candles(
coin=self.coin,
interval="5m",
startTime=int((datetime.now() - timedelta(hours=12)).timestamp() * 1000),
endTime=int(datetime.now().timestamp() * 1000)
)
closes = [float(c['c']) for c in candles if 'c' in c]
return pd.Series(closes)
def calculate_position_size(self, entry_px: float) -> float:
"""Calculate position size based on risk parameters."""
account = self.trader.get_account_summary()
account_value = float(account['marginSummary']['accountValue'])
risk_amount = account_value * self.risk_per_trade
# Risk-based sizing: size = risk_amount / (estimated stop distance)
stop_distance = entry_px * 0.015 # 1.5% stop
position_size = risk_amount / stop_distance
# Round to appropriate decimal places
return round(position_size, 4)
def check_signals(self) -> str:
"""Check for EMA crossover signals."""
prices = self.fetch_recent_prices()
if len(prices) < self.slow_ema_period + 5:
return "HOLD"
fast_ema = self.calculate_ema(prices, self.fast_ema_period)
slow_ema = self.calculate_ema(prices, self.slow_ema_period)
# Previous values for crossover detection
prev_fast = prices.ewm(span=self.fast_ema_period).mean().iloc[-2]
prev_slow = prices.ewm(span=self.slow_ema_period).mean().iloc[-2]
# Bullish crossover: fast crosses above slow
if prev_fast <= prev_slow and fast_ema > slow_ema:
return "BUY"
# Bearish crossover: fast crosses below slow
if prev_fast >= prev_slow and fast_ema < slow_ema:
return "SELL"
return "HOLD"
def execute_signal(self, signal: str):
"""Execute trading signal with proper risk management."""
if signal == "HOLD":
return
# Close existing position if opposite signal
if self.in_position:
if (signal == "BUY" and self.position_side == "SHORT") or \
(signal == "SELL" and self.position_side == "LONG"):
self.trader.close_position(self.coin)
self.in_position = False
time.sleep(1) # Wait for fill
else:
return # Same direction, hold
# Get current price for sizing
mids = self.trader.info.all_mids()
current_px = float(mids.get(self.coin, 0))
if current_px == 0:
print("Could not fetch current price")
return
# Calculate position size
sz = self.calculate_position_size(current_px)
if signal == "BUY":
self.trader.place_market_order(self.coin, True, sz)
self.position_side = "LONG"
elif signal == "SELL":
self.trader.place_market_order(self.coin, False, sz)
self.position_side = "SHORT"
self.in_position = True
def run(self, check_interval: int = 60):
"""Main bot loop with signal checking."""
print(f"Starting trend bot for {self.coin}")
print(f"Fast EMA: {self.fast_ema_period}, Slow EMA: {self.slow_ema_period}")
while True:
try:
signal = self.check_signals()
print(f"[{datetime.now()}] Signal: {signal}")
self.execute_signal(signal)
# Print current positions
positions = self.trader.get_positions()
for pos in positions:
print(f" {pos['coin']}: {pos['size']} | "
f"PnL: ${pos['unrealized_pnl']:+.2f} ({pos['pnl_percent']:+.2f}%)")
time.sleep(check_interval)
except Exception as e:
print(f"Error in bot loop: {e}")
time.sleep(10)
# Launch bot
# bot = TrendFollowingBot(trader, "BTC")
# bot.run()
```r
Arbitrum confirmation (typically under 5 minutes)对于程序化存款,直接使用桥梁合约:````蟒蛇
def Deposit_usdc(self, amount_usdc: float):
"""将 USDC 从 Arbitrum 存入 Hyperliquid。"""
# 小数点后 6 位的金额 (USDC)
amount_wei = int(amount_usdc * 1_000_000)
结果 = self.exchange.spot_transfer(
to=self.wallet_address,
金额=str(金额_wei),
资产="USDC"
)
print(f"已存入 {amount_usdc} USDC")
返回结果
````### Hyperliquid 的交易费用是多少?Hyperliquid 使用**制造商-接受者费用模型**:
- **Maker 费用**:-0.002%(负数 — 您因提供流动性而获得回扣)
- **接受者费用**:0.035%这些费用是永久 DEX 领域中最具竞争力的费用之一。 大交易量交易者可以通过 Hyperliquid VIP 计划获得额外费用折扣。### 超液体安全吗? 有哪些风险?Hyperliquid 作为**非托管交易所**运行——您的资金保留在由您的私钥控制的智能合约中。 然而,与任何 DeFi 协议一样,也存在风险:1. **智能合约风险**:在审计过程中,错误可能会影响资金安全
2. **强平风险**:高杠杆头寸可能会在波动期间被强平
3. **预言机风险**:标记价格依赖于预言机提要,理论上可能会出现偏差
4. **过桥风险**:存款通过 Arbitrum 过桥基础设施传输The platform has processed over $500 billion in cumulative volume since inception with no major security incidents. 维持保险基金以防止社会化损失。### 在上线之前如何回测策略?Hyperliquid 通过其 API 提供**免费历史数据**。 这是一个回测框架:````蟒蛇
def fetch_historical_candles(self, 硬币: str, 间隔: str = "1h",
天:int = 30) -> pd.DataFrame:
"""获取历史 OHLCV 数据进行回测。"""
end_time = int(datetime.now().timestamp() * 1000)
start_time = int((datetime.now() - timedelta(天=天)).timestamp() * 1000)
蜡烛= self.info.蜡烛(
硬币=硬币,
间隔=间隔,
开始时间=开始时间,
结束时间=结束时间
)
df = pd.DataFrame(蜡烛)
df['时间戳'] = pd.to_datetime(df['t'], 单位='毫秒')
df['open'] = df['o'].astype(float)
df['高'] = df['h'].astype(float)
df['low'] = df['l'].astype(float)
df['close'] = df['c'].astype(float)
df['体积'] = df['v'].astype(float)
return df[['时间戳', '开盘价', '最高价', '最低价', '收盘价', '交易量']]
````````蟒蛇
类回测引擎:
"""用于超液体策略的简单回测引擎。"""
def __init__(自身,数据:pd.DataFrame,initial_capital:float = 10000):
self.data = 数据
self.capital = 初始资本
自身位置 = 0
自我交易 = []
def run_ema_strategy(self, 快: int = 9, 慢: int = 21):
"""运行 EMA 交叉回测。"""
self.data['fast_ema'] = self.data['close'].ewm(span=fast).mean()
self.data['slow_ema'] = self.data['close'].ewm(span=slow).mean()
对于范围内的 i(slow + 1, len(self.data)):
prev_fast = self.data['fast_ema'].iloc[i-1]
prev_slow = self.data['slow_ema'].iloc[i-1]
curr_fast = self.data['fast_ema'].iloc[i]
curr_slow = self.data['slow_ema'].iloc[i]
价格 = self.data['close'].iloc[i]
# 看涨交叉
如果 prev_fast <= prev_slow 且 curr_fast > curr_slow 且 self.position <= 0:
如果 self.position < 0:
self.capital += abs(self.position) * 价格 # 平仓
self.position = self.capital / 价格 # 做多
自有资本 = 0
self.trades.append({'时间': self.data['时间戳'].iloc[i],
"行动":"购买","价格":价格})
# 看跌交叉
elif prev_fast >= prev_slow 且 curr_fast < curr_slow 且 self.position >= 0:
如果 self.position > 0:
self.capital = self.position * 价格 # 平仓多头
self.position = -self.capital / 价格 # 做空
自有资本 = 0
self.trades.append({'时间': self.data['时间戳'].iloc[i],
"行动":"卖出","价格":价格})
# 计算最终盈亏
Final_price = self.data['close'].iloc[-1]
Final_value = self.capital + abs(self.position) * Final_price
总回报 = (最终值 / 10000 - 1) * 100
print(f"回测结果:")
print(f"总回报:{total_return: .2f}%")
print(f"总交易:{len(self.trades)}")
print(f"最终投资组合价值:${final_value: ,.2f}")
退货自营交易# 用法
# data = trader.fetch_historical_candles("BTC", "1h", 90)
# bt = BacktestEngine(数据)
# bt.run_ema_strategy(9, 21)
````### 可以 ```
pytho
n
def get_funding_rates(self, coin: str = None):
"""获取所有或特定市场的当前融资利率。"""
元 = self.info.meta()
资产=元['宇宙']
资金数据 = []
for asset in assets[:20]: # 检查前 20 个
名称 = 资产['名称']
ctx = self.info.funding_history(名称, 1)
如果ctx:
资金数据.append({
'硬币':名称,
'funding_rate': float(ctx[0].get('fundingRate', 0)),
'predicted_rate': float(ctx[0].get('predictedFundingRate', 0))
})
# 按绝对资金费率排序
funding_data.sort(key = lambda x:abs(x ['funding_rate']),reverse = True)
print("\n最高资金费率:")
对于funding_data[:10]中的f:
print(f" {f['coin']}: {f['funding_rate']*100: +.4f}%")
返回资金_数据
def get_recent_trades(self, coin: str, limit: int = 100):
"""获取最近的公开交易以进行交易量分析。"""
交易 = self.info.recent_trades(coin)
交易列表 = []
对于交易中的 t[:limit]:
trade_list.append({
'价格': float(t['px']),
'尺寸': float(t['sz']),
'side': '买入' if t['side'] == 'B' else '卖出',
'时间': t['时间']
})
df = pd.DataFrame(trade_list)
# 计算买入/卖出比率
buy_vol = df[df['side'] == 'buy']['size'].sum()
sell_vol = df[df['side'] == 'sell']['size'].sum()
比率 = buy_vol / sell_vol 如果 sell_vol > 0 否则 float('inf')
print(f"\n{coin} 交易分析 (最后 {limit} 笔交易)")
print(f"买入/卖出比率:{ratio: .2f} | 买入量:{buy_vol: .4f} | 卖出量:{sell_vol: .4f}")
返回df
# 用法
# 资金 = trader.get_funding_rates()
# 交易 = trader.get_recent_trades("ETH", 200)
``拥有自己的创造力和市场洞察力。今天就开始构建,体验为什么每天有数十亿的交易量流经 Hyperliquid 的订单簿。---*免责声明:加密货币交易存在重大风险。 本文仅用于教育目的,不构成财务建议。 始终进行自己的研究,切勿使用您无法承受损失的资金进行交易。 过去的表现并不能保证未来的结果。*---**相关文章:**
- Minara AI Trading Bot
— 人工智能驱动的自动交易
- Binance Exchange
— 世界领先的加密货币交易所**Resources: **
- [Hyperliquid Documentation](https://hyperliquid.gitbook.io/hyperliquid-docs)
- [GitHub: hyperliquid-dex](https://github.com/hyperliquid-dex)
- [Hyperliquid Python SDK](https://github.com/hyperliquid-dex/hyperliquid-python-sdk)
```pyt
h
o
n
class MultiAssetWebSocketManager:
"""Manage WebSocket connections for multiple assets simultaneously."""
def __init__(self, assets: list):
self.assets = assets
self.ws_url = "wss: //api.hyperliquid.xyz/ws"
self.handlers = {}
self.data_cache = {asset: {'mid': 0, 'spread': 0} for asset in assets}
def register_handler(self, channel: str, handler):
"""Register custom handler for a channel type."""
self.handlers[channel] = handler
async def subscribe_all(self, ws):
"""Subscribe to all assets in batch."""
for asset in self.assets:
sub = {
"method": "subscribe",
"subscription": {"type": "allMids"}
}
await ws.send(json.dumps(sub))
sub = {
"method": "subscribe",
"subscription": {"type": "l2Book", "coin": asset}
}
await ws.send(json.dumps(sub))
print(f"Subscribed to {asset}")
async def run(self):
"""Main loop with multi-asset management."""
async with websockets.connect(self.ws_url) as ws:
await self.subscribe_all(ws)
async for message in ws:
msg = json.loads(message)
channel = msg.get("channel")
if channel == "allMids":
for asset, price in msg['data'].items():
if asset in self.data_cache:
self.data_cache[asset]['mid'] = float(price)
elif channel == "l2Book":
coin = msg['data']['coin']
if coin in self.data_cache:
bids = msg['data']['levels'][0]
asks = msg['data']['levels'][1]
if bids and asks:
best_bid = float(bids[0]['px'])
best_ask = float(asks[0]['px'])
self.data_cache[coin]['spread'] = best_ask - best_bid
# Call registered handlers
if channel in self.handlers:
await self.handlers[channel](msg)
# Usage
# assets = ["BTC", "ETH", "SOL", "AVAX", "ARB"]
# manager = MultiAssetWebSocketManager(assets)
# asyncio.run(manager.run())
pytho n def set_cross_margin(self, coin: str): “““Enable cross-margin mode for a market.””” result = self.exchange.update_isolated_margin(coin, False, None) print(f"Cross margin enabled for {coin}") return result
def set_isolated_margin(self, coin: str, leverage: int):
"""Enable isolated margin with specific leverage."""
result = self.exchange.update_isolated_margin(coin, True, leverage)
print(f"Isolated margin enabled for {coin} at {leverage}x")
return result
pytho
n
class RiskManager:
"""Comprehensive risk management system for Hyperliquid trading."""
def __init__(self, trader: HyperliquidTrader):
self.trader = trader
self.max_daily_loss = 0.05 # 5% max daily loss
self.max_position_size = 0.50 # 50% of account max
self.max_leverage = 25
self.daily_pnl = 0
self.last_reset = datetime.now().date()
def check_daily_limit(self) -> bool:
"""Check if daily loss limit has been reached."""
today = datetime.now().date()
if today != self.last_reset:
self.daily_pnl = 0
self.last_reset = today
account = self.trader.get_account_summary()
account_value = float(account['marginSummary']['accountValue'])
loss_pct = abs(min(self.daily_pnl, 0)) / account_value if account_value > 0 else 0
if loss_pct >= self.max_daily_loss:
print(f"DAILY LOSS LIMIT REACHED: {loss_pct*100: .2f}%")
return False
return True
def validate_order(self, coin: str, size: float, leverage: int) -> bool:
"""Validate order against risk parameters."""
# Check leverage limit
if leverage > self.max_leverage:
print(f"Leverage {leverage}x exceeds maximum {self.max_leverage}x")
return False
# Check position size limit
account = self.trader.get_account_summary()
account_value = float(account['marginSummary']['accountValue'])
mids = self.trader.info.all_mids()
price = float(mids.get(coin, 0))
notional = size * price
if notional / account_value > self.max_position_size:
print(f"Position size {notional/account_value*100: .1f}% exceeds limit")
return False
return True
def emergency_close_all(self):
"""Emergency close all positions immediately."""
print("EMERGENCY CLOSE ALL POSITIONS")
positions = self.trader.get_positions()
for pos in positions:
if pos['size'] != 0:
self.trader.close_position(pos['coin'])
time.sleep(0.5)
# Integrate with bot
# risk = RiskManager(trader)
# if risk.check_daily_limit() and risk.validate_order("BTC", 0.1, 10):
# trader.place_market_order("BTC", True, 0.1)
pytho n import aiohttp import asyncio
class AsyncHyperliquidClient: “““Async Hyperliquid client for maximum throughput.”””
def __init__(self):
self.base_url = "https://api.hyperliquid.xyz"
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=100)
)
return self
async def __aexit__(self, *args):
await self.session.close()
async def batch_requests(self, requests: list) -> list:
"""Execute multiple requests concurrently."""
tasks = [self._make_request(req) for req in requests]
return await asyncio.gather(*tasks, return_exceptions=True)
async def _make_request(self, request: dict):
"""Single async request."""
async with self.session.post(
f"{self.base_url}/info",
json=request
) as resp:
return await resp.json()
Usage #
async with AsyncHyperliquidClient() as client: #
reqs = [{“type”: “meta”}, {“type”: “allMids”}] #
results = await client.batch_requests(reqs) #
pytho
n
def deposit_usdc(self, amount_usdc: float):
"""Deposit USDC from Arbitrum to Hyperliquid."""
# Amount in 6 decimal places (USDC)
amount_wei = int(amount_usdc * 1_000_000)
result = self.exchange.spot_transfer(
to=self.wallet_address,
amount=str(amount_wei),
asset="USDC"
)
print(f"Deposited {amount_usdc} USDC")
return result
pytho n def fetch_historical_candles(self, coin: str, interval: str = “1h”, days: int = 30) -> pd.DataFrame: “““Fetch historical OHLCV data for backtesting.””” end_time = int(datetime.now().timestamp() * 1000) start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
candles = self.info.candles(
coin=coin,
interval=interval,
startTime=start_time,
endTime=end_time
)
df = pd.DataFrame(candles)
df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
df['open'] = df['o'].astype(float)
df['high'] = df['h'].astype(float)
df['low'] = df['l'].astype(float)
df['close'] = df['c'].astype(float)
df['volume'] = df['v'].astype(float)
return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
pytho
n
class BacktestEngine:
"""Simple backtesting engine for Hyperliquid strategies."""
def __init__(self, data: pd.DataFrame, initial_capital: float = 10000):
self.data = data
self.capital = initial_capital
self.position = 0
self.trades = []
def run_ema_strategy(self, fast: int = 9, slow: int = 21):
"""Run EMA crossover backtest."""
self.data['fast_ema'] = self.data['close'].ewm(span=fast).mean()
self.data['slow_ema'] = self.data['close'].ewm(span=slow).mean()
for i in range(slow + 1, len(self.data)):
prev_fast = self.data['fast_ema'].iloc[i-1]
prev_slow = self.data['slow_ema'].iloc[i-1]
curr_fast = self.data['fast_ema'].iloc[i]
curr_slow = self.data['slow_ema'].iloc[i]
price = self.data['close'].iloc[i]
# Bullish crossover
if prev_fast <= prev_slow and curr_fast > curr_slow and self.position <= 0:
if self.position < 0:
self.capital += abs(self.position) * price # Close short
self.position = self.capital / price # Go long
self.capital = 0
self.trades.append({'time': self.data['timestamp'].iloc[i],
'action': 'BUY', 'price': price})
# Bearish crossover
elif prev_fast >= prev_slow and curr_fast < curr_slow and self.position >= 0:
if self.position > 0:
self.capital = self.position * price # Close long
self.position = -self.capital / price # Go short
self.capital = 0
self.trades.append({'time': self.data['timestamp'].iloc[i],
'action': 'SELL', 'price': price})
# Calculate final P&L
final_price = self.data['close'].iloc[-1]
final_value = self.capital + abs(self.position) * final_price
total_return = (final_value / 10000 - 1) * 100
print(f"Backtest Results:")
print(f"Total Return: {total_return: .2f}%")
print(f"Total Trades: {len(self.trades)}")
print(f"Final Portfolio Value: ${final_value: ,.2f}")
return self.trades
# Usage
# data = trader.fetch_historical_candles("BTC", "1h", 90)
# bt = BacktestEngine(data)
# bt.run_ema_strategy(9, 21)
```
💬 留言讨论