signalmax
Documentation
API & 통합
API 레퍼런스
거래소 연동
커뮤니티
지원
리소스
튜토리얼
단계별 가이드
활용 사례
바로가기
Version 2.1.0
Market StructureInstitutional Grade

Volume Profile Algorithm

거래량 분포를 분석하여 주요 가격대와 스마트머니의 축적/분산 구간을 식별하는 알고리즘

Point of Control

최대 거래량이 발생한 가격대를 식별하여 주요 지지/저항 레벨 파악

Value Area

70% 거래량이 집중된 가격 범위를 계산하여 공정 가치 구간 도출

Order Flow

매수/매도 압력을 분석하여 스마트머니의 움직임 추적

알고리즘 동작 원리
1

가격대별 거래량 분포 계산

30일간의 데이터를 50개 가격 구간으로 나누어 각 구간의 거래량을 집계합니다.

2

POC 및 Value Area 식별

최대 거래량 지점(POC)과 70% 거래가 발생한 가치 구간을 계산합니다.

3

HVN/LVN 노드 분석

고거래량 노드(지지/저항)와 저거래량 노드(빠른 이동 구간)를 식별합니다.

4

축적/분산 패턴 감지

가격 범위와 거래량 변화를 분석하여 스마트머니의 축적/분산 구간을 파악합니다.

소스 코드
volume_profile.py
Python
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict

@dataclass
class VolumeNode:
    """거래량 노드 데이터 클래스"""
    price_level: float
    volume: float
    buy_volume: float
    sell_volume: float
    time_at_level: int  # 해당 가격대에 머문 시간 (분)
    
@dataclass
class VolumeProfileSignal:
    """거래량 프로파일 시그널"""
    timestamp: datetime
    symbol: str
    signal_type: str  # 'support', 'resistance', 'breakout', 'accumulation'
    price_level: float
    volume_strength: float
    confidence: float
    target_levels: List[float]

class VolumeProfileAnalyzer:
    """거래량 프로파일 분석 및 주요 가격대 식별 알고리즘"""
    
    def __init__(self, bins: int = 50, lookback_days: int = 30):
        self.num_bins = bins
        self.lookback_days = lookback_days
        self.value_area_percentage = 0.70  # 가치 영역 70%
        self.poc_threshold = 0.15  # POC 주변 15% 범위
        
    def calculate_volume_profile(self, df: pd.DataFrame) -> Dict[float, VolumeNode]:
        """거래량 프로파일 계산"""
        # 가격 범위 설정
        price_min = df['low'].min()
        price_max = df['high'].max()
        price_range = price_max - price_min
        
        # 가격 구간 생성
        bins = np.linspace(price_min, price_max, self.num_bins + 1)
        volume_profile = {}
        
        for i in range(len(bins) - 1):
            bin_low = bins[i]
            bin_high = bins[i + 1]
            bin_center = (bin_low + bin_high) / 2
            
            # 해당 구간의 거래량 계산
            mask = ((df['low'] <= bin_high) & (df['high'] >= bin_low))
            
            if mask.any():
                # 구간별 거래량 분배
                overlapping_df = df[mask].copy()
                
                # 각 캔들이 해당 구간과 겹치는 비율 계산
                overlapping_df['overlap_ratio'] = overlapping_df.apply(
                    lambda row: self._calculate_overlap_ratio(
                        row['low'], row['high'], bin_low, bin_high
                    ), axis=1
                )
                
                # 총 거래량
                total_volume = (overlapping_df['volume'] * overlapping_df['overlap_ratio']).sum()
                
                # 매수/매도 거래량 추정
                buy_volume = (overlapping_df[overlapping_df['close'] > overlapping_df['open']]['volume'] * 
                             overlapping_df[overlapping_df['close'] > overlapping_df['open']]['overlap_ratio']).sum()
                sell_volume = total_volume - buy_volume
                
                # 시간 가중치
                time_weight = mask.sum()
                
                volume_profile[bin_center] = VolumeNode(
                    price_level=bin_center,
                    volume=total_volume,
                    buy_volume=buy_volume,
                    sell_volume=sell_volume,
                    time_at_level=time_weight
                )
        
        return volume_profile
    
    def _calculate_overlap_ratio(self, candle_low: float, candle_high: float, 
                                bin_low: float, bin_high: float) -> float:
        """캔들과 가격 구간의 겹침 비율 계산"""
        overlap_low = max(candle_low, bin_low)
        overlap_high = min(candle_high, bin_high)
        
        if overlap_high > overlap_low:
            candle_range = candle_high - candle_low
            if candle_range > 0:
                return (overlap_high - overlap_low) / candle_range
        return 0
    
    def find_poc(self, volume_profile: Dict[float, VolumeNode]) -> Tuple[float, float]:
        """Point of Control (POC) - 최대 거래량 가격대 찾기"""
        if not volume_profile:
            return 0, 0
        
        poc_price = max(volume_profile.keys(), 
                       key=lambda x: volume_profile[x].volume)
        poc_volume = volume_profile[poc_price].volume
        
        return poc_price, poc_volume
    
    def calculate_value_area(self, volume_profile: Dict[float, VolumeNode]) -> Tuple[float, float]:
        """Value Area (VA) - 70% 거래량이 발생한 가격 범위 계산"""
        if not volume_profile:
            return 0, 0
        
        # 총 거래량
        total_volume = sum(node.volume for node in volume_profile.values())
        target_volume = total_volume * self.value_area_percentage
        
        # POC 찾기
        poc_price, _ = self.find_poc(volume_profile)
        
        # POC를 중심으로 확장하며 Value Area 계산
        sorted_prices = sorted(volume_profile.keys())
        poc_index = sorted_prices.index(poc_price)
        
        accumulated_volume = volume_profile[poc_price].volume
        lower_index = poc_index
        upper_index = poc_index
        
        while accumulated_volume < target_volume:
            # 위아래 중 거래량이 많은 쪽으로 확장
            expand_up = expand_down = 0
            
            if upper_index < len(sorted_prices) - 1:
                expand_up = volume_profile[sorted_prices[upper_index + 1]].volume
            
            if lower_index > 0:
                expand_down = volume_profile[sorted_prices[lower_index - 1]].volume
            
            if expand_up >= expand_down and upper_index < len(sorted_prices) - 1:
                upper_index += 1
                accumulated_volume += expand_up
            elif lower_index > 0:
                lower_index -= 1
                accumulated_volume += expand_down
            else:
                break
        
        value_area_high = sorted_prices[upper_index]
        value_area_low = sorted_prices[lower_index]
        
        return value_area_high, value_area_low
    
    def identify_hvn_lvn(self, volume_profile: Dict[float, VolumeNode], 
                        threshold: float = 0.3) -> Tuple[List[float], List[float]]:
        """High Volume Nodes (HVN) 및 Low Volume Nodes (LVN) 식별"""
        if not volume_profile:
            return [], []
        
        volumes = [node.volume for node in volume_profile.values()]
        mean_volume = np.mean(volumes)
        std_volume = np.std(volumes)
        
        hvn_levels = []  # 지지/저항 역할
        lvn_levels = []  # 브레이크아웃 가능 지점
        
        for price, node in volume_profile.items():
            z_score = (node.volume - mean_volume) / (std_volume + 1e-10)
            
            if z_score > 1.5:  # HVN
                hvn_levels.append(price)
            elif z_score < -1.0:  # LVN
                lvn_levels.append(price)
        
        return hvn_levels, lvn_levels
    
    def analyze_order_flow(self, volume_profile: Dict[float, VolumeNode]) -> Dict:
        """주문 흐름 분석 - 매수/매도 압력 평가"""
        if not volume_profile:
            return {}
        
        total_buy_volume = sum(node.buy_volume for node in volume_profile.values())
        total_sell_volume = sum(node.sell_volume for node in volume_profile.values())
        total_volume = total_buy_volume + total_sell_volume
        
        if total_volume == 0:
            return {}
        
        # 매수/매도 불균형 계산
        buy_pressure = total_buy_volume / total_volume
        sell_pressure = total_sell_volume / total_volume
        
        # 누적 델타
        cumulative_delta = total_buy_volume - total_sell_volume
        
        # 압력 지점 찾기
        pressure_points = []
        for price, node in volume_profile.items():
            if node.volume > 0:
                local_pressure = (node.buy_volume - node.sell_volume) / node.volume
                if abs(local_pressure) > 0.3:  # 30% 이상 불균형
                    pressure_points.append({
                        'price': price,
                        'pressure': local_pressure,
                        'volume': node.volume
                    })
        
        return {
            'buy_pressure': buy_pressure,
            'sell_pressure': sell_pressure,
            'cumulative_delta': cumulative_delta,
            'pressure_points': sorted(pressure_points, 
                                    key=lambda x: abs(x['pressure']), 
                                    reverse=True)[:5]
        }
    
    def detect_accumulation_distribution(self, df: pd.DataFrame, 
                                        volume_profile: Dict[float, VolumeNode]) -> str:
        """축적/분산 패턴 감지"""
        if len(df) < 20:
            return 'unknown'
        
        recent_df = df.tail(20)
        
        # 가격 범위 축소 여부
        price_range = recent_df['high'].max() - recent_df['low'].min()
        avg_range = (df['high'] - df['low']).mean()
        
        # 거래량 증가 여부
        recent_volume = recent_df['volume'].mean()
        historical_volume = df['volume'].mean()
        
        # POC 근처 거래 집중도
        poc_price, _ = self.find_poc(volume_profile)
        near_poc = abs(recent_df['close'].mean() - poc_price) / poc_price < 0.02
        
        if price_range < avg_range * 0.7 and recent_volume > historical_volume * 1.2 and near_poc:
            return 'accumulation'  # 축적 - 스마트머니 매집
        elif price_range > avg_range * 1.3 and recent_volume > historical_volume * 1.5:
            return 'distribution'  # 분산 - 스마트머니 매도
        else:
            return 'neutral'
    
    def generate_targets(self, current_price: float, hvn_levels: List[float], 
                        lvn_levels: List[float], direction: str) -> List[float]:
        """목표가 설정"""
        targets = []
        
        if direction == 'bullish':
            # 상승 목표가 - 위쪽 HVN과 LVN
            above_hvn = [lvl for lvl in hvn_levels if lvl > current_price]
            above_lvn = [lvl for lvl in lvn_levels if lvl > current_price]
            
            if above_lvn:  # LVN은 빠르게 통과
                targets.append(min(above_lvn))
            if above_hvn:  # HVN은 저항
                targets.extend(sorted(above_hvn)[:3])
                
        elif direction == 'bearish':
            # 하락 목표가
            below_hvn = [lvl for lvl in hvn_levels if lvl < current_price]
            below_lvn = [lvl for lvl in lvn_levels if lvl < current_price]
            
            if below_lvn:
                targets.append(max(below_lvn))
            if below_hvn:
                targets.extend(sorted(below_hvn, reverse=True)[:3])
        
        return targets[:3]  # 최대 3개 목표가
    
    def calculate_signal_confidence(self, volume_profile: Dict[float, VolumeNode],
                                   order_flow: Dict, pattern: str) -> float:
        """시그널 신뢰도 계산"""
        confidence = 0.5  # 기본 신뢰도
        
        # 거래량 집중도
        if volume_profile:
            volumes = [node.volume for node in volume_profile.values()]
            volume_concentration = np.std(volumes) / (np.mean(volumes) + 1e-10)
            confidence += min(volume_concentration * 0.1, 0.2)
        
        # 주문 흐름 강도
        if order_flow:
            flow_strength = abs(order_flow.get('buy_pressure', 0.5) - 0.5) * 2
            confidence += flow_strength * 0.2
        
        # 패턴 확실성
        if pattern in ['accumulation', 'distribution']:
            confidence += 0.1
        
        return min(confidence, 0.95)
    
    def analyze(self, df: pd.DataFrame, symbol: str) -> Dict:
        """메인 분석 함수"""
        # 거래량 프로파일 계산
        volume_profile = self.calculate_volume_profile(df)
        
        if not volume_profile:
            return {'symbol': symbol, 'error': 'Insufficient data'}
        
        # POC 및 Value Area
        poc_price, poc_volume = self.find_poc(volume_profile)
        va_high, va_low = self.calculate_value_area(volume_profile)
        
        # HVN/LVN 식별
        hvn_levels, lvn_levels = self.identify_hvn_lvn(volume_profile)
        
        # 주문 흐름 분석
        order_flow = self.analyze_order_flow(volume_profile)
        
        # 축적/분산 패턴
        pattern = self.detect_accumulation_distribution(df, volume_profile)
        
        # 현재 가격 위치 분석
        current_price = df['close'].iloc[-1]
        price_position = 'above_poc' if current_price > poc_price else 'below_poc'
        in_value_area = va_low <= current_price <= va_high
        
        # 시그널 생성
        signals = []
        
        # 방향성 결정
        if order_flow.get('buy_pressure', 0) > 0.6:
            direction = 'bullish'
        elif order_flow.get('sell_pressure', 0) > 0.6:
            direction = 'bearish'
        else:
            direction = 'neutral'
        
        # 주요 시그널
        if pattern == 'accumulation' and price_position == 'above_poc':
            targets = self.generate_targets(current_price, hvn_levels, lvn_levels, 'bullish')
            confidence = self.calculate_signal_confidence(volume_profile, order_flow, pattern)
            
            signals.append(VolumeProfileSignal(
                timestamp=datetime.now(),
                symbol=symbol,
                signal_type='accumulation',
                price_level=current_price,
                volume_strength=poc_volume,
                confidence=confidence,
                target_levels=targets
            ))
        
        # 지지/저항 레벨 근처
        nearest_hvn = min(hvn_levels, key=lambda x: abs(x - current_price)) if hvn_levels else None
        if nearest_hvn and abs(current_price - nearest_hvn) / current_price < 0.01:
            signals.append(VolumeProfileSignal(
                timestamp=datetime.now(),
                symbol=symbol,
                signal_type='support' if current_price > nearest_hvn else 'resistance',
                price_level=nearest_hvn,
                volume_strength=volume_profile[nearest_hvn].volume,
                confidence=0.7,
                target_levels=[]
            ))
        
        return {
            'symbol': symbol,
            'timestamp': datetime.now(),
            'current_price': current_price,
            'volume_profile_stats': {
                'poc': poc_price,
                'poc_volume': poc_volume,
                'value_area_high': va_high,
                'value_area_low': va_low,
                'in_value_area': in_value_area
            },
            'levels': {
                'hvn': hvn_levels[:5],  # Top 5 HVN
                'lvn': lvn_levels[:5]   # Top 5 LVN
            },
            'order_flow': order_flow,
            'pattern': pattern,
            'direction': direction,
            'signals': [
                {
                    'type': sig.signal_type,
                    'price': sig.price_level,
                    'confidence': sig.confidence,
                    'targets': sig.target_levels
                } for sig in signals
            ]
        }

# 실시간 모니터링
class VolumeProfileMonitor:
    """실시간 거래량 프로파일 모니터링"""
    
    def __init__(self, update_interval: int = 300):  # 5분마다 업데이트
        self.analyzer = VolumeProfileAnalyzer()
        self.update_interval = update_interval
        self.profile_cache = {}
        
    def update_profile(self, symbol: str, new_data: pd.DataFrame):
        """프로파일 업데이트"""
        result = self.analyzer.analyze(new_data, symbol)
        self.profile_cache[symbol] = {
            'timestamp': datetime.now(),
            'profile': result
        }
        return result
    
    def get_market_structure(self, symbols: List[str]) -> Dict:
        """전체 시장 구조 분석"""
        market_structure = {
            'accumulation': [],
            'distribution': [],
            'breakout_candidates': []
        }
        
        for symbol, cache_data in self.profile_cache.items():
            profile = cache_data['profile']
            
            if profile.get('pattern') == 'accumulation':
                market_structure['accumulation'].append(symbol)
            elif profile.get('pattern') == 'distribution':
                market_structure['distribution'].append(symbol)
            
            # LVN 근처 - 브레이크아웃 가능성
            if profile.get('levels', {}).get('lvn'):
                current_price = profile['current_price']
                for lvn in profile['levels']['lvn']:
                    if abs(current_price - lvn) / current_price < 0.02:
                        market_structure['breakout_candidates'].append({
                            'symbol': symbol,
                            'level': lvn,
                            'direction': 'up' if current_price < lvn else 'down'
                        })
                        break
        
        return market_structure

if __name__ == "__main__":
    # 사용 예제
    analyzer = VolumeProfileAnalyzer(bins=50, lookback_days=30)
    
    # 실제 사용시 바이낸스 API에서 데이터 로드
    # df = fetch_binance_data('BTCUSDT', '1h', limit=720)  # 30일 데이터
    # result = analyzer.analyze(df, 'BTCUSDT')
    # print(json.dumps(result, indent=2, default=str))
LVN
Low Volume Node
POC
핵심 가격대
70%
Value Area
CBOT
최초 개발 (1984)