本文把 Code Vault 当前发布的 7 个代码片段做一次完整梳理 —— 这是一个个人代码仓,内容覆盖加密交易雷达、自主交易系统、安全工具,全部纯 Python,零或接近零的 API 成本。每个片段下方都附了完整源码,可以直接读、Fork、本地跑。右侧目录可以跳转到具体工具。
⚠️ 风险提示 —— 这些工具直接对接行情和链上数据,部分会通过 Telegram 实时推送告警,其中"AI 自主交易"会在币安合约上开虚拟仓。请仔细阅读每个工具下方的说明。使用风险自担,作者不对交易结果做任何担保。
交易雷达
V神卖币雷达
发布日期: 2026.05.02 标签: Python · WebSocket · Ethereum · Telegram · Event-Driven
GitHub: vitalik-sell-radar
WebSocket事件驱动 · V神钱包卖出检测 · 亚秒级TG推送
通过WebSocket事件订阅实时监控V神钱包(vitalik.eth)的ERC-20代币卖出行为 — 零轮询,亚秒级延迟。自动分类接收方:DEX Router(Uniswap/1inch/SushiSwap)、CEX热钱包(币安/Coinbase/Kraken)、LP池。DexScreener实时查价。多RPC故障切换+自动重连。纯Python零成本,使用免费公共RPC节点。
完整源码
#!/usr/bin/env python3
"""
Vitalik Sell Radar — Event-Driven Edition
WebSocket subscription to ERC-20 Transfer events, real-time detection of
Vitalik's sell activity, instant push to Telegram.
Architecture:
1. WebSocket subscribes to Transfer(from=vitalik) events → sub-second detection
2. Classifies sell behavior (transfers to DEX Router / CEX / LP Pool)
3. Queries token info + price via DexScreener
4. Pushes alert to Telegram
5. Auto-reconnect + multi-RPC failover
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import aiohttp
import websockets
# Load .env file
def load_env():
    """Load KEY=VALUE pairs from a sibling .env file into os.environ.

    Existing environment variables win (setdefault), so real env vars
    always override the file. Blank lines and '#' comments are skipped.
    """
    env_file = Path(__file__).parent / ".env"
    if not env_file.exists():
        return
    for raw in env_file.read_text().splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        key, _, value = entry.partition("=")
        os.environ.setdefault(key.strip(), value.strip())
load_env()
# ============================================================
# Configuration
# ============================================================
# Vitalik's main wallet (vitalik.eth)
VITALIK_ADDRESS = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
# Address zero-padded to 32 bytes — the form it takes as an indexed log topic
VITALIK_PADDED = "0x" + VITALIK_ADDRESS[2:].lower().zfill(64)
# ERC-20 Transfer event topic: keccak256("Transfer(address,address,uint256)")
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
# WebSocket RPC endpoints (free, support eth_subscribe); rotated on reconnect
WS_ENDPOINTS = [
    "wss://ethereum-rpc.publicnode.com",
    "wss://eth.drpc.org",
    "wss://ethereum.publicnode.com",
]
# HTTP RPC for querying token info
HTTP_RPC = os.environ.get("HTTP_RPC", "https://eth.drpc.org")
# Telegram credentials (empty string disables notifications)
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# Known DEX Router addresses (sell destinations); keys must be lowercase hex
KNOWN_DEX_ROUTERS = {
    # Uniswap
    "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap V2 Router",
    "0xe592427a0aece92de3edee1f18e0157c05861564": "Uniswap V3 Router",
    "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "Uniswap V3 Router2",
    "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad": "Uniswap Universal Router",
    "0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b": "Uniswap Universal Router (old)",
    # 1inch
    "0x1111111254eeb25477b68fb85ed929f73a960582": "1inch V5",
    "0x111111125421ca6dc452d289314280a0f8842a65": "1inch V6",
    # SushiSwap
    "0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "SushiSwap Router",
    # CoW Protocol
    "0x9008d19f58aabd9ed0d60971565aa8510560ab41": "CoW Settlement",
    # 0x
    "0xdef1c0ded9bec7f1a1670819833240f027b25eff": "0x Exchange Proxy",
    # Curve
    "0x99a58482bd75cbab83b27ec03ca68ff489b5788f": "Curve Router",
}
# Known CEX hot wallets (partial list); keys must be lowercase hex
KNOWN_CEX = {
    "0x28c6c06298d514db089934071355e5743bf21d60": "Binance Hot Wallet",
    "0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance Hot Wallet 2",
    "0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance Hot Wallet 3",
    "0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance Hot Wallet 4",
    "0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "Coinbase",
    "0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43": "Coinbase 10",
    "0x503828976d22510aad0201ac7ec88293211d23da": "Coinbase 2",
    "0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2": "FTX (defunct)",
    "0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0": "Kraken",
    "0xae2d4617c862309a3d75a0ffb358c7a5009c673f": "Kraken 10",
}
# Minimum notification amount (USD), 0 = notify all
MIN_NOTIFY_USD = int(os.environ.get("MIN_NOTIFY_USD", "0"))
# Reconnect settings (seconds); delay backs off 1.5x per failure up to the max
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60
# ============================================================
# Logging
# ============================================================
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("vitalik-radar")
# ============================================================
# Global state
# ============================================================
# Dedup set for processed tx hashes (last 1000); the list preserves
# insertion order so the oldest entries can be evicted from the set
seen_txs: set = set()
seen_txs_list: list = []
# Shared HTTP session, lazily created by get_http_session()
http_session: aiohttp.ClientSession | None = None
# Token info cache: {lowercase address: {symbol, name, decimals, address}}
token_cache: dict = {}
# Cleared by the signal handler to stop the main loop
running = True
# ============================================================
# Utility functions
# ============================================================
def shorten_addr(addr: str) -> str:
    """Return the compact `0xABCD...WXYZ` display form of a hex address."""
    head, tail = addr[:6], addr[-4:]
    return f"{head}...{tail}"
def decode_transfer_value(data_hex: str, decimals: int) -> float:
    """Decode the `data` field of a Transfer log into a token amount.

    Args:
        data_hex: hex string (with or without "0x" prefix) holding the raw value.
        decimals: the token's decimals() value.

    Returns 0.0 on malformed input instead of raising, so one bad log entry
    cannot kill the event loop.
    """
    try:
        raw = int(data_hex, 16)
        return raw / (10 ** decimals)
    except (ValueError, TypeError):
        # Narrowed from a bare `except:`, which would also have swallowed
        # BaseException subclasses such as asyncio.CancelledError.
        return 0.0
def topic_to_address(topic: str) -> str:
    """Recover the 20-byte address embedded in a 32-byte indexed log topic."""
    hex_tail = topic[-40:]
    return f"0x{hex_tail}"
def classify_recipient(to_addr: str) -> tuple[str, str]:
    """Label a transfer destination against the known-address tables.

    Returns (type, name) with type "dex" / "cex" / "unknown".
    ("pool" is assigned later by an on-chain token0() probe, not here.)
    """
    key = to_addr.lower()
    for table, label in ((KNOWN_DEX_ROUTERS, "dex"), (KNOWN_CEX, "cex")):
        name = table.get(key)
        if name is not None:
            return label, name
    return "unknown", ""
async def get_http_session() -> aiohttp.ClientSession:
    """Return the process-wide aiohttp session, (re)creating it when absent or closed."""
    global http_session
    if http_session is None or http_session.closed:
        http_session = aiohttp.ClientSession()
    return http_session
async def rpc_call(method: str, params: list) -> dict | None:
    """Fire one JSON-RPC request over HTTP; returns the `result` field or None."""
    payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
    session = await get_http_session()
    try:
        async with session.post(
            HTTP_RPC,
            json=payload,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            body = await resp.json()
            return body.get("result")
    except Exception as e:
        log.error(f"RPC call {method} failed: {e}")
        return None
def _decode_abi_string(result: str) -> str | None:
    """Decode an eth_call return payload as a Solidity string.

    Handles the standard dynamic-string ABI layout (offset word + length word
    + UTF-8 data) and the legacy single-word bytes32 encoding some old tokens
    use. Returns None when the payload fits neither shape or is malformed.
    """
    hex_str = result[2:]
    try:
        if len(hex_str) >= 128:
            offset = int(hex_str[:64], 16) * 2
            length = int(hex_str[offset:offset + 64], 16)
            payload = hex_str[offset + 64:offset + 64 + length * 2]
            return bytes.fromhex(payload).decode("utf-8", errors="replace").strip('\x00')
        if len(hex_str) == 64:
            return bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
    except (ValueError, IndexError):
        # Narrowed from bare `except:` — malformed hex / bad offsets only.
        pass
    return None

async def get_token_info(token_addr: str) -> dict:
    """Query and cache ERC-20 metadata (symbol, name, decimals).

    Any field the contract does not answer keeps its placeholder value
    ("???" / "Unknown" / 18); the possibly-partial result is cached forever.
    """
    addr_lower = token_addr.lower()
    if addr_lower in token_cache:
        return token_cache[addr_lower]
    info = {"symbol": "???", "name": "Unknown", "decimals": 18, "address": token_addr}
    # symbol() — selector 0x95d89b41
    result = await rpc_call("eth_call", [
        {"to": token_addr, "data": "0x95d89b41"}, "latest"
    ])
    if result and len(result) > 2:
        decoded = _decode_abi_string(result)
        if decoded is not None:
            info["symbol"] = decoded
    # decimals() — selector 0x313ce567
    result = await rpc_call("eth_call", [
        {"to": token_addr, "data": "0x313ce567"}, "latest"
    ])
    if result and len(result) > 2:
        try:
            info["decimals"] = int(result, 16)
        except ValueError:
            pass
    # name() — selector 0x06fdde03
    result = await rpc_call("eth_call", [
        {"to": token_addr, "data": "0x06fdde03"}, "latest"
    ])
    if result and len(result) > 2:
        decoded = _decode_abi_string(result)
        if decoded is not None:
            info["name"] = decoded
    token_cache[addr_lower] = info
    return info
async def check_if_pool(addr: str) -> bool:
    """Heuristic pool detection: an address answering token0() is treated
    as a Uniswap V2/V3-style LP pool."""
    # token0() selector 0x0dfe1681
    result = await rpc_call("eth_call", [
        {"to": addr, "data": "0x0dfe1681"}, "latest"
    ])
    # A real pool answers with exactly one 32-byte word: "0x" + 64 hex chars
    return bool(result) and len(result) == 66
async def get_eth_price() -> float:
    """Fetch the spot ETH/USD price from CoinGecko.

    Returns a hardcoded fallback on any failure so callers always get a
    number; best-effort by design.
    """
    session = await get_http_session()
    try:
        async with session.get(
            "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
            timeout=aiohttp.ClientTimeout(total=5),
        ) as resp:
            data = await resp.json()
            return data.get("ethereum", {}).get("usd", 0)
    except Exception:
        # Narrowed from bare `except:` — in async code a bare except also
        # caught asyncio.CancelledError and broke task cancellation.
        return 2300  # fallback
async def get_token_price_usd(token_addr: str) -> float | None:
    """Best-guess USD price from DexScreener (free, no API key needed).

    Picks the pair with the deepest USD liquidity. Returns None when the
    token has no priced pair or the request/parse fails.
    """
    session = await get_http_session()
    try:
        async with session.get(
            f"https://api.dexscreener.com/latest/dex/tokens/{token_addr}",
            timeout=aiohttp.ClientTimeout(total=5),
        ) as resp:
            data = await resp.json()
            pairs = data.get("pairs", [])
            if pairs:
                # Pick the pair with the highest liquidity
                pairs.sort(key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0), reverse=True)
                price = float(pairs[0].get("priceUsd", 0) or 0)
                return price if price > 0 else None
    except Exception:
        # Narrowed from bare `except:` so task cancellation still propagates.
        pass
    return None
# ============================================================
# Telegram notifications
# ============================================================
async def send_telegram(text: str):
    """Push a message to the configured Telegram chat.

    Sends with HTML parse mode first; if Telegram rejects the entity markup,
    retries once as plain text. Best-effort: every error is logged, nothing
    is raised to the caller.
    """
    if not TG_BOT_TOKEN:
        log.warning("[TG] No bot token configured, skipping notification")
        return
    session = await get_http_session()
    url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
    payload = {
        "chat_id": TG_CHAT_ID,
        "text": text,
        "parse_mode": "HTML",
        "disable_web_page_preview": True,
    }
    try:
        async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            result = await resp.json()
        if result.get("ok"):
            log.info("[TG] Message sent")
            return
        log.error(f"[TG] Send failed: {result.get('description', '')}")
        # Retry with plain text if HTML parse fails
        if "parse" in result.get("description", "").lower():
            # Remove the key entirely: sending "parse_mode": null is not the
            # same as omitting the optional field from the request.
            payload.pop("parse_mode", None)
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp2:
                retry = await resp2.json()
            if not retry.get("ok"):
                # The original discarded this response and reported nothing.
                log.error(f"[TG] Plain-text retry failed: {retry.get('description', '')}")
    except Exception as e:
        log.error(f"[TG] Error: {e}")
# ============================================================
# Event handling
# ============================================================
async def handle_transfer_event(log_entry: dict):
    """Process a single ERC-20 Transfer log entry.

    Pipeline: dedup → confirm the sender is Vitalik → resolve token metadata
    → classify destination (DEX / CEX / pool / regular transfer) → price the
    transfer → push a Telegram alert for sells above MIN_NOTIFY_USD.
    Regular transfers and sub-threshold sells are only logged.
    """
    tx_hash = log_entry.get("transactionHash", "")
    token_addr = log_entry.get("address", "")
    topics = log_entry.get("topics", [])
    data = log_entry.get("data", "0x0")
    # Dedup — keyed by (tx, token) so one tx moving several tokens still
    # produces one alert per token
    dedup_key = f"{tx_hash}:{token_addr}"
    if dedup_key in seen_txs:
        return
    seen_txs.add(dedup_key)
    seen_txs_list.append(dedup_key)
    # Cleanup (keep last 1000)
    while len(seen_txs_list) > 1000:
        old = seen_txs_list.pop(0)
        seen_txs.discard(old)
    # Parse topics: [Transfer, from, to]
    if len(topics) < 3:
        return
    from_addr = topic_to_address(topics[1])
    to_addr = topic_to_address(topics[2])
    # Confirm it's from Vitalik (the TO-subscription feeds this handler too)
    if from_addr.lower() != VITALIK_ADDRESS.lower():
        return
    # Get token info (cached after the first lookup)
    token_info = await get_token_info(token_addr)
    symbol = token_info["symbol"]
    decimals = token_info["decimals"]
    amount = decode_transfer_value(data, decimals)
    if amount == 0:
        return
    # Classify recipient against the known DEX/CEX tables
    recipient_type, recipient_name = classify_recipient(to_addr)
    # If unknown, probe on-chain whether it's an LP pool
    is_pool = False
    if recipient_type == "unknown":
        is_pool = await check_if_pool(to_addr)
        if is_pool:
            recipient_type = "pool"
            recipient_name = "LP Pool"
    # Determine if this is a "sell" action
    is_sell = recipient_type in ("dex", "cex", "pool")
    # If not a sell, just a regular transfer — log silently
    if not is_sell:
        log.info(f"[Transfer] {symbol} {amount:,.2f} → {shorten_addr(to_addr)} (regular transfer, not notifying)")
        return
    # Get price; usd_value stays None when DexScreener has no price
    token_price = await get_token_price_usd(token_addr)
    usd_value = amount * token_price if token_price else None
    # Below minimum notification amount — skip
    # (an unpriced sell is still notified: usd_value None bypasses the filter)
    if usd_value is not None and usd_value < MIN_NOTIFY_USD:
        log.info(f"[Sell] {symbol} ${usd_value:.0f} < ${MIN_NOTIFY_USD} minimum, skipping")
        return
    # Build notification message (HTML parse mode, see send_telegram)
    timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S UTC")
    sell_type_label = {
        "dex": "🔄 DEX Sell",
        "cex": "🏦 CEX Transfer",
        "pool": "🌊 Pool Sell",
    }
    msg_lines = [
        f"<b>🚨 Vitalik Sell Signal</b>",
        f"",
        f"Token: <b>{symbol}</b>",
        f"Amount: {amount:,.4f}",
    ]
    if usd_value is not None:
        msg_lines.append(f"Value: <b>${usd_value:,.0f}</b>")
    if token_price is not None:
        msg_lines.append(f"Price: ${token_price:,.8f}")
    msg_lines.extend([
        f"",
        f"Type: {sell_type_label.get(recipient_type, 'Unknown')}",
        f"Destination: {recipient_name or shorten_addr(to_addr)}",
        f"Time: {timestamp}",
        f"",
        f"TX: https://etherscan.io/tx/{tx_hash}",
        f"Token: https://dexscreener.com/ethereum/{token_addr}",
    ])
    msg = "\n".join(msg_lines)
    log.info(f"[SELL DETECTED] {symbol} {amount:,.4f} → {recipient_name or to_addr} | ${usd_value or '?'}")
    await send_telegram(msg)
# ============================================================
# WebSocket listener main loop
# ============================================================
async def subscribe_and_listen(ws_url: str):
    """Connect to a WebSocket RPC endpoint, subscribe to Vitalik's Transfer
    events (both directions), and dispatch events until the socket closes.

    Raises on subscription failure / connection loss; the caller (main) owns
    reconnection and endpoint rotation.
    """
    log.info(f"Connecting to {ws_url}...")
    async with websockets.connect(
        ws_url,
        ping_interval=20,
        ping_timeout=30,
        close_timeout=10,
        max_size=2**20,  # 1MB
    ) as ws:
        # Subscribe to Transfer FROM Vitalik (sender is the 2nd topic)
        sub_from = {
            "jsonrpc": "2.0", "id": 1,
            "method": "eth_subscribe",
            "params": ["logs", {
                "topics": [TRANSFER_TOPIC, VITALIK_PADDED]
            }]
        }
        await ws.send(json.dumps(sub_from))
        # NOTE(review): this assumes the next frame is the subscription reply;
        # an event frame arriving first would be misread — confirm the RPC
        # providers guarantee reply-before-events ordering.
        resp = await asyncio.wait_for(ws.recv(), timeout=10)
        data = json.loads(resp)
        if "error" in data:
            raise Exception(f"Subscribe failed: {data['error']}")
        sub_id_from = data.get("result", "")
        log.info(f"✅ Subscribed to Transfer FROM Vitalik (id={sub_id_from[:10]}...)")
        # Also subscribe to Transfer TO Vitalik (monitor buys/receives);
        # recipient is the 3rd topic, sender left as a wildcard
        sub_to = {
            "jsonrpc": "2.0", "id": 2,
            "method": "eth_subscribe",
            "params": ["logs", {
                "topics": [TRANSFER_TOPIC, None, VITALIK_PADDED]
            }]
        }
        await ws.send(json.dumps(sub_to))
        resp2 = await asyncio.wait_for(ws.recv(), timeout=10)
        data2 = json.loads(resp2)
        sub_id_to = data2.get("result", "")
        if sub_id_to:
            log.info(f"✅ Subscribed to Transfer TO Vitalik (id={sub_id_to[:10]}...)")
        log.info("🔍 Monitoring Vitalik wallet... waiting for events")
        # Listen for events until the socket closes or `running` is cleared
        async for message in ws:
            if not running:
                break
            try:
                evt = json.loads(message)
                if "params" not in evt:
                    continue
                sub_id = evt["params"].get("subscription", "")
                log_entry = evt["params"].get("result", {})
                if sub_id == sub_id_from:
                    # Vitalik sent tokens → check if it's a sell
                    await handle_transfer_event(log_entry)
                elif sub_id == sub_id_to:
                    # Vitalik received tokens — log silently (debug level)
                    token_addr = log_entry.get("address", "")
                    token_info = await get_token_info(token_addr)
                    data_hex = log_entry.get("data", "0x0")
                    amount = decode_transfer_value(data_hex, token_info["decimals"])
                    log.debug(f"[Receive] {token_info['symbol']} +{amount:,.4f}")
            except json.JSONDecodeError:
                continue
            except Exception as e:
                log.error(f"Event handling error: {e}", exc_info=True)
async def main():
    """Entry point: validate config, install signal handlers, announce startup,
    then run the listen loop with endpoint rotation and exponential backoff."""
    global running
    # Validate required config (warn only — the radar still runs without TG)
    if not TG_BOT_TOKEN:
        log.warning("TG_BOT_TOKEN not set — Telegram notifications disabled")
    if not TG_CHAT_ID:
        log.warning("TG_CHAT_ID not set — Telegram notifications disabled")
    # Signal handling — flips the `running` flag; loops exit cooperatively
    def shutdown(sig, frame):
        global running
        log.info(f"Received signal {sig}, shutting down...")
        running = False
    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)
    # Startup notice
    log.info("=" * 50)
    log.info("Vitalik Sell Radar — Started")
    log.info(f"Monitoring: {VITALIK_ADDRESS}")
    log.info(f"Min notify: ${MIN_NOTIFY_USD}")
    log.info(f"Telegram: {'✅ Configured' if TG_BOT_TOKEN else '❌ Not configured'}")
    log.info("=" * 50)
    if TG_BOT_TOKEN and TG_CHAT_ID:
        await send_telegram(
            "🟢 Vitalik Sell Radar — Started\n\n"
            f"Monitoring: {shorten_addr(VITALIK_ADDRESS)}\n"
            f"Min notify: ${MIN_NOTIFY_USD}\n"
            "Mode: WebSocket event-driven (sub-second latency)"
        )
    endpoint_idx = 0
    reconnect_delay = RECONNECT_DELAY
    while running:
        ws_url = WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]
        try:
            await subscribe_and_listen(ws_url)
            reconnect_delay = RECONNECT_DELAY  # reset on success
        except websockets.exceptions.ConnectionClosed as e:
            log.warning(f"WebSocket closed: {e}")
        except asyncio.TimeoutError:
            log.warning("WebSocket timeout")
        except Exception as e:
            log.error(f"WebSocket error: {e}")
        if not running:
            break
        # Switch endpoint and retry with 1.5x backoff, capped at the max
        endpoint_idx += 1
        log.info(f"Reconnecting in {reconnect_delay}s... (next: {WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]})")
        await asyncio.sleep(reconnect_delay)
        reconnect_delay = min(reconnect_delay * 1.5, MAX_RECONNECT_DELAY)
    # Cleanup — close the shared aiohttp session
    if http_session and not http_session.closed:
        await http_session.close()
    log.info("Vitalik Sell Radar — Stopped")
if __name__ == "__main__":
    asyncio.run(main())
链上雷达
发布日期: 2026.04.28 标签: Python · GMGN · DEXScreener · Telegram
动量驱动 · 链上新币发现 · 覆盖 ETH/SOL/BSC/Base
动量是唯一推送引擎,叙事只做分类标签。每30秒扫描4条链(ETH/SOL/BSC/Base),连涨3轮+涨幅5%才推送。叙事分类(马斯克/川普、币安/CZ、名人热点)只标星级(★★★/★★/★),不独立触发推送。安全检查:SOL用RugCheck、EVM用GoPlus。纯Python零AI成本。
完整源码
#!/usr/bin/env python3
"""
叙事雷达 → 链上雷达 v1
纯Python,零AI成本(关键词匹配 + 叙事去重)
三条推送通道:
1. 全新叙事 — 从未见过的概念/故事,全链推
2. 马斯克/川普相关 — 重点ETH+SOL,BSC也推
3. 币安/CZ相关 — 只推BSC
数据源:GMGN新币 + DEXScreener搜索
叙事历史:SQLite去重
"""
import requests
import json
import time
import os
import re
import sqlite3
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from difflib import SequenceMatcher
# === Configuration ===
DATA_DIR = os.path.expanduser("~/crypto-trading")
DB_FILE = os.path.join(DATA_DIR, "narrative_history.db")
LOG_FILE = os.path.join(DATA_DIR, "narrative_radar.log")
SEEN_FILE = os.path.join(DATA_DIR, "narrative_seen.json")
FLAP_SEEN_FILE = os.path.join(DATA_DIR, "flap_seen.json")
# Scan interval
SCAN_INTERVAL = 30  # 30s (GMGN data refreshes roughly every 1-5 min; 10s would be too frequent with unchanged data)
# Momentum tracker — in-memory price/market-cap snapshots per token
# {address: [{'ts': timestamp, 'mc': market_cap, 'vol': volume, 'price': price}, ...]}
MOMENTUM_TRACKER = {}
MOMENTUM_PUSHED = {}  # {address: {'count': N, 'last_ts': ts, 'last_mc': mc}} push counters
MOMENTUM_CONSECUTIVE_UP = 3  # 3 consecutive up rounds required (a round counts only when the data actually changed)
# Read Telegram config from ~/.env
def load_env():
    """Parse ~/.env into a dict (KEY=VALUE lines; '#' comment lines skipped)."""
    env = {}
    env_file = os.path.expanduser("~/.env")
    if os.path.exists(env_file):
        with open(env_file) as f:
            for line in f:
                line = line.strip()
                if '=' in line and not line.startswith('#'):
                    k, v = line.split('=', 1)
                    env[k] = v
    return env
ENV = load_env()
TG_TOKEN = ENV.get('TELEGRAM_BOT_TOKEN', '')
# Fix: the chat id can now also come from ~/.env — previously it was read
# only from the process environment even though the token came from ~/.env.
# A real environment variable still takes precedence; empty values fall
# through to '0' instead of crashing int().
TG_CHAT_ID = int(os.environ.get('TG_CHAT_ID') or ENV.get('TG_CHAT_ID') or '0')
GMGN_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
    'Accept': 'application/json',
    'Referer': 'https://gmgn.ai/',
}
# ============================================================
# Musk/Trump keyword library (case-insensitive)
# ============================================================
MUSK_TRUMP_KEYWORDS = {
    # Musk core
    'musk', 'elon', 'elonmusk',
    # SpaceX/Tesla/X
    'spacex', 'starship', 'tesla', 'cybertruck', 'roadster',
    'neuralink', 'boring', 'hyperloop', 'xai', 'grok',
    # Musk-adjacent people / pets / memes
    'floki', 'shiba',  # only meaningful in a new-token context
    'doge father', 'dogefather', 'technoking',
    'mars colony', 'mars',
    # Trump core
    'trump', 'donald', 'maga', 'potus', 'trump47',
    'melania', 'barron', 'ivanka',
    # Trump-adjacent
    'dark maga', 'darkmaga', 'ultra maga', 'save america',
    'truth social', 'covfefe',
    # Musk + Trump crossover
    'doge department', 'd.o.g.e', 'government efficiency',
}
# Musk/Trump regexes (catch spelling variants the plain keywords miss)
MUSK_TRUMP_PATTERNS = [
    r'\belon\b', r'\bmusk\b', r'\btrump\b', r'\bmaga\b',
    r'\bspacex\b', r'\bstarship\b', r'\btesla\b', r'\bgrok\b',
    r'\bmelania\b', r'\bbarron\b', r'\bdoge\s*department\b',
    r'\bd\.?o\.?g\.?e\b',  # D.O.G.E variants
    r'\bx\s*ai\b', r'\bneuralink\b',
]
# ============================================================
# Binance/CZ keyword library
# ============================================================
BINANCE_CZ_KEYWORDS = {
    # CZ core
    'cz', 'changpeng', 'zhao', 'czb', 'czbinance',
    # He Yi (currently the key promoter on BSC!)
    'heyi', 'yi he', 'he yi', '何一', 'yihe',
    'sister yi', 'yi jie', '一姐', '何一姐',
    # Binance brand
    'binance', 'bnb', 'pancake', 'pancakeswap',
    # CZ-related topical words (book, events, frequent tweet terms)
    'giggle academy', 'binance life', 'bnb chain',
    'principles', 'cz book',
    # YZi Labs (formerly Binance Labs)
    'yzi', 'yzi labs',
    # Chinese keywords (common on BSC)
    '赵长鹏', '币安', '长鹏', 'cz的', '何一的',
    # Four.meme platform
    'fourmeme', 'four meme', '4meme',
    # High-frequency words from CZ / He Yi Twitter interactions
    'czs dog', 'cz dog', 'bnb dog',
    'build on bnb', 'bnb ecosystem',
}
BINANCE_CZ_PATTERNS = [
    r'\bcz\b', r'\bbinance\b', r'\bbnb\b',
    r'\bheyi\b', r'\byi\s*he\b', r'\bhe\s*yi\b',
    r'\b何一\b', r'\b一姐\b',
    r'\bpancake\b', r'\bgiggle\b', r'\byzi\b',
    r'\bfourmeme\b', r'\b4meme\b',
]
# ============================================================
# Twitter-viral / celebrity keyword library (two-star ★★ tier)
# ============================================================
CELEBRITY_VIRAL_KEYWORDS = {
    # Tech celebrities
    'vitalik', 'buterin', 'sam altman', 'satoshi',
    'michael saylor', 'saylor', 'cathie wood',
    'jack dorsey', 'zuckerberg', 'bezos',
    'jensen huang', 'nvidia', 'tim cook',
    # Crypto celebrities
    'justin sun', 'sun yuchen', '孙宇晨', 'tron',
    'arthur hayes', 'su zhu', '3ac',
    'brian armstrong', 'coinbase',
    'larry fink', 'blackrock',
    'gary gensler', 'sec',
    'michael novogratz', 'galaxy',
    # Political / social figures
    'biden', 'obama', 'putin', 'xi jinping',
    'kanye', 'drake', 'snoop dogg', 'paris hilton',
    'mark cuban', 'mr beast', 'mrbeast',
    # Viral meme words ("lobster-tier" memes)
    'lobster', '龙虾', 'lobsta',
    'hawk tuah', 'griddy', 'skibidi',
    'rizz', 'sigma', 'gyatt',
    # Major-event keywords
    'etf', 'halving', '减半',
    'world war', 'wwiii',
    'fed', 'rate cut', '降息',
    'tiktok ban', 'tiktok',
}
CELEBRITY_VIRAL_PATTERNS = [
    r'\bvitalik\b', r'\bsaylor\b', r'\bblackrock\b',
    r'\bcoinbase\b', r'\bjustin\s*sun\b', r'\blobster\b',
    r'\betf\b', r'\bhalving\b', r'\bmrbeast\b',
    r'\bsnoop\b', r'\bkanye\b', r'\bdrake\b',
]
# ============================================================
# Generic spam words (filter obvious scams / low-quality tokens)
# ============================================================
SPAM_PATTERNS = [
    r'airdrop', r'presale', r'pre\s*sale',
    r'1000x', r'100x guaranteed',
    r'safe\s*moon', r'baby\s*\w+',  # babydoge-style copycats
    r'pornhub', r'porn', r'xxx', r'nsfw',
    r'nigga', r'nigger', r'faggot',
    r'scam', r'rugpull', r'rug\s*pull',
    r'official\s*token', r'official\s*coin',
]
# Common words with no narrative meaning (filters single-word token names)
COMMON_NOISE_WORDS = {
    'nice', 'good', 'bad', 'cool', 'hot', 'big', 'small',
    'life', 'love', 'hate', 'happy', 'sad', 'fun', 'lol',
    'cat', 'dog', 'moon', 'sun', 'star', 'king', 'queen',
    'gold', 'rich', 'cash', 'money', 'pay', 'buy', 'sell',
    'pump', 'dump', 'bull', 'bear', 'green', 'red',
    'hello', 'world', 'yes', 'no', 'wow', 'omg', 'lmao',
    'simp', 'chad', 'based', 'cope', 'seethe',
    'test', 'new', 'old', 'real', 'fake',
    # frequent junk-token words
    'shit', 'shitcoin', 'fuck', 'fart', 'poop', 'pee',
    'cum', 'dick', 'ass', 'boob', 'tit',
    'nigga', 'retard', 'slop',
    # ultra-generic token names
    'the', 'and', 'for', 'from', 'with', 'this', 'that',
    'coin', 'token', 'meme', 'pepe', 'wojak',
    'peg', 'usd', 'usdt', 'usdc', 'dai',
}
# ============================================================
# Utility functions
# ============================================================
def log(msg):
    """Print a timestamped line and append the same line to the log file."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {msg}"
    print(entry)
    os.makedirs(DATA_DIR, exist_ok=True)
    with open(LOG_FILE, 'a') as fh:
        fh.write(entry + '\n')
def load_flap_seen():
    """Load the flap-dedup map {key: unix_ts}; empty dict when the file is
    missing or unreadable/corrupt."""
    if os.path.exists(FLAP_SEEN_FILE):
        try:
            with open(FLAP_SEEN_FILE) as f:
                return json.load(f)
        except (OSError, ValueError):
            # Narrowed from a bare `except:`; json.JSONDecodeError is a
            # ValueError subclass, so corrupt JSON is still tolerated.
            pass
    return {}
def save_flap_seen(data):
    """Persist the flap-dedup map, dropping entries older than 7 days."""
    cutoff = int(time.time()) - 86400 * 7
    fresh = {key: ts for key, ts in data.items() if ts > cutoff}
    with open(FLAP_SEEN_FILE, 'w') as f:
        json.dump(fresh, f)
def tg_send(text, parse_mode='Markdown'):
    """Send a Telegram message; returns True only when a send succeeded.

    On a Markdown parse failure, retries once as plain text and reports the
    outcome of that retry — the original returned True even when the
    plain-text resend also failed.
    """
    if not TG_TOKEN:
        log(f"[TG] No token, skip: {text[:80]}")
        return False
    try:
        resp = requests.post(
            f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
            json={'chat_id': TG_CHAT_ID, 'text': text, 'parse_mode': parse_mode},
            timeout=10
        )
        result = resp.json()
        if result.get('ok'):
            return True
        # Degrade to plain text when Markdown entities fail to parse
        if 'can\'t parse' in str(result.get('description', '')).lower():
            resp = requests.post(
                f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
                json={'chat_id': TG_CHAT_ID, 'text': text},
                timeout=10
            )
            return bool(resp.json().get('ok'))
        log(f"[TG] Error: {result.get('description', '')}")
        return False
    except Exception as e:
        log(f"[TG] Send error: {e}")
        return False
# ============================================================
# 叙事历史数据库
# ============================================================
def init_db(db_path=None):
    """Open (creating schema if needed) the narrative-history SQLite DB.

    Args:
        db_path: optional override for the database location (e.g. ':memory:'
                 in tests); defaults to DB_FILE, preserving the old behavior.

    Returns the open sqlite3 connection.
    """
    conn = sqlite3.connect(db_path or DB_FILE)
    c = conn.cursor()
    # Every narrative theme ever seen
    c.execute('''CREATE TABLE IF NOT EXISTS narratives (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        theme TEXT NOT NULL,              -- normalized narrative theme (lowercase)
        first_token_name TEXT,            -- token name when first seen
        first_token_address TEXT,         -- token address when first seen
        first_chain TEXT,                 -- chain where first seen
        first_seen_at INTEGER,            -- first-seen unix timestamp
        token_count INTEGER DEFAULT 1,    -- how many tokens carried this theme
        last_seen_at INTEGER              -- most recent sighting
    )''')
    # Every token ever scanned
    c.execute('''CREATE TABLE IF NOT EXISTS tokens_seen (
        address TEXT PRIMARY KEY,
        chain TEXT,
        name TEXT,
        symbol TEXT,
        narrative_theme TEXT,
        category TEXT,                    -- 'musk_trump' / 'binance_cz' / 'novel' / 'common'
        first_seen_at INTEGER,
        market_cap REAL,
        pushed INTEGER DEFAULT 0,         -- already pushed?
        seen_count INTEGER DEFAULT 1      -- sighting count
    )''')
    # Indexes
    c.execute('CREATE INDEX IF NOT EXISTS idx_theme ON narratives(theme)')
    c.execute('CREATE INDEX IF NOT EXISTS idx_addr ON tokens_seen(address)')
    conn.commit()
    return conn
def normalize_theme(name, symbol):
    """
    Extract a normalized narrative theme from token name + symbol.
    e.g. 'TRUMP2028'    -> 'trump'
         'PancakeBunny' -> 'bunny pancake'
    Words are deduplicated and sorted so different orderings map to one theme.
    """
    # Merge name and symbol; split camelCase BEFORE lowercasing.
    # Bug fix: the original lowercased first, so the ([a-z])([A-Z]) boundary
    # could never match and 'PancakeBunny' stayed one word, contradicting the
    # documented example.
    text = f"{name} {symbol}".strip()
    text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text).lower()
    # Common suffixes/prefixes that carry no narrative meaning
    noise = ['token', 'coin', 'inu', 'swap', 'finance', 'protocol',
             'dao', 'defi', 'nft', 'meta', 'verse', 'fi', 'ai',
             'pepe', 'wojak', 'chad', 'based']
    # Strip digits (e.g. 2028, 1000x)
    text = re.sub(r'\d+x?', '', text)
    # Keep letters and spaces only
    text = re.sub(r'[^a-z\s]', ' ', text)
    # De-noise: drop one-letter fragments and noise words
    words = [w for w in text.split() if w and len(w) > 1 and w not in noise]
    if not words:
        # Everything filtered out — fall back to the raw lowercase name
        return name.lower().strip()
    return ' '.join(sorted(set(words)))
def is_similar_theme(theme1, theme2, threshold=0.7):
    """Fuzzy comparison of two normalized narrative themes."""
    if theme1 == theme2:
        return True
    # One theme contained in the other
    if theme1 in theme2 or theme2 in theme1:
        return True
    # Word-level overlap relative to the smaller theme
    set1, set2 = set(theme1.split()), set(theme2.split())
    if set1 and set2:
        shared = len(set1 & set2) / min(len(set1), len(set2))
        if shared >= 0.6:
            return True
    # Character-sequence similarity as a last resort
    return SequenceMatcher(None, theme1, theme2).ratio() >= threshold
def check_narrative_novelty(conn, theme, name, symbol, address, chain):
    """
    Check the status of a narrative theme.
    Returns:
        ('novel', None)          — first time this theme is seen
        ('heating', narrative)   — new tokens keep appearing in a short window: hot signal!
        ('existing', narrative)  — known theme, not currently heating
    Core rule: the same theme producing 2+ distinct tokens within 30 minutes = hot.
    Side effect: bumps token_count / last_seen_at, or inserts a new row.
    """
    c = conn.cursor()
    now = int(time.time())
    HEAT_WINDOW = 1800   # 30-minute window
    HEAT_THRESHOLD = 2   # 2+ same-theme tokens inside the window = hot
    # Exact match
    c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives WHERE theme = ?', (theme,))
    exact = c.fetchone()
    if exact:
        row_id, _, _, _, _, first_seen, count, last_seen = exact
        # Bump the sighting counter
        new_count = count + 1
        c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE theme = ?',
                  (new_count, now, theme))
        conn.commit()
        # Hot check: several tokens within HEAT_WINDOW of the first sighting
        if now - first_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
            return ('heating', exact)
        # Or: short gap since the previous sighting (theme keeps resurfacing)
        if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
            return ('heating', exact)
        return ('existing', exact)
    # Fuzzy match — compare against the 1000 most recently seen themes
    c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives ORDER BY last_seen_at DESC LIMIT 1000')
    for row in c.fetchall():
        if is_similar_theme(theme, row[1]):
            row_id, _, _, _, _, first_seen, count, last_seen = row
            new_count = count + 1
            c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE id = ?',
                      (new_count, now, row[0]))
            conn.commit()
            # Hot check (recent-gap rule only on the fuzzy path)
            if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
                return ('heating', row)
            return ('existing', row)
    # First sighting — record it
    c.execute('''INSERT INTO narratives (theme, first_token_name, first_token_address, first_chain, first_seen_at, last_seen_at)
        VALUES (?, ?, ?, ?, ?, ?)''',
        (theme, name, address, chain, now, now))
    conn.commit()
    return ('novel', None)
def get_token_seen_count(conn, address):
    """Return how many times this token address has been scanned (0 if never)."""
    row = conn.execute(
        'SELECT seen_count FROM tokens_seen WHERE address = ?', (address,)
    ).fetchone()
    return 0 if row is None else row[0]
def is_token_seen(conn, address):
    """Return True when this token address already has a tokens_seen row."""
    found = conn.execute(
        'SELECT address FROM tokens_seen WHERE address = ?', (address,)
    ).fetchone()
    return found is not None
def record_token(conn, address, chain, name, symbol, theme, category, mc, pushed=False):
    """Upsert a scanned token: insert on first sighting, otherwise bump the
    sighting counter and refresh market cap + category."""
    cur = conn.cursor()
    cur.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
    row = cur.fetchone()
    if row is None:
        # First sighting — fresh row with seen_count = 1
        cur.execute('''INSERT INTO tokens_seen
            (address, chain, name, symbol, narrative_theme, category, first_seen_at, market_cap, pushed, seen_count)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)''',
            (address, chain, name, symbol, theme, category, int(time.time()), mc, 1 if pushed else 0))
    else:
        # Repeat sighting — counter + 1, update market cap and category
        cur.execute('''UPDATE tokens_seen SET seen_count = ?, market_cap = ?, category = ?
            WHERE address = ?''', (row[0] + 1, mc, category, address))
    conn.commit()
# ============================================================
# 叙事分类引擎
# ============================================================
def _match_any(text, keywords, patterns):
    """Collect keyword substrings found in text; when none match, fall back
    to the regex pattern list (first match per pattern)."""
    hits = [kw for kw in keywords if kw.lower() in text]
    if not hits:
        for pat in patterns:
            m = re.search(pat, text, re.IGNORECASE)
            if m:
                hits.append(m.group())
    return hits

def classify_narrative(name, symbol, chain):
    """
    Classify a token's narrative from its name + symbol.
    Returns one of:
        ('spam', None)                         — obvious scam / junk name
        ('musk_trump', matched_keywords)       — pushed on ETH/SOL/BSC/Base
        ('binance_cz', matched_keywords)       — pushed on BSC only
        ('binance_cz_wrong_chain', matched)    — CZ match on a non-BSC chain
        ('celebrity_viral', matched_keywords)  — ★★ tier hot words
        ('check_novelty', None)                — caller should run novelty detection
    """
    text = f"{name} {symbol}".lower()
    # 1. Reject obvious spam/scam names outright
    for pat in SPAM_PATTERNS:
        if re.search(pat, text, re.IGNORECASE):
            return ('spam', None)
    # 2. Musk/Trump detection — keywords first, then regex variants
    matched = _match_any(text, MUSK_TRUMP_KEYWORDS, MUSK_TRUMP_PATTERNS)
    if matched and chain.lower() in ('eth', 'ethereum', 'sol', 'solana', 'bsc', 'base'):
        # Musk/Trump: primarily ETH+SOL, BSC/Base accepted too
        return ('musk_trump', matched)
    # NOTE: a Musk/Trump match on any other chain deliberately falls through
    # to the remaining checks, mirroring the original control flow.
    # 3. Binance/CZ detection — only pushed on BSC
    matched = _match_any(text, BINANCE_CZ_KEYWORDS, BINANCE_CZ_PATTERNS)
    if matched:
        if chain.lower() == 'bsc':
            return ('binance_cz', matched)
        return ('binance_cz_wrong_chain', matched)
    # 4. Celebrity / viral-Twitter hot words (★★ tier)
    matched = _match_any(text, CELEBRITY_VIRAL_KEYWORDS, CELEBRITY_VIRAL_PATTERNS)
    if matched:
        return ('celebrity_viral', matched)
    # 5. Nothing matched → needs narrative-novelty checking
    return ('check_novelty', None)
# ============================================================
# 安全检查(复用现有逻辑)
# ============================================================
def check_token_safety(chain, address):
    """Fast safety screen — only hard failures veto (honeypot / mintable);
    sell tax is reported but never used as a rejection criterion.

    SOL uses RugCheck; EVM chains use GoPlus. When no check can be completed
    the token is reported unsafe: better to miss a token than step on a rug.
    """
    if chain in ('sol', 'solana'):
        try:
            r = requests.get(f'https://api.rugcheck.xyz/v1/tokens/{address}/report', timeout=10)
            if r.status_code == 200:
                data = r.json()
                score = data.get('score', 999)
                mint = data.get('mintAuthority')
                freeze = data.get('freezeAuthority')
                # Active mint/freeze authorities are the hard red flags on SOL
                return {
                    'safe': not mint and not freeze,
                    'score': score, 'mint': mint is not None,
                    'freeze': freeze is not None
                }
        except Exception:
            # Narrowed from bare `except:` — network/JSON errors fall through
            # to the unsafe default below.
            pass
    else:
        chain_map = {'ethereum': '1', 'eth': '1', 'bsc': '56', 'base': '8453'}
        cid = chain_map.get(chain, '1')
        try:
            r = requests.get(f'https://api.gopluslabs.io/api/v1/token_security/{cid}?contract_addresses={address}', timeout=10)
            if r.status_code == 200:
                result = r.json().get('result', {})
                data = result.get(address.lower(), {})
                if data:
                    honeypot = data.get('is_honeypot', '0') == '1'
                    mintable = data.get('is_mintable', '0') == '1'
                    sell_tax = float(data.get('sell_tax', '0') or '0')
                    buy_tax = float(data.get('buy_tax', '0') or '0')
                    return {
                        'safe': not honeypot and not mintable,  # tax is informational only
                        'honeypot': honeypot, 'mintable': mintable,
                        'sell_tax': sell_tax, 'buy_tax': buy_tax
                    }
        except Exception:
            pass
    return {'safe': False, 'reason': '无法检查'}  # cannot verify -> do not push
# ============================================================
# GMGN数据获取
# ============================================================
def gmgn_get(url):
    """GET a GMGN endpoint and return its 'data' payload ({} on any failure).

    Bug fix: bare ``except:`` (which also caught KeyboardInterrupt) narrowed
    to ``Exception``.
    """
    try:
        resp = requests.get(url, headers=GMGN_HEADERS, timeout=15)
        if resp.status_code == 200:
            return resp.json().get('data', {})
    except Exception:
        pass
    return {}
def fetch_token_description(chain, address):
    """Fetch a token's story/description plus social links.

    Order of preference: Pump.fun (Solana only, has the richest
    descriptions), then DexScreener pair info (all chains — links only,
    no description text). Always returns a dict with the keys
    description/twitter/telegram/website.

    Fixes: removed the unused ``chain_dex`` local and narrowed the two
    bare ``except:`` clauses to ``Exception``.
    """
    desc = ''
    # SOL chain: Pump.fun carries the fullest description text
    if chain in ('sol', 'solana'):
        try:
            r = requests.get(f'https://frontend-api-v3.pump.fun/coins/{address}', timeout=8)
            if r.status_code == 200:
                data = r.json()
                desc = data.get('description', '') or ''
                twitter = data.get('twitter', '') or ''
                telegram = data.get('telegram', '') or ''
                website = data.get('website', '') or ''
                return {
                    'description': desc.strip(),
                    'twitter': twitter,
                    'telegram': telegram,
                    'website': website,
                }
        except Exception:
            pass
    # All chains: DexScreener pair info (website + social links)
    try:
        r = requests.get(f'https://api.dexscreener.com/latest/dex/tokens/{address}', timeout=8)
        if r.status_code == 200:
            pairs = r.json().get('pairs', [])
            if pairs:
                info = pairs[0].get('info', {})
                websites = info.get('websites', [])
                socials = info.get('socials', [])
                twitter = ''
                telegram = ''
                website = ''
                for s in socials:
                    if s.get('type') == 'twitter':
                        twitter = s.get('url', '')
                    elif s.get('type') == 'telegram':
                        telegram = s.get('url', '')
                for w in websites:
                    if w.get('label', '').lower() == 'website':
                        website = w.get('url', '')
                if not desc:
                    # DexScreener has no description, but the socials are useful
                    return {
                        'description': desc,
                        'twitter': twitter,
                        'telegram': telegram,
                        'website': website,
                    }
    except Exception:
        pass
    return {'description': desc, 'twitter': '', 'telegram': '', 'website': ''}
def fetch_new_tokens():
    """Pull fresh tokens from GMGN across eth/bsc/base, covering two ranking views."""
    collected = []
    visited = set()
    for net in ['eth', 'bsc', 'base']:
        # Query two ranking dimensions per chain so nothing slips through
        endpoints = [
            # newest tokens first
            f'https://gmgn.ai/defi/quotation/v1/rank/{net}/swaps/1h?orderby=open_timestamp&direction=desc&limit=100',
            # most actively traded
            f'https://gmgn.ai/defi/quotation/v1/rank/{net}/swaps/1h?orderby=swaps&direction=desc&limit=50',
        ]
        for endpoint in endpoints:
            for entry in gmgn_get(endpoint).get('rank', []):
                contract = entry.get('address', '')
                if not contract or contract in visited:
                    continue
                cap = entry.get('market_cap', 0) or entry.get('fdv', 0) or 0
                pool = entry.get('liquidity', 0) or 0
                # Basic filter: skip dust-caps, illiquid pairs, and >$10M caps
                if cap < 1000 or pool < 500 or cap > 10000000:
                    continue
                created = entry.get('open_timestamp', 0)
                hours_old = (time.time() - created) / 3600 if created > 0 else 999
                # No age cap — the momentum engine pushes whatever rises, old or new
                visited.add(contract)
                collected.append({
                    'address': contract,
                    'chain': net,
                    'name': entry.get('name', '?'),
                    'symbol': entry.get('symbol', '?'),
                    'mc': cap,
                    'liq': pool,
                    'volume': entry.get('volume', 0) or 0,
                    'holders': entry.get('holder_count', 0) or 0,
                    'sm': entry.get('smart_degen_count', 0) or 0,
                    'chg_1h': entry.get('price_change_percent1h', 0) or 0,
                    'chg_24h': entry.get('price_change_percent', 0) or 0,
                    'age_h': hours_old,
                    'price': entry.get('price', 0),
                    'buys_1h': entry.get('buys', 0) or 0,
                    'sells_1h': entry.get('sells', 0) or 0,
                })
            time.sleep(0.3)
    return collected
def fetch_flap_tokens():
    """Scan the FLAP launchpad (community-driven, BSC) for dip-buy setups.

    Target shape: price has come down but shows bottom support (someone is
    accumulating low). Signals: down over 24h but stabilising/bouncing in
    the last hour, buys > sells, holders growing.
    Returns candidate dicts sorted by market cap (largest first).
    """
    data = gmgn_get(
        'https://gmgn.ai/defi/quotation/v1/rank/bsc/swaps/24h?launchpad=flap&orderby=volume&direction=desc&limit=30'
    )
    tokens = data.get('rank', [])
    candidates = []
    for t in tokens:
        addr = t.get('address', '')
        if not addr:
            continue
        mc = t.get('market_cap', 0) or 0
        liq = t.get('liquidity', 0) or 0
        vol = t.get('volume', 0) or 0
        holders = t.get('holder_count', 0) or 0
        buys = t.get('buys', 0) or 0
        sells = t.get('sells', 0) or 0
        chg_1h = t.get('price_change_percent1h', 0) or 0
        chg_24h = t.get('price_change_percent', 0) or 0
        age_ts = t.get('open_timestamp', 0)
        age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 0
        # Basic floor: skip dust caps / illiquid / near-empty tokens
        if mc < 1000 or liq < 500:
            continue
        if holders < 5:
            continue
        # Bottom-support detection, three patterns (A/B/C below).
        # max(sells, 1) guards the ratio against division by zero.
        buy_ratio = buys / max(sells, 1)
        is_support = False
        reason = ''
        # Pattern A: down over 24h, but the last hour is stabilising/bouncing
        if chg_24h < -10 and chg_1h > chg_24h * 0.3:
            is_support = True
            reason = f'24h跌{chg_24h:.0f}%但1h企稳{chg_1h:+.0f}%'
        # Pattern B: flat-ish 24h, slight 1h rise, healthy buy/sell ratio.
        # NOTE(review): patterns are tested independently, so a later match
        # overwrites an earlier `reason` (C can overwrite A) — looks
        # intentional (most specific wins) but worth confirming.
        if -10 <= chg_24h <= 30 and chg_1h > -5 and buy_ratio > 1.1:
            is_support = True
            reason = f'底部横盘 买卖比{buy_ratio:.2f}'
        # Pattern C: hard dump followed by a strong 1h bounce
        if chg_24h < -30 and chg_1h > 10:
            is_support = True
            reason = f'大跌{chg_24h:.0f}%后反弹{chg_1h:+.0f}%'
        if is_support and buy_ratio >= 1.0:
            candidates.append({
                'address': addr,
                'chain': 'bsc',
                'name': t.get('name', '?'),
                'symbol': t.get('symbol', '?'),
                'mc': mc,
                'liq': liq,
                'volume': vol,
                'holders': holders,
                'sm': 0,  # smart-money count is not available in this view
                'chg_1h': chg_1h,
                'chg_24h': chg_24h,
                'age_h': age_h,
                'price': t.get('price', 0),
                'buys': buys,
                'sells': sells,
                'buy_ratio': buy_ratio,
                'support_reason': reason,
                'launchpad': 'flap',
            })
    # Largest market caps first
    candidates.sort(key=lambda x: x['mc'], reverse=True)
    return candidates
def format_flap_alert(token, desc_info=None):
    """Build the Telegram message for a FLAP dip-buy signal.

    Bug fix: the social-links block embedded a newline only inside the
    Twitter entry, so when Twitter was absent "TG:"/"Web:" was glued onto
    the previous line. Links are now always joined onto a fresh line.
    """
    msg = f"链上雷达 — FLAP低吸信号\n"
    msg += f"链: BSC | 平台: FLAP\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Token story / description (truncated so the message stays readable)
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"故事: {desc}\n\n"
    msg += f"形态: {token['support_reason']}\n\n"
    msg += f"```\n"
    msg += f"市值 ${token['mc']:>12,.0f}\n"
    msg += f"流动性 ${token['liq']:>12,.0f}\n"
    msg += f"24h量 ${token['volume']:>12,.0f}\n"
    msg += f"持有人 {token['holders']:>12,d}\n"
    msg += f"买/卖 {token['buys']:>6,d}/{token['sells']:>6,d}\n"
    msg += f"买卖比 {token['buy_ratio']:>12.2f}\n"
    msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
    msg += f"24h涨幅 {token['chg_24h']:>+11.1f}%\n"
    msg += f"```\n"
    msg += "\nFLAP社区币 — 低吸进场信号"
    # Social links — always start on their own line
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"Twitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n' + '\n'.join(links)
    return msg
# ============================================================
# 推送格式
# ============================================================
def format_musk_trump_alert(token, matched_kw, desc_info=None):
    """Render the Telegram alert for a Musk/Trump-narrative token."""
    info = desc_info or {}
    chain_names = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    chain_label = chain_names.get(token['chain'], token['chain'].upper())
    parts = [
        f"链上雷达 — 马斯克/川普概念\n",
        f"链: {chain_label}\n\n",
        f"{token['name']} ({token['symbol']})\n",
        f"`{token['address']}`\n\n",
    ]
    story = info.get('description', '')
    if story:
        # Cap the story at 200 chars so the alert stays compact
        parts.append(f"故事: {story[:200] + '...' if len(story) > 200 else story}\n\n")
    parts.append(f"命中关键词: {', '.join(matched_kw[:5])}\n\n")
    parts.append(f"```\n")
    parts.append(f"市值 ${token['mc']:>12,.0f}\n")
    parts.append(f"流动性 ${token['liq']:>12,.0f}\n")
    parts.append(f"1h涨幅 {token['chg_1h']:>+11.1f}%\n")
    if token.get('sm', 0) > 0:
        parts.append(f"聪明钱 {token['sm']:>12d}\n")
    parts.append(f"币龄 {token['age_h']:>10.1f}h\n")
    parts.append(f"```\n")
    # Social links, one per line, separated from the body by a newline
    links = [f"{label}: {info[key]}"
             for label, key in (("Twitter", "twitter"), ("TG", "telegram"), ("Web", "website"))
             if info.get(key)]
    if links:
        parts.append('\n' + '\n'.join(links))
    return ''.join(parts)
def format_binance_cz_alert(token, matched_kw, desc_info=None):
    """Render the Telegram alert for a Binance/CZ-narrative token (BSC only)."""
    info = desc_info or {}
    out = (
        f"链上雷达 — 币安/CZ概念\n"
        f"链: BSC\n\n"
        f"{token['name']} ({token['symbol']})\n"
        f"`{token['address']}`\n\n"
    )
    story = info.get('description', '')
    if story:
        # Trim overly long stories
        if len(story) > 200:
            story = story[:200] + '...'
        out += f"故事: {story}\n\n"
    out += f"命中关键词: {', '.join(matched_kw[:5])}\n\n"
    out += (
        f"```\n"
        f"市值 ${token['mc']:>12,.0f}\n"
        f"流动性 ${token['liq']:>12,.0f}\n"
        f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
        f"币龄 {token['age_h']:>10.1f}h\n"
        f"```\n"
    )
    # Social links, if any were resolved
    links = []
    for label, key in (("Twitter", "twitter"), ("TG", "telegram"), ("Web", "website")):
        if info.get(key):
            links.append(f"{label}: {info[key]}")
    if links:
        out += '\n' + '\n'.join(links)
    return out
def format_novel_narrative_alert(token, theme, desc_info=None):
    """Alert for a brand-new narrative — kept as a thin wrapper.

    Delegates to format_heating_narrative_alert with count=1 (a single,
    first-seen token for the theme).
    """
    return format_heating_narrative_alert(token, theme, 1, desc_info)
def format_heating_narrative_alert(token, theme, count, desc_info=None):
    """Render the alert for a heating narrative (same theme keeps producing coins).

    Bug fix: the social-links block embedded a newline only inside the
    Twitter entry, so when Twitter was absent "TG:"/"Web:" was glued onto
    the closing code fence. Links are now always joined onto a fresh line.
    """
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    msg = f"链上雷达 — 叙事热点\n"
    msg += f"链: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Story (truncated) if available, otherwise fall back to the theme label
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 300:
            desc = desc[:300] + '...'
        msg += f"故事: {desc}\n\n"
    else:
        msg += f"叙事主题: {theme}\n\n"
    msg += f"同类概念已出现{count}个币 — 持续有人做\n\n"
    msg += f"```\n"
    msg += f"市值 ${token['mc']:>12,.0f}\n"
    msg += f"流动性 ${token['liq']:>12,.0f}\n"
    msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"聪明钱 {token['sm']:>12d}\n"
    msg += f"持有人 {token['holders']:>12d}\n"
    msg += f"币龄 {token['age_h']:>10.1f}h\n"
    msg += f"```"
    # Social links — always start on their own line
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"Twitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n' + '\n'.join(links)
    return msg
# ============================================================
# 动量追踪器 — 持续上涨+放量检测
# ============================================================
def track_momentum(tokens):
    """Update per-token snapshots each round and flag sustained-rise momentum.

    A token that rises for MOMENTUM_CONSECUTIVE_UP consecutive snapshots
    with a >5% total gain, passes the safety check, and sets a new
    market-cap high since its last push produces an alert.
    Returns a list of {'msg': str, 'token': dict} entries.
    """
    global MOMENTUM_TRACKER, MOMENTUM_PUSHED
    now = time.time()
    alerts = []
    # Addresses seen this round (used for stale-entry cleanup at the end)
    current_addrs = set()
    for token in tokens:
        addr = token['address']
        mc = token['mc']
        vol = token.get('volume', 0) or 0
        price = token.get('price', 0) or 0
        buys = token.get('buys_1h', 0) or token.get('buys', 0) or 0
        current_addrs.add(addr)
        # Basic gate: skip dust, illiquid pairs, and >$10M caps
        if mc < 1000 or token.get('liq', 0) < 500 or mc > 10000000:
            continue
        # Record a snapshot — only when the data actually changed (GMGN caches)
        if addr not in MOMENTUM_TRACKER:
            MOMENTUM_TRACKER[addr] = []
        snapshots = MOMENTUM_TRACKER[addr]
        # Identical to the previous snapshot → cached response, skip
        if snapshots and snapshots[-1]['mc'] == mc and snapshots[-1]['vol'] == vol:
            continue
        snapshots.append({
            'ts': now,
            'mc': mc,
            'vol': vol,
            'price': price,
            'buys': buys,
        })
        # Keep only the last 20 snapshots (~200 seconds of history)
        if len(snapshots) > 20:
            snapshots[:] = snapshots[-20:]
        # Need at least N snapshots before a trend can be judged
        if len(snapshots) < MOMENTUM_CONSECUTIVE_UP:
            continue
        # Did market cap rise on every one of the last N snapshots?
        recent = snapshots[-MOMENTUM_CONSECUTIVE_UP:]
        consecutive_up = True
        total_gain = 0
        for i in range(1, len(recent)):
            prev_mc = recent[i-1]['mc']
            curr_mc = recent[i]['mc']
            if prev_mc <= 0:
                consecutive_up = False
                break
            gain = (curr_mc - prev_mc) / prev_mc
            if gain <= 0:  # any flat/down step breaks the streak
                consecutive_up = False
                break
            total_gain += gain
        if not consecutive_up:
            continue
        # Streak confirmed — is buy flow also expanding? (20% dip tolerated)
        vol_increasing = True
        for i in range(1, len(recent)):
            if recent[i]['buys'] < recent[i-1]['buys'] * 0.8:
                vol_increasing = False
                break
        # Total gain over the streak
        first_mc = recent[0]['mc']
        last_mc = recent[-1]['mc']
        pct_gain = ((last_mc - first_mc) / first_mc * 100) if first_mc > 0 else 0
        # Push threshold: streak plus >5% total gain
        if pct_gain < 5:
            continue
        # Per-token push counter; a repeat push requires a new market-cap high
        push_info = MOMENTUM_PUSHED.get(addr, {'count': 0, 'last_ts': 0, 'last_mc': 0})
        if push_info['count'] > 0 and last_mc <= push_info['last_mc']:
            continue
        push_info['count'] += 1
        push_info['last_ts'] = now
        push_info['last_mc'] = last_mc
        signal_count = push_info['count']
        # Safety veto (honeypot / mintable).
        # NOTE(review): push_info was already mutated above; when safety fails
        # and push_info is an existing dict inside MOMENTUM_PUSHED, the bumped
        # count persists without an alert going out — confirm this is intended.
        safety = check_token_safety(token['chain'], addr)
        if not safety.get('safe'):
            continue
        # Narrative classification → star rating for the alert
        category, matched_kw = classify_narrative(token['name'], token['symbol'], token['chain'])
        is_flap = token.get('launchpad') == 'flap'
        if category == 'musk_trump':
            stars = 3
            narrative_tag = f"马斯克/川普概念 ({', '.join(matched_kw[:3])})"
        elif category == 'binance_cz':
            stars = 3
            narrative_tag = f"币安/CZ概念 ({', '.join(matched_kw[:3])})"
        elif category == 'celebrity_viral':
            stars = 2
            narrative_tag = f"名人/热点 ({', '.join(matched_kw[:3])})"
        elif is_flap:
            stars = 2
            narrative_tag = "FLAP社区币"
        else:
            # No keyword hit — check whether the name still forms a theme
            theme = normalize_theme(token['name'], token['symbol'])
            theme_words = [w for w in theme.split() if w not in COMMON_NOISE_WORDS and len(w) > 2]
            if len(theme_words) >= 2:
                stars = 2
                narrative_tag = f"叙事: {theme}"
            else:
                stars = 1
                narrative_tag = "无明确叙事"
        # Build the alert message
        desc_info = fetch_token_description(token['chain'], addr)
        # FLAP tokens: annotate community presence; any link adds one star
        if is_flap:
            has_twitter = bool(desc_info.get('twitter'))
            has_tg = bool(desc_info.get('telegram'))
            has_web = bool(desc_info.get('website'))
            community_tags = []
            if has_twitter:
                community_tags.append("有推特")
            if has_tg:
                community_tags.append("有TG群")
            if has_web:
                community_tags.append("有官网")
            if community_tags:
                narrative_tag += f" | {' '.join(community_tags)}"
                stars = min(3, stars + 1)  # community presence adds one star
            else:
                narrative_tag += " | 无社区链接"
        msg = format_momentum_alert(token, pct_gain, len(recent), vol_increasing, stars, narrative_tag, desc_info, signal_count)
        alerts.append({'msg': msg, 'token': token})
        MOMENTUM_PUSHED[addr] = push_info
        log(f"[动量信号{signal_count}] {token['name']} ({token['symbol']}) on {token['chain']} — 连涨{len(recent)}轮 +{pct_gain:.1f}%")
    # Evict trackers for tokens absent from this round for >10 minutes
    stale = [a for a in MOMENTUM_TRACKER if a not in current_addrs]
    for a in stale:
        if now - MOMENTUM_TRACKER[a][-1]['ts'] > 600:
            del MOMENTUM_TRACKER[a]
    # Drop push records not refreshed within the last hour
    MOMENTUM_PUSHED = {k: v for k, v in MOMENTUM_PUSHED.items() if now - v.get('last_ts', 0) < 3600}
    return alerts
def format_momentum_alert(token, pct_gain, rounds, vol_up, stars, narrative_tag, desc_info=None, seen_count=0):
    """Render the sustained-rise momentum alert with its narrative star rating.

    Bug fix: the social-links block embedded a newline only inside the
    Twitter entry, so when Twitter was absent "TG:"/"Web:" was glued onto
    the signal-count line. Links are now always joined onto a fresh line.
    """
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    vol_tag = "放量" if vol_up else ""
    star_str = "★" * stars + "☆" * (3 - stars)
    # The signal number is shown at the bottom, not in the title
    msg = f"链上雷达\n"
    msg += f"链: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Story description (truncated)
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"故事: {desc}\n\n"
    msg += f"叙事: {narrative_tag}\n"
    msg += f"连涨{rounds}轮 +{pct_gain:.1f}% {vol_tag}\n\n"
    msg += f"```\n"
    msg += f"市值 ${token['mc']:>12,.0f}\n"
    msg += f"流动性 ${token['liq']:>12,.0f}\n"
    msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"聪明钱 {token['sm']:>12d}\n"
    msg += f"币龄 {token['age_h']:>10.1f}h\n"
    msg += f"```\n"
    msg += f"评星: {star_str} 出现次数: {seen_count}"
    # Social links — always start on their own line
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"Twitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n' + '\n'.join(links)
    return msg
def format_celebrity_alert(token, matched_kw, desc_info=None):
    """Render the ★★ celebrity/viral-trend alert.

    Fixes: (1) the social-links block embedded a newline only inside the
    Twitter entry, so when Twitter was absent "TG:" was glued onto the
    closing code fence; (2) the website link, fetched by
    fetch_token_description and shown by every sibling formatter, was
    silently dropped here — now included for consistency.
    """
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    msg = f"链上雷达 — 名人/热点 ★★\n"
    msg += f"链: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"故事: {desc}\n\n"
    msg += f"命中关键词: {', '.join(matched_kw[:5])}\n\n"
    msg += f"```\n"
    msg += f"市值 ${token['mc']:>12,.0f}\n"
    msg += f"流动性 ${token['liq']:>12,.0f}\n"
    msg += f"1h涨幅 {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"聪明钱 {token['sm']:>12d}\n"
    msg += f"币龄 {token['age_h']:>10.1f}h\n"
    msg += f"```"
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"Twitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n' + '\n'.join(links)
    return msg
# ============================================================
# 核心扫描逻辑
# ============================================================
def scan_narratives():
    """One scan round: refresh snapshots, classify new tokens, push momentum alerts.

    Returns (pushed, found): alerts actually sent vs momentum signals detected.
    """
    conn = init_db()
    tokens = fetch_new_tokens()
    log(f"扫描 {len(tokens)} 个新币...")
    # Momentum tracking — feed every token (GMGN + FLAP) into the snapshot
    # tracker each round; pushing is driven purely by momentum.
    flap_tokens = []
    try:
        flap_tokens = fetch_flap_tokens()
    except:
        pass
    all_momentum_tokens = tokens + flap_tokens
    momentum_alerts = track_momentum(all_momentum_tokens)
    for token in tokens:
        addr = token['address']
        chain = token['chain']
        name = token['name']
        symbol = token['symbol']
        # Already-seen token: bump seen_count and the narrative counter, no re-push
        if is_token_seen(conn, addr):
            c = conn.cursor()
            c.execute('UPDATE tokens_seen SET seen_count = seen_count + 1, market_cap = ? WHERE address = ?', (token['mc'], addr))
            # Also bump the per-theme counter in the narratives table
            theme_tmp = normalize_theme(name, symbol)
            if theme_tmp:
                c.execute('UPDATE narratives SET token_count = token_count + 1, last_seen_at = ? WHERE theme = ?', (int(time.time()), theme_tmp))
            conn.commit()
            continue
        # Classify the narrative (spam is recorded and dropped)
        category, matched_kw = classify_narrative(name, symbol, chain)
        if category == 'spam':
            record_token(conn, addr, chain, name, symbol, '', 'spam', token['mc'])
            continue
        # Minimum quality gate so the DB isn't flooded with dust
        min_mc = 1000
        min_liq = 500
        if token['mc'] < min_mc or token['liq'] < min_liq:
            record_token(conn, addr, chain, name, symbol, '', 'too_small', token['mc'])
            continue
        theme = normalize_theme(name, symbol)
        # Record/classify only — pushing is handled by the momentum engine
        record_token(conn, addr, chain, name, symbol, theme, category, token['mc'])
        check_narrative_novelty(conn, theme, name, symbol, addr, chain)
    conn.close()
    # Push momentum alerts (capped per round; pause to dodge TG rate limits)
    pushed = 0
    for ma in momentum_alerts[:8]:
        if tg_send(ma['msg']):
            pushed += 1
        time.sleep(1)
    return pushed, len(momentum_alerts)
# ============================================================
# 主循环
# ============================================================
def main():
    """Entry point: announce startup on Telegram, then scan forever.

    Each round calls scan_narratives(); any exception is logged and the
    loop keeps running after the regular sleep interval.
    """
    log("=" * 50)
    log("链上雷达 v1 启动")
    log(f"扫描间隔: {SCAN_INTERVAL}s")
    log(f"推送逻辑: 动量优先 — 连涨才推,叙事只做分类标签")
    log("=" * 50)
    # Make sure the SQLite schema exists before the first scan
    init_db()
    # Startup notification
    tg_send(
        "链上雷达 v1 已启动\n\n"
        "核心逻辑: 动量优先\n"
        "连涨3轮+涨幅>5%才推送\n"
        "叙事只做分类标签:\n"
        "★★★ 马斯克/川普 | 币安/CZ | FLAP有社区\n"
        "★★ 名人热点 | FLAP无社区 | 有叙事\n"
        "★ 无明确叙事\n\n"
        f"扫描频率: 每{SCAN_INTERVAL}秒"
    )
    scan_count = 0
    total_pushed = 0
    while True:
        try:
            scan_count += 1
            pushed, found = scan_narratives()
            total_pushed += pushed
            if pushed > 0:
                log(f"第{scan_count}轮: 发现{found}个, 推送{pushed}个 (累计推送{total_pushed})")
            else:
                if scan_count % 20 == 0:  # heartbeat log every 20 quiet rounds
                    log(f"第{scan_count}轮: 无新信号 (累计推送{total_pushed})")
        except Exception as e:
            log(f"扫描异常: {e}")
        time.sleep(SCAN_INTERVAL)


if __name__ == '__main__':
    main()
OI + 费率转负扫描器
发布日期: 2026.04.25 标签: Python · Binance Futures · Telegram
费率正转负 + OI 放大 · 实时扫描
快照对比:费率刚从正变负且OI在涨时秒推TG。每5分钟运行,零AI成本。
完整源码
#!/usr/bin/env python3
"""
OI持续放大 + 费率由正转负 扫描器
- 每分钟运行一次
- 检测: OI持续放大(4段递增, 总涨幅>8%) + 费率由正转负
- 去重: 同一币种24小时内只推一次
- 纯API零成本
"""
import requests
import json
import os
import time
import sys
from datetime import datetime, timedelta
from pathlib import Path
# ============ Configuration ============
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env.oi"                          # Telegram credentials
ALERT_HISTORY_FILE = SCRIPT_DIR / "oi_funding_alerts.json" # dedup state across runs
FR_SNAPSHOT_FILE = SCRIPT_DIR / "fr_snapshot.json"  # previous funding-rate snapshot
# Signal parameters
MIN_OI_CHANGE_PCT = 8  # minimum total OI rise (%)
MIN_VOLUME_USDT = 0  # no volume floor — scan everything
MIN_FR_PERIODS_POSITIVE = 2  # at least 2 positive periods before the flip
DEDUP_HOURS = 24  # dedup window in hours
# ============ 加载TG配置 ============
def load_env():
    """Parse KEY=VALUE lines from the .env.oi file ('#' lines are ignored)."""
    settings = {}
    if ENV_FILE.exists():
        for raw in ENV_FILE.read_text().strip().split('\n'):
            if '=' in raw and not raw.startswith('#'):
                key, _, value = raw.partition('=')
                settings[key.strip()] = value.strip()
    return settings
# Telegram credentials resolved from .env.oi (empty strings disable pushing)
env = load_env()
TG_BOT_TOKEN = env.get('TG_BOT_TOKEN', '')
TG_CHAT_ID = env.get('TG_CHAT_ID', '')
# ============ TG推送 ============
def send_tg(text):
    """Push text to Telegram, falling back to stdout when unconfigured.

    Long messages are split into <=4000-char chunks (TG caps messages at
    4096 chars); when Markdown parsing fails, the chunk is resent plain.
    """
    if not (TG_BOT_TOKEN and TG_CHAT_ID):
        print("[TG] 未配置, 仅打印:")
        print(text)
        return
    endpoint = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
    for start in range(0, len(text), 4000):
        piece = text[start:start + 4000]
        try:
            reply = requests.post(endpoint, json={
                'chat_id': TG_CHAT_ID,
                'text': piece,
                'parse_mode': 'Markdown'
            }, timeout=10)
            if reply.status_code != 200:
                # Markdown rejected — retry without formatting
                requests.post(endpoint, json={
                    'chat_id': TG_CHAT_ID,
                    'text': piece
                }, timeout=10)
        except Exception as exc:
            print(f"[TG] 发送失败: {exc}")
# ============ 去重 ============
def load_alert_history():
    """Load the symbol→ISO-timestamp dedup map; {} when missing or corrupt.

    Bug fix: a bare ``except:`` also swallowed SystemExit/KeyboardInterrupt;
    narrowed to ``Exception``.
    """
    if ALERT_HISTORY_FILE.exists():
        try:
            return json.loads(ALERT_HISTORY_FILE.read_text())
        except Exception:
            return {}
    return {}
def save_alert_history(history):
    """Persist the symbol→timestamp dedup map as JSON."""
    serialized = json.dumps(history)
    ALERT_HISTORY_FILE.write_text(serialized)
def is_duplicate(symbol, history):
    """Return True when `symbol` was already alerted inside the dedup window."""
    try:
        previous = history[symbol]
    except KeyError:
        return False
    age_s = (datetime.now() - datetime.fromisoformat(previous)).total_seconds()
    return age_s < DEDUP_HOURS * 3600
def mark_alerted(symbol, history):
    """Record an alert time for `symbol`; return the map pruned to 2x the window.

    Note: `history` is mutated in place (the new entry is visible to the
    caller) while the pruned copy is returned, matching the original contract.
    """
    history[symbol] = datetime.now().isoformat()
    # Drop records older than twice the dedup window
    horizon = datetime.now() - timedelta(hours=DEDUP_HOURS * 2)
    return {sym: ts for sym, ts in history.items()
            if datetime.fromisoformat(ts) > horizon}
# ============ 费率快照 ============
def load_fr_snapshot():
    """Load the funding-rate snapshot saved by the previous run ({} if none).

    Bug fix: bare ``except:`` (which also caught KeyboardInterrupt) narrowed
    to ``Exception``.
    """
    if FR_SNAPSHOT_FILE.exists():
        try:
            return json.loads(FR_SNAPSHOT_FILE.read_text())
        except Exception:
            pass
    return {}
def save_fr_snapshot(snapshot):
    """Write the current funding-rate snapshot to disk for the next run."""
    payload = json.dumps(snapshot)
    FR_SNAPSHOT_FILE.write_text(payload)
# ============ 核心扫描 ============
def scan():
    """One scan pass: detect funding rates that just flipped from >=0 to <0.

    Compares the current funding-rate snapshot against the one persisted by
    the previous run; for fresh flips it pulls 48h of OI history and builds
    signal dicts. Returns [] on first run or API failure.
    """
    ts_start = time.time()
    # 1. All trading USDT-margined perpetual symbols
    try:
        info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo', timeout=10).json()
        symbols = [s['symbol'] for s in info['symbols']
                   if s['contractType'] == 'PERPETUAL' and s['quoteAsset'] == 'USDT' and s['status'] == 'TRADING']
    except Exception as e:
        print(f"[ERROR] exchangeInfo: {e}")
        return []
    # 2. Bulk 24h tickers (volume filter + price fields for the signals)
    try:
        tickers = requests.get('https://fapi.binance.com/fapi/v1/ticker/24hr', timeout=10).json()
        ticker_map = {t['symbol']: t for t in tickers}
    except Exception as e:
        print(f"[ERROR] ticker: {e}")
        return []
    active = [s for s in symbols if float(ticker_map.get(s, {}).get('quoteVolume', 0)) > MIN_VOLUME_USDT]
    # 3. Current funding rates for every symbol in a single call
    try:
        fr_all = requests.get('https://fapi.binance.com/fapi/v1/premiumIndex', timeout=10).json()
        fr_current = {item['symbol']: float(item['lastFundingRate']) for item in fr_all}
    except:
        fr_current = {}
    # 4. Diff against the previous snapshot to find fresh flips
    prev_snapshot = load_fr_snapshot()
    # Persist this round's rates for the next comparison
    save_fr_snapshot(fr_current)
    if not prev_snapshot:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 首次运行,保存快照,下次开始对比")
        return []
    # Symbols whose rate was >=0 last run and is <0 now
    just_turned_negative = []
    for sym in active:
        prev_fr = prev_snapshot.get(sym)
        curr_fr = fr_current.get(sym)
        if prev_fr is None or curr_fr is None:
            continue
        if prev_fr >= 0 and curr_fr < 0:
            just_turned_negative.append(sym)
    if not just_turned_negative:
        elapsed = time.time() - ts_start
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 扫描完成: {len(active)}币/{elapsed:.1f}s, 无新转负")
        return []
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 发现 {len(just_turned_negative)} 个刚转负: {just_turned_negative}")
    # 5. Pull OI history only for the freshly-flipped symbols
    signals = []
    for sym in just_turned_negative:
        try:
            # 48 hourly points, averaged into 4 segments to smooth noise
            oi_hist = requests.get('https://fapi.binance.com/futures/data/openInterestHist',
                                   params={'symbol': sym, 'period': '1h', 'limit': 48}, timeout=10).json()
            oi_chg = 0
            segs = []
            oi_rising = False
            if oi_hist and len(oi_hist) >= 12:
                oi_values = [float(x['sumOpenInterestValue']) for x in oi_hist]
                seg_len = len(oi_values) // 4
                if seg_len >= 3:
                    segs = [
                        sum(oi_values[:seg_len]) / seg_len,
                        sum(oi_values[seg_len:seg_len*2]) / seg_len,
                        sum(oi_values[seg_len*2:seg_len*3]) / seg_len,
                        sum(oi_values[seg_len*3:]) / max(1, len(oi_values[seg_len*3:]))
                    ]
                    oi_chg = (segs[3] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0
                    oi_rising = oi_chg > 0
            t = ticker_map.get(sym, {})
            signals.append({
                'symbol': sym,
                'price': float(t.get('lastPrice', 0)),
                'price_chg_24h': float(t.get('priceChangePercent', 0)),
                'volume': float(t.get('quoteVolume', 0)),
                'oi_change': oi_chg,
                'oi_segments': segs,
                'oi_rising': oi_rising,
                'current_fr': fr_current.get(sym, 0),
                'prev_fr': prev_snapshot.get(sym, 0),
            })
        except:
            continue
    elapsed = time.time() - ts_start
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 扫描完成: {len(active)}币/{elapsed:.1f}s, 信号: {len(signals)}")
    return signals
# ============ 附加信息 ============
def get_square_discussion(coin):
    """Return (post_count, view_count) for the coin's Binance Square hashtag.

    Best-effort: returns (0, 0) on any failure. Bug fix: bare ``except:``
    narrowed to ``Exception`` so Ctrl-C isn't swallowed.
    """
    try:
        r = requests.get(
            "https://www.binance.com/bapi/composite/v4/friendly/pgc/content/queryByHashtag",
            params={"hashtag": f"#{coin.lower()}", "pageIndex": 1, "pageSize": 1, "orderBy": "HOT"},
            headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.binance.com/en/square"},
            timeout=8
        )
        if r.status_code == 200:
            ht = r.json().get("data", {}).get("hashtag", {})
            return ht.get("contentCount", 0), ht.get("viewCount", 0)
    except Exception:
        pass
    return 0, 0
def get_market_caps():
    """Fetch circulating market caps from Binance's marketing endpoint.

    Returns {asset_name: market_cap_usd}; empty dict on any failure.
    Bug fix: bare ``except:`` narrowed to ``Exception``.
    """
    mcap = {}
    try:
        r = requests.get(
            "https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
            timeout=10
        )
        if r.status_code == 200:
            for item in r.json().get("data", []):
                name = item.get("name", "")
                mc = item.get("marketCap", 0)
                if name and mc:
                    mcap[name] = float(mc)
    except Exception:
        pass
    return mcap
def get_spot_symbols():
    """Return the set of base assets with an active USDT spot market.

    Returns an empty set on any failure. Bug fix: bare ``except:``
    narrowed to ``Exception``.
    """
    try:
        info = requests.get("https://api.binance.com/api/v3/exchangeInfo", timeout=10).json()
        return {s["baseAsset"] for s in info["symbols"]
                if s["quoteAsset"] == "USDT" and s["status"] == "TRADING"}
    except Exception:
        return set()
def fmt_mcap(v):
    """Compact dollar formatting: $1.23B / $4.5M / $678K / $9."""
    for threshold, pattern in ((1e9, "${:.2f}B"), (1e6, "${:.1f}M"), (1e3, "${:.0f}K")):
        if v >= threshold:
            return pattern.format(v / threshold)
    return f"${v:.0f}"
def fmt_views(v):
    """Compact view-count formatting: 1.2M / 34K / plain number below 1000."""
    for suffix, unit, digits in (("M", 1e6, 1), ("K", 1e3, 0)):
        if v >= unit:
            return f"{v / unit:.{digits}f}{suffix}"
    return str(v)
# ============ 格式化推送 ============
def format_alert(signals):
    """Render the Telegram message for a batch of flip signals (None if empty).

    Sort order: OI-rising coins first; within each group, most negative
    funding rate first. Per-coin extras (market cap, spot listing, Square
    discussion stats) are fetched via the helper functions.
    """
    if not signals:
        return None
    # OI-rising first; within each group, sorted by funding rate ascending
    signals.sort(key=lambda x: (-int(x.get('oi_rising', False)), x['current_fr']))
    # Batch lookups shared by all message lines
    mcap_map = get_market_caps()
    spot_set = get_spot_symbols()
    now = datetime.now().strftime('%m-%d %H:%M')
    lines = [f"*[ 费率刚转负+OI涨 ]* {now}\n"]
    for s in signals:
        coin = s['symbol'].replace('USDT', '')
        # Funding: previous period -> current period
        fr_change = f"{s['prev_fr']:+.4%} -> {s['current_fr']:+.4%}"
        # Per-coin extras
        mcap = mcap_map.get(coin, 0)
        has_spot = coin in spot_set
        sq_posts, sq_views = get_square_discussion(coin)
        lines.append(f"```")
        lines.append(f"{coin}")
        lines.append(f" 价格: {s['price']:.4f} 24h: {s['price_chg_24h']:+.1f}%")
        lines.append(f" 费率: {fr_change}")
        if s['oi_segments']:
            oi_segs = ' > '.join([f"{v/1e6:.1f}M" for v in s['oi_segments']])
            lines.append(f" OI: +{s['oi_change']:.1f}% ({oi_segs})")
        lines.append(f" 成交额: ${s['volume']/1e6:.1f}M")
        lines.append(f" 市值: {fmt_mcap(mcap) if mcap > 0 else '未知'} 现货: {'有' if has_spot else '仅合约'}")
        if sq_posts > 0:
            lines.append(f" 广场: {sq_posts}帖 / {fmt_views(sq_views)}浏览")
        else:
            lines.append(f" 广场: 无讨论")
        lines.append(f"```")
    return '\n'.join(lines)
# ============ 主逻辑 ============
def main():
    """Run one scan and push only the strongest combo: negative FR + rising OI."""
    signals = scan()
    if signals:
        # Push only the strongest combination: currently-negative funding AND rising OI
        strong = [s for s in signals if s['current_fr'] < 0 and s.get('oi_rising')]
        if strong:
            msg = format_alert(strong)
            if msg:
                send_tg(msg)
            print(f" 推送 {len(strong)} 个信号 (总{len(signals)}个转负, {len(strong)}个OI也涨)")
        else:
            print(f" {len(signals)} 个转负但无OI在涨的, 跳过")
    else:
        print(f" 无信号")


if __name__ == '__main__':
    main()
热度做多雷达
发布日期: 2026.04.25 标签: Python · Binance · CoinGlass · Telegram
热度排行 + OI 异动 + 智能推送
每小时扫描:热度币追踪、OI异动检测、TG推送。纯Python零AI成本。
完整源码
#!/usr/bin/env python3
"""
热度做多雷达 v2 — 热度+费率+OI 三维扫描
核心逻辑(拉哪模式):
1. 热度先行 → CG热搜+放量=资金涌入信号
2. 负费率=空头燃料,庄家拉盘爆空单
3. OI暴涨=大资金建仓=即将拉盘
单策略:发现热度→小仓做多→严格止损→拿住赢家
数据源:币安合约API + CoinGecko Trending(零成本)
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
from square_heat import get_square_heat
# === Load .env — populate os.environ from .env.oi without overriding
# variables that are already set ===
env_file = Path(__file__).parent / ".env.oi"
if env_file.exists():
    with open(env_file) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                k, v = line.split("=", 1)
                os.environ.setdefault(k.strip(), v.strip())
# === 配置 ===
# === Configuration ===
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.getenv("TG_CHAT_ID", "YOUR_CHAT_ID")
FAPI = "https://fapi.binance.com"
# Heat history (used to detect a coin's first appearance on the trending list)
HEAT_HISTORY_FILE = Path(__file__).parent / "heat_history.json"
# Heat parameters
VOL_SURGE_MULT = 2.5  # volume >=2.5x average counts as a surge
MIN_VOL_USD = 20_000_000  # only check surges above $20M daily volume
# OI anomaly parameters
MIN_OI_DELTA_PCT = 3.0  # OI must move at least 3%
MIN_OI_USD = 2_000_000  # minimum OI floor $2M
def api_get(endpoint, params=None):
    """GET a Binance futures endpoint with up to 3 attempts.

    Returns parsed JSON on HTTP 200, None on other statuses or when all
    retries are exhausted. 429 (rate limit) backs off 2s before retrying;
    network errors back off 1s.

    Bug fix: bare ``except:`` narrowed to ``Exception`` so Ctrl-C still
    interrupts the retry loop.
    """
    url = f"{FAPI}{endpoint}"
    for attempt in range(3):
        try:
            resp = requests.get(url, params=params, timeout=10)
            if resp.status_code == 200:
                return resp.json()
            elif resp.status_code == 429:
                time.sleep(2)  # rate-limited: brief backoff, then retry
            else:
                return None
        except Exception:
            time.sleep(1)
    return None
def format_usd(v):
    """Human-readable dollar amount ($1.2B / $3.4M / $56K)."""
    if v >= 1e9:
        return "$%.1fB" % (v / 1e9)
    if v >= 1e6:
        return "$%.1fM" % (v / 1e6)
    if v >= 1e3:
        return "$%.0fK" % (v / 1e3)
    return "$%.0f" % v
def mcap_str(v):
    """Market-cap shorthand; differs from format_usd by rounding M to whole numbers."""
    units = ((1e9, "B", 1), (1e6, "M", 0), (1e3, "K", 0))
    for unit, suffix, digits in units:
        if v >= unit:
            return f"${v / unit:.{digits}f}{suffix}"
    return f"${v:.0f}"
def send_telegram(text):
    """Send `text` to Telegram, chunked under the 4096-char API limit.

    Falls back to stdout when no token is configured; when Markdown
    delivery fails, the chunk is stripped of * and _ and resent plain.
    """
    if not TG_BOT_TOKEN:
        print("\n[TG] No token, stdout:\n")
        print(text)
        return
    api = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
    # Greedy line-based packing: keep adding lines while under ~3800 chars
    pieces = []
    buf = ""
    for ln in text.split("\n"):
        if len(buf) + len(ln) + 1 > 3800:
            pieces.append(buf)
            buf = ln
        else:
            buf = f"{buf}\n{ln}" if buf else ln
    if buf:
        pieces.append(buf)
    for piece in pieces:
        try:
            reply = requests.post(api, json={
                "chat_id": TG_CHAT_ID,
                "text": piece,
                "parse_mode": "Markdown"
            }, timeout=10)
            if reply.status_code == 200:
                print(f"[TG] Sent ✓ ({len(piece)} chars)")
            else:
                # Markdown failed — strip formatting chars and resend plain
                fallback = requests.post(api, json={
                    "chat_id": TG_CHAT_ID,
                    "text": piece.replace("*", "").replace("_", ""),
                }, timeout=10)
                print(f"[TG] Sent plain ({'✓' if fallback.status_code == 200 else '✗'})")
        except Exception as exc:
            print(f"[TG] Error: {exc}")
        time.sleep(0.5)
def main():
    """Run one full heat-radar scan and push the report to Telegram.

    Pipeline: (1) market-wide tickers + funding rates, (2) real circulating
    market caps, (3) heat signals (Binance Square, CoinGecko trending,
    volume surges), (4) open-interest scan, (5) merge per-coin data,
    then build the heat list / chase-long list / OI-anomaly tables and send.
    """
    print(f"🔥 热度做多雷达 v2 — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
    # 1. Market-wide tickers + funding rates
    tickers_raw = api_get("/fapi/v1/ticker/24hr")
    premiums_raw = api_get("/fapi/v1/premiumIndex")
    if not tickers_raw or not premiums_raw:
        print("❌ API失败")
        return
    ticker_map = {}
    for t in tickers_raw:
        if t["symbol"].endswith("USDT"):
            ticker_map[t["symbol"]] = {
                "px_chg": float(t["priceChangePercent"]),
                "vol": float(t["quoteVolume"]),
                "price": float(t["lastPrice"]),
            }
    funding_map = {}
    for p in premiums_raw:
        if p["symbol"].endswith("USDT"):
            funding_map[p["symbol"]] = float(p["lastFundingRate"])
    # 2. Real circulating market caps (Binance spot marketing API)
    mcap_map = {}
    try:
        r = requests.get(
            "https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
            timeout=10
        )
        if r.status_code == 200:
            for item in r.json().get("data", []):
                name = item.get("name", "")
                mc = item.get("marketCap", 0)
                if name and mc:
                    mcap_map[name] = float(mc)
            print(f"✅ 真实市值: {len(mcap_map)}个币")
    except Exception as e:
        print(f"⚠️ 市值API失败: {e}")
    # 3. Heat detection: Binance Square trending + CoinGecko trending + volume surges
    heat_map = {}
    cg_trending = set()
    square_trending = set()
    # 3a. Binance Square trending (6H) — most important: Binance users = traders
    sq_coins = get_square_heat()
    if sq_coins:
        for i, c in enumerate(sq_coins):
            coin = c["coin"]
            square_trending.add(coin)
            # Higher rank scores more; rapid risers get a bonus
            rank_score = max(50 - i * 4, 10)
            if c.get("rapidRiser"):
                rank_score += 15
            heat_map[coin] = heat_map.get(coin, 0) + rank_score
        print(f"🏦 广场热搜: {len(square_trending)}个币 {[c['coin'] for c in sq_coins[:5]]}")
    # 3b. CoinGecko Trending
    try:
        r = requests.get("https://api.coingecko.com/api/v3/search/trending", timeout=10)
        if r.status_code == 200:
            for item in r.json().get("coins", []):
                sym = item["item"]["symbol"].upper()
                rank = item["item"].get("score", 99)
                cg_trending.add(sym)
                heat_map[sym] = heat_map.get(sym, 0) + max(50 - rank * 3, 10)
            print(f"🌐 CG Trending: {len(cg_trending)}个币")
    except Exception as e:
        print(f"⚠️ CG Trending失败: {e}")
    # 3c. Volume-surge detection: today's volume vs. average of the prior days
    vol_surge_coins = set()
    for sym, tk in ticker_map.items():
        coin = sym.replace("USDT", "")
        vol_24h = tk["vol"]
        if vol_24h > MIN_VOL_USD:
            kl = api_get("/fapi/v1/klines", {"symbol": sym, "interval": "1d", "limit": 8})
            if kl and len(kl) >= 5:
                avg_prev = sum(float(k[7]) for k in kl[:-1]) / (len(kl) - 1)
                if avg_prev > 0:
                    ratio = vol_24h / avg_prev
                    if ratio >= VOL_SURGE_MULT:
                        vol_surge_coins.add(coin)
                        heat_map[coin] = heat_map.get(coin, 0) + min(ratio * 10, 50)
            time.sleep(0.05)
    print(f"📈 放量(≥{VOL_SURGE_MULT}x): {len(vol_surge_coins)}个币")
    # Coins hot on two/three sources at once get extra heat points
    dual_heat = cg_trending & vol_surge_coins
    square_vol = square_trending & vol_surge_coins
    triple_heat = cg_trending & vol_surge_coins & square_trending
    all_multi_heat = dual_heat | square_vol
    if all_multi_heat:
        for coin in all_multi_heat:
            heat_map[coin] = heat_map.get(coin, 0) + 20
    if triple_heat:
        for coin in triple_heat:
            heat_map[coin] = heat_map.get(coin, 0) + 30  # triple-heat super bonus
        print(f"🔥🔥🔥 三重热度: {triple_heat}")
    else:
        print(f"🔥🔥 双重热度: {all_multi_heat}")
    # 4. Open-interest scan (top-100 by volume + every heat coin)
    scan_syms = set()
    # Heat coins are always scanned
    for coin in heat_map:
        sym = coin + "USDT"
        if sym in ticker_map:
            scan_syms.add(sym)
    # Top-100 by quote volume
    top_by_vol = sorted(ticker_map.items(), key=lambda x: x[1]["vol"], reverse=True)[:100]
    for sym, _ in top_by_vol:
        scan_syms.add(sym)
    oi_map = {}
    for i, sym in enumerate(scan_syms):
        oi_hist = api_get("/futures/data/openInterestHist", {
            "symbol": sym, "period": "1h", "limit": 6
        })
        if oi_hist and len(oi_hist) >= 2:
            curr = float(oi_hist[-1]["sumOpenInterestValue"])
            prev_1h = float(oi_hist[-2]["sumOpenInterestValue"])
            prev_6h = float(oi_hist[0]["sumOpenInterestValue"])
            d1h = ((curr - prev_1h) / prev_1h * 100) if prev_1h > 0 else 0
            d6h = ((curr - prev_6h) / prev_6h * 100) if prev_6h > 0 else 0
            oi_map[sym] = {"oi_usd": curr, "d1h": d1h, "d6h": d6h}
        if (i + 1) % 10 == 0:
            time.sleep(0.5)  # throttle every 10 requests
    print(f"📊 OI扫描: {len(oi_map)}个币")
    # 5. Merge everything into one record per symbol
    all_syms = set(list(ticker_map.keys()))
    coin_data = {}
    for sym in all_syms:
        tk = ticker_map.get(sym, {})
        if not tk:
            continue
        oi = oi_map.get(sym, {})
        fr = funding_map.get(sym, 0)
        coin = sym.replace("USDT", "")
        d6h = oi.get("d6h", 0)
        fr_pct = fr * 100
        oi_usd = oi.get("oi_usd", 0)
        # Prefer the real market cap; otherwise a rough estimate from volume/OI
        if coin in mcap_map:
            est_mcap = mcap_map[coin]
        else:
            est_mcap = max(tk["vol"] * 0.3, oi_usd * 2) if oi_usd > 0 else tk["vol"] * 0.3
        heat = heat_map.get(coin, 0)
        coin_data[sym] = {
            "coin": coin, "sym": sym,
            "px_chg": tk["px_chg"], "vol": tk["vol"],
            "fr_pct": fr_pct, "d6h": d6h,
            "oi_usd": oi_usd, "est_mcap": est_mcap,
            "heat": heat,
            "in_cg": coin in cg_trending,
            "in_sq": coin in square_trending,
            "vol_surge": coin in vol_surge_coins,
        }
    # ═══════════════════════════════════════
    # Heat leaderboard
    # ═══════════════════════════════════════
    hot_coins = sorted(
        [d for d in coin_data.values() if d["heat"] > 0],
        key=lambda x: x["heat"], reverse=True
    )
    # Detect first-time appearances using the persisted heat history
    heat_history = {}
    if HEAT_HISTORY_FILE.exists():
        try:
            heat_history = json.loads(HEAT_HISTORY_FILE.read_text())
        except:
            pass
    # NOTE(review): `timedelta` must be imported from datetime — confirm the
    # import block at the top of this script.
    now_ts = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
    new_entries = []  # coins on the leaderboard for the first time
    for s in hot_coins:
        coin = s["coin"]
        if coin not in heat_history:
            # First appearance! ("price" actually stores the 24h % change)
            heat_history[coin] = {"first_seen": now_ts, "price": s.get("px_chg", 0)}
            sources = []
            if s["in_sq"]: sources.append("广场")
            if s["in_cg"]: sources.append("CG")
            if s["vol_surge"]: sources.append("放量")
            new_entries.append({"coin": coin, "sources": sources, "data": s})
    # Drop history entries older than 7 days (keeps the file bounded);
    # lexicographic compare works because first_seen is "YYYY-MM-DD HH:MM"
    cutoff = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=7)).strftime("%Y-%m-%d")
    heat_history = {k: v for k, v in heat_history.items()
                    if v.get("first_seen", "9999") >= cutoff}
    # Persist the pruned history
    HEAT_HISTORY_FILE.write_text(json.dumps(heat_history, indent=2, ensure_ascii=False))
    # ═══════════════════════════════════════
    # Chase-long: negative funding while price is rising
    # ═══════════════════════════════════════
    chase = []
    for sym, d in coin_data.items():
        if d["px_chg"] > 3 and d["fr_pct"] < -0.005 and d["vol"] > 1_000_000:
            fr_hist = api_get("/fapi/v1/fundingRate", {"symbol": sym, "limit": 5})
            fr_rates = [float(f["fundingRate"]) * 100 for f in fr_hist] if fr_hist else [d["fr_pct"]]
            fr_prev = fr_rates[-2] if len(fr_rates) >= 2 else d["fr_pct"]
            fr_delta = d["fr_pct"] - fr_prev
            trend = "加速恶化" if fr_delta < -0.05 else "转负" if fr_delta < -0.01 else "持平" if abs(fr_delta) < 0.01 else "回升"
            chase.append({**d, "fr_delta": fr_delta, "trend": trend,
                          "rates": " → ".join([f"{x:.3f}" for x in fr_rates[-3:]])})
            time.sleep(0.2)
    chase.sort(key=lambda x: x["fr_pct"])
    # ═══════════════════════════════════════
    # Build the push message
    # ═══════════════════════════════════════
    now = datetime.now(timezone(timedelta(hours=8)))
    lines = [
        f"**热度做多雷达**",
        f"{now.strftime('%Y-%m-%d %H:%M')} CST",
    ]
    # First-time entries go at the very top
    if new_entries:
        lines.append(f"\n**[ 首次上榜 ]** 新出现的热度币,重点关注")
        tbl = ["```"]
        tbl.append(f"{'币种':<10} {'市值':>8} {'涨幅':>7} {'来源'}")
        tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
        for e in new_entries:
            s = e["data"]
            src_str = "/".join(e["sources"])
            tbl.append(f"{s['coin']:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
        tbl.append("```")
        lines.append("\n".join(tbl))
    # Heat leaderboard table
    if hot_coins:
        lines.append(f"\n**[ 热度榜 ]**")
        tbl = ["```"]
        tbl.append(f"{'币种':<10} {'市值':>8} {'涨幅':>7} {'来源'}")
        tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
        for s in hot_coins[:10]:
            sources = []
            if s["in_sq"]: sources.append("广场")
            if s["in_cg"]: sources.append("CG")
            if s["vol_surge"]: sources.append("放量")
            extra = []
            if abs(s["d6h"]) >= 3: extra.append(f"OI{s['d6h']:+.0f}%")
            if s["fr_pct"] < -0.03: extra.append(f"费率{s['fr_pct']:.2f}%")
            src_str = "/".join(sources)
            if extra:
                src_str += " " + " ".join(extra)
            coin_name = s['coin']
            tbl.append(f"{coin_name:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
        tbl.append("```")
        lines.append("\n".join(tbl))
    else:
        lines.append("\n**[ 热度榜 ]** 暂无热点")
    # Chase-long table
    lines.append(f"\n**[ 追多 ]** 涨了+费率负=空头燃料")
    if chase:
        tbl = ["```"]
        tbl.append(f"{'币种':<10} {'费率':>10} {'趋势':>8} {'涨幅':>7} {'市值':>8}")
        tbl.append(f"{'-'*10} {'-'*10} {'-'*8} {'-'*7} {'-'*8}")
        for s in chase[:8]:
            tbl.append(
                f"{s['coin']:<10} {s['fr_pct']:>+9.3f}% {s['trend']:>8} {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
            )
        tbl.append("```")
        lines.append("\n".join(tbl))
    else:
        lines.append(" 暂无符合条件的标的")
    # OI-anomaly table (only for coins with no heat signal)
    oi_alerts = []
    for sym, oi in oi_map.items():
        if abs(oi["d6h"]) >= 8:
            d = coin_data.get(sym)
            if d and d["heat"] == 0:
                oi_alerts.append(d)
    oi_alerts.sort(key=lambda x: abs(x["d6h"]), reverse=True)
    if oi_alerts:
        lines.append(f"\n**[ OI异动 ]** 6小时持仓变化>=8%")
        tbl = ["```"]
        tbl.append(f"{'币种':<10} {'方向':>4} {'OI变化':>8} {'涨幅':>7} {'市值':>8}")
        tbl.append(f"{'-'*10} {'-'*4} {'-'*8} {'-'*7} {'-'*8}")
        for s in oi_alerts[:6]:
            direction = "增仓" if s["d6h"] > 0 else "减仓"
            tbl.append(
                f"{s['coin']:<10} {direction:>4} {s['d6h']:>+7.1f}% {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
            )
        tbl.append("```")
        lines.append("\n".join(tbl))
    # "Worth watching" highlights: heat + OI inflow, heat + deeply negative
    # funding, or funding that keeps deteriorating
    highlights = []
    hot_oi = [d for d in coin_data.values() if d["heat"] > 0 and d["d6h"] > 5]
    for s in sorted(hot_oi, key=lambda x: x["d6h"], reverse=True)[:3]:
        highlights.append(f"{s['coin']} — 热度高+OI涨{s['d6h']:+.0f}%,资金涌入")
    hot_fuel = [d for d in coin_data.values() if d["heat"] > 0 and d["fr_pct"] < -0.03]
    for s in sorted(hot_fuel, key=lambda x: x["fr_pct"])[:2]:
        if s["coin"] not in " ".join(highlights):
            highlights.append(f"{s['coin']} — 热度高+费率{s['fr_pct']:.2f}%,空头燃料足")
    chase_fire = [s for s in chase[:5] if "加速" in s.get("trend", "")]
    for s in chase_fire[:2]:
        if s["coin"] not in " ".join(highlights):
            highlights.append(f"{s['coin']} — 费率{s['fr_pct']:.3f}%持续恶化,逼空在即")
    if highlights:
        lines.append(f"\n**[ 值得关注 ]**")
        for h in highlights[:5]:
            lines.append(f" {h}")
    lines.append(f"\n广场=币安站内搜索 / CG=CoinGecko全球热度")
    lines.append(f"费率负=做空多,庄家拉盘爆空头")
    report = "\n".join(lines)
    send_telegram(report)
    print("\n✅ 完成")
# Entry point: one full scan per invocation (intended for cron/scheduled runs).
if __name__ == "__main__":
    main()
币安 Alpha 公告监控
发布日期: 2026.04.23 标签: Python · Binance · Claude AI · Telegram
REST 轮询监听 · 规则引擎评级(可选 Claude AI 叙事分析) · TG 推送
自动检测币安 Alpha 新上币公告,规则引擎对代币质量做 S/A/B/C 评级(可选接入 Claude 做叙事抽取),TG 实时推送。
完整源码
#!/usr/bin/env python3
"""
Binance Alpha Monitor v2 — 稳定版
REST轮询 + 智能过滤 + 评级 + TG推送
零API Key,零AI成本(评级用规则引擎)
运行: python3 alpha_monitor.py
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import httpx
# ============================================================
# 配置
# ============================================================
BASE_DIR = Path(__file__).parent
DB_PATH = str(BASE_DIR / "data" / "alpha.db")  # SQLite state database
# Telegram credentials
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# LLM (optional; used for narrative extraction, falls back to rules when unset)
# Prefer the credential pool in hermes auth.json (same source as the scanner)
def _load_anthropic_key():
"""从auth.json或环境变量获取API key"""
# 1. hermes auth.json credential pool
try:
auth_file = os.path.expanduser("~/.hermes/auth.json")
if os.path.exists(auth_file):
with open(auth_file) as f:
auth = json.load(f)
pool = auth.get("credential_pool", {}).get("anthropic", [])
if pool:
key = pool[0].get("access_token", "")
if key and key != "***":
return key
except Exception:
pass
# 2. 环境变量 fallback
return os.environ.get("ANTHROPIC_API_KEY", "")
ANTHROPIC_API_KEY = _load_anthropic_key()
ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")
# Polling intervals (seconds)
ANNOUNCEMENT_POLL_INTERVAL = 30  # announcement poll every 30s
AGGREGATION_POLL_INTERVAL = 15  # aggregation worker every 15s
MONITOR_POLL_INTERVAL = 120  # post-launch monitor every 2 minutes
# HTTP — browser-like UA; the Binance CMS endpoint rejects default clients
HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "application/json",
    "Accept-Language": "en-US,en;q=0.9",
}
BINANCE_ANNOUNCEMENT_API = "https://www.binance.com/bapi/composite/v1/public/cms/article/list/query"
# ============================================================
# 日志
# ============================================================
# Root logging: timestamped INFO-level lines for every worker in this process.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("alpha")
# ============================================================
# Filtering rules
# ============================================================
# Trigger keywords — a single hit makes the announcement a candidate
TRIGGER_KEYWORDS = [
    "alpha", "空投", "airdrop", "tge", "token generation",
    "将上线", "will list", "will launch",
    "独家", "exclusive", "binance wallet", "hodler",
]
# Exclusion keywords — a single hit rejects the announcement outright
EXCLUDE_KEYWORDS = [
    "delisting", "delist", "下架", "deprecate", "退市",
    "maintenance", "维护",
    "launchpool", "megadrop",
    "buyback", "回购",
    "已完成", "完成结算",
    "perpetual contract",  # perpetual futures
    "futures will launch",  # new futures listing
    "usdⓢ-margined",  # USDⓈ-margined contracts
    "coin-margined",  # coin-margined contracts
    "margin will add",  # new margin pairs
    "trading bots services",  # trading-bot products
    "trading pairs",  # trading-pair changes (not a new coin)
]
# Alpha Box mystery-box announcements (filtered out separately)
ALPHA_BOX_KEYWORDS = ["alpha box", "盲盒", "mystery box"]
# ============================================================
# VC whitelists
# ============================================================
TIER1_VCS = [
    "binance labs", "yzi labs",
    "coinbase ventures", "a16z", "andreessen horowitz", "paradigm",
    "polychain", "polychain capital", "sequoia", "sequoia china", "sequoia capital",
    "multicoin", "multicoin capital", "pantera", "pantera capital",
    "dragonfly", "dragonfly capital", "founders fund",
]
TIER2_VCS = [
    "abcde", "iosg", "hashkey", "okx ventures",
    "sevenx", "folius", "foresight", "hashed",
    "bitkraft", "framework", "framework ventures",
    "delphi", "delphi digital", "electric capital",
    "variant", "1kx", "placeholder",
    "animoca", "animoca brands", "jump", "jump crypto",
    "hack vc", "bain capital",
]
# Narrative heat buckets used by the rating engine
HOT_NARRATIVES = ["defi_perp", "ai_agent", "ai_defi", "defai", "zk_proof"]
WEAK_NARRATIVES = ["gamefi", "meme", "social"]
# Signals that a token is a "Binance darling" (in-house backed)
BINANCE_DARLING_KEYWORDS = ["yzi labs", "binance labs"]
# ============================================================
# Rating engine
# ============================================================
TIER_ICONS = {"S": "🟢🟢🟢", "A": "🟡🟡", "B": "🟠", "C": "⚪"}
TIER_LABELS = {"S": "S 级(必研究)", "A": "A 级(值得看)", "B": "B 级(正常)", "C": "C 级(了解)"}
def count_vc_tier(vcs: list, vc_list: list) -> int:
    """Count entries of `vc_list` that occur as a substring in any name in `vcs`."""
    haystack = [name.lower() for name in vcs]
    return sum(1 for needle in vc_list if any(needle in name for name in haystack))
def rate_project(circ_mcap: float, fdv: float, vcs: list,
                 narrative: str, is_darling: bool) -> dict:
    """Rate a listing S/A/B/C from market cap, FDV, backers and narrative.

    Returns {"tier", "reason", "warnings"}. Rules are checked strongest
    first; the first match wins. `circ_mcap`/`fdv` may be None (treated as 0).
    """
    t1 = count_vc_tier(vcs, TIER1_VCS)
    # NOTE: the previous version also counted TIER2_VCS hits into an unused
    # local (`t2`); that dead computation has been removed.
    hot = narrative in HOT_NARRATIVES
    weak = narrative in WEAK_NARRATIVES
    circ_mcap = circ_mcap or 0
    fdv = fdv or 0
    warnings = []
    if weak:
        warnings.append(f"⚠️ {narrative} 历史破发率较高")
    # S tier — five qualifying paths
    if is_darling:
        return {"tier": "S", "reason": "币安亲儿子(YZi/Binance Labs/CZ)", "warnings": warnings}
    if hot and t1 >= 1 and fdv < 500_000_000:
        return {"tier": "S", "reason": f"热叙事({narrative})+ Tier1 VC", "warnings": warnings}
    if t1 >= 2 and circ_mcap < 50_000_000 and fdv < 300_000_000:
        return {"tier": "S", "reason": "≥2家 Tier1 中盘", "warnings": warnings}
    if t1 >= 1 and circ_mcap < 10_000_000 and fdv < 100_000_000:
        return {"tier": "S", "reason": "Tier1 微盘", "warnings": warnings}
    if hot and circ_mcap < 10_000_000 and fdv < 50_000_000:
        return {"tier": "S", "reason": f"热叙事({narrative})微盘", "warnings": warnings}
    # A tier
    if t1 >= 1 and circ_mcap < 20_000_000 and fdv < 200_000_000:
        return {"tier": "A", "reason": "Tier1 小盘", "warnings": warnings}
    # B tier
    if circ_mcap < 50_000_000 and fdv < 500_000_000:
        return {"tier": "B", "reason": "中盘", "warnings": warnings}
    return {"tier": "C", "reason": "大盘/弱信号", "warnings": warnings}
# ============================================================
# 数据库
# ============================================================
def init_db():
    """Create the SQLite database and its three tables if missing.

    Tables: `projects` (one row per discovered listing), `pushes` (log of
    sent Telegram messages, used for dedup), `snapshots` (periodic
    price/mcap samples for launched projects).
    """
    Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    c = conn.cursor()
    c.execute("""
        CREATE TABLE IF NOT EXISTS projects (
            id TEXT PRIMARY KEY,
            symbol TEXT NOT NULL,
            name TEXT,
            launch_time TEXT,
            source TEXT,
            raw_text TEXT,
            tier TEXT DEFAULT 'PENDING',
            tier_reason TEXT,
            narrative TEXT,
            narrative_desc TEXT,
            vcs_json TEXT DEFAULT '[]',
            is_darling INTEGER DEFAULT 0,
            open_price REAL,
            total_supply REAL,
            circulating_supply REAL,
            fdv REAL,
            circulating_mcap REAL,
            excluded INTEGER DEFAULT 0,
            exclude_reason TEXT,
            discovered_at TEXT,
            updated_at TEXT
        )""")
    c.execute("""
        CREATE TABLE IF NOT EXISTS pushes (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            project_id TEXT NOT NULL,
            push_type TEXT,
            sent_at TEXT,
            content TEXT
        )""")
    c.execute("""
        CREATE TABLE IF NOT EXISTS snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            project_id TEXT NOT NULL,
            timestamp TEXT NOT NULL,
            price REAL,
            circulating_mcap REAL,
            fdv REAL
        )""")
    conn.commit()
    conn.close()
def project_id(symbol: str, date_str: str) -> str:
    """Stable 16-hex-char id derived from the upper-cased symbol plus date."""
    digest = hashlib.md5(f"{symbol.upper()}_{date_str}".encode())
    return digest.hexdigest()[:16]
def project_exists(pid: str) -> bool:
    """True when a project row with this id is already stored."""
    conn = sqlite3.connect(DB_PATH)
    row = conn.execute("SELECT 1 FROM projects WHERE id=?", (pid,)).fetchone()
    conn.close()
    return row is not None
def save_project(project: dict):
    """Insert a new project row; duplicates are silently skipped (INSERT OR IGNORE).

    The value tuple order must match the column list exactly.
    """
    conn = sqlite3.connect(DB_PATH)
    now = datetime.utcnow().isoformat()
    conn.execute("""
        INSERT OR IGNORE INTO projects
        (id, symbol, name, launch_time, source, raw_text, tier, tier_reason,
         narrative, narrative_desc, vcs_json, is_darling,
         open_price, total_supply, circulating_supply, fdv, circulating_mcap,
         excluded, exclude_reason, discovered_at, updated_at)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    """, (
        project["id"], project["symbol"], project.get("name"),
        project.get("launch_time"), project.get("source"), project.get("raw_text"),
        project.get("tier", "PENDING"), project.get("tier_reason"),
        project.get("narrative"), project.get("narrative_desc"),
        json.dumps(project.get("vcs", [])), int(project.get("is_darling", False)),
        project.get("open_price"), project.get("total_supply"),
        project.get("circulating_supply"), project.get("fdv"),
        project.get("circulating_mcap"),
        int(project.get("excluded", 0)), project.get("exclude_reason"),
        now, now,
    ))
    conn.commit()
    conn.close()
def update_project(pid: str, fields: dict):
    """Apply a partial update to one project row, stamping updated_at.

    No-op when `fields` is empty. Column names come only from internal
    callers, so interpolating them into the SET clause is safe here.
    """
    if not fields:
        return
    fields["updated_at"] = datetime.utcnow().isoformat()
    assignments = [f"{column}=?" for column in fields]
    params = list(fields.values()) + [pid]
    conn = sqlite3.connect(DB_PATH)
    conn.execute(f"UPDATE projects SET {','.join(assignments)} WHERE id=?", params)
    conn.commit()
    conn.close()
def get_project(pid: str) -> Optional[dict]:
    """Load one project row as a plain dict, or None when the id is unknown."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    row = conn.execute("SELECT * FROM projects WHERE id=?", (pid,)).fetchone()
    conn.close()
    if row is None:
        return None
    return dict(row)
def list_pending() -> list:
    """All non-excluded projects still awaiting rating, oldest first."""
    query = "SELECT * FROM projects WHERE excluded=0 AND tier='PENDING' ORDER BY discovered_at"
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute(query).fetchall()
    conn.close()
    return [dict(row) for row in rows]
def list_active() -> list:
    """Rated, non-excluded projects with a launch time — the post-launch watch list."""
    query = """
        SELECT * FROM projects
        WHERE excluded=0 AND launch_time IS NOT NULL AND launch_time != ''
          AND tier NOT IN ('PENDING', 'EXCLUDED', 'ERROR')
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    rows = conn.execute(query).fetchall()
    conn.close()
    return [dict(row) for row in rows]
def has_pushed(pid: str, push_type: str) -> bool:
    """True when a push of this type was already logged for the project."""
    conn = sqlite3.connect(DB_PATH)
    hit = conn.execute(
        "SELECT 1 FROM pushes WHERE project_id=? AND push_type=?", (pid, push_type)
    ).fetchone()
    conn.close()
    return hit is not None
def log_push(pid: str, push_type: str, content: str):
    """Record a sent Telegram push so it is never repeated."""
    sent_at = datetime.utcnow().isoformat()
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "INSERT INTO pushes (project_id, push_type, sent_at, content) VALUES (?,?,?,?)",
        (pid, push_type, sent_at, content)
    )
    conn.commit()
    conn.close()
def save_snapshot(pid: str, price: float, mcap: float, fdv: float):
    """Append one timestamped price/mcap/FDV sample for a project."""
    stamp = datetime.utcnow().isoformat()
    conn = sqlite3.connect(DB_PATH)
    conn.execute(
        "INSERT INTO snapshots (project_id, timestamp, price, circulating_mcap, fdv) VALUES (?,?,?,?,?)",
        (pid, stamp, price, mcap, fdv)
    )
    conn.commit()
    conn.close()
# ============================================================
# TG 推送
# ============================================================
async def send_tg(text: str, silent: bool = False) -> bool:
    """Send an HTML-formatted message to the configured Telegram chat.

    Returns True on success; logs and returns False when credentials are
    missing, the API answers non-200, or the request raises.
    """
    if not TG_BOT_TOKEN or not TG_CHAT_ID:
        logger.error("TG_BOT_TOKEN 或 TG_CHAT_ID 未配置")
        return False
    endpoint = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
    body = {
        "chat_id": TG_CHAT_ID,
        "text": text,
        "parse_mode": "HTML",
        "disable_notification": silent,
    }
    try:
        async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
            resp = await client.post(endpoint, json=body)
            if resp.status_code == 200:
                return True
            logger.error(f"TG发送失败 {resp.status_code}: {resp.text[:200]}")
            return False
    except Exception as e:
        logger.error(f"TG发送异常: {e}")
        return False
# ============================================================
# 公告标题解析
# ============================================================
def is_trigger(title: str) -> tuple[bool, Optional[str]]:
    """Decide whether an announcement title should be processed.

    Returns (triggered, reason): exclusion keywords and Alpha-Box titles
    are rejected with a reason; trigger keywords accept with reason None.
    """
    lowered = title.lower()
    for kw in EXCLUDE_KEYWORDS:
        if kw.lower() in lowered:
            return False, f"排除: {kw}"
    if any(kw.lower() in lowered for kw in ALPHA_BOX_KEYWORDS):
        return False, "Alpha Box 盲盒"
    if any(kw.lower() in lowered for kw in TRIGGER_KEYWORDS):
        return True, None
    return False, None
def extract_symbol(title: str) -> Optional[str]:
# 英文括号: "Chip (CHIP)"
m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
if m:
return m.group(1)
# 中文括号
m = re.search(r"(([A-Z0-9]{2,10}))", title)
if m:
return m.group(1)
return None
def extract_name(title: str) -> Optional[str]:
    """Extract the project name preceding the parenthesized symbol, if any."""
    pattern = r"(?:上线|List|list|Launch|launch|featured)\s+([A-Za-z0-9 ]+?)\s*[\((]"
    m = re.search(pattern, title, re.IGNORECASE)
    if m is None:
        return None
    return m.group(1).strip()
# ============================================================
# 币安公告抓取
# ============================================================
async def fetch_announcements(limit: int = 20) -> list:
    """Fetch recent Binance announcements from three CMS catalogs.

    Catalogs: 48 = New Cryptocurrency Listing, 161 = Latest Activities,
    93 = Latest News. Articles are tagged with their catalog id and
    de-duplicated by article code. Failures per catalog are logged and skipped.
    """
    collected = []
    # 48: New Cryptocurrency Listing, 161: Latest Activities, 93: Latest News
    for catalog_id in [48, 161, 93]:
        query = {"type": 1, "catalogId": catalog_id, "pageNo": 1, "pageSize": limit}
        try:
            async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
                resp = await client.get(BINANCE_ANNOUNCEMENT_API, params=query)
                resp.raise_for_status()
                payload = resp.json()
                for catalog in payload.get("data", {}).get("catalogs", []):
                    for article in catalog.get("articles", []):
                        article["_catalog_id"] = catalog_id
                        collected.append(article)
        except Exception as e:
            logger.warning(f"抓取分类 {catalog_id} 失败: {e}")
    # De-duplicate by article code, preserving first-seen order
    seen_codes = set()
    unique = []
    for article in collected:
        code = article.get("code")
        if code and code not in seen_codes:
            seen_codes.add(code)
            unique.append(article)
    return unique
# ============================================================
# CoinGecko 数据
# ============================================================
async def fetch_coingecko(symbol: str, project_name: str = "") -> dict:
    """Look up tokenomics on CoinGecko.

    Dual matching: exact symbol match plus fuzzy project-name match, with
    an exact project-name match taking top priority. When CoinGecko has no
    price (common for brand-new tokens), falls back to Binance spot/futures
    and derives FDV/mcap from the supplies. Returns a dict with found/price/
    fdv/mcap/supplies/chain/contract (plus categories and description).
    """
    result = {"found": False, "price": None, "fdv": None, "mcap": None,
              "total_supply": None, "circ_supply": None, "chain": None, "contract": None}
    try:
        async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
            # Search strategy: symbol first; add the project name if it differs
            queries = [symbol]
            if project_name and project_name.upper() != symbol.upper():
                queries.append(project_name)
            coin_id = None
            best_rank = 999999
            name_exact_match = None  # exact project-name match wins outright
            for query in queries:
                resp = await client.get("https://api.coingecko.com/api/v3/search",
                                        params={"query": query})
                if resp.status_code != 200:
                    continue
                coins = resp.json().get("coins", [])
                for c in coins:
                    c_sym = c.get("symbol", "").upper()
                    c_name = c.get("name", "").lower()
                    c_rank = c.get("market_cap_rank") or 999999
                    # Exact project-name match (highest priority, e.g. MegaETH -> megaeth)
                    if project_name and c_name == project_name.lower():
                        name_exact_match = c["id"]
                    # Exact symbol match — keep the best (lowest) market_cap_rank
                    if c_sym == symbol.upper():
                        if c_rank < best_rank:
                            coin_id = c["id"]
                            best_rank = c_rank
                    # Fuzzy project-name match (e.g. searching "mega" for MegaETH)
                    if project_name and project_name.lower() in c_name:
                        if c_rank < best_rank:
                            coin_id = c["id"]
                            best_rank = c_rank
            # Exact name match beats all rank-based candidates
            if name_exact_match:
                coin_id = name_exact_match
            if not coin_id:
                logger.info(f"CoinGecko未找到 {symbol}/{project_name}")
                return result
            logger.info(f"CoinGecko匹配: {symbol} -> {coin_id} (rank={best_rank})")
            resp2 = await client.get(
                f"https://api.coingecko.com/api/v3/coins/{coin_id}",
                params={"localization": "false", "tickers": "false",
                        "market_data": "true", "community_data": "false",
                        "developer_data": "false"}
            )
            if resp2.status_code == 429:
                # Rate-limited: back off once and retry the same request
                await asyncio.sleep(5)
                resp2 = await client.get(
                    f"https://api.coingecko.com/api/v3/coins/{coin_id}",
                    params={"localization": "false", "tickers": "false",
                            "market_data": "true", "community_data": "false",
                            "developer_data": "false"}
                )
            if resp2.status_code != 200:
                return result
            d = resp2.json()
            md = d.get("market_data", {})
            result.update({
                "found": True,
                "price": (md.get("current_price") or {}).get("usd"),
                "fdv": (md.get("fully_diluted_valuation") or {}).get("usd"),
                "mcap": (md.get("market_cap") or {}).get("usd"),
                "total_supply": md.get("total_supply"),
                "circ_supply": md.get("circulating_supply"),
            })
            # Categories carry VC info (e.g. "YZi Labs Portfolio"); description
            # feeds the LLM narrative extraction
            result["categories"] = d.get("categories", [])
            result["description"] = (d.get("description") or {}).get("en", "")[:500]
            platforms = d.get("platforms", {})
            for chain, addr in platforms.items():
                if addr:
                    result["chain"] = chain
                    result["contract"] = addr
                    break
            # No CoinGecko price (common for new listings): use Binance instead
            if not result["price"]:
                binance_price = await _fetch_binance_price(symbol, client)
                if binance_price:
                    result["price"] = binance_price
                    ts = result.get("total_supply") or 0
                    cs = result.get("circ_supply") or 0
                    if ts > 0:
                        result["fdv"] = binance_price * ts
                    if cs > 0:
                        result["mcap"] = binance_price * cs
                    logger.info(f"币安补充价格 {symbol}: ${binance_price}, FDV=${result.get('fdv',0):,.0f}")
    except Exception as e:
        logger.warning(f"CoinGecko查询失败 {symbol}: {e}")
    return result
async def _fetch_binance_price(symbol: str, client: httpx.AsyncClient) -> float:
    """Best-effort USDT price: Binance spot first, then futures; 0.0 if neither."""
    pair = f"{symbol.upper()}USDT"
    endpoints = (
        "https://api.binance.com/api/v3/ticker/price",     # 1. spot
        "https://fapi.binance.com/fapi/v1/ticker/price",   # 2. futures
    )
    for url in endpoints:
        try:
            resp = await client.get(url, params={"symbol": pair})
            if resp.status_code == 200:
                return float(resp.json()["price"])
        except Exception:
            pass
    return 0.0
# ============================================================
# LLM 叙事抽取(可选,降级为规则)
# ============================================================
async def llm_extract(raw_text: str, symbol: str, name: str = "", cg_data: dict = None) -> dict:
    """Extract narrative / VCs / darling flag from an announcement + CoinGecko data.

    Uses the Anthropic Messages API when a key is configured; otherwise
    degrades to keyword rules over the title and CoinGecko categories.
    Always returns a dict with narrative, narrative_desc, vcs, is_darling,
    exclude_reason — falling back to defaults on any error.
    """
    fallback = {
        "narrative": "unknown", "narrative_desc": "",
        "vcs": [], "is_darling": False, "exclude_reason": None,
    }
    # Pull extra signal out of the CoinGecko payload
    cg_data = cg_data or {}
    categories = cg_data.get("categories", [])
    description = cg_data.get("description", "")
    # Auto-detect "Binance darling" from categories (e.g. "YZi Labs Portfolio")
    darling_cats = [c for c in categories if any(kw in c.lower() for kw in ["yzi labs", "binance labs"])]
    if darling_cats:
        fallback["is_darling"] = True
    if not ANTHROPIC_API_KEY:
        # Degraded mode: guess from title keywords + CoinGecko categories
        t = raw_text.lower()
        for kw in BINANCE_DARLING_KEYWORDS:
            if kw in t:
                fallback["is_darling"] = True
        # Guess the narrative from category strings
        cat_str = " ".join(categories).lower()
        if "defi" in cat_str: fallback["narrative"] = "defi"
        elif "ai" in cat_str: fallback["narrative"] = "ai_agent"
        elif "gaming" in cat_str or "gamefi" in cat_str: fallback["narrative"] = "gamefi"
        elif "meme" in cat_str: fallback["narrative"] = "meme"
        elif "rwa" in cat_str or "real world" in cat_str: fallback["narrative"] = "rwa"
        return fallback
    # Build a rich context block for the prompt
    extra_context = ""
    if categories:
        extra_context += f"\nCoinGecko分类: {', '.join(categories)}"
    if description:
        extra_context += f"\n项目描述: {description[:300]}"
    if cg_data.get("found"):
        extra_context += f"\n市场数据: FDV=${cg_data.get('fdv',0):,.0f}, MCap=${cg_data.get('mcap',0):,.0f}, 价格=${cg_data.get('price',0)}"
        if cg_data.get("chain"):
            extra_context += f", 链={cg_data['chain']}"
    system = "你是加密货币研究员,从币安公告和项目数据中提取关键信息。只返回JSON,无其他文字。"
    user = f"""分析这个币安上新项目:
代币: {symbol}, 项目名: {name or "未知"}
公告原文: {raw_text}
{extra_context}
返回JSON:
{{
"narrative": "defi_perp|ai_agent|ai_defi|defai|zk_proof|infra|defi|rwa|gamefi|meme|social|stablecoin|unknown",
"narrative_desc": "一句话中文描述这个项目做什么、有什么特点",
"vcs": ["从CoinGecko分类和公告中提取的投资机构列表"],
"is_darling": true/false,
"exclude_reason": null|"already_tge"|"meme_only"
}}
判断规则:
- narrative: 选最主要的一个类别
- vcs: CoinGecko分类里如果有 "XXX Portfolio" 就提取XXX作为机构
- is_darling: 如果有YZi Labs/Binance Labs投资 或 CZ/何一站台 则true
- exclude_reason: 只有当项目在其他主要CEX(如Coinbase/OKX/Bybit)上线超过3个月才算"already_tge"。如果只是在DEX或刚在币安上线,不算already_tge。CoinGecko有价格数据不代表already_tge。纯meme无叙事则"meme_only"
"""
    try:
        async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
            resp = await client.post(
                f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages",
                headers={
                    "x-api-key": ANTHROPIC_API_KEY,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json",
                },
                json={
                    "model": ANTHROPIC_MODEL,
                    "max_tokens": 800,
                    "temperature": 0,
                    "system": system,
                    "messages": [{"role": "user", "content": user}],
                }
            )
            if resp.status_code != 200:
                logger.warning(f"LLM调用失败 {resp.status_code}")
                return fallback
            data = resp.json()
            # Take the first text block from the response content
            text = ""
            for block in data.get("content", []):
                if block.get("type") == "text":
                    text = block.get("text", "")
                    break
            text = text.strip()
            # Strip a ```json ... ``` fence if the model added one
            if text.startswith("```"):
                lines = text.split("\n")
                text = "\n".join(lines[1:-1])
            return json.loads(text)
    except Exception as e:
        logger.warning(f"LLM抽取异常: {e}")
        return fallback
# ============================================================
# 消息格式化
# ============================================================
def _fmt_mcap(v):
if not v:
return "N/A"
if v >= 1e9:
return f"${v/1e9:.1f}B"
if v >= 1e6:
return f"${v/1e6:.1f}M"
if v >= 1e3:
return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def _fmt_price(v):
if not v:
return "N/A"
if v >= 1:
return f"${v:.2f}"
if v >= 0.01:
return f"${v:.4f}"
return f"${v:.6f}"
def fmt_discovery(p: dict) -> str:
    """Format the "new Alpha project discovered" Telegram message (HTML)."""
    tier = p.get("tier", "C")
    icon = TIER_ICONS.get(tier, "⚪")
    label = TIER_LABELS.get(tier, "")
    symbol = p["symbol"]
    name = p.get("name") or ""
    # vcs_json is a JSON string when the record came from SQLite, a list otherwise
    vcs = json.loads(p.get("vcs_json", "[]")) if isinstance(p.get("vcs_json"), str) else p.get("vcs", [])
    lines = [
        f"{icon} <b>Alpha 首发 · ${symbol}</b> {icon}",
        f"📋 {label}",
        "",
        f"<b>{name}</b>" if name else "",
    ]
    if p.get("narrative_desc"):
        lines.append(f"💡 {p['narrative_desc']}")
    if p.get("narrative") and p["narrative"] != "unknown":
        lines.append(f"🏷 叙事: {p['narrative']}")
    lines.append("")
    if p.get("fdv"):
        lines.append(f"📊 FDV: {_fmt_mcap(p['fdv'])}")
    if p.get("circulating_mcap"):
        lines.append(f"📊 流通市值: {_fmt_mcap(p['circulating_mcap'])}")
    if p.get("open_price"):
        lines.append(f"💰 预估开盘价: {_fmt_price(p['open_price'])}")
    if p.get("total_supply") and p.get("circulating_supply"):
        pct = p["circulating_supply"] / p["total_supply"] * 100
        lines.append(f"📦 初始流通: {pct:.1f}%")
    if vcs:
        lines.append("")
        lines.append("🏛 <b>机构</b>")
        for v in vcs[:5]:
            # Star marks Tier-1 backers
            is_t1 = any(t in v.lower() for t in TIER1_VCS)
            lines.append(f" {'⭐' if is_t1 else '·'} {v}")
    if p.get("is_darling"):
        lines.append("")
        lines.append("🔥 <b>币安亲儿子</b>")
    if p.get("tier_reason"):
        lines.append("")
        lines.append(f"🎯 {p['tier_reason']}")
    lines.append("")
    lines.append(f"<i>📌 来源: {p.get('source', 'binance')}</i>")
    if p.get("raw_text"):
        lines.append(f"<i>{p['raw_text'][:120]}</i>")
    # Defensive None-filter; entries built above are at worst ""
    return "\n".join(l for l in lines if l is not None)
def fmt_countdown(p: dict, minutes: int) -> str:
    """Format the pre-launch countdown reminder message (HTML)."""
    icon = TIER_ICONS.get(p.get("tier", "C"), "⚪")
    if minutes >= 60:
        t = f"{minutes//60}h{minutes%60}m"
    else:
        t = f"{minutes}m"
    parts = [
        f"{icon} <b>倒计时提醒</b>",
        f"<b>${p['symbol']}</b> · {p.get('name', '')}",
        f"⏰ 距上线还有 <b>{t}</b>",
    ]
    if p.get("fdv"):
        parts.append(f"FDV: {_fmt_mcap(p['fdv'])}")
    if minutes <= 30:
        parts.append("🔔 <b>准备下单</b>")
    return "\n".join(parts)
def fmt_launch(p: dict, price: float, mcap: float, fdv: float) -> str:
    """Format the "project is live" push message (HTML)."""
    return "\n".join([
        f"🚀 <b>${p['symbol']} 已上线</b>",
        f"开盘价: <b>{_fmt_price(price)}</b>",
        f"流通市值: <b>{_fmt_mcap(mcap)}</b>",
        f"FDV: <b>{_fmt_mcap(fdv)}</b>",
    ])
def fmt_periodic(p: dict, idx: int, price: float, mcap: float, change_pct: float) -> str:
    """Format the idx-th 30-minute post-launch check-in message."""
    trend = "📈" if change_pct > 0 else "📉"
    elapsed = 30 * idx
    out = [
        f"⏱ <b>${p['symbol']} · +{elapsed}min</b>",
        f"流通市值: {_fmt_mcap(mcap)} ({trend} {change_pct:+.1f}%)",
        f"当前价: {_fmt_price(price)}",
    ]
    # Action hints at the extremes: doubled -> take profit, -30% -> stop loss.
    if change_pct >= 100:
        out.append("💡 <b>已翻倍,考虑分批止盈</b>")
    elif change_pct <= -30:
        out.append("⚠️ 跌幅较大,评估是否止损")
    return "\n".join(out)
def fmt_anomaly(p: dict, atype: str, price: float, change_pct: float) -> str:
    """Format a price-anomaly alert ('double' or 'halve' of market cap)."""
    icons = {"double": "🚀", "halve": "🔻"}
    labels = {"double": "市值翻倍", "halve": "市值腰斩"}
    head = f"{icons.get(atype, '⚡')} <b>${p['symbol']} {labels.get(atype, '异动')}</b>"
    return f"{head}\n变化: {change_pct:+.1f}%\n当前价: {_fmt_price(price)}"
# ============================================================
# 核心逻辑: 公告监听
# ============================================================
async def announcement_listener():
    """Poll Binance announcements and register newly discovered Alpha projects.

    Runs forever: every ANNOUNCEMENT_POLL_INTERVAL seconds it fetches the
    announcement list, filters titles via is_trigger(), extracts the token
    symbol, and saves any not-yet-seen project with tier "PENDING" so the
    aggregation worker can enrich and rate it later.
    """
    logger.info(f"📡 公告监听启动 · 轮询 {ANNOUNCEMENT_POLL_INTERVAL}s")
    while True:
        try:
            articles = await fetch_announcements()
            new_count = 0
            for art in articles:
                title = art.get("title", "")
                triggered, reason = is_trigger(title)
                if not triggered:
                    continue
                symbol = extract_symbol(title)
                if not symbol:
                    continue
                # Deduplicate on (symbol, release date) — the same listing may
                # appear in several announcements.
                # NOTE(review): fromtimestamp() is local-time while
                # _monitor_project compares against utcnow() — confirm the
                # host clock runs in UTC, otherwise the alert windows drift.
                release_ts = art.get("releaseDate")
                release_iso = datetime.fromtimestamp(release_ts / 1000).isoformat() if release_ts else ""
                launch_date = release_iso[:10] if release_iso else datetime.utcnow().date().isoformat()
                pid = project_id(symbol, launch_date)
                if project_exists(pid):
                    continue
                project = {
                    "id": pid,
                    "symbol": symbol,
                    "name": extract_name(title),
                    "launch_time": release_iso,
                    "source": "binance_announcement",
                    "raw_text": title,
                    "tier": "PENDING",  # rated later by aggregation_worker
                    "vcs": [],
                    "is_darling": False,
                    "excluded": 0,
                }
                save_project(project)
                new_count += 1
                logger.info(f"🆕 发现 ${symbol}: {title[:80]}")
            if new_count:
                logger.info(f"本轮发现 {new_count} 个新项目")
        except Exception as e:
            logger.error(f"公告监听异常: {e}", exc_info=True)
        await asyncio.sleep(ANNOUNCEMENT_POLL_INTERVAL)
# ============================================================
# 核心逻辑: 聚合 + 推送
# ============================================================
async def aggregation_worker():
    """Enrich, rate and announce PENDING projects.

    For each PENDING project: pull CoinGecko market data, run LLM narrative
    extraction, sanity-check the numbers, compute a tier rating, persist the
    result, then push a "discovery" alert (silent for B/C tiers). A failure
    marks the project tier "ERROR" so it is not retried forever.
    """
    logger.info(f"🧠 聚合工作者启动 · 轮询 {AGGREGATION_POLL_INTERVAL}s")
    while True:
        try:
            pending = list_pending()
            for p in pending:
                symbol = p["symbol"]
                try:
                    logger.info(f"📦 聚合 ${symbol}")
                    # 1. CoinGecko market data
                    cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
                    await asyncio.sleep(1)  # avoid rate limiting
                    # 2. LLM narrative extraction (CoinGecko data passed along
                    #    to strengthen the analysis)
                    llm = await llm_extract(p.get("raw_text", ""), symbol, p.get("name"), cg_data=cg)
                    await asyncio.sleep(1)
                    # 3. Exclusion check (token already live / pure meme coin)
                    if llm.get("exclude_reason") in ("already_tge", "meme_only"):
                        update_project(p["id"], {
                            "excluded": 1,
                            "exclude_reason": llm["exclude_reason"],
                            "tier": "EXCLUDED",
                        })
                        logger.info(f"⏭ ${symbol} 排除: {llm['exclude_reason']}")
                        continue
                    # 4. Data validation — CoinGecko may have matched the wrong
                    #    token, or the project is too new to have data yet.
                    cg_fdv = cg.get("fdv", 0) or 0
                    cg_mcap = cg.get("mcap", 0) or 0
                    data_suspect = False
                    data_warnings = []
                    # A Binance listing should never have FDV under $100K — if
                    # it does, CoinGecko almost surely matched a wrong coin.
                    if cg.get("found") and cg_fdv > 0 and cg_fdv < 100_000:
                        data_suspect = True
                        data_warnings.append(f"FDV=${cg_fdv:,.0f}异常低,可能CoinGecko匹配错误")
                        logger.warning(f"⚠️ {symbol} FDV=${cg_fdv:,.0f} 异常低,数据可能不准")
                    # 100% circulating supply with a tiny FDV is suspect too
                    # (fresh tokens normally are not fully circulating).
                    if (cg.get("circ_supply") and cg.get("total_supply")
                            and cg["circ_supply"] == cg["total_supply"]
                            and cg_fdv < 1_000_000):
                        data_suspect = True
                        data_warnings.append("流通=总量且FDV极低,可能是同名垃圾币")
                    if data_suspect:
                        # Unreliable data: zero the CoinGecko FDV/MCap so the
                        # rating leans on the LLM judgement instead.
                        cg_fdv = 0
                        cg_mcap = 0
                        logger.warning(f"⚠️ {symbol} CoinGecko数据不可靠,评级使用LLM判断为主")
                    # 5. Tier rating
                    is_darling = llm.get("is_darling", False)
                    vcs = llm.get("vcs", [])
                    narrative = llm.get("narrative", "unknown")
                    rating = rate_project(
                        cg_mcap, cg_fdv,
                        vcs, narrative, is_darling
                    )
                    # 6. Persist (suspect CoinGecko values are stored as None)
                    update_project(p["id"], {
                        "tier": rating["tier"],
                        "tier_reason": rating["reason"],
                        "narrative": narrative,
                        "narrative_desc": llm.get("narrative_desc", ""),
                        "vcs_json": json.dumps(vcs),
                        "is_darling": int(is_darling),
                        "open_price": cg.get("price") if not data_suspect else None,
                        "total_supply": cg.get("total_supply") if not data_suspect else None,
                        "circulating_supply": cg.get("circ_supply") if not data_suspect else None,
                        "fdv": cg_fdv if cg_fdv else None,
                        "circulating_mcap": cg_mcap if cg_mcap else None,
                    })
                    # 7. Push the discovery alert (deduped via has_pushed)
                    full = get_project(p["id"])
                    if full and not has_pushed(p["id"], "discovery"):
                        text = fmt_discovery(full)
                        silent = rating["tier"] in ("B", "C")
                        ok = await send_tg(text, silent=silent)
                        if ok:
                            log_push(p["id"], "discovery", text)
                            logger.info(f"✅ 推送 ${symbol} [{rating['tier']}]")
                except Exception as e:
                    logger.error(f"聚合 {symbol} 失败: {e}", exc_info=True)
                    update_project(p["id"], {"tier": "ERROR", "tier_reason": str(e)[:100]})
        except Exception as e:
            logger.error(f"聚合循环异常: {e}", exc_info=True)
        await asyncio.sleep(AGGREGATION_POLL_INTERVAL)
# ============================================================
# 核心逻辑: 上线后监控
# ============================================================
async def post_launch_monitor():
    """Drive per-project launch tracking: countdown reminders, the launch
    snapshot, four 30-minute follow-ups and anomaly alerts (the actual
    per-project logic lives in _monitor_project)."""
    logger.info(f"📊 上线监控启动 · 轮询 {MONITOR_POLL_INTERVAL}s")
    while True:
        try:
            for project in list_active():
                try:
                    await _monitor_project(project)
                except Exception as e:
                    # One broken project must not stall the others.
                    logger.error(f"监控 {project['symbol']} 异常: {e}")
        except Exception as e:
            logger.error(f"监控循环异常: {e}", exc_info=True)
        await asyncio.sleep(MONITOR_POLL_INTERVAL)
async def _monitor_project(p: dict):
    """Fire the time-window alerts for a single project.

    Windows relative to launch_time: T-3h, T-30m, the launch moment, then
    30-minute check-ins at +30/+60/+90/+120 with double/halve anomaly pushes.
    Every push type is deduplicated through has_pushed()/log_push().
    """
    pid = p["id"]
    symbol = p["symbol"]
    launch_str = p.get("launch_time", "")
    if not launch_str:
        return
    try:
        # Strip "Z"/offset so the naive result compares with utcnow() below.
        launch = datetime.fromisoformat(launch_str.replace("Z", "").split("+")[0])
    except:
        return
    now = datetime.utcnow()
    delta_sec = (launch - now).total_seconds()
    # T-3h reminder (±5 min tolerance)
    if 3*3600 - 300 <= delta_sec <= 3*3600 + 300:
        if not has_pushed(pid, "t_minus_3h"):
            text = fmt_countdown(p, int(delta_sec / 60))
            ok = await send_tg(text, silent=p.get("tier") in ("B", "C"))
            if ok:
                log_push(pid, "t_minus_3h", text)
    # T-30m reminder (±2.5 min tolerance) — never silent
    elif 30*60 - 150 <= delta_sec <= 30*60 + 150:
        if not has_pushed(pid, "t_minus_30m"):
            text = fmt_countdown(p, int(delta_sec / 60))
            ok = await send_tg(text, silent=False)
            if ok:
                log_push(pid, "t_minus_30m", text)
    # Launch moment: the last 5 minutes before/at launch time
    elif -300 <= delta_sec <= 0:
        if not has_pushed(pid, "at_launch"):
            cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
            if cg.get("price"):
                text = fmt_launch(p, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
                ok = await send_tg(text, silent=False)
                if ok:
                    log_push(pid, "at_launch", text)
                save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
    # Post-launch: 30-minute check-ins for the first ~2.5 hours
    elif 0 < -delta_sec <= 2.5 * 3600:
        minutes_after = int(-delta_sec / 60)
        for idx, target in enumerate([30, 60, 90, 120], 1):
            # ±5 min tolerance around each 30-minute mark
            if abs(minutes_after - target) <= 5:
                ptype = f"post_30m_{idx}"
                if not has_pushed(pid, ptype):
                    cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
                    if cg.get("price"):
                        # Fall back to the live price when no stored open price.
                        open_price = p.get("open_price") or cg["price"]
                        change = ((cg["price"] - open_price) / open_price * 100) if open_price else 0
                        text = fmt_periodic(p, idx, cg["price"], cg.get("mcap", 0), change)
                        # Only the first check-in is loud for B/C tier projects.
                        ok = await send_tg(text, silent=p.get("tier") in ("B", "C") and idx > 1)
                        if ok:
                            log_push(pid, ptype, text)
                        save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
                        # Anomalies: doubled (+100%) or halved (-50%) vs open.
                        if change >= 100 and not has_pushed(pid, "anomaly_double"):
                            t = fmt_anomaly(p, "double", cg["price"], change)
                            if await send_tg(t):
                                log_push(pid, "anomaly_double", t)
                        elif change <= -50 and not has_pushed(pid, "anomaly_halve"):
                            t = fmt_anomaly(p, "halve", cg["price"], change)
                            if await send_tg(t):
                                log_push(pid, "anomaly_halve", t)
                break
# ============================================================
# 启动
# ============================================================
async def main():
    """Boot the monitor: init storage, verify Telegram, run the three loops."""
    init_db()
    logger.info(f"📂 数据库: {DB_PATH}")
    # Startup self-test: a ping message proves the bot token / chat id work.
    if await send_tg("🎉 <b>Alpha Monitor v2 启动</b>\n\n📡 币安公告监听中...\n🔔 有新Alpha会立即推送"):
        logger.info("✅ TG推送正常")
    else:
        logger.warning("⚠️ TG推送失败,检查配置")
    workers = [
        asyncio.create_task(announcement_listener(), name="announcements"),
        asyncio.create_task(aggregation_worker(), name="aggregator"),
        asyncio.create_task(post_launch_monitor(), name="monitor"),
    ]
    logger.info("=" * 50)
    logger.info("🚀 Alpha Monitor v2 启动完成")
    logger.info(f" 📡 公告轮询: {ANNOUNCEMENT_POLL_INTERVAL}s")
    logger.info(f" 🧠 LLM: {'Sonnet' if ANTHROPIC_API_KEY else '降级(规则)'}")
    logger.info(f" 🔔 TG: {'✅' if TG_BOT_TOKEN else '❌'}")
    logger.info("=" * 50)
    try:
        await asyncio.gather(*workers)
    except KeyboardInterrupt:
        for worker in workers:
            worker.cancel()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: exit quietly instead of printing a traceback.
        pass
AI自主交易
合约+Alpha代币 自主交易 v1
发布日期: 2026.04.29 标签: Python · Binance Futures · Autonomous · Telegram
AI独立完成 扫描→分析→虚拟开仓→盯盘→平仓→复盘 完整交易循环
⚠️ 风险提示:本代码仅运行过虚拟盘,没有任何实盘交易经验,不能作为接入实盘的交易依据。本代码未经过真实资金的实际交易验证,使用本代码时请务必谨慎,风险自担。
AI每30秒全自动扫描币安全市场合约,发现异常信号后独立分析并虚拟开仓。4种信号检测策略(费率极端深负→做多逼空、费率极端正→做空、暴涨后回落做空、暴跌企稳反弹)。开仓前必须通过多维度综合环境检查(BTC环境+Fear&Greed情绪+OI关注度+成交量活跃度),得分>=3/7才开仓。每30秒自动检查持仓止损止盈。
当前成绩(诚实公布):已平仓4笔,胜率75%,收益+13.94U(初始100U)。但利润集中在一笔(IR做空+45%),去掉这笔基本持平。持仓风险分散不够,容易同方向同逻辑重仓。综合判断仍是规则打分,不是真正独立思考。
做不到:不能像人类交易员理解叙事转折,不能判断信号背后深层原因,容易重复犯同类错误,无法处理突发事件。
训练路径:扫描→开仓→平仓→复盘→发现问题→改进策略→再循环。目标:从规则执行器进化为独立思考的交易员。
完整源码
#!/usr/bin/env python3
"""
Market scanner — intended to run once per minute (e.g. via cron).

Pure Python, zero AI cost: scans Binance USDT-perpetual tickers, detects
anomaly signals (extreme funding, crash bounce, pump pullback) and opens
simulated (paper) positions automatically.
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta

# Paths resolved relative to this script so cron's CWD does not matter.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_FILE = os.path.join(SCRIPT_DIR, "trades.json")  # paper-trade ledger
SCANNER_STATE = os.path.join(SCRIPT_DIR, "scanner_state.json")  # cooldowns etc.
SCANNER_LOG = os.path.join(SCRIPT_DIR, "scanner.log")
INITIAL_BALANCE = 100.0  # starting paper balance, in USDT
TZ_UTC8 = timezone(timedelta(hours=8))  # all timestamps use UTC+8

# === Configuration ===
MAX_OPEN_POSITIONS = 3  # max simultaneous open positions
POSITION_PCT = 30  # position size as % of balance per trade
LEVERAGE = 3  # leverage multiplier
COOLDOWN_HOURS = 4  # cooldown before re-entering the same symbol
MIN_VOLUME_M = 10  # minimum 24h quote volume (millions of USDT)
# === TG推送 ===
def load_tg_config():
    """Load Telegram credentials from the first .env found, then let OS
    environment variables override the file values."""
    cfg = {}
    candidates = (
        os.path.join(SCRIPT_DIR, ".env"),
        os.path.join(os.getcwd(), ".env"),
    )
    for path in candidates:
        if not os.path.exists(path):
            continue
        with open(path) as fh:
            for raw in fh:
                raw = raw.strip()
                if '=' in raw and not raw.startswith('#'):
                    key, _, value = raw.partition('=')
                    cfg[key] = value.strip().strip('"').strip("'")
        break
    # OS environment variables override anything read from disk.
    for key in ['TG_BOT_TOKEN', 'TELEGRAM_BOT_TOKEN', 'TG_CHAT_ID']:
        val = os.environ.get(key)
        if val:
            cfg[key] = val
    return cfg
def send_tg(text):
    """Best-effort push of *text* to Telegram (Markdown parse mode).

    Silently no-ops when the token or chat id is missing; network or API
    errors are swallowed so an alert failure can never kill the scanner.
    """
    try:
        env = load_tg_config()
        token = env.get('TG_BOT_TOKEN', env.get('TELEGRAM_BOT_TOKEN', ''))
        chat_id = env.get('TG_CHAT_ID', '')
        if not token or not chat_id:
            return
        url = f"https://api.telegram.org/bot{token}/sendMessage"
        requests.post(url, json={
            "chat_id": chat_id,
            "text": text,
            "parse_mode": "Markdown"
        }, timeout=10)
    except Exception:
        # Bug fix: was a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit. Exception keeps the best-effort intent.
        pass
# === 数据加载 ===
def load_trades():
    """Read the paper-trade ledger; return a fresh one if the file is absent."""
    if not os.path.exists(DATA_FILE):
        return {"initial_balance": INITIAL_BALANCE, "trades": []}
    with open(DATA_FILE, "r", encoding="utf-8") as fh:
        return json.load(fh)
def save_trades(data):
    """Persist the paper-trade ledger as pretty-printed UTF-8 JSON."""
    with open(DATA_FILE, "w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=2)
def load_state():
    """Read scanner state (cooldowns, seen signals); default if missing."""
    if not os.path.exists(SCANNER_STATE):
        return {"last_opens": {}, "signals_seen": {}}
    with open(SCANNER_STATE, "r") as fh:
        return json.load(fh)
def save_state(state):
    """Persist scanner state (cooldowns etc.) as JSON."""
    with open(SCANNER_STATE, "w") as fh:
        json.dump(state, fh, ensure_ascii=False, indent=2)
def get_balance(data):
    """Current paper balance: initial balance plus realized PnL of closed trades."""
    realized = sum(
        t["pnl_usd"]
        for t in data["trades"]
        if t["status"] == "closed" and t["pnl_usd"] is not None
    )
    return data.get("initial_balance", INITIAL_BALANCE) + realized
def next_id(data):
    """Next zero-padded trade id ("001", "002", ...)."""
    ids = [int(t["id"]) for t in data["trades"]]
    if not ids:
        return "001"
    return f"{max(ids) + 1:03d}"
def now_str():
    """Current UTC+8 wall-clock time formatted as YYYY-MM-DDTHH:MM:SS."""
    stamp = datetime.now(TZ_UTC8)
    return stamp.strftime("%Y-%m-%dT%H:%M:%S")
def log(msg):
    """Echo *msg* to stdout and append it, timestamped, to the scanner log."""
    stamp = datetime.now(TZ_UTC8).strftime("%m-%d %H:%M:%S")
    entry = f"[{stamp}] {msg}"
    print(entry)
    with open(SCANNER_LOG, "a") as fh:
        fh.write(entry + "\n")
# === 币安API ===
def get_all_tickers():
    """Fetch 24h ticker stats for every Binance USDT-M futures symbol."""
    return requests.get(
        "https://fapi.binance.com/fapi/v1/ticker/24hr", timeout=10
    ).json()
def get_funding_rates():
    """Map each symbol to its latest funding rate, expressed in percent."""
    resp = requests.get("https://fapi.binance.com/fapi/v1/premiumIndex", timeout=10)
    rates = {}
    for entry in resp.json():
        rates[entry['symbol']] = float(entry['lastFundingRate']) * 100
    return rates
def get_funding_history(symbol, limit=8):
    """Return the last *limit* settled funding rates for *symbol*, in percent."""
    url = f"https://fapi.binance.com/fapi/v1/fundingRate?symbol={symbol}&limit={limit}"
    payload = requests.get(url, timeout=10).json()
    return [float(entry['fundingRate']) * 100 for entry in payload]
def get_open_interest(symbol):
    """Return the current open interest for *symbol* as reported by the API."""
    url = f"https://fapi.binance.com/fapi/v1/openInterest?symbol={symbol}"
    payload = requests.get(url, timeout=10).json()
    return float(payload['openInterest'])
def get_klines(symbol, interval="4h", limit=6):
    """Fetch raw kline rows for *symbol* from the Binance futures REST API."""
    url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
    return requests.get(url, timeout=10).json()
# === 信号检测 ===
def detect_extreme_negative_funding(symbol, funding_rate, funding_rates_map):
    """
    Strategy 1: extremely negative funding → go long (short-squeeze setup).

    Trigger: current funding < -0.08% AND at least 4 of the last 8 settled
    rates below -0.03%. Strength scales with how deep the average rate is.

    Args:
        symbol: futures symbol, e.g. "XUSDT".
        funding_rate: latest funding rate, in percent.
        funding_rates_map: full symbol→rate map (unused here; kept so all
            detectors share one call signature).

    Returns:
        Signal dict (type/direction/strength/reason/sl_pct/tp_pct) or None.
    """
    if funding_rate >= -0.08:
        return None
    try:
        history = get_funding_history(symbol, 8)
        neg_count = sum(1 for r in history if r < -0.03)
        if neg_count < 4:
            return None
        avg_rate = sum(history) / len(history)
        # The more extreme the average rate, the stronger the signal.
        strength = "S" if avg_rate < -0.15 else "A" if avg_rate < -0.10 else "B"
        return {
            "type": "extreme_neg_funding",
            "direction": "long",
            "strength": strength,
            "reason": f"费率极端深负 avg:{avg_rate:.4f}% 连续{neg_count}/8期为负 逼空概率高",
            "sl_pct": 0.08,  # stop loss 8%
            "tp_pct": 0.12,  # take profit 12%
        }
    except Exception:
        # Bug fix: was a bare `except:` which also trapped KeyboardInterrupt;
        # network/parse failures still just mean "no signal".
        return None
def detect_extreme_positive_funding(symbol, funding_rate, funding_rates_map):
    """
    Strategy 2: extremely positive funding → go short (crowded longs).

    Trigger: current funding > 0.10% AND at least 4 of the last 8 settled
    rates above 0.05%. Same contract as detect_extreme_negative_funding.

    Returns:
        Signal dict or None.
    """
    if funding_rate <= 0.10:
        return None
    try:
        history = get_funding_history(symbol, 8)
        pos_count = sum(1 for r in history if r > 0.05)
        if pos_count < 4:
            return None
        avg_rate = sum(history) / len(history)
        strength = "S" if avg_rate > 0.20 else "A" if avg_rate > 0.12 else "B"
        return {
            "type": "extreme_pos_funding",
            "direction": "short",
            "strength": strength,
            "reason": f"费率极端正 avg:{avg_rate:.4f}% 连续{pos_count}/8期高正 多头过度拥挤",
            "sl_pct": 0.10,
            "tp_pct": 0.15,
        }
    except Exception:
        # Bug fix: was a bare `except:` (also swallowed KeyboardInterrupt).
        return None
def detect_crash_bounce(ticker):
    """
    Strategy 3: bounce after a crash (oversold rebound).

    Trigger: 24h change worse than -25% AND the price has stabilized on the
    most recent hourly candles (latest close >= previous close).

    Args:
        ticker: one entry from the 24hr ticker endpoint.

    Returns:
        Signal dict or None.
    """
    change_pct = float(ticker['priceChangePercent'])
    if change_pct >= -25:
        return None
    symbol = ticker['symbol']
    try:
        klines = get_klines(symbol, "1h", 6)
        # Closes of the last 3 hourly candles (the original comment claimed
        # 2 candles but the code always sliced 3 — behavior kept).
        recent_closes = [float(k[4]) for k in klines[-3:]]
        # "Stabilized": the latest close is at or above the previous one.
        if len(recent_closes) >= 2 and recent_closes[-1] >= recent_closes[-2]:
            return {
                "type": "crash_bounce",
                "direction": "long",
                "strength": "B",  # higher risk → capped at B grade
                "reason": f"24h暴跌{change_pct:.1f}%后企稳 超跌反弹",
                "sl_pct": 0.10,
                "tp_pct": 0.15,
            }
    except Exception:
        # Bug fix: bare `except:` also trapped KeyboardInterrupt/SystemExit.
        pass
    return None
def detect_pump_short(ticker):
    """
    Strategy 4: short after a pump (pullback from the high).

    Trigger: 24h change above +40% AND price already pulled back >=10% from
    the recent 6-hour high — never short the very top.

    Args:
        ticker: one entry from the 24hr ticker endpoint.

    Returns:
        Signal dict or None.
    """
    change_pct = float(ticker['priceChangePercent'])
    if change_pct <= 40:
        return None
    symbol = ticker['symbol']
    try:
        klines = get_klines(symbol, "1h", 6)
        highs = [float(k[2]) for k in klines]
        closes = [float(k[4]) for k in klines]
        current = closes[-1]
        peak = max(highs)
        # Require a >=10% retreat from the peak before shorting.
        pullback = (peak - current) / peak * 100
        if pullback < 10:
            return None
        strength = "A" if change_pct > 80 else "B"
        return {
            "type": "pump_short",
            "direction": "short",
            "strength": strength,
            "reason": f"24h暴涨{change_pct:.1f}%后回落{pullback:.1f}% 历史回调概率>85%",
            "sl_pct": 0.15,  # pumped coins are volatile: wider stop
            "tp_pct": 0.20,
        }
    except Exception:
        # Bug fix: bare `except:` also trapped KeyboardInterrupt/SystemExit.
        pass
    return None
# === 综合环境检查 ===
def check_environment(symbol, signal):
    """Multi-factor pre-trade check — one signal alone is never enough.

    Scores four factors (BTC trend, Fear & Greed sentiment, open interest,
    24h volume) plus the signal's own grade, and passes when score >= 3.

    Returns:
        (passed: bool, analysis: dict of human-readable factor notes,
         adjusted_strength: str — currently always the signal's own strength)
    """
    analysis = {
        "btc_env": "",
        "sentiment": "",
        "oi_check": "",
        "volume_check": "",
        "verdict": ""
    }
    score = 0  # aggregate score; >= 3 required to open a position
    try:
        # 1. BTC environment — longs need BTC not crashing, shorts need BTC
        #    not pumping.
        btc_url = "https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT"
        btc = requests.get(btc_url, timeout=5).json()
        btc_chg = float(btc['priceChangePercent'])
        if signal["direction"] == "long":
            if btc_chg > -2:
                score += 1
                analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 环境正常 +1"
            elif btc_chg < -5:
                score -= 1
                analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 暴跌中做多危险 -1"
            else:
                analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 偏弱 0"
        else:  # short
            if btc_chg < 2:
                score += 1
                analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 环境正常 +1"
            elif btc_chg > 5:
                score -= 1
                analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 暴涨中做空危险 -1"
            else:
                analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 偏强 0"
        # 2. Market sentiment (Fear & Greed index) — contrarian scoring.
        try:
            fng = requests.get("https://api.alternative.me/fng/", timeout=5).json()
            fng_val = int(fng['data'][0]['value'])
            if signal["direction"] == "long":
                if fng_val <= 25:
                    score += 1
                    analysis["sentiment"] = f"FGI={fng_val}极度恐惧 逆向做多 +1"
                elif fng_val >= 75:
                    score -= 1
                    analysis["sentiment"] = f"FGI={fng_val}极度贪婪 做多风险 -1"
                else:
                    analysis["sentiment"] = f"FGI={fng_val}中性 0"
            else:
                if fng_val >= 75:
                    score += 1
                    analysis["sentiment"] = f"FGI={fng_val}极度贪婪 逆向做空 +1"
                elif fng_val <= 25:
                    score -= 1
                    analysis["sentiment"] = f"FGI={fng_val}极度恐惧 做空风险 -1"
                else:
                    analysis["sentiment"] = f"FGI={fng_val}中性 0"
        except:
            analysis["sentiment"] = "FGI获取失败 0"
        # 3. Open interest — does this symbol have real attention?
        try:
            oi = get_open_interest(symbol)
            ticker = requests.get(f"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol={symbol}", timeout=5).json()
            price = float(ticker['lastPrice'])
            oi_usd = oi * price
            if oi_usd > 5_000_000:  # OI > $5M counts as real attention
                score += 1
                analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 有关注度 +1"
            else:
                analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 关注度低 0"
        except:
            analysis["oi_check"] = "OI获取失败 0"
        # 4. Volume — is turnover active enough?
        # NOTE(review): `ticker` is defined inside the step-3 try above; if
        # that step failed before assigning it, the NameError here is absorbed
        # by the bare except and misreported as a volume-fetch failure.
        try:
            vol = float(ticker.get('quoteVolume', 0))
            if vol > 50_000_000:
                score += 1
                analysis["volume_check"] = f"24h量={vol/1e6:.0f}M 活跃 +1"
            elif vol > 20_000_000:
                analysis["volume_check"] = f"24h量={vol/1e6:.0f}M 一般 0"
            else:
                score -= 1
                analysis["volume_check"] = f"24h量={vol/1e6:.0f}M 冷清 -1"
        except:
            analysis["volume_check"] = "量能获取失败 0"
        # 5. Bonus points for the signal's own grade.
        if signal["strength"] == "S":
            score += 2
        elif signal["strength"] == "A":
            score += 1
        # Final verdict: pass when the aggregate score reaches 3.
        analysis["verdict"] = f"综合得分:{score}/7"
        if score >= 3:
            return True, analysis, signal["strength"]
        else:
            return False, analysis, signal["strength"]
    except Exception as e:
        # Any unexpected failure → be conservative and do not open.
        analysis["verdict"] = f"检查异常:{e} 保守不开"
        return False, analysis, signal["strength"]
# === 开仓执行 ===
def execute_open(data, state, symbol, price, signal):
"""执行虚拟开仓 — 先过综合环境检查"""
# 综合环境检查
passed, env_analysis, strength = check_environment(symbol, signal)
env_summary = " | ".join(v for v in env_analysis.values() if v)
if not passed:
log(f"综合检查未通过 {symbol}: {env_summary}")
return
log(f"综合检查通过 {symbol}: {env_summary}")
balance = get_balance(data)
position_usd = balance * POSITION_PCT / 100
if signal["direction"] == "long":
sl = round(price * (1 - signal["sl_pct"]), 6)
tp = round(price * (1 + signal["tp_pct"]), 6)
else:
sl = round(price * (1 + signal["sl_pct"]), 6)
tp = round(price * (1 - signal["tp_pct"]), 6)
trade = {
"id": next_id(data),
"symbol": symbol,
"direction": signal["direction"],
"leverage": LEVERAGE,
"position_pct": POSITION_PCT,
"position_usd": round(position_usd, 4),
"notional_usd": round(position_usd * LEVERAGE, 4),
"entry_price": price,
"stop_loss": sl,
"take_profit": tp,
"entry_time": now_str(),
"exit_price": None,
"exit_time": None,
"exit_reason": None,
"pnl_pct": None,
"pnl_usd": None,
"status": "open",
"pre_analysis": {
"btc_env": env_analysis.get("btc_env", ""),
"sentiment": env_analysis.get("sentiment", ""),
"oi": env_analysis.get("oi_check", ""),
"volume": env_analysis.get("volume_check", ""),
"key_reason": f"[{signal['strength']}级] {signal['reason']}",
"risk": f"综合得分:{env_analysis.get('verdict','')} 策略:{signal['type']}"
},
"post_review": None
}
data["trades"].append(trade)
save_trades(data)
# 记录冷却
state["last_opens"][symbol] = now_str()
save_state(state)
direction_cn = "做多" if signal["direction"] == "long" else "做空"
msg = f"""```
[扫描开仓] #{trade['id']}
币种: {symbol}
方向: {direction_cn} {LEVERAGE}x
入场: {price}
止损: {sl}
止盈: {tp}
仓位: {position_usd:.2f}U
信号: [{signal['strength']}] {signal['reason']}
时间: {trade['entry_time']}
```"""
log(f"开仓 #{trade['id']} {symbol} {direction_cn} @ {price} | {signal['reason']}")
send_tg(msg)
print(msg)
# === 换仓逻辑 ===
def swap_weakest(data, state, open_positions, new_signal, tickers):
"""满仓时遇到S级信号,平掉浮亏最大的持仓,开新仓"""
ticker_map = {t['symbol']: float(t['lastPrice']) for t in tickers}
# 计算每个持仓的浮盈%
worst_trade = None
worst_pnl = float('inf')
for t in open_positions:
price = ticker_map.get(t["symbol"])
if price is None:
continue
if t["direction"] == "long":
pnl_pct = (price - t["entry_price"]) / t["entry_price"] * 100
else:
pnl_pct = (t["entry_price"] - price) / t["entry_price"] * 100
if pnl_pct < worst_pnl:
worst_pnl = pnl_pct
worst_trade = t
worst_price = price
if worst_trade is None:
return
# 只换掉亏损的仓位,盈利的不动
if worst_pnl > 0:
log(f"满仓但所有持仓盈利,不换仓 | 新信号: {new_signal['symbol']}")
return
# 平掉最弱的
if worst_trade["direction"] == "long":
pnl_pct_lev = (worst_price - worst_trade["entry_price"]) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
else:
pnl_pct_lev = (worst_trade["entry_price"] - worst_price) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
pos_usd = worst_trade.get("position_usd", worst_trade.get("position_pct", 30))
pnl_usd = round(pnl_pct_lev / 100 * pos_usd, 4)
worst_trade["exit_price"] = worst_price
worst_trade["exit_time"] = now_str()
worst_trade["exit_reason"] = f"换仓→{new_signal['symbol']}"
worst_trade["pnl_pct"] = round(pnl_pct_lev, 2)
worst_trade["pnl_usd"] = pnl_usd
worst_trade["status"] = "closed"
save_trades(data)
direction_cn = "多" if worst_trade["direction"] == "long" else "空"
msg = f"""```
[换仓平仓] #{worst_trade['id']}
平掉: {worst_trade['symbol']} {direction_cn}
入场: {worst_trade['entry_price']}
出场: {worst_price}
盈亏: {pnl_pct_lev:+.2f}% ({pnl_usd:+.2f}U)
原因: S级信号{new_signal['symbol']}更强
```"""
log(f"换仓平 #{worst_trade['id']} {worst_trade['symbol']} {pnl_usd:+.2f}U → 开 {new_signal['symbol']}")
send_tg(msg)
# 开新仓
execute_open(data, state, new_signal["symbol"], new_signal["price"], new_signal)
# === 主扫描逻辑 ===
def scan():
data = load_trades()
state = load_state()
now = datetime.now(TZ_UTC8)
# 检查持仓数
open_positions = [t for t in data["trades"] if t["status"] == "open"]
open_symbols = set(t["symbol"] for t in open_positions)
if len(open_positions) >= MAX_OPEN_POSITIONS:
return
# 获取市场数据
try:
tickers = get_all_tickers()
funding_rates = get_funding_rates()
except Exception as e:
log(f"API错误: {e}")
return
# 过滤USDT合约 + 最小成交量
exclude = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "FDUSDUSDT", "BTCDOMUSDT", "BTCSTUSDT"}
candidates = [t for t in tickers
if t['symbol'].endswith('USDT')
and t['symbol'] not in exclude
and float(t['quoteVolume']) > MIN_VOLUME_M * 1e6]
signals = []
for ticker in candidates:
symbol = ticker['symbol']
# 跳过已持仓的币
if symbol in open_symbols:
continue
# 冷却检查
last_open = state.get("last_opens", {}).get(symbol)
if last_open:
try:
last_dt = datetime.fromisoformat(last_open)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=TZ_UTC8)
if (now - last_dt).total_seconds() < COOLDOWN_HOURS * 3600:
continue
except:
pass
fr = funding_rates.get(symbol, 0)
# 运行所有策略检测
for detect_fn in [
lambda s, f, m: detect_extreme_negative_funding(s, f, m),
lambda s, f, m: detect_extreme_positive_funding(s, f, m),
lambda s, f, m: detect_crash_bounce(ticker),
lambda s, f, m: detect_pump_short(ticker),
]:
try:
signal = detect_fn(symbol, fr, funding_rates)
if signal:
signal["symbol"] = symbol
signal["price"] = float(ticker['lastPrice'])
signal["volume_m"] = float(ticker['quoteVolume']) / 1e6
signals.append(signal)
except:
continue
if not signals:
return
# 按信号强度排序 S>A>B
strength_order = {"S": 0, "A": 1, "B": 2}
signals.sort(key=lambda x: strength_order.get(x["strength"], 3))
# 只取最强的信号开仓(一次最多开1笔)
best = signals[0]
# B级信号跳过,只开S和A级
if best["strength"] == "B":
log(f"B级信号跳过: {best['symbol']} {best['reason']}")
return
slots = MAX_OPEN_POSITIONS - len(open_positions)
if slots > 0:
execute_open(data, state, best["symbol"], best["price"], best)
elif best["strength"] == "S":
# 满仓但遇到S级信号 → 换掉最弱的持仓
swap_weakest(data, state, open_positions, best, tickers)
if __name__ == "__main__":
    # Single pass per invocation — intended to be scheduled (e.g. cron).
    scan()
实用工具
VoiceKey — 声纹密钥
发布日期: 2026.04.27 标签: Python · Security · Telegram · Speaker Verification
用声纹替代密码,保护你的 AI 助手
为什么做这个?越来越多人用TG控制AI Agent(服务器、交易机器人、智能家居)。一旦TG账号被盗,攻击者可以用你的AI做任何事。密码可以被偷看、复制、社工,但声纹不行——每个人的声音特征是唯一的。VoiceKey要求每次新会话发一条语音验证身份,通过才放行。零AI成本,纯本地CPU运行。
完整源码
#!/usr/bin/env python3
"""
VoiceKey — 声纹密钥 (Voiceprint Authentication)
Speaker verification for Telegram bot security.
Uses resemblyzer (GE2E model) for speaker embedding extraction
and cosine similarity for verification.
Zero AI cost — runs entirely on local CPU.
Usage:
# Register voiceprint from audio files
python voicekey.py register --audio voice1.ogg voice2.ogg --owner "YourName"
# Verify a voice against stored voiceprint
python voicekey.py verify --audio test.ogg
# As a Python module
from voicekey import VoiceKey
vk = VoiceKey()
vk.register(["voice1.ogg", "voice2.ogg"], owner="YourName")
is_owner, score = vk.verify("test.ogg")
"""
import os
import json
import tempfile
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime
# Lazy imports to speed up module load when not needed
_encoder = None  # module-level singleton VoiceEncoder, created on first use

def _get_encoder():
    """Lazy-load the voice encoder model (first call takes ~1s)."""
    global _encoder
    if _encoder is None:
        # resemblyzer import deferred so `import voicekey` stays fast.
        from resemblyzer import VoiceEncoder
        _encoder = VoiceEncoder()
    return _encoder
def _audio_to_wav(audio_path: str) -> str:
"""Convert any audio format to 16kHz mono WAV for processing."""
from pydub import AudioSegment
ext = Path(audio_path).suffix.lower()
if ext in ('.ogg', '.oga'):
audio = AudioSegment.from_ogg(audio_path)
elif ext == '.mp3':
audio = AudioSegment.from_mp3(audio_path)
elif ext == '.wav':
return audio_path # already wav
elif ext in ('.m4a', '.aac'):
audio = AudioSegment.from_file(audio_path, format=ext.lstrip('.'))
else:
audio = AudioSegment.from_file(audio_path)
# Convert to 16kHz mono
audio = audio.set_frame_rate(16000).set_channels(1)
tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
audio.export(tmp.name, format='wav')
return tmp.name
def _extract_embedding(audio_path: str) -> np.ndarray:
    """Extract a speaker embedding vector from an audio file."""
    from resemblyzer import preprocess_wav
    model = _get_encoder()
    wav_file = _audio_to_wav(audio_path)
    samples = preprocess_wav(wav_file)
    if wav_file != audio_path:
        # _audio_to_wav created a temp file; remove it now that it's loaded.
        os.unlink(wav_file)
    if len(samples) < 1600:  # under 0.1s of 16kHz audio
        raise ValueError(f"Audio too short: {len(samples)/16000:.1f}s (need >0.1s)")
    return model.embed_utterance(samples)
class VoiceKey:
    """
    Speaker verification using voice embeddings.

    A single "owner" voiceprint (the unit-normalized mean of several sample
    embeddings) is stored on disk; verification is the cosine similarity of
    a probe embedding against it, compared to `threshold`.

    Attributes:
        data_dir: Directory to store voiceprint data
        threshold: Cosine similarity threshold for verification (default: 0.75)
        voiceprint: The registered owner's voice embedding (256-dim vector)
    """
    def __init__(self, data_dir: str = None, threshold: float = 0.75):
        if data_dir is None:
            data_dir = os.path.expanduser("~/.hermes/voiceprint")
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.threshold = threshold
        self.voiceprint = None
        self.metadata = {}
        self._load()
    def _load(self):
        """Load existing voiceprint if available."""
        vp_path = self.data_dir / "voiceprint.npy"
        meta_path = self.data_dir / "voiceprint_meta.json"
        if vp_path.exists():
            self.voiceprint = np.load(str(vp_path))
        if meta_path.exists():
            with open(meta_path) as f:
                self.metadata = json.load(f)
            # A threshold saved at registration time wins over the ctor value.
            self.threshold = self.metadata.get("threshold", self.threshold)
    @property
    def is_registered(self) -> bool:
        """Check if a voiceprint is registered."""
        return self.voiceprint is not None
    def register(self, audio_paths: list, owner: str = "owner") -> dict:
        """
        Register a voiceprint from multiple audio samples.

        Args:
            audio_paths: List of audio file paths (ogg, mp3, wav, etc.)
            owner: Name of the voiceprint owner

        Returns:
            dict with registration results (per-file status, self-test scores)

        Raises:
            ValueError: if none of the given files yields a usable embedding.
        """
        embeddings = []
        results = []
        for path in audio_paths:
            try:
                embed = _extract_embedding(path)
                embeddings.append(embed)
                results.append({"file": os.path.basename(path), "status": "ok"})
            except Exception as e:
                # Bad/corrupt samples are reported, not fatal — registration
                # succeeds as long as at least one sample works.
                results.append({"file": os.path.basename(path), "status": "error", "error": str(e)})
        if not embeddings:
            raise ValueError("No valid audio samples provided")
        # Average embeddings and normalize to unit length.
        voiceprint = np.mean(embeddings, axis=0)
        voiceprint = voiceprint / np.linalg.norm(voiceprint)
        # Save the embedding plus metadata to disk.
        np.save(str(self.data_dir / "voiceprint.npy"), voiceprint)
        self.metadata = {
            "owner": owner,
            "samples_used": len(embeddings),
            "embedding_dim": int(voiceprint.shape[0]),
            "threshold": self.threshold,
            "created": datetime.now().isoformat(),
        }
        with open(self.data_dir / "voiceprint_meta.json", "w") as f:
            json.dump(self.metadata, f, indent=2)
        self.voiceprint = voiceprint
        # Self-test: similarity of each sample against the averaged print.
        similarities = []
        for embed in embeddings:
            sim = float(np.dot(voiceprint, embed / np.linalg.norm(embed)))
            similarities.append(sim)
        return {
            "owner": owner,
            "samples": len(embeddings),
            "self_test_scores": similarities,
            "min_score": min(similarities),
            "details": results,
        }
    def verify(self, audio_path: str) -> tuple:
        """
        Verify if an audio sample matches the registered voiceprint.

        Args:
            audio_path: Path to audio file to verify

        Returns:
            tuple: (is_verified: bool, similarity_score: float)

        Raises:
            RuntimeError: if no voiceprint has been registered yet.
        """
        if not self.is_registered:
            raise RuntimeError("No voiceprint registered. Call register() first.")
        embed = _extract_embedding(audio_path)
        embed = embed / np.linalg.norm(embed)
        # Both vectors are unit-normalized, so the dot product IS the cosine.
        similarity = float(np.dot(self.voiceprint, embed))
        is_verified = similarity >= self.threshold
        return is_verified, similarity
    def get_info(self) -> dict:
        """Get voiceprint registration info."""
        if not self.is_registered:
            return {"registered": False}
        return {
            "registered": True,
            **self.metadata,
        }
def main():
    """CLI entry point: register / verify / info subcommands."""
    parser = argparse.ArgumentParser(
        description="VoiceKey — Speaker verification for security"
    )
    sub = parser.add_subparsers(dest="command")
    # register: build a voiceprint from one or more samples
    reg = sub.add_parser("register", help="Register voiceprint from audio files")
    reg.add_argument("--audio", nargs="+", required=True, help="Audio files (ogg/mp3/wav)")
    reg.add_argument("--owner", default="owner", help="Owner name")
    reg.add_argument("--data-dir", default=None, help="Data directory")
    reg.add_argument("--threshold", type=float, default=0.75, help="Verification threshold")
    # verify: test one sample against the stored voiceprint
    ver = sub.add_parser("verify", help="Verify audio against voiceprint")
    ver.add_argument("--audio", required=True, help="Audio file to verify")
    ver.add_argument("--data-dir", default=None, help="Data directory")
    sub.add_parser("info", help="Show voiceprint info")
    args = parser.parse_args()
    command = args.command
    if command == "register":
        vk = VoiceKey(data_dir=args.data_dir, threshold=args.threshold)
        outcome = vk.register(args.audio, owner=args.owner)
        print(f"Registered voiceprint for: {outcome['owner']}")
        print(f"Samples used: {outcome['samples']}")
        print(f"Self-test scores: {[f'{s:.4f}' for s in outcome['self_test_scores']]}")
        print(f"Min score: {outcome['min_score']:.4f} (threshold: {args.threshold})")
    elif command == "verify":
        vk = VoiceKey(data_dir=getattr(args, 'data_dir', None))
        passed, score = vk.verify(args.audio)
        status = "PASS" if passed else "FAIL"
        print(f"[{status}] Similarity: {score:.4f} (threshold: {vk.threshold})")
    elif command == "info":
        for k, v in VoiceKey().get_info().items():
            print(f" {k}: {v}")
    else:
        parser.print_help()
if __name__ == "__main__":
    main()  # CLI entry point
总结
以上 7 个片段全部来源于 connectfarm1.com,抓取时间 2026-05-04。它们共享三个理念:(1) 零或接近零的 AI 成本 —— 大多用规则引擎和免费公共 API,不依赖 LLM;(2) 只用 Python,便宜 VPS 上配 crontab 或一个 while True 就能跑;(3) Telegram 作为统一输出口 —— 不做仪表盘,不做前端,直接把信息推到你真正会看的地方。把多个雷达组合起来交叉验证信号,把"自主交易"当成研究沙盒,不要当成印钞机。