유전 알고리즘으로 자동 진화하는 AI 트레이딩 전략 - 바이낸스 백테스팅 기반 최적화
DEAP 프레임워크 기반 유전 알고리즘으로 최적 전략 진화
병렬 섬 모델로 다양한 전략 동시 진화 & 마이그레이션
수익률, 샤프비율, 최대낙폭, 승률 동시 최적화
fitness = [(total_return × 0.3) + (sharpe_ratio × 0.3) + ((1 + max_drawdown) × 0.2) + (win_rate × 0.2)] × (0.8 + 0.2 × trade_penalty), trade_penalty = max(0, 1 − |num_trades − 50| / 50)
유전 알고리즘으로 최적 트레이딩 전략 진화
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
import random
import json
from deap import base, creator, tools, algorithms
import ccxt
import talib
from sklearn.metrics import sharpe_ratio
import asyncio
import aiohttp
from concurrent.futures import ProcessPoolExecutor
import warnings
warnings.filterwarnings('ignore')
@dataclass
class TradingGene:
    """One candidate trading strategy, expressed as a flat set of parameters.

    The first sixteen fields are the evolvable genes, declared in the same
    order in which they are passed positionally; the trailing fields hold
    evaluation results and default to ``0.0``.
    """

    # --- Technical-indicator parameters ---
    rsi_period: int
    rsi_oversold: float
    rsi_overbought: float
    macd_fast: int
    macd_slow: int
    macd_signal: int
    bb_period: int
    bb_std: float
    volume_threshold: float

    # --- Risk management ---
    stop_loss: float
    take_profit: float
    position_size: float

    # --- Signal weights ---
    rsi_weight: float
    macd_weight: float
    volume_weight: float
    sentiment_weight: float

    # --- Fitness scores (filled in after evaluation) ---
    fitness: float = 0.0
    sharpe_ratio: float = 0.0
    max_drawdown: float = 0.0
    win_rate: float = 0.0
class GeneticTradingEvolution:
    """Evolve trading-strategy parameters with a DEAP genetic algorithm.

    Each individual is a flat list of sixteen genes (see
    ``DEFAULT_GENE_BOUNDS`` for order and ranges).  An individual is scored by
    computing RSI / MACD / Bollinger / volume indicators on cached OHLCV
    candles, turning them into buy/sell signals, back-testing those signals
    and folding the resulting metrics into one fitness value.
    """

    # Inclusive (low, high) range of every gene, in chromosome order.
    DEFAULT_GENE_BOUNDS = {
        'rsi_period': (5, 30),
        'rsi_oversold': (20, 40),
        'rsi_overbought': (60, 80),
        'macd_fast': (8, 15),
        'macd_slow': (20, 30),
        'macd_signal': (5, 12),
        'bb_period': (10, 30),
        'bb_std': (1.5, 3.0),
        'volume_threshold': (1.0, 3.0),
        'stop_loss': (0.01, 0.05),
        'take_profit': (0.02, 0.10),
        'position_size': (0.1, 1.0),
        'rsi_weight': (0.0, 1.0),
        'macd_weight': (0.0, 1.0),
        'volume_weight': (0.0, 1.0),
        'sentiment_weight': (0.0, 1.0),
    }

    # Genes that are decoded to integers (indicator look-back periods).
    INTEGER_GENES = frozenset(
        {'rsi_period', 'macd_fast', 'macd_slow', 'macd_signal', 'bb_period'}
    )

    def __init__(self, population_size: int = 100):
        """Create the exchange client, gene bounds, DEAP toolbox and cache.

        Args:
            population_size: Number of individuals per generation.
        """
        self.population_size = population_size
        self.exchange = ccxt.binance()
        self.generation = 0
        self.best_genes_history = []

        # Per-instance copy so callers may tweak bounds without touching
        # the class-level defaults.
        self.gene_bounds = dict(self.DEFAULT_GENE_BOUNDS)

        # Back-test data cache, keyed by "<symbol>_<timeframe>_<limit>".
        self.data_cache = {}

        self._setup_deap()

    def _setup_deap(self):
        """Register gene samplers and genetic operators on ``self.toolbox``."""
        # creator.create defines module-level classes inside deap.creator;
        # only create them once, however many instances are built.
        if not hasattr(creator, "FitnessMax"):
            creator.create("FitnessMax", base.Fitness, weights=(1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", list, fitness=creator.FitnessMax)

        self.toolbox = base.Toolbox()

        # One sampler per gene: integer bounds draw integers, others floats.
        for gene_name, (min_val, max_val) in self.gene_bounds.items():
            sampler = random.randint if isinstance(min_val, int) else random.uniform
            self.toolbox.register(f"attr_{gene_name}", sampler, min_val, max_val)

        gene_samplers = [getattr(self.toolbox, f"attr_{name}")
                         for name in self.gene_bounds]
        self.toolbox.register("individual", tools.initCycle,
                              creator.Individual, gene_samplers, n=1)
        self.toolbox.register("population", tools.initRepeat,
                              list, self.toolbox.individual)

        # Genetic operators.
        self.toolbox.register("evaluate", self.evaluate_strategy)
        self.toolbox.register("mate", tools.cxTwoPoint)
        # Range-scaled, clamped Gaussian mutation: a fixed absolute sigma
        # would be negligible for period genes and far too large for the
        # stop-loss / take-profit genes, and would let values leave bounds.
        self.toolbox.register("mutate", self._mutate_bounded,
                              sigma_fraction=0.1, indpb=0.2)
        self.toolbox.register("select", tools.selTournament, tournsize=3)

    def _mutate_bounded(self, individual: List, sigma_fraction: float = 0.1,
                        indpb: float = 0.2) -> Tuple[List]:
        """Mutate ``individual`` in place, keeping every gene inside its bounds.

        Args:
            individual: Chromosome ordered like ``self.gene_bounds``.
            sigma_fraction: Gaussian sigma as a fraction of each gene's range.
            indpb: Independent probability of mutating each gene.

        Returns:
            A one-element tuple holding the mutated individual (the return
            convention of DEAP mutation operators).
        """
        for idx, (name, (low, high)) in enumerate(self.gene_bounds.items()):
            if random.random() >= indpb:
                continue
            value = individual[idx] + random.gauss(0, sigma_fraction * (high - low))
            if name in self.INTEGER_GENES:
                value = int(round(value))
            individual[idx] = min(max(value, low), high)
        return (individual,)

    def _load_training_data(self, symbol: str = 'BTC/USDT',
                            timeframe: str = '1h',
                            limit: int = 2000) -> pd.DataFrame:
        """Synchronously fetch (or return cached) OHLCV candles.

        Returns:
            DataFrame indexed by timestamp with open/high/low/close/volume
            columns.  The same cached object is returned on later calls, so
            callers that add columns should work on a copy.
        """
        cache_key = f"{symbol}_{timeframe}_{limit}"
        cached = self.data_cache.get(cache_key)
        if cached is not None:
            return cached

        print(f"Fetching {limit} candles for {symbol}...")
        # NOTE(review): the exchange may return fewer rows than `limit`
        # in a single request -- confirm whether pagination is needed.
        ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)

        df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high',
                                          'low', 'close', 'volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)

        self.data_cache[cache_key] = df
        return df

    async def fetch_training_data(self, symbol: str = 'BTC/USDT',
                                  timeframe: str = '1h',
                                  limit: int = 2000) -> pd.DataFrame:
        """Coroutine wrapper around the cached candle loader.

        Kept for callers that await it; the underlying exchange call is
        synchronous, so this does not yield to the event loop.
        """
        return self._load_training_data(symbol, timeframe, limit)

    def decode_gene(self, individual: List) -> "TradingGene":
        """Convert a chromosome list into a ``TradingGene``.

        Values are clamped to ``self.gene_bounds`` and period genes are
        truncated to ``int`` so an out-of-range chromosome can never hand an
        invalid look-back period to the indicator functions.
        """
        values = {}
        for (name, (low, high)), raw in zip(self.gene_bounds.items(), individual):
            clamped = min(max(raw, low), high)
            values[name] = int(clamped) if name in self.INTEGER_GENES else clamped
        return TradingGene(**values)

    def _add_indicators(self, df: pd.DataFrame, gene: "TradingGene") -> pd.DataFrame:
        """Add RSI, MACD, Bollinger-band and volume-MA columns to ``df`` in place."""
        df['rsi'] = talib.RSI(df['close'], timeperiod=gene.rsi_period)

        macd, macd_signal, macd_hist = talib.MACD(
            df['close'],
            fastperiod=gene.macd_fast,
            slowperiod=gene.macd_slow,
            signalperiod=gene.macd_signal
        )
        df['macd'] = macd
        df['macd_signal'] = macd_signal
        df['macd_hist'] = macd_hist

        bb_upper, _bb_middle, bb_lower = talib.BBANDS(
            df['close'],
            timeperiod=gene.bb_period,
            nbdevup=gene.bb_std,
            nbdevdn=gene.bb_std
        )
        df['bb_upper'] = bb_upper
        df['bb_lower'] = bb_lower

        df['volume_ma'] = df['volume'].rolling(20).mean()
        return df

    def evaluate_strategy(self, individual: List) -> Tuple[float]:
        """Back-test one individual and return its fitness as a 1-tuple.

        Uses the synchronous loader so it is safe to call whether or not an
        asyncio event loop is already running in the current thread.
        """
        gene = self.decode_gene(individual)

        # Work on a copy: indicator columns must not leak into the cache.
        df = self._load_training_data().copy()
        self._add_indicators(df, gene)

        signals = self._generate_signals(df, gene)
        results = self._backtest(df, signals, gene)
        return (self._calculate_fitness(results),)

    def _generate_signals(self, df: pd.DataFrame, gene: "TradingGene") -> pd.Series:
        """Combine weighted indicator votes into a 1 / 0 / -1 signal series.

        ``df`` must contain ``rsi``, ``macd``, ``macd_signal``, ``volume`` and
        ``volume_ma`` columns.  A sell vote overrides a buy vote on the same
        bar.  ``gene.sentiment_weight`` is not used here.
        """
        signals = pd.Series(0, index=df.index)

        # RSI votes.
        rsi_buy = df['rsi'] < gene.rsi_oversold
        rsi_sell = df['rsi'] > gene.rsi_overbought

        # MACD votes: line crossing its signal line on this bar.
        prev_macd = df['macd'].shift(1)
        prev_signal = df['macd_signal'].shift(1)
        macd_buy = (df['macd'] > df['macd_signal']) & (prev_macd <= prev_signal)
        macd_sell = (df['macd'] < df['macd_signal']) & (prev_macd >= prev_signal)

        # Volume vote: volume above its moving average by the given factor.
        volume_signal = df['volume'] > df['volume_ma'] * gene.volume_threshold

        buy_score = (
            rsi_buy * gene.rsi_weight +
            macd_buy * gene.macd_weight +
            volume_signal * gene.volume_weight
        )
        sell_score = (
            rsi_sell * gene.rsi_weight +
            macd_sell * gene.macd_weight
        )

        # Buy needs more than half of the total weight; sell uses 70% of
        # that threshold.
        threshold = (gene.rsi_weight + gene.macd_weight + gene.volume_weight) * 0.5
        signals[buy_score > threshold] = 1   # Buy
        signals[sell_score > threshold * 0.7] = -1  # Sell
        return signals

    def _backtest(self, df: pd.DataFrame, signals: pd.Series,
                  gene: "TradingGene") -> Dict:
        """Simulate long-only trading of ``signals`` over ``df['close']``.

        A position is opened on a buy signal when flat, and closed on
        stop-loss, take-profit or a sell signal.

        Returns:
            Dict with ``total_return``, ``sharpe_ratio``, ``max_drawdown``
            (<= 0), ``win_rate``, ``num_trades``, ``equity_curve`` (list, one
            value per bar, recorded before that bar's trade) and ``trades``
            (completed trades only).  All metrics are zero when no trade
            was completed.
        """
        initial_capital = 10000
        capital = initial_capital
        position = 0
        open_trade = None
        completed_trades = []
        equity_curve = []

        # Plain arrays avoid repeated .iloc lookups inside the loop.
        closes = df['close'].to_numpy()
        signal_values = signals.to_numpy()
        timestamps = df.index

        for i, price in enumerate(closes):
            equity_curve.append(capital + position * price)
            signal = signal_values[i]

            if signal == 1 and position == 0:
                # Open a long position with a fraction of available capital.
                position = (capital * gene.position_size) / price
                capital -= position * price
                open_trade = {
                    'type': 'buy',
                    'price': price,
                    'timestamp': timestamps[i],
                    'stop_loss': price * (1 - gene.stop_loss),
                    'take_profit': price * (1 + gene.take_profit)
                }
            elif position > 0 and open_trade is not None:
                should_exit = (
                    price <= open_trade['stop_loss']
                    or price >= open_trade['take_profit']
                    or signal == -1
                )
                if should_exit:
                    capital += position * price
                    pnl = (price - open_trade['price']) * position
                    open_trade['exit_price'] = price
                    open_trade['exit_timestamp'] = timestamps[i]
                    open_trade['pnl'] = pnl
                    open_trade['return'] = pnl / (open_trade['price'] * position)
                    completed_trades.append(open_trade)
                    open_trade = None
                    position = 0

        if not completed_trades:
            return {
                'total_return': 0,
                'sharpe_ratio': 0,
                'max_drawdown': 0,
                'win_rate': 0,
                'num_trades': 0,
                'equity_curve': equity_curve,
                'trades': []
            }

        returns = [trade['return'] for trade in completed_trades]
        total_return = (equity_curve[-1] - initial_capital) / initial_capital

        # Sharpe-style ratio on per-trade returns; epsilon avoids a zero
        # division when all returns are equal.
        if len(returns) > 1:
            sharpe = np.mean(returns) / (np.std(returns) + 1e-6) * np.sqrt(252)
        else:
            sharpe = 0

        # Max drawdown: worst relative drop from the running equity peak.
        equity = np.asarray(equity_curve, dtype=float)
        peak = np.maximum.accumulate(equity)
        max_drawdown = np.min((equity - peak) / peak)

        winning_trades = sum(1 for trade in completed_trades if trade['pnl'] > 0)
        win_rate = winning_trades / len(completed_trades)

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'num_trades': len(completed_trades),
            'equity_curve': equity_curve,
            'trades': completed_trades
        }

    def _calculate_fitness(self, results: Dict) -> float:
        """Fold back-test metrics into one scalar fitness.

        Weighted sum of return (0.3), Sharpe (0.3), ``1 + max_drawdown``
        (0.2; drawdown is negative) and win rate (0.2), scaled by a factor in
        [0.8, 1.0] that peaks when ``num_trades`` equals 50.
        """
        fitness = (
            results['total_return'] * 0.3 +
            results['sharpe_ratio'] * 0.3 +
            (1 + results['max_drawdown']) * 0.2 +
            results['win_rate'] * 0.2
        )

        # Trade-count penalty: too few or too many trades reduce the factor.
        # NOTE(review): when the weighted sum is negative, a factor below 1
        # raises the score instead of lowering it -- confirm this is intended.
        optimal_trades = 50
        trade_penalty = 1 - abs(results['num_trades'] - optimal_trades) / optimal_trades
        trade_penalty = max(0, trade_penalty)
        fitness *= (0.8 + 0.2 * trade_penalty)
        return fitness

    def evolve_population(self, generations: int = 50,
                          initial_population: Optional[List] = None) -> "TradingGene":
        """Run the genetic algorithm and return the best gene found.

        Args:
            generations: Maximum number of generations (must be >= 1).
            initial_population: Optional chromosomes (lists ordered like
                ``self.gene_bounds``) to start from instead of random ones.

        Returns:
            The best ``TradingGene`` with its ``fitness`` field set.

        Raises:
            ValueError: If ``generations`` < 1 or the population is empty.
        """
        if generations < 1:
            raise ValueError("generations must be at least 1")

        print(f"Starting evolution for {generations} generations...")

        if initial_population:
            # Re-wrap so every seed has a fresh, unevaluated fitness.
            population = [creator.Individual(list(ind)) for ind in initial_population]
        else:
            population = self.toolbox.population(n=self.population_size)
        if not population:
            raise ValueError("population must contain at least one individual")

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("std", np.std)
        stats.register("min", np.min)
        stats.register("max", np.max)

        # Hall of Fame keeps the best individuals seen so far.
        hof = tools.HallOfFame(5)

        crossover_prob = 0.7
        mutation_prob = 0.2
        run_best_fitness = []  # best fitness per generation of THIS run

        for gen in range(generations):
            self.generation = gen
            print(f"\n--- Generation {gen + 1}/{generations} ---")

            # Only individuals changed by crossover/mutation need scoring.
            pending = [ind for ind in population if not ind.fitness.valid]
            for ind, fit in zip(pending, map(self.toolbox.evaluate, pending)):
                ind.fitness.values = fit

            record = stats.compile(population)
            print(f"Fitness -> Avg: {record['avg']:.4f}, Max: {record['max']:.4f}")

            hof.update(population)

            best_individual = hof[0]
            best_gene = self.decode_gene(best_individual)
            best_gene.fitness = best_individual.fitness.values[0]
            self.best_genes_history.append(best_gene)
            run_best_fitness.append(best_gene.fitness)

            # Early stop: best fitness has barely moved over recent generations.
            if gen > 10 and abs(run_best_fitness[-1] - run_best_fitness[-10]) < 0.001:
                print("Convergence reached. Stopping early.")
                break

            # Selection, then work on clones so parents stay untouched.
            offspring = self.toolbox.select(population, len(population))
            offspring = list(map(self.toolbox.clone, offspring))

            # Crossover.
            for child1, child2 in zip(offspring[::2], offspring[1::2]):
                if random.random() < crossover_prob:
                    self.toolbox.mate(child1, child2)
                    del child1.fitness.values
                    del child2.fitness.values

            # Mutation.
            for mutant in offspring:
                if random.random() < mutation_prob:
                    self.toolbox.mutate(mutant)
                    del mutant.fitness.values

            # Elitism: overwrite the tail with clones of the hall of fame,
            # never changing the population size.
            elites = [self.toolbox.clone(ind) for ind in hof.items][:len(offspring)]
            if elites:
                offspring[-len(elites):] = elites

            population[:] = offspring

        best_gene = self.decode_gene(hof[0])
        best_gene.fitness = hof[0].fitness.values[0]
        return best_gene

    def adaptive_mutation(self, generation: int) -> float:
        """Return a mutation rate that decays exponentially with ``generation``.

        Starts at 0.3 for generation 0 and shrinks by ``exp(-generation / 20)``.
        """
        return 0.3 * np.exp(-generation / 20)

    def _encode_gene(self, gene: "TradingGene") -> List:
        """Return ``gene``'s evolvable fields as a list in chromosome order."""
        return [getattr(gene, name) for name in self.gene_bounds]

    def save_best_gene(self, gene: "TradingGene", filename: str = 'best_gene.json'):
        """Write the gene's parameters, fitness and current generation as JSON."""
        gene_dict = {name: getattr(gene, name) for name in self.gene_bounds}
        gene_dict['fitness'] = gene.fitness
        gene_dict['generation'] = self.generation

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(gene_dict, f, indent=2)

        print(f"Best gene saved to {filename}")

    def load_best_gene(self, filename: str = 'best_gene.json') -> "TradingGene":
        """Load a gene previously written by ``save_best_gene``."""
        with open(filename, 'r', encoding='utf-8') as f:
            gene_dict = json.load(f)

        # 'generation' is bookkeeping, not a TradingGene field.
        gene_dict.pop('generation', None)
        return TradingGene(**gene_dict)

    @staticmethod
    def _evolve_island(population_size: int, generations: int, seed: int) -> "TradingGene":
        """Worker entry point: build and evolve one island in this process.

        The instance is created here rather than in the parent so nothing but
        plain integers has to be pickled, and ``seed`` gives each island its
        own random stream.
        """
        random.seed(seed)
        island = GeneticTradingEvolution(population_size=population_size)
        return island.evolve_population(generations)

    def parallel_evolution(self, islands: int = 4,
                           generations_per_island: int = 25) -> "TradingGene":
        """Island-model evolution: independent islands, then a merged final run.

        Args:
            islands: Number of worker processes / sub-populations (>= 1).
            generations_per_island: Generations each island evolves.

        Returns:
            The fittest gene among the island winners and the final run.

        Raises:
            ValueError: If ``islands`` < 1.
        """
        if islands < 1:
            raise ValueError("islands must be at least 1")

        print(f"Starting parallel evolution with {islands} islands...")

        island_size = max(self.population_size // islands, 1)
        base_seed = random.randrange(2 ** 32)

        with ProcessPoolExecutor(max_workers=islands) as executor:
            futures = [
                executor.submit(GeneticTradingEvolution._evolve_island,
                                island_size, generations_per_island,
                                base_seed + island_id)
                for island_id in range(islands)
            ]
            best_genes = [future.result() for future in futures]

        best_island_gene = max(best_genes, key=lambda g: g.fitness)

        print("\nFinal evolution phase with migrated populations...")
        final_evolution = GeneticTradingEvolution(self.population_size)

        # Migration: seed the final population with copies of each island's
        # winner and actually hand it to the final run.
        migrants = [self._encode_gene(gene)
                    for gene in best_genes
                    for _ in range(island_size)]
        final_best = final_evolution.evolve_population(
            10, initial_population=migrants
        )

        # Never return something worse than what an island already found.
        if final_best.fitness >= best_island_gene.fitness:
            return final_best
        return best_island_gene
# Usage example
async def main():
    """Evolve a strategy, print its parameters, save it and return it.

    The evolution itself is synchronous and long-running, so it is executed
    in a worker thread.  This keeps the event loop responsive and avoids
    driving a second event loop from the thread that is already running one.

    Returns:
        The best gene found by the evolution run.
    """
    evolution = GeneticTradingEvolution(population_size=100)

    # Single-population evolution, off the event-loop thread.
    loop = asyncio.get_running_loop()
    best_gene = await loop.run_in_executor(
        None, lambda: evolution.evolve_population(generations=50)
    )

    print("\n=== BEST EVOLVED STRATEGY ===")
    print(f"Fitness Score: {best_gene.fitness:.4f}")
    print(f"RSI: Period={best_gene.rsi_period}, OS={best_gene.rsi_oversold:.1f}, OB={best_gene.rsi_overbought:.1f}")
    print(f"MACD: Fast={best_gene.macd_fast}, Slow={best_gene.macd_slow}, Signal={best_gene.macd_signal}")
    print(f"Risk: SL={best_gene.stop_loss:.3f}, TP={best_gene.take_profit:.3f}, Size={best_gene.position_size:.2f}")
    print(f"Weights: RSI={best_gene.rsi_weight:.2f}, MACD={best_gene.macd_weight:.2f}, Vol={best_gene.volume_weight:.2f}")

    # Persist the best gene.
    evolution.save_best_gene(best_gene)

    # Parallel evolution (optional)
    # best_gene = evolution.parallel_evolution(islands=4)

    return best_gene


if __name__ == "__main__":
    asyncio.run(main())