From d8a793401fc64bf3e82fd5118f46e5aaa59fafd4 Mon Sep 17 00:00:00 2001 From: bangae1 Date: Sun, 24 Aug 2025 18:31:28 +0900 Subject: [PATCH] a --- api.py | 84 ---------------------------------- main.py | 136 ++++++++++++++++++++++---------------------------------- 2 files changed, 54 insertions(+), 166 deletions(-) delete mode 100644 api.py diff --git a/api.py b/api.py deleted file mode 100644 index cc58790..0000000 --- a/api.py +++ /dev/null @@ -1,84 +0,0 @@ -import asyncio, json -from typing import Literal -from fastapi import FastAPI -from models import get_session, get_engine, Candle -import pandas as pd, pandas_ta as ta - - -session = get_session() -app = FastAPI() - - -def get_ticker_all( - contract: str, - target_interval: Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w'] -): - print(target_interval) - """ - 10초 간격 캔들 데이터를 지정된 봉으로 리샘플링 - - Args: - df: 원본 10초 데이터 (index=ts, columns=open, high, low, close, volume) - contract: 계약명 (필터링용) - target_interval: 목표 봉 (예: '1m', '1h', '1d') - - Returns: - pd.DataFrame: 리샘플링된 OHLCV 데이터 - """ - results = session.query(Candle).filter(Candle.contract.like(f'%{contract}%')).all() - - df = pd.DataFrame([row.__dict__ for row in results]) - - # 2. time 컬럼을 datetime으로 변환 (밀리초 -> 초 -> datetime) - df['ts'] = pd.to_datetime(df['time'], unit='s') # or 's' if in seconds - - # 3. index로 설정 - df.set_index('ts', inplace=True) - - # 4. resample 주기 설정 - freq_map = { - '1m': '1Min', - '5m': '5Min', - '15m': '15Min', - '30m': '30Min', - '1h': '1H', - '4h': '4H', - '1d': '1D', - '1w': '1W' - } - - if target_interval not in freq_map: - raise ValueError(f"Unsupported interval: {target_interval}") - - freq = freq_map[target_interval] - - # 4. 리샘플링 (OHLCV) - ohlc = df['close'].resample(freq).ohlc() # open, high, low, close - volume = df['volume'].resample(freq).sum().rename('volume') - - # 5. 병합 - result = pd.concat([ohlc, volume], axis=1).dropna() - - # 6. ✅ index(datetime)를 'time' 컬럼으로 유닉스 밀리초 추가 - result['time'] = (result.index.astype('int64') // 1_000_000_000) # 나노초 → 밀리초 (int64) - # 또는 밀리초 단위로 정확히: - result['time'] = result.index.view('int64') // 1_000_000_000 # pd.Timestamp → 유닉스 ms - - # 7. (옵션) 'time'을 맨 앞으로 이동 - cols = ['time', 'open', 'high', 'low', 'close', 'volume'] - result = result[cols] - result.tail(1) - - return result - -@app.get("/api/candle/{contract}/{time}") -def get_candle(contract: str, time: str): - results = get_ticker_all(contract, time) - - dict_row = pd.DataFrame(results).to_json(orient='table') - # payload = json.dumps({"msg": dict_row}, default=str, ensure_ascii=False) - return json.loads(dict_row) - -@app.get("/") -async def test(): - return {"msg":"hello"} diff --git a/main.py b/main.py index 95b0c68..5b3ec22 100644 --- a/main.py +++ b/main.py @@ -1,81 +1,20 @@ -import asyncio -import json -from dataclasses import asdict -from typing import Literal -import websockets -from sqlalchemy.orm import Session -from sqlalchemy import and_, or_, desc, asc -from sqlalchemy.sql import text -from models import get_session, get_engine, Candle -import pandas as pd, pandas_ta as ta +import asyncio, json, uvicorn, websockets, requests from collections import defaultdict +from fastapi import FastAPI +from models import get_session, Candle +import gate_api +from gate_ws import Configuration, Connection, WebSocketResponse +from gate_ws.spot import SpotPublicTradeChannel +from gate_ws.futures import FuturesPublicTradeChannel, FuturesCandlesticksChannel channels = defaultdict(set) - session = get_session() +app = FastAPI() -def get_ticker( - contract: str, - target_interval: Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w'] -): - """ - 10초 간격 캔들 데이터를 지정된 봉으로 리샘플링 - - Args: - df: 원본 10초 데이터 (index=ts, columns=open, high, low, close, volume) - contract: 계약명 (필터링용) - target_interval: 목표 봉 (예: '1m', '1h', '1d') - - Returns: - pd.DataFrame: 리샘플링된 OHLCV 데이터 - """ - results = session.query(Candle).filter(Candle.contract.like(f'%{contract}%')).limit(1000).all() - - df = pd.DataFrame([row.__dict__ for row in results]) - - # 2. time 컬럼을 datetime으로 변환 (밀리초 -> 초 -> datetime) - df['ts'] = pd.to_datetime(df['time'], unit='s') # or 's' if in seconds - - # 3. index로 설정 - df.set_index('ts', inplace=True) - - # 4. resample 주기 설정 - freq_map = { - '1m': '1Min', - '5m': '5Min', - '15m': '15Min', - '30m': '30Min', - '1h': '1H', - '4h': '4H', - '1d': '1D', - '1w': '1W' - } - - if target_interval not in freq_map: - raise ValueError(f"Unsupported interval: {target_interval}") - - freq = freq_map[target_interval] - - # 4. 리샘플링 (OHLCV) - ohlc = df['close'].resample(freq).ohlc() # open, high, low, close - volume = df['volume'].resample(freq).sum().rename('volume') - - # 5. 병합 - result = pd.concat([ohlc, volume], axis=1).dropna() - - # 6. ✅ index(datetime)를 'time' 컬럼으로 유닉스 밀리초 추가 - result['time'] = (result.index.astype('int64') // 1_000_000_000) # 나노초 → 밀리초 (int64) - # 또는 밀리초 단위로 정확히: - result['time'] = result.index.view('int64') // 1_000_000_000 # pd.Timestamp → 유닉스 ms - - # 7. (옵션) 'time'을 맨 앞으로 이동 - cols = ['time', 'open', 'high', 'low', 'close', 'volume'] - results = result[cols] - last = results.tail(1) - - return last +gate_channels = set() async def handler(websocket): + subscribed = set() async def consumer(): @@ -92,10 +31,15 @@ async def handler(websocket): channels[ch].add(websocket) subscribed.add(ch) print(f"{websocket.remote_address} joined {ch}") + if f'{ti}_{ch}' not in gate_channels : + channel.subscribe([ti, ch]) + + gate_channels.add(f'{ti}_{ch}') elif typ == "unsubscribe": channels[ch].discard(websocket) subscribed.discard(ch) + channel.unsubscribe([ti, ch]) print(f"{websocket.remote_address} left {ch}") except Exception as e: @@ -113,20 +57,48 @@ async def handler(websocket): for ch in subscribed: channels[ch].discard(websocket) -async def broadcast_loop(): - """1초마다 모든 채널에 tick 브로드캐스트""" - while True: - await asyncio.sleep(2) - for ch, subs in channels.items(): - if subs: # 구독자가 있을 때만 - ticker = get_ticker(ch, '1m') - dict_row = pd.DataFrame(ticker).to_json(orient='table') - payload = json.dumps({"channel": ch, "msg": dict_row}, default=str, ensure_ascii=False) +async def broadcast_loop(conn: Connection, resp: WebSocketResponse): + for ch, subs in channels.items(): + if subs: # 구독자가 있을 때만 + result = resp.result[0] + if result['n'] == f'{ti}_{ch}' : + payload = json.dumps({"channel": ch, "msg": json.dumps(result)}, default=str, ensure_ascii=False) websockets.broadcast(subs, payload) -async def main(): +async def gate_conn() : + config = gate_api.Configuration(key='a086691c31868cd5c176a78896c0e977', secret='443ff6756cbc80c13afa50d2df1b8be97a0e89c7e4301c6cfd2088317eadb537') + futures_conn = Connection(Configuration(app='futures', settle='usdt', test_net=False, api_key=config.key, api_secret=config.secret)) + return futures_conn + +async def socket_server() : + futures_conn = await gate_conn() + global channel + channel = FuturesCandlesticksChannel(futures_conn, broadcast_loop) async with websockets.serve(handler, "0.0.0.0", 8765) : - await broadcast_loop() + await futures_conn.run() + # channel.subscribe(["5m","ETH_USDT"]) + # channel.subscribe(["5m","SOL_USDT"]) + # channel.subscribe(["5m","XRP_USDT"]) + # channel.subscribe(["5m","LINK_USDT"]) + + +@app.get("/api/candle/{contract}/{ti}/{to}") +def get_candle(contract: str, ti: str, to: str = None): + url = f'https://fx-api.gateio.ws/api/v4/futures/usdt/candlesticks?contract={contract}&interval={ti}&to={to}&limit=1500' + print(url) + res = requests.get(url) + return sorted(res.json(), key=lambda x: x['t']) + + +async def main(): + config = uvicorn.Config(app=app, host="0.0.0.0", port=7010, lifespan="off", reload=True) + server = uvicorn.Server(config) + + # 병렬로 소켓 서버와 FastAPI 실행 + await asyncio.gather( + socket_server(), # 비동기 소켓 서버 + server.serve() # FastAPI 서버 + ) if __name__ == "__main__": asyncio.run(main()) \ No newline at end of file