signalmax
Documentation
API & 통합
API 레퍼런스
거래소 연동
커뮤니티
지원
리소스
튜토리얼
단계별 가이드
활용 사례
바로가기
Version 2.1.0
Technical AnalysisHigh Accuracy

RSI Divergence Algorithm

가격과 RSI 지표 간의 다이버전스를 감지하여 추세 전환점을 포착하는 고급 알고리즘

Bullish Divergence

가격은 낮은 저점을 형성하지만 RSI는 높은 저점을 형성할 때 발생

Bearish Divergence

가격은 높은 고점을 형성하지만 RSI는 낮은 고점을 형성할 때 발생

Hidden Divergence

추세 지속 신호로 사용되는 숨겨진 다이버전스 감지

알고리즘 설명
1

RSI 계산

14일 기간의 RSI를 계산하여 과매수/과매도 구간을 식별합니다.

2

피벗 포인트 탐지

가격과 RSI의 고점/저점을 찾아 피벗 포인트를 식별합니다.

3

다이버전스 분석

가격과 RSI의 방향성 차이를 분석하여 다이버전스를 감지합니다.

4

신뢰도 계산

다이버전스의 강도와 RSI 레벨을 기반으로 시그널 신뢰도를 계산합니다.

소스 코드
rsi_divergence.py
Python
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DivergenceSignal:
    """RSI 다이버전스 시그널 데이터 클래스"""
    timestamp: datetime
    symbol: str
    signal_type: str  # 'bullish' or 'bearish'
    price_low: float
    price_high: float
    rsi_value: float
    confidence: float
    
class RSIDivergenceDetector:
    """RSI 다이버전스 감지 알고리즘"""
    
    def __init__(self, period: int = 14, oversold: float = 30, overbought: float = 70):
        self.period = period
        self.oversold_level = oversold
        self.overbought_level = overbought
        self.min_pivot_bars = 5
        
    def calculate_rsi(self, prices: pd.Series) -> pd.Series:
        """RSI 지표 계산"""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=self.period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=self.period).mean()
        
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi
    
    def find_pivot_points(self, data: pd.Series, is_high: bool = True) -> List[int]:
        """피벗 포인트 찾기"""
        pivots = []
        
        for i in range(self.min_pivot_bars, len(data) - self.min_pivot_bars):
            if is_high:
                # 고점 피벗 찾기
                if all(data[i] > data[i-j] for j in range(1, self.min_pivot_bars + 1)) and \
                   all(data[i] > data[i+j] for j in range(1, self.min_pivot_bars + 1)):
                    pivots.append(i)
            else:
                # 저점 피벗 찾기
                if all(data[i] < data[i-j] for j in range(1, self.min_pivot_bars + 1)) and \
                   all(data[i] < data[i+j] for j in range(1, self.min_pivot_bars + 1)):
                    pivots.append(i)
        
        return pivots
    
    def detect_bullish_divergence(self, prices: pd.Series, rsi: pd.Series) -> Optional[DivergenceSignal]:
        """Bullish 다이버전스 감지 (가격은 낮은 저점, RSI는 높은 저점)"""
        price_lows = self.find_pivot_points(prices, is_high=False)
        rsi_lows = self.find_pivot_points(rsi, is_high=False)
        
        if len(price_lows) < 2 or len(rsi_lows) < 2:
            return None
        
        # 최근 두 저점 비교
        if price_lows[-1] > price_lows[-2] + 10:  # 충분한 거리 확보
            price_low1 = prices[price_lows[-2]]
            price_low2 = prices[price_lows[-1]]
            
            # RSI에서 대응하는 저점 찾기
            rsi_idx1 = min(rsi_lows, key=lambda x: abs(x - price_lows[-2]))
            rsi_idx2 = min(rsi_lows, key=lambda x: abs(x - price_lows[-1]))
            
            rsi_low1 = rsi[rsi_idx1]
            rsi_low2 = rsi[rsi_idx2]
            
            # Bullish 다이버전스 조건
            if price_low2 < price_low1 and rsi_low2 > rsi_low1 and rsi_low2 < self.oversold_level:
                confidence = self._calculate_confidence(
                    price_diff=abs(price_low2 - price_low1) / price_low1,
                    rsi_diff=abs(rsi_low2 - rsi_low1),
                    rsi_level=rsi_low2
                )
                
                return DivergenceSignal(
                    timestamp=datetime.now(),
                    symbol="",  # 심볼은 외부에서 설정
                    signal_type="bullish",
                    price_low=price_low2,
                    price_high=prices[price_lows[-1]:].max(),
                    rsi_value=rsi_low2,
                    confidence=confidence
                )
        
        return None
    
    def detect_bearish_divergence(self, prices: pd.Series, rsi: pd.Series) -> Optional[DivergenceSignal]:
        """Bearish 다이버전스 감지 (가격은 높은 고점, RSI는 낮은 고점)"""
        price_highs = self.find_pivot_points(prices, is_high=True)
        rsi_highs = self.find_pivot_points(rsi, is_high=True)
        
        if len(price_highs) < 2 or len(rsi_highs) < 2:
            return None
        
        # 최근 두 고점 비교
        if price_highs[-1] > price_highs[-2] + 10:
            price_high1 = prices[price_highs[-2]]
            price_high2 = prices[price_highs[-1]]
            
            # RSI에서 대응하는 고점 찾기
            rsi_idx1 = min(rsi_highs, key=lambda x: abs(x - price_highs[-2]))
            rsi_idx2 = min(rsi_highs, key=lambda x: abs(x - price_highs[-1]))
            
            rsi_high1 = rsi[rsi_idx1]
            rsi_high2 = rsi[rsi_idx2]
            
            # Bearish 다이버전스 조건
            if price_high2 > price_high1 and rsi_high2 < rsi_high1 and rsi_high2 > self.overbought_level:
                confidence = self._calculate_confidence(
                    price_diff=abs(price_high2 - price_high1) / price_high1,
                    rsi_diff=abs(rsi_high2 - rsi_high1),
                    rsi_level=rsi_high2
                )
                
                return DivergenceSignal(
                    timestamp=datetime.now(),
                    symbol="",
                    signal_type="bearish",
                    price_low=prices[price_highs[-1]:].min(),
                    price_high=price_high2,
                    rsi_value=rsi_high2,
                    confidence=confidence
                )
        
        return None
    
    def _calculate_confidence(self, price_diff: float, rsi_diff: float, rsi_level: float) -> float:
        """시그널 신뢰도 계산"""
        # 가격 차이가 클수록 신뢰도 증가
        price_score = min(price_diff * 10, 1.0) * 0.3
        
        # RSI 차이가 클수록 신뢰도 증가
        rsi_score = min(rsi_diff / 20, 1.0) * 0.3
        
        # RSI가 극단 영역에 있을수록 신뢰도 증가
        if rsi_level < self.oversold_level:
            extreme_score = (self.oversold_level - rsi_level) / self.oversold_level
        elif rsi_level > self.overbought_level:
            extreme_score = (rsi_level - self.overbought_level) / (100 - self.overbought_level)
        else:
            extreme_score = 0
        
        extreme_score *= 0.4
        
        return min(price_score + rsi_score + extreme_score, 1.0)
    
    def analyze(self, df: pd.DataFrame, symbol: str) -> Dict:
        """메인 분석 함수"""
        # RSI 계산
        rsi = self.calculate_rsi(df['close'])
        
        # 다이버전스 감지
        bullish_signal = self.detect_bullish_divergence(df['close'], rsi)
        bearish_signal = self.detect_bearish_divergence(df['close'], rsi)
        
        results = {
            'symbol': symbol,
            'timestamp': datetime.now(),
            'current_price': df['close'].iloc[-1],
            'current_rsi': rsi.iloc[-1],
            'signals': []
        }
        
        if bullish_signal:
            bullish_signal.symbol = symbol
            results['signals'].append({
                'type': 'BUY',
                'reason': 'Bullish RSI Divergence',
                'confidence': bullish_signal.confidence,
                'rsi_value': bullish_signal.rsi_value,
                'entry_price': df['close'].iloc[-1],
                'stop_loss': bullish_signal.price_low * 0.98,
                'take_profit': df['close'].iloc[-1] * 1.05
            })
        
        if bearish_signal:
            bearish_signal.symbol = symbol
            results['signals'].append({
                'type': 'SELL',
                'reason': 'Bearish RSI Divergence',
                'confidence': bearish_signal.confidence,
                'rsi_value': bearish_signal.rsi_value,
                'entry_price': df['close'].iloc[-1],
                'stop_loss': bearish_signal.price_high * 1.02,
                'take_profit': df['close'].iloc[-1] * 0.95
            })
        
        return results

# 사용 예제
if __name__ == "__main__":
    # 샘플 데이터 로드
    detector = RSIDivergenceDetector(period=14, oversold=30, overbought=70)
    
    # 실제 사용시에는 바이낸스 API에서 데이터 수집
    # df = fetch_binance_data('BTCUSDT', '1h', limit=200)
    
    # 분석 실행
    # result = detector.analyze(df, 'BTCUSDT')
    # print(json.dumps(result, indent=2, default=str))
30/70
과매도/과매수
2-4
피벗 포인트 필요
14
RSI 기간
Wilder
개발자 (1978)