-
Couldn't load subscription status.
- Fork 91
Closed
Description
Hello, I'm try write backtesting code for ,is there anything wrong about the code ? help?
sample_data.csv
timestamp open high low close volume
2024-01-01 00:00:00 42000.00 42100.00 41900.00 42050.00 100.50
2024-01-01 00:01:00 42050.00 42150.00 42000.00 42100.00 95.30
`
import os
from collections import deque
from decimal import Decimal
import pandas as pd
from nexustrader.config import BasicConfig, Config, LogConfig, MockConnectorConfig
from nexustrader.constants import ExchangeType, OrderSide, OrderType
from nexustrader.engine import Engine
from nexustrader.exchange.binance import BinanceAccountType
from nexustrader.schema import Order
from nexustrader.strategy import Strategy
class HistoricalDataBacktestStrategy(Strategy):
def __init__(self, csv_file_path, speed=1.0):
super().__init__()
self.symbol = "BTCUSDT-PERP.BINANCE"
self.csv_file_path = csv_file_path
self.speed = speed # 回测速度倍数
# 策略参数
self.fast_period = 10
self.slow_period = 20
self.fast_prices = deque(maxlen=self.fast_period)
self.slow_prices = deque(maxlen=self.slow_period)
# 数据索引
self.data_index = 0
self.df = None
# 统计信息
self.trade_count = 0
self.win_count = 0
self.last_trade_price = None
def on_start(self):
"""策略启动 - 加载历史数据"""
self.log.info("=" * 60)
self.log.info("启动历史数据回测策略")
self.log.info("=" * 60)
# 检查文件是否存在
if not os.path.exists(self.csv_file_path):
self.log.error(f"数据文件不存在: {self.csv_file_path}")
self.log.info("请先准备历史数据文件,格式如下:")
self.log.info("timestamp,open,high,low,close,volume")
self.log.info("2024-01-01 00:00:00,42000.0,42100.0,41900.0,42050.0,100.5")
self.log.info("...")
self.stop()
return
# 加载历史数据
try:
self.df = pd.read_csv(self.csv_file_path)
self.df["timestamp"] = pd.to_datetime(self.df["timestamp"])
self.df = self.df.sort_values("timestamp")
self.log.info(f"✅ 成功加载 {len(self.df)} 条历史K线数据")
self.log.info(
f"📅 时间范围: {self.df['timestamp'].min()} 至 {self.df['timestamp'].max()}"
)
self.log.info(f"⚡ 回测速度: {self.speed}x")
self.log.info(f"💰 初始资金: 10,000 USDT")
self.log.info(
f"📊 策略: MA{self.fast_period} / MA{self.slow_period} 双均线"
)
self.log.info("=" * 60)
except Exception as e:
self.log.error(f"加载数据失败: {e}")
self.stop()
return
# 使用定时器逐条处理数据
# 假设CSV是1分钟K线,那么基础间隔是60秒 / speed
interval_seconds = 60.0 / self.speed
self.schedule(
func=self.process_next_bar,
trigger="interval",
seconds=interval_seconds,
)
def process_next_bar(self):
"""处理下一根K线"""
if self.data_index >= len(self.df):
self.log.info("📊 历史数据处理完成!")
self.stop()
return
row = self.df.iloc[self.data_index]
self.data_index += 1
# 提取K线数据
timestamp = row["timestamp"]
close_price = row["close"]
# 更新价格队列
self.fast_prices.append(close_price)
self.slow_prices.append(close_price)
# 显示进度
progress = (self.data_index / len(self.df)) * 100
if self.data_index % 100 == 0: # 每100根K线显示一次进度
self.log.info(f"进度: {progress:.1f}% ({self.data_index}/{len(self.df)})")
# 等待足够的数据
if len(self.slow_prices) < self.slow_period:
return
# 计算均线
fast_ma = sum(self.fast_prices) / len(self.fast_prices)
slow_ma = sum(self.slow_prices) / len(self.slow_prices)
# 获取当前持仓
position = self.cache.get_position(self.symbol)
has_long_position = position and position.is_long and position.amount > 0
has_short_position = position and position.is_short and position.amount > 0
# 交易信号
if fast_ma > slow_ma and not has_long_position:
# 金叉:买入信号
self.log.info(
f"🟢 [{timestamp}] 金叉买入信号 - "
f"价格:{close_price:.2f}, MA{self.fast_period}:{fast_ma:.2f}, "
f"MA{self.slow_period}:{slow_ma:.2f}"
)
# 如果有空仓,先平仓
if has_short_position:
self.create_order(
symbol=self.symbol,
side=OrderSide.BUY,
type=OrderType.MARKET,
amount=position.amount,
)
# 开多仓
self.create_order(
symbol=self.symbol,
side=OrderSide.BUY,
type=OrderType.MARKET,
amount=Decimal("0.01"),
)
self.last_trade_price = close_price
elif fast_ma < slow_ma and not has_short_position:
# 死叉:卖出信号
self.log.info(
f"🔴 [{timestamp}] 死叉卖出信号 - "
f"价格:{close_price:.2f}, MA{self.fast_period}:{fast_ma:.2f}, "
f"MA{self.slow_period}:{slow_ma:.2f}"
)
# 如果有多仓,先平仓
if has_long_position:
self.create_order(
symbol=self.symbol,
side=OrderSide.SELL,
type=OrderType.MARKET,
amount=position.amount,
)
# 开空仓
self.create_order(
symbol=self.symbol,
side=OrderSide.SELL,
type=OrderType.MARKET,
amount=Decimal("0.01"),
)
self.last_trade_price = close_price
def on_filled_order(self, order: Order):
self.trade_count += 1
# 统计盈亏
if self.last_trade_price:
if order.side == OrderSide.SELL:
profit = (order.price - self.last_trade_price) * float(order.amount)
else:
profit = (self.last_trade_price - order.price) * float(order.amount)
if profit > 0:
self.win_count += 1
# 获取账户信息
account_type = BinanceAccountType.LINEAR_MOCK
balance = self.cache.get_balance(account_type)
position = self.cache.get_position(self.symbol)
usdt_balance = (
balance.balance.get("USDT", 0) if balance and balance.balance else 0
)
self.log.info(
f"✅ 订单成交 #{self.trade_count}: {order.side.value} "
f"{order.amount} @ {order.price:.2f}, 余额: {usdt_balance:.2f} USDT"
)
if position and position.amount > 0:
self.log.info(
f" 持仓: {position.side.value} {position.amount} @ "
f"{position.entry_price:.2f}, 未实现盈亏: {position.unrealized_pnl:.2f}"
)
def on_stop(self):
self.log.info("")
self.log.info("=" * 60)
self.log.info("回测完成 - 最终统计")
self.log.info("=" * 60)
account_type = BinanceAccountType.LINEAR_MOCK
balance = self.cache.get_balance(account_type)
if balance and balance.balance:
initial_balance = 10000
final_balance = balance.balance.get("USDT", 0)
profit = final_balance - initial_balance
profit_rate = (profit / initial_balance) * 100
win_rate = (
(self.win_count / self.trade_count * 100) if self.trade_count > 0 else 0
)
self.log.info(f"💰 初始资金: {initial_balance:.2f} USDT")
self.log.info(f"💰 最终余额: {final_balance:.2f} USDT")
self.log.info(f"📈 盈亏: {profit:.2f} USDT ({profit_rate:+.2f}%)")
self.log.info(f"📊 交易次数: {self.trade_count}")
self.log.info(f"📊 胜率: {win_rate:.1f}%")
self.log.info("=" * 60)
config = Config(
strategy_id="historical_backtest",
user_id="backtest_user",
strategy=HistoricalDataBacktestStrategy(
csv_file_path="examples/sample_data.csv", # 使用示例数据
speed=10.0, # 回测速度: 10倍速
),
# 日志配置
log_config=LogConfig(
level_stdout="INFO",
),
# 交易所基础配置
basic_config={ExchangeType.BINANCE: BasicConfig()},
# 不需要公共连接器(不从交易所获取数据)
public_conn_config={},
# 私有连接器配置(模拟订单执行)
private_conn_config={
ExchangeType.BINANCE: [
MockConnectorConfig(
account_type=BinanceAccountType.LINEAR_MOCK,
initial_balance={"USDT": 10_000},
quote_currency="USDT",
fee_rate=0.0005,
leverage=1,
overwrite_balance=True,
overwrite_position=True,
)
]
},
)
engine = Engine(config)
if __name__ == "__main__":
try:
engine.start()
except KeyboardInterrupt:
print("\n\n⚠️ 收到停止信号,正在关闭策略...")
finally:
engine.dispose()
print("\n✅ 回测已完成\n")
`
Metadata
Metadata
Assignees
Labels
No labels