import asyncio, json import time from datetime import datetime, tzinfo from zoneinfo import ZoneInfo import gate_api import psycopg2, pandas as pd, pandas_ta as ta import requests from gate_ws import Configuration, Connection, WebSocketResponse from gate_ws.spot import SpotPublicTradeChannel from gate_ws.futures import FuturesPublicTradeChannel, FuturesCandlesticksChannel from psycopg2.extras import execute_values from sqlalchemy import create_engine batch = [] class gate : def __init__(self): self.engine = create_engine("postgresql+psycopg2://bangae1:fpdlwms1@hmsn.ink:35432/coin") self.config = gate_api.Configuration(key='a086691c31868cd5c176a78896c0e977', secret='443ff6756cbc80c13afa50d2df1b8be97a0e89c7e4301c6cfd2088317eadb537') self.conn = psycopg2.connect( host='hmsn.ink', port='35432', dbname='coin', user='bangae1', password='fpdlwms1', options='-c synchronous_commit=off' ) self.cur = self.conn.cursor() self.oldDt = 0 def getAccount(self): api = gate_api.SpotApi(gate_api.ApiClient(self.config)) print(api.list_spot_accounts()) def print_message(self, conn: Connection, response: WebSocketResponse): if response.error: print('error returned: ', response.error) conn.close() return print(response.result) async def main(self): # initialize default connection, which connects to spot WebSocket V4 # it is recommended to use one conn to initialize multiple channels futures_conn = Connection(Configuration(app='futures', settle='usdt', test_net=False, api_key=self.config.key, api_secret=self.config.secret)) # subscribe to any channel you are interested into, with the callback function channel = FuturesCandlesticksChannel(futures_conn, self.on_ob_snapshot) channel.subscribe(["5m","BTC_USDT"]) channel.subscribe(["5m","ETH_USDT"]) channel.subscribe(["5m","SOL_USDT"]) channel.subscribe(["5m","XRP_USDT"]) channel.subscribe(["5m","LINK_USDT"]) # start the client await futures_conn.run() def on_ob_snapshot(self, conn: Connection, resp: WebSocketResponse): if resp.event == 'update': result = resp.result[0] try : insert_sql = ''' INSERT INTO candle (contract, open, close, high, low, volume, amount, time, ins_date) SELECT %(contract)s, %(open)s, %(close)s, %(high)s, %(low)s, %(volume)s, %(amount)s, %(time)s, %(ins_date)s ON CONFLICT (contract, time, ins_date) DO UPDATE SET open = EXCLUDED.open, close = EXCLUDED.close, high = EXCLUDED.high, low = EXCLUDED.low, volume = EXCLUDED.volume, amount = EXCLUDED.amount, ins_date = EXCLUDED.ins_date ''' self.cur.execute(insert_sql, { 'contract': result['n'].replace('5m_', ''), 'open': result['o'], 'close': result['c'], 'high': result['h'], 'low': result['l'], 'volume': result['v'], 'amount': result['a'], 'time': result['t'], 'ins_date': time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(result['t'])), }) # execute_values(self.cur, # '''insert into candle (contract, open, close, high, low, volume, amount, time, ins_date) # VALUES (%s)''', # batch) self.conn.commit() except Exception as exc: print(exc) self.conn.rollback() def getData(self, contract, time=None): limit = 50 if time is not None: if time.find('min') > -1 : limit = 60 * 50 elif time.find('hour') > -1 : limit = (60 * 60) * 50 sql = """ SELECT * FROM ticker WHERE contract = %s ORDER BY ts DESC LIMIT %s """ try : self.df = pd.read_sql(sql, self.engine, params=(contract, limit,)) self.df['ts'] = pd.to_datetime(self.df['ts'], unit='ms') # ms → datetime self.df = self.df.sort_values('ts').set_index('ts') # 2. 15분 봉(ohlcv) 리샘플링 self.ohlcv = self.df['clast'].resample('1s').ohlc() self.ohlcv = self.ohlcv.dropna() # NaN 제거 self.ohlcv = self.ohlcv.tail(50) # 최근 50개만 유지 # self.df = self.df .sort_values('ts') # self.df .set_index('ts', inplace=True) print(self.rsi()) print(self.macd()) print(self.bb()) print(self.atr()) print(self.ema()) except Exception as exc: print(exc) def rsi(self): close = self.ohlcv['close'] rsi = ta.rsi(close, length=14).iloc[-1] rsi_up = "BUY" if rsi < 30 else "SELL" if rsi > 70 else "HOLD" return rsi_up def macd(self): close = self.ohlcv['close'] macd = ta.macd(close, fast=12, slow=26, signal=9) macd_line = macd['MACD_12_26_9'].iloc[-1] signal_line = macd['MACDs_12_26_9'].iloc[-1] macd_cross = "BUY" if macd_line > signal_line and macd.iloc[-2]['MACD_12_26_9'] <= macd.iloc[-2]['MACDs_12_26_9'] else \ "SELL" if macd_line < signal_line and macd.iloc[-2]['MACD_12_26_9'] >= macd.iloc[-2]['MACDs_12_26_9'] else "HOLD" return macd_cross def bb(self): close = self.ohlcv['close'] bb = ta.bbands(close, length=20, std=2) bb_upper = bb['BBU_20_2.0'].iloc[-1] bb_lower = bb['BBL_20_2.0'].iloc[-1] price = close.iloc[-1] bb_sig = "BUY" if price <= bb_lower else "SELL" if price >= bb_upper else "HOLD" return bb_sig def atr(self): close = self.ohlcv['close'] df_1h = self.ohlcv(interval='1h') atr_1h = ta.atr(df_1h['high'], df_1h['low'], df_1h['close'], length=14).iloc[-1] atr = ta.atr(close, close, close, length=14).iloc[-1] price = close.iloc[-1] stop_loss = price - (1.5 * atr_1h) # 예: 1.5 ATR 손절 return stop_loss def ema(self): close = self.ohlcv['close'] ema5 = ta.ema(close, length=5) ema20 = ta.ema(close, length=20) # NaN 제거 if ema5.isna().any() or ema20.isna().any(): return "HOLD" # 방향성 비교 if ema5.iloc[-1] > ema20.iloc[-1] and ema5.iloc[-2] <= ema20.iloc[-2]: return "BUY" elif ema5.iloc[-1] < ema20.iloc[-1] and ema5.iloc[-2] >= ema20.iloc[-2]: return "SELL" else: return "HOLD" def init(self, contract, t): res = requests.get(f'https://fx-api.gateio.ws/api/v4/futures/usdt/candlesticks?contract={contract}&interval=5m&from={t}&limit=2000') lists = res.json() if self.oldDt != lists[len(lists) -1]['t']: self.save(contract, lists) self.oldDt = lists[len(lists) -1]['t'] self.init(contract, lists[len(lists) -1]['t']) def save(self, contract, lists): for list in lists: batch.append(( contract, list['o'], list['c'], list['h'], list['l'], list['v'], 0, list['t'], time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(list['t'])) )) try : execute_values(self.cur, '''insert into candle (contract, open, close, high, low, volume, amount, time, ins_date) VALUES %s''', batch) self.conn.commit() batch.clear() except Exception as exc: self.conn.rollback() batch.clear() # {'o': '7223.5', 'v': 0, 't': 1577860400, 'c': '7223.5', 'l': '7223.5', 'h': '7223.5', 'sum': '0'} if __name__ == '__main__': gate = gate() contracs = [ "BTC_USDT" ,"ETH_USDT" ,"SOL_USDT" ,"XRP_USDT" ,"LINK_USDT" ] for co in contracs : gate.init(co, 1755345900) # loop = asyncio.get_event_loop() # loop.run_until_complete(gate.main()) # loop.close() # gate.getData("SOL_USDT", '5min') # print(datetime.now().timestamp())