Bài viết này tổng hợp đầy đủ 7 đoạn code hiện được công bố trên Code Vault — một kho code cá nhân về radar giao dịch crypto, hệ thống giao dịch tự động và công cụ bảo mật, tất cả viết bằng Python thuần với chi phí API bằng 0 hoặc gần bằng 0. Mỗi đoạn code bên dưới đều kèm mã nguồn đầy đủ để bạn đọc, fork và chạy cục bộ. Sử dụng mục lục bên phải để nhảy đến công cụ cụ thể.
⚠️ Cảnh báo rủi ro — Các công cụ này tiếp xúc trực tiếp với thị trường và dữ liệu on-chain. Một số công cụ gửi cảnh báo thời gian thực qua Telegram, và một công cụ (“AI giao dịch tự động”) mở vị thế ảo trên Binance Futures. Hãy đọc kỹ ghi chú dưới mỗi công cụ. Sử dụng với rủi ro tự chịu; tác giả không bảo đảm bất kỳ kết quả giao dịch nào.
Radar Giao Dịch
Vitalik Sell Radar (Radar bán của Vitalik)
Ngày: 2026.05.02 Thẻ: Python · WebSocket · Ethereum · Telegram · Event-Driven
GitHub: vitalik-sell-radar
Sự kiện WebSocket · Phát hiện ví Vitalik bán · Đẩy Telegram dưới giây
Theo dõi ví Vitalik Buterin (vitalik.eth) phát hiện các giao dịch bán token ERC-20 qua subscribe sự kiện WebSocket — không polling, độ trễ dưới 1 giây. Tự động phân loại người nhận: DEX Router (Uniswap, 1inch, SushiSwap), ví nóng CEX (Binance, Coinbase, Kraken), hoặc LP Pool. Lấy giá token thời gian thực từ DexScreener. Failover đa RPC với tự động reconnect. Python thuần, chi phí 0 — dùng RPC công khai miễn phí.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
Vitalik Sell Radar — Event-Driven Edition
WebSocket subscription to ERC-20 Transfer events, real-time detection of
Vitalik's sell activity, instant push to Telegram.
Architecture:
1. WebSocket subscribes to Transfer(from=vitalik) events → sub-second detection
2. Classifies sell behavior (transfers to DEX Router / CEX / LP Pool)
3. Queries token info + price via DexScreener
4. Pushes alert to Telegram
5. Auto-reconnect + multi-RPC failover
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import aiohttp
import websockets
# Load .env file
def load_env():
env_path = Path(__file__).parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
load_env()
# ============================================================
# Configuration
# ============================================================
# Vitalik's main wallet
VITALIK_ADDRESS = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
VITALIK_PADDED = "0x" + VITALIK_ADDRESS[2:].lower().zfill(64)
# ERC-20 Transfer event topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
# WebSocket RPC endpoints (free, support eth_subscribe)
WS_ENDPOINTS = [
"wss://ethereum-rpc.publicnode.com",
"wss://eth.drpc.org",
"wss://ethereum.publicnode.com",
]
# HTTP RPC for querying token info
HTTP_RPC = os.environ.get("HTTP_RPC", "https://eth.drpc.org")
# Telegram
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# Known DEX Router addresses (sell destinations)
KNOWN_DEX_ROUTERS = {
# Uniswap
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap V2 Router",
"0xe592427a0aece92de3edee1f18e0157c05861564": "Uniswap V3 Router",
"0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "Uniswap V3 Router2",
"0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad": "Uniswap Universal Router",
"0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b": "Uniswap Universal Router (old)",
# 1inch
"0x1111111254eeb25477b68fb85ed929f73a960582": "1inch V5",
"0x111111125421ca6dc452d289314280a0f8842a65": "1inch V6",
# SushiSwap
"0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "SushiSwap Router",
# CoW Protocol
"0x9008d19f58aabd9ed0d60971565aa8510560ab41": "CoW Settlement",
# 0x
"0xdef1c0ded9bec7f1a1670819833240f027b25eff": "0x Exchange Proxy",
# Curve
"0x99a58482bd75cbab83b27ec03ca68ff489b5788f": "Curve Router",
}
# Known CEX hot wallets (partial list)
KNOWN_CEX = {
"0x28c6c06298d514db089934071355e5743bf21d60": "Binance Hot Wallet",
"0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance Hot Wallet 2",
"0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance Hot Wallet 3",
"0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance Hot Wallet 4",
"0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "Coinbase",
"0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43": "Coinbase 10",
"0x503828976d22510aad0201ac7ec88293211d23da": "Coinbase 2",
"0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2": "FTX (defunct)",
"0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0": "Kraken",
"0xae2d4617c862309a3d75a0ffb358c7a5009c673f": "Kraken 10",
}
# Minimum notification amount (USD), 0 = notify all
MIN_NOTIFY_USD = int(os.environ.get("MIN_NOTIFY_USD", "0"))
# Reconnect settings
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60
# ============================================================
# Logging
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("vitalik-radar")
# ============================================================
# Global state
# ============================================================
# Dedup set for processed tx hashes (last 1000)
seen_txs: set = set()
seen_txs_list: list = []
# HTTP session
http_session: aiohttp.ClientSession | None = None
# Token info cache: {address: {symbol, name, decimals}}
token_cache: dict = {}
# Running flag
running = True
# ============================================================
# Utility functions
# ============================================================
def shorten_addr(addr: str) -> str:
"""Shorten address for display"""
return f"{addr[:6]}...{addr[-4:]}"
def decode_transfer_value(data_hex: str, decimals: int) -> float:
"""Decode Transfer event value"""
try:
raw = int(data_hex, 16)
return raw / (10 ** decimals)
except:
return 0.0
def topic_to_address(topic: str) -> str:
"""Extract 20-byte address from 32-byte topic"""
return "0x" + topic[-40:]
def classify_recipient(to_addr: str) -> tuple[str, str]:
"""
Classify recipient address.
Returns (type, name)
Type: "dex" / "cex" / "pool" / "unknown"
"""
to_lower = to_addr.lower()
if to_lower in KNOWN_DEX_ROUTERS:
return "dex", KNOWN_DEX_ROUTERS[to_lower]
if to_lower in KNOWN_CEX:
return "cex", KNOWN_CEX[to_lower]
return "unknown", ""
async def get_http_session() -> aiohttp.ClientSession:
global http_session
if http_session is None or http_session.closed:
http_session = aiohttp.ClientSession()
return http_session
async def rpc_call(method: str, params: list) -> dict | None:
"""HTTP JSON-RPC call"""
session = await get_http_session()
try:
async with session.post(
HTTP_RPC,
json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
data = await resp.json()
return data.get("result")
except Exception as e:
log.error(f"RPC call {method} failed: {e}")
return None
async def get_token_info(token_addr: str) -> dict:
"""Query ERC-20 token info (symbol, name, decimals)"""
addr_lower = token_addr.lower()
if addr_lower in token_cache:
return token_cache[addr_lower]
info = {"symbol": "???", "name": "Unknown", "decimals": 18, "address": token_addr}
# symbol()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x95d89b41"}, "latest"
])
if result and len(result) > 2:
try:
hex_str = result[2:]
if len(hex_str) >= 128:
offset = int(hex_str[:64], 16) * 2
length = int(hex_str[offset:offset+64], 16)
symbol_hex = hex_str[offset+64:offset+64+length*2]
info["symbol"] = bytes.fromhex(symbol_hex).decode("utf-8", errors="replace").strip('\x00')
elif len(hex_str) == 64:
info["symbol"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
except:
pass
# decimals()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x313ce567"}, "latest"
])
if result and len(result) > 2:
try:
info["decimals"] = int(result, 16)
except:
pass
# name()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x06fdde03"}, "latest"
])
if result and len(result) > 2:
try:
hex_str = result[2:]
if len(hex_str) >= 128:
offset = int(hex_str[:64], 16) * 2
length = int(hex_str[offset:offset+64], 16)
name_hex = hex_str[offset+64:offset+64+length*2]
info["name"] = bytes.fromhex(name_hex).decode("utf-8", errors="replace").strip('\x00')
elif len(hex_str) == 64:
info["name"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
except:
pass
token_cache[addr_lower] = info
return info
async def check_if_pool(addr: str) -> bool:
"""Check if address is a Uniswap V2/V3 pool (has token0 method)"""
result = await rpc_call("eth_call", [
{"to": addr, "data": "0x0dfe1681"}, "latest" # token0()
])
if result and len(result) == 66: # 0x + 64 hex chars
return True
return False
async def get_eth_price() -> float:
"""Get ETH price from CoinGecko"""
session = await get_http_session()
try:
async with session.get(
"https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
data = await resp.json()
return data.get("ethereum", {}).get("usd", 0)
except:
return 2300 # fallback
async def get_token_price_usd(token_addr: str) -> float | None:
"""Get token USD price from DexScreener (free, no API key needed)"""
session = await get_http_session()
try:
async with session.get(
f"https://api.dexscreener.com/latest/dex/tokens/{token_addr}",
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
data = await resp.json()
pairs = data.get("pairs", [])
if pairs:
# Pick pair with highest liquidity
pairs.sort(key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0), reverse=True)
price = float(pairs[0].get("priceUsd", 0) or 0)
return price if price > 0 else None
except:
pass
return None
# ============================================================
# Telegram notifications
# ============================================================
async def send_telegram(text: str):
"""Send Telegram message"""
if not TG_BOT_TOKEN:
log.warning("[TG] No bot token configured, skipping notification")
return
session = await get_http_session()
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
try:
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
result = await resp.json()
if not result.get("ok"):
log.error(f"[TG] Send failed: {result.get('description', '')}")
# Retry with plain text if HTML parse fails
if "parse" in result.get("description", "").lower():
payload["parse_mode"] = None
async with session.post(url, json=payload) as resp2:
pass
else:
log.info("[TG] Message sent")
except Exception as e:
log.error(f"[TG] Error: {e}")
# ============================================================
# Event handling
# ============================================================
async def handle_transfer_event(log_entry: dict):
"""Process a single Transfer event"""
tx_hash = log_entry.get("transactionHash", "")
token_addr = log_entry.get("address", "")
topics = log_entry.get("topics", [])
data = log_entry.get("data", "0x0")
# Dedup
dedup_key = f"{tx_hash}:{token_addr}"
if dedup_key in seen_txs:
return
seen_txs.add(dedup_key)
seen_txs_list.append(dedup_key)
# Cleanup (keep last 1000)
while len(seen_txs_list) > 1000:
old = seen_txs_list.pop(0)
seen_txs.discard(old)
# Parse topics: [Transfer, from, to]
if len(topics) < 3:
return
from_addr = topic_to_address(topics[1])
to_addr = topic_to_address(topics[2])
# Confirm it's from Vitalik
if from_addr.lower() != VITALIK_ADDRESS.lower():
return
# Get token info
token_info = await get_token_info(token_addr)
symbol = token_info["symbol"]
decimals = token_info["decimals"]
amount = decode_transfer_value(data, decimals)
if amount == 0:
return
# Classify recipient
recipient_type, recipient_name = classify_recipient(to_addr)
# If unknown, check if it's an LP pool
is_pool = False
if recipient_type == "unknown":
is_pool = await check_if_pool(to_addr)
if is_pool:
recipient_type = "pool"
recipient_name = "LP Pool"
# Determine if this is a "sell" action
is_sell = recipient_type in ("dex", "cex", "pool")
# If not a sell, just a regular transfer — log silently
if not is_sell:
log.info(f"[Transfer] {symbol} {amount:,.2f} → {shorten_addr(to_addr)} (regular transfer, not notifying)")
return
# Get price
token_price = await get_token_price_usd(token_addr)
usd_value = amount * token_price if token_price else None
# Below minimum notification amount — skip
if usd_value is not None and usd_value < MIN_NOTIFY_USD:
log.info(f"[Sell] {symbol} ${usd_value:.0f} < ${MIN_NOTIFY_USD} minimum, skipping")
return
# Build notification message
timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S UTC")
sell_type_label = {
"dex": "🔄 DEX Sell",
"cex": "🏦 CEX Transfer",
"pool": "🌊 Pool Sell",
}
msg_lines = [
f"<b>🚨 Vitalik Sell Signal</b>",
f"",
f"Token: <b>{symbol}</b>",
f"Amount: {amount:,.4f}",
]
if usd_value is not None:
msg_lines.append(f"Value: <b>${usd_value:,.0f}</b>")
if token_price is not None:
msg_lines.append(f"Price: ${token_price:,.8f}")
msg_lines.extend([
f"",
f"Type: {sell_type_label.get(recipient_type, 'Unknown')}",
f"Destination: {recipient_name or shorten_addr(to_addr)}",
f"Time: {timestamp}",
f"",
f"TX: https://etherscan.io/tx/{tx_hash}",
f"Token: https://dexscreener.com/ethereum/{token_addr}",
])
msg = "\n".join(msg_lines)
log.info(f"[SELL DETECTED] {symbol} {amount:,.4f} → {recipient_name or to_addr} | ${usd_value or '?'}")
await send_telegram(msg)
# ============================================================
# WebSocket listener main loop
# ============================================================
async def subscribe_and_listen(ws_url: str):
"""Connect to WebSocket and subscribe to Vitalik Transfer events"""
log.info(f"Connecting to {ws_url}...")
async with websockets.connect(
ws_url,
ping_interval=20,
ping_timeout=30,
close_timeout=10,
max_size=2**20, # 1MB
) as ws:
# Subscribe to Transfer FROM Vitalik
sub_from = {
"jsonrpc": "2.0", "id": 1,
"method": "eth_subscribe",
"params": ["logs", {
"topics": [TRANSFER_TOPIC, VITALIK_PADDED]
}]
}
await ws.send(json.dumps(sub_from))
resp = await asyncio.wait_for(ws.recv(), timeout=10)
data = json.loads(resp)
if "error" in data:
raise Exception(f"Subscribe failed: {data['error']}")
sub_id_from = data.get("result", "")
log.info(f"✅ Subscribed to Transfer FROM Vitalik (id={sub_id_from[:10]}...)")
# Also subscribe to Transfer TO Vitalik (monitor buys/receives)
sub_to = {
"jsonrpc": "2.0", "id": 2,
"method": "eth_subscribe",
"params": ["logs", {
"topics": [TRANSFER_TOPIC, None, VITALIK_PADDED]
}]
}
await ws.send(json.dumps(sub_to))
resp2 = await asyncio.wait_for(ws.recv(), timeout=10)
data2 = json.loads(resp2)
sub_id_to = data2.get("result", "")
if sub_id_to:
log.info(f"✅ Subscribed to Transfer TO Vitalik (id={sub_id_to[:10]}...)")
log.info("🔍 Monitoring Vitalik wallet... waiting for events")
# Listen for events
async for message in ws:
if not running:
break
try:
evt = json.loads(message)
if "params" not in evt:
continue
sub_id = evt["params"].get("subscription", "")
log_entry = evt["params"].get("result", {})
if sub_id == sub_id_from:
# Vitalik sent tokens → check if it's a sell
await handle_transfer_event(log_entry)
elif sub_id == sub_id_to:
# Vitalik received tokens — log silently
token_addr = log_entry.get("address", "")
token_info = await get_token_info(token_addr)
data_hex = log_entry.get("data", "0x0")
amount = decode_transfer_value(data_hex, token_info["decimals"])
log.debug(f"[Receive] {token_info['symbol']} +{amount:,.4f}")
except json.JSONDecodeError:
continue
except Exception as e:
log.error(f"Event handling error: {e}", exc_info=True)
async def main():
"""Main loop with auto-reconnect"""
global running
# Validate required config
if not TG_BOT_TOKEN:
log.warning("TG_BOT_TOKEN not set — Telegram notifications disabled")
if not TG_CHAT_ID:
log.warning("TG_CHAT_ID not set — Telegram notifications disabled")
# Signal handling
def shutdown(sig, frame):
global running
log.info(f"Received signal {sig}, shutting down...")
running = False
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
# Startup notice
log.info("=" * 50)
log.info("Vitalik Sell Radar — Started")
log.info(f"Monitoring: {VITALIK_ADDRESS}")
log.info(f"Min notify: ${MIN_NOTIFY_USD}")
log.info(f"Telegram: {'✅ Configured' if TG_BOT_TOKEN else '❌ Not configured'}")
log.info("=" * 50)
if TG_BOT_TOKEN and TG_CHAT_ID:
await send_telegram(
"🟢 Vitalik Sell Radar — Started\n\n"
f"Monitoring: {shorten_addr(VITALIK_ADDRESS)}\n"
f"Min notify: ${MIN_NOTIFY_USD}\n"
"Mode: WebSocket event-driven (sub-second latency)"
)
endpoint_idx = 0
reconnect_delay = RECONNECT_DELAY
while running:
ws_url = WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]
try:
await subscribe_and_listen(ws_url)
reconnect_delay = RECONNECT_DELAY # reset on success
except websockets.exceptions.ConnectionClosed as e:
log.warning(f"WebSocket closed: {e}")
except asyncio.TimeoutError:
log.warning("WebSocket timeout")
except Exception as e:
log.error(f"WebSocket error: {e}")
if not running:
break
# Switch endpoint and retry
endpoint_idx += 1
log.info(f"Reconnecting in {reconnect_delay}s... (next: {WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]})")
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 1.5, MAX_RECONNECT_DELAY)
# Cleanup
if http_session and not http_session.closed:
await http_session.close()
log.info("Vitalik Sell Radar — Stopped")
if __name__ == "__main__":
asyncio.run(main())
On-Chain Narrative Radar (Radar tường thuật on-chain)
Ngày: 2026.04.28 Thẻ: Python · GMGN · DEXScreener · Telegram
Theo động lượng · Phát hiện token on-chain mới · ETH/SOL/BSC/Base
Động lượng là động cơ đẩy duy nhất — tường thuật chỉ là nhãn phân loại. Quét 30 giây mỗi lần trên 4 chain. Token phải tăng vốn hóa 3 lần liên tiếp với tổng tăng ≥5% mới kích hoạt cảnh báo. Tường thuật (Musk/Trump, Binance/CZ, người nổi tiếng) được phân loại ★★★/★★/★ nhưng không tự kích hoạt push. Kiểm tra an toàn: SOL dùng RugCheck, EVM dùng GoPlus. Python thuần, chi phí AI = 0.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
叙事雷达 → 链上雷达 v1
纯Python,零AI成本(关键词匹配 + 叙事去重)
三条推送通道:
1. 全新叙事 — 从未见过的概念/故事,全链推
2. 马斯克/川普相关 — 重点ETH+SOL,BSC也推
3. 币安/CZ相关 — 只推BSC
数据源:GMGN新币 + DEXScreener搜索
叙事历史:SQLite去重
"""
import requests
import json
import time
import os
import re
import sqlite3
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from difflib import SequenceMatcher
# === 配置 ===
DATA_DIR = os.path.expanduser("~/crypto-trading")
DB_FILE = os.path.join(DATA_DIR, "narrative_history.db")
LOG_FILE = os.path.join(DATA_DIR, "narrative_radar.log")
SEEN_FILE = os.path.join(DATA_DIR, "narrative_seen.json")
FLAP_SEEN_FILE = os.path.join(DATA_DIR, "flap_seen.json")
# 扫描间隔
SCAN_INTERVAL = 30 # 30秒(GMGN数据约1-5分钟刷新一次,10秒太频繁且数据不变)
# 动量追踪器 — 内存中记录每个币的价格/市值快照
# {address: [{'ts': timestamp, 'mc': market_cap, 'vol': volume, 'price': price}, ...]}
MOMENTUM_TRACKER = {}
MOMENTUM_PUSHED = {} # {address: {'count': N, 'last_ts': ts, 'last_mc': mc}} 推送计数
MOMENTUM_CONSECUTIVE_UP = 3 # 连续涨3轮(数据实际变化时才算一轮)
# 从.env读取TG配置
def load_env():
env = {}
env_file = os.path.expanduser("~/.env")
if os.path.exists(env_file):
with open(env_file) as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k] = v
return env
ENV = load_env()
TG_TOKEN = ENV.get('TELEGRAM_BOT_TOKEN', '')
TG_CHAT_ID = int(os.environ.get('TG_CHAT_ID', '0'))
GMGN_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Accept': 'application/json',
'Referer': 'https://gmgn.ai/',
}
# ============================================================
# 马斯克/川普关键词库(大小写不敏感)
# ============================================================
MUSK_TRUMP_KEYWORDS = {
# 马斯克核心
'musk', 'elon', 'elonmusk',
# SpaceX/Tesla/X
'spacex', 'starship', 'tesla', 'cybertruck', 'roadster',
'neuralink', 'boring', 'hyperloop', 'xai', 'grok',
# 马斯克相关人物/宠物/梗
'floki', 'shiba', # 只在新币上下文中用
'doge father', 'dogefather', 'technoking',
'mars colony', 'mars',
# 川普核心
'trump', 'donald', 'maga', 'potus', 'trump47',
'melania', 'barron', 'ivanka',
# 川普相关
'dark maga', 'darkmaga', 'ultra maga', 'save america',
'truth social', 'covfefe',
# 马斯克+川普联动
'doge department', 'd.o.g.e', 'government efficiency',
}
# 马斯克/川普正则(捕捉变体)
MUSK_TRUMP_PATTERNS = [
r'\belon\b', r'\bmusk\b', r'\btrump\b', r'\bmaga\b',
r'\bspacex\b', r'\bstarship\b', r'\btesla\b', r'\bgrok\b',
r'\bmelania\b', r'\bbarron\b', r'\bdoge\s*department\b',
r'\bd\.?o\.?g\.?e\b', # D.O.G.E变体
r'\bx\s*ai\b', r'\bneuralink\b',
]
# ============================================================
# 币安/CZ关键词库
# ============================================================
BINANCE_CZ_KEYWORDS = {
# CZ核心
'cz', 'changpeng', 'zhao', 'czb', 'czbinance',
# 何一(BSC现在的核心推手!)
'heyi', 'yi he', 'he yi', '何一', 'yihe',
'sister yi', 'yi jie', '一姐', '何一姐',
# 币安品牌
'binance', 'bnb', 'pancake', 'pancakeswap',
# CZ相关动态词(书、活动、推特高频词)
'giggle academy', 'binance life', 'bnb chain',
'principles', 'cz book',
# YZi Labs (原Binance Labs)
'yzi', 'yzi labs',
# 中文关键词(BSC上常见)
'赵长鹏', '币安', '长鹏', 'cz的', '何一的',
# Four.meme平台相关
'fourmeme', 'four meme', '4meme',
# CZ/何一推特互动高频词
'czs dog', 'cz dog', 'bnb dog',
'build on bnb', 'bnb ecosystem',
}
BINANCE_CZ_PATTERNS = [
r'\bcz\b', r'\bbinance\b', r'\bbnb\b',
r'\bheyi\b', r'\byi\s*he\b', r'\bhe\s*yi\b',
r'\b何一\b', r'\b一姐\b',
r'\bpancake\b', r'\bgiggle\b', r'\byzi\b',
r'\bfourmeme\b', r'\b4meme\b',
]
# ============================================================
# 推特热点/名人关键词库(★★级别)
# ============================================================
CELEBRITY_VIRAL_KEYWORDS = {
# 科技名人
'vitalik', 'buterin', 'sam altman', 'satoshi',
'michael saylor', 'saylor', 'cathie wood',
'jack dorsey', 'zuckerberg', 'bezos',
'jensen huang', 'nvidia', 'tim cook',
# 币圈名人
'justin sun', 'sun yuchen', '孙宇晨', 'tron',
'arthur hayes', 'su zhu', '3ac',
'brian armstrong', 'coinbase',
'larry fink', 'blackrock',
'gary gensler', 'sec',
'michael novogratz', 'galaxy',
# 政治/社会名人
'biden', 'obama', 'putin', 'xi jinping',
'kanye', 'drake', 'snoop dogg', 'paris hilton',
'mark cuban', 'mr beast', 'mrbeast',
# 病毒式传播热词(龙虾级别的梗)
'lobster', '龙虾', 'lobsta',
'hawk tuah', 'griddy', 'skibidi',
'rizz', 'sigma', 'gyatt',
# 重大事件关键词
'etf', 'halving', '减半',
'world war', 'wwiii',
'fed', 'rate cut', '降息',
'tiktok ban', 'tiktok',
}
CELEBRITY_VIRAL_PATTERNS = [
r'\bvitalik\b', r'\bsaylor\b', r'\bblackrock\b',
r'\bcoinbase\b', r'\bjustin\s*sun\b', r'\blobster\b',
r'\betf\b', r'\bhalving\b', r'\bmrbeast\b',
r'\bsnoop\b', r'\bkanye\b', r'\bdrake\b',
]
# ============================================================
# 通用垃圾词(过滤明显的骗局/低质量币)
# ============================================================
SPAM_PATTERNS = [
r'airdrop', r'presale', r'pre\s*sale',
r'1000x', r'100x guaranteed',
r'safe\s*moon', r'baby\s*\w+', # babydoge等仿盘
r'pornhub', r'porn', r'xxx', r'nsfw',
r'nigga', r'nigger', r'faggot',
r'scam', r'rugpull', r'rug\s*pull',
r'official\s*token', r'official\s*coin',
]
# 常见无叙事意义的单词(过滤单词名币)
COMMON_NOISE_WORDS = {
'nice', 'good', 'bad', 'cool', 'hot', 'big', 'small',
'life', 'love', 'hate', 'happy', 'sad', 'fun', 'lol',
'cat', 'dog', 'moon', 'sun', 'star', 'king', 'queen',
'gold', 'rich', 'cash', 'money', 'pay', 'buy', 'sell',
'pump', 'dump', 'bull', 'bear', 'green', 'red',
'hello', 'world', 'yes', 'no', 'wow', 'omg', 'lmao',
'simp', 'chad', 'based', 'cope', 'seethe',
'test', 'new', 'old', 'real', 'fake',
# 垃圾币名常见词
'shit', 'shitcoin', 'fuck', 'fart', 'poop', 'pee',
'cum', 'dick', 'ass', 'boob', 'tit',
'nigga', 'retard', 'slop',
# 超通用币名
'the', 'and', 'for', 'from', 'with', 'this', 'that',
'coin', 'token', 'meme', 'pepe', 'wojak',
'peg', 'usd', 'usdt', 'usdc', 'dai',
}
# ============================================================
# 工具函数
# ============================================================
def log(msg):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
os.makedirs(DATA_DIR, exist_ok=True)
with open(LOG_FILE, 'a') as f:
f.write(line + '\n')
def load_flap_seen():
if os.path.exists(FLAP_SEEN_FILE):
try:
with open(FLAP_SEEN_FILE) as f:
return json.load(f)
except:
pass
return {}
def save_flap_seen(data):
# 只保留7天内的
cutoff = int(time.time()) - 86400 * 7
data = {k: v for k, v in data.items() if v > cutoff}
with open(FLAP_SEEN_FILE, 'w') as f:
json.dump(data, f)
def tg_send(text, parse_mode='Markdown'):
if not TG_TOKEN:
log(f"[TG] No token, skip: {text[:80]}")
return False
try:
resp = requests.post(
f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
json={'chat_id': TG_CHAT_ID, 'text': text, 'parse_mode': parse_mode},
timeout=10
)
result = resp.json()
if not result.get('ok'):
# Markdown失败时降级到纯文本
if 'can\'t parse' in str(result.get('description', '')).lower():
resp = requests.post(
f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
json={'chat_id': TG_CHAT_ID, 'text': text},
timeout=10
)
else:
log(f"[TG] Error: {result.get('description', '')}")
return False
return True
except Exception as e:
log(f"[TG] Send error: {e}")
return False
# ============================================================
# 叙事历史数据库
# ============================================================
def init_db():
"""初始化SQLite叙事历史库"""
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
# 所有见过的叙事主题
c.execute('''CREATE TABLE IF NOT EXISTS narratives (
id INTEGER PRIMARY KEY AUTOINCREMENT,
theme TEXT NOT NULL, -- 归一化的叙事主题(小写)
first_token_name TEXT, -- 第一次出现时的代币名
first_token_address TEXT, -- 第一次出现时的地址
first_chain TEXT, -- 第一次出现的链
first_seen_at INTEGER, -- 第一次看到的时间戳
token_count INTEGER DEFAULT 1, -- 出现过多少次
last_seen_at INTEGER -- 最近一次看到
)''')
# 所有扫描过的代币
c.execute('''CREATE TABLE IF NOT EXISTS tokens_seen (
address TEXT PRIMARY KEY,
chain TEXT,
name TEXT,
symbol TEXT,
narrative_theme TEXT,
category TEXT, -- 'musk_trump' / 'binance_cz' / 'novel' / 'common'
first_seen_at INTEGER,
market_cap REAL,
pushed INTEGER DEFAULT 0, -- 是否已推送
seen_count INTEGER DEFAULT 1 -- 出现次数
)''')
# 索引
c.execute('CREATE INDEX IF NOT EXISTS idx_theme ON narratives(theme)')
c.execute('CREATE INDEX IF NOT EXISTS idx_addr ON tokens_seen(address)')
conn.commit()
return conn
def normalize_theme(name, symbol):
"""
从代币名称+符号提取归一化的叙事主题
例如:'Elon Mars Colony' → 'elon mars colony'
'TRUMP2028' → 'trump'
'PancakeBunny' → 'pancake bunny'
"""
# 合并name和symbol
text = f"{name} {symbol}".lower().strip()
# 去除常见后缀/前缀
noise = ['token', 'coin', 'inu', 'swap', 'finance', 'protocol',
'dao', 'defi', 'nft', 'meta', 'verse', 'fi', 'ai',
'pepe', 'wojak', 'chad', 'based']
# 分割camelCase
text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
# 去除数字(如2028、1000x)
text = re.sub(r'\d+x?', '', text)
# 只保留字母和空格
text = re.sub(r'[^a-z\s]', ' ', text)
# 去噪
words = [w for w in text.split() if w and len(w) > 1 and w not in noise]
if not words:
return name.lower().strip()
return ' '.join(sorted(set(words)))
def is_similar_theme(theme1, theme2, threshold=0.7):
"""模糊匹配两个叙事主题"""
if theme1 == theme2:
return True
# 子串匹配
if theme1 in theme2 or theme2 in theme1:
return True
# 词重叠
words1 = set(theme1.split())
words2 = set(theme2.split())
if words1 and words2:
overlap = len(words1 & words2) / min(len(words1), len(words2))
if overlap >= 0.6:
return True
# 序列匹配
return SequenceMatcher(None, theme1, theme2).ratio() >= threshold
def check_narrative_novelty(conn, theme, name, symbol, address, chain):
"""
检查叙事状态
返回:
('novel', None) — 第一次见到
('heating', narrative_row) — 短时间内持续出现新币!热点信号!
('existing', existing_theme_row) — 已有叙事,不热
核心逻辑:同一主题在30分钟内出现2+个不同的币 = 热点
"""
c = conn.cursor()
now = int(time.time())
HEAT_WINDOW = 1800 # 30分钟窗口
HEAT_THRESHOLD = 2 # 窗口内出现2个以上同主题币就是热点
# 精确匹配
c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives WHERE theme = ?', (theme,))
exact = c.fetchone()
if exact:
row_id, _, _, _, _, first_seen, count, last_seen = exact
# 更新计数
new_count = count + 1
c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE theme = ?',
(new_count, now, theme))
conn.commit()
# 热点判断:在HEAT_WINDOW内出现了多个币
if now - first_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', exact)
# 或者:最近一次和这次间隔很短(说明持续在冒)
if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', exact)
return ('existing', exact)
# 模糊匹配 — 取最近1000个主题比对
c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives ORDER BY last_seen_at DESC LIMIT 1000')
for row in c.fetchall():
if is_similar_theme(theme, row[1]):
row_id, _, _, _, _, first_seen, count, last_seen = row
new_count = count + 1
c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE id = ?',
(new_count, now, row[0]))
conn.commit()
# 热点判断
if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', row)
return ('existing', row)
# 第一次见到 — 记录
c.execute('''INSERT INTO narratives (theme, first_token_name, first_token_address, first_chain, first_seen_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?)''',
(theme, name, address, chain, now, now))
conn.commit()
return ('novel', None)
def get_token_seen_count(conn, address):
"""获取代币出现次数"""
c = conn.cursor()
c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
row = c.fetchone()
return row[0] if row else 0
def is_token_seen(conn, address):
"""检查代币是否已经扫描过"""
c = conn.cursor()
c.execute('SELECT address FROM tokens_seen WHERE address = ?', (address,))
return c.fetchone() is not None
def record_token(conn, address, chain, name, symbol, theme, category, mc, pushed=False):
"""记录已扫描的代币 — 重复出现时计数+1"""
c = conn.cursor()
# 检查是否已存在
c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
existing = c.fetchone()
if existing:
# 已存在:计数+1,更新市值
new_count = existing[0] + 1
c.execute('''UPDATE tokens_seen SET seen_count = ?, market_cap = ?, category = ?
WHERE address = ?''', (new_count, mc, category, address))
else:
# 新记录
c.execute('''INSERT INTO tokens_seen
(address, chain, name, symbol, narrative_theme, category, first_seen_at, market_cap, pushed, seen_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)''',
(address, chain, name, symbol, theme, category, int(time.time()), mc, 1 if pushed else 0))
conn.commit()
# ============================================================
# 叙事分类引擎
# ============================================================
def classify_narrative(name, symbol, chain):
"""
分类代币叙事
返回:('musk_trump', matched_keywords) / ('binance_cz', matched_keywords) / ('novel', None) / ('common', None)
"""
text = f"{name} {symbol}".lower()
# 1. 检查是否是垃圾币
for pat in SPAM_PATTERNS:
if re.search(pat, text, re.IGNORECASE):
return ('spam', None)
# 2. 马斯克/川普检测
matched_mt = []
for kw in MUSK_TRUMP_KEYWORDS:
if kw.lower() in text:
matched_mt.append(kw)
if not matched_mt:
for pat in MUSK_TRUMP_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_mt.append(m.group())
if matched_mt:
# 马斯克/川普:重点ETH+SOL,BSC也可以
chain_lower = chain.lower()
if chain_lower in ('eth', 'ethereum', 'sol', 'solana', 'bsc', 'base'):
return ('musk_trump', matched_mt)
# 3. 币安/CZ检测 — 只在BSC上推
matched_bc = []
for kw in BINANCE_CZ_KEYWORDS:
if kw.lower() in text:
matched_bc.append(kw)
if not matched_bc:
for pat in BINANCE_CZ_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_bc.append(m.group())
if matched_bc:
chain_lower = chain.lower()
if chain_lower in ('bsc',):
return ('binance_cz', matched_bc)
else:
return ('binance_cz_wrong_chain', matched_bc)
# 4. 名人/推特热点检测(★★级别)
matched_cv = []
for kw in CELEBRITY_VIRAL_KEYWORDS:
if kw.lower() in text:
matched_cv.append(kw)
if not matched_cv:
for pat in CELEBRITY_VIRAL_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_cv.append(m.group())
if matched_cv:
return ('celebrity_viral', matched_cv)
# 5. 都不匹配 → 需要进一步检查是否全新叙事
return ('check_novelty', None)
# ============================================================
# 安全检查(复用现有逻辑)
# ============================================================
def check_token_safety(chain, address):
"""快速安全检查 — 只拦硬伤(蜜罐/可增发),卖税不作为否决条件"""
if chain in ('sol', 'solana'):
try:
r = requests.get(f'https://api.rugcheck.xyz/v1/tokens/{address}/report', timeout=10)
if r.status_code == 200:
data = r.json()
score = data.get('score', 999)
mint = data.get('mintAuthority')
freeze = data.get('freezeAuthority')
return {
'safe': not mint and not freeze,
'score': score, 'mint': mint is not None,
'freeze': freeze is not None
}
except:
pass
else:
chain_map = {'ethereum': '1', 'eth': '1', 'bsc': '56', 'base': '8453'}
cid = chain_map.get(chain, '1')
try:
r = requests.get(f'https://api.gopluslabs.io/api/v1/token_security/{cid}?contract_addresses={address}', timeout=10)
if r.status_code == 200:
result = r.json().get('result', {})
data = result.get(address.lower(), {})
if data:
honeypot = data.get('is_honeypot', '0') == '1'
mintable = data.get('is_mintable', '0') == '1'
sell_tax = float(data.get('sell_tax', '0') or '0')
buy_tax = float(data.get('buy_tax', '0') or '0')
return {
'safe': not honeypot and not mintable, # 卖税不作为否决
'honeypot': honeypot, 'mintable': mintable,
'sell_tax': sell_tax, 'buy_tax': buy_tax
}
except:
pass
return {'safe': False, 'reason': '无法检查'} # 无法检查时不推,宁可错过不踩坑
# ============================================================
# GMGN数据获取
# ============================================================
def gmgn_get(url):
try:
resp = requests.get(url, headers=GMGN_HEADERS, timeout=15)
if resp.status_code == 200:
return resp.json().get('data', {})
except:
pass
return {}
def fetch_token_description(chain, address):
"""获取代币描述/故事 — 叙事雷达核心信息"""
desc = ''
# SOL链:Pump.fun有最完整的description
if chain in ('sol', 'solana'):
try:
r = requests.get(f'https://frontend-api-v3.pump.fun/coins/{address}', timeout=8)
if r.status_code == 200:
data = r.json()
desc = data.get('description', '') or ''
twitter = data.get('twitter', '') or ''
telegram = data.get('telegram', '') or ''
website = data.get('website', '') or ''
return {
'description': desc.strip(),
'twitter': twitter,
'telegram': telegram,
'website': website,
}
except:
pass
# 所有链:DEXScreener info字段(网站+社交链接)
try:
chain_dex = {'sol': 'solana', 'eth': 'ethereum', 'bsc': 'bsc', 'base': 'base',
'solana': 'solana', 'ethereum': 'ethereum'}.get(chain, chain)
r = requests.get(f'https://api.dexscreener.com/latest/dex/tokens/{address}', timeout=8)
if r.status_code == 200:
pairs = r.json().get('pairs', [])
if pairs:
info = pairs[0].get('info', {})
websites = info.get('websites', [])
socials = info.get('socials', [])
twitter = ''
telegram = ''
website = ''
for s in socials:
if s.get('type') == 'twitter':
twitter = s.get('url', '')
elif s.get('type') == 'telegram':
telegram = s.get('url', '')
for w in websites:
if w.get('label', '').lower() == 'website':
website = w.get('url', '')
if not desc:
# DEXScreener没有description但有社交信息
return {
'description': desc,
'twitter': twitter,
'telegram': telegram,
'website': website,
}
except:
pass
return {'description': desc, 'twitter': '', 'telegram': '', 'website': ''}
def fetch_new_tokens():
"""从GMGN获取各链新币 + 多维度覆盖"""
all_tokens = []
seen_addrs = set()
for chain in ['eth', 'bsc', 'base']:
# 多维度拉数据,避免漏掉
urls = [
# 按创建时间 — 最新的币
f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=open_timestamp&direction=desc&limit=100',
# 按交易量 — 最活跃的币
f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=swaps&direction=desc&limit=50',
]
for url in urls:
data = gmgn_get(url)
tokens = data.get('rank', [])
for t in tokens:
addr = t.get('address', '')
if not addr or addr in seen_addrs:
continue
mc = t.get('market_cap', 0) or t.get('fdv', 0) or 0
liq = t.get('liquidity', 0) or 0
# 基本过滤:太小的不看
if mc < 1000 or liq < 500 or mc > 10000000:
continue
age_ts = t.get('open_timestamp', 0)
age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 999
# 不限年龄 — 动量追踪核心逻辑:涨就推,不管新旧
seen_addrs.add(addr)
all_tokens.append({
'address': addr,
'chain': chain,
'name': t.get('name', '?'),
'symbol': t.get('symbol', '?'),
'mc': mc,
'liq': liq,
'volume': t.get('volume', 0) or 0,
'holders': t.get('holder_count', 0) or 0,
'sm': t.get('smart_degen_count', 0) or 0,
'chg_1h': t.get('price_change_percent1h', 0) or 0,
'chg_24h': t.get('price_change_percent', 0) or 0,
'age_h': age_h,
'price': t.get('price', 0),
'buys_1h': t.get('buys', 0) or 0,
'sells_1h': t.get('sells', 0) or 0,
})
time.sleep(0.3)
return all_tokens
def fetch_flap_tokens():
"""
FLAP平台扫描 — BSC社区驱动型发射台
找形态:跌下来但有底部支撑(有庄在低位推)
特征:24h跌了,但1h企稳/反弹,买入>卖出,holders在涨
"""
data = gmgn_get(
'https://gmgn.ai/defi/quotation/v1/rank/bsc/swaps/24h?launchpad=flap&orderby=volume&direction=desc&limit=30'
)
tokens = data.get('rank', [])
candidates = []
for t in tokens:
addr = t.get('address', '')
if not addr:
continue
mc = t.get('market_cap', 0) or 0
liq = t.get('liquidity', 0) or 0
vol = t.get('volume', 0) or 0
holders = t.get('holder_count', 0) or 0
buys = t.get('buys', 0) or 0
sells = t.get('sells', 0) or 0
chg_1h = t.get('price_change_percent1h', 0) or 0
chg_24h = t.get('price_change_percent', 0) or 0
age_ts = t.get('open_timestamp', 0)
age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 0
# 基本门槛
if mc < 1000 or liq < 500:
continue
if holders < 5:
continue
# 底部支撑形态判断:
# 条件1: 24h跌了(或者涨幅有限),说明不是刚拉的
# 条件2: 1h跌幅小于24h跌幅,说明在企稳
# 条件3: 买入 > 卖出,有人在接
buy_ratio = buys / max(sells, 1)
is_support = False
reason = ''
# 形态A: 24h跌了,1h在企稳/反弹
if chg_24h < -10 and chg_1h > chg_24h * 0.3:
is_support = True
reason = f'24h跌{chg_24h:.0f}%但1h企稳{chg_1h:+.0f}%'
# 形态B: 24h微跌或横盘,1h微涨,买卖比健康
if -10 <= chg_24h <= 30 and chg_1h > -5 and buy_ratio > 1.1:
is_support = True
reason = f'底部横盘 买卖比{buy_ratio:.2f}'
# 形态C: 大跌后强反弹
if chg_24h < -30 and chg_1h > 10:
is_support = True
reason = f'大跌{chg_24h:.0f}%后反弹{chg_1h:+.0f}%'
if is_support and buy_ratio >= 1.0:
candidates.append({
'address': addr,
'chain': 'bsc',
'name': t.get('name', '?'),
'symbol': t.get('symbol', '?'),
'mc': mc,
'liq': liq,
'volume': vol,
'holders': holders,
'sm': 0,
'chg_1h': chg_1h,
'chg_24h': chg_24h,
'age_h': age_h,
'price': t.get('price', 0),
'buys': buys,
'sells': sells,
'buy_ratio': buy_ratio,
'support_reason': reason,
'launchpad': 'flap',
})
# 按市值排序
candidates.sort(key=lambda x: x['mc'], reverse=True)
return candidates
def format_flap_alert(token, desc_info=None):
"""FLAP低吸信号推送"""
msg = f"链上雷达 — FLAP低吸信号\n"
msg += f"链: BSC | 平台: FLAP\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
# 故事描述
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f"故事: {desc}\n\n"
msg += f"形态: {token['support_reason']}\n\n"
msg += f"```\n"
msg += f"市值 ${token['mc']:>12,.0f}\n"
msg += f"流动性 ${token['liq']:>12,.0f}\n"
msg += f"24h量 ${token['volume']:>12,.0f}\n"
msg += f"持有人 {token['holders']:>12,d}\n"
msg += f"买/卖 {token['buys']:>6,d}/{token['sells']:>6,d}\n"
msg += f"买卖比 {token['buy_ratio']:>12.2f}\n"
msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
msg += f"24h涨幅 {token['chg_24h']:>+11.1f}%\n"
msg += f"```\n"
msg += "\nFLAP社区币 — 低吸进场信号"
# 社交链接
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n'.join(links)
return msg
# ============================================================
# 推送格式
# ============================================================
def format_musk_trump_alert(token, matched_kw, desc_info=None):
"""马斯克/川普叙事推送"""
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
msg = f"链上雷达 — 马斯克/川普概念\n"
msg += f"链: {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
# 叙事故事(核心!)
desc = (desc_info or {}).get('description', '')
if desc:
# 截取前200字符,避免太长
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f"故事: {desc}\n\n"
msg += f"命中关键词: {', '.join(matched_kw[:5])}\n\n"
msg += f"```\n"
msg += f"市值 ${token['mc']:>12,.0f}\n"
msg += f"流动性 ${token['liq']:>12,.0f}\n"
msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f"聪明钱 {token['sm']:>12d}\n"
msg += f"币龄 {token['age_h']:>10.1f}h\n"
msg += f"```\n"
# 社交链接
links = []
if (desc_info or {}).get('twitter'):
links.append(f"Twitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n' + '\n'.join(links)
return msg
def format_binance_cz_alert(token, matched_kw, desc_info=None):
"""币安/CZ叙事推送"""
msg = f"链上雷达 — 币安/CZ概念\n"
msg += f"链: BSC\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
# 叙事故事
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f"故事: {desc}\n\n"
msg += f"命中关键词: {', '.join(matched_kw[:5])}\n\n"
msg += f"```\n"
msg += f"市值 ${token['mc']:>12,.0f}\n"
msg += f"流动性 ${token['liq']:>12,.0f}\n"
msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
msg += f"币龄 {token['age_h']:>10.1f}h\n"
msg += f"```\n"
# 社交链接
links = []
if (desc_info or {}).get('twitter'):
links.append(f"Twitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n' + '\n'.join(links)
return msg
def format_novel_narrative_alert(token, theme, desc_info=None):
"""全新叙事推送 — 保留备用"""
return format_heating_narrative_alert(token, theme, 1, desc_info)
def format_heating_narrative_alert(token, theme, count, desc_info=None):
"""叙事热点推送 — 同主题持续冒新币"""
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
msg = f"链上雷达 — 叙事热点\n"
msg += f"链: {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
# 叙事故事
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 300:
desc = desc[:300] + '...'
msg += f"故事: {desc}\n\n"
else:
msg += f"叙事主题: {theme}\n\n"
msg += f"同类概念已出现{count}个币 — 持续有人做\n\n"
msg += f"```\n"
msg += f"市值 ${token['mc']:>12,.0f}\n"
msg += f"流动性 ${token['liq']:>12,.0f}\n"
msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f"聪明钱 {token['sm']:>12d}\n"
msg += f"持有人 {token['holders']:>12d}\n"
msg += f"币龄 {token['age_h']:>10.1f}h\n"
msg += f"```"
# 社交链接
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n'.join(links)
return msg
# ============================================================
# 动量追踪器 — 持续上涨+放量检测
# ============================================================
def track_momentum(tokens):
"""
每轮扫描更新币的快照。
连续多轮市值上涨+成交量增加 = 动量信号,直接推。
"""
global MOMENTUM_TRACKER, MOMENTUM_PUSHED
now = time.time()
alerts = []
# 当前轮所有地址
current_addrs = set()
for token in tokens:
addr = token['address']
mc = token['mc']
vol = token.get('volume', 0) or 0
price = token.get('price', 0) or 0
buys = token.get('buys_1h', 0) or token.get('buys', 0) or 0
current_addrs.add(addr)
# 基本门槛
if mc < 1000 or token.get('liq', 0) < 500 or mc > 10000000:
continue
# 记录快照 — 只有数据真正变化时才记录(GMGN有缓存)
if addr not in MOMENTUM_TRACKER:
MOMENTUM_TRACKER[addr] = []
snapshots = MOMENTUM_TRACKER[addr]
# 跳过重复数据(跟上一次完全一样就不记录)
if snapshots and snapshots[-1]['mc'] == mc and snapshots[-1]['vol'] == vol:
continue # 数据没变,跳过
snapshots.append({
'ts': now,
'mc': mc,
'vol': vol,
'price': price,
'buys': buys,
})
# 只保留最近20个快照(约200秒)
if len(snapshots) > 20:
snapshots[:] = snapshots[-20:]
# 至少需要3个快照才能判断
if len(snapshots) < MOMENTUM_CONSECUTIVE_UP:
continue
# 检测最近N轮是否持续涨
recent = snapshots[-MOMENTUM_CONSECUTIVE_UP:]
consecutive_up = True
total_gain = 0
for i in range(1, len(recent)):
prev_mc = recent[i-1]['mc']
curr_mc = recent[i]['mc']
if prev_mc <= 0:
consecutive_up = False
break
gain = (curr_mc - prev_mc) / prev_mc
if gain <= 0: # 任何一轮没涨就不算
consecutive_up = False
break
total_gain += gain
if not consecutive_up:
continue
# 连续涨了!检查放量(成交量在增)
vol_increasing = True
for i in range(1, len(recent)):
if recent[i]['buys'] < recent[i-1]['buys'] * 0.8: # 允许小幅波动
vol_increasing = False
break
# 计算总涨幅
first_mc = recent[0]['mc']
last_mc = recent[-1]['mc']
pct_gain = ((last_mc - first_mc) / first_mc * 100) if first_mc > 0 else 0
# 推送条件:连续涨 + 涨幅>5%
if pct_gain < 5:
continue
# 信号计数:同一个币每次触发信号,计数+1
push_info = MOMENTUM_PUSHED.get(addr, {'count': 0, 'last_ts': 0, 'last_mc': 0})
# 必须比上次推送时市值还高才推(真的还在涨)
if push_info['count'] > 0 and last_mc <= push_info['last_mc']:
continue
push_info['count'] += 1
push_info['last_ts'] = now
push_info['last_mc'] = last_mc
signal_count = push_info['count']
# 安全检查
safety = check_token_safety(token['chain'], addr)
if not safety.get('safe'):
continue
# 叙事分类 → 星级评分
category, matched_kw = classify_narrative(token['name'], token['symbol'], token['chain'])
is_flap = token.get('launchpad') == 'flap'
if category == 'musk_trump':
stars = 3
narrative_tag = f"马斯克/川普概念 ({', '.join(matched_kw[:3])})"
elif category == 'binance_cz':
stars = 3
narrative_tag = f"币安/CZ概念 ({', '.join(matched_kw[:3])})"
elif category == 'celebrity_viral':
stars = 2
narrative_tag = f"名人/热点 ({', '.join(matched_kw[:3])})"
elif is_flap:
stars = 2
narrative_tag = "FLAP社区币"
else:
# 检查是否全新叙事
theme = normalize_theme(token['name'], token['symbol'])
theme_words = [w for w in theme.split() if w not in COMMON_NOISE_WORDS and len(w) > 2]
if len(theme_words) >= 2:
stars = 2
narrative_tag = f"叙事: {theme}"
else:
stars = 1
narrative_tag = "无明确叙事"
# 生成推送
desc_info = fetch_token_description(token['chain'], addr)
# FLAP币额外标注社区/CTO信息
if is_flap:
has_twitter = bool(desc_info.get('twitter'))
has_tg = bool(desc_info.get('telegram'))
has_web = bool(desc_info.get('website'))
community_tags = []
if has_twitter:
community_tags.append("有推特")
if has_tg:
community_tags.append("有TG群")
if has_web:
community_tags.append("有官网")
if community_tags:
narrative_tag += f" | {' '.join(community_tags)}"
stars = min(3, stars + 1) # 有社区加一星
else:
narrative_tag += " | 无社区链接"
msg = format_momentum_alert(token, pct_gain, len(recent), vol_increasing, stars, narrative_tag, desc_info, signal_count)
alerts.append({'msg': msg, 'token': token})
MOMENTUM_PUSHED[addr] = push_info
log(f"[动量信号{signal_count}] {token['name']} ({token['symbol']}) on {token['chain']} — 连涨{len(recent)}轮 +{pct_gain:.1f}%")
# 清理不再出现的币
stale = [a for a in MOMENTUM_TRACKER if a not in current_addrs]
for a in stale:
if now - MOMENTUM_TRACKER[a][-1]['ts'] > 600: # 10分钟没出现就清理
del MOMENTUM_TRACKER[a]
# 清理推送记录 — 1小时没出现的清掉
MOMENTUM_PUSHED = {k: v for k, v in MOMENTUM_PUSHED.items() if now - v.get('last_ts', 0) < 3600}
return alerts
def format_momentum_alert(token, pct_gain, rounds, vol_up, stars, narrative_tag, desc_info=None, seen_count=0):
"""持续上涨动量推送 — 带叙事星级"""
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
vol_tag = "放量" if vol_up else ""
star_str = "★" * stars + "☆" * (3 - stars)
# 信号编号从标题移到下面
msg = f"链上雷达\n"
msg += f"链: {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
# 故事描述
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f"故事: {desc}\n\n"
msg += f"叙事: {narrative_tag}\n"
msg += f"连涨{rounds}轮 +{pct_gain:.1f}% {vol_tag}\n\n"
msg += f"```\n"
msg += f"市值 ${token['mc']:>12,.0f}\n"
msg += f"流动性 ${token['liq']:>12,.0f}\n"
msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f"聪明钱 {token['sm']:>12d}\n"
msg += f"币龄 {token['age_h']:>10.1f}h\n"
msg += f"```\n"
msg += f"评星: {star_str} 出现次数: {seen_count}"
# 社交链接
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n'.join(links)
return msg
def format_celebrity_alert(token, matched_kw, desc_info=None):
"""名人/推特热点推送 ★★"""
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
msg = f"链上雷达 — 名人/热点 ★★\n"
msg += f"链: {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f"故事: {desc}\n\n"
msg += f"命中关键词: {', '.join(matched_kw[:5])}\n\n"
msg += f"```\n"
msg += f"市值 ${token['mc']:>12,.0f}\n"
msg += f"流动性 ${token['liq']:>12,.0f}\n"
msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f"聪明钱 {token['sm']:>12d}\n"
msg += f"币龄 {token['age_h']:>10.1f}h\n"
msg += f"```"
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if links:
msg += '\n'.join(links)
return msg
# ============================================================
# 核心扫描逻辑
# ============================================================
def scan_narratives():
"""主扫描函数"""
conn = init_db()
tokens = fetch_new_tokens()
log(f"扫描 {len(tokens)} 个新币...")
# === 动量追踪 — 每轮更新所有币的快照,检测持续上涨 ===
# 拉FLAP币一起喂进动量追踪器
flap_tokens = []
try:
flap_tokens = fetch_flap_tokens()
except:
pass
all_momentum_tokens = tokens + flap_tokens
momentum_alerts = track_momentum(all_momentum_tokens)
for token in tokens:
addr = token['address']
chain = token['chain']
name = token['name']
symbol = token['symbol']
# 已扫描过的 — 更新seen_count和narratives的token_count,但不重复推
if is_token_seen(conn, addr):
# 更新seen_count
c = conn.cursor()
c.execute('UPDATE tokens_seen SET seen_count = seen_count + 1, market_cap = ? WHERE address = ?', (token['mc'], addr))
# 更新narratives表的token_count(按主题)
theme_tmp = normalize_theme(name, symbol)
if theme_tmp:
c.execute('UPDATE narratives SET token_count = token_count + 1, last_seen_at = ? WHERE theme = ?', (int(time.time()), theme_tmp))
conn.commit()
continue
# 分类叙事
category, matched_kw = classify_narrative(name, symbol, chain)
if category == 'spam':
record_token(conn, addr, chain, name, symbol, '', 'spam', token['mc'])
continue
# 基本质量门槛(防止推太多垃圾)
min_mc = 1000
min_liq = 500
if token['mc'] < min_mc or token['liq'] < min_liq:
record_token(conn, addr, chain, name, symbol, '', 'too_small', token['mc'])
continue
theme = normalize_theme(name, symbol)
# 所有分类只记录,不直接推送 — 推送统一走动量引擎
record_token(conn, addr, chain, name, symbol, theme, category, token['mc'])
check_narrative_novelty(conn, theme, name, symbol, addr, chain)
conn.close()
# === 推送动量信号 ===
pushed = 0
for ma in momentum_alerts[:8]: # 单轮最多推8个
if tg_send(ma['msg']):
pushed += 1
time.sleep(1) # 避免TG限流
return pushed, len(momentum_alerts)
# ============================================================
# 主循环
# ============================================================
def main():
log("=" * 50)
log("链上雷达 v1 启动")
log(f"扫描间隔: {SCAN_INTERVAL}s")
log(f"推送逻辑: 动量优先 — 连涨才推,叙事只做分类标签")
log("=" * 50)
# 初始化DB
init_db()
# 启动通知
tg_send(
"链上雷达 v1 已启动\n\n"
"核心逻辑: 动量优先\n"
"连涨3轮+涨幅>5%才推送\n"
"叙事只做分类标签:\n"
"★★★ 马斯克/川普 | 币安/CZ | FLAP有社区\n"
"★★ 名人热点 | FLAP无社区 | 有叙事\n"
"★ 无明确叙事\n\n"
f"扫描频率: 每{SCAN_INTERVAL}秒"
)
scan_count = 0
total_pushed = 0
while True:
try:
scan_count += 1
pushed, found = scan_narratives()
total_pushed += pushed
if pushed > 0:
log(f"第{scan_count}轮: 发现{found}个, 推送{pushed}个 (累计推送{total_pushed})")
else:
if scan_count % 20 == 0: # 每20轮报一次无信号
log(f"第{scan_count}轮: 无新信号 (累计推送{total_pushed})")
except Exception as e:
log(f"扫描异常: {e}")
time.sleep(SCAN_INTERVAL)
if __name__ == '__main__':
main()
Bộ quét OI + Funding Rate
Ngày: 2026.04.25 Thẻ: Python · Binance Futures · Telegram
Phát hiện funding rate đổi dấu + OI tăng mạnh
Scanner dựa trên snapshot: phát hiện funding rate chuyển từ dương sang âm trong khi OI đang tăng. Chạy mỗi 5 phút.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
OI持续放大 + 费率由正转负 扫描器
- 每分钟运行一次
- 检测: OI持续放大(4段递增, 总涨幅>8%) + 费率由正转负
- 去重: 同一币种24小时内只推一次
- 纯API零成本
"""
import requests
import json
import os
import time
import sys
from datetime import datetime, timedelta
from pathlib import Path
# ============ 配置 ============
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env.oi"
ALERT_HISTORY_FILE = SCRIPT_DIR / "oi_funding_alerts.json"
FR_SNAPSHOT_FILE = SCRIPT_DIR / "fr_snapshot.json" # 上一次费率快照
# 信号参数
MIN_OI_CHANGE_PCT = 8 # OI总涨幅最低8%
MIN_VOLUME_USDT = 0 # 无门槛,全扫
MIN_FR_PERIODS_POSITIVE = 2 # 转负前至少2期为正
DEDUP_HOURS = 24 # 去重窗口24小时
# ============ 加载TG配置 ============
def load_env():
env = {}
if ENV_FILE.exists():
for line in ENV_FILE.read_text().strip().split('\n'):
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k.strip()] = v.strip()
return env
env = load_env()
TG_BOT_TOKEN = env.get('TG_BOT_TOKEN', '')
TG_CHAT_ID = env.get('TG_CHAT_ID', '')
# ============ TG推送 ============
def send_tg(text):
if not TG_BOT_TOKEN or not TG_CHAT_ID:
print("[TG] 未配置, 仅打印:")
print(text)
return
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
# 分段发送(TG限制4096字)
chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
for chunk in chunks:
try:
resp = requests.post(url, json={
'chat_id': TG_CHAT_ID,
'text': chunk,
'parse_mode': 'Markdown'
}, timeout=10)
if resp.status_code != 200:
# fallback无格式
requests.post(url, json={
'chat_id': TG_CHAT_ID,
'text': chunk
}, timeout=10)
except Exception as e:
print(f"[TG] 发送失败: {e}")
# ============ 去重 ============
def load_alert_history():
if ALERT_HISTORY_FILE.exists():
try:
return json.loads(ALERT_HISTORY_FILE.read_text())
except:
return {}
return {}
def save_alert_history(history):
ALERT_HISTORY_FILE.write_text(json.dumps(history))
def is_duplicate(symbol, history):
if symbol not in history:
return False
last_alert = datetime.fromisoformat(history[symbol])
return (datetime.now() - last_alert).total_seconds() < DEDUP_HOURS * 3600
def mark_alerted(symbol, history):
history[symbol] = datetime.now().isoformat()
# 清理过期记录
cutoff = datetime.now() - timedelta(hours=DEDUP_HOURS * 2)
history = {k: v for k, v in history.items()
if datetime.fromisoformat(v) > cutoff}
return history
# ============ 费率快照 ============
def load_fr_snapshot():
if FR_SNAPSHOT_FILE.exists():
try:
return json.loads(FR_SNAPSHOT_FILE.read_text())
except:
pass
return {}
def save_fr_snapshot(snapshot):
FR_SNAPSHOT_FILE.write_text(json.dumps(snapshot))
# ============ 核心扫描 ============
def scan():
ts_start = time.time()
# 1. 获取所有永续合约
try:
info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo', timeout=10).json()
symbols = [s['symbol'] for s in info['symbols']
if s['contractType'] == 'PERPETUAL' and s['quoteAsset'] == 'USDT' and s['status'] == 'TRADING']
except Exception as e:
print(f"[ERROR] exchangeInfo: {e}")
return []
# 2. 批量获取24h行情(过滤低量币)
try:
tickers = requests.get('https://fapi.binance.com/fapi/v1/ticker/24hr', timeout=10).json()
ticker_map = {t['symbol']: t for t in tickers}
except Exception as e:
print(f"[ERROR] ticker: {e}")
return []
active = [s for s in symbols if float(ticker_map.get(s, {}).get('quoteVolume', 0)) > MIN_VOLUME_USDT]
# 3. 批量获取当前费率 (一次拿全部)
try:
fr_all = requests.get('https://fapi.binance.com/fapi/v1/premiumIndex', timeout=10).json()
fr_current = {item['symbol']: float(item['lastFundingRate']) for item in fr_all}
except:
fr_current = {}
# 4. 加载上次快照,对比找"刚转负"的
prev_snapshot = load_fr_snapshot()
# 保存本次快照(供下次对比)
save_fr_snapshot(fr_current)
if not prev_snapshot:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 首次运行,保存快照,下次开始对比")
return []
# 找出: 上次>=0, 这次<0 的币
just_turned_negative = []
for sym in active:
prev_fr = prev_snapshot.get(sym)
curr_fr = fr_current.get(sym)
if prev_fr is None or curr_fr is None:
continue
if prev_fr >= 0 and curr_fr < 0:
just_turned_negative.append(sym)
if not just_turned_negative:
elapsed = time.time() - ts_start
print(f"[{datetime.now().strftime('%H:%M:%S')}] 扫描完成: {len(active)}币/{elapsed:.1f}s, 无新转负")
return []
print(f"[{datetime.now().strftime('%H:%M:%S')}] 发现 {len(just_turned_negative)} 个刚转负: {just_turned_negative}")
# 5. 只对刚转负的币查OI
signals = []
for sym in just_turned_negative:
try:
# OI历史
oi_hist = requests.get('https://fapi.binance.com/futures/data/openInterestHist',
params={'symbol': sym, 'period': '1h', 'limit': 48}, timeout=10).json()
oi_chg = 0
segs = []
oi_rising = False
if oi_hist and len(oi_hist) >= 12:
oi_values = [float(x['sumOpenInterestValue']) for x in oi_hist]
seg_len = len(oi_values) // 4
if seg_len >= 3:
segs = [
sum(oi_values[:seg_len]) / seg_len,
sum(oi_values[seg_len:seg_len*2]) / seg_len,
sum(oi_values[seg_len*2:seg_len*3]) / seg_len,
sum(oi_values[seg_len*3:]) / max(1, len(oi_values[seg_len*3:]))
]
oi_chg = (segs[3] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0
oi_rising = oi_chg > 0
t = ticker_map.get(sym, {})
signals.append({
'symbol': sym,
'price': float(t.get('lastPrice', 0)),
'price_chg_24h': float(t.get('priceChangePercent', 0)),
'volume': float(t.get('quoteVolume', 0)),
'oi_change': oi_chg,
'oi_segments': segs,
'oi_rising': oi_rising,
'current_fr': fr_current.get(sym, 0),
'prev_fr': prev_snapshot.get(sym, 0),
})
except:
continue
elapsed = time.time() - ts_start
print(f"[{datetime.now().strftime('%H:%M:%S')}] 扫描完成: {len(active)}币/{elapsed:.1f}s, 信号: {len(signals)}")
return signals
# ============ 附加信息 ============
def get_square_discussion(coin):
"""查询币安广场该币的帖子数和浏览量"""
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v4/friendly/pgc/content/queryByHashtag",
params={"hashtag": f"#{coin.lower()}", "pageIndex": 1, "pageSize": 1, "orderBy": "HOT"},
headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.binance.com/en/square"},
timeout=8
)
if r.status_code == 200:
ht = r.json().get("data", {}).get("hashtag", {})
return ht.get("contentCount", 0), ht.get("viewCount", 0)
except:
pass
return 0, 0
def get_market_caps():
"""获取币安流通市值"""
mcap = {}
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
timeout=10
)
if r.status_code == 200:
for item in r.json().get("data", []):
name = item.get("name", "")
mc = item.get("marketCap", 0)
if name and mc:
mcap[name] = float(mc)
except:
pass
return mcap
def get_spot_symbols():
"""获取有现货的币种"""
try:
info = requests.get("https://api.binance.com/api/v3/exchangeInfo", timeout=10).json()
return {s["baseAsset"] for s in info["symbols"]
if s["quoteAsset"] == "USDT" and s["status"] == "TRADING"}
except:
return set()
def fmt_mcap(v):
if v >= 1e9: return f"${v/1e9:.2f}B"
if v >= 1e6: return f"${v/1e6:.1f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def fmt_views(v):
if v >= 1e6: return f"{v/1e6:.1f}M"
if v >= 1e3: return f"{v/1e3:.0f}K"
return str(v)
# ============ 格式化推送 ============
def format_alert(signals):
if not signals:
return None
# OI在涨的排前面,同组内按费率绝对值排序
signals.sort(key=lambda x: (-int(x.get('oi_rising', False)), x['current_fr']))
# 批量获取附加信息
mcap_map = get_market_caps()
spot_set = get_spot_symbols()
now = datetime.now().strftime('%m-%d %H:%M')
lines = [f"*[ 费率刚转负+OI涨 ]* {now}\n"]
for s in signals:
coin = s['symbol'].replace('USDT', '')
# 费率: 上期→本期
fr_change = f"{s['prev_fr']:+.4%} -> {s['current_fr']:+.4%}"
# 附加信息
mcap = mcap_map.get(coin, 0)
has_spot = coin in spot_set
sq_posts, sq_views = get_square_discussion(coin)
lines.append(f"```")
lines.append(f"{coin}")
lines.append(f" 价格: {s['price']:.4f} 24h: {s['price_chg_24h']:+.1f}%")
lines.append(f" 费率: {fr_change}")
if s['oi_segments']:
oi_segs = ' > '.join([f"{v/1e6:.1f}M" for v in s['oi_segments']])
lines.append(f" OI: +{s['oi_change']:.1f}% ({oi_segs})")
lines.append(f" 成交额: ${s['volume']/1e6:.1f}M")
lines.append(f" 市值: {fmt_mcap(mcap) if mcap > 0 else '未知'} 现货: {'有' if has_spot else '仅合约'}")
if sq_posts > 0:
lines.append(f" 广场: {sq_posts}帖 / {fmt_views(sq_views)}浏览")
else:
lines.append(f" 广场: 无讨论")
lines.append(f"```")
return '\n'.join(lines)
# ============ 主逻辑 ============
def main():
signals = scan()
if signals:
# 只推: 费率当前为负 + OI在涨 (最强组合)
strong = [s for s in signals if s['current_fr'] < 0 and s.get('oi_rising')]
if strong:
msg = format_alert(strong)
if msg:
send_tg(msg)
print(f" 推送 {len(strong)} 个信号 (总{len(signals)}个转负, {len(strong)}个OI也涨)")
else:
print(f" {len(signals)} 个转负但无OI在涨的, 跳过")
else:
print(f" 无信号")
if __name__ == '__main__':
main()
Accumulation Radar (Radar tích lũy)
Ngày: 2026.04.25 Thẻ: Python · Binance · CoinGlass · Telegram
Động lượng + OI bất thường + Cảnh báo thông minh
Quét theo giờ: theo dõi động lượng các đồng tăng giá hàng đầu, phát hiện OI bất thường, đẩy Telegram. Python thuần, chi phí AI = 0.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
热度做多雷达 v2 — 热度+费率+OI 三维扫描
核心逻辑(拉哪模式):
1. 热度先行 → CG热搜+放量=资金涌入信号
2. 负费率=空头燃料,庄家拉盘爆空单
3. OI暴涨=大资金建仓=即将拉盘
单策略:发现热度→小仓做多→严格止损→拿住赢家
数据源:币安合约API + CoinGecko Trending(零成本)
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
from square_heat import get_square_heat
# === 加载 .env ===
env_file = Path(__file__).parent / ".env.oi"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
# === 配置 ===
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.getenv("TG_CHAT_ID", "YOUR_CHAT_ID")
FAPI = "https://fapi.binance.com"
# 热度历史记录(用于检测首次上榜)
HEAT_HISTORY_FILE = Path(__file__).parent / "heat_history.json"
# 热度参数
VOL_SURGE_MULT = 2.5 # 成交量放大2.5倍以上=放量
MIN_VOL_USD = 20_000_000 # 日均成交>$20M才检测放量
# OI异动参数
MIN_OI_DELTA_PCT = 3.0 # OI变化至少3%
MIN_OI_USD = 2_000_000 # 最低OI门槛 $2M
def api_get(endpoint, params=None):
"""币安API请求"""
url = f"{FAPI}{endpoint}"
for attempt in range(3):
try:
resp = requests.get(url, params=params, timeout=10)
if resp.status_code == 200:
return resp.json()
elif resp.status_code == 429:
time.sleep(2)
else:
return None
except:
time.sleep(1)
return None
def format_usd(v):
if v >= 1e9: return f"${v/1e9:.1f}B"
if v >= 1e6: return f"${v/1e6:.1f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def mcap_str(v):
if v >= 1e9: return f"${v/1e9:.1f}B"
if v >= 1e6: return f"${v/1e6:.0f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def send_telegram(text):
"""发送TG消息"""
if not TG_BOT_TOKEN:
print("\n[TG] No token, stdout:\n")
print(text)
return
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
# 分段发送(TG限制4096字)
chunks = []
current = ""
for line in text.split("\n"):
if len(current) + len(line) + 1 > 3800:
chunks.append(current)
current = line
else:
current += "\n" + line if current else line
if current:
chunks.append(current)
for chunk in chunks:
try:
resp = requests.post(url, json={
"chat_id": TG_CHAT_ID,
"text": chunk,
"parse_mode": "Markdown"
}, timeout=10)
if resp.status_code == 200:
print(f"[TG] Sent ✓ ({len(chunk)} chars)")
else:
# Markdown失败就用纯文本
resp2 = requests.post(url, json={
"chat_id": TG_CHAT_ID,
"text": chunk.replace("*", "").replace("_", ""),
}, timeout=10)
print(f"[TG] Sent plain ({'✓' if resp2.status_code == 200 else '✗'})")
except Exception as e:
print(f"[TG] Error: {e}")
time.sleep(0.5)
def main():
print(f"🔥 热度做多雷达 v2 — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 1. 全市场行情+费率
tickers_raw = api_get("/fapi/v1/ticker/24hr")
premiums_raw = api_get("/fapi/v1/premiumIndex")
if not tickers_raw or not premiums_raw:
print("❌ API失败")
return
ticker_map = {}
for t in tickers_raw:
if t["symbol"].endswith("USDT"):
ticker_map[t["symbol"]] = {
"px_chg": float(t["priceChangePercent"]),
"vol": float(t["quoteVolume"]),
"price": float(t["lastPrice"]),
}
funding_map = {}
for p in premiums_raw:
if p["symbol"].endswith("USDT"):
funding_map[p["symbol"]] = float(p["lastFundingRate"])
# 2. 真实流通市值(币安现货API)
mcap_map = {}
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
timeout=10
)
if r.status_code == 200:
for item in r.json().get("data", []):
name = item.get("name", "")
mc = item.get("marketCap", 0)
if name and mc:
mcap_map[name] = float(mc)
print(f"✅ 真实市值: {len(mcap_map)}个币")
except Exception as e:
print(f"⚠️ 市值API失败: {e}")
# 3. 热度检测:币安广场热搜 + CoinGecko Trending + 成交量暴增
heat_map = {}
cg_trending = set()
square_trending = set()
# 3a. 币安广场热搜(6H)— 最重要!币安用户=交易用户
sq_coins = get_square_heat()
if sq_coins:
for i, c in enumerate(sq_coins):
coin = c["coin"]
square_trending.add(coin)
# 排名越靠前分越高,急速上升额外加分
rank_score = max(50 - i * 4, 10)
if c.get("rapidRiser"):
rank_score += 15
heat_map[coin] = heat_map.get(coin, 0) + rank_score
print(f"🏦 广场热搜: {len(square_trending)}个币 {[c['coin'] for c in sq_coins[:5]]}")
# 3b. CoinGecko Trending
try:
r = requests.get("https://api.coingecko.com/api/v3/search/trending", timeout=10)
if r.status_code == 200:
for item in r.json().get("coins", []):
sym = item["item"]["symbol"].upper()
rank = item["item"].get("score", 99)
cg_trending.add(sym)
heat_map[sym] = heat_map.get(sym, 0) + max(50 - rank * 3, 10)
print(f"🌐 CG Trending: {len(cg_trending)}个币")
except Exception as e:
print(f"⚠️ CG Trending失败: {e}")
# 成交量暴增检测
vol_surge_coins = set()
for sym, tk in ticker_map.items():
coin = sym.replace("USDT", "")
vol_24h = tk["vol"]
if vol_24h > MIN_VOL_USD:
kl = api_get("/fapi/v1/klines", {"symbol": sym, "interval": "1d", "limit": 8})
if kl and len(kl) >= 5:
avg_prev = sum(float(k[7]) for k in kl[:-1]) / (len(kl) - 1)
if avg_prev > 0:
ratio = vol_24h / avg_prev
if ratio >= VOL_SURGE_MULT:
vol_surge_coins.add(coin)
heat_map[coin] = heat_map.get(coin, 0) + min(ratio * 10, 50)
time.sleep(0.05)
print(f"📈 放量(≥{VOL_SURGE_MULT}x): {len(vol_surge_coins)}个币")
# 双重/三重热度
dual_heat = cg_trending & vol_surge_coins
square_vol = square_trending & vol_surge_coins
triple_heat = cg_trending & vol_surge_coins & square_trending
all_multi_heat = dual_heat | square_vol
if all_multi_heat:
for coin in all_multi_heat:
heat_map[coin] = heat_map.get(coin, 0) + 20
if triple_heat:
for coin in triple_heat:
heat_map[coin] = heat_map.get(coin, 0) + 30 # 三重热度超级加分
print(f"🔥🔥🔥 三重热度: {triple_heat}")
else:
print(f"🔥🔥 双重热度: {all_multi_heat}")
# 4. OI扫描(Top100成交量 + 热度币)
scan_syms = set()
# 热度币必扫
for coin in heat_map:
sym = coin + "USDT"
if sym in ticker_map:
scan_syms.add(sym)
# Top100成交量
top_by_vol = sorted(ticker_map.items(), key=lambda x: x[1]["vol"], reverse=True)[:100]
for sym, _ in top_by_vol:
scan_syms.add(sym)
oi_map = {}
for i, sym in enumerate(scan_syms):
oi_hist = api_get("/futures/data/openInterestHist", {
"symbol": sym, "period": "1h", "limit": 6
})
if oi_hist and len(oi_hist) >= 2:
curr = float(oi_hist[-1]["sumOpenInterestValue"])
prev_1h = float(oi_hist[-2]["sumOpenInterestValue"])
prev_6h = float(oi_hist[0]["sumOpenInterestValue"])
d1h = ((curr - prev_1h) / prev_1h * 100) if prev_1h > 0 else 0
d6h = ((curr - prev_6h) / prev_6h * 100) if prev_6h > 0 else 0
oi_map[sym] = {"oi_usd": curr, "d1h": d1h, "d6h": d6h}
if (i + 1) % 10 == 0:
time.sleep(0.5)
print(f"📊 OI扫描: {len(oi_map)}个币")
# 5. 整合所有数据
all_syms = set(list(ticker_map.keys()))
coin_data = {}
for sym in all_syms:
tk = ticker_map.get(sym, {})
if not tk:
continue
oi = oi_map.get(sym, {})
fr = funding_map.get(sym, 0)
coin = sym.replace("USDT", "")
d6h = oi.get("d6h", 0)
fr_pct = fr * 100
oi_usd = oi.get("oi_usd", 0)
# 真实市值优先,fallback粗估
if coin in mcap_map:
est_mcap = mcap_map[coin]
else:
est_mcap = max(tk["vol"] * 0.3, oi_usd * 2) if oi_usd > 0 else tk["vol"] * 0.3
heat = heat_map.get(coin, 0)
coin_data[sym] = {
"coin": coin, "sym": sym,
"px_chg": tk["px_chg"], "vol": tk["vol"],
"fr_pct": fr_pct, "d6h": d6h,
"oi_usd": oi_usd, "est_mcap": est_mcap,
"heat": heat,
"in_cg": coin in cg_trending,
"in_sq": coin in square_trending,
"vol_surge": coin in vol_surge_coins,
}
# ═══════════════════════════════════════
# 热度榜
# ═══════════════════════════════════════
hot_coins = sorted(
[d for d in coin_data.values() if d["heat"] > 0],
key=lambda x: x["heat"], reverse=True
)
# 检测首次上榜
heat_history = {}
if HEAT_HISTORY_FILE.exists():
try:
heat_history = json.loads(HEAT_HISTORY_FILE.read_text())
except:
pass
now_ts = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
new_entries = [] # 首次上榜的币
for s in hot_coins:
coin = s["coin"]
if coin not in heat_history:
# 首次上榜!
heat_history[coin] = {"first_seen": now_ts, "price": s.get("px_chg", 0)}
sources = []
if s["in_sq"]: sources.append("广场")
if s["in_cg"]: sources.append("CG")
if s["vol_surge"]: sources.append("放量")
new_entries.append({"coin": coin, "sources": sources, "data": s})
# 清理超过7天的历史(避免文件无限增长)
cutoff = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=7)).strftime("%Y-%m-%d")
heat_history = {k: v for k, v in heat_history.items()
if v.get("first_seen", "9999") >= cutoff}
# 保存历史
HEAT_HISTORY_FILE.write_text(json.dumps(heat_history, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════
# 追多:负费率+在涨
# ═══════════════════════════════════════
chase = []
for sym, d in coin_data.items():
if d["px_chg"] > 3 and d["fr_pct"] < -0.005 and d["vol"] > 1_000_000:
fr_hist = api_get("/fapi/v1/fundingRate", {"symbol": sym, "limit": 5})
fr_rates = [float(f["fundingRate"]) * 100 for f in fr_hist] if fr_hist else [d["fr_pct"]]
fr_prev = fr_rates[-2] if len(fr_rates) >= 2 else d["fr_pct"]
fr_delta = d["fr_pct"] - fr_prev
trend = "加速恶化" if fr_delta < -0.05 else "转负" if fr_delta < -0.01 else "持平" if abs(fr_delta) < 0.01 else "回升"
chase.append({**d, "fr_delta": fr_delta, "trend": trend,
"rates": " → ".join([f"{x:.3f}" for x in fr_rates[-3:]])})
time.sleep(0.2)
chase.sort(key=lambda x: x["fr_pct"])
# ═══════════════════════════════════════
# 生成推送
# ═══════════════════════════════════════
now = datetime.now(timezone(timedelta(hours=8)))
lines = [
f"**热度做多雷达**",
f"{now.strftime('%Y-%m-%d %H:%M')} CST",
]
# 热度榜(表格)
# 首次上榜放最前面
if new_entries:
lines.append(f"\n**[ 首次上榜 ]** 新出现的热度币,重点关注")
tbl = ["```"]
tbl.append(f"{'币种':<10} {'市值':>8} {'涨幅':>7} {'来源'}")
tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
for e in new_entries:
s = e["data"]
src_str = "/".join(e["sources"])
tbl.append(f"{s['coin']:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
tbl.append("```")
lines.append("\n".join(tbl))
if hot_coins:
lines.append(f"\n**[ 热度榜 ]**")
tbl = ["```"]
tbl.append(f"{'币种':<10} {'市值':>8} {'涨幅':>7} {'来源'}")
tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
for s in hot_coins[:10]:
sources = []
if s["in_sq"]: sources.append("广场")
if s["in_cg"]: sources.append("CG")
if s["vol_surge"]: sources.append("放量")
extra = []
if abs(s["d6h"]) >= 3: extra.append(f"OI{s['d6h']:+.0f}%")
if s["fr_pct"] < -0.03: extra.append(f"费率{s['fr_pct']:.2f}%")
src_str = "/".join(sources)
if extra:
src_str += " " + " ".join(extra)
coin_name = s['coin']
tbl.append(f"{coin_name:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
tbl.append("```")
lines.append("\n".join(tbl))
else:
lines.append("\n**[ 热度榜 ]** 暂无热点")
# 追多(表格)
lines.append(f"\n**[ 追多 ]** 涨了+费率负=空头燃料")
if chase:
tbl = ["```"]
tbl.append(f"{'币种':<10} {'费率':>10} {'趋势':>8} {'涨幅':>7} {'市值':>8}")
tbl.append(f"{'-'*10} {'-'*10} {'-'*8} {'-'*7} {'-'*8}")
for s in chase[:8]:
tbl.append(
f"{s['coin']:<10} {s['fr_pct']:>+9.3f}% {s['trend']:>8} {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
)
tbl.append("```")
lines.append("\n".join(tbl))
else:
lines.append(" 暂无符合条件的标的")
# OI异动(表格)
oi_alerts = []
for sym, oi in oi_map.items():
if abs(oi["d6h"]) >= 8:
d = coin_data.get(sym)
if d and d["heat"] == 0:
oi_alerts.append(d)
oi_alerts.sort(key=lambda x: abs(x["d6h"]), reverse=True)
if oi_alerts:
lines.append(f"\n**[ OI异动 ]** 6小时持仓变化>=8%")
tbl = ["```"]
tbl.append(f"{'币种':<10} {'方向':>4} {'OI变化':>8} {'涨幅':>7} {'市值':>8}")
tbl.append(f"{'-'*10} {'-'*4} {'-'*8} {'-'*7} {'-'*8}")
for s in oi_alerts[:6]:
direction = "增仓" if s["d6h"] > 0 else "减仓"
tbl.append(
f"{s['coin']:<10} {direction:>4} {s['d6h']:>+7.1f}% {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
)
tbl.append("```")
lines.append("\n".join(tbl))
# 值得关注
highlights = []
hot_oi = [d for d in coin_data.values() if d["heat"] > 0 and d["d6h"] > 5]
for s in sorted(hot_oi, key=lambda x: x["d6h"], reverse=True)[:3]:
highlights.append(f"{s['coin']} — 热度高+OI涨{s['d6h']:+.0f}%,资金涌入")
hot_fuel = [d for d in coin_data.values() if d["heat"] > 0 and d["fr_pct"] < -0.03]
for s in sorted(hot_fuel, key=lambda x: x["fr_pct"])[:2]:
if s["coin"] not in " ".join(highlights):
highlights.append(f"{s['coin']} — 热度高+费率{s['fr_pct']:.2f}%,空头燃料足")
chase_fire = [s for s in chase[:5] if "加速" in s.get("trend", "")]
for s in chase_fire[:2]:
if s["coin"] not in " ".join(highlights):
highlights.append(f"{s['coin']} — 费率{s['fr_pct']:.3f}%持续恶化,逼空在即")
if highlights:
lines.append(f"\n**[ 值得关注 ]**")
for h in highlights[:5]:
lines.append(f" {h}")
lines.append(f"\n广场=币安站内搜索 / CG=CoinGecko全球热度")
lines.append(f"费率负=做空多,庄家拉盘爆空头")
report = "\n".join(lines)
send_telegram(report)
print("\n✅ 完成")
if __name__ == "__main__":
main()
Binance Alpha Monitor (Giám sát Binance Alpha)
Ngày: 2026.04.23 Thẻ: Python · Binance · Claude AI · Telegram
WebSocket realtime · Phân tích AI · Đẩy Telegram
Tự động phát hiện token mới được niêm yết trên Binance Alpha, phân tích chất lượng token bằng Claude AI và gửi cảnh báo qua Telegram.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
Binance Alpha Monitor v2 — 稳定版
REST轮询 + 智能过滤 + 评级 + TG推送
零API Key,零AI成本(评级用规则引擎)
运行: python3 alpha_monitor.py
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import httpx
# ============================================================
# 配置
# ============================================================
BASE_DIR = Path(__file__).parent
DB_PATH = str(BASE_DIR / "data" / "alpha.db")
# TG
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# LLM (可选,用于抽取叙事,不配置则降级为规则)
# 优先从hermes auth.json读credential pool(与scanner一致)
def _load_anthropic_key():
"""从auth.json或环境变量获取API key"""
# 1. hermes auth.json credential pool
try:
auth_file = os.path.expanduser("~/.hermes/auth.json")
if os.path.exists(auth_file):
with open(auth_file) as f:
auth = json.load(f)
pool = auth.get("credential_pool", {}).get("anthropic", [])
if pool:
key = pool[0].get("access_token", "")
if key and key != "***":
return key
except Exception:
pass
# 2. 环境变量 fallback
return os.environ.get("ANTHROPIC_API_KEY", "")
ANTHROPIC_API_KEY = _load_anthropic_key()
ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")
# 轮询间隔
ANNOUNCEMENT_POLL_INTERVAL = 30 # 公告轮询30秒
AGGREGATION_POLL_INTERVAL = 15 # 聚合工作者15秒
MONITOR_POLL_INTERVAL = 120 # 上线后监控2分钟
# HTTP
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
}
BINANCE_ANNOUNCEMENT_API = "https://www.binance.com/bapi/composite/v1/public/cms/article/list/query"
# ============================================================
# 日志
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("alpha")
# ============================================================
# 过滤规则
# ============================================================
# 触发关键词 — 命中任意一个就触发
TRIGGER_KEYWORDS = [
"alpha", "空投", "airdrop", "tge", "token generation",
"将上线", "will list", "will launch",
"独家", "exclusive", "binance wallet", "hodler",
]
# 排除关键词 — 命中任意一个直接排除
EXCLUDE_KEYWORDS = [
"delisting", "delist", "下架", "deprecate", "退市",
"maintenance", "维护",
"launchpool", "megadrop",
"buyback", "回购",
"已完成", "完成结算",
"perpetual contract", # 永续合约
"futures will launch", # 期货上新
"usdⓢ-margined", # U本位合约
"coin-margined", # 币本位合约
"margin will add", # 杠杆上新
"trading bots services", # 交易机器人
"trading pairs", # 交易对调整(非新币)
]
# Alpha Box 盲盒
ALPHA_BOX_KEYWORDS = ["alpha box", "盲盒", "mystery box"]
# ============================================================
# VC 白名单
# ============================================================
TIER1_VCS = [
"binance labs", "yzi labs",
"coinbase ventures", "a16z", "andreessen horowitz", "paradigm",
"polychain", "polychain capital", "sequoia", "sequoia china", "sequoia capital",
"multicoin", "multicoin capital", "pantera", "pantera capital",
"dragonfly", "dragonfly capital", "founders fund",
]
TIER2_VCS = [
"abcde", "iosg", "hashkey", "okx ventures",
"sevenx", "folius", "foresight", "hashed",
"bitkraft", "framework", "framework ventures",
"delphi", "delphi digital", "electric capital",
"variant", "1kx", "placeholder",
"animoca", "animoca brands", "jump", "jump crypto",
"hack vc", "bain capital",
]
# 叙事热度
HOT_NARRATIVES = ["defi_perp", "ai_agent", "ai_defi", "defai", "zk_proof"]
WEAK_NARRATIVES = ["gamefi", "meme", "social"]
# 币安亲儿子信号
BINANCE_DARLING_KEYWORDS = ["yzi labs", "binance labs"]
# ============================================================
# 评级引擎
# ============================================================
TIER_ICONS = {"S": "🟢🟢🟢", "A": "🟡🟡", "B": "🟠", "C": "⚪"}
TIER_LABELS = {"S": "S 级(必研究)", "A": "A 级(值得看)", "B": "B 级(正常)", "C": "C 级(了解)"}
def count_vc_tier(vcs: list, vc_list: list) -> int:
count = 0
vcs_lower = [v.lower() for v in vcs]
for t in vc_list:
if any(t in v for v in vcs_lower):
count += 1
return count
def rate_project(circ_mcap: float, fdv: float, vcs: list,
narrative: str, is_darling: bool) -> dict:
"""S/A/B/C 评级"""
t1 = count_vc_tier(vcs, TIER1_VCS)
t2 = count_vc_tier(vcs, TIER2_VCS)
hot = narrative in HOT_NARRATIVES
weak = narrative in WEAK_NARRATIVES
circ_mcap = circ_mcap or 0
fdv = fdv or 0
warnings = []
if weak:
warnings.append(f"⚠️ {narrative} 历史破发率较高")
# S级 5条路径
if is_darling:
return {"tier": "S", "reason": "币安亲儿子(YZi/Binance Labs/CZ)", "warnings": warnings}
if hot and t1 >= 1 and fdv < 500_000_000:
return {"tier": "S", "reason": f"热叙事({narrative})+ Tier1 VC", "warnings": warnings}
if t1 >= 2 and circ_mcap < 50_000_000 and fdv < 300_000_000:
return {"tier": "S", "reason": "≥2家 Tier1 中盘", "warnings": warnings}
if t1 >= 1 and circ_mcap < 10_000_000 and fdv < 100_000_000:
return {"tier": "S", "reason": "Tier1 微盘", "warnings": warnings}
if hot and circ_mcap < 10_000_000 and fdv < 50_000_000:
return {"tier": "S", "reason": f"热叙事({narrative})微盘", "warnings": warnings}
# A级
if t1 >= 1 and circ_mcap < 20_000_000 and fdv < 200_000_000:
return {"tier": "A", "reason": "Tier1 小盘", "warnings": warnings}
# B级
if circ_mcap < 50_000_000 and fdv < 500_000_000:
return {"tier": "B", "reason": "中盘", "warnings": warnings}
return {"tier": "C", "reason": "大盘/弱信号", "warnings": warnings}
# ============================================================
# 数据库
# ============================================================
def init_db():
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
symbol TEXT NOT NULL,
name TEXT,
launch_time TEXT,
source TEXT,
raw_text TEXT,
tier TEXT DEFAULT 'PENDING',
tier_reason TEXT,
narrative TEXT,
narrative_desc TEXT,
vcs_json TEXT DEFAULT '[]',
is_darling INTEGER DEFAULT 0,
open_price REAL,
total_supply REAL,
circulating_supply REAL,
fdv REAL,
circulating_mcap REAL,
excluded INTEGER DEFAULT 0,
exclude_reason TEXT,
discovered_at TEXT,
updated_at TEXT
)""")
c.execute("""
CREATE TABLE IF NOT EXISTS pushes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL,
push_type TEXT,
sent_at TEXT,
content TEXT
)""")
c.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
price REAL,
circulating_mcap REAL,
fdv REAL
)""")
conn.commit()
conn.close()
def project_id(symbol: str, date_str: str) -> str:
return hashlib.md5(f"{symbol.upper()}_{date_str}".encode()).hexdigest()[:16]
def project_exists(pid: str) -> bool:
conn = sqlite3.connect(DB_PATH)
exists = conn.execute("SELECT 1 FROM projects WHERE id=?", (pid,)).fetchone() is not None
conn.close()
return exists
def save_project(project: dict):
conn = sqlite3.connect(DB_PATH)
now = datetime.utcnow().isoformat()
conn.execute("""
INSERT OR IGNORE INTO projects
(id, symbol, name, launch_time, source, raw_text, tier, tier_reason,
narrative, narrative_desc, vcs_json, is_darling,
open_price, total_supply, circulating_supply, fdv, circulating_mcap,
excluded, exclude_reason, discovered_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
project["id"], project["symbol"], project.get("name"),
project.get("launch_time"), project.get("source"), project.get("raw_text"),
project.get("tier", "PENDING"), project.get("tier_reason"),
project.get("narrative"), project.get("narrative_desc"),
json.dumps(project.get("vcs", [])), int(project.get("is_darling", False)),
project.get("open_price"), project.get("total_supply"),
project.get("circulating_supply"), project.get("fdv"),
project.get("circulating_mcap"),
int(project.get("excluded", 0)), project.get("exclude_reason"),
now, now,
))
conn.commit()
conn.close()
def update_project(pid: str, fields: dict):
if not fields:
return
conn = sqlite3.connect(DB_PATH)
fields["updated_at"] = datetime.utcnow().isoformat()
set_parts = [f"{k}=?" for k in fields]
values = list(fields.values()) + [pid]
conn.execute(f"UPDATE projects SET {','.join(set_parts)} WHERE id=?", values)
conn.commit()
conn.close()
def get_project(pid: str) -> Optional[dict]:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT * FROM projects WHERE id=?", (pid,)).fetchone()
conn.close()
return dict(row) if row else None
def list_pending() -> list:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM projects WHERE excluded=0 AND tier='PENDING' ORDER BY discovered_at"
).fetchall()
conn.close()
return [dict(r) for r in rows]
def list_active() -> list:
"""上线后需要监控的项目(非PENDING非EXCLUDED,有launch_time)"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute("""
SELECT * FROM projects
WHERE excluded=0 AND launch_time IS NOT NULL AND launch_time != ''
AND tier NOT IN ('PENDING', 'EXCLUDED', 'ERROR')
""").fetchall()
conn.close()
return [dict(r) for r in rows]
def has_pushed(pid: str, push_type: str) -> bool:
conn = sqlite3.connect(DB_PATH)
exists = conn.execute(
"SELECT 1 FROM pushes WHERE project_id=? AND push_type=?", (pid, push_type)
).fetchone() is not None
conn.close()
return exists
def log_push(pid: str, push_type: str, content: str):
conn = sqlite3.connect(DB_PATH)
conn.execute(
"INSERT INTO pushes (project_id, push_type, sent_at, content) VALUES (?,?,?,?)",
(pid, push_type, datetime.utcnow().isoformat(), content)
)
conn.commit()
conn.close()
def save_snapshot(pid: str, price: float, mcap: float, fdv: float):
conn = sqlite3.connect(DB_PATH)
conn.execute(
"INSERT INTO snapshots (project_id, timestamp, price, circulating_mcap, fdv) VALUES (?,?,?,?,?)",
(pid, datetime.utcnow().isoformat(), price, mcap, fdv)
)
conn.commit()
conn.close()
# ============================================================
# TG 推送
# ============================================================
async def send_tg(text: str, silent: bool = False) -> bool:
if not TG_BOT_TOKEN or not TG_CHAT_ID:
logger.error("TG_BOT_TOKEN 或 TG_CHAT_ID 未配置")
return False
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_notification": silent,
}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
resp = await client.post(url, json=payload)
if resp.status_code != 200:
logger.error(f"TG发送失败 {resp.status_code}: {resp.text[:200]}")
return False
return True
except Exception as e:
logger.error(f"TG发送异常: {e}")
return False
# ============================================================
# 公告标题解析
# ============================================================
def is_trigger(title: str) -> tuple[bool, Optional[str]]:
t = title.lower()
for kw in EXCLUDE_KEYWORDS:
if kw.lower() in t:
return False, f"排除: {kw}"
for kw in ALPHA_BOX_KEYWORDS:
if kw.lower() in t:
return False, "Alpha Box 盲盒"
for kw in TRIGGER_KEYWORDS:
if kw.lower() in t:
return True, None
return False, None
def extract_symbol(title: str) -> Optional[str]:
# 英文括号: "Chip (CHIP)"
m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
if m:
return m.group(1)
# 中文括号
m = re.search(r"(([A-Z0-9]{2,10}))", title)
if m:
return m.group(1)
return None
def extract_name(title: str) -> Optional[str]:
patterns = [
r"(?:上线|List|list|Launch|launch|featured)\s+([A-Za-z0-9 ]+?)\s*[\((]",
]
for p in patterns:
m = re.search(p, title, re.IGNORECASE)
if m:
return m.group(1).strip()
return None
# ============================================================
# 币安公告抓取
# ============================================================
async def fetch_announcements(limit: int = 20) -> list:
"""抓取币安最新公告"""
all_articles = []
# 48: New Cryptocurrency Listing, 161: Latest Activities, 93: Latest News
for catalog_id in [48, 161, 93]:
params = {"type": 1, "catalogId": catalog_id, "pageNo": 1, "pageSize": limit}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
resp = await client.get(BINANCE_ANNOUNCEMENT_API, params=params)
resp.raise_for_status()
data = resp.json()
for catalog in data.get("data", {}).get("catalogs", []):
for a in catalog.get("articles", []):
a["_catalog_id"] = catalog_id
all_articles.append(a)
except Exception as e:
logger.warning(f"抓取分类 {catalog_id} 失败: {e}")
# 去重
seen = set()
unique = []
for a in all_articles:
code = a.get("code")
if code and code not in seen:
seen.add(code)
unique.append(a)
return unique
# ============================================================
# CoinGecko 数据
# ============================================================
async def fetch_coingecko(symbol: str, project_name: str = "") -> dict:
"""查CoinGecko代币经济数据。双重匹配:symbol精确匹配 + 项目名模糊匹配"""
result = {"found": False, "price": None, "fdv": None, "mcap": None,
"total_supply": None, "circ_supply": None, "chain": None, "contract": None}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
# 搜索策略:先搜symbol,如果项目名不同再搜项目名
queries = [symbol]
if project_name and project_name.upper() != symbol.upper():
queries.append(project_name)
coin_id = None
best_rank = 999999
name_exact_match = None # 项目名精确匹配优先
for query in queries:
resp = await client.get("https://api.coingecko.com/api/v3/search",
params={"query": query})
if resp.status_code != 200:
continue
coins = resp.json().get("coins", [])
for c in coins:
c_sym = c.get("symbol", "").upper()
c_name = c.get("name", "").lower()
c_rank = c.get("market_cap_rank") or 999999
# 项目名精确匹配(最高优先级,如MegaETH -> megaeth)
if project_name and c_name == project_name.lower():
name_exact_match = c["id"]
# 精确symbol匹配 — 取market_cap_rank最高(最小)的
if c_sym == symbol.upper():
if c_rank < best_rank:
coin_id = c["id"]
best_rank = c_rank
# 项目名模糊匹配(如MegaETH搜mega)
if project_name and project_name.lower() in c_name:
if c_rank < best_rank:
coin_id = c["id"]
best_rank = c_rank
# 项目名精确匹配 > 其他匹配
if name_exact_match:
coin_id = name_exact_match
if not coin_id:
logger.info(f"CoinGecko未找到 {symbol}/{project_name}")
return result
logger.info(f"CoinGecko匹配: {symbol} -> {coin_id} (rank={best_rank})")
resp2 = await client.get(
f"https://api.coingecko.com/api/v3/coins/{coin_id}",
params={"localization": "false", "tickers": "false",
"market_data": "true", "community_data": "false",
"developer_data": "false"}
)
if resp2.status_code == 429:
await asyncio.sleep(5)
resp2 = await client.get(
f"https://api.coingecko.com/api/v3/coins/{coin_id}",
params={"localization": "false", "tickers": "false",
"market_data": "true", "community_data": "false",
"developer_data": "false"}
)
if resp2.status_code != 200:
return result
d = resp2.json()
md = d.get("market_data", {})
result.update({
"found": True,
"price": (md.get("current_price") or {}).get("usd"),
"fdv": (md.get("fully_diluted_valuation") or {}).get("usd"),
"mcap": (md.get("market_cap") or {}).get("usd"),
"total_supply": md.get("total_supply"),
"circ_supply": md.get("circulating_supply"),
})
# 提取categories(含VC信息如"YZi Labs Portfolio")和description
result["categories"] = d.get("categories", [])
result["description"] = (d.get("description") or {}).get("en", "")[:500]
platforms = d.get("platforms", {})
for chain, addr in platforms.items():
if addr:
result["chain"] = chain
result["contract"] = addr
break
# CoinGecko没价格时(新币常见),从币安合约/现货补充
if not result["price"]:
binance_price = await _fetch_binance_price(symbol, client)
if binance_price:
result["price"] = binance_price
ts = result.get("total_supply") or 0
cs = result.get("circ_supply") or 0
if ts > 0:
result["fdv"] = binance_price * ts
if cs > 0:
result["mcap"] = binance_price * cs
logger.info(f"币安补充价格 {symbol}: ${binance_price}, FDV=${result.get('fdv',0):,.0f}")
except Exception as e:
logger.warning(f"CoinGecko查询失败 {symbol}: {e}")
return result
async def _fetch_binance_price(symbol: str, client: httpx.AsyncClient) -> float:
"""从币安现货或合约获取价格"""
pair = f"{symbol.upper()}USDT"
# 1. 现货
try:
resp = await client.get(f"https://api.binance.com/api/v3/ticker/price",
params={"symbol": pair})
if resp.status_code == 200:
return float(resp.json()["price"])
except Exception:
pass
# 2. 合约
try:
resp = await client.get(f"https://fapi.binance.com/fapi/v1/ticker/price",
params={"symbol": pair})
if resp.status_code == 200:
return float(resp.json()["price"])
except Exception:
pass
return 0.0
# ============================================================
# LLM 叙事抽取(可选,降级为规则)
# ============================================================
async def llm_extract(raw_text: str, symbol: str, name: str = "", cg_data: dict = None) -> dict:
"""用LLM从公告+CoinGecko数据抽取叙事/VC/是否亲儿子"""
fallback = {
"narrative": "unknown", "narrative_desc": "",
"vcs": [], "is_darling": False, "exclude_reason": None,
}
# 从CoinGecko categories自动提取信息
cg_data = cg_data or {}
categories = cg_data.get("categories", [])
description = cg_data.get("description", "")
# 自动检测亲儿子(从categories)
darling_cats = [c for c in categories if any(kw in c.lower() for kw in ["yzi labs", "binance labs"])]
if darling_cats:
fallback["is_darling"] = True
if not ANTHROPIC_API_KEY:
# 降级:从标题+categories关键词猜
t = raw_text.lower()
for kw in BINANCE_DARLING_KEYWORDS:
if kw in t:
fallback["is_darling"] = True
# 从categories猜叙事
cat_str = " ".join(categories).lower()
if "defi" in cat_str: fallback["narrative"] = "defi"
elif "ai" in cat_str: fallback["narrative"] = "ai_agent"
elif "gaming" in cat_str or "gamefi" in cat_str: fallback["narrative"] = "gamefi"
elif "meme" in cat_str: fallback["narrative"] = "meme"
elif "rwa" in cat_str or "real world" in cat_str: fallback["narrative"] = "rwa"
return fallback
# 构建丰富的上下文
extra_context = ""
if categories:
extra_context += f"\nCoinGecko分类: {', '.join(categories)}"
if description:
extra_context += f"\n项目描述: {description[:300]}"
if cg_data.get("found"):
extra_context += f"\n市场数据: FDV=${cg_data.get('fdv',0):,.0f}, MCap=${cg_data.get('mcap',0):,.0f}, 价格=${cg_data.get('price',0)}"
if cg_data.get("chain"):
extra_context += f", 链={cg_data['chain']}"
system = "你是加密货币研究员,从币安公告和项目数据中提取关键信息。只返回JSON,无其他文字。"
user = f"""分析这个币安上新项目:
代币: {symbol}, 项目名: {name or "未知"}
公告原文: {raw_text}
{extra_context}
返回JSON:
{{
"narrative": "defi_perp|ai_agent|ai_defi|defai|zk_proof|infra|defi|rwa|gamefi|meme|social|stablecoin|unknown",
"narrative_desc": "一句话中文描述这个项目做什么、有什么特点",
"vcs": ["从CoinGecko分类和公告中提取的投资机构列表"],
"is_darling": true/false,
"exclude_reason": null|"already_tge"|"meme_only"
}}
判断规则:
- narrative: 选最主要的一个类别
- vcs: CoinGecko分类里如果有 "XXX Portfolio" 就提取XXX作为机构
- is_darling: 如果有YZi Labs/Binance Labs投资 或 CZ/何一站台 则true
- exclude_reason: 只有当项目在其他主要CEX(如Coinbase/OKX/Bybit)上线超过3个月才算"already_tge"。如果只是在DEX或刚在币安上线,不算already_tge。CoinGecko有价格数据不代表already_tge。纯meme无叙事则"meme_only"
"""
try:
async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
resp = await client.post(
f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages",
headers={
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": ANTHROPIC_MODEL,
"max_tokens": 800,
"temperature": 0,
"system": system,
"messages": [{"role": "user", "content": user}],
}
)
if resp.status_code != 200:
logger.warning(f"LLM调用失败 {resp.status_code}")
return fallback
data = resp.json()
text = ""
for block in data.get("content", []):
if block.get("type") == "text":
text = block.get("text", "")
break
text = text.strip()
if text.startswith("```"):
lines = text.split("\n")
text = "\n".join(lines[1:-1])
return json.loads(text)
except Exception as e:
logger.warning(f"LLM抽取异常: {e}")
return fallback
# ============================================================
# 消息格式化
# ============================================================
def _fmt_mcap(v):
if not v:
return "N/A"
if v >= 1e9:
return f"${v/1e9:.1f}B"
if v >= 1e6:
return f"${v/1e6:.1f}M"
if v >= 1e3:
return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def _fmt_price(v):
if not v:
return "N/A"
if v >= 1:
return f"${v:.2f}"
if v >= 0.01:
return f"${v:.4f}"
return f"${v:.6f}"
def fmt_discovery(p: dict) -> str:
tier = p.get("tier", "C")
icon = TIER_ICONS.get(tier, "⚪")
label = TIER_LABELS.get(tier, "")
symbol = p["symbol"]
name = p.get("name") or ""
vcs = json.loads(p.get("vcs_json", "[]")) if isinstance(p.get("vcs_json"), str) else p.get("vcs", [])
lines = [
f"{icon} <b>Alpha 首发 · ${symbol}</b> {icon}",
f"📋 {label}",
"",
f"<b>{name}</b>" if name else "",
]
if p.get("narrative_desc"):
lines.append(f"💡 {p['narrative_desc']}")
if p.get("narrative") and p["narrative"] != "unknown":
lines.append(f"🏷 叙事: {p['narrative']}")
lines.append("")
if p.get("fdv"):
lines.append(f"📊 FDV: {_fmt_mcap(p['fdv'])}")
if p.get("circulating_mcap"):
lines.append(f"📊 流通市值: {_fmt_mcap(p['circulating_mcap'])}")
if p.get("open_price"):
lines.append(f"💰 预估开盘价: {_fmt_price(p['open_price'])}")
if p.get("total_supply") and p.get("circulating_supply"):
pct = p["circulating_supply"] / p["total_supply"] * 100
lines.append(f"📦 初始流通: {pct:.1f}%")
if vcs:
lines.append("")
lines.append("🏛 <b>机构</b>")
for v in vcs[:5]:
is_t1 = any(t in v.lower() for t in TIER1_VCS)
lines.append(f" {'⭐' if is_t1 else '·'} {v}")
if p.get("is_darling"):
lines.append("")
lines.append("🔥 <b>币安亲儿子</b>")
if p.get("tier_reason"):
lines.append("")
lines.append(f"🎯 {p['tier_reason']}")
lines.append("")
lines.append(f"<i>📌 来源: {p.get('source', 'binance')}</i>")
if p.get("raw_text"):
lines.append(f"<i>{p['raw_text'][:120]}</i>")
return "\n".join(l for l in lines if l is not None)
def fmt_countdown(p: dict, minutes: int) -> str:
icon = TIER_ICONS.get(p.get("tier", "C"), "⚪")
t = f"{minutes//60}h{minutes%60}m" if minutes >= 60 else f"{minutes}m"
lines = [
f"{icon} <b>倒计时提醒</b>",
f"<b>${p['symbol']}</b> · {p.get('name', '')}",
f"⏰ 距上线还有 <b>{t}</b>",
]
if p.get("fdv"):
lines.append(f"FDV: {_fmt_mcap(p['fdv'])}")
if minutes <= 30:
lines.append("🔔 <b>准备下单</b>")
return "\n".join(lines)
def fmt_launch(p: dict, price: float, mcap: float, fdv: float) -> str:
lines = [
f"🚀 <b>${p['symbol']} 已上线</b>",
f"开盘价: <b>{_fmt_price(price)}</b>",
f"流通市值: <b>{_fmt_mcap(mcap)}</b>",
f"FDV: <b>{_fmt_mcap(fdv)}</b>",
]
return "\n".join(lines)
def fmt_periodic(p: dict, idx: int, price: float, mcap: float, change_pct: float) -> str:
arrow = "📈" if change_pct > 0 else "📉"
minutes = 30 * idx
lines = [
f"⏱ <b>${p['symbol']} · +{minutes}min</b>",
f"流通市值: {_fmt_mcap(mcap)} ({arrow} {change_pct:+.1f}%)",
f"当前价: {_fmt_price(price)}",
]
if change_pct >= 100:
lines.append("💡 <b>已翻倍,考虑分批止盈</b>")
elif change_pct <= -30:
lines.append("⚠️ 跌幅较大,评估是否止损")
return "\n".join(lines)
def fmt_anomaly(p: dict, atype: str, price: float, change_pct: float) -> str:
emoji = {"double": "🚀", "halve": "🔻"}.get(atype, "⚡")
desc = {"double": "市值翻倍", "halve": "市值腰斩"}.get(atype, "异动")
return f"{emoji} <b>${p['symbol']} {desc}</b>\n变化: {change_pct:+.1f}%\n当前价: {_fmt_price(price)}"
# ============================================================
# 核心逻辑: 公告监听
# ============================================================
async def announcement_listener():
"""轮询币安公告,发现新Alpha项目"""
logger.info(f"📡 公告监听启动 · 轮询 {ANNOUNCEMENT_POLL_INTERVAL}s")
while True:
try:
articles = await fetch_announcements()
new_count = 0
for art in articles:
title = art.get("title", "")
triggered, reason = is_trigger(title)
if not triggered:
continue
symbol = extract_symbol(title)
if not symbol:
continue
# 用发布日期去重
release_ts = art.get("releaseDate")
release_iso = datetime.fromtimestamp(release_ts / 1000).isoformat() if release_ts else ""
launch_date = release_iso[:10] if release_iso else datetime.utcnow().date().isoformat()
pid = project_id(symbol, launch_date)
if project_exists(pid):
continue
project = {
"id": pid,
"symbol": symbol,
"name": extract_name(title),
"launch_time": release_iso,
"source": "binance_announcement",
"raw_text": title,
"tier": "PENDING",
"vcs": [],
"is_darling": False,
"excluded": 0,
}
save_project(project)
new_count += 1
logger.info(f"🆕 发现 ${symbol}: {title[:80]}")
if new_count:
logger.info(f"本轮发现 {new_count} 个新项目")
except Exception as e:
logger.error(f"公告监听异常: {e}", exc_info=True)
await asyncio.sleep(ANNOUNCEMENT_POLL_INTERVAL)
# ============================================================
# 核心逻辑: 聚合 + 推送
# ============================================================
async def aggregation_worker():
"""对PENDING项目做数据聚合、评级、推送"""
logger.info(f"🧠 聚合工作者启动 · 轮询 {AGGREGATION_POLL_INTERVAL}s")
while True:
try:
pending = list_pending()
for p in pending:
symbol = p["symbol"]
try:
logger.info(f"📦 聚合 ${symbol}")
# 1. CoinGecko
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
await asyncio.sleep(1) # 避免限流
# 2. LLM抽取叙事(传入CoinGecko数据增强分析)
llm = await llm_extract(p.get("raw_text", ""), symbol, p.get("name"), cg_data=cg)
await asyncio.sleep(1)
# 3. 判断是否排除
if llm.get("exclude_reason") in ("already_tge", "meme_only"):
update_project(p["id"], {
"excluded": 1,
"exclude_reason": llm["exclude_reason"],
"tier": "EXCLUDED",
})
logger.info(f"⏭ ${symbol} 排除: {llm['exclude_reason']}")
continue
# 4. 数据校验 — CoinGecko数据可能匹配错或项目太新没数据
cg_fdv = cg.get("fdv", 0) or 0
cg_mcap = cg.get("mcap", 0) or 0
data_suspect = False
data_warnings = []
# 上币安的项目FDV不可能低于$100K(除非CoinGecko匹配错了)
if cg.get("found") and cg_fdv > 0 and cg_fdv < 100_000:
data_suspect = True
data_warnings.append(f"FDV=${cg_fdv:,.0f}异常低,可能CoinGecko匹配错误")
logger.warning(f"⚠️ {symbol} FDV=${cg_fdv:,.0f} 异常低,数据可能不准")
# 流通100%的大项目也要警惕(新代币通常不会100%流通)
if (cg.get("circ_supply") and cg.get("total_supply")
and cg["circ_supply"] == cg["total_supply"]
and cg_fdv < 1_000_000):
data_suspect = True
data_warnings.append("流通=总量且FDV极低,可能是同名垃圾币")
if data_suspect:
# 标记数据不可靠,评级时不使用CoinGecko的FDV/MCap
cg_fdv = 0
cg_mcap = 0
logger.warning(f"⚠️ {symbol} CoinGecko数据不可靠,评级使用LLM判断为主")
# 5. 评级
is_darling = llm.get("is_darling", False)
vcs = llm.get("vcs", [])
narrative = llm.get("narrative", "unknown")
rating = rate_project(
cg_mcap, cg_fdv,
vcs, narrative, is_darling
)
# 6. 更新DB(data_suspect时不存CoinGecko的错误数据)
update_project(p["id"], {
"tier": rating["tier"],
"tier_reason": rating["reason"],
"narrative": narrative,
"narrative_desc": llm.get("narrative_desc", ""),
"vcs_json": json.dumps(vcs),
"is_darling": int(is_darling),
"open_price": cg.get("price") if not data_suspect else None,
"total_supply": cg.get("total_supply") if not data_suspect else None,
"circulating_supply": cg.get("circ_supply") if not data_suspect else None,
"fdv": cg_fdv if cg_fdv else None,
"circulating_mcap": cg_mcap if cg_mcap else None,
})
# 6. 推送discovery
full = get_project(p["id"])
if full and not has_pushed(p["id"], "discovery"):
text = fmt_discovery(full)
silent = rating["tier"] in ("B", "C")
ok = await send_tg(text, silent=silent)
if ok:
log_push(p["id"], "discovery", text)
logger.info(f"✅ 推送 ${symbol} [{rating['tier']}]")
except Exception as e:
logger.error(f"聚合 {symbol} 失败: {e}", exc_info=True)
update_project(p["id"], {"tier": "ERROR", "tier_reason": str(e)[:100]})
except Exception as e:
logger.error(f"聚合循环异常: {e}", exc_info=True)
await asyncio.sleep(AGGREGATION_POLL_INTERVAL)
# ============================================================
# 核心逻辑: 上线后监控
# ============================================================
async def post_launch_monitor():
"""倒计时提醒 + 上线瞬间 + 30min×4跟踪 + 异动"""
logger.info(f"📊 上线监控启动 · 轮询 {MONITOR_POLL_INTERVAL}s")
while True:
try:
projects = list_active()
for p in projects:
try:
await _monitor_project(p)
except Exception as e:
logger.error(f"监控 {p['symbol']} 异常: {e}")
except Exception as e:
logger.error(f"监控循环异常: {e}", exc_info=True)
await asyncio.sleep(MONITOR_POLL_INTERVAL)
async def _monitor_project(p: dict):
pid = p["id"]
symbol = p["symbol"]
launch_str = p.get("launch_time", "")
if not launch_str:
return
try:
launch = datetime.fromisoformat(launch_str.replace("Z", "").split("+")[0])
except:
return
now = datetime.utcnow()
delta_sec = (launch - now).total_seconds()
# T-3h
if 3*3600 - 300 <= delta_sec <= 3*3600 + 300:
if not has_pushed(pid, "t_minus_3h"):
text = fmt_countdown(p, int(delta_sec / 60))
ok = await send_tg(text, silent=p.get("tier") in ("B", "C"))
if ok:
log_push(pid, "t_minus_3h", text)
# T-30m
elif 30*60 - 150 <= delta_sec <= 30*60 + 150:
if not has_pushed(pid, "t_minus_30m"):
text = fmt_countdown(p, int(delta_sec / 60))
ok = await send_tg(text, silent=False)
if ok:
log_push(pid, "t_minus_30m", text)
# 上线瞬间
elif -300 <= delta_sec <= 0:
if not has_pushed(pid, "at_launch"):
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
if cg.get("price"):
text = fmt_launch(p, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
ok = await send_tg(text, silent=False)
if ok:
log_push(pid, "at_launch", text)
save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
# 上线后30min × 4
elif 0 < -delta_sec <= 2.5 * 3600:
minutes_after = int(-delta_sec / 60)
for idx, target in enumerate([30, 60, 90, 120], 1):
if abs(minutes_after - target) <= 5:
ptype = f"post_30m_{idx}"
if not has_pushed(pid, ptype):
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
if cg.get("price"):
open_price = p.get("open_price") or cg["price"]
change = ((cg["price"] - open_price) / open_price * 100) if open_price else 0
text = fmt_periodic(p, idx, cg["price"], cg.get("mcap", 0), change)
ok = await send_tg(text, silent=p.get("tier") in ("B", "C") and idx > 1)
if ok:
log_push(pid, ptype, text)
save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
# 异动
if change >= 100 and not has_pushed(pid, "anomaly_double"):
t = fmt_anomaly(p, "double", cg["price"], change)
if await send_tg(t):
log_push(pid, "anomaly_double", t)
elif change <= -50 and not has_pushed(pid, "anomaly_halve"):
t = fmt_anomaly(p, "halve", cg["price"], change)
if await send_tg(t):
log_push(pid, "anomaly_halve", t)
break
# ============================================================
# 启动
# ============================================================
async def main():
init_db()
logger.info(f"📂 数据库: {DB_PATH}")
# 测试TG
ok = await send_tg("🎉 <b>Alpha Monitor v2 启动</b>\n\n📡 币安公告监听中...\n🔔 有新Alpha会立即推送")
if ok:
logger.info("✅ TG推送正常")
else:
logger.warning("⚠️ TG推送失败,检查配置")
tasks = [
asyncio.create_task(announcement_listener(), name="announcements"),
asyncio.create_task(aggregation_worker(), name="aggregator"),
asyncio.create_task(post_launch_monitor(), name="monitor"),
]
logger.info("=" * 50)
logger.info("🚀 Alpha Monitor v2 启动完成")
logger.info(f" 📡 公告轮询: {ANNOUNCEMENT_POLL_INTERVAL}s")
logger.info(f" 🧠 LLM: {'Sonnet' if ANTHROPIC_API_KEY else '降级(规则)'}")
logger.info(f" 🔔 TG: {'✅' if TG_BOT_TOKEN else '❌'}")
logger.info("=" * 50)
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
for t in tasks:
t.cancel()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
AI Giao Dịch Tự Động
Giao dịch tự động Futures + Alpha v1
Ngày: 2026.04.29 Thẻ: Python · Binance Futures · Autonomous · Telegram
AI quét thị trường → phân tích → giao dịch ảo → theo dõi → review — hoàn toàn tự động
⚠️ Cảnh báo rủi ro: Code này chỉ được kiểm tra trên giao dịch ảo/paper. Không có kinh nghiệm giao dịch thực và KHÔNG nên dùng làm cơ sở cho giao dịch thực. Chưa được xác thực với tiền thực — sử dụng tự chịu rủi ro.
Một AI tự động quét toàn bộ thị trường Binance Futures mỗi 30 giây, phát hiện bất thường và mở vị thế ảo. 4 chiến lược phát hiện tín hiệu (funding rate âm cực đoan → squeeze long, funding dương cực đoan → short, short sau khi pump, bật sau khi sập). Trước khi mở vị thế, chạy kiểm tra môi trường đa chiều (xu hướng BTC + Fear&Greed + OI attention + khối lượng) — cần ≥3/7 điểm. Tự động giám sát cắt lỗ/chốt lời mỗi 30 giây.
Kết quả hiện tại (công bố trung thực): 4 lệnh đã đóng, win rate 75%, +13.94U. Nhưng lợi nhuận tập trung ở một lệnh (IR short +45%), phần còn lại về cơ bản hòa vốn. Đa dạng hóa vị thế chưa đủ — có xu hướng chồng các lệnh cùng hướng cùng logic. Vẫn là chấm điểm dựa trên rule, chưa phải tư duy độc lập thực sự.
Lộ trình huấn luyện: scan → trade → close → review → tìm lỗi → cải thiện → lặp. Mục tiêu: tiến hóa từ trình thực thi rule thành trader có tư duy độc lập.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
市场扫描器 - 每分钟运行
纯Python零AI成本,发现异常信号自动开仓
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_FILE = os.path.join(SCRIPT_DIR, "trades.json")
SCANNER_STATE = os.path.join(SCRIPT_DIR, "scanner_state.json")
SCANNER_LOG = os.path.join(SCRIPT_DIR, "scanner.log")
INITIAL_BALANCE = 100.0
TZ_UTC8 = timezone(timedelta(hours=8))
# === 配置 ===
MAX_OPEN_POSITIONS = 3 # 最多同时持仓
POSITION_PCT = 30 # 每笔仓位占比%
LEVERAGE = 3 # 杠杆
COOLDOWN_HOURS = 4 # 同一币种冷却时间
MIN_VOLUME_M = 10 # 最小24h成交额(百万U)
# === TG推送 ===
def load_tg_config():
"""Load TG config from environment variables or .env file"""
env = {}
# Try .env in script directory, then current directory
for env_path in [
os.path.join(SCRIPT_DIR, ".env"),
os.path.join(os.getcwd(), ".env"),
]:
if os.path.exists(env_path):
with open(env_path) as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k] = v.strip().strip('"').strip("'")
break
# OS environment variables override file
for key in ['TG_BOT_TOKEN', 'TELEGRAM_BOT_TOKEN', 'TG_CHAT_ID']:
val = os.environ.get(key)
if val:
env[key] = val
return env
def send_tg(text):
try:
env = load_tg_config()
token = env.get('TG_BOT_TOKEN', env.get('TELEGRAM_BOT_TOKEN', ''))
if not token:
return
chat_id = env.get('TG_CHAT_ID', '')
if not chat_id:
return
url = f"https://api.telegram.org/bot{token}/sendMessage"
requests.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown"
}, timeout=10)
except:
pass
# === 数据加载 ===
def load_trades():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"initial_balance": INITIAL_BALANCE, "trades": []}
def save_trades(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_state():
if os.path.exists(SCANNER_STATE):
with open(SCANNER_STATE, "r") as f:
return json.load(f)
return {"last_opens": {}, "signals_seen": {}}
def save_state(state):
with open(SCANNER_STATE, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def get_balance(data):
balance = data.get("initial_balance", INITIAL_BALANCE)
for t in data["trades"]:
if t["status"] == "closed" and t["pnl_usd"] is not None:
balance += t["pnl_usd"]
return balance
def next_id(data):
if not data["trades"]:
return "001"
max_id = max(int(t["id"]) for t in data["trades"])
return f"{max_id + 1:03d}"
def now_str():
return datetime.now(TZ_UTC8).strftime("%Y-%m-%dT%H:%M:%S")
def log(msg):
ts = datetime.now(TZ_UTC8).strftime("%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
with open(SCANNER_LOG, "a") as f:
f.write(line + "\n")
# === 币安API ===
def get_all_tickers():
url = "https://fapi.binance.com/fapi/v1/ticker/24hr"
resp = requests.get(url, timeout=10)
return resp.json()
def get_funding_rates():
"""获取所有币种最新费率"""
url = "https://fapi.binance.com/fapi/v1/premiumIndex"
resp = requests.get(url, timeout=10)
return {item['symbol']: float(item['lastFundingRate']) * 100
for item in resp.json()}
def get_funding_history(symbol, limit=8):
url = f"https://fapi.binance.com/fapi/v1/fundingRate?symbol={symbol}&limit={limit}"
resp = requests.get(url, timeout=10)
return [float(item['fundingRate']) * 100 for item in resp.json()]
def get_open_interest(symbol):
url = f"https://fapi.binance.com/fapi/v1/openInterest?symbol={symbol}"
resp = requests.get(url, timeout=10)
data = resp.json()
return float(data['openInterest'])
def get_klines(symbol, interval="4h", limit=6):
url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
resp = requests.get(url, timeout=10)
return resp.json()
# === 信号检测 ===
def detect_extreme_negative_funding(symbol, funding_rate, funding_rates_map):
"""
策略1: 费率极端深负 → 做多(逼空)
条件: 当前费率<-0.08% 且 连续多期为负
"""
if funding_rate >= -0.08:
return None
try:
history = get_funding_history(symbol, 8)
neg_count = sum(1 for r in history if r < -0.03)
if neg_count < 4:
return None
avg_rate = sum(history) / len(history)
# 费率越极端,信号越强
strength = "S" if avg_rate < -0.15 else "A" if avg_rate < -0.10 else "B"
return {
"type": "extreme_neg_funding",
"direction": "long",
"strength": strength,
"reason": f"费率极端深负 avg:{avg_rate:.4f}% 连续{neg_count}/8期为负 逼空概率高",
"sl_pct": 0.08, # 止损8%
"tp_pct": 0.12, # 止盈12%
}
except:
return None
def detect_extreme_positive_funding(symbol, funding_rate, funding_rates_map):
"""
策略2: 费率极端正 → 做空(多头拥挤)
条件: 当前费率>0.10% 且 连续多期高正
"""
if funding_rate <= 0.10:
return None
try:
history = get_funding_history(symbol, 8)
pos_count = sum(1 for r in history if r > 0.05)
if pos_count < 4:
return None
avg_rate = sum(history) / len(history)
strength = "S" if avg_rate > 0.20 else "A" if avg_rate > 0.12 else "B"
return {
"type": "extreme_pos_funding",
"direction": "short",
"strength": strength,
"reason": f"费率极端正 avg:{avg_rate:.4f}% 连续{pos_count}/8期高正 多头过度拥挤",
"sl_pct": 0.10,
"tp_pct": 0.15,
}
except:
return None
def detect_crash_bounce(ticker):
"""
策略3: 暴跌后反弹(超跌反弹)
条件: 24h跌>25% 但最近4h企稳/反弹
"""
change_pct = float(ticker['priceChangePercent'])
if change_pct >= -25:
return None
symbol = ticker['symbol']
try:
klines = get_klines(symbol, "1h", 6)
# 最近2根K线
recent_closes = [float(k[4]) for k in klines[-3:]]
# 企稳: 最近K线收盘 >= 前一根
if len(recent_closes) >= 2 and recent_closes[-1] >= recent_closes[-2]:
return {
"type": "crash_bounce",
"direction": "long",
"strength": "B", # 风险较高给B级
"reason": f"24h暴跌{change_pct:.1f}%后企稳 超跌反弹",
"sl_pct": 0.10,
"tp_pct": 0.15,
}
except:
pass
return None
def detect_pump_short(ticker):
"""
策略4: 暴涨后做空(ATH回落)
条件: 24h涨>40% — 根据生命周期模型,暴涨后回调概率>85%
需要确认已经开始回落(不在最高点做空)
"""
change_pct = float(ticker['priceChangePercent'])
if change_pct <= 40:
return None
symbol = ticker['symbol']
try:
klines = get_klines(symbol, "1h", 6)
highs = [float(k[2]) for k in klines]
closes = [float(k[4]) for k in klines]
current = closes[-1]
peak = max(highs)
# 从最高点回落超过10%才做空
pullback = (peak - current) / peak * 100
if pullback < 10:
return None
strength = "A" if change_pct > 80 else "B"
return {
"type": "pump_short",
"direction": "short",
"strength": strength,
"reason": f"24h暴涨{change_pct:.1f}%后回落{pullback:.1f}% 历史回调概率>85%",
"sl_pct": 0.15, # 暴涨币波动大,止损宽一些
"tp_pct": 0.20,
}
except:
pass
return None
# === 综合环境检查 ===
def check_environment(symbol, signal):
"""
开仓前综合检查:不是单一信号触发就开,要多维度对齐
返回 (pass/fail, analysis_dict, adjusted_strength)
"""
analysis = {
"btc_env": "",
"sentiment": "",
"oi_check": "",
"volume_check": "",
"verdict": ""
}
score = 0 # 综合得分,>=3才开仓
try:
# 1. BTC环境 — 做多需要BTC不在暴跌,做空需要BTC不在暴涨
btc_url = "https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT"
btc = requests.get(btc_url, timeout=5).json()
btc_chg = float(btc['priceChangePercent'])
if signal["direction"] == "long":
if btc_chg > -2:
score += 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 环境正常 +1"
elif btc_chg < -5:
score -= 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 暴跌中做多危险 -1"
else:
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 偏弱 0"
else: # short
if btc_chg < 2:
score += 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 环境正常 +1"
elif btc_chg > 5:
score -= 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 暴涨中做空危险 -1"
else:
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 偏强 0"
# 2. 市场情绪(Fear & Greed)
try:
fng = requests.get("https://api.alternative.me/fng/", timeout=5).json()
fng_val = int(fng['data'][0]['value'])
if signal["direction"] == "long":
if fng_val <= 25:
score += 1
analysis["sentiment"] = f"FGI={fng_val}极度恐惧 逆向做多 +1"
elif fng_val >= 75:
score -= 1
analysis["sentiment"] = f"FGI={fng_val}极度贪婪 做多风险 -1"
else:
analysis["sentiment"] = f"FGI={fng_val}中性 0"
else:
if fng_val >= 75:
score += 1
analysis["sentiment"] = f"FGI={fng_val}极度贪婪 逆向做空 +1"
elif fng_val <= 25:
score -= 1
analysis["sentiment"] = f"FGI={fng_val}极度恐惧 做空风险 -1"
else:
analysis["sentiment"] = f"FGI={fng_val}中性 0"
except:
analysis["sentiment"] = "FGI获取失败 0"
# 3. OI变化 — 看该币OI是否支持方向
try:
oi = get_open_interest(symbol)
ticker = requests.get(f"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol={symbol}", timeout=5).json()
price = float(ticker['lastPrice'])
oi_usd = oi * price
if oi_usd > 5_000_000: # OI > 5M说明有关注度
score += 1
analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 有关注度 +1"
else:
analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 关注度低 0"
except:
analysis["oi_check"] = "OI获取失败 0"
# 4. 成交量 — 量能是否活跃
try:
vol = float(ticker.get('quoteVolume', 0))
if vol > 50_000_000:
score += 1
analysis["volume_check"] = f"24h量={vol/1e6:.0f}M 活跃 +1"
elif vol > 20_000_000:
analysis["volume_check"] = f"24h量={vol/1e6:.0f}M 一般 0"
else:
score -= 1
analysis["volume_check"] = f"24h量={vol/1e6:.0f}M 冷清 -1"
except:
analysis["volume_check"] = "量能获取失败 0"
# 5. 信号本身的强度加分
if signal["strength"] == "S":
score += 2
elif signal["strength"] == "A":
score += 1
# 综合判定: >=3通过
analysis["verdict"] = f"综合得分:{score}/7"
if score >= 3:
return True, analysis, signal["strength"]
else:
return False, analysis, signal["strength"]
except Exception as e:
analysis["verdict"] = f"检查异常:{e} 保守不开"
return False, analysis, signal["strength"]
# === 开仓执行 ===
def execute_open(data, state, symbol, price, signal):
"""执行虚拟开仓 — 先过综合环境检查"""
# 综合环境检查
passed, env_analysis, strength = check_environment(symbol, signal)
env_summary = " | ".join(v for v in env_analysis.values() if v)
if not passed:
log(f"综合检查未通过 {symbol}: {env_summary}")
return
log(f"综合检查通过 {symbol}: {env_summary}")
balance = get_balance(data)
position_usd = balance * POSITION_PCT / 100
if signal["direction"] == "long":
sl = round(price * (1 - signal["sl_pct"]), 6)
tp = round(price * (1 + signal["tp_pct"]), 6)
else:
sl = round(price * (1 + signal["sl_pct"]), 6)
tp = round(price * (1 - signal["tp_pct"]), 6)
trade = {
"id": next_id(data),
"symbol": symbol,
"direction": signal["direction"],
"leverage": LEVERAGE,
"position_pct": POSITION_PCT,
"position_usd": round(position_usd, 4),
"notional_usd": round(position_usd * LEVERAGE, 4),
"entry_price": price,
"stop_loss": sl,
"take_profit": tp,
"entry_time": now_str(),
"exit_price": None,
"exit_time": None,
"exit_reason": None,
"pnl_pct": None,
"pnl_usd": None,
"status": "open",
"pre_analysis": {
"btc_env": env_analysis.get("btc_env", ""),
"sentiment": env_analysis.get("sentiment", ""),
"oi": env_analysis.get("oi_check", ""),
"volume": env_analysis.get("volume_check", ""),
"key_reason": f"[{signal['strength']}级] {signal['reason']}",
"risk": f"综合得分:{env_analysis.get('verdict','')} 策略:{signal['type']}"
},
"post_review": None
}
data["trades"].append(trade)
save_trades(data)
# 记录冷却
state["last_opens"][symbol] = now_str()
save_state(state)
direction_cn = "做多" if signal["direction"] == "long" else "做空"
msg = f"""```
[扫描开仓] #{trade['id']}
币种: {symbol}
方向: {direction_cn} {LEVERAGE}x
入场: {price}
止损: {sl}
止盈: {tp}
仓位: {position_usd:.2f}U
信号: [{signal['strength']}] {signal['reason']}
时间: {trade['entry_time']}
```"""
log(f"开仓 #{trade['id']} {symbol} {direction_cn} @ {price} | {signal['reason']}")
send_tg(msg)
print(msg)
# === 换仓逻辑 ===
def swap_weakest(data, state, open_positions, new_signal, tickers):
"""满仓时遇到S级信号,平掉浮亏最大的持仓,开新仓"""
ticker_map = {t['symbol']: float(t['lastPrice']) for t in tickers}
# 计算每个持仓的浮盈%
worst_trade = None
worst_pnl = float('inf')
for t in open_positions:
price = ticker_map.get(t["symbol"])
if price is None:
continue
if t["direction"] == "long":
pnl_pct = (price - t["entry_price"]) / t["entry_price"] * 100
else:
pnl_pct = (t["entry_price"] - price) / t["entry_price"] * 100
if pnl_pct < worst_pnl:
worst_pnl = pnl_pct
worst_trade = t
worst_price = price
if worst_trade is None:
return
# 只换掉亏损的仓位,盈利的不动
if worst_pnl > 0:
log(f"满仓但所有持仓盈利,不换仓 | 新信号: {new_signal['symbol']}")
return
# 平掉最弱的
if worst_trade["direction"] == "long":
pnl_pct_lev = (worst_price - worst_trade["entry_price"]) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
else:
pnl_pct_lev = (worst_trade["entry_price"] - worst_price) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
pos_usd = worst_trade.get("position_usd", worst_trade.get("position_pct", 30))
pnl_usd = round(pnl_pct_lev / 100 * pos_usd, 4)
worst_trade["exit_price"] = worst_price
worst_trade["exit_time"] = now_str()
worst_trade["exit_reason"] = f"换仓→{new_signal['symbol']}"
worst_trade["pnl_pct"] = round(pnl_pct_lev, 2)
worst_trade["pnl_usd"] = pnl_usd
worst_trade["status"] = "closed"
save_trades(data)
direction_cn = "多" if worst_trade["direction"] == "long" else "空"
msg = f"""```
[换仓平仓] #{worst_trade['id']}
平掉: {worst_trade['symbol']} {direction_cn}
入场: {worst_trade['entry_price']}
出场: {worst_price}
盈亏: {pnl_pct_lev:+.2f}% ({pnl_usd:+.2f}U)
原因: S级信号{new_signal['symbol']}更强
```"""
log(f"换仓平 #{worst_trade['id']} {worst_trade['symbol']} {pnl_usd:+.2f}U → 开 {new_signal['symbol']}")
send_tg(msg)
# 开新仓
execute_open(data, state, new_signal["symbol"], new_signal["price"], new_signal)
# === 主扫描逻辑 ===
def scan():
data = load_trades()
state = load_state()
now = datetime.now(TZ_UTC8)
# 检查持仓数
open_positions = [t for t in data["trades"] if t["status"] == "open"]
open_symbols = set(t["symbol"] for t in open_positions)
if len(open_positions) >= MAX_OPEN_POSITIONS:
return
# 获取市场数据
try:
tickers = get_all_tickers()
funding_rates = get_funding_rates()
except Exception as e:
log(f"API错误: {e}")
return
# 过滤USDT合约 + 最小成交量
exclude = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "FDUSDUSDT", "BTCDOMUSDT", "BTCSTUSDT"}
candidates = [t for t in tickers
if t['symbol'].endswith('USDT')
and t['symbol'] not in exclude
and float(t['quoteVolume']) > MIN_VOLUME_M * 1e6]
signals = []
for ticker in candidates:
symbol = ticker['symbol']
# 跳过已持仓的币
if symbol in open_symbols:
continue
# 冷却检查
last_open = state.get("last_opens", {}).get(symbol)
if last_open:
try:
last_dt = datetime.fromisoformat(last_open)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=TZ_UTC8)
if (now - last_dt).total_seconds() < COOLDOWN_HOURS * 3600:
continue
except:
pass
fr = funding_rates.get(symbol, 0)
# 运行所有策略检测
for detect_fn in [
lambda s, f, m: detect_extreme_negative_funding(s, f, m),
lambda s, f, m: detect_extreme_positive_funding(s, f, m),
lambda s, f, m: detect_crash_bounce(ticker),
lambda s, f, m: detect_pump_short(ticker),
]:
try:
signal = detect_fn(symbol, fr, funding_rates)
if signal:
signal["symbol"] = symbol
signal["price"] = float(ticker['lastPrice'])
signal["volume_m"] = float(ticker['quoteVolume']) / 1e6
signals.append(signal)
except:
continue
if not signals:
return
# 按信号强度排序 S>A>B
strength_order = {"S": 0, "A": 1, "B": 2}
signals.sort(key=lambda x: strength_order.get(x["strength"], 3))
# 只取最强的信号开仓(一次最多开1笔)
best = signals[0]
# B级信号跳过,只开S和A级
if best["strength"] == "B":
log(f"B级信号跳过: {best['symbol']} {best['reason']}")
return
slots = MAX_OPEN_POSITIONS - len(open_positions)
if slots > 0:
execute_open(data, state, best["symbol"], best["price"], best)
elif best["strength"] == "S":
# 满仓但遇到S级信号 → 换掉最弱的持仓
swap_weakest(data, state, open_positions, best, tickers)
if __name__ == "__main__":
scan()
Công Cụ Tiện Ích
VoiceKey — Xác thực giọng nói
Ngày: 2026.04.27 Thẻ: Python · Security · Telegram · Speaker Verification
Bảo vệ AI agent của bạn bằng xác thực giọng nói
Xác minh người nói cho bảo mật bot Telegram. Sử dụng resemblyzer (mô hình GE2E) để trích xuất embedding giọng nói và so sánh độ tương đồng cosine. Đăng nhập bằng giọng nói thay cho mật khẩu — chỉ những người nói đã biết mới có thể đăng nhập. Phụ thuộc nhẹ, Python thuần.
Mã nguồn đầy đủ
#!/usr/bin/env python3
"""
VoiceKey — 声纹密钥 (Voiceprint Authentication)
Speaker verification for Telegram bot security.
Uses resemblyzer (GE2E model) for speaker embedding extraction
and cosine similarity for verification.
Zero AI cost — runs entirely on local CPU.
Usage:
# Register voiceprint from audio files
python voicekey.py register --audio voice1.ogg voice2.ogg --owner "YourName"
# Verify a voice against stored voiceprint
python voicekey.py verify --audio test.ogg
# As a Python module
from voicekey import VoiceKey
vk = VoiceKey()
vk.register(["voice1.ogg", "voice2.ogg"], owner="YourName")
is_owner, score = vk.verify("test.ogg")
"""
import os
import json
import tempfile
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime
# Lazy imports to speed up module load when not needed
_encoder = None
def _get_encoder():
"""Lazy-load the voice encoder model (first call takes ~1s)."""
global _encoder
if _encoder is None:
from resemblyzer import VoiceEncoder
_encoder = VoiceEncoder()
return _encoder
def _audio_to_wav(audio_path: str) -> str:
"""Convert any audio format to 16kHz mono WAV for processing."""
from pydub import AudioSegment
ext = Path(audio_path).suffix.lower()
if ext in ('.ogg', '.oga'):
audio = AudioSegment.from_ogg(audio_path)
elif ext == '.mp3':
audio = AudioSegment.from_mp3(audio_path)
elif ext == '.wav':
return audio_path # already wav
elif ext in ('.m4a', '.aac'):
audio = AudioSegment.from_file(audio_path, format=ext.lstrip('.'))
else:
audio = AudioSegment.from_file(audio_path)
# Convert to 16kHz mono
audio = audio.set_frame_rate(16000).set_channels(1)
tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
audio.export(tmp.name, format='wav')
return tmp.name
def _extract_embedding(audio_path: str) -> np.ndarray:
"""Extract voice embedding from an audio file."""
from resemblyzer import preprocess_wav
encoder = _get_encoder()
wav_path = _audio_to_wav(audio_path)
wav = preprocess_wav(wav_path)
# Cleanup temp file
if wav_path != audio_path:
os.unlink(wav_path)
if len(wav) < 1600: # Less than 0.1s
raise ValueError(f"Audio too short: {len(wav)/16000:.1f}s (need >0.1s)")
return encoder.embed_utterance(wav)
class VoiceKey:
"""
Speaker verification using voice embeddings.
Attributes:
data_dir: Directory to store voiceprint data
threshold: Cosine similarity threshold for verification (default: 0.75)
voiceprint: The registered owner's voice embedding (256-dim vector)
"""
def __init__(self, data_dir: str = None, threshold: float = 0.75):
if data_dir is None:
data_dir = os.path.expanduser("~/.hermes/voiceprint")
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.threshold = threshold
self.voiceprint = None
self.metadata = {}
self._load()
def _load(self):
"""Load existing voiceprint if available."""
vp_path = self.data_dir / "voiceprint.npy"
meta_path = self.data_dir / "voiceprint_meta.json"
if vp_path.exists():
self.voiceprint = np.load(str(vp_path))
if meta_path.exists():
with open(meta_path) as f:
self.metadata = json.load(f)
self.threshold = self.metadata.get("threshold", self.threshold)
@property
def is_registered(self) -> bool:
"""Check if a voiceprint is registered."""
return self.voiceprint is not None
def register(self, audio_paths: list, owner: str = "owner") -> dict:
"""
Register a voiceprint from multiple audio samples.
Args:
audio_paths: List of audio file paths (ogg, mp3, wav, etc.)
owner: Name of the voiceprint owner
Returns:
dict with registration results
"""
embeddings = []
results = []
for path in audio_paths:
try:
embed = _extract_embedding(path)
embeddings.append(embed)
results.append({"file": os.path.basename(path), "status": "ok"})
except Exception as e:
results.append({"file": os.path.basename(path), "status": "error", "error": str(e)})
if not embeddings:
raise ValueError("No valid audio samples provided")
# Average embeddings and normalize
voiceprint = np.mean(embeddings, axis=0)
voiceprint = voiceprint / np.linalg.norm(voiceprint)
# Save
np.save(str(self.data_dir / "voiceprint.npy"), voiceprint)
self.metadata = {
"owner": owner,
"samples_used": len(embeddings),
"embedding_dim": int(voiceprint.shape[0]),
"threshold": self.threshold,
"created": datetime.now().isoformat(),
}
with open(self.data_dir / "voiceprint_meta.json", "w") as f:
json.dump(self.metadata, f, indent=2)
self.voiceprint = voiceprint
# Self-test
similarities = []
for embed in embeddings:
sim = float(np.dot(voiceprint, embed / np.linalg.norm(embed)))
similarities.append(sim)
return {
"owner": owner,
"samples": len(embeddings),
"self_test_scores": similarities,
"min_score": min(similarities),
"details": results,
}
def verify(self, audio_path: str) -> tuple:
"""
Verify if an audio sample matches the registered voiceprint.
Args:
audio_path: Path to audio file to verify
Returns:
tuple: (is_verified: bool, similarity_score: float)
"""
if not self.is_registered:
raise RuntimeError("No voiceprint registered. Call register() first.")
embed = _extract_embedding(audio_path)
embed = embed / np.linalg.norm(embed)
similarity = float(np.dot(self.voiceprint, embed))
is_verified = similarity >= self.threshold
return is_verified, similarity
def get_info(self) -> dict:
"""Get voiceprint registration info."""
if not self.is_registered:
return {"registered": False}
return {
"registered": True,
**self.metadata,
}
def main():
parser = argparse.ArgumentParser(
description="VoiceKey — Speaker verification for security"
)
sub = parser.add_subparsers(dest="command")
# Register
reg = sub.add_parser("register", help="Register voiceprint from audio files")
reg.add_argument("--audio", nargs="+", required=True, help="Audio files (ogg/mp3/wav)")
reg.add_argument("--owner", default="owner", help="Owner name")
reg.add_argument("--data-dir", default=None, help="Data directory")
reg.add_argument("--threshold", type=float, default=0.75, help="Verification threshold")
# Verify
ver = sub.add_parser("verify", help="Verify audio against voiceprint")
ver.add_argument("--audio", required=True, help="Audio file to verify")
ver.add_argument("--data-dir", default=None, help="Data directory")
# Info
sub.add_parser("info", help="Show voiceprint info")
args = parser.parse_args()
if args.command == "register":
vk = VoiceKey(data_dir=args.data_dir, threshold=args.threshold)
result = vk.register(args.audio, owner=args.owner)
print(f"Registered voiceprint for: {result['owner']}")
print(f"Samples used: {result['samples']}")
print(f"Self-test scores: {[f'{s:.4f}' for s in result['self_test_scores']]}")
print(f"Min score: {result['min_score']:.4f} (threshold: {args.threshold})")
elif args.command == "verify":
vk = VoiceKey(data_dir=getattr(args, 'data_dir', None))
is_ok, score = vk.verify(args.audio)
status = "PASS" if is_ok else "FAIL"
print(f"[{status}] Similarity: {score:.4f} (threshold: {vk.threshold})")
elif args.command == "info":
vk = VoiceKey()
info = vk.get_info()
for k, v in info.items():
print(f" {k}: {v}")
else:
parser.print_help()
if __name__ == "__main__":
main()
Lời kết
7 đoạn code phía trên đều lấy từ connectfarm1.com vào ngày 2026-05-04. Cả 7 chia sẻ ba triết lý: (1) Chi phí AI gần bằng 0 — hầu hết dùng rule engine và API công khai miễn phí thay cho LLM; (2) Chỉ dùng Python, dễ chạy trên VPS giá rẻ với crontab hoặc một vòng while True đơn giản; (3) Telegram là kênh đầu ra duy nhất — không dashboard, không front-end, đẩy tin trực tiếp tới nơi bạn thật sự đọc. Hãy kết hợp các radar để xác thực tín hiệu chéo, và xem “AI giao dịch tự động” như một sandbox nghiên cứu, không phải máy in tiền.