"""Relay Gate.io USDT-futures candlesticks to local WebSocket clients.

Two servers run in one event loop:

* a WebSocket server on port 8765 where clients send JSON messages of the
  form ``{"type": "subscribe" | "unsubscribe", "channel": <contract>,
  "time": <interval>}`` and receive ``{"channel": <contract>, "msg": <JSON
  string of one candle>}``;
* a FastAPI app on port 7010 exposing historical candles.
"""
import asyncio
import json
import os
from collections import defaultdict
from urllib.parse import urlencode

import gate_api
import requests
import uvicorn
import websockets
from fastapi import FastAPI, HTTPException
from gate_ws import Configuration, Connection, WebSocketResponse
from gate_ws.futures import FuturesCandlesticksChannel, FuturesPublicTradeChannel
from gate_ws.spot import SpotPublicTradeChannel

from models import Candle, get_session

# SECURITY NOTE(review): these credentials were hard-coded in the original
# source and are kept only as a fallback so existing deployments keep working.
# They should be rotated and supplied through GATE_API_KEY / GATE_API_SECRET.
_FALLBACK_API_KEY = 'a086691c31868cd5c176a78896c0e977'
_FALLBACK_API_SECRET = '443ff6756cbc80c13afa50d2df1b8be97a0e89c7e4301c6cfd2088317eadb537'

_CANDLESTICKS_URL = 'https://fx-api.gateio.ws/api/v4/futures/usdt/candlesticks'
# Seconds to wait for the REST API before giving up, so a stalled upstream
# cannot pin a worker thread forever.
_HTTP_TIMEOUT = 10

# Stream name ("<interval>_<contract>", e.g. "5m_BTC_USDT") -> client sockets.
channels = defaultdict(set)
session = get_session()
app = FastAPI()
# Stream names currently subscribed on the upstream Gate connection.
gate_channels = set()
# Upstream candlestick channel; assigned by socket_server() before the local
# WebSocket server starts accepting clients.
channel = None


def _stream_name(ti, ch):
    """Return the stream key for interval *ti* and contract *ch*.

    The format matches the ``n`` field that broadcast_loop() reads from each
    upstream candle.
    """
    return f'{ti}_{ch}'


def _parse_request(message):
    """Decode one client message into ``(type, interval, contract)``.

    Raises:
        ValueError: the message is not valid JSON, or "time"/"channel" is not
            a non-empty string, or "time" contains an underscore.
        KeyError: a required field ("type", "channel", "time") is missing.
        TypeError: the JSON document is not an object.
    """
    data = json.loads(message)
    typ = data["type"]
    ch = data["channel"]
    ti = data["time"]
    if not isinstance(ch, str) or not ch:
        raise ValueError("'channel' must be a non-empty string")
    if not isinstance(ti, str) or not ti:
        raise ValueError("'time' must be a non-empty string")
    if "_" in ti:
        # The stream name is split on its first underscore to recover the
        # contract, so an underscore inside the interval would be ambiguous.
        raise ValueError("'time' must not contain '_'")
    return typ, ti, ch


def _join(websocket, ti, ch):
    """Register *websocket* for a stream, subscribing upstream on first use."""
    stream = _stream_name(ti, ch)
    channels[stream].add(websocket)
    if stream not in gate_channels:
        channel.subscribe([ti, ch])
        gate_channels.add(stream)


def _leave(websocket, ti, ch):
    """Remove *websocket* from a stream.

    The upstream subscription is released only when the last local subscriber
    is gone, so one client leaving cannot cut off the others.
    """
    stream = _stream_name(ti, ch)
    subs = channels.get(stream)
    if subs is None:
        return
    subs.discard(websocket)
    if subs:
        return
    del channels[stream]
    if stream in gate_channels:
        # Forget the stream first so a later subscribe re-subscribes upstream
        # even if the unsubscribe call below fails.
        gate_channels.discard(stream)
        channel.unsubscribe([ti, ch])


async def handler(websocket):
    """Serve one client: process subscribe / unsubscribe messages until close.

    Malformed messages are reported and skipped without closing the
    connection. When the socket closes, the client is removed from every
    stream it joined.
    """
    subscribed = set()  # (interval, contract) pairs this client has joined

    try:
        async for message in websocket:
            try:
                typ, ti, ch = _parse_request(message)
                if typ == "subscribe":
                    # Record first so cleanup still covers this stream if the
                    # upstream subscribe raises part-way through.
                    subscribed.add((ti, ch))
                    _join(websocket, ti, ch)
                    print(f"{websocket.remote_address} joined {ch}")
                elif typ == "unsubscribe":
                    subscribed.discard((ti, ch))
                    _leave(websocket, ti, ch)
                    print(f"{websocket.remote_address} left {ch}")
            except Exception as e:
                # Best effort per message: one bad request must not drop the
                # whole connection.
                print("Bad message:", e)
    except websockets.ConnectionClosed:
        pass
    finally:
        # On disconnect, leave every stream this client had joined.
        for ti, ch in subscribed:
            try:
                _leave(websocket, ti, ch)
            except Exception as e:
                print("Cleanup failed:", e)


async def broadcast_loop(conn: Connection, resp: WebSocketResponse):
    """Forward upstream candles to the clients subscribed to their stream.

    Registered as the callback of the upstream candlestick channel. Each
    candle is routed by its ``n`` field; responses whose ``result`` is not a
    list are ignored.
    """
    results = resp.result
    if not isinstance(results, list):
        return

    for result in results:
        if not isinstance(result, dict):
            continue
        stream = result.get('n')
        if not isinstance(stream, str):
            continue
        # .get() avoids creating empty entries in the defaultdict.
        subs = channels.get(stream)
        if not subs:  # only when there are subscribers
            continue
        # Everything after the interval prefix is the contract the client
        # asked for.
        ch = stream.partition('_')[2]
        payload = json.dumps(
            {"channel": ch, "msg": json.dumps(result)},
            default=str,
            ensure_ascii=False,
        )
        websockets.broadcast(subs, payload)


async def gate_conn():
    """Build the upstream futures WebSocket connection (USDT settle, mainnet).

    Credentials come from the GATE_API_KEY / GATE_API_SECRET environment
    variables, falling back to the legacy in-source values.
    """
    config = gate_api.Configuration(
        key=os.environ.get("GATE_API_KEY", _FALLBACK_API_KEY),
        secret=os.environ.get("GATE_API_SECRET", _FALLBACK_API_SECRET),
    )
    futures_conn = Connection(
        Configuration(
            app='futures',
            settle='usdt',
            test_net=False,
            api_key=config.key,
            api_secret=config.secret,
        )
    )
    return futures_conn


async def socket_server():
    """Run the local WebSocket server for as long as the upstream link runs."""
    global channel
    futures_conn = await gate_conn()
    channel = FuturesCandlesticksChannel(futures_conn, broadcast_loop)
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await futures_conn.run()


@app.get("/api/candle/{contract}/{ti}/{to}")
def get_candle(contract: str, ti: str, to: str = None):
    """Return up to 1500 candles for *contract*, sorted by their ``t`` field.

    Args:
        contract: contract name passed as the ``contract`` query parameter.
        ti: value passed as the ``interval`` query parameter.
        to: value passed as the ``to`` query parameter; omitted from the
            upstream request when None (only possible on a direct call, since
            the route always supplies it).

    Raises:
        HTTPException: 502 when the upstream request fails, or the upstream
            status code when it answers with an error.
    """
    query = {"contract": contract, "interval": ti}
    if to is not None:
        query["to"] = to
    query["limit"] = 1500
    # URL-encode the caller-supplied values so they cannot inject extra query
    # parameters into the upstream request.
    url = f'{_CANDLESTICKS_URL}?{urlencode(query)}'
    print(url)
    try:
        res = requests.get(url, timeout=_HTTP_TIMEOUT)
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502, detail=f"Gate API request failed: {exc}"
        ) from exc
    if not res.ok:
        raise HTTPException(status_code=res.status_code, detail=res.text)
    return sorted(res.json(), key=lambda x: x['t'])


async def main():
    """Run the WebSocket relay and the FastAPI server concurrently."""
    # NOTE(review): the original passed reload=True, which appears to have no
    # effect when an app object (not an import string) is given to
    # uvicorn.Server, so it is dropped here — confirm against uvicorn docs.
    config = uvicorn.Config(app=app, host="0.0.0.0", port=7010, lifespan="off")
    server = uvicorn.Server(config)

    await asyncio.gather(
        socket_server(),  # async WebSocket relay
        server.serve(),   # FastAPI server
    )


if __name__ == "__main__":
    asyncio.run(main())