가격과 RSI 지표 간의 다이버전스를 감지하여 추세 전환점을 포착하는 고급 알고리즘
가격은 낮은 저점을 형성하지만 RSI는 높은 저점을 형성할 때 발생
가격은 높은 고점을 형성하지만 RSI는 낮은 고점을 형성할 때 발생
추세 지속 신호로 사용되는 숨겨진 다이버전스 감지
14일 기간의 RSI를 계산하여 과매수/과매도 구간을 식별합니다.
가격과 RSI의 고점/저점을 찾아 피벗 포인트를 식별합니다.
가격과 RSI의 방향성 차이를 분석하여 다이버전스를 감지합니다.
다이버전스의 강도와 RSI 레벨을 기반으로 시그널 신뢰도를 계산합니다.
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class DivergenceSignal:
"""RSI 다이버전스 시그널 데이터 클래스"""
timestamp: datetime
symbol: str
signal_type: str # 'bullish' or 'bearish'
price_low: float
price_high: float
rsi_value: float
confidence: float
class RSIDivergenceDetector:
"""RSI 다이버전스 감지 알고리즘"""
def __init__(self, period: int = 14, oversold: float = 30, overbought: float = 70):
self.period = period
self.oversold_level = oversold
self.overbought_level = overbought
self.min_pivot_bars = 5
def calculate_rsi(self, prices: pd.Series) -> pd.Series:
"""RSI 지표 계산"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=self.period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=self.period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def find_pivot_points(self, data: pd.Series, is_high: bool = True) -> List[int]:
"""피벗 포인트 찾기"""
pivots = []
for i in range(self.min_pivot_bars, len(data) - self.min_pivot_bars):
if is_high:
# 고점 피벗 찾기
if all(data[i] > data[i-j] for j in range(1, self.min_pivot_bars + 1)) and \
all(data[i] > data[i+j] for j in range(1, self.min_pivot_bars + 1)):
pivots.append(i)
else:
# 저점 피벗 찾기
if all(data[i] < data[i-j] for j in range(1, self.min_pivot_bars + 1)) and \
all(data[i] < data[i+j] for j in range(1, self.min_pivot_bars + 1)):
pivots.append(i)
return pivots
def detect_bullish_divergence(self, prices: pd.Series, rsi: pd.Series) -> Optional[DivergenceSignal]:
"""Bullish 다이버전스 감지 (가격은 낮은 저점, RSI는 높은 저점)"""
price_lows = self.find_pivot_points(prices, is_high=False)
rsi_lows = self.find_pivot_points(rsi, is_high=False)
if len(price_lows) < 2 or len(rsi_lows) < 2:
return None
# 최근 두 저점 비교
if price_lows[-1] > price_lows[-2] + 10: # 충분한 거리 확보
price_low1 = prices[price_lows[-2]]
price_low2 = prices[price_lows[-1]]
# RSI에서 대응하는 저점 찾기
rsi_idx1 = min(rsi_lows, key=lambda x: abs(x - price_lows[-2]))
rsi_idx2 = min(rsi_lows, key=lambda x: abs(x - price_lows[-1]))
rsi_low1 = rsi[rsi_idx1]
rsi_low2 = rsi[rsi_idx2]
# Bullish 다이버전스 조건
if price_low2 < price_low1 and rsi_low2 > rsi_low1 and rsi_low2 < self.oversold_level:
confidence = self._calculate_confidence(
price_diff=abs(price_low2 - price_low1) / price_low1,
rsi_diff=abs(rsi_low2 - rsi_low1),
rsi_level=rsi_low2
)
return DivergenceSignal(
timestamp=datetime.now(),
symbol="", # 심볼은 외부에서 설정
signal_type="bullish",
price_low=price_low2,
price_high=prices[price_lows[-1]:].max(),
rsi_value=rsi_low2,
confidence=confidence
)
return None
def detect_bearish_divergence(self, prices: pd.Series, rsi: pd.Series) -> Optional[DivergenceSignal]:
"""Bearish 다이버전스 감지 (가격은 높은 고점, RSI는 낮은 고점)"""
price_highs = self.find_pivot_points(prices, is_high=True)
rsi_highs = self.find_pivot_points(rsi, is_high=True)
if len(price_highs) < 2 or len(rsi_highs) < 2:
return None
# 최근 두 고점 비교
if price_highs[-1] > price_highs[-2] + 10:
price_high1 = prices[price_highs[-2]]
price_high2 = prices[price_highs[-1]]
# RSI에서 대응하는 고점 찾기
rsi_idx1 = min(rsi_highs, key=lambda x: abs(x - price_highs[-2]))
rsi_idx2 = min(rsi_highs, key=lambda x: abs(x - price_highs[-1]))
rsi_high1 = rsi[rsi_idx1]
rsi_high2 = rsi[rsi_idx2]
# Bearish 다이버전스 조건
if price_high2 > price_high1 and rsi_high2 < rsi_high1 and rsi_high2 > self.overbought_level:
confidence = self._calculate_confidence(
price_diff=abs(price_high2 - price_high1) / price_high1,
rsi_diff=abs(rsi_high2 - rsi_high1),
rsi_level=rsi_high2
)
return DivergenceSignal(
timestamp=datetime.now(),
symbol="",
signal_type="bearish",
price_low=prices[price_highs[-1]:].min(),
price_high=price_high2,
rsi_value=rsi_high2,
confidence=confidence
)
return None
def _calculate_confidence(self, price_diff: float, rsi_diff: float, rsi_level: float) -> float:
"""시그널 신뢰도 계산"""
# 가격 차이가 클수록 신뢰도 증가
price_score = min(price_diff * 10, 1.0) * 0.3
# RSI 차이가 클수록 신뢰도 증가
rsi_score = min(rsi_diff / 20, 1.0) * 0.3
# RSI가 극단 영역에 있을수록 신뢰도 증가
if rsi_level < self.oversold_level:
extreme_score = (self.oversold_level - rsi_level) / self.oversold_level
elif rsi_level > self.overbought_level:
extreme_score = (rsi_level - self.overbought_level) / (100 - self.overbought_level)
else:
extreme_score = 0
extreme_score *= 0.4
return min(price_score + rsi_score + extreme_score, 1.0)
def analyze(self, df: pd.DataFrame, symbol: str) -> Dict:
"""메인 분석 함수"""
# RSI 계산
rsi = self.calculate_rsi(df['close'])
# 다이버전스 감지
bullish_signal = self.detect_bullish_divergence(df['close'], rsi)
bearish_signal = self.detect_bearish_divergence(df['close'], rsi)
results = {
'symbol': symbol,
'timestamp': datetime.now(),
'current_price': df['close'].iloc[-1],
'current_rsi': rsi.iloc[-1],
'signals': []
}
if bullish_signal:
bullish_signal.symbol = symbol
results['signals'].append({
'type': 'BUY',
'reason': 'Bullish RSI Divergence',
'confidence': bullish_signal.confidence,
'rsi_value': bullish_signal.rsi_value,
'entry_price': df['close'].iloc[-1],
'stop_loss': bullish_signal.price_low * 0.98,
'take_profit': df['close'].iloc[-1] * 1.05
})
if bearish_signal:
bearish_signal.symbol = symbol
results['signals'].append({
'type': 'SELL',
'reason': 'Bearish RSI Divergence',
'confidence': bearish_signal.confidence,
'rsi_value': bearish_signal.rsi_value,
'entry_price': df['close'].iloc[-1],
'stop_loss': bearish_signal.price_high * 1.02,
'take_profit': df['close'].iloc[-1] * 0.95
})
return results
# 사용 예제
if __name__ == "__main__":
# 샘플 데이터 로드
detector = RSIDivergenceDetector(period=14, oversold=30, overbought=70)
# 실제 사용시에는 바이낸스 API에서 데이터 수집
# df = fetch_binance_data('BTCUSDT', '1h', limit=200)
# 분석 실행
# result = detector.analyze(df, 'BTCUSDT')
# print(json.dumps(result, indent=2, default=str))