data 包
data 包提供了数据处理和管理相关的功能。
此package的重要概念如下:
- Provider:K线数据提供者,可包含多个有相同起止时间的Feeder
- Feeder:对应一个品种的数据源,可以包含多个周期的数据
- Spider:实时监听多个交易所数据、存储数据库,并TCP通知订阅方
- Miner:每个交易所+市场对应一个Miner
- KLineWatcher:用于和Spider通信的客户端
Provider和Feeder
回测和实盘时都需要订阅K线数据,也可能同时运行多个策略,每个策略同时订阅多个时间周期;故提出接口IProvider
支持回测(HistProvider)和实盘(LiveProvider)。
每个品种的K线数据可能被多个策略同时使用,为避免每个策略重复获取,提出接口IKlineFeeder
支持回测(DBKlineFeeder)和实盘(KlineFeeder+KLineWatcher)。
每个KlineFeeder对应一个品种,可包含多个周期的数据,比如品种BTC/USDT可能被多个策略使用,订阅的周期有5m,1h,1d三个;为避免冗余数据读取,只会对最小周期(这里是5m)获取K线数据;其他更大周期数据会从最小周期聚合得到。
Provider和Feeder适合的场景
起止时间一致的多品种、多周期数据读取;比如回测或实盘都是对特定一段时间,运行一组策略,涉及一些品种的不同周期数据,建议使用Provider+Feeder。如果需要对多组不同的时间段分别获取数据,应实例化多个Provider+Feeder分开进行。
Provider和Feeder不适合的场景
不同周期数据起止时间不一致,比如对BTC/USDT的1m和1h都希望获得最近1k个K线用于回测或其他任务,如果强制使用Feeder,则1m实际会读取60*1k个K线,不如直接调用orm.GetOHLCV
逐周期获取,或者分两次初始化Feeder分开读取。
重要结构体
Feeder
每个Feeder对应一个交易对,可包含多个时间维度。
公开字段:
ExSymbol *orm.ExSymbol
- 交易所交易对信息States []*PairTFCache
- 各时间维度的缓存状态WaitBar *banexg.Kline
- 等待中的K线数据CallBack FnPairKline
- K线数据回调函数OnEnvEnd FuncEnvEnd
- 环境结束回调函数(期货主力切换或股票除权时需要先平仓)isWarmUp bool
- 当前是否预热状态
IKlineFeeder
K线数据馈送器接口。
公开方法:
getSymbol() string
- 获取交易对名称getWaitBar() *banexg.Kline
- 获取等待中的K线setWaitBar(bar *banexg.Kline)
- 设置等待中的K线SubTfs(timeFrames []string, delOther bool) []string
- 订阅指定时间周期的数据WarmTfs(curMS int64, tfNums map[string]int, pBar *utils.PrgBar) (int64, *errs.Error)
- 预热时间周期onNewBars(barTfMSecs int64, bars []*banexg.Kline) (bool, *errs.Error)
- 处理新K线数据getStates() []*PairTFCache
- 获取缓存状态
KlineFeeder
每个Feeder对应一个交易对,可包含多个时间维度。实盘使用。
公开字段:
Feeder
- 继承自FeederPreFire float64
- 提前触发bar的比率showLog bool
- 是否显示日志
IHistKlineFeeder
历史K线数据馈送器接口,继承自IKlineFeeder。
额外公开方法:
getNextMS() int64
- 获取下一个bar的结束时间戳DownIfNeed(sess *orm.Queries, exchange banexg.BanExchange, pBar *utils.PrgBar) *errs.Error
- 下载整个范围的K线SetSeek(since int64)
- 设置读取位置GetBar() *banexg.Kline
- 获取当前K线RunBar(bar *banexg.Kline) *errs.Error
- 运行K线对应的回调函数CallNext()
- 移动指针到下一个K线
HistKLineFeeder
历史数据反馈器,是文件反馈器和数据库反馈器的基类。
公开字段:
KlineFeeder
- 继承自KlineFeederTimeRange *config.TimeTuple
- 时间范围TradeTimes [][2]int64
- 可交易时间
DBKlineFeeder
数据库读取K线的Feeder,用于回测。
公开字段:
HistKLineFeeder
- 继承自HistKLineFeederoffsetMS int64
- 偏移时间戳
IProvider
数据提供者接口。
公开方法:
LoopMain() *errs.Error
- 主循环SubWarmPairs(items map[string]map[string]int, delOther bool) *errs.Error
- 订阅并预热交易对UnSubPairs(pairs ...string) *errs.Error
- 取消订阅交易对SetDirty()
- 设置脏标记
Provider
数据提供者基类。
公开字段:
holders map[string]T
- 持有的Feeder映射newFeeder func(pair string, tfs []string) (T, *errs.Error)
- 创建新Feeder的函数dirtyVers chan int
- 脏版本通道showLog bool
- 是否显示日志
HistProvider
历史数据提供者。
公开字段:
Provider[IHistKlineFeeder]
- 继承自ProviderpBar *utils.StagedPrg
- 进度条
LiveProvider
实时数据提供者。
公开字段:
Provider[IKlineFeeder]
- 继承自Provider*KLineWatcher
- K线监视器
NotifyKLines
K线通知消息。
公开字段:
TFSecs int
- 时间周期(秒)Interval int
- 更新间隔(秒)Arr []*banexg.Kline
- K线数组
KLineMsg
K线消息。
公开字段:
ExgName string
- 交易所名称Market string
- 市场类型Pair string
- 交易对TFSecs int
- 时间周期(秒)Interval int
- 更新间隔(秒)Arr []*banexg.Kline
- K线数组
SaveKline
保存K线的任务。
公开字段:
Sid int32
- 交易对IDTimeFrame string
- 时间周期Arr []*banexg.Kline
- K线数组SkipFirst bool
- 是否跳过第一个MsgAction string
- 消息动作
FetchJob
K线获取任务。
公开字段:
PairTFCache
- K线缓存Pair string
- 交易对CheckSecs int
- 检查间隔(秒)Since int64
- 开始时间戳NextRun int64
- 下次运行时间戳
Miner
数据挖掘器。
公开字段:
ExgName string
- 交易所名称Market string
- 市场类型Fetchs map[string]*FetchJob
- 获取任务映射KlineReady bool
- K线是否就绪KlinePairs map[string]bool
- K线交易对映射TradeReady bool
- 交易是否就绪TradePairs map[string]bool
- 交易交易对映射BookReady bool
- 订单簿是否就绪BookPairs map[string]bool
- 订单簿交易对映射IsWatchPrice bool
- 是否监控价格
LiveSpider
实时数据爬虫。
公开字段:
*utils.ServerIO
- 服务器IOminers map[string]*Miner
- 挖掘器映射
SubKLineState
K线订阅状态。
公开字段:
Sid int32
- 交易对IDNextNotify float64
- 下次通知时间PrevBar *banexg.Kline
- 前一个K线
KLineWatcher
K线监视器。
公开字段:
*utils.ClientIO
- 客户端IOjobs map[string]*PairTFCache
- 任务映射OnKLineMsg func(msg *KLineMsg)
- 收到K线消息的回调OnTrade func(exgName, market string, trade *banexg.Trade)
- 收到交易的回调
WatchJob
监视任务。
公开字段:
Symbol string
- 交易对TimeFrame string
- 时间周期Since int64
- 开始时间戳
K线数据相关
NewKlineFeeder
创建一个新的K线数据馈送器,用于处理实时K线数据。
参数:
exs *orm.ExSymbol
- 交易所交易对信息callBack FnPairKline
- K线数据回调函数showLog bool
- 是否显示日志
返回:
*KlineFeeder
- K线馈送器实例*errs.Error
- 错误信息
NewDBKlineFeeder
创建一个新的数据库K线馈送器,用于从数据库读取历史K线数据。
参数:
exs *orm.ExSymbol
- 交易所交易对信息callBack FnPairKline
- K线数据回调函数showLog bool
- 是否显示日志
返回:
*DBKlineFeeder
- 数据库K线馈送器实例*errs.Error
- 错误信息
NewHistProvider
创建一个新的历史数据提供者,用于管理历史K线数据的获取和处理。
参数:
callBack FnPairKline
- K线数据回调函数envEnd FuncEnvEnd
- 环境结束回调函数showLog bool
- 是否显示日志pBar *utils.StagedPrg
- 进度条对象
返回:
*HistProvider
- 历史数据提供者实例
RunHistFeeders
运行历史K线馈送器集合,用于批量处理历史数据。
参数:
makeFeeders func() []IHistKlineFeeder
- 创建馈送器列表的函数versions chan int
- 版本控制通道pBar *utils.PrgBar
- 进度条对象
返回:
*errs.Error
- 错误信息
SortFeeders
对K线馈送器进行排序或插入操作。
参数:
holds []IHistKlineFeeder
- 现有的馈送器列表hold IHistKlineFeeder
- 待处理的馈送器insert bool
- 是否为插入操作
返回:
[]IHistKlineFeeder
- 处理后的馈送器列表
NewLiveProvider
创建一个新的实时数据提供者,用于处理实时K线数据。
参数:
callBack FnPairKline
- K线数据回调函数envEnd FuncEnvEnd
- 环境结束回调函数
返回:
*LiveProvider
- 实时数据提供者实例*errs.Error
- 错误信息
数据工具相关
FindPathNames
查找指定路径下符合后缀名的所有文件。
参数:
inPath string
- 输入路径suffix string
- 文件后缀名
返回:
[]string
- 文件路径列表*errs.Error
- 错误信息
ReadZipCSVs
读取ZIP压缩包中的CSV文件。
参数:
inPath string
- ZIP文件路径pBar *utils.PrgBar
- 进度条对象handle FuncReadZipItem
- 处理每个CSV文件的回调函数arg interface{}
- 传递给回调函数的参数
返回:
*errs.Error
- 错误信息
RunSpider
运行数据爬虫服务。
参数:
addr string
- 服务监听地址
返回:
*errs.Error
- 错误信息
NewKlineWatcher
创建一个新的K线数据监视器。
参数:
addr string
- 连接地址
返回:
*KLineWatcher
- K线监视器实例*errs.Error
- 错误信息
RunFormatTick
运行Tick数据格式化工具。
参数:
args *config.CmdArgs
- 命令行参数
返回:
*errs.Error
- 错误信息
Build1mWithTicks
使用Tick数据构建1分钟K线。
参数:
args *config.CmdArgs
- 命令行参数
返回:
*errs.Error
- 错误信息
CalcFilePerfs
计算文件性能指标。
参数:
args *config.CmdArgs
- 命令行参数
返回:
*errs.Error
- 错误信息