Skip to main content

Code Vault — 7 Open-Source Crypto Radar & Trading Tools

Code Vault: 7 essential crypto trading tools for automated trading, market analysis, and portfolio management. Open source trading utilities.

Go Python
应用领域: Dev Utils

{</* resource-info */>}

This is a complete walkthrough of the seven snippets currently published in Code Vault — a personal repository of crypto trading radars, autonomous trading systems, and security utilities, all in pure Python with zero or near-zero API costs. Every snippet below includes the full source code so you can read, fork, and run them locally. Use the table of contents on the right to jump to a specific tool.

⚠️ Risk warning — These tools touch live markets and on-chain data. Several of them push real-time alerts to Telegram and one of them (the AI autonomous trader) opens virtual positions on Binance Futures. Read the per-tool notes carefully. Use at your own risk; the author makes no warranty for trading outcomes.

Trading Radar #

Vitalik Sell Radar #

Date: 2026.05.02 Tags: Python · WebSocket · Ethereum · Telegram · Event-Driven

GitHub: vitalik-sell-radar{rel=“nofollow”}

WebSocket event-driven Vitalik wallet sell detection with Telegram alerts

Monitors Vitalik Buterin’s wallet (vitalik.eth) for ERC-20 token sells via WebSocket event subscription — zero polling, sub-second latency. Automatically classifies recipients as DEX Router (Uniswap, 1inch, SushiSwap), CEX hot wallet (Binance, Coinbase, Kraken), or LP Pool. Fetches real-time token prices from DexScreener. Multi-RPC failover with auto-reconnect. Pure Python, zero cost — uses free public RPC nodes.

Full source code #

#!/usr/bin/env python3
"""
Vitalik Sell Radar — Event-Driven Edition
WebSocket subscription to ERC-20 Transfer events, real-time detection of
Vitalik's sell activity, instant push to Telegram.

Architecture:
1. WebSocket subscribes to Transfer(from=vitalik) events → sub-second detection
2. Classifies sell behavior (transfers to DEX Router / CEX / LP Pool)
3. Queries token info + price via DexScreener
4. Pushes alert to Telegram
5. Auto-reconnect + multi-RPC failover
"""

import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import aiohttp
import websockets

# Load .env file
def load_env():
 env_path = Path(__file__).parent / ".env"
 if env_path.exists():
 for line in env_path.read_text().splitlines():
 line = line.strip()
 if line and not line.startswith("#") and "=" in line:
 k, v = line.split("=", 1)
 os.environ.setdefault(k.strip(), v.strip())

load_env()

# ============================================================
# Configuration
# ============================================================

# Vitalik's main wallet
VITALIK_ADDRESS = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
VITALIK_PADDED = "0x" + VITALIK_ADDRESS[2:].lower().zfill(64)

# ERC-20 Transfer event topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

# WebSocket RPC endpoints (free, support eth_subscribe)
WS_ENDPOINTS = [
 "wss://ethereum-rpc.publicnode.com",
 "wss://eth.drpc.org",
 "wss://ethereum.publicnode.com",
]

# HTTP RPC for querying token info
HTTP_RPC = os.environ.get("HTTP_RPC", "https://eth.drpc.org")

# Telegram
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")

# Known DEX Router addresses (sell destinations)
KNOWN_DEX_ROUTERS = {
 # Uniswap
 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap V2 Router",
 "0xe592427a0aece92de3edee1f18e0157c05861564": "Uniswap V3 Router",
 "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "Uniswap V3 Router2",
 "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad": "Uniswap Universal Router",
 "0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b": "Uniswap Universal Router (old)",
 # 1inch
 "0x1111111254eeb25477b68fb85ed929f73a960582": "1inch V5",
 "0x111111125421ca6dc452d289314280a0f8842a65": "1inch V6",
 # SushiSwap
 "0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "SushiSwap Router",
 # CoW Protocol
 "0x9008d19f58aabd9ed0d60971565aa8510560ab41": "CoW Settlement",
 # 0x
 "0xdef1c0ded9bec7f1a1670819833240f027b25eff": "0x Exchange Proxy",
 # Curve
 "0x99a58482bd75cbab83b27ec03ca68ff489b5788f": "Curve Router",
}

# Known CEX hot wallets (partial list)
KNOWN_CEX = {
 "0x28c6c06298d514db089934071355e5743bf21d60": "Binance Hot Wallet",
 "0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance Hot Wallet 2",
 "0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance Hot Wallet 3",
 "0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance Hot Wallet 4",
 "0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "Coinbase",
 "0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43": "Coinbase 10",
 "0x503828976d22510aad0201ac7ec88293211d23da": "Coinbase 2",
 "0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2": "FTX (defunct)",
 "0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0": "Kraken",
 "0xae2d4617c862309a3d75a0ffb358c7a5009c673f": "Kraken 10",
}

# Minimum notification amount (USD), 0 = notify all
MIN_NOTIFY_USD = int(os.environ.get("MIN_NOTIFY_USD", "0"))

# Reconnect settings
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60

# ============================================================
# Logging
# ============================================================

logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s [%(levelname)s] %(message)s",
 datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("vitalik-radar")

# ============================================================
# Global state
# ============================================================

# Dedup set for processed tx hashes (last 1000)
seen_txs: set = set()
seen_txs_list: list = []

# HTTP session
http_session: aiohttp.ClientSession | None = None

# Token info cache: {address: {symbol, name, decimals}}
token_cache: dict = {}

# Running flag
running = True

# ============================================================
# Utility functions
# ============================================================

def shorten_addr(addr: str) -> str:
 """Shorten address for display"""
 return f"{addr[:6]}...{addr[-4:]}"

def decode_transfer_value(data_hex: str, decimals: int) -> float:
 """Decode Transfer event value"""
 try:
 raw = int(data_hex, 16)
 return raw / (10 ** decimals)
 except:
 return 0.0

def topic_to_address(topic: str) -> str:
 """Extract 20-byte address from 32-byte topic"""
 return "0x" + topic[-40:]

def classify_recipient(to_addr: str) -> tuple[str, str]:
 """
 Classify recipient address.
 Returns (type, name)
 Type: "dex" / "cex" / "pool" / "unknown"
 """
 to_lower = to_addr.lower()

 if to_lower in KNOWN_DEX_ROUTERS:
 return "dex", KNOWN_DEX_ROUTERS[to_lower]

 if to_lower in KNOWN_CEX:
 return "cex", KNOWN_CEX[to_lower]

 return "unknown", ""

async def get_http_session() -> aiohttp.ClientSession:
 global http_session
 if http_session is None or http_session.closed:
 http_session = aiohttp.ClientSession()
 return http_session

async def rpc_call(method: str, params: list) -> dict | None:
 """HTTP JSON-RPC call"""
 session = await get_http_session()
 try:
 async with session.post(
 HTTP_RPC,
 json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
 timeout=aiohttp.ClientTimeout(total=10),
 ) as resp:
 data = await resp.json()
 return data.get("result")
 except Exception as e:
 log.error(f"RPC call {method} failed: {e}")
 return None

async def get_token_info(token_addr: str) -> dict:
 """Query ERC-20 token info (symbol, name, decimals)"""
 addr_lower = token_addr.lower()
 if addr_lower in token_cache:
 return token_cache[addr_lower]

 info = {"symbol": "???", "name": "Unknown", "decimals": 18, "address": token_addr}

 # symbol()
 result = await rpc_call("eth_call", [
 {"to": token_addr, "data": "0x95d89b41"}, "latest"
 ])
 if result and len(result) > 2:
 try:
 hex_str = result[2:]
 if len(hex_str) >= 128:
 offset = int(hex_str[:64], 16) * 2
 length = int(hex_str[offset:offset+64], 16)
 symbol_hex = hex_str[offset+64:offset+64+length*2]
 info["symbol"] = bytes.fromhex(symbol_hex).decode("utf-8", errors="replace").strip('\x00')
 elif len(hex_str) == 64:
 info["symbol"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
 except:
 pass

 # decimals()
 result = await rpc_call("eth_call", [
 {"to": token_addr, "data": "0x313ce567"}, "latest"
 ])
 if result and len(result) > 2:
 try:
 info["decimals"] = int(result, 16)
 except:
 pass

 # name()
 result = await rpc_call("eth_call", [
 {"to": token_addr, "data": "0x06fdde03"}, "latest"
 ])
 if result and len(result) > 2:
 try:
 hex_str = result[2:]
 if len(hex_str) >= 128:
 offset = int(hex_str[:64], 16) * 2
 length = int(hex_str[offset:offset+64], 16)
 name_hex = hex_str[offset+64:offset+64+length*2]
 info["name"] = bytes.fromhex(name_hex).decode("utf-8", errors="replace").strip('\x00')
 elif len(hex_str) == 64:
 info["name"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
 except:
 pass

 token_cache[addr_lower] = info
 return info

async def check_if_pool(addr: str) -> bool:
 """Check if address is a Uniswap V2/V3 pool (has token0 method)"""
 result = await rpc_call("eth_call", [
 {"to": addr, "data": "0x0dfe1681"}, "latest" # token0()
 ])
 if result and len(result) == 66: # 0x + 64 hex chars
 return True
 return False

async def get_eth_price() -> float:
 """Get ETH price from CoinGecko"""
 session = await get_http_session()
 try:
 async with session.get(
 "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
 timeout=aiohttp.ClientTimeout(total=5),
 ) as resp:
 data = await resp.json()
 return data.get("ethereum", {}).get("usd", 0)
 except:
 return 2300 # fallback

async def get_token_price_usd(token_addr: str) -> float | None:
 """Get token USD price from DexScreener (free, no API key needed)"""
 session = await get_http_session()
 try:
 async with session.get(
 f"https://api.dexscreener.com/latest/dex/tokens/{token_addr}",
 timeout=aiohttp.ClientTimeout(total=5),
 ) as resp:
 data = await resp.json()
 pairs = data.get("pairs", [])
 if pairs:
 # Pick pair with highest liquidity
 pairs.sort(key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0), reverse=True)
 price = float(pairs[0].get("priceUsd", 0) or 0)
 return price if price > 0 else None
 except:
 pass
 return None

# ============================================================
# Telegram notifications
# ============================================================

async def send_telegram(text: str):
 """Send Telegram message"""
 if not TG_BOT_TOKEN:
 log.warning("[TG] No bot token configured, skipping notification")
 return

 session = await get_http_session()
 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
 payload = {
 "chat_id": TG_CHAT_ID,
 "text": text,
 "parse_mode": "HTML",
 "disable_web_page_preview": True,
 }

 try:
 async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
 result = await resp.json()
 if not result.get("ok"):
 log.error(f"[TG] Send failed: {result.get('description', '')}")
 # Retry with plain text if HTML parse fails
 if "parse" in result.get("description", "").lower():
 payload["parse_mode"] = None
 async with session.post(url, json=payload) as resp2:
 pass
 else:
 log.info("[TG] Message sent")
 except Exception as e:
 log.error(f"[TG] Error: {e}")

# ============================================================
# Event handling
# ============================================================

async def handle_transfer_event(log_entry: dict):
 """Process a single Transfer event"""
 tx_hash = log_entry.get("transactionHash", "")
 token_addr = log_entry.get("address", "")
 topics = log_entry.get("topics", [])
 data = log_entry.get("data", "0x0")

 # Dedup
 dedup_key = f"{tx_hash}:{token_addr}"
 if dedup_key in seen_txs:
 return
 seen_txs.add(dedup_key)
 seen_txs_list.append(dedup_key)
 # Cleanup (keep last 1000)
 while len(seen_txs_list) > 1000:
 old = seen_txs_list.pop(0)
 seen_txs.discard(old)

 # Parse topics: [Transfer, from, to]
 if len(topics) < 3:
 return

 from_addr = topic_to_address(topics[1])
 to_addr = topic_to_address(topics[2])

 # Confirm it's from Vitalik
 if from_addr.lower() != VITALIK_ADDRESS.lower():
 return

 # Get token info
 token_info = await get_token_info(token_addr)
 symbol = token_info["symbol"]
 decimals = token_info["decimals"]
 amount = decode_transfer_value(data, decimals)

 if amount == 0:
 return

 # Classify recipient
 recipient_type, recipient_name = classify_recipient(to_addr)

 # If unknown, check if it's an LP pool
 is_pool = False
 if recipient_type == "unknown":
 is_pool = await check_if_pool(to_addr)
 if is_pool:
 recipient_type = "pool"
 recipient_name = "LP Pool"

 # Determine if this is a "sell" action
 is_sell = recipient_type in ("dex", "cex", "pool")

 # If not a sell, just a regular transfer — log silently
 if not is_sell:
 log.info(f"[Transfer] {symbol} {amount:,.2f}{shorten_addr(to_addr)} (regular transfer, not notifying)")
 return

 # Get price
 token_price = await get_token_price_usd(token_addr)
 usd_value = amount * token_price if token_price else None

 # Below minimum notification amount — skip
 if usd_value is not None and usd_value < MIN_NOTIFY_USD:
 log.info(f"[Sell] {symbol} ${usd_value:.0f} < ${MIN_NOTIFY_USD} minimum, skipping")
 return

 # Build notification message
 timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S UTC")

 sell_type_label = {
 "dex": "🔄 DEX Sell",
 "cex": "🏦 CEX Transfer",
 "pool": "🌊 Pool Sell",
 }

 msg_lines = [
 f"<b>🚨 Vitalik Sell Signal</b>",
 f"",
 f"Token: <b>{symbol}</b>",
 f"Amount: {amount:,.4f}",
 ]

 if usd_value is not None:
 msg_lines.append(f"Value: <b>${usd_value:,.0f}</b>")

 if token_price is not None:
 msg_lines.append(f"Price: ${token_price:,.8f}")

 msg_lines.extend([
 f"",
 f"Type: {sell_type_label.get(recipient_type, 'Unknown')}",
 f"Destination: {recipient_name or shorten_addr(to_addr)}",
 f"Time: {timestamp}",
 f"",
 f"TX: https://etherscan.io/tx/{tx_hash}",
 f"Token: https://dexscreener.com/ethereum/{token_addr}",
 ])

 msg = "\n".join(msg_lines)
 log.info(f"[SELL DETECTED] {symbol} {amount:,.4f}{recipient_name or to_addr} | ${usd_value or '?'}")
 await send_telegram(msg)

# ============================================================
# WebSocket listener main loop
# ============================================================

async def subscribe_and_listen(ws_url: str):
 """Connect to WebSocket and subscribe to Vitalik Transfer events"""
 log.info(f"Connecting to {ws_url}...")

 async with websockets.connect(
 ws_url,
 ping_interval=20,
 ping_timeout=30,
 close_timeout=10,
 max_size=2**20, # 1MB
 ) as ws:
 # Subscribe to Transfer FROM Vitalik
 sub_from = {
 "jsonrpc": "2.0", "id": 1,
 "method": "eth_subscribe",
 "params": ["logs", {
 "topics": [TRANSFER_TOPIC, VITALIK_PADDED]
 }]
 }
 await ws.send(json.dumps(sub_from))
 resp = await asyncio.wait_for(ws.recv(), timeout=10)
 data = json.loads(resp)
 if "error" in data:
 raise Exception(f"Subscribe failed: {data['error']}")
 sub_id_from = data.get("result", "")
 log.info(f"✅ Subscribed to Transfer FROM Vitalik (id={sub_id_from[:10]}...)")

 # Also subscribe to Transfer TO Vitalik (monitor buys/receives)
 sub_to = {
 "jsonrpc": "2.0", "id": 2,
 "method": "eth_subscribe",
 "params": ["logs", {
 "topics": [TRANSFER_TOPIC, None, VITALIK_PADDED]
 }]
 }
 await ws.send(json.dumps(sub_to))
 resp2 = await asyncio.wait_for(ws.recv(), timeout=10)
 data2 = json.loads(resp2)
 sub_id_to = data2.get("result", "")
 if sub_id_to:
 log.info(f"✅ Subscribed to Transfer TO Vitalik (id={sub_id_to[:10]}...)")

 log.info("🔍 Monitoring Vitalik wallet... waiting for events")

 # Listen for events
 async for message in ws:
 if not running:
 break

 try:
 evt = json.loads(message)

 if "params" not in evt:
 continue

 sub_id = evt["params"].get("subscription", "")
 log_entry = evt["params"].get("result", {})

 if sub_id == sub_id_from:
 # Vitalik sent tokens → check if it's a sell
 await handle_transfer_event(log_entry)
 elif sub_id == sub_id_to:
 # Vitalik received tokens — log silently
 token_addr = log_entry.get("address", "")
 token_info = await get_token_info(token_addr)
 data_hex = log_entry.get("data", "0x0")
 amount = decode_transfer_value(data_hex, token_info["decimals"])
 log.debug(f"[Receive] {token_info['symbol']} +{amount:,.4f}")

 except json.JSONDecodeError:
 continue
 except Exception as e:
 log.error(f"Event handling error: {e}", exc_info=True)

async def main():
 """Main loop with auto-reconnect"""
 global running

 # Validate required config
 if not TG_BOT_TOKEN:
 log.warning("TG_BOT_TOKEN not set — Telegram notifications disabled")
 if not TG_CHAT_ID:
 log.warning("TG_CHAT_ID not set — Telegram notifications disabled")

 # Signal handling
 def shutdown(sig, frame):
 global running
 log.info(f"Received signal {sig}, shutting down...")
 running = False

 signal.signal(signal.SIGINT, shutdown)
 signal.signal(signal.SIGTERM, shutdown)

 # Startup notice
 log.info("=" * 50)
 log.info("Vitalik Sell Radar — Started")
 log.info(f"Monitoring: {VITALIK_ADDRESS}")
 log.info(f"Min notify: ${MIN_NOTIFY_USD}")
 log.info(f"Telegram: {'✅ Configured' if TG_BOT_TOKEN else '❌ Not configured'}")
 log.info("=" * 50)

 if TG_BOT_TOKEN and TG_CHAT_ID:
 await send_telegram(
 "🟢 Vitalik Sell Radar — Started\n\n"
 f"Monitoring: {shorten_addr(VITALIK_ADDRESS)}\n"
 f"Min notify: ${MIN_NOTIFY_USD}\n"
 "Mode: WebSocket event-driven (sub-second latency)"
 )

 endpoint_idx = 0
 reconnect_delay = RECONNECT_DELAY

 while running:
 ws_url = WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]
 try:
 await subscribe_and_listen(ws_url)
 reconnect_delay = RECONNECT_DELAY # reset on success
 except websockets.exceptions.ConnectionClosed as e:
 log.warning(f"WebSocket closed: {e}")
 except asyncio.TimeoutError:
 log.warning("WebSocket timeout")
 except Exception as e:
 log.error(f"WebSocket error: {e}")

 if not running:
 break

 # Switch endpoint and retry
 endpoint_idx += 1
 log.info(f"Reconnecting in {reconnect_delay}s... (next: {WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]})")
 await asyncio.sleep(reconnect_delay)
 reconnect_delay = min(reconnect_delay * 1.5, MAX_RECONNECT_DELAY)

 # Cleanup
 if http_session and not http_session.closed:
 await http_session.close()

 log.info("Vitalik Sell Radar — Stopped")

if __name__ == "__main__":
 asyncio.run(main())

On-Chain Narrative Radar #

Date: 2026.04.28 Tags: Python · GMGN · DEXScreener · Telegram

Momentum-driven on-chain token discovery across ETH/SOL/BSC/Base

Momentum is the only push engine — narrative is just a classification label. Scans every 30 seconds across 4 chains. Tokens must show 3 consecutive rounds of market cap increase with ≥5% total gain to trigger an alert. Narratives (Musk/Trump, Binance/CZ, celebrity) are classified as ★★★/★★/★ labels but never trigger pushes on their own. Safety checks via RugCheck (SOL) and GoPlus (EVM). Pure Python, zero AI cost.

Full source code #

#!/usr/bin/env python3
"""
radar → radar v1
Python,zero AI cost(key matching + )

3. Binance/CZ — BSC

data:GMGN + DEXScreenersearch
"""

import requests
import json
import time
import os
import re
import sqlite3
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from difflib import SequenceMatcher

# === configuration ===
DATA_DIR = os.path.expanduser("~/crypto-trading")
DB_FILE = os.path.join(DATA_DIR, "narrative_history.db")
LOG_FILE = os.path.join(DATA_DIR, "narrative_radar.log")
SEEN_FILE = os.path.join(DATA_DIR, "narrative_seen.json")
FLAP_SEEN_FILE = os.path.join(DATA_DIR, "flap_seen.json")

SCAN_INTERVAL = 30 # 30(GMGNdata1-5,10data)

# {address: [{'ts': timestamp, 'mc': market_cap, 'vol': volume, 'price': price}, ...]}
MOMENTUM_TRACKER = {}
MOMENTUM_PUSHED = {} # {address: {'count': N, 'last_ts': ts, 'last_mc': mc}} 
MOMENTUM_CONSECUTIVE_UP = 3 # consecutive3(data)

# from.envreadTGconfiguration
def load_env():
 env = {}
 env_file = os.path.expanduser("~/.env")
 if os.path.exists(env_file):
 with open(env_file) as f:
 for line in f:
 line = line.strip()
 if '=' in line and not line.startswith('#'):
 k, v = line.split('=', 1)
 env[k] = v
 return env

ENV = load_env()
TG_TOKEN = ENV.get('TELEGRAM_BOT_TOKEN', '')
TG_CHAT_ID = int(os.environ.get('TG_CHAT_ID', '0'))

GMGN_HEADERS = {
 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
 'Accept': 'application/json',
 'Referer': 'https://gmgn.ai/',
}

# ============================================================
# ============================================================
MUSK_TRUMP_KEYWORDS = {
 'musk', 'elon', 'elonmusk',
 # SpaceX/Tesla/X
 'spacex', 'starship', 'tesla', 'cybertruck', 'roadster',
 'neuralink', 'boring', 'hyperloop', 'xai', 'grok',
 'floki', 'shiba', # 
 'doge father', 'dogefather', 'technoking',
 'mars colony', 'mars',
 'trump', 'donald', 'maga', 'potus', 'trump47',
 'melania', 'barron', 'ivanka',
 'dark maga', 'darkmaga', 'ultra maga', 'save america',
 'truth social', 'covfefe',
 'doge department', 'd.o.g.e', 'government efficiency',
}

MUSK_TRUMP_PATTERNS = [
 r'\belon\b', r'\bmusk\b', r'\btrump\b', r'\bmaga\b',
 r'\bspacex\b', r'\bstarship\b', r'\btesla\b', r'\bgrok\b',
 r'\bmelania\b', r'\bbarron\b', r'\bdoge\s*department\b',
 r'\bd\.?o\.?g\.?e\b', # D.O.G.E
 r'\bx\s*ai\b', r'\bneuralink\b',
]

# ============================================================
# ============================================================
BINANCE_CZ_KEYWORDS = {
 # CZcore 
 'cz', 'changpeng', 'zhao', 'czb', 'czbinance',
 'heyi', 'yi he', 'he yi', '', 'yihe',
 'sister yi', 'yi jie', '', '',
 'binance', 'bnb', 'pancake', 'pancakeswap',
 'giggle academy', 'binance life', 'bnb chain',
 'principles', 'cz book',
 'yzi', 'yzi labs',
 '', 'Binance', '', 'cz', '',
 'fourmeme', 'four meme', '4meme',
 'czs dog', 'cz dog', 'bnb dog',
 'build on bnb', 'bnb ecosystem',
}

BINANCE_CZ_PATTERNS = [
 r'\bcz\b', r'\bbinance\b', r'\bbnb\b',
 r'\bheyi\b', r'\byi\s*he\b', r'\bhe\s*yi\b',
 r'\b\b', r'\b\b',
 r'\bpancake\b', r'\bgiggle\b', r'\byzi\b',
 r'\bfourmeme\b', r'\b4meme\b',
]

# ============================================================
# ============================================================
CELEBRITY_VIRAL_KEYWORDS = {
 'vitalik', 'buterin', 'sam altman', 'satoshi',
 'michael saylor', 'saylor', 'cathie wood',
 'jack dorsey', 'zuckerberg', 'bezos',
 'jensen huang', 'nvidia', 'tim cook',
 'justin sun', 'sun yuchen', '', 'tron',
 'arthur hayes', 'su zhu', '3ac',
 'brian armstrong', 'coinbase',
 'larry fink', 'blackrock',
 'gary gensler', 'sec',
 'michael novogratz', 'galaxy',
 'biden', 'obama', 'putin', 'xi jinping',
 'kanye', 'drake', 'snoop dogg', 'paris hilton',
 'mark cuban', 'mr beast', 'mrbeast',
 'lobster', '', 'lobsta',
 'hawk tuah', 'griddy', 'skibidi',
 'rizz', 'sigma', 'gyatt',
 'etf', 'halving', '',
 'world war', 'wwiii',
 'fed', 'rate cut', '',
 'tiktok ban', 'tiktok',
}

CELEBRITY_VIRAL_PATTERNS = [
 r'\bvitalik\b', r'\bsaylor\b', r'\bblackrock\b',
 r'\bcoinbase\b', r'\bjustin\s*sun\b', r'\blobster\b',
 r'\betf\b', r'\bhalving\b', r'\bmrbeast\b',
 r'\bsnoop\b', r'\bkanye\b', r'\bdrake\b',
]

# ============================================================
# ============================================================
SPAM_PATTERNS = [
 r'airdrop', r'presale', r'pre\s*sale',
 r'1000x', r'100x guaranteed',
 r'safe\s*moon', r'baby\s*\w+', # babydoge
 r'pornhub', r'porn', r'xxx', r'nsfw',
 r'nigga', r'nigger', r'faggot',
 r'scam', r'rugpull', r'rug\s*pull',
 r'official\s*token', r'official\s*coin',
]

COMMON_NOISE_WORDS = {
 'nice', 'good', 'bad', 'cool', 'hot', 'big', 'small',
 'life', 'love', 'hate', 'happy', 'sad', 'fun', 'lol',
 'cat', 'dog', 'moon', 'sun', 'star', 'king', 'queen',
 'gold', 'rich', 'cash', 'money', 'pay', 'buy', 'sell',
 'pump', 'dump', 'bull', 'bear', 'green', 'red',
 'hello', 'world', 'yes', 'no', 'wow', 'omg', 'lmao',
 'simp', 'chad', 'based', 'cope', 'seethe',
 'test', 'new', 'old', 'real', 'fake',
 'shit', 'shitcoin', 'fuck', 'fart', 'poop', 'pee',
 'cum', 'dick', 'ass', 'boob', 'tit',
 'nigga', 'retard', 'slop',
 'the', 'and', 'for', 'from', 'with', 'this', 'that',
 'coin', 'token', 'meme', 'pepe', 'wojak',
 'peg', 'usd', 'usdt', 'usdc', 'dai',
}

# ============================================================
# toolfunction
# ============================================================
def log(msg):
 ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 line = f"[{ts}] {msg}"
 print(line)
 os.makedirs(DATA_DIR, exist_ok=True)
 with open(LOG_FILE, 'a') as f:
 f.write(line + '\n')

def load_flap_seen():
 if os.path.exists(FLAP_SEEN_FILE):
 try:
 with open(FLAP_SEEN_FILE) as f:
 return json.load(f)
 except:
 pass
 return {}

def save_flap_seen(data):
 cutoff = int(time.time()) - 86400 * 7
 data = {k: v for k, v in data.items() if v > cutoff}
 with open(FLAP_SEEN_FILE, 'w') as f:
 json.dump(data, f)

def tg_send(text, parse_mode='Markdown'):
 if not TG_TOKEN:
 log(f"[TG] No token, skip: {text[:80]}")
 return False
 try:
 resp = requests.post(
 f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
 json={'chat_id': TG_CHAT_ID, 'text': text, 'parse_mode': parse_mode},
 timeout=10
 )
 result = resp.json()
 if not result.get('ok'):
 if 'can\'t parse' in str(result.get('description', '')).lower():
 resp = requests.post(
 f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
 json={'chat_id': TG_CHAT_ID, 'text': text},
 timeout=10
 )
 else:
 log(f"[TG] Error: {result.get('description', '')}")
 return False
 return True
 except Exception as e:
 log(f"[TG] Send error: {e}")
 return False

# ============================================================
# ============================================================
def init_db():
 """initializeSQLitelibrary"""
 conn = sqlite3.connect(DB_FILE)
 c = conn.cursor()
 
 c.execute('''CREATE TABLE IF NOT EXISTS narratives (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 theme TEXT NOT NULL, -- ()
 first_token_name TEXT, -- token
 first_token_address TEXT, -- address
 first_chain TEXT, -- 
 first_seen_at INTEGER, -- 
 token_count INTEGER DEFAULT 1, -- 
 last_seen_at INTEGER -- 
 )''')
 
 c.execute('''CREATE TABLE IF NOT EXISTS tokens_seen (
 address TEXT PRIMARY KEY,
 chain TEXT,
 name TEXT,
 symbol TEXT,
 narrative_theme TEXT,
 category TEXT, -- 'musk_trump' / 'binance_cz' / 'novel' / 'common'
 first_seen_at INTEGER,
 market_cap REAL,
 pushed INTEGER DEFAULT 0, -- 
 seen_count INTEGER DEFAULT 1 -- 
 )''')
 
 # index
 c.execute('CREATE INDEX IF NOT EXISTS idx_theme ON narratives(theme)')
 c.execute('CREATE INDEX IF NOT EXISTS idx_addr ON tokens_seen(address)')
 
 conn.commit()
 return conn

def normalize_theme(name, symbol):
 """
 :'Elon Mars Colony' → 'elon mars colony'
 'TRUMP2028' → 'trump'
 'PancakeBunny' → 'pancake bunny'
 """
 text = f"{name} {symbol}".lower().strip()
 
 noise = ['token', 'coin', 'inu', 'swap', 'finance', 'protocol',
 'dao', 'defi', 'nft', 'meta', 'verse', 'fi', 'ai',
 'pepe', 'wojak', 'chad', 'based']
 
 # splitcamelCase
 text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
 text = re.sub(r'\d+x?', '', text)
 text = re.sub(r'[^a-z\s]', ' ', text)
 words = [w for w in text.split() if w and len(w) > 1 and w not in noise]
 
 if not words:
 return name.lower().strip()
 
 return ' '.join(sorted(set(words)))

def is_similar_theme(theme1, theme2, threshold=0.7):
 if theme1 == theme2:
 return True
 if theme1 in theme2 or theme2 in theme1:
 return True
 words1 = set(theme1.split())
 words2 = set(theme2.split())
 if words1 and words2:
 overlap = len(words1 & words2) / min(len(words1), len(words2))
 if overlap >= 0.6:
 return True
 return SequenceMatcher(None, theme1, theme2).ratio() >= threshold

def check_narrative_novelty(conn, theme, name, symbol, address, chain):
 """
 return:
 ('novel', None) — 
 ('heating', narrative_row) — !!
 ('existing', existing_theme_row) — ,
 
 """
 c = conn.cursor()
 now = int(time.time())
 HEAT_WINDOW = 1800 # 30
 HEAT_THRESHOLD = 2 # 2aboveis 
 
 c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives WHERE theme = ?', (theme,))
 exact = c.fetchone()
 if exact:
 row_id, _, _, _, _, first_seen, count, last_seen = exact
 new_count = count + 1
 c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE theme = ?',
 (new_count, now, theme))
 conn.commit()
 
 if now - first_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
 return ('heating', exact)
 if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
 return ('heating', exact)
 
 return ('existing', exact)
 
 c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives ORDER BY last_seen_at DESC LIMIT 1000')
 for row in c.fetchall():
 if is_similar_theme(theme, row[1]):
 row_id, _, _, _, _, first_seen, count, last_seen = row
 new_count = count + 1
 c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE id = ?',
 (new_count, now, row[0]))
 conn.commit()
 
 if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
 return ('heating', row)
 
 return ('existing', row)
 
 c.execute('''INSERT INTO narratives (theme, first_token_name, first_token_address, first_chain, first_seen_at, last_seen_at)
 VALUES (?, ?, ?, ?, ?, ?)''',
 (theme, name, address, chain, now, now))
 conn.commit()
 return ('novel', None)

def get_token_seen_count(conn, address):
 c = conn.cursor()
 c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
 row = c.fetchone()
 return row[0] if row else 0

def is_token_seen(conn, address):
 c = conn.cursor()
 c.execute('SELECT address FROM tokens_seen WHERE address = ?', (address,))
 return c.fetchone() is not None

def record_token(conn, address, chain, name, symbol, theme, category, mc, pushed=False):
 c = conn.cursor()
 c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
 existing = c.fetchone()
 if existing:
 new_count = existing[0] + 1
 c.execute('''UPDATE tokens_seen SET seen_count = ?, market_cap = ?, category = ?
 WHERE address = ?''', (new_count, mc, category, address))
 else:
 c.execute('''INSERT INTO tokens_seen 
 (address, chain, name, symbol, narrative_theme, category, first_seen_at, market_cap, pushed, seen_count)
 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)''',
 (address, chain, name, symbol, theme, category, int(time.time()), mc, 1 if pushed else 0))
 conn.commit()

# ============================================================
# ============================================================
def classify_narrative(name, symbol, chain):
 """
 classificationtoken
 return:('musk_trump', matched_keywords) / ('binance_cz', matched_keywords) / ('novel', None) / ('common', None)
 """
 text = f"{name} {symbol}".lower()
 
 for pat in SPAM_PATTERNS:
 if re.search(pat, text, re.IGNORECASE):
 return ('spam', None)
 
 matched_mt = []
 for kw in MUSK_TRUMP_KEYWORDS:
 if kw.lower() in text:
 matched_mt.append(kw)
 if not matched_mt:
 for pat in MUSK_TRUMP_PATTERNS:
 m = re.search(pat, text, re.IGNORECASE)
 if m:
 matched_mt.append(m.group())
 
 if matched_mt:
 chain_lower = chain.lower()
 if chain_lower in ('eth', 'ethereum', 'sol', 'solana', 'bsc', 'base'):
 return ('musk_trump', matched_mt)
 
 matched_bc = []
 for kw in BINANCE_CZ_KEYWORDS:
 if kw.lower() in text:
 matched_bc.append(kw)
 if not matched_bc:
 for pat in BINANCE_CZ_PATTERNS:
 m = re.search(pat, text, re.IGNORECASE)
 if m:
 matched_bc.append(m.group())
 
 if matched_bc:
 chain_lower = chain.lower()
 if chain_lower in ('bsc',):
 return ('binance_cz', matched_bc)
 else:
 return ('binance_cz_wrong_chain', matched_bc)
 
 matched_cv = []
 for kw in CELEBRITY_VIRAL_KEYWORDS:
 if kw.lower() in text:
 matched_cv.append(kw)
 if not matched_cv:
 for pat in CELEBRITY_VIRAL_PATTERNS:
 m = re.search(pat, text, re.IGNORECASE)
 if m:
 matched_cv.append(m.group())
 
 if matched_cv:
 return ('celebrity_viral', matched_cv)
 
 return ('check_novelty', None)

# ============================================================
# ============================================================
def check_token_safety(chain, address):
 if chain in ('sol', 'solana'):
 try:
 r = requests.get(f'https://api.rugcheck.xyz/v1/tokens/{address}/report', timeout=10)
 if r.status_code == 200:
 data = r.json()
 score = data.get('score', 999)
 mint = data.get('mintAuthority')
 freeze = data.get('freezeAuthority')
 return {
 'safe': not mint and not freeze,
 'score': score, 'mint': mint is not None,
 'freeze': freeze is not None
 }
 except:
 pass
 else:
 chain_map = {'ethereum': '1', 'eth': '1', 'bsc': '56', 'base': '8453'}
 cid = chain_map.get(chain, '1')
 try:
 r = requests.get(f'https://api.gopluslabs.io/api/v1/token_security/{cid}?contract_addresses={address}', timeout=10)
 if r.status_code == 200:
 result = r.json().get('result', {})
 data = result.get(address.lower(), {})
 if data:
 honeypot = data.get('is_honeypot', '0') == '1'
 mintable = data.get('is_mintable', '0') == '1'
 sell_tax = float(data.get('sell_tax', '0') or '0')
 buy_tax = float(data.get('buy_tax', '0') or '0')
 return {
 'safe': not honeypot and not mintable, # as
 'honeypot': honeypot, 'mintable': mintable,
 'sell_tax': sell_tax, 'buy_tax': buy_tax
 }
 except:
 pass

# ============================================================
# ============================================================
def gmgn_get(url):
 try:
 resp = requests.get(url, headers=GMGN_HEADERS, timeout=15)
 if resp.status_code == 200:
 return resp.json().get('data', {})
 except:
 pass
 return {}

def fetch_token_description(chain, address):
 """token/ — radarcore """
 desc = ''
 
 if chain in ('sol', 'solana'):
 try:
 r = requests.get(f'https://frontend-api-v3.pump.fun/coins/{address}', timeout=8)
 if r.status_code == 200:
 data = r.json()
 desc = data.get('description', '') or ''
 twitter = data.get('twitter', '') or ''
 telegram = data.get('telegram', '') or ''
 website = data.get('website', '') or ''
 return {
 'description': desc.strip(),
 'twitter': twitter,
 'telegram': telegram,
 'website': website,
 }
 except:
 pass
 
 try:
 chain_dex = {'sol': 'solana', 'eth': 'ethereum', 'bsc': 'bsc', 'base': 'base',
 'solana': 'solana', 'ethereum': 'ethereum'}.get(chain, chain)
 r = requests.get(f'https://api.dexscreener.com/latest/dex/tokens/{address}', timeout=8)
 if r.status_code == 200:
 pairs = r.json().get('pairs', [])
 if pairs:
 info = pairs[0].get('info', {})
 websites = info.get('websites', [])
 socials = info.get('socials', [])
 twitter = ''
 telegram = ''
 website = ''
 for s in socials:
 if s.get('type') == 'twitter':
 twitter = s.get('url', '')
 elif s.get('type') == 'telegram':
 telegram = s.get('url', '')
 for w in websites:
 if w.get('label', '').lower() == 'website':
 website = w.get('url', '')
 if not desc:
 return {
 'description': desc,
 'twitter': twitter,
 'telegram': telegram,
 'website': website,
 }
 except:
 pass
 
 return {'description': desc, 'twitter': '', 'telegram': '', 'website': ''}

def fetch_new_tokens():
 """fromGMGN + multi-dimensionalcovering"""
 all_tokens = []
 seen_addrs = set()
 
 for chain in ['eth', 'bsc', 'base']:
 urls = [
 f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=open_timestamp&direction=desc&limit=100',
 f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=swaps&direction=desc&limit=50',
 ]
 
 for url in urls:
 data = gmgn_get(url)
 tokens = data.get('rank', [])
 
 for t in tokens:
 addr = t.get('address', '')
 if not addr or addr in seen_addrs:
 continue
 
 mc = t.get('market_cap', 0) or t.get('fdv', 0) or 0
 liq = t.get('liquidity', 0) or 0
 
 if mc < 1000 or liq < 500 or mc > 10000000:
 continue
 
 age_ts = t.get('open_timestamp', 0)
 age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 999
 
 
 seen_addrs.add(addr)
 all_tokens.append({
 'address': addr,
 'chain': chain,
 'name': t.get('name', '?'),
 'symbol': t.get('symbol', '?'),
 'mc': mc,
 'liq': liq,
 'volume': t.get('volume', 0) or 0,
 'holders': t.get('holder_count', 0) or 0,
 'sm': t.get('smart_degen_count', 0) or 0,
 'chg_1h': t.get('price_change_percent1h', 0) or 0,
 'chg_24h': t.get('price_change_percent', 0) or 0,
 'age_h': age_h,
 'price': t.get('price', 0),
 'buys_1h': t.get('buys', 0) or 0,
 'sells_1h': t.get('sells', 0) or 0,
 })
 
 time.sleep(0.3)
 
 return all_tokens

def fetch_flap_tokens():
 """
 FLAPplatformscan — BSCcommunitydriver
 features:24h,but 1hstabilize/,>sell,holders
 """
 data = gmgn_get(
 'https://gmgn.ai/defi/quotation/v1/rank/bsc/swaps/24h?launchpad=flap&orderby=volume&direction=desc&limit=30'
 )
 tokens = data.get('rank', [])
 
 candidates = []
 for t in tokens:
 addr = t.get('address', '')
 if not addr:
 continue
 
 mc = t.get('market_cap', 0) or 0
 liq = t.get('liquidity', 0) or 0
 vol = t.get('volume', 0) or 0
 holders = t.get('holder_count', 0) or 0
 buys = t.get('buys', 0) or 0
 sells = t.get('sells', 0) or 0
 chg_1h = t.get('price_change_percent1h', 0) or 0
 chg_24h = t.get('price_change_percent', 0) or 0
 age_ts = t.get('open_timestamp', 0)
 age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 0
 
 if mc < 1000 or liq < 500:
 continue
 if holders < 5:
 continue
 
 buy_ratio = buys / max(sells, 1)
 
 is_support = False
 reason = ''
 
 if chg_24h < -10 and chg_1h > chg_24h * 0.3:
 is_support = True
 reason = f'24h{chg_24h:.0f}%but 1hstabilize{chg_1h:+.0f}%'
 
 if -10 <= chg_24h <= 30 and chg_1h > -5 and buy_ratio > 1.1:
 is_support = True
 reason = f' {buy_ratio:.2f}'
 
 if chg_24h < -30 and chg_1h > 10:
 is_support = True
 reason = f'{chg_24h:.0f}%bounce{chg_1h:+.0f}%'
 
 if is_support and buy_ratio >= 1.0:
 candidates.append({
 'address': addr,
 'chain': 'bsc',
 'name': t.get('name', '?'),
 'symbol': t.get('symbol', '?'),
 'mc': mc,
 'liq': liq,
 'volume': vol,
 'holders': holders,
 'sm': 0,
 'chg_1h': chg_1h,
 'chg_24h': chg_24h,
 'age_h': age_h,
 'price': t.get('price', 0),
 'buys': buys,
 'sells': sells,
 'buy_ratio': buy_ratio,
 'support_reason': reason,
 'launchpad': 'flap',
 })
 
 candidates.sort(key=lambda x: x['mc'], reverse=True)
 return candidates

def format_flap_alert(token, desc_info=None):
 msg = f"radar — FLAP\n"
 msg += f": BSC | platform: FLAP\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {token['support_reason']}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f"liquidity ${token['liq']:>12,.0f}\n"
 msg += f"24h ${token['volume']:>12,.0f}\n"
 msg += f" {token['holders']:>12,d}\n"
 msg += f"/ {token['buys']:>6,d}/{token['sells']:>6,d}\n"
 msg += f" {token['buy_ratio']:>12.2f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 msg += f"24h {token['chg_24h']:>+11.1f}%\n"
 msg += f"```\n"
 msg += "\nFLAPcommunity — "
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

# ============================================================
# ============================================================
def format_musk_trump_alert(token, matched_kw, desc_info=None):
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f"key : {', '.join(matched_kw[:5])}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f"liquidity ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```\n"
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"Twitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n' + '\n'.join(links)
 
 return msg

def format_binance_cz_alert(token, matched_kw, desc_info=None):
 """Binance/CZ"""
 msg = f"radar — Binance/CZ\n"
 msg += f": BSC\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f"key : {', '.join(matched_kw[:5])}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f"liquidity ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```\n"
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"Twitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n' + '\n'.join(links)
 
 return msg

def format_novel_narrative_alert(token, theme, desc_info=None):
 return format_heating_narrative_alert(token, theme, 1, desc_info)

def format_heating_narrative_alert(token, theme, count, desc_info=None):
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 msg = f"radar — \n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 300:
 desc = desc[:300] + '...'
 msg += f": {desc}\n\n"
 else:
 msg += f": {theme}\n\n"
 
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f"liquidity ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['holders']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```"
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

# ============================================================
# ============================================================
def track_momentum(tokens):
 """
 """
 global MOMENTUM_TRACKER, MOMENTUM_PUSHED
 now = time.time()
 alerts = []
 
 current_addrs = set()
 
 for token in tokens:
 addr = token['address']
 mc = token['mc']
 vol = token.get('volume', 0) or 0
 price = token.get('price', 0) or 0
 buys = token.get('buys_1h', 0) or token.get('buys', 0) or 0
 
 current_addrs.add(addr)
 
 if mc < 1000 or token.get('liq', 0) < 500 or mc > 10000000:
 continue
 
 if addr not in MOMENTUM_TRACKER:
 MOMENTUM_TRACKER[addr] = []
 
 snapshots = MOMENTUM_TRACKER[addr]
 
 if snapshots and snapshots[-1]['mc'] == mc and snapshots[-1]['vol'] == vol:
 continue # data,
 
 snapshots.append({
 'ts': now,
 'mc': mc,
 'vol': vol,
 'price': price,
 'buys': buys,
 })
 
 if len(snapshots) > 20:
 snapshots[:] = snapshots[-20:]
 
 if len(snapshots) < MOMENTUM_CONSECUTIVE_UP:
 continue
 
 recent = snapshots[-MOMENTUM_CONSECUTIVE_UP:]
 consecutive_up = True
 total_gain = 0
 
 for i in range(1, len(recent)):
 prev_mc = recent[i-1]['mc']
 curr_mc = recent[i]['mc']
 if prev_mc <= 0:
 consecutive_up = False
 break
 gain = (curr_mc - prev_mc) / prev_mc
 consecutive_up = False
 break
 total_gain += gain
 
 if not consecutive_up:
 continue
 
 vol_increasing = True
 for i in range(1, len(recent)):
 if recent[i]['buys'] < recent[i-1]['buys'] * 0.8: # 
 vol_increasing = False
 break
 
 first_mc = recent[0]['mc']
 last_mc = recent[-1]['mc']
 pct_gain = ((last_mc - first_mc) / first_mc * 100) if first_mc > 0 else 0
 
 if pct_gain < 5:
 continue
 
 push_info = MOMENTUM_PUSHED.get(addr, {'count': 0, 'last_ts': 0, 'last_mc': 0})
 
 if push_info['count'] > 0 and last_mc <= push_info['last_mc']:
 continue
 
 push_info['count'] += 1
 push_info['last_ts'] = now
 push_info['last_mc'] = last_mc
 signal_count = push_info['count']
 
 safety = check_token_safety(token['chain'], addr)
 if not safety.get('safe'):
 continue
 
 category, matched_kw = classify_narrative(token['name'], token['symbol'], token['chain'])
 is_flap = token.get('launchpad') == 'flap'
 
 if category == 'musk_trump':
 stars = 3
 narrative_tag = f"/ ({', '.join(matched_kw[:3])})"
 elif category == 'binance_cz':
 stars = 3
 narrative_tag = f"Binance/CZ ({', '.join(matched_kw[:3])})"
 elif category == 'celebrity_viral':
 stars = 2
 narrative_tag = f"/ ({', '.join(matched_kw[:3])})"
 elif is_flap:
 stars = 2
 narrative_tag = "FLAPcommunity"
 else:
 theme = normalize_theme(token['name'], token['symbol'])
 theme_words = [w for w in theme.split() if w not in COMMON_NOISE_WORDS and len(w) > 2]
 if len(theme_words) >= 2:
 stars = 2
 narrative_tag = f": {theme}"
 else:
 stars = 1
 narrative_tag = ""
 
 desc_info = fetch_token_description(token['chain'], addr)
 
 if is_flap:
 has_twitter = bool(desc_info.get('twitter'))
 has_tg = bool(desc_info.get('telegram'))
 has_web = bool(desc_info.get('website'))
 community_tags = []
 if has_twitter:
 community_tags.append("")
 if has_tg:
 community_tags.append("TG")
 if has_web:
 community_tags.append("")
 if community_tags:
 narrative_tag += f" | {' '.join(community_tags)}"
 stars = min(3, stars + 1) # community
 else:
 narrative_tag += " | communityLinks"
 
 msg = format_momentum_alert(token, pct_gain, len(recent), vol_increasing, stars, narrative_tag, desc_info, signal_count)
 alerts.append({'msg': msg, 'token': token})
 MOMENTUM_PUSHED[addr] = push_info
 
 log(f"[{signal_count}] {token['name']} ({token['symbol']}) on {token['chain']}{len(recent)} +{pct_gain:.1f}%")
 
 stale = [a for a in MOMENTUM_TRACKER if a not in current_addrs]
 for a in stale:
 if now - MOMENTUM_TRACKER[a][-1]['ts'] > 600: # 10
 del MOMENTUM_TRACKER[a]
 
 MOMENTUM_PUSHED = {k: v for k, v in MOMENTUM_PUSHED.items() if now - v.get('last_ts', 0) < 3600}
 
 return alerts

def format_momentum_alert(token, pct_gain, rounds, vol_up, stars, narrative_tag, desc_info=None, seen_count=0):
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 vol_tag = "" if vol_up else ""
 star_str = "★" * stars + "☆" * (3 - stars)
 
 msg = f"radar\n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {narrative_tag}\n"
 msg += f"{rounds} +{pct_gain:.1f}% {vol_tag}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f"liquidity ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```\n"
 msg += f": {star_str} : {seen_count}"
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

def format_celebrity_alert(token, matched_kw, desc_info=None):
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 msg = f"radar — / ★★\n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f"key : {', '.join(matched_kw[:5])}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f"liquidity ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```"
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

# ============================================================
# ============================================================
def scan_narratives():
 """scanfunction"""
 conn = init_db()
 tokens = fetch_new_tokens()
 
 log(f"scan {len(tokens)} ...")
 
 flap_tokens = []
 try:
 flap_tokens = fetch_flap_tokens()
 except:
 pass
 all_momentum_tokens = tokens + flap_tokens
 momentum_alerts = track_momentum(all_momentum_tokens)
 
 for token in tokens:
 addr = token['address']
 chain = token['chain']
 name = token['name']
 symbol = token['symbol']
 
 if is_token_seen(conn, addr):
 # updateseen_count
 c = conn.cursor()
 c.execute('UPDATE tokens_seen SET seen_count = seen_count + 1, market_cap = ? WHERE address = ?', (token['mc'], addr))
 theme_tmp = normalize_theme(name, symbol)
 if theme_tmp:
 c.execute('UPDATE narratives SET token_count = token_count + 1, last_seen_at = ? WHERE theme = ?', (int(time.time()), theme_tmp))
 conn.commit()
 continue
 
 category, matched_kw = classify_narrative(name, symbol, chain)
 
 if category == 'spam':
 record_token(conn, addr, chain, name, symbol, '', 'spam', token['mc'])
 continue
 
 min_mc = 1000
 min_liq = 500
 if token['mc'] < min_mc or token['liq'] < min_liq:
 record_token(conn, addr, chain, name, symbol, '', 'too_small', token['mc'])
 continue
 
 theme = normalize_theme(name, symbol)
 
 record_token(conn, addr, chain, name, symbol, theme, category, token['mc'])
 check_narrative_novelty(conn, theme, name, symbol, addr, chain)
 
 conn.close()
 
 pushed = 0
 for ma in momentum_alerts[:8]: # 8
 if tg_send(ma['msg']):
 pushed += 1
 time.sleep(1) # TGrate limiting
 
 return pushed, len(momentum_alerts)

# ============================================================
# ============================================================
def main():
 log("=" * 50)
 log("radar v1 ")
 log(f"scan: {SCAN_INTERVAL}s")
 log("=" * 50)
 
 # initializeDB
 init_db()
 
 tg_send(
 "radar v1 \n\n"
 "classificationlabels:\n"
 "★★★ / | Binance/CZ | FLAPcommunity\n"
 "★★ | FLAPcommunity | \n"
 f"scan: {SCAN_INTERVAL}"
 )
 
 scan_count = 0
 total_pushed = 0
 
 while True:
 try:
 scan_count += 1
 pushed, found = scan_narratives()
 total_pushed += pushed
 
 if pushed > 0:
 log(f"{scan_count}: {found}, {pushed} ({total_pushed})")
 else:
 if scan_count % 20 == 0: # 20
 log(f"{scan_count}: ({total_pushed})")
 
 except Exception as e:
 log(f"scan: {e}")
 
 time.sleep(SCAN_INTERVAL)

if __name__ == '__main__':
 main()

OI + Funding Rate Scanner #

Date: 2026.04.25 Tags: Python · Binance Futures · Telegram

Funding rate flip detection + OI surge

Snapshot-based scanner: detects funding rate flipping from positive to negative while OI is rising. Runs every 5 minutes.

Full source code #

#!/usr/bin/env python3
"""
"""

import requests
import json
import os
import time
import sys
from datetime import datetime, timedelta
from pathlib import Path

# ============ configuration ============
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env.oi"
ALERT_HISTORY_FILE = SCRIPT_DIR / "oi_funding_alerts.json"
FR_SNAPSHOT_FILE = SCRIPT_DIR / "fr_snapshot.json" # 

MIN_OI_CHANGE_PCT = 8 # OI8%
MIN_VOLUME_USDT = 0 # ,
MIN_FR_PERIODS_POSITIVE = 2 # 2
DEDUP_HOURS = 24 # 24

def load_env():
 env = {}
 if ENV_FILE.exists():
 for line in ENV_FILE.read_text().strip().split('\n'):
 if '=' in line and not line.startswith('#'):
 k, v = line.split('=', 1)
 env[k.strip()] = v.strip()
 return env

env = load_env()
TG_BOT_TOKEN = env.get('TG_BOT_TOKEN', '')
TG_CHAT_ID = env.get('TG_CHAT_ID', '')

def send_tg(text):
 if not TG_BOT_TOKEN or not TG_CHAT_ID:
 print("[TG] configuration, :")
 print(text)
 return
 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
 chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
 for chunk in chunks:
 try:
 resp = requests.post(url, json={
 'chat_id': TG_CHAT_ID,
 'text': chunk,
 'parse_mode': 'Markdown'
 }, timeout=10)
 if resp.status_code != 200:
 requests.post(url, json={
 'chat_id': TG_CHAT_ID,
 'text': chunk
 }, timeout=10)
 except Exception as e:
 print(f"[TG] send: {e}")

def load_alert_history():
 if ALERT_HISTORY_FILE.exists():
 try:
 return json.loads(ALERT_HISTORY_FILE.read_text())
 except:
 return {}
 return {}

def save_alert_history(history):
 ALERT_HISTORY_FILE.write_text(json.dumps(history))

def is_duplicate(symbol, history):
 if symbol not in history:
 return False
 last_alert = datetime.fromisoformat(history[symbol])
 return (datetime.now() - last_alert).total_seconds() < DEDUP_HOURS * 3600

def mark_alerted(symbol, history):
 history[symbol] = datetime.now().isoformat()
 cutoff = datetime.now() - timedelta(hours=DEDUP_HOURS * 2)
 history = {k: v for k, v in history.items() 
 if datetime.fromisoformat(v) > cutoff}
 return history

def load_fr_snapshot():
 if FR_SNAPSHOT_FILE.exists():
 try:
 return json.loads(FR_SNAPSHOT_FILE.read_text())
 except:
 pass
 return {}

def save_fr_snapshot(snapshot):
 FR_SNAPSHOT_FILE.write_text(json.dumps(snapshot))

# ============ core scan ============
def scan():
 ts_start = time.time()
 
 try:
 info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo', timeout=10).json()
 symbols = [s['symbol'] for s in info['symbols'] 
 if s['contractType'] == 'PERPETUAL' and s['quoteAsset'] == 'USDT' and s['status'] == 'TRADING']
 except Exception as e:
 print(f"[ERROR] exchangeInfo: {e}")
 return []
 
 try:
 tickers = requests.get('https://fapi.binance.com/fapi/v1/ticker/24hr', timeout=10).json()
 ticker_map = {t['symbol']: t for t in tickers}
 except Exception as e:
 print(f"[ERROR] ticker: {e}")
 return []
 
 active = [s for s in symbols if float(ticker_map.get(s, {}).get('quoteVolume', 0)) > MIN_VOLUME_USDT]
 
 try:
 fr_all = requests.get('https://fapi.binance.com/fapi/v1/premiumIndex', timeout=10).json()
 fr_current = {item['symbol']: float(item['lastFundingRate']) for item in fr_all}
 except:
 fr_current = {}
 
 prev_snapshot = load_fr_snapshot()
 
 save_fr_snapshot(fr_current)
 
 if not prev_snapshot:
 print(f"[{datetime.now().strftime('%H:%M:%S')}] run,,")
 return []
 
 just_turned_negative = []
 for sym in active:
 prev_fr = prev_snapshot.get(sym)
 curr_fr = fr_current.get(sym)
 if prev_fr is None or curr_fr is None:
 continue
 if prev_fr >= 0 and curr_fr < 0:
 just_turned_negative.append(sym)
 
 if not just_turned_negative:
 elapsed = time.time() - ts_start
 print(f"[{datetime.now().strftime('%H:%M:%S')}] scan: {len(active)}/{elapsed:.1f}s, ")
 return []
 
 print(f"[{datetime.now().strftime('%H:%M:%S')}] {len(just_turned_negative)} : {just_turned_negative}")
 
 signals = []
 for sym in just_turned_negative:
 try:
 oi_hist = requests.get('https://fapi.binance.com/futures/data/openInterestHist', 
 params={'symbol': sym, 'period': '1h', 'limit': 48}, timeout=10).json()
 
 oi_chg = 0
 segs = []
 oi_rising = False
 if oi_hist and len(oi_hist) >= 12:
 oi_values = [float(x['sumOpenInterestValue']) for x in oi_hist]
 seg_len = len(oi_values) // 4
 if seg_len >= 3:
 segs = [
 sum(oi_values[:seg_len]) / seg_len,
 sum(oi_values[seg_len:seg_len*2]) / seg_len,
 sum(oi_values[seg_len*2:seg_len*3]) / seg_len,
 sum(oi_values[seg_len*3:]) / max(1, len(oi_values[seg_len*3:]))
 ]
 oi_chg = (segs[3] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0
 oi_rising = oi_chg > 0
 
 t = ticker_map.get(sym, {})
 signals.append({
 'symbol': sym,
 'price': float(t.get('lastPrice', 0)),
 'price_chg_24h': float(t.get('priceChangePercent', 0)),
 'volume': float(t.get('quoteVolume', 0)),
 'oi_change': oi_chg,
 'oi_segments': segs,
 'oi_rising': oi_rising,
 'current_fr': fr_current.get(sym, 0),
 'prev_fr': prev_snapshot.get(sym, 0),
 })
 except:
 continue
 
 elapsed = time.time() - ts_start
 print(f"[{datetime.now().strftime('%H:%M:%S')}] scan: {len(active)}/{elapsed:.1f}s, : {len(signals)}")
 
 return signals

def get_square_discussion(coin):
 try:
 r = requests.get(
 "https://www.binance.com/bapi/composite/v4/friendly/pgc/content/queryByHashtag",
 params={"hashtag": f"#{coin.lower()}", "pageIndex": 1, "pageSize": 1, "orderBy": "HOT"},
 headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.binance.com/en/square"},
 timeout=8
 )
 if r.status_code == 200:
 ht = r.json().get("data", {}).get("hashtag", {})
 return ht.get("contentCount", 0), ht.get("viewCount", 0)
 except:
 pass
 return 0, 0

def get_market_caps():
 mcap = {}
 try:
 r = requests.get(
 "https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
 timeout=10
 )
 if r.status_code == 200:
 for item in r.json().get("data", []):
 name = item.get("name", "")
 mc = item.get("marketCap", 0)
 if name and mc:
 mcap[name] = float(mc)
 except:
 pass
 return mcap

def get_spot_symbols():
 try:
 info = requests.get("https://api.binance.com/api/v3/exchangeInfo", timeout=10).json()
 return {s["baseAsset"] for s in info["symbols"]
 if s["quoteAsset"] == "USDT" and s["status"] == "TRADING"}
 except:
 return set()

def fmt_mcap(v):
 if v >= 1e9: return f"${v/1e9:.2f}B"
 if v >= 1e6: return f"${v/1e6:.1f}M"
 if v >= 1e3: return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def fmt_views(v):
 if v >= 1e6: return f"{v/1e6:.1f}M"
 if v >= 1e3: return f"{v/1e3:.0f}K"
 return str(v)

def format_alert(signals):
 if not signals:
 return None
 
 signals.sort(key=lambda x: (-int(x.get('oi_rising', False)), x['current_fr']))
 
 mcap_map = get_market_caps()
 spot_set = get_spot_symbols()
 
 now = datetime.now().strftime('%m-%d %H:%M')
 lines = [f"*[ +OI ]* {now}\n"]
 
 for s in signals:
 coin = s['symbol'].replace('USDT', '')
 
 fr_change = f"{s['prev_fr']:+.4%} -> {s['current_fr']:+.4%}"
 
 mcap = mcap_map.get(coin, 0)
 has_spot = coin in spot_set
 sq_posts, sq_views = get_square_discussion(coin)
 
 lines.append(f"```")
 lines.append(f"{coin}")
 lines.append(f" : {s['price']:.4f} 24h: {s['price_chg_24h']:+.1f}%")
 lines.append(f" : {fr_change}")
 if s['oi_segments']:
 oi_segs = ' > '.join([f"{v/1e6:.1f}M" for v in s['oi_segments']])
 lines.append(f" OI: +{s['oi_change']:.1f}% ({oi_segs})")
 lines.append(f" : ${s['volume']/1e6:.1f}M")
 lines.append(f" : {fmt_mcap(mcap) if mcap > 0 else ''} : {'' if has_spot else 'futures'}")
 if sq_posts > 0:
 lines.append(f" : {sq_posts} / {fmt_views(sq_views)}")
 else:
 lines.append(f" : ")
 lines.append(f"```")
 
 return '\n'.join(lines)

def main():
 signals = scan()
 
 if signals:
 strong = [s for s in signals if s['current_fr'] < 0 and s.get('oi_rising')]
 if strong:
 msg = format_alert(strong)
 if msg:
 send_tg(msg)
 print(f" {len(strong)} ({len(signals)}, {len(strong)}OI)")
 else:
 print(f" {len(signals)} but OI, ")
 else:
 print(f" ")

if __name__ == '__main__':
 main()

Accumulation Radar #

Date: 2026.04.25 Tags: Python · Binance · CoinGlass · Telegram

Momentum + OI anomaly + smart alerts

Hourly scan: top gainers momentum tracking, OI anomaly detection, and Telegram push. Pure Python, zero AI cost.

Full source code #

#!/usr/bin/env python3
"""
longradar v2 — ++OI scan

data:BinancefuturesAPI + CoinGecko Trending()
"""

import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
from square_heat import get_square_heat

env_file = Path(__file__).parent / ".env.oi"
if env_file.exists():
 with open(env_file) as f:
 for line in f:
 line = line.strip()
 if line and not line.startswith("#") and "=" in line:
 k, v = line.split("=", 1)
 os.environ.setdefault(k.strip(), v.strip())

# === configuration ===
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.getenv("TG_CHAT_ID", "YOUR_CHAT_ID")
FAPI = "https://fapi.binance.com"

HEAT_HISTORY_FILE = Path(__file__).parent / "heat_history.json"

VOL_SURGE_MULT = 2.5 # 2.5above=
MIN_VOL_USD = 20_000_000 # >$20Mdetection

MIN_OI_DELTA_PCT = 3.0 # OI3%
MIN_OI_USD = 2_000_000 # OI $2M

def api_get(endpoint, params=None):
 """BinanceAPI"""
 url = f"{FAPI}{endpoint}"
 for attempt in range(3):
 try:
 resp = requests.get(url, params=params, timeout=10)
 if resp.status_code == 200:
 return resp.json()
 elif resp.status_code == 429:
 time.sleep(2)
 else:
 return None
 except:
 time.sleep(1)
 return None

def format_usd(v):
 if v >= 1e9: return f"${v/1e9:.1f}B"
 if v >= 1e6: return f"${v/1e6:.1f}M"
 if v >= 1e3: return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def mcap_str(v):
 if v >= 1e9: return f"${v/1e9:.1f}B"
 if v >= 1e6: return f"${v/1e6:.0f}M"
 if v >= 1e3: return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def send_telegram(text):
 """sendTG"""
 if not TG_BOT_TOKEN:
 print("\n[TG] No token, stdout:\n")
 print(text)
 return

 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"

 chunks = []
 current = ""
 for line in text.split("\n"):
 if len(current) + len(line) + 1 > 3800:
 chunks.append(current)
 current = line
 else:
 current += "\n" + line if current else line
 if current:
 chunks.append(current)

 for chunk in chunks:
 try:
 resp = requests.post(url, json={
 "chat_id": TG_CHAT_ID,
 "text": chunk,
 "parse_mode": "Markdown"
 }, timeout=10)
 if resp.status_code == 200:
 print(f"[TG] Sent ✓ ({len(chunk)} chars)")
 else:
 resp2 = requests.post(url, json={
 "chat_id": TG_CHAT_ID,
 "text": chunk.replace("*", "").replace("_", ""),
 }, timeout=10)
 print(f"[TG] Sent plain ({'✓' if resp2.status_code == 200 else '✗'})")
 except Exception as e:
 print(f"[TG] Error: {e}")
 time.sleep(0.5)

def main():
 print(f"🔥 longradar v2 — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

 tickers_raw = api_get("/fapi/v1/ticker/24hr")
 premiums_raw = api_get("/fapi/v1/premiumIndex")

 if not tickers_raw or not premiums_raw:
 print("❌ API")
 return

 ticker_map = {}
 for t in tickers_raw:
 if t["symbol"].endswith("USDT"):
 ticker_map[t["symbol"]] = {
 "px_chg": float(t["priceChangePercent"]),
 "vol": float(t["quoteVolume"]),
 "price": float(t["lastPrice"]),
 }

 funding_map = {}
 for p in premiums_raw:
 if p["symbol"].endswith("USDT"):
 funding_map[p["symbol"]] = float(p["lastFundingRate"])

 mcap_map = {}
 try:
 r = requests.get(
 "https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
 timeout=10
 )
 if r.status_code == 200:
 for item in r.json().get("data", []):
 name = item.get("name", "")
 mc = item.get("marketCap", 0)
 if name and mc:
 mcap_map[name] = float(mc)
 print(f"✅ : {len(mcap_map)}")
 except Exception as e:
 print(f"⚠️ API: {e}")

 heat_map = {}
 cg_trending = set()
 square_trending = set()

 sq_coins = get_square_heat()
 if sq_coins:
 for i, c in enumerate(sq_coins):
 coin = c["coin"]
 square_trending.add(coin)
 rank_score = max(50 - i * 4, 10)
 if c.get("rapidRiser"):
 rank_score += 15
 heat_map[coin] = heat_map.get(coin, 0) + rank_score
 print(f"🏦 : {len(square_trending)} {[c['coin'] for c in sq_coins[:5]]}")

 # 3b. CoinGecko Trending
 try:
 r = requests.get("https://api.coingecko.com/api/v3/search/trending", timeout=10)
 if r.status_code == 200:
 for item in r.json().get("coins", []):
 sym = item["item"]["symbol"].upper()
 rank = item["item"].get("score", 99)
 cg_trending.add(sym)
 heat_map[sym] = heat_map.get(sym, 0) + max(50 - rank * 3, 10)
 print(f"🌐 CG Trending: {len(cg_trending)}")
 except Exception as e:
 print(f"⚠️ CG Trending: {e}")

 vol_surge_coins = set()
 for sym, tk in ticker_map.items():
 coin = sym.replace("USDT", "")
 vol_24h = tk["vol"]
 if vol_24h > MIN_VOL_USD:
 kl = api_get("/fapi/v1/klines", {"symbol": sym, "interval": "1d", "limit": 8})
 if kl and len(kl) >= 5:
 avg_prev = sum(float(k[7]) for k in kl[:-1]) / (len(kl) - 1)
 if avg_prev > 0:
 ratio = vol_24h / avg_prev
 if ratio >= VOL_SURGE_MULT:
 vol_surge_coins.add(coin)
 heat_map[coin] = heat_map.get(coin, 0) + min(ratio * 10, 50)
 time.sleep(0.05)

 print(f"📈 (≥{VOL_SURGE_MULT}x): {len(vol_surge_coins)}")

 dual_heat = cg_trending & vol_surge_coins
 square_vol = square_trending & vol_surge_coins
 triple_heat = cg_trending & vol_surge_coins & square_trending
 
 all_multi_heat = dual_heat | square_vol
 if all_multi_heat:
 for coin in all_multi_heat:
 heat_map[coin] = heat_map.get(coin, 0) + 20
 if triple_heat:
 for coin in triple_heat:
 heat_map[coin] = heat_map.get(coin, 0) + 30 # 
 print(f"🔥🔥🔥 : {triple_heat}")
 else:
 print(f"🔥🔥 : {all_multi_heat}")

 scan_syms = set()
 for coin in heat_map:
 sym = coin + "USDT"
 if sym in ticker_map:
 scan_syms.add(sym)
 top_by_vol = sorted(ticker_map.items(), key=lambda x: x[1]["vol"], reverse=True)[:100]
 for sym, _ in top_by_vol:
 scan_syms.add(sym)

 oi_map = {}
 for i, sym in enumerate(scan_syms):
 oi_hist = api_get("/futures/data/openInterestHist", {
 "symbol": sym, "period": "1h", "limit": 6
 })
 if oi_hist and len(oi_hist) >= 2:
 curr = float(oi_hist[-1]["sumOpenInterestValue"])
 prev_1h = float(oi_hist[-2]["sumOpenInterestValue"])
 prev_6h = float(oi_hist[0]["sumOpenInterestValue"])
 d1h = ((curr - prev_1h) / prev_1h * 100) if prev_1h > 0 else 0
 d6h = ((curr - prev_6h) / prev_6h * 100) if prev_6h > 0 else 0
 oi_map[sym] = {"oi_usd": curr, "d1h": d1h, "d6h": d6h}
 if (i + 1) % 10 == 0:
 time.sleep(0.5)

 print(f"📊 OIscan: {len(oi_map)}")

 all_syms = set(list(ticker_map.keys()))
 coin_data = {}
 for sym in all_syms:
 tk = ticker_map.get(sym, {})
 if not tk:
 continue
 oi = oi_map.get(sym, {})
 fr = funding_map.get(sym, 0)
 coin = sym.replace("USDT", "")

 d6h = oi.get("d6h", 0)
 fr_pct = fr * 100
 oi_usd = oi.get("oi_usd", 0)

 if coin in mcap_map:
 est_mcap = mcap_map[coin]
 else:
 est_mcap = max(tk["vol"] * 0.3, oi_usd * 2) if oi_usd > 0 else tk["vol"] * 0.3

 heat = heat_map.get(coin, 0)

 coin_data[sym] = {
 "coin": coin, "sym": sym,
 "px_chg": tk["px_chg"], "vol": tk["vol"],
 "fr_pct": fr_pct, "d6h": d6h,
 "oi_usd": oi_usd, "est_mcap": est_mcap,
 "heat": heat,
 "in_cg": coin in cg_trending,
 "in_sq": coin in square_trending,
 "vol_surge": coin in vol_surge_coins,
 }

 # ═══════════════════════════════════════
 # ═══════════════════════════════════════
 hot_coins = sorted(
 [d for d in coin_data.values() if d["heat"] > 0],
 key=lambda x: x["heat"], reverse=True
 )

 heat_history = {}
 if HEAT_HISTORY_FILE.exists():
 try:
 heat_history = json.loads(HEAT_HISTORY_FILE.read_text())
 except:
 pass

 now_ts = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
 new_entries = [] # 
 for s in hot_coins:
 coin = s["coin"]
 if coin not in heat_history:
 heat_history[coin] = {"first_seen": now_ts, "price": s.get("px_chg", 0)}
 sources = []
 if s["in_sq"]: sources.append("")
 if s["in_cg"]: sources.append("CG")
 if s["vol_surge"]: sources.append("")
 new_entries.append({"coin": coin, "sources": sources, "data": s})

 cutoff = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=7)).strftime("%Y-%m-%d")
 heat_history = {k: v for k, v in heat_history.items()
 if v.get("first_seen", "9999") >= cutoff}

 HEAT_HISTORY_FILE.write_text(json.dumps(heat_history, indent=2, ensure_ascii=False))

 # ═══════════════════════════════════════
 # ═══════════════════════════════════════
 chase = []
 for sym, d in coin_data.items():
 if d["px_chg"] > 3 and d["fr_pct"] < -0.005 and d["vol"] > 1_000_000:
 fr_hist = api_get("/fapi/v1/fundingRate", {"symbol": sym, "limit": 5})
 fr_rates = [float(f["fundingRate"]) * 100 for f in fr_hist] if fr_hist else [d["fr_pct"]]
 fr_prev = fr_rates[-2] if len(fr_rates) >= 2 else d["fr_pct"]
 fr_delta = d["fr_pct"] - fr_prev

 trend = "" if fr_delta < -0.05 else "" if fr_delta < -0.01 else "" if abs(fr_delta) < 0.01 else ""

 chase.append({**d, "fr_delta": fr_delta, "trend": trend,
 "rates": " → ".join([f"{x:.3f}" for x in fr_rates[-3:]])})
 time.sleep(0.2)

 chase.sort(key=lambda x: x["fr_pct"])

 # ═══════════════════════════════════════
 # ═══════════════════════════════════════
 now = datetime.now(timezone(timedelta(hours=8)))
 lines = [
 f"**longradar**",
 f"{now.strftime('%Y-%m-%d %H:%M')} CST",
 ]

 if new_entries:
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>8} {'':>7} {''}")
 tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
 for e in new_entries:
 s = e["data"]
 src_str = "/".join(e["sources"])
 tbl.append(f"{s['coin']:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
 tbl.append("```")
 lines.append("\n".join(tbl))

 if hot_coins:
 lines.append(f"\n**[ ]**")
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>8} {'':>7} {''}")
 tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
 for s in hot_coins[:10]:
 sources = []
 if s["in_sq"]: sources.append("")
 if s["in_cg"]: sources.append("CG")
 if s["vol_surge"]: sources.append("")
 extra = []
 if abs(s["d6h"]) >= 3: extra.append(f"OI{s['d6h']:+.0f}%")
 if s["fr_pct"] < -0.03: extra.append(f"{s['fr_pct']:.2f}%")
 src_str = "/".join(sources)
 if extra:
 src_str += " " + " ".join(extra)
 coin_name = s['coin']
 tbl.append(f"{coin_name:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
 tbl.append("```")
 lines.append("\n".join(tbl))
 else:
 lines.append("\n**[ ]** ")

 lines.append(f"\n**[ ]** +=")
 if chase:
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>10} {'':>8} {'':>7} {'':>8}")
 tbl.append(f"{'-'*10} {'-'*10} {'-'*8} {'-'*7} {'-'*8}")
 for s in chase[:8]:
 tbl.append(
 f"{s['coin']:<10} {s['fr_pct']:>+9.3f}% {s['trend']:>8} {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
 )
 tbl.append("```")
 lines.append("\n".join(tbl))
 else:
 lines.append(" condition")

 oi_alerts = []
 for sym, oi in oi_map.items():
 if abs(oi["d6h"]) >= 8:
 d = coin_data.get(sym)
 if d and d["heat"] == 0:
 oi_alerts.append(d)
 oi_alerts.sort(key=lambda x: abs(x["d6h"]), reverse=True)

 if oi_alerts:
 lines.append(f"\n**[ OI ]** 6positions>=8%")
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>4} {'OI':>8} {'':>7} {'':>8}")
 tbl.append(f"{'-'*10} {'-'*4} {'-'*8} {'-'*7} {'-'*8}")
 for s in oi_alerts[:6]:
 direction = "" if s["d6h"] > 0 else ""
 tbl.append(
 f"{s['coin']:<10} {direction:>4} {s['d6h']:>+7.1f}% {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
 )
 tbl.append("```")
 lines.append("\n".join(tbl))

 highlights = []

 hot_oi = [d for d in coin_data.values() if d["heat"] > 0 and d["d6h"] > 5]
 for s in sorted(hot_oi, key=lambda x: x["d6h"], reverse=True)[:3]:
 highlights.append(f"{s['coin']} — +OI{s['d6h']:+.0f}%,")

 hot_fuel = [d for d in coin_data.values() if d["heat"] > 0 and d["fr_pct"] < -0.03]
 for s in sorted(hot_fuel, key=lambda x: x["fr_pct"])[:2]:
 if s["coin"] not in " ".join(highlights):
 highlights.append(f"{s['coin']} — +{s['fr_pct']:.2f}%,")

 chase_fire = [s for s in chase[:5] if "" in s.get("trend", "")]
 for s in chase_fire[:2]:
 if s["coin"] not in " ".join(highlights):
 highlights.append(f"{s['coin']}{s['fr_pct']:.3f}%,short squeeze")

 if highlights:
 lines.append(f"\n**[ ]**")
 for h in highlights[:5]:
 lines.append(f" {h}")

 lines.append(f"\n=Binancesearch / CG=CoinGecko")

 report = "\n".join(lines)
 send_telegram(report)
 print("\n✅ ")

if __name__ == "__main__":
 main()

Binance Alpha Monitor #

Date: 2026.04.23 Tags: Python · Binance · Claude AI · Telegram

WebSocket + AI analysis + Telegram alerts

Auto-detects new Binance Alpha listings, analyzes token quality with Claude AI, and sends alerts via Telegram.

Full source code #

#!/usr/bin/env python3
"""
Binance Alpha Monitor v2 — 
REST + intelligent filter + + TG
API Key,zero AI cost(rule engine)

run: python3 alpha_monitor.py
"""

import asyncio
import hashlib
import json
import logging
import os
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import httpx

# ============================================================
# configuration
# ============================================================

BASE_DIR = Path(__file__).parent
DB_PATH = str(BASE_DIR / "data" / "alpha.db")

# TG
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")

def _load_anthropic_key():
 """fromauth.jsonvariableAPI key"""
 # 1. hermes auth.json credential pool
 try:
 auth_file = os.path.expanduser("~/.hermes/auth.json")
 if os.path.exists(auth_file):
 with open(auth_file) as f:
 auth = json.load(f)
 pool = auth.get("credential_pool", {}).get("anthropic", [])
 if pool:
 key = pool[0].get("access_token", "")
 if key and key != "***":
 return key
 except Exception:
 pass
 return os.environ.get("ANTHROPIC_API_KEY", "")

ANTHROPIC_API_KEY = _load_anthropic_key()
ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")

ANNOUNCEMENT_POLL_INTERVAL = 30 # 30
AGGREGATION_POLL_INTERVAL = 15 # 15
MONITOR_POLL_INTERVAL = 120 # monitoring2

# HTTP
HEADERS = {
 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
 "Accept": "application/json",
 "Accept-Language": "en-US,en;q=0.9",
}

BINANCE_ANNOUNCEMENT_API = "https://www.binance.com/bapi/composite/v1/public/cms/article/list/query"

# ============================================================
# logging
# ============================================================

logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
 datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("alpha")

# ============================================================
# ============================================================

TRIGGER_KEYWORDS = [
 "alpha", "", "airdrop", "tge", "token generation",
 "", "will list", "will launch",
 "", "exclusive", "binance wallet", "hodler",
]

EXCLUDE_KEYWORDS = [
 "delisting", "delist", "", "deprecate", "",
 "maintenance", "",
 "launchpool", "megadrop",
 "buyback", "",
 "perpetual contract", # futures
 "futures will launch", # 
 "usdⓢ-margined", # Ufutures
 "coin-margined", # futures
 "margin will add", # 
 "trading bots services", # trading
 "trading pairs", # trading()
]

ALPHA_BOX_KEYWORDS = ["alpha box", "", "mystery box"]

# ============================================================
# ============================================================

TIER1_VCS = [
 "binance labs", "yzi labs",
 "coinbase ventures", "a16z", "andreessen horowitz", "paradigm",
 "polychain", "polychain capital", "sequoia", "sequoia china", "sequoia capital",
 "multicoin", "multicoin capital", "pantera", "pantera capital",
 "dragonfly", "dragonfly capital", "founders fund",
]

TIER2_VCS = [
 "abcde", "iosg", "hashkey", "okx ventures",
 "sevenx", "folius", "foresight", "hashed",
 "bitkraft", "framework", "framework ventures",
 "delphi", "delphi digital", "electric capital",
 "variant", "1kx", "placeholder",
 "animoca", "animoca brands", "jump", "jump crypto",
 "hack vc", "bain capital",
]

HOT_NARRATIVES = ["defi_perp", "ai_agent", "ai_defi", "defai", "zk_proof"]
WEAK_NARRATIVES = ["gamefi", "meme", "social"]

BINANCE_DARLING_KEYWORDS = ["yzi labs", "binance labs"]

# ============================================================
# ============================================================

TIER_ICONS = {"S": "🟢🟢🟢", "A": "🟡🟡", "B": "🟠", "C": "⚪"}
TIER_LABELS = {"S": "S ()", "A": "A ()", "B": "B ()", "C": "C ()"}

def count_vc_tier(vcs: list, vc_list: list) -> int:
 count = 0
 vcs_lower = [v.lower() for v in vcs]
 for t in vc_list:
 if any(t in v for v in vcs_lower):
 count += 1
 return count

def rate_project(circ_mcap: float, fdv: float, vcs: list,
 narrative: str, is_darling: bool) -> dict:
 """S/A/B/C """
 t1 = count_vc_tier(vcs, TIER1_VCS)
 t2 = count_vc_tier(vcs, TIER2_VCS)
 hot = narrative in HOT_NARRATIVES
 weak = narrative in WEAK_NARRATIVES
 circ_mcap = circ_mcap or 0
 fdv = fdv or 0

 warnings = []
 if weak:
 warnings.append(f"⚠️ {narrative} ")

 if is_darling:
 return {"tier": "S", "reason": "Binance(YZi/Binance Labs/CZ)", "warnings": warnings}
 if hot and t1 >= 1 and fdv < 500_000_000:
 return {"tier": "S", "reason": f"({narrative})+ Tier1 VC", "warnings": warnings}
 if t1 >= 2 and circ_mcap < 50_000_000 and fdv < 300_000_000:
 return {"tier": "S", "reason": "≥2 Tier1 ", "warnings": warnings}
 if t1 >= 1 and circ_mcap < 10_000_000 and fdv < 100_000_000:
 return {"tier": "S", "reason": "Tier1 ", "warnings": warnings}
 if hot and circ_mcap < 10_000_000 and fdv < 50_000_000:
 return {"tier": "S", "reason": f"({narrative})", "warnings": warnings}

 if t1 >= 1 and circ_mcap < 20_000_000 and fdv < 200_000_000:
 return {"tier": "A", "reason": "Tier1 ", "warnings": warnings}

 if circ_mcap < 50_000_000 and fdv < 500_000_000:
 return {"tier": "B", "reason": "", "warnings": warnings}

 return {"tier": "C", "reason": "/", "warnings": warnings}

# ============================================================
# database
# ============================================================

def init_db():
 Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
 conn = sqlite3.connect(DB_PATH)
 c = conn.cursor()
 c.execute("""
 CREATE TABLE IF NOT EXISTS projects (
 id TEXT PRIMARY KEY,
 symbol TEXT NOT NULL,
 name TEXT,
 launch_time TEXT,
 source TEXT,
 raw_text TEXT,
 tier TEXT DEFAULT 'PENDING',
 tier_reason TEXT,
 narrative TEXT,
 narrative_desc TEXT,
 vcs_json TEXT DEFAULT '[]',
 is_darling INTEGER DEFAULT 0,
 open_price REAL,
 total_supply REAL,
 circulating_supply REAL,
 fdv REAL,
 circulating_mcap REAL,
 excluded INTEGER DEFAULT 0,
 exclude_reason TEXT,
 discovered_at TEXT,
 updated_at TEXT
 )""")
 c.execute("""
 CREATE TABLE IF NOT EXISTS pushes (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 project_id TEXT NOT NULL,
 push_type TEXT,
 sent_at TEXT,
 content TEXT
 )""")
 c.execute("""
 CREATE TABLE IF NOT EXISTS snapshots (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 project_id TEXT NOT NULL,
 timestamp TEXT NOT NULL,
 price REAL,
 circulating_mcap REAL,
 fdv REAL
 )""")
 conn.commit()
 conn.close()

def project_id(symbol: str, date_str: str) -> str:
 return hashlib.md5(f"{symbol.upper()}_{date_str}".encode()).hexdigest()[:16]

def project_exists(pid: str) -> bool:
 conn = sqlite3.connect(DB_PATH)
 exists = conn.execute("SELECT 1 FROM projects WHERE id=?", (pid,)).fetchone() is not None
 conn.close()
 return exists

def save_project(project: dict):
 conn = sqlite3.connect(DB_PATH)
 now = datetime.utcnow().isoformat()
 conn.execute("""
 INSERT OR IGNORE INTO projects
 (id, symbol, name, launch_time, source, raw_text, tier, tier_reason,
 narrative, narrative_desc, vcs_json, is_darling,
 open_price, total_supply, circulating_supply, fdv, circulating_mcap,
 excluded, exclude_reason, discovered_at, updated_at)
 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
 """, (
 project["id"], project["symbol"], project.get("name"),
 project.get("launch_time"), project.get("source"), project.get("raw_text"),
 project.get("tier", "PENDING"), project.get("tier_reason"),
 project.get("narrative"), project.get("narrative_desc"),
 json.dumps(project.get("vcs", [])), int(project.get("is_darling", False)),
 project.get("open_price"), project.get("total_supply"),
 project.get("circulating_supply"), project.get("fdv"),
 project.get("circulating_mcap"),
 int(project.get("excluded", 0)), project.get("exclude_reason"),
 now, now,
 ))
 conn.commit()
 conn.close()

def update_project(pid: str, fields: dict):
 if not fields:
 return
 conn = sqlite3.connect(DB_PATH)
 fields["updated_at"] = datetime.utcnow().isoformat()
 set_parts = [f"{k}=?" for k in fields]
 values = list(fields.values()) + [pid]
 conn.execute(f"UPDATE projects SET {','.join(set_parts)} WHERE id=?", values)
 conn.commit()
 conn.close()

def get_project(pid: str) -> Optional[dict]:
 conn = sqlite3.connect(DB_PATH)
 conn.row_factory = sqlite3.Row
 row = conn.execute("SELECT * FROM projects WHERE id=?", (pid,)).fetchone()
 conn.close()
 return dict(row) if row else None

def list_pending() -> list:
 conn = sqlite3.connect(DB_PATH)
 conn.row_factory = sqlite3.Row
 rows = conn.execute(
 "SELECT * FROM projects WHERE excluded=0 AND tier='PENDING' ORDER BY discovered_at"
 ).fetchall()
 conn.close()
 return [dict(r) for r in rows]

def list_active() -> list:
 """monitoringproject(PENDINGEXCLUDED,launch_time)"""
 conn = sqlite3.connect(DB_PATH)
 conn.row_factory = sqlite3.Row
 rows = conn.execute("""
 SELECT * FROM projects
 WHERE excluded=0 AND launch_time IS NOT NULL AND launch_time != ''
 AND tier NOT IN ('PENDING', 'EXCLUDED', 'ERROR')
 """).fetchall()
 conn.close()
 return [dict(r) for r in rows]

def has_pushed(pid: str, push_type: str) -> bool:
 conn = sqlite3.connect(DB_PATH)
 exists = conn.execute(
 "SELECT 1 FROM pushes WHERE project_id=? AND push_type=?", (pid, push_type)
 ).fetchone() is not None
 conn.close()
 return exists

def log_push(pid: str, push_type: str, content: str):
 conn = sqlite3.connect(DB_PATH)
 conn.execute(
 "INSERT INTO pushes (project_id, push_type, sent_at, content) VALUES (?,?,?,?)",
 (pid, push_type, datetime.utcnow().isoformat(), content)
 )
 conn.commit()
 conn.close()

def save_snapshot(pid: str, price: float, mcap: float, fdv: float):
 conn = sqlite3.connect(DB_PATH)
 conn.execute(
 "INSERT INTO snapshots (project_id, timestamp, price, circulating_mcap, fdv) VALUES (?,?,?,?,?)",
 (pid, datetime.utcnow().isoformat(), price, mcap, fdv)
 )
 conn.commit()
 conn.close()

# ============================================================
# ============================================================

async def send_tg(text: str, silent: bool = False) -> bool:
 if not TG_BOT_TOKEN or not TG_CHAT_ID:
 logger.error("TG_BOT_TOKEN TG_CHAT_ID configuration")
 return False
 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
 payload = {
 "chat_id": TG_CHAT_ID,
 "text": text,
 "parse_mode": "HTML",
 "disable_notification": silent,
 }
 try:
 async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
 resp = await client.post(url, json=payload)
 if resp.status_code != 200:
 logger.error(f"TGsend {resp.status_code}: {resp.text[:200]}")
 return False
 return True
 except Exception as e:
 logger.error(f"TGsend: {e}")
 return False

# ============================================================
# ============================================================

def is_trigger(title: str) -> tuple[bool, Optional[str]]:
 t = title.lower()
 for kw in EXCLUDE_KEYWORDS:
 if kw.lower() in t:
 return False, f": {kw}"
 for kw in ALPHA_BOX_KEYWORDS:
 if kw.lower() in t:
 return False, "Alpha Box "
 for kw in TRIGGER_KEYWORDS:
 if kw.lower() in t:
 return True, None
 return False, None

def extract_symbol(title: str) -> Optional[str]:
 m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
 if m:
 return m.group(1)
 m = re.search(r"(([A-Z0-9]{2,10}))", title)
 if m:
 return m.group(1)
 return None

def extract_name(title: str) -> Optional[str]:
 patterns = [
 r"(?:|List|list|Launch|launch|featured)\s+([A-Za-z0-9 ]+?)\s*[\((]",
 ]
 for p in patterns:
 m = re.search(p, title, re.IGNORECASE)
 if m:
 return m.group(1).strip()
 return None

# ============================================================
# ============================================================

async def fetch_announcements(limit: int = 20) -> list:
 all_articles = []
 # 48: New Cryptocurrency Listing, 161: Latest Activities, 93: Latest News
 for catalog_id in [48, 161, 93]:
 params = {"type": 1, "catalogId": catalog_id, "pageNo": 1, "pageSize": limit}
 try:
 async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
 resp = await client.get(BINANCE_ANNOUNCEMENT_API, params=params)
 resp.raise_for_status()
 data = resp.json()
 for catalog in data.get("data", {}).get("catalogs", []):
 for a in catalog.get("articles", []):
 a["_catalog_id"] = catalog_id
 all_articles.append(a)
 except Exception as e:
 logger.warning(f"classification {catalog_id} : {e}")

 seen = set()
 unique = []
 for a in all_articles:
 code = a.get("code")
 if code and code not in seen:
 seen.add(code)
 unique.append(a)
 return unique

# ============================================================
# CoinGecko data
# ============================================================

async def fetch_coingecko(symbol: str, project_name: str = "") -> dict:
 """CoinGeckotokendata。matching:symbolmatching + projectmatching"""
 result = {"found": False, "price": None, "fdv": None, "mcap": None,
 "total_supply": None, "circ_supply": None, "chain": None, "contract": None}
 try:
 async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
 queries = [symbol]
 if project_name and project_name.upper() != symbol.upper():
 queries.append(project_name)

 coin_id = None
 best_rank = 999999
 name_exact_match = None # projectmatching

 for query in queries:
 resp = await client.get("https://api.coingecko.com/api/v3/search",
 params={"query": query})
 if resp.status_code != 200:
 continue
 coins = resp.json().get("coins", [])

 for c in coins:
 c_sym = c.get("symbol", "").upper()
 c_name = c.get("name", "").lower()
 c_rank = c.get("market_cap_rank") or 999999

 if project_name and c_name == project_name.lower():
 name_exact_match = c["id"]

 if c_sym == symbol.upper():
 if c_rank < best_rank:
 coin_id = c["id"]
 best_rank = c_rank

 if project_name and project_name.lower() in c_name:
 if c_rank < best_rank:
 coin_id = c["id"]
 best_rank = c_rank

 if name_exact_match:
 coin_id = name_exact_match

 if not coin_id:
 logger.info(f"CoinGecko {symbol}/{project_name}")
 return result

 logger.info(f"CoinGeckomatching: {symbol} -> {coin_id} (rank={best_rank})")

 resp2 = await client.get(
 f"https://api.coingecko.com/api/v3/coins/{coin_id}",
 params={"localization": "false", "tickers": "false",
 "market_data": "true", "community_data": "false",
 "developer_data": "false"}
 )
 if resp2.status_code == 429:
 await asyncio.sleep(5)
 resp2 = await client.get(
 f"https://api.coingecko.com/api/v3/coins/{coin_id}",
 params={"localization": "false", "tickers": "false",
 "market_data": "true", "community_data": "false",
 "developer_data": "false"}
 )
 if resp2.status_code != 200:
 return result
 d = resp2.json()
 md = d.get("market_data", {})
 result.update({
 "found": True,
 "price": (md.get("current_price") or {}).get("usd"),
 "fdv": (md.get("fully_diluted_valuation") or {}).get("usd"),
 "mcap": (md.get("market_cap") or {}).get("usd"),
 "total_supply": md.get("total_supply"),
 "circ_supply": md.get("circulating_supply"),
 })
 result["categories"] = d.get("categories", [])
 result["description"] = (d.get("description") or {}).get("en", "")[:500]
 platforms = d.get("platforms", {})
 for chain, addr in platforms.items():
 if addr:
 result["chain"] = chain
 result["contract"] = addr
 break

 if not result["price"]:
 binance_price = await _fetch_binance_price(symbol, client)
 if binance_price:
 result["price"] = binance_price
 ts = result.get("total_supply") or 0
 cs = result.get("circ_supply") or 0
 if ts > 0:
 result["fdv"] = binance_price * ts
 if cs > 0:
 result["mcap"] = binance_price * cs
 logger.info(f"Binance {symbol}: ${binance_price}, FDV=${result.get('fdv',0):,.0f}")

 except Exception as e:
 logger.warning(f"CoinGeckoquery {symbol}: {e}")
 return result

async def _fetch_binance_price(symbol: str, client: httpx.AsyncClient) -> float:
 """fromBinancefutures"""
 pair = f"{symbol.upper()}USDT"
 try:
 resp = await client.get(f"https://api.binance.com/api/v3/ticker/price",
 params={"symbol": pair})
 if resp.status_code == 200:
 return float(resp.json()["price"])
 except Exception:
 pass
 # 2. futures
 try:
 resp = await client.get(f"https://fapi.binance.com/fapi/v1/ticker/price",
 params={"symbol": pair})
 if resp.status_code == 200:
 return float(resp.json()["price"])
 except Exception:
 pass
 return 0.0

# ============================================================
# ============================================================

async def llm_extract(raw_text: str, symbol: str, name: str = "", cg_data: dict = None) -> dict:
 """LLMfrom+CoinGeckodata/VC/"""
 fallback = {
 "narrative": "unknown", "narrative_desc": "",
 "vcs": [], "is_darling": False, "exclude_reason": None,
 }

 cg_data = cg_data or {}
 categories = cg_data.get("categories", [])
 description = cg_data.get("description", "")

 darling_cats = [c for c in categories if any(kw in c.lower() for kw in ["yzi labs", "binance labs"])]
 if darling_cats:
 fallback["is_darling"] = True

 if not ANTHROPIC_API_KEY:
 t = raw_text.lower()
 for kw in BINANCE_DARLING_KEYWORDS:
 if kw in t:
 fallback["is_darling"] = True
 cat_str = " ".join(categories).lower()
 if "defi" in cat_str: fallback["narrative"] = "defi"
 elif "ai" in cat_str: fallback["narrative"] = "ai_agent"
 elif "gaming" in cat_str or "gamefi" in cat_str: fallback["narrative"] = "gamefi"
 elif "meme" in cat_str: fallback["narrative"] = "meme"
 elif "rwa" in cat_str or "real world" in cat_str: fallback["narrative"] = "rwa"
 return fallback

 extra_context = ""
 if categories:
 extra_context += f"\nCoinGeckoclassification: {', '.join(categories)}"
 if description:
 extra_context += f"\nproject: {description[:300]}"
 if cg_data.get("found"):
 extra_context += f"\ndata: FDV=${cg_data.get('fdv',0):,.0f}, MCap=${cg_data.get('mcap',0):,.0f}, =${cg_data.get('price',0)}"
 if cg_data.get("chain"):
 extra_context += f", ={cg_data['chain']}"

 system = "cryptocurrencyresearcher,fromBinanceprojectdatakey 。returnJSON,。"
 user = f"""Binanceproject:
token: {symbol}, project: {name or ""}
: {raw_text}
{extra_context}

returnJSON:
{{
 "narrative": "defi_perp|ai_agent|ai_defi|defai|zk_proof|infra|defi|rwa|gamefi|meme|social|stablecoin|unknown",
 "narrative_desc": "projectwhat、whatfeatures",
 "vcs": ["fromCoinGeckoclassification"],
 "is_darling": true/false,
 "exclude_reason": null|"already_tge"|"meme_only"
}}

judge:
- narrative: main class
- vcs: CoinGeckoclassification "XXX Portfolio" XXXas
- is_darling: YZi Labs/Binance Labs CZ/ true
- exclude_reason: projectmain CEX(Coinbase/OKX/Bybit)3"already_tge"。DEXBinance,already_tge。CoinGeckodataalready_tge。meme"meme_only"
"""

 try:
 async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
 resp = await client.post(
 f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages",
 headers={
 "x-api-key": ANTHROPIC_API_KEY,
 "anthropic-version": "2023-06-01",
 "content-type": "application/json",
 },
 json={
 "model": ANTHROPIC_MODEL,
 "max_tokens": 800,
 "temperature": 0,
 "system": system,
 "messages": [{"role": "user", "content": user}],
 }
 )
 if resp.status_code != 200:
 logger.warning(f"LLMcall {resp.status_code}")
 return fallback
 data = resp.json()
 text = ""
 for block in data.get("content", []):
 if block.get("type") == "text":
 text = block.get("text", "")
 break
 text = text.strip()
 if text.startswith("```"):
 lines = text.split("\n")
 text = "\n".join(lines[1:-1])
 return json.loads(text)
 except Exception as e:
 logger.warning(f"LLM: {e}")
 return fallback

# ============================================================
# ============================================================

def _fmt_mcap(v):
 if not v:
 return "N/A"
 if v >= 1e9:
 return f"${v/1e9:.1f}B"
 if v >= 1e6:
 return f"${v/1e6:.1f}M"
 if v >= 1e3:
 return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def _fmt_price(v):
 if not v:
 return "N/A"
 if v >= 1:
 return f"${v:.2f}"
 if v >= 0.01:
 return f"${v:.4f}"
 return f"${v:.6f}"

def fmt_discovery(p: dict) -> str:
 tier = p.get("tier", "C")
 icon = TIER_ICONS.get(tier, "⚪")
 label = TIER_LABELS.get(tier, "")
 symbol = p["symbol"]
 name = p.get("name") or ""
 vcs = json.loads(p.get("vcs_json", "[]")) if isinstance(p.get("vcs_json"), str) else p.get("vcs", [])

 lines = [
 f"{icon} <b>Alpha · ${symbol}</b> {icon}",
 f"📋 {label}",
 "",
 f"<b>{name}</b>" if name else "",
 ]

 if p.get("narrative_desc"):
 lines.append(f"💡 {p['narrative_desc']}")
 if p.get("narrative") and p["narrative"] != "unknown":
 lines.append(f"🏷 : {p['narrative']}")
 lines.append("")

 if p.get("fdv"):
 lines.append(f"📊 FDV: {_fmt_mcap(p['fdv'])}")
 if p.get("circulating_mcap"):
 lines.append(f"📊 : {_fmt_mcap(p['circulating_mcap'])}")
 if p.get("open_price"):
 lines.append(f"💰 : {_fmt_price(p['open_price'])}")
 if p.get("total_supply") and p.get("circulating_supply"):
 pct = p["circulating_supply"] / p["total_supply"] * 100
 lines.append(f"📦 initial: {pct:.1f}%")

 if vcs:
 lines.append("")
 lines.append("🏛 <b></b>")
 for v in vcs[:5]:
 is_t1 = any(t in v.lower() for t in TIER1_VCS)
 lines.append(f" {'⭐' if is_t1 else '·'} {v}")

 if p.get("is_darling"):
 lines.append("")
 lines.append("🔥 <b>Binance</b>")

 if p.get("tier_reason"):
 lines.append("")
 lines.append(f"🎯 {p['tier_reason']}")

 lines.append("")
 lines.append(f"<i>📌 : {p.get('source', 'binance')}</i>")
 if p.get("raw_text"):
 lines.append(f"<i>{p['raw_text'][:120]}</i>")

 return "\n".join(l for l in lines if l is not None)

def fmt_countdown(p: dict, minutes: int) -> str:
 icon = TIER_ICONS.get(p.get("tier", "C"), "⚪")
 t = f"{minutes//60}h{minutes%60}m" if minutes >= 60 else f"{minutes}m"
 lines = [
 f"{icon} <b></b>",
 f"<b>${p['symbol']}</b> · {p.get('name', '')}",
 f"⏰ <b>{t}</b>",
 ]
 if p.get("fdv"):
 lines.append(f"FDV: {_fmt_mcap(p['fdv'])}")
 if minutes <= 30:
 lines.append("🔔 <b></b>")
 return "\n".join(lines)

def fmt_launch(p: dict, price: float, mcap: float, fdv: float) -> str:
 lines = [
 f"🚀 <b>${p['symbol']} </b>",
 f": <b>{_fmt_price(price)}</b>",
 f": <b>{_fmt_mcap(mcap)}</b>",
 f"FDV: <b>{_fmt_mcap(fdv)}</b>",
 ]
 return "\n".join(lines)

def fmt_periodic(p: dict, idx: int, price: float, mcap: float, change_pct: float) -> str:
 arrow = "📈" if change_pct > 0 else "📉"
 minutes = 30 * idx
 lines = [
 f"⏱ <b>${p['symbol']} · +{minutes}min</b>",
 f": {_fmt_mcap(mcap)} ({arrow} {change_pct:+.1f}%)",
 f": {_fmt_price(price)}",
 ]
 if change_pct >= 100:
 lines.append("💡 <b>,</b>")
 elif change_pct <= -30:
 lines.append("⚠️ ,evaluation")
 return "\n".join(lines)

def fmt_anomaly(p: dict, atype: str, price: float, change_pct: float) -> str:
 emoji = {"double": "🚀", "halve": "🔻"}.get(atype, "⚡")
 desc = {"double": "", "halve": ""}.get(atype, "")
 return f"{emoji} <b>${p['symbol']} {desc}</b>\n: {change_pct:+.1f}%\n: {_fmt_price(price)}"

# ============================================================
# ============================================================

async def announcement_listener():
 """Binance,Alphaproject"""
 logger.info(f"📡 · {ANNOUNCEMENT_POLL_INTERVAL}s")
 while True:
 try:
 articles = await fetch_announcements()
 new_count = 0
 for art in articles:
 title = art.get("title", "")
 triggered, reason = is_trigger(title)
 if not triggered:
 continue

 symbol = extract_symbol(title)
 if not symbol:
 continue

 release_ts = art.get("releaseDate")
 release_iso = datetime.fromtimestamp(release_ts / 1000).isoformat() if release_ts else ""
 launch_date = release_iso[:10] if release_iso else datetime.utcnow().date().isoformat()

 pid = project_id(symbol, launch_date)
 if project_exists(pid):
 continue

 project = {
 "id": pid,
 "symbol": symbol,
 "name": extract_name(title),
 "launch_time": release_iso,
 "source": "binance_announcement",
 "raw_text": title,
 "tier": "PENDING",
 "vcs": [],
 "is_darling": False,
 "excluded": 0,
 }
 save_project(project)
 new_count += 1
 logger.info(f"🆕 ${symbol}: {title[:80]}")

 if new_count:
 logger.info(f" {new_count} project")
 except Exception as e:
 logger.error(f": {e}", exc_info=True)

 await asyncio.sleep(ANNOUNCEMENT_POLL_INTERVAL)

# ============================================================
# ============================================================

async def aggregation_worker():
 """PENDINGprojectdata、、"""
 logger.info(f"🧠 · {AGGREGATION_POLL_INTERVAL}s")
 while True:
 try:
 pending = list_pending()
 for p in pending:
 symbol = p["symbol"]
 try:
 logger.info(f"📦 ${symbol}")

 # 1. CoinGecko
 cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
 await asyncio.sleep(1) # rate limiting

 llm = await llm_extract(p.get("raw_text", ""), symbol, p.get("name"), cg_data=cg)
 await asyncio.sleep(1)

 if llm.get("exclude_reason") in ("already_tge", "meme_only"):
 update_project(p["id"], {
 "excluded": 1,
 "exclude_reason": llm["exclude_reason"],
 "tier": "EXCLUDED",
 })
 logger.info(f"⏭ ${symbol} : {llm['exclude_reason']}")
 continue

 cg_fdv = cg.get("fdv", 0) or 0
 cg_mcap = cg.get("mcap", 0) or 0
 data_suspect = False
 data_warnings = []

 if cg.get("found") and cg_fdv > 0 and cg_fdv < 100_000:
 data_suspect = True
 data_warnings.append(f"FDV=${cg_fdv:,.0f},CoinGeckomatchingerror")
 logger.warning(f"⚠️ {symbol} FDV=${cg_fdv:,.0f} ,data")

 if (cg.get("circ_supply") and cg.get("total_supply")
 and cg["circ_supply"] == cg["total_supply"]
 and cg_fdv < 1_000_000):
 data_suspect = True

 if data_suspect:
 cg_fdv = 0
 cg_mcap = 0
 logger.warning(f"⚠️ {symbol} CoinGeckodata,usageLLMjudge")

 is_darling = llm.get("is_darling", False)
 vcs = llm.get("vcs", [])
 narrative = llm.get("narrative", "unknown")
 rating = rate_project(
 cg_mcap, cg_fdv,
 vcs, narrative, is_darling
 )

 update_project(p["id"], {
 "tier": rating["tier"],
 "tier_reason": rating["reason"],
 "narrative": narrative,
 "narrative_desc": llm.get("narrative_desc", ""),
 "vcs_json": json.dumps(vcs),
 "is_darling": int(is_darling),
 "open_price": cg.get("price") if not data_suspect else None,
 "total_supply": cg.get("total_supply") if not data_suspect else None,
 "circulating_supply": cg.get("circ_supply") if not data_suspect else None,
 "fdv": cg_fdv if cg_fdv else None,
 "circulating_mcap": cg_mcap if cg_mcap else None,
 })

 full = get_project(p["id"])
 if full and not has_pushed(p["id"], "discovery"):
 text = fmt_discovery(full)
 silent = rating["tier"] in ("B", "C")
 ok = await send_tg(text, silent=silent)
 if ok:
 log_push(p["id"], "discovery", text)
 logger.info(f"✅ ${symbol} [{rating['tier']}]")

 except Exception as e:
 logger.error(f" {symbol} : {e}", exc_info=True)
 update_project(p["id"], {"tier": "ERROR", "tier_reason": str(e)[:100]})

 except Exception as e:
 logger.error(f"loop: {e}", exc_info=True)

 await asyncio.sleep(AGGREGATION_POLL_INTERVAL)

# ============================================================
# ============================================================

async def post_launch_monitor():
 """ + + 30min×4tracking + """
 logger.info(f"📊 monitoring · {MONITOR_POLL_INTERVAL}s")
 while True:
 try:
 projects = list_active()
 for p in projects:
 try:
 await _monitor_project(p)
 except Exception as e:
 logger.error(f"monitoring {p['symbol']} : {e}")
 except Exception as e:
 logger.error(f"monitoringloop: {e}", exc_info=True)

 await asyncio.sleep(MONITOR_POLL_INTERVAL)

async def _monitor_project(p: dict):
 pid = p["id"]
 symbol = p["symbol"]
 launch_str = p.get("launch_time", "")
 if not launch_str:
 return

 try:
 launch = datetime.fromisoformat(launch_str.replace("Z", "").split("+")[0])
 except:
 return

 now = datetime.utcnow()
 delta_sec = (launch - now).total_seconds()

 # T-3h
 if 3*3600 - 300 <= delta_sec <= 3*3600 + 300:
 if not has_pushed(pid, "t_minus_3h"):
 text = fmt_countdown(p, int(delta_sec / 60))
 ok = await send_tg(text, silent=p.get("tier") in ("B", "C"))
 if ok:
 log_push(pid, "t_minus_3h", text)

 # T-30m
 elif 30*60 - 150 <= delta_sec <= 30*60 + 150:
 if not has_pushed(pid, "t_minus_30m"):
 text = fmt_countdown(p, int(delta_sec / 60))
 ok = await send_tg(text, silent=False)
 if ok:
 log_push(pid, "t_minus_30m", text)

 elif -300 <= delta_sec <= 0:
 if not has_pushed(pid, "at_launch"):
 cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
 if cg.get("price"):
 text = fmt_launch(p, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
 ok = await send_tg(text, silent=False)
 if ok:
 log_push(pid, "at_launch", text)
 save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))

 elif 0 < -delta_sec <= 2.5 * 3600:
 minutes_after = int(-delta_sec / 60)
 for idx, target in enumerate([30, 60, 90, 120], 1):
 if abs(minutes_after - target) <= 5:
 ptype = f"post_30m_{idx}"
 if not has_pushed(pid, ptype):
 cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
 if cg.get("price"):
 open_price = p.get("open_price") or cg["price"]
 change = ((cg["price"] - open_price) / open_price * 100) if open_price else 0
 text = fmt_periodic(p, idx, cg["price"], cg.get("mcap", 0), change)
 ok = await send_tg(text, silent=p.get("tier") in ("B", "C") and idx > 1)
 if ok:
 log_push(pid, ptype, text)
 save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))

 if change >= 100 and not has_pushed(pid, "anomaly_double"):
 t = fmt_anomaly(p, "double", cg["price"], change)
 if await send_tg(t):
 log_push(pid, "anomaly_double", t)
 elif change <= -50 and not has_pushed(pid, "anomaly_halve"):
 t = fmt_anomaly(p, "halve", cg["price"], change)
 if await send_tg(t):
 log_push(pid, "anomaly_halve", t)
 break

# ============================================================
# ============================================================

async def main():
 init_db()
 logger.info(f"📂 database: {DB_PATH}")

 # testingTG
 ok = await send_tg("🎉 <b>Alpha Monitor v2 </b>\n\n📡 Binance...\n🔔 Alpha")
 if ok:
 logger.info("✅ TG")
 else:
 logger.warning("⚠️ TG,configuration")

 tasks = [
 asyncio.create_task(announcement_listener(), name="announcements"),
 asyncio.create_task(aggregation_worker(), name="aggregator"),
 asyncio.create_task(post_launch_monitor(), name="monitor"),
 ]

 logger.info("=" * 50)
 logger.info("🚀 Alpha Monitor v2 ")
 logger.info(f" 📡 : {ANNOUNCEMENT_POLL_INTERVAL}s")
 logger.info(f" 🧠 LLM: {'Sonnet' if ANTHROPIC_API_KEY else 'degradation()'}")
 logger.info(f" 🔔 TG: {'✅' if TG_BOT_TOKEN else '❌'}")
 logger.info("=" * 50)

 try:
 await asyncio.gather(*tasks)
 except KeyboardInterrupt:
 for t in tasks:
 t.cancel()

if __name__ == "__main__":
 try:
 asyncio.run(main())
 except KeyboardInterrupt:
 pass

AI Autonomous Trading #

Futures + Alpha Autonomous Trading v1 #

Date: 2026.04.29 Tags: Python · Binance Futures · Autonomous · Telegram

AI scans market → analyzes → virtual trades → monitors → reviews — fully autonomous

⚠️ RISK WARNING: This code has only been tested on virtual/paper trading. It has NO real trading experience and should NOT be used as a basis for live trading. Not validated with real funds — use at your own risk.

An AI that autonomously scans the entire Binance futures market every 30 seconds, detects anomalies, and makes virtual trades. 4 signal detection strategies (extreme negative funding rate → long squeeze, extreme positive funding → short, post-pump short, crash bounce). Before opening any position, runs a multi-dimensional environment check (BTC trend + Fear&Greed sentiment + OI attention + volume activity) — needs score ≥3/7 to proceed. Auto stop-loss/take-profit monitoring every 30 seconds.

Current results (honest): 4 closed trades, 75% win rate, +13.94U. But profit concentrated in one trade (IR short +45%), rest basically break-even. Position diversification insufficient — tends to stack same-direction same-logic trades. Still rule-based scoring, not true independent thinking.

Training path: scan → trade → close → review → find problems → improve → repeat. Goal: evolve from rule executor to independent-thinking trader.b:T5192,#!/usr/bin/env pytho

Full source code #

#!/usr/bin/env python3
"""
"""

import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_FILE = os.path.join(SCRIPT_DIR, "trades.json")
SCANNER_STATE = os.path.join(SCRIPT_DIR, "scanner_state.json")
SCANNER_LOG = os.path.join(SCRIPT_DIR, "scanner.log")
INITIAL_BALANCE = 100.0
TZ_UTC8 = timezone(timedelta(hours=8))

# === configuration ===
MAX_OPEN_POSITIONS = 3 # positions
POSITION_PCT = 30 # %
LEVERAGE = 3 # 
COOLDOWN_HOURS = 4 # 
MIN_VOLUME_M = 10 # 24h(U)

def load_tg_config():
 """Load TG config from environment variables or .env file"""
 env = {}
 # Try .env in script directory, then current directory
 for env_path in [
 os.path.join(SCRIPT_DIR, ".env"),
 os.path.join(os.getcwd(), ".env"),
 ]:
 if os.path.exists(env_path):
 with open(env_path) as f:
 for line in f:
 line = line.strip()
 if '=' in line and not line.startswith('#'):
 k, v = line.split('=', 1)
 env[k] = v.strip().strip('"').strip("'")
 break
 # OS environment variables override file
 for key in ['TG_BOT_TOKEN', 'TELEGRAM_BOT_TOKEN', 'TG_CHAT_ID']:
 val = os.environ.get(key)
 if val:
 env[key] = val
 return env

def send_tg(text):
 try:
 env = load_tg_config()
 token = env.get('TG_BOT_TOKEN', env.get('TELEGRAM_BOT_TOKEN', ''))
 if not token:
 return
 chat_id = env.get('TG_CHAT_ID', '')
 if not chat_id:
 return
 url = f"https://api.telegram.org/bot{token}/sendMessage"
 requests.post(url, json={
 "chat_id": chat_id,
 "text": text,
 "parse_mode": "Markdown"
 }, timeout=10)
 except:
 pass

def load_trades():
 if os.path.exists(DATA_FILE):
 with open(DATA_FILE, "r", encoding="utf-8") as f:
 return json.load(f)
 return {"initial_balance": INITIAL_BALANCE, "trades": []}

def save_trades(data):
 with open(DATA_FILE, "w", encoding="utf-8") as f:
 json.dump(data, f, ensure_ascii=False, indent=2)

def load_state():
 if os.path.exists(SCANNER_STATE):
 with open(SCANNER_STATE, "r") as f:
 return json.load(f)
 return {"last_opens": {}, "signals_seen": {}}

def save_state(state):
 with open(SCANNER_STATE, "w") as f:
 json.dump(state, f, ensure_ascii=False, indent=2)

def get_balance(data):
 balance = data.get("initial_balance", INITIAL_BALANCE)
 for t in data["trades"]:
 if t["status"] == "closed" and t["pnl_usd"] is not None:
 balance += t["pnl_usd"]
 return balance

def next_id(data):
 if not data["trades"]:
 return "001"
 max_id = max(int(t["id"]) for t in data["trades"])
 return f"{max_id + 1:03d}"

def now_str():
 return datetime.now(TZ_UTC8).strftime("%Y-%m-%dT%H:%M:%S")

def log(msg):
 ts = datetime.now(TZ_UTC8).strftime("%m-%d %H:%M:%S")
 line = f"[{ts}] {msg}"
 print(line)
 with open(SCANNER_LOG, "a") as f:
 f.write(line + "\n")

# === BinanceAPI ===
def get_all_tickers():
 url = "https://fapi.binance.com/fapi/v1/ticker/24hr"
 resp = requests.get(url, timeout=10)
 return resp.json()

def get_funding_rates():
 url = "https://fapi.binance.com/fapi/v1/premiumIndex"
 resp = requests.get(url, timeout=10)
 return {item['symbol']: float(item['lastFundingRate']) * 100 
 for item in resp.json()}

def get_funding_history(symbol, limit=8):
 url = f"https://fapi.binance.com/fapi/v1/fundingRate?symbol={symbol}&limit={limit}"
 resp = requests.get(url, timeout=10)
 return [float(item['fundingRate']) * 100 for item in resp.json()]

def get_open_interest(symbol):
 url = f"https://fapi.binance.com/fapi/v1/openInterest?symbol={symbol}"
 resp = requests.get(url, timeout=10)
 data = resp.json()
 return float(data['openInterest'])

def get_klines(symbol, interval="4h", limit=6):
 url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
 resp = requests.get(url, timeout=10)
 return resp.json()

def detect_extreme_negative_funding(symbol, funding_rate, funding_rates_map):
 """
 policy1: extreme funding ratedeep negative → long(short squeeze)
 condition: <-0.08% consecutiveperiods negative
 """
 if funding_rate >= -0.08:
 return None
 
 try:
 history = get_funding_history(symbol, 8)
 neg_count = sum(1 for r in history if r < -0.03)
 if neg_count < 4:
 return None
 
 avg_rate = sum(history) / len(history)
 
 # more extreme rate,stronger signal
 strength = "S" if avg_rate < -0.15 else "A" if avg_rate < -0.10 else "B"
 
 return {
 "type": "extreme_neg_funding",
 "direction": "long",
 "strength": strength,
 "reason": f"extreme funding ratedeep negative avg:{avg_rate:.4f}% consecutive{neg_count}/8 short squeezeprobability",
 "sl_pct": 0.08, # 8%
 "tp_pct": 0.12, # 12%
 }
 except:
 return None

def detect_extreme_positive_funding(symbol, funding_rate, funding_rates_map):
 """
 policy2: extreme funding rate → short(crowded longs)
 condition: >0.10% consecutivehigh positive
 """
 if funding_rate <= 0.10:
 return None
 
 try:
 history = get_funding_history(symbol, 8)
 pos_count = sum(1 for r in history if r > 0.05)
 if pos_count < 4:
 return None
 
 avg_rate = sum(history) / len(history)
 strength = "S" if avg_rate > 0.20 else "A" if avg_rate > 0.12 else "B"
 
 return {
 "type": "extreme_pos_funding",
 "direction": "short",
 "strength": strength,
 "reason": f"extreme funding rate avg:{avg_rate:.4f}% consecutive{pos_count}/8high positive ",
 "sl_pct": 0.10,
 "tp_pct": 0.15,
 }
 except:
 return None

def detect_crash_bounce(ticker):
 """
 policy3: crashbounce(oversold bounce)
 condition: 24h>25% but 4hstabilize/
 """
 change_pct = float(ticker['priceChangePercent'])
 if change_pct >= -25:
 return None
 
 symbol = ticker['symbol']
 try:
 klines = get_klines(symbol, "1h", 6)
 recent_closes = [float(k[4]) for k in klines[-3:]]
 if len(recent_closes) >= 2 and recent_closes[-1] >= recent_closes[-2]:
 return {
 "type": "crash_bounce",
 "direction": "long",
 "strength": "B", # B
 "reason": f"24hcrash{change_pct:.1f}%stabilize oversold bounce",
 "sl_pct": 0.10,
 "tp_pct": 0.15,
 }
 except:
 pass
 return None

def detect_pump_short(ticker):
 """
 policy4: pumpshort after(ATHpullback)
 condition: 24h>40% — model,pumpcallbackprobability>85%
 """
 change_pct = float(ticker['priceChangePercent'])
 if change_pct <= 40:
 return None
 
 symbol = ticker['symbol']
 try:
 klines = get_klines(symbol, "1h", 6)
 highs = [float(k[2]) for k in klines]
 closes = [float(k[4]) for k in klines]
 current = closes[-1]
 peak = max(highs)
 
 pullback = (peak - current) / peak * 100
 if pullback < 10:
 return None
 
 strength = "A" if change_pct > 80 else "B"
 
 return {
 "type": "pump_short",
 "direction": "short",
 "strength": strength,
 "reason": f"24hpump{change_pct:.1f}%pullback{pullback:.1f}% callbackprobability>85%",
 "sl_pct": 0.15, # pump,
 "tp_pct": 0.20,
 }
 except:
 pass
 return None

# === environment check ===
def check_environment(symbol, signal):
 """
 before opening:,multi-dimensionalalignment
 return (pass/fail, analysis_dict, adjusted_strength)
 """
 analysis = {
 "btc_env": "",
 "sentiment": "",
 "oi_check": "",
 "volume_check": "",
 "verdict": ""
 }
 score = 0 # score,>=3to open
 
 try:
 btc_url = "https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT"
 btc = requests.get(btc_url, timeout=5).json()
 btc_chg = float(btc['priceChangePercent'])
 
 if signal["direction"] == "long":
 if btc_chg > -2:
 score += 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
 elif btc_chg < -5:
 score -= 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% crashlong -1"
 else:
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
 else: # short
 if btc_chg < 2:
 score += 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
 elif btc_chg > 5:
 score -= 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% pumpshort -1"
 else:
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
 
 try:
 fng = requests.get("https://api.alternative.me/fng/", timeout=5).json()
 fng_val = int(fng['data'][0]['value'])
 if signal["direction"] == "long":
 if fng_val <= 25:
 score += 1
 analysis["sentiment"] = f"FGI={fng_val} long +1"
 elif fng_val >= 75:
 score -= 1
 analysis["sentiment"] = f"FGI={fng_val} long -1"
 else:
 analysis["sentiment"] = f"FGI={fng_val} 0"
 else:
 if fng_val >= 75:
 score += 1
 analysis["sentiment"] = f"FGI={fng_val} short +1"
 elif fng_val <= 25:
 score -= 1
 analysis["sentiment"] = f"FGI={fng_val} short -1"
 else:
 analysis["sentiment"] = f"FGI={fng_val} 0"
 except:
 analysis["sentiment"] = "FGI 0"
 
 try:
 oi = get_open_interest(symbol)
 ticker = requests.get(f"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol={symbol}", timeout=5).json()
 price = float(ticker['lastPrice'])
 oi_usd = oi * price
 
 if oi_usd > 5_000_000: # OI > 5M
 score += 1
 analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M +1"
 else:
 analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 0"
 except:
 analysis["oi_check"] = "OI 0"
 
 try:
 vol = float(ticker.get('quoteVolume', 0))
 if vol > 50_000_000:
 score += 1
 analysis["volume_check"] = f"24h={vol/1e6:.0f}M +1"
 elif vol > 20_000_000:
 analysis["volume_check"] = f"24h={vol/1e6:.0f}M 0"
 else:
 score -= 1
 analysis["volume_check"] = f"24h={vol/1e6:.0f}M -1"
 except:
 analysis["volume_check"] = " 0"
 
 if signal["strength"] == "S":
 score += 2
 elif signal["strength"] == "A":
 score += 1
 
 analysis["verdict"] = f"score:{score}/7"
 
 if score >= 3:
 return True, analysis, signal["strength"]
 else:
 return False, analysis, signal["strength"]
 
 except Exception as e:
 analysis["verdict"] = f":{e} "
 return False, analysis, signal["strength"]

def execute_open(data, state, symbol, price, signal):
 """execute — environment check"""
 
 # environment check
 passed, env_analysis, strength = check_environment(symbol, signal)
 env_summary = " | ".join(v for v in env_analysis.values() if v)
 
 if not passed:
 log(f"via {symbol}: {env_summary}")
 return
 
 log(f"via {symbol}: {env_summary}")
 
 balance = get_balance(data)
 position_usd = balance * POSITION_PCT / 100
 
 if signal["direction"] == "long":
 sl = round(price * (1 - signal["sl_pct"]), 6)
 tp = round(price * (1 + signal["tp_pct"]), 6)
 else:
 sl = round(price * (1 + signal["sl_pct"]), 6)
 tp = round(price * (1 - signal["tp_pct"]), 6)
 
 trade = {
 "id": next_id(data),
 "symbol": symbol,
 "direction": signal["direction"],
 "leverage": LEVERAGE,
 "position_pct": POSITION_PCT,
 "position_usd": round(position_usd, 4),
 "notional_usd": round(position_usd * LEVERAGE, 4),
 "entry_price": price,
 "stop_loss": sl,
 "take_profit": tp,
 "entry_time": now_str(),
 "exit_price": None,
 "exit_time": None,
 "exit_reason": None,
 "pnl_pct": None,
 "pnl_usd": None,
 "status": "open",
 "pre_analysis": {
 "btc_env": env_analysis.get("btc_env", ""),
 "sentiment": env_analysis.get("sentiment", ""),
 "oi": env_analysis.get("oi_check", ""),
 "volume": env_analysis.get("volume_check", ""),
 "key_reason": f"[{signal['strength']}] {signal['reason']}",
 "risk": f"score:{env_analysis.get('verdict','')} policy:{signal['type']}"
 },
 "post_review": None
 }
 
 data["trades"].append(trade)
 save_trades(data)
 
 state["last_opens"][symbol] = now_str()
 save_state(state)
 
 direction_cn = "long" if signal["direction"] == "long" else "short"
 
 msg = f"""```
[scan] #{trade['id']}
: {symbol}
: {direction_cn} {LEVERAGE}x
: {price}
: {sl}
: {tp}
: {position_usd:.2f}U
: [{signal['strength']}] {signal['reason']}
: {trade['entry_time']}
```"""
 
 log(f" #{trade['id']} {symbol} {direction_cn} @ {price} | {signal['reason']}")
 send_tg(msg)
 print(msg)

def swap_weakest(data, state, open_positions, new_signal, tickers):
 ticker_map = {t['symbol']: float(t['lastPrice']) for t in tickers}
 
 worst_trade = None
 worst_pnl = float('inf')
 
 for t in open_positions:
 price = ticker_map.get(t["symbol"])
 if price is None:
 continue
 if t["direction"] == "long":
 pnl_pct = (price - t["entry_price"]) / t["entry_price"] * 100
 else:
 pnl_pct = (t["entry_price"] - price) / t["entry_price"] * 100
 
 if pnl_pct < worst_pnl:
 worst_pnl = pnl_pct
 worst_trade = t
 worst_price = price
 
 if worst_trade is None:
 return
 
 if worst_pnl > 0:
 log(f"but positions, | : {new_signal['symbol']}")
 return
 
 if worst_trade["direction"] == "long":
 pnl_pct_lev = (worst_price - worst_trade["entry_price"]) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
 else:
 pnl_pct_lev = (worst_trade["entry_price"] - worst_price) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
 
 pos_usd = worst_trade.get("position_usd", worst_trade.get("position_pct", 30))
 pnl_usd = round(pnl_pct_lev / 100 * pos_usd, 4)
 
 worst_trade["exit_price"] = worst_price
 worst_trade["exit_time"] = now_str()
 worst_trade["exit_reason"] = f"→{new_signal['symbol']}"
 worst_trade["pnl_pct"] = round(pnl_pct_lev, 2)
 worst_trade["pnl_usd"] = pnl_usd
 worst_trade["status"] = "closed"
 save_trades(data)
 
 direction_cn = "" if worst_trade["direction"] == "long" else ""
 msg = f"""```
[close] #{worst_trade['id']}
: {worst_trade['symbol']} {direction_cn}
: {worst_trade['entry_price']}
: {worst_price}
: {pnl_pct_lev:+.2f}% ({pnl_usd:+.2f}U)
: S{new_signal['symbol']}
```"""
 log(f" #{worst_trade['id']} {worst_trade['symbol']} {pnl_usd:+.2f}U → {new_signal['symbol']}")
 send_tg(msg)
 
 execute_open(data, state, new_signal["symbol"], new_signal["price"], new_signal)

def scan():
 data = load_trades()
 state = load_state()
 now = datetime.now(TZ_UTC8)
 
 open_positions = [t for t in data["trades"] if t["status"] == "open"]
 open_symbols = set(t["symbol"] for t in open_positions)
 
 if len(open_positions) >= MAX_OPEN_POSITIONS:
 return
 
 try:
 tickers = get_all_tickers()
 funding_rates = get_funding_rates()
 except Exception as e:
 log(f"APIerror: {e}")
 return
 
 exclude = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "FDUSDUSDT", "BTCDOMUSDT", "BTCSTUSDT"}
 candidates = [t for t in tickers 
 if t['symbol'].endswith('USDT') 
 and t['symbol'] not in exclude
 and float(t['quoteVolume']) > MIN_VOLUME_M * 1e6]
 
 signals = []
 
 for ticker in candidates:
 symbol = ticker['symbol']
 
 if symbol in open_symbols:
 continue
 
 last_open = state.get("last_opens", {}).get(symbol)
 if last_open:
 try:
 last_dt = datetime.fromisoformat(last_open)
 if last_dt.tzinfo is None:
 last_dt = last_dt.replace(tzinfo=TZ_UTC8)
 if (now - last_dt).total_seconds() < COOLDOWN_HOURS * 3600:
 continue
 except:
 pass
 
 fr = funding_rates.get(symbol, 0)
 
 for detect_fn in [
 lambda s, f, m: detect_extreme_negative_funding(s, f, m),
 lambda s, f, m: detect_extreme_positive_funding(s, f, m),
 lambda s, f, m: detect_crash_bounce(ticker),
 lambda s, f, m: detect_pump_short(ticker),
 ]:
 try:
 signal = detect_fn(symbol, fr, funding_rates)
 if signal:
 signal["symbol"] = symbol
 signal["price"] = float(ticker['lastPrice'])
 signal["volume_m"] = float(ticker['quoteVolume']) / 1e6
 signals.append(signal)
 except:
 continue
 
 if not signals:
 return
 
 strength_order = {"S": 0, "A": 1, "B": 2}
 signals.sort(key=lambda x: strength_order.get(x["strength"], 3))
 
 best = signals[0]
 
 if best["strength"] == "B":
 log(f"B: {best['symbol']} {best['reason']}")
 return
 
 slots = MAX_OPEN_POSITIONS - len(open_positions)
 if slots > 0:
 execute_open(data, state, best["symbol"], best["price"], best)
 elif best["strength"] == "S":
 swap_weakest(data, state, open_positions, best, tickers)

if __name__ == "__main__":
 scan()

Utility Tools #

VoiceKey — Speaker Verification #

Date: 2026.04.27 Tags: Python · Security · Telegram · Speaker Verification

Protect your AI agent with voiceprint authentication

Why this tool? More people use Telegram to control AI agents (servers, trading bots, smart home). If your TG account gets compromised, attackers can do anything with your AI. Passwords can be stolen or socially engineered — but your voice is unique and non-transferable. VoiceKey requires a voice message to verify identity before unlocking any commands. Zero AI cost, runs entirely on local CPU.

Full source code #

#!/usr/bin/env python3
"""
VoiceKey — voiceprintkey (Voiceprint Authentication)
Speaker verification for Telegram bot security.

Uses resemblyzer (GE2E model) for speaker embedding extraction
and cosine similarity for verification.

Zero AI cost — runs entirely on local CPU.

Usage:
 # Register voiceprint from audio files
 python voicekey.py register --audio voice1.ogg voice2.ogg --owner "YourName"
 
 # Verify a voice against stored voiceprint
 python voicekey.py verify --audio test.ogg
 
 # As a Python module
 from voicekey import VoiceKey
 vk = VoiceKey()
 vk.register(["voice1.ogg", "voice2.ogg"], owner="YourName")
 is_owner, score = vk.verify("test.ogg")
"""

import os
import json
import tempfile
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime

# Lazy imports to speed up module load when not needed
_encoder = None

def _get_encoder():
 """Lazy-load the voice encoder model (first call takes ~1s)."""
 global _encoder
 if _encoder is None:
 from resemblyzer import VoiceEncoder
 _encoder = VoiceEncoder()
 return _encoder

def _audio_to_wav(audio_path: str) -> str:
 """Convert any audio format to 16kHz mono WAV for processing."""
 from pydub import AudioSegment
 
 ext = Path(audio_path).suffix.lower()
 if ext in ('.ogg', '.oga'):
 audio = AudioSegment.from_ogg(audio_path)
 elif ext == '.mp3':
 audio = AudioSegment.from_mp3(audio_path)
 elif ext == '.wav':
 return audio_path # already wav
 elif ext in ('.m4a', '.aac'):
 audio = AudioSegment.from_file(audio_path, format=ext.lstrip('.'))
 else:
 audio = AudioSegment.from_file(audio_path)
 
 # Convert to 16kHz mono
 audio = audio.set_frame_rate(16000).set_channels(1)
 
 tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
 audio.export(tmp.name, format='wav')
 return tmp.name

def _extract_embedding(audio_path: str) -> np.ndarray:
 """Extract voice embedding from an audio file."""
 from resemblyzer import preprocess_wav
 
 encoder = _get_encoder()
 
 wav_path = _audio_to_wav(audio_path)
 wav = preprocess_wav(wav_path)
 
 # Cleanup temp file
 if wav_path != audio_path:
 os.unlink(wav_path)
 
 if len(wav) < 1600: # Less than 0.1s
 raise ValueError(f"Audio too short: {len(wav)/16000:.1f}s (need >0.1s)")
 
 return encoder.embed_utterance(wav)

class VoiceKey:
 """
 Speaker verification using voice embeddings.
 
 Attributes:
 data_dir: Directory to store voiceprint data
 threshold: Cosine similarity threshold for verification (default: 0.75)
 voiceprint: The registered owner's voice embedding (256-dim vector)
 """
 
 def __init__(self, data_dir: str = None, threshold: float = 0.75):
 if data_dir is None:
 data_dir = os.path.expanduser("~/.hermes/voiceprint")
 self.data_dir = Path(data_dir)
 self.data_dir.mkdir(parents=True, exist_ok=True)
 self.threshold = threshold
 self.voiceprint = None
 self.metadata = {}
 self._load()
 
 def _load(self):
 """Load existing voiceprint if available."""
 vp_path = self.data_dir / "voiceprint.npy"
 meta_path = self.data_dir / "voiceprint_meta.json"
 
 if vp_path.exists():
 self.voiceprint = np.load(str(vp_path))
 if meta_path.exists():
 with open(meta_path) as f:
 self.metadata = json.load(f)
 self.threshold = self.metadata.get("threshold", self.threshold)
 
 @property
 def is_registered(self) -> bool:
 """Check if a voiceprint is registered."""
 return self.voiceprint is not None
 
 def register(self, audio_paths: list, owner: str = "owner") -> dict:
 """
 Register a voiceprint from multiple audio samples.
 
 Args:
 audio_paths: List of audio file paths (ogg, mp3, wav, etc.)
 owner: Name of the voiceprint owner
 
 Returns:
 dict with registration results
 """
 embeddings = []
 results = []
 
 for path in audio_paths:
 try:
 embed = _extract_embedding(path)
 embeddings.append(embed)
 results.append({"file": os.path.basename(path), "status": "ok"})
 except Exception as e:
 results.append({"file": os.path.basename(path), "status": "error", "error": str(e)})
 
 if not embeddings:
 raise ValueError("No valid audio samples provided")
 
 # Average embeddings and normalize
 voiceprint = np.mean(embeddings, axis=0)
 voiceprint = voiceprint / np.linalg.norm(voiceprint)
 
 # Save
 np.save(str(self.data_dir / "voiceprint.npy"), voiceprint)
 
 self.metadata = {
 "owner": owner,
 "samples_used": len(embeddings),
 "embedding_dim": int(voiceprint.shape[0]),
 "threshold": self.threshold,
 "created": datetime.now().isoformat(),
 }
 with open(self.data_dir / "voiceprint_meta.json", "w") as f:
 json.dump(self.metadata, f, indent=2)
 
 self.voiceprint = voiceprint
 
 # Self-test
 similarities = []
 for embed in embeddings:
 sim = float(np.dot(voiceprint, embed / np.linalg.norm(embed)))
 similarities.append(sim)
 
 return {
 "owner": owner,
 "samples": len(embeddings),
 "self_test_scores": similarities,
 "min_score": min(similarities),
 "details": results,
 }
 
 def verify(self, audio_path: str) -> tuple:
 """
 Verify if an audio sample matches the registered voiceprint.
 
 Args:
 audio_path: Path to audio file to verify
 
 Returns:
 tuple: (is_verified: bool, similarity_score: float)
 """
 if not self.is_registered:
 raise RuntimeError("No voiceprint registered. Call register() first.")
 
 embed = _extract_embedding(audio_path)
 embed = embed / np.linalg.norm(embed)
 
 similarity = float(np.dot(self.voiceprint, embed))
 is_verified = similarity >= self.threshold
 
 return is_verified, similarity
 
 def get_info(self) -> dict:
 """Get voiceprint registration info."""
 if not self.is_registered:
 return {"registered": False}
 return {
 "registered": True,
 **self.metadata,
 }

def main():
 parser = argparse.ArgumentParser(
 description="VoiceKey — Speaker verification for security"
 )
 sub = parser.add_subparsers(dest="command")
 
 # Register
 reg = sub.add_parser("register", help="Register voiceprint from audio files")
 reg.add_argument("--audio", nargs="+", required=True, help="Audio files (ogg/mp3/wav)")
 reg.add_argument("--owner", default="owner", help="Owner name")
 reg.add_argument("--data-dir", default=None, help="Data directory")
 reg.add_argument("--threshold", type=float, default=0.75, help="Verification threshold")
 
 # Verify
 ver = sub.add_parser("verify", help="Verify audio against voiceprint")
 ver.add_argument("--audio", required=True, help="Audio file to verify")
 ver.add_argument("--data-dir", default=None, help="Data directory")
 
 # Info
 sub.add_parser("info", help="Show voiceprint info")
 
 args = parser.parse_args()
 
 if args.command == "register":
 vk = VoiceKey(data_dir=args.data_dir, threshold=args.threshold)
 result = vk.register(args.audio, owner=args.owner)
 print(f"Registered voiceprint for: {result['owner']}")
 print(f"Samples used: {result['samples']}")
 print(f"Self-test scores: {[f'{s:.4f}' for s in result['self_test_scores']]}")
 print(f"Min score: {result['min_score']:.4f} (threshold: {args.threshold})")
 
 elif args.command == "verify":
 vk = VoiceKey(data_dir=getattr(args, 'data_dir', None))
 is_ok, score = vk.verify(args.audio)
 status = "PASS" if is_ok else "FAIL"
 print(f"[{status}] Similarity: {score:.4f} (threshold: {vk.threshold})")
 
 elif args.command == "info":
 vk = VoiceKey()
 info = vk.get_info()
 for k, v in info.items():
 print(f" {k}: {v}")
 else:
 parser.print_help()

if __name__ == "__main__":
 main()

Closing notes #

All seven snippets above are pulled from connectfarm1.com on 2026-05-04. They share three philosophies: (1) zero or near-zero AI cost — most use rule engines and free public APIs instead of LLMs; (2) Python only, easy to run on a cheap VPS with crontab or a simple while True; (3) Telegram is the universal output — no dashboards, no front-end, just push messages where you actually read them. Combine the radars to triangulate signals, and treat the autonomous trader as a research sandbox, not a money printer.

发布于 Friday, May 15, 2026 · 最后更新 Friday, May 15, 2026