pythonでファイナンス計算

先日会社で退職金の説明会がありました。それによると退職金の一部を確定拠出年金として自分で運用しなくてはいけないとのことで、ポートフォリオを設計する必要あります。今時は各種金融機関がポートフォリオのシミュレーターを公開していますし、Excelを使って計算するという方法もありますが、ここでは例によってpythonポートフォリオの設計をやってみます。

実は、pythonにはnumpyやscipyといった科学計算のライブラリやcvxoptという最適化計算のライブラリがあるので、それらを利用すれば簡単かつ高速に最適なポートフォリオを計算することができます。

ちなみに、僕はファイナンス理論の専門家ではないので、ここに書いてある記述が正確であるという保証はできません。あしからず。

分散共分散行列を計算する

各投資対象のリスクと投資対象同士の相関係数が分かっているならば、分散共分散行列は行列計算で計算できます。
各投資対象の相関係数行列をC、各投資対象の標準偏差の対角行列をSとすると、分散共分散行列Σは

で計算できます。

pythonのコード例

import cvxopt
correl_mat = cvxopt.matrix([[1.0, 0.16, -0.06, -0.05, -0.29],
                            [0.16, 1.0, -0.25, 0.27, 0.73],
                            [-0.06, -0.25, 1.0, 0.56, 0.11],
                            [-0.05, 0.27, 0.56, 1.0, 0.91],
                            [-0.29, 0.73, 0.11, 0.91, 1.0]])
stddev_mat = cvxopt.spdiag([5.40, 22.15, 13.25, 19.59, 26.25])*0.01
sigma_mat = stddev_mat*correl_mat*stddev_mat

株価や投資信託の基準価額の時系列データから分散共分散行列を計算する場合は、numpy.covを使用すると簡単です。

import numpy
data1 = numpy.array([投資対象1の時系列データ], dtype=float)
data2 = numpy.array([投資対象2の時系列データ], dtype=float)

# リターンの計算
return_mat1 = data1[0:-1]/data1[1:]
return_mat2 = data2[0:-1]/data2[1:]

# 分散共分散行列の計算
sigma_mat = numpy.cov(numpy.vstack(return_mat1, return_mat2)) # 単位が年率でないので注意

効率的フロンティア上のポートフォリオを計算する

最適なポートフォリオを設計する問題は、ポートフォリオのリターンがある値以上という制約下でリスクを最小化するという問題と考えることができます。
これは、各種投資対象の期待リターンのベクトルをr、配分割合のベクトルをx、ポートフォリオの目標リターンをr0とすると、

となります。

これは、スラック変数sを導入すると、次のような最適化問題として定式化できます。

この最適化問題はcvxopt.solvers.coneqpを使って解くことができます。

import math
import cvxopt
import cvxopt.solvers
def calc_portfolio(sigma_mat, r, r0):
  """
  sigma_mat : 分散共分散行列 (cvxopt.base.matrix)
  r  : 各投資対象の期待リターンのベクトル (cvxopt.base.matrix または list)
  r0 : 目標リターン (float)
  """
  n = sigma_mat.size[0]
  minus_r = [-x for x in r]

  P = sigma_mat
  q = cvxopt.matrix(0.0, (n, 1))
  
  G = cvxopt.matrix([cvxopt.matrix([[1.0]*n, minus_r]).T, cvxopt.spdiag([-1.0]*n)])
  h = cvxopt.matrix([1.0, -r0] + [0 for i in xrange(n)])
  
  x = cvxopt.solvers.coneqp(P, q, G, h)['x']

  # (配分割合のリスト、ポートフォリオのリターン, ポートフォリオのリスク(標準偏差))
  return list(x), (r.T*x)[0], math.sqrt((x.T*P*x)[0])

ポートフォリオのリスクを計算する

正規分布の確率分布関数やその逆関数はscipy.stats.normで計算することが出来るので、これを使ってショートフォール確率やバリューアットリスクを計算します。

from scipy.stats import norm
def calc_shortfall_risk(r_min, r, sigma):
  return norm.cdf(r_min, r, sigma)

def calc_value_at_risk(probability, r, sigma):
  return norm.ppf(1.0-probability, r, sigma)

def calc_sharp_ratio(r_norisk, r, sigma):
  return (r - r_norisk)/sigma

その他

cvxoptの最適化計算の途中経過を表示しないようにするには、次のコードを実行します。

cvxopt.solvers.options['show_progress'] = False

参考URL

各種数値や計算結果の検算に下記のサイトを利用させていただきました。
アセットアロケーション分析 〜 投資信託のガイド|ファンドの海

Rubyメソッドのアラウンドエイリアスについて

Rubyで末尾再帰最適化をする。を読んで思ったこと。
このエントリやメタプログラミングRubyでメソッドのアラウンドエイリアスを定義する際に、古いメソッドに別名をつけてから新しいメソッドを定義しています。以下はメタプログラミングRubyの例。

class String
  alias :real_length :length

  def length
    real_length > 5 ? 'long' : 'short'
  end
end

この方法の問題点はエイリアスしたメソッド(この例の場合real_length)が残ってしまうことです。メソッドオブジェクトを使えばエイリアスしたメソッドを残すことなくアラウンドエイリアスを行うことができると思ったので、実際にやってみました。

以下のプログラムは、Rubyで末尾再帰最適化をする。のプログラムをメソッドエイリアスではなくメソッドオブジェクトを使って書いた例です。

#!/usr/bin/ruby -Ku
# -*- coding: utf-8 -*-

class Module
  def tco(name)
    continue = []
    first = true
    arguments = nil
    original_method = self.instance_method(name)

    proc = lambda do |*args|
      if first
        first = false
        while true
          result = original_method.bind(self).call(*args)
          if result.equal? continue
            args = arguments
          else
            first = true
            return result
          end
        end
      else
        arguments = args
        continue
      end
    end
    define_method name, proc
  end
end

class Sum
  def sum1(n, acc=0)
    if n == 0
      acc
    else
      sum1(n-1, acc+n)
    end
  end

  def sum2(n, acc=0)
    if n == 0
      acc
    else
      sum2(n-1, acc+n)
    end
  end
  tco :sum2
end

if $0 == __FILE__
  o = Sum.new
  p o.sum2(100000)
  p o.sum1(100000)
end

実行結果

> ./tco.rb 
5000050000
./tco.rb:37:in `sum1': stack level too deep (SystemStackError)
	from ./tco.rb:37:in `sum1'
	from ./tco.rb:54
  original_method = self.instance_method(name)

の部分で元のメソッドのUnboundMethodオブジェクトを取り出して、

  result = original_method.bind(self).call(*args)

で元のメソッドを呼び出しています。

このエントリを書いてる途中に、rubikitchさんが同じことをもっと詳しく解説してくれているのを発見したので、リンクを張っておきます。

ついでに、末尾再帰最適化のPythonの例を読んだときに、ネタに対して、「これってマルチスレッドで動かないよね」と思ってしまったので、Pythonの末尾再帰最適化のマルチスレッド対応版を書いておきます。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

class tail_recursive(object):
  def __init__(self, func):
    self.func = func
    self.firstcall = True
    self.CONTINUE = object()

  def __call__(self, *args, **kwd):
    if self.firstcall:
      func = self.func
      CONTINUE = self.CONTINUE
      self.firstcall = False
      try:
        while True:
          result = func(*args, **kwd)
          if result is CONTINUE: # update arguments
            args, kwd = self.argskwd
          else: # last call
            return result
      finally:
        self.firstcall = True
    else: # return the arguments of the tail call
      self.argskwd = args, kwd
      return self.CONTINUE

import threading
class tail_recursive_mt(object):
  def __init__(self, func):
    self.func = func
    self.local = threading.local()
    self.CONTINUE = object()

  def __call__(self, *args, **kwd):
    if not hasattr(self.local, "firstcall") or self.local.firstcall:
      func = self.func
      CONTINUE = self.CONTINUE
      self.local.firstcall = False
      try:
        while True:
          result = func(*args, **kwd)
          if result is CONTINUE: # update arguments
            args, kwd = self.local.argskwd
          else: # last call
            return result
      finally:
        self.local.firstcall = True
    else: # return the arguments of the tail call
      self.local.argskwd = args, kwd
      return self.CONTINUE    

@tail_recursive
def sum(n, acc=0):
  if n == 0:
    return acc
  else:
    return sum(n-1, acc+n)

@tail_recursive_mt
def sum_mt(n, acc=0):
  if n == 0:
    return acc
  else:
    return sum_mt(n-1, acc+n)  

class SumTest(threading.Thread):
  def run(self):
    try:
      assert(sum(100000) == 5000050000)
    except AssertionError, e:
      print "AssertionError"
    else:
      print "OK"

class SumMtTest(threading.Thread):
  def run(self):
    try:
      assert(sum_mt(100000) == 5000050000)
    except AssertionError, e:
      print "AssertionError"
    else:
      print "OK"
    
if __name__ == '__main__':
  print "="*60
  print "Single Thread Version"
  print "="*60
  threads = [SumTest() for i in xrange(5)]
  for t in threads:
    t.start()
  for t in threads:
    t.join()

  print "="*60
  print "Multi Thread Version"
  print "="*60
  threads = [SumMtTest() for i in xrange(5)]
  for t in threads:
    t.start()
  for t in threads:
    t.join()

実行結果

> ./tco.py
============================================================
Single Thread Version
============================================================
AssertionError
AssertionError
AssertionError
AssertionError
OK
============================================================
Multi Thread Version
============================================================
OK
OK
OK
OK
OK

Emacsのauto-insertでテンプレートを自動生成する

プログラムの単体テストを書く場合や、各種フレームワークを使用してプログラムを書く場合などは、ある規則にそってファイル名を付けると思います。(例えば単体プログラムのソースファイル名は"モジュール名_test.py"(Pythonの場合)など)
Emacsのauto-insertモードを使って、特定の名前のファイルを作成したときに、専用のテンプレートを自動生成してくれると便利だと思い、その方法を調べてみました。

ひげぽんさんのブログその元ネタにほぼ正解が紹介されているのですが、lispのマクロを使って色々な種類のテンプレートを簡単に作れるようにしたので、新しいエントリを書いてみました。

ここではPython単体テストのソースファイルの雛形を自動生成するためのElispを書いてみます。

まず、~/.emacs.d/insertにunittest_template.pyという名前のテンプレートファイルを用意します。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest
import %testee%

class %test_class%(unittest.TestCase):    
  def setUp(self):
    pass

  def tearDown(self):
    pass

if __name__ == '__main__':
  unittest.main()

次に、~/.emacs.elに次の設定を追加します。

(require 'autoinsert)
(auto-insert-mode)
(setq auto-insert-directory "~/.emacs.d/insert/")

(defun buffer-file-name-nondirectory ()
  (file-name-nondirectory (buffer-file-name)))
(defun python-module-name ()
  (substring (buffer-file-name-nondirectory) 0 -8))
(defun python-test-case-name ()
  (concat (capitalize (python-module-name)) "Test"))

(require 'cl)
(defmacro define-replace-function (name replacement-alist)
  `(defun ,name ()
     (loop for (string . func) in ,replacement-alist
	   do (progn
		(goto-char (point-min))
		(replace-string string (funcall func) nil)))
     (goto-char (point-max))
     (message "done.")))
(define-replace-function python-unittest-template
  '(("%testee%"     . (lambda () (python-module-name)))
    ("%test_class%" . (lambda () (python-test-case-name)))
    ))
(add-to-list 'auto-insert-alist '("_test\\.py$" . ["unittest_template.py" python-unittest-template]))

ポイントはこの部分です。

(require 'cl)
(defmacro define-replace-function (name replacement-alist)
  `(defun ,name ()
     (loop for (string . func) in ,replacement-alist
	   do (progn
		(goto-char (point-min))
		(replace-string string (funcall func) nil)))
     (goto-char (point-max))
     (message "done.")))
(define-replace-function python-unittest-template
  '(("%testee%"     . (lambda () (python-module-name)))
    ("%test_class%" . (lambda () (python-test-case-name)))
    ))
(add-to-list 'auto-insert-alist '("_test\\.py$" . ["unittest_template.py" python-unittest-template]))

まず、loopマクロを使用するため、(require 'cl)を行っています。
次に、define-replace-functionマクロを定義しています。このマクロは、関数名と(置換する文字列 . 置換後の文字列を返す関数)のリストを渡すと、テンプレート文字列を置換する関数を定義するマクロです。
続いて、定義したマクロを使用して、先ほど作成したunittest_template.pyのテンプレート文字列を置換する関数を定義しています。
最後の行では、_test.pyで終わるファイル名を作成しようとしたときに、ここで作成したテンプレートファイルと文字列置換関数を使用するようにしています。

define-replace-functionの第2引数に渡すリストを変えれば、色々なテンプレートを作成することができます。

PythonでWEBページをクローリングする時のTips

PythonでWEBページをクローリングする時のTipsをまとめてみました。

urllib2.urlopenのデフォルトのユーザーエージェントを変更する

PythonでURLを開くには、urllib2.urlopenします。
urllib2.urlopenは、デフォルトで"Python-urllib/(Pythonのバージョン)"というユーザーエージェントを使用しますが、Wikipediaなど一部のページではこのユーザーエージェントに対し403 Forbiddenを返してきます。以下のコードによってデフォルトのユーザーエージェントを変更すると、403エラーを回避することができます。

import urllib2
opener = urllib2.build_opener()
opener.addheaders = [('User-agent', 'your user agent string')]
urllib2.install_opener(opener)

HTMLからタグを除去する

PythonでHTMLをパースするライブラリではBeautifulSoupが有名ですが、HTMLのタグを除去する程度のタスクの場合、標準ライブラリのsgmllibでも比較的簡単に達成することができます。

import sgmllib
import StringIO

class TagRemover(sgmllib.SGMLParser):
  def remove_tag(self, some_html):
    self.stringio = StringIO.StringIO()
    self.feed(some_html)
    self.close()
    result = self.stringio.getvalue()
    self.stringio.close()
    return result

  def handle_data(self, data):
    self.stringio.write(data)

def clean_html(some_html):
  return TagRemover().remove_tag(some_html)

ElementTreeモジュールでXMLをパースする

PythonではXMLをパースするための標準ライブラリが複数ありますが、Python2.5以降で使用可能なElementTreeモジュールが一番使いやすいと思います。ただ、このモジュールのXML名前空間の扱いで少しハマったので、ここにメモにしておきます。

ElementTree.ElementTreeのfindやfindallなどのメソッドではタグの名前空間を{名前空間}で指定します。RSSAtom等からデータを抜き出す際には、タグの名前空間をきちんと指定する必要があります。

ElementTreeのサンプルプログラム

import urllib2
from xml.etree import ElementTree

url = 'http://d.hatena.ne.jp/saitodevel01/rss'
etree = ElementTree.fromstring(urllib2.urlopen(url).read())
print etree.find("item")
print etree.find("{http://purl.org/rss/1.0/}item")

実行結果

None
<Element {http://purl.org/rss/1.0/}item at 10101d248>

時刻情報をパースする

はてなRSSなどでは時刻情報がISO8601というフォーマットで出てきます。このフォーマットは標準ライブラリのtimeモジュールやdatetimeモジュールではパースすることが出来ないので、iso8601というライブラリを使用します。

iso8601のインストール

sudo easy_install.py iso8601
もしくは
sudo pip install iso8601

iso8601の使い方

>>> import iso8601
>>> iso8601.parse_date("2007-01-25T12:00:00Z")
datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>)
>>>

文字列をUnicodeに変換する際のTips

WEBページでは様々な文字列エンコードが使用されていますが、内部データはUnicodeで統一し、シリアライズutf-8で行うのが定石かと思います。文字列をUnicodeへ変換する際に文字列のエンコードを指定する必要がありますが、文字列エンコードを推定するためのchardetというライブラリを使えば、unicodeへの変換を簡単に行うことができます。

chardetのインストール

sudo easy_install.py chardet
もしくは
sudo pip install chardet

chardet.detectに文字列を渡すと、encodingとconfidenceの2つのキーが入ったディクショナリが返され、encodingキーの値はそのままunicode関数の第2引数に渡すことが出来ます。また、unicode関数の第3引数を'ignore'とすると、UnicodeDecodeErrorが発生する文字列を無視してくれます。

# -*- coding: utf-8 -*-
import chardet

def unicode2(raw_string):
  encoding = chardet.detect(raw_string)['encoding']
  if encoding is not None:
    return unicode(raw_string, encoding, 'ignore')
  else:
    raise ValueError, 'Can not detect encoding'

if __name__ == '__main__':
  text = 'あいうえお'
  print repr(text)
  print repr(unicode2(text))

Djangoのデータベースをsqlite3からMySQLに移行したときにハマったこと

sqlite3はデータベースが1つのファイルで簡潔するので、バックアップがファイルのコピーで済む等、扱いが非常に簡単です。
そのため僕はDjangoのアプリケーションを作るときは、sqlite3を使用しています。
しかし、sqlite3ではトランザクション実行時にファイル全体がロックされるので、複数プロセス・スレッドから同時に書き込みを行うような用途の場合、別のデータベースエンジンを使用した方が良いと思われます。

僕は今、一定時間ごとにwebページをクローリングして、得られたデータをデータベースに保存するようなプログラムを今書いています。
cronで指定したインターバル以内にプログラムが終了しなかった場合、古いプロセスと新しいプロセスで同時にデータベースへのアクセスが発生してしまい、うまくデータベースへの書き込みが出来ないという自体に陥ってしまいました。
そこで、データベースエンジンをsqlite3からMySQLに移行することにしました。


Djangoではデータベースのバックアップとリストアがmanage.pyで行え、

python manage.py dumpdata app_name --format=json > app.json

でデータベースのバックアップが、

python manage.py loaddata app.json

でデータベースのリストアがそれぞれ実行できます。

基本的には、Djangoのこの機能を使えばデータベースの移行は簡単に行えると思っていたのですが、sqlite3とMySQLの違いによって、修正が必要な箇所がいくつかありました。そこで、このエントリでは、僕がDjangoのデータベースをsqlite3からMySQLに移行したときにハマったポイントについてまとめたいと思います。


1. MySQLのvarchar型の長さの制限
DjangoのCharFieldのMySQLでの型はvarcharですが、MySQLではvarchar型の長さは255文字までに制限されているので、CharFieldのmax_lengthを256以上に設定していた場合は255以下に変更する必要があります。


2. MySQLの文字列比較の大文字・小文字の区別
MySQLではvarchar型の比較時に大文字と小文字をデフォルトで区別しません。
そのため文字列フィールドをuniqueに設定していた場合、sqlite3では問題なかったデータでも、MySQLに移行させた場合にエラーとなる場合があります。
そのため、MySQLのデータベースをsyncdbで初期化した後、こちらの参考エントリの方法をで、大文字・小文字の区別を行うようにテーブルを変更する必要があります。


3. MySQLのdatetime型はタイムゾーンが扱えない
MySQLのdatetime型はタイムゾーンが扱えないらしく、DateTimeFieldの値がタイムゾーン情報を持っていた場合に例外が発生します。
そのため、以下のようにタイムゾーン情報を消す必要があります。

new_datetime_obj = original_datetime_obj.replace(tzinfo=None)

serversman@vpsでキャッシュの使用量が0と表示される件について

serversman@vpsでfreeコマンドでメモリの使用量を確かめようとすると、キャッシュの使用量が0と表示されます。

> free -m
             total       used       free     shared    buffers     cached
Mem:          1024        234        789          0          0          0
-/+ buffers/cache:        234        789
Swap:            0          0          0

(ちなみにOSはCentOSです。serversman@vpsではDebianUbuntuが使えるますが、それらのOSでも同じ現象が発生するかどうかは分かりません。)

メモリを節約するために、キャッシュを行わないようLinuxカーネルのパラメータを変えているのかと考えましたが、もし本当にキャッシュが行われないならば、ファイルを読み込む度にディスクアクセスが発生するので、遅すぎて使い物にならないはずです。
(それともディスク自体も仮想化されているので問題ないのか)

本当にキャッシュが行われていないのかどうかを確かめるために、こちらのエントリで紹介されているプログラムをserversman@vps上で実行してみました。

> ./fincore_min /bin/tcsh 
pagesize: 4096
filesize: 341068
page num: 84
in memory pages: 0
> cat /bin/tcsh > /dev/null 
> ./fincore_min /bin/tcsh 
pagesize: 4096
filesize: 341068
page num: 84
in memory pages: 84

最初は/bin/tcshの内容はメモリに全く載っていませんが、catコマンドでファイルの内容を読み込んだあとは、全てメモリ上に乗っていることが分かります。
つまり、freeコマンドではキャッシュとして使用しているメモリの量が0と表示されるだけで、実際にはキャッシュが効いているということが分かります。

ちなみに、/proc/sys/vm/drop_cachesに書き込みを行ってキャッシュを解放しようとすると、エラーが発生してしまいました。
serversman@vpsではカーネルのパラメータの変更が制限されているようです。

su
echo 1 > /proc/sys/vm/drop_caches
bash: echo: write error: 許可されていない操作です

Matlabでポインタ(間接参照)を実現する

私は、このブログではPythonRubyのことばかり書いていますが、仕事ではMatlabという数値計算ソフトを使っています。
このMatlabというソフトは、ツールとしては非常に便利ですが、ことプログラミング言語として見ると、非常に大きな欠陥があります。
それは、間接参照、C言語でいうポインタに相当するものが無いということです。

PythonRubyJavaなどの言語では、プリミティブ型を除くと、変数は全てオブジェクトへの参照です。
そのため、関数やメソッド内で変数の状態を操作した場合、呼び出し側の変数も状態が更新されます。

# Pythonの例
def change_array(a, value):
  a[0] = value

a = [1]
assert(a[0] == 1)
change_array(a, 2)
assert(a[0] == 2)
print 'OK'

しかし、Matlabの場合、変数は値なので、関数内で変数を操作しても呼び出し側の変数は更新されません。

function change_array_by_func()
  a = [1];
  assert(a(1) == 1, 'a(1) == 1');
  change_array(a, 2);
  assert(a(1) == 2, 'a(1) == 2');  % ここでエラー
  disp('OK');
end

function change_array(a, value)
  a(1) = value;
end

一応、Matlabでもhandleというクラスから派生したクラスのオブジェクトは参照型となるらしいですが、その方法の場合、いちいち新しいクラスを作らなければいけないので非常に不便です。
また、比較的新しいバージョンのMatlabでないと、その機能すら使えないようです。

ここで、以前読んだLET OVER LAMBDAという本に書いてある、クロージャでポインタを模倣するテクニックを思い出しました。
LET OVER LAMBDAに書いてある方法とは少し違いますが、以下のプログラムでMatlabで間接参照を実現することが出来ます。

function self = Pointer(pointee)
  pointer = pointee;
  self = struct;
  function retval = get()
    retval = pointer;
  end
  function retval = set(new_pointee)
    pointer = new_pointee;
    retval = self;
  end
  self.get = @get;
  self.set = @set;
end

次にPointerのデモプログラムを示します。

function test_Pointer()
  p1 = Pointer(1);
  p2 = p1;
  assert(p1.get() == 1, 'p1.get() == 1');
  assert(p2.get() == 1, 'p2.get() == 1');    
  change_pointer(p1, 2);
  assert(p1.get() == 2, 'p1.get() == 2');
  assert(p2.get() == 2, 'p2.get() == 2');  
  disp('OK');
end

function change_pointer(p, value)
  p.set(value);
end

関数内で行った変更が呼び出し元でも反映されており、なおかつp1のコピーであるp2にも変更が反映されています。

ちなみに、先ほども紹介したLET OVER LAMBDAという本はLispクロージャとマクロについて書かれた本です。
プログラミングの書籍としては難解な部類に入ると思いますが、第1章のクロージャの部分だけでも十分元がとれるほどの内容です。

追記 (2012年2月11日)

MatlabJavaのライブラリを呼び出すことができますが、Javaのオブジェクトは参照型なので、例えばArrayListを使って間接参照を実現することが出来ます。こちらの方法のほうが上記の方法よりもおそらく効率もよいでしょう。ただし、関数ハンドラなどMatlabの一部のデータ型は、Javaのコレクション型に入れることができないようです。