From 7c48d61edd85de23c2789472686472f088179227 Mon Sep 17 00:00:00 2001 From: bangae1 Date: Sat, 23 Aug 2025 13:08:58 +0900 Subject: [PATCH] a --- api.py | 84 ++++++++++++++++++++++++++++++++++ main.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ models.py | 64 +++++++++++--------------- test.py | 23 ---------- 4 files changed, 241 insertions(+), 62 deletions(-) create mode 100644 api.py create mode 100644 main.py delete mode 100644 test.py diff --git a/api.py b/api.py new file mode 100644 index 0000000..cc58790 --- /dev/null +++ b/api.py @@ -0,0 +1,84 @@ +import asyncio, json +from typing import Literal +from fastapi import FastAPI +from models import get_session, get_engine, Candle +import pandas as pd, pandas_ta as ta + + +session = get_session() +app = FastAPI() + + +def get_ticker_all( + contract: str, + target_interval: Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w'] +): + print(target_interval) + """ + 10초 간격 캔들 데이터를 지정된 봉으로 리샘플링 + + Args: + df: 원본 10초 데이터 (index=ts, columns=open, high, low, close, volume) + contract: 계약명 (필터링용) + target_interval: 목표 봉 (예: '1m', '1h', '1d') + + Returns: + pd.DataFrame: 리샘플링된 OHLCV 데이터 + """ + results = session.query(Candle).filter(Candle.contract.like(f'%{contract}%')).all() + + df = pd.DataFrame([row.__dict__ for row in results]) + + # 2. time 컬럼을 datetime으로 변환 (밀리초 -> 초 -> datetime) + df['ts'] = pd.to_datetime(df['time'], unit='s') # or 's' if in seconds + + # 3. index로 설정 + df.set_index('ts', inplace=True) + + # 4. resample 주기 설정 + freq_map = { + '1m': '1Min', + '5m': '5Min', + '15m': '15Min', + '30m': '30Min', + '1h': '1H', + '4h': '4H', + '1d': '1D', + '1w': '1W' + } + + if target_interval not in freq_map: + raise ValueError(f"Unsupported interval: {target_interval}") + + freq = freq_map[target_interval] + + # 4. 리샘플링 (OHLCV) + ohlc = df['close'].resample(freq).ohlc() # open, high, low, close + volume = df['volume'].resample(freq).sum().rename('volume') + + # 5. 병합 + result = pd.concat([ohlc, volume], axis=1).dropna() + + # 6. ✅ index(datetime)를 'time' 컬럼으로 유닉스 밀리초 추가 + result['time'] = (result.index.astype('int64') // 1_000_000_000) # 나노초 → 밀리초 (int64) + # 또는 밀리초 단위로 정확히: + result['time'] = result.index.view('int64') // 1_000_000_000 # pd.Timestamp → 유닉스 ms + + # 7. (옵션) 'time'을 맨 앞으로 이동 + cols = ['time', 'open', 'high', 'low', 'close', 'volume'] + result = result[cols] + result.tail(1) + + return result + +@app.get("/api/candle/{contract}/{time}") +def get_candle(contract: str, time: str): + results = get_ticker_all(contract, time) + + dict_row = pd.DataFrame(results).to_json(orient='table') + # payload = json.dumps({"msg": dict_row}, default=str, ensure_ascii=False) + return json.loads(dict_row) + +@app.get("/") +async def test(): + return {"msg":"hello"} diff --git a/main.py b/main.py new file mode 100644 index 0000000..95b0c68 --- /dev/null +++ b/main.py @@ -0,0 +1,132 @@ +import asyncio +import json +from dataclasses import asdict +from typing import Literal +import websockets +from sqlalchemy.orm import Session +from sqlalchemy import and_, or_, desc, asc +from sqlalchemy.sql import text +from models import get_session, get_engine, Candle +import pandas as pd, pandas_ta as ta +from collections import defaultdict + +channels = defaultdict(set) + +session = get_session() + +def get_ticker( + contract: str, + target_interval: Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w'] +): + """ + 10초 간격 캔들 데이터를 지정된 봉으로 리샘플링 + + Args: + df: 원본 10초 데이터 (index=ts, columns=open, high, low, close, volume) + contract: 계약명 (필터링용) + target_interval: 목표 봉 (예: '1m', '1h', '1d') + + Returns: + pd.DataFrame: 리샘플링된 OHLCV 데이터 + """ + results = session.query(Candle).filter(Candle.contract.like(f'%{contract}%')).limit(1000).all() + + df = pd.DataFrame([row.__dict__ for row in results]) + + # 2. time 컬럼을 datetime으로 변환 (밀리초 -> 초 -> datetime) + df['ts'] = pd.to_datetime(df['time'], unit='s') # or 's' if in seconds + + # 3. index로 설정 + df.set_index('ts', inplace=True) + + # 4. resample 주기 설정 + freq_map = { + '1m': '1Min', + '5m': '5Min', + '15m': '15Min', + '30m': '30Min', + '1h': '1H', + '4h': '4H', + '1d': '1D', + '1w': '1W' + } + + if target_interval not in freq_map: + raise ValueError(f"Unsupported interval: {target_interval}") + + freq = freq_map[target_interval] + + # 4. 리샘플링 (OHLCV) + ohlc = df['close'].resample(freq).ohlc() # open, high, low, close + volume = df['volume'].resample(freq).sum().rename('volume') + + # 5. 병합 + result = pd.concat([ohlc, volume], axis=1).dropna() + + # 6. ✅ index(datetime)를 'time' 컬럼으로 유닉스 밀리초 추가 + result['time'] = (result.index.astype('int64') // 1_000_000_000) # 나노초 → 밀리초 (int64) + # 또는 밀리초 단위로 정확히: + result['time'] = result.index.view('int64') // 1_000_000_000 # pd.Timestamp → 유닉스 ms + + # 7. (옵션) 'time'을 맨 앞으로 이동 + cols = ['time', 'open', 'high', 'low', 'close', 'volume'] + results = result[cols] + last = results.tail(1) + + return last + +async def handler(websocket): + subscribed = set() + async def consumer(): + + """메시지를 계속 읽어 subscribe / unsubscribe 처리""" + async for message in websocket: + try: + data = json.loads(message) + typ = data["type"] + ch = data["channel"] + global ti + ti = data["time"] + + if typ == "subscribe": + channels[ch].add(websocket) + subscribed.add(ch) + print(f"{websocket.remote_address} joined {ch}") + + elif typ == "unsubscribe": + channels[ch].discard(websocket) + subscribed.discard(ch) + print(f"{websocket.remote_address} left {ch}") + + except Exception as e: + print("Bad message:", e) + + # consumer를 별도 태스크로 실행 + consumer_task = asyncio.create_task(consumer()) + + try: + await consumer_task # 소켓이 닫힐 때까지 대기 + except websockets.ConnectionClosed: + pass + finally: + # 연결 끊기면 가입한 채널 모두에서 제거 + for ch in subscribed: + channels[ch].discard(websocket) + +async def broadcast_loop(): + """1초마다 모든 채널에 tick 브로드캐스트""" + while True: + await asyncio.sleep(2) + for ch, subs in channels.items(): + if subs: # 구독자가 있을 때만 + ticker = get_ticker(ch, '1m') + dict_row = pd.DataFrame(ticker).to_json(orient='table') + payload = json.dumps({"channel": ch, "msg": dict_row}, default=str, ensure_ascii=False) + websockets.broadcast(subs, payload) + +async def main(): + async with websockets.serve(handler, "0.0.0.0", 8765) : + await broadcast_loop() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/models.py b/models.py index b47c7e8..847e94a 100644 --- a/models.py +++ b/models.py @@ -4,56 +4,42 @@ from sqlalchemy.orm import registry, sessionmaker # 메타데이터 객체 관리 mapper_registry = registry() -# DB 유저 테이블 -ticker_table = Table( - "ticker", + +candle_table = Table( + 'candle', mapper_registry.metadata, Column("contract", String(20), primary_key=True), - Column("clast", String(50)), - Column("change_percentage", Double), - Column("total_size", Integer), - Column("volume_24h", Integer), - Column("volume_24h_base", Integer), - Column("volume_24h_quote", Integer), - Column("volume_24h_settle", Integer), - Column("funding_rate", Double), - Column("funding_rate_indicative", Double), - Column("quanto_base_rate", String(100)), - Column("low_24h", Double), - Column("high_24h", Double), - Column("price_type", String(10)), - Column("change_from", String(20)), - Column("change_price", Double), - Column("ts", DateTime, primary_key=True), + Column("open", String(50)), + Column("close", String(50)), + Column("high", String(50)), + Column("low", String(50)), + Column("volume", Integer), + Column("amount", String(50)), + Column("time", Integer, primary_key=True), + ) # Python 유저 객체 -class Ticker: +class Candle: contract: str - clast:float - change_percentage:float - total_size:int - volume_24h:int - volume_24h_base:int - volume_24h_quote:int - volume_24h_settle:int - funding_rate:float - funding_rate_indicative:float - quanto_base_rate:str - low_24h:float - high_24h:float - price_type:str - change_from:str - change_price:float - ts:str + open:str + close:str + high:str + low:str + volume:int + amount:str + time:int # 테이블과 클래스의 매핑 설정 -mapper_registry.map_imperatively(Ticker, ticker_table) +mapper_registry.map_imperatively(Candle, candle_table) def get_session(): # 데이터베이스 설정 - engine = create_engine('postgresql+psycopg2://bangae1:fpdlwms1@hmsn.ink:35432/coin') + engine = get_engine() mapper_registry.metadata.create_all(engine) # 테이블 생성 Session = sessionmaker(bind=engine) session = Session() - return session \ No newline at end of file + return session + +def get_engine() : + return create_engine('postgresql+psycopg2://bangae1:fpdlwms1@hmsn.ink:35432/coin') diff --git a/test.py b/test.py deleted file mode 100644 index d120dff..0000000 --- a/test.py +++ /dev/null @@ -1,23 +0,0 @@ -from sqlalchemy.orm import Session -from sqlalchemy import and_, or_, desc, asc - -from models import get_session, Ticker - - -def get_ticker(session: Session, contract: str): - return session.query(Ticker).filter(Ticker.contract == contract).limit(500).all() - -def get_ticker_count(session: Session, contract: str): - return session.query(Ticker).filter(Ticker.contract == contract).count() - -# 사용 예시 - -session = get_session() -try: - cnt = get_ticker_count(session, "BTC_USDT") - print(cnt) - tickers = get_ticker(session, "BTC_USDT") - for ticker in tickers: - print(ticker.clast) -finally: - session.close() \ No newline at end of file