mirror of
https://git.hmsn.ink/coin/api.git
synced 2026-03-19 15:54:59 +09:00
104 lines
3.7 KiB
Python
104 lines
3.7 KiB
Python
import asyncio, json, uvicorn, websockets, requests
|
|
from collections import defaultdict
|
|
from fastapi import FastAPI
|
|
from models import get_session, Candle
|
|
import gate_api
|
|
from gate_ws import Configuration, Connection, WebSocketResponse
|
|
from gate_ws.spot import SpotPublicTradeChannel
|
|
from gate_ws.futures import FuturesPublicTradeChannel, FuturesCandlesticksChannel
|
|
|
|
# --- Shared module-level state ------------------------------------------------

# Local WebSocket clients grouped by the channel name they subscribed to.
channels: defaultdict = defaultdict(set)

# Object returned by models.get_session(), created once at import time.
session = get_session()

# FastAPI application that carries the REST routes of this module.
app = FastAPI()

# "<time>_<channel>" keys for which an upstream subscribe has already been sent.
gate_channels: set = set()
|
|
|
|
async def handler(websocket):
    """Serve one local WebSocket client until its connection closes.

    Every incoming message is parsed as a JSON object and must provide the
    keys ``type``, ``channel`` and ``time``.  ``type`` selects the action:

    * ``"subscribe"``   - register the client for ``channel`` and, if no
      upstream subscription for ``"<time>_<channel>"`` exists yet, request one.
    * ``"unsubscribe"`` - remove the client from ``channel``; the upstream
      subscription is dropped only when no local client is left on it.

    Any other ``type`` is ignored.  A message that cannot be processed is
    reported on stdout and does not end the connection.  When the connection
    closes, every subscription the client still holds is released.
    """
    # broadcast_loop reads this module-level value, so it is still updated on
    # every message exactly as before.
    global ti

    # (time, channel) pairs this connection subscribed to; the time is kept so
    # the matching upstream subscription can be released on disconnect.
    subscribed = set()

    def release(interval, ch):
        """Remove this client from ``ch`` and drop the upstream feed if unused."""
        channels[ch].discard(websocket)
        subscribed.discard((interval, ch))
        key = f'{interval}_{ch}'
        # `channels` is keyed by channel name only, so "no local client left on
        # this channel" is the available signal that the feed is unused.
        # Forgetting the key lets a later subscribe request the feed again.
        if not channels[ch] and key in gate_channels:
            channel.unsubscribe([interval, ch])
            gate_channels.discard(key)

    try:
        # Iterating the socket directly is equivalent to the former
        # create_task + immediate await, without the extra task.
        async for message in websocket:
            try:
                data = json.loads(message)
                typ = data["type"]
                ch = data["channel"]
                ti = data["time"]

                if typ == "subscribe":
                    channels[ch].add(websocket)
                    subscribed.add((ti, ch))
                    print(f"{websocket.remote_address} joined {ch}")
                    key = f'{ti}_{ch}'
                    if key not in gate_channels:
                        channel.subscribe([ti, ch])
                        gate_channels.add(key)

                elif typ == "unsubscribe":
                    release(ti, ch)
                    print(f"{websocket.remote_address} left {ch}")

            except Exception as e:
                # Per-message boundary: one malformed message must not take
                # the whole connection down.
                print("Bad message:", e)
    except websockets.ConnectionClosed:
        pass
    finally:
        # On disconnect, leave every channel that is still joined.  Iterate a
        # copy because release() mutates `subscribed`.
        for interval, ch in list(subscribed):
            release(interval, ch)
|
|
|
|
async def broadcast_loop(conn: Connection, resp: WebSocketResponse):
    """Relay one upstream candlestick message to the local subscribers.

    This coroutine is passed as the callback of the upstream candlestick
    channel.  The first entry of ``resp.result`` is forwarded to every local
    client registered in ``channels`` for the entry's contract, wrapped as
    ``{"channel": <contract>, "msg": <entry as a JSON string>}``.

    The target is taken from the entry's own ``n`` field
    (``"<time>_<channel>"``) rather than from the module-level ``ti``.  That
    global holds the interval of the most recent client message, so matching
    on it dropped every update for any other interval.  Because ``channels``
    is keyed by channel name only, clients on the same channel receive updates
    for every interval; ``n`` inside ``msg`` tells them apart.

    Args:
        conn: Upstream connection that delivered the message (unused).
        resp: Parsed upstream message; only ``resp.result`` is read.
    """
    result = resp.result
    # Only a non-empty list can carry a candle.  The previous code indexed
    # result[0] unconditionally and raised on anything else
    # (presumably subscribe acknowledgements and errors - verify against the
    # upstream protocol).
    if not isinstance(result, list) or not result:
        return

    candle = result[0]
    name = candle.get('n') if isinstance(candle, dict) else None
    if not name:
        return

    # Split at the first underscore only: the channel name itself may contain
    # underscores (e.g. "BTC_USDT").
    _interval, _sep, ch = name.partition('_')
    subs = channels.get(ch)
    if not subs:  # nobody is listening on this channel
        return

    payload = json.dumps({"channel": ch, "msg": json.dumps(candle)}, default=str, ensure_ascii=False)
    websockets.broadcast(subs, payload)
|
|
|
|
async def gate_conn() :
    """Create the upstream Gate WebSocket connection for USDT-settled futures.

    Credentials are read from the ``GATE_API_KEY`` and ``GATE_API_SECRET``
    environment variables instead of being embedded in the source.  When they
    are unset, empty strings are passed.
    NOTE(review): this module only uses the public candlestick channel, which
    presumably needs no authentication - confirm before deploying.

    SECURITY: the key pair that used to be hard-coded here remains in the
    repository history and should be revoked and rotated.

    Returns:
        A ``Connection`` configured for the live (non-testnet) futures API.
    """
    import os  # function-scope import: the module's import block is left untouched

    api_key = os.environ.get("GATE_API_KEY", "")
    api_secret = os.environ.get("GATE_API_SECRET", "")
    futures_conn = Connection(Configuration(app='futures', settle='usdt', test_net=False, api_key=api_key, api_secret=api_secret))
    return futures_conn
|
|
|
|
async def socket_server() :
    """Start the local WebSocket server and run the upstream connection.

    Publishes the upstream candlestick channel as the module-level
    ``channel`` so that ``handler`` can subscribe and unsubscribe on it, then
    keeps the local server (0.0.0.0:8765) open for as long as the upstream
    connection's ``run()`` is executing.
    """
    global channel

    upstream = await gate_conn()
    # broadcast_loop is passed as the callback of the candlestick channel.
    channel = FuturesCandlesticksChannel(upstream, broadcast_loop)

    local_server = websockets.serve(handler, "0.0.0.0", 8765)
    async with local_server:
        await upstream.run()
    # Static subscriptions formerly sketched here (all disabled), e.g.
    # channel.subscribe(["5m", "ETH_USDT"]), same for SOL_USDT, XRP_USDT, LINK_USDT.
|
|
|
|
|
|
@app.get("/api/candle/{contract}/{ti}/{to}")
def get_candle(contract: str, ti: str, to: str = None):
    """Fetch up to 1500 futures candlesticks from Gate and return them sorted by ``t``.

    Args:
        contract: Contract name forwarded as the ``contract`` query parameter.
        ti: Candlestick interval forwarded as the ``interval`` query parameter.
        to: Value forwarded as the ``to`` query parameter; omitted when ``None``.

    Returns:
        The decoded JSON list of candles, ordered ascending by their ``t`` field.

    Raises:
        HTTPException: 502 when the upstream request fails or answers with a
            non-4xx error; the upstream status code when it answers 4xx.
    """
    from fastapi import HTTPException  # fastapi is already a dependency of this file

    url = 'https://fx-api.gateio.ws/api/v4/futures/usdt/candlesticks'
    # Let requests build and escape the query string: the values come straight
    # from the request path and must not be able to inject extra parameters.
    # Entries whose value is None are left out instead of being sent as "None".
    params = {"contract": contract, "interval": ti, "to": to, "limit": 1500}
    try:
        # Without a timeout a stalled upstream would block this worker forever.
        res = requests.get(url, params=params, timeout=10)
    except requests.RequestException as exc:
        raise HTTPException(status_code=502, detail=f"upstream request failed: {exc}") from exc
    print(res.url)

    if not res.ok:
        # An error body is not a list of candles; sorting it would fail with
        # an unrelated TypeError, so surface the upstream error instead.
        status = res.status_code if 400 <= res.status_code < 500 else 502
        raise HTTPException(status_code=status, detail=res.text)

    return sorted(res.json(), key=lambda x: x['t'])
|
|
|
|
|
|
async def main():
    """Run the WebSocket relay and the FastAPI server side by side.

    Both coroutines share the current event loop; this function returns only
    after both of them have finished.
    """
    # NOTE(review): reload=True is kept as-is; uvicorn documents reloading as
    # requiring an import string rather than an app object - confirm whether
    # it has any effect here.
    api_server = uvicorn.Server(
        uvicorn.Config(app=app, host="0.0.0.0", port=7010, lifespan="off", reload=True)
    )

    # First the asynchronous socket server, then the FastAPI server.
    await asyncio.gather(socket_server(), api_server.serve())
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive main() to completion on a fresh event loop.
    asyncio.run(main())