"""WebSocket ticker server.

Clients subscribe to a channel (a contract name) and periodically receive the
most recent resampled candle for that contract.
"""

import asyncio
import json
from collections import defaultdict
from dataclasses import asdict
from typing import Literal

import pandas as pd
import pandas_ta as ta
import websockets
from sqlalchemy import and_, or_, desc, asc
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

from models import get_session, get_engine, Candle

Interval = Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w']

# Public interval name -> pandas resample frequency.
_FREQ_MAP = {
    '1m': '1Min',
    '5m': '5Min',
    '15m': '15Min',
    '30m': '30Min',
    '1h': '1H',
    '4h': '4H',
    '1d': '1D',
    '1w': '1W',
}

# Column order of the frame returned by get_ticker().
_OUTPUT_COLUMNS = ['time', 'open', 'high', 'low', 'close', 'volume']

_EPOCH = pd.Timestamp("1970-01-01")
_ONE_SECOND = pd.Timedelta(seconds=1)

# channel name -> set of websocket connections subscribed to it.
channels = defaultdict(set)

# NOTE(review): one module-level session shared by every call; presumably an
# SQLAlchemy Session (see get_session in models) -- confirm.
session = get_session()

# "time" field of the most recently received client message (any connection).
# Nothing in this file reads it; kept because handler() has always set it.
ti = None


def _latest_candle(df: pd.DataFrame, target_interval: Interval) -> pd.DataFrame:
    """Resample raw rows to ``target_interval`` and return the newest bar.

    Args:
        df: Frame with ``time``, ``close`` and ``volume`` columns. ``time`` is
            interpreted as Unix seconds.
        target_interval: A key of ``_FREQ_MAP``.

    Returns:
        A frame of at most one row, indexed by ``ts`` (bar start), with the
        columns listed in ``_OUTPUT_COLUMNS``. Empty when ``df`` is empty.
    """
    if df.empty:
        return pd.DataFrame(columns=_OUTPUT_COLUMNS)

    freq = _FREQ_MAP[target_interval]

    # Index by timestamp and sort chronologically so that the resampled
    # open/close really are the first/last values inside each bar.
    df = (
        df.assign(ts=pd.to_datetime(df['time'], unit='s'))
        .set_index('ts')
        .sort_index(kind='stable')
    )

    # NOTE(review): open/high/low are derived from the `close` series only,
    # not from separate open/high/low columns -- confirm this is intended.
    ohlc = df['close'].resample(freq).ohlc()
    volume = df['volume'].resample(freq).sum().rename('volume')

    # Bars without any source row have NaN OHLC values and are dropped.
    result = pd.concat([ohlc, volume], axis=1).dropna()

    # Bar start as Unix seconds; this form does not depend on the index's
    # internal datetime resolution.
    result['time'] = (result.index - _EPOCH) // _ONE_SECOND

    return result[_OUTPUT_COLUMNS].tail(1)


def get_ticker(contract: str, target_interval: Interval) -> pd.DataFrame:
    """Return the most recent candle for ``contract`` at ``target_interval``.

    Loads up to the 1000 newest ``Candle`` rows whose contract contains
    ``contract`` and resamples them to the requested bar size.

    Args:
        contract: Contract name; matched as a substring (SQL ``LIKE '%..%'``).
        target_interval: Target bar size, e.g. ``'1m'``, ``'1h'``, ``'1d'``.

    Returns:
        A DataFrame with at most one row and the columns ``time`` (Unix
        seconds), ``open``, ``high``, ``low``, ``close``, ``volume``; empty if
        no candle matches.

    Raises:
        ValueError: If ``target_interval`` is not supported.
    """
    if target_interval not in _FREQ_MAP:
        raise ValueError(f"Unsupported interval: {target_interval}")

    try:
        # Newest rows first: without an ORDER BY the LIMIT would pick an
        # arbitrary 1000 rows and the "latest" candle would never advance.
        # NOTE(review): the substring match can mix several contracts (and
        # honours % / _ wildcards in the channel name) -- confirm intended.
        rows = (
            session.query(Candle)
            .filter(Candle.contract.like(f'%{contract}%'))
            .order_by(desc(Candle.time))
            .limit(1000)
            .all()
        )
        # Copy the needed values out before the transaction is ended below.
        records = [
            {'time': row.time, 'close': row.close, 'volume': row.volume}
            for row in rows
        ]
    finally:
        # End the read transaction so the next call sees fresh data and the
        # shared session stays usable after a database error.
        session.rollback()

    frame = pd.DataFrame(records, columns=['time', 'close', 'volume'])
    return _latest_candle(frame, target_interval)


def _unsubscribe(ch, websocket) -> None:
    """Remove ``websocket`` from ``ch`` and drop the channel once it is empty."""
    subs = channels.get(ch)
    if subs is None:
        return
    subs.discard(websocket)
    if not subs:
        del channels[ch]


async def handler(websocket):
    """Serve one client connection: process subscribe / unsubscribe messages.

    Each message is a JSON object with the keys ``type`` (``"subscribe"`` or
    ``"unsubscribe"``), ``channel`` and ``time``. Malformed messages are
    reported and skipped. When the connection ends, the client is removed
    from every channel it joined.
    """
    global ti
    subscribed = set()

    try:
        async for message in websocket:
            try:
                data = json.loads(message)
                typ = data["type"]
                ch = data["channel"]
                # Required by the message format; only stored, never read here.
                ti = data["time"]

                if typ == "subscribe":
                    channels[ch].add(websocket)
                    subscribed.add(ch)
                    print(f"{websocket.remote_address} joined {ch}")
                elif typ == "unsubscribe":
                    _unsubscribe(ch, websocket)
                    subscribed.discard(ch)
                    print(f"{websocket.remote_address} left {ch}")
            except (ValueError, KeyError, TypeError) as e:
                # Invalid JSON, missing key, or wrong value type.
                print("Bad message:", e)
    except websockets.ConnectionClosed:
        pass
    finally:
        # Connection is gone: leave every channel this client had joined.
        for ch in subscribed:
            _unsubscribe(ch, websocket)


async def broadcast_loop(interval: float = 2.0):
    """Broadcast the latest 1-minute candle to every subscribed channel.

    Runs forever, sleeping ``interval`` seconds between rounds. A failure
    while building one channel's ticker is reported and does not stop the
    loop or affect other channels.

    Args:
        interval: Seconds to wait between broadcast rounds.
    """
    while True:
        await asyncio.sleep(interval)

        # Snapshot: handlers may add or remove channels between rounds.
        for ch, subs in list(channels.items()):
            if not subs:
                continue  # nobody is listening on this channel

            try:
                ticker = get_ticker(ch, '1m')
                if ticker.empty:
                    continue  # no candle data for this channel yet
                # `msg` is itself a JSON string (table orient), embedded in
                # the outer JSON envelope.
                dict_row = ticker.to_json(orient='table')
                payload = json.dumps(
                    {"channel": ch, "msg": dict_row},
                    default=str,
                    ensure_ascii=False,
                )
            except Exception as e:
                # Loop boundary: one bad channel must not take the server down.
                print(f"Ticker failed for {ch}:", e)
                continue

            websockets.broadcast(subs, payload)


async def main():
    """Start the WebSocket server on 0.0.0.0:8765 and run the broadcast loop."""
    async with websockets.serve(handler, "0.0.0.0", 8765):
        await broadcast_loop()


if __name__ == "__main__":
    asyncio.run(main())