변동성 수축과 확장을 감지하여 강력한 브레이크아웃 시그널을 포착하는 알고리즘
볼린저 밴드가 켈트너 채널 내부로 수축할 때 감지
스퀴즈 해제 시 강력한 방향성 움직임 포착
변동성 확장 가능성을 계산하여 타이밍 예측
20일 이동평균과 표준편차를 이용해 볼린저 밴드를 계산하고, ATR 기반 켈트너 채널을 생성합니다.
볼린저 밴드가 켈트너 채널 내부에 위치하면 스퀴즈 상태로 판단합니다.
선형 회귀를 통해 가격 모멘텀의 방향을 계산하여 브레이크아웃 방향을 예측합니다.
스퀴즈가 해제되고 모멘텀 방향이 확인되면 강력한 매매 시그널을 생성합니다.
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import talib
@dataclass
class SqueezeSignal:
"""볼린저 밴드 스퀴즈 시그널 데이터 클래스"""
timestamp: datetime
symbol: str
squeeze_status: str # 'in_squeeze', 'squeeze_release', 'no_squeeze'
volatility_level: float
band_width: float
price_position: float # 밴드 내 가격 위치 (0-1)
momentum: str # 'bullish', 'bearish', 'neutral'
confidence: float
class BollingerSqueezeDetector:
"""볼린저 밴드 스퀴즈 감지 및 변동성 브레이크아웃 알고리즘"""
def __init__(self, period: int = 20, std_dev: float = 2.0, kc_period: int = 20, kc_mult: float = 1.5):
self.bb_period = period
self.bb_std_dev = std_dev
self.kc_period = kc_period # Keltner Channel 기간
self.kc_multiplier = kc_mult
self.squeeze_threshold = 0.001 # 스퀴즈 판단 임계값
def calculate_bollinger_bands(self, prices: pd.Series) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""볼린저 밴드 계산"""
# 중간 밴드 (SMA)
middle_band = prices.rolling(window=self.bb_period).mean()
# 표준편차
std = prices.rolling(window=self.bb_period).std()
# 상단/하단 밴드
upper_band = middle_band + (std * self.bb_std_dev)
lower_band = middle_band - (std * self.bb_std_dev)
return upper_band, middle_band, lower_band
def calculate_keltner_channels(self, df: pd.DataFrame) -> Tuple[pd.Series, pd.Series, pd.Series]:
"""켈트너 채널 계산 (스퀴즈 감지용)"""
# 중간선 (EMA)
middle_line = df['close'].ewm(span=self.kc_period, adjust=False).mean()
# ATR (Average True Range)
high_low = df['high'] - df['low']
high_close = np.abs(df['high'] - df['close'].shift())
low_close = np.abs(df['low'] - df['close'].shift())
true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
atr = true_range.rolling(window=self.kc_period).mean()
# 상단/하단 채널
upper_channel = middle_line + (atr * self.kc_multiplier)
lower_channel = middle_line - (atr * self.kc_multiplier)
return upper_channel, middle_line, lower_channel
def detect_squeeze(self, bb_upper: pd.Series, bb_lower: pd.Series,
kc_upper: pd.Series, kc_lower: pd.Series) -> pd.Series:
"""스퀴즈 상태 감지"""
# 볼린저 밴드가 켈트너 채널 내부에 있으면 스퀴즈
squeeze = (bb_upper < kc_upper) & (bb_lower > kc_lower)
return squeeze
def calculate_band_width(self, upper_band: pd.Series, lower_band: pd.Series,
middle_band: pd.Series) -> pd.Series:
"""밴드 폭 계산 (변동성 지표)"""
band_width = (upper_band - lower_band) / middle_band
return band_width
def calculate_momentum(self, prices: pd.Series, period: int = 12) -> pd.Series:
"""모멘텀 계산"""
# 선형 회귀를 이용한 모멘텀
momentum = pd.Series(index=prices.index, dtype=float)
for i in range(period, len(prices)):
y = prices.iloc[i-period:i].values
x = np.arange(period)
# 선형 회귀 기울기
slope = np.polyfit(x, y, 1)[0]
momentum.iloc[i] = slope
return momentum
def analyze_price_position(self, price: float, upper: float, middle: float, lower: float) -> Dict:
"""밴드 내 가격 위치 분석"""
# 가격이 밴드 내 어디에 위치하는지 (0: 하단, 0.5: 중간, 1: 상단)
if price >= upper:
position = 1.0
zone = 'overbought'
elif price <= lower:
position = 0.0
zone = 'oversold'
else:
position = (price - lower) / (upper - lower)
if position > 0.8:
zone = 'upper_band'
elif position < 0.2:
zone = 'lower_band'
else:
zone = 'middle_band'
return {
'position': position,
'zone': zone,
'distance_to_upper': (upper - price) / price,
'distance_to_lower': (price - lower) / price
}
def detect_squeeze_release(self, squeeze_series: pd.Series, lookback: int = 5) -> Optional[str]:
"""스퀴즈 해제 감지"""
if len(squeeze_series) < lookback + 1:
return None
recent_squeeze = squeeze_series.tail(lookback + 1)
# 스퀴즈가 해제되었는지 확인
if recent_squeeze.iloc[-2] and not recent_squeeze.iloc[-1]:
# 스퀴즈에서 벗어남
return 'squeeze_release'
elif not recent_squeeze.iloc[-2] and recent_squeeze.iloc[-1]:
# 스퀴즈 진입
return 'entering_squeeze'
elif all(recent_squeeze.tail(lookback)):
# 지속적인 스퀴즈
return 'in_squeeze'
else:
return 'no_squeeze'
def calculate_volatility_expansion_probability(self, band_width: pd.Series,
lookback: int = 50) -> float:
"""변동성 확장 가능성 계산"""
if len(band_width) < lookback:
return 0.5
recent_width = band_width.tail(lookback)
current_width = band_width.iloc[-1]
# 현재 밴드폭이 역사적으로 얼마나 좁은지
percentile = (recent_width < current_width).sum() / len(recent_width)
# 밴드폭이 좁을수록 변동성 확장 가능성 증가
if percentile < 0.1: # 하위 10%
return 0.9
elif percentile < 0.25: # 하위 25%
return 0.75
elif percentile < 0.5: # 하위 50%
return 0.5
else:
return 0.3
def generate_trading_signals(self, squeeze_status: str, momentum: float,
price_position: Dict, volatility_prob: float) -> Dict:
"""트레이딩 시그널 생성"""
signals = []
if squeeze_status == 'squeeze_release':
# 스퀴즈 해제 - 강한 시그널
if momentum > 0:
signals.append({
'type': 'BUY',
'strength': 'strong',
'reason': 'Squeeze release with bullish momentum',
'confidence': min(0.8 + volatility_prob * 0.2, 0.95)
})
elif momentum < 0:
signals.append({
'type': 'SELL',
'strength': 'strong',
'reason': 'Squeeze release with bearish momentum',
'confidence': min(0.8 + volatility_prob * 0.2, 0.95)
})
elif squeeze_status == 'in_squeeze':
# 스퀴즈 중 - 대기 또는 준비
if volatility_prob > 0.7:
signals.append({
'type': 'PREPARE',
'strength': 'moderate',
'reason': 'High probability of volatility expansion',
'confidence': volatility_prob
})
# 밴드 터치 전략
if price_position['zone'] == 'oversold' and momentum > 0:
signals.append({
'type': 'BUY',
'strength': 'moderate',
'reason': 'Oversold bounce from lower band',
'confidence': 0.65
})
elif price_position['zone'] == 'overbought' and momentum < 0:
signals.append({
'type': 'SELL',
'strength': 'moderate',
'reason': 'Overbought reversal from upper band',
'confidence': 0.65
})
return signals
def calculate_risk_parameters(self, current_price: float, upper: float,
lower: float, atr: float) -> Dict:
"""리스크 파라미터 계산"""
# ATR 기반 손절/익절 계산
stop_loss_distance = atr * 1.5
take_profit_distance = atr * 3.0
return {
'stop_loss_long': current_price - stop_loss_distance,
'stop_loss_short': current_price + stop_loss_distance,
'take_profit_long': current_price + take_profit_distance,
'take_profit_short': current_price - take_profit_distance,
'position_size_factor': 1 / (stop_loss_distance / current_price), # 리스크 기반 포지션 크기
'risk_reward_ratio': take_profit_distance / stop_loss_distance
}
def analyze(self, df: pd.DataFrame, symbol: str) -> Dict:
"""메인 분석 함수"""
# 볼린저 밴드 계산
bb_upper, bb_middle, bb_lower = self.calculate_bollinger_bands(df['close'])
# 켈트너 채널 계산
kc_upper, kc_middle, kc_lower = self.calculate_keltner_channels(df)
# 스퀴즈 감지
squeeze = self.detect_squeeze(bb_upper, bb_lower, kc_upper, kc_lower)
squeeze_status = self.detect_squeeze_release(squeeze)
# 밴드폭 계산
band_width = self.calculate_band_width(bb_upper, bb_lower, bb_middle)
# 모멘텀 계산
momentum = self.calculate_momentum(df['close'])
current_momentum = momentum.iloc[-1] if not momentum.empty else 0
# 가격 위치 분석
current_price = df['close'].iloc[-1]
price_position = self.analyze_price_position(
current_price,
bb_upper.iloc[-1],
bb_middle.iloc[-1],
bb_lower.iloc[-1]
)
# 변동성 확장 가능성
volatility_prob = self.calculate_volatility_expansion_probability(band_width)
# ATR 계산
high_low = df['high'] - df['low']
high_close = np.abs(df['high'] - df['close'].shift())
low_close = np.abs(df['low'] - df['close'].shift())
true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
atr = true_range.rolling(window=14).mean().iloc[-1]
# 리스크 파라미터
risk_params = self.calculate_risk_parameters(
current_price, bb_upper.iloc[-1], bb_lower.iloc[-1], atr
)
# 시그널 생성
signals = self.generate_trading_signals(
squeeze_status,
current_momentum,
price_position,
volatility_prob
)
# 모멘텀 방향 결정
if current_momentum > 0.001:
momentum_direction = 'bullish'
elif current_momentum < -0.001:
momentum_direction = 'bearish'
else:
momentum_direction = 'neutral'
return {
'symbol': symbol,
'timestamp': datetime.now(),
'current_price': current_price,
'bollinger_bands': {
'upper': bb_upper.iloc[-1],
'middle': bb_middle.iloc[-1],
'lower': bb_lower.iloc[-1],
'width': band_width.iloc[-1]
},
'squeeze': {
'status': squeeze_status,
'in_squeeze': squeeze.iloc[-1] if not squeeze.empty else False,
'volatility_expansion_probability': volatility_prob
},
'momentum': {
'value': current_momentum,
'direction': momentum_direction
},
'price_position': price_position,
'risk_parameters': risk_params,
'signals': signals
}
# 실시간 모니터링 클래스
class SqueezeMonitor:
"""실시간 스퀴즈 모니터링"""
def __init__(self, symbols: List[str]):
self.symbols = symbols
self.detector = BollingerSqueezeDetector()
self.squeeze_history = {symbol: [] for symbol in symbols}
def scan_markets(self, market_data: Dict[str, pd.DataFrame]) -> List[Dict]:
"""여러 심볼 동시 스캔"""
opportunities = []
for symbol, df in market_data.items():
result = self.detector.analyze(df, symbol)
# 스퀴즈 해제 시그널 우선순위
if result['squeeze']['status'] == 'squeeze_release':
opportunities.append({
'symbol': symbol,
'priority': 'HIGH',
'signal': result['signals'][0] if result['signals'] else None,
'volatility_prob': result['squeeze']['volatility_expansion_probability'],
'momentum': result['momentum']['direction']
})
# 스퀴즈 히스토리 업데이트
self.squeeze_history[symbol].append({
'timestamp': result['timestamp'],
'status': result['squeeze']['status'],
'band_width': result['bollinger_bands']['width']
})
# 우선순위 정렬
opportunities.sort(key=lambda x: x['volatility_prob'], reverse=True)
return opportunities
if __name__ == "__main__":
# 사용 예제
detector = BollingerSqueezeDetector(period=20, std_dev=2.0)
# 실제 사용시 바이낸스 API에서 데이터 로드
# df = fetch_binance_data('BTCUSDT', '1h', limit=200)
# result = detector.analyze(df, 'BTCUSDT')
# print(json.dumps(result, indent=2, default=str))