import asyncio
import json
import os
import time
from datetime import datetime, tzinfo
from urllib.parse import quote_plus

import gate_api
import pandas as pd
import pandas_ta as ta
import psycopg2
from gate_ws import Configuration, Connection, WebSocketResponse
from gate_ws.futures import (
    FuturesBalanceChannel,
    FuturesPublicTradeChannel,
    FuturesTickerChannel,
)
from gate_ws.spot import SpotPublicTradeChannel
from psycopg2.extras import execute_values
from sqlalchemy import create_engine

# Ticker rows waiting to be written to the `ticker` table.  The list is
# module-level, so it is shared by every `gate` instance in the process.
batch = []


class gate:
    """Collect Gate.io futures tickers into PostgreSQL and derive signals.

    The instance owns two database handles: a SQLAlchemy engine (used by
    ``pd.read_sql`` in :meth:`getData`) and a raw psycopg2 connection/cursor
    (used for the batched inserts in :meth:`on_ob_snapshot`).

    The indicator methods (:meth:`rsi`, :meth:`macd`, :meth:`bb`,
    :meth:`atr`, :meth:`ema`) read ``self.ohlcv``, which only exists after
    :meth:`getData` has run.
    """

    # Columns copied verbatim from a ticker payload, in INSERT column order.
    _TICKER_FIELDS = (
        'contract', 'last', 'change_percentage', 'total_size',
        'volume_24h', 'volume_24h_base', 'volume_24h_quote',
        'volume_24h_settle', 'funding_rate', 'funding_rate_indicative',
        'quanto_base_rate', 'low_24h', 'high_24h', 'price_type',
        'change_from', 'change_price',
    )

    # Number of buffered rows that triggers a database flush.
    _FLUSH_THRESHOLD = 4

    def __init__(self):
        """Open the database handles and build the REST API configuration.

        Every credential can be overridden through an environment variable.
        SECURITY: the literal fallbacks below are kept only so existing
        deployments keep working; they are committed secrets and should be
        rotated and removed from source control.
        """
        env = os.environ
        db_host = env.get('COIN_DB_HOST', 'hmsn.ink')
        db_port = env.get('COIN_DB_PORT', '35432')
        db_name = env.get('COIN_DB_NAME', 'coin')
        db_user = env.get('COIN_DB_USER', 'bangae1')
        db_password = env.get('COIN_DB_PASSWORD', 'fpdlwms1')

        # User and password are percent-encoded so that overridden values
        # containing URL-reserved characters still form a valid URL.
        self.engine = create_engine(
            "postgresql+psycopg2://"
            f"{quote_plus(db_user)}:{quote_plus(db_password)}"
            f"@{db_host}:{db_port}/{db_name}"
        )
        self.config = gate_api.Configuration(
            key=env.get('GATE_API_KEY', 'a086691c31868cd5c176a78896c0e977'),
            secret=env.get(
                'GATE_API_SECRET',
                '443ff6756cbc80c13afa50d2df1b8be97a0e89c7e4301c6cfd2088317eadb537',
            ),
        )
        self.conn = psycopg2.connect(
            host=db_host,
            port=db_port,
            dbname=db_name,
            user=db_user,
            password=db_password,
            options='-c synchronous_commit=off',
        )
        self.cur = self.conn.cursor()

    def getAccount(self):
        """Print the spot account list returned by the REST API."""
        api = gate_api.SpotApi(gate_api.ApiClient(self.config))
        print(api.list_spot_accounts())

    def print_message(self, conn: Connection, response: WebSocketResponse):
        """Debug callback: print a response, closing ``conn`` on an error."""
        if response.error:
            print('error returned: ', response.error)
            conn.close()
            return
        print(response.result)

    async def main(self):
        """Subscribe to the futures ticker channel and run until it stops."""
        # One connection to the USDT-settled futures WebSocket carries every
        # subscription made below.
        futures_conn = Connection(
            Configuration(app='futures', settle='usdt', test_net=False)
        )

        # Each incoming ticker message is handed to on_ob_snapshot.
        channel = FuturesTickerChannel(futures_conn, self.on_ob_snapshot)
        for contract in (
            "BTC_USDT", "ETH_USDT", "SOL_USDT", "XRP_USDT", "LINK_USDT",
        ):
            channel.subscribe([contract])

        # Start the client.
        await futures_conn.run()

    def on_ob_snapshot(self, conn: Connection, resp: WebSocketResponse):
        """Buffer one ticker row and flush the buffer once it is large enough.

        Messages that carry an error, or whose ``result`` is not a non-empty
        sequence, are logged and skipped.  Rows are removed from the shared
        ``batch`` only after a successful commit, so a failed insert is
        retried together with the next message.

        Raises:
            KeyError: if the ticker payload lacks one of the expected fields.
        """
        if resp.error:
            print('error returned: ', resp.error)
            return

        try:
            result = resp.result[0]
        except (TypeError, KeyError, IndexError) as e:
            # Not a ticker list; without this return the code below would
            # fail with NameError because `result` was never bound.
            print(f'start {e}')
            return

        row = tuple(result[field] for field in self._TICKER_FIELDS)
        batch.append(row + (datetime.now(),))

        if len(batch) < self._FLUSH_THRESHOLD:
            return

        try:
            execute_values(
                self.cur,
                "insert into ticker (contract, clast, change_percentage, "
                "total_size, volume_24h, volume_24h_base, volume_24h_quote, "
                "volume_24h_settle, funding_rate, funding_rate_indicative, "
                "quanto_base_rate, low_24h, high_24h, price_type, "
                "change_from, change_price, ts) VALUES %s",
                batch,
            )
            self.conn.commit()
            batch.clear()
        except Exception as exc:
            print(exc)
            self.conn.rollback()

    def getData(self, contract, time=None):
        """Load recent ticks for ``contract`` and print every signal.

        Args:
            contract: Value matched against the ``contract`` column.
            time: Optional interval label.  It only selects how many rows are
                fetched: 50 by default, 60 * 50 when it contains ``'min'``,
                60 * 60 * 50 when it contains ``'hour'``.  The parameter
                shadows the ``time`` module inside this method.

        Side effects:
            Sets ``self.df`` (ticks indexed by ``ts``, ascending) and
            ``self.ohlcv`` (the last 50 non-empty 1-second OHLC bars of
            ``clast``), then prints the five indicator results.  Any
            exception is printed rather than raised.
        """
        limit = 50
        if time is not None:
            if 'min' in time:
                limit = 60 * 50
            elif 'hour' in time:
                limit = 60 * 60 * 50

        sql = (
            " SELECT * FROM ticker WHERE contract = %s "
            "ORDER BY ts DESC LIMIT %s "
        )

        try:
            self.df = pd.read_sql(sql, self.engine, params=(contract, limit))

            # `ts` is inserted as a datetime, so only treat it as epoch
            # milliseconds when the column actually comes back numeric.
            stamps = self.df['ts']
            if pd.api.types.is_numeric_dtype(stamps):
                self.df['ts'] = pd.to_datetime(stamps, unit='ms')
            else:
                self.df['ts'] = pd.to_datetime(stamps)
            self.df = self.df.sort_values('ts').set_index('ts')

            # Resample the last price into 1-second OHLC bars, drop the
            # seconds without ticks and keep the 50 most recent bars.
            bars = self.df['clast'].resample('1s').ohlc()
            self.ohlcv = bars.dropna().tail(50)

            print(self.rsi())
            print(self.macd())
            print(self.bb())
            print(self.atr())
            print(self.ema())
        except Exception as exc:
            print(exc)

    def rsi(self):
        """Return "BUY" if RSI(14) < 30, "SELL" if > 70, otherwise "HOLD".

        "HOLD" is also returned when there are too few bars for pandas_ta to
        produce a value.
        """
        close = self.ohlcv['close']
        series = ta.rsi(close, length=14)
        if series is None or series.empty:
            return "HOLD"

        latest = series.iloc[-1]
        if latest < 30:
            return "BUY"
        if latest > 70:
            return "SELL"
        return "HOLD"

    def macd(self):
        """Return the MACD(12, 26, 9) crossover signal of the latest bar.

        "BUY" when the MACD line is above the signal line now and was at or
        below it on the previous bar; "SELL" for the mirrored case; "HOLD"
        otherwise, including when there is not enough data.
        """
        close = self.ohlcv['close']
        frame = ta.macd(close, fast=12, slow=26, signal=9)
        if frame is None or len(frame) < 2:
            return "HOLD"

        line = frame['MACD_12_26_9']
        signal = frame['MACDs_12_26_9']
        now_line, now_signal = line.iloc[-1], signal.iloc[-1]
        prev_line, prev_signal = line.iloc[-2], signal.iloc[-2]

        if now_line > now_signal and prev_line <= prev_signal:
            return "BUY"
        if now_line < now_signal and prev_line >= prev_signal:
            return "SELL"
        return "HOLD"

    def bb(self):
        """Return the Bollinger Band (20, 2) signal of the latest close.

        "BUY" when the close is at or below the lower band, "SELL" when at or
        above the upper band, "HOLD" otherwise or when there is not enough
        data.
        """
        close = self.ohlcv['close']
        bands = ta.bbands(close, length=20, std=2)
        if bands is None or bands.empty:
            return "HOLD"

        # Select the bands by prefix instead of the full 'BBU_20_2.0' name.
        # NOTE(review): the suffix appears to differ between pandas_ta
        # releases - confirm against the installed version.
        upper_col = next(
            (c for c in bands.columns if c.startswith('BBU_')), None
        )
        lower_col = next(
            (c for c in bands.columns if c.startswith('BBL_')), None
        )
        if upper_col is None or lower_col is None:
            return "HOLD"

        price = close.iloc[-1]
        if price <= bands[lower_col].iloc[-1]:
            return "BUY"
        if price >= bands[upper_col].iloc[-1]:
            return "SELL"
        return "HOLD"

    def atr(self):
        """Return a stop-loss price 1.5 ATR(14) below the latest close.

        The ATR is computed from the high/low/close of the bars currently in
        ``self.ohlcv``.  The previous code tried to obtain 1-hour bars by
        calling ``self.ohlcv(...)``, which always raised TypeError because
        ``self.ohlcv`` is a DataFrame.

        Returns:
            The stop-loss price, or ``None`` when the ATR cannot be computed
            (too few bars).
        """
        bars = self.ohlcv
        atr_series = ta.atr(
            bars['high'], bars['low'], bars['close'], length=14
        )
        if atr_series is None or atr_series.empty:
            return None

        latest_atr = atr_series.iloc[-1]
        if pd.isna(latest_atr):
            return None

        price = bars['close'].iloc[-1]
        return price - (1.5 * latest_atr)  # e.g. a 1.5 ATR stop loss

    def ema(self):
        """Return the EMA(5) / EMA(20) crossover signal of the latest bar.

        "BUY" when EMA5 is above EMA20 now and was at or below it on the
        previous bar; "SELL" for the mirrored case; "HOLD" otherwise.
        """
        close = self.ohlcv['close']
        fast = ta.ema(close, length=5)
        slow = ta.ema(close, length=20)
        if fast is None or slow is None:
            return "HOLD"

        # Only the two bars being compared must be valid.  Checking the whole
        # series always failed, because an EMA starts with warm-up NaNs.
        last_fast = fast.iloc[-2:]
        last_slow = slow.iloc[-2:]
        if len(last_fast) < 2 or len(last_slow) < 2:
            return "HOLD"
        if last_fast.isna().any() or last_slow.isna().any():
            return "HOLD"

        # Compare the direction of the two most recent bars.
        if last_fast.iloc[1] > last_slow.iloc[1] and last_fast.iloc[0] <= last_slow.iloc[0]:
            return "BUY"
        if last_fast.iloc[1] < last_slow.iloc[1] and last_fast.iloc[0] >= last_slow.iloc[0]:
            return "SELL"
        return "HOLD"


if __name__ == '__main__':
    # A distinct name keeps the class `gate` reachable after instantiation.
    client = gate()
    client.getData("SOL_USDT", '5min')
    # To run the live ticker collector instead:
    # asyncio.run(client.main())