Skip to content

Kalman 等人。

原文: https://www.backtrader.com/blog/posts/2017-02-14-kalman-et-al/kalman-et-al/

笔记

对以下指令的支持从 commit 开始

开发分支中的 1146c83d9f9832630e97daab3ec7359705dc2c77

发布1.9.30.x将是包含它的 1st

backtrader的最初目标之一是成为纯 python,即:只使用标准发行版中可用的包。matplotlib有一个例外,即在不重新发明轮子的情况下进行绘图。虽然在最新可能的时刻导入,以避免中断可能根本不需要打印的标准操作(并避免未安装或不希望出现错误)

随着实时数据馈送(可能位于本地时区之外)的出现,在增加对时区的支持时,对pytz进行了部分的第 2例外。再次在后台执行导入操作,并且只有pytz可用(用户可以选择传递pytz实例)

但现在是一个完全例外的时刻,因为反向交易者使用的是numpypandasstatsmodel等知名软件包,以及pykalman等更为温和的软件包。或者在平台中包含使用这些包的东西。

社区的一些例子:

此愿望已添加到此处绘制的快速路线图中:

声明式方法

保持backtrader的原始精神,同时允许使用这些软件包的关键是不要强迫纯 python 用户必须安装这些软件包。

尽管这看起来很有挑战性,并且容易出现多个条件语句以及异常处理,但平台内部和用户外部的方法是依赖于已经用于开发其他概念的相同原则,例如参数(命名为参数)用于大多数对象。

让我们回忆一下如何定义Indicator接受参数和定义

class MyIndicator(bt.Indicator):
    lines = ('myline',)
    params = (
        ('period', 50),
    ) 

参数period稍后可寻址为self.params.periodself.p.period,如中所示:

def __init__(self):
    print('my period is:', self.p.period) 

行中的当前值为self.lines.mylineself.l.myline如中所示:

def next(self):
    print('mylines[0]:', self.lines.myline[0]) 

这并不是特别有用,只是展示了参数背景机制的声明性方法,它还具有对继承的适当支持(包括多重继承

介绍packages

使用相同的声明性技术(有些人称之为元编程),对外部的支持如下:

class MyIndicator(bt.Indicator):
    packages = ('pandas',)
    lines = ('myline',)
    params = (
        ('period', 50),
    ) 

起泡的藤壶!!!这似乎只是另一个宣言。指标实施者的第一个问题是:

  • 我必须手动导入“熊猫”吗?

答案很简单:。后台机器将导入pandas并使其在定义MyIndicator的模块中可用。现在可以在next中执行以下操作:

def next(self):
    print('mylines[0]:', pandas.SomeFunction(self.lines.myline[0])) 

packages指令也可用于:

  • 在一个声明中导入多个包

  • 将导入分配给别名 alaimport pandas as pd

假设 statsmodel 也希望sm完成pandas.SomeFunction

class MyIndicator(bt.Indicator):
    packages = ('pandas', ('statsmodel', 'sm'),)
    lines = ('myline',)
    params = (
        ('period', 50),
    )

    def next(self):
        print('mylines[0]:', sm.XX(pandas.SomeFunction(self.lines.myline[0]))) 

statsmodel已作为sm导入并可用。只需传递一个 iterable(一个tuplebacktrader约定)以及包的名称和所需的别名。

加入frompackages

Python 以不断查找事物而闻名,这也是该语言在动态性、内省功能和元编程方面非常出色的原因之一。同时也是无法提供相同性能的原因之一。

通常的加速方法之一是通过直接从模块中导入符号来删除模块中的查找,以便进行本地查找。我们从pandas开始的SomeFunction看起来像:

from pandas import SomeFunction 

或使用别名:

from pandas import SomeFunction as SomeFunc 

反向交易者通过frompackages指令为两者提供支持。让我们重做MyIndicator

class MyIndicator(bt.Indicator):
    frompackages = (('pandas', 'SomeFunction'),)
    lines = ('myline',)
    params = (
        ('period', 50),
    )

    def next(self):
        print('mylines[0]:', SomeFunction(self.lines.myline[0])) 

当然,这将开始添加更多的括号。例如,如果将从pandas导入两(2)件物品,则看起来是这样的:

class MyIndicator(bt.Indicator):
    frompackages = (('pandas', ['SomeFunction', 'SomeFunction2']),)
    lines = ('myline',)
    params = (
        ('period', 50),
    )

    def next(self):
        print('mylines[0]:', SomeFunction2(SomeFunction(self.lines.myline[0]))) 

为了清楚起见,将SomeFunctionSomeFunction2放在list而不是tuple中,以便使用方括号[]并能够更好地阅读。

也可以将SomeFunction别名为SFunc等。完整示例:

class MyIndicator(bt.Indicator):
    frompackages = (('pandas', [('SomeFunction', 'SFunc'), 'SomeFunction2']),)
    lines = ('myline',)
    params = (
        ('period', 50),
    )

    def next(self):
        print('mylines[0]:', SomeFunction2(SFunc(self.lines.myline[0]))) 

从不同的包中导入是可能的,但需要更多的括号。当然,换行符和缩进有助于:

class MyIndicator(bt.Indicator):
    frompackages = (
        ('pandas', [('SomeFunction', 'SFunc'), 'SomeFunction2']),
        ('statsmodel', 'XX'),
    )
    lines = ('myline',)
    params = (
        ('period', 50),
    )

    def next(self):
        print('mylines[0]:', XX(SomeFunction2(SFunc(self.lines.myline[0])))) 

使用继承

packagesfrompackages都支持(多重)继承。例如,可能有一个基类向所有子类添加了numpy支持:

class NumPySupport(object):
    packages = ('numpy',)

class MyIndicator(bt.Indicator, NumPySupport):
    packages = ('pandas',) 

MyIndicator需要从后台机器进口numpypandas并能够使用它们。

介绍 Kalman 和朋友

笔记

以下两个指标都需要同行审查以确认实施情况。小心使用。

下面可以找到实现KalmanMovingAverage的示例。这是模仿这里的一篇帖子:量子论系列讲座:卡尔曼滤波器

实施:

class KalmanMovingAverage(bt.indicators.MovingAverageBase):
    packages = ('pykalman',)
    frompackages = (('pykalman', [('KalmanFilter', 'KF')]),)
    lines = ('kma',)
    alias = ('KMA',)
    params = (
        ('initial_state_covariance', 1.0),
        ('observation_covariance', 1.0),
        ('transition_covariance', 0.05),
    )

    plotlines = dict(cov=dict(_plotskip=True))

    def __init__(self):
        self.addminperiod(self.p.period)  # when to deliver values
        self._dlast = self.data(-1)  # get previous day value

    def nextstart(self):
        self._k1 = self._dlast[0]
        self._c1 = self.p.initial_state_covariance

        self._kf = pykalman.KalmanFilter(
            transition_matrices=[1],
            observation_matrices=[1],
            observation_covariance=self.p.observation_covariance,
            transition_covariance=self.p.transition_covariance,
            initial_state_mean=self._k1,
            initial_state_covariance=self._c1,
        )

        self.next()

    def next(self):
        k1, self._c1 = self._kf.filter_update(self._k1, self._c1, self.data[0])
        self.lines.kma[0] = self._k1 = k1 

在这里的帖子后面有一个KalmanFilter基于卡尔曼滤波的 QSTrader成对交易策略

class NumPy(object):
    packages = (('numpy', 'np'),)

class KalmanFilterInd(bt.Indicator, NumPy):
    _mindatas = 2  # needs at least 2 data feeds

    packages = ('pandas',)
    lines = ('et', 'sqrt_qt')

    params = dict(
        delta=1e-4,
        vt=1e-3,
    )

    def __init__(self):
        self.wt = self.p.delta / (1 - self.p.delta) * np.eye(2)
        self.theta = np.zeros(2)
        self.P = np.zeros((2, 2))
        self.R = None

        self.d1_prev = self.data1(-1)  # data1 yesterday's price

    def next(self):
        F = np.asarray([self.data0[0], 1.0]).reshape((1, 2))
        y = self.d1_prev[0]

        if self.R is not None:  # self.R starts as None, self.C set below
            self.R = self.C + self.wt
        else:
            self.R = np.zeros((2, 2))

        yhat = F.dot(self.theta)
        et = y - yhat

        # Q_t is the variance of the prediction of observations and hence
        # \sqrt{Q_t} is the standard deviation of the predictions
        Qt = F.dot(self.R).dot(F.T) + self.p.vt
        sqrt_Qt = np.sqrt(Qt)

        # The posterior value of the states \theta_t is distributed as a
        # multivariate Gaussian with mean m_t and variance-covariance C_t
        At = self.R.dot(F.T) / Qt
        self.theta = self.theta + At.flatten() * et
        self.C = self.R - At * F.dot(self.R)

        # Fill the lines
        self.lines.et[0] = et
        self.lines.sqrt_qt[0] = sqrt_Qt 

为了说明packages如何与继承一起工作(实际上并不需要pandas

示例的执行:

$ ./kalman-things.py --plot 

生成此图表

!image

样本使用

$ ./kalman-things.py --help
usage: kalman-things.py [-h] [--data0 DATA0] [--data1 DATA1]
                        [--fromdate FROMDATE] [--todate TODATE]
                        [--cerebro kwargs] [--broker kwargs] [--sizer kwargs]
                        [--strat kwargs] [--plot [kwargs]]

Packages and Kalman

optional arguments:
  -h, --help           show this help message and exit
  --data0 DATA0        Data to read in (default:
                       ../../datas/nvda-1999-2014.txt)
  --data1 DATA1        Data to read in (default:
                       ../../datas/orcl-1995-2014.txt)
  --fromdate FROMDATE  Date[time] in YYYY-MM-DD[THH:MM:SS] format (default:
                       2006-01-01)
  --todate TODATE      Date[time] in YYYY-MM-DD[THH:MM:SS] format (default:
                       2007-01-01)
  --cerebro kwargs     kwargs in key=value format (default: runonce=False)
  --broker kwargs      kwargs in key=value format (default: )
  --sizer kwargs       kwargs in key=value format (default: )
  --strat kwargs       kwargs in key=value format (default: )
  --plot [kwargs]      kwargs in key=value format (default: ) 

示例代码

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import argparse
import datetime

import backtrader as bt

class KalmanMovingAverage(bt.indicators.MovingAverageBase):
    packages = ('pykalman',)
    frompackages = (('pykalman', [('KalmanFilter', 'KF')]),)
    lines = ('kma',)
    alias = ('KMA',)
    params = (
        ('initial_state_covariance', 1.0),
        ('observation_covariance', 1.0),
        ('transition_covariance', 0.05),
    )

    def __init__(self):
        self.addminperiod(self.p.period)  # when to deliver values
        self._dlast = self.data(-1)  # get previous day value

    def nextstart(self):
        self._k1 = self._dlast[0]
        self._c1 = self.p.initial_state_covariance

        self._kf = pykalman.KalmanFilter(
            transition_matrices=[1],
            observation_matrices=[1],
            observation_covariance=self.p.observation_covariance,
            transition_covariance=self.p.transition_covariance,
            initial_state_mean=self._k1,
            initial_state_covariance=self._c1,
        )

        self.next()

    def next(self):
        k1, self._c1 = self._kf.filter_update(self._k1, self._c1, self.data[0])
        self.lines.kma[0] = self._k1 = k1

class NumPy(object):
    packages = (('numpy', 'np'),)

class KalmanFilterInd(bt.Indicator, NumPy):
    _mindatas = 2  # needs at least 2 data feeds

    packages = ('pandas',)
    lines = ('et', 'sqrt_qt')

    params = dict(
        delta=1e-4,
        vt=1e-3,
    )

    def __init__(self):
        self.wt = self.p.delta / (1 - self.p.delta) * np.eye(2)
        self.theta = np.zeros(2)
        self.R = None

        self.d1_prev = self.data1(-1)  # data1 yesterday's price

    def next(self):
        F = np.asarray([self.data0[0], 1.0]).reshape((1, 2))
        y = self.d1_prev[0]

        if self.R is not None:  # self.R starts as None, self.C set below
            self.R = self.C + self.wt
        else:
            self.R = np.zeros((2, 2))

        yhat = F.dot(self.theta)
        et = y - yhat

        # Q_t is the variance of the prediction of observations and hence
        # \sqrt{Q_t} is the standard deviation of the predictions
        Qt = F.dot(self.R).dot(F.T) + self.p.vt
        sqrt_Qt = np.sqrt(Qt)

        # The posterior value of the states \theta_t is distributed as a
        # multivariate Gaussian with mean m_t and variance-covariance C_t
        At = self.R.dot(F.T) / Qt
        self.theta = self.theta + At.flatten() * et
        self.C = self.R - At * F.dot(self.R)

        # Fill the lines
        self.lines.et[0] = et
        self.lines.sqrt_qt[0] = sqrt_Qt

class KalmanSignals(bt.Indicator):
    _mindatas = 2  # needs at least 2 data feeds

    lines = ('long', 'short',)

    def __init__(self):
        kf = KalmanFilterInd()
        et, sqrt_qt = kf.lines.et, kf.lines.sqrt_qt

        self.lines.long = et < -1.0 * sqrt_qt
        # longexit is et > -1.0 * sqrt_qt ... the opposite of long
        self.lines.short = et > sqrt_qt
        # shortexit is et < sqrt_qt ... the opposite of short

class St(bt.Strategy):
    params = dict(
        ksigs=False,  # attempt trading
        period=30,
    )

    def __init__(self):
        if self.p.ksigs:
            self.ksig = KalmanSignals()
            KalmanFilter()

        KalmanMovingAverage(period=self.p.period)
        bt.ind.SMA(period=self.p.period)
        if True:
            kf = KalmanFilterInd()
            kf.plotlines.sqrt_qt._plotskip = True

    def next(self):
        if not self.p.ksigs:
            return

        size = self.position.size
        if not size:
            if self.ksig.long:
                self.buy()
            elif self.ksig.short:
                self.sell()

        elif size > 0:
            if not self.ksig.long:
                self.close()
        elif not self.ksig.short:  # implicit size < 0
            self.close()

def runstrat(args=None):
    args = parse_args(args)

    cerebro = bt.Cerebro()

    # Data feed kwargs
    kwargs = dict()

    # Parse from/to-date
    dtfmt, tmfmt = '%Y-%m-%d', 'T%H:%M:%S'
    for a, d in ((getattr(args, x), x) for x in ['fromdate', 'todate']):
        if a:
            strpfmt = dtfmt + tmfmt * ('T' in a)
            kwargs[d] = datetime.datetime.strptime(a, strpfmt)

    # Data feed
    data0 = bt.feeds.YahooFinanceCSVData(dataname=args.data0, **kwargs)
    cerebro.adddata(data0)

    data1 = bt.feeds.YahooFinanceCSVData(dataname=args.data1, **kwargs)
    data1.plotmaster = data0
    cerebro.adddata(data1)

    # Broker
    cerebro.broker = bt.brokers.BackBroker(**eval('dict(' + args.broker + ')'))

    # Sizer
    cerebro.addsizer(bt.sizers.FixedSize, **eval('dict(' + args.sizer + ')'))

    # Strategy
    cerebro.addstrategy(St, **eval('dict(' + args.strat + ')'))

    # Execute
    cerebro.run(**eval('dict(' + args.cerebro + ')'))

    if args.plot:  # Plot if requested to
        cerebro.plot(**eval('dict(' + args.plot + ')'))

def parse_args(pargs=None):
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=(
            'Packages and Kalman'
        )
    )

    parser.add_argument('--data0', default='../../datas/nvda-1999-2014.txt',
                        required=False, help='Data to read in')

    parser.add_argument('--data1', default='../../datas/orcl-1995-2014.txt',
                        required=False, help='Data to read in')

    # Defaults for dates
    parser.add_argument('--fromdate', required=False, default='2006-01-01',
                        help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')

    parser.add_argument('--todate', required=False, default='2007-01-01',
                        help='Date[time] in YYYY-MM-DD[THH:MM:SS] format')

    parser.add_argument('--cerebro', required=False, default='runonce=False',
                        metavar='kwargs', help='kwargs in key=value format')

    parser.add_argument('--broker', required=False, default='',
                        metavar='kwargs', help='kwargs in key=value format')

    parser.add_argument('--sizer', required=False, default='',
                        metavar='kwargs', help='kwargs in key=value format')

    parser.add_argument('--strat', required=False, default='',
                        metavar='kwargs', help='kwargs in key=value format')

    parser.add_argument('--plot', required=False, default='',
                        nargs='?', const='{}',
                        metavar='kwargs', help='kwargs in key=value format')

    return parser.parse_args(pargs)

if __name__ == '__main__':
    runstrat() 


回到顶部