mirror of
https://git.hmsn.ink/coin/api.git
synced 2026-03-20 00:02:16 +09:00
a
This commit is contained in:
84
api.py
84
api.py
@@ -1,84 +0,0 @@
|
|||||||
import asyncio, json
|
|
||||||
from typing import Literal
|
|
||||||
from fastapi import FastAPI
|
|
||||||
from models import get_session, get_engine, Candle
|
|
||||||
import pandas as pd, pandas_ta as ta
|
|
||||||
|
|
||||||
|
|
||||||
session = get_session()
|
|
||||||
app = FastAPI()
|
|
||||||
|
|
||||||
|
|
||||||
def get_ticker_all(
|
|
||||||
contract: str,
|
|
||||||
target_interval: Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w']
|
|
||||||
):
|
|
||||||
print(target_interval)
|
|
||||||
"""
|
|
||||||
10초 간격 캔들 데이터를 지정된 봉으로 리샘플링
|
|
||||||
|
|
||||||
Args:
|
|
||||||
df: 원본 10초 데이터 (index=ts, columns=open, high, low, close, volume)
|
|
||||||
contract: 계약명 (필터링용)
|
|
||||||
target_interval: 목표 봉 (예: '1m', '1h', '1d')
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
pd.DataFrame: 리샘플링된 OHLCV 데이터
|
|
||||||
"""
|
|
||||||
results = session.query(Candle).filter(Candle.contract.like(f'%{contract}%')).all()
|
|
||||||
|
|
||||||
df = pd.DataFrame([row.__dict__ for row in results])
|
|
||||||
|
|
||||||
# 2. time 컬럼을 datetime으로 변환 (밀리초 -> 초 -> datetime)
|
|
||||||
df['ts'] = pd.to_datetime(df['time'], unit='s') # or 's' if in seconds
|
|
||||||
|
|
||||||
# 3. index로 설정
|
|
||||||
df.set_index('ts', inplace=True)
|
|
||||||
|
|
||||||
# 4. resample 주기 설정
|
|
||||||
freq_map = {
|
|
||||||
'1m': '1Min',
|
|
||||||
'5m': '5Min',
|
|
||||||
'15m': '15Min',
|
|
||||||
'30m': '30Min',
|
|
||||||
'1h': '1H',
|
|
||||||
'4h': '4H',
|
|
||||||
'1d': '1D',
|
|
||||||
'1w': '1W'
|
|
||||||
}
|
|
||||||
|
|
||||||
if target_interval not in freq_map:
|
|
||||||
raise ValueError(f"Unsupported interval: {target_interval}")
|
|
||||||
|
|
||||||
freq = freq_map[target_interval]
|
|
||||||
|
|
||||||
# 4. 리샘플링 (OHLCV)
|
|
||||||
ohlc = df['close'].resample(freq).ohlc() # open, high, low, close
|
|
||||||
volume = df['volume'].resample(freq).sum().rename('volume')
|
|
||||||
|
|
||||||
# 5. 병합
|
|
||||||
result = pd.concat([ohlc, volume], axis=1).dropna()
|
|
||||||
|
|
||||||
# 6. ✅ index(datetime)를 'time' 컬럼으로 유닉스 밀리초 추가
|
|
||||||
result['time'] = (result.index.astype('int64') // 1_000_000_000) # 나노초 → 밀리초 (int64)
|
|
||||||
# 또는 밀리초 단위로 정확히:
|
|
||||||
result['time'] = result.index.view('int64') // 1_000_000_000 # pd.Timestamp → 유닉스 ms
|
|
||||||
|
|
||||||
# 7. (옵션) 'time'을 맨 앞으로 이동
|
|
||||||
cols = ['time', 'open', 'high', 'low', 'close', 'volume']
|
|
||||||
result = result[cols]
|
|
||||||
result.tail(1)
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
@app.get("/api/candle/{contract}/{time}")
|
|
||||||
def get_candle(contract: str, time: str):
|
|
||||||
results = get_ticker_all(contract, time)
|
|
||||||
|
|
||||||
dict_row = pd.DataFrame(results).to_json(orient='table')
|
|
||||||
# payload = json.dumps({"msg": dict_row}, default=str, ensure_ascii=False)
|
|
||||||
return json.loads(dict_row)
|
|
||||||
|
|
||||||
@app.get("/")
|
|
||||||
async def test():
|
|
||||||
return {"msg":"hello"}
|
|
||||||
132
main.py
132
main.py
@@ -1,81 +1,20 @@
|
|||||||
import asyncio
|
import asyncio, json, uvicorn, websockets, requests
|
||||||
import json
|
|
||||||
from dataclasses import asdict
|
|
||||||
from typing import Literal
|
|
||||||
import websockets
|
|
||||||
from sqlalchemy.orm import Session
|
|
||||||
from sqlalchemy import and_, or_, desc, asc
|
|
||||||
from sqlalchemy.sql import text
|
|
||||||
from models import get_session, get_engine, Candle
|
|
||||||
import pandas as pd, pandas_ta as ta
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from models import get_session, Candle
|
||||||
|
import gate_api
|
||||||
|
from gate_ws import Configuration, Connection, WebSocketResponse
|
||||||
|
from gate_ws.spot import SpotPublicTradeChannel
|
||||||
|
from gate_ws.futures import FuturesPublicTradeChannel, FuturesCandlesticksChannel
|
||||||
|
|
||||||
channels = defaultdict(set)
|
channels = defaultdict(set)
|
||||||
|
|
||||||
session = get_session()
|
session = get_session()
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
def get_ticker(
|
gate_channels = set()
|
||||||
contract: str,
|
|
||||||
target_interval: Literal['1m', '5m', '15m', '30m', '1h', '4h', '1d', '1w']
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
10초 간격 캔들 데이터를 지정된 봉으로 리샘플링
|
|
||||||
|
|
||||||
Args:
|
|
||||||
df: 원본 10초 데이터 (index=ts, columns=open, high, low, close, volume)
|
|
||||||
contract: 계약명 (필터링용)
|
|
||||||
target_interval: 목표 봉 (예: '1m', '1h', '1d')
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
pd.DataFrame: 리샘플링된 OHLCV 데이터
|
|
||||||
"""
|
|
||||||
results = session.query(Candle).filter(Candle.contract.like(f'%{contract}%')).limit(1000).all()
|
|
||||||
|
|
||||||
df = pd.DataFrame([row.__dict__ for row in results])
|
|
||||||
|
|
||||||
# 2. time 컬럼을 datetime으로 변환 (밀리초 -> 초 -> datetime)
|
|
||||||
df['ts'] = pd.to_datetime(df['time'], unit='s') # or 's' if in seconds
|
|
||||||
|
|
||||||
# 3. index로 설정
|
|
||||||
df.set_index('ts', inplace=True)
|
|
||||||
|
|
||||||
# 4. resample 주기 설정
|
|
||||||
freq_map = {
|
|
||||||
'1m': '1Min',
|
|
||||||
'5m': '5Min',
|
|
||||||
'15m': '15Min',
|
|
||||||
'30m': '30Min',
|
|
||||||
'1h': '1H',
|
|
||||||
'4h': '4H',
|
|
||||||
'1d': '1D',
|
|
||||||
'1w': '1W'
|
|
||||||
}
|
|
||||||
|
|
||||||
if target_interval not in freq_map:
|
|
||||||
raise ValueError(f"Unsupported interval: {target_interval}")
|
|
||||||
|
|
||||||
freq = freq_map[target_interval]
|
|
||||||
|
|
||||||
# 4. 리샘플링 (OHLCV)
|
|
||||||
ohlc = df['close'].resample(freq).ohlc() # open, high, low, close
|
|
||||||
volume = df['volume'].resample(freq).sum().rename('volume')
|
|
||||||
|
|
||||||
# 5. 병합
|
|
||||||
result = pd.concat([ohlc, volume], axis=1).dropna()
|
|
||||||
|
|
||||||
# 6. ✅ index(datetime)를 'time' 컬럼으로 유닉스 밀리초 추가
|
|
||||||
result['time'] = (result.index.astype('int64') // 1_000_000_000) # 나노초 → 밀리초 (int64)
|
|
||||||
# 또는 밀리초 단위로 정확히:
|
|
||||||
result['time'] = result.index.view('int64') // 1_000_000_000 # pd.Timestamp → 유닉스 ms
|
|
||||||
|
|
||||||
# 7. (옵션) 'time'을 맨 앞으로 이동
|
|
||||||
cols = ['time', 'open', 'high', 'low', 'close', 'volume']
|
|
||||||
results = result[cols]
|
|
||||||
last = results.tail(1)
|
|
||||||
|
|
||||||
return last
|
|
||||||
|
|
||||||
async def handler(websocket):
|
async def handler(websocket):
|
||||||
|
|
||||||
subscribed = set()
|
subscribed = set()
|
||||||
async def consumer():
|
async def consumer():
|
||||||
|
|
||||||
@@ -92,10 +31,15 @@ async def handler(websocket):
|
|||||||
channels[ch].add(websocket)
|
channels[ch].add(websocket)
|
||||||
subscribed.add(ch)
|
subscribed.add(ch)
|
||||||
print(f"{websocket.remote_address} joined {ch}")
|
print(f"{websocket.remote_address} joined {ch}")
|
||||||
|
if f'{ti}_{ch}' not in gate_channels :
|
||||||
|
channel.subscribe([ti, ch])
|
||||||
|
|
||||||
|
gate_channels.add(f'{ti}_{ch}')
|
||||||
|
|
||||||
elif typ == "unsubscribe":
|
elif typ == "unsubscribe":
|
||||||
channels[ch].discard(websocket)
|
channels[ch].discard(websocket)
|
||||||
subscribed.discard(ch)
|
subscribed.discard(ch)
|
||||||
|
channel.unsubscribe([ti, ch])
|
||||||
print(f"{websocket.remote_address} left {ch}")
|
print(f"{websocket.remote_address} left {ch}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -113,20 +57,48 @@ async def handler(websocket):
|
|||||||
for ch in subscribed:
|
for ch in subscribed:
|
||||||
channels[ch].discard(websocket)
|
channels[ch].discard(websocket)
|
||||||
|
|
||||||
async def broadcast_loop():
|
async def broadcast_loop(conn: Connection, resp: WebSocketResponse):
|
||||||
"""1초마다 모든 채널에 tick 브로드캐스트"""
|
|
||||||
while True:
|
|
||||||
await asyncio.sleep(2)
|
|
||||||
for ch, subs in channels.items():
|
for ch, subs in channels.items():
|
||||||
if subs: # 구독자가 있을 때만
|
if subs: # 구독자가 있을 때만
|
||||||
ticker = get_ticker(ch, '1m')
|
result = resp.result[0]
|
||||||
dict_row = pd.DataFrame(ticker).to_json(orient='table')
|
if result['n'] == f'{ti}_{ch}' :
|
||||||
payload = json.dumps({"channel": ch, "msg": dict_row}, default=str, ensure_ascii=False)
|
payload = json.dumps({"channel": ch, "msg": json.dumps(result)}, default=str, ensure_ascii=False)
|
||||||
websockets.broadcast(subs, payload)
|
websockets.broadcast(subs, payload)
|
||||||
|
|
||||||
async def main():
|
async def gate_conn() :
|
||||||
|
config = gate_api.Configuration(key='a086691c31868cd5c176a78896c0e977', secret='443ff6756cbc80c13afa50d2df1b8be97a0e89c7e4301c6cfd2088317eadb537')
|
||||||
|
futures_conn = Connection(Configuration(app='futures', settle='usdt', test_net=False, api_key=config.key, api_secret=config.secret))
|
||||||
|
return futures_conn
|
||||||
|
|
||||||
|
async def socket_server() :
|
||||||
|
futures_conn = await gate_conn()
|
||||||
|
global channel
|
||||||
|
channel = FuturesCandlesticksChannel(futures_conn, broadcast_loop)
|
||||||
async with websockets.serve(handler, "0.0.0.0", 8765) :
|
async with websockets.serve(handler, "0.0.0.0", 8765) :
|
||||||
await broadcast_loop()
|
await futures_conn.run()
|
||||||
|
# channel.subscribe(["5m","ETH_USDT"])
|
||||||
|
# channel.subscribe(["5m","SOL_USDT"])
|
||||||
|
# channel.subscribe(["5m","XRP_USDT"])
|
||||||
|
# channel.subscribe(["5m","LINK_USDT"])
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/api/candle/{contract}/{ti}/{to}")
|
||||||
|
def get_candle(contract: str, ti: str, to: str = None):
|
||||||
|
url = f'https://fx-api.gateio.ws/api/v4/futures/usdt/candlesticks?contract={contract}&interval={ti}&to={to}&limit=1500'
|
||||||
|
print(url)
|
||||||
|
res = requests.get(url)
|
||||||
|
return sorted(res.json(), key=lambda x: x['t'])
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
config = uvicorn.Config(app=app, host="0.0.0.0", port=7010, lifespan="off", reload=True)
|
||||||
|
server = uvicorn.Server(config)
|
||||||
|
|
||||||
|
# 병렬로 소켓 서버와 FastAPI 실행
|
||||||
|
await asyncio.gather(
|
||||||
|
socket_server(), # 비동기 소켓 서버
|
||||||
|
server.serve() # FastAPI 서버
|
||||||
|
)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
Reference in New Issue
Block a user