{{< resource-info >}}
Date: 2026-05-19
Category: AI Trading
Tags: Hyperliquid, perpetual DEX, on-chain trading, leverage trading, trading bot, DeFi, HyperEVM
Read Time: 18 minutes
Introduction: Why Hyperliquid Dominates the Perp DEX LandscapeThe decentralized perpetual futures trading landscape has undergone a seismic shift since 2023, and at the epicenter of this transformation stands Hyperliquid — the fully on-chain orderbook perpetual DEX that has consistently processed over $2 billion in daily trading volume throughout 2026. Unlike traditional automated market maker (AMM) based DEXs that rely on liquidity pools and suffer from slippage and impermanent loss, Hyperliquid brings the familiar Central Limit Order Book (CLOB) experience to the blockchain, combining the execution quality of centralized exchanges with the self-custody and transparency benefits of DeFi.Launched with a mission to eliminate the compromises traders face when choosing between centralized and decentralized venues, Hyperliquid has grown to support 100+ trading pairs across major cryptocurrencies, offering leverage of up to 50x on select markets. The platform’s native integration with HyperEVM — its purpose-built EVM-compatible execution layer — has opened the floodgates for sophisticated trading strategies, MEV-resistant execution, and seamless smart contract composability that was previously impossible on traditional perpetual DEXs.For algorithmic traders and bot developers, Hyperliquid represents a paradigm shift. The platform’s comprehensive Python SDK, combined with high-performance REST and WebSocket APIs, enables sub-second order placement, real-time position management, and fully automated strategy execution. Whether you’re building a market-making bot, a trend-following system, or a complex multi-exchange arbitrage strategy, Hyperliquid provides the infrastructure backbone that can handle institutional-grade trading volumes without sacrificing decentralization.In this comprehensive guide, we’ll explore every facet of trading on Hyperliquid in 2026 — from understanding the core architecture to building production-ready trading bots. By the end, you’ll have the knowledge and code templates necessary to integrate Hyperliquid into your algorithmic trading stack.—## Understanding Hyperliquid’s Core Architecture### The CLOB Advantage: Why Orderbooks Matter for PerpetualsTraditional perpetual DEXs built on AMM models like vAMM (virtual AMM) suffer from inherent limitations: concentrated liquidity around the current price, significant slippage for large orders, and exposure to impermanent loss for liquidity providers. Hyperliquid’s Central Limit Order Book (CLOB) architecture eliminates these problems entirely.In a CLOB system, traders place limit orders at specific prices, creating a visible depth chart that shows exactly how much liquidity exists at each price level. This enables:- Tighter spreads — Market makers compete directly, narrowing bid-ask spreads to levels comparable with Binance and Bybit #
- Zero slippage limit orders — Large orders execute exactly at the specified price (or better) when matched
- Transparent price discovery — All orders are visible on-chain, preventing hidden manipulation
- Efficient capital utilization — No need to lock capital in liquidity pools; capital is used only when orders are filledHyperliquid’s orderbook is fully settled on its proprietary L1 chain, achieving block times under 1 second and supporting 10,000+ transactions per second. This performance eliminates the latency penalties that have historically plagued on-chain trading venues.### HyperEVM: Smart Contracts Meet High-Frequency TradingThe introduction of HyperEVM in late 2024 was a watershed moment for Hyperliquid. This EVM-compatible execution layer enables:- Smart contract wallets — Programmable account logic for advanced access control and automated execution
- Composable DeFi strategies — Integration with lending protocols, yield optimizers, and other on-chain primitives
- Custom order types — Conditional orders, trailing stops, and TWAP execution via smart contract automation
- Gasless transactions — Meta-transaction support where fees can be paid in any token or sponsored by third partiesFor bot developers, HyperEVM means you can deploy smart contracts that interact directly with the exchange infrastructure, enabling strategies that would be impossible on centralized exchanges or traditional DEXs.—## Setting Up Your Hyperliquid Trading Environment### Prerequisites and API Key RegistrationBefore writing any code, you’ll need to set up your trading environment. Hyperliquid uses wallet-based authentication — no API key registration required. Your Ethereum wallet (via EIP-712 signatures) serves as your identity and authentication mechanism.Required tools:
- Python 3.10 or higher
- An Ethereum wallet with private key
- USDC on Arbitrum for deposits (Hyperliquid’s deposit layer)### Installing the Hyperliquid Python SDKThe official Python SDK provides comprehensive access to all exchange functions. Installation is straightforward:```bash
python -m venv hyperliquid-env source hyperliquid-env/bin/activate
Install the official SDK #
pip install hyperliquid-python-sdk
Install additional dependencies for bot development #
pip install websockets aiohttp pandas numpy python-dotenv
Create a `.env` file to store your configuration securely:bash
.env - NEVER commit this to version control #
PRIVATE_KEY=your_ethereum_private_key_here
WALLET_ADDRESS=0x_your_wallet_address
TESTNET=true
### Basic Connection and AuthenticationHere's the foundational code to establish an authenticated connection to Hyperliquidbash
.env - NEVER commit this to version control #
PRIVATE_KEY=your_ethereum_private_key_here WALLET_ADDRESS=0x_your_wallet_address TESTNET=true
load_dotenv()class HyperliquidTrader:
"""Production-ready Hyperliquid trading client."""
def __init__(self, use_testnet=True):
self.private_key = os.getenv('PRIVATE_KEY')
self.wal```python
import os
import asyncio
from dotenv import load_dotenv
from hyperliquid.exchange import Exchange
from hyperliquid.info import Info
from hyperliquid.utils import constants
# Load environment variables
load_dotenv()
class HyperliquidTrader:
"""Production-ready Hyperliquid trading client."""
def __init__(self, use_testnet=True):
self.private_key = os.getenv('PRIVATE_KEY')
self.wallet_address = os.getenv('WALLET_ADDRESS')
# Select endpoint based on environment
if use_testnet:
self.base_url = constants.TESTNET_API_URL
else:
self.base_url = constants.MAINNET_API_URL
# Initialize exchange and info clients
self.exchange = Exchange(
self.wallet_address,
self.private_key,
self.base_url
)
self.info = Info(self.base_url)
print(f"Connected to Hyperliquid {'Testnet' if use_testnet else 'Mainnet'}")
print(f"Wallet: {self.wallet_address}")
def get_account_summary(self):
"""Fetch comprehensive account information."""
user_state = self.info.user_state(self.wallet_address)
account_value = float(user_state['marginSummary']['accountValue'])
total_margin_used = float(user_state['marginSummary']['totalMarginUsed'])
withdrawable = float(user_state['withdrawable'])
print(f"Account Value: ${account_value:,.2f}")
print(f"Margin Used: ${total_margin_used:,.2f}")
print(f"Withdrawable: ${withdrawable:,.2f}")
return user_state
# Initialize trader
trader = HyperliquidTrader(use_testnet=True)
trader.get_account_summary()
```'name': asset['name'],
'max_leverage': asset['maxLeverage'],
'sz_decimals': asset['szDecimals'],
'only_isolated': asset.get('onlyIsolated', False)
})
print(f"\nAvailable Markets: {len(assets)}")
for asset in assets[:10]: # Show first 10
print(f" {asset['name']}: {asset['max_leverage']}x max leverage")
return assets# Usage
assets = trader.get_all_assets()
```---## Building a Production-Ready Trading Bot### Real-Time Market Data via WebSocketFor algorithmic trading, low-latency market data is essential. Hyperliquid's WebSocket API provides real-time orderbook updates, trades, and user-specific fills:```python
import json
import websocketsclass HyperliquidWebSocketFeed:
"""High-performance WebSocket data feed for Hyperliquid."""
def __init__(self):
self.ws_url = "wss://api.hyperliquid.xyz/ws"
self.subscriptions = {}
self.orderbook_cache = {}
self.running = False
async def connect(self):
"""Establish WebSocket connection with auto-reconnect."""
while True:
try:
async with websockets.connect(self.ws_url) as ws:
print("WebSocket connected")
self.ws = ws
self.running = True
# Resubscribe to previous channels on reconnect
for sub in self.subscriptions.values():
await ws.send(json.dumps(sub))
await self._listen()
except Exception as e:
print(f"WebSocket error: {e}. Reconnecting in 5s...")
await asyncio.sleep(5)
async def _listen(self):
"""Process incoming messages."""
async f```python
def get_all_assets(self):
"""Retrieve all available perpetual markets."""
meta = self.info.meta()
universe = meta['universe']
assets = []
for asset in universe:
assets.append({
'name': asset['name'],
'max_leverage': asset['maxLeverage'],
'sz_decimals': asset['szDecimals'],
'only_isolated': asset.get('onlyIsolated', False)
})
print(f"\nAvailable Markets: {len(assets)}")
for asset in assets[:10]: # Show first 10
print(f" {asset['name']}: {asset['max_leverage']}x max leverage")
return assets
# Usage
assets = trader.get_all_assets()
```r a in levels[1]],
'timestamp': data.get('time', 0)
}
async def subscribe_orderbook(self, coin):
"""Subscribe to real-time orderbook for a specific market."""
sub = {
"method": "subscribe",
"subscription": {"type": "l2Book", "coin": coin}
}
self.subscriptions[f"book_{coin}"] = sub
if self.running:
await self.ws.send(json.dumps(sub))
print(f"Subscribed to {coin} orderbook")
async def subscribe_trades(self, coin):
"""Subscribe to real-time trade feed."""
sub = {
"method": "subscribe",
"subscription": {"type": "trades", "coin": coin}
}
self.subscriptions[f"trades_{coin}"] = sub
if self.running:
await self.ws.send(json.dumps(sub))# Usage
async def main():
feed = HyperliquidWebSocketFeed()
await feed.subscribe_orderbook("BTC")
await feed.subscribe_trades("BTC")
await feed.```python
import json
import websockets
class HyperliquidWebSocketFeed:
"""High-performance WebSocket data feed for Hyperliquid."""
def __init__(self):
self.ws_url = "wss://api.hyperliquid.xyz/ws"
self.subscriptions = {}
self.orderbook_cache = {}
self.running = False
async def connect(self):
"""Establish WebSocket connection with auto-reconnect."""
while True:
try:
async with websockets.connect(self.ws_url) as ws:
print("WebSocket connected")
self.ws = ws
self.running = True
# Resubscribe to previous channels on reconnect
for sub in self.subscriptions.values():
await ws.send(json.dumps(sub))
await self._listen()
except Exception as e:
print(f"WebSocket error: {e}. Reconnecting in 5s...")
await asyncio.sleep(5)
async def _listen(self):
"""Process incoming messages."""
async for message in self.ws:
msg = json.loads(message)
if msg.get("channel") == "l2Book":
await self._handle_orderbook(msg['data'])
elif msg.get("channel") == "trades":
await self._handle_trades(msg['data'])
elif msg.get("channel") == "userFills":
await self._handle_fills(msg['data'])
async def _handle_orderbook(self, data):
"""Process L2 orderbook updates."""
coin = data['coin']
levels = data['levels']
self.orderbook_cache[coin] = {
'bids': [{'px': float(b['px']), 'sz': float(b['sz'])} for b in levels[0]],
'asks': [{'px': float(a['px']), 'sz': float(a['sz'])} for a in levels[1]],
'timestamp': data.get('time', 0)
}
async def subscribe_orderbook(self, coin):
"""Subscribe to real-time orderbook for a specific market."""
sub = {
"method": "subscribe",
"subscription": {"type": "l2Book", "coin": coin}
}
self.subscriptions[f"book_{coin}"] = sub
if self.running:
await self.ws.send(json.dumps(sub))
print(f"Subscribed to {coin} orderbook")
async def subscribe_trades(self, coin):
"""Subscribe to real-time trade feed."""
sub = {
"method": "subscribe",
"subscription": {"type": "trades", "coin": coin}
}
self.subscriptions[f"trades_{coin}"] = sub
if self.running:
await self.ws.send(json.dumps(sub))
# Usage
async def main():
feed = HyperliquidWebSocketFeed()
await feed.subscribe_orderbook("BTC")
await feed.subscribe_trades("BTC")
await feed.connect()
# asyncio.run(main())
```px = float(position['entryPx'])
current_px = float(position['markPx'])
size = float(position['szi'])
pnl = (current_px - entry_px) * size if size > 0 else (entry_px - current_px) * abs(size)
pnl_pct = ((current_px / entry_px) - 1) * 100 * (1 if size > 0 else -1)
active_positions.append({
'coin': position['coin'],
'size': size,
'entry_price': entry_px,
'mark_price': current_px,
'unrealized_pnl': pnl,
'pnl_percent': pnl_pct,
'leverage': float(position['leverage']['value']),
'margin_used': float(position['marginUsed']),
'liquidation_price': float(position.get('liquidationPx', 0))
})
return active_positions
def set_leverage(self, coin: str, leverage: int):
"""Set leverage for a specific market."""
result = self.exchange.update_leverage(leverage, coin)
print(f"Leverage for {coin} set to { leverage}x")
return result
def close_position(self, coin: str):
"""Close entire position for a specific market."""
positions = self.get_positions()
for pos in positions:
if pos['coin'] == coin and pos['size'] != 0:
is_buy = pos['size'] < 0 # Buy to close short, sell to close long
sz = abs(pos['size'])
return self.place_market_order(coin, is_buy, sz)
print(f"No open position for {coin}")
return None# Usage examples
# trader.set_leverage("BTC", 10)
# trader.place_limit_order("BTC", True, 0.1, 65000.0, "Gtc")
# positions = trader.get_positions()
```### Complete Trend-Following Bot ExampleHere's a complete, production-ready trend-following bot that combines all components:```python
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedeltaclass TrendFollowingBot:
"""EMA crossover trend-following bot for Hyperliquid."""
def __init__(self, trader: HyperliquidTrader, coin: str = "BTC"):
self.trader = trader
self.coin = coin
self.fast_ema_period = 9
self.slow_ema_period = 21
self.risk_per_trade = 0.02 # 2% of account
self.in_position = False
self.position_side = None
def calculate_ema(self, prices: pd.Series, period: int) -> float:
"""Calculate Exponential Moving Average."""
return prices.ewm(span=period, adjust=False).mean().iloc[-1]
def fetch_recent_prices(self, lookback: int = 100) -> pd.Series:
"""Fetch recent market prices from candle data."""
# Using REST API for historical candles
candles = self.trader.info.candles(
coin=self.coin,
interval="5m",
startTime=int((datetime.now() - timedelta(hours=12)).timestamp() * 1000),
endTime=int(datetime.now().timestamp() * 1000)
)
```python
def place_market_order(self, coin: str, is_buy: bool, sz: float):
"""Execute a market order with slippage protection."""
order_type = {"limit": {"tif": "Ioc"}} # Immediate-or-Cancel
result = self.exchange.order(
coin,
is_buy,
sz,
0, # Price 0 for market IOC
order_type,
reduce_only=False
)
print(f"Market {'BUY' if is_buy else 'SELL'} {sz} {coin}")
print(f"Status: {result['status']}")
if 'response' in result and 'data' in result['response']:
statuses = result['response']['data']['statuses']
for status in statuses:
if 'filled' in status:
fill = status['filled']
print(f"Filled: {fill['totalSz']} @ avg {fill['avgPx']}")
return result
def place_limit_order(self, coin: str, is_buy: bool, sz: float,
px: float, tif: str = "Gtc"):
"""Place a limit order with specified time-in-force.
TIF options:
- Gtc: Good-til-Cancelled
- Ioc: Immediate-or-Cancel
- Fok: Fill-or-Kill
"""
order_type = {"limit": {"tif": tif}}
result = self.exchange.order(
coin,
is_buy,
sz,
px,
order_type,
reduce_only=False
)
print(f"Limit {'BUY' if is_buy else 'SELL'} {sz} {coin} @ {px}")
return result
def place_stop_loss_order(self, coin: str, is_buy: bool, sz: float,
trigger_px: float, limit_px: float):
"""Place a stop-loss order with trigger price."""
order_type = {
"trigger": {
"triggerPx": str(trigger_px),
"isMarket": True,
"tpsl": "sl"
}
}
result = self.exchange.order(
coin,
is_buy,
sz,
limit_px,
order_type,
reduce_only=True
)
print(f"Stop-Loss {'BUY' if is_buy else 'SELL'} {sz} {coin}")
print(f"Trigger: {trigger_px}, Limit: {limit_px}")
return result
```ent_px = float(mids.get(self.coin, 0))
if current_px == 0:
print("Could not fetch current price")
return
# Calculate position size
sz = self.calculate_position_size(current_px)
if signal == "BUY":
self.trader.place_market_order(self.coin, True, sz)
self.position_side = "LONG"
elif signal == "SELL":
self.trader.place_market_order(self.coin, False, sz)
self.position_side = "SHORT"
self.in_position = True
def run(self, check_interval: int = 60):
"""Main bot loop with signal checking."""
print(f"Starting trend bot for {self.coin}")
print(f"Fast EMA: {self.fast_ema_period}, Slow EMA: {self.slow_ema_period}")
while True:
try:
signal = self.check_signals()
print(f"[{datetime.now()}] Signal: {signal}")
self.execute_signal(signal)
# Print current positions
positions = self.trader.get_positions()
for pos in positions:
print(f" {pos['coin']}: {pos['size']} | "
f"PnL: ${pos['unrealized_pnl']:+.2f} ({pos['pnl_percent']:+.2f}%)")
time.sleep(check_interval)
except Exception as e:
print(f"Error in bot loop: {e}")
time.sleep(10)# Launch bot
# bot = TrendFollowingBot(trader, "BTC")
# bot.run()
```---## Advanced Hyperliquid API Integration### REST API: Batch Operations and Historical DataFor strategies requiring historical analysis, the REST API provides comprehensive data access:```python
def get_funding_rates(self, coin: str = None):
"""Fetch current funding rates for all or specific markets."""
meta = self.info.meta()
assets = meta['universe']
funding_data = []
for asset in assets[:20]: # Check top 20
name = asset['name']
ctx = self.info.funding_history(name, 1)
if ctx:
funding_data.append({
'coin': name,
'funding_rate': float(ctx[0].get('fundingRate', 0)),
'predicted_rate': float(ctx[0].get('predictedFundingRate'```python
def get_positions(self):
"""Fetch all open positions with P&L details."""
user_state = self.info.user_state(self.wallet_address)
positions = user_state.get('assetPositions', [])
active_positions = []
for pos in positions:
position = pos['position']
entry_px = float(position['entryPx'])
current_px = float(position['markPx'])
size = float(position['szi'])
pnl = (current_px - entry_px) * size if size > 0 else (entry_px - current_px) * abs(size)
pnl_pct = ((current_px / entry_px) - 1) * 100 * (1 if size > 0 else -1)
active_positions.append({
'coin': position['coin'],
'size': size,
'entry_price': entry_px,
'mark_price': current_px,
'unrealized_pnl': pnl,
'pnl_percent': pnl_pct,
'leverage': float(position['leverage']['value']),
'margin_used': float(position['marginUsed']),
'liquidation_price': float(position.get('liquidationPx', 0))
})
return active_positions
def set_leverage(self, coin: str, leverage: int):
"""Set leverage for a specific market."""
result = self.exchange.update_leverage(leverage, coin)
print(f"Leverage for {coin} set to { leverage}x")
return result
def close_position(self, coin: str):
"""Close entire position for a specific market."""
positions = self.get_positions()
for pos in positions:
if pos['coin'] == coin and pos['size'] != 0:
is_buy = pos['size'] < 0 # Buy to close short, sell to close long
sz = abs(pos['size'])
return self.place_market_order(coin, is_buy, sz)
print(f"No open position for {coin}")
return None
# Usage examples
# trader.set_leverage("BTC", 10)
# trader.place_limit_order("BTC", True, 0.1, 65000.0, "Gtc")
# positions = trader.get_positions()
```assets:
sub = {
"method": "subscribe",
"subscription": {"type": "allMids"}
}
await ws.send(json.dumps(sub))
sub = {
"method": "subscribe",
"subscription": {"type": "l2Book", "coin": asset}
}
await ws.send(json.dumps(sub))
print(f"Subscribed to {asset}")
async def run(self):
"""Main loop with multi-asset management."""
async with websockets.connect(self.ws_url) as ws:
await self.subscribe_all(ws)
async for message in ws:
msg = json.loads(message)
channel = msg.get("channel")
if channel == "allMids":
for asset, price in msg['data'].items():
if asset in self.data_cache:
self.data_cache[asset]['mid'] = float(price)
elif channel == "l2Book":
coin = msg['data']['coin']
if coin in self.data_cache:
bids = msg['data']['levels'][0]
asks = msg['data']['levels'][1]
if bids and asks:
best_bid = float(bids[0]['px'])
best_ask = float(asks[0]['px'])
self.data_cache[coin]['spread'] = best_ask - best_bid
# Call registered handlers
if channel in self.handlers:
await self.handlers[channel](msg)# Usage
# assets = ["BTC", "ETH", "SOL", "AVAX", "ARB"]
# manager = MultiAssetWebSocketManager(assets)
# asyncio.run(manager.run())
```---## Risk Management on Hyperliquid### Isolated vs. Cross MarginHyperliquid supports both isolated and cross-margin modes:```python
def set_cross_margin(self, coin: str):
"""Enable cross-margin mode for a market."""
result = self.exchange.update_isolated_margin(coin, False, None)
print(f"Cross margin enabled for {coin}")
return result
def set_isolated_margin(self, coin: str, leverag```python
import time
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class TrendFollowingBot:
"""EMA crossover trend-following bot for Hyperliquid."""
def __init__(self, trader: HyperliquidTrader, coin: str = "BTC"):
self.trader = trader
self.coin = coin
self.fast_ema_period = 9
self.slow_ema_period = 21
self.risk_per_trade = 0.02 # 2% of account
self.in_position = False
self.position_side = None
def calculate_ema(self, prices: pd.Series, period: int) -> float:
"""Calculate Exponential Moving Average."""
return prices.ewm(span=period, adjust=False).mean().iloc[-1]
def fetch_recent_prices(self, lookback: int = 100) -> pd.Series:
"""Fetch recent market prices from candle data."""
# Using REST API for historical candles
candles = self.trader.info.candles(
coin=self.coin,
interval="5m",
startTime=int((datetime.now() - timedelta(hours=12)).timestamp() * 1000),
endTime=int(datetime.now().timestamp() * 1000)
)
closes = [float(c['c']) for c in candles if 'c' in c]
return pd.Series(closes)
def calculate_position_size(self, entry_px: float) -> float:
"""Calculate position size based on risk parameters."""
account = self.trader.get_account_summary()
account_value = float(account['marginSummary']['accountValue'])
risk_amount = account_value * self.risk_per_trade
# Risk-based sizing: size = risk_amount / (estimated stop distance)
stop_distance = entry_px * 0.015 # 1.5% stop
position_size = risk_amount / stop_distance
# Round to appropriate decimal places
return round(position_size, 4)
def check_signals(self) -> str:
"""Check for EMA crossover signals."""
prices = self.fetch_recent_prices()
if len(prices) < self.slow_ema_period + 5:
return "HOLD"
fast_ema = self.calculate_ema(prices, self.fast_ema_period)
slow_ema = self.calculate_ema(prices, self.slow_ema_period)
# Previous values for crossover detection
prev_fast = prices.ewm(span=self.fast_ema_period).mean().iloc[-2]
prev_slow = prices.ewm(span=self.slow_ema_period).mean().iloc[-2]
# Bullish crossover: fast crosses above slow
if prev_fast <= prev_slow and fast_ema > slow_ema:
return "BUY"
# Bearish crossover: fast crosses below slow
if prev_fast >= prev_slow and fast_ema < slow_ema:
return "SELL"
return "HOLD"
def execute_signal(self, signal: str):
"""Execute trading signal with proper risk management."""
if signal == "HOLD":
return
# Close existing position if opposite signal
if self.in_position:
if (signal == "BUY" and self.position_side == "SHORT") or \
(signal == "SELL" and self.position_side == "LONG"):
self.trader.close_position(self.coin)
self.in_position = False
time.sleep(1) # Wait for fill
else:
return # Same direction, hold
# Get current price for sizing
mids = self.trader.info.all_mids()
current_px = float(mids.get(self.coin, 0))
if current_px == 0:
print("Could not fetch current price")
return
# Calculate position size
sz = self.calculate_position_size(current_px)
if signal == "BUY":
self.trader.place_market_order(self.coin, True, sz)
self.position_side = "LONG"
elif signal == "SELL":
self.trader.place_market_order(self.coin, False, sz)
self.position_side = "SHORT"
self.in_position = True
def run(self, check_interval: int = 60):
"""Main bot loop with signal checking."""
print(f"Starting trend bot for {self.coin}")
print(f"Fast EMA: {self.fast_ema_period}, Slow EMA: {self.slow_ema_period}")
while True:
try:
signal = self.check_signals()
print(f"[{datetime.now()}] Signal: {signal}")
self.execute_signal(signal)
# Print current positions
positions = self.trader.get_positions()
for pos in positions:
print(f" {pos['coin']}: {pos['size']} | "
f"PnL: ${pos['unrealized_pnl']:+.2f} ({pos['pnl_percent']:+.2f}%)")
time.sleep(check_interval)
except Exception as e:
print(f"Error in bot loop: {e}")
time.sleep(10)
# Launch bot
# bot = TrendFollowingBot(trader, "BTC")
# bot.run()
```r Arbitrum confirmation (typically under 5 minutes)For programmatic deposits, use the bridge contract directly:```python
def deposit_usdc(self, amount_usdc: float):
"""Deposit USDC from Arbitrum to Hyperliquid."""
# Amount in 6 decimal places (USDC)
amount_wei = int(amount_usdc * 1_000_000)
result = self.exchange.spot_transfer(
to=self.wallet_address,
amount=str(amount_wei),
asset="USDC"
)
print(f"Deposited {amount_usdc} USDC")
return result
```### What are the trading fees on Hyperliquid?Hyperliquid uses a **maker-taker fee model**:
- **Maker fee**: -0.002% (negative — you earn a rebate for providing liquidity)
- **Taker fee**: 0.035%These fees are among the most competitive in the perpetual DEX space. High-volume traders can qualify for additional fee discounts through the Hyperliquid VIP program.### Is Hyperliquid safe? What are the risks?Hyperliquid operates as a **non-custodial exchange** — your funds remain in a smart contract controlled by your private keys. However, as with any DeFi protocol, risks exist:1. **Smart contract risk**: While audited, bugs could potentially affect fund safety
2. **Liquidation risk**: High-leverage positions can be liquidated during volatile moves
3. **Oracle risk**: Mark prices rely on oracle feeds that could theoretically deviate
4. **Bridge risk**: Deposits transit through Arbitrum bridge infrastructureThe platform has processed over $500 billion in cumulative volume since inception with no major security incidents. Insurance funds are maintained to protect against socialized losses.### How do I backtest strategies before going live?Hyperliquid provides **free historical data** via its API. Here's a backtesting framework:```python
def fetch_historical_candles(self, coin: str, interval: str = "1h",
days: int = 30) -> pd.DataFrame:
"""Fetch historical OHLCV data for backtesting."""
end_time = int(datetime.now().timestamp() * 1000)
start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
candles = self.info.candles(
coin=coin,
interval=interval,
startTime=start_time,
endTime=end_time
)
df = pd.DataFrame(candles)
df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
df['open'] = df['o'].astype(float)
df['high'] = df['h'].astype(float)
df['low'] = df['l'].astype(float)
df['close'] = df['c'].astype(float)
df['volume'] = df['v'].astype(float)
return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
``````python
class BacktestEngine:
"""Simple backtesting engine for Hyperliquid strategies."""
def __init__(self, data: pd.DataFrame, initial_capital: float = 10000):
self.data = data
self.capital = initial_capital
self.position = 0
self.trades = []
def run_ema_strategy(self, fast: int = 9, slow: int = 21):
"""Run EMA crossover backtest."""
self.data['fast_ema'] = self.data['close'].ewm(span=fast).mean()
self.data['slow_ema'] = self.data['close'].ewm(span=slow).mean()
for i in range(slow + 1, len(self.data)):
prev_fast = self.data['fast_ema'].iloc[i-1]
prev_slow = self.data['slow_ema'].iloc[i-1]
curr_fast = self.data['fast_ema'].iloc[i]
curr_slow = self.data['slow_ema'].iloc[i]
price = self.data['close'].iloc[i]
# Bullish crossover
if prev_fast <= prev_slow and curr_fast > curr_slow and self.position <= 0:
if self.position < 0:
self.capital += abs(self.position) * price # Close short
self.position = self.capital / price # Go long
self.capital = 0
self.trades.append({'time': self.data['timestamp'].iloc[i],
'action': 'BUY', 'price': price})
# Bearish crossover
elif prev_fast >= prev_slow and curr_fast < curr_slow and self.position >= 0:
if self.position > 0:
self.capital = self.position * price # Close long
self.position = -self.capital / price # Go short
self.capital = 0
self.trades.append({'time': self.data['timestamp'].iloc[i],
'action': 'SELL', 'price': price})
# Calculate final P&L
final_price = self.data['close'].iloc[-1]
final_value = self.capital + abs(self.position) * final_price
total_return = (final_value / 10000 - 1) * 100
print(f"Backtest Results:")
print(f"Total Return: {total_return:.2f}%")
print(f"Total Trades: {len(self.trades)}")
print(f"Final Portfolio Value: ${final_value:,.2f}")
return self.trades# Usage
# data = trader.fetch_historical_candles("BTC", "1h", 90)
# bt = BacktestEngine(data)
# bt.run_ema_strategy(9, 21)
```### Can ```python
def get_funding_rates(self, coin: str = None):
"""Fetch current funding rates for all or specific markets."""
meta = self.info.meta()
assets = meta['universe']
funding_data = []
for asset in assets[:20]: # Check top 20
name = asset['name']
ctx = self.info.funding_history(name, 1)
if ctx:
funding_data.append({
'coin': name,
'funding_rate': float(ctx[0].get('fundingRate', 0)),
'predicted_rate': float(ctx[0].get('predictedFundingRate', 0))
})
# Sort by absolute funding rate
funding_data.sort(key=lambda x: abs(x['funding_rate']), reverse=True)
print("\nTop Funding Rates:")
for f in funding_data[:10]:
print(f" {f['coin']}: {f['funding_rate']*100:+.4f}%")
return funding_data
def get_recent_trades(self, coin: str, limit: int = 100):
"""Fetch recent public trades for volume analysis."""
trades = self.info.recent_trades(coin)
trade_list = []
for t in trades[:limit]:
trade_list.append({
'price': float(t['px']),
'size': float(t['sz']),
'side': 'buy' if t['side'] == 'B' else 'sell',
'time': t['time']
})
df = pd.DataFrame(trade_list)
# Calculate buy/sell ratio
buy_vol = df[df['side'] == 'buy']['size'].sum()
sell_vol = df[df['side'] == 'sell']['size'].sum()
ratio = buy_vol / sell_vol if sell_vol > 0 else float('inf')
print(f"\n{coin} Trade Analysis (last {limit} trades)")
print(f"Buy/Sell Ratio: {ratio:.2f} | Buy Vol: {buy_vol:.4f} | Sell Vol: {sell_vol:.4f}")
return df
# Usage
# funding = trader.get_funding_rates()
# trades = trader.get_recent_trades("ETH", 200)
```h your own creativity and market insight.Start building today and experience why billions in daily volume flow through Hyperliquid's orderbooks.---*Disclaimer: Cryptocurrency trading carries significant risk. This article is for educational purposes only and does not constitute financial advice. Always conduct your own research and never trade with funds you cannot afford to lose. Past performance does not guarantee future results.*---**Related Articles:**
- Minara AI Trading Bot
— AI-powered automated trading
- Binance Exchange
— World's leading crypto exchange**Resources:**
- [Hyperliquid Documentation](https://hyperliquid.gitbook.io/hyperliquid-docs)
- [GitHub: hyperliquid-dex](https://github.com/hyperliquid-dex)
- [Hyperliquid Python SDK](https://github.com/hyperliquid-dex/hyperliquid-python-sdk)
```python
class MultiAssetWebSocketManager:
"""Manage WebSocket connections for multiple assets simultaneously."""
def __init__(self, assets: list):
self.assets = assets
self.ws_url = "wss://api.hyperliquid.xyz/ws"
self.handlers = {}
self.data_cache = {asset: {'mid': 0, 'spread': 0} for asset in assets}
def register_handler(self, channel: str, handler):
"""Register custom handler for a channel type."""
self.handlers[channel] = handler
async def subscribe_all(self, ws):
"""Subscribe to all assets in batch."""
for asset in self.assets:
sub = {
"method": "subscribe",
"subscription": {"type": "allMids"}
}
await ws.send(json.dumps(sub))
sub = {
"method": "subscribe",
"subscription": {"type": "l2Book", "coin": asset}
}
await ws.send(json.dumps(sub))
print(f"Subscribed to {asset}")
async def run(self):
"""Main loop with multi-asset management."""
async with websockets.connect(self.ws_url) as ws:
await self.subscribe_all(ws)
async for message in ws:
msg = json.loads(message)
channel = msg.get("channel")
if channel == "allMids":
for asset, price in msg['data'].items():
if asset in self.data_cache:
self.data_cache[asset]['mid'] = float(price)
elif channel == "l2Book":
coin = msg['data']['coin']
if coin in self.data_cache:
bids = msg['data']['levels'][0]
asks = msg['data']['levels'][1]
if bids and asks:
best_bid = float(bids[0]['px'])
best_ask = float(asks[0]['px'])
self.data_cache[coin]['spread'] = best_ask - best_bid
# Call registered handlers
if channel in self.handlers:
await self.handlers[channel](msg)
# Usage
# assets = ["BTC", "ETH", "SOL", "AVAX", "ARB"]
# manager = MultiAssetWebSocketManager(assets)
# asyncio.run(manager.run())
``````python
def set_cross_margin(self, coin: str):
"""Enable cross-margin mode for a market."""
result = self.exchange.update_isolated_margin(coin, False, None)
print(f"Cross margin enabled for {coin}")
return result
def set_isolated_margin(self, coin: str, leverage: int):
"""Enable isolated margin with specific leverage."""
result = self.exchange.update_isolated_margin(coin, True, leverage)
print(f"Isolated margin enabled for {coin} at {leverage}x")
return result
``````python
class RiskManager:
"""Comprehensive risk management system for Hyperliquid trading."""
def __init__(self, trader: HyperliquidTrader):
self.trader = trader
self.max_daily_loss = 0.05 # 5% max daily loss
self.max_position_size = 0.50 # 50% of account max
self.max_leverage = 25
self.daily_pnl = 0
self.last_reset = datetime.now().date()
def check_daily_limit(self) -> bool:
"""Check if daily loss limit has been reached."""
today = datetime.now().date()
if today != self.last_reset:
self.daily_pnl = 0
self.last_reset = today
account = self.trader.get_account_summary()
account_value = float(account['marginSummary']['accountValue'])
loss_pct = abs(min(self.daily_pnl, 0)) / account_value if account_value > 0 else 0
if loss_pct >= self.max_daily_loss:
print(f"DAILY LOSS LIMIT REACHED: {loss_pct*100:.2f}%")
return False
return True
def validate_order(self, coin: str, size: float, leverage: int) -> bool:
"""Validate order against risk parameters."""
# Check leverage limit
if leverage > self.max_leverage:
print(f"Leverage {leverage}x exceeds maximum {self.max_leverage}x")
return False
# Check position size limit
account = self.trader.get_account_summary()
account_value = float(account['marginSummary']['accountValue'])
mids = self.trader.info.all_mids()
price = float(mids.get(coin, 0))
notional = size * price
if notional / account_value > self.max_position_size:
print(f"Position size {notional/account_value*100:.1f}% exceeds limit")
return False
return True
def emergency_close_all(self):
"""Emergency close all positions immediately."""
print("EMERGENCY CLOSE ALL POSITIONS")
positions = self.trader.get_positions()
for pos in positions:
if pos['size'] != 0:
self.trader.close_position(pos['coin'])
time.sleep(0.5)
# Integrate with bot
# risk = RiskManager(trader)
# if risk.check_daily_limit() and risk.validate_order("BTC", 0.1, 10):
# trader.place_market_order("BTC", True, 0.1)
``````python
import aiohttp
import asyncio
class AsyncHyperliquidClient:
"""Async Hyperliquid client for maximum throughput."""
def __init__(self):
self.base_url = "https://api.hyperliquid.xyz"
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=100)
)
return self
async def __aexit__(self, *args):
await self.session.close()
async def batch_requests(self, requests: list) -> list:
"""Execute multiple requests concurrently."""
tasks = [self._make_request(req) for req in requests]
return await asyncio.gather(*tasks, return_exceptions=True)
async def _make_request(self, request: dict):
"""Single async request."""
async with self.session.post(
f"{self.base_url}/info",
json=request
) as resp:
return await resp.json()
# Usage
# async with AsyncHyperliquidClient() as client:
# reqs = [{"type": "meta"}, {"type": "allMids"}]
# results = await client.batch_requests(reqs)
``````python
def deposit_usdc(self, amount_usdc: float):
"""Deposit USDC from Arbitrum to Hyperliquid."""
# Amount in 6 decimal places (USDC)
amount_wei = int(amount_usdc * 1_000_000)
result = self.exchange.spot_transfer(
to=self.wallet_address,
amount=str(amount_wei),
asset="USDC"
)
print(f"Deposited {amount_usdc} USDC")
return result
``````python
def fetch_historical_candles(self, coin: str, interval: str = "1h",
days: int = 30) -> pd.DataFrame:
"""Fetch historical OHLCV data for backtesting."""
end_time = int(datetime.now().timestamp() * 1000)
start_time = int((datetime.now() - timedelta(days=days)).timestamp() * 1000)
candles = self.info.candles(
coin=coin,
interval=interval,
startTime=start_time,
endTime=end_time
)
df = pd.DataFrame(candles)
df['timestamp'] = pd.to_datetime(df['t'], unit='ms')
df['open'] = df['o'].astype(float)
df['high'] = df['h'].astype(float)
df['low'] = df['l'].astype(float)
df['close'] = df['c'].astype(float)
df['volume'] = df['v'].astype(float)
return df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
``````python
class BacktestEngine:
"""Simple backtesting engine for Hyperliquid strategies."""
def __init__(self, data: pd.DataFrame, initial_capital: float = 10000):
self.data = data
self.capital = initial_capital
self.position = 0
self.trades = []
def run_ema_strategy(self, fast: int = 9, slow: int = 21):
"""Run EMA crossover backtest."""
self.data['fast_ema'] = self.data['close'].ewm(span=fast).mean()
self.data['slow_ema'] = self.data['close'].ewm(span=slow).mean()
for i in range(slow + 1, len(self.data)):
prev_fast = self.data['fast_ema'].iloc[i-1]
prev_slow = self.data['slow_ema'].iloc[i-1]
curr_fast = self.data['fast_ema'].iloc[i]
curr_slow = self.data['slow_ema'].iloc[i]
price = self.data['close'].iloc[i]
# Bullish crossover
if prev_fast <= prev_slow and curr_fast > curr_slow and self.position <= 0:
if self.position < 0:
self.capital += abs(self.position) * price # Close short
self.position = self.capital / price # Go long
self.capital = 0
self.trades.append({'time': self.data['timestamp'].iloc[i],
'action': 'BUY', 'price': price})
# Bearish crossover
elif prev_fast >= prev_slow and curr_fast < curr_slow and self.position >= 0:
if self.position > 0:
self.capital = self.position * price # Close long
self.position = -self.capital / price # Go short
self.capital = 0
self.trades.append({'time': self.data['timestamp'].iloc[i],
'action': 'SELL', 'price': price})
# Calculate final P&L
final_price = self.data['close'].iloc[-1]
final_value = self.capital + abs(self.position) * final_price
total_return = (final_value / 10000 - 1) * 100
print(f"Backtest Results:")
print(f"Total Return: {total_return:.2f}%")
print(f"Total Trades: {len(self.trades)}")
print(f"Final Portfolio Value: ${final_value:,.2f}")
return self.trades
# Usage
# data = trader.fetch_historical_candles("BTC", "1h", 90)
# bt = BacktestEngine(data)
# bt.run_ema_strategy(9, 21)
💬 Discussion