Skip to main content

Code Vault — 7개의 오픈소스 암호화폐 레이더 및 트레이딩 도구

Code Vault: 자동 거래, 시장 분석 및 포트폴리오 관리를 위한 7가지 필수 암호화폐 거래 도구. 오픈 소스 거래 유틸리티입니다.

Go Python
应用领域: Dev Utils

{</* resource-info */>}

본 글은 Code Vault에 현재 공개된 7개 코드 스니펫을 한 번에 정리한 것입니다 — 암호화폐 트레이딩 레이더, 자율 트레이딩 시스템, 보안 도구를 다루는 개인 코드 저장소이며, 모두 순수 Python으로 작성되었고 API 비용은 0 또는 거의 0입니다. 각 스니펫 아래에 전체 소스 코드가 포함되어 있어 그대로 읽고 Fork하여 로컬에서 실행할 수 있습니다. 오른쪽 목차에서 원하는 도구로 바로 이동할 수 있습니다.

⚠️ 위험 경고 — 이 도구들은 실시간 시세와 온체인 데이터를 다루며, 일부는 Telegram으로 실시간 알림을 보냅니다. 그 중 “AI 자율 트레이딩"은 바이낸스 선물에서 가상 포지션을 엽니다. 각 도구의 설명을 반드시 자세히 읽으세요. 사용에 따른 위험은 본인 책임이며, 저자는 트레이딩 결과에 대해 어떠한 보증도 하지 않습니다.

트레이딩 레이더 #

Vitalik 매도 레이더 #

게시일: 2026.05.02 태그: Python · WebSocket · Ethereum · Telegram · Event-Driven

GitHub: vitalik-sell-radar{rel=“nofollow”}

WebSocket 이벤트 기반 · Vitalik 지갑 매도 감지 · Telegram 초단위 푸시

WebSocket 이벤트 구독을 통해 Vitalik Buterin의 지갑(vitalik.eth)에서 발생하는 ERC-20 토큰 매도를 실시간 감지 — 폴링 없음, 1초 미만 지연. 수신자를 자동으로 분류합니다: DEX Router(Uniswap, 1inch, SushiSwap), CEX 핫월렛(Binance, Coinbase, Kraken), LP 풀. DexScreener에서 실시간 토큰 가격 조회. 다중 RPC 페일오버 + 자동 재연결. 순수 Python, 비용 0 — 무료 공개 RPC 노드 사용.

전체 소스 코드 #

#!/usr/bin/env python3
"""
Vitalik Sell Radar — Event-Driven Edition
WebSocket subscription to ERC-20 Transfer events, real-time detection of
Vitalik's sell activity, instant push to Telegram.

Architecture:
1. WebSocket subscribes to Transfer(from=vitalik) events → sub-second detection
2. Classifies sell behavior (transfers to DEX Router / CEX / LP Pool)
3. Queries token info + price via DexScreener
4. Pushes alert to Telegram
5. Auto-reconnect + multi-RPC failover
"""

import asyncio
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

import aiohttp
import websockets

# Load .env file
def load_env():
 env_path = Path(__file__).parent / ".env"
 if env_path.exists():
 for line in env_path.read_text().splitlines():
 line = line.strip()
 if line and not line.startswith("#") and "=" in line:
 k, v = line.split("=", 1)
 os.environ.setdefault(k.strip(), v.strip())

load_env()

# ============================================================
# Configuration
# ============================================================

# Vitalik's main wallet
VITALIK_ADDRESS = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045"
VITALIK_PADDED = "0x" + VITALIK_ADDRESS[2:].lower().zfill(64)

# ERC-20 Transfer event topic
TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"

# WebSocket RPC endpoints (free, support eth_subscribe)
WS_ENDPOINTS = [
 "wss://ethereum-rpc.publicnode.com",
 "wss://eth.drpc.org",
 "wss://ethereum.publicnode.com",
]

# HTTP RPC for querying token info
HTTP_RPC = os.environ.get("HTTP_RPC", "https://eth.drpc.org")

# Telegram
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")

# Known DEX Router addresses (sell destinations)
KNOWN_DEX_ROUTERS = {
 # Uniswap
 "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap V2 Router",
 "0xe592427a0aece92de3edee1f18e0157c05861564": "Uniswap V3 Router",
 "0x68b3465833fb72a70ecdf485e0e4c7bd8665fc45": "Uniswap V3 Router2",
 "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad": "Uniswap Universal Router",
 "0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b": "Uniswap Universal Router (old)",
 # 1inch
 "0x1111111254eeb25477b68fb85ed929f73a960582": "1inch V5",
 "0x111111125421ca6dc452d289314280a0f8842a65": "1inch V6",
 # SushiSwap
 "0xd9e1ce17f2641f24ae83637ab66a2cca9c378b9f": "SushiSwap Router",
 # CoW Protocol
 "0x9008d19f58aabd9ed0d60971565aa8510560ab41": "CoW Settlement",
 # 0x
 "0xdef1c0ded9bec7f1a1670819833240f027b25eff": "0x Exchange Proxy",
 # Curve
 "0x99a58482bd75cbab83b27ec03ca68ff489b5788f": "Curve Router",
}

# Known CEX hot wallets (partial list)
KNOWN_CEX = {
 "0x28c6c06298d514db089934071355e5743bf21d60": "Binance Hot Wallet",
 "0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance Hot Wallet 2",
 "0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance Hot Wallet 3",
 "0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance Hot Wallet 4",
 "0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "Coinbase",
 "0xa9d1e08c7793af67e9d92fe308d5697fb81d3e43": "Coinbase 10",
 "0x503828976d22510aad0201ac7ec88293211d23da": "Coinbase 2",
 "0x2faf487a4414fe77e2327f0bf4ae2a264a776ad2": "FTX (defunct)",
 "0x267be1c1d684f78cb4f6a176c4911b741e4ffdc0": "Kraken",
 "0xae2d4617c862309a3d75a0ffb358c7a5009c673f": "Kraken 10",
}

# Minimum notification amount (USD), 0 = notify all
MIN_NOTIFY_USD = int(os.environ.get("MIN_NOTIFY_USD", "0"))

# Reconnect settings
RECONNECT_DELAY = 5
MAX_RECONNECT_DELAY = 60

# ============================================================
# Logging
# ============================================================

logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s [%(levelname)s] %(message)s",
 datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("vitalik-radar")

# ============================================================
# Global state
# ============================================================

# Dedup set for processed tx hashes (last 1000)
seen_txs: set = set()
seen_txs_list: list = []

# HTTP session
http_session: aiohttp.ClientSession | None = None

# Token info cache: {address: {symbol, name, decimals}}
token_cache: dict = {}

# Running flag
running = True

# ============================================================
# Utility functions
# ============================================================

def shorten_addr(addr: str) -> str:
 """Shorten address for display"""
 return f"{addr[:6]}...{addr[-4:]}"

def decode_transfer_value(data_hex: str, decimals: int) -> float:
 """Decode Transfer event value"""
 try:
 raw = int(data_hex, 16)
 return raw / (10 ** decimals)
 except:
 return 0.0

def topic_to_address(topic: str) -> str:
 """Extract 20-byte address from 32-byte topic"""
 return "0x" + topic[-40:]

def classify_recipient(to_addr: str) -> tuple[str, str]:
 """
 Classify recipient address.
 Returns (type, name)
 Type: "dex" / "cex" / "pool" / "unknown"
 """
 to_lower = to_addr.lower()

 if to_lower in KNOWN_DEX_ROUTERS:
 return "dex", KNOWN_DEX_ROUTERS[to_lower]

 if to_lower in KNOWN_CEX:
 return "cex", KNOWN_CEX[to_lower]

 return "unknown", ""

async def get_http_session() -> aiohttp.ClientSession:
 global http_session
 if http_session is None or http_session.closed:
 http_session = aiohttp.ClientSession()
 return http_session

async def rpc_call(method: str, params: list) -> dict | None:
 """HTTP JSON-RPC call"""
 session = await get_http_session()
 try:
 async with session.post(
 HTTP_RPC,
 json={"jsonrpc": "2.0", "id": 1, "method": method, "params": params},
 timeout=aiohttp.ClientTimeout(total=10),
 ) as resp:
 data = await resp.json()
 return data.get("result")
 except Exception as e:
 log.error(f"RPC call {method} failed: {e}")
 return None

async def get_token_info(token_addr: str) -> dict:
 """Query ERC-20 token info (symbol, name, decimals)"""
 addr_lower = token_addr.lower()
 if addr_lower in token_cache:
 return token_cache[addr_lower]

 info = {"symbol": "???", "name": "Unknown", "decimals": 18, "address": token_addr}

 # symbol()
 result = await rpc_call("eth_call", [
 {"to": token_addr, "data": "0x95d89b41"}, "latest"
 ])
 if result and len(result) > 2:
 try:
 hex_str = result[2:]
 if len(hex_str) >= 128:
 offset = int(hex_str[:64], 16) * 2
 length = int(hex_str[offset:offset+64], 16)
 symbol_hex = hex_str[offset+64:offset+64+length*2]
 info["symbol"] = bytes.fromhex(symbol_hex).decode("utf-8", errors="replace").strip('\x00')
 elif len(hex_str) == 64:
 info["symbol"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
 except:
 pass

 # decimals()
 result = await rpc_call("eth_call", [
 {"to": token_addr, "data": "0x313ce567"}, "latest"
 ])
 if result and len(result) > 2:
 try:
 info["decimals"] = int(result, 16)
 except:
 pass

 # name()
 result = await rpc_call("eth_call", [
 {"to": token_addr, "data": "0x06fdde03"}, "latest"
 ])
 if result and len(result) > 2:
 try:
 hex_str = result[2:]
 if len(hex_str) >= 128:
 offset = int(hex_str[:64], 16) * 2
 length = int(hex_str[offset:offset+64], 16)
 name_hex = hex_str[offset+64:offset+64+length*2]
 info["name"] = bytes.fromhex(name_hex).decode("utf-8", errors="replace").strip('\x00')
 elif len(hex_str) == 64:
 info["name"] = bytes.fromhex(hex_str).decode("utf-8", errors="replace").strip('\x00')
 except:
 pass

 token_cache[addr_lower] = info
 return info

async def check_if_pool(addr: str) -> bool:
 """Check if address is a Uniswap V2/V3 pool (has token0 method)"""
 result = await rpc_call("eth_call", [
 {"to": addr, "data": "0x0dfe1681"}, "latest" # token0()
 ])
 if result and len(result) == 66: # 0x + 64 hex chars
 return True
 return False

async def get_eth_price() -> float:
 """Get ETH price from CoinGecko"""
 session = await get_http_session()
 try:
 async with session.get(
 "https://api.coingecko.com/api/v3/simple/price?ids=ethereum&vs_currencies=usd",
 timeout=aiohttp.ClientTimeout(total=5),
 ) as resp:
 data = await resp.json()
 return data.get("ethereum", {}).get("usd", 0)
 except:
 return 2300 # fallback

async def get_token_price_usd(token_addr: str) -> float | None:
 """Get token USD price from DexScreener (free, no API key needed)"""
 session = await get_http_session()
 try:
 async with session.get(
 f"https://api.dexscreener.com/latest/dex/tokens/{token_addr}",
 timeout=aiohttp.ClientTimeout(total=5),
 ) as resp:
 data = await resp.json()
 pairs = data.get("pairs", [])
 if pairs:
 # Pick pair with highest liquidity
 pairs.sort(key=lambda p: float(p.get("liquidity", {}).get("usd", 0) or 0), reverse=True)
 price = float(pairs[0].get("priceUsd", 0) or 0)
 return price if price > 0 else None
 except:
 pass
 return None

# ============================================================
# Telegram notifications
# ============================================================

async def send_telegram(text: str):
 """Send Telegram message"""
 if not TG_BOT_TOKEN:
 log.warning("[TG] No bot token configured, skipping notification")
 return

 session = await get_http_session()
 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
 payload = {
 "chat_id": TG_CHAT_ID,
 "text": text,
 "parse_mode": "HTML",
 "disable_web_page_preview": True,
 }

 try:
 async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=10)) as resp:
 result = await resp.json()
 if not result.get("ok"):
 log.error(f"[TG] Send failed: {result.get('description', '')}")
 # Retry with plain text if HTML parse fails
 if "parse" in result.get("description", "").lower():
 payload["parse_mode"] = None
 async with session.post(url, json=payload) as resp2:
 pass
 else:
 log.info("[TG] Message sent")
 except Exception as e:
 log.error(f"[TG] Error: {e}")

# ============================================================
# Event handling
# ============================================================

async def handle_transfer_event(log_entry: dict):
 """Process a single Transfer event"""
 tx_hash = log_entry.get("transactionHash", "")
 token_addr = log_entry.get("address", "")
 topics = log_entry.get("topics", [])
 data = log_entry.get("data", "0x0")

 # Dedup
 dedup_key = f"{tx_hash}:{token_addr}"
 if dedup_key in seen_txs:
 return
 seen_txs.add(dedup_key)
 seen_txs_list.append(dedup_key)
 # Cleanup (keep last 1000)
 while len(seen_txs_list) > 1000:
 old = seen_txs_list.pop(0)
 seen_txs.discard(old)

 # Parse topics: [Transfer, from, to]
 if len(topics) < 3:
 return

 from_addr = topic_to_address(topics[1])
 to_addr = topic_to_address(topics[2])

 # Confirm it's from Vitalik
 if from_addr.lower() != VITALIK_ADDRESS.lower():
 return

 # Get token info
 token_info = await get_token_info(token_addr)
 symbol = token_info["symbol"]
 decimals = token_info["decimals"]
 amount = decode_transfer_value(data, decimals)

 if amount == 0:
 return

 # Classify recipient
 recipient_type, recipient_name = classify_recipient(to_addr)

 # If unknown, check if it's an LP pool
 is_pool = False
 if recipient_type == "unknown":
 is_pool = await check_if_pool(to_addr)
 if is_pool:
 recipient_type = "pool"
 recipient_name = "LP Pool"

 # Determine if this is a "sell" action
 is_sell = recipient_type in ("dex", "cex", "pool")

 # If not a sell, just a regular transfer — log silently
 if not is_sell:
 log.info(f"[Transfer] {symbol} {amount:,.2f}{shorten_addr(to_addr)} (regular transfer, not notifying)")
 return

 # Get price
 token_price = await get_token_price_usd(token_addr)
 usd_value = amount * token_price if token_price else None

 # Below minimum notification amount — skip
 if usd_value is not None and usd_value < MIN_NOTIFY_USD:
 log.info(f"[Sell] {symbol} ${usd_value:.0f} < ${MIN_NOTIFY_USD} minimum, skipping")
 return

 # Build notification message
 timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S UTC")

 sell_type_label = {
 "dex": "🔄 DEX Sell",
 "cex": "🏦 CEX Transfer",
 "pool": "🌊 Pool Sell",
 }

 msg_lines = [
 f"<b>🚨 Vitalik Sell Signal</b>",
 f"",
 f"Token: <b>{symbol}</b>",
 f"Amount: {amount:,.4f}",
 ]

 if usd_value is not None:
 msg_lines.append(f"Value: <b>${usd_value:,.0f}</b>")

 if token_price is not None:
 msg_lines.append(f"Price: ${token_price:,.8f}")

 msg_lines.extend([
 f"",
 f"Type: {sell_type_label.get(recipient_type, 'Unknown')}",
 f"Destination: {recipient_name or shorten_addr(to_addr)}",
 f"Time: {timestamp}",
 f"",
 f"TX: https://etherscan.io/tx/{tx_hash}",
 f"Token: https://dexscreener.com/ethereum/{token_addr}",
 ])

 msg = "\n".join(msg_lines)
 log.info(f"[SELL DETECTED] {symbol} {amount:,.4f}{recipient_name or to_addr} | ${usd_value or '?'}")
 await send_telegram(msg)

# ============================================================
# WebSocket listener main loop
# ============================================================

async def subscribe_and_listen(ws_url: str):
 """Connect to WebSocket and subscribe to Vitalik Transfer events"""
 log.info(f"Connecting to {ws_url}...")

 async with websockets.connect(
 ws_url,
 ping_interval=20,
 ping_timeout=30,
 close_timeout=10,
 max_size=2**20, # 1MB
 ) as ws:
 # Subscribe to Transfer FROM Vitalik
 sub_from = {
 "jsonrpc": "2.0", "id": 1,
 "method": "eth_subscribe",
 "params": ["logs", {
 "topics": [TRANSFER_TOPIC, VITALIK_PADDED]
 }]
 }
 await ws.send(json.dumps(sub_from))
 resp = await asyncio.wait_for(ws.recv(), timeout=10)
 data = json.loads(resp)
 if "error" in data:
 raise Exception(f"Subscribe failed: {data['error']}")
 sub_id_from = data.get("result", "")
 log.info(f"✅ Subscribed to Transfer FROM Vitalik (id={sub_id_from[:10]}...)")

 # Also subscribe to Transfer TO Vitalik (monitor buys/receives)
 sub_to = {
 "jsonrpc": "2.0", "id": 2,
 "method": "eth_subscribe",
 "params": ["logs", {
 "topics": [TRANSFER_TOPIC, None, VITALIK_PADDED]
 }]
 }
 await ws.send(json.dumps(sub_to))
 resp2 = await asyncio.wait_for(ws.recv(), timeout=10)
 data2 = json.loads(resp2)
 sub_id_to = data2.get("result", "")
 if sub_id_to:
 log.info(f"✅ Subscribed to Transfer TO Vitalik (id={sub_id_to[:10]}...)")

 log.info("🔍 Monitoring Vitalik wallet... waiting for events")

 # Listen for events
 async for message in ws:
 if not running:
 break

 try:
 evt = json.loads(message)

 if "params" not in evt:
 continue

 sub_id = evt["params"].get("subscription", "")
 log_entry = evt["params"].get("result", {})

 if sub_id == sub_id_from:
 # Vitalik sent tokens → check if it's a sell
 await handle_transfer_event(log_entry)
 elif sub_id == sub_id_to:
 # Vitalik received tokens — log silently
 token_addr = log_entry.get("address", "")
 token_info = await get_token_info(token_addr)
 data_hex = log_entry.get("data", "0x0")
 amount = decode_transfer_value(data_hex, token_info["decimals"])
 log.debug(f"[Receive] {token_info['symbol']} +{amount:,.4f}")

 except json.JSONDecodeError:
 continue
 except Exception as e:
 log.error(f"Event handling error: {e}", exc_info=True)

async def main():
 """Main loop with auto-reconnect"""
 global running

 # Validate required config
 if not TG_BOT_TOKEN:
 log.warning("TG_BOT_TOKEN not set — Telegram notifications disabled")
 if not TG_CHAT_ID:
 log.warning("TG_CHAT_ID not set — Telegram notifications disabled")

 # Signal handling
 def shutdown(sig, frame):
 global running
 log.info(f"Received signal {sig}, shutting down...")
 running = False

 signal.signal(signal.SIGINT, shutdown)
 signal.signal(signal.SIGTERM, shutdown)

 # Startup notice
 log.info("=" * 50)
 log.info("Vitalik Sell Radar — Started")
 log.info(f"Monitoring: {VITALIK_ADDRESS}")
 log.info(f"Min notify: ${MIN_NOTIFY_USD}")
 log.info(f"Telegram: {'✅ Configured' if TG_BOT_TOKEN else '❌ Not configured'}")
 log.info("=" * 50)

 if TG_BOT_TOKEN and TG_CHAT_ID:
 await send_telegram(
 "🟢 Vitalik Sell Radar — Started\n\n"
 f"Monitoring: {shorten_addr(VITALIK_ADDRESS)}\n"
 f"Min notify: ${MIN_NOTIFY_USD}\n"
 "Mode: WebSocket event-driven (sub-second latency)"
 )

 endpoint_idx = 0
 reconnect_delay = RECONNECT_DELAY

 while running:
 ws_url = WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]
 try:
 await subscribe_and_listen(ws_url)
 reconnect_delay = RECONNECT_DELAY # reset on success
 except websockets.exceptions.ConnectionClosed as e:
 log.warning(f"WebSocket closed: {e}")
 except asyncio.TimeoutError:
 log.warning("WebSocket timeout")
 except Exception as e:
 log.error(f"WebSocket error: {e}")

 if not running:
 break

 # Switch endpoint and retry
 endpoint_idx += 1
 log.info(f"Reconnecting in {reconnect_delay}s... (next: {WS_ENDPOINTS[endpoint_idx % len(WS_ENDPOINTS)]})")
 await asyncio.sleep(reconnect_delay)
 reconnect_delay = min(reconnect_delay * 1.5, MAX_RECONNECT_DELAY)

 # Cleanup
 if http_session and not http_session.closed:
 await http_session.close()

 log.info("Vitalik Sell Radar — Stopped")

if __name__ == "__main__":
 asyncio.run(main())

온체인 내러티브 레이더 #

게시일: 2026.04.28 태그: Python · GMGN · DEXScreener · Telegram

모멘텀 기반 · 온체인 신규 토큰 발견 · ETH/SOL/BSC/Base 커버

모멘텀이 유일한 푸시 엔진이고, 내러티브는 단지 분류 라벨입니다. 4개 체인을 30초마다 스캔합니다. 토큰은 시가총액이 3회 연속 상승하고 누적 ≥5% 증가해야만 알림이 발동됩니다. 내러티브(머스크/트럼프, 바이낸스/CZ, 셀럽)는 ★★★/★★/★ 라벨로 분류되지만 단독으로 푸시를 트리거하지는 않습니다. 안전 점검은 SOL은 RugCheck, EVM은 GoPlus를 사용. 순수 Python, AI 비용 0.

전체 소스 코드 #

#!/usr/bin/env python3
"""
 → v1
Python,AI( + )

1. — /,
2. / — ETH+SOL,BSC
3. /CZ — BSC

:GMGN + DEXScreener
:SQLite
"""

import requests
import json
import time
import os
import re
import sqlite3
import hashlib
from datetime import datetime, timedelta
from pathlib import Path
from difflib import SequenceMatcher

# === ===
DATA_DIR = os.path.expanduser("~/crypto-trading")
DB_FILE = os.path.join(DATA_DIR, "narrative_history.db")
LOG_FILE = os.path.join(DATA_DIR, "narrative_radar.log")
SEEN_FILE = os.path.join(DATA_DIR, "narrative_seen.json")
FLAP_SEEN_FILE = os.path.join(DATA_DIR, "flap_seen.json")

# 
SCAN_INTERVAL = 30 # 30(GMGN1-5,10)

# — /
# {address: [{'ts': timestamp, 'mc': market_cap, 'vol': volume, 'price': price}, ...]}
MOMENTUM_TRACKER = {}
MOMENTUM_PUSHED = {} # {address: {'count': N, 'last_ts': ts, 'last_mc': mc}} 
MOMENTUM_CONSECUTIVE_UP = 3 # 3()

# .envTG
def load_env():
 env = {}
 env_file = os.path.expanduser("~/.env")
 if os.path.exists(env_file):
 with open(env_file) as f:
 for line in f:
 line = line.strip()
 if '=' in line and not line.startswith('#'):
 k, v = line.split('=', 1)
 env[k] = v
 return env

ENV = load_env()
TG_TOKEN = ENV.get('TELEGRAM_BOT_TOKEN', '')
TG_CHAT_ID = int(os.environ.get('TG_CHAT_ID', '0'))

GMGN_HEADERS = {
 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
 'Accept': 'application/json',
 'Referer': 'https://gmgn.ai/',
}

# ============================================================
# /()
# ============================================================
MUSK_TRUMP_KEYWORDS = {
 # 
 'musk', 'elon', 'elonmusk',
 # SpaceX/Tesla/X
 'spacex', 'starship', 'tesla', 'cybertruck', 'roadster',
 'neuralink', 'boring', 'hyperloop', 'xai', 'grok',
 # //
 'floki', 'shiba', # 
 'doge father', 'dogefather', 'technoking',
 'mars colony', 'mars',
 # 
 'trump', 'donald', 'maga', 'potus', 'trump47',
 'melania', 'barron', 'ivanka',
 # 
 'dark maga', 'darkmaga', 'ultra maga', 'save america',
 'truth social', 'covfefe',
 # +
 'doge department', 'd.o.g.e', 'government efficiency',
}

# /()
MUSK_TRUMP_PATTERNS = [
 r'\belon\b', r'\bmusk\b', r'\btrump\b', r'\bmaga\b',
 r'\bspacex\b', r'\bstarship\b', r'\btesla\b', r'\bgrok\b',
 r'\bmelania\b', r'\bbarron\b', r'\bdoge\s*department\b',
 r'\bd\.?o\.?g\.?e\b', # D.O.G.E
 r'\bx\s*ai\b', r'\bneuralink\b',
]

# ============================================================
# /CZ
# ============================================================
BINANCE_CZ_KEYWORDS = {
 # CZ
 'cz', 'changpeng', 'zhao', 'czb', 'czbinance',
 # (BSC!)
 'heyi', 'yi he', 'he yi', '', 'yihe',
 'sister yi', 'yi jie', '', '',
 # 
 'binance', 'bnb', 'pancake', 'pancakeswap',
 # CZ(、、)
 'giggle academy', 'binance life', 'bnb chain',
 'principles', 'cz book',
 # YZi Labs (Binance Labs)
 'yzi', 'yzi labs',
 # (BSC)
 '', '', '', 'cz', '',
 # Four.meme
 'fourmeme', 'four meme', '4meme',
 # CZ/
 'czs dog', 'cz dog', 'bnb dog',
 'build on bnb', 'bnb ecosystem',
}

BINANCE_CZ_PATTERNS = [
 r'\bcz\b', r'\bbinance\b', r'\bbnb\b',
 r'\bheyi\b', r'\byi\s*he\b', r'\bhe\s*yi\b',
 r'\b\b', r'\b\b',
 r'\bpancake\b', r'\bgiggle\b', r'\byzi\b',
 r'\bfourmeme\b', r'\b4meme\b',
]

# ============================================================
# /(★★)
# ============================================================
CELEBRITY_VIRAL_KEYWORDS = {
 # 
 'vitalik', 'buterin', 'sam altman', 'satoshi',
 'michael saylor', 'saylor', 'cathie wood',
 'jack dorsey', 'zuckerberg', 'bezos',
 'jensen huang', 'nvidia', 'tim cook',
 # 
 'justin sun', 'sun yuchen', '', 'tron',
 'arthur hayes', 'su zhu', '3ac',
 'brian armstrong', 'coinbase',
 'larry fink', 'blackrock',
 'gary gensler', 'sec',
 'michael novogratz', 'galaxy',
 # /
 'biden', 'obama', 'putin', 'xi jinping',
 'kanye', 'drake', 'snoop dogg', 'paris hilton',
 'mark cuban', 'mr beast', 'mrbeast',
 # ()
 'lobster', '', 'lobsta',
 'hawk tuah', 'griddy', 'skibidi',
 'rizz', 'sigma', 'gyatt',
 # 
 'etf', 'halving', '',
 'world war', 'wwiii',
 'fed', 'rate cut', '',
 'tiktok ban', 'tiktok',
}

CELEBRITY_VIRAL_PATTERNS = [
 r'\bvitalik\b', r'\bsaylor\b', r'\bblackrock\b',
 r'\bcoinbase\b', r'\bjustin\s*sun\b', r'\blobster\b',
 r'\betf\b', r'\bhalving\b', r'\bmrbeast\b',
 r'\bsnoop\b', r'\bkanye\b', r'\bdrake\b',
]

# ============================================================
# (/)
# ============================================================
SPAM_PATTERNS = [
 r'airdrop', r'presale', r'pre\s*sale',
 r'1000x', r'100x guaranteed',
 r'safe\s*moon', r'baby\s*\w+', # babydoge
 r'pornhub', r'porn', r'xxx', r'nsfw',
 r'nigga', r'nigger', r'faggot',
 r'scam', r'rugpull', r'rug\s*pull',
 r'official\s*token', r'official\s*coin',
]

# ()
COMMON_NOISE_WORDS = {
 'nice', 'good', 'bad', 'cool', 'hot', 'big', 'small',
 'life', 'love', 'hate', 'happy', 'sad', 'fun', 'lol',
 'cat', 'dog', 'moon', 'sun', 'star', 'king', 'queen',
 'gold', 'rich', 'cash', 'money', 'pay', 'buy', 'sell',
 'pump', 'dump', 'bull', 'bear', 'green', 'red',
 'hello', 'world', 'yes', 'no', 'wow', 'omg', 'lmao',
 'simp', 'chad', 'based', 'cope', 'seethe',
 'test', 'new', 'old', 'real', 'fake',
 # 
 'shit', 'shitcoin', 'fuck', 'fart', 'poop', 'pee',
 'cum', 'dick', 'ass', 'boob', 'tit',
 'nigga', 'retard', 'slop',
 # 
 'the', 'and', 'for', 'from', 'with', 'this', 'that',
 'coin', 'token', 'meme', 'pepe', 'wojak',
 'peg', 'usd', 'usdt', 'usdc', 'dai',
}

# ============================================================
# 
# ============================================================
def log(msg):
 ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
 line = f"[{ts}] {msg}"
 print(line)
 os.makedirs(DATA_DIR, exist_ok=True)
 with open(LOG_FILE, 'a') as f:
 f.write(line + '\n')

def load_flap_seen():
 if os.path.exists(FLAP_SEEN_FILE):
 try:
 with open(FLAP_SEEN_FILE) as f:
 return json.load(f)
 except:
 pass
 return {}

def save_flap_seen(data):
 # 7
 cutoff = int(time.time()) - 86400 * 7
 data = {k: v for k, v in data.items() if v > cutoff}
 with open(FLAP_SEEN_FILE, 'w') as f:
 json.dump(data, f)

def tg_send(text, parse_mode='Markdown'):
 if not TG_TOKEN:
 log(f"[TG] No token, skip: {text[:80]}")
 return False
 try:
 resp = requests.post(
 f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
 json={'chat_id': TG_CHAT_ID, 'text': text, 'parse_mode': parse_mode},
 timeout=10
 )
 result = resp.json()
 if not result.get('ok'):
 # Markdown
 if 'can\'t parse' in str(result.get('description', '')).lower():
 resp = requests.post(
 f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage',
 json={'chat_id': TG_CHAT_ID, 'text': text},
 timeout=10
 )
 else:
 log(f"[TG] Error: {result.get('description', '')}")
 return False
 return True
 except Exception as e:
 log(f"[TG] Send error: {e}")
 return False

# ============================================================
# 
# ============================================================
def init_db():
 """SQLite"""
 conn = sqlite3.connect(DB_FILE)
 c = conn.cursor()
 
 # 
 c.execute('''CREATE TABLE IF NOT EXISTS narratives (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 theme TEXT NOT NULL, -- ()
 first_token_name TEXT, -- 
 first_token_address TEXT, -- 
 first_chain TEXT, -- 
 first_seen_at INTEGER, -- 
 token_count INTEGER DEFAULT 1, -- 
 last_seen_at INTEGER -- 
 )''')
 
 # 
 c.execute('''CREATE TABLE IF NOT EXISTS tokens_seen (
 address TEXT PRIMARY KEY,
 chain TEXT,
 name TEXT,
 symbol TEXT,
 narrative_theme TEXT,
 category TEXT, -- 'musk_trump' / 'binance_cz' / 'novel' / 'common'
 first_seen_at INTEGER,
 market_cap REAL,
 pushed INTEGER DEFAULT 0, -- 
 seen_count INTEGER DEFAULT 1 -- 
 )''')
 
 # 
 c.execute('CREATE INDEX IF NOT EXISTS idx_theme ON narratives(theme)')
 c.execute('CREATE INDEX IF NOT EXISTS idx_addr ON tokens_seen(address)')
 
 conn.commit()
 return conn

def normalize_theme(name, symbol):
 """
 +
 :'Elon Mars Colony' → 'elon mars colony'
 'TRUMP2028' → 'trump'
 'PancakeBunny' → 'pancake bunny'
 """
 # namesymbol
 text = f"{name} {symbol}".lower().strip()
 
 # /
 noise = ['token', 'coin', 'inu', 'swap', 'finance', 'protocol',
 'dao', 'defi', 'nft', 'meta', 'verse', 'fi', 'ai',
 'pepe', 'wojak', 'chad', 'based']
 
 # camelCase
 text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
 # (2028、1000x)
 text = re.sub(r'\d+x?', '', text)
 # 
 text = re.sub(r'[^a-z\s]', ' ', text)
 # 
 words = [w for w in text.split() if w and len(w) > 1 and w not in noise]
 
 if not words:
 return name.lower().strip()
 
 return ' '.join(sorted(set(words)))

def is_similar_theme(theme1, theme2, threshold=0.7):
 """"""
 if theme1 == theme2:
 return True
 # 
 if theme1 in theme2 or theme2 in theme1:
 return True
 # 
 words1 = set(theme1.split())
 words2 = set(theme2.split())
 if words1 and words2:
 overlap = len(words1 & words2) / min(len(words1), len(words2))
 if overlap >= 0.6:
 return True
 # 
 return SequenceMatcher(None, theme1, theme2).ratio() >= threshold

def check_narrative_novelty(conn, theme, name, symbol, address, chain):
 """
 
 ('novel', None) — 
 ('heating', narrative_row) — !!
 ('existing', existing_theme_row) — ,
 
 :302+ = 
 """
 c = conn.cursor()
 now = int(time.time())
 HEAT_WINDOW = 1800 # 30
 HEAT_THRESHOLD = 2 # 2
 
 # 
 c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives WHERE theme = ?', (theme,))
 exact = c.fetchone()
 if exact:
 row_id, _, _, _, _, first_seen, count, last_seen = exact
 # 
 new_count = count + 1
 c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE theme = ?',
 (new_count, now, theme))
 conn.commit()
 
 # :HEAT_WINDOW
 if now - first_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
 return ('heating', exact)
 # :()
 if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
 return ('heating', exact)
 
 return ('existing', exact)
 
 # — 1000
 c.execute('SELECT id, theme, first_token_name, first_token_address, first_chain, first_seen_at, token_count, last_seen_at FROM narratives ORDER BY last_seen_at DESC LIMIT 1000')
 for row in c.fetchall():
 if is_similar_theme(theme, row[1]):
 row_id, _, _, _, _, first_seen, count, last_seen = row
 new_count = count + 1
 c.execute('UPDATE narratives SET token_count = ?, last_seen_at = ? WHERE id = ?',
 (new_count, now, row[0]))
 conn.commit()
 
 # 
 if now - last_seen < HEAT_WINDOW and new_count >= HEAT_THRESHOLD:
 return ('heating', row)
 
 return ('existing', row)
 
 # — 
 c.execute('''INSERT INTO narratives (theme, first_token_name, first_token_address, first_chain, first_seen_at, last_seen_at)
 VALUES (?, ?, ?, ?, ?, ?)''',
 (theme, name, address, chain, now, now))
 conn.commit()
 return ('novel', None)

def get_token_seen_count(conn, address):
 """"""
 c = conn.cursor()
 c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
 row = c.fetchone()
 return row[0] if row else 0

def is_token_seen(conn, address):
 """"""
 c = conn.cursor()
 c.execute('SELECT address FROM tokens_seen WHERE address = ?', (address,))
 return c.fetchone() is not None

def record_token(conn, address, chain, name, symbol, theme, category, mc, pushed=False):
 """ — +1"""
 c = conn.cursor()
 # 
 c.execute('SELECT seen_count FROM tokens_seen WHERE address = ?', (address,))
 existing = c.fetchone()
 if existing:
 # :+1,
 new_count = existing[0] + 1
 c.execute('''UPDATE tokens_seen SET seen_count = ?, market_cap = ?, category = ?
 WHERE address = ?''', (new_count, mc, category, address))
 else:
 # 
 c.execute('''INSERT INTO tokens_seen 
 (address, chain, name, symbol, narrative_theme, category, first_seen_at, market_cap, pushed, seen_count)
 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1)''',
 (address, chain, name, symbol, theme, category, int(time.time()), mc, 1 if pushed else 0))
 conn.commit()

# ============================================================
# 
# ============================================================
def classify_narrative(name, symbol, chain):
 """
 
 :('musk_trump', matched_keywords) / ('binance_cz', matched_keywords) / ('novel', None) / ('common', None)
 """
 text = f"{name} {symbol}".lower()
 
 # 1. 
 for pat in SPAM_PATTERNS:
 if re.search(pat, text, re.IGNORECASE):
 return ('spam', None)
 
 # 2. /
 matched_mt = []
 for kw in MUSK_TRUMP_KEYWORDS:
 if kw.lower() in text:
 matched_mt.append(kw)
 if not matched_mt:
 for pat in MUSK_TRUMP_PATTERNS:
 m = re.search(pat, text, re.IGNORECASE)
 if m:
 matched_mt.append(m.group())
 
 if matched_mt:
 # /:ETH+SOL,BSC
 chain_lower = chain.lower()
 if chain_lower in ('eth', 'ethereum', 'sol', 'solana', 'bsc', 'base'):
 return ('musk_trump', matched_mt)
 
 # 3. /CZ — BSC
 matched_bc = []
 for kw in BINANCE_CZ_KEYWORDS:
 if kw.lower() in text:
 matched_bc.append(kw)
 if not matched_bc:
 for pat in BINANCE_CZ_PATTERNS:
 m = re.search(pat, text, re.IGNORECASE)
 if m:
 matched_bc.append(m.group())
 
 if matched_bc:
 chain_lower = chain.lower()
 if chain_lower in ('bsc',):
 return ('binance_cz', matched_bc)
 else:
 return ('binance_cz_wrong_chain', matched_bc)
 
 # 4. /(★★)
 matched_cv = []
 for kw in CELEBRITY_VIRAL_KEYWORDS:
 if kw.lower() in text:
 matched_cv.append(kw)
 if not matched_cv:
 for pat in CELEBRITY_VIRAL_PATTERNS:
 m = re.search(pat, text, re.IGNORECASE)
 if m:
 matched_cv.append(m.group())
 
 if matched_cv:
 return ('celebrity_viral', matched_cv)
 
 # 5. → 
 return ('check_novelty', None)

# ============================================================
# ()
# ============================================================
def check_token_safety(chain, address):
 """ — (/),"""
 if chain in ('sol', 'solana'):
 try:
 r = requests.get(f'https://api.rugcheck.xyz/v1/tokens/{address}/report', timeout=10)
 if r.status_code == 200:
 data = r.json()
 score = data.get('score', 999)
 mint = data.get('mintAuthority')
 freeze = data.get('freezeAuthority')
 return {
 'safe': not mint and not freeze,
 'score': score, 'mint': mint is not None,
 'freeze': freeze is not None
 }
 except:
 pass
 else:
 chain_map = {'ethereum': '1', 'eth': '1', 'bsc': '56', 'base': '8453'}
 cid = chain_map.get(chain, '1')
 try:
 r = requests.get(f'https://api.gopluslabs.io/api/v1/token_security/{cid}?contract_addresses={address}', timeout=10)
 if r.status_code == 200:
 result = r.json().get('result', {})
 data = result.get(address.lower(), {})
 if data:
 honeypot = data.get('is_honeypot', '0') == '1'
 mintable = data.get('is_mintable', '0') == '1'
 sell_tax = float(data.get('sell_tax', '0') or '0')
 buy_tax = float(data.get('buy_tax', '0') or '0')
 return {
 'safe': not honeypot and not mintable, # 
 'honeypot': honeypot, 'mintable': mintable,
 'sell_tax': sell_tax, 'buy_tax': buy_tax
 }
 except:
 pass
 return {'safe': False, 'reason': ''} # ,

# ============================================================
# GMGN
# ============================================================
def gmgn_get(url):
 try:
 resp = requests.get(url, headers=GMGN_HEADERS, timeout=15)
 if resp.status_code == 200:
 return resp.json().get('data', {})
 except:
 pass
 return {}

def fetch_token_description(chain, address):
 """/ — """
 desc = ''
 
 # SOL:Pump.fundescription
 if chain in ('sol', 'solana'):
 try:
 r = requests.get(f'https://frontend-api-v3.pump.fun/coins/{address}', timeout=8)
 if r.status_code == 200:
 data = r.json()
 desc = data.get('description', '') or ''
 twitter = data.get('twitter', '') or ''
 telegram = data.get('telegram', '') or ''
 website = data.get('website', '') or ''
 return {
 'description': desc.strip(),
 'twitter': twitter,
 'telegram': telegram,
 'website': website,
 }
 except:
 pass
 
 # :DEXScreener info(+)
 try:
 chain_dex = {'sol': 'solana', 'eth': 'ethereum', 'bsc': 'bsc', 'base': 'base',
 'solana': 'solana', 'ethereum': 'ethereum'}.get(chain, chain)
 r = requests.get(f'https://api.dexscreener.com/latest/dex/tokens/{address}', timeout=8)
 if r.status_code == 200:
 pairs = r.json().get('pairs', [])
 if pairs:
 info = pairs[0].get('info', {})
 websites = info.get('websites', [])
 socials = info.get('socials', [])
 twitter = ''
 telegram = ''
 website = ''
 for s in socials:
 if s.get('type') == 'twitter':
 twitter = s.get('url', '')
 elif s.get('type') == 'telegram':
 telegram = s.get('url', '')
 for w in websites:
 if w.get('label', '').lower() == 'website':
 website = w.get('url', '')
 if not desc:
 # DEXScreenerdescription
 return {
 'description': desc,
 'twitter': twitter,
 'telegram': telegram,
 'website': website,
 }
 except:
 pass
 
 return {'description': desc, 'twitter': '', 'telegram': '', 'website': ''}

def fetch_new_tokens():
 """GMGN + """
 all_tokens = []
 seen_addrs = set()
 
 for chain in ['eth', 'bsc', 'base']:
 # ,
 urls = [
 # — 
 f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=open_timestamp&direction=desc&limit=100',
 # — 
 f'https://gmgn.ai/defi/quotation/v1/rank/{chain}/swaps/1h?orderby=swaps&direction=desc&limit=50',
 ]
 
 for url in urls:
 data = gmgn_get(url)
 tokens = data.get('rank', [])
 
 for t in tokens:
 addr = t.get('address', '')
 if not addr or addr in seen_addrs:
 continue
 
 mc = t.get('market_cap', 0) or t.get('fdv', 0) or 0
 liq = t.get('liquidity', 0) or 0
 
 # :
 if mc < 1000 or liq < 500 or mc > 10000000:
 continue
 
 age_ts = t.get('open_timestamp', 0)
 age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 999
 
 # — :,
 
 seen_addrs.add(addr)
 all_tokens.append({
 'address': addr,
 'chain': chain,
 'name': t.get('name', '?'),
 'symbol': t.get('symbol', '?'),
 'mc': mc,
 'liq': liq,
 'volume': t.get('volume', 0) or 0,
 'holders': t.get('holder_count', 0) or 0,
 'sm': t.get('smart_degen_count', 0) or 0,
 'chg_1h': t.get('price_change_percent1h', 0) or 0,
 'chg_24h': t.get('price_change_percent', 0) or 0,
 'age_h': age_h,
 'price': t.get('price', 0),
 'buys_1h': t.get('buys', 0) or 0,
 'sells_1h': t.get('sells', 0) or 0,
 })
 
 time.sleep(0.3)
 
 return all_tokens

def fetch_flap_tokens():
 """
 FLAP — BSC
 :()
 :24h,1h/,>,holders
 """
 data = gmgn_get(
 'https://gmgn.ai/defi/quotation/v1/rank/bsc/swaps/24h?launchpad=flap&orderby=volume&direction=desc&limit=30'
 )
 tokens = data.get('rank', [])
 
 candidates = []
 for t in tokens:
 addr = t.get('address', '')
 if not addr:
 continue
 
 mc = t.get('market_cap', 0) or 0
 liq = t.get('liquidity', 0) or 0
 vol = t.get('volume', 0) or 0
 holders = t.get('holder_count', 0) or 0
 buys = t.get('buys', 0) or 0
 sells = t.get('sells', 0) or 0
 chg_1h = t.get('price_change_percent1h', 0) or 0
 chg_24h = t.get('price_change_percent', 0) or 0
 age_ts = t.get('open_timestamp', 0)
 age_h = (time.time() - age_ts) / 3600 if age_ts > 0 else 0
 
 # 
 if mc < 1000 or liq < 500:
 continue
 if holders < 5:
 continue
 
 # :
 # 1: 24h(),
 # 2: 1h24h,
 # 3: > ,
 buy_ratio = buys / max(sells, 1)
 
 is_support = False
 reason = ''
 
 # A: 24h,1h/
 if chg_24h < -10 and chg_1h > chg_24h * 0.3:
 is_support = True
 reason = f'24h{chg_24h:.0f}%1h{chg_1h:+.0f}%'
 
 # B: 24h,1h,
 if -10 <= chg_24h <= 30 and chg_1h > -5 and buy_ratio > 1.1:
 is_support = True
 reason = f' {buy_ratio:.2f}'
 
 # C: 
 if chg_24h < -30 and chg_1h > 10:
 is_support = True
 reason = f'{chg_24h:.0f}%{chg_1h:+.0f}%'
 
 if is_support and buy_ratio >= 1.0:
 candidates.append({
 'address': addr,
 'chain': 'bsc',
 'name': t.get('name', '?'),
 'symbol': t.get('symbol', '?'),
 'mc': mc,
 'liq': liq,
 'volume': vol,
 'holders': holders,
 'sm': 0,
 'chg_1h': chg_1h,
 'chg_24h': chg_24h,
 'age_h': age_h,
 'price': t.get('price', 0),
 'buys': buys,
 'sells': sells,
 'buy_ratio': buy_ratio,
 'support_reason': reason,
 'launchpad': 'flap',
 })
 
 # 
 candidates.sort(key=lambda x: x['mc'], reverse=True)
 return candidates

def format_flap_alert(token, desc_info=None):
 """FLAP"""
 msg = f" — FLAP\n"
 msg += f": BSC | : FLAP\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 # 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {token['support_reason']}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f" ${token['liq']:>12,.0f}\n"
 msg += f"24h ${token['volume']:>12,.0f}\n"
 msg += f" {token['holders']:>12,d}\n"
 msg += f"/ {token['buys']:>6,d}/{token['sells']:>6,d}\n"
 msg += f" {token['buy_ratio']:>12.2f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 msg += f"24h {token['chg_24h']:>+11.1f}%\n"
 msg += f"```\n"
 msg += "\nFLAP — "
 
 # 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

# ============================================================
# 
# ============================================================
def format_musk_trump_alert(token, matched_kw, desc_info=None):
 """/"""
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 msg = f" — /\n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 # (!)
 desc = (desc_info or {}).get('description', '')
 if desc:
 # 200,
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {', '.join(matched_kw[:5])}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f" ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```\n"
 
 # 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"Twitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n' + '\n'.join(links)
 
 return msg

def format_binance_cz_alert(token, matched_kw, desc_info=None):
 """/CZ"""
 msg = f" — /CZ\n"
 msg += f": BSC\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 # 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {', '.join(matched_kw[:5])}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f" ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```\n"
 
 # 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"Twitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n' + '\n'.join(links)
 
 return msg

def format_novel_narrative_alert(token, theme, desc_info=None):
 """ — """
 return format_heating_narrative_alert(token, theme, 1, desc_info)

def format_heating_narrative_alert(token, theme, count, desc_info=None):
 """ — """
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 msg = f" — \n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 # 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 300:
 desc = desc[:300] + '...'
 msg += f": {desc}\n\n"
 else:
 msg += f": {theme}\n\n"
 
 msg += f"{count}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f" ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['holders']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```"
 
 # 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

# ============================================================
# — +
# ============================================================
def track_momentum(tokens):
 """
 + = ,。
 """
 global MOMENTUM_TRACKER, MOMENTUM_PUSHED
 now = time.time()
 alerts = []
 
 # 
 current_addrs = set()
 
 for token in tokens:
 addr = token['address']
 mc = token['mc']
 vol = token.get('volume', 0) or 0
 price = token.get('price', 0) or 0
 buys = token.get('buys_1h', 0) or token.get('buys', 0) or 0
 
 current_addrs.add(addr)
 
 # 
 if mc < 1000 or token.get('liq', 0) < 500 or mc > 10000000:
 continue
 
 # — (GMGN)
 if addr not in MOMENTUM_TRACKER:
 MOMENTUM_TRACKER[addr] = []
 
 snapshots = MOMENTUM_TRACKER[addr]
 
 # ()
 if snapshots and snapshots[-1]['mc'] == mc and snapshots[-1]['vol'] == vol:
 continue # ,
 
 snapshots.append({
 'ts': now,
 'mc': mc,
 'vol': vol,
 'price': price,
 'buys': buys,
 })
 
 # 20(200)
 if len(snapshots) > 20:
 snapshots[:] = snapshots[-20:]
 
 # 3
 if len(snapshots) < MOMENTUM_CONSECUTIVE_UP:
 continue
 
 # N
 recent = snapshots[-MOMENTUM_CONSECUTIVE_UP:]
 consecutive_up = True
 total_gain = 0
 
 for i in range(1, len(recent)):
 prev_mc = recent[i-1]['mc']
 curr_mc = recent[i]['mc']
 if prev_mc <= 0:
 consecutive_up = False
 break
 gain = (curr_mc - prev_mc) / prev_mc
 if gain <= 0: # 
 consecutive_up = False
 break
 total_gain += gain
 
 if not consecutive_up:
 continue
 
 # !()
 vol_increasing = True
 for i in range(1, len(recent)):
 if recent[i]['buys'] < recent[i-1]['buys'] * 0.8: # 
 vol_increasing = False
 break
 
 # 
 first_mc = recent[0]['mc']
 last_mc = recent[-1]['mc']
 pct_gain = ((last_mc - first_mc) / first_mc * 100) if first_mc > 0 else 0
 
 # : + >5%
 if pct_gain < 5:
 continue
 
 # :,+1
 push_info = MOMENTUM_PUSHED.get(addr, {'count': 0, 'last_ts': 0, 'last_mc': 0})
 
 # ()
 if push_info['count'] > 0 and last_mc <= push_info['last_mc']:
 continue
 
 push_info['count'] += 1
 push_info['last_ts'] = now
 push_info['last_mc'] = last_mc
 signal_count = push_info['count']
 
 # 
 safety = check_token_safety(token['chain'], addr)
 if not safety.get('safe'):
 continue
 
 # → 
 category, matched_kw = classify_narrative(token['name'], token['symbol'], token['chain'])
 is_flap = token.get('launchpad') == 'flap'
 
 if category == 'musk_trump':
 stars = 3
 narrative_tag = f"/ ({', '.join(matched_kw[:3])})"
 elif category == 'binance_cz':
 stars = 3
 narrative_tag = f"/CZ ({', '.join(matched_kw[:3])})"
 elif category == 'celebrity_viral':
 stars = 2
 narrative_tag = f"/ ({', '.join(matched_kw[:3])})"
 elif is_flap:
 stars = 2
 narrative_tag = "FLAP"
 else:
 # 
 theme = normalize_theme(token['name'], token['symbol'])
 theme_words = [w for w in theme.split() if w not in COMMON_NOISE_WORDS and len(w) > 2]
 if len(theme_words) >= 2:
 stars = 2
 narrative_tag = f": {theme}"
 else:
 stars = 1
 narrative_tag = ""
 
 # 
 desc_info = fetch_token_description(token['chain'], addr)
 
 # FLAP/CTO
 if is_flap:
 has_twitter = bool(desc_info.get('twitter'))
 has_tg = bool(desc_info.get('telegram'))
 has_web = bool(desc_info.get('website'))
 community_tags = []
 if has_twitter:
 community_tags.append("")
 if has_tg:
 community_tags.append("TG")
 if has_web:
 community_tags.append("")
 if community_tags:
 narrative_tag += f" | {' '.join(community_tags)}"
 stars = min(3, stars + 1) # 
 else:
 narrative_tag += " | "
 
 msg = format_momentum_alert(token, pct_gain, len(recent), vol_increasing, stars, narrative_tag, desc_info, signal_count)
 alerts.append({'msg': msg, 'token': token})
 MOMENTUM_PUSHED[addr] = push_info
 
 log(f"[{signal_count}] {token['name']} ({token['symbol']}) on {token['chain']}{len(recent)} +{pct_gain:.1f}%")
 
 # 
 stale = [a for a in MOMENTUM_TRACKER if a not in current_addrs]
 for a in stale:
 if now - MOMENTUM_TRACKER[a][-1]['ts'] > 600: # 10
 del MOMENTUM_TRACKER[a]
 
 # — 1
 MOMENTUM_PUSHED = {k: v for k, v in MOMENTUM_PUSHED.items() if now - v.get('last_ts', 0) < 3600}
 
 return alerts

def format_momentum_alert(token, pct_gain, rounds, vol_up, stars, narrative_tag, desc_info=None, seen_count=0):
 """ — """
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 vol_tag = "" if vol_up else ""
 star_str = "★" * stars + "☆" * (3 - stars)
 
 # 
 msg = f"\n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 # 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {narrative_tag}\n"
 msg += f"{rounds} +{pct_gain:.1f}% {vol_tag}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f" ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```\n"
 msg += f": {star_str} : {seen_count}"
 
 # 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if (desc_info or {}).get('website'):
 links.append(f"Web: {desc_info['website']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

def format_celebrity_alert(token, matched_kw, desc_info=None):
 """/ ★★"""
 chain_map = {'sol': 'SOL', 'eth': 'ETH', 'bsc': 'BSC', 'base': 'BASE'}
 ch = chain_map.get(token['chain'], token['chain'].upper())
 
 msg = f" — / ★★\n"
 msg += f": {ch}\n\n"
 msg += f"{token['name']} ({token['symbol']})\n"
 msg += f"`{token['address']}`\n\n"
 
 desc = (desc_info or {}).get('description', '')
 if desc:
 if len(desc) > 200:
 desc = desc[:200] + '...'
 msg += f": {desc}\n\n"
 
 msg += f": {', '.join(matched_kw[:5])}\n\n"
 msg += f"```\n"
 msg += f" ${token['mc']:>12,.0f}\n"
 msg += f" ${token['liq']:>12,.0f}\n"
 msg += f"1h {token['chg_1h']:>+11.1f}%\n"
 if token.get('sm', 0) > 0:
 msg += f" {token['sm']:>12d}\n"
 msg += f" {token['age_h']:>10.1f}h\n"
 msg += f"```"
 
 links = []
 if (desc_info or {}).get('twitter'):
 links.append(f"\nTwitter: {desc_info['twitter']}")
 if (desc_info or {}).get('telegram'):
 links.append(f"TG: {desc_info['telegram']}")
 if links:
 msg += '\n'.join(links)
 
 return msg

# ============================================================
# 
# ============================================================
def scan_narratives():
 """"""
 conn = init_db()
 tokens = fetch_new_tokens()
 
 log(f" {len(tokens)} ...")
 
 # === — , ===
 # FLAP
 flap_tokens = []
 try:
 flap_tokens = fetch_flap_tokens()
 except:
 pass
 all_momentum_tokens = tokens + flap_tokens
 momentum_alerts = track_momentum(all_momentum_tokens)
 
 for token in tokens:
 addr = token['address']
 chain = token['chain']
 name = token['name']
 symbol = token['symbol']
 
 # — seen_countnarrativestoken_count,
 if is_token_seen(conn, addr):
 # seen_count
 c = conn.cursor()
 c.execute('UPDATE tokens_seen SET seen_count = seen_count + 1, market_cap = ? WHERE address = ?', (token['mc'], addr))
 # narrativestoken_count()
 theme_tmp = normalize_theme(name, symbol)
 if theme_tmp:
 c.execute('UPDATE narratives SET token_count = token_count + 1, last_seen_at = ? WHERE theme = ?', (int(time.time()), theme_tmp))
 conn.commit()
 continue
 
 # 
 category, matched_kw = classify_narrative(name, symbol, chain)
 
 if category == 'spam':
 record_token(conn, addr, chain, name, symbol, '', 'spam', token['mc'])
 continue
 
 # ()
 min_mc = 1000
 min_liq = 500
 if token['mc'] < min_mc or token['liq'] < min_liq:
 record_token(conn, addr, chain, name, symbol, '', 'too_small', token['mc'])
 continue
 
 theme = normalize_theme(name, symbol)
 
 # , — 
 record_token(conn, addr, chain, name, symbol, theme, category, token['mc'])
 check_narrative_novelty(conn, theme, name, symbol, addr, chain)
 
 conn.close()
 
 # === ===
 pushed = 0
 for ma in momentum_alerts[:8]: # 8
 if tg_send(ma['msg']):
 pushed += 1
 time.sleep(1) # TG
 
 return pushed, len(momentum_alerts)

# ============================================================
# 
# ============================================================
def main():
 log("=" * 50)
 log(" v1 ")
 log(f": {SCAN_INTERVAL}s")
 log(f": — ,")
 log("=" * 50)
 
 # DB
 init_db()
 
 # 
 tg_send(
 " v1 \n\n"
 ": \n"
 "3+>5%\n"
 ":\n"
 "★★★ / | /CZ | FLAP\n"
 "★★ | FLAP | \n"
 "★ \n\n"
 f": {SCAN_INTERVAL}"
 )
 
 scan_count = 0
 total_pushed = 0
 
 while True:
 try:
 scan_count += 1
 pushed, found = scan_narratives()
 total_pushed += pushed
 
 if pushed > 0:
 log(f"{scan_count}: {found}, {pushed} ({total_pushed})")
 else:
 if scan_count % 20 == 0: # 20
 log(f"{scan_count}: ({total_pushed})")
 
 except Exception as e:
 log(f": {e}")
 
 time.sleep(SCAN_INTERVAL)

if __name__ == '__main__':
 main()

OI + 펀딩비 스캐너 #

게시일: 2026.04.25 태그: Python · Binance Futures · Telegram

펀딩비 양→음 전환 감지 + OI 급증

스냅샷 기반 스캐너: 펀딩비가 양에서 음으로 뒤집히면서 동시에 OI가 상승 중인 종목을 감지합니다. 5분마다 실행.

전체 소스 코드 #

#!/usr/bin/env python3
"""
OI + 
- 
- : OI(4, >8%) + 
- : 24
- API
"""

import requests
import json
import os
import time
import sys
from datetime import datetime, timedelta
from pathlib import Path

# ============ ============
SCRIPT_DIR = Path(__file__).parent
ENV_FILE = SCRIPT_DIR / ".env.oi"
ALERT_HISTORY_FILE = SCRIPT_DIR / "oi_funding_alerts.json"
FR_SNAPSHOT_FILE = SCRIPT_DIR / "fr_snapshot.json" # 

# 
MIN_OI_CHANGE_PCT = 8 # OI8%
MIN_VOLUME_USDT = 0 # ,
MIN_FR_PERIODS_POSITIVE = 2 # 2
DEDUP_HOURS = 24 # 24

# ============ TG ============
def load_env():
 env = {}
 if ENV_FILE.exists():
 for line in ENV_FILE.read_text().strip().split('\n'):
 if '=' in line and not line.startswith('#'):
 k, v = line.split('=', 1)
 env[k.strip()] = v.strip()
 return env

env = load_env()
TG_BOT_TOKEN = env.get('TG_BOT_TOKEN', '')
TG_CHAT_ID = env.get('TG_CHAT_ID', '')

# ============ TG ============
def send_tg(text):
 if not TG_BOT_TOKEN or not TG_CHAT_ID:
 print("[TG] , :")
 print(text)
 return
 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
 # (TG4096)
 chunks = [text[i:i+4000] for i in range(0, len(text), 4000)]
 for chunk in chunks:
 try:
 resp = requests.post(url, json={
 'chat_id': TG_CHAT_ID,
 'text': chunk,
 'parse_mode': 'Markdown'
 }, timeout=10)
 if resp.status_code != 200:
 # fallback
 requests.post(url, json={
 'chat_id': TG_CHAT_ID,
 'text': chunk
 }, timeout=10)
 except Exception as e:
 print(f"[TG] : {e}")

# ============ ============
def load_alert_history():
 if ALERT_HISTORY_FILE.exists():
 try:
 return json.loads(ALERT_HISTORY_FILE.read_text())
 except:
 return {}
 return {}

def save_alert_history(history):
 ALERT_HISTORY_FILE.write_text(json.dumps(history))

def is_duplicate(symbol, history):
 if symbol not in history:
 return False
 last_alert = datetime.fromisoformat(history[symbol])
 return (datetime.now() - last_alert).total_seconds() < DEDUP_HOURS * 3600

def mark_alerted(symbol, history):
 history[symbol] = datetime.now().isoformat()
 # 
 cutoff = datetime.now() - timedelta(hours=DEDUP_HOURS * 2)
 history = {k: v for k, v in history.items() 
 if datetime.fromisoformat(v) > cutoff}
 return history

# ============ ============
def load_fr_snapshot():
 if FR_SNAPSHOT_FILE.exists():
 try:
 return json.loads(FR_SNAPSHOT_FILE.read_text())
 except:
 pass
 return {}

def save_fr_snapshot(snapshot):
 FR_SNAPSHOT_FILE.write_text(json.dumps(snapshot))

# ============ ============
def scan():
 ts_start = time.time()
 
 # 1. 
 try:
 info = requests.get('https://fapi.binance.com/fapi/v1/exchangeInfo', timeout=10).json()
 symbols = [s['symbol'] for s in info['symbols'] 
 if s['contractType'] == 'PERPETUAL' and s['quoteAsset'] == 'USDT' and s['status'] == 'TRADING']
 except Exception as e:
 print(f"[ERROR] exchangeInfo: {e}")
 return []
 
 # 2. 24h()
 try:
 tickers = requests.get('https://fapi.binance.com/fapi/v1/ticker/24hr', timeout=10).json()
 ticker_map = {t['symbol']: t for t in tickers}
 except Exception as e:
 print(f"[ERROR] ticker: {e}")
 return []
 
 active = [s for s in symbols if float(ticker_map.get(s, {}).get('quoteVolume', 0)) > MIN_VOLUME_USDT]
 
 # 3. ()
 try:
 fr_all = requests.get('https://fapi.binance.com/fapi/v1/premiumIndex', timeout=10).json()
 fr_current = {item['symbol']: float(item['lastFundingRate']) for item in fr_all}
 except:
 fr_current = {}
 
 # 4. ,""
 prev_snapshot = load_fr_snapshot()
 
 # ()
 save_fr_snapshot(fr_current)
 
 if not prev_snapshot:
 print(f"[{datetime.now().strftime('%H:%M:%S')}] ,,")
 return []
 
 # : >=0, <0 
 just_turned_negative = []
 for sym in active:
 prev_fr = prev_snapshot.get(sym)
 curr_fr = fr_current.get(sym)
 if prev_fr is None or curr_fr is None:
 continue
 if prev_fr >= 0 and curr_fr < 0:
 just_turned_negative.append(sym)
 
 if not just_turned_negative:
 elapsed = time.time() - ts_start
 print(f"[{datetime.now().strftime('%H:%M:%S')}] : {len(active)}/{elapsed:.1f}s, ")
 return []
 
 print(f"[{datetime.now().strftime('%H:%M:%S')}] {len(just_turned_negative)} : {just_turned_negative}")
 
 # 5. OI
 signals = []
 for sym in just_turned_negative:
 try:
 # OI
 oi_hist = requests.get('https://fapi.binance.com/futures/data/openInterestHist', 
 params={'symbol': sym, 'period': '1h', 'limit': 48}, timeout=10).json()
 
 oi_chg = 0
 segs = []
 oi_rising = False
 if oi_hist and len(oi_hist) >= 12:
 oi_values = [float(x['sumOpenInterestValue']) for x in oi_hist]
 seg_len = len(oi_values) // 4
 if seg_len >= 3:
 segs = [
 sum(oi_values[:seg_len]) / seg_len,
 sum(oi_values[seg_len:seg_len*2]) / seg_len,
 sum(oi_values[seg_len*2:seg_len*3]) / seg_len,
 sum(oi_values[seg_len*3:]) / max(1, len(oi_values[seg_len*3:]))
 ]
 oi_chg = (segs[3] - segs[0]) / segs[0] * 100 if segs[0] > 0 else 0
 oi_rising = oi_chg > 0
 
 t = ticker_map.get(sym, {})
 signals.append({
 'symbol': sym,
 'price': float(t.get('lastPrice', 0)),
 'price_chg_24h': float(t.get('priceChangePercent', 0)),
 'volume': float(t.get('quoteVolume', 0)),
 'oi_change': oi_chg,
 'oi_segments': segs,
 'oi_rising': oi_rising,
 'current_fr': fr_current.get(sym, 0),
 'prev_fr': prev_snapshot.get(sym, 0),
 })
 except:
 continue
 
 elapsed = time.time() - ts_start
 print(f"[{datetime.now().strftime('%H:%M:%S')}] : {len(active)}/{elapsed:.1f}s, : {len(signals)}")
 
 return signals

# ============ ============
def get_square_discussion(coin):
 """"""
 try:
 r = requests.get(
 "https://www.binance.com/bapi/composite/v4/friendly/pgc/content/queryByHashtag",
 params={"hashtag": f"#{coin.lower()}", "pageIndex": 1, "pageSize": 1, "orderBy": "HOT"},
 headers={"User-Agent": "Mozilla/5.0", "Referer": "https://www.binance.com/en/square"},
 timeout=8
 )
 if r.status_code == 200:
 ht = r.json().get("data", {}).get("hashtag", {})
 return ht.get("contentCount", 0), ht.get("viewCount", 0)
 except:
 pass
 return 0, 0

def get_market_caps():
 """"""
 mcap = {}
 try:
 r = requests.get(
 "https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
 timeout=10
 )
 if r.status_code == 200:
 for item in r.json().get("data", []):
 name = item.get("name", "")
 mc = item.get("marketCap", 0)
 if name and mc:
 mcap[name] = float(mc)
 except:
 pass
 return mcap

def get_spot_symbols():
 """"""
 try:
 info = requests.get("https://api.binance.com/api/v3/exchangeInfo", timeout=10).json()
 return {s["baseAsset"] for s in info["symbols"]
 if s["quoteAsset"] == "USDT" and s["status"] == "TRADING"}
 except:
 return set()

def fmt_mcap(v):
 if v >= 1e9: return f"${v/1e9:.2f}B"
 if v >= 1e6: return f"${v/1e6:.1f}M"
 if v >= 1e3: return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def fmt_views(v):
 if v >= 1e6: return f"{v/1e6:.1f}M"
 if v >= 1e3: return f"{v/1e3:.0f}K"
 return str(v)

# ============ ============
def format_alert(signals):
 if not signals:
 return None
 
 # OI,
 signals.sort(key=lambda x: (-int(x.get('oi_rising', False)), x['current_fr']))
 
 # 
 mcap_map = get_market_caps()
 spot_set = get_spot_symbols()
 
 now = datetime.now().strftime('%m-%d %H:%M')
 lines = [f"*[ +OI ]* {now}\n"]
 
 for s in signals:
 coin = s['symbol'].replace('USDT', '')
 
 # : →
 fr_change = f"{s['prev_fr']:+.4%} -> {s['current_fr']:+.4%}"
 
 # 
 mcap = mcap_map.get(coin, 0)
 has_spot = coin in spot_set
 sq_posts, sq_views = get_square_discussion(coin)
 
 lines.append(f"```")
 lines.append(f"{coin}")
 lines.append(f" : {s['price']:.4f} 24h: {s['price_chg_24h']:+.1f}%")
 lines.append(f" : {fr_change}")
 if s['oi_segments']:
 oi_segs = ' > '.join([f"{v/1e6:.1f}M" for v in s['oi_segments']])
 lines.append(f" OI: +{s['oi_change']:.1f}% ({oi_segs})")
 lines.append(f" : ${s['volume']/1e6:.1f}M")
 lines.append(f" : {fmt_mcap(mcap) if mcap > 0 else ''} : {'' if has_spot else ''}")
 if sq_posts > 0:
 lines.append(f" : {sq_posts} / {fmt_views(sq_views)}")
 else:
 lines.append(f" : ")
 lines.append(f"```")
 
 return '\n'.join(lines)

# ============ ============
def main():
 signals = scan()
 
 if signals:
 # : + OI ()
 strong = [s for s in signals if s['current_fr'] < 0 and s.get('oi_rising')]
 if strong:
 msg = format_alert(strong)
 if msg:
 send_tg(msg)
 print(f" {len(strong)} ({len(signals)}, {len(strong)}OI)")
 else:
 print(f" {len(signals)} OI, ")
 else:
 print(f" ")

if __name__ == '__main__':
 main()

축적 레이더 #

게시일: 2026.04.25 태그: Python · Binance · CoinGlass · Telegram

모멘텀 + OI 이상 + 스마트 알림

시간 단위 스캔: 상승률 상위 종목 모멘텀 추적, OI 이상 감지, Telegram 푸시. 순수 Python, AI 비용 0.

전체 소스 코드 #

#!/usr/bin/env python3
"""
 v2 — ++OI 

():
1. → CG+=
2. =,
3. OI==

:→→→

:API + CoinGecko Trending()
"""

import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta
from pathlib import Path
from square_heat import get_square_heat

# === .env ===
env_file = Path(__file__).parent / ".env.oi"
if env_file.exists():
 with open(env_file) as f:
 for line in f:
 line = line.strip()
 if line and not line.startswith("#") and "=" in line:
 k, v = line.split("=", 1)
 os.environ.setdefault(k.strip(), v.strip())

# === ===
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.getenv("TG_CHAT_ID", "YOUR_CHAT_ID")
FAPI = "https://fapi.binance.com"

# ()
HEAT_HISTORY_FILE = Path(__file__).parent / "heat_history.json"

# 
VOL_SURGE_MULT = 2.5 # 2.5=
MIN_VOL_USD = 20_000_000 # >$20M

# OI
MIN_OI_DELTA_PCT = 3.0 # OI3%
MIN_OI_USD = 2_000_000 # OI $2M

def api_get(endpoint, params=None):
 """API"""
 url = f"{FAPI}{endpoint}"
 for attempt in range(3):
 try:
 resp = requests.get(url, params=params, timeout=10)
 if resp.status_code == 200:
 return resp.json()
 elif resp.status_code == 429:
 time.sleep(2)
 else:
 return None
 except:
 time.sleep(1)
 return None

def format_usd(v):
 if v >= 1e9: return f"${v/1e9:.1f}B"
 if v >= 1e6: return f"${v/1e6:.1f}M"
 if v >= 1e3: return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def mcap_str(v):
 if v >= 1e9: return f"${v/1e9:.1f}B"
 if v >= 1e6: return f"${v/1e6:.0f}M"
 if v >= 1e3: return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def send_telegram(text):
 """TG"""
 if not TG_BOT_TOKEN:
 print("\n[TG] No token, stdout:\n")
 print(text)
 return

 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"

 # (TG4096)
 chunks = []
 current = ""
 for line in text.split("\n"):
 if len(current) + len(line) + 1 > 3800:
 chunks.append(current)
 current = line
 else:
 current += "\n" + line if current else line
 if current:
 chunks.append(current)

 for chunk in chunks:
 try:
 resp = requests.post(url, json={
 "chat_id": TG_CHAT_ID,
 "text": chunk,
 "parse_mode": "Markdown"
 }, timeout=10)
 if resp.status_code == 200:
 print(f"[TG] Sent ✓ ({len(chunk)} chars)")
 else:
 # Markdown
 resp2 = requests.post(url, json={
 "chat_id": TG_CHAT_ID,
 "text": chunk.replace("*", "").replace("_", ""),
 }, timeout=10)
 print(f"[TG] Sent plain ({'✓' if resp2.status_code == 200 else '✗'})")
 except Exception as e:
 print(f"[TG] Error: {e}")
 time.sleep(0.5)

def main():
 print(f"🔥 v2 — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")

 # 1. +
 tickers_raw = api_get("/fapi/v1/ticker/24hr")
 premiums_raw = api_get("/fapi/v1/premiumIndex")

 if not tickers_raw or not premiums_raw:
 print("❌ API")
 return

 ticker_map = {}
 for t in tickers_raw:
 if t["symbol"].endswith("USDT"):
 ticker_map[t["symbol"]] = {
 "px_chg": float(t["priceChangePercent"]),
 "vol": float(t["quoteVolume"]),
 "price": float(t["lastPrice"]),
 }

 funding_map = {}
 for p in premiums_raw:
 if p["symbol"].endswith("USDT"):
 funding_map[p["symbol"]] = float(p["lastFundingRate"])

 # 2. (API)
 mcap_map = {}
 try:
 r = requests.get(
 "https://www.binance.com/bapi/composite/v1/public/marketing/symbol/list",
 timeout=10
 )
 if r.status_code == 200:
 for item in r.json().get("data", []):
 name = item.get("name", "")
 mc = item.get("marketCap", 0)
 if name and mc:
 mcap_map[name] = float(mc)
 print(f"✅ : {len(mcap_map)}")
 except Exception as e:
 print(f"⚠️ API: {e}")

 # 3. : + CoinGecko Trending + 
 heat_map = {}
 cg_trending = set()
 square_trending = set()

 # 3a. (6H)— !=
 sq_coins = get_square_heat()
 if sq_coins:
 for i, c in enumerate(sq_coins):
 coin = c["coin"]
 square_trending.add(coin)
 # ,
 rank_score = max(50 - i * 4, 10)
 if c.get("rapidRiser"):
 rank_score += 15
 heat_map[coin] = heat_map.get(coin, 0) + rank_score
 print(f"🏦 : {len(square_trending)} {[c['coin'] for c in sq_coins[:5]]}")

 # 3b. CoinGecko Trending
 try:
 r = requests.get("https://api.coingecko.com/api/v3/search/trending", timeout=10)
 if r.status_code == 200:
 for item in r.json().get("coins", []):
 sym = item["item"]["symbol"].upper()
 rank = item["item"].get("score", 99)
 cg_trending.add(sym)
 heat_map[sym] = heat_map.get(sym, 0) + max(50 - rank * 3, 10)
 print(f"🌐 CG Trending: {len(cg_trending)}")
 except Exception as e:
 print(f"⚠️ CG Trending: {e}")

 # 
 vol_surge_coins = set()
 for sym, tk in ticker_map.items():
 coin = sym.replace("USDT", "")
 vol_24h = tk["vol"]
 if vol_24h > MIN_VOL_USD:
 kl = api_get("/fapi/v1/klines", {"symbol": sym, "interval": "1d", "limit": 8})
 if kl and len(kl) >= 5:
 avg_prev = sum(float(k[7]) for k in kl[:-1]) / (len(kl) - 1)
 if avg_prev > 0:
 ratio = vol_24h / avg_prev
 if ratio >= VOL_SURGE_MULT:
 vol_surge_coins.add(coin)
 heat_map[coin] = heat_map.get(coin, 0) + min(ratio * 10, 50)
 time.sleep(0.05)

 print(f"📈 (≥{VOL_SURGE_MULT}x): {len(vol_surge_coins)}")

 # /
 dual_heat = cg_trending & vol_surge_coins
 square_vol = square_trending & vol_surge_coins
 triple_heat = cg_trending & vol_surge_coins & square_trending
 
 all_multi_heat = dual_heat | square_vol
 if all_multi_heat:
 for coin in all_multi_heat:
 heat_map[coin] = heat_map.get(coin, 0) + 20
 if triple_heat:
 for coin in triple_heat:
 heat_map[coin] = heat_map.get(coin, 0) + 30 # 
 print(f"🔥🔥🔥 : {triple_heat}")
 else:
 print(f"🔥🔥 : {all_multi_heat}")

 # 4. OI(Top100 + )
 scan_syms = set()
 # 
 for coin in heat_map:
 sym = coin + "USDT"
 if sym in ticker_map:
 scan_syms.add(sym)
 # Top100
 top_by_vol = sorted(ticker_map.items(), key=lambda x: x[1]["vol"], reverse=True)[:100]
 for sym, _ in top_by_vol:
 scan_syms.add(sym)

 oi_map = {}
 for i, sym in enumerate(scan_syms):
 oi_hist = api_get("/futures/data/openInterestHist", {
 "symbol": sym, "period": "1h", "limit": 6
 })
 if oi_hist and len(oi_hist) >= 2:
 curr = float(oi_hist[-1]["sumOpenInterestValue"])
 prev_1h = float(oi_hist[-2]["sumOpenInterestValue"])
 prev_6h = float(oi_hist[0]["sumOpenInterestValue"])
 d1h = ((curr - prev_1h) / prev_1h * 100) if prev_1h > 0 else 0
 d6h = ((curr - prev_6h) / prev_6h * 100) if prev_6h > 0 else 0
 oi_map[sym] = {"oi_usd": curr, "d1h": d1h, "d6h": d6h}
 if (i + 1) % 10 == 0:
 time.sleep(0.5)

 print(f"📊 OI: {len(oi_map)}")

 # 5. 
 all_syms = set(list(ticker_map.keys()))
 coin_data = {}
 for sym in all_syms:
 tk = ticker_map.get(sym, {})
 if not tk:
 continue
 oi = oi_map.get(sym, {})
 fr = funding_map.get(sym, 0)
 coin = sym.replace("USDT", "")

 d6h = oi.get("d6h", 0)
 fr_pct = fr * 100
 oi_usd = oi.get("oi_usd", 0)

 # ,fallback
 if coin in mcap_map:
 est_mcap = mcap_map[coin]
 else:
 est_mcap = max(tk["vol"] * 0.3, oi_usd * 2) if oi_usd > 0 else tk["vol"] * 0.3

 heat = heat_map.get(coin, 0)

 coin_data[sym] = {
 "coin": coin, "sym": sym,
 "px_chg": tk["px_chg"], "vol": tk["vol"],
 "fr_pct": fr_pct, "d6h": d6h,
 "oi_usd": oi_usd, "est_mcap": est_mcap,
 "heat": heat,
 "in_cg": coin in cg_trending,
 "in_sq": coin in square_trending,
 "vol_surge": coin in vol_surge_coins,
 }

 # ═══════════════════════════════════════
 # 
 # ═══════════════════════════════════════
 hot_coins = sorted(
 [d for d in coin_data.values() if d["heat"] > 0],
 key=lambda x: x["heat"], reverse=True
 )

 # 
 heat_history = {}
 if HEAT_HISTORY_FILE.exists():
 try:
 heat_history = json.loads(HEAT_HISTORY_FILE.read_text())
 except:
 pass

 now_ts = datetime.now(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
 new_entries = [] # 
 for s in hot_coins:
 coin = s["coin"]
 if coin not in heat_history:
 # !
 heat_history[coin] = {"first_seen": now_ts, "price": s.get("px_chg", 0)}
 sources = []
 if s["in_sq"]: sources.append("")
 if s["in_cg"]: sources.append("CG")
 if s["vol_surge"]: sources.append("")
 new_entries.append({"coin": coin, "sources": sources, "data": s})

 # 7()
 cutoff = (datetime.now(timezone(timedelta(hours=8))) - timedelta(days=7)).strftime("%Y-%m-%d")
 heat_history = {k: v for k, v in heat_history.items()
 if v.get("first_seen", "9999") >= cutoff}

 # 
 HEAT_HISTORY_FILE.write_text(json.dumps(heat_history, indent=2, ensure_ascii=False))

 # ═══════════════════════════════════════
 # :+
 # ═══════════════════════════════════════
 chase = []
 for sym, d in coin_data.items():
 if d["px_chg"] > 3 and d["fr_pct"] < -0.005 and d["vol"] > 1_000_000:
 fr_hist = api_get("/fapi/v1/fundingRate", {"symbol": sym, "limit": 5})
 fr_rates = [float(f["fundingRate"]) * 100 for f in fr_hist] if fr_hist else [d["fr_pct"]]
 fr_prev = fr_rates[-2] if len(fr_rates) >= 2 else d["fr_pct"]
 fr_delta = d["fr_pct"] - fr_prev

 trend = "" if fr_delta < -0.05 else "" if fr_delta < -0.01 else "" if abs(fr_delta) < 0.01 else ""

 chase.append({**d, "fr_delta": fr_delta, "trend": trend,
 "rates": " → ".join([f"{x:.3f}" for x in fr_rates[-3:]])})
 time.sleep(0.2)

 chase.sort(key=lambda x: x["fr_pct"])

 # ═══════════════════════════════════════
 # 
 # ═══════════════════════════════════════
 now = datetime.now(timezone(timedelta(hours=8)))
 lines = [
 f"****",
 f"{now.strftime('%Y-%m-%d %H:%M')} CST",
 ]

 # ()
 # 
 if new_entries:
 lines.append(f"\n**[ ]** ,")
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>8} {'':>7} {''}")
 tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
 for e in new_entries:
 s = e["data"]
 src_str = "/".join(e["sources"])
 tbl.append(f"{s['coin']:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
 tbl.append("```")
 lines.append("\n".join(tbl))

 if hot_coins:
 lines.append(f"\n**[ ]**")
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>8} {'':>7} {''}")
 tbl.append(f"{'-'*10} {'-'*8} {'-'*7} {'-'*20}")
 for s in hot_coins[:10]:
 sources = []
 if s["in_sq"]: sources.append("")
 if s["in_cg"]: sources.append("CG")
 if s["vol_surge"]: sources.append("")
 extra = []
 if abs(s["d6h"]) >= 3: extra.append(f"OI{s['d6h']:+.0f}%")
 if s["fr_pct"] < -0.03: extra.append(f"{s['fr_pct']:.2f}%")
 src_str = "/".join(sources)
 if extra:
 src_str += " " + " ".join(extra)
 coin_name = s['coin']
 tbl.append(f"{coin_name:<10} {mcap_str(s['est_mcap']):>8} {s['px_chg']:>+6.0f}% {src_str}")
 tbl.append("```")
 lines.append("\n".join(tbl))
 else:
 lines.append("\n**[ ]** ")

 # ()
 lines.append(f"\n**[ ]** +=")
 if chase:
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>10} {'':>8} {'':>7} {'':>8}")
 tbl.append(f"{'-'*10} {'-'*10} {'-'*8} {'-'*7} {'-'*8}")
 for s in chase[:8]:
 tbl.append(
 f"{s['coin']:<10} {s['fr_pct']:>+9.3f}% {s['trend']:>8} {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
 )
 tbl.append("```")
 lines.append("\n".join(tbl))
 else:
 lines.append(" ")

 # OI()
 oi_alerts = []
 for sym, oi in oi_map.items():
 if abs(oi["d6h"]) >= 8:
 d = coin_data.get(sym)
 if d and d["heat"] == 0:
 oi_alerts.append(d)
 oi_alerts.sort(key=lambda x: abs(x["d6h"]), reverse=True)

 if oi_alerts:
 lines.append(f"\n**[ OI ]** 6>=8%")
 tbl = ["```"]
 tbl.append(f"{'':<10} {'':>4} {'OI':>8} {'':>7} {'':>8}")
 tbl.append(f"{'-'*10} {'-'*4} {'-'*8} {'-'*7} {'-'*8}")
 for s in oi_alerts[:6]:
 direction = "" if s["d6h"] > 0 else ""
 tbl.append(
 f"{s['coin']:<10} {direction:>4} {s['d6h']:>+7.1f}% {s['px_chg']:>+6.0f}% {mcap_str(s['est_mcap']):>7}"
 )
 tbl.append("```")
 lines.append("\n".join(tbl))

 # 
 highlights = []

 hot_oi = [d for d in coin_data.values() if d["heat"] > 0 and d["d6h"] > 5]
 for s in sorted(hot_oi, key=lambda x: x["d6h"], reverse=True)[:3]:
 highlights.append(f"{s['coin']} — +OI{s['d6h']:+.0f}%,")

 hot_fuel = [d for d in coin_data.values() if d["heat"] > 0 and d["fr_pct"] < -0.03]
 for s in sorted(hot_fuel, key=lambda x: x["fr_pct"])[:2]:
 if s["coin"] not in " ".join(highlights):
 highlights.append(f"{s['coin']} — +{s['fr_pct']:.2f}%,")

 chase_fire = [s for s in chase[:5] if "" in s.get("trend", "")]
 for s in chase_fire[:2]:
 if s["coin"] not in " ".join(highlights):
 highlights.append(f"{s['coin']}{s['fr_pct']:.3f}%,")

 if highlights:
 lines.append(f"\n**[ ]**")
 for h in highlights[:5]:
 lines.append(f" {h}")

 lines.append(f"\n= / CG=CoinGecko")
 lines.append(f"=,")

 report = "\n".join(lines)
 send_telegram(report)
 print("\n✅ ")

if __name__ == "__main__":
 main()

바이낸스 Alpha 모니터 #

게시일: 2026.04.23 태그: Python · Binance · Claude AI · Telegram

WebSocket 실시간 + AI 분석 + Telegram 푸시

바이낸스 Alpha 신규 상장을 자동 감지하고 Claude AI로 토큰 품질을 분석한 다음 Telegram으로 알림을 보냅니다.

전체 소스 코드 #

#!/usr/bin/env python3
"""
Binance Alpha Monitor v2 — 
REST + + + TG
API Key,AI()

: python3 alpha_monitor.py
"""

import asyncio
import hashlib
import json
import logging
import os
import re
import sqlite3
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional

import httpx

# ============================================================
# 
# ============================================================

BASE_DIR = Path(__file__).parent
DB_PATH = str(BASE_DIR / "data" / "alpha.db")

# TG
TG_BOT_TOKEN = os.environ.get("TG_BOT_TOKEN", "")
TG_CHAT_ID = os.environ.get("TG_CHAT_ID", "")

# LLM (,,)
# hermes auth.jsoncredential pool(scanner)
def _load_anthropic_key():
 """auth.jsonAPI key"""
 # 1. hermes auth.json credential pool
 try:
 auth_file = os.path.expanduser("~/.hermes/auth.json")
 if os.path.exists(auth_file):
 with open(auth_file) as f:
 auth = json.load(f)
 pool = auth.get("credential_pool", {}).get("anthropic", [])
 if pool:
 key = pool[0].get("access_token", "")
 if key and key != "***":
 return key
 except Exception:
 pass
 # 2. fallback
 return os.environ.get("ANTHROPIC_API_KEY", "")

ANTHROPIC_API_KEY = _load_anthropic_key()
ANTHROPIC_BASE_URL = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")

# 
ANNOUNCEMENT_POLL_INTERVAL = 30 # 30
AGGREGATION_POLL_INTERVAL = 15 # 15
MONITOR_POLL_INTERVAL = 120 # 2

# HTTP
HEADERS = {
 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
 "Accept": "application/json",
 "Accept-Language": "en-US,en;q=0.9",
}

BINANCE_ANNOUNCEMENT_API = "https://www.binance.com/bapi/composite/v1/public/cms/article/list/query"

# ============================================================
# 
# ============================================================

logging.basicConfig(
 level=logging.INFO,
 format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
 datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("alpha")

# ============================================================
# 
# ============================================================

# — 
TRIGGER_KEYWORDS = [
 "alpha", "", "airdrop", "tge", "token generation",
 "", "will list", "will launch",
 "", "exclusive", "binance wallet", "hodler",
]

# — 
EXCLUDE_KEYWORDS = [
 "delisting", "delist", "", "deprecate", "",
 "maintenance", "",
 "launchpool", "megadrop",
 "buyback", "",
 "", "",
 "perpetual contract", # 
 "futures will launch", # 
 "usdⓢ-margined", # U
 "coin-margined", # 
 "margin will add", # 
 "trading bots services", # 
 "trading pairs", # ()
]

# Alpha Box 
ALPHA_BOX_KEYWORDS = ["alpha box", "", "mystery box"]

# ============================================================
# VC 
# ============================================================

TIER1_VCS = [
 "binance labs", "yzi labs",
 "coinbase ventures", "a16z", "andreessen horowitz", "paradigm",
 "polychain", "polychain capital", "sequoia", "sequoia china", "sequoia capital",
 "multicoin", "multicoin capital", "pantera", "pantera capital",
 "dragonfly", "dragonfly capital", "founders fund",
]

TIER2_VCS = [
 "abcde", "iosg", "hashkey", "okx ventures",
 "sevenx", "folius", "foresight", "hashed",
 "bitkraft", "framework", "framework ventures",
 "delphi", "delphi digital", "electric capital",
 "variant", "1kx", "placeholder",
 "animoca", "animoca brands", "jump", "jump crypto",
 "hack vc", "bain capital",
]

# 
HOT_NARRATIVES = ["defi_perp", "ai_agent", "ai_defi", "defai", "zk_proof"]
WEAK_NARRATIVES = ["gamefi", "meme", "social"]

# 
BINANCE_DARLING_KEYWORDS = ["yzi labs", "binance labs"]

# ============================================================
# 
# ============================================================

TIER_ICONS = {"S": "🟢🟢🟢", "A": "🟡🟡", "B": "🟠", "C": "⚪"}
TIER_LABELS = {"S": "S ()", "A": "A ()", "B": "B ()", "C": "C ()"}

def count_vc_tier(vcs: list, vc_list: list) -> int:
 count = 0
 vcs_lower = [v.lower() for v in vcs]
 for t in vc_list:
 if any(t in v for v in vcs_lower):
 count += 1
 return count

def rate_project(circ_mcap: float, fdv: float, vcs: list,
 narrative: str, is_darling: bool) -> dict:
 """S/A/B/C """
 t1 = count_vc_tier(vcs, TIER1_VCS)
 t2 = count_vc_tier(vcs, TIER2_VCS)
 hot = narrative in HOT_NARRATIVES
 weak = narrative in WEAK_NARRATIVES
 circ_mcap = circ_mcap or 0
 fdv = fdv or 0

 warnings = []
 if weak:
 warnings.append(f"⚠️ {narrative} ")

 # S 5
 if is_darling:
 return {"tier": "S", "reason": "(YZi/Binance Labs/CZ)", "warnings": warnings}
 if hot and t1 >= 1 and fdv < 500_000_000:
 return {"tier": "S", "reason": f"({narrative})+ Tier1 VC", "warnings": warnings}
 if t1 >= 2 and circ_mcap < 50_000_000 and fdv < 300_000_000:
 return {"tier": "S", "reason": "≥2 Tier1 ", "warnings": warnings}
 if t1 >= 1 and circ_mcap < 10_000_000 and fdv < 100_000_000:
 return {"tier": "S", "reason": "Tier1 ", "warnings": warnings}
 if hot and circ_mcap < 10_000_000 and fdv < 50_000_000:
 return {"tier": "S", "reason": f"({narrative})", "warnings": warnings}

 # A
 if t1 >= 1 and circ_mcap < 20_000_000 and fdv < 200_000_000:
 return {"tier": "A", "reason": "Tier1 ", "warnings": warnings}

 # B
 if circ_mcap < 50_000_000 and fdv < 500_000_000:
 return {"tier": "B", "reason": "", "warnings": warnings}

 return {"tier": "C", "reason": "/", "warnings": warnings}

# ============================================================
# 
# ============================================================

def init_db():
 Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
 conn = sqlite3.connect(DB_PATH)
 c = conn.cursor()
 c.execute("""
 CREATE TABLE IF NOT EXISTS projects (
 id TEXT PRIMARY KEY,
 symbol TEXT NOT NULL,
 name TEXT,
 launch_time TEXT,
 source TEXT,
 raw_text TEXT,
 tier TEXT DEFAULT 'PENDING',
 tier_reason TEXT,
 narrative TEXT,
 narrative_desc TEXT,
 vcs_json TEXT DEFAULT '[]',
 is_darling INTEGER DEFAULT 0,
 open_price REAL,
 total_supply REAL,
 circulating_supply REAL,
 fdv REAL,
 circulating_mcap REAL,
 excluded INTEGER DEFAULT 0,
 exclude_reason TEXT,
 discovered_at TEXT,
 updated_at TEXT
 )""")
 c.execute("""
 CREATE TABLE IF NOT EXISTS pushes (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 project_id TEXT NOT NULL,
 push_type TEXT,
 sent_at TEXT,
 content TEXT
 )""")
 c.execute("""
 CREATE TABLE IF NOT EXISTS snapshots (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 project_id TEXT NOT NULL,
 timestamp TEXT NOT NULL,
 price REAL,
 circulating_mcap REAL,
 fdv REAL
 )""")
 conn.commit()
 conn.close()

def project_id(symbol: str, date_str: str) -> str:
 return hashlib.md5(f"{symbol.upper()}_{date_str}".encode()).hexdigest()[:16]

def project_exists(pid: str) -> bool:
 conn = sqlite3.connect(DB_PATH)
 exists = conn.execute("SELECT 1 FROM projects WHERE id=?", (pid,)).fetchone() is not None
 conn.close()
 return exists

def save_project(project: dict):
 conn = sqlite3.connect(DB_PATH)
 now = datetime.utcnow().isoformat()
 conn.execute("""
 INSERT OR IGNORE INTO projects
 (id, symbol, name, launch_time, source, raw_text, tier, tier_reason,
 narrative, narrative_desc, vcs_json, is_darling,
 open_price, total_supply, circulating_supply, fdv, circulating_mcap,
 excluded, exclude_reason, discovered_at, updated_at)
 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
 """, (
 project["id"], project["symbol"], project.get("name"),
 project.get("launch_time"), project.get("source"), project.get("raw_text"),
 project.get("tier", "PENDING"), project.get("tier_reason"),
 project.get("narrative"), project.get("narrative_desc"),
 json.dumps(project.get("vcs", [])), int(project.get("is_darling", False)),
 project.get("open_price"), project.get("total_supply"),
 project.get("circulating_supply"), project.get("fdv"),
 project.get("circulating_mcap"),
 int(project.get("excluded", 0)), project.get("exclude_reason"),
 now, now,
 ))
 conn.commit()
 conn.close()

def update_project(pid: str, fields: dict):
 if not fields:
 return
 conn = sqlite3.connect(DB_PATH)
 fields["updated_at"] = datetime.utcnow().isoformat()
 set_parts = [f"{k}=?" for k in fields]
 values = list(fields.values()) + [pid]
 conn.execute(f"UPDATE projects SET {','.join(set_parts)} WHERE id=?", values)
 conn.commit()
 conn.close()

def get_project(pid: str) -> Optional[dict]:
 conn = sqlite3.connect(DB_PATH)
 conn.row_factory = sqlite3.Row
 row = conn.execute("SELECT * FROM projects WHERE id=?", (pid,)).fetchone()
 conn.close()
 return dict(row) if row else None

def list_pending() -> list:
 conn = sqlite3.connect(DB_PATH)
 conn.row_factory = sqlite3.Row
 rows = conn.execute(
 "SELECT * FROM projects WHERE excluded=0 AND tier='PENDING' ORDER BY discovered_at"
 ).fetchall()
 conn.close()
 return [dict(r) for r in rows]

def list_active() -> list:
 """(PENDINGEXCLUDED,launch_time)"""
 conn = sqlite3.connect(DB_PATH)
 conn.row_factory = sqlite3.Row
 rows = conn.execute("""
 SELECT * FROM projects
 WHERE excluded=0 AND launch_time IS NOT NULL AND launch_time != ''
 AND tier NOT IN ('PENDING', 'EXCLUDED', 'ERROR')
 """).fetchall()
 conn.close()
 return [dict(r) for r in rows]

def has_pushed(pid: str, push_type: str) -> bool:
 conn = sqlite3.connect(DB_PATH)
 exists = conn.execute(
 "SELECT 1 FROM pushes WHERE project_id=? AND push_type=?", (pid, push_type)
 ).fetchone() is not None
 conn.close()
 return exists

def log_push(pid: str, push_type: str, content: str):
 conn = sqlite3.connect(DB_PATH)
 conn.execute(
 "INSERT INTO pushes (project_id, push_type, sent_at, content) VALUES (?,?,?,?)",
 (pid, push_type, datetime.utcnow().isoformat(), content)
 )
 conn.commit()
 conn.close()

def save_snapshot(pid: str, price: float, mcap: float, fdv: float):
 conn = sqlite3.connect(DB_PATH)
 conn.execute(
 "INSERT INTO snapshots (project_id, timestamp, price, circulating_mcap, fdv) VALUES (?,?,?,?,?)",
 (pid, datetime.utcnow().isoformat(), price, mcap, fdv)
 )
 conn.commit()
 conn.close()

# ============================================================
# TG 
# ============================================================

async def send_tg(text: str, silent: bool = False) -> bool:
 if not TG_BOT_TOKEN or not TG_CHAT_ID:
 logger.error("TG_BOT_TOKEN TG_CHAT_ID ")
 return False
 url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
 payload = {
 "chat_id": TG_CHAT_ID,
 "text": text,
 "parse_mode": "HTML",
 "disable_notification": silent,
 }
 try:
 async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
 resp = await client.post(url, json=payload)
 if resp.status_code != 200:
 logger.error(f"TG {resp.status_code}: {resp.text[:200]}")
 return False
 return True
 except Exception as e:
 logger.error(f"TG: {e}")
 return False

# ============================================================
# 
# ============================================================

def is_trigger(title: str) -> tuple[bool, Optional[str]]:
 t = title.lower()
 for kw in EXCLUDE_KEYWORDS:
 if kw.lower() in t:
 return False, f": {kw}"
 for kw in ALPHA_BOX_KEYWORDS:
 if kw.lower() in t:
 return False, "Alpha Box "
 for kw in TRIGGER_KEYWORDS:
 if kw.lower() in t:
 return True, None
 return False, None

def extract_symbol(title: str) -> Optional[str]:
 # : "Chip (CHIP)"
 m = re.search(r"\(([A-Z0-9]{2,10})\)", title)
 if m:
 return m.group(1)
 # 
 m = re.search(r"(([A-Z0-9]{2,10}))", title)
 if m:
 return m.group(1)
 return None

def extract_name(title: str) -> Optional[str]:
 patterns = [
 r"(?:|List|list|Launch|launch|featured)\s+([A-Za-z0-9 ]+?)\s*[\((]",
 ]
 for p in patterns:
 m = re.search(p, title, re.IGNORECASE)
 if m:
 return m.group(1).strip()
 return None

# ============================================================
# 
# ============================================================

async def fetch_announcements(limit: int = 20) -> list:
 """"""
 all_articles = []
 # 48: New Cryptocurrency Listing, 161: Latest Activities, 93: Latest News
 for catalog_id in [48, 161, 93]:
 params = {"type": 1, "catalogId": catalog_id, "pageNo": 1, "pageSize": limit}
 try:
 async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
 resp = await client.get(BINANCE_ANNOUNCEMENT_API, params=params)
 resp.raise_for_status()
 data = resp.json()
 for catalog in data.get("data", {}).get("catalogs", []):
 for a in catalog.get("articles", []):
 a["_catalog_id"] = catalog_id
 all_articles.append(a)
 except Exception as e:
 logger.warning(f" {catalog_id} : {e}")

 # 
 seen = set()
 unique = []
 for a in all_articles:
 code = a.get("code")
 if code and code not in seen:
 seen.add(code)
 unique.append(a)
 return unique

# ============================================================
# CoinGecko 
# ============================================================

async def fetch_coingecko(symbol: str, project_name: str = "") -> dict:
 """CoinGecko。:symbol + """
 result = {"found": False, "price": None, "fdv": None, "mcap": None,
 "total_supply": None, "circ_supply": None, "chain": None, "contract": None}
 try:
 async with httpx.AsyncClient(timeout=15, headers=HEADERS) as client:
 # :symbol,
 queries = [symbol]
 if project_name and project_name.upper() != symbol.upper():
 queries.append(project_name)

 coin_id = None
 best_rank = 999999
 name_exact_match = None # 

 for query in queries:
 resp = await client.get("https://api.coingecko.com/api/v3/search",
 params={"query": query})
 if resp.status_code != 200:
 continue
 coins = resp.json().get("coins", [])

 for c in coins:
 c_sym = c.get("symbol", "").upper()
 c_name = c.get("name", "").lower()
 c_rank = c.get("market_cap_rank") or 999999

 # (,MegaETH -> megaeth)
 if project_name and c_name == project_name.lower():
 name_exact_match = c["id"]

 # symbol — market_cap_rank()
 if c_sym == symbol.upper():
 if c_rank < best_rank:
 coin_id = c["id"]
 best_rank = c_rank

 # (MegaETHmega)
 if project_name and project_name.lower() in c_name:
 if c_rank < best_rank:
 coin_id = c["id"]
 best_rank = c_rank

 # > 
 if name_exact_match:
 coin_id = name_exact_match

 if not coin_id:
 logger.info(f"CoinGecko {symbol}/{project_name}")
 return result

 logger.info(f"CoinGecko: {symbol} -> {coin_id} (rank={best_rank})")

 resp2 = await client.get(
 f"https://api.coingecko.com/api/v3/coins/{coin_id}",
 params={"localization": "false", "tickers": "false",
 "market_data": "true", "community_data": "false",
 "developer_data": "false"}
 )
 if resp2.status_code == 429:
 await asyncio.sleep(5)
 resp2 = await client.get(
 f"https://api.coingecko.com/api/v3/coins/{coin_id}",
 params={"localization": "false", "tickers": "false",
 "market_data": "true", "community_data": "false",
 "developer_data": "false"}
 )
 if resp2.status_code != 200:
 return result
 d = resp2.json()
 md = d.get("market_data", {})
 result.update({
 "found": True,
 "price": (md.get("current_price") or {}).get("usd"),
 "fdv": (md.get("fully_diluted_valuation") or {}).get("usd"),
 "mcap": (md.get("market_cap") or {}).get("usd"),
 "total_supply": md.get("total_supply"),
 "circ_supply": md.get("circulating_supply"),
 })
 # categories(VC"YZi Labs Portfolio")description
 result["categories"] = d.get("categories", [])
 result["description"] = (d.get("description") or {}).get("en", "")[:500]
 platforms = d.get("platforms", {})
 for chain, addr in platforms.items():
 if addr:
 result["chain"] = chain
 result["contract"] = addr
 break

 # CoinGecko(),/
 if not result["price"]:
 binance_price = await _fetch_binance_price(symbol, client)
 if binance_price:
 result["price"] = binance_price
 ts = result.get("total_supply") or 0
 cs = result.get("circ_supply") or 0
 if ts > 0:
 result["fdv"] = binance_price * ts
 if cs > 0:
 result["mcap"] = binance_price * cs
 logger.info(f" {symbol}: ${binance_price}, FDV=${result.get('fdv',0):,.0f}")

 except Exception as e:
 logger.warning(f"CoinGecko {symbol}: {e}")
 return result

async def _fetch_binance_price(symbol: str, client: httpx.AsyncClient) -> float:
 """"""
 pair = f"{symbol.upper()}USDT"
 # 1. 
 try:
 resp = await client.get(f"https://api.binance.com/api/v3/ticker/price",
 params={"symbol": pair})
 if resp.status_code == 200:
 return float(resp.json()["price"])
 except Exception:
 pass
 # 2. 
 try:
 resp = await client.get(f"https://fapi.binance.com/fapi/v1/ticker/price",
 params={"symbol": pair})
 if resp.status_code == 200:
 return float(resp.json()["price"])
 except Exception:
 pass
 return 0.0

# ============================================================
# LLM (,)
# ============================================================

async def llm_extract(raw_text: str, symbol: str, name: str = "", cg_data: dict = None) -> dict:
 """LLM+CoinGecko/VC/"""
 fallback = {
 "narrative": "unknown", "narrative_desc": "",
 "vcs": [], "is_darling": False, "exclude_reason": None,
 }

 # CoinGecko categories
 cg_data = cg_data or {}
 categories = cg_data.get("categories", [])
 description = cg_data.get("description", "")

 # (categories)
 darling_cats = [c for c in categories if any(kw in c.lower() for kw in ["yzi labs", "binance labs"])]
 if darling_cats:
 fallback["is_darling"] = True

 if not ANTHROPIC_API_KEY:
 # :+categories
 t = raw_text.lower()
 for kw in BINANCE_DARLING_KEYWORDS:
 if kw in t:
 fallback["is_darling"] = True
 # categories
 cat_str = " ".join(categories).lower()
 if "defi" in cat_str: fallback["narrative"] = "defi"
 elif "ai" in cat_str: fallback["narrative"] = "ai_agent"
 elif "gaming" in cat_str or "gamefi" in cat_str: fallback["narrative"] = "gamefi"
 elif "meme" in cat_str: fallback["narrative"] = "meme"
 elif "rwa" in cat_str or "real world" in cat_str: fallback["narrative"] = "rwa"
 return fallback

 # 
 extra_context = ""
 if categories:
 extra_context += f"\nCoinGecko: {', '.join(categories)}"
 if description:
 extra_context += f"\n: {description[:300]}"
 if cg_data.get("found"):
 extra_context += f"\n: FDV=${cg_data.get('fdv',0):,.0f}, MCap=${cg_data.get('mcap',0):,.0f}, =${cg_data.get('price',0)}"
 if cg_data.get("chain"):
 extra_context += f", ={cg_data['chain']}"

 system = ",。JSON,。"
 user = f""":
: {symbol}, : {name or ""}
: {raw_text}
{extra_context}

JSON:
{{
 "narrative": "defi_perp|ai_agent|ai_defi|defai|zk_proof|infra|defi|rwa|gamefi|meme|social|stablecoin|unknown",
 "narrative_desc": "、",
 "vcs": ["CoinGecko"],
 "is_darling": true/false,
 "exclude_reason": null|"already_tge"|"meme_only"
}}

:
- narrative: 
- vcs: CoinGecko "XXX Portfolio" XXX
- is_darling: YZi Labs/Binance Labs CZ/ true
- exclude_reason: CEX(Coinbase/OKX/Bybit)3"already_tge"。DEX,already_tge。CoinGeckoalready_tge。meme"meme_only"
"""

 try:
 async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
 resp = await client.post(
 f"{ANTHROPIC_BASE_URL.rstrip('/')}/v1/messages",
 headers={
 "x-api-key": ANTHROPIC_API_KEY,
 "anthropic-version": "2023-06-01",
 "content-type": "application/json",
 },
 json={
 "model": ANTHROPIC_MODEL,
 "max_tokens": 800,
 "temperature": 0,
 "system": system,
 "messages": [{"role": "user", "content": user}],
 }
 )
 if resp.status_code != 200:
 logger.warning(f"LLM {resp.status_code}")
 return fallback
 data = resp.json()
 text = ""
 for block in data.get("content", []):
 if block.get("type") == "text":
 text = block.get("text", "")
 break
 text = text.strip()
 if text.startswith("```"):
 lines = text.split("\n")
 text = "\n".join(lines[1:-1])
 return json.loads(text)
 except Exception as e:
 logger.warning(f"LLM: {e}")
 return fallback

# ============================================================
# 
# ============================================================

def _fmt_mcap(v):
 if not v:
 return "N/A"
 if v >= 1e9:
 return f"${v/1e9:.1f}B"
 if v >= 1e6:
 return f"${v/1e6:.1f}M"
 if v >= 1e3:
 return f"${v/1e3:.0f}K"
 return f"${v:.0f}"

def _fmt_price(v):
 if not v:
 return "N/A"
 if v >= 1:
 return f"${v:.2f}"
 if v >= 0.01:
 return f"${v:.4f}"
 return f"${v:.6f}"

def fmt_discovery(p: dict) -> str:
 tier = p.get("tier", "C")
 icon = TIER_ICONS.get(tier, "⚪")
 label = TIER_LABELS.get(tier, "")
 symbol = p["symbol"]
 name = p.get("name") or ""
 vcs = json.loads(p.get("vcs_json", "[]")) if isinstance(p.get("vcs_json"), str) else p.get("vcs", [])

 lines = [
 f"{icon} <b>Alpha · ${symbol}</b> {icon}",
 f"📋 {label}",
 "",
 f"<b>{name}</b>" if name else "",
 ]

 if p.get("narrative_desc"):
 lines.append(f"💡 {p['narrative_desc']}")
 if p.get("narrative") and p["narrative"] != "unknown":
 lines.append(f"🏷 : {p['narrative']}")
 lines.append("")

 if p.get("fdv"):
 lines.append(f"📊 FDV: {_fmt_mcap(p['fdv'])}")
 if p.get("circulating_mcap"):
 lines.append(f"📊 : {_fmt_mcap(p['circulating_mcap'])}")
 if p.get("open_price"):
 lines.append(f"💰 : {_fmt_price(p['open_price'])}")
 if p.get("total_supply") and p.get("circulating_supply"):
 pct = p["circulating_supply"] / p["total_supply"] * 100
 lines.append(f"📦 : {pct:.1f}%")

 if vcs:
 lines.append("")
 lines.append("🏛 <b></b>")
 for v in vcs[:5]:
 is_t1 = any(t in v.lower() for t in TIER1_VCS)
 lines.append(f" {'⭐' if is_t1 else '·'} {v}")

 if p.get("is_darling"):
 lines.append("")
 lines.append("🔥 <b></b>")

 if p.get("tier_reason"):
 lines.append("")
 lines.append(f"🎯 {p['tier_reason']}")

 lines.append("")
 lines.append(f"<i>📌 : {p.get('source', 'binance')}</i>")
 if p.get("raw_text"):
 lines.append(f"<i>{p['raw_text'][:120]}</i>")

 return "\n".join(l for l in lines if l is not None)

def fmt_countdown(p: dict, minutes: int) -> str:
 icon = TIER_ICONS.get(p.get("tier", "C"), "⚪")
 t = f"{minutes//60}h{minutes%60}m" if minutes >= 60 else f"{minutes}m"
 lines = [
 f"{icon} <b></b>",
 f"<b>${p['symbol']}</b> · {p.get('name', '')}",
 f"⏰ <b>{t}</b>",
 ]
 if p.get("fdv"):
 lines.append(f"FDV: {_fmt_mcap(p['fdv'])}")
 if minutes <= 30:
 lines.append("🔔 <b></b>")
 return "\n".join(lines)

def fmt_launch(p: dict, price: float, mcap: float, fdv: float) -> str:
 lines = [
 f"🚀 <b>${p['symbol']} </b>",
 f": <b>{_fmt_price(price)}</b>",
 f": <b>{_fmt_mcap(mcap)}</b>",
 f"FDV: <b>{_fmt_mcap(fdv)}</b>",
 ]
 return "\n".join(lines)

def fmt_periodic(p: dict, idx: int, price: float, mcap: float, change_pct: float) -> str:
 arrow = "📈" if change_pct > 0 else "📉"
 minutes = 30 * idx
 lines = [
 f"⏱ <b>${p['symbol']} · +{minutes}min</b>",
 f": {_fmt_mcap(mcap)} ({arrow} {change_pct:+.1f}%)",
 f": {_fmt_price(price)}",
 ]
 if change_pct >= 100:
 lines.append("💡 <b>,</b>")
 elif change_pct <= -30:
 lines.append("⚠️ ,")
 return "\n".join(lines)

def fmt_anomaly(p: dict, atype: str, price: float, change_pct: float) -> str:
 emoji = {"double": "🚀", "halve": "🔻"}.get(atype, "⚡")
 desc = {"double": "", "halve": ""}.get(atype, "")
 return f"{emoji} <b>${p['symbol']} {desc}</b>\n: {change_pct:+.1f}%\n: {_fmt_price(price)}"

# ============================================================
# : 
# ============================================================

async def announcement_listener():
 """,Alpha"""
 logger.info(f"📡 · {ANNOUNCEMENT_POLL_INTERVAL}s")
 while True:
 try:
 articles = await fetch_announcements()
 new_count = 0
 for art in articles:
 title = art.get("title", "")
 triggered, reason = is_trigger(title)
 if not triggered:
 continue

 symbol = extract_symbol(title)
 if not symbol:
 continue

 # 
 release_ts = art.get("releaseDate")
 release_iso = datetime.fromtimestamp(release_ts / 1000).isoformat() if release_ts else ""
 launch_date = release_iso[:10] if release_iso else datetime.utcnow().date().isoformat()

 pid = project_id(symbol, launch_date)
 if project_exists(pid):
 continue

 project = {
 "id": pid,
 "symbol": symbol,
 "name": extract_name(title),
 "launch_time": release_iso,
 "source": "binance_announcement",
 "raw_text": title,
 "tier": "PENDING",
 "vcs": [],
 "is_darling": False,
 "excluded": 0,
 }
 save_project(project)
 new_count += 1
 logger.info(f"🆕 ${symbol}: {title[:80]}")

 if new_count:
 logger.info(f" {new_count} ")
 except Exception as e:
 logger.error(f": {e}", exc_info=True)

 await asyncio.sleep(ANNOUNCEMENT_POLL_INTERVAL)

# ============================================================
# : + 
# ============================================================

async def aggregation_worker():
 """PENDING、、"""
 logger.info(f"🧠 · {AGGREGATION_POLL_INTERVAL}s")
 while True:
 try:
 pending = list_pending()
 for p in pending:
 symbol = p["symbol"]
 try:
 logger.info(f"📦 ${symbol}")

 # 1. CoinGecko
 cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
 await asyncio.sleep(1) # 

 # 2. LLM(CoinGecko)
 llm = await llm_extract(p.get("raw_text", ""), symbol, p.get("name"), cg_data=cg)
 await asyncio.sleep(1)

 # 3. 
 if llm.get("exclude_reason") in ("already_tge", "meme_only"):
 update_project(p["id"], {
 "excluded": 1,
 "exclude_reason": llm["exclude_reason"],
 "tier": "EXCLUDED",
 })
 logger.info(f"⏭ ${symbol} : {llm['exclude_reason']}")
 continue

 # 4. — CoinGecko
 cg_fdv = cg.get("fdv", 0) or 0
 cg_mcap = cg.get("mcap", 0) or 0
 data_suspect = False
 data_warnings = []

 # FDV$100K(CoinGecko)
 if cg.get("found") and cg_fdv > 0 and cg_fdv < 100_000:
 data_suspect = True
 data_warnings.append(f"FDV=${cg_fdv:,.0f},CoinGecko")
 logger.warning(f"⚠️ {symbol} FDV=${cg_fdv:,.0f} ,")

 # 100%(100%)
 if (cg.get("circ_supply") and cg.get("total_supply")
 and cg["circ_supply"] == cg["total_supply"]
 and cg_fdv < 1_000_000):
 data_suspect = True
 data_warnings.append("=FDV,")

 if data_suspect:
 # ,CoinGeckoFDV/MCap
 cg_fdv = 0
 cg_mcap = 0
 logger.warning(f"⚠️ {symbol} CoinGecko,LLM")

 # 5. 
 is_darling = llm.get("is_darling", False)
 vcs = llm.get("vcs", [])
 narrative = llm.get("narrative", "unknown")
 rating = rate_project(
 cg_mcap, cg_fdv,
 vcs, narrative, is_darling
 )

 # 6. DB(data_suspectCoinGecko)
 update_project(p["id"], {
 "tier": rating["tier"],
 "tier_reason": rating["reason"],
 "narrative": narrative,
 "narrative_desc": llm.get("narrative_desc", ""),
 "vcs_json": json.dumps(vcs),
 "is_darling": int(is_darling),
 "open_price": cg.get("price") if not data_suspect else None,
 "total_supply": cg.get("total_supply") if not data_suspect else None,
 "circulating_supply": cg.get("circ_supply") if not data_suspect else None,
 "fdv": cg_fdv if cg_fdv else None,
 "circulating_mcap": cg_mcap if cg_mcap else None,
 })

 # 6. discovery
 full = get_project(p["id"])
 if full and not has_pushed(p["id"], "discovery"):
 text = fmt_discovery(full)
 silent = rating["tier"] in ("B", "C")
 ok = await send_tg(text, silent=silent)
 if ok:
 log_push(p["id"], "discovery", text)
 logger.info(f"✅ ${symbol} [{rating['tier']}]")

 except Exception as e:
 logger.error(f" {symbol} : {e}", exc_info=True)
 update_project(p["id"], {"tier": "ERROR", "tier_reason": str(e)[:100]})

 except Exception as e:
 logger.error(f": {e}", exc_info=True)

 await asyncio.sleep(AGGREGATION_POLL_INTERVAL)

# ============================================================
# : 
# ============================================================

async def post_launch_monitor():
 """ + + 30min×4 + """
 logger.info(f"📊 · {MONITOR_POLL_INTERVAL}s")
 while True:
 try:
 projects = list_active()
 for p in projects:
 try:
 await _monitor_project(p)
 except Exception as e:
 logger.error(f" {p['symbol']} : {e}")
 except Exception as e:
 logger.error(f": {e}", exc_info=True)

 await asyncio.sleep(MONITOR_POLL_INTERVAL)

async def _monitor_project(p: dict):
 pid = p["id"]
 symbol = p["symbol"]
 launch_str = p.get("launch_time", "")
 if not launch_str:
 return

 try:
 launch = datetime.fromisoformat(launch_str.replace("Z", "").split("+")[0])
 except:
 return

 now = datetime.utcnow()
 delta_sec = (launch - now).total_seconds()

 # T-3h
 if 3*3600 - 300 <= delta_sec <= 3*3600 + 300:
 if not has_pushed(pid, "t_minus_3h"):
 text = fmt_countdown(p, int(delta_sec / 60))
 ok = await send_tg(text, silent=p.get("tier") in ("B", "C"))
 if ok:
 log_push(pid, "t_minus_3h", text)

 # T-30m
 elif 30*60 - 150 <= delta_sec <= 30*60 + 150:
 if not has_pushed(pid, "t_minus_30m"):
 text = fmt_countdown(p, int(delta_sec / 60))
 ok = await send_tg(text, silent=False)
 if ok:
 log_push(pid, "t_minus_30m", text)

 # 
 elif -300 <= delta_sec <= 0:
 if not has_pushed(pid, "at_launch"):
 cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
 if cg.get("price"):
 text = fmt_launch(p, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))
 ok = await send_tg(text, silent=False)
 if ok:
 log_push(pid, "at_launch", text)
 save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))

 # 30min × 4
 elif 0 < -delta_sec <= 2.5 * 3600:
 minutes_after = int(-delta_sec / 60)
 for idx, target in enumerate([30, 60, 90, 120], 1):
 if abs(minutes_after - target) <= 5:
 ptype = f"post_30m_{idx}"
 if not has_pushed(pid, ptype):
 cg = await fetch_coingecko(symbol, project_name=p.get('name', ''))
 if cg.get("price"):
 open_price = p.get("open_price") or cg["price"]
 change = ((cg["price"] - open_price) / open_price * 100) if open_price else 0
 text = fmt_periodic(p, idx, cg["price"], cg.get("mcap", 0), change)
 ok = await send_tg(text, silent=p.get("tier") in ("B", "C") and idx > 1)
 if ok:
 log_push(pid, ptype, text)
 save_snapshot(pid, cg["price"], cg.get("mcap", 0), cg.get("fdv", 0))

 # 
 if change >= 100 and not has_pushed(pid, "anomaly_double"):
 t = fmt_anomaly(p, "double", cg["price"], change)
 if await send_tg(t):
 log_push(pid, "anomaly_double", t)
 elif change <= -50 and not has_pushed(pid, "anomaly_halve"):
 t = fmt_anomaly(p, "halve", cg["price"], change)
 if await send_tg(t):
 log_push(pid, "anomaly_halve", t)
 break

# ============================================================
# 
# ============================================================

async def main():
 init_db()
 logger.info(f"📂 : {DB_PATH}")

 # TG
 ok = await send_tg("🎉 <b>Alpha Monitor v2 </b>\n\n📡 ...\n🔔 Alpha")
 if ok:
 logger.info("✅ TG")
 else:
 logger.warning("⚠️ TG,")

 tasks = [
 asyncio.create_task(announcement_listener(), name="announcements"),
 asyncio.create_task(aggregation_worker(), name="aggregator"),
 asyncio.create_task(post_launch_monitor(), name="monitor"),
 ]

 logger.info("=" * 50)
 logger.info("🚀 Alpha Monitor v2 ")
 logger.info(f" 📡 : {ANNOUNCEMENT_POLL_INTERVAL}s")
 logger.info(f" 🧠 LLM: {'Sonnet' if ANTHROPIC_API_KEY else '()'}")
 logger.info(f" 🔔 TG: {'✅' if TG_BOT_TOKEN else '❌'}")
 logger.info("=" * 50)

 try:
 await asyncio.gather(*tasks)
 except KeyboardInterrupt:
 for t in tasks:
 t.cancel()

if __name__ == "__main__":
 try:
 asyncio.run(main())
 except KeyboardInterrupt:
 pass

AI 자율 트레이딩 #

선물 + Alpha 자율 트레이딩 v1 #

게시일: 2026.04.29 태그: Python · Binance Futures · Autonomous · Telegram

AI가 시장 스캔 → 분석 → 가상 매매 → 모니터 → 복기 — 완전 자율

⚠️ 위험 경고: 이 코드는 가상/페이퍼 트레이딩에서만 테스트되었습니다. 실거래 경험이 전혀 없으며 실거래 진입의 근거로 사용해서는 안 됩니다. 실자금 검증을 거치지 않았습니다 — 사용에 따른 위험은 본인 책임입니다.

30초마다 바이낸스 선물 시장 전체를 자율적으로 스캔하고 이상치를 감지하여 가상 거래를 진행하는 AI. 4가지 시그널 감지 전략(극단적 음수 펀딩비 → 롱 스퀴즈, 극단적 양수 펀딩 → 숏, 급등 후 숏, 급락 반등). 포지션을 열기 전에 다차원 환경 점검(BTC 추세 + Fear&Greed + OI 관심도 + 거래량) 수행 — 7점 만점에 3점 이상이어야 진행. 30초마다 자동 손절/익절 모니터링.

현재 성적(정직 공개): 청산 4건, 승률 75%, +13.94U. 그러나 수익은 한 거래에 집중(IR 숏 +45%), 나머지는 본전 수준. 포지션 분산 부족 — 같은 방향 같은 논리로 쌓는 경향. 여전히 규칙 기반 점수 매기기이며, 진정한 독립 사고가 아닙니다.

학습 경로: 스캔 → 진입 → 청산 → 복기 → 문제 발견 → 전략 개선 → 반복. 목표: 규칙 실행기에서 독립적으로 사고하는 트레이더로 진화.

전체 소스 코드 #

#!/usr/bin/env python3
"""
 - 
PythonAI,
"""

import json
import os
import sys
import time
import requests
from datetime import datetime, timezone, timedelta

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DATA_FILE = os.path.join(SCRIPT_DIR, "trades.json")
SCANNER_STATE = os.path.join(SCRIPT_DIR, "scanner_state.json")
SCANNER_LOG = os.path.join(SCRIPT_DIR, "scanner.log")
INITIAL_BALANCE = 100.0
TZ_UTC8 = timezone(timedelta(hours=8))

# === ===
MAX_OPEN_POSITIONS = 3 # 
POSITION_PCT = 30 # %
LEVERAGE = 3 # 
COOLDOWN_HOURS = 4 # 
MIN_VOLUME_M = 10 # 24h(U)

# === TG ===
def load_tg_config():
 """Load TG config from environment variables or .env file"""
 env = {}
 # Try .env in script directory, then current directory
 for env_path in [
 os.path.join(SCRIPT_DIR, ".env"),
 os.path.join(os.getcwd(), ".env"),
 ]:
 if os.path.exists(env_path):
 with open(env_path) as f:
 for line in f:
 line = line.strip()
 if '=' in line and not line.startswith('#'):
 k, v = line.split('=', 1)
 env[k] = v.strip().strip('"').strip("'")
 break
 # OS environment variables override file
 for key in ['TG_BOT_TOKEN', 'TELEGRAM_BOT_TOKEN', 'TG_CHAT_ID']:
 val = os.environ.get(key)
 if val:
 env[key] = val
 return env

def send_tg(text):
 try:
 env = load_tg_config()
 token = env.get('TG_BOT_TOKEN', env.get('TELEGRAM_BOT_TOKEN', ''))
 if not token:
 return
 chat_id = env.get('TG_CHAT_ID', '')
 if not chat_id:
 return
 url = f"https://api.telegram.org/bot{token}/sendMessage"
 requests.post(url, json={
 "chat_id": chat_id,
 "text": text,
 "parse_mode": "Markdown"
 }, timeout=10)
 except:
 pass

# === ===
def load_trades():
 if os.path.exists(DATA_FILE):
 with open(DATA_FILE, "r", encoding="utf-8") as f:
 return json.load(f)
 return {"initial_balance": INITIAL_BALANCE, "trades": []}

def save_trades(data):
 with open(DATA_FILE, "w", encoding="utf-8") as f:
 json.dump(data, f, ensure_ascii=False, indent=2)

def load_state():
 if os.path.exists(SCANNER_STATE):
 with open(SCANNER_STATE, "r") as f:
 return json.load(f)
 return {"last_opens": {}, "signals_seen": {}}

def save_state(state):
 with open(SCANNER_STATE, "w") as f:
 json.dump(state, f, ensure_ascii=False, indent=2)

def get_balance(data):
 balance = data.get("initial_balance", INITIAL_BALANCE)
 for t in data["trades"]:
 if t["status"] == "closed" and t["pnl_usd"] is not None:
 balance += t["pnl_usd"]
 return balance

def next_id(data):
 if not data["trades"]:
 return "001"
 max_id = max(int(t["id"]) for t in data["trades"])
 return f"{max_id + 1:03d}"

def now_str():
 return datetime.now(TZ_UTC8).strftime("%Y-%m-%dT%H:%M:%S")

def log(msg):
 ts = datetime.now(TZ_UTC8).strftime("%m-%d %H:%M:%S")
 line = f"[{ts}] {msg}"
 print(line)
 with open(SCANNER_LOG, "a") as f:
 f.write(line + "\n")

# === API ===
def get_all_tickers():
 url = "https://fapi.binance.com/fapi/v1/ticker/24hr"
 resp = requests.get(url, timeout=10)
 return resp.json()

def get_funding_rates():
 """"""
 url = "https://fapi.binance.com/fapi/v1/premiumIndex"
 resp = requests.get(url, timeout=10)
 return {item['symbol']: float(item['lastFundingRate']) * 100 
 for item in resp.json()}

def get_funding_history(symbol, limit=8):
 url = f"https://fapi.binance.com/fapi/v1/fundingRate?symbol={symbol}&limit={limit}"
 resp = requests.get(url, timeout=10)
 return [float(item['fundingRate']) * 100 for item in resp.json()]

def get_open_interest(symbol):
 url = f"https://fapi.binance.com/fapi/v1/openInterest?symbol={symbol}"
 resp = requests.get(url, timeout=10)
 data = resp.json()
 return float(data['openInterest'])

def get_klines(symbol, interval="4h", limit=6):
 url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval={interval}&limit={limit}"
 resp = requests.get(url, timeout=10)
 return resp.json()

# === ===

def detect_extreme_negative_funding(symbol, funding_rate, funding_rates_map):
 """
 1: → ()
 : <-0.08% 
 """
 if funding_rate >= -0.08:
 return None
 
 try:
 history = get_funding_history(symbol, 8)
 neg_count = sum(1 for r in history if r < -0.03)
 if neg_count < 4:
 return None
 
 avg_rate = sum(history) / len(history)
 
 # ,
 strength = "S" if avg_rate < -0.15 else "A" if avg_rate < -0.10 else "B"
 
 return {
 "type": "extreme_neg_funding",
 "direction": "long",
 "strength": strength,
 "reason": f" avg:{avg_rate:.4f}% {neg_count}/8 ",
 "sl_pct": 0.08, # 8%
 "tp_pct": 0.12, # 12%
 }
 except:
 return None

def detect_extreme_positive_funding(symbol, funding_rate, funding_rates_map):
 """
 2: → ()
 : >0.10% 
 """
 if funding_rate <= 0.10:
 return None
 
 try:
 history = get_funding_history(symbol, 8)
 pos_count = sum(1 for r in history if r > 0.05)
 if pos_count < 4:
 return None
 
 avg_rate = sum(history) / len(history)
 strength = "S" if avg_rate > 0.20 else "A" if avg_rate > 0.12 else "B"
 
 return {
 "type": "extreme_pos_funding",
 "direction": "short",
 "strength": strength,
 "reason": f" avg:{avg_rate:.4f}% {pos_count}/8 ",
 "sl_pct": 0.10,
 "tp_pct": 0.15,
 }
 except:
 return None

def detect_crash_bounce(ticker):
 """
 3: ()
 : 24h>25% 4h/
 """
 change_pct = float(ticker['priceChangePercent'])
 if change_pct >= -25:
 return None
 
 symbol = ticker['symbol']
 try:
 klines = get_klines(symbol, "1h", 6)
 # 2K
 recent_closes = [float(k[4]) for k in klines[-3:]]
 # : K >= 
 if len(recent_closes) >= 2 and recent_closes[-1] >= recent_closes[-2]:
 return {
 "type": "crash_bounce",
 "direction": "long",
 "strength": "B", # B
 "reason": f"24h{change_pct:.1f}% ",
 "sl_pct": 0.10,
 "tp_pct": 0.15,
 }
 except:
 pass
 return None

def detect_pump_short(ticker):
 """
 4: (ATH)
 : 24h>40% — ,>85%
 ()
 """
 change_pct = float(ticker['priceChangePercent'])
 if change_pct <= 40:
 return None
 
 symbol = ticker['symbol']
 try:
 klines = get_klines(symbol, "1h", 6)
 highs = [float(k[2]) for k in klines]
 closes = [float(k[4]) for k in klines]
 current = closes[-1]
 peak = max(highs)
 
 # 10%
 pullback = (peak - current) / peak * 100
 if pullback < 10:
 return None
 
 strength = "A" if change_pct > 80 else "B"
 
 return {
 "type": "pump_short",
 "direction": "short",
 "strength": strength,
 "reason": f"24h{change_pct:.1f}%{pullback:.1f}% >85%",
 "sl_pct": 0.15, # ,
 "tp_pct": 0.20,
 }
 except:
 pass
 return None

# === ===
def check_environment(symbol, signal):
 """
 :,
 (pass/fail, analysis_dict, adjusted_strength)
 """
 analysis = {
 "btc_env": "",
 "sentiment": "",
 "oi_check": "",
 "volume_check": "",
 "verdict": ""
 }
 score = 0 # ,>=3
 
 try:
 # 1. BTC — BTC,BTC
 btc_url = "https://fapi.binance.com/fapi/v1/ticker/24hr?symbol=BTCUSDT"
 btc = requests.get(btc_url, timeout=5).json()
 btc_chg = float(btc['priceChangePercent'])
 
 if signal["direction"] == "long":
 if btc_chg > -2:
 score += 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
 elif btc_chg < -5:
 score -= 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% -1"
 else:
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
 else: # short
 if btc_chg < 2:
 score += 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% +1"
 elif btc_chg > 5:
 score -= 1
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% -1"
 else:
 analysis["btc_env"] = f"BTC {btc_chg:+.1f}% 0"
 
 # 2. (Fear & Greed)
 try:
 fng = requests.get("https://api.alternative.me/fng/", timeout=5).json()
 fng_val = int(fng['data'][0]['value'])
 if signal["direction"] == "long":
 if fng_val <= 25:
 score += 1
 analysis["sentiment"] = f"FGI={fng_val} +1"
 elif fng_val >= 75:
 score -= 1
 analysis["sentiment"] = f"FGI={fng_val} -1"
 else:
 analysis["sentiment"] = f"FGI={fng_val} 0"
 else:
 if fng_val >= 75:
 score += 1
 analysis["sentiment"] = f"FGI={fng_val} +1"
 elif fng_val <= 25:
 score -= 1
 analysis["sentiment"] = f"FGI={fng_val} -1"
 else:
 analysis["sentiment"] = f"FGI={fng_val} 0"
 except:
 analysis["sentiment"] = "FGI 0"
 
 # 3. OI — OI
 try:
 oi = get_open_interest(symbol)
 ticker = requests.get(f"https://fapi.binance.com/fapi/v1/ticker/24hr?symbol={symbol}", timeout=5).json()
 price = float(ticker['lastPrice'])
 oi_usd = oi * price
 
 if oi_usd > 5_000_000: # OI > 5M
 score += 1
 analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M +1"
 else:
 analysis["oi_check"] = f"OI={oi_usd/1e6:.1f}M 0"
 except:
 analysis["oi_check"] = "OI 0"
 
 # 4. — 
 try:
 vol = float(ticker.get('quoteVolume', 0))
 if vol > 50_000_000:
 score += 1
 analysis["volume_check"] = f"24h={vol/1e6:.0f}M +1"
 elif vol > 20_000_000:
 analysis["volume_check"] = f"24h={vol/1e6:.0f}M 0"
 else:
 score -= 1
 analysis["volume_check"] = f"24h={vol/1e6:.0f}M -1"
 except:
 analysis["volume_check"] = " 0"
 
 # 5. 
 if signal["strength"] == "S":
 score += 2
 elif signal["strength"] == "A":
 score += 1
 
 # : >=3
 analysis["verdict"] = f":{score}/7"
 
 if score >= 3:
 return True, analysis, signal["strength"]
 else:
 return False, analysis, signal["strength"]
 
 except Exception as e:
 analysis["verdict"] = f":{e} "
 return False, analysis, signal["strength"]

# === ===
def execute_open(data, state, symbol, price, signal):
 """ — """
 
 # 
 passed, env_analysis, strength = check_environment(symbol, signal)
 env_summary = " | ".join(v for v in env_analysis.values() if v)
 
 if not passed:
 log(f" {symbol}: {env_summary}")
 return
 
 log(f" {symbol}: {env_summary}")
 
 balance = get_balance(data)
 position_usd = balance * POSITION_PCT / 100
 
 if signal["direction"] == "long":
 sl = round(price * (1 - signal["sl_pct"]), 6)
 tp = round(price * (1 + signal["tp_pct"]), 6)
 else:
 sl = round(price * (1 + signal["sl_pct"]), 6)
 tp = round(price * (1 - signal["tp_pct"]), 6)
 
 trade = {
 "id": next_id(data),
 "symbol": symbol,
 "direction": signal["direction"],
 "leverage": LEVERAGE,
 "position_pct": POSITION_PCT,
 "position_usd": round(position_usd, 4),
 "notional_usd": round(position_usd * LEVERAGE, 4),
 "entry_price": price,
 "stop_loss": sl,
 "take_profit": tp,
 "entry_time": now_str(),
 "exit_price": None,
 "exit_time": None,
 "exit_reason": None,
 "pnl_pct": None,
 "pnl_usd": None,
 "status": "open",
 "pre_analysis": {
 "btc_env": env_analysis.get("btc_env", ""),
 "sentiment": env_analysis.get("sentiment", ""),
 "oi": env_analysis.get("oi_check", ""),
 "volume": env_analysis.get("volume_check", ""),
 "key_reason": f"[{signal['strength']}] {signal['reason']}",
 "risk": f":{env_analysis.get('verdict','')} :{signal['type']}"
 },
 "post_review": None
 }
 
 data["trades"].append(trade)
 save_trades(data)
 
 # 
 state["last_opens"][symbol] = now_str()
 save_state(state)
 
 direction_cn = "" if signal["direction"] == "long" else ""
 
 msg = f"""```
[] #{trade['id']}
: {symbol}
: {direction_cn} {LEVERAGE}x
: {price}
: {sl}
: {tp}
: {position_usd:.2f}U
: [{signal['strength']}] {signal['reason']}
: {trade['entry_time']}
```"""
 
 log(f" #{trade['id']} {symbol} {direction_cn} @ {price} | {signal['reason']}")
 send_tg(msg)
 print(msg)

# === ===
def swap_weakest(data, state, open_positions, new_signal, tickers):
 """S,,"""
 ticker_map = {t['symbol']: float(t['lastPrice']) for t in tickers}
 
 # %
 worst_trade = None
 worst_pnl = float('inf')
 
 for t in open_positions:
 price = ticker_map.get(t["symbol"])
 if price is None:
 continue
 if t["direction"] == "long":
 pnl_pct = (price - t["entry_price"]) / t["entry_price"] * 100
 else:
 pnl_pct = (t["entry_price"] - price) / t["entry_price"] * 100
 
 if pnl_pct < worst_pnl:
 worst_pnl = pnl_pct
 worst_trade = t
 worst_price = price
 
 if worst_trade is None:
 return
 
 # ,
 if worst_pnl > 0:
 log(f", | : {new_signal['symbol']}")
 return
 
 # 
 if worst_trade["direction"] == "long":
 pnl_pct_lev = (worst_price - worst_trade["entry_price"]) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
 else:
 pnl_pct_lev = (worst_trade["entry_price"] - worst_price) / worst_trade["entry_price"] * 100 * worst_trade["leverage"]
 
 pos_usd = worst_trade.get("position_usd", worst_trade.get("position_pct", 30))
 pnl_usd = round(pnl_pct_lev / 100 * pos_usd, 4)
 
 worst_trade["exit_price"] = worst_price
 worst_trade["exit_time"] = now_str()
 worst_trade["exit_reason"] = f"→{new_signal['symbol']}"
 worst_trade["pnl_pct"] = round(pnl_pct_lev, 2)
 worst_trade["pnl_usd"] = pnl_usd
 worst_trade["status"] = "closed"
 save_trades(data)
 
 direction_cn = "" if worst_trade["direction"] == "long" else ""
 msg = f"""```
[] #{worst_trade['id']}
: {worst_trade['symbol']} {direction_cn}
: {worst_trade['entry_price']}
: {worst_price}
: {pnl_pct_lev:+.2f}% ({pnl_usd:+.2f}U)
: S{new_signal['symbol']}
```"""
 log(f" #{worst_trade['id']} {worst_trade['symbol']} {pnl_usd:+.2f}U → {new_signal['symbol']}")
 send_tg(msg)
 
 # 
 execute_open(data, state, new_signal["symbol"], new_signal["price"], new_signal)

# === ===
def scan():
 data = load_trades()
 state = load_state()
 now = datetime.now(TZ_UTC8)
 
 # 
 open_positions = [t for t in data["trades"] if t["status"] == "open"]
 open_symbols = set(t["symbol"] for t in open_positions)
 
 if len(open_positions) >= MAX_OPEN_POSITIONS:
 return
 
 # 
 try:
 tickers = get_all_tickers()
 funding_rates = get_funding_rates()
 except Exception as e:
 log(f"API: {e}")
 return
 
 # USDT + 
 exclude = {"BTCUSDT", "ETHUSDT", "USDCUSDT", "FDUSDUSDT", "BTCDOMUSDT", "BTCSTUSDT"}
 candidates = [t for t in tickers 
 if t['symbol'].endswith('USDT') 
 and t['symbol'] not in exclude
 and float(t['quoteVolume']) > MIN_VOLUME_M * 1e6]
 
 signals = []
 
 for ticker in candidates:
 symbol = ticker['symbol']
 
 # 
 if symbol in open_symbols:
 continue
 
 # 
 last_open = state.get("last_opens", {}).get(symbol)
 if last_open:
 try:
 last_dt = datetime.fromisoformat(last_open)
 if last_dt.tzinfo is None:
 last_dt = last_dt.replace(tzinfo=TZ_UTC8)
 if (now - last_dt).total_seconds() < COOLDOWN_HOURS * 3600:
 continue
 except:
 pass
 
 fr = funding_rates.get(symbol, 0)
 
 # 
 for detect_fn in [
 lambda s, f, m: detect_extreme_negative_funding(s, f, m),
 lambda s, f, m: detect_extreme_positive_funding(s, f, m),
 lambda s, f, m: detect_crash_bounce(ticker),
 lambda s, f, m: detect_pump_short(ticker),
 ]:
 try:
 signal = detect_fn(symbol, fr, funding_rates)
 if signal:
 signal["symbol"] = symbol
 signal["price"] = float(ticker['lastPrice'])
 signal["volume_m"] = float(ticker['quoteVolume']) / 1e6
 signals.append(signal)
 except:
 continue
 
 if not signals:
 return
 
 # S>A>B
 strength_order = {"S": 0, "A": 1, "B": 2}
 signals.sort(key=lambda x: strength_order.get(x["strength"], 3))
 
 # (1)
 best = signals[0]
 
 # B,SA
 if best["strength"] == "B":
 log(f"B: {best['symbol']} {best['reason']}")
 return
 
 slots = MAX_OPEN_POSITIONS - len(open_positions)
 if slots > 0:
 execute_open(data, state, best["symbol"], best["price"], best)
 elif best["strength"] == "S":
 # S → 
 swap_weakest(data, state, open_positions, best, tickers)

if __name__ == "__main__":
 scan()

유틸리티 도구 #

VoiceKey — 화자 인증 #

게시일: 2026.04.27 태그: Python · Security · Telegram · Speaker Verification

성문 인증으로 AI 에이전트를 보호하세요

Telegram 봇 보안을 위한 화자 검증. 화자 임베딩 추출에 resemblyzer(GE2E 모델)를 사용하고 코사인 유사도로 일치 여부를 판단합니다. 비밀번호 없이 음성으로 인증 — 알려진 화자만 로그인 가능. 가벼운 의존성, 순수 Python.

전체 소스 코드 #

#!/usr/bin/env python3
"""
VoiceKey — (Voiceprint Authentication)
Speaker verification for Telegram bot security.

Uses resemblyzer (GE2E model) for speaker embedding extraction
and cosine similarity for verification.

Zero AI cost — runs entirely on local CPU.

Usage:
 # Register voiceprint from audio files
 python voicekey.py register --audio voice1.ogg voice2.ogg --owner "YourName"
 
 # Verify a voice against stored voiceprint
 python voicekey.py verify --audio test.ogg
 
 # As a Python module
 from voicekey import VoiceKey
 vk = VoiceKey()
 vk.register(["voice1.ogg", "voice2.ogg"], owner="YourName")
 is_owner, score = vk.verify("test.ogg")
"""

import os
import json
import tempfile
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime

# Lazy imports to speed up module load when not needed
_encoder = None

def _get_encoder():
 """Lazy-load the voice encoder model (first call takes ~1s)."""
 global _encoder
 if _encoder is None:
 from resemblyzer import VoiceEncoder
 _encoder = VoiceEncoder()
 return _encoder

def _audio_to_wav(audio_path: str) -> str:
 """Convert any audio format to 16kHz mono WAV for processing."""
 from pydub import AudioSegment
 
 ext = Path(audio_path).suffix.lower()
 if ext in ('.ogg', '.oga'):
 audio = AudioSegment.from_ogg(audio_path)
 elif ext == '.mp3':
 audio = AudioSegment.from_mp3(audio_path)
 elif ext == '.wav':
 return audio_path # already wav
 elif ext in ('.m4a', '.aac'):
 audio = AudioSegment.from_file(audio_path, format=ext.lstrip('.'))
 else:
 audio = AudioSegment.from_file(audio_path)
 
 # Convert to 16kHz mono
 audio = audio.set_frame_rate(16000).set_channels(1)
 
 tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
 audio.export(tmp.name, format='wav')
 return tmp.name

def _extract_embedding(audio_path: str) -> np.ndarray:
 """Extract voice embedding from an audio file."""
 from resemblyzer import preprocess_wav
 
 encoder = _get_encoder()
 
 wav_path = _audio_to_wav(audio_path)
 wav = preprocess_wav(wav_path)
 
 # Cleanup temp file
 if wav_path != audio_path:
 os.unlink(wav_path)
 
 if len(wav) < 1600: # Less than 0.1s
 raise ValueError(f"Audio too short: {len(wav)/16000:.1f}s (need >0.1s)")
 
 return encoder.embed_utterance(wav)

class VoiceKey:
 """
 Speaker verification using voice embeddings.
 
 Attributes:
 data_dir: Directory to store voiceprint data
 threshold: Cosine similarity threshold for verification (default: 0.75)
 voiceprint: The registered owner's voice embedding (256-dim vector)
 """
 
 def __init__(self, data_dir: str = None, threshold: float = 0.75):
 if data_dir is None:
 data_dir = os.path.expanduser("~/.hermes/voiceprint")
 self.data_dir = Path(data_dir)
 self.data_dir.mkdir(parents=True, exist_ok=True)
 self.threshold = threshold
 self.voiceprint = None
 self.metadata = {}
 self._load()
 
 def _load(self):
 """Load existing voiceprint if available."""
 vp_path = self.data_dir / "voiceprint.npy"
 meta_path = self.data_dir / "voiceprint_meta.json"
 
 if vp_path.exists():
 self.voiceprint = np.load(str(vp_path))
 if meta_path.exists():
 with open(meta_path) as f:
 self.metadata = json.load(f)
 self.threshold = self.metadata.get("threshold", self.threshold)
 
 @property
 def is_registered(self) -> bool:
 """Check if a voiceprint is registered."""
 return self.voiceprint is not None
 
 def register(self, audio_paths: list, owner: str = "owner") -> dict:
 """
 Register a voiceprint from multiple audio samples.
 
 Args:
 audio_paths: List of audio file paths (ogg, mp3, wav, etc.)
 owner: Name of the voiceprint owner
 
 Returns:
 dict with registration results
 """
 embeddings = []
 results = []
 
 for path in audio_paths:
 try:
 embed = _extract_embedding(path)
 embeddings.append(embed)
 results.append({"file": os.path.basename(path), "status": "ok"})
 except Exception as e:
 results.append({"file": os.path.basename(path), "status": "error", "error": str(e)})
 
 if not embeddings:
 raise ValueError("No valid audio samples provided")
 
 # Average embeddings and normalize
 voiceprint = np.mean(embeddings, axis=0)
 voiceprint = voiceprint / np.linalg.norm(voiceprint)
 
 # Save
 np.save(str(self.data_dir / "voiceprint.npy"), voiceprint)
 
 self.metadata = {
 "owner": owner,
 "samples_used": len(embeddings),
 "embedding_dim": int(voiceprint.shape[0]),
 "threshold": self.threshold,
 "created": datetime.now().isoformat(),
 }
 with open(self.data_dir / "voiceprint_meta.json", "w") as f:
 json.dump(self.metadata, f, indent=2)
 
 self.voiceprint = voiceprint
 
 # Self-test
 similarities = []
 for embed in embeddings:
 sim = float(np.dot(voiceprint, embed / np.linalg.norm(embed)))
 similarities.append(sim)
 
 return {
 "owner": owner,
 "samples": len(embeddings),
 "self_test_scores": similarities,
 "min_score": min(similarities),
 "details": results,
 }
 
 def verify(self, audio_path: str) -> tuple:
 """
 Verify if an audio sample matches the registered voiceprint.
 
 Args:
 audio_path: Path to audio file to verify
 
 Returns:
 tuple: (is_verified: bool, similarity_score: float)
 """
 if not self.is_registered:
 raise RuntimeError("No voiceprint registered. Call register() first.")
 
 embed = _extract_embedding(audio_path)
 embed = embed / np.linalg.norm(embed)
 
 similarity = float(np.dot(self.voiceprint, embed))
 is_verified = similarity >= self.threshold
 
 return is_verified, similarity
 
 def get_info(self) -> dict:
 """Get voiceprint registration info."""
 if not self.is_registered:
 return {"registered": False}
 return {
 "registered": True,
 **self.metadata,
 }

def main():
 parser = argparse.ArgumentParser(
 description="VoiceKey — Speaker verification for security"
 )
 sub = parser.add_subparsers(dest="command")
 
 # Register
 reg = sub.add_parser("register", help="Register voiceprint from audio files")
 reg.add_argument("--audio", nargs="+", required=True, help="Audio files (ogg/mp3/wav)")
 reg.add_argument("--owner", default="owner", help="Owner name")
 reg.add_argument("--data-dir", default=None, help="Data directory")
 reg.add_argument("--threshold", type=float, default=0.75, help="Verification threshold")
 
 # Verify
 ver = sub.add_parser("verify", help="Verify audio against voiceprint")
 ver.add_argument("--audio", required=True, help="Audio file to verify")
 ver.add_argument("--data-dir", default=None, help="Data directory")
 
 # Info
 sub.add_parser("info", help="Show voiceprint info")
 
 args = parser.parse_args()
 
 if args.command == "register":
 vk = VoiceKey(data_dir=args.data_dir, threshold=args.threshold)
 result = vk.register(args.audio, owner=args.owner)
 print(f"Registered voiceprint for: {result['owner']}")
 print(f"Samples used: {result['samples']}")
 print(f"Self-test scores: {[f'{s:.4f}' for s in result['self_test_scores']]}")
 print(f"Min score: {result['min_score']:.4f} (threshold: {args.threshold})")
 
 elif args.command == "verify":
 vk = VoiceKey(data_dir=getattr(args, 'data_dir', None))
 is_ok, score = vk.verify(args.audio)
 status = "PASS" if is_ok else "FAIL"
 print(f"[{status}] Similarity: {score:.4f} (threshold: {vk.threshold})")
 
 elif args.command == "info":
 vk = VoiceKey()
 info = vk.get_info()
 for k, v in info.items():
 print(f" {k}: {v}")
 else:
 parser.print_help()

if __name__ == "__main__":
 main()

마무리 #

위 7개 스니펫은 모두 connectfarm1.com에서 가져왔으며 수집 시점은 2026-05-04입니다. 세 가지 철학을 공유합니다: (1) 거의 0에 가까운 AI 비용 — 대부분 LLM 대신 규칙 엔진과 무료 공개 API를 사용; (2) 오직 Python, 저렴한 VPS에서 crontab 또는 단순한 while True로 충분히 실행 가능; (3) Telegram을 단일 출력 채널로 — 대시보드도 프론트엔드도 만들지 않고, 실제로 보는 곳으로 바로 푸시. 여러 레이더를 조합해 신호를 교차 검증하고, “자율 트레이딩"은 인쇄기가 아니라 연구용 샌드박스로 다루세요.

发布于 Friday, May 15, 2026 · 最后更新 Friday, May 15, 2026