Skip to content

数据过滤器

原文: https://www.backtrader.com/blog/posts/2015-11-21-data-filters/data-filling-filtering/

前一段时间,罚单 23 让我想到了在该罚单的背景下进行的讨论的潜在改进。

在票中我加了一个DataFilter类,但这太复杂了。实际上让人想起DataResamplerDataReplayer中构建的复杂性,这些类用于实现相同名称的功能。

因此,由于有两个版本,backtrader支持在数据提要中添加filter(如果愿意,可以称之为processor。重采样和重放是使用该功能在内部重新实现的,所有事情似乎都不那么复杂(尽管仍然如此)

过滤器在工作

给定现有数据源/数据源,您使用数据源的addfilter方法:

data = MyDataFeed(name=myname)

data.addfilter(filter, *args, **kwargs) 

显然,filter必须符合给定的接口,即:

  • 接受此签名的可调用函数:

    py callable(data, *args, **kwargs)

  • 可以实例化和调用的类

    • 实例化时,init方法必须支持签名:

    py def __init__(self, data, *args, **kwargs)

    • 调用和最后一个方法这个:

    ```py def call(self, data)

    def last(self, data) ```

将为数据源生成的每个数据调用可调用/实例。

一个更好的解决方案

我想要那张票:

  • 以日内为基础的指数

  • 日内数据可能丢失

  • 会前/会后数据可能会到达

实现几个过滤器可以缓解回溯测试环境的情况。

筛选出上市前/上市后数据

以下过滤器(已在backtrader中提供)用于救援:

class SessionFilter(with_metaclass(metabase.MetaParams, object)):
    '''
    This class can be applied to a data source as a filter and will filter out
    intraday bars which fall outside of the regular session times (ie: pre/post
    market data)

    This is a "non-simple" filter and must manage the stack of the data (passed
    during init and __call__)

    It needs no "last" method because it has nothing to deliver
    '''
    def __init__(self, data):
        pass

    def __call__(self, data):
        '''
        Return Values:

          - False: data stream was not touched
          - True: data stream was manipulated (bar outside of session times and
          - removed)
        '''
        if data.sessionstart <= data.datetime.tm(0) <= data.sessionend:
            # Both ends of the comparison are in the session
            return False  # say the stream is untouched

        # bar outside of the regular session times
        data.backwards()  # remove bar from data stack
        return True  # signal the data was manipulated 

过滤器使用嵌入会话开始/结束时间中的数据来过滤条

  • 如果新数据的日期时间在会话时间内,则返回False以指示数据未被触及

  • 如果数据时间超出范围,则发送数据源backwards有效擦除最后生成的数据。返回True表示数据流被操作。

笔记

调用data.backwards()可能是/可能是低级别的,过滤器应该有一个处理数据流内部的 API

脚本末尾的示例代码可以使用过滤器运行,也可以不使用过滤器运行。第一次运行是 100%未筛选的,并且没有指定会话时间:

$ ./data-filler.py --writer --wrcsv 

查看第 1st天的开始和结束:

===============================================================================
Id,2006-01-02-volume-min-001,len,datetime,open,high,low,close,volume,openinterest,Strategy,len
1,2006-01-02-volume-min-001,1,2006-01-02 09:01:00,3602.0,3603.0,3597.0,3599.0,5699.0,0.0,Strategy,1
2,2006-01-02-volume-min-001,2,2006-01-02 09:02:00,3600.0,3601.0,3598.0,3599.0,894.0,0.0,Strategy,2
...
...
581,2006-01-02-volume-min-001,581,2006-01-02 19:59:00,3619.0,3619.0,3619.0,3619.0,1.0,0.0,Strategy,581
582,2006-01-02-volume-min-001,582,2006-01-02 20:00:00,3618.0,3618.0,3617.0,3618.0,242.0,0.0,Strategy,582
583,2006-01-02-volume-min-001,583,2006-01-02 20:01:00,3618.0,3618.0,3617.0,3617.0,15.0,0.0,Strategy,583
584,2006-01-02-volume-min-001,584,2006-01-02 20:04:00,3617.0,3617.0,3617.0,3617.0,107.0,0.0,Strategy,584
585,2006-01-02-volume-min-001,585,2006-01-03 09:01:00,3623.0,3625.0,3622.0,3624.0,4026.0,0.0,Strategy,585
... 

会议时间为 2006 年 1 月 2 日 09:01:00 至 20:04:00。

现在使用SessionFilter运行,并告诉脚本使用 09:30 和 17:30 作为会话的开始/结束时间:

$ ./data-filler.py --writer --wrcsv --tstart 09:30 --tend 17:30 --filter

===============================================================================
Id,2006-01-02-volume-min-001,len,datetime,open,high,low,close,volume,openinterest,Strategy,len
1,2006-01-02-volume-min-001,1,2006-01-02 09:30:00,3604.0,3605.0,3603.0,3604.0,546.0,0.0,Strategy,1
2,2006-01-02-volume-min-001,2,2006-01-02 09:31:00,3604.0,3606.0,3604.0,3606.0,438.0,0.0,Strategy,2
...
...
445,2006-01-02-volume-min-001,445,2006-01-02 17:29:00,3621.0,3621.0,3620.0,3620.0,866.0,0.0,Strategy,445
446,2006-01-02-volume-min-001,446,2006-01-02 17:30:00,3620.0,3621.0,3619.0,3621.0,1670.0,0.0,Strategy,446
447,2006-01-02-volume-min-001,447,2006-01-03 09:30:00,3637.0,3638.0,3635.0,3636.0,1458.0,0.0,Strategy,447
... 

数据输出现在从 09:30 开始,到 17:30 结束。已筛选出上市前/上市后数据。

填写缺失数据

对输出的深入检查显示如下:

...
61,2006-01-02-volume-min-001,61,2006-01-02 10:30:00,3613.0,3614.0,3613.0,3614.0,112.0,0.0,Strategy,61
62,2006-01-02-volume-min-001,62,2006-01-02 10:31:00,3614.0,3614.0,3614.0,3614.0,183.0,0.0,Strategy,62
63,2006-01-02-volume-min-001,63,2006-01-02 10:34:00,3614.0,3614.0,3614.0,3614.0,841.0,0.0,Strategy,63
64,2006-01-02-volume-min-001,64,2006-01-02 10:35:00,3614.0,3614.0,3614.0,3614.0,17.0,0.0,Strategy,64
... 

缺少 10:32 和 10:33 分钟的数据。作为一年中的第一个交易日,可能根本没有任何谈判。或者,数据源可能无法捕获该数据。

为了记录 23 分钟,并能够将给定分钟的音量与前一天的相同分钟进行比较,我们将填写缺失的数据。

backtrader中已经有一个SessionFiller,正如预期的那样,它填充了缺失的数据。代码很长,比过滤器更复杂(完整的实现请参见结尾),但让我们看看类/参数的定义:

class SessionFiller(with_metaclass(metabase.MetaParams, object)):
    '''
    Bar Filler for a Data Source inside the declared session start/end times.

    The fill bars are constructed using the declared Data Source ``timeframe``
    and ``compression`` (used to calculate the intervening missing times)

    Params:

      - fill_price (def: None):

        If None is passed, the closing price of the previous bar will be
        used. To end up with a bar which for example takes time but it is not
        displayed in a plot ... use float('Nan')

      - fill_vol (def: float('NaN')):

        Value to use to fill the missing volume

      - fill_oi (def: float('NaN')):

        Value to use to fill the missing Open Interest

      - skip_first_fill (def: True):

        Upon seeing the 1st valid bar do not fill from the sessionstart up to
        that bar
    '''
    params = (('fill_price', None),
              ('fill_vol', float('NaN')),
              ('fill_oi', float('NaN')),
              ('skip_first_fill', True)) 

示例脚本现在可以过滤和填充数据:

./data-filler.py --writer --wrcsv --tstart 09:30 --tend 17:30 --filter --filler

...
62,2006-01-02-volume-min-001,62,2006-01-02 10:31:00,3614.0,3614.0,3614.0,3614.0,183.0,0.0,Strategy,62
63,2006-01-02-volume-min-001,63,2006-01-02 10:32:00,3614.0,3614.0,3614.0,3614.0,0.0,,Strategy,63
64,2006-01-02-volume-min-001,64,2006-01-02 10:33:00,3614.0,3614.0,3614.0,3614.0,0.0,,Strategy,64
65,2006-01-02-volume-min-001,65,2006-01-02 10:34:00,3614.0,3614.0,3614.0,3614.0,841.0,0.0,Strategy,65
... 

还有 10 点 32 分和 10 点 33 分。脚本使用最后一个已知的“关闭”价格填充价格值,并将 volume 和 openinterest 字段设置为 0。脚本接受一个--fvol参数,将卷设置为任何值(包括'NaN')

填写车票#23

通过SessionFilterSessionFiller已完成以下工作:

  • 未提供上市前/上市后数据

  • 没有(给定时间段的)数据丢失

现在不再需要票据 23 中讨论的“同步”来实现RelativeVolume指示器,因为所有的日子都有完全相同的条数(在本例中,从 09:30 到 17:30 的所有分钟都包括在内)

记住,默认设置是将缺少的音量设置为0,可以开发一个简单的RelativeVolume指示器:

class RelativeVolume(bt.Indicator):
    csv = True  # show up in csv output (default for indicators is False)

    lines = ('relvol',)
    params = (
        ('period', 20),
        ('volisnan', True),
    )

    def __init__(self):
        if self.p.volisnan:
            # if missing volume will be NaN, do a simple division
            # the end result for missing volumes will also be NaN
            relvol = self.data.volume(-self.p.period) / self.data.volume
        else:
            # Else do a controlled Div with a built-in function
            relvol = bt.DivByZero(
                self.data.volume(-self.p.period),
                self.data.volume,
                zero=0.0)

        self.lines.relvol = relvol 

它足够聪明,可以通过使用backtrader中的内置辅助来避免被零除。

在脚本的下一次调用中将所有部分放在一起:

./data-filler.py --writer --wrcsv --tstart 09:30 --tend 17:30 --filter --filler --relvol

===============================================================================
Id,2006-01-02-volume-min-001,len,datetime,open,high,low,close,volume,openinterest,Strategy,len,RelativeVolume,len,relvol
1,2006-01-02-volume-min-001,1,2006-01-02 09:30:00,3604.0,3605.0,3603.0,3604.0,546.0,0.0,Strategy,1,RelativeVolume,1,
2,2006-01-02-volume-min-001,2,2006-01-02 09:31:00,3604.0,3606.0,3604.0,3606.0,438.0,0.0,Strategy,2,RelativeVolume,2,
... 

在 1st条期间,RelativeVolume指示器不产生预期的输出。脚本中的周期计算为:(17:30-09:30*60)+1。让我们直接看看第二天 10:32 和 10:33 的相对体积是如何变化的,考虑到第 1st天,体积值填充了0

...
543,2006-01-02-volume-min-001,543,2006-01-03 10:31:00,3648.0,3648.0,3647.0,3648.0,56.0,0.0,Strategy,543,RelativeVolume,543,3.26785714286
544,2006-01-02-volume-min-001,544,2006-01-03 10:32:00,3647.0,3648.0,3647.0,3647.0,313.0,0.0,Strategy,544,RelativeVolume,544,0.0
545,2006-01-02-volume-min-001,545,2006-01-03 10:33:00,3647.0,3647.0,3647.0,3647.0,135.0,0.0,Strategy,545,RelativeVolume,545,0.0
546,2006-01-02-volume-min-001,546,2006-01-03 10:34:00,3648.0,3648.0,3647.0,3648.0,171.0,0.0,Strategy,546,RelativeVolume,546,4.91812865497
... 

如预期的那样,它被设置为0

结论

数据源中的filter机制打开了完全操作数据流的可能性。小心使用。

脚本代码和用法

可在backtrader来源中作为样本获得:

usage: data-filler.py [-h] [--data DATA] [--filter] [--filler] [--fvol FVOL]
                      [--tstart TSTART] [--tend TEND] [--relvol]
                      [--fromdate FROMDATE] [--todate TODATE] [--writer]
                      [--wrcsv] [--plot] [--numfigs NUMFIGS]

DataFilter/DataFiller Sample

optional arguments:
  -h, --help            show this help message and exit
  --data DATA, -d DATA  data to add to the system
  --filter, -ft         Filter using session start/end times
  --filler, -fl         Fill missing bars inside start/end times
  --fvol FVOL           Use as fill volume for missing bar (def: 0.0)
  --tstart TSTART, -ts TSTART
                        Start time for the Session Filter (HH:MM)
  --tend TEND, -te TEND
                        End time for the Session Filter (HH:MM)
  --relvol, -rv         Add relative volume indicator
  --fromdate FROMDATE, -f FROMDATE
                        Starting date in YYYY-MM-DD format
  --todate TODATE, -t TODATE
                        Starting date in YYYY-MM-DD format
  --writer, -w          Add a writer to cerebro
  --wrcsv, -wc          Enable CSV Output in the writer
  --plot, -p            Plot the read data
  --numfigs NUMFIGS, -n NUMFIGS
                        Plot using numfigs figures 

守则:

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import argparse
import datetime
import math

# The above could be sent to an independent module
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.utils.flushfile
import backtrader.filters as btfilters

from relativevolume import RelativeVolume

def runstrategy():
    args = parse_args()

    # Create a cerebro
    cerebro = bt.Cerebro()

    # Get the dates from the args
    fromdate = datetime.datetime.strptime(args.fromdate, '%Y-%m-%d')
    todate = datetime.datetime.strptime(args.todate, '%Y-%m-%d')

    # Get the session times to pass them to the indicator
    # datetime.time has no strptime ...
    dtstart = datetime.datetime.strptime(args.tstart, '%H:%M')
    dtend = datetime.datetime.strptime(args.tend, '%H:%M')

    # Create the 1st data
    data = btfeeds.BacktraderCSVData(
        dataname=args.data,
        fromdate=fromdate,
        todate=todate,
        timeframe=bt.TimeFrame.Minutes,
        compression=1,
        sessionstart=dtstart,  # internally just the "time" part will be used
        sessionend=dtend,  # internally just the "time" part will be used
    )

    if args.filter:
        data.addfilter(btfilters.SessionFilter)

    if args.filler:
        data.addfilter(btfilters.SessionFiller, fill_vol=args.fvol)

    # Add the data to cerebro
    cerebro.adddata(data)

    if args.relvol:
        # Calculate backward period - tend tstart are in same day
        # + 1 to include last moment of the interval dstart <-> dtend
        td = ((dtend - dtstart).seconds // 60) + 1
        cerebro.addindicator(RelativeVolume,
                             period=td,
                             volisnan=math.isnan(args.fvol))

    # Add an empty strategy
    cerebro.addstrategy(bt.Strategy)

    # Add a writer with CSV
    if args.writer:
        cerebro.addwriter(bt.WriterFile, csv=args.wrcsv)

    # And run it - no trading - disable stdstats
    cerebro.run(stdstats=False)

    # Plot if requested
    if args.plot:
        cerebro.plot(numfigs=args.numfigs, volume=True)

def parse_args():
    parser = argparse.ArgumentParser(
        description='DataFilter/DataFiller Sample')

    parser.add_argument('--data', '-d',
                        default='../../datas/2006-01-02-volume-min-001.txt',
                        help='data to add to the system')

    parser.add_argument('--filter', '-ft', action='store_true',
                        help='Filter using session start/end times')

    parser.add_argument('--filler', '-fl', action='store_true',
                        help='Fill missing bars inside start/end times')

    parser.add_argument('--fvol', required=False, default=0.0,
                        type=float,
                        help='Use as fill volume for missing bar (def: 0.0)')

    parser.add_argument('--tstart', '-ts',
                        # default='09:14:59',
                        # help='Start time for the Session Filter (%H:%M:%S)')
                        default='09:15',
                        help='Start time for the Session Filter (HH:MM)')

    parser.add_argument('--tend', '-te',
                        # default='17:15:59',
                        # help='End time for the Session Filter (%H:%M:%S)')
                        default='17:15',
                        help='End time for the Session Filter (HH:MM)')

    parser.add_argument('--relvol', '-rv', action='store_true',
                        help='Add relative volume indicator')

    parser.add_argument('--fromdate', '-f',
                        default='2006-01-01',
                        help='Starting date in YYYY-MM-DD format')

    parser.add_argument('--todate', '-t',
                        default='2006-12-31',
                        help='Starting date in YYYY-MM-DD format')

    parser.add_argument('--writer', '-w', action='store_true',
                        help='Add a writer to cerebro')

    parser.add_argument('--wrcsv', '-wc', action='store_true',
                        help='Enable CSV Output in the writer')

    parser.add_argument('--plot', '-p', action='store_true',
                        help='Plot the read data')

    parser.add_argument('--numfigs', '-n', default=1,
                        help='Plot using numfigs figures')

    return parser.parse_args()

if __name__ == '__main__':
    runstrategy() 

SessionFiller

来自backtrader来源:

class SessionFiller(with_metaclass(metabase.MetaParams, object)):
    '''
    Bar Filler for a Data Source inside the declared session start/end times.

    The fill bars are constructed using the declared Data Source ``timeframe``
    and ``compression`` (used to calculate the intervening missing times)

    Params:

      - fill_price (def: None):

        If None is passed, the closing price of the previous bar will be
        used. To end up with a bar which for example takes time but it is not
        displayed in a plot ... use float('Nan')

      - fill_vol (def: float('NaN')):

        Value to use to fill the missing volume

      - fill_oi (def: float('NaN')):

        Value to use to fill the missing Open Interest

      - skip_first_fill (def: True):

        Upon seeing the 1st valid bar do not fill from the sessionstart up to
        that bar
    '''
    params = (('fill_price', None),
              ('fill_vol', float('NaN')),
              ('fill_oi', float('NaN')),
              ('skip_first_fill', True))

    # Minimum delta unit in between bars
    _tdeltas = {
        TimeFrame.Minutes: datetime.timedelta(seconds=60),
        TimeFrame.Seconds: datetime.timedelta(seconds=1),
        TimeFrame.MicroSeconds: datetime.timedelta(microseconds=1),
    }

    def __init__(self, data):
        # Calculate and save timedelta for timeframe
        self._tdunit = self._tdeltas[data._timeframe] * data._compression

        self.seenbar = False  # control if at least one bar has been seen
        self.sessend = MAXDATE  # maxdate is the control for bar in session

    def __call__(self, data):
        '''
        Params:
          - data: the data source to filter/process

        Returns:
          - False (always) because this filter does not remove bars from the
        stream

        The logic (starting with a session end control flag of MAXDATE)

          - If new bar is over session end (never true for 1st bar)

            Fill up to session end. Reset sessionend to MAXDATE & fall through

          - If session end is flagged as MAXDATE

            Recalculate session limits and check whether the bar is within them

            if so, fill up and record the last seen tim

          - Else ... the incoming bar is in the session, fill up to it
        '''
        # Get time of current (from data source) bar
        dtime_cur = data.datetime.datetime()

        if dtime_cur > self.sessend:
            # bar over session end - fill up and invalidate
            self._fillbars(data, self.dtime_prev, self.sessend + self._tdunit)
            self.sessend = MAXDATE

        # Fall through from previous check ... the bar which is over the
        # session could already be in a new session and within the limits
        if self.sessend == MAXDATE:
            # No bar seen yet or one went over previous session limit
            sessstart = data.datetime.tm2datetime(data.sessionstart)
            self.sessend = sessend = data.datetime.tm2datetime(data.sessionend)

            if sessstart <= dtime_cur <= sessend:
                # 1st bar from session in the session - fill from session start
                if self.seenbar or not self.p.skip_first_fill:
                    self._fillbars(data, sessstart - self._tdunit, dtime_cur)

            self.seenbar = True
            self.dtime_prev = dtime_cur

        else:
            # Seen a previous bar and this is in the session - fill up to it
            self._fillbars(data, self.dtime_prev, dtime_cur)
            self.dtime_prev = dtime_cur

        return False

    def _fillbars(self, data, time_start, time_end, forcedirty=False):
        '''
        Fills one by one bars as needed from time_start to time_end

        Invalidates the control dtime_prev if requested
        '''
        # Control flag - bars added to the stack
        dirty = False

        time_start += self._tdunit
        while time_start < time_end:
            dirty = self._fillbar(data, time_start)
            time_start += self._tdunit

        if dirty or forcedirty:
            data._save2stack(erase=True)

    def _fillbar(self, data, dtime):
        # Prepare an array of the needed size
        bar = [float('Nan')] * data.size()

        # Fill datetime
        bar[data.DateTime] = date2num(dtime)

        # Fill the prices
        price = self.p.fill_price or data.close[-1]
        for pricetype in [data.Open, data.High, data.Low, data.Close]:
            bar[pricetype] = price

        # Fill volume and open interest
        bar[data.Volume] = self.p.fill_vol
        bar[data.OpenInterest] = self.p.fill_oi

        # Fill extra lines the data feed may have defined beyond DateTime
        for i in range(data.DateTime + 1, data.size()):
            bar[i] = data.lines[i][0]

        # Add tot he stack of bars to save
        data._add2stack(bar)

        return True 


回到顶部