Data Filters
Original: https://www.backtrader.com/blog/posts/2015-11-21-data-filters/data-filling-filtering/
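Some time ago, ticket #23 made me think about a potential improvement to the discussion held in the context of that ticket.
In the ticket I added a `DataFilter` class, but it was overly complicated. It was in fact reminiscent of the complexity built into `DataResampler` and `DataReplayer`, the classes used to implement the functionality of the same name.
As such, and since a couple of versions ago, `backtrader` supports adding a `filter` (call it a `processor` if you wish) to a data feed. Resampling and replaying were reimplemented internally using that functionality, and everything seems less complex (although it still is).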
Filters at work
Given an existing data feed/source, you use the `addfilter` method of the data feed:
data = MyDataFeed(name=myname)
data.addfilter(filter, *args, **kwargs)
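Obviously, the `filter` must conform to a given interface, which is one of the following:
- A callable that accepts this signature: `callable(data, *args, **kwargs)`

or

- A class that can be instantiated and called
  - During instantiation, the `__init__` method must support the signature: `def __init__(self, data, *args, **kwargs)`
  - The `__call__` and `last` methods must support these signatures:

```py
def __call__(self, data)
def last(self, data)
```

The callable/instance is called for each bar the data source produces.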
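
To make the two interfaces concrete, here is a minimal sketch in plain Python. `ToyFeed`, `drop_below` and `DropOdd` are hypothetical names invented for this illustration. The dispatch logic is an assumption about how a feed could treat callables and classes, not `backtrader`'s actual implementation, and bars are reduced to plain integers.

```python
import inspect


class ToyFeed:
    """Hypothetical stand-in for a data feed (not backtrader code)."""

    def __init__(self, bars):
        self.bars = list(bars)   # pending input bars
        self.delivered = []      # bars that survived every filter
        self._filters = []       # (callable, args, kwargs) triples

    def addfilter(self, flt, *args, **kwargs):
        if inspect.isclass(flt):
            # Class filter: instantiate once with the feed, call the instance per bar
            self._filters.append((flt(self, *args, **kwargs), (), {}))
        else:
            # Plain callable: keep the extra arguments and pass them on each call
            self._filters.append((flt, args, kwargs))

    def backwards(self):
        # Drop the most recently produced bar
        self.delivered.pop()

    def run(self):
        for bar in self.bars:
            self.delivered.append(bar)
            for flt, args, kwargs in self._filters:
                if flt(self, *args, **kwargs):
                    break  # the filter removed the bar; skip the remaining filters
        return self.delivered


def drop_below(data, threshold):
    """Callable filter: remove the newest bar if its value is under threshold."""
    if data.delivered[-1] < threshold:
        data.backwards()
        return True   # stream was manipulated
    return False      # stream untouched


class DropOdd:
    """Class filter: remove bars with odd values."""

    def __init__(self, data):
        pass

    def __call__(self, data):
        if data.delivered[-1] % 2:
            data.backwards()
            return True
        return False


feed = ToyFeed([1, 2, 3, 4, 5, 6])
feed.addfilter(drop_below, 3)   # extra positional argument reaches the callable
feed.addfilter(DropOdd)         # the class is instantiated by the feed
result = feed.run()
print(result)
```

The return value convention (`True` when the stream was manipulated, `False` otherwise) mirrors the one used by the `SessionFilter` shown later in the post.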
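A better solution
That ticket wanted:
- A relative volume indicator on an intraday basis
- Intraday data may be missing
- Pre/post-market data could arrive
Implementing a couple of filters alleviates the situation for the backtesting environment.
Filtering out pre/post-market data
The following filter (already available in `backtrader`) comes to the rescue: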
class SessionFilter(with_metaclass(metabase.MetaParams, object)):
    '''
    This class can be applied to a data source as a filter and will filter out
    intraday bars which fall outside of the regular session times (ie: pre/post
    market data)

    This is a "non-simple" filter and must manage the stack of the data (passed
    during init and __call__)

    It needs no "last" method because it has nothing to deliver
    '''
    def __init__(self, data):
        pass

    def __call__(self, data):
        '''
        Return Values:

          - False: data stream was not touched
          - True: data stream was manipulated (bar outside of session times and
            removed)
        '''
        if data.sessionstart <= data.datetime.tm(0) <= data.sessionend:
            # Both ends of the comparison are in the session
            return False  # say the stream is untouched

        # bar outside of the regular session times
        data.backwards()  # remove bar from data stack
        return True  # signal the data was manipulated
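The filter uses the session start/end times embedded in the data to filter bars:
- If the datetime of the new bar is within the session times, `False` is returned to indicate the data was untouched
- If the datetime is outside that range, the data source is sent `backwards`, effectively erasing the last produced bar, and `True` is returned to indicate the data stream was manipulated
Note
Calling `data.backwards()` is possibly/probably too low level; filters should have an API that deals with the internals of the data stream.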
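
The session test at the heart of the filter is an inclusive comparison of times of day. The sketch below reproduces only that comparison with the standard library; `in_session` is a hypothetical helper, and the timestamps are taken from the sample data shown in this post.

```python
import datetime


def in_session(bar_dt, session_start, session_end):
    """True if the bar's time of day lies within the session, both ends included."""
    return session_start <= bar_dt.time() <= session_end


session_start = datetime.time(9, 30)
session_end = datetime.time(17, 30)

bars = [
    datetime.datetime(2006, 1, 2, 9, 1),    # pre-market
    datetime.datetime(2006, 1, 2, 9, 30),   # first in-session minute
    datetime.datetime(2006, 1, 2, 17, 30),  # last in-session minute
    datetime.datetime(2006, 1, 2, 19, 59),  # post-market
]
kept = [dt for dt in bars if in_session(dt, session_start, session_end)]
print(kept)
```

Only the 09:30 and 17:30 bars survive, because both session limits are included in the comparison.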
The sample script at the end of this post can be run with or without filtering. The first run is 100% unfiltered and specifies no session times:
$ ./data-filler.py --writer --wrcsv
Looking at the beginning and end of the 1st day:
===============================================================================
Id,2006-01-02-volume-min-001,len,datetime,open,high,low,close,volume,openinterest,Strategy,len
1,2006-01-02-volume-min-001,1,2006-01-02 09:01:00,3602.0,3603.0,3597.0,3599.0,5699.0,0.0,Strategy,1
2,2006-01-02-volume-min-001,2,2006-01-02 09:02:00,3600.0,3601.0,3598.0,3599.0,894.0,0.0,Strategy,2
...
...
581,2006-01-02-volume-min-001,581,2006-01-02 19:59:00,3619.0,3619.0,3619.0,3619.0,1.0,0.0,Strategy,581
582,2006-01-02-volume-min-001,582,2006-01-02 20:00:00,3618.0,3618.0,3617.0,3618.0,242.0,0.0,Strategy,582
583,2006-01-02-volume-min-001,583,2006-01-02 20:01:00,3618.0,3618.0,3617.0,3617.0,15.0,0.0,Strategy,583
584,2006-01-02-volume-min-001,584,2006-01-02 20:04:00,3617.0,3617.0,3617.0,3617.0,107.0,0.0,Strategy,584
585,2006-01-02-volume-min-001,585,2006-01-03 09:01:00,3623.0,3625.0,3622.0,3624.0,4026.0,0.0,Strategy,585
...
The session runs from 09:01:00 to 20:04:00 on 2 January 2006.
Now a run with a `SessionFilter`, telling the script to use 09:30 and 17:30 as the start/end times of the session:
$ ./data-filler.py --writer --wrcsv --tstart 09:30 --tend 17:30 --filter
===============================================================================
Id,2006-01-02-volume-min-001,len,datetime,open,high,low,close,volume,openinterest,Strategy,len
1,2006-01-02-volume-min-001,1,2006-01-02 09:30:00,3604.0,3605.0,3603.0,3604.0,546.0,0.0,Strategy,1
2,2006-01-02-volume-min-001,2,2006-01-02 09:31:00,3604.0,3606.0,3604.0,3606.0,438.0,0.0,Strategy,2
...
...
445,2006-01-02-volume-min-001,445,2006-01-02 17:29:00,3621.0,3621.0,3620.0,3620.0,866.0,0.0,Strategy,445
446,2006-01-02-volume-min-001,446,2006-01-02 17:30:00,3620.0,3621.0,3619.0,3621.0,1670.0,0.0,Strategy,446
447,2006-01-02-volume-min-001,447,2006-01-03 09:30:00,3637.0,3638.0,3635.0,3636.0,1458.0,0.0,Strategy,447
...
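The data output now starts at 09:30 and ends at 17:30. The pre/post-market data has been filtered out.
Filling missing data
A closer inspection of the output shows the following: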
...
61,2006-01-02-volume-min-001,61,2006-01-02 10:30:00,3613.0,3614.0,3613.0,3614.0,112.0,0.0,Strategy,61
62,2006-01-02-volume-min-001,62,2006-01-02 10:31:00,3614.0,3614.0,3614.0,3614.0,183.0,0.0,Strategy,62
63,2006-01-02-volume-min-001,63,2006-01-02 10:34:00,3614.0,3614.0,3614.0,3614.0,841.0,0.0,Strategy,63
64,2006-01-02-volume-min-001,64,2006-01-02 10:35:00,3614.0,3614.0,3614.0,3614.0,17.0,0.0,Strategy,64
...
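The data for minutes 10:32 and 10:33 is missing. As this is the first trading day of the year, there may have been no trades at all in those minutes. Or the data feed may have failed to capture that data.
For the sake of ticket #23, and to be able to compare the volume of a given minute with that of the same minute on the previous day, we'll fill in the missing data.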
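
As a small illustration of how such gaps can be found, the sketch below walks consecutive timestamps and lists every expected minute that is absent. `missing_times` is a hypothetical helper written for this example and is not part of `backtrader`.

```python
import datetime


def missing_times(timestamps, step):
    """Return the timestamps absent between consecutive entries spaced by step."""
    missing = []
    for prev, cur in zip(timestamps, timestamps[1:]):
        t = prev + step
        while t < cur:
            missing.append(t)
            t += step
    return missing


# Timestamps of the four bars shown in the output above
seen = [
    datetime.datetime(2006, 1, 2, 10, 30),
    datetime.datetime(2006, 1, 2, 10, 31),
    datetime.datetime(2006, 1, 2, 10, 34),
    datetime.datetime(2006, 1, 2, 10, 35),
]
gaps = missing_times(seen, datetime.timedelta(minutes=1))
print([t.strftime('%H:%M') for t in gaps])
```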
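`backtrader` already includes a `SessionFiller` which, as expected, fills in missing data. The code is long and more complex than the filter (see the end for the full implementation), but let's look at the class/params definition: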
class SessionFiller(with_metaclass(metabase.MetaParams, object)):
    '''
    Bar Filler for a Data Source inside the declared session start/end times.

    The fill bars are constructed using the declared Data Source ``timeframe``
    and ``compression`` (used to calculate the intervening missing times)

    Params:

      - fill_price (def: None):

        If None is passed, the closing price of the previous bar will be
        used. To end up with a bar which for example takes time but it is not
        displayed in a plot ... use float('Nan')

      - fill_vol (def: float('NaN')):

        Value to use to fill the missing volume

      - fill_oi (def: float('NaN')):

        Value to use to fill the missing Open Interest

      - skip_first_fill (def: True):

        Upon seeing the 1st valid bar do not fill from the sessionstart up to
        that bar
    '''
    params = (('fill_price', None),
              ('fill_vol', float('NaN')),
              ('fill_oi', float('NaN')),
              ('skip_first_fill', True))
The sample script can now filter and fill data:
./data-filler.py --writer --wrcsv --tstart 09:30 --tend 17:30 --filter --filler
...
62,2006-01-02-volume-min-001,62,2006-01-02 10:31:00,3614.0,3614.0,3614.0,3614.0,183.0,0.0,Strategy,62
63,2006-01-02-volume-min-001,63,2006-01-02 10:32:00,3614.0,3614.0,3614.0,3614.0,0.0,,Strategy,63
64,2006-01-02-volume-min-001,64,2006-01-02 10:33:00,3614.0,3614.0,3614.0,3614.0,0.0,,Strategy,64
65,2006-01-02-volume-min-001,65,2006-01-02 10:34:00,3614.0,3614.0,3614.0,3614.0,841.0,0.0,Strategy,65
...
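The minutes 10:32 and 10:33 are now there. The script fills the price values with the last known closing price and sets the volume to 0; the `openinterest` field appears empty in the output, consistent with the filler's `fill_oi` default of NaN. The script accepts a `--fvol` parameter to set the fill volume to any value (including 'NaN').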
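
The fill rule just described can be sketched in a few lines of plain Python. This is a simplified illustration, not the `SessionFiller` code: `fill_bars` is a hypothetical helper, bars are plain dictionaries, and open interest is left out.

```python
import datetime


def fill_bars(prev_bar, next_dt, step, fill_vol=0.0):
    """Build the bars missing between prev_bar and next_dt (both excluded)."""
    filled = []
    close = prev_bar['close']  # last known closing price
    t = prev_bar['datetime'] + step
    while t < next_dt:
        filled.append({
            'datetime': t,
            'open': close, 'high': close, 'low': close, 'close': close,
            'volume': fill_vol,
        })
        t += step
    return filled


# The 10:31 bar from the output above, followed by the next real bar at 10:34
prev_bar = {'datetime': datetime.datetime(2006, 1, 2, 10, 31), 'close': 3614.0}
new_bars = fill_bars(prev_bar,
                     datetime.datetime(2006, 1, 2, 10, 34),
                     datetime.timedelta(minutes=1))
for bar in new_bars:
    print(bar['datetime'], bar['close'], bar['volume'])
```

Passing `fill_vol=float('nan')` instead of the default would mark the filled volume as missing rather than zero.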
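Completing ticket #23
With a `SessionFilter` and a `SessionFiller`, the following has been accomplished:
- Pre/post-market data is not delivered
- No data (in the given timeframe) is missing
The "synchronization" discussed in ticket #23 for implementing a `RelativeVolume` indicator is no longer needed, because all days have exactly the same number of bars (in this example, every minute from 09:30 to 17:30, both included).
Recalling that the script's default (`--fvol`) is to set the missing volume to `0`, a simple `RelativeVolume` indicator can be developed: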
class RelativeVolume(bt.Indicator):
    csv = True  # show up in csv output (default for indicators is False)

    lines = ('relvol',)
    params = (
        ('period', 20),
        ('volisnan', True),
    )

    def __init__(self):
        if self.p.volisnan:
            # if missing volume will be NaN, do a simple division
            # the end result for missing volumes will also be NaN
            relvol = self.data.volume(-self.p.period) / self.data.volume
        else:
            # Else do a controlled Div with a built-in function
            relvol = bt.DivByZero(
                self.data.volume(-self.p.period),
                self.data.volume,
                zero=0.0)

        self.lines.relvol = relvol
The indicator is smart enough to avoid division by zero by using a built-in aid in `backtrader`.
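
The two branches of the indicator can be mimicked on single values. The sketch below is a scalar stand-in written for this explanation: `div_by_zero` is a hypothetical function, whereas the indicator above applies `bt.DivByZero` to whole lines.

```python
import math


def div_by_zero(numerator, denominator, zero=0.0):
    """Return numerator / denominator, or `zero` when the denominator is 0."""
    if denominator == 0:
        return zero
    return numerator / denominator


# Missing volume filled with 0: the guarded division returns the `zero` value
guarded = div_by_zero(183.0, 0.0, zero=0.0)

# Missing volume filled with NaN: a plain division simply propagates NaN
unguarded = 183.0 / float('nan')

print(guarded, unguarded)
```

This is why the script passes `volisnan=math.isnan(args.fvol)`: a plain division suffices when the fill value is NaN, while a fill value of 0 needs the guarded variant.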
Putting all the pieces together in the next invocation of the script:
./data-filler.py --writer --wrcsv --tstart 09:30 --tend 17:30 --filter --filler --relvol
===============================================================================
Id,2006-01-02-volume-min-001,len,datetime,open,high,low,close,volume,openinterest,Strategy,len,RelativeVolume,len,relvol
1,2006-01-02-volume-min-001,1,2006-01-02 09:30:00,3604.0,3605.0,3603.0,3604.0,546.0,0.0,Strategy,1,RelativeVolume,1,
2,2006-01-02-volume-min-001,2,2006-01-02 09:31:00,3604.0,3606.0,3604.0,3606.0,438.0,0.0,Strategy,2,RelativeVolume,2,
...
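During the first bars the `RelativeVolume` indicator produces no output, as expected. The period is calculated in the script as ((17:30 - 09:30) * 60) + 1, with the difference taken in hours, which gives 481 bars. Let's look directly at the relative volume for 10:32 and 10:33 on the second day, given that on the 1st day the volume values were filled with `0`: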
...
543,2006-01-02-volume-min-001,543,2006-01-03 10:31:00,3648.0,3648.0,3647.0,3648.0,56.0,0.0,Strategy,543,RelativeVolume,543,3.26785714286
544,2006-01-02-volume-min-001,544,2006-01-03 10:32:00,3647.0,3648.0,3647.0,3647.0,313.0,0.0,Strategy,544,RelativeVolume,544,0.0
545,2006-01-02-volume-min-001,545,2006-01-03 10:33:00,3647.0,3647.0,3647.0,3647.0,135.0,0.0,Strategy,545,RelativeVolume,545,0.0
546,2006-01-02-volume-min-001,546,2006-01-03 10:34:00,3648.0,3648.0,3647.0,3648.0,171.0,0.0,Strategy,546,RelativeVolume,546,4.91812865497
...
As expected, it is `0` in both cases.
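
The numbers in this output can be checked by hand. The sketch below recomputes the period the same way the sample script does and then divides the day-1 volumes by the day-2 volumes taken from the outputs shown in this post; the dictionaries and their keys (bar numbers) exist only for this illustration.

```python
import datetime

# Same period calculation as in the sample script
dtstart = datetime.datetime.strptime('09:30', '%H:%M')
dtend = datetime.datetime.strptime('17:30', '%H:%M')
period = ((dtend - dtstart).seconds // 60) + 1  # 480 minutes + 1 = 481 bars

# Volumes by bar number: day 1 (10:31 to 10:34, two of them filled with 0)
day1_volume = {62: 183.0, 63: 0.0, 64: 0.0, 65: 841.0}
# Volumes by bar number: day 2, same minutes
day2_volume = {543: 56.0, 544: 313.0, 545: 135.0, 546: 171.0}

# relvol = volume `period` bars ago divided by the current volume
relvol = {n: day1_volume[n - period] / vol for n, vol in day2_volume.items()}
for n in sorted(relvol):
    print(n, relvol[n])
```

Bar 543 minus 481 is bar 62, the 10:31 bar of the first day, so 183 / 56 gives the 3.26785714286 shown above; for 10:32 and 10:33 the numerator is the filled volume of 0, hence the 0.0 results.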
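Conclusion
The `filter` mechanism in data sources opens up the possibility of completely manipulating the data stream. Use with caution.
Script code and usage
Available as a sample in the `backtrader` sources: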
usage: data-filler.py [-h] [--data DATA] [--filter] [--filler] [--fvol FVOL]
[--tstart TSTART] [--tend TEND] [--relvol]
[--fromdate FROMDATE] [--todate TODATE] [--writer]
[--wrcsv] [--plot] [--numfigs NUMFIGS]
DataFilter/DataFiller Sample
optional arguments:
-h, --help show this help message and exit
--data DATA, -d DATA data to add to the system
--filter, -ft Filter using session start/end times
--filler, -fl Fill missing bars inside start/end times
--fvol FVOL Use as fill volume for missing bar (def: 0.0)
--tstart TSTART, -ts TSTART
Start time for the Session Filter (HH:MM)
--tend TEND, -te TEND
End time for the Session Filter (HH:MM)
--relvol, -rv Add relative volume indicator
--fromdate FROMDATE, -f FROMDATE
Starting date in YYYY-MM-DD format
--todate TODATE, -t TODATE
Starting date in YYYY-MM-DD format
--writer, -w Add a writer to cerebro
--wrcsv, -wc Enable CSV Output in the writer
--plot, -p Plot the read data
--numfigs NUMFIGS, -n NUMFIGS
Plot using numfigs figures
The code:
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import argparse
import datetime
import math

# The above could be sent to an independent module
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.utils.flushfile
import backtrader.filters as btfilters

from relativevolume import RelativeVolume


def runstrategy():
    args = parse_args()

    # Create a cerebro
    cerebro = bt.Cerebro()

    # Get the dates from the args
    fromdate = datetime.datetime.strptime(args.fromdate, '%Y-%m-%d')
    todate = datetime.datetime.strptime(args.todate, '%Y-%m-%d')

    # Get the session times to pass them to the indicator
    # datetime.time has no strptime ...
    dtstart = datetime.datetime.strptime(args.tstart, '%H:%M')
    dtend = datetime.datetime.strptime(args.tend, '%H:%M')

    # Create the 1st data
    data = btfeeds.BacktraderCSVData(
        dataname=args.data,
        fromdate=fromdate,
        todate=todate,
        timeframe=bt.TimeFrame.Minutes,
        compression=1,
        sessionstart=dtstart,  # internally just the "time" part will be used
        sessionend=dtend,  # internally just the "time" part will be used
    )

    if args.filter:
        data.addfilter(btfilters.SessionFilter)

    if args.filler:
        data.addfilter(btfilters.SessionFiller, fill_vol=args.fvol)

    # Add the data to cerebro
    cerebro.adddata(data)

    if args.relvol:
        # Calculate backward period - tend tstart are in same day
        # + 1 to include last moment of the interval dstart <-> dtend
        td = ((dtend - dtstart).seconds // 60) + 1
        cerebro.addindicator(RelativeVolume,
                             period=td,
                             volisnan=math.isnan(args.fvol))

    # Add an empty strategy
    cerebro.addstrategy(bt.Strategy)

    # Add a writer with CSV
    if args.writer:
        cerebro.addwriter(bt.WriterFile, csv=args.wrcsv)

    # And run it - no trading - disable stdstats
    cerebro.run(stdstats=False)

    # Plot if requested
    if args.plot:
        cerebro.plot(numfigs=args.numfigs, volume=True)


def parse_args():
    parser = argparse.ArgumentParser(
        description='DataFilter/DataFiller Sample')

    parser.add_argument('--data', '-d',
                        default='../../datas/2006-01-02-volume-min-001.txt',
                        help='data to add to the system')

    parser.add_argument('--filter', '-ft', action='store_true',
                        help='Filter using session start/end times')

    parser.add_argument('--filler', '-fl', action='store_true',
                        help='Fill missing bars inside start/end times')

    parser.add_argument('--fvol', required=False, default=0.0,
                        type=float,
                        help='Use as fill volume for missing bar (def: 0.0)')

    parser.add_argument('--tstart', '-ts',
                        # default='09:14:59',
                        # help='Start time for the Session Filter (%H:%M:%S)')
                        default='09:15',
                        help='Start time for the Session Filter (HH:MM)')

    parser.add_argument('--tend', '-te',
                        # default='17:15:59',
                        # help='End time for the Session Filter (%H:%M:%S)')
                        default='17:15',
                        help='End time for the Session Filter (HH:MM)')

    parser.add_argument('--relvol', '-rv', action='store_true',
                        help='Add relative volume indicator')

    parser.add_argument('--fromdate', '-f',
                        default='2006-01-01',
                        help='Starting date in YYYY-MM-DD format')

    parser.add_argument('--todate', '-t',
                        default='2006-12-31',
                        help='Starting date in YYYY-MM-DD format')

    parser.add_argument('--writer', '-w', action='store_true',
                        help='Add a writer to cerebro')

    parser.add_argument('--wrcsv', '-wc', action='store_true',
                        help='Enable CSV Output in the writer')

    parser.add_argument('--plot', '-p', action='store_true',
                        help='Plot the read data')

    parser.add_argument('--numfigs', '-n', default=1,
                        help='Plot using numfigs figures')

    return parser.parse_args()


if __name__ == '__main__':
    runstrategy()
SessionFiller
From the `backtrader` sources:
class SessionFiller(with_metaclass(metabase.MetaParams, object)):
    '''
    Bar Filler for a Data Source inside the declared session start/end times.

    The fill bars are constructed using the declared Data Source ``timeframe``
    and ``compression`` (used to calculate the intervening missing times)

    Params:

      - fill_price (def: None):

        If None is passed, the closing price of the previous bar will be
        used. To end up with a bar which for example takes time but it is not
        displayed in a plot ... use float('Nan')

      - fill_vol (def: float('NaN')):

        Value to use to fill the missing volume

      - fill_oi (def: float('NaN')):

        Value to use to fill the missing Open Interest

      - skip_first_fill (def: True):

        Upon seeing the 1st valid bar do not fill from the sessionstart up to
        that bar
    '''
    params = (('fill_price', None),
              ('fill_vol', float('NaN')),
              ('fill_oi', float('NaN')),
              ('skip_first_fill', True))

    # Minimum delta unit in between bars
    _tdeltas = {
        TimeFrame.Minutes: datetime.timedelta(seconds=60),
        TimeFrame.Seconds: datetime.timedelta(seconds=1),
        TimeFrame.MicroSeconds: datetime.timedelta(microseconds=1),
    }

    def __init__(self, data):
        # Calculate and save timedelta for timeframe
        self._tdunit = self._tdeltas[data._timeframe] * data._compression

        self.seenbar = False  # control if at least one bar has been seen
        self.sessend = MAXDATE  # maxdate is the control for bar in session

    def __call__(self, data):
        '''
        Params:
          - data: the data source to filter/process

        Returns:
          - False (always) because this filter does not remove bars from the
            stream

        The logic (starting with a session end control flag of MAXDATE)

          - If new bar is over session end (never true for 1st bar)

            Fill up to session end. Reset sessionend to MAXDATE & fall through

          - If session end is flagged as MAXDATE

            Recalculate session limits and check whether the bar is within them

            if so, fill up and record the last seen time

          - Else ... the incoming bar is in the session, fill up to it
        '''
        # Get time of current (from data source) bar
        dtime_cur = data.datetime.datetime()

        if dtime_cur > self.sessend:
            # bar over session end - fill up and invalidate
            self._fillbars(data, self.dtime_prev, self.sessend + self._tdunit)
            self.sessend = MAXDATE

        # Fall through from previous check ... the bar which is over the
        # session could already be in a new session and within the limits
        if self.sessend == MAXDATE:
            # No bar seen yet or one went over previous session limit
            sessstart = data.datetime.tm2datetime(data.sessionstart)
            self.sessend = sessend = data.datetime.tm2datetime(data.sessionend)

            if sessstart <= dtime_cur <= sessend:
                # 1st bar from session in the session - fill from session start
                if self.seenbar or not self.p.skip_first_fill:
                    self._fillbars(data, sessstart - self._tdunit, dtime_cur)

            self.seenbar = True
            self.dtime_prev = dtime_cur

        else:
            # Seen a previous bar and this is in the session - fill up to it
            self._fillbars(data, self.dtime_prev, dtime_cur)
            self.dtime_prev = dtime_cur

        return False

    def _fillbars(self, data, time_start, time_end, forcedirty=False):
        '''
        Fills one by one bars as needed from time_start to time_end

        Invalidates the control dtime_prev if requested
        '''
        # Control flag - bars added to the stack
        dirty = False

        time_start += self._tdunit
        while time_start < time_end:
            dirty = self._fillbar(data, time_start)
            time_start += self._tdunit

        if dirty or forcedirty:
            data._save2stack(erase=True)

    def _fillbar(self, data, dtime):
        # Prepare an array of the needed size
        bar = [float('Nan')] * data.size()

        # Fill datetime
        bar[data.DateTime] = date2num(dtime)

        # Fill the prices
        price = self.p.fill_price or data.close[-1]
        for pricetype in [data.Open, data.High, data.Low, data.Close]:
            bar[pricetype] = price

        # Fill volume and open interest
        bar[data.Volume] = self.p.fill_vol
        bar[data.OpenInterest] = self.p.fill_oi

        # Fill extra lines the data feed may have defined beyond DateTime
        for i in range(data.DateTime + 1, data.size()):
            bar[i] = data.lines[i][0]

        # Add to the stack of bars to save
        data._add2stack(bar)

        return True