Code Vault — 7 Open-Source Crypto Radar & Trading Tools
Code Vault: 7 essential crypto trading tools for automated trading, market analysis, and portfolio management. Open source trading utilities.
{</* resource-info */>}
This is a complete walkthrough of the seven snippets currently published in Code Vault — a personal repository of crypto trading radars, autonomous trading systems, and security utilities, all in pure Python with zero or near-zero API costs. Every snippet below includes the full source code so you can read, fork, and run them locally. Use the table of contents on the right to jump to a specific tool.
⚠️ Risk warning — These tools touch live markets and on-chain data. Several of them push real-time alerts to Telegram and one of them (the AI autonomous trader) opens virtual positions on Binance Futures. Read the per-tool notes carefully. Use at your own risk; the author makes no warranty for trading outcomes.
Trading Radar #
Vitalik Sell Radar #
Date: 2026.05.02 Tags: Python · WebSocket · Ethereum · Telegram · Event-Driven
GitHub: vitalik-sell-radar{rel=“nofollow”}
WebSocket event-driven Vitalik wallet sell detection with Telegram alerts
Monitors Vitalik Buterin’s wallet (vitalik.eth) for ERC-20 token sells via WebSocket event subscription — zero polling, sub-second latency. Automatically classifies recipients as DEX Router (Uniswap, 1inch, SushiSwap), CEX hot wallet (Binance, Coinbase, Kraken), or LP Pool. Fetches real-time token prices from DexScreener. Multi-RPC failover with auto-reconnect. Pure Python, zero cost — uses free public RPC nodes.
Full source code #
#!/usr/bin/env python3
"""
Vitalik Sell Radar — Event-Driven Edition
WebSocket subscription to ERC-20 Transfer events, real-time detection of
Vitalik's sell activity, instant push to Telegram.
Architecture:
1. WebSocket subscribes to Transfer(from=vitalik) events → sub-second detection
2. Classifies sell behavior (transfers to DEX Router / CEX / LP Pool)
3. Queries token info + price via DexScreener
4. Pushes alert to Telegram
5. Auto-reconnect + multi-RPC failover
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import aiohttp
import websockets
# Load .env file
def load_env():
env_path = Path(__file__).parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
load_env()
# ============================================================
# Configuration
# ============================================================
# Vitalik's main wallet
VITALIK_ADDRESS = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
VITALIK_PADDED = "0x" + VITALIK_ADDRESS[2:].lower().zfill(64)
# ERC-20 Transfer event topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
# WebSocket RPC endpoints (free, support eth_subscribe)
WS_ENDPOINTS = [
"wss://ethereum-rpc.publicnode.com",
"wss://eth.drpc.org",
"wss://ethereum.publicnode.com",
]
# HTTP RPC for querying token info
HTTP_RPC = os.environ.get("HTTP_RPC", "https://eth.drpc.org")
# Telegram
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# Known DEX Router addresses (sell destinations)
KNOWN_DEX_ROUTERS = {
# Uniswap
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap V2 Router",
"0xe592427a0aece92de3edee1f18e0157c05861564": "Uniswap V3 Router",
"0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "Uniswap V3 Router2",
"0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad": "Uniswap Universal Router",
"0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b": "Uniswap Universal Router (old)",
# 1inch
"0x1111111254eeb25477b68fb85ed929f73a960582": "1inch V5",
"0x111111125421ca6dc452d289314280a0f8842a65": "1inch V6",
# SushiSwap
"0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "SushiSwap Router",
# CoW Protocol
"0x9008d19f58aabd9ed0d60971565aa8510560ab41": "CoW Settlement",
# 0x
"0xdef1c0ded9bec7f1a1670819833240f027b25eff": "0x Exchange Proxy",
# Curve
"0x99a58482bd75cbab83b27ec03ca68ff489b5788f": "Curve Router",
}
# Known CEX hot wallets (partial list)
KNOWN_CEX = {
"0x28c6c06298d514db089934071355e5743bf21d60": "Binance Hot Wallet",
"0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance Hot Wallet 2",
"0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance Hot Wallet 3",
"0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance Hot Wallet 4",
"0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "Coinbase",
"0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43": "Coinbase 10",
"0x503828976d22510aad0201ac7ec88293211d23da": "Coinbase 2",
"0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2": "FTX (defunct)",
"0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0": "Kraken",
"0xae2d4617c862309a3d75a0ffb358c7a5009c673f": "Kraken 10",
}
# Minimum notification amount (USD), 0 = notify all
MIN_NOTIFY_USD = int(os.environ.get("MIN_NOTIFY_USD", "0"))
# Reconnect settings
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60
# ============================================================
# Logging
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("vitalik-radar")
# ============================================================
# Global state
# ============================================================
# Dedup set for processed tx hashes (last 1000)
seen_txs: set = set()
seen_txs_list: list = []
# HTTP session
http_session: aiohttp.ClientSession | None = None
# Token info cache: {address: {symbol, name, decimals}}
token_cache: dict = {}
# Running flag
running = True
# ============================================================
# Utility functions
# ============================================================
def shorten_addr(addr: str) -> str:
"""Shorten address for display"""
return f"{addr[:6]}...{addr[-4:]}"
def decode_transfer_value(data_hex: str, decimals: int) -> float:
"""Decode Transfer event value"""
try:
raw = int(data_hex, 16)
return raw / (10 ** decimals)
except:
return 0.0
def topic_to_address(topic: str) -> str:
"""Extract 20-byte address from 32-byte topic"""
return "0x" + topic[-40:]
def classify_recipient(to_addr: str) -> tuple[str, str]:
"""
Classify recipient address.
Returns (type, name)
Type: "dex" / "cex" / "pool" / "unknown"
"""
to_lower = to_addr.lower()
if to_lower in KNOWN_DEX_ROUTERS:
return "dex", KNOWN_DEX_ROUTERS[to_lower]
if to_lower in KNOWN_CEX:
return "cex", KNOWN_CEX[to_lower]
return "unknown", ""
async def get_http_session() -> aiohttp.ClientSession:
global http_session
if http_session is None or http_session.closed:
http_session = aiohttp.ClientSession()
return http_session
async def rpc_call(method: str, params: list) -> dict | None:
"""HTTP JSON-RPC call"""
session = await get_http_session()
try:
async with session.post(
HTTP_RPC,
json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
data = await resp.json()
return data.get("result")
except Exception as e:
log.error(f"RPC call {method} failed: {e}")
return None
async def get_token_info(token_addr: str) -> dict:
"""Query ERC-20 token info (symbol, name, decimals)"""
addr_lower = token_addr.lower()
if addr_lower in token_cache:
return token_cache[addr_lower]
info = {"symbol": "???", "name": "Unknown", "decimals": 18, "address": token_addr}
# symbol()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x95d89b41"}, "latest"
])
if result and len(result) > 2:
try:
hex_str = result[2:]
if len(hex_str) >= 128:
offset = int(hex_str[:64], 16) * 2
length = int(hex_str[offset:offset+64], 16)
symbol_hex = hex_str[offset+64:offset+64+length*2]
info["symbol"] = bytes.fromhex(symbol_hex).decode("utf-8", errors="replace").strip('\x00')
elif len(hex_str) == 64:
info["symbol"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
except:
pass
# decimals()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x313ce567"}, "latest"
])
if result and len(result) > 2:
try:
info["decimals"] = int(result, 16)
except:
pass
# name()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x06fdde03"}, "latest"
])
if result and len(result) > 2:
try:
hex_str = result[2:]
if len(hex_str) >= 128:
offset = int(hex_str[:64], 16) * 2
length = int(hex_str[offset:offset+64], 16)
name_hex = hex_str[offset+64:offset+64+length*2]
info["name"] = bytes.fromhex(name_hex).decode("utf-8", errors="replace").strip('\x00')
elif len(hex_str) == 64:
info["name"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
except:
pass
token_cache[addr_lower] = info
return info
async def check_if_pool(addr: str) -> bool:
"""Check if address is a Uniswap V2/V3 pool (has token0 method)"""
result = await rpc_call("eth_call", [
{"to": addr, "data": "0x0dfe1681"}, "latest" # token0()
])
if result and len(result) == 66: # 0x + 64 hex chars
return True
return False
async def get_eth_price() -> float:
"""Get ETH price from CoinGecko"""
session = await get_http_session()
try:
async with session.get(
"https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
data = await resp.json()
return data.get("ethereum", {}).get("usd", 0)
except:
return 2300 # fallback
async def get_token_price_usd(token_addr: str) -> float | None:
"""Get token USD price from DexScreener (free, no API key needed)"""
session = await get_http_session()
try:
async with session.get(
f"https://api.dexscreener.com/latest/dex/tokens/{token_addr}",
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
data = await resp.json()
pairs = data.get("pairs", [])
if pairs:
# Pick pair with highest liquidity
pairs.sort(key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0), reverse=True)
price = float(pairs[0].get("priceUsd", 0) or 0)
return price if price > 0 else None
except:
pass
return None
# ============================================================
# Telegram notifications
# ============================================================
async def send_telegram(text: str):
"""Send Telegram message"""
if not TG_BOT_TOKEN:
log.warning("[TG] No bot token configured, skipping notification")
return
session = await get_http_session()
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
try:
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
result = await resp.json()
if not result.get("ok"):
log.error(f"[TG] Send failed: {result.get('description', '')}")
# Retry with plain text if HTML parse fails
if "parse" in result.get("description", "").lower():
payload["parse_mode"] = None
async with session.post(url, json=payload) as resp2:
pass
else:
log.info("[TG] Message sent")
except Exception as e:
log.error(f"[TG] Error: {e}")
# ============================================================
# Event handling
# ============================================================
async def handle_transfer_event(log_entry: dict):
"""Process a single Transfer event"""
tx_hash = log_entry.get("transactionHash", "")
token_addr = log_entry.get("address", "")
topics = log_entry.get("topics", [])
data = log_entry.get("data", "0x0")
# Dedup
dedup_key = f"{tx_hash}:{token_addr}"
if dedup_key in seen_txs:
return
seen_txs.add(dedup_key)
seen_txs_list.append(dedup_key)
# Cleanup (keep last 1000)
while len(seen_txs_list) > 1000:
old = seen_txs_list.pop(0)
seen_txs.discard(old)
# Parse topics: [Transfer, from, to]
if len(topics) < 3:
return
from_addr = topic_to_address(topics[1])
to_addr = topic_to_address(topics[2])
# Confirm it's from Vitalik
if from_addr.lower() != VITALIK_ADDRESS.lower():
return
# Get token info
token_info = await get_token_info(token_addr)
symbol = token_info["symbol"]
decimals = token_info["decimals"]
amount = decode_transfer_value(data, decimals)
if amount == 0:
return
# Classify recipient
recipient_type, recipient_name = classify_recipient(to_addr)
# If unknown, check if it's an LP pool
is_pool = False
if recipient_type == "unknown":
is_pool = await check_if_pool(to_addr)
if is_pool:
recipient_type = "pool"
recipient_name = "LP Pool"
# Determine if this is a "sell" action
is_sell = recipient_type in ("dex", "cex", "pool")
# If not a sell, just a regular transfer — log silently
if not is_sell:
log.info(f"[Transfer] {symbol} {amount:,.2f} → {shorten_addr(to_addr)} (regular transfer, not notifying)")
return
# Get price
token_price = await get_token_price_usd(token_addr)
usd_value = amount * token_price if token_price else None
# Below minimum notification amount — skip
if usd_value is not None and usd_value < MIN_NOTIFY_USD:
log.info(f"[Sell] {symbol} ${usd_value:.0f} < ${MIN_NOTIFY_USD} minimum, skipping")
return
# Build notification message
timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S UTC")
sell_type_label = {
"dex": "🔄 DEX Sell",
"cex": "🏦 CEX Transfer",
"pool": "🌊 Pool Sell",
}
msg_lines = [
f"<b>🚨 Vitalik Sell Signal</b>",
f"",
f"Token: <b>{symbol}</b>",
f"Amount: {amount:,.4f}",
]
if usd_value is not None:
msg_lines.append(f"Value: <b>${usd_value:,.0f}</b>")
if token_price is not None:
msg_lines.append(f"Price: ${token_price:,.8f}")
msg_lines.extend([
f"",
f"Type: {sell_type_label.get(recipient_type, 'Unknown')}",
f"Destination: {recipient_name or shorten_addr(to_addr)}",
f"Time: {timestamp}",
f"",
f"TX: https://etherscan.io/tx/{tx_hash}",
f"Token: https://dexscreener.com/ethereum/{token_addr}",
])
msg = "\n".join(msg_lines)
log.info(f"[SELL DETECTED] {symbol} {amount:,.4f} → {recipient_name or to_addr} | ${usd_value or '?'}")
await send_telegram(msg)
# ============================================================
# WebSocket listener main loop
# ============================================================
async def subscribe_and_listen(ws_url: str):
"""Connect to WebSocket and subscribe to Vitalik Transfer events"""
log.info(f"Connecting to {ws_url}...")
async with websockets.connect(
ws_url,
ping_interval=20,
ping_timeout=30,
close_timeout=10,
max_size=2**20, # 1MB
) as ws:
# Subscribe to Transfer FROM Vitalik
sub_from = {
"jsonrpc": "2.0", "id": 1,
"method": "eth_subscribe",
"params": ["logs", {
"topics": [TRANSFER_TOPIC, VITALIK_PADDED]
}]
}
await ws.send(json.dumps(sub_from))
resp = await asyncio.wait_for(ws.recv(), timeout=10)
data = json.loads(resp)
if "error" in data:
raise Exception(f"Subscribe failed: {data['error']}")
sub_id_from = data.get("result", "")
log.info(f"✅ Subscribed to Transfer FROM Vitalik (id={sub_id_from[:10]}...)")
# Also subscribe to Transfer TO Vitalik (monitor buys/receives)
sub_to = {
"jsonrpc": "2.0", "id": 2,
"method": "eth_subscribe",
"params": ["logs", {
"topics": [TRANSFER_TOPIC, None, VITALIK_PADDED]
}]
}
await ws.send(json.dumps(sub_to))
resp2 = await asyncio.wait_for(ws.recv(), timeout=10)
data2 = json.loads(resp2)
sub_id_to = data2.get("result", "")
if sub_id_to:
log.info(f"✅ Subscribed to Transfer TO Vitalik (id={sub_id_to[:10]}...)")
log.info("🔍 Monitoring Vitalik wallet... waiting for events")
# Listen for events
async for message in ws:
if not running:
break
try:
evt = json.loads(message)
if "params" not in evt:
continue
sub_id = evt["params"].get("subscription", "")
log_entry = evt["params"].get("result", {})
if sub_id == sub_id_from:
# Vitalik sent tokens → check if it's a sell
await handle_transfer_event(log_entry)
elif sub_id == sub_id_to:
# Vitalik received tokens — log silently
token_addr = log_entry.get("address", "")
token_info = await get_token_info(token_addr)
data_hex = log_entry.get("data", "0x0")
amount = decode_transfer_value(data_hex, token_info["decimals"])
log.debug(f"[Receive] {token_info['symbol']} +{amount:,.4f}")
except json.JSONDecodeError:
continue
except Exception as e:
log.error(f"Event handling error: {e}", exc_info=True)
async def main():
"""Main loop with auto-reconnect"""
global running
# Validate required config
if not TG_BOT_TOKEN:
log.warning("TG_BOT_TOKEN not set — Telegram notifications disabled")
if not TG_CHAT_ID:
log.warning("TG_CHAT_ID not set — Telegram notifications disabled")
# Signal handling
def shutdown(sig, frame):
global running
log.info(f"Received signal {sig}, shutting down...")
running = False
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
# Startup notice
log.info("=" * 50)
log.info("Vitalik Sell Radar — Started")
log.info(f"Monitoring: {VITALIK_ADDRESS}")
log.info(f"Min notify: ${MIN_NOTIFY_USD}")
log.info(f"Telegram: {'✅ Configured' if TG_BOT_TOKEN else '❌ Not configured'}")
log.info("=" * 50)
if TG_BOT_TOKEN and TG_CHAT_ID:
await send_telegram(
"🟢 Vitalik Sell Radar — Started\n\n"
f"Monitoring: {shorten_addr(VITALIK_ADDRESS)}\n"
f"Min notify: ${MIN_NOTIFY_USD}\n"
"Mode: WebSocket event-driven (sub-second latency)"
)
endpoint_idx = 0
reconnect_delay = RECONNECT_DELAY
while running:
ws_url = WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]
try:
await subscribe_and_listen(ws_url)
reconnect_delay = RECONNECT_DELAY # reset on success
except websockets.exceptions.ConnectionClosed as e:
log.warning(f"WebSocket closed: {e}")
except asyncio.TimeoutError:
log.warning("WebSocket timeout")
except Exception as e:
log.error(f"WebSocket error: {e}")
if not running:
break
# Switch endpoint and retry
endpoint_idx += 1
log.info(f"Reconnecting in {reconnect_delay}s... (next: {WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]})")
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 1.5, MAX_RECONNECT_DELAY)
# Cleanup
if http_session and not http_session.closed:
await http_session.close()
log.info("Vitalik Sell Radar — Stopped")
if __name__ == "__main__":
asyncio.run(main())
On-Chain Narrative Radar #
Date: 2026.04.28 Tags: Python · GMGN · DEXScreener · Telegram
Momentum-driven on-chain token discovery across ETH/SOL/BSC/Base
Momentum is the only push engine — narrative is just a classification label. Scans every 30 seconds across 4 chains. Tokens must show 3 consecutive rounds of market cap increase with ≥5% total gain to trigger an alert. Narratives (Musk/Trump, Binance/CZ, celebrity) are classified as ★★★/★★/★ labels but never trigger pushes on their own. Safety checks via RugCheck (SOL) and GoPlus (EVM). Pure Python, zero AI cost.
Full source code #
#!/usr/bin/env python3
"""
radar → radar v1
Python,zero AI cost(key matching + )
3. Binance/CZ — BSC
data:GMGN + DEXScreenersearch
"""
import requests
import json
import time
import os
import re
import sqlite3
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from difflib import SequenceMatcher
# === configuration ===
DATA_DIR = os.path.expanduser("~/crypto-trading")
DB_FILE = os.path.join(DATA_DIR, "narrative_history.db")
LOG_FILE = os.path.join(DATA_DIR, "narrative_radar.log")
SEEN_FILE = os.path.join(DATA_DIR, "narrative_seen.json")
FLAP_SEEN_FILE = os.path.join(DATA_DIR, "flap_seen.json")
SCAN_INTERVAL = 30 # 30(GMGNdata1-5,10data)
# {address: [{'ts': timestamp, 'mc': market_cap, 'vol': volume, 'price': price}, ...]}
MOMENTUM_TRACKER = {}
MOMENTUM_PUSHED = {} # {address: {'count': N, 'last_ts': ts, 'last_mc': mc}}
MOMENTUM_CONSECUTIVE_UP = 3 # consecutive3(data)
# from.envreadTGconfiguration
def load_env():
env = {}
env_file = os.path.expanduser("~/.env")
if os.path.exists(env_file):
with open(env_file) as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k] = v
return env
ENV = load_env()
TG_TOKEN = ENV.get('TELEGRAM_BOT_TOKEN', '')
TG_CHAT_ID = int(os.environ.get('TG_CHAT_ID', '0'))
GMGN_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Accept': 'application/json',
'Referer': 'https://gmgn.ai/',
}
# ============================================================
# ============================================================
MUSK_TRUMP_KEYWORDS = {
'musk', 'elon', 'elonmusk',
# SpaceX/Tesla/X
'spacex', 'starship', 'tesla', 'cybertruck', 'roadster',
'neuralink', 'boring', 'hyperloop', 'xai', 'grok',
'floki', 'shiba', #
'doge father', 'dogefather', 'technoking',
'mars colony', 'mars',
'trump', 'donald', 'maga', 'potus', 'trump47',
'melania', 'barron', 'ivanka',
'dark maga', 'darkmaga', 'ultra maga', 'save america',
'truth social', 'covfefe',
'doge department', 'd.o.g.e', 'government efficiency',
}
MUSK_TRUMP_PATTERNS = [
r'\belon\b', r'\bmusk\b', r'\btrump\b', r'\bmaga\b',
r'\bspacex\b', r'\bstarship\b', r'\btesla\b', r'\bgrok\b',
r'\bmelania\b', r'\bbarron\b', r'\bdoge\s*department\b',
r'\bd\.?o\.?g\.?e\b', # D.O.G.E
r'\bx\s*ai\b', r'\bneuralink\b',
]
# ============================================================
# ============================================================
BINANCE_CZ_KEYWORDS = {
# CZcore
'cz', 'changpeng', 'zhao', 'czb', 'czbinance',
'heyi', 'yi he', 'he yi', '', 'yihe',
'sister yi', 'yi jie', '', '',
'binance', 'bnb', 'pancake', 'pancakeswap',
'giggle academy', 'binance life', 'bnb chain',
'principles', 'cz book',
'yzi', 'yzi labs',
'', 'Binance', '', 'cz', '',
'fourmeme', 'four meme', '4meme',
'czs dog', 'cz dog', 'bnb dog',
'build on bnb', 'bnb ecosystem',
}
BINANCE_CZ_PATTERNS = [
r'\bcz\b', r'\bbinance\b', r'\bbnb\b',
r'\bheyi\b', r'\byi\s*he\b', r'\bhe\s*yi\b',
r'\b\b', r'\b\b',
r'\bpancake\b', r'\bgiggle\b', r'\byzi\b',
r'\bfourmeme\b', r'\b4meme\b',
]
# ============================================================
# ============================================================
CELEBRITY_VIRAL_KEYWORDS = {
'vitalik', 'buterin', 'sam altman', 'satoshi',
'michael saylor', 'saylor', 'cathie wood',
'jack dorsey', 'zuckerberg', 'bezos',
'jensen huang', 'nvidia', 'tim cook',
'justin sun', 'sun yuchen', '', 'tron',
'arthur hayes', 'su zhu', '3ac',
'brian armstrong', 'coinbase',
'larry fink', 'blackrock',
'gary gensler', 'sec',
'michael novogratz', 'galaxy',
'biden', 'obama', 'putin', 'xi jinping',
'kanye', 'drake', 'snoop dogg', 'paris hilton',
'mark cuban', 'mr beast', 'mrbeast',
'lobster', '', 'lobsta',
'hawk tuah', 'griddy', 'skibidi',
'rizz', 'sigma', 'gyatt',
'etf', 'halving', '',
'world war', 'wwiii',
'fed', 'rate cut', '',
'tiktok ban', 'tiktok',
}
CELEBRITY_VIRAL_PATTERNS = [
r'\bvitalik\b', r'\bsaylor\b', r'\bblackrock\b',
r'\bcoinbase\b', r'\bjustin\s*sun\b', r'\blobster\b',
r'\betf\b', r'\bhalving\b', r'\bmrbeast\b',
r'\bsnoop\b', r'\bkanye\b', r'\bdrake\b',
]
# ============================================================
# ============================================================
SPAM_PATTERNS = [
r'airdrop', r'presale', r'pre\s*sale',
r'1000x', r'100x guaranteed',
r'safe\s*moon', r'baby\s*\w+', # babydoge
r'pornhub', r'porn', r'xxx', r'nsfw',
r'nigga', r'nigger', r'faggot',
r'scam', r'rugpull', r'rug\s*pull',
r'official\s*token', r'official\s*coin',
]
COMMON_NOISE_WORDS = {
'nice', 'good', 'bad', 'cool', 'hot', 'big', 'small',
'life', 'love', 'hate', 'happy', 'sad', 'fun', 'lol',
'cat', 'dog', 'moon', 'sun', 'star', 'king', 'queen',
'gold', 'rich', 'cash', 'money', 'pay', 'buy', 'sell',
'pump', 'dump', 'bull', 'bear', 'green', 'red',
'hello', 'world', 'yes', 'no', 'wow', 'omg', 'lmao',
'simp', 'chad', 'based', 'cope', 'seethe',
'test', 'new', 'old', 'real', 'fake',
'shit', 'shitcoin', 'fuck', 'fart', 'poop', 'pee',
'cum', 'dick', 'ass', 'boob', 'tit',
'nigga', 'retard', 'slop',
'the', 'and', 'for', 'from', 'with', 'this', 'that',
'coin', 'token', 'meme', 'pepe', 'wojak',
'peg', 'usd', 'usdt', 'usdc', 'dai',
}
# ============================================================
# toolfunction
# ============================================================
def log(msg):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
os.makedirs(DATA_DIR, exist_ok=True)
with open(LOG_FILE, 'a') as f:
f.write(line + '\n')
def load_flap_seen():
if os.path.exists(FLAP_SEEN_FILE):
try:
with open(FLAP_SEEN_FILE) as f:
return json.load(f)
except:
pass
return {}
def save_flap_seen(data):
cutoff = int(time.time()) - 86400 * 7
data = {k: v for k, v in data.items() if v > cutoff}
with open(FLAP_SEEN_FILE, 'w') as f:
json.dump(data, f)
def tg_send(text, parse_mode='Markdown'):
if not TG_TOKEN:
log(f"[TG] No token, skip: {text[:80]}")
return False
try:
resp = requests.post(
f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
json={'chat_id': TG_CHAT_ID, 'text': text, 'parse_mode': parse_mode},
timeout=10
)
result = resp.json()
if not result.get('ok'):
if 'can\'t parse' in str(result.get('description', '')).lower():
resp = requests.post(
f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
json={'chat_id': TG_CHAT_ID, 'text': text},
timeout=10
)
else:
log(f"[TG] Error: {result.get('description', '')}")
return False
return True
except Exception as e:
log(f"[TG] Send error: {e}")
return False
# ============================================================
# ============================================================
def init_db():
"""initializeSQLitelibrary"""
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS narratives (
id INTEGER PRIMARY KEY AUTOINCREMENT,
theme TEXT NOT NULL, -- ()
first_token_name TEXT, -- token
first_token_address TEXT, -- address
first_chain TEXT, --
first_seen_at INTEGER, --
token_count INTEGER DEFAULT 1, --
last_seen_at INTEGER --
)''')
c.execute('''CREATE TABLE IF NOT EXISTS tokens_seen (
address TEXT PRIMARY KEY,
chain TEXT,
name TEXT,
symbol TEXT,
narrative_theme TEXT,
category TEXT, -- 'musk_trump' / 'binance_cz' / 'novel' / 'common'
first_seen_at INTEGER,
market_cap REAL,
pushed INTEGER DEFAULT 0, --
seen_count INTEGER DEFAULT 1 --
)''')
# index
c.execute('CREATE INDEX IF NOT EXISTS idx_theme ON narratives(theme)')
c.execute('CREATE INDEX IF NOT EXISTS idx_addr ON tokens_seen(address)')
conn.commit()
return conn
def normalize_theme(name, symbol):
"""
:'Elon Mars Colony' → 'elon mars colony'
'TRUMP2028' → 'trump'
'PancakeBunny' → 'pancake bunny'
"""
text = f"{name} {symbol}".lower().strip()
noise = ['token', 'coin', 'inu', 'swap', 'finance', 'protocol',
'dao', 'defi', 'nft', 'meta', 'verse', 'fi', 'ai',
'pepe', 'wojak', 'chad', 'based']
# splitcamelCase
text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
text = re.sub(r'\d+x?', '', text)
text = re.sub(r'[^a-z\s]', ' ', text)
words = [w for w in text.split() if w and len(w) > 1 and w not in noise]
if not words:
return name.lower().strip()
return ' '.join(sorted(set(words)))
def is_similar_theme(theme1, theme2, threshold=0.7):
if theme1 == theme2:
return True
if theme1 in theme2 or theme2 in theme1:
return True
words1 = set(theme1.split())
words2 = set(theme2.split())
if words1 and words2:
overlap = len(words1 & words2) / min(len(words1), len(words2))
if overlap >= 0.6:
return True
return SequenceMatcher(None, theme1, theme2).ratio() >= threshold
def check_narrative_novelty(conn, theme, name, symbol, address, chain):
"""
return:
('novel', None) —
('heating', narrative_row) — !!
('existing', existing_theme_row) — ,
"""
c = conn.cursor()
now = int(time.time())
HEAT_WINDOW = 1800 # 30
HEAT_THRESHOLD = 2 # 2aboveis
c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives WHERE theme = ?', (theme,))
exact = c.fetchone()
if exact:
row_id, _, _, _, _, first_seen, count, last_seen = exact
new_count = count + 1
c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE theme = ?',
(new_count, now, theme))
conn.commit()
if now - first_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', exact)
if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', exact)
return ('existing', exact)
c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives ORDER BY last_seen_at DESC LIMIT 1000')
for row in c.fetchall():
if is_similar_theme(theme, row[1]):
row_id, _, _, _, _, first_seen, count, last_seen = row
new_count = count + 1
c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE id = ?',
(new_count, now, row[0]))
conn.commit()
if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', row)
return ('existing', row)
c.execute('''INSERT INTO narratives (theme, first_token_name, first_token_address, first_chain, first_seen_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?)''',
(theme, name, address, chain, now, now))
conn.commit()
return ('novel', None)
def get_token_seen_count(conn, address):
c = conn.cursor()
c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
row = c.fetchone()
return row[0] if row else 0
def is_token_seen(conn, address):
c = conn.cursor()
c.execute('SELECT address FROM tokens_seen WHERE address = ?', (address,))
return c.fetchone() is not None
def record_token(conn, address, chain, name, symbol, theme, category, mc, pushed=False):
c = conn.cursor()
c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
existing = c.fetchone()
if existing:
new_count = existing[0] + 1
c.execute('''UPDATE tokens_seen SET seen_count = ?, market_cap = ?, category = ?
WHERE address = ?''', (new_count, mc, category, address))
else:
c.execute('''INSERT INTO tokens_seen
(address, chain, name, symbol, narrative_theme, category, first_seen_at, market_cap, pushed, seen_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)''',
(address, chain, name, symbol, theme, category, int(time.time()), mc, 1 if pushed else 0))
conn.commit()
# ============================================================
# ============================================================
def classify_narrative(name, symbol, chain):
"""
classificationtoken
return:('musk_trump', matched_keywords) / ('binance_cz', matched_keywords) / ('novel', None) / ('common', None)
"""
text = f"{name} {symbol}".lower()
for pat in SPAM_PATTERNS:
if re.search(pat, text, re.IGNORECASE):
return ('spam', None)
matched_mt = []
for kw in MUSK_TRUMP_KEYWORDS:
if kw.lower() in text:
matched_mt.append(kw)
if not matched_mt:
for pat in MUSK_TRUMP_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_mt.append(m.group())
if matched_mt:
chain_lower = chain.lower()
if chain_lower in ('eth', 'ethereum', 'sol', 'solana', 'bsc', 'base'):
return ('musk_trump', matched_mt)
matched_bc = []
for kw in BINANCE_CZ_KEYWORDS:
if kw.lower() in text:
matched_bc.append(kw)
if not matched_bc:
for pat in BINANCE_CZ_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_bc.append(m.group())
if matched_bc:
chain_lower = chain.lower()
if chain_lower in ('bsc',):
return ('binance_cz', matched_bc)
else:
return ('binance_cz_wrong_chain', matched_bc)
matched_cv = []
for kw in CELEBRITY_VIRAL_KEYWORDS:
if kw.lower() in text:
matched_cv.append(kw)
if not matched_cv:
for pat in CELEBRITY_VIRAL_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_cv.append(m.group())
if matched_cv:
return ('celebrity_viral', matched_cv)
return ('check_novelty', None)
# ============================================================
# ============================================================
def check_token_safety(chain, address):
if chain in ('sol', 'solana'):
try:
r = requests.get(f'https://api.rugcheck.xyz/v1/tokens/{address}/report', timeout=10)
if r.status_code == 200:
data = r.json()
score = data.get('score', 999)
mint = data.get('mintAuthority')
freeze = data.get('freezeAuthority')
return {
'safe': not mint and not freeze,
'score': score, 'mint': mint is not None,
'freeze': freeze is not None
}
except:
pass
else:
chain_map = {'ethereum': '1', 'eth': '1', 'bsc': '56', 'base': '8453'}
cid = chain_map.get(chain, '1')
try:
r = requests.get(f'https://api.gopluslabs.io/api/v1/token_security/{cid}?contract_addresses={address}', timeout=10)
if r.status_code == 200:
result = r.json().get('result', {})
data = result.get(address.lower(), {})
if data:
honeypot = data.get('is_honeypot', '0') == '1'
mintable = data.get('is_mintable', '0') == '1'
sell_tax = float(data.get('sell_tax', '0') or '0')
buy_tax = float(data.get('buy_tax', '0') or '0')
return {
'safe': not honeypot and not mintable, # as
'honeypot': honeypot, 'mintable': mintable,
'sell_tax': sell_tax, 'buy_tax': buy_tax
}
except:
pass
# ============================================================
# ============================================================
def gmgn_get(url):
try:
resp = requests.get(url, headers=GMGN_HEADERS, timeout=15)
if resp.status_code == 200:
return resp.json().get('data', {})
except:
pass
return {}
def fetch_token_description(chain, address):
"""token/ — radarcore """
desc = ''
if chain in ('sol', 'solana'):
try:
r = requests.get(f'https://frontend-api-v3.pump.fun/coins/{address}', timeout=8)
if r.status_code == 200:
data = r.json()
desc = data.get('description', '') or ''
twitter = data.get('twitter', '') or ''
telegram = data.get('telegram', '') or ''
website = data.get('website', '') or ''
return {
'description': desc.strip(),
'twitter': twitter,
'telegram': telegram,
'website': website,
}
except:
pass
try:
chain_dex = {'sol': 'solana', 'eth': 'ethereum', 'bsc': 'bsc', 'base': 'base',
'solana': 'solana', 'ethereum': 'ethereum'}.get(chain, chain)
r = requests.get(f'https://api.dexscreener.com/latest/dex/tokens/{address}', timeout=8)
if r.status_code == 200:
pairs = r.json().get('pairs', [])
if pairs:
info = pairs[0].get('info', {})
websites = info.get('websites', [])
socials = info.get('socials', [])
twitter = ''
telegram = ''
website = ''
for s in socials:
if s.get('type') == 'twitter':
twitter = s.get('url', '')
elif s.get('type') == 'telegram':
telegram = s.get('url', '')
for w in websites:
if w.get('label', '').lower() == 'website':
website = w.get('url', '')
if not desc:
return {
'description': desc,
'twitter': twitter,
'telegram': telegram,
'website': website,
}
except:
pass
return {'description': desc, 'twitter': '', 'telegram': '', 'website': ''}
def fetch_new_tokens():
"""fromGMGN + multi-dimensionalcovering"""
all_tokens = []
seen_addrs = set()
for chain in ['eth', 'bsc', 'base']:
urls = [
f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=open_timestamp&direction=desc&limit=100',
f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=swaps&direction=desc&limit=50',
]
for url in urls:
data = gmgn_get(url)
tokens = data.get('rank', [])
for t in tokens:
addr = t.get('address', '')
if not addr or addr in seen_addrs:
continue
mc = t.get('market_cap', 0) or t.get('fdv', 0) or 0
liq = t.get('liquidity', 0) or 0
if mc < 1000 or liq < 500 or mc > 10000000:
continue
age_ts = t.get('open_timestamp', 0)
age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 999
seen_addrs.add(addr)
all_tokens.append({
'address': addr,
'chain': chain,
'name': t.get('name', '?'),
'symbol': t.get('symbol', '?'),
'mc': mc,
'liq': liq,
'volume': t.get('volume', 0) or 0,
'holders': t.get('holder_count', 0) or 0,
'sm': t.get('smart_degen_count', 0) or 0,
'chg_1h': t.get('price_change_percent1h', 0) or 0,
'chg_24h': t.get('price_change_percent', 0) or 0,
'age_h': age_h,
'price': t.get('price', 0),
'buys_1h': t.get('buys', 0) or 0,
'sells_1h': t.get('sells', 0) or 0,
})
time.sleep(0.3)
return all_tokens
def fetch_flap_tokens():
"""
FLAPplatformscan — BSCcommunitydriver
features:24h,but 1hstabilize/,>sell,holders
"""
data = gmgn_get(
'https://gmgn.ai/defi/quotation/v1/rank/bsc/swaps/24h?launchpad=flap&orderby=volume&direction=desc&limit=30'
)
tokens = data.get('rank', [])
candidates = []
for t in tokens:
addr = t.get('address', '')
if not addr:
continue
mc = t.get('market_cap', 0) or 0
liq = t.get('liquidity', 0) or 0
vol = t.get('volume', 0) or 0
holders = t.get('holder_count', 0) or 0
buys = t.get('buys', 0) or 0
sells = t.get('sells', 0) or 0
chg_1h = t.get('price_change_percent1h', 0) or 0
chg_24h = t.get('price_change_percent', 0) or 0
age_ts = t.get('open_timestamp', 0)
age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 0
if mc < 1000 or liq < 500:
continue
if holders < 5:
continue
buy_ratio = buys / max(sells, 1)
is_support = False
reason = ''
if chg_24h < -10 and chg_1h > chg_24h * 0.3:
is_support = True
reason = f'24h{chg_24h:.0f}%but 1hstabilize{chg_1h:+.0f}%'
if -10 <= chg_24h <= 30 and chg_1h > -5 and buy_ratio > 1.1:
is_support = True
reason = f' {buy_ratio:.2f}'
if chg_24h < -30 and chg_1h > 10:
is_support = True
reason = f'{chg_24h:.0f}%bounce{chg_1h:+.0f}%'
if is_support and buy_ratio >= 1.0:
candidates.append({
'address': addr,
'chain': 'bsc',
'name': t.get('name', '?'),
'symbol': t.get('symbol', '?'),
'mc': mc,
'liq': liq,
'volume': vol,
'holders': holders,
'sm': 0,
'chg_1h': chg_1h,
'chg_24h': chg_24h,
'age_h': age_h,
'price': t.get('price', 0),
'buys': buys,
'sells': sells,
'buy_ratio': buy_ratio,
'support_reason': reason,
'launchpad': 'flap',
})
candidates.sort(key=lambda x: x['mc'], reverse=True)
return candidates
def format_flap_alert(token, desc_info=None):
msg = f"radar — FLAP\n"
msg += f": BSC | platform: FLAP\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f": {desc}\n\n"
msg += f": {token['support_reason']}\n\n"
msg += f"```\n"
msg += f" ${token['mc']:>12,.0f}\n"
msg += f"liquidity ${token['liq']:>12,.0f}\n"
msg += f"24h ${token['volume']:>12,.0f}\n"
msg += f" {token['holders']:>12,d}\n"
msg += f"/ {token['buys']:>6,d}/{token['sells']:>6,d}\n"
msg += f" {token['buy_ratio']:>12.2f}\n"
msg += f"1h {token['chg_1h']:>+11.1f}%\n"
msg += f"24h {token['chg_24h']:>+11.1f}%\n"
msg += f"```\n"
msg += "\nFLAPcommunity — "
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n'.join(links)
return msg
# ============================================================
# ============================================================
def format_musk_trump_alert(token, matched_kw, desc_info=None):
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
msg += f": {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f": {desc}\n\n"
msg += f"key : {', '.join(matched_kw[:5])}\n\n"
msg += f"```\n"
msg += f" ${token['mc']:>12,.0f}\n"
msg += f"liquidity ${token['liq']:>12,.0f}\n"
msg += f"1h {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f" {token['sm']:>12d}\n"
msg += f" {token['age_h']:>10.1f}h\n"
msg += f"```\n"
links = []
if (desc_info or {}).get('twitter'):
links.append(f"Twitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n' + '\n'.join(links)
return msg
def format_binance_cz_alert(token, matched_kw, desc_info=None):
"""Binance/CZ"""
msg = f"radar — Binance/CZ\n"
msg += f": BSC\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f": {desc}\n\n"
msg += f"key : {', '.join(matched_kw[:5])}\n\n"
msg += f"```\n"
msg += f" ${token['mc']:>12,.0f}\n"
msg += f"liquidity ${token['liq']:>12,.0f}\n"
msg += f"1h {token['chg_1h']:>+11.1f}%\n"
msg += f" {token['age_h']:>10.1f}h\n"
msg += f"```\n"
links = []
if (desc_info or {}).get('twitter'):
links.append(f"Twitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n' + '\n'.join(links)
return msg
def format_novel_narrative_alert(token, theme, desc_info=None):
return format_heating_narrative_alert(token, theme, 1, desc_info)
def format_heating_narrative_alert(token, theme, count, desc_info=None):
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
msg = f"radar — \n"
msg += f": {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 300:
desc = desc[:300] + '...'
msg += f": {desc}\n\n"
else:
msg += f": {theme}\n\n"
msg += f"```\n"
msg += f" ${token['mc']:>12,.0f}\n"
msg += f"liquidity ${token['liq']:>12,.0f}\n"
msg += f"1h {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f" {token['sm']:>12d}\n"
msg += f" {token['holders']:>12d}\n"
msg += f" {token['age_h']:>10.1f}h\n"
msg += f"```"
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n'.join(links)
return msg
# ============================================================
# ============================================================
def track_momentum(tokens):
"""
"""
global MOMENTUM_TRACKER, MOMENTUM_PUSHED
now = time.time()
alerts = []
current_addrs = set()
for token in tokens:
addr = token['address']
mc = token['mc']
vol = token.get('volume', 0) or 0
price = token.get('price', 0) or 0
buys = token.get('buys_1h', 0) or token.get('buys', 0) or 0
current_addrs.add(addr)
if mc < 1000 or token.get('liq', 0) < 500 or mc > 10000000:
continue
if addr not in MOMENTUM_TRACKER:
MOMENTUM_TRACKER[addr] = []
snapshots = MOMENTUM_TRACKER[addr]
if snapshots and snapshots[-1]['mc'] == mc and snapshots[-1]['vol'] == vol:
continue # data,
snapshots.append({
'ts': now,
'mc': mc,
'vol': vol,
'price': price,
'buys': buys,
})
if len(snapshots) > 20:
snapshots[:] = snapshots[-20:]
if len(snapshots) < MOMENTUM_CONSECUTIVE_UP:
continue
recent = snapshots[-MOMENTUM_CONSECUTIVE_UP:]
consecutive_up = True
total_gain = 0
for i in range(1, len(recent)):
prev_mc = recent[i-1]['mc']
curr_mc = recent[i]['mc']
if prev_mc <= 0:
consecutive_up = False
break
gain = (curr_mc - prev_mc) / prev_mc
consecutive_up = False
break
total_gain += gain
if not consecutive_up:
continue
vol_increasing = True
for i in range(1, len(recent)):
if recent[i]['buys'] < recent[i-1]['buys'] * 0.8: #
vol_increasing = False
break
first_mc = recent[0]['mc']
last_mc = recent[-1]['mc']
pct_gain = ((last_mc - first_mc) / first_mc * 100) if first_mc > 0 else 0
if pct_gain < 5:
continue
push_info = MOMENTUM_PUSHED.get(addr, {'count': 0, 'last_ts': 0, 'last_mc': 0})
if push_info['count'] > 0 and last_mc <= push_info['last_mc']:
continue
push_info['count'] += 1
push_info['last_ts'] = now
push_info['last_mc'] = last_mc
signal_count = push_info['count']
safety = check_token_safety(token['chain'], addr)
if not safety.get('safe'):
continue
category, matched_kw = classify_narrative(token['name'], token['symbol'], token['chain'])
is_flap = token.get('launchpad') == 'flap'
if category == 'musk_trump':
stars = 3
narrative_tag = f"/ ({', '.join(matched_kw[:3])})"
elif category == 'binance_cz':
stars = 3
narrative_tag = f"Binance/CZ ({', '.join(matched_kw[:3])})"
elif category == 'celebrity_viral':
stars = 2
narrative_tag = f"/ ({', '.join(matched_kw[:3])})"
elif is_flap:
stars = 2
narrative_tag = "FLAPcommunity"
else:
theme = normalize_theme(token['name'], token['symbol'])
theme_words = [w for w in theme.split() if w not in COMMON_NOISE_WORDS and len(w) > 2]
if len(theme_words) >= 2:
stars = 2
narrative_tag = f": {theme}"
else:
stars = 1
narrative_tag = ""
desc_info = fetch_token_description(token['chain'], addr)
if is_flap:
has_twitter = bool(desc_info.get('twitter'))
has_tg = bool(desc_info.get('telegram'))
has_web = bool(desc_info.get('website'))
community_tags = []
if has_twitter:
community_tags.append("")
if has_tg:
community_tags.append("TG")
if has_web:
community_tags.append("")
if community_tags:
narrative_tag += f" | {' '.join(community_tags)}"
stars = min(3, stars + 1) # community
else:
narrative_tag += " | communityLinks"
msg = format_momentum_alert(token, pct_gain, len(recent), vol_increasing, stars, narrative_tag, desc_info, signal_count)
alerts.append({'msg': msg, 'token': token})
MOMENTUM_PUSHED[addr] = push_info
log(f"[{signal_count}] {token['name']} ({token['symbol']}) on {token['chain']} — {len(recent)} +{pct_gain:.1f}%")
stale = [a for a in MOMENTUM_TRACKER if a not in current_addrs]
for a in stale:
if now - MOMENTUM_TRACKER[a][-1]['ts'] > 600: # 10
del MOMENTUM_TRACKER[a]
MOMENTUM_PUSHED = {k: v for k, v in MOMENTUM_PUSHED.items() if now - v.get('last_ts', 0) < 3600}
return alerts
def format_momentum_alert(token, pct_gain, rounds, vol_up, stars, narrative_tag, desc_info=None, seen_count=0):
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
vol_tag = "" if vol_up else ""
star_str = "★" * stars + "☆" * (3 - stars)
msg = f"radar\n"
msg += f": {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f": {desc}\n\n"
msg += f": {narrative_tag}\n"
msg += f"{rounds} +{pct_gain:.1f}% {vol_tag}\n\n"
msg += f"```\n"
msg += f" ${token['mc']:>12,.0f}\n"
msg += f"liquidity ${token['liq']:>12,.0f}\n"
msg += f"1h {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f" {token['sm']:>12d}\n"
msg += f" {token['age_h']:>10.1f}h\n"
msg += f"```\n"
msg += f": {star_str} : {seen_count}"
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if (desc_info or {}).get('website'):
links.append(f"Web: {desc_info['website']}")
if links:
msg += '\n'.join(links)
return msg
def format_celebrity_alert(token, matched_kw, desc_info=None):
chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
ch = chain_map.get(token['chain'], token['chain'].upper())
msg = f"radar — / ★★\n"
msg += f": {ch}\n\n"
msg += f"{token['name']} ({token['symbol']})\n"
msg += f"`{token['address']}`\n\n"
desc = (desc_info or {}).get('description', '')
if desc:
if len(desc) > 200:
desc = desc[:200] + '...'
msg += f": {desc}\n\n"
msg += f"key : {', '.join(matched_kw[:5])}\n\n"
msg += f"```\n"
msg += f" ${token['mc']:>12,.0f}\n"
msg += f"liquidity ${token['liq']:>12,.0f}\n"
msg += f"1h {token['chg_1h']:>+11.1f}%\n"
if token.get('sm', 0) > 0:
msg += f" {token['sm']:>12d}\n"
msg += f" {token['age_h']:>10.1f}h\n"
msg += f"```"
links = []
if (desc_info or {}).get('twitter'):
links.append(f"\nTwitter: {desc_info['twitter']}")
if (desc_info or {}).get('telegram'):
links.append(f"TG: {desc_info['telegram']}")
if links:
msg += '\n'.join(links)
return msg
# ============================================================
# ============================================================
def scan_narratives():
"""scanfunction"""
conn = init_db()
tokens = fetch_new_tokens()
log(f"scan {len(tokens)} ...")
flap_tokens = []
try:
flap_tokens = fetch_flap_tokens()
except:
pass
all_momentum_tokens = tokens + flap_tokens
momentum_alerts = track_momentum(all_momentum_tokens)
for token in tokens:
addr = token['address']
chain = token['chain']
name = token['name']
symbol = token['symbol']
if is_token_seen(conn, addr):
# updateseen_count
c = conn.cursor()
c.execute('UPDATE tokens_seen SET seen_count = seen_count + 1, market_cap = ? WHERE address = ?', (token['mc'], addr))
theme_tmp = normalize_theme(name, symbol)
if theme_tmp:
c.execute('UPDATE narratives SET token_count = token_count + 1, last_seen_at = ? WHERE theme = ?', (int(time.time()), theme_tmp))
conn.commit()
continue
category, matched_kw = classify_narrative(name, symbol, chain)
if category == 'spam':
record_token(conn, addr, chain, name, symbol, '', 'spam', token['mc'])
continue
min_mc = 1000
min_liq = 500
if token['mc'] < min_mc or token['liq'] < min_liq:
record_token(conn, addr, chain, name, symbol, '', 'too_small', token['mc'])
continue
theme = normalize_theme(name, symbol)
record_token(conn, addr, chain, name, symbol, theme, category, token['mc'])
check_narrative_novelty(conn, theme, name, symbol, addr, chain)
conn.close()
pushed = 0
for ma in momentum_alerts[:8]: # 8
if tg_send(ma['msg']):
pushed += 1
time.sleep(1) # TGrate limiting
return pushed, len(momentum_alerts)
# ============================================================
# ============================================================
def main():
log("=" * 50)
log("radar v1 ")
log(f"scan: {SCAN_INTERVAL}s")
log("=" * 50)
# initializeDB
init_db()
tg_send(
"radar v1 \n\n"
"classificationlabels:\n"
"★★★ / | Binance/CZ | FLAPcommunity\n"
"★★ | FLAPcommunity | \n"
f"scan: {SCAN_INTERVAL}"
)
scan_count = 0
total_pushed = 0
while True:
try:
scan_count += 1
pushed, found = scan_narratives()
total_pushed += pushed
if pushed > 0:
log(f"{scan_count}: {found}, {pushed} ({total_pushed})")
else:
if scan_count % 20 == 0: # 20
log(f"{scan_count}: ({total_pushed})")
except Exception as e:
log(f"scan: {e}")
time.sleep(SCAN_INTERVAL)
if __name__ == '__main__':
main()
OI + Funding Rate Scanner #
Date: 2026.04.25 Tags: Python · Binance Futures · Telegram
Funding rate flip detection + OI surge
Snapshot-based scanner: detects funding rate flipping from positive to negative while OI is rising. Runs every 5 minutes.
Full source code #
#!/usr/bin/env python3
"""
"""
import requests
import json
import os
import time
import sys
from datetime import datetime, timedelta
from pathlib import Path
# ============ configuration ============
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env.oi"
ALERT_HISTORY_FILE = SCRIPT_DIR / "oi_funding_alerts.json"
FR_SNAPSHOT_FILE = SCRIPT_DIR / "fr_snapshot.json" #
MIN_OI_CHANGE_PCT = 8 # OI8%
MIN_VOLUME_USDT = 0 # ,
MIN_FR_PERIODS_POSITIVE = 2 # 2
DEDUP_HOURS = 24 # 24
def load_env():
env = {}
if ENV_FILE.exists():
for line in ENV_FILE.read_text().strip().split('\n'):
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k.strip()] = v.strip()
return env
env = load_env()
TG_BOT_TOKEN = env.get('TG_BOT_TOKEN', '')
TG_CHAT_ID = env.get('TG_CHAT_ID', '')
def send_tg(text):
if not TG_BOT_TOKEN or not TG_CHAT_ID:
print("[TG] configuration, :")
print(text)
return
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
for chunk in chunks:
try:
resp = requests.post(url, json={
'chat_id': TG_CHAT_ID,
'text': chunk,
'parse_mode': 'Markdown'
}, timeout=10)
if resp.status_code != 200:
requests.post(url, json={
'chat_id': TG_CHAT_ID,
'text': chunk
}, timeout=10)
except Exception as e:
print(f"[TG] send: {e}")
def load_alert_history():
if ALERT_HISTORY_FILE.exists():
try:
return json.loads(ALERT_HISTORY_FILE.read_text())
except:
return {}
return {}
def save_alert_history(history):
ALERT_HISTORY_FILE.write_text(json.dumps(history))
def is_duplicate(symbol, history):
if symbol not in history:
return False
last_alert = datetime.fromisoformat(history[symbol])
return (datetime.now() - last_alert).total_seconds() < DEDUP_HOURS * 3600
def mark_alerted(symbol, history):
history[symbol] = datetime.now().isoformat()
cutoff = datetime.now() - timedelta(hours=DEDUP_HOURS * 2)
history = {k: v for k, v in history.items()
if datetime.fromisoformat(v) > cutoff}
return history
def load_fr_snapshot():
if FR_SNAPSHOT_FILE.exists():
try:
return json.loads(FR_SNAPSHOT_FILE.read_text())
except:
pass
return {}
def save_fr_snapshot(snapshot):
FR_SNAPSHOT_FILE.write_text(json.dumps(snapshot))
# ============ core scan ============
def scan():
ts_start = time.time()
try:
info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo', timeout=10).json()
symbols = [s['symbol'] for s in info['symbols']
if s['contractType'] == 'PERPETUAL' and s['quoteAsset'] == 'USDT' and s['status'] == 'TRADING']
except Exception as e:
print(f"[ERROR] exchangeInfo: {e}")
return []
try:
tickers = requests.get('https://fapi.binance.com/fapi/v1/ticker/24hr', timeout=10).json()
ticker_map = {t['symbol']: t for t in tickers}
except Exception as e:
print(f"[ERROR] ticker: {e}")
return []
active = [s for s in symbols if float(ticker_map.get(s, {}).get('quoteVolume', 0)) > MIN_VOLUME_USDT]
try:
fr_all = requests.get('https://fapi.binance.com/fapi/v1/premiumIndex', timeout=10).json()
fr_current = {item['symbol']: float(item['lastFundingRate']) for item in fr_all}
except:
fr_current = {}
prev_snapshot = load_fr_snapshot()
save_fr_snapshot(fr_current)
if not prev_snapshot:
print(f"[{datetime.now().strftime('%H:%M:%S')}] run,,")
return []
just_turned_negative = []
for sym in active:
prev_fr = prev_snapshot.get(sym)
curr_fr = fr_current.get(sym)
if prev_fr is None or curr_fr is None:
continue
if prev_fr >= 0 and curr_fr < 0:
just_turned_negative.append(sym)
if not just_turned_negative:
elapsed = time.time() - ts_start
print(f"[{datetime.now().strftime('%H:%M:%S')}] scan: {len(active)}/{elapsed:.1f}s, ")
return []
print(f"[{datetime.now().strftime('%H:%M:%S')}] {len(just_turned_negative)} : {just_turned_negative}")
signals = []
for sym in just_turned_negative:
try:
oi_hist = requests.get('https://fapi.binance.com/futures/data/openInterestHist',
params={'symbol': sym, 'period': '1h', 'limit': 48}, timeout=10).json()
oi_chg = 0
segs = []
oi_rising = False
if oi_hist and len(oi_hist) >= 12:
oi_values = [float(x['sumOpenInterestValue']) for x in oi_hist]
seg_len = len(oi_values) // 4
if seg_len >= 3:
segs = [
sum(oi_values[:seg_len]) / seg_len,
sum(oi_values[seg_len:seg_len*2]) / seg_len,
sum(oi_values[seg_len*2:seg_len*3]) / seg_len,
sum(oi_values[seg_len*3:]) / max(1, len(oi_values[seg_len*3:]))
]
oi_chg = (segs[3] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0
oi_rising = oi_chg > 0
t = ticker_map.get(sym, {})
signals.append({
'symbol': sym,
'price': float(t.get('lastPrice', 0)),
'price_chg_24h': float(t.get('priceChangePercent', 0)),
'volume': float(t.get('quoteVolume', 0)),
'oi_change': oi_chg,
'oi_segments': segs,
'oi_rising': oi_rising,
'current_fr': fr_current.get(sym, 0),
'prev_fr': prev_snapshot.get(sym, 0),
})
except:
continue
elapsed = time.time() - ts_start
print(f"[{datetime.now().strftime('%H:%M:%S')}] scan: {len(active)}/{elapsed:.1f}s, : {len(signals)}")
return signals
def get_square_discussion(coin):
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v4/friendly/pgc/content/queryByHashtag",
params={"hashtag": f"#{coin.lower()}", "pageIndex": 1, "pageSize": 1, "orderBy": "HOT"},
headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.binance.com/en/square"},
timeout=8
)
if r.status_code == 200:
ht = r.json().get("data", {}).get("hashtag", {})
return ht.get("contentCount", 0), ht.get("viewCount", 0)
except:
pass
return 0, 0
def get_market_caps():
mcap = {}
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
timeout=10
)
if r.status_code == 200:
for item in r.json().get("data", []):
name = item.get("name", "")
mc = item.get("marketCap", 0)
if name and mc:
mcap[name] = float(mc)
except:
pass
return mcap
def get_spot_symbols():
try:
info = requests.get("https://api.binance.com/api/v3/exchangeInfo", timeout=10).json()
return {s["baseAsset"] for s in info["symbols"]
if s["quoteAsset"] == "USDT" and s["status"] == "TRADING"}
except:
return set()
def fmt_mcap(v):
if v >= 1e9: return f"${v/1e9:.2f}B"
if v >= 1e6: return f"${v/1e6:.1f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def fmt_views(v):
if v >= 1e6: return f"{v/1e6:.1f}M"
if v >= 1e3: return f"{v/1e3:.0f}K"
return str(v)
def format_alert(signals):
if not signals:
return None
signals.sort(key=lambda x: (-int(x.get('oi_rising', False)), x['current_fr']))
mcap_map = get_market_caps()
spot_set = get_spot_symbols()
now = datetime.now().strftime('%m-%d %H:%M')
lines = [f"*[ +OI ]* {now}\n"]
for s in signals:
coin = s['symbol'].replace('USDT', '')
fr_change = f"{s['prev_fr']:+.4%} -> {s['current_fr']:+.4%}"
mcap = mcap_map.get(coin, 0)
has_spot = coin in spot_set
sq_posts, sq_views = get_square_discussion(coin)
lines.append(f"```")
lines.append(f"{coin}")
lines.append(f" : {s['price']:.4f} 24h: {s['price_chg_24h']:+.1f}%")
lines.append(f" : {fr_change}")
if s['oi_segments']:
oi_segs = ' > '.join([f"{v/1e6:.1f}M" for v in s['oi_segments']])
lines.append(f" OI: +{s['oi_change']:.1f}% ({oi_segs})")
lines.append(f" : ${s['volume']/1e6:.1f}M")
lines.append(f" : {fmt_mcap(mcap) if mcap > 0 else ''} : {'' if has_spot else 'futures'}")
if sq_posts > 0:
lines.append(f" : {sq_posts} / {fmt_views(sq_views)}")
else:
lines.append(f" : ")
lines.append(f"```")
return '\n'.join(lines)
def main():
signals = scan()
if signals:
strong = [s for s in signals if s['current_fr'] < 0 and s.get('oi_rising')]
if strong:
msg = format_alert(strong)
if msg:
send_tg(msg)
print(f" {len(strong)} ({len(signals)}, {len(strong)}OI)")
else:
print(f" {len(signals)} but OI, ")
else:
print(f" ")
if __name__ == '__main__':
main()
Accumulation Radar #
Date: 2026.04.25 Tags: Python · Binance · CoinGlass · Telegram
Momentum + OI anomaly + smart alerts
Hourly scan: top gainers momentum tracking, OI anomaly detection, and Telegram push. Pure Python, zero AI cost.
Full source code #
#!/usr/bin/env python3
"""
longradar v2 — ++OI scan
data:BinancefuturesAPI + CoinGecko Trending()
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
from square_heat import get_square_heat
env_file = Path(__file__).parent / ".env.oi"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
# === configuration ===
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.getenv("TG_CHAT_ID", "YOUR_CHAT_ID")
FAPI = "https://fapi.binance.com"
HEAT_HISTORY_FILE = Path(__file__).parent / "heat_history.json"
VOL_SURGE_MULT = 2.5 # 2.5above=
MIN_VOL_USD = 20_000_000 # >$20Mdetection
MIN_OI_DELTA_PCT = 3.0 # OI3%
MIN_OI_USD = 2_000_000 # OI $2M
def api_get(endpoint, params=None):
"""BinanceAPI"""
url = f"{FAPI}{endpoint}"
for attempt in range(3):
try:
resp = requests.get(url, params=params, timeout=10)
if resp.status_code == 200:
return resp.json()
elif resp.status_code == 429:
time.sleep(2)
else:
return None
except:
time.sleep(1)
return None
def format_usd(v):
if v >= 1e9: return f"${v/1e9:.1f}B"
if v >= 1e6: return f"${v/1e6:.1f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def mcap_str(v):
if v >= 1e9: return f"${v/1e9:.1f}B"
if v >= 1e6: return f"${v/1e6:.0f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def send_telegram(text):
"""sendTG"""
if not TG_BOT_TOKEN:
print("\n[TG] No token, stdout:\n")
print(text)
return
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
chunks = []
current = ""
for line in text.split("\n"):
if len(current) + len(line) + 1 > 3800:
chunks.append(current)
current = line
else:
current += "\n" + line if current else line
if current:
chunks.append(current)
for chunk in chunks:
try:
resp = requests.post(url, json={
"chat_id": TG_CHAT_ID,
"text": chunk,
"parse_mode": "Markdown"
}, timeout=10)
if resp.status_code == 200:
print(f"[TG] Sent ✓ ({len(chunk)} chars)")
else:
resp2 = requests.post(url, json={
"chat_id": TG_CHAT_ID,
"text": chunk.replace("*", "").replace("_", ""),
}, timeout=10)
print(f"[TG] Sent plain ({'✓' if resp2.status_code == 200 else '✗'})")
except Exception as e:
print(f"[TG] Error: {e}")
time.sleep(0.5)
def main():
print(f"🔥 longradar v2 — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
tickers_raw = api_get("/fapi/v1/ticker/24hr")
premiums_raw = api_get("/fapi/v1/premiumIndex")
if not tickers_raw or not premiums_raw:
print("❌ API")
return
ticker_map = {}
for t in tickers_raw:
if t["symbol"].endswith("USDT"):
ticker_map[t["symbol"]] = {
"px_chg": float(t["priceChangePercent"]),
"vol": float(t["quoteVolume"]),
"price": float(t["lastPrice"]),
}
funding_map = {}
for p in premiums_raw:
if p["symbol"].endswith("USDT"):
funding_map[p["symbol"]] = float(p["lastFundingRate"])
mcap_map = {}
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
timeout=10
)
if r.status_code == 200:
for item in r.json().get("data", []):
name = item.get("name", "")
mc = item.get("marketCap", 0)
if name and mc:
mcap_map[name] = float(mc)
print(f"✅ : {len(mcap_map)}")
except Exception as e:
print(f"⚠️ API: {e}")
heat_map = {}
cg_trending = set()
square_trending = set()
sq_coins = get_square_heat()
if sq_coins:
for i, c in enumerate(sq_coins):
coin = c["coin"]
square_trending.add(coin)
rank_score = max(50 - i * 4, 10)
if c.get("rapidRiser"):
rank_score += 15
heat_map[coin] = heat_map.get(coin, 0) + rank_score
print(f"🏦 : {len(square_trending)} {[c['coin'] for c in sq_coins[:5]]}")
# 3b. CoinGecko Trending
try:
r = requests.get("https://api.coingecko.com/api/v3/search/trending", timeout=10)
if r.status_code == 200:
for item in r.json().get("coins", []):
sym = item["item"]["symbol"].upper()
rank = item["item"].get("score", 99)
cg_trending.add(sym)
heat_map[sym] = heat_map.get(sym, 0) + max(50 - rank * 3, 10)
print(f"🌐 CG Trending: {len(cg_trending)}")
except Exception as e:
print(f"⚠️ CG Trending: {e}")
vol_surge_coins = set()
for sym, tk in ticker_map.items():
coin = sym.replace("USDT", "")
vol_24h = tk["vol"]
if vol_24h > MIN_VOL_USD:
kl = api_get("/fapi/v1/klines", {"symbol": sym, "interval": "1d", "limit": 8})
if kl and len(kl) >= 5:
avg_prev = sum(float(k[7]) for k in kl[:-1]) / (len(kl) - 1)
if avg_prev > 0:
ratio = vol_24h / avg_prev
if ratio >= VOL_SURGE_MULT:
vol_surge_coins.add(coin)
heat_map[coin] = heat_map.get(coin, 0) + min(ratio * 10, 50)
time.sleep(0.05)
print(f"📈 (≥{VOL_SURGE_MULT}x): {len(vol_surge_coins)}")
dual_heat = cg_trending & vol_surge_coins
square_vol = square_trending & vol_surge_coins
triple_heat = cg_trending & vol_surge_coins & square_trending
all_multi_heat = dual_heat | square_vol
if all_multi_heat:
for coin in all_multi_heat:
heat_map[coin] = heat_map.get(coin, 0) + 20
if triple_heat:
for coin in triple_heat:
heat_map[coin] = heat_map.get(coin, 0) + 30 #
print(f"🔥🔥🔥 : {triple_heat}")
else:
print(f"🔥🔥 : {all_multi_heat}")
scan_syms = set()
for coin in heat_map:
sym = coin + "USDT"
if sym in ticker_map:
scan_syms.add(sym)
top_by_vol = sorted(ticker_map.items(), key=lambda x: x[1]["vol"], reverse=True)[:100]
for sym, _ in top_by_vol:
scan_syms.add(sym)
oi_map = {}
for i, sym in enumerate(scan_syms):
oi_hist = api_get("/futures/data/openInterestHist", {
"symbol": sym, "period": "1h", "limit": 6
})
if oi_hist and len(oi_hist) >= 2:
curr = float(oi_hist[-1]["sumOpenInterestValue"])
prev_1h = float(oi_hist[-2]["sumOpenInterestValue"])
prev_6h = float(oi_hist[0]["sumOpenInterestValue"])
d1h = ((curr - prev_1h) / prev_1h * 100) if prev_1h > 0 else 0
d6h = ((curr - prev_6h) / prev_6h * 100) if prev_6h > 0 else 0
oi_map[sym] = {"oi_usd": curr, "d1h": d1h, "d6h": d6h}
if (i + 1) % 10 == 0:
time.sleep(0.5)
print(f"📊 OIscan: {len(oi_map)}")
all_syms = set(list(ticker_map.keys()))
coin_data = {}
for sym in all_syms:
tk = ticker_map.get(sym, {})
if not tk:
continue
oi = oi_map.get(sym, {})
fr = funding_map.get(sym, 0)
coin = sym.replace("USDT", "")
d6h = oi.get("d6h", 0)
fr_pct = fr * 100
oi_usd = oi.get("oi_usd", 0)
if coin in mcap_map:
est_mcap = mcap_map[coin]
else:
est_mcap = max(tk["vol"] * 0.3, oi_usd * 2) if oi_usd > 0 else tk["vol"] * 0.3
heat = heat_map.get(coin, 0)
coin_data[sym] = {
"coin": coin, "sym": sym,
"px_chg": tk["px_chg"], "vol": tk["vol"],
"fr_pct": fr_pct, "d6h": d6h,
"oi_usd": oi_usd, "est_mcap": est_mcap,
"heat": heat,
"in_cg": coin in cg_trending,
"in_sq": coin in square_trending,
"vol_surge": coin in vol_surge_coins,
}
# ═══════════════════════════════════════
# ═══════════════════════════════════════
hot_coins = sorted(
[d for d in coin_data.values() if d["heat"] > 0],
key=lambda x: x["heat"], reverse=True
)
heat_history = {}
if HEAT_HISTORY_FILE.exists():
try:
heat_history = json.loads(HEAT_HISTORY_FILE.read_text())
except:
pass
now_ts = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
new_entries = [] #
for s in hot_coins:
coin = s["coin"]
if coin not in heat_history:
heat_history[coin] = {"first_seen": now_ts, "price": s.get("px_chg", 0)}
sources = []
if s["in_sq"]: sources.append("")
if s["in_cg"]: sources.append("CG")
if s["vol_surge"]: sources.append("")
new_entries.append({"coin": coin, "sources": sources, "data": s})
cutoff = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=7)).strftime("%Y-%m-%d")
heat_history = {k: v for k, v in heat_history.items()
if v.get("first_seen", "9999") >= cutoff}
HEAT_HISTORY_FILE.write_text(json.dumps(heat_history, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════
# ═══════════════════════════════════════
chase = []
for sym, d in coin_data.items():
if d["px_chg"] > 3 and d["fr_pct"] < -0.005 and d["vol"] > 1_000_000:
fr_hist = api_get("/fapi/v1/fundingRate", {"symbol": sym, "limit": 5})
fr_rates = [float(f["fundingRate"]) * 100 for f in fr_hist] if fr_hist else [d["fr_pct"]]
fr_prev = fr_rates[-2] if len(fr_rates) >= 2 else d["fr_pct"]
fr_delta = d["fr_pct"] - fr_prev
trend = "" if fr_delta < -0.05 else "" if fr_delta < -0.01 else "" if abs(fr_delta) < 0.01 else ""
chase.append({**d, "fr_delta": fr_delta, "trend": trend,
"rates": " → ".join([f"{x:.3f}" for x in fr_rates[-3:]])})
time.sleep(0.2)
chase.sort(key=lambda x: x["fr_pct"])
# ═══════════════════════════════════════
# ═══════════════════════════════════════
now = datetime.now(timezone(timedelta(hours=8)))
lines = [
f"**longradar**",
f"{now.strftime('%Y-%m-%d %H:%M')} CST",
]
if new_entries:
tbl = ["```"]
tbl.append(f"{'':<10} {'':>8} {'':>7} {''}")
tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
for e in new_entries:
s = e["data"]
src_str = "/".join(e["sources"])
tbl.append(f"{s['coin']:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
tbl.append("```")
lines.append("\n".join(tbl))
if hot_coins:
lines.append(f"\n**[ ]**")
tbl = ["```"]
tbl.append(f"{'':<10} {'':>8} {'':>7} {''}")
tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
for s in hot_coins[:10]:
sources = []
if s["in_sq"]: sources.append("")
if s["in_cg"]: sources.append("CG")
if s["vol_surge"]: sources.append("")
extra = []
if abs(s["d6h"]) >= 3: extra.append(f"OI{s['d6h']:+.0f}%")
if s["fr_pct"] < -0.03: extra.append(f"{s['fr_pct']:.2f}%")
src_str = "/".join(sources)
if extra:
src_str += " " + " ".join(extra)
coin_name = s['coin']
tbl.append(f"{coin_name:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
tbl.append("```")
lines.append("\n".join(tbl))
else:
lines.append("\n**[ ]** ")
lines.append(f"\n**[ ]** +=")
if chase:
tbl = ["```"]
tbl.append(f"{'':<10} {'':>10} {'':>8} {'':>7} {'':>8}")
tbl.append(f"{'-'*10} {'-'*10} {'-'*8} {'-'*7} {'-'*8}")
for s in chase[:8]:
tbl.append(
f"{s['coin']:<10} {s['fr_pct']:>+9.3f}% {s['trend']:>8} {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
)
tbl.append("```")
lines.append("\n".join(tbl))
else:
lines.append(" condition")
oi_alerts = []
for sym, oi in oi_map.items():
if abs(oi["d6h"]) >= 8:
d = coin_data.get(sym)
if d and d["heat"] == 0:
oi_alerts.append(d)
oi_alerts.sort(key=lambda x: abs(x["d6h"]), reverse=True)
if oi_alerts:
lines.append(f"\n**[ OI ]** 6positions>=8%")
tbl = ["```"]
tbl.append(f"{'':<10} {'':>4} {'OI':>8} {'':>7} {'':>8}")
tbl.append(f"{'-'*10} {'-'*4} {'-'*8} {'-'*7} {'-'*8}")
for s in oi_alerts[:6]:
direction = "" if s["d6h"] > 0 else ""
tbl.append(
f"{s['coin']:<10} {direction:>4} {s['d6h']:>+7.1f}% {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
)
tbl.append("```")
lines.append("\n".join(tbl))
highlights = []
hot_oi = [d for d in coin_data.values() if d["heat"] > 0 and d["d6h"] > 5]
for s in sorted(hot_oi, key=lambda x: x["d6h"], reverse=True)[:3]:
highlights.append(f"{s['coin']} — +OI{s['d6h']:+.0f}%,")
hot_fuel = [d for d in coin_data.values() if d["heat"] > 0 and d["fr_pct"] < -0.03]
for s in sorted(hot_fuel, key=lambda x: x["fr_pct"])[:2]:
if s["coin"] not in " ".join(highlights):
highlights.append(f"{s['coin']} — +{s['fr_pct']:.2f}%,")
chase_fire = [s for s in chase[:5] if "" in s.get("trend", "")]
for s in chase_fire[:2]:
if s["coin"] not in " ".join(highlights):
highlights.append(f"{s['coin']} — {s['fr_pct']:.3f}%,short squeeze")
if highlights:
lines.append(f"\n**[ ]**")
for h in highlights[:5]:
lines.append(f" {h}")
lines.append(f"\n=Binancesearch / CG=CoinGecko")
report = "\n".join(lines)
send_telegram(report)
print("\n✅ ")
if __name__ == "__main__":
main()
Binance Alpha Monitor #
Date: 2026.04.23 Tags: Python · Binance · Claude AI · Telegram
WebSocket + AI analysis + Telegram alerts
Auto-detects new Binance Alpha listings, analyzes token quality with Claude AI, and sends alerts via Telegram.
Full source code #
#!/usr/bin/env python3
"""
Binance Alpha Monitor v2 —
REST + intelligent filter + + TG
API Key,zero AI cost(rule engine)
run: python3 alpha_monitor.py
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import httpx
# ============================================================
# configuration
# ============================================================
BASE_DIR = Path(__file__).parent
DB_PATH = str(BASE_DIR / "data" / "alpha.db")
# TG
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
def _load_anthropic_key():
"""fromauth.jsonvariableAPI key"""
# 1. hermes auth.json credential pool
try:
auth_file = os.path.expanduser("~/.hermes/auth.json")
if os.path.exists(auth_file):
with open(auth_file) as f:
auth = json.load(f)
pool = auth.get("credential_pool", {}).get("anthropic", [])
if pool:
key = pool[0].get("access_token", "")
if key and key != "***":
return key
except Exception:
pass
return os.environ.get("ANTHROPIC_API_KEY", "")
ANTHROPIC_API_KEY = _load_anthropic_key()
ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")
ANNOUNCEMENT_POLL_INTERVAL = 30 # 30
AGGREGATION_POLL_INTERVAL = 15 # 15
MONITOR_POLL_INTERVAL = 120 # monitoring2
# HTTP
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
}
BINANCE_ANNOUNCEMENT_API = "https://www.binance.com/bapi/composite/v1/public/cms/article/list/query"
# ============================================================
# logging
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("alpha")
# ============================================================
# ============================================================
TRIGGER_KEYWORDS = [
"alpha", "", "airdrop", "tge", "token generation",
"", "will list", "will launch",
"", "exclusive", "binance wallet", "hodler",
]
EXCLUDE_KEYWORDS = [
"delisting", "delist", "", "deprecate", "",
"maintenance", "",
"launchpool", "megadrop",
"buyback", "",
"perpetual contract", # futures
"futures will launch", #
"usdⓢ-margined", # Ufutures
"coin-margined", # futures
"margin will add", #
"trading bots services", # trading
"trading pairs", # trading()
]
ALPHA_BOX_KEYWORDS = ["alpha box", "", "mystery box"]
# ============================================================
# ============================================================
TIER1_VCS = [
"binance labs", "yzi labs",
"coinbase ventures", "a16z", "andreessen horowitz", "paradigm",
"polychain", "polychain capital", "sequoia", "sequoia china", "sequoia capital",
"multicoin", "multicoin capital", "pantera", "pantera capital",
"dragonfly", "dragonfly capital", "founders fund",
]
TIER2_VCS = [
"abcde", "iosg", "hashkey", "okx ventures",
"sevenx", "folius", "foresight", "hashed",
"bitkraft", "framework", "framework ventures",
"delphi", "delphi digital", "electric capital",
"variant", "1kx", "placeholder",
"animoca", "animoca brands", "jump", "jump crypto",
"hack vc", "bain capital",
]
HOT_NARRATIVES = ["defi_perp", "ai_agent", "ai_defi", "defai", "zk_proof"]
WEAK_NARRATIVES = ["gamefi", "meme", "social"]
BINANCE_DARLING_KEYWORDS = ["yzi labs", "binance labs"]
# ============================================================
# ============================================================
TIER_ICONS = {"S": "🟢🟢🟢", "A": "🟡🟡", "B": "🟠", "C": "⚪"}
TIER_LABELS = {"S": "S ()", "A": "A ()", "B": "B ()", "C": "C ()"}
def count_vc_tier(vcs: list, vc_list: list) -> int:
count = 0
vcs_lower = [v.lower() for v in vcs]
for t in vc_list:
if any(t in v for v in vcs_lower):
count += 1
return count
def rate_project(circ_mcap: float, fdv: float, vcs: list,
narrative: str, is_darling: bool) -> dict:
"""S/A/B/C """
t1 = count_vc_tier(vcs, TIER1_VCS)
t2 = count_vc_tier(vcs, TIER2_VCS)
hot = narrative in HOT_NARRATIVES
weak = narrative in WEAK_NARRATIVES
circ_mcap = circ_mcap or 0
fdv = fdv or 0
warnings = []
if weak:
warnings.append(f"⚠️ {narrative} ")
if is_darling:
return {"tier": "S", "reason": "Binance(YZi/Binance Labs/CZ)", "warnings": warnings}
if hot and t1 >= 1 and fdv < 500_000_000:
return {"tier": "S", "reason": f"({narrative})+ Tier1 VC", "warnings": warnings}
if t1 >= 2 and circ_mcap < 50_000_000 and fdv < 300_000_000:
return {"tier": "S", "reason": "≥2 Tier1 ", "warnings": warnings}
if t1 >= 1 and circ_mcap < 10_000_000 and fdv < 100_000_000:
return {"tier": "S", "reason": "Tier1 ", "warnings": warnings}
if hot and circ_mcap < 10_000_000 and fdv < 50_000_000:
return {"tier": "S", "reason": f"({narrative})", "warnings": warnings}
if t1 >= 1 and circ_mcap < 20_000_000 and fdv < 200_000_000:
return {"tier": "A", "reason": "Tier1 ", "warnings": warnings}
if circ_mcap < 50_000_000 and fdv < 500_000_000:
return {"tier": "B", "reason": "", "warnings": warnings}
return {"tier": "C", "reason": "/", "warnings": warnings}
# ============================================================
# database
# ============================================================
def init_db():
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
symbol TEXT NOT NULL,
name TEXT,
launch_time TEXT,
source TEXT,
raw_text TEXT,
tier TEXT DEFAULT 'PENDING',
tier_reason TEXT,
narrative TEXT,
narrative_desc TEXT,
vcs_json TEXT DEFAULT '[]',
is_darling INTEGER DEFAULT 0,
open_price REAL,
total_supply REAL,
circulating_supply REAL,
fdv REAL,
circulating_mcap REAL,
excluded INTEGER DEFAULT 0,
exclude_reason TEXT,
discovered_at TEXT,
updated_at TEXT
)""")
c.execute("""
CREATE TABLE IF NOT EXISTS pushes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL,
push_type TEXT,
sent_at TEXT,
content TEXT
)""")
c.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
price REAL,
circulating_mcap REAL,
fdv REAL
)""")
conn.commit()
conn.close()
def project_id(symbol: str, date_str: str) -> str:
return hashlib.md5(f"{symbol.upper()}_{date_str}".encode()).hexdigest()[:16]
def project_exists(pid: str) -> bool:
conn = sqlite3.connect(DB_PATH)
exists = conn.execute("SELECT 1 FROM projects WHERE id=?", (pid,)).fetchone() is not None
conn.close()
return exists
def save_project(project: dict):
conn = sqlite3.connect(DB_PATH)
now = datetime.utcnow().isoformat()
conn.execute("""
INSERT OR IGNORE INTO projects
(id, symbol, name, launch_time, source, raw_text, tier, tier_reason,
narrative, narrative_desc, vcs_json, is_darling,
open_price, total_supply, circulating_supply, fdv, circulating_mcap,
excluded, exclude_reason, discovered_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
project["id"], project["symbol"], project.get("name"),
project.get("launch_time"), project.get("source"), project.get("raw_text"),
project.get("tier", "PENDING"), project.get("tier_reason"),
project.get("narrative"), project.get("narrative_desc"),
json.dumps(project.get("vcs", [])), int(project.get("is_darling", False)),
project.get("open_price"), project.get("total_supply"),
project.get("circulating_supply"), project.get("fdv"),
project.get("circulating_mcap"),
int(project.get("excluded", 0)), project.get("exclude_reason"),
now, now,
))
conn.commit()
conn.close()
def update_project(pid: str, fields: dict):
if not fields:
return
conn = sqlite3.connect(DB_PATH)
fields["updated_at"] = datetime.utcnow().isoformat()
set_parts = [f"{k}=?" for k in fields]
values = list(fields.values()) + [pid]
conn.execute(f"UPDATE projects SET {','.join(set_parts)} WHERE id=?", values)
conn.commit()
conn.close()
def get_project(pid: str) -> Optional[dict]:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT * FROM projects WHERE id=?", (pid,)).fetchone()
conn.close()
return dict(row) if row else None
def list_pending() -> list:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM projects WHERE excluded=0 AND tier='PENDING' ORDER BY discovered_at"
).fetchall()
conn.close()
return [dict(r) for r in rows]
def list_active() -> list:
"""monitoringproject(PENDINGEXCLUDED,launch_time)"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute("""
SELECT * FROM projects
WHERE excluded=0 AND launch_time IS NOT NULL AND launch_time != ''
AND tier NOT IN ('PENDING', 'EXCLUDED', 'ERROR')
""").fetchall()
conn.close()
return [dict(r) for r in rows]
def has_pushed(pid: str, push_type: str) -> bool:
conn = sqlite3.connect(DB_PATH)
exists = conn.execute(
"SELECT 1 FROM pushes WHERE project_id=? AND push_type=?", (pid, push_type)
).fetchone() is not None
conn.close()
return exists
def log_push(pid: str, push_type: str, content: str):
conn = sqlite3.connect(DB_PATH)
conn.execute(
"INSERT INTO pushes (project_id, push_type, sent_at, content) VALUES (?,?,?,?)",
(pid, push_type, datetime.utcnow().isoformat(), content)
)
conn.commit()
conn.close()
def save_snapshot(pid: str, price: float, mcap: float, fdv: float):
conn = sqlite3.connect(DB_PATH)
conn.execute(
"INSERT INTO snapshots (project_id, timestamp, price, circulating_mcap, fdv) VALUES (?,?,?,?,?)",
(pid, datetime.utcnow().isoformat(), price, mcap, fdv)
)
conn.commit()
conn.close()
# ============================================================
# ============================================================
async def send_tg(text: str, silent: bool = False) -> bool:
if not TG_BOT_TOKEN or not TG_CHAT_ID:
logger.error("TG_BOT_TOKEN TG_CHAT_ID configuration")
return False
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_notification": silent,
}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
resp = await client.post(url, json=payload)
if resp.status_code != 200:
logger.error(f"TGsend {resp.status_code}: {resp.text[:200]}")
return False
return True
except Exception as e:
logger.error(f"TGsend: {e}")
return False
# ============================================================
# ============================================================
def is_trigger(title: str) -> tuple[bool, Optional[str]]:
t = title.lower()
for kw in EXCLUDE_KEYWORDS:
if kw.lower() in t:
return False, f": {kw}"
for kw in ALPHA_BOX_KEYWORDS:
if kw.lower() in t:
return False, "Alpha Box "
for kw in TRIGGER_KEYWORDS:
if kw.lower() in t:
return True, None
return False, None
def extract_symbol(title: str) -> Optional[str]:
m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
if m:
return m.group(1)
m = re.search(r"(([A-Z0-9]{2,10}))", title)
if m:
return m.group(1)
return None
def extract_name(title: str) -> Optional[str]:
patterns = [
r"(?:|List|list|Launch|launch|featured)\s+([A-Za-z0-9 ]+?)\s*[\((]",
]
for p in patterns:
m = re.search(p, title, re.IGNORECASE)
if m:
return m.group(1).strip()
return None
# ============================================================
# ============================================================
async def fetch_announcements(limit: int = 20) -> list:
all_articles = []
# 48: New Cryptocurrency Listing, 161: Latest Activities, 93: Latest News
for catalog_id in [48, 161, 93]:
params = {"type": 1, "catalogId": catalog_id, "pageNo": 1, "pageSize": limit}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
resp = await client.get(BINANCE_ANNOUNCEMENT_API, params=params)
resp.raise_for_status()
data = resp.json()
for catalog in data.get("data", {}).get("catalogs", []):
for a in catalog.get("articles", []):
a["_catalog_id"] = catalog_id
all_articles.append(a)
except Exception as e:
logger.warning(f"classification {catalog_id} : {e}")
seen = set()
unique = []
for a in all_articles:
code = a.get("code")
if code and code not in seen:
seen.add(code)
unique.append(a)
return unique
# ============================================================
# CoinGecko data
# ============================================================
async def fetch_coingecko(symbol: str, project_name: str = "") -> dict:
"""CoinGeckotokendata。matching:symbolmatching + projectmatching"""
result = {"found": False, "price": None, "fdv": None, "mcap": None,
"total_supply": None, "circ_supply": None, "chain": None, "contract": None}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
queries = [symbol]
if project_name and project_name.upper() != symbol.upper():
queries.append(project_name)
coin_id = None
best_rank = 999999
name_exact_match = None # projectmatching
for query in queries:
resp = await client.get("https://api.coingecko.com/api/v3/search",
params={"query": query})
if resp.status_code != 200:
continue
coins = resp.json().get("coins", [])
for c in coins:
c_sym = c.get("symbol", "").upper()
c_name = c.get("name", "").lower()
c_rank = c.get("market_cap_rank") or 999999
if project_name and c_name == project_name.lower():
name_exact_match = c["id"]
if c_sym == symbol.upper():
if c_rank < best_rank:
coin_id = c["id"]
best_rank = c_rank
if project_name and project_name.lower() in c_name:
if c_rank < best_rank:
coin_id = c["id"]
best_rank = c_rank
if name_exact_match:
coin_id = name_exact_match
if not coin_id:
logger.info(f"CoinGecko {symbol}/{project_name}")
return result
logger.info(f"CoinGeckomatching: {symbol} -> {coin_id} (rank={best_rank})")
resp2 = await client.get(
f"https://api.coingecko.com/api/v3/coins/{coin_id}",
params={"localization": "false", "tickers": "false",
"market_data": "true", "community_data": "false",
"developer_data": "false"}
)
if resp2.status_code == 429:
await asyncio.sleep(5)
resp2 = await client.get(
f"https://api.coingecko.com/api/v3/coins/{coin_id}",
params={"localization": "false", "tickers": "false",
"market_data": "true", "community_data": "false",
"developer_data": "false"}
)
if resp2.status_code != 200:
return result
d = resp2.json()
md = d.get("market_data", {})
result.update({
"found": True,
"price": (md.get("current_price") or {}).get("usd"),
"fdv": (md.get("fully_diluted_valuation") or {}).get("usd"),
"mcap": (md.get("market_cap") or {}).get("usd"),
"total_supply": md.get("total_supply"),
"circ_supply": md.get("circulating_supply"),
})
result["categories"] = d.get("categories", [])
result["description"] = (d.get("description") or {}).get("en", "")[:500]
platforms = d.get("platforms", {})
for chain, addr in platforms.items():
if addr:
result["chain"] = chain
result["contract"] = addr
break
if not result["price"]:
binance_price = await _fetch_binance_price(symbol, client)
if binance_price:
result["price"] = binance_price
ts = result.get("total_supply") or 0
cs = result.get("circ_supply") or 0
if ts > 0:
result["fdv"] = binance_price * ts
if cs > 0:
result["mcap"] = binance_price * cs
logger.info(f"Binance {symbol}: ${binance_price}, FDV=${result.get('fdv',0):,.0f}")
except Exception as e:
logger.warning(f"CoinGeckoquery {symbol}: {e}")
return result
async def _fetch_binance_price(symbol: str, client: httpx.AsyncClient) -> float:
"""fromBinancefutures"""
pair = f"{symbol.upper()}USDT"
try:
resp = await client.get(f"https://api.binance.com/api/v3/ticker/price",
params={"symbol": pair})
if resp.status_code == 200:
return float(resp.json()["price"])
except Exception:
pass
# 2. futures
try:
resp = await client.get(f"https://fapi.binance.com/fapi/v1/ticker/price",
params={"symbol": pair})
if resp.status_code == 200:
return float(resp.json()["price"])
except Exception:
pass
return 0.0
# ============================================================
# ============================================================
async def llm_extract(raw_text: str, symbol: str, name: str = "", cg_data: dict = None) -> dict:
"""LLMfrom+CoinGeckodata/VC/"""
fallback = {
"narrative": "unknown", "narrative_desc": "",
"vcs": [], "is_darling": False, "exclude_reason": None,
}
cg_data = cg_data or {}
categories = cg_data.get("categories", [])
description = cg_data.get("description", "")
darling_cats = [c for c in categories if any(kw in c.lower() for kw in ["yzi labs", "binance labs"])]
if darling_cats:
fallback["is_darling"] = True
if not ANTHROPIC_API_KEY:
t = raw_text.lower()
for kw in BINANCE_DARLING_KEYWORDS:
if kw in t:
fallback["is_darling"] = True
cat_str = " ".join(categories).lower()
if "defi" in cat_str: fallback["narrative"] = "defi"
elif "ai" in cat_str: fallback["narrative"] = "ai_agent"
elif "gaming" in cat_str or "gamefi" in cat_str: fallback["narrative"] = "gamefi"
elif "meme" in cat_str: fallback["narrative"] = "meme"
elif "rwa" in cat_str or "real world" in cat_str: fallback["narrative"] = "rwa"
return fallback
extra_context = ""
if categories:
extra_context += f"\nCoinGeckoclassification: {', '.join(categories)}"
if description:
extra_context += f"\nproject: {description[:300]}"
if cg_data.get("found"):
extra_context += f"\ndata: FDV=${cg_data.get('fdv',0):,.0f}, MCap=${cg_data.get('mcap',0):,.0f}, =${cg_data.get('price',0)}"
if cg_data.get("chain"):
extra_context += f", ={cg_data['chain']}"
system = "cryptocurrencyresearcher,fromBinanceprojectdatakey 。returnJSON,。"
user = f"""Binanceproject:
token: {symbol}, project: {name or ""}
: {raw_text}
{extra_context}
returnJSON:
{{
"narrative": "defi_perp|ai_agent|ai_defi|defai|zk_proof|infra|defi|rwa|gamefi|meme|social|stablecoin|unknown",
"narrative_desc": "projectwhat、whatfeatures",
"vcs": ["fromCoinGeckoclassification"],
"is_darling": true/false,
"exclude_reason": null|"already_tge"|"meme_only"
}}
judge:
- narrative: main class
- vcs: CoinGeckoclassification "XXX Portfolio" XXXas
- is_darling: YZi Labs/Binance Labs CZ/ true
- exclude_reason: projectmain CEX(Coinbase/OKX/Bybit)3"already_tge"。DEXBinance,already_tge。CoinGeckodataalready_tge。meme"meme_only"
"""
try:
async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
resp = await client.post(
f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages",
headers={
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": ANTHROPIC_MODEL,
"max_tokens": 800,
"temperature": 0,
"system": system,
"messages": [{"role": "user", "content": user}],
}
)
if resp.status_code != 200:
logger.warning(f"LLMcall {resp.status_code}")
return fallback
data = resp.json()
text = ""
for block in data.get("content", []):
if block.get("type") == "text":
text = block.get("text", "")
break
text = text.strip()
if text.startswith("```"):
lines = text.split("\n")
text = "\n".join(lines[1:-1])
return json.loads(text)
except Exception as e:
logger.warning(f"LLM: {e}")
return fallback
# ============================================================
# ============================================================
def _fmt_mcap(v):
if not v:
return "N/A"
if v >= 1e9:
return f"${v/1e9:.1f}B"
if v >= 1e6:
return f"${v/1e6:.1f}M"
if v >= 1e3:
return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def _fmt_price(v):
if not v:
return "N/A"
if v >= 1:
return f"${v:.2f}"
if v >= 0.01:
return f"${v:.4f}"
return f"${v:.6f}"
def fmt_discovery(p: dict) -> str:
tier = p.get("tier", "C")
icon = TIER_ICONS.get(tier, "⚪")
label = TIER_LABELS.get(tier, "")
symbol = p["symbol"]
name = p.get("name") or ""
vcs = json.loads(p.get("vcs_json", "[]")) if isinstance(p.get("vcs_json"), str) else p.get("vcs", [])
lines = [
f"{icon} <b>Alpha · ${symbol}</b> {icon}",
f"📋 {label}",
"",
f"<b>{name}</b>" if name else "",
]
if p.get("narrative_desc"):
lines.append(f"💡 {p['narrative_desc']}")
if p.get("narrative") and p["narrative"] != "unknown":
lines.append(f"🏷 : {p['narrative']}")
lines.append("")
if p.get("fdv"):
lines.append(f"📊 FDV: {_fmt_mcap(p['fdv'])}")
if p.get("circulating_mcap"):
lines.append(f"📊 : {_fmt_mcap(p['circulating_mcap'])}")
if p.get("open_price"):
lines.append(f"💰 : {_fmt_price(p['open_price'])}")
if p.get("total_supply") and p.get("circulating_supply"):
pct = p["circulating_supply"] / p["total_supply"] * 100
lines.append(f"📦 initial: {pct:.1f}%")
if vcs:
lines.append("")
lines.append("🏛 <b></b>")
for v in vcs[:5]:
is_t1 = any(t in v.lower() for t in TIER1_VCS)
lines.append(f" {'⭐' if is_t1 else '·'} {v}")
if p.get("is_darling"):
lines.append("")
lines.append("🔥 <b>Binance</b>")
if p.get("tier_reason"):
lines.append("")
lines.append(f"🎯 {p['tier_reason']}")
lines.append("")
lines.append(f"<i>📌 : {p.get('source', 'binance')}</i>")
if p.get("raw_text"):
lines.append(f"<i>{p['raw_text'][:120]}</i>")
return "\n".join(l for l in lines if l is not None)
def fmt_countdown(p: dict, minutes: int) -> str:
icon = TIER_ICONS.get(p.get("tier", "C"), "⚪")
t = f"{minutes//60}h{minutes%60}m" if minutes >= 60 else f"{minutes}m"
lines = [
f"{icon} <b></b>",
f"<b>${p['symbol']}</b> · {p.get('name', '')}",
f"⏰ <b>{t}</b>",
]
if p.get("fdv"):
lines.append(f"FDV: {_fmt_mcap(p['fdv'])}")
if minutes <= 30:
lines.append("🔔 <b></b>")
return "\n".join(lines)
def fmt_launch(p: dict, price: float, mcap: float, fdv: float) -> str:
lines = [
f"🚀 <b>${p['symbol']} </b>",
f": <b>{_fmt_price(price)}</b>",
f": <b>{_fmt_mcap(mcap)}</b>",
f"FDV: <b>{_fmt_mcap(fdv)}</b>",
]
return "\n".join(lines)
def fmt_periodic(p: dict, idx: int, price: float, mcap: float, change_pct: float) -> str:
arrow = "📈" if change_pct > 0 else "📉"
minutes = 30 * idx
lines = [
f"⏱ <b>${p['symbol']} · +{minutes}min</b>",
f": {_fmt_mcap(mcap)} ({arrow} {change_pct:+.1f}%)",
f": {_fmt_price(price)}",
]
if change_pct >= 100:
lines.append("💡 <b>,</b>")
elif change_pct <= -30:
lines.append("⚠️ ,evaluation")
return "\n".join(lines)
def fmt_anomaly(p: dict, atype: str, price: float, change_pct: float) -> str:
emoji = {"double": "🚀", "halve": "🔻"}.get(atype, "⚡")
desc = {"double": "", "halve": ""}.get(atype, "")
return f"{emoji} <b>${p['symbol']} {desc}</b>\n: {change_pct:+.1f}%\n: {_fmt_price(price)}"
# ============================================================
# ============================================================
async def announcement_listener():
"""Binance,Alphaproject"""
logger.info(f"📡 · {ANNOUNCEMENT_POLL_INTERVAL}s")
while True:
try:
articles = await fetch_announcements()
new_count = 0
for art in articles:
title = art.get("title", "")
triggered, reason = is_trigger(title)
if not triggered:
continue
symbol = extract_symbol(title)
if not symbol:
continue
release_ts = art.get("releaseDate")
release_iso = datetime.fromtimestamp(release_ts / 1000).isoformat() if release_ts else ""
launch_date = release_iso[:10] if release_iso else datetime.utcnow().date().isoformat()
pid = project_id(symbol, launch_date)
if project_exists(pid):
continue
project = {
"id": pid,
"symbol": symbol,
"name": extract_name(title),
"launch_time": release_iso,
"source": "binance_announcement",
"raw_text": title,
"tier": "PENDING",
"vcs": [],
"is_darling": False,
"excluded": 0,
}
save_project(project)
new_count += 1
logger.info(f"🆕 ${symbol}: {title[:80]}")
if new_count:
logger.info(f" {new_count} project")
except Exception as e:
logger.error(f": {e}", exc_info=True)
await asyncio.sleep(ANNOUNCEMENT_POLL_INTERVAL)
# ============================================================
# ============================================================
async def aggregation_worker():
"""PENDINGprojectdata、、"""
logger.info(f"🧠 · {AGGREGATION_POLL_INTERVAL}s")
while True:
try:
pending = list_pending()
for p in pending:
symbol = p["symbol"]
try:
logger.info(f"📦 ${symbol}")
# 1. CoinGecko
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
await asyncio.sleep(1) # rate limiting
llm = await llm_extract(p.get("raw_text", ""), symbol, p.get("name"), cg_data=cg)
await asyncio.sleep(1)
if llm.get("exclude_reason") in ("already_tge", "meme_only"):
update_project(p["id"], {
"excluded": 1,
"exclude_reason": llm["exclude_reason"],
"tier": "EXCLUDED",
})
logger.info(f"⏭ ${symbol} : {llm['exclude_reason']}")
continue
cg_fdv = cg.get("fdv", 0) or 0
cg_mcap = cg.get("mcap", 0) or 0
data_suspect = False
data_warnings = []
if cg.get("found") and cg_fdv > 0 and cg_fdv < 100_000:
data_suspect = True
data_warnings.append(f"FDV=${cg_fdv:,.0f},CoinGeckomatchingerror")
logger.warning(f"⚠️ {symbol} FDV=${cg_fdv:,.0f} ,data")
if (cg.get("circ_supply") and cg.get("total_supply")
and cg["circ_supply"] == cg["total_supply"]
and cg_fdv < 1_000_000):
data_suspect = True
if data_suspect:
cg_fdv = 0
cg_mcap = 0
logger.warning(f"⚠️ {symbol} CoinGeckodata,usageLLMjudge")
is_darling = llm.get("is_darling", False)
vcs = llm.get("vcs", [])
narrative = llm.get("narrative", "unknown")
rating = rate_project(
cg_mcap, cg_fdv,
vcs, narrative, is_darling
)
update_project(p["id"], {
"tier": rating["tier"],
"tier_reason": rating["reason"],
"narrative": narrative,
"narrative_desc": llm.get("narrative_desc", ""),
"vcs_json": json.dumps(vcs),
"is_darling": int(is_darling),
"open_price": cg.get("price") if not data_suspect else None,
"total_supply": cg.get("total_supply") if not data_suspect else None,
"circulating_supply": cg.get("circ_supply") if not data_suspect else None,
"fdv": cg_fdv if cg_fdv else None,
"circulating_mcap": cg_mcap if cg_mcap else None,
})
full = get_project(p["id"])
if full and not has_pushed(p["id"], "discovery"):
text = fmt_discovery(full)
silent = rating["tier"] in ("B", "C")
ok = await send_tg(text, silent=silent)
if ok:
log_push(p["id"], "discovery", text)
logger.info(f"✅ ${symbol} [{rating['tier']}]")
except Exception as e:
logger.error(f" {symbol} : {e}", exc_info=True)
update_project(p["id"], {"tier": "ERROR", "tier_reason": str(e)[:100]})
except Exception as e:
logger.error(f"loop: {e}", exc_info=True)
await asyncio.sleep(AGGREGATION_POLL_INTERVAL)
# ============================================================
# ============================================================
async def post_launch_monitor():
""" + + 30min×4tracking + """
logger.info(f"📊 monitoring · {MONITOR_POLL_INTERVAL}s")
while True:
try:
projects = list_active()
for p in projects:
try:
await _monitor_project(p)
except Exception as e:
logger.error(f"monitoring {p['symbol']} : {e}")
except Exception as e:
logger.error(f"monitoringloop: {e}", exc_info=True)
await asyncio.sleep(MONITOR_POLL_INTERVAL)
async def _monitor_project(p: dict):
pid = p["id"]
symbol = p["symbol"]
launch_str = p.get("launch_time", "")
if not launch_str:
return
try:
launch = datetime.fromisoformat(launch_str.replace("Z", "").split("+")[0])
except:
return
now = datetime.utcnow()
delta_sec = (launch - now).total_seconds()
# T-3h
if 3*3600 - 300 <= delta_sec <= 3*3600 + 300:
if not has_pushed(pid, "t_minus_3h"):
text = fmt_countdown(p, int(delta_sec / 60))
ok = await send_tg(text, silent=p.get("tier") in ("B", "C"))
if ok:
log_push(pid, "t_minus_3h", text)
# T-30m
elif 30*60 - 150 <= delta_sec <= 30*60 + 150:
if not has_pushed(pid, "t_minus_30m"):
text = fmt_countdown(p, int(delta_sec / 60))
ok = await send_tg(text, silent=False)
if ok:
log_push(pid, "t_minus_30m", text)
elif -300 <= delta_sec <= 0:
if not has_pushed(pid, "at_launch"):
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
if cg.get("price"):
text = fmt_launch(p, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
ok = await send_tg(text, silent=False)
if ok:
log_push(pid, "at_launch", text)
save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
elif 0 < -delta_sec <= 2.5 * 3600:
minutes_after = int(-delta_sec / 60)
for idx, target in enumerate([30, 60, 90, 120], 1):
if abs(minutes_after - target) <= 5:
ptype = f"post_30m_{idx}"
if not has_pushed(pid, ptype):
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
if cg.get("price"):
open_price = p.get("open_price") or cg["price"]
change = ((cg["price"] - open_price) / open_price * 100) if open_price else 0
text = fmt_periodic(p, idx, cg["price"], cg.get("mcap", 0), change)
ok = await send_tg(text, silent=p.get("tier") in ("B", "C") and idx > 1)
if ok:
log_push(pid, ptype, text)
save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
if change >= 100 and not has_pushed(pid, "anomaly_double"):
t = fmt_anomaly(p, "double", cg["price"], change)
if await send_tg(t):
log_push(pid, "anomaly_double", t)
elif change <= -50 and not has_pushed(pid, "anomaly_halve"):
t = fmt_anomaly(p, "halve", cg["price"], change)
if await send_tg(t):
log_push(pid, "anomaly_halve", t)
break
# ============================================================
# ============================================================
async def main():
init_db()
logger.info(f"📂 database: {DB_PATH}")
# testingTG
ok = await send_tg("🎉 <b>Alpha Monitor v2 </b>\n\n📡 Binance...\n🔔 Alpha")
if ok:
logger.info("✅ TG")
else:
logger.warning("⚠️ TG,configuration")
tasks = [
asyncio.create_task(announcement_listener(), name="announcements"),
asyncio.create_task(aggregation_worker(), name="aggregator"),
asyncio.create_task(post_launch_monitor(), name="monitor"),
]
logger.info("=" * 50)
logger.info("🚀 Alpha Monitor v2 ")
logger.info(f" 📡 : {ANNOUNCEMENT_POLL_INTERVAL}s")
logger.info(f" 🧠 LLM: {'Sonnet' if ANTHROPIC_API_KEY else 'degradation()'}")
logger.info(f" 🔔 TG: {'✅' if TG_BOT_TOKEN else '❌'}")
logger.info("=" * 50)
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
for t in tasks:
t.cancel()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
AI Autonomous Trading #
Futures + Alpha Autonomous Trading v1 #
Date: 2026.04.29 Tags: Python · Binance Futures · Autonomous · Telegram
AI scans market → analyzes → virtual trades → monitors → reviews — fully autonomous
⚠️ RISK WARNING: This code has only been tested on virtual/paper trading. It has NO real trading experience and should NOT be used as a basis for live trading. Not validated with real funds — use at your own risk.
An AI that autonomously scans the entire Binance futures market every 30 seconds, detects anomalies, and makes virtual trades. 4 signal detection strategies (extreme negative funding rate → long squeeze, extreme positive funding → short, post-pump short, crash bounce). Before opening any position, runs a multi-dimensional environment check (BTC trend + Fear&Greed sentiment + OI attention + volume activity) — needs score ≥3/7 to proceed. Auto stop-loss/take-profit monitoring every 30 seconds.
Current results (honest): 4 closed trades, 75% win rate, +13.94U. But profit concentrated in one trade (IR short +45%), rest basically break-even. Position diversification insufficient — tends to stack same-direction same-logic trades. Still rule-based scoring, not true independent thinking.
Training path: scan → trade → close → review → find problems → improve → repeat. Goal: evolve from rule executor to independent-thinking trader.b:T5192,#!/usr/bin/env pytho
Full source code #
#!/usr/bin/env python3
"""
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_FILE = os.path.join(SCRIPT_DIR, "trades.json")
SCANNER_STATE = os.path.join(SCRIPT_DIR, "scanner_state.json")
SCANNER_LOG = os.path.join(SCRIPT_DIR, "scanner.log")
INITIAL_BALANCE = 100.0
TZ_UTC8 = timezone(timedelta(hours=8))
# === configuration ===
MAX_OPEN_POSITIONS = 3 # positions
POSITION_PCT = 30 # %
LEVERAGE = 3 #
COOLDOWN_HOURS = 4 #
MIN_VOLUME_M = 10 # 24h(U)
def load_tg_config():
"""Load TG config from environment variables or .env file"""
env = {}
# Try .env in script directory, then current directory
for env_path in [
os.path.join(SCRIPT_DIR, ".env"),
os.path.join(os.getcwd(), ".env"),
]:
if os.path.exists(env_path):
with open(env_path) as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k] = v.strip().strip('"').strip("'")
break
# OS environment variables override file
for key in ['TG_BOT_TOKEN', 'TELEGRAM_BOT_TOKEN', 'TG_CHAT_ID']:
val = os.environ.get(key)
if val:
env[key] = val
return env
def send_tg(text):
try:
env = load_tg_config()
token = env.get('TG_BOT_TOKEN', env.get('TELEGRAM_BOT_TOKEN', ''))
if not token:
return
chat_id = env.get('TG_CHAT_ID', '')
if not chat_id:
return
url = f"https://api.telegram.org/bot{token}/sendMessage"
requests.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown"
}, timeout=10)
except:
pass
def load_trades():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"initial_balance": INITIAL_BALANCE, "trades": []}
def save_trades(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_state():
if os.path.exists(SCANNER_STATE):
with open(SCANNER_STATE, "r") as f:
return json.load(f)
return {"last_opens": {}, "signals_seen": {}}
def save_state(state):
with open(SCANNER_STATE, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def get_balance(data):
balance = data.get("initial_balance", INITIAL_BALANCE)
for t in data["trades"]:
if t["status"] == "closed" and t["pnl_usd"] is not None:
balance += t["pnl_usd"]
return balance
def next_id(data):
if not data["trades"]:
return "001"
max_id = max(int(t["id"]) for t in data["trades"])
return f"{max_id + 1:03d}"
def now_str():
return datetime.now(TZ_UTC8).strftime("%Y-%m-%dT%H:%M:%S")
def log(msg):
ts = datetime.now(TZ_UTC8).strftime("%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
with open(SCANNER_LOG, "a") as f:
f.write(line + "\n")
# === BinanceAPI ===
def get_all_tickers():
url = "https://fapi.binance.com/fapi/v1/ticker/24hr"
resp = requests.get(url, timeout=10)
return resp.json()
def get_funding_rates():
url = "https://fapi.binance.com/fapi/v1/premiumIndex"
resp = requests.get(url, timeout=10)
return {item['symbol']: float(item['lastFundingRate']) * 100
for item in resp.json()}
def get_funding_history(symbol, limit=8):
url = f"https://fapi.binance.com/fapi/v1/fundingRate?symbol={symbol}&limit={limit}"
resp = requests.get(url, timeout=10)
return [float(item['fundingRate']) * 100 for item in resp.json()]
def get_open_interest(symbol):
url = f"https://fapi.binance.com/fapi/v1/openInterest?symbol={symbol}"
resp = requests.get(url, timeout=10)
data = resp.json()
return float(data['openInterest'])
def get_klines(symbol, interval="4h", limit=6):
url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
resp = requests.get(url, timeout=10)
return resp.json()
def detect_extreme_negative_funding(symbol, funding_rate, funding_rates_map):
"""
policy1: extreme funding ratedeep negative → long(short squeeze)
condition: <-0.08% consecutiveperiods negative
"""
if funding_rate >= -0.08:
return None
try:
history = get_funding_history(symbol, 8)
neg_count = sum(1 for r in history if r < -0.03)
if neg_count < 4:
return None
avg_rate = sum(history) / len(history)
# more extreme rate,stronger signal
strength = "S" if avg_rate < -0.15 else "A" if avg_rate < -0.10 else "B"
return {
"type": "extreme_neg_funding",
"direction": "long",
"strength": strength,
"reason": f"extreme funding ratedeep negative avg:{avg_rate:.4f}% consecutive{neg_count}/8 short squeezeprobability",
"sl_pct": 0.08, # 8%
"tp_pct": 0.12, # 12%
}
except:
return None
def detect_extreme_positive_funding(symbol, funding_rate, funding_rates_map):
"""
policy2: extreme funding rate → short(crowded longs)
condition: >0.10% consecutivehigh positive
"""
if funding_rate <= 0.10:
return None
try:
history = get_funding_history(symbol, 8)
pos_count = sum(1 for r in history if r > 0.05)
if pos_count < 4:
return None
avg_rate = sum(history) / len(history)
strength = "S" if avg_rate > 0.20 else "A" if avg_rate > 0.12 else "B"
return {
"type": "extreme_pos_funding",
"direction": "short",
"strength": strength,
"reason": f"extreme funding rate avg:{avg_rate:.4f}% consecutive{pos_count}/8high positive ",
"sl_pct": 0.10,
"tp_pct": 0.15,
}
except:
return None
def detect_crash_bounce(ticker):
"""
policy3: crashbounce(oversold bounce)
condition: 24h>25% but 4hstabilize/
"""
change_pct = float(ticker['priceChangePercent'])
if change_pct >= -25:
return None
symbol = ticker['symbol']
try:
klines = get_klines(symbol, "1h", 6)
recent_closes = [float(k[4]) for k in klines[-3:]]
if len(recent_closes) >= 2 and recent_closes[-1] >= recent_closes[-2]:
return {
"type": "crash_bounce",
"direction": "long",
"strength": "B", # B
"reason": f"24hcrash{change_pct:.1f}%stabilize oversold bounce",
"sl_pct": 0.10,
"tp_pct": 0.15,
}
except:
pass
return None
def detect_pump_short(ticker):
"""
policy4: pumpshort after(ATHpullback)
condition: 24h>40% — model,pumpcallbackprobability>85%
"""
change_pct = float(ticker['priceChangePercent'])
if change_pct <= 40:
return None
symbol = ticker['symbol']
try:
klines = get_klines(symbol, "1h", 6)
highs = [float(k[2]) for k in klines]
closes = [float(k[4]) for k in klines]
current = closes[-1]
peak = max(highs)
pullback = (peak - current) / peak * 100
if pullback < 10:
return None
strength = "A" if change_pct > 80 else "B"
return {
"type": "pump_short",
"direction": "short",
"strength": strength,
"reason": f"24hpump{change_pct:.1f}%pullback{pullback:.1f}% callbackprobability>85%",
"sl_pct": 0.15, # pump,
"tp_pct": 0.20,
}
except:
pass
return None
# === environment check ===
def check_environment(symbol, signal):
"""
before opening:,multi-dimensionalalignment
return (pass/fail, analysis_dict, adjusted_strength)
"""
analysis = {
"btc_env": "",
"sentiment": "",
"oi_check": "",
"volume_check": "",
"verdict": ""
}
score = 0 # score,>=3to open
try:
btc_url = "https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT"
btc = requests.get(btc_url, timeout=5).json()
btc_chg = float(btc['priceChangePercent'])
if signal["direction"] == "long":
if btc_chg > -2:
score += 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
elif btc_chg < -5:
score -= 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% crashlong -1"
else:
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
else: # short
if btc_chg < 2:
score += 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
elif btc_chg > 5:
score -= 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% pumpshort -1"
else:
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
try:
fng = requests.get("https://api.alternative.me/fng/", timeout=5).json()
fng_val = int(fng['data'][0]['value'])
if signal["direction"] == "long":
if fng_val <= 25:
score += 1
analysis["sentiment"] = f"FGI={fng_val} long +1"
elif fng_val >= 75:
score -= 1
analysis["sentiment"] = f"FGI={fng_val} long -1"
else:
analysis["sentiment"] = f"FGI={fng_val} 0"
else:
if fng_val >= 75:
score += 1
analysis["sentiment"] = f"FGI={fng_val} short +1"
elif fng_val <= 25:
score -= 1
analysis["sentiment"] = f"FGI={fng_val} short -1"
else:
analysis["sentiment"] = f"FGI={fng_val} 0"
except:
analysis["sentiment"] = "FGI 0"
try:
oi = get_open_interest(symbol)
ticker = requests.get(f"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol={symbol}", timeout=5).json()
price = float(ticker['lastPrice'])
oi_usd = oi * price
if oi_usd > 5_000_000: # OI > 5M
score += 1
analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M +1"
else:
analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 0"
except:
analysis["oi_check"] = "OI 0"
try:
vol = float(ticker.get('quoteVolume', 0))
if vol > 50_000_000:
score += 1
analysis["volume_check"] = f"24h={vol/1e6:.0f}M +1"
elif vol > 20_000_000:
analysis["volume_check"] = f"24h={vol/1e6:.0f}M 0"
else:
score -= 1
analysis["volume_check"] = f"24h={vol/1e6:.0f}M -1"
except:
analysis["volume_check"] = " 0"
if signal["strength"] == "S":
score += 2
elif signal["strength"] == "A":
score += 1
analysis["verdict"] = f"score:{score}/7"
if score >= 3:
return True, analysis, signal["strength"]
else:
return False, analysis, signal["strength"]
except Exception as e:
analysis["verdict"] = f":{e} "
return False, analysis, signal["strength"]
def execute_open(data, state, symbol, price, signal):
"""execute — environment check"""
# environment check
passed, env_analysis, strength = check_environment(symbol, signal)
env_summary = " | ".join(v for v in env_analysis.values() if v)
if not passed:
log(f"via {symbol}: {env_summary}")
return
log(f"via {symbol}: {env_summary}")
balance = get_balance(data)
position_usd = balance * POSITION_PCT / 100
if signal["direction"] == "long":
sl = round(price * (1 - signal["sl_pct"]), 6)
tp = round(price * (1 + signal["tp_pct"]), 6)
else:
sl = round(price * (1 + signal["sl_pct"]), 6)
tp = round(price * (1 - signal["tp_pct"]), 6)
trade = {
"id": next_id(data),
"symbol": symbol,
"direction": signal["direction"],
"leverage": LEVERAGE,
"position_pct": POSITION_PCT,
"position_usd": round(position_usd, 4),
"notional_usd": round(position_usd * LEVERAGE, 4),
"entry_price": price,
"stop_loss": sl,
"take_profit": tp,
"entry_time": now_str(),
"exit_price": None,
"exit_time": None,
"exit_reason": None,
"pnl_pct": None,
"pnl_usd": None,
"status": "open",
"pre_analysis": {
"btc_env": env_analysis.get("btc_env", ""),
"sentiment": env_analysis.get("sentiment", ""),
"oi": env_analysis.get("oi_check", ""),
"volume": env_analysis.get("volume_check", ""),
"key_reason": f"[{signal['strength']}] {signal['reason']}",
"risk": f"score:{env_analysis.get('verdict','')} policy:{signal['type']}"
},
"post_review": None
}
data["trades"].append(trade)
save_trades(data)
state["last_opens"][symbol] = now_str()
save_state(state)
direction_cn = "long" if signal["direction"] == "long" else "short"
msg = f"""```
[scan] #{trade['id']}
: {symbol}
: {direction_cn} {LEVERAGE}x
: {price}
: {sl}
: {tp}
: {position_usd:.2f}U
: [{signal['strength']}] {signal['reason']}
: {trade['entry_time']}
```"""
log(f" #{trade['id']} {symbol} {direction_cn} @ {price} | {signal['reason']}")
send_tg(msg)
print(msg)
def swap_weakest(data, state, open_positions, new_signal, tickers):
ticker_map = {t['symbol']: float(t['lastPrice']) for t in tickers}
worst_trade = None
worst_pnl = float('inf')
for t in open_positions:
price = ticker_map.get(t["symbol"])
if price is None:
continue
if t["direction"] == "long":
pnl_pct = (price - t["entry_price"]) / t["entry_price"] * 100
else:
pnl_pct = (t["entry_price"] - price) / t["entry_price"] * 100
if pnl_pct < worst_pnl:
worst_pnl = pnl_pct
worst_trade = t
worst_price = price
if worst_trade is None:
return
if worst_pnl > 0:
log(f"but positions, | : {new_signal['symbol']}")
return
if worst_trade["direction"] == "long":
pnl_pct_lev = (worst_price - worst_trade["entry_price"]) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
else:
pnl_pct_lev = (worst_trade["entry_price"] - worst_price) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
pos_usd = worst_trade.get("position_usd", worst_trade.get("position_pct", 30))
pnl_usd = round(pnl_pct_lev / 100 * pos_usd, 4)
worst_trade["exit_price"] = worst_price
worst_trade["exit_time"] = now_str()
worst_trade["exit_reason"] = f"→{new_signal['symbol']}"
worst_trade["pnl_pct"] = round(pnl_pct_lev, 2)
worst_trade["pnl_usd"] = pnl_usd
worst_trade["status"] = "closed"
save_trades(data)
direction_cn = "" if worst_trade["direction"] == "long" else ""
msg = f"""```
[close] #{worst_trade['id']}
: {worst_trade['symbol']} {direction_cn}
: {worst_trade['entry_price']}
: {worst_price}
: {pnl_pct_lev:+.2f}% ({pnl_usd:+.2f}U)
: S{new_signal['symbol']}
```"""
log(f" #{worst_trade['id']} {worst_trade['symbol']} {pnl_usd:+.2f}U → {new_signal['symbol']}")
send_tg(msg)
execute_open(data, state, new_signal["symbol"], new_signal["price"], new_signal)
def scan():
data = load_trades()
state = load_state()
now = datetime.now(TZ_UTC8)
open_positions = [t for t in data["trades"] if t["status"] == "open"]
open_symbols = set(t["symbol"] for t in open_positions)
if len(open_positions) >= MAX_OPEN_POSITIONS:
return
try:
tickers = get_all_tickers()
funding_rates = get_funding_rates()
except Exception as e:
log(f"APIerror: {e}")
return
exclude = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "FDUSDUSDT", "BTCDOMUSDT", "BTCSTUSDT"}
candidates = [t for t in tickers
if t['symbol'].endswith('USDT')
and t['symbol'] not in exclude
and float(t['quoteVolume']) > MIN_VOLUME_M * 1e6]
signals = []
for ticker in candidates:
symbol = ticker['symbol']
if symbol in open_symbols:
continue
last_open = state.get("last_opens", {}).get(symbol)
if last_open:
try:
last_dt = datetime.fromisoformat(last_open)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=TZ_UTC8)
if (now - last_dt).total_seconds() < COOLDOWN_HOURS * 3600:
continue
except:
pass
fr = funding_rates.get(symbol, 0)
for detect_fn in [
lambda s, f, m: detect_extreme_negative_funding(s, f, m),
lambda s, f, m: detect_extreme_positive_funding(s, f, m),
lambda s, f, m: detect_crash_bounce(ticker),
lambda s, f, m: detect_pump_short(ticker),
]:
try:
signal = detect_fn(symbol, fr, funding_rates)
if signal:
signal["symbol"] = symbol
signal["price"] = float(ticker['lastPrice'])
signal["volume_m"] = float(ticker['quoteVolume']) / 1e6
signals.append(signal)
except:
continue
if not signals:
return
strength_order = {"S": 0, "A": 1, "B": 2}
signals.sort(key=lambda x: strength_order.get(x["strength"], 3))
best = signals[0]
if best["strength"] == "B":
log(f"B: {best['symbol']} {best['reason']}")
return
slots = MAX_OPEN_POSITIONS - len(open_positions)
if slots > 0:
execute_open(data, state, best["symbol"], best["price"], best)
elif best["strength"] == "S":
swap_weakest(data, state, open_positions, best, tickers)
if __name__ == "__main__":
scan()
Utility Tools #
VoiceKey — Speaker Verification #
Date: 2026.04.27 Tags: Python · Security · Telegram · Speaker Verification
Protect your AI agent with voiceprint authentication
Why this tool? More people use Telegram to control AI agents (servers, trading bots, smart home). If your TG account gets compromised, attackers can do anything with your AI. Passwords can be stolen or socially engineered — but your voice is unique and non-transferable. VoiceKey requires a voice message to verify identity before unlocking any commands. Zero AI cost, runs entirely on local CPU.
Full source code #
#!/usr/bin/env python3
"""
VoiceKey — voiceprintkey (Voiceprint Authentication)
Speaker verification for Telegram bot security.
Uses resemblyzer (GE2E model) for speaker embedding extraction
and cosine similarity for verification.
Zero AI cost — runs entirely on local CPU.
Usage:
# Register voiceprint from audio files
python voicekey.py register --audio voice1.ogg voice2.ogg --owner "YourName"
# Verify a voice against stored voiceprint
python voicekey.py verify --audio test.ogg
# As a Python module
from voicekey import VoiceKey
vk = VoiceKey()
vk.register(["voice1.ogg", "voice2.ogg"], owner="YourName")
is_owner, score = vk.verify("test.ogg")
"""
import os
import json
import tempfile
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime
# Lazy imports to speed up module load when not needed
_encoder = None
def _get_encoder():
"""Lazy-load the voice encoder model (first call takes ~1s)."""
global _encoder
if _encoder is None:
from resemblyzer import VoiceEncoder
_encoder = VoiceEncoder()
return _encoder
def _audio_to_wav(audio_path: str) -> str:
"""Convert any audio format to 16kHz mono WAV for processing."""
from pydub import AudioSegment
ext = Path(audio_path).suffix.lower()
if ext in ('.ogg', '.oga'):
audio = AudioSegment.from_ogg(audio_path)
elif ext == '.mp3':
audio = AudioSegment.from_mp3(audio_path)
elif ext == '.wav':
return audio_path # already wav
elif ext in ('.m4a', '.aac'):
audio = AudioSegment.from_file(audio_path, format=ext.lstrip('.'))
else:
audio = AudioSegment.from_file(audio_path)
# Convert to 16kHz mono
audio = audio.set_frame_rate(16000).set_channels(1)
tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
audio.export(tmp.name, format='wav')
return tmp.name
def _extract_embedding(audio_path: str) -> np.ndarray:
"""Extract voice embedding from an audio file."""
from resemblyzer import preprocess_wav
encoder = _get_encoder()
wav_path = _audio_to_wav(audio_path)
wav = preprocess_wav(wav_path)
# Cleanup temp file
if wav_path != audio_path:
os.unlink(wav_path)
if len(wav) < 1600: # Less than 0.1s
raise ValueError(f"Audio too short: {len(wav)/16000:.1f}s (need >0.1s)")
return encoder.embed_utterance(wav)
class VoiceKey:
"""
Speaker verification using voice embeddings.
Attributes:
data_dir: Directory to store voiceprint data
threshold: Cosine similarity threshold for verification (default: 0.75)
voiceprint: The registered owner's voice embedding (256-dim vector)
"""
def __init__(self, data_dir: str = None, threshold: float = 0.75):
if data_dir is None:
data_dir = os.path.expanduser("~/.hermes/voiceprint")
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.threshold = threshold
self.voiceprint = None
self.metadata = {}
self._load()
def _load(self):
"""Load existing voiceprint if available."""
vp_path = self.data_dir / "voiceprint.npy"
meta_path = self.data_dir / "voiceprint_meta.json"
if vp_path.exists():
self.voiceprint = np.load(str(vp_path))
if meta_path.exists():
with open(meta_path) as f:
self.metadata = json.load(f)
self.threshold = self.metadata.get("threshold", self.threshold)
@property
def is_registered(self) -> bool:
"""Check if a voiceprint is registered."""
return self.voiceprint is not None
def register(self, audio_paths: list, owner: str = "owner") -> dict:
"""
Register a voiceprint from multiple audio samples.
Args:
audio_paths: List of audio file paths (ogg, mp3, wav, etc.)
owner: Name of the voiceprint owner
Returns:
dict with registration results
"""
embeddings = []
results = []
for path in audio_paths:
try:
embed = _extract_embedding(path)
embeddings.append(embed)
results.append({"file": os.path.basename(path), "status": "ok"})
except Exception as e:
results.append({"file": os.path.basename(path), "status": "error", "error": str(e)})
if not embeddings:
raise ValueError("No valid audio samples provided")
# Average embeddings and normalize
voiceprint = np.mean(embeddings, axis=0)
voiceprint = voiceprint / np.linalg.norm(voiceprint)
# Save
np.save(str(self.data_dir / "voiceprint.npy"), voiceprint)
self.metadata = {
"owner": owner,
"samples_used": len(embeddings),
"embedding_dim": int(voiceprint.shape[0]),
"threshold": self.threshold,
"created": datetime.now().isoformat(),
}
with open(self.data_dir / "voiceprint_meta.json", "w") as f:
json.dump(self.metadata, f, indent=2)
self.voiceprint = voiceprint
# Self-test
similarities = []
for embed in embeddings:
sim = float(np.dot(voiceprint, embed / np.linalg.norm(embed)))
similarities.append(sim)
return {
"owner": owner,
"samples": len(embeddings),
"self_test_scores": similarities,
"min_score": min(similarities),
"details": results,
}
def verify(self, audio_path: str) -> tuple:
"""
Verify if an audio sample matches the registered voiceprint.
Args:
audio_path: Path to audio file to verify
Returns:
tuple: (is_verified: bool, similarity_score: float)
"""
if not self.is_registered:
raise RuntimeError("No voiceprint registered. Call register() first.")
embed = _extract_embedding(audio_path)
embed = embed / np.linalg.norm(embed)
similarity = float(np.dot(self.voiceprint, embed))
is_verified = similarity >= self.threshold
return is_verified, similarity
def get_info(self) -> dict:
"""Get voiceprint registration info."""
if not self.is_registered:
return {"registered": False}
return {
"registered": True,
**self.metadata,
}
def main():
parser = argparse.ArgumentParser(
description="VoiceKey — Speaker verification for security"
)
sub = parser.add_subparsers(dest="command")
# Register
reg = sub.add_parser("register", help="Register voiceprint from audio files")
reg.add_argument("--audio", nargs="+", required=True, help="Audio files (ogg/mp3/wav)")
reg.add_argument("--owner", default="owner", help="Owner name")
reg.add_argument("--data-dir", default=None, help="Data directory")
reg.add_argument("--threshold", type=float, default=0.75, help="Verification threshold")
# Verify
ver = sub.add_parser("verify", help="Verify audio against voiceprint")
ver.add_argument("--audio", required=True, help="Audio file to verify")
ver.add_argument("--data-dir", default=None, help="Data directory")
# Info
sub.add_parser("info", help="Show voiceprint info")
args = parser.parse_args()
if args.command == "register":
vk = VoiceKey(data_dir=args.data_dir, threshold=args.threshold)
result = vk.register(args.audio, owner=args.owner)
print(f"Registered voiceprint for: {result['owner']}")
print(f"Samples used: {result['samples']}")
print(f"Self-test scores: {[f'{s:.4f}' for s in result['self_test_scores']]}")
print(f"Min score: {result['min_score']:.4f} (threshold: {args.threshold})")
elif args.command == "verify":
vk = VoiceKey(data_dir=getattr(args, 'data_dir', None))
is_ok, score = vk.verify(args.audio)
status = "PASS" if is_ok else "FAIL"
print(f"[{status}] Similarity: {score:.4f} (threshold: {vk.threshold})")
elif args.command == "info":
vk = VoiceKey()
info = vk.get_info()
for k, v in info.items():
print(f" {k}: {v}")
else:
parser.print_help()
if __name__ == "__main__":
main()
Closing notes #
All seven snippets above are pulled from connectfarm1.com on 2026-05-04. They share three philosophies: (1) zero or near-zero AI cost — most use rule engines and free public APIs instead of LLMs; (2) Python only, easy to run on a cheap VPS with crontab or a simple while True; (3) Telegram is the universal output — no dashboards, no front-end, just push messages where you actually read them. Combine the radars to triangulate signals, and treat the autonomous trader as a research sandbox, not a money printer.
Related Articles #
- Free Claude Code: Use Claude Code CLI for Free with Any AI Provider — AI-powered coding assistant
- 28 Tools Behind a $1M Polymarket Trading Bot: Full Stack Breakdown — Prediction market trading automation
- Accept Payments in All Currencies with NowPayments — Crypto payment integration