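"""
Momentum-based trading strategy built on the Moving Average Convergence Divergence (MACD) indicator.
A buy signal is generated when the MACD line crosses above the signal line.
A sell signal is generated when the MACD line crosses below the signal line.
Changes in the histogram are used to analyze the strength and direction of momentum.
The 12-day short EMA and the 26-day long EMA are computed to identify the price trend.
The MACD line is produced by subtracting the long EMA from the short EMA.
The signal line is produced by taking the 9-day EMA of the MACD line.
Trading signals are generated by locating the points where the MACD and signal lines cross.
"""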
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import talib
@dataclass
class MACDSignal:
    """Record describing a single MACD / signal-line crossover.

    Attributes:
        timestamp: Moment the signal was recorded.
        symbol: Instrument the signal refers to.
        signal_type: Either ``'golden_cross'`` or ``'death_cross'``.
        macd_value: MACD line value at the crossover.
        signal_value: Signal line value at the crossover.
        histogram: Histogram value at the crossover.
        price: Price at the crossover.
        confidence: Confidence score attached to the signal.
        strength: One of ``'weak'``, ``'moderate'`` or ``'strong'``.
    """

    # When and for which instrument the crossover was seen.
    timestamp: datetime
    symbol: str

    # Crossover direction label.
    signal_type: str

    # Indicator readings captured with the signal.
    macd_value: float
    signal_value: float
    histogram: float

    # Market price and the scoring of the signal.
    price: float
    confidence: float
    strength: str
class MACDCrossoverDetector:
    """Detect MACD / signal-line crossovers and turn them into trade signals.

    The detector computes the MACD line (fast EMA minus slow EMA), the signal
    line (EMA of the MACD line) and the histogram (MACD minus signal), then
    inspects the two most recent bars for a crossover.

    Attributes:
        fast_period: Span of the fast EMA.
        slow_period: Span of the slow EMA.
        signal_period: Span of the EMA applied to the MACD line.
        histogram_threshold: Histogram-change threshold. It is stored on the
            instance but is not read by any method of this class.
    """

    # strength label -> (stop-loss fraction, take-profit fraction).
    # Any label that is not listed falls back to the 'weak' profile.
    _RISK_PROFILES: Dict[str, Tuple[float, float]] = {
        'strong': (0.02, 0.06),
        'moderate': (0.015, 0.04),
        'weak': (0.01, 0.025),
    }

    def __init__(self, fast_period: int = 12, slow_period: int = 26, signal_period: int = 9):
        """Store the EMA spans used for the MACD computation.

        Raises:
            ValueError: If any period is smaller than 1 (pandas rejects such
                spans later anyway; failing here makes the cause obvious).
        """
        for label, period in (
            ('fast_period', fast_period),
            ('slow_period', slow_period),
            ('signal_period', signal_period),
        ):
            if period < 1:
                raise ValueError(f"{label} must be >= 1, got {period}")

        self.fast_period = fast_period
        self.slow_period = slow_period
        self.signal_period = signal_period
        self.histogram_threshold = 0.001  # histogram change threshold

    def calculate_macd(self, prices: pd.Series) -> Tuple[pd.Series, pd.Series, pd.Series]:
        """Compute the MACD line, signal line and histogram for ``prices``.

        Args:
            prices: Price series (``analyze`` passes the ``close`` column).

        Returns:
            ``(macd_line, signal_line, histogram)``, each aligned with
            ``prices``.
        """
        # adjust=False selects the recursive EMA form, so every value depends
        # only on the current and earlier prices.
        ema_fast = prices.ewm(span=self.fast_period, adjust=False).mean()
        ema_slow = prices.ewm(span=self.slow_period, adjust=False).mean()

        macd_line = ema_fast - ema_slow
        signal_line = macd_line.ewm(span=self.signal_period, adjust=False).mean()
        histogram = macd_line - signal_line
        return macd_line, signal_line, histogram

    def detect_crossover(self, macd: pd.Series, signal: pd.Series) -> Optional[str]:
        """Report a crossover between the last two points of the two series.

        Returns:
            ``'golden_cross'`` when MACD moves from at-or-below to above the
            signal line, ``'death_cross'`` for the opposite move, and ``None``
            otherwise (including when either series has fewer than 2 points).
        """
        if len(macd) < 2 or len(signal) < 2:
            return None

        prev_macd, curr_macd = macd.iloc[-2], macd.iloc[-1]
        prev_signal, curr_signal = signal.iloc[-2], signal.iloc[-1]

        # Upward break of the signal line.
        if prev_macd <= prev_signal and curr_macd > curr_signal:
            return 'golden_cross'
        # Downward break of the signal line.
        if prev_macd >= prev_signal and curr_macd < curr_signal:
            return 'death_cross'
        return None

    def analyze_histogram_trend(self, histogram: pd.Series, lookback: int = 5) -> Dict:
        """Summarize the direction and size of recent histogram changes.

        Args:
            histogram: MACD histogram series.
            lookback: Number of trailing points to inspect; at least 2 are
                needed to measure a change.

        Returns:
            A dict with ``'trend'`` (``'bullish'``, ``'bearish'``,
            ``'neutral'`` or ``'unknown'``), ``'strength'`` (relative change
            over the window, capped at 1.0) and ``'momentum'`` (change over
            the last step). When there is not enough data the result is
            ``{'trend': 'unknown', 'strength': 0, 'momentum': 0.0}``.
        """
        # Fewer than two points cannot express a change; the last-step
        # momentum below also needs iloc[-2] to exist.
        if lookback < 2 or len(histogram) < lookback:
            return {'trend': 'unknown', 'strength': 0, 'momentum': 0.0}

        recent_hist = histogram.tail(lookback)

        # Direction: majority vote over the bar-to-bar changes.
        changes = recent_hist.diff().dropna()
        positive_changes = (changes > 0).sum()
        negative_changes = (changes < 0).sum()
        if positive_changes > negative_changes:
            trend = 'bullish'
        elif negative_changes > positive_changes:
            trend = 'bearish'
        else:
            trend = 'neutral'

        first_value = recent_hist.iloc[0]
        last_value = recent_hist.iloc[-1]
        # Normalize by the magnitude of the first value. Using the signed
        # value would yield a negative strength for a negative histogram and
        # could divide by ~0 when the first value is close to -0.0001.
        strength = abs(last_value - first_value) / (abs(first_value) + 0.0001)

        return {
            'trend': trend,
            'strength': min(strength, 1.0),
            'momentum': last_value - recent_hist.iloc[-2]
        }

    def calculate_divergence(self, prices: pd.Series, macd: pd.Series, lookback: int = 20) -> Optional[str]:
        """Detect divergence between the price trend and the MACD trend.

        Both trends are the slopes of a least-squares line fitted over the
        last ``lookback`` points.

        Returns:
            ``'bullish_divergence'`` when price slopes down while MACD slopes
            up, ``'bearish_divergence'`` for the opposite, otherwise ``None``
            (also when ``lookback < 2`` or either series is too short).
        """
        # A line fit needs at least two points.
        if lookback < 2 or len(prices) < lookback or len(macd) < lookback:
            return None

        x_axis = np.arange(lookback)
        price_trend = np.polyfit(x_axis, prices.tail(lookback).to_numpy(dtype=float), 1)[0]
        macd_trend = np.polyfit(x_axis, macd.tail(lookback).to_numpy(dtype=float), 1)[0]

        if price_trend < 0 and macd_trend > 0:
            return 'bullish_divergence'
        if price_trend > 0 and macd_trend < 0:
            return 'bearish_divergence'
        return None

    def evaluate_signal_strength(self, macd: float, signal: float, histogram: float,
                                 price_change: float) -> Tuple[str, float]:
        """Score a crossover and map the score to a strength label.

        The score is ``0.3 * cross_angle + 0.4 * hist_strength +
        0.3 * price_momentum``, where each component lies in ``[0, 1]``.

        Returns:
            ``(label, score)`` with label ``'strong'`` for a score above 0.7,
            ``'moderate'`` above 0.4 and ``'weak'`` otherwise.
        """
        # Relative gap between the two lines; the epsilon avoids 0/0.
        cross_angle = abs(macd - signal) / (abs(macd) + abs(signal) + 0.0001)
        # Histogram magnitude, scaled by 100 and capped at 1.
        hist_strength = min(abs(histogram) * 100, 1.0)
        # Fractional price change, scaled by 10 and capped at 1.
        price_momentum = min(abs(price_change) * 10, 1.0)

        total_score = (cross_angle * 0.3 + hist_strength * 0.4 + price_momentum * 0.3)

        if total_score > 0.7:
            return 'strong', total_score
        if total_score > 0.4:
            return 'moderate', total_score
        return 'weak', total_score

    def generate_trading_levels(self, current_price: float, signal_type: str,
                                strength: str) -> Dict:
        """Build entry, stop-loss and three take-profit levels.

        Args:
            current_price: Price used as the entry level.
            signal_type: ``'golden_cross'`` (long) or ``'death_cross'`` (short).
            strength: ``'strong'``, ``'moderate'`` or anything else (treated
                as weak).

        Returns:
            Dict with ``'entry'``, ``'stop_loss'``, ``'take_profit_1'``,
            ``'take_profit_2'`` and ``'take_profit_3'``; an empty dict for any
            other ``signal_type``.
        """
        if signal_type == 'golden_cross':
            direction = 1.0   # long: stop below the entry, targets above
        elif signal_type == 'death_cross':
            direction = -1.0  # short: stop above the entry, targets below
        else:
            return {}

        stop_loss_pct, take_profit_pct = self._RISK_PROFILES.get(
            strength, self._RISK_PROFILES['weak']
        )
        return {
            'entry': current_price,
            'stop_loss': current_price * (1 - direction * stop_loss_pct),
            'take_profit_1': current_price * (1 + direction * take_profit_pct * 0.5),
            'take_profit_2': current_price * (1 + direction * take_profit_pct),
            'take_profit_3': current_price * (1 + direction * take_profit_pct * 1.5)
        }

    def analyze(self, df: pd.DataFrame, symbol: str) -> Dict:
        """Run the full analysis on the ``close`` column of ``df``.

        Args:
            df: Frame with a ``close`` column; the last row is the current bar.
            symbol: Label copied into the result.

        Returns:
            Dict with the current indicator values, the histogram trend, the
            divergence label and a ``'signals'`` list. The list holds one
            BUY/SELL entry when the last bar completes a crossover and is
            empty otherwise. ``'timestamp'`` is the wall-clock time of the
            call, not the time of the last bar.

        Raises:
            KeyError: If ``df`` has no ``close`` column.
            IndexError: If ``df`` has no rows.
        """
        close = df['close']
        macd_line, signal_line, histogram = self.calculate_macd(close)

        crossover_type = self.detect_crossover(macd_line, signal_line)
        hist_trend = self.analyze_histogram_trend(histogram)
        divergence = self.calculate_divergence(close, macd_line)

        current_price = close.iloc[-1]
        # Fractional change versus the previous bar. With a single row (or a
        # zero previous close) there is no meaningful change, so use 0.0
        # instead of failing on iloc[-2] or dividing by zero.
        if len(close) >= 2 and close.iloc[-2] != 0:
            previous_price = close.iloc[-2]
            price_change = (current_price - previous_price) / previous_price
        else:
            price_change = 0.0

        results = {
            'symbol': symbol,
            'timestamp': datetime.now(),
            'current_price': current_price,
            'macd': macd_line.iloc[-1],
            'signal': signal_line.iloc[-1],
            'histogram': histogram.iloc[-1],
            'histogram_trend': hist_trend,
            'divergence': divergence,
            'signals': []
        }

        if crossover_type:
            strength, confidence = self.evaluate_signal_strength(
                macd_line.iloc[-1],
                signal_line.iloc[-1],
                histogram.iloc[-1],
                price_change
            )
            levels = self.generate_trading_levels(current_price, crossover_type, strength)

            results['signals'].append({
                'type': 'BUY' if crossover_type == 'golden_cross' else 'SELL',
                'reason': f'MACD {crossover_type.replace("_", " ").title()}',
                'strength': strength,
                'confidence': confidence,
                'levels': levels,
                'histogram_trend': hist_trend['trend'],
                'divergence': divergence
            })

        return results
# Backtesting helper
def backtest_strategy(df: pd.DataFrame, initial_capital: float = 10000, warmup: int = 50) -> Dict:
    """Backtest the MACD crossover strategy on historical closes.

    The strategy is long-only and all-in: on a BUY signal with no open
    position the whole capital is converted at the bar's close; on a SELL
    signal with an open position the whole position is sold at the bar's
    close. Each bar is analyzed using only the rows up to and including that
    bar, so no future data is used. Because the analysis is re-run on a
    growing window, the cost grows quadratically with ``len(df)``.

    Args:
        df: Frame with a ``close`` column, ordered oldest to newest.
        initial_capital: Starting capital; must be positive.
        warmup: Index of the first bar that may trade (earlier bars only
            seed the indicators). Values below 1 are treated as 1 so that
            every analyzed window has at least two rows.

    Returns:
        Dict with ``'initial_capital'``, ``'final_capital'``,
        ``'total_return'`` (percent), ``'total_trades'`` (BUY and SELL
        records), ``'winning_trades'`` (SELL records with positive profit)
        and ``'trades'``. A position still open at the end is valued at the
        last close but is not recorded as a trade.

    Raises:
        ValueError: If ``initial_capital`` is not positive.
    """
    if initial_capital <= 0:
        raise ValueError(f"initial_capital must be positive, got {initial_capital}")

    detector = MACDCrossoverDetector()
    closes = df['close']
    capital = initial_capital
    entry_capital = initial_capital  # capital committed at the most recent BUY
    position = 0
    trades = []

    first_bar = max(warmup, 1)
    for i in range(first_bar, len(df)):
        result = detector.analyze(df.iloc[:i + 1], 'BACKTEST')
        if not result['signals']:
            continue

        signal = result['signals'][0]
        price = closes.iloc[i]

        if signal['type'] == 'BUY' and position == 0:
            # Open a position with all available capital.
            entry_capital = capital
            position = capital / price
            trades.append({
                'type': 'BUY',
                'price': price,
                'timestamp': df.index[i],
                'confidence': signal['confidence']
            })
        elif signal['type'] == 'SELL' and position > 0:
            # Close the position. Profit is measured against the capital
            # committed at the matching BUY, not the starting balance;
            # otherwise a losing trade after earlier gains would still be
            # counted as a win.
            capital = position * price
            position = 0
            trades.append({
                'type': 'SELL',
                'price': price,
                'timestamp': df.index[i],
                'profit': capital - entry_capital
            })

    # Mark any position that is still open to the last close.
    if position > 0:
        capital = position * closes.iloc[-1]

    return {
        'initial_capital': initial_capital,
        'final_capital': capital,
        'total_return': (capital - initial_capital) / initial_capital * 100,
        'total_trades': len(trades),
        'winning_trades': sum(1 for t in trades if t.get('profit', 0) > 0),
        'trades': trades
    }
if __name__ == "__main__":
    # Usage example: build a detector with explicit MACD periods.
    macd_periods = {"fast_period": 12, "slow_period": 26, "signal_period": 9}
    detector = MACDCrossoverDetector(**macd_periods)
    # In real use, load the candles from the Binance API and analyze them:
    # df = fetch_binance_data('BTCUSDT', '1h', limit=200)
    # result = detector.analyze(df, 'BTCUSDT')
    # print(json.dumps(result, indent=2, default=str))