Code Vault — 7 Open-Source Tools for Crypto Radar and Trading
Code Vault: 7 essential crypto trading tools for automated trading, market analysis, and portfolio management. Open-source trading utilities.
This post collects all 7 code snippets currently published on Code Vault — a personal repository of crypto trading radars, automated trading systems, and security tools, all written in pure Python at zero or near-zero API cost. Each snippet below comes with its full source code for you to read, fork, and run locally. Use the table of contents on the right to jump to a specific tool.
⚠️ Risk warning — These tools touch live markets and on-chain data. Some send real-time alerts via Telegram, and one (the "autonomous trading AI") opens paper positions on Binance Futures. Read the notes under each tool carefully. Use at your own risk; the author makes no guarantee of any trading outcome.
Trading Radars #
Vitalik Sell Radar #
Date: 2026.05.02 Tags: Python · WebSocket · Ethereum · Telegram · Event-Driven
GitHub: vitalik-sell-radar{rel="nofollow"}
WebSocket events · Vitalik sell detection · Sub-second Telegram push
Monitors Vitalik Buterin's wallet (vitalik.eth) and detects ERC-20 token sells via a WebSocket event subscription — no polling, sub-second latency. Automatically classifies the recipient: DEX router (Uniswap, 1inch, SushiSwap), CEX hot wallet (Binance, Coinbase, Kraken), or LP pool. Fetches real-time token prices from DexScreener. Multi-RPC failover with automatic reconnect. Pure Python, zero cost — runs on free public RPCs.
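The core trick behind the sub-second latency is the subscription filter: the 20-byte wallet address is left-padded to 32 bytes so it matches the indexed `from` topic of the ERC-20 Transfer event. A minimal sketch of that construction, using the same values the full script below uses:

```python
import json

# Vitalik's main wallet and the keccak256 topic of Transfer(address,address,uint256)
VITALIK = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

def pad_address_to_topic(addr: str) -> str:
    # 20-byte address -> 32-byte topic: 24 zero bytes, then the address
    return "0x" + addr[2:].lower().zfill(64)

# eth_subscribe payload: only Transfer logs whose `from` topic equals Vitalik
subscribe_msg = {
    "jsonrpc": "2.0", "id": 1,
    "method": "eth_subscribe",
    "params": ["logs", {"topics": [TRANSFER_TOPIC, pad_address_to_topic(VITALIK)]}],
}
print(json.dumps(subscribe_msg, indent=2))
```

Sending this over any WebSocket RPC that supports `eth_subscribe` yields a push for every outgoing ERC-20 transfer from that wallet; putting the padded address in the third topic slot instead (as the script also does) catches incoming transfers.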
Full source code #
#!/usr/bin/env python3
"""
Vitalik Sell Radar — Event-Driven Edition
WebSocket subscription to ERC-20 Transfer events, real-time detection of
Vitalik's sell activity, instant push to Telegram.
Architecture:
1. WebSocket subscribes to Transfer(from=vitalik) events → sub-second detection
2. Classifies sell behavior (transfers to DEX Router / CEX / LP Pool)
3. Queries token info + price via DexScreener
4. Pushes alert to Telegram
5. Auto-reconnect + multi-RPC failover
"""
import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
import aiohttp
import websockets
# Load .env file
def load_env():
env_path = Path(__file__).parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
load_env()
# ============================================================
# Configuration
# ============================================================
# Vitalik's main wallet
VITALIK_ADDRESS = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
VITALIK_PADDED = "0x" + VITALIK_ADDRESS[2:].lower().zfill(64)
# ERC-20 Transfer event topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
# WebSocket RPC endpoints (free, support eth_subscribe)
WS_ENDPOINTS = [
"wss://ethereum-rpc.publicnode.com",
"wss://eth.drpc.org",
"wss://ethereum.publicnode.com",
]
# HTTP RPC for querying token info
HTTP_RPC = os.environ.get("HTTP_RPC", "https://eth.drpc.org")
# Telegram
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# Known DEX Router addresses (sell destinations)
KNOWN_DEX_ROUTERS = {
# Uniswap
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap V2 Router",
"0xe592427a0aece92de3edee1f18e0157c05861564": "Uniswap V3 Router",
"0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "Uniswap V3 Router2",
"0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad": "Uniswap Universal Router",
"0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b": "Uniswap Universal Router (old)",
# 1inch
"0x1111111254eeb25477b68fb85ed929f73a960582": "1inch V5",
"0x111111125421ca6dc452d289314280a0f8842a65": "1inch V6",
# SushiSwap
"0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "SushiSwap Router",
# CoW Protocol
"0x9008d19f58aabd9ed0d60971565aa8510560ab41": "CoW Settlement",
# 0x
"0xdef1c0ded9bec7f1a1670819833240f027b25eff": "0x Exchange Proxy",
# Curve
"0x99a58482bd75cbab83b27ec03ca68ff489b5788f": "Curve Router",
}
# Known CEX hot wallets (partial list)
KNOWN_CEX = {
"0x28c6c06298d514db089934071355e5743bf21d60": "Binance Hot Wallet",
"0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance Hot Wallet 2",
"0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance Hot Wallet 3",
"0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance Hot Wallet 4",
"0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "Coinbase",
"0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43": "Coinbase 10",
"0x503828976d22510aad0201ac7ec88293211d23da": "Coinbase 2",
"0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2": "FTX (defunct)",
"0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0": "Kraken",
"0xae2d4617c862309a3d75a0ffb358c7a5009c673f": "Kraken 10",
}
# Minimum notification amount (USD), 0 = notify all
MIN_NOTIFY_USD = int(os.environ.get("MIN_NOTIFY_USD", "0"))
# Reconnect settings
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60
# ============================================================
# Logging
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("vitalik-radar")
# ============================================================
# Global state
# ============================================================
# Dedup set for processed tx hashes (last 1000)
seen_txs: set = set()
seen_txs_list: list = []
# HTTP session
http_session: aiohttp.ClientSession | None = None
# Token info cache: {address: {symbol, name, decimals}}
token_cache: dict = {}
# Running flag
running = True
# ============================================================
# Utility functions
# ============================================================
def shorten_addr(addr: str) -> str:
"""Shorten address for display"""
return f"{addr[:6]}...{addr[-4:]}"
def decode_transfer_value(data_hex: str, decimals: int) -> float:
"""Decode Transfer event value"""
try:
raw = int(data_hex, 16)
return raw / (10 ** decimals)
except:
return 0.0
def topic_to_address(topic: str) -> str:
"""Extract 20-byte address from 32-byte topic"""
return "0x" + topic[-40:]
def classify_recipient(to_addr: str) -> tuple[str, str]:
"""
Classify recipient address.
Returns (type, name)
Type: "dex" / "cex" / "pool" / "unknown"
"""
to_lower = to_addr.lower()
if to_lower in KNOWN_DEX_ROUTERS:
return "dex", KNOWN_DEX_ROUTERS[to_lower]
if to_lower in KNOWN_CEX:
return "cex", KNOWN_CEX[to_lower]
return "unknown", ""
async def get_http_session() -> aiohttp.ClientSession:
global http_session
if http_session is None or http_session.closed:
http_session = aiohttp.ClientSession()
return http_session
async def rpc_call(method: str, params: list) -> dict | None:
"""HTTP JSON-RPC call"""
session = await get_http_session()
try:
async with session.post(
HTTP_RPC,
json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
timeout=aiohttp.ClientTimeout(total=10),
) as resp:
data = await resp.json()
return data.get("result")
except Exception as e:
log.error(f"RPC call {method} failed: {e}")
return None
async def get_token_info(token_addr: str) -> dict:
"""Query ERC-20 token info (symbol, name, decimals)"""
addr_lower = token_addr.lower()
if addr_lower in token_cache:
return token_cache[addr_lower]
info = {"symbol": "???", "name": "Unknown", "decimals": 18, "address": token_addr}
# symbol()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x95d89b41"}, "latest"
])
if result and len(result) > 2:
try:
hex_str = result[2:]
if len(hex_str) >= 128:
offset = int(hex_str[:64], 16) * 2
length = int(hex_str[offset:offset+64], 16)
symbol_hex = hex_str[offset+64:offset+64+length*2]
info["symbol"] = bytes.fromhex(symbol_hex).decode("utf-8", errors="replace").strip('\x00')
elif len(hex_str) == 64:
info["symbol"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
except:
pass
# decimals()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x313ce567"}, "latest"
])
if result and len(result) > 2:
try:
info["decimals"] = int(result, 16)
except:
pass
# name()
result = await rpc_call("eth_call", [
{"to": token_addr, "data": "0x06fdde03"}, "latest"
])
if result and len(result) > 2:
try:
hex_str = result[2:]
if len(hex_str) >= 128:
offset = int(hex_str[:64], 16) * 2
length = int(hex_str[offset:offset+64], 16)
name_hex = hex_str[offset+64:offset+64+length*2]
info["name"] = bytes.fromhex(name_hex).decode("utf-8", errors="replace").strip('\x00')
elif len(hex_str) == 64:
info["name"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
except:
pass
token_cache[addr_lower] = info
return info
async def check_if_pool(addr: str) -> bool:
"""Check if address is a Uniswap V2/V3 pool (has token0 method)"""
result = await rpc_call("eth_call", [
{"to": addr, "data": "0x0dfe1681"}, "latest" # token0()
])
if result and len(result) == 66: # 0x + 64 hex chars
return True
return False
async def get_eth_price() -> float:
"""Get ETH price from CoinGecko"""
session = await get_http_session()
try:
async with session.get(
"https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
data = await resp.json()
return data.get("ethereum", {}).get("usd", 0)
except:
return 2300 # fallback
async def get_token_price_usd(token_addr: str) -> float | None:
"""Get token USD price from DexScreener (free, no API key needed)"""
session = await get_http_session()
try:
async with session.get(
f"https://api.dexscreener.com/latest/dex/tokens/{token_addr}",
timeout=aiohttp.ClientTimeout(total=5),
) as resp:
data = await resp.json()
pairs = data.get("pairs", [])
if pairs:
# Pick pair with highest liquidity
pairs.sort(key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0), reverse=True)
price = float(pairs[0].get("priceUsd", 0) or 0)
return price if price > 0 else None
except:
pass
return None
# ============================================================
# Telegram notifications
# ============================================================
async def send_telegram(text: str):
"""Send Telegram message"""
if not TG_BOT_TOKEN:
log.warning("[TG] No bot token configured, skipping notification")
return
session = await get_http_session()
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": True,
}
try:
async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
result = await resp.json()
if not result.get("ok"):
log.error(f"[TG] Send failed: {result.get('description', '')}")
# Retry with plain text if HTML parse fails
if "parse" in result.get("description", "").lower():
payload["parse_mode"] = None
async with session.post(url, json=payload) as resp2:
pass
else:
log.info("[TG] Message sent")
except Exception as e:
log.error(f"[TG] Error: {e}")
# ============================================================
# Event handling
# ============================================================
async def handle_transfer_event(log_entry: dict):
"""Process a single Transfer event"""
tx_hash = log_entry.get("transactionHash", "")
token_addr = log_entry.get("address", "")
topics = log_entry.get("topics", [])
data = log_entry.get("data", "0x0")
# Dedup
dedup_key = f"{tx_hash}:{token_addr}"
if dedup_key in seen_txs:
return
seen_txs.add(dedup_key)
seen_txs_list.append(dedup_key)
# Cleanup (keep last 1000)
while len(seen_txs_list) > 1000:
old = seen_txs_list.pop(0)
seen_txs.discard(old)
# Parse topics: [Transfer, from, to]
if len(topics) < 3:
return
from_addr = topic_to_address(topics[1])
to_addr = topic_to_address(topics[2])
# Confirm it's from Vitalik
if from_addr.lower() != VITALIK_ADDRESS.lower():
return
# Get token info
token_info = await get_token_info(token_addr)
symbol = token_info["symbol"]
decimals = token_info["decimals"]
amount = decode_transfer_value(data, decimals)
if amount == 0:
return
# Classify recipient
recipient_type, recipient_name = classify_recipient(to_addr)
# If unknown, check if it's an LP pool
is_pool = False
if recipient_type == "unknown":
is_pool = await check_if_pool(to_addr)
if is_pool:
recipient_type = "pool"
recipient_name = "LP Pool"
# Determine if this is a "sell" action
is_sell = recipient_type in ("dex", "cex", "pool")
# If not a sell, just a regular transfer — log silently
if not is_sell:
log.info(f"[Transfer] {symbol} {amount:,.2f} → {shorten_addr(to_addr)} (regular transfer, not notifying)")
return
# Get price
token_price = await get_token_price_usd(token_addr)
usd_value = amount * token_price if token_price else None
# Below minimum notification amount — skip
if usd_value is not None and usd_value < MIN_NOTIFY_USD:
log.info(f"[Sell] {symbol} ${usd_value:.0f} < ${MIN_NOTIFY_USD} minimum, skipping")
return
# Build notification message
timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S UTC")
sell_type_label = {
"dex": "🔄 DEX Sell",
"cex": "🏦 CEX Transfer",
"pool": "🌊 Pool Sell",
}
msg_lines = [
f"<b>🚨 Vitalik Sell Signal</b>",
f"",
f"Token: <b>{symbol}</b>",
f"Amount: {amount:,.4f}",
]
if usd_value is not None:
msg_lines.append(f"Value: <b>${usd_value:,.0f}</b>")
if token_price is not None:
msg_lines.append(f"Price: ${token_price:,.8f}")
msg_lines.extend([
f"",
f"Type: {sell_type_label.get(recipient_type, 'Unknown')}",
f"Destination: {recipient_name or shorten_addr(to_addr)}",
f"Time: {timestamp}",
f"",
f"TX: https://etherscan.io/tx/{tx_hash}",
f"Token: https://dexscreener.com/ethereum/{token_addr}",
])
msg = "\n".join(msg_lines)
log.info(f"[SELL DETECTED] {symbol} {amount:,.4f} → {recipient_name or to_addr} | ${usd_value or '?'}")
await send_telegram(msg)
# ============================================================
# WebSocket listener main loop
# ============================================================
async def subscribe_and_listen(ws_url: str):
"""Connect to WebSocket and subscribe to Vitalik Transfer events"""
log.info(f"Connecting to {ws_url}...")
async with websockets.connect(
ws_url,
ping_interval=20,
ping_timeout=30,
close_timeout=10,
max_size=2**20, # 1MB
) as ws:
# Subscribe to Transfer FROM Vitalik
sub_from = {
"jsonrpc": "2.0", "id": 1,
"method": "eth_subscribe",
"params": ["logs", {
"topics": [TRANSFER_TOPIC, VITALIK_PADDED]
}]
}
await ws.send(json.dumps(sub_from))
resp = await asyncio.wait_for(ws.recv(), timeout=10)
data = json.loads(resp)
if "error" in data:
raise Exception(f"Subscribe failed: {data['error']}")
sub_id_from = data.get("result", "")
log.info(f"✅ Subscribed to Transfer FROM Vitalik (id={sub_id_from[:10]}...)")
# Also subscribe to Transfer TO Vitalik (monitor buys/receives)
sub_to = {
"jsonrpc": "2.0", "id": 2,
"method": "eth_subscribe",
"params": ["logs", {
"topics": [TRANSFER_TOPIC, None, VITALIK_PADDED]
}]
}
await ws.send(json.dumps(sub_to))
resp2 = await asyncio.wait_for(ws.recv(), timeout=10)
data2 = json.loads(resp2)
sub_id_to = data2.get("result", "")
if sub_id_to:
log.info(f"✅ Subscribed to Transfer TO Vitalik (id={sub_id_to[:10]}...)")
log.info("🔍 Monitoring Vitalik wallet... waiting for events")
# Listen for events
async for message in ws:
if not running:
break
try:
evt = json.loads(message)
if "params" not in evt:
continue
sub_id = evt["params"].get("subscription", "")
log_entry = evt["params"].get("result", {})
if sub_id == sub_id_from:
# Vitalik sent tokens → check if it's a sell
await handle_transfer_event(log_entry)
elif sub_id == sub_id_to:
# Vitalik received tokens — log silently
token_addr = log_entry.get("address", "")
token_info = await get_token_info(token_addr)
data_hex = log_entry.get("data", "0x0")
amount = decode_transfer_value(data_hex, token_info["decimals"])
log.debug(f"[Receive] {token_info['symbol']} +{amount:,.4f}")
except json.JSONDecodeError:
continue
except Exception as e:
log.error(f"Event handling error: {e}", exc_info=True)
async def main():
"""Main loop with auto-reconnect"""
global running
# Validate required config
if not TG_BOT_TOKEN:
log.warning("TG_BOT_TOKEN not set — Telegram notifications disabled")
if not TG_CHAT_ID:
log.warning("TG_CHAT_ID not set — Telegram notifications disabled")
# Signal handling
def shutdown(sig, frame):
global running
log.info(f"Received signal {sig}, shutting down...")
running = False
signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)
# Startup notice
log.info("=" * 50)
log.info("Vitalik Sell Radar — Started")
log.info(f"Monitoring: {VITALIK_ADDRESS}")
log.info(f"Min notify: ${MIN_NOTIFY_USD}")
log.info(f"Telegram: {'✅ Configured' if TG_BOT_TOKEN else '❌ Not configured'}")
log.info("=" * 50)
if TG_BOT_TOKEN and TG_CHAT_ID:
await send_telegram(
"🟢 Vitalik Sell Radar — Started\n\n"
f"Monitoring: {shorten_addr(VITALIK_ADDRESS)}\n"
f"Min notify: ${MIN_NOTIFY_USD}\n"
"Mode: WebSocket event-driven (sub-second latency)"
)
endpoint_idx = 0
reconnect_delay = RECONNECT_DELAY
while running:
ws_url = WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]
try:
await subscribe_and_listen(ws_url)
reconnect_delay = RECONNECT_DELAY # reset on success
except websockets.exceptions.ConnectionClosed as e:
log.warning(f"WebSocket closed: {e}")
except asyncio.TimeoutError:
log.warning("WebSocket timeout")
except Exception as e:
log.error(f"WebSocket error: {e}")
if not running:
break
# Switch endpoint and retry
endpoint_idx += 1
log.info(f"Reconnecting in {reconnect_delay}s... (next: {WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]})")
await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 1.5, MAX_RECONNECT_DELAY)
# Cleanup
if http_session and not http_session.closed:
await http_session.close()
log.info("Vitalik Sell Radar — Stopped")
if __name__ == "__main__":
asyncio.run(main())
On-Chain Narrative Radar #
Date: 2026.04.28 Tags: Python · GMGN · DEXScreener · Telegram
Momentum-driven · New on-chain token detection · ETH/SOL/BSC/Base
Momentum is the only trigger — the narrative is just a classification label. Scans every 30 seconds across 4 chains. A token must post 3 consecutive market-cap increases totaling ≥5% before an alert fires. Narratives (Musk/Trump, Binance/CZ, celebrities) are rated ★★★/★★/★ but never trigger a push on their own. Safety checks: RugCheck for SOL, GoPlus for EVM. Pure Python, zero AI cost.
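The momentum rule above can be sketched in isolation (a simplified illustration, not the exact production logic — the full script below also tracks volume and timestamps per sample):

```python
def momentum_triggered(mc_history, consecutive_up=3, min_total_gain=0.05):
    """True once the last `consecutive_up` steps all rose and the
    cumulative gain over those steps is at least `min_total_gain`."""
    if len(mc_history) < consecutive_up + 1:
        return False
    window = mc_history[-(consecutive_up + 1):]
    # every step in the window must be a strict increase
    if any(b <= a for a, b in zip(window, window[1:])):
        return False
    return window[-1] / window[0] - 1 >= min_total_gain

print(momentum_triggered([100, 101, 103, 106]))  # three rises, +6% total
print(momentum_triggered([100, 101, 100, 106]))  # a dip breaks the streak
```

Requiring both a streak and a minimum cumulative gain filters out tokens that drift up by rounding noise as well as single-candle spikes.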
Full source code #
#!/usr/bin/env python3
"""
On-Chain Narrative Radar — v1
Pure Python, no paid AI (momentum trigger + narrative labeling)
Strategies:
1. Momentum — consecutive market-cap/volume increases are the only trigger
2. Musk/Trump narrative — ETH + SOL primary, plus BSC
3. Binance/CZ narrative — BSC only
Data sources: GMGN + DEXScreener
Storage: SQLite
"""
import requests
import json
import time
import os
import re
import sqlite3
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from difflib import SequenceMatcher
# === Paths & config ===
DATA_DIR = os.path.expanduser("~/crypto-trading")
DB_FILE = os.path.join(DATA_DIR, "narrative_history.db")
LOG_FILE = os.path.join(DATA_DIR, "narrative_radar.log")
SEEN_FILE = os.path.join(DATA_DIR, "narrative_seen.json")
FLAP_SEEN_FILE = os.path.join(DATA_DIR, "flap_seen.json")
# Scan interval
SCAN_INTERVAL = 30  # scan every 30 seconds
# Momentum tracking — per-token market-cap / volume history
# {address: [{'ts': timestamp, 'mc': market_cap, 'vol': volume, 'price': price}, ...]}
MOMENTUM_TRACKER = {}
MOMENTUM_PUSHED = {}  # {address: {'count': N, 'last_ts': ts, 'last_mc': mc}}
MOMENTUM_CONSECUTIVE_UP = 3  # require 3 consecutive increases to trigger
# Load Telegram credentials from .env
def load_env():
env = {}
env_file = os.path.expanduser("~/.env")
if os.path.exists(env_file):
with open(env_file) as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k] = v
return env
ENV = load_env()
TG_TOKEN = ENV.get('TELEGRAM_BOT_TOKEN', '')
TG_CHAT_ID = int(os.environ.get('TG_CHAT_ID', '0'))
GMGN_HEADERS = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Accept': 'application/json',
'Referer': 'https://gmgn.ai/',
}
# ============================================================
# Musk/Trump narrative keywords
# ============================================================
MUSK_TRUMP_KEYWORDS = {
    # Musk himself
    'musk', 'elon', 'elonmusk',
    # SpaceX/Tesla/X
    'spacex', 'starship', 'tesla', 'cybertruck', 'roadster',
    'neuralink', 'boring', 'hyperloop', 'xai', 'grok',
    # Musk-adjacent meme coins
    'floki', 'shiba',
    'doge father', 'dogefather', 'technoking',
    'mars colony', 'mars',
    # Trump
    'trump', 'donald', 'maga', 'potus', 'trump47',
    'melania', 'barron', 'ivanka',
    # MAGA slogans
    'dark maga', 'darkmaga', 'ultra maga', 'save america',
    'truth social', 'covfefe',
    # DOGE (Department of Government Efficiency)
    'doge department', 'd.o.g.e', 'government efficiency',
}
# Musk/Trump regex patterns (word-boundary matches)
MUSK_TRUMP_PATTERNS = [
r'\belon\b', r'\bmusk\b', r'\btrump\b', r'\bmaga\b',
r'\bspacex\b', r'\bstarship\b', r'\btesla\b', r'\bgrok\b',
r'\bmelania\b', r'\bbarron\b', r'\bdoge\s*department\b',
r'\bd\.?o\.?g\.?e\b', # D.O.G.E
r'\bx\s*ai\b', r'\bneuralink\b',
]
# ============================================================
# Binance/CZ narrative keywords
# ============================================================
BINANCE_CZ_KEYWORDS = {
    # CZ
    'cz', 'changpeng', 'zhao', 'czb', 'czbinance',
    # He Yi (big on BSC!)
    'heyi', 'yi he', 'he yi', 'yihe',
    'sister yi', 'yi jie',
    # Binance ecosystem
    'binance', 'bnb', 'pancake', 'pancakeswap',
    # CZ projects
    'giggle academy', 'binance life', 'bnb chain',
    'principles', 'cz book',
    # YZi Labs (formerly Binance Labs)
    'yzi', 'yzi labs',
    # Four.meme
    'fourmeme', 'four meme', '4meme',
    # CZ dog memes
    'czs dog', 'cz dog', 'bnb dog',
    'build on bnb', 'bnb ecosystem',
}
BINANCE_CZ_PATTERNS = [
    r'\bcz\b', r'\bbinance\b', r'\bbnb\b',
    r'\bheyi\b', r'\byi\s*he\b', r'\bhe\s*yi\b',
    r'\bpancake\b', r'\bgiggle\b', r'\byzi\b',
    r'\bfourmeme\b', r'\b4meme\b',
]
# ============================================================
# Celebrity / viral narrative keywords (★★)
# ============================================================
CELEBRITY_VIRAL_KEYWORDS = {
    # Tech / crypto founders
    'vitalik', 'buterin', 'sam altman', 'satoshi',
    'michael saylor', 'saylor', 'cathie wood',
    'jack dorsey', 'zuckerberg', 'bezos',
    'jensen huang', 'nvidia', 'tim cook',
    # Crypto industry figures
    'justin sun', 'sun yuchen', 'tron',
    'arthur hayes', 'su zhu', '3ac',
    'brian armstrong', 'coinbase',
    'larry fink', 'blackrock',
    'gary gensler', 'sec',
    'michael novogratz', 'galaxy',
    # Politicians / celebrities
    'biden', 'obama', 'putin', 'xi jinping',
    'kanye', 'drake', 'snoop dogg', 'paris hilton',
    'mark cuban', 'mr beast', 'mrbeast',
    # Viral memes
    'lobster', 'lobsta',
    'hawk tuah', 'griddy', 'skibidi',
    'rizz', 'sigma', 'gyatt',
    # Macro / events
    'etf', 'halving',
    'world war', 'wwiii',
    'fed', 'rate cut',
    'tiktok ban', 'tiktok',
}
CELEBRITY_VIRAL_PATTERNS = [
r'\bvitalik\b', r'\bsaylor\b', r'\bblackrock\b',
r'\bcoinbase\b', r'\bjustin\s*sun\b', r'\blobster\b',
r'\betf\b', r'\bhalving\b', r'\bmrbeast\b',
r'\bsnoop\b', r'\bkanye\b', r'\bdrake\b',
]
# ============================================================
# Spam / blacklist patterns (hard filter)
# ============================================================
SPAM_PATTERNS = [
r'airdrop', r'presale', r'pre\s*sale',
r'1000x', r'100x guaranteed',
r'safe\s*moon', r'baby\s*\w+', # babydoge
r'pornhub', r'porn', r'xxx', r'nsfw',
r'nigga', r'nigger', r'faggot',
r'scam', r'rugpull', r'rug\s*pull',
r'official\s*token', r'official\s*coin',
]
# Common noise words (too generic to count as a narrative)
COMMON_NOISE_WORDS = {
'nice', 'good', 'bad', 'cool', 'hot', 'big', 'small',
'life', 'love', 'hate', 'happy', 'sad', 'fun', 'lol',
'cat', 'dog', 'moon', 'sun', 'star', 'king', 'queen',
'gold', 'rich', 'cash', 'money', 'pay', 'buy', 'sell',
'pump', 'dump', 'bull', 'bear', 'green', 'red',
'hello', 'world', 'yes', 'no', 'wow', 'omg', 'lmao',
'simp', 'chad', 'based', 'cope', 'seethe',
'test', 'new', 'old', 'real', 'fake',
    # profanity / low-effort names
'shit', 'shitcoin', 'fuck', 'fart', 'poop', 'pee',
'cum', 'dick', 'ass', 'boob', 'tit',
'nigga', 'retard', 'slop',
    # stopwords and generic crypto terms
'the', 'and', 'for', 'from', 'with', 'this', 'that',
'coin', 'token', 'meme', 'pepe', 'wojak',
'peg', 'usd', 'usdt', 'usdc', 'dai',
}
# ============================================================
# Utilities
# ============================================================
def log(msg):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
os.makedirs(DATA_DIR, exist_ok=True)
with open(LOG_FILE, 'a') as f:
f.write(line + '\n')
def load_flap_seen():
if os.path.exists(FLAP_SEEN_FILE):
try:
with open(FLAP_SEEN_FILE) as f:
return json.load(f)
except:
pass
return {}
def save_flap_seen(data):
    # keep only the last 7 days
cutoff = int(time.time()) - 86400 * 7
data = {k: v for k, v in data.items() if v > cutoff}
with open(FLAP_SEEN_FILE, 'w') as f:
json.dump(data, f)
def tg_send(text, parse_mode='Markdown'):
if not TG_TOKEN:
log(f"[TG] No token, skip: {text[:80]}")
return False
try:
resp = requests.post(
f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
json={'chat_id': TG_CHAT_ID, 'text': text, 'parse_mode': parse_mode},
timeout=10
)
result = resp.json()
if not result.get('ok'):
            # retry as plain text if Markdown parsing fails
if 'can\'t parse' in str(result.get('description', '')).lower():
resp = requests.post(
f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
json={'chat_id': TG_CHAT_ID, 'text': text},
timeout=10
)
else:
log(f"[TG] Error: {result.get('description', '')}")
return False
return True
except Exception as e:
log(f"[TG] Send error: {e}")
return False
# ============================================================
# Database
# ============================================================
def init_db():
    """Initialize the SQLite database"""
conn = sqlite3.connect(DB_FILE)
c = conn.cursor()
    # narratives table
    c.execute('''CREATE TABLE IF NOT EXISTS narratives (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        theme TEXT NOT NULL,              -- normalized theme key
        first_token_name TEXT,            -- first token seen with this theme
        first_token_address TEXT,         -- its contract address
        first_chain TEXT,                 -- its chain
        first_seen_at INTEGER,            -- first sighting timestamp
        token_count INTEGER DEFAULT 1,    -- tokens seen with this theme
        last_seen_at INTEGER              -- most recent sighting
)''')
    # individual tokens table
    c.execute('''CREATE TABLE IF NOT EXISTS tokens_seen (
        address TEXT PRIMARY KEY,
        chain TEXT,
        name TEXT,
        symbol TEXT,
        narrative_theme TEXT,
        category TEXT,                    -- 'musk_trump' / 'binance_cz' / 'novel' / 'common'
        first_seen_at INTEGER,
        market_cap REAL,
        pushed INTEGER DEFAULT 0,         -- alert already sent
        seen_count INTEGER DEFAULT 1      -- times seen in scans
)''')
    # indexes
c.execute('CREATE INDEX IF NOT EXISTS idx_theme ON narratives(theme)')
c.execute('CREATE INDEX IF NOT EXISTS idx_addr ON tokens_seen(address)')
conn.commit()
return conn
def normalize_theme(name, symbol):
    """
    Normalize token name + symbol into a theme key.
    Examples: 'Elon Mars Colony' → 'elon mars colony'
              'TRUMP2028' → 'trump'
              'PancakeBunny' → 'pancake bunny'
    """
    # combine name and symbol
text = f"{name} {symbol}".lower().strip()
    # generic suffixes / filler words
noise = ['token', 'coin', 'inu', 'swap', 'finance', 'protocol',
'dao', 'defi', 'nft', 'meta', 'verse', 'fi', 'ai',
'pepe', 'wojak', 'chad', 'based']
    # split camelCase into words
text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
    # strip numbers (2028, 1000x)
text = re.sub(r'\d+x?', '', text)
    # strip non-letter characters
text = re.sub(r'[^a-z\s]', ' ', text)
    # drop short words and noise words
words = [w for w in text.split() if w and len(w) > 1 and w not in noise]
if not words:
return name.lower().strip()
return ' '.join(sorted(set(words)))
def is_similar_theme(theme1, theme2, threshold=0.7):
    """Fuzzy-match two theme strings"""
if theme1 == theme2:
return True
    # substring containment
if theme1 in theme2 or theme2 in theme1:
return True
    # word overlap
words1 = set(theme1.split())
words2 = set(theme2.split())
if words1 and words2:
overlap = len(words1 & words2) / min(len(words1), len(words2))
if overlap >= 0.6:
return True
    # fallback: overall sequence similarity
return SequenceMatcher(None, theme1, theme2).ratio() >= threshold
def check_narrative_novelty(conn, theme, name, symbol, address, chain):
    """
    Returns:
      ('novel', None) — theme never seen before
      ('heating', narrative_row) — theme heating up: repeat tokens within the window!
      ('existing', existing_theme_row) — theme known, not currently heating
    Heating rule: 2+ tokens on the same theme within a 30-minute window
    """
c = conn.cursor()
now = int(time.time())
    HEAT_WINDOW = 1800  # 30 minutes
    HEAT_THRESHOLD = 2  # 2 tokens on the same theme
    # exact theme match
c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives WHERE theme = ?', (theme,))
exact = c.fetchone()
if exact:
row_id, _, _, _, _, first_seen, count, last_seen = exact
        # bump the counter
new_count = count + 1
c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE theme = ?',
(new_count, now, theme))
conn.commit()
        # heating: enough tokens within HEAT_WINDOW of the first sighting
if now - first_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', exact)
        # heating: recent cluster (measured from the last sighting)
if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', exact)
return ('existing', exact)
    # fuzzy match against the 1000 most recent themes
c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives ORDER BY last_seen_at DESC LIMIT 1000')
for row in c.fetchall():
if is_similar_theme(theme, row[1]):
row_id, _, _, _, _, first_seen, count, last_seen = row
new_count = count + 1
c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE id = ?',
(new_count, now, row[0]))
conn.commit()
            # heating check for the fuzzy-matched theme
if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
return ('heating', row)
return ('existing', row)
    # genuinely novel theme — record it
c.execute('''INSERT INTO narratives (theme, first_token_name, first_token_address, first_chain, first_seen_at, last_seen_at)
VALUES (?, ?, ?, ?, ?, ?)''',
(theme, name, address, chain, now, now))
conn.commit()
return ('novel', None)
def get_token_seen_count(conn, address):
    """Times this token has been seen"""
c = conn.cursor()
c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
row = c.fetchone()
return row[0] if row else 0
def is_token_seen(conn, address):
    """Whether this token has been recorded before"""
c = conn.cursor()
c.execute('SELECT address FROM tokens_seen WHERE address = ?', (address,))
return c.fetchone() is not None
def record_token(conn, address, chain, name, symbol, theme, category, mc, pushed=False):
    """Record a token — bumps seen_count on repeat sightings"""
c = conn.cursor()
    # check for an existing row
c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
existing = c.fetchone()
if existing:
        # existing: bump count, refresh market cap and category
new_count = existing[0] + 1
c.execute('''UPDATE tokens_seen SET seen_count = ?, market_cap = ?, category = ?
WHERE address = ?''', (new_count, mc, category, address))
else:
        # new token
c.execute('''INSERT INTO tokens_seen
(address, chain, name, symbol, narrative_theme, category, first_seen_at, market_cap, pushed, seen_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)''',
(address, chain, name, symbol, theme, category, int(time.time()), mc, 1 if pushed else 0))
conn.commit()
# ============================================================
# Narrative classification
# ============================================================
def classify_narrative(name, symbol, chain):
    """
    Classify a token's narrative.
    Returns (category, matched_keywords), where category is one of:
    'spam' / 'musk_trump' / 'binance_cz' / 'binance_cz_wrong_chain' /
    'celebrity_viral' / 'check_novelty'
    """
text = f"{name} {symbol}".lower()
    # 1. spam filter
for pat in SPAM_PATTERNS:
if re.search(pat, text, re.IGNORECASE):
return ('spam', None)
    # 2. Musk/Trump
matched_mt = []
for kw in MUSK_TRUMP_KEYWORDS:
if kw.lower() in text:
matched_mt.append(kw)
if not matched_mt:
for pat in MUSK_TRUMP_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_mt.append(m.group())
if matched_mt:
        # Musk/Trump: ETH + SOL primary, plus BSC and Base
chain_lower = chain.lower()
if chain_lower in ('eth', 'ethereum', 'sol', 'solana', 'bsc', 'base'):
return ('musk_trump', matched_mt)
    # 3. Binance/CZ — BSC only
matched_bc = []
for kw in BINANCE_CZ_KEYWORDS:
if kw.lower() in text:
matched_bc.append(kw)
if not matched_bc:
for pat in BINANCE_CZ_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_bc.append(m.group())
if matched_bc:
chain_lower = chain.lower()
if chain_lower in ('bsc',):
return ('binance_cz', matched_bc)
else:
return ('binance_cz_wrong_chain', matched_bc)
    # 4. Celebrity / viral (★★)
matched_cv = []
for kw in CELEBRITY_VIRAL_KEYWORDS:
if kw.lower() in text:
matched_cv.append(kw)
if not matched_cv:
for pat in CELEBRITY_VIRAL_PATTERNS:
m = re.search(pat, text, re.IGNORECASE)
if m:
matched_cv.append(m.group())
if matched_cv:
return ('celebrity_viral', matched_cv)
    # 5. nothing matched → fall through to the novelty check
return ('check_novelty', None)
# ============================================================
# Safety checks (RugCheck / GoPlus)
# ============================================================
def check_token_safety(chain, address):
    """Token safety check — RugCheck for SOL, GoPlus for EVM chains; conservative on failure"""
if chain in ('sol', 'solana'):
try:
r = requests.get(f'https://api.rugcheck.xyz/v1/tokens/{address}/report', timeout=10)
if r.status_code == 200:
data = r.json()
score = data.get('score', 999)
mint = data.get('mintAuthority')
freeze = data.get('freezeAuthority')
return {
'safe': not mint and not freeze,
'score': score, 'mint': mint is not None,
'freeze': freeze is not None
}
except:
pass
else:
chain_map = {'ethereum': '1', 'eth': '1', 'bsc': '56', 'base': '8453'}
cid = chain_map.get(chain, '1')
try:
r = requests.get(f'https://api.gopluslabs.io/api/v1/token_security/{cid}?contract_addresses={address}', timeout=10)
if r.status_code == 200:
result = r.json().get('result', {})
data = result.get(address.lower(), {})
if data:
honeypot = data.get('is_honeypot', '0') == '1'
mintable = data.get('is_mintable', '0') == '1'
sell_tax = float(data.get('sell_tax', '0') or '0')
buy_tax = float(data.get('buy_tax', '0') or '0')
return {
'safe': not honeypot and not mintable, #
'honeypot': honeypot, 'mintable': mintable,
'sell_tax': sell_tax, 'buy_tax': buy_tax
}
except:
pass
    return {'safe': False, 'reason': 'lookup_failed'}  # fail closed: treat unknown tokens as unsafe
# ============================================================
# GMGN API helpers
# ============================================================
def gmgn_get(url):
try:
resp = requests.get(url, headers=GMGN_HEADERS, timeout=15)
if resp.status_code == 200:
return resp.json().get('data', {})
except:
pass
return {}
def fetch_token_description(chain, address):
"""/ — """
desc = ''
    # SOL: try the Pump.fun coin endpoint for a description first
if chain in ('sol', 'solana'):
try:
r = requests.get(f'https://frontend-api-v3.pump.fun/coins/{address}', timeout=8)
if r.status_code == 200:
data = r.json()
desc = data.get('description', '') or ''
twitter = data.get('twitter', '') or ''
telegram = data.get('telegram', '') or ''
website = data.get('website', '') or ''
return {
'description': desc.strip(),
'twitter': twitter,
'telegram': telegram,
'website': website,
}
except:
pass
    # Fallback: DEXScreener token info (socials + websites)
try:
chain_dex = {'sol': 'solana', 'eth': 'ethereum', 'bsc': 'bsc', 'base': 'base',
'solana': 'solana', 'ethereum': 'ethereum'}.get(chain, chain)
r = requests.get(f'https://api.dexscreener.com/latest/dex/tokens/{address}', timeout=8)
if r.status_code == 200:
pairs = r.json().get('pairs', [])
if pairs:
info = pairs[0].get('info', {})
websites = info.get('websites', [])
socials = info.get('socials', [])
twitter = ''
telegram = ''
website = ''
for s in socials:
if s.get('type') == 'twitter':
twitter = s.get('url', '')
elif s.get('type') == 'telegram':
telegram = s.get('url', '')
for w in websites:
if w.get('label', '').lower() == 'website':
website = w.get('url', '')
if not desc:
# DEXScreenerdescription
return {
'description': desc,
'twitter': twitter,
'telegram': telegram,
'website': website,
}
except:
pass
return {'description': desc, 'twitter': '', 'telegram': '', 'website': ''}
def fetch_new_tokens():
"""GMGN + """
all_tokens = []
seen_addrs = set()
for chain in ['eth', 'bsc', 'base']:
        # Two rank views per chain to widen coverage
        urls = [
            # Newest pairs, ordered by open timestamp
            f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=open_timestamp&direction=desc&limit=100',
            # Most active pairs, ordered by swap count
            f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=swaps&direction=desc&limit=50',
        ]
for url in urls:
data = gmgn_get(url)
tokens = data.get('rank', [])
for t in tokens:
addr = t.get('address', '')
if not addr or addr in seen_addrs:
continue
mc = t.get('market_cap', 0) or t.get('fdv', 0) or 0
liq = t.get('liquidity', 0) or 0
# :
if mc < 1000 or liq < 500 or mc > 10000000:
continue
age_ts = t.get('open_timestamp', 0)
age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 999
# — :,
seen_addrs.add(addr)
all_tokens.append({
'address': addr,
'chain': chain,
'name': t.get('name', '?'),
'symbol': t.get('symbol', '?'),
'mc': mc,
'liq': liq,
'volume': t.get('volume', 0) or 0,
'holders': t.get('holder_count', 0) or 0,
'sm': t.get('smart_degen_count', 0) or 0,
'chg_1h': t.get('price_change_percent1h', 0) or 0,
'chg_24h': t.get('price_change_percent', 0) or 0,
'age_h': age_h,
'price': t.get('price', 0),
'buys_1h': t.get('buys', 0) or 0,
'sells_1h': t.get('sells', 0) or 0,
})
time.sleep(0.3)
return all_tokens
def fetch_flap_tokens():
"""
FLAP — BSC
:()
:24h,1h/,>,holders
"""
data = gmgn_get(
'https://gmgn.ai/defi/quotation/v1/rank/bsc/swaps/24h?launchpad=flap&orderby=volume&direction=desc&limit=30'
)
tokens = data.get('rank', [])
candidates = []
for t in tokens:
addr = t.get('address', '')
if not addr:
continue
mc = t.get('market_cap', 0) or 0
liq = t.get('liquidity', 0) or 0
vol = t.get('volume', 0) or 0
holders = t.get('holder_count', 0) or 0
buys = t.get('buys', 0) or 0
sells = t.get('sells', 0) or 0
chg_1h = t.get('price_change_percent1h', 0) or 0
chg_24h = t.get('price_change_percent', 0) or 0
age_ts = t.get('open_timestamp', 0)
age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 0
#
if mc < 1000 or liq < 500:
continue
if holders < 5:
continue
        # Support patterns (any one qualifies):
        # A: 24h down but 1h holding up relative to the drawdown
        # B: 24h flat-to-up, 1h steady, buys dominating
        # C: deep 24h drop with a sharp 1h bounce
buy_ratio = buys / max(sells, 1)
is_support = False
reason = ''
        # A: 24h drawdown but 1h recovering
        if chg_24h < -10 and chg_1h > chg_24h * 0.3:
            is_support = True
            reason = f'24h {chg_24h:.0f}%, 1h {chg_1h:+.0f}%'
        # B: 24h stable, 1h steady, buy pressure dominating
        if -10 <= chg_24h <= 30 and chg_1h > -5 and buy_ratio > 1.1:
            is_support = True
            reason = f'buy/sell ratio {buy_ratio:.2f}'
        # C: capitulation bounce
        if chg_24h < -30 and chg_1h > 10:
            is_support = True
            reason = f'24h {chg_24h:.0f}%, 1h bounce {chg_1h:+.0f}%'
if is_support and buy_ratio >= 1.0:
candidates.append({
'address': addr,
'chain': 'bsc',
'name': t.get('name', '?'),
'symbol': t.get('symbol', '?'),
'mc': mc,
'liq': liq,
'volume': vol,
'holders': holders,
'sm': 0,
'chg_1h': chg_1h,
'chg_24h': chg_24h,
'age_h': age_h,
'price': t.get('price', 0),
'buys': buys,
'sells': sells,
'buy_ratio': buy_ratio,
'support_reason': reason,
'launchpad': 'flap',
})
    # Sort by market cap, largest first
candidates.sort(key=lambda x: x['mc'], reverse=True)
return candidates
def format_flap_alert(token, desc_info=None):
    """FLAP launchpad support alert"""
    msg = f"Support signal: FLAP launchpad\n"
    msg += f"Chain: BSC | Launchpad: FLAP\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Token description, truncated
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"Description: {desc}\n\n"
    msg += f"Signal: {token['support_reason']}\n\n"
    msg += f"```\n"
    msg += f"Market cap  ${token['mc']:>12,.0f}\n"
    msg += f"Liquidity   ${token['liq']:>12,.0f}\n"
    msg += f"24h volume  ${token['volume']:>12,.0f}\n"
    msg += f"Holders      {token['holders']:>12,d}\n"
    msg += f"Buys/Sells   {token['buys']:>6,d}/{token['sells']:>6,d}\n"
    msg += f"Buy ratio    {token['buy_ratio']:>12.2f}\n"
    msg += f"1h change    {token['chg_1h']:>+11.1f}%\n"
    msg += f"24h change   {token['chg_24h']:>+11.1f}%\n"
    msg += f"```\n"
    msg += "\nFLAP launchpad token: do your own research"
    # Social links
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"\nTwitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n'.join(links)
    return msg
# ============================================================
#
# ============================================================
def format_musk_trump_alert(token, matched_kw, desc_info=None):
    """Musk / Trump narrative alert"""
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    msg = f"Narrative hit: Musk / Trump\n"
    msg += f"Chain: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Token description (important context!)
    desc = (desc_info or {}).get('description', '')
    if desc:
        # Cap at 200 chars to keep the message compact
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"Description: {desc}\n\n"
    msg += f"Matched: {', '.join(matched_kw[:5])}\n\n"
    msg += f"```\n"
    msg += f"Market cap  ${token['mc']:>12,.0f}\n"
    msg += f"Liquidity   ${token['liq']:>12,.0f}\n"
    msg += f"1h change   {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"Smart money  {token['sm']:>12d}\n"
    msg += f"Age          {token['age_h']:>10.1f}h\n"
    msg += f"```\n"
    # Social links
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"Twitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n' + '\n'.join(links)
    return msg
def format_binance_cz_alert(token, matched_kw, desc_info=None):
    """Binance / CZ narrative alert"""
    msg = f"Narrative hit: Binance / CZ\n"
    msg += f"Chain: BSC\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Token description, truncated
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"Description: {desc}\n\n"
    msg += f"Matched: {', '.join(matched_kw[:5])}\n\n"
    msg += f"```\n"
    msg += f"Market cap  ${token['mc']:>12,.0f}\n"
    msg += f"Liquidity   ${token['liq']:>12,.0f}\n"
    msg += f"1h change   {token['chg_1h']:>+11.1f}%\n"
    msg += f"Age         {token['age_h']:>10.1f}h\n"
    msg += f"```\n"
    # Social links
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"Twitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n' + '\n'.join(links)
    return msg
def format_novel_narrative_alert(token, theme, desc_info=None):
    """Novel narrative alert: delegates to the heating formatter with count=1"""
    return format_heating_narrative_alert(token, theme, 1, desc_info)
def format_heating_narrative_alert(token, theme, count, desc_info=None):
    """Heating narrative alert: the same theme is appearing on multiple tokens"""
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    msg = f"Narrative heating up\n"
    msg += f"Chain: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Token description, truncated
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 300:
            desc = desc[:300] + '...'
        msg += f"Description: {desc}\n\n"
    else:
        msg += f"Theme: {theme}\n\n"
    msg += f"{count} tokens on this theme so far\n\n"
    msg += f"```\n"
    msg += f"Market cap  ${token['mc']:>12,.0f}\n"
    msg += f"Liquidity   ${token['liq']:>12,.0f}\n"
    msg += f"1h change   {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"Smart money  {token['sm']:>12d}\n"
    msg += f"Holders      {token['holders']:>12d}\n"
    msg += f"Age          {token['age_h']:>10.1f}h\n"
    msg += f"```"
    # Social links
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"\nTwitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n'.join(links)
    return msg
# ============================================================
# Momentum tracking: consecutive gains + buy pressure
# ============================================================
def track_momentum(tokens):
"""
    Track per-token market-cap snapshots across scan rounds.
    Consecutive gains plus rising buy counts = momentum; re-push only on new highs.
"""
global MOMENTUM_TRACKER, MOMENTUM_PUSHED
now = time.time()
alerts = []
#
current_addrs = set()
for token in tokens:
addr = token['address']
mc = token['mc']
vol = token.get('volume', 0) or 0
price = token.get('price', 0) or 0
buys = token.get('buys_1h', 0) or token.get('buys', 0) or 0
current_addrs.add(addr)
#
if mc < 1000 or token.get('liq', 0) < 500 or mc > 10000000:
continue
# — (GMGN)
if addr not in MOMENTUM_TRACKER:
MOMENTUM_TRACKER[addr] = []
snapshots = MOMENTUM_TRACKER[addr]
        # Skip if the rank data hasn't changed since the last snapshot
        if snapshots and snapshots[-1]['mc'] == mc and snapshots[-1]['vol'] == vol:
            continue  # stale data point, don't record it
snapshots.append({
'ts': now,
'mc': mc,
'vol': vol,
'price': price,
'buys': buys,
})
        # Keep at most the last 20 snapshots per token
        if len(snapshots) > 20:
            snapshots[:] = snapshots[-20:]
        # Need enough history for the consecutive-up check
        if len(snapshots) < MOMENTUM_CONSECUTIVE_UP:
            continue
        # Examine the last N snapshots for a strictly rising streak
        recent = snapshots[-MOMENTUM_CONSECUTIVE_UP:]
        consecutive_up = True
        total_gain = 0
        for i in range(1, len(recent)):
            prev_mc = recent[i-1]['mc']
            curr_mc = recent[i]['mc']
            if prev_mc <= 0:
                consecutive_up = False
                break
            gain = (curr_mc - prev_mc) / prev_mc
            if gain <= 0:  # any flat or down round breaks the streak
                consecutive_up = False
                break
            total_gain += gain
if not consecutive_up:
continue
        # Buy-pressure confirmation (soft signal, not a hard gate)
        vol_increasing = True
        for i in range(1, len(recent)):
            if recent[i]['buys'] < recent[i-1]['buys'] * 0.8:  # allow a small pullback
                vol_increasing = False
                break
        # Total gain across the streak
        first_mc = recent[0]['mc']
        last_mc = recent[-1]['mc']
        pct_gain = ((last_mc - first_mc) / first_mc * 100) if first_mc > 0 else 0
        # Push threshold: the streak must total more than 5%
        if pct_gain < 5:
            continue
        # Repeat-push control: re-alert only when market cap makes a new high
        push_info = MOMENTUM_PUSHED.get(addr, {'count': 0, 'last_ts': 0, 'last_mc': 0})
        # Skip unless market cap exceeds the level at the last push
if push_info['count'] > 0 and last_mc <= push_info['last_mc']:
continue
push_info['count'] += 1
push_info['last_ts'] = now
push_info['last_mc'] = last_mc
signal_count = push_info['count']
        # Safety screen before alerting
        safety = check_token_safety(token['chain'], addr)
        if not safety.get('safe'):
            continue
        # Classify the narrative to assign a star rating
        category, matched_kw = classify_narrative(token['name'], token['symbol'], token['chain'])
        is_flap = token.get('launchpad') == 'flap'
        if category == 'musk_trump':
            stars = 3
            narrative_tag = f"Musk/Trump ({', '.join(matched_kw[:3])})"
        elif category == 'binance_cz':
            stars = 3
            narrative_tag = f"Binance/CZ ({', '.join(matched_kw[:3])})"
        elif category == 'celebrity_viral':
            stars = 2
            narrative_tag = f"Celebrity/viral ({', '.join(matched_kw[:3])})"
        elif is_flap:
            stars = 2
            narrative_tag = "FLAP launchpad"
        else:
            # Fall back to a generic theme tag
            theme = normalize_theme(token['name'], token['symbol'])
            theme_words = [w for w in theme.split() if w not in COMMON_NOISE_WORDS and len(w) > 2]
            if len(theme_words) >= 2:
                stars = 2
                narrative_tag = f"Theme: {theme}"
            else:
                stars = 1
                narrative_tag = "no clear narrative"
        # Fetch description and social links for the alert
        desc_info = fetch_token_description(token['chain'], addr)
        # FLAP tokens: community presence (CTO-style) adjusts the rating
        if is_flap:
            has_twitter = bool(desc_info.get('twitter'))
            has_tg = bool(desc_info.get('telegram'))
            has_web = bool(desc_info.get('website'))
            community_tags = []
            if has_twitter:
                community_tags.append("Twitter")
            if has_tg:
                community_tags.append("TG")
            if has_web:
                community_tags.append("Web")
            if community_tags:
                narrative_tag += f" | {' '.join(community_tags)}"
                stars = min(3, stars + 1)  # community presence upgrades the rating
            else:
                narrative_tag += " | no community links"
msg = format_momentum_alert(token, pct_gain, len(recent), vol_increasing, stars, narrative_tag, desc_info, signal_count)
alerts.append({'msg': msg, 'token': token})
MOMENTUM_PUSHED[addr] = push_info
        log(f"[signal {signal_count}] {token['name']} ({token['symbol']}) on {token['chain']}: {len(recent)} rounds +{pct_gain:.1f}%")
    # Evict trackers for tokens that dropped off the rank lists
    stale = [a for a in MOMENTUM_TRACKER if a not in current_addrs]
    for a in stale:
        if now - MOMENTUM_TRACKER[a][-1]['ts'] > 600:  # not seen for 10 minutes
            del MOMENTUM_TRACKER[a]
    # Expire push records older than 1 hour
    MOMENTUM_PUSHED = {k: v for k, v in MOMENTUM_PUSHED.items() if now - v.get('last_ts', 0) < 3600}
return alerts
def format_momentum_alert(token, pct_gain, rounds, vol_up, stars, narrative_tag, desc_info=None, seen_count=0):
    """Momentum alert: consecutive market-cap gains"""
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    vol_tag = "buys rising" if vol_up else "buys flat"
    star_str = "★" * stars + "☆" * (3 - stars)
    # Header
    msg = f"Momentum signal\n"
    msg += f"Chain: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    # Token description, truncated
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"Description: {desc}\n\n"
    msg += f"Narrative: {narrative_tag}\n"
    msg += f"{rounds} consecutive up rounds, +{pct_gain:.1f}% {vol_tag}\n\n"
    msg += f"```\n"
    msg += f"Market cap  ${token['mc']:>12,.0f}\n"
    msg += f"Liquidity   ${token['liq']:>12,.0f}\n"
    msg += f"1h change   {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"Smart money  {token['sm']:>12d}\n"
    msg += f"Age          {token['age_h']:>10.1f}h\n"
    msg += f"```\n"
    msg += f"Rating: {star_str}  Signal count: {seen_count}"
    # Social links
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"\nTwitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if (desc_info or {}).get('website'):
        links.append(f"Web: {desc_info['website']}")
    if links:
        msg += '\n'.join(links)
    return msg
def format_celebrity_alert(token, matched_kw, desc_info=None):
    """Celebrity / viral narrative alert ★★"""
    chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
    ch = chain_map.get(token['chain'], token['chain'].upper())
    msg = f"Narrative hit: celebrity / viral ★★\n"
    msg += f"Chain: {ch}\n\n"
    msg += f"{token['name']} ({token['symbol']})\n"
    msg += f"`{token['address']}`\n\n"
    desc = (desc_info or {}).get('description', '')
    if desc:
        if len(desc) > 200:
            desc = desc[:200] + '...'
        msg += f"Description: {desc}\n\n"
    msg += f"Matched: {', '.join(matched_kw[:5])}\n\n"
    msg += f"```\n"
    msg += f"Market cap  ${token['mc']:>12,.0f}\n"
    msg += f"Liquidity   ${token['liq']:>12,.0f}\n"
    msg += f"1h change   {token['chg_1h']:>+11.1f}%\n"
    if token.get('sm', 0) > 0:
        msg += f"Smart money  {token['sm']:>12d}\n"
    msg += f"Age          {token['age_h']:>10.1f}h\n"
    msg += f"```"
    links = []
    if (desc_info or {}).get('twitter'):
        links.append(f"\nTwitter: {desc_info['twitter']}")
    if (desc_info or {}).get('telegram'):
        links.append(f"TG: {desc_info['telegram']}")
    if links:
        msg += '\n'.join(links)
    return msg
# ============================================================
# Scan pipeline
# ============================================================
def scan_narratives():
""""""
conn = init_db()
tokens = fetch_new_tokens()
    log(f"Processing {len(tokens)} tokens...")
    # === Momentum pass: runs on every scan round ===
    # FLAP launchpad candidates (best effort)
flap_tokens = []
try:
flap_tokens = fetch_flap_tokens()
except:
pass
all_momentum_tokens = tokens + flap_tokens
momentum_alerts = track_momentum(all_momentum_tokens)
for token in tokens:
addr = token['address']
chain = token['chain']
name = token['name']
symbol = token['symbol']
        # Seen before: bump seen_count and the narrative token_count, then skip
        if is_token_seen(conn, addr):
            # Update seen_count and the latest market cap
            c = conn.cursor()
            c.execute('UPDATE tokens_seen SET seen_count = seen_count + 1, market_cap = ? WHERE address = ?', (token['mc'], addr))
            # Bump token_count on the matching narrative (if any)
            theme_tmp = normalize_theme(name, symbol)
            if theme_tmp:
                c.execute('UPDATE narratives SET token_count = token_count + 1, last_seen_at = ? WHERE theme = ?', (int(time.time()), theme_tmp))
            conn.commit()
            continue
        # Classify the narrative
        category, matched_kw = classify_narrative(name, symbol, chain)
if category == 'spam':
record_token(conn, addr, chain, name, symbol, '', 'spam', token['mc'])
continue
        # Minimum size filter (market cap / liquidity)
min_mc = 1000
min_liq = 500
if token['mc'] < min_mc or token['liq'] < min_liq:
record_token(conn, addr, chain, name, symbol, '', 'too_small', token['mc'])
continue
theme = normalize_theme(name, symbol)
        # New token: record it, then check whether its theme is novel
record_token(conn, addr, chain, name, symbol, theme, category, token['mc'])
check_narrative_novelty(conn, theme, name, symbol, addr, chain)
conn.close()
    # === Push momentum alerts ===
    pushed = 0
    for ma in momentum_alerts[:8]:  # cap at 8 alerts per round
        if tg_send(ma['msg']):
            pushed += 1
        time.sleep(1)  # respect Telegram rate limits
    return pushed, len(momentum_alerts)
# ============================================================
# Entry point
# ============================================================
def main():
log("=" * 50)
log(" v1 ")
log(f": {SCAN_INTERVAL}s")
log(f": — ,")
log("=" * 50)
    # Initialize the DB
    init_db()
    # Startup notification
    tg_send(
        "Narrative radar v1 started\n\n"
        "Core signal: consecutive market-cap gains\n"
        "3+ consecutive up rounds totaling >5% triggers a push\n"
        "Star guide:\n"
        "★★★ Musk/Trump | Binance/CZ\n"
        "★★ Celebrity/viral | FLAP | multi-word theme\n"
        "★ No clear narrative\n\n"
        f"Scan interval: {SCAN_INTERVAL}s"
    )
scan_count = 0
total_pushed = 0
while True:
try:
scan_count += 1
pushed, found = scan_narratives()
total_pushed += pushed
            if pushed > 0:
                log(f"Round {scan_count}: {found} signals, {pushed} pushed (total {total_pushed})")
            else:
                if scan_count % 20 == 0:  # heartbeat log every 20 rounds
                    log(f"Round {scan_count}: quiet (total pushed {total_pushed})")
        except Exception as e:
            log(f"Scan error: {e}")
time.sleep(SCAN_INTERVAL)
if __name__ == '__main__':
main()
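The push rule at the heart of `track_momentum()` above is easy to test in isolation: N consecutive market-cap rises whose combined gain exceeds 5%. A minimal standalone sketch (the function name, defaults, and sample numbers are illustrative, not part of the tool):

```python
def is_momentum_streak(mcaps, min_rounds=3, min_total_pct=5.0):
    """True if the last `min_rounds` snapshots rose every round and
    their cumulative gain exceeds `min_total_pct` percent."""
    if len(mcaps) < min_rounds:
        return False
    recent = mcaps[-min_rounds:]
    for prev, curr in zip(recent, recent[1:]):
        if prev <= 0 or curr <= prev:  # any flat or down round breaks the streak
            return False
    total_pct = (recent[-1] - recent[0]) / recent[0] * 100
    return total_pct >= min_total_pct

print(is_momentum_streak([100_000, 103_000, 107_000]))  # rising, +7% total → True
print(is_momentum_streak([100_000, 103_000, 102_000]))  # dips on the last round → False
print(is_momentum_streak([100_000, 101_000, 102_000]))  # rising but only +2% → False
```

The real scanner layers safety checks, dedup, and narrative scoring on top of this core predicate.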
OI + Funding Rate Scanner #
Date: 2026.04.25 Tags: Python · Binance Futures · Telegram
Detects funding-rate sign flips combined with rising open interest
Snapshot-based scanner: flags symbols whose funding rate has just flipped from positive to negative while open interest is climbing. Runs every 5 minutes.
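The snapshot comparison this scanner relies on fits in a few lines. A minimal sketch (names are illustrative; the full tool persists the previous snapshot to a JSON file between runs):

```python
def find_funding_flips(prev, curr):
    """Symbols whose funding rate was >= 0 at the last scan and is < 0 now."""
    flips = []
    for sym, curr_fr in curr.items():
        prev_fr = prev.get(sym)
        # Only a genuine sign flip counts; already-negative symbols are ignored
        if prev_fr is not None and prev_fr >= 0 and curr_fr < 0:
            flips.append(sym)
    return flips

prev = {'BTCUSDT': 0.0001, 'ETHUSDT': -0.0002, 'XYZUSDT': 0.0003}
curr = {'BTCUSDT': 0.0001, 'ETHUSDT': -0.0001, 'XYZUSDT': -0.0004}
print(find_funding_flips(prev, curr))  # ['XYZUSDT']
```

Diffing snapshots instead of polling funding history keeps the scan to a single batch API call per run.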
Full source code #
#!/usr/bin/env python3
"""
OI + Funding Rate Scanner
- Signal: funding rate flips from positive to negative between scans
- Confirmation: OI trend (48h split into 4 segments, change > 8%) plus volume
- Dedup: one alert per symbol per 24 hours
- Uses only free public Binance Futures API endpoints
"""
import requests
import json
import os
import time
import sys
from datetime import datetime, timedelta
from pathlib import Path
# ============ Paths & thresholds ============
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env.oi"
ALERT_HISTORY_FILE = SCRIPT_DIR / "oi_funding_alerts.json"
FR_SNAPSHOT_FILE = SCRIPT_DIR / "fr_snapshot.json"  # previous funding-rate snapshot
# Thresholds
MIN_OI_CHANGE_PCT = 8  # OI must have grown at least 8%
MIN_VOLUME_USDT = 0  # no volume floor; raise this to skip illiquid pairs
MIN_FR_PERIODS_POSITIVE = 2  # require at least 2 prior positive funding periods
DEDUP_HOURS = 24  # one alert per symbol per 24h
# ============ Env loading ============
def load_env():
env = {}
if ENV_FILE.exists():
for line in ENV_FILE.read_text().strip().split('\n'):
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k.strip()] = v.strip()
return env
env = load_env()
TG_BOT_TOKEN = env.get('TG_BOT_TOKEN', '')
TG_CHAT_ID = env.get('TG_CHAT_ID', '')
# ============ TG ============
def send_tg(text):
    if not TG_BOT_TOKEN or not TG_CHAT_ID:
        print("[TG] No token or chat id configured; printing to stdout:")
        print(text)
        return
    url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
    # Split long messages (Telegram caps a message at 4096 chars)
    chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
for chunk in chunks:
try:
resp = requests.post(url, json={
'chat_id': TG_CHAT_ID,
'text': chunk,
'parse_mode': 'Markdown'
}, timeout=10)
if resp.status_code != 200:
                # Markdown parse failed; retry as plain text
requests.post(url, json={
'chat_id': TG_CHAT_ID,
'text': chunk
}, timeout=10)
except Exception as e:
print(f"[TG] : {e}")
# ============ Alert dedup ============
def load_alert_history():
if ALERT_HISTORY_FILE.exists():
try:
return json.loads(ALERT_HISTORY_FILE.read_text())
except:
return {}
return {}
def save_alert_history(history):
ALERT_HISTORY_FILE.write_text(json.dumps(history))
def is_duplicate(symbol, history):
if symbol not in history:
return False
last_alert = datetime.fromisoformat(history[symbol])
return (datetime.now() - last_alert).total_seconds() < DEDUP_HOURS * 3600
def mark_alerted(symbol, history):
history[symbol] = datetime.now().isoformat()
    # Prune stale entries
cutoff = datetime.now() - timedelta(hours=DEDUP_HOURS * 2)
history = {k: v for k, v in history.items()
if datetime.fromisoformat(v) > cutoff}
return history
# ============ Funding-rate snapshot ============
def load_fr_snapshot():
if FR_SNAPSHOT_FILE.exists():
try:
return json.loads(FR_SNAPSHOT_FILE.read_text())
except:
pass
return {}
def save_fr_snapshot(snapshot):
FR_SNAPSHOT_FILE.write_text(json.dumps(snapshot))
# ============ Scan ============
def scan():
ts_start = time.time()
    # 1. All USDT perpetuals currently trading
try:
info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo', timeout=10).json()
symbols = [s['symbol'] for s in info['symbols']
if s['contractType'] == 'PERPETUAL' and s['quoteAsset'] == 'USDT' and s['status'] == 'TRADING']
except Exception as e:
print(f"[ERROR] exchangeInfo: {e}")
return []
    # 2. 24h tickers (price and volume)
try:
tickers = requests.get('https://fapi.binance.com/fapi/v1/ticker/24hr', timeout=10).json()
ticker_map = {t['symbol']: t for t in tickers}
except Exception as e:
print(f"[ERROR] ticker: {e}")
return []
active = [s for s in symbols if float(ticker_map.get(s, {}).get('quoteVolume', 0)) > MIN_VOLUME_USDT]
    # 3. Current funding rates in one batch call
try:
fr_all = requests.get('https://fapi.binance.com/fapi/v1/premiumIndex', timeout=10).json()
fr_current = {item['symbol']: float(item['lastFundingRate']) for item in fr_all}
except:
fr_current = {}
# 4. ,""
prev_snapshot = load_fr_snapshot()
# ()
save_fr_snapshot(fr_current)
if not prev_snapshot:
print(f"[{datetime.now().strftime('%H:%M:%S')}] ,,")
return []
# : >=0, <0
just_turned_negative = []
for sym in active:
prev_fr = prev_snapshot.get(sym)
curr_fr = fr_current.get(sym)
if prev_fr is None or curr_fr is None:
continue
if prev_fr >= 0 and curr_fr < 0:
just_turned_negative.append(sym)
    if not just_turned_negative:
        elapsed = time.time() - ts_start
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Scanned {len(active)} symbols in {elapsed:.1f}s, no flips")
        return []
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {len(just_turned_negative)} flips: {just_turned_negative}")
    # 5. OI confirmation for flipped symbols
signals = []
for sym in just_turned_negative:
try:
            # Hourly OI history (48 points)
oi_hist = requests.get('https://fapi.binance.com/futures/data/openInterestHist',
params={'symbol': sym, 'period': '1h', 'limit': 48}, timeout=10).json()
oi_chg = 0
segs = []
oi_rising = False
if oi_hist and len(oi_hist) >= 12:
oi_values = [float(x['sumOpenInterestValue']) for x in oi_hist]
seg_len = len(oi_values) // 4
if seg_len >= 3:
segs = [
sum(oi_values[:seg_len]) / seg_len,
sum(oi_values[seg_len:seg_len*2]) / seg_len,
sum(oi_values[seg_len*2:seg_len*3]) / seg_len,
sum(oi_values[seg_len*3:]) / max(1, len(oi_values[seg_len*3:]))
]
oi_chg = (segs[3] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0
oi_rising = oi_chg > 0
t = ticker_map.get(sym, {})
signals.append({
'symbol': sym,
'price': float(t.get('lastPrice', 0)),
'price_chg_24h': float(t.get('priceChangePercent', 0)),
'volume': float(t.get('quoteVolume', 0)),
'oi_change': oi_chg,
'oi_segments': segs,
'oi_rising': oi_rising,
'current_fr': fr_current.get(sym, 0),
'prev_fr': prev_snapshot.get(sym, 0),
})
except:
continue
    elapsed = time.time() - ts_start
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Scanned {len(active)} symbols in {elapsed:.1f}s, signals: {len(signals)}")
return signals
# ============ Context enrichment ============
def get_square_discussion(coin):
""""""
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v4/friendly/pgc/content/queryByHashtag",
params={"hashtag": f"#{coin.lower()}", "pageIndex": 1, "pageSize": 1, "orderBy": "HOT"},
headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.binance.com/en/square"},
timeout=8
)
if r.status_code == 200:
ht = r.json().get("data", {}).get("hashtag", {})
return ht.get("contentCount", 0), ht.get("viewCount", 0)
except:
pass
return 0, 0
def get_market_caps():
""""""
mcap = {}
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
timeout=10
)
if r.status_code == 200:
for item in r.json().get("data", []):
name = item.get("name", "")
mc = item.get("marketCap", 0)
if name and mc:
mcap[name] = float(mc)
except:
pass
return mcap
def get_spot_symbols():
""""""
try:
info = requests.get("https://api.binance.com/api/v3/exchangeInfo", timeout=10).json()
return {s["baseAsset"] for s in info["symbols"]
if s["quoteAsset"] == "USDT" and s["status"] == "TRADING"}
except:
return set()
def fmt_mcap(v):
if v >= 1e9: return f"${v/1e9:.2f}B"
if v >= 1e6: return f"${v/1e6:.1f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def fmt_views(v):
if v >= 1e6: return f"{v/1e6:.1f}M"
if v >= 1e3: return f"{v/1e3:.0f}K"
return str(v)
# ============ Alert formatting ============
def format_alert(signals):
if not signals:
return None
    # Sort: OI-rising symbols first, then most-negative funding
    signals.sort(key=lambda x: (-int(x.get('oi_rising', False)), x['current_fr']))
    # Market context lookups
    mcap_map = get_market_caps()
    spot_set = get_spot_symbols()
    now = datetime.now().strftime('%m-%d %H:%M')
    lines = [f"*[Funding flip + OI rising]* {now}\n"]
for s in signals:
        coin = s['symbol'].replace('USDT', '')
        # Funding change: previous → current
        fr_change = f"{s['prev_fr']:+.4%} -> {s['current_fr']:+.4%}"
        # Market context
        mcap = mcap_map.get(coin, 0)
        has_spot = coin in spot_set
        sq_posts, sq_views = get_square_discussion(coin)
        lines.append(f"```")
        lines.append(f"{coin}")
        lines.append(f" Price: {s['price']:.4f}  24h: {s['price_chg_24h']:+.1f}%")
        lines.append(f" Funding: {fr_change}")
        if s['oi_segments']:
            oi_segs = ' > '.join([f"{v/1e6:.1f}M" for v in s['oi_segments']])
            lines.append(f" OI: +{s['oi_change']:.1f}% ({oi_segs})")
        lines.append(f" Volume: ${s['volume']/1e6:.1f}M")
        lines.append(f" Mcap: {fmt_mcap(mcap) if mcap > 0 else 'n/a'}  Spot: {'yes' if has_spot else 'no'}")
        if sq_posts > 0:
            lines.append(f" Square: {sq_posts} posts / {fmt_views(sq_views)} views")
        else:
            lines.append(f" Square: quiet")
        lines.append(f"```")
return '\n'.join(lines)
# ============ ============
def main():
    signals = scan()
    if signals:
        # Strong signals: negative funding now plus rising OI (confirmation)
        strong = [s for s in signals if s['current_fr'] < 0 and s.get('oi_rising')]
        if strong:
            msg = format_alert(strong)
            if msg:
                send_tg(msg)
            print(f"Pushed {len(strong)} strong signals ({len(signals)} flips, {len(strong)} with rising OI)")
        else:
            print(f"{len(signals)} flips but none with rising OI; skipping alert")
    else:
        print("No signals this round")
if __name__ == '__main__':
main()
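The OI confirmation step above averages 48 hourly open-interest readings into four segments and compares the last segment against the first. A standalone sketch of that calculation (the function name is illustrative; the tool computes the same quantity inline):

```python
def oi_trend_pct(oi_values, n_segs=4):
    """Percent change between the first and last segment averages."""
    seg_len = len(oi_values) // n_segs
    if seg_len == 0:
        return 0.0
    segs = []
    for i in range(n_segs):
        start = i * seg_len
        # Last segment absorbs any leftover points
        end = start + seg_len if i < n_segs - 1 else len(oi_values)
        chunk = oi_values[start:end]
        segs.append(sum(chunk) / len(chunk))
    return (segs[-1] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0.0

# 48 hourly points drifting steadily from 100 up to 147: clearly rising OI
series = [100 + i for i in range(48)]
print(f"{oi_trend_pct(series):+.1f}%")  # +34.1% (first-segment avg 105.5 → last 141.5)
```

Segment averages smooth out hourly noise, so a single spiky candle cannot fake a sustained OI build-up.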
Accumulation Radar #
Date: 2026.04.25 Tags: Python · Binance · CoinGlass · Telegram
Momentum + OI anomalies + smart alerts
Hourly scan: tracks momentum across the top gainers, detects open-interest anomalies, and pushes alerts to Telegram. Pure Python, zero AI cost.
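The multi-source heat scoring can be sketched as follows: each source contributes a rank-weighted score, and coins confirmed by more than one source get an overlap bonus. The weights mirror the spirit of the script, but the exact numbers and names here are illustrative:

```python
def build_heat_map(square_ranked, cg_ranked, vol_surge, overlap_bonus=20):
    """Combine ranked heat sources into a single score per coin."""
    heat = {}
    # Rank-weighted base scores: earlier positions score higher
    for i, coin in enumerate(square_ranked):
        heat[coin] = heat.get(coin, 0) + max(50 - i * 4, 10)
    for i, coin in enumerate(cg_ranked):
        heat[coin] = heat.get(coin, 0) + max(50 - i * 3, 10)
    for coin in vol_surge:
        heat[coin] = heat.get(coin, 0) + 30
    # Bonus for coins confirmed by 2+ independent sources
    sources = [set(square_ranked), set(cg_ranked), set(vol_surge)]
    for coin in heat:
        hits = sum(coin in s for s in sources)
        if hits >= 2:
            heat[coin] += overlap_bonus * (hits - 1)
    return heat

heat = build_heat_map(['PEPE', 'WIF'], ['WIF', 'SOL'], {'WIF'})
print(max(heat, key=heat.get))  # WIF: present in all three sources, so it tops the map
```

Cross-source confirmation is the key idea: any single feed is noisy, but a coin trending on Binance Square, CoinGecko, and volume at the same time is rarely a coincidence.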
Full source code #
#!/usr/bin/env python3
"""
Accumulation Radar v2: heat + volume + OI anomalies
Signal logic:
1. Heat sources: Binance Square + CoinGecko Trending + volume surge; overlaps score higher
2. Volume surge = 24h volume well above its recent daily average
3. OI anomaly = open interest climbing over 1h/6h windows
Pipeline: collect heat → scan OI → merge and score → push
Data: Binance public APIs + CoinGecko Trending (all free)
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
from square_heat import get_square_heat
# === .env ===
env_file = Path(__file__).parent / ".env.oi"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
k, v = line.split("=", 1)
os.environ.setdefault(k.strip(), v.strip())
# === Config ===
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.getenv("TG_CHAT_ID", "YOUR_CHAT_ID")
FAPI = "https://fapi.binance.com"
# Heat history (first-seen tracking)
HEAT_HISTORY_FILE = Path(__file__).parent / "heat_history.json"
# Volume surge thresholds
VOL_SURGE_MULT = 2.5  # 24h volume at least 2.5x the recent daily average
MIN_VOL_USD = 20_000_000  # only consider pairs with >$20M volume
# OI anomaly thresholds
MIN_OI_DELTA_PCT = 3.0  # OI must move at least 3%
MIN_OI_USD = 2_000_000  # minimum open interest of $2M
def api_get(endpoint, params=None):
"""API"""
url = f"{FAPI}{endpoint}"
for attempt in range(3):
try:
resp = requests.get(url, params=params, timeout=10)
if resp.status_code == 200:
return resp.json()
elif resp.status_code == 429:
time.sleep(2)
else:
return None
except:
time.sleep(1)
return None
def format_usd(v):
if v >= 1e9: return f"${v/1e9:.1f}B"
if v >= 1e6: return f"${v/1e6:.1f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def mcap_str(v):
if v >= 1e9: return f"${v/1e9:.1f}B"
if v >= 1e6: return f"${v/1e6:.0f}M"
if v >= 1e3: return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def send_telegram(text):
"""TG"""
if not TG_BOT_TOKEN:
print("\n[TG] No token, stdout:\n")
print(text)
return
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
    # Chunk long messages (Telegram caps a message at 4096 chars)
chunks = []
current = ""
for line in text.split("\n"):
if len(current) + len(line) + 1 > 3800:
chunks.append(current)
current = line
else:
current += "\n" + line if current else line
if current:
chunks.append(current)
for chunk in chunks:
try:
resp = requests.post(url, json={
"chat_id": TG_CHAT_ID,
"text": chunk,
"parse_mode": "Markdown"
}, timeout=10)
if resp.status_code == 200:
print(f"[TG] Sent ✓ ({len(chunk)} chars)")
else:
                # Markdown parse failed; resend without formatting
resp2 = requests.post(url, json={
"chat_id": TG_CHAT_ID,
"text": chunk.replace("*", "").replace("_", ""),
}, timeout=10)
print(f"[TG] Sent plain ({'✓' if resp2.status_code == 200 else '✗'})")
except Exception as e:
print(f"[TG] Error: {e}")
time.sleep(0.5)
def main():
print(f"🔥 v2 — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# 1. +
tickers_raw = api_get("/fapi/v1/ticker/24hr")
premiums_raw = api_get("/fapi/v1/premiumIndex")
if not tickers_raw or not premiums_raw:
print("❌ API")
return
ticker_map = {}
for t in tickers_raw:
if t["symbol"].endswith("USDT"):
ticker_map[t["symbol"]] = {
"px_chg": float(t["priceChangePercent"]),
"vol": float(t["quoteVolume"]),
"price": float(t["lastPrice"]),
}
funding_map = {}
for p in premiums_raw:
if p["symbol"].endswith("USDT"):
funding_map[p["symbol"]] = float(p["lastFundingRate"])
    # 2. Market caps (Binance public marketing API)
mcap_map = {}
try:
r = requests.get(
"https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
timeout=10
)
if r.status_code == 200:
for item in r.json().get("data", []):
name = item.get("name", "")
mc = item.get("marketCap", 0)
if name and mc:
mcap_map[name] = float(mc)
print(f"✅ : {len(mcap_map)}")
except Exception as e:
print(f"⚠️ API: {e}")
    # 3. Heat sources: Binance Square + CoinGecko Trending + volume surge
    heat_map = {}
    cg_trending = set()
    square_trending = set()
    # 3a. Binance Square trending (6h window): exchange-native attention
sq_coins = get_square_heat()
if sq_coins:
for i, c in enumerate(sq_coins):
coin = c["coin"]
square_trending.add(coin)
            # Rank-weighted score; rapid risers get a bonus below
            rank_score = max(50 - i * 4, 10)
if c.get("rapidRiser"):
rank_score += 15
heat_map[coin] = heat_map.get(coin, 0) + rank_score
print(f"🏦 : {len(square_trending)} {[c['coin'] for c in sq_coins[:5]]}")
# 3b. CoinGecko Trending
try:
r = requests.get("https://api.coingecko.com/api/v3/search/trending", timeout=10)
if r.status_code == 200:
for item in r.json().get("coins", []):
sym = item["item"]["symbol"].upper()
rank = item["item"].get("score", 99)
cg_trending.add(sym)
heat_map[sym] = heat_map.get(sym, 0) + max(50 - rank * 3, 10)
print(f"🌐 CG Trending: {len(cg_trending)}")
except Exception as e:
print(f"⚠️ CG Trending: {e}")
    # 3c. Volume surge vs the recent daily average
vol_surge_coins = set()
for sym, tk in ticker_map.items():
coin = sym.replace("USDT", "")
vol_24h = tk["vol"]
if vol_24h > MIN_VOL_USD:
kl = api_get("/fapi/v1/klines", {"symbol": sym, "interval": "1d", "limit": 8})
if kl and len(kl) >= 5:
avg_prev = sum(float(k[7]) for k in kl[:-1]) / (len(kl) - 1)
if avg_prev > 0:
ratio = vol_24h / avg_prev
if ratio >= VOL_SURGE_MULT:
vol_surge_coins.add(coin)
heat_map[coin] = heat_map.get(coin, 0) + min(ratio * 10, 50)
time.sleep(0.05)
print(f"📈 (≥{VOL_SURGE_MULT}x): {len(vol_surge_coins)}")
    # Multi-source overlaps get bonus heat
    dual_heat = cg_trending & vol_surge_coins
    square_vol = square_trending & vol_surge_coins
    triple_heat = cg_trending & vol_surge_coins & square_trending
    all_multi_heat = dual_heat | square_vol
    if all_multi_heat:
        for coin in all_multi_heat:
            heat_map[coin] = heat_map.get(coin, 0) + 20
    if triple_heat:
        for coin in triple_heat:
            heat_map[coin] = heat_map.get(coin, 0) + 30  # extra boost for triple overlap
        print(f"🔥🔥🔥 Triple overlap: {triple_heat}")
    else:
        print(f"🔥🔥 Double overlap: {all_multi_heat}")
    # 4. OI scan (top 100 by volume + every heat coin)
    scan_syms = set()
    # Heat coins first
for coin in heat_map:
sym = coin + "USDT"
if sym in ticker_map:
scan_syms.add(sym)
    # Top 100 by volume
top_by_vol = sorted(ticker_map.items(), key=lambda x: x[1]["vol"], reverse=True)[:100]
for sym, _ in top_by_vol:
scan_syms.add(sym)
oi_map = {}
for i, sym in enumerate(scan_syms):
oi_hist = api_get("/futures/data/openInterestHist", {
"symbol": sym, "period": "1h", "limit": 6
})
if oi_hist and len(oi_hist) >= 2:
curr = float(oi_hist[-1]["sumOpenInterestValue"])
prev_1h = float(oi_hist[-2]["sumOpenInterestValue"])
prev_6h = float(oi_hist[0]["sumOpenInterestValue"])
d1h = ((curr - prev_1h) / prev_1h * 100) if prev_1h > 0 else 0
d6h = ((curr - prev_6h) / prev_6h * 100) if prev_6h > 0 else 0
oi_map[sym] = {"oi_usd": curr, "d1h": d1h, "d6h": d6h}
if (i + 1) % 10 == 0:
time.sleep(0.5)
print(f"📊 OI: {len(oi_map)}")
# 5. Assemble the per-coin dataset
all_syms = set(list(ticker_map.keys()))
coin_data = {}
for sym in all_syms:
tk = ticker_map.get(sym, {})
if not tk:
continue
oi = oi_map.get(sym, {})
fr = funding_map.get(sym, 0)
coin = sym.replace("USDT", "")
d6h = oi.get("d6h", 0)
fr_pct = fr * 100
oi_usd = oi.get("oi_usd", 0)
# Use the real market cap when known, otherwise estimate as a fallback
if coin in mcap_map:
est_mcap = mcap_map[coin]
else:
est_mcap = max(tk["vol"] * 0.3, oi_usd * 2) if oi_usd > 0 else tk["vol"] * 0.3
heat = heat_map.get(coin, 0)
coin_data[sym] = {
"coin": coin, "sym": sym,
"px_chg": tk["px_chg"], "vol": tk["vol"],
"fr_pct": fr_pct, "d6h": d6h,
"oi_usd": oi_usd, "est_mcap": est_mcap,
"heat": heat,
"in_cg": coin in cg_trending,
"in_sq": coin in square_trending,
"vol_surge": coin in vol_surge_coins,
}
# ═══════════════════════════════════════
# Heat ranking & first-seen tracking
# ═══════════════════════════════════════
hot_coins = sorted(
[d for d in coin_data.values() if d["heat"] > 0],
key=lambda x: x["heat"], reverse=True
)
# Load heat history (coin -> first-seen record)
heat_history = {}
if HEAT_HISTORY_FILE.exists():
try:
heat_history = json.loads(HEAT_HISTORY_FILE.read_text())
except Exception:
pass
now_ts = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
new_entries = [] # coins appearing on the radar for the first time
for s in hot_coins:
coin = s["coin"]
if coin not in heat_history:
# First time on the radar!
heat_history[coin] = {"first_seen": now_ts, "price": s.get("px_chg", 0)}
sources = []
if s["in_sq"]: sources.append("SQ")
if s["in_cg"]: sources.append("CG")
if s["vol_surge"]: sources.append("VOL")
new_entries.append({"coin": coin, "sources": sources, "data": s})
# Prune entries older than 7 days
cutoff = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=7)).strftime("%Y-%m-%d")
heat_history = {k: v for k, v in heat_history.items()
if v.get("first_seen", "9999") >= cutoff}
# Persist updated history
HEAT_HISTORY_FILE.write_text(json.dumps(heat_history, indent=2, ensure_ascii=False))
# ═══════════════════════════════════════
# Squeeze-chase candidates: price up + negative funding
# ═══════════════════════════════════════
chase = []
for sym, d in coin_data.items():
if d["px_chg"] > 3 and d["fr_pct"] < -0.005 and d["vol"] > 1_000_000:
fr_hist = api_get("/fapi/v1/fundingRate", {"symbol": sym, "limit": 5})
fr_rates = [float(f["fundingRate"]) * 100 for f in fr_hist] if fr_hist else [d["fr_pct"]]
fr_prev = fr_rates[-2] if len(fr_rates) >= 2 else d["fr_pct"]
fr_delta = d["fr_pct"] - fr_prev
trend = "↓↓" if fr_delta < -0.05 else "↓" if fr_delta < -0.01 else "→" if abs(fr_delta) < 0.01 else "↑"
chase.append({**d, "fr_delta": fr_delta, "trend": trend,
"rates": " → ".join([f"{x:.3f}" for x in fr_rates[-3:]])})
time.sleep(0.2)
chase.sort(key=lambda x: x["fr_pct"])
# ═══════════════════════════════════════
# Build the Telegram report
# ═══════════════════════════════════════
now = datetime.now(timezone(timedelta(hours=8)))
lines = [
f"**Market Heat Report**",
f"{now.strftime('%Y-%m-%d %H:%M')} CST",
]
# Section: new entries (first-seen coins)
if new_entries:
lines.append(f"\n**[New on Radar]** first seen this scan")
tbl = ["```"]
tbl.append(f"{'Coin':<10} {'MCap':>8} {'24h%':>7} {'Source'}")
tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
for e in new_entries:
s = e["data"]
src_str = "/".join(e["sources"])
tbl.append(f"{s['coin']:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
tbl.append("```")
lines.append("\n".join(tbl))
if hot_coins:
lines.append(f"\n**[Heat Ranking]**")
tbl = ["```"]
tbl.append(f"{'Coin':<10} {'MCap':>8} {'24h%':>7} {'Source'}")
tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
for s in hot_coins[:10]:
sources = []
if s["in_sq"]: sources.append("SQ")
if s["in_cg"]: sources.append("CG")
if s["vol_surge"]: sources.append("VOL")
extra = []
if abs(s["d6h"]) >= 3: extra.append(f"OI{s['d6h']:+.0f}%")
if s["fr_pct"] < -0.03: extra.append(f"FR{s['fr_pct']:.2f}%")
src_str = "/".join(sources)
if extra:
src_str += " " + " ".join(extra)
coin_name = s['coin']
tbl.append(f"{coin_name:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
tbl.append("```")
lines.append("\n".join(tbl))
else:
lines.append("\n**[Heat Ranking]** nothing hot this scan")
# Section: squeeze chase
lines.append(f"\n**[Squeeze Chase]** price up + negative funding = shorts paying")
if chase:
tbl = ["```"]
tbl.append(f"{'Coin':<10} {'Funding':>10} {'Trend':>8} {'24h%':>7} {'MCap':>8}")
tbl.append(f"{'-'*10} {'-'*10} {'-'*8} {'-'*7} {'-'*8}")
for s in chase[:8]:
tbl.append(
f"{s['coin']:<10} {s['fr_pct']:>+9.3f}% {s['trend']:>8} {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
)
tbl.append("```")
lines.append("\n".join(tbl))
else:
lines.append("No candidates this scan")
# Section: OI anomalies (coins with no heat signal)
oi_alerts = []
for sym, oi in oi_map.items():
if abs(oi["d6h"]) >= 8:
d = coin_data.get(sym)
if d and d["heat"] == 0:
oi_alerts.append(d)
oi_alerts.sort(key=lambda x: abs(x["d6h"]), reverse=True)
if oi_alerts:
lines.append(f"\n**[OI Anomaly]** 6h OI change >= 8%")
tbl = ["```"]
tbl.append(f"{'Coin':<10} {'Dir':>4} {'OI6h':>8} {'24h%':>7} {'MCap':>8}")
tbl.append(f"{'-'*10} {'-'*4} {'-'*8} {'-'*7} {'-'*8}")
for s in oi_alerts[:6]:
direction = "↑" if s["d6h"] > 0 else "↓"
tbl.append(
f"{s['coin']:<10} {direction:>4} {s['d6h']:>+7.1f}% {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
)
tbl.append("```")
lines.append("\n".join(tbl))
# Section: highlights
highlights = []
hot_oi = [d for d in coin_data.values() if d["heat"] > 0 and d["d6h"] > 5]
for s in sorted(hot_oi, key=lambda x: x["d6h"], reverse=True)[:3]:
highlights.append(f"{s['coin']} — heat + OI {s['d6h']:+.0f}%, positions building")
hot_fuel = [d for d in coin_data.values() if d["heat"] > 0 and d["fr_pct"] < -0.03]
for s in sorted(hot_fuel, key=lambda x: x["fr_pct"])[:2]:
if s["coin"] not in " ".join(highlights):
highlights.append(f"{s['coin']} — heat + funding {s['fr_pct']:.2f}%, squeeze fuel")
chase_fire = [s for s in chase[:5] if "↓" in s.get("trend", "")]
for s in chase_fire[:2]:
if s["coin"] not in " ".join(highlights):
highlights.append(f"{s['coin']} — funding {s['fr_pct']:.3f}%, shorts under pressure")
if highlights:
lines.append(f"\n**[Highlights]**")
for h in highlights[:5]:
lines.append(f"• {h}")
lines.append(f"\nSQ=Binance Square / CG=CoinGecko / VOL=volume surge")
lines.append(f"Negative funding = shorts pay longs (squeeze fuel)")
report = "\n".join(lines)
send_telegram(report)
print("\n✅ Report sent")
if __name__ == "__main__":
main()
Binance Alpha Monitor #
Date: 2026.04.23 Tags: Python · Binance · Claude AI · Telegram
Announcement polling · AI analysis · Telegram push
Automatically detects new token listings on Binance Alpha, rates token quality with Claude AI, and pushes alerts via Telegram.
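The detection step hinges on pulling a ticker out of each announcement headline. A minimal sketch of that idea (the helper name `ticker_from_title` is ours, not the monitor's), using the same parenthesized-symbol pattern the script relies on:

```python
import re
from typing import Optional

def ticker_from_title(title: str) -> Optional[str]:
    """Extract an uppercase ticker such as '(CHIP)' from a listing headline."""
    m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
    return m.group(1) if m else None

print(ticker_from_title("Binance Alpha Will Feature Chip (CHIP)"))  # CHIP
print(ticker_from_title("System Maintenance Notice"))               # None
```

Titles without a parenthesized symbol simply yield `None`, which is why the monitor skips such announcements early.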
Full source code #
#!/usr/bin/env python3
"""
Binance Alpha Monitor v2 — event-driven edition
REST announcement polling + data aggregation + launch monitoring + TG push
No exchange API key needed; AI analysis is optional (keyword fallback without it)
Run: python3 alpha_monitor.py
"""
import asyncio
import hashlib
import json
import logging
import os
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import httpx
# ============================================================
# Configuration
# ============================================================
BASE_DIR = Path(__file__).parent
DB_PATH = str(BASE_DIR / "data" / "alpha.db")
# Telegram credentials
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")
# LLM config (optional — used for narrative / VC extraction)
# Prefer the hermes auth.json credential pool (shared with the scanner)
def _load_anthropic_key():
"""Load an Anthropic API key from auth.json, falling back to the environment."""
# 1. hermes auth.json credential pool
try:
auth_file = os.path.expanduser("~/.hermes/auth.json")
if os.path.exists(auth_file):
with open(auth_file) as f:
auth = json.load(f)
pool = auth.get("credential_pool", {}).get("anthropic", [])
if pool:
key = pool[0].get("access_token", "")
if key and key != "***":
return key
except Exception:
pass
# 2. Environment-variable fallback
return os.environ.get("ANTHROPIC_API_KEY", "")
ANTHROPIC_API_KEY = _load_anthropic_key()
ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")
# Polling intervals
ANNOUNCEMENT_POLL_INTERVAL = 30 # every 30 s
AGGREGATION_POLL_INTERVAL = 15 # every 15 s
MONITOR_POLL_INTERVAL = 120 # every 2 min
# HTTP
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json",
"Accept-Language": "en-US,en;q=0.9",
}
BINANCE_ANNOUNCEMENT_API = "https://www.binance.com/bapi/composite/v1/public/cms/article/list/query"
# ============================================================
# Logging
# ============================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("alpha")
# ============================================================
# Keyword filters
# ============================================================
# Trigger keywords — titles that may signal an Alpha listing
TRIGGER_KEYWORDS = [
"alpha", "airdrop", "tge", "token generation",
"will list", "will launch",
"exclusive", "binance wallet", "hodler",
]
# Exclusion keywords — announcements to skip
EXCLUDE_KEYWORDS = [
"delisting", "delist", "deprecate",
"maintenance",
"launchpool", "megadrop",
"buyback",
"perpetual contract", # futures listing, not Alpha
"futures will launch", # futures listing
"usdⓢ-margined", # USD-margined futures
"coin-margined", # coin-margined futures
"margin will add", # margin pair additions
"trading bots services", # bot product notices
"trading pairs", # new pair announcements (not new tokens)
]
# Alpha Box announcements are a separate product — skip them
ALPHA_BOX_KEYWORDS = ["alpha box", "mystery box"]
# ============================================================
# VC tier lists
# ============================================================
TIER1_VCS = [
"binance labs", "yzi labs",
"coinbase ventures", "a16z", "andreessen horowitz", "paradigm",
"polychain", "polychain capital", "sequoia", "sequoia china", "sequoia capital",
"multicoin", "multicoin capital", "pantera", "pantera capital",
"dragonfly", "dragonfly capital", "founders fund",
]
TIER2_VCS = [
"abcde", "iosg", "hashkey", "okx ventures",
"sevenx", "folius", "foresight", "hashed",
"bitkraft", "framework", "framework ventures",
"delphi", "delphi digital", "electric capital",
"variant", "1kx", "placeholder",
"animoca", "animoca brands", "jump", "jump crypto",
"hack vc", "bain capital",
]
# Narrative buckets
HOT_NARRATIVES = ["defi_perp", "ai_agent", "ai_defi", "defai", "zk_proof"]
WEAK_NARRATIVES = ["gamefi", "meme", "social"]
# "Binance darling" keywords
BINANCE_DARLING_KEYWORDS = ["yzi labs", "binance labs"]
# ============================================================
# Tier rating
# ============================================================
TIER_ICONS = {"S": "🟢🟢🟢", "A": "🟡🟡", "B": "🟠", "C": "⚪"}
TIER_LABELS = {"S": "S (must watch)", "A": "A (strong)", "B": "B (average)", "C": "C (pass)"}
def count_vc_tier(vcs: list, vc_list: list) -> int:
count = 0
vcs_lower = [v.lower() for v in vcs]
for t in vc_list:
if any(t in v for v in vcs_lower):
count += 1
return count
def rate_project(circ_mcap: float, fdv: float, vcs: list,
narrative: str, is_darling: bool) -> dict:
"""Rate a project S/A/B/C from market cap, FDV, VC backing, and narrative."""
t1 = count_vc_tier(vcs, TIER1_VCS)
t2 = count_vc_tier(vcs, TIER2_VCS)
hot = narrative in HOT_NARRATIVES
weak = narrative in WEAK_NARRATIVES
circ_mcap = circ_mcap or 0
fdv = fdv or 0
warnings = []
if weak:
warnings.append(f"⚠️ {narrative} is a weak narrative")
# S tier — five qualifying paths
if is_darling:
return {"tier": "S", "reason": "Binance darling (YZi/Binance Labs/CZ)", "warnings": warnings}
if hot and t1 >= 1 and fdv < 500_000_000:
return {"tier": "S", "reason": f"Hot narrative ({narrative}) + Tier1 VC", "warnings": warnings}
if t1 >= 2 and circ_mcap < 50_000_000 and fdv < 300_000_000:
return {"tier": "S", "reason": "≥2 Tier1 VCs, low cap", "warnings": warnings}
if t1 >= 1 and circ_mcap < 10_000_000 and fdv < 100_000_000:
return {"tier": "S", "reason": "Tier1 VC, micro cap", "warnings": warnings}
if hot and circ_mcap < 10_000_000 and fdv < 50_000_000:
return {"tier": "S", "reason": f"Hot narrative ({narrative}), micro cap", "warnings": warnings}
# A
if t1 >= 1 and circ_mcap < 20_000_000 and fdv < 200_000_000:
return {"tier": "A", "reason": "Tier1 ", "warnings": warnings}
# B
if circ_mcap < 50_000_000 and fdv < 500_000_000:
return {"tier": "B", "reason": "Small cap, no standout backing", "warnings": warnings}
return {"tier": "C", "reason": "Large cap / weak signals", "warnings": warnings}
# ============================================================
# SQLite storage
# ============================================================
def init_db():
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
symbol TEXT NOT NULL,
name TEXT,
launch_time TEXT,
source TEXT,
raw_text TEXT,
tier TEXT DEFAULT 'PENDING',
tier_reason TEXT,
narrative TEXT,
narrative_desc TEXT,
vcs_json TEXT DEFAULT '[]',
is_darling INTEGER DEFAULT 0,
open_price REAL,
total_supply REAL,
circulating_supply REAL,
fdv REAL,
circulating_mcap REAL,
excluded INTEGER DEFAULT 0,
exclude_reason TEXT,
discovered_at TEXT,
updated_at TEXT
)""")
c.execute("""
CREATE TABLE IF NOT EXISTS pushes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL,
push_type TEXT,
sent_at TEXT,
content TEXT
)""")
c.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
price REAL,
circulating_mcap REAL,
fdv REAL
)""")
conn.commit()
conn.close()
def project_id(symbol: str, date_str: str) -> str:
return hashlib.md5(f"{symbol.upper()}_{date_str}".encode()).hexdigest()[:16]
def project_exists(pid: str) -> bool:
conn = sqlite3.connect(DB_PATH)
exists = conn.execute("SELECT 1 FROM projects WHERE id=?", (pid,)).fetchone() is not None
conn.close()
return exists
def save_project(project: dict):
conn = sqlite3.connect(DB_PATH)
now = datetime.utcnow().isoformat()
conn.execute("""
INSERT OR IGNORE INTO projects
(id, symbol, name, launch_time, source, raw_text, tier, tier_reason,
narrative, narrative_desc, vcs_json, is_darling,
open_price, total_supply, circulating_supply, fdv, circulating_mcap,
excluded, exclude_reason, discovered_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
project["id"], project["symbol"], project.get("name"),
project.get("launch_time"), project.get("source"), project.get("raw_text"),
project.get("tier", "PENDING"), project.get("tier_reason"),
project.get("narrative"), project.get("narrative_desc"),
json.dumps(project.get("vcs", [])), int(project.get("is_darling", False)),
project.get("open_price"), project.get("total_supply"),
project.get("circulating_supply"), project.get("fdv"),
project.get("circulating_mcap"),
int(project.get("excluded", 0)), project.get("exclude_reason"),
now, now,
))
conn.commit()
conn.close()
def update_project(pid: str, fields: dict):
if not fields:
return
conn = sqlite3.connect(DB_PATH)
fields["updated_at"] = datetime.utcnow().isoformat()
set_parts = [f"{k}=?" for k in fields]
values = list(fields.values()) + [pid]
conn.execute(f"UPDATE projects SET {','.join(set_parts)} WHERE id=?", values)
conn.commit()
conn.close()
def get_project(pid: str) -> Optional[dict]:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
row = conn.execute("SELECT * FROM projects WHERE id=?", (pid,)).fetchone()
conn.close()
return dict(row) if row else None
def list_pending() -> list:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute(
"SELECT * FROM projects WHERE excluded=0 AND tier='PENDING' ORDER BY discovered_at"
).fetchall()
conn.close()
return [dict(r) for r in rows]
def list_active() -> list:
"""Rated, non-excluded projects that have a launch_time."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute("""
SELECT * FROM projects
WHERE excluded=0 AND launch_time IS NOT NULL AND launch_time != ''
AND tier NOT IN ('PENDING', 'EXCLUDED', 'ERROR')
""").fetchall()
conn.close()
return [dict(r) for r in rows]
def has_pushed(pid: str, push_type: str) -> bool:
conn = sqlite3.connect(DB_PATH)
exists = conn.execute(
"SELECT 1 FROM pushes WHERE project_id=? AND push_type=?", (pid, push_type)
).fetchone() is not None
conn.close()
return exists
def log_push(pid: str, push_type: str, content: str):
conn = sqlite3.connect(DB_PATH)
conn.execute(
"INSERT INTO pushes (project_id, push_type, sent_at, content) VALUES (?,?,?,?)",
(pid, push_type, datetime.utcnow().isoformat(), content)
)
conn.commit()
conn.close()
def save_snapshot(pid: str, price: float, mcap: float, fdv: float):
conn = sqlite3.connect(DB_PATH)
conn.execute(
"INSERT INTO snapshots (project_id, timestamp, price, circulating_mcap, fdv) VALUES (?,?,?,?,?)",
(pid, datetime.utcnow().isoformat(), price, mcap, fdv)
)
conn.commit()
conn.close()
# ============================================================
# Telegram push
# ============================================================
async def send_tg(text: str, silent: bool = False) -> bool:
if not TG_BOT_TOKEN or not TG_CHAT_ID:
logger.error("TG_BOT_TOKEN or TG_CHAT_ID not set")
return False
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_notification": silent,
}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
resp = await client.post(url, json=payload)
if resp.status_code != 200:
logger.error(f"TG {resp.status_code}: {resp.text[:200]}")
return False
return True
except Exception as e:
logger.error(f"TG send failed: {e}")
return False
# ============================================================
# Announcement parsing
# ============================================================
def is_trigger(title: str) -> tuple[bool, Optional[str]]:
t = title.lower()
for kw in EXCLUDE_KEYWORDS:
if kw.lower() in t:
return False, f"excluded keyword: {kw}"
for kw in ALPHA_BOX_KEYWORDS:
if kw.lower() in t:
return False, "Alpha Box announcement"
for kw in TRIGGER_KEYWORDS:
if kw.lower() in t:
return True, None
return False, None
def extract_symbol(title: str) -> Optional[str]:
# Standard pattern: "Chip (CHIP)"
m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
if m:
return m.group(1)
# Full-width parentheses variant: "Chip（CHIP）"
m = re.search(r"（([A-Z0-9]{2,10})）", title)
if m:
return m.group(1)
return None
def extract_name(title: str) -> Optional[str]:
patterns = [
r"(?:List|list|Launch|launch|featured)\s+([A-Za-z0-9 ]+?)\s*[\(（]",
]
for p in patterns:
m = re.search(p, title, re.IGNORECASE)
if m:
return m.group(1).strip()
return None
# ============================================================
# Announcement fetching
# ============================================================
async def fetch_announcements(limit: int = 20) -> list:
"""Fetch recent articles from several Binance CMS catalogs."""
all_articles = []
# 48: New Cryptocurrency Listing, 161: Latest Activities, 93: Latest News
for catalog_id in [48, 161, 93]:
params = {"type": 1, "catalogId": catalog_id, "pageNo": 1, "pageSize": limit}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
resp = await client.get(BINANCE_ANNOUNCEMENT_API, params=params)
resp.raise_for_status()
data = resp.json()
for catalog in data.get("data", {}).get("catalogs", []):
for a in catalog.get("articles", []):
a["_catalog_id"] = catalog_id
all_articles.append(a)
except Exception as e:
logger.warning(f"Catalog {catalog_id} fetch failed: {e}")
# De-duplicate by article code
seen = set()
unique = []
for a in all_articles:
code = a.get("code")
if code and code not in seen:
seen.add(code)
unique.append(a)
return unique
# ============================================================
# CoinGecko
# ============================================================
async def fetch_coingecko(symbol: str, project_name: str = "") -> dict:
"""Look up a token on CoinGecko, searching by symbol and project name."""
result = {"found": False, "price": None, "fdv": None, "mcap": None,
"total_supply": None, "circ_supply": None, "chain": None, "contract": None}
try:
async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
# Search by symbol first, then by project name
queries = [symbol]
if project_name and project_name.upper() != symbol.upper():
queries.append(project_name)
coin_id = None
best_rank = 999999
name_exact_match = None # an exact name match overrides symbol matches
for query in queries:
resp = await client.get("https://api.coingecko.com/api/v3/search",
params={"query": query})
if resp.status_code != 200:
continue
coins = resp.json().get("coins", [])
for c in coins:
c_sym = c.get("symbol", "").upper()
c_name = c.get("name", "").lower()
c_rank = c.get("market_cap_rank") or 999999
# Exact name match (case-insensitive, e.g. MegaETH -> megaeth)
if project_name and c_name == project_name.lower():
name_exact_match = c["id"]
# Symbol match — prefer the best market_cap_rank (most established listing)
if c_sym == symbol.upper():
if c_rank < best_rank:
coin_id = c["id"]
best_rank = c_rank
# Partial name match (e.g. MegaETH contained in a longer name)
if project_name and project_name.lower() in c_name:
if c_rank < best_rank:
coin_id = c["id"]
best_rank = c_rank
# Exact name match beats symbol match
if name_exact_match:
coin_id = name_exact_match
if not coin_id:
logger.info(f"CoinGecko: no match for {symbol}/{project_name}")
return result
logger.info(f"CoinGecko: {symbol} -> {coin_id} (rank={best_rank})")
resp2 = await client.get(
f"https://api.coingecko.com/api/v3/coins/{coin_id}",
params={"localization": "false", "tickers": "false",
"market_data": "true", "community_data": "false",
"developer_data": "false"}
)
if resp2.status_code == 429:
await asyncio.sleep(5)
resp2 = await client.get(
f"https://api.coingecko.com/api/v3/coins/{coin_id}",
params={"localization": "false", "tickers": "false",
"market_data": "true", "community_data": "false",
"developer_data": "false"}
)
if resp2.status_code != 200:
return result
d = resp2.json()
md = d.get("market_data", {})
result.update({
"found": True,
"price": (md.get("current_price") or {}).get("usd"),
"fdv": (md.get("fully_diluted_valuation") or {}).get("usd"),
"mcap": (md.get("market_cap") or {}).get("usd"),
"total_supply": md.get("total_supply"),
"circ_supply": md.get("circulating_supply"),
})
# Keep categories (VC tags like "YZi Labs Portfolio") and description for the LLM
result["categories"] = d.get("categories", [])
result["description"] = (d.get("description") or {}).get("en", "")[:500]
platforms = d.get("platforms", {})
for chain, addr in platforms.items():
if addr:
result["chain"] = chain
result["contract"] = addr
break
# If CoinGecko has no price yet (pre-listing), try Binance spot/futures
if not result["price"]:
binance_price = await _fetch_binance_price(symbol, client)
if binance_price:
result["price"] = binance_price
ts = result.get("total_supply") or 0
cs = result.get("circ_supply") or 0
if ts > 0:
result["fdv"] = binance_price * ts
if cs > 0:
result["mcap"] = binance_price * cs
logger.info(f"Binance price for {symbol}: ${binance_price}, FDV=${result.get('fdv',0):,.0f}")
except Exception as e:
logger.warning(f"CoinGecko lookup failed for {symbol}: {e}")
return result
async def _fetch_binance_price(symbol: str, client: httpx.AsyncClient) -> float:
"""Fetch the Binance spot price, falling back to futures."""
pair = f"{symbol.upper()}USDT"
# 1. Spot
try:
resp = await client.get(f"https://api.binance.com/api/v3/ticker/price",
params={"symbol": pair})
if resp.status_code == 200:
return float(resp.json()["price"])
except Exception:
pass
# 2. Futures
try:
resp = await client.get(f"https://fapi.binance.com/fapi/v1/ticker/price",
params={"symbol": pair})
if resp.status_code == 200:
return float(resp.json()["price"])
except Exception:
pass
return 0.0
# ============================================================
# LLM extraction (optional; falls back to keywords without a key)
# ============================================================
async def llm_extract(raw_text: str, symbol: str, name: str = "", cg_data: dict = None) -> dict:
"""Extract narrative / VCs / darling status via LLM, with CoinGecko context."""
fallback = {
"narrative": "unknown", "narrative_desc": "",
"vcs": [], "is_darling": False, "exclude_reason": None,
}
# CoinGecko categories
cg_data = cg_data or {}
categories = cg_data.get("categories", [])
description = cg_data.get("description", "")
# Darling detection straight from categories
darling_cats = [c for c in categories if any(kw in c.lower() for kw in ["yzi labs", "binance labs"])]
if darling_cats:
fallback["is_darling"] = True
if not ANTHROPIC_API_KEY:
# No API key: keyword + categories fallback
t = raw_text.lower()
for kw in BINANCE_DARLING_KEYWORDS:
if kw in t:
fallback["is_darling"] = True
# categories
cat_str = " ".join(categories).lower()
if "defi" in cat_str: fallback["narrative"] = "defi"
elif "ai" in cat_str: fallback["narrative"] = "ai_agent"
elif "gaming" in cat_str or "gamefi" in cat_str: fallback["narrative"] = "gamefi"
elif "meme" in cat_str: fallback["narrative"] = "meme"
elif "rwa" in cat_str or "real world" in cat_str: fallback["narrative"] = "rwa"
return fallback
# Build extra context for the prompt
extra_context = ""
if categories:
extra_context += f"\nCoinGecko: {', '.join(categories)}"
if description:
extra_context += f"\n: {description[:300]}"
if cg_data.get("found"):
extra_context += f"\n: FDV=${cg_data.get('fdv',0):,.0f}, MCap=${cg_data.get('mcap',0):,.0f}, =${cg_data.get('price',0)}"
if cg_data.get("chain"):
extra_context += f", ={cg_data['chain']}"
system = "You are a crypto analyst. Reply with JSON only, no extra text."
user = f"""Analyze this newly announced token:
Symbol: {symbol}, Name: {name or "unknown"}
Announcement: {raw_text}
{extra_context}
Reply with JSON only:
{{
"narrative": "defi_perp|ai_agent|ai_defi|defai|zk_proof|infra|defi|rwa|gamefi|meme|social|stablecoin|unknown",
"narrative_desc": "one-line description of what the project does",
"vcs": ["investor names, including CoinGecko portfolio tags"],
"is_darling": true/false,
"exclude_reason": null|"already_tge"|"meme_only"
}}
Notes:
- narrative: pick the closest bucket
- vcs: a CoinGecko category "XXX Portfolio" means XXX is an investor
- is_darling: true only for YZi Labs / Binance Labs backing or CZ endorsement
- exclude_reason: "already_tge" if the token has traded on a major CEX (Coinbase/OKX/Bybit) for 3+ months; DEX-only trading is not already_tge, and a CoinGecko listing alone is not either. Use "meme_only" for pure meme coins
"""
try:
async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
resp = await client.post(
f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages",
headers={
"x-api-key": ANTHROPIC_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": ANTHROPIC_MODEL,
"max_tokens": 800,
"temperature": 0,
"system": system,
"messages": [{"role": "user", "content": user}],
}
)
if resp.status_code != 200:
logger.warning(f"LLM returned HTTP {resp.status_code}")
return fallback
data = resp.json()
text = ""
for block in data.get("content", []):
if block.get("type") == "text":
text = block.get("text", "")
break
text = text.strip()
if text.startswith("```"):
lines = text.split("\n")
text = "\n".join(lines[1:-1])
return json.loads(text)
except Exception as e:
logger.warning(f"LLM extraction failed: {e}")
return fallback
# ============================================================
# Message formatting
# ============================================================
def _fmt_mcap(v):
if not v:
return "N/A"
if v >= 1e9:
return f"${v/1e9:.1f}B"
if v >= 1e6:
return f"${v/1e6:.1f}M"
if v >= 1e3:
return f"${v/1e3:.0f}K"
return f"${v:.0f}"
def _fmt_price(v):
if not v:
return "N/A"
if v >= 1:
return f"${v:.2f}"
if v >= 0.01:
return f"${v:.4f}"
return f"${v:.6f}"
def fmt_discovery(p: dict) -> str:
tier = p.get("tier", "C")
icon = TIER_ICONS.get(tier, "⚪")
label = TIER_LABELS.get(tier, "")
symbol = p["symbol"]
name = p.get("name") or ""
vcs = json.loads(p.get("vcs_json", "[]")) if isinstance(p.get("vcs_json"), str) else p.get("vcs", [])
lines = [
f"{icon} <b>Alpha · ${symbol}</b> {icon}",
f"📋 {label}",
"",
f"<b>{name}</b>" if name else "",
]
if p.get("narrative_desc"):
lines.append(f"💡 {p['narrative_desc']}")
if p.get("narrative") and p["narrative"] != "unknown":
lines.append(f"🏷 Narrative: {p['narrative']}")
lines.append("")
if p.get("fdv"):
lines.append(f"📊 FDV: {_fmt_mcap(p['fdv'])}")
if p.get("circulating_mcap"):
lines.append(f"📊 Circ. MCap: {_fmt_mcap(p['circulating_mcap'])}")
if p.get("open_price"):
lines.append(f"💰 Price: {_fmt_price(p['open_price'])}")
if p.get("total_supply") and p.get("circulating_supply"):
pct = p["circulating_supply"] / p["total_supply"] * 100
lines.append(f"📦 Circulating: {pct:.1f}%")
if vcs:
lines.append("")
lines.append("🏛 <b>Backers</b>")
for v in vcs[:5]:
is_t1 = any(t in v.lower() for t in TIER1_VCS)
lines.append(f" {'⭐' if is_t1 else '·'} {v}")
if p.get("is_darling"):
lines.append("")
lines.append("🔥 <b>Binance darling</b>")
if p.get("tier_reason"):
lines.append("")
lines.append(f"🎯 {p['tier_reason']}")
lines.append("")
lines.append(f"<i>📌 Source: {p.get('source', 'binance')}</i>")
if p.get("raw_text"):
lines.append(f"<i>{p['raw_text'][:120]}</i>")
return "\n".join(l for l in lines if l is not None)
def fmt_countdown(p: dict, minutes: int) -> str:
icon = TIER_ICONS.get(p.get("tier", "C"), "⚪")
t = f"{minutes//60}h{minutes%60}m" if minutes >= 60 else f"{minutes}m"
lines = [
f"{icon} <b>Launch countdown</b>",
f"<b>${p['symbol']}</b> · {p.get('name', '')}",
f"⏰ Launches in <b>{t}</b>",
]
if p.get("fdv"):
lines.append(f"FDV: {_fmt_mcap(p['fdv'])}")
if minutes <= 30:
lines.append("🔔 <b>Launching within 30 minutes</b>")
return "\n".join(lines)
def fmt_launch(p: dict, price: float, mcap: float, fdv: float) -> str:
lines = [
f"🚀 <b>${p['symbol']} is live</b>",
f"Price: <b>{_fmt_price(price)}</b>",
f"MCap: <b>{_fmt_mcap(mcap)}</b>",
f"FDV: <b>{_fmt_mcap(fdv)}</b>",
]
return "\n".join(lines)
def fmt_periodic(p: dict, idx: int, price: float, mcap: float, change_pct: float) -> str:
arrow = "📈" if change_pct > 0 else "📉"
minutes = 30 * idx
lines = [
f"⏱ <b>${p['symbol']} · +{minutes}min</b>",
f"MCap: {_fmt_mcap(mcap)} ({arrow} {change_pct:+.1f}%)",
f"Price: {_fmt_price(price)}",
]
if change_pct >= 100:
lines.append("💡 <b>Up 100%+ since launch — momentum building</b>")
elif change_pct <= -30:
lines.append("⚠️ Down sharply since launch — caution")
return "\n".join(lines)
def fmt_anomaly(p: dict, atype: str, price: float, change_pct: float) -> str:
emoji = {"double": "🚀", "halve": "🔻"}.get(atype, "⚡")
desc = {"double": "price doubled", "halve": "price halved"}.get(atype, "price anomaly")
return f"{emoji} <b>${p['symbol']} {desc}</b>\nChange: {change_pct:+.1f}%\nPrice: {_fmt_price(price)}"
# ============================================================
# Coroutine 1: announcement listener
# ============================================================
async def announcement_listener():
"""Poll Binance announcements and record potential Alpha listings."""
logger.info(f"📡 Announcement listener · every {ANNOUNCEMENT_POLL_INTERVAL}s")
while True:
try:
articles = await fetch_announcements()
new_count = 0
for art in articles:
title = art.get("title", "")
triggered, reason = is_trigger(title)
if not triggered:
continue
symbol = extract_symbol(title)
if not symbol:
continue
# Derive launch date from the announcement release timestamp
release_ts = art.get("releaseDate")
release_iso = datetime.fromtimestamp(release_ts / 1000).isoformat() if release_ts else ""
launch_date = release_iso[:10] if release_iso else datetime.utcnow().date().isoformat()
pid = project_id(symbol, launch_date)
if project_exists(pid):
continue
project = {
"id": pid,
"symbol": symbol,
"name": extract_name(title),
"launch_time": release_iso,
"source": "binance_announcement",
"raw_text": title,
"tier": "PENDING",
"vcs": [],
"is_darling": False,
"excluded": 0,
}
save_project(project)
new_count += 1
logger.info(f"🆕 ${symbol}: {title[:80]}")
if new_count:
logger.info(f"Recorded {new_count} new project(s)")
except Exception as e:
logger.error(f"Announcement listener error: {e}", exc_info=True)
await asyncio.sleep(ANNOUNCEMENT_POLL_INTERVAL)
# ============================================================
# Coroutine 2: aggregation + rating
# ============================================================
async def aggregation_worker():
"""Enrich PENDING projects, rate them, and push discovery alerts."""
logger.info(f"🧠 Aggregation worker · every {AGGREGATION_POLL_INTERVAL}s")
while True:
try:
pending = list_pending()
for p in pending:
symbol = p["symbol"]
try:
logger.info(f"📦 Aggregating ${symbol}")
# 1. CoinGecko
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
await asyncio.sleep(1) # rate-limit spacing
# 2. LLM extraction (with CoinGecko context)
llm = await llm_extract(p.get("raw_text", ""), symbol, p.get("name"), cg_data=cg)
await asyncio.sleep(1)
# 3. Exclusion check
if llm.get("exclude_reason") in ("already_tge", "meme_only"):
update_project(p["id"], {
"excluded": 1,
"exclude_reason": llm["exclude_reason"],
"tier": "EXCLUDED",
})
logger.info(f"⏭ ${symbol} excluded: {llm['exclude_reason']}")
continue
# 4. Sanity-check CoinGecko numbers
cg_fdv = cg.get("fdv", 0) or 0
cg_mcap = cg.get("mcap", 0) or 0
data_suspect = False
data_warnings = []
# FDV under $100K usually means a stale or wrong CoinGecko entry
if cg.get("found") and cg_fdv > 0 and cg_fdv < 100_000:
data_suspect = True
data_warnings.append(f"FDV=${cg_fdv:,.0f} looks wrong; distrusting CoinGecko")
logger.warning(f"⚠️ {symbol} FDV=${cg_fdv:,.0f} suspicious; ignoring CoinGecko data")
# Circulating == total supply with a tiny FDV is rarely real for a new listing
if (cg.get("circ_supply") and cg.get("total_supply")
and cg["circ_supply"] == cg["total_supply"]
and cg_fdv < 1_000_000):
data_suspect = True
data_warnings.append("circulating == total with tiny FDV; data suspect")
if data_suspect:
# Data suspect: drop CoinGecko FDV/MCap
cg_fdv = 0
cg_mcap = 0
logger.warning(f"⚠️ {symbol}: CoinGecko data discarded; rating on LLM signals only")
# 5. Rate
is_darling = llm.get("is_darling", False)
vcs = llm.get("vcs", [])
narrative = llm.get("narrative", "unknown")
rating = rate_project(
cg_mcap, cg_fdv,
vcs, narrative, is_darling
)
# 6. Persist (suspect CoinGecko fields stored as NULL)
update_project(p["id"], {
"tier": rating["tier"],
"tier_reason": rating["reason"],
"narrative": narrative,
"narrative_desc": llm.get("narrative_desc", ""),
"vcs_json": json.dumps(vcs),
"is_darling": int(is_darling),
"open_price": cg.get("price") if not data_suspect else None,
"total_supply": cg.get("total_supply") if not data_suspect else None,
"circulating_supply": cg.get("circ_supply") if not data_suspect else None,
"fdv": cg_fdv if cg_fdv else None,
"circulating_mcap": cg_mcap if cg_mcap else None,
})
# 7. Discovery push
full = get_project(p["id"])
if full and not has_pushed(p["id"], "discovery"):
text = fmt_discovery(full)
silent = rating["tier"] in ("B", "C")
ok = await send_tg(text, silent=silent)
if ok:
log_push(p["id"], "discovery", text)
logger.info(f"✅ ${symbol} rated [{rating['tier']}] and pushed")
except Exception as e:
logger.error(f"Processing {symbol} failed: {e}", exc_info=True)
update_project(p["id"], {"tier": "ERROR", "tier_reason": str(e)[:100]})
except Exception as e:
logger.error(f"Aggregation loop error: {e}", exc_info=True)
await asyncio.sleep(AGGREGATION_POLL_INTERVAL)
# ============================================================
# Post-launch monitor: countdown, at-launch, periodic and anomaly pushes
# ============================================================
async def post_launch_monitor():
"""Countdown + at-launch + 30min×4 periodic + anomaly pushes."""
logger.info(f"📊 Post-launch monitor started · polling every {MONITOR_POLL_INTERVAL}s")
while True:
try:
projects = list_active()
for p in projects:
try:
await _monitor_project(p)
except Exception as e:
logger.error(f"Monitoring {p['symbol']} failed: {e}")
except Exception as e:
logger.error(f"Monitor loop error: {e}", exc_info=True)
await asyncio.sleep(MONITOR_POLL_INTERVAL)
async def _monitor_project(p: dict):
pid = p["id"]
symbol = p["symbol"]
launch_str = p.get("launch_time", "")
if not launch_str:
return
try:
launch = datetime.fromisoformat(launch_str.replace("Z", "").split("+")[0])
except ValueError:
return
now = datetime.utcnow()
delta_sec = (launch - now).total_seconds()
# T-3h countdown push (±5 min window)
if 3*3600 - 300 <= delta_sec <= 3*3600 + 300:
if not has_pushed(pid, "t_minus_3h"):
text = fmt_countdown(p, int(delta_sec / 60))
ok = await send_tg(text, silent=p.get("tier") in ("B", "C"))
if ok:
log_push(pid, "t_minus_3h", text)
# T-30m countdown push (±2.5 min window)
elif 30*60 - 150 <= delta_sec <= 30*60 + 150:
if not has_pushed(pid, "t_minus_30m"):
text = fmt_countdown(p, int(delta_sec / 60))
ok = await send_tg(text, silent=False)
if ok:
log_push(pid, "t_minus_30m", text)
# At-launch push (first 5 minutes)
elif -300 <= delta_sec <= 0:
if not has_pushed(pid, "at_launch"):
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
if cg.get("price"):
text = fmt_launch(p, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
ok = await send_tg(text, silent=False)
if ok:
log_push(pid, "at_launch", text)
save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
# Post-launch: periodic pushes at 30/60/90/120 min
elif 0 < -delta_sec <= 2.5 * 3600:
minutes_after = int(-delta_sec / 60)
for idx, target in enumerate([30, 60, 90, 120], 1):
if abs(minutes_after - target) <= 5:
ptype = f"post_30m_{idx}"
if not has_pushed(pid, ptype):
cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
if cg.get("price"):
open_price = p.get("open_price") or cg["price"]
change = ((cg["price"] - open_price) / open_price * 100) if open_price else 0
text = fmt_periodic(p, idx, cg["price"], cg.get("mcap", 0), change)
ok = await send_tg(text, silent=p.get("tier") in ("B", "C") and idx > 1)
if ok:
log_push(pid, ptype, text)
save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
# Anomaly push: price doubled or halved vs the open
if change >= 100 and not has_pushed(pid, "anomaly_double"):
t = fmt_anomaly(p, "double", cg["price"], change)
if await send_tg(t):
log_push(pid, "anomaly_double", t)
elif change <= -50 and not has_pushed(pid, "anomaly_halve"):
t = fmt_anomaly(p, "halve", cg["price"], change)
if await send_tg(t):
log_push(pid, "anomaly_halve", t)
break
# ============================================================
# Entry point
# ============================================================
async def main():
init_db()
logger.info(f"📂 DB: {DB_PATH}")
# Telegram connectivity self-test
ok = await send_tg("🎉 <b>Alpha Monitor v2 started</b>\n\n📡 Watching announcements...\n🔔 Alpha alerts enabled")
if ok:
logger.info("✅ Telegram connected")
else:
logger.warning("⚠️ Telegram test push failed, continuing without it")
tasks = [
asyncio.create_task(announcement_listener(), name="announcements"),
asyncio.create_task(aggregation_worker(), name="aggregator"),
asyncio.create_task(post_launch_monitor(), name="monitor"),
]
logger.info("=" * 50)
logger.info("🚀 Alpha Monitor v2 running")
logger.info(f" 📡 Announcement poll: {ANNOUNCEMENT_POLL_INTERVAL}s")
logger.info(f" 🧠 LLM: {'Sonnet' if ANTHROPIC_API_KEY else '(disabled)'}")
logger.info(f" 🔔 TG: {'✅' if TG_BOT_TOKEN else '❌'}")
logger.info("=" * 50)
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
for t in tasks:
t.cancel()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Autonomous Trading AI #
Autonomous Futures Trading + Alpha v1 #
Date: 2026.04.29 Tags: Python · Binance Futures · Autonomous · Telegram
AI scans the market → analyzes → paper-trades → monitors → reviews — fully autonomous
⚠️ Risk warning: this code has only been tested with virtual/paper trading. It has no live-trading track record and should NOT be used as a basis for real trades. It has never been validated with real money — use at your own risk.
An autonomous AI that scans the entire Binance Futures market every 30 seconds, detects anomalies, and opens virtual positions. Four signal-detection strategies (extreme negative funding rate → long squeeze, extreme positive funding → short, short after a pump, bounce after a crash). Before opening a position, it runs a multi-factor environment check (BTC trend + Fear & Greed + OI attention + volume) and requires a score of at least 3 out of 7. Stop-loss/take-profit levels are monitored automatically every 30 seconds.
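The score-at-least-3-of-7 environment gate described above can be sketched as a standalone function. This is a simplified, hypothetical scorer: the name `score_environment` and the fixed thresholds mirror the full source further down, but the function itself is illustrative, not the production code.

```python
def score_environment(direction: str, btc_chg: float, fng: int,
                      oi_usd: float, vol_usd: float, strength: str) -> int:
    """Toy version of the multi-factor gate: sum the points, pass at >= 3."""
    score = 0
    # BTC trend: don't long into a dump, don't short into a rally
    if direction == "long":
        score += 1 if btc_chg > -2 else (-1 if btc_chg < -5 else 0)
    else:
        score += 1 if btc_chg < 2 else (-1 if btc_chg > 5 else 0)
    # Fear & Greed: contrarian reading, fear favors longs, greed favors shorts
    if direction == "long":
        score += 1 if fng <= 25 else (-1 if fng >= 75 else 0)
    else:
        score += 1 if fng >= 75 else (-1 if fng <= 25 else 0)
    # open interest: enough money must already care about this market
    score += 1 if oi_usd > 5e6 else 0
    # volume: thin books are untradeable
    score += 1 if vol_usd > 50e6 else (-1 if vol_usd <= 20e6 else 0)
    # signal-strength bonus: S = +2, A = +1, B = 0
    score += {"S": 2, "A": 1}.get(strength, 0)
    return score

# An S-grade long in a fearful, liquid market passes easily:
s = score_environment("long", btc_chg=0.5, fng=20, oi_usd=8e6, vol_usd=60e6, strength="S")
print(s, s >= 3)  # → 6 True
```

The gate deliberately stacks independent weak filters rather than one strong one: any single data source can be noisy, but a trade that fails three of them at once is probably fighting the tape.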
Current results (published honestly): 4 closed trades, a 75% win rate, +13.94U. But the profit is concentrated in a single trade (IR short, +45%); the rest roughly broke even. Position diversification is still weak — the bot tends to stack same-direction trades built on the same logic. It is still rule-based scoring, not genuinely independent thinking yet.
Training loop: scan → trade → close → review → find mistakes → improve → repeat. Goal: evolve from a rule executor into a trader that thinks independently.
Full source code #
#!/usr/bin/env python3
"""
Autonomous Futures Scanner — paper-trading edition.
Pure-Python rule-based AI: scans Binance Futures, opens virtual
positions, and pushes every action to Telegram.
"""
import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_FILE = os.path.join(SCRIPT_DIR, "trades.json")
SCANNER_STATE = os.path.join(SCRIPT_DIR, "scanner_state.json")
SCANNER_LOG = os.path.join(SCRIPT_DIR, "scanner.log")
INITIAL_BALANCE = 100.0
TZ_UTC8 = timezone(timedelta(hours=8))
# === Trading parameters ===
MAX_OPEN_POSITIONS = 3 # max simultaneous open positions
POSITION_PCT = 30 # position size as % of balance
LEVERAGE = 3 # leverage multiplier
COOLDOWN_HOURS = 4 # per-symbol cooldown between entries
MIN_VOLUME_M = 10 # min 24h quote volume (millions of USDT)
# === Telegram ===
def load_tg_config():
"""Load TG config from environment variables or .env file"""
env = {}
# Try .env in script directory, then current directory
for env_path in [
os.path.join(SCRIPT_DIR, ".env"),
os.path.join(os.getcwd(), ".env"),
]:
if os.path.exists(env_path):
with open(env_path) as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
k, v = line.split('=', 1)
env[k] = v.strip().strip('"').strip("'")
break
# OS environment variables override file
for key in ['TG_BOT_TOKEN', 'TELEGRAM_BOT_TOKEN', 'TG_CHAT_ID']:
val = os.environ.get(key)
if val:
env[key] = val
return env
def send_tg(text):
try:
env = load_tg_config()
token = env.get('TG_BOT_TOKEN', env.get('TELEGRAM_BOT_TOKEN', ''))
if not token:
return
chat_id = env.get('TG_CHAT_ID', '')
if not chat_id:
return
url = f"https://api.telegram.org/bot{token}/sendMessage"
requests.post(url, json={
"chat_id": chat_id,
"text": text,
"parse_mode": "Markdown"
}, timeout=10)
except Exception:
pass # alerts are best-effort; never crash the scanner on TG errors
# === Persistence ===
def load_trades():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"initial_balance": INITIAL_BALANCE, "trades": []}
def save_trades(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def load_state():
if os.path.exists(SCANNER_STATE):
with open(SCANNER_STATE, "r") as f:
return json.load(f)
return {"last_opens": {}, "signals_seen": {}}
def save_state(state):
with open(SCANNER_STATE, "w") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def get_balance(data):
balance = data.get("initial_balance", INITIAL_BALANCE)
for t in data["trades"]:
if t["status"] == "closed" and t["pnl_usd"] is not None:
balance += t["pnl_usd"]
return balance
def next_id(data):
if not data["trades"]:
return "001"
max_id = max(int(t["id"]) for t in data["trades"])
return f"{max_id + 1:03d}"
def now_str():
return datetime.now(TZ_UTC8).strftime("%Y-%m-%dT%H:%M:%S")
def log(msg):
ts = datetime.now(TZ_UTC8).strftime("%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
with open(SCANNER_LOG, "a") as f:
f.write(line + "\n")
# === Binance Futures API ===
def get_all_tickers():
url = "https://fapi.binance.com/fapi/v1/ticker/24hr"
resp = requests.get(url, timeout=10)
return resp.json()
def get_funding_rates():
"""Latest funding rate (%) for every symbol."""
url = "https://fapi.binance.com/fapi/v1/premiumIndex"
resp = requests.get(url, timeout=10)
return {item['symbol']: float(item['lastFundingRate']) * 100
for item in resp.json()}
def get_funding_history(symbol, limit=8):
url = f"https://fapi.binance.com/fapi/v1/fundingRate?symbol={symbol}&limit={limit}"
resp = requests.get(url, timeout=10)
return [float(item['fundingRate']) * 100 for item in resp.json()]
def get_open_interest(symbol):
url = f"https://fapi.binance.com/fapi/v1/openInterest?symbol={symbol}"
resp = requests.get(url, timeout=10)
data = resp.json()
return float(data['openInterest'])
def get_klines(symbol, interval="4h", limit=6):
url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
resp = requests.get(url, timeout=10)
return resp.json()
# === Signal detectors ===
def detect_extreme_negative_funding(symbol, funding_rate, funding_rates_map):
"""
Strategy 1: extreme negative funding → long (short-squeeze setup).
Trigger: current funding < -0.08% with a sustained negative history.
"""
if funding_rate >= -0.08:
return None
try:
history = get_funding_history(symbol, 8)
neg_count = sum(1 for r in history if r < -0.03)
if neg_count < 4:
return None
avg_rate = sum(history) / len(history)
# grade the signal by average funding over the last 8 periods
strength = "S" if avg_rate < -0.15 else "A" if avg_rate < -0.10 else "B"
return {
"type": "extreme_neg_funding",
"direction": "long",
"strength": strength,
"reason": f"extreme negative funding avg:{avg_rate:.4f}%, negative {neg_count}/8 periods",
"sl_pct": 0.08, # 8% stop loss
"tp_pct": 0.12, # 12% take profit
}
except Exception:
return None
def detect_extreme_positive_funding(symbol, funding_rate, funding_rates_map):
"""
Strategy 2: extreme positive funding → short (over-leveraged longs).
Trigger: current funding > 0.10% with a sustained positive history.
"""
if funding_rate <= 0.10:
return None
try:
history = get_funding_history(symbol, 8)
pos_count = sum(1 for r in history if r > 0.05)
if pos_count < 4:
return None
avg_rate = sum(history) / len(history)
strength = "S" if avg_rate > 0.20 else "A" if avg_rate > 0.12 else "B"
return {
"type": "extreme_pos_funding",
"direction": "short",
"strength": strength,
"reason": f"extreme positive funding avg:{avg_rate:.4f}%, positive {pos_count}/8 periods",
"sl_pct": 0.10,
"tp_pct": 0.15,
}
except Exception:
return None
def detect_crash_bounce(ticker):
"""
Strategy 3: bounce after a crash (knife-catch, B-grade only).
Trigger: 24h drop > 25% with the 1h candles stabilizing.
"""
change_pct = float(ticker['priceChangePercent'])
if change_pct >= -25:
return None
symbol = ticker['symbol']
try:
klines = get_klines(symbol, "1h", 6)
# closes of the last 3 hourly candles
recent_closes = [float(k[4]) for k in klines[-3:]]
# stabilizing: latest close >= previous close
if len(recent_closes) >= 2 and recent_closes[-1] >= recent_closes[-2]:
return {
"type": "crash_bounce",
"direction": "long",
"strength": "B", # knife-catching is capped at B grade
"reason": f"24h {change_pct:.1f}% crash, 1h stabilizing",
"sl_pct": 0.10,
"tp_pct": 0.15,
}
except Exception:
pass
return None
def detect_pump_short(ticker):
"""
Strategy 4: short after a pump that is rolling over (not at its high).
Trigger: 24h gain > 40% and price already pulled back >= 10% from the 1h peak.
"""
"""
change_pct = float(ticker['priceChangePercent'])
if change_pct <= 40:
return None
symbol = ticker['symbol']
try:
klines = get_klines(symbol, "1h", 6)
highs = [float(k[2]) for k in klines]
closes = [float(k[4]) for k in klines]
current = closes[-1]
peak = max(highs)
# require at least a 10% pullback from the recent peak
pullback = (peak - current) / peak * 100
if pullback < 10:
return None
strength = "A" if change_pct > 80 else "B"
return {
"type": "pump_short",
"direction": "short",
"strength": strength,
"reason": f"24h +{change_pct:.1f}%, pulled back {pullback:.1f}% from peak",
"sl_pct": 0.15, # wider stop: pumped coins stay volatile
"tp_pct": 0.20,
}
except Exception:
pass
return None
# === Environment gate ===
def check_environment(symbol, signal):
"""
Multi-factor environment check before opening a position.
Returns (passed: bool, analysis: dict, strength: str).
"""
analysis = {
"btc_env": "",
"sentiment": "",
"oi_check": "",
"volume_check": "",
"verdict": ""
}
score = 0 # needs >= 3 to pass
try:
# 1. BTC trend — don't long into a BTC dump or short into a BTC rally
btc_url = "https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT"
btc = requests.get(btc_url, timeout=5).json()
btc_chg = float(btc['priceChangePercent'])
if signal["direction"] == "long":
if btc_chg > -2:
score += 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
elif btc_chg < -5:
score -= 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% -1"
else:
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
else: # short
if btc_chg < 2:
score += 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
elif btc_chg > 5:
score -= 1
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% -1"
else:
analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
# 2. Market sentiment (Fear & Greed Index)
try:
fng = requests.get("https://api.alternative.me/fng/", timeout=5).json()
fng_val = int(fng['data'][0]['value'])
if signal["direction"] == "long":
if fng_val <= 25:
score += 1
analysis["sentiment"] = f"FGI={fng_val} +1"
elif fng_val >= 75:
score -= 1
analysis["sentiment"] = f"FGI={fng_val} -1"
else:
analysis["sentiment"] = f"FGI={fng_val} 0"
else:
if fng_val >= 75:
score += 1
analysis["sentiment"] = f"FGI={fng_val} +1"
elif fng_val <= 25:
score -= 1
analysis["sentiment"] = f"FGI={fng_val} -1"
else:
analysis["sentiment"] = f"FGI={fng_val} 0"
except Exception:
analysis["sentiment"] = "FGI unavailable 0"
# 3. Open interest — require enough OI for the trade to matter
try:
oi = get_open_interest(symbol)
ticker = requests.get(f"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol={symbol}", timeout=5).json()
price = float(ticker['lastPrice'])
oi_usd = oi * price
if oi_usd > 5_000_000: # OI > 5M
score += 1
analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M +1"
else:
analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 0"
except Exception:
analysis["oi_check"] = "OI unavailable 0"
# 4. Volume check — note: reuses the `ticker` fetched in step 3
try:
vol = float(ticker.get('quoteVolume', 0))
if vol > 50_000_000:
score += 1
analysis["volume_check"] = f"24h={vol/1e6:.0f}M +1"
elif vol > 20_000_000:
analysis["volume_check"] = f"24h={vol/1e6:.0f}M 0"
else:
score -= 1
analysis["volume_check"] = f"24h={vol/1e6:.0f}M -1"
except Exception:
analysis["volume_check"] = "volume unavailable 0"
# 5. Signal-strength bonus
if signal["strength"] == "S":
score += 2
elif signal["strength"] == "A":
score += 1
# verdict: pass when score >= 3 (max 7)
analysis["verdict"] = f"score:{score}/7"
if score >= 3:
return True, analysis, signal["strength"]
else:
return False, analysis, signal["strength"]
except Exception as e:
analysis["verdict"] = f"env check error: {e}, reject"
return False, analysis, signal["strength"]
# === Order execution (paper) ===
def execute_open(data, state, symbol, price, signal):
"""Open a virtual position — only after the environment gate passes."""
# environment gate
passed, env_analysis, strength = check_environment(symbol, signal)
env_summary = " | ".join(v for v in env_analysis.values() if v)
if not passed:
log(f"Gate rejected {symbol}: {env_summary}")
return
log(f"Gate passed {symbol}: {env_summary}")
balance = get_balance(data)
position_usd = balance * POSITION_PCT / 100
if signal["direction"] == "long":
sl = round(price * (1 - signal["sl_pct"]), 6)
tp = round(price * (1 + signal["tp_pct"]), 6)
else:
sl = round(price * (1 + signal["sl_pct"]), 6)
tp = round(price * (1 - signal["tp_pct"]), 6)
trade = {
"id": next_id(data),
"symbol": symbol,
"direction": signal["direction"],
"leverage": LEVERAGE,
"position_pct": POSITION_PCT,
"position_usd": round(position_usd, 4),
"notional_usd": round(position_usd * LEVERAGE, 4),
"entry_price": price,
"stop_loss": sl,
"take_profit": tp,
"entry_time": now_str(),
"exit_price": None,
"exit_time": None,
"exit_reason": None,
"pnl_pct": None,
"pnl_usd": None,
"status": "open",
"pre_analysis": {
"btc_env": env_analysis.get("btc_env", ""),
"sentiment": env_analysis.get("sentiment", ""),
"oi": env_analysis.get("oi_check", ""),
"volume": env_analysis.get("volume_check", ""),
"key_reason": f"[{signal['strength']}] {signal['reason']}",
"risk": f":{env_analysis.get('verdict','')} :{signal['type']}"
},
"post_review": None
}
data["trades"].append(trade)
save_trades(data)
# record the cooldown timestamp
state["last_opens"][symbol] = now_str()
save_state(state)
direction_label = "LONG" if signal["direction"] == "long" else "SHORT"
msg = f"""```
[OPEN] #{trade['id']}
Symbol: {symbol}
Direction: {direction_label} {LEVERAGE}x
Entry: {price}
Stop loss: {sl}
Take profit: {tp}
Size: {position_usd:.2f}U
Reason: [{signal['strength']}] {signal['reason']}
Time: {trade['entry_time']}
```"""
log(f"OPEN #{trade['id']} {symbol} {direction_label} @ {price} | {signal['reason']}")
send_tg(msg)
print(msg)
# === Position swap ===
def swap_weakest(data, state, open_positions, new_signal, tickers):
"""On an S-grade signal with all slots full, close the weakest losing position and swap in."""
ticker_map = {t['symbol']: float(t['lastPrice']) for t in tickers}
# find the position with the worst unrealized PnL %
worst_trade = None
worst_pnl = float('inf')
for t in open_positions:
price = ticker_map.get(t["symbol"])
if price is None:
continue
if t["direction"] == "long":
pnl_pct = (price - t["entry_price"]) / t["entry_price"] * 100
else:
pnl_pct = (t["entry_price"] - price) / t["entry_price"] * 100
if pnl_pct < worst_pnl:
worst_pnl = pnl_pct
worst_trade = t
worst_price = price
if worst_trade is None:
return
# every open position is in profit — don't swap anything out
if worst_pnl > 0:
log(f"All open positions in profit, skip swap | new signal: {new_signal['symbol']}")
return
# close the weakest position
if worst_trade["direction"] == "long":
pnl_pct_lev = (worst_price - worst_trade["entry_price"]) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
else:
pnl_pct_lev = (worst_trade["entry_price"] - worst_price) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
pos_usd = worst_trade.get("position_usd", worst_trade.get("position_pct", 30))
pnl_usd = round(pnl_pct_lev / 100 * pos_usd, 4)
worst_trade["exit_price"] = worst_price
worst_trade["exit_time"] = now_str()
worst_trade["exit_reason"] = f"swapped for {new_signal['symbol']}"
worst_trade["pnl_pct"] = round(pnl_pct_lev, 2)
worst_trade["pnl_usd"] = pnl_usd
worst_trade["status"] = "closed"
save_trades(data)
direction_label = "LONG" if worst_trade["direction"] == "long" else "SHORT"
msg = f"""```
[SWAP CLOSE] #{worst_trade['id']}
Symbol: {worst_trade['symbol']} {direction_label}
Entry: {worst_trade['entry_price']}
Exit: {worst_price}
PnL: {pnl_pct_lev:+.2f}% ({pnl_usd:+.2f}U)
Reason: making room for S-grade signal {new_signal['symbol']}
```"""
log(f"SWAP CLOSE #{worst_trade['id']} {worst_trade['symbol']} {pnl_usd:+.2f}U → {new_signal['symbol']}")
send_tg(msg)
# open the new position
execute_open(data, state, new_signal["symbol"], new_signal["price"], new_signal)
# === Main scan ===
def scan():
data = load_trades()
state = load_state()
now = datetime.now(TZ_UTC8)
# current open positions
open_positions = [t for t in data["trades"] if t["status"] == "open"]
open_symbols = set(t["symbol"] for t in open_positions)
# note: no early return when all slots are full — an S-grade signal
# can still swap out the weakest position (see swap_weakest above)
# fetch market data
try:
tickers = get_all_tickers()
funding_rates = get_funding_rates()
except Exception as e:
log(f"API error: {e}")
return
# USDT-margined pairs only, excluding majors/stables, with enough volume
exclude = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "FDUSDUSDT", "BTCDOMUSDT", "BTCSTUSDT"}
candidates = [t for t in tickers
if t['symbol'].endswith('USDT')
and t['symbol'] not in exclude
and float(t['quoteVolume']) > MIN_VOLUME_M * 1e6]
signals = []
for ticker in candidates:
symbol = ticker['symbol']
# skip symbols that already have an open position
if symbol in open_symbols:
continue
# per-symbol cooldown after a recent entry
last_open = state.get("last_opens", {}).get(symbol)
if last_open:
try:
last_dt = datetime.fromisoformat(last_open)
if last_dt.tzinfo is None:
last_dt = last_dt.replace(tzinfo=TZ_UTC8)
if (now - last_dt).total_seconds() < COOLDOWN_HOURS * 3600:
continue
except ValueError:
pass
fr = funding_rates.get(symbol, 0)
# run every detector against this symbol
for detect_fn in [
lambda s, f, m: detect_extreme_negative_funding(s, f, m),
lambda s, f, m: detect_extreme_positive_funding(s, f, m),
lambda s, f, m: detect_crash_bounce(ticker),
lambda s, f, m: detect_pump_short(ticker),
]:
try:
signal = detect_fn(symbol, fr, funding_rates)
if signal:
signal["symbol"] = symbol
signal["price"] = float(ticker['lastPrice'])
signal["volume_m"] = float(ticker['quoteVolume']) / 1e6
signals.append(signal)
except Exception:
continue
if not signals:
return
# sort by strength: S > A > B
strength_order = {"S": 0, "A": 1, "B": 2}
signals.sort(key=lambda x: strength_order.get(x["strength"], 3))
# act on at most one signal per scan
best = signals[0]
# a lone B-grade signal isn't enough; require S or A to open
if best["strength"] == "B":
log(f"Only B-grade signal, skipping: {best['symbol']} {best['reason']}")
return
slots = MAX_OPEN_POSITIONS - len(open_positions)
if slots > 0:
execute_open(data, state, best["symbol"], best["price"], best)
elif best["strength"] == "S":
# slots full but the signal is S-grade → try swapping out the weakest position
swap_weakest(data, state, open_positions, best, tickers)
if __name__ == "__main__":
scan()
Utility Tools #
VoiceKey — Voice Authentication #
Date: 2026.04.27 Tags: Python · Security · Telegram · Speaker Verification
Protect your AI agent with voiceprint authentication
Speaker verification for Telegram bot security. Uses resemblyzer (the GE2E model) to extract voice embeddings and compares them by cosine similarity. Voice login instead of a password — only enrolled speakers get in. Lightweight dependencies, pure Python.
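At its core, verification is just a cosine similarity between L2-normalized embedding vectors. A minimal sketch with random stand-in vectors (real resemblyzer embeddings are 256-dimensional, produced by the GE2E encoder; the random vectors here are placeholders, not real voiceprints):

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity of two embeddings, as used in VoiceKey.verify()."""
    a = a / np.linalg.norm(a)
    b = b / np.linalg.norm(b)
    return float(np.dot(a, b))

rng = np.random.default_rng(0)
owner = rng.normal(size=256)                     # stand-in for the enrolled voiceprint
same = owner + rng.normal(scale=0.1, size=256)   # slightly noisy "re-recording"
other = rng.normal(size=256)                     # a different "speaker"

print(cosine_similarity(owner, same))   # close to 1.0
print(cosine_similarity(owner, other))  # near 0 for independent random vectors
```

This is why a single threshold (0.75 by default) suffices: genuine re-recordings of the same voice cluster near 1.0, while unrelated high-dimensional vectors cluster near 0.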
Full source code #
#!/usr/bin/env python3
"""
VoiceKey — Voiceprint Authentication
Speaker verification for Telegram bot security.
Uses resemblyzer (GE2E model) for speaker embedding extraction
and cosine similarity for verification.
Zero AI cost — runs entirely on local CPU.
Usage:
# Register voiceprint from audio files
python voicekey.py register --audio voice1.ogg voice2.ogg --owner "YourName"
# Verify a voice against stored voiceprint
python voicekey.py verify --audio test.ogg
# As a Python module
from voicekey import VoiceKey
vk = VoiceKey()
vk.register(["voice1.ogg", "voice2.ogg"], owner="YourName")
is_owner, score = vk.verify("test.ogg")
"""
import os
import json
import tempfile
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime
# Lazy imports to speed up module load when not needed
_encoder = None
def _get_encoder():
"""Lazy-load the voice encoder model (first call takes ~1s)."""
global _encoder
if _encoder is None:
from resemblyzer import VoiceEncoder
_encoder = VoiceEncoder()
return _encoder
def _audio_to_wav(audio_path: str) -> str:
"""Convert any audio format to 16kHz mono WAV for processing."""
from pydub import AudioSegment
ext = Path(audio_path).suffix.lower()
if ext in ('.ogg', '.oga'):
audio = AudioSegment.from_ogg(audio_path)
elif ext == '.mp3':
audio = AudioSegment.from_mp3(audio_path)
elif ext == '.wav':
return audio_path # already wav
elif ext in ('.m4a', '.aac'):
audio = AudioSegment.from_file(audio_path, format=ext.lstrip('.'))
else:
audio = AudioSegment.from_file(audio_path)
# Convert to 16kHz mono
audio = audio.set_frame_rate(16000).set_channels(1)
tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
audio.export(tmp.name, format='wav')
return tmp.name
def _extract_embedding(audio_path: str) -> np.ndarray:
"""Extract voice embedding from an audio file."""
from resemblyzer import preprocess_wav
encoder = _get_encoder()
wav_path = _audio_to_wav(audio_path)
wav = preprocess_wav(wav_path)
# Cleanup temp file
if wav_path != audio_path:
os.unlink(wav_path)
if len(wav) < 1600: # Less than 0.1s
raise ValueError(f"Audio too short: {len(wav)/16000:.1f}s (need >0.1s)")
return encoder.embed_utterance(wav)
class VoiceKey:
"""
Speaker verification using voice embeddings.
Attributes:
data_dir: Directory to store voiceprint data
threshold: Cosine similarity threshold for verification (default: 0.75)
voiceprint: The registered owner's voice embedding (256-dim vector)
"""
def __init__(self, data_dir: str = None, threshold: float = 0.75):
if data_dir is None:
data_dir = os.path.expanduser("~/.hermes/voiceprint")
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.threshold = threshold
self.voiceprint = None
self.metadata = {}
self._load()
def _load(self):
"""Load existing voiceprint if available."""
vp_path = self.data_dir / "voiceprint.npy"
meta_path = self.data_dir / "voiceprint_meta.json"
if vp_path.exists():
self.voiceprint = np.load(str(vp_path))
if meta_path.exists():
with open(meta_path) as f:
self.metadata = json.load(f)
self.threshold = self.metadata.get("threshold", self.threshold)
@property
def is_registered(self) -> bool:
"""Check if a voiceprint is registered."""
return self.voiceprint is not None
def register(self, audio_paths: list, owner: str = "owner") -> dict:
"""
Register a voiceprint from multiple audio samples.
Args:
audio_paths: List of audio file paths (ogg, mp3, wav, etc.)
owner: Name of the voiceprint owner
Returns:
dict with registration results
"""
embeddings = []
results = []
for path in audio_paths:
try:
embed = _extract_embedding(path)
embeddings.append(embed)
results.append({"file": os.path.basename(path), "status": "ok"})
except Exception as e:
results.append({"file": os.path.basename(path), "status": "error", "error": str(e)})
if not embeddings:
raise ValueError("No valid audio samples provided")
# Average embeddings and normalize
voiceprint = np.mean(embeddings, axis=0)
voiceprint = voiceprint / np.linalg.norm(voiceprint)
# Save
np.save(str(self.data_dir / "voiceprint.npy"), voiceprint)
self.metadata = {
"owner": owner,
"samples_used": len(embeddings),
"embedding_dim": int(voiceprint.shape[0]),
"threshold": self.threshold,
"created": datetime.now().isoformat(),
}
with open(self.data_dir / "voiceprint_meta.json", "w") as f:
json.dump(self.metadata, f, indent=2)
self.voiceprint = voiceprint
# Self-test
similarities = []
for embed in embeddings:
sim = float(np.dot(voiceprint, embed / np.linalg.norm(embed)))
similarities.append(sim)
return {
"owner": owner,
"samples": len(embeddings),
"self_test_scores": similarities,
"min_score": min(similarities),
"details": results,
}
def verify(self, audio_path: str) -> tuple:
"""
Verify if an audio sample matches the registered voiceprint.
Args:
audio_path: Path to audio file to verify
Returns:
tuple: (is_verified: bool, similarity_score: float)
"""
if not self.is_registered:
raise RuntimeError("No voiceprint registered. Call register() first.")
embed = _extract_embedding(audio_path)
embed = embed / np.linalg.norm(embed)
similarity = float(np.dot(self.voiceprint, embed))
is_verified = similarity >= self.threshold
return is_verified, similarity
def get_info(self) -> dict:
"""Get voiceprint registration info."""
if not self.is_registered:
return {"registered": False}
return {
"registered": True,
**self.metadata,
}
def main():
parser = argparse.ArgumentParser(
description="VoiceKey — Speaker verification for security"
)
sub = parser.add_subparsers(dest="command")
# Register
reg = sub.add_parser("register", help="Register voiceprint from audio files")
reg.add_argument("--audio", nargs="+", required=True, help="Audio files (ogg/mp3/wav)")
reg.add_argument("--owner", default="owner", help="Owner name")
reg.add_argument("--data-dir", default=None, help="Data directory")
reg.add_argument("--threshold", type=float, default=0.75, help="Verification threshold")
# Verify
ver = sub.add_parser("verify", help="Verify audio against voiceprint")
ver.add_argument("--audio", required=True, help="Audio file to verify")
ver.add_argument("--data-dir", default=None, help="Data directory")
# Info
sub.add_parser("info", help="Show voiceprint info")
args = parser.parse_args()
if args.command == "register":
vk = VoiceKey(data_dir=args.data_dir, threshold=args.threshold)
result = vk.register(args.audio, owner=args.owner)
print(f"Registered voiceprint for: {result['owner']}")
print(f"Samples used: {result['samples']}")
print(f"Self-test scores: {[f'{s:.4f}' for s in result['self_test_scores']]}")
print(f"Min score: {result['min_score']:.4f} (threshold: {args.threshold})")
elif args.command == "verify":
vk = VoiceKey(data_dir=getattr(args, 'data_dir', None))
is_ok, score = vk.verify(args.audio)
status = "PASS" if is_ok else "FAIL"
print(f"[{status}] Similarity: {score:.4f} (threshold: {vk.threshold})")
elif args.command == "info":
vk = VoiceKey()
info = vk.get_info()
for k, v in info.items():
print(f" {k}: {v}")
else:
parser.print_help()
if __name__ == "__main__":
main()
Closing notes #
The 7 snippets above were captured from connectfarm1.com on 2026-05-04. All of them share three design philosophies: (1) near-zero AI cost — most rely on rule engines and free public APIs instead of LLMs; (2) Python only, easy to run on a cheap VPS with crontab or a simple while True loop; (3) Telegram as the sole output channel — no dashboard, no front-end, pushing straight to where you actually read. Combine the radars to cross-validate signals, and treat the "autonomous trading AI" as a research sandbox, not a money printer.
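The crontab-or-while-True deployment pattern mentioned above fits in a few lines. A hypothetical wrapper (the name `run_forever` and the 30-second default are illustrative, not part of any snippet above):

```python
import time
import traceback

def run_forever(scan, interval_sec=30, max_runs=None):
    """Poor man's scheduler: call scan() every interval_sec and survive all errors."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            scan()
        except Exception:
            traceback.print_exc()  # log the error and keep going: a radar must not die
        runs += 1
        if max_runs is None or runs < max_runs:
            time.sleep(interval_sec)
    return runs

# e.g. run_forever(scan, interval_sec=30) for the scanner above; for hourly
# jobs, a crontab entry calling the script directly works just as well.
```

The `max_runs` escape hatch exists only for testing; in production you would leave it as `None` and let systemd or a tmux session keep the process alive.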