거래량 분포를 분석하여 주요 가격대와 스마트머니의 축적/분산 구간을 식별하는 알고리즘
최대 거래량이 발생한 가격대를 식별하여 주요 지지/저항 레벨 파악
70% 거래량이 집중된 가격 범위를 계산하여 공정 가치 구간 도출
매수/매도 압력을 분석하여 스마트머니의 움직임 추적
30일간의 데이터를 50개 가격 구간으로 나누어 각 구간의 거래량을 집계합니다.
최대 거래량 지점(POC)과 70% 거래가 발생한 가치 구간을 계산합니다.
고거래량 노드(지지/저항)와 저거래량 노드(빠른 이동 구간)를 식별합니다.
가격 범위와 거래량 변화를 분석하여 스마트머니의 축적/분산 구간을 파악합니다.
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
@dataclass
class VolumeNode:
"""거래량 노드 데이터 클래스"""
price_level: float
volume: float
buy_volume: float
sell_volume: float
time_at_level: int # 해당 가격대에 머문 시간 (분)
@dataclass
class VolumeProfileSignal:
"""거래량 프로파일 시그널"""
timestamp: datetime
symbol: str
signal_type: str # 'support', 'resistance', 'breakout', 'accumulation'
price_level: float
volume_strength: float
confidence: float
target_levels: List[float]
class VolumeProfileAnalyzer:
"""거래량 프로파일 분석 및 주요 가격대 식별 알고리즘"""
def __init__(self, bins: int = 50, lookback_days: int = 30):
self.num_bins = bins
self.lookback_days = lookback_days
self.value_area_percentage = 0.70 # 가치 영역 70%
self.poc_threshold = 0.15 # POC 주변 15% 범위
def calculate_volume_profile(self, df: pd.DataFrame) -> Dict[float, VolumeNode]:
"""거래량 프로파일 계산"""
# 가격 범위 설정
price_min = df['low'].min()
price_max = df['high'].max()
price_range = price_max - price_min
# 가격 구간 생성
bins = np.linspace(price_min, price_max, self.num_bins + 1)
volume_profile = {}
for i in range(len(bins) - 1):
bin_low = bins[i]
bin_high = bins[i + 1]
bin_center = (bin_low + bin_high) / 2
# 해당 구간의 거래량 계산
mask = ((df['low'] <= bin_high) & (df['high'] >= bin_low))
if mask.any():
# 구간별 거래량 분배
overlapping_df = df[mask].copy()
# 각 캔들이 해당 구간과 겹치는 비율 계산
overlapping_df['overlap_ratio'] = overlapping_df.apply(
lambda row: self._calculate_overlap_ratio(
row['low'], row['high'], bin_low, bin_high
), axis=1
)
# 총 거래량
total_volume = (overlapping_df['volume'] * overlapping_df['overlap_ratio']).sum()
# 매수/매도 거래량 추정
buy_volume = (overlapping_df[overlapping_df['close'] > overlapping_df['open']]['volume'] *
overlapping_df[overlapping_df['close'] > overlapping_df['open']]['overlap_ratio']).sum()
sell_volume = total_volume - buy_volume
# 시간 가중치
time_weight = mask.sum()
volume_profile[bin_center] = VolumeNode(
price_level=bin_center,
volume=total_volume,
buy_volume=buy_volume,
sell_volume=sell_volume,
time_at_level=time_weight
)
return volume_profile
def _calculate_overlap_ratio(self, candle_low: float, candle_high: float,
bin_low: float, bin_high: float) -> float:
"""캔들과 가격 구간의 겹침 비율 계산"""
overlap_low = max(candle_low, bin_low)
overlap_high = min(candle_high, bin_high)
if overlap_high > overlap_low:
candle_range = candle_high - candle_low
if candle_range > 0:
return (overlap_high - overlap_low) / candle_range
return 0
def find_poc(self, volume_profile: Dict[float, VolumeNode]) -> Tuple[float, float]:
"""Point of Control (POC) - 최대 거래량 가격대 찾기"""
if not volume_profile:
return 0, 0
poc_price = max(volume_profile.keys(),
key=lambda x: volume_profile[x].volume)
poc_volume = volume_profile[poc_price].volume
return poc_price, poc_volume
def calculate_value_area(self, volume_profile: Dict[float, VolumeNode]) -> Tuple[float, float]:
"""Value Area (VA) - 70% 거래량이 발생한 가격 범위 계산"""
if not volume_profile:
return 0, 0
# 총 거래량
total_volume = sum(node.volume for node in volume_profile.values())
target_volume = total_volume * self.value_area_percentage
# POC 찾기
poc_price, _ = self.find_poc(volume_profile)
# POC를 중심으로 확장하며 Value Area 계산
sorted_prices = sorted(volume_profile.keys())
poc_index = sorted_prices.index(poc_price)
accumulated_volume = volume_profile[poc_price].volume
lower_index = poc_index
upper_index = poc_index
while accumulated_volume < target_volume:
# 위아래 중 거래량이 많은 쪽으로 확장
expand_up = expand_down = 0
if upper_index < len(sorted_prices) - 1:
expand_up = volume_profile[sorted_prices[upper_index + 1]].volume
if lower_index > 0:
expand_down = volume_profile[sorted_prices[lower_index - 1]].volume
if expand_up >= expand_down and upper_index < len(sorted_prices) - 1:
upper_index += 1
accumulated_volume += expand_up
elif lower_index > 0:
lower_index -= 1
accumulated_volume += expand_down
else:
break
value_area_high = sorted_prices[upper_index]
value_area_low = sorted_prices[lower_index]
return value_area_high, value_area_low
def identify_hvn_lvn(self, volume_profile: Dict[float, VolumeNode],
threshold: float = 0.3) -> Tuple[List[float], List[float]]:
"""High Volume Nodes (HVN) 및 Low Volume Nodes (LVN) 식별"""
if not volume_profile:
return [], []
volumes = [node.volume for node in volume_profile.values()]
mean_volume = np.mean(volumes)
std_volume = np.std(volumes)
hvn_levels = [] # 지지/저항 역할
lvn_levels = [] # 브레이크아웃 가능 지점
for price, node in volume_profile.items():
z_score = (node.volume - mean_volume) / (std_volume + 1e-10)
if z_score > 1.5: # HVN
hvn_levels.append(price)
elif z_score < -1.0: # LVN
lvn_levels.append(price)
return hvn_levels, lvn_levels
def analyze_order_flow(self, volume_profile: Dict[float, VolumeNode]) -> Dict:
"""주문 흐름 분석 - 매수/매도 압력 평가"""
if not volume_profile:
return {}
total_buy_volume = sum(node.buy_volume for node in volume_profile.values())
total_sell_volume = sum(node.sell_volume for node in volume_profile.values())
total_volume = total_buy_volume + total_sell_volume
if total_volume == 0:
return {}
# 매수/매도 불균형 계산
buy_pressure = total_buy_volume / total_volume
sell_pressure = total_sell_volume / total_volume
# 누적 델타
cumulative_delta = total_buy_volume - total_sell_volume
# 압력 지점 찾기
pressure_points = []
for price, node in volume_profile.items():
if node.volume > 0:
local_pressure = (node.buy_volume - node.sell_volume) / node.volume
if abs(local_pressure) > 0.3: # 30% 이상 불균형
pressure_points.append({
'price': price,
'pressure': local_pressure,
'volume': node.volume
})
return {
'buy_pressure': buy_pressure,
'sell_pressure': sell_pressure,
'cumulative_delta': cumulative_delta,
'pressure_points': sorted(pressure_points,
key=lambda x: abs(x['pressure']),
reverse=True)[:5]
}
def detect_accumulation_distribution(self, df: pd.DataFrame,
volume_profile: Dict[float, VolumeNode]) -> str:
"""축적/분산 패턴 감지"""
if len(df) < 20:
return 'unknown'
recent_df = df.tail(20)
# 가격 범위 축소 여부
price_range = recent_df['high'].max() - recent_df['low'].min()
avg_range = (df['high'] - df['low']).mean()
# 거래량 증가 여부
recent_volume = recent_df['volume'].mean()
historical_volume = df['volume'].mean()
# POC 근처 거래 집중도
poc_price, _ = self.find_poc(volume_profile)
near_poc = abs(recent_df['close'].mean() - poc_price) / poc_price < 0.02
if price_range < avg_range * 0.7 and recent_volume > historical_volume * 1.2 and near_poc:
return 'accumulation' # 축적 - 스마트머니 매집
elif price_range > avg_range * 1.3 and recent_volume > historical_volume * 1.5:
return 'distribution' # 분산 - 스마트머니 매도
else:
return 'neutral'
def generate_targets(self, current_price: float, hvn_levels: List[float],
lvn_levels: List[float], direction: str) -> List[float]:
"""목표가 설정"""
targets = []
if direction == 'bullish':
# 상승 목표가 - 위쪽 HVN과 LVN
above_hvn = [lvl for lvl in hvn_levels if lvl > current_price]
above_lvn = [lvl for lvl in lvn_levels if lvl > current_price]
if above_lvn: # LVN은 빠르게 통과
targets.append(min(above_lvn))
if above_hvn: # HVN은 저항
targets.extend(sorted(above_hvn)[:3])
elif direction == 'bearish':
# 하락 목표가
below_hvn = [lvl for lvl in hvn_levels if lvl < current_price]
below_lvn = [lvl for lvl in lvn_levels if lvl < current_price]
if below_lvn:
targets.append(max(below_lvn))
if below_hvn:
targets.extend(sorted(below_hvn, reverse=True)[:3])
return targets[:3] # 최대 3개 목표가
def calculate_signal_confidence(self, volume_profile: Dict[float, VolumeNode],
order_flow: Dict, pattern: str) -> float:
"""시그널 신뢰도 계산"""
confidence = 0.5 # 기본 신뢰도
# 거래량 집중도
if volume_profile:
volumes = [node.volume for node in volume_profile.values()]
volume_concentration = np.std(volumes) / (np.mean(volumes) + 1e-10)
confidence += min(volume_concentration * 0.1, 0.2)
# 주문 흐름 강도
if order_flow:
flow_strength = abs(order_flow.get('buy_pressure', 0.5) - 0.5) * 2
confidence += flow_strength * 0.2
# 패턴 확실성
if pattern in ['accumulation', 'distribution']:
confidence += 0.1
return min(confidence, 0.95)
def analyze(self, df: pd.DataFrame, symbol: str) -> Dict:
"""메인 분석 함수"""
# 거래량 프로파일 계산
volume_profile = self.calculate_volume_profile(df)
if not volume_profile:
return {'symbol': symbol, 'error': 'Insufficient data'}
# POC 및 Value Area
poc_price, poc_volume = self.find_poc(volume_profile)
va_high, va_low = self.calculate_value_area(volume_profile)
# HVN/LVN 식별
hvn_levels, lvn_levels = self.identify_hvn_lvn(volume_profile)
# 주문 흐름 분석
order_flow = self.analyze_order_flow(volume_profile)
# 축적/분산 패턴
pattern = self.detect_accumulation_distribution(df, volume_profile)
# 현재 가격 위치 분석
current_price = df['close'].iloc[-1]
price_position = 'above_poc' if current_price > poc_price else 'below_poc'
in_value_area = va_low <= current_price <= va_high
# 시그널 생성
signals = []
# 방향성 결정
if order_flow.get('buy_pressure', 0) > 0.6:
direction = 'bullish'
elif order_flow.get('sell_pressure', 0) > 0.6:
direction = 'bearish'
else:
direction = 'neutral'
# 주요 시그널
if pattern == 'accumulation' and price_position == 'above_poc':
targets = self.generate_targets(current_price, hvn_levels, lvn_levels, 'bullish')
confidence = self.calculate_signal_confidence(volume_profile, order_flow, pattern)
signals.append(VolumeProfileSignal(
timestamp=datetime.now(),
symbol=symbol,
signal_type='accumulation',
price_level=current_price,
volume_strength=poc_volume,
confidence=confidence,
target_levels=targets
))
# 지지/저항 레벨 근처
nearest_hvn = min(hvn_levels, key=lambda x: abs(x - current_price)) if hvn_levels else None
if nearest_hvn and abs(current_price - nearest_hvn) / current_price < 0.01:
signals.append(VolumeProfileSignal(
timestamp=datetime.now(),
symbol=symbol,
signal_type='support' if current_price > nearest_hvn else 'resistance',
price_level=nearest_hvn,
volume_strength=volume_profile[nearest_hvn].volume,
confidence=0.7,
target_levels=[]
))
return {
'symbol': symbol,
'timestamp': datetime.now(),
'current_price': current_price,
'volume_profile_stats': {
'poc': poc_price,
'poc_volume': poc_volume,
'value_area_high': va_high,
'value_area_low': va_low,
'in_value_area': in_value_area
},
'levels': {
'hvn': hvn_levels[:5], # Top 5 HVN
'lvn': lvn_levels[:5] # Top 5 LVN
},
'order_flow': order_flow,
'pattern': pattern,
'direction': direction,
'signals': [
{
'type': sig.signal_type,
'price': sig.price_level,
'confidence': sig.confidence,
'targets': sig.target_levels
} for sig in signals
]
}
# 실시간 모니터링
class VolumeProfileMonitor:
"""실시간 거래량 프로파일 모니터링"""
def __init__(self, update_interval: int = 300): # 5분마다 업데이트
self.analyzer = VolumeProfileAnalyzer()
self.update_interval = update_interval
self.profile_cache = {}
def update_profile(self, symbol: str, new_data: pd.DataFrame):
"""프로파일 업데이트"""
result = self.analyzer.analyze(new_data, symbol)
self.profile_cache[symbol] = {
'timestamp': datetime.now(),
'profile': result
}
return result
def get_market_structure(self, symbols: List[str]) -> Dict:
"""전체 시장 구조 분석"""
market_structure = {
'accumulation': [],
'distribution': [],
'breakout_candidates': []
}
for symbol, cache_data in self.profile_cache.items():
profile = cache_data['profile']
if profile.get('pattern') == 'accumulation':
market_structure['accumulation'].append(symbol)
elif profile.get('pattern') == 'distribution':
market_structure['distribution'].append(symbol)
# LVN 근처 - 브레이크아웃 가능성
if profile.get('levels', {}).get('lvn'):
current_price = profile['current_price']
for lvn in profile['levels']['lvn']:
if abs(current_price - lvn) / current_price < 0.02:
market_structure['breakout_candidates'].append({
'symbol': symbol,
'level': lvn,
'direction': 'up' if current_price < lvn else 'down'
})
break
return market_structure
if __name__ == "__main__":
# 사용 예제
analyzer = VolumeProfileAnalyzer(bins=50, lookback_days=30)
# 실제 사용시 바이낸스 API에서 데이터 로드
# df = fetch_binance_data('BTCUSDT', '1h', limit=720) # 30일 데이터
# result = analyzer.analyze(df, 'BTCUSDT')
# print(json.dumps(result, indent=2, default=str))