pythonでファイナンス計算
先日会社で退職金の説明会がありました。それによると退職金の一部を確定拠出年金として自分で運用しなくてはいけないとのことで、ポートフォリオを設計する必要あります。今時は各種金融機関がポートフォリオのシミュレーターを公開していますし、Excelを使って計算するという方法もありますが、ここでは例によってpythonでポートフォリオの設計をやってみます。
実は、pythonにはnumpyやscipyといった科学計算のライブラリやcvxoptという最適化計算のライブラリがあるので、それらを利用すれば簡単かつ高速に最適なポートフォリオを計算することができます。
ちなみに、僕はファイナンス理論の専門家ではないので、ここに書いてある記述が正確であるという保証はできません。あしからず。
分散共分散行列を計算する
各投資対象のリスクと投資対象同士の相関係数が分かっているならば、分散共分散行列は行列計算で計算できます。
各投資対象の相関係数行列をC、各投資対象の標準偏差の対角行列をSとすると、分散共分散行列Σは
で計算できます。
pythonのコード例
import cvxopt correl_mat = cvxopt.matrix([[1.0, 0.16, -0.06, -0.05, -0.29], [0.16, 1.0, -0.25, 0.27, 0.73], [-0.06, -0.25, 1.0, 0.56, 0.11], [-0.05, 0.27, 0.56, 1.0, 0.91], [-0.29, 0.73, 0.11, 0.91, 1.0]]) stddev_mat = cvxopt.spdiag([5.40, 22.15, 13.25, 19.59, 26.25])*0.01 sigma_mat = stddev_mat*correl_mat*stddev_mat
株価や投資信託の基準価額の時系列データから分散共分散行列を計算する場合は、numpy.covを使用すると簡単です。
import numpy data1 = numpy.array([投資対象1の時系列データ], dtype=float) data2 = numpy.array([投資対象2の時系列データ], dtype=float) # リターンの計算 return_mat1 = data1[0:-1]/data1[1:] return_mat2 = data2[0:-1]/data2[1:] # 分散共分散行列の計算 sigma_mat = numpy.cov(numpy.vstack(return_mat1, return_mat2)) # 単位が年率でないので注意
効率的フロンティア上のポートフォリオを計算する
最適なポートフォリオを設計する問題は、ポートフォリオのリターンがある値以上という制約下でリスクを最小化するという問題と考えることができます。
これは、各種投資対象の期待リターンのベクトルをr、配分割合のベクトルをx、ポートフォリオの目標リターンをr0とすると、
となります。
これは、スラック変数sを導入すると、次のような最適化問題として定式化できます。
この最適化問題はcvxopt.solvers.coneqpを使って解くことができます。
import math import cvxopt import cvxopt.solvers def calc_portfolio(sigma_mat, r, r0): """ sigma_mat : 分散共分散行列 (cvxopt.base.matrix) r : 各投資対象の期待リターンのベクトル (cvxopt.base.matrix または list) r0 : 目標リターン (float) """ n = sigma_mat.size[0] minus_r = [-x for x in r] P = sigma_mat q = cvxopt.matrix(0.0, (n, 1)) G = cvxopt.matrix([cvxopt.matrix([[1.0]*n, minus_r]).T, cvxopt.spdiag([-1.0]*n)]) h = cvxopt.matrix([1.0, -r0] + [0 for i in xrange(n)]) x = cvxopt.solvers.coneqp(P, q, G, h)['x'] # (配分割合のリスト、ポートフォリオのリターン, ポートフォリオのリスク(標準偏差)) return list(x), (r.T*x)[0], math.sqrt((x.T*P*x)[0])
ポートフォリオのリスクを計算する
正規分布の確率分布関数やその逆関数はscipy.stats.normで計算することが出来るので、これを使ってショートフォール確率やバリューアットリスクを計算します。
from scipy.stats import norm def calc_shortfall_risk(r_min, r, sigma): return norm.cdf(r_min, r, sigma) def calc_value_at_risk(probability, r, sigma): return norm.ppf(1.0-probability, r, sigma) def calc_sharp_ratio(r_norisk, r, sigma): return (r - r_norisk)/sigma
その他
cvxoptの最適化計算の途中経過を表示しないようにするには、次のコードを実行します。
cvxopt.solvers.options['show_progress'] = False
参考URL
各種数値や計算結果の検算に下記のサイトを利用させていただきました。
アセットアロケーション分析 〜 投資信託のガイド|ファンドの海
Rubyメソッドのアラウンドエイリアスについて
Rubyで末尾再帰最適化をする。を読んで思ったこと。
このエントリやメタプログラミングRubyでメソッドのアラウンドエイリアスを定義する際に、古いメソッドに別名をつけてから新しいメソッドを定義しています。以下はメタプログラミングRubyの例。
class String alias :real_length :length def length real_length > 5 ? 'long' : 'short' end end
この方法の問題点はエイリアスしたメソッド(この例の場合real_length)が残ってしまうことです。メソッドオブジェクトを使えばエイリアスしたメソッドを残すことなくアラウンドエイリアスを行うことができると思ったので、実際にやってみました。
以下のプログラムは、Rubyで末尾再帰最適化をする。のプログラムをメソッドエイリアスではなくメソッドオブジェクトを使って書いた例です。
#!/usr/bin/ruby -Ku # -*- coding: utf-8 -*- class Module def tco(name) continue = [] first = true arguments = nil original_method = self.instance_method(name) proc = lambda do |*args| if first first = false while true result = original_method.bind(self).call(*args) if result.equal? continue args = arguments else first = true return result end end else arguments = args continue end end define_method name, proc end end class Sum def sum1(n, acc=0) if n == 0 acc else sum1(n-1, acc+n) end end def sum2(n, acc=0) if n == 0 acc else sum2(n-1, acc+n) end end tco :sum2 end if $0 == __FILE__ o = Sum.new p o.sum2(100000) p o.sum1(100000) end
実行結果
> ./tco.rb 5000050000 ./tco.rb:37:in `sum1': stack level too deep (SystemStackError) from ./tco.rb:37:in `sum1' from ./tco.rb:54
original_method = self.instance_method(name)
の部分で元のメソッドのUnboundMethodオブジェクトを取り出して、
result = original_method.bind(self).call(*args)
で元のメソッドを呼び出しています。
このエントリを書いてる途中に、rubikitchさんが同じことをもっと詳しく解説してくれているのを発見したので、リンクを張っておきます。
ついでに、末尾再帰最適化のPythonの例を読んだときに、ネタに対して、「これってマルチスレッドで動かないよね」と思ってしまったので、Pythonの末尾再帰最適化のマルチスレッド対応版を書いておきます。
#!/usr/bin/env python # -*- coding: utf-8 -*- class tail_recursive(object): def __init__(self, func): self.func = func self.firstcall = True self.CONTINUE = object() def __call__(self, *args, **kwd): if self.firstcall: func = self.func CONTINUE = self.CONTINUE self.firstcall = False try: while True: result = func(*args, **kwd) if result is CONTINUE: # update arguments args, kwd = self.argskwd else: # last call return result finally: self.firstcall = True else: # return the arguments of the tail call self.argskwd = args, kwd return self.CONTINUE import threading class tail_recursive_mt(object): def __init__(self, func): self.func = func self.local = threading.local() self.CONTINUE = object() def __call__(self, *args, **kwd): if not hasattr(self.local, "firstcall") or self.local.firstcall: func = self.func CONTINUE = self.CONTINUE self.local.firstcall = False try: while True: result = func(*args, **kwd) if result is CONTINUE: # update arguments args, kwd = self.local.argskwd else: # last call return result finally: self.local.firstcall = True else: # return the arguments of the tail call self.local.argskwd = args, kwd return self.CONTINUE @tail_recursive def sum(n, acc=0): if n == 0: return acc else: return sum(n-1, acc+n) @tail_recursive_mt def sum_mt(n, acc=0): if n == 0: return acc else: return sum_mt(n-1, acc+n) class SumTest(threading.Thread): def run(self): try: assert(sum(100000) == 5000050000) except AssertionError, e: print "AssertionError" else: print "OK" class SumMtTest(threading.Thread): def run(self): try: assert(sum_mt(100000) == 5000050000) except AssertionError, e: print "AssertionError" else: print "OK" if __name__ == '__main__': print "="*60 print "Single Thread Version" print "="*60 threads = [SumTest() for i in xrange(5)] for t in threads: t.start() for t in threads: t.join() print "="*60 print "Multi Thread Version" print "="*60 threads = [SumMtTest() for i in xrange(5)] for t in threads: t.start() for t in threads: t.join()
実行結果
> ./tco.py ============================================================ Single Thread Version ============================================================ AssertionError AssertionError AssertionError AssertionError OK ============================================================ Multi Thread Version ============================================================ OK OK OK OK OK
Emacsのauto-insertでテンプレートを自動生成する
プログラムの単体テストを書く場合や、各種フレームワークを使用してプログラムを書く場合などは、ある規則にそってファイル名を付けると思います。(例えば単体プログラムのソースファイル名は"モジュール名_test.py"(Pythonの場合)など)
Emacsのauto-insertモードを使って、特定の名前のファイルを作成したときに、専用のテンプレートを自動生成してくれると便利だと思い、その方法を調べてみました。
ひげぽんさんのブログとその元ネタにほぼ正解が紹介されているのですが、lispのマクロを使って色々な種類のテンプレートを簡単に作れるようにしたので、新しいエントリを書いてみました。
ここではPythonの単体テストのソースファイルの雛形を自動生成するためのElispを書いてみます。
まず、~/.emacs.d/insertにunittest_template.pyという名前のテンプレートファイルを用意します。
#!/usr/bin/env python # -*- coding: utf-8 -*- import unittest import %testee% class %test_class%(unittest.TestCase): def setUp(self): pass def tearDown(self): pass if __name__ == '__main__': unittest.main()
次に、~/.emacs.elに次の設定を追加します。
(require 'autoinsert) (auto-insert-mode) (setq auto-insert-directory "~/.emacs.d/insert/") (defun buffer-file-name-nondirectory () (file-name-nondirectory (buffer-file-name))) (defun python-module-name () (substring (buffer-file-name-nondirectory) 0 -8)) (defun python-test-case-name () (concat (capitalize (python-module-name)) "Test")) (require 'cl) (defmacro define-replace-function (name replacement-alist) `(defun ,name () (loop for (string . func) in ,replacement-alist do (progn (goto-char (point-min)) (replace-string string (funcall func) nil))) (goto-char (point-max)) (message "done."))) (define-replace-function python-unittest-template '(("%testee%" . (lambda () (python-module-name))) ("%test_class%" . (lambda () (python-test-case-name))) )) (add-to-list 'auto-insert-alist '("_test\\.py$" . ["unittest_template.py" python-unittest-template]))
ポイントはこの部分です。
(require 'cl) (defmacro define-replace-function (name replacement-alist) `(defun ,name () (loop for (string . func) in ,replacement-alist do (progn (goto-char (point-min)) (replace-string string (funcall func) nil))) (goto-char (point-max)) (message "done."))) (define-replace-function python-unittest-template '(("%testee%" . (lambda () (python-module-name))) ("%test_class%" . (lambda () (python-test-case-name))) )) (add-to-list 'auto-insert-alist '("_test\\.py$" . ["unittest_template.py" python-unittest-template]))
まず、loopマクロを使用するため、(require 'cl)を行っています。
次に、define-replace-functionマクロを定義しています。このマクロは、関数名と(置換する文字列 . 置換後の文字列を返す関数)のリストを渡すと、テンプレート文字列を置換する関数を定義するマクロです。
続いて、定義したマクロを使用して、先ほど作成したunittest_template.pyのテンプレート文字列を置換する関数を定義しています。
最後の行では、_test.pyで終わるファイル名を作成しようとしたときに、ここで作成したテンプレートファイルと文字列置換関数を使用するようにしています。
define-replace-functionの第2引数に渡すリストを変えれば、色々なテンプレートを作成することができます。
PythonでWEBページをクローリングする時のTips
PythonでWEBページをクローリングする時のTipsをまとめてみました。
urllib2.urlopenのデフォルトのユーザーエージェントを変更する
PythonでURLを開くには、urllib2.urlopenします。
urllib2.urlopenは、デフォルトで"Python-urllib/(Pythonのバージョン)"というユーザーエージェントを使用しますが、Wikipediaなど一部のページではこのユーザーエージェントに対し403 Forbiddenを返してきます。以下のコードによってデフォルトのユーザーエージェントを変更すると、403エラーを回避することができます。
import urllib2 opener = urllib2.build_opener() opener.addheaders = [('User-agent', 'your user agent string')] urllib2.install_opener(opener)
HTMLからタグを除去する
PythonでHTMLをパースするライブラリではBeautifulSoupが有名ですが、HTMLのタグを除去する程度のタスクの場合、標準ライブラリのsgmllibでも比較的簡単に達成することができます。
import sgmllib import StringIO class TagRemover(sgmllib.SGMLParser): def remove_tag(self, some_html): self.stringio = StringIO.StringIO() self.feed(some_html) self.close() result = self.stringio.getvalue() self.stringio.close() return result def handle_data(self, data): self.stringio.write(data) def clean_html(some_html): return TagRemover().remove_tag(some_html)
ElementTreeモジュールでXMLをパースする
PythonではXMLをパースするための標準ライブラリが複数ありますが、Python2.5以降で使用可能なElementTreeモジュールが一番使いやすいと思います。ただ、このモジュールのXML名前空間の扱いで少しハマったので、ここにメモにしておきます。
ElementTree.ElementTreeのfindやfindallなどのメソッドではタグの名前空間を{名前空間}で指定します。RSSやAtom等からデータを抜き出す際には、タグの名前空間をきちんと指定する必要があります。
ElementTreeのサンプルプログラム
import urllib2 from xml.etree import ElementTree url = 'http://d.hatena.ne.jp/saitodevel01/rss' etree = ElementTree.fromstring(urllib2.urlopen(url).read()) print etree.find("item") print etree.find("{http://purl.org/rss/1.0/}item")
実行結果
None <Element {http://purl.org/rss/1.0/}item at 10101d248>
時刻情報をパースする
はてなのRSSなどでは時刻情報がISO8601というフォーマットで出てきます。このフォーマットは標準ライブラリのtimeモジュールやdatetimeモジュールではパースすることが出来ないので、iso8601というライブラリを使用します。
iso8601のインストール
sudo easy_install.py iso8601 もしくは sudo pip install iso8601
iso8601の使い方
>>> import iso8601 >>> iso8601.parse_date("2007-01-25T12:00:00Z") datetime.datetime(2007, 1, 25, 12, 0, tzinfo=<iso8601.iso8601.Utc ...>) >>>
文字列をUnicodeに変換する際のTips
WEBページでは様々な文字列エンコードが使用されていますが、内部データはUnicodeで統一し、シリアライズはutf-8で行うのが定石かと思います。文字列をUnicodeへ変換する際に文字列のエンコードを指定する必要がありますが、文字列エンコードを推定するためのchardetというライブラリを使えば、unicodeへの変換を簡単に行うことができます。
chardetのインストール
sudo easy_install.py chardet もしくは sudo pip install chardet
chardet.detectに文字列を渡すと、encodingとconfidenceの2つのキーが入ったディクショナリが返され、encodingキーの値はそのままunicode関数の第2引数に渡すことが出来ます。また、unicode関数の第3引数を'ignore'とすると、UnicodeDecodeErrorが発生する文字列を無視してくれます。
# -*- coding: utf-8 -*- import chardet def unicode2(raw_string): encoding = chardet.detect(raw_string)['encoding'] if encoding is not None: return unicode(raw_string, encoding, 'ignore') else: raise ValueError, 'Can not detect encoding' if __name__ == '__main__': text = 'あいうえお' print repr(text) print repr(unicode2(text))
Djangoのデータベースをsqlite3からMySQLに移行したときにハマったこと
sqlite3はデータベースが1つのファイルで簡潔するので、バックアップがファイルのコピーで済む等、扱いが非常に簡単です。
そのため僕はDjangoのアプリケーションを作るときは、sqlite3を使用しています。
しかし、sqlite3ではトランザクション実行時にファイル全体がロックされるので、複数プロセス・スレッドから同時に書き込みを行うような用途の場合、別のデータベースエンジンを使用した方が良いと思われます。
僕は今、一定時間ごとにwebページをクローリングして、得られたデータをデータベースに保存するようなプログラムを今書いています。
cronで指定したインターバル以内にプログラムが終了しなかった場合、古いプロセスと新しいプロセスで同時にデータベースへのアクセスが発生してしまい、うまくデータベースへの書き込みが出来ないという自体に陥ってしまいました。
そこで、データベースエンジンをsqlite3からMySQLに移行することにしました。
Djangoではデータベースのバックアップとリストアがmanage.pyで行え、
python manage.py dumpdata app_name --format=json > app.json
でデータベースのバックアップが、
python manage.py loaddata app.json
でデータベースのリストアがそれぞれ実行できます。
基本的には、Djangoのこの機能を使えばデータベースの移行は簡単に行えると思っていたのですが、sqlite3とMySQLの違いによって、修正が必要な箇所がいくつかありました。そこで、このエントリでは、僕がDjangoのデータベースをsqlite3からMySQLに移行したときにハマったポイントについてまとめたいと思います。
1. MySQLのvarchar型の長さの制限
DjangoのCharFieldのMySQLでの型はvarcharですが、MySQLではvarchar型の長さは255文字までに制限されているので、CharFieldのmax_lengthを256以上に設定していた場合は255以下に変更する必要があります。
2. MySQLの文字列比較の大文字・小文字の区別
MySQLではvarchar型の比較時に大文字と小文字をデフォルトで区別しません。
そのため文字列フィールドをuniqueに設定していた場合、sqlite3では問題なかったデータでも、MySQLに移行させた場合にエラーとなる場合があります。
そのため、MySQLのデータベースをsyncdbで初期化した後、こちらの参考エントリの方法をで、大文字・小文字の区別を行うようにテーブルを変更する必要があります。
3. MySQLのdatetime型はタイムゾーンが扱えない
MySQLのdatetime型はタイムゾーンが扱えないらしく、DateTimeFieldの値がタイムゾーン情報を持っていた場合に例外が発生します。
そのため、以下のようにタイムゾーン情報を消す必要があります。
new_datetime_obj = original_datetime_obj.replace(tzinfo=None)
serversman@vpsでキャッシュの使用量が0と表示される件について
serversman@vpsでfreeコマンドでメモリの使用量を確かめようとすると、キャッシュの使用量が0と表示されます。
> free -m total used free shared buffers cached Mem: 1024 234 789 0 0 0 -/+ buffers/cache: 234 789 Swap: 0 0 0
(ちなみにOSはCentOSです。serversman@vpsではDebianやUbuntuが使えるますが、それらのOSでも同じ現象が発生するかどうかは分かりません。)
メモリを節約するために、キャッシュを行わないようLinuxカーネルのパラメータを変えているのかと考えましたが、もし本当にキャッシュが行われないならば、ファイルを読み込む度にディスクアクセスが発生するので、遅すぎて使い物にならないはずです。
(それともディスク自体も仮想化されているので問題ないのか)
本当にキャッシュが行われていないのかどうかを確かめるために、こちらのエントリで紹介されているプログラムをserversman@vps上で実行してみました。
> ./fincore_min /bin/tcsh pagesize: 4096 filesize: 341068 page num: 84 in memory pages: 0 > cat /bin/tcsh > /dev/null > ./fincore_min /bin/tcsh pagesize: 4096 filesize: 341068 page num: 84 in memory pages: 84
最初は/bin/tcshの内容はメモリに全く載っていませんが、catコマンドでファイルの内容を読み込んだあとは、全てメモリ上に乗っていることが分かります。
つまり、freeコマンドではキャッシュとして使用しているメモリの量が0と表示されるだけで、実際にはキャッシュが効いているということが分かります。
ちなみに、/proc/sys/vm/drop_cachesに書き込みを行ってキャッシュを解放しようとすると、エラーが発生してしまいました。
serversman@vpsではカーネルのパラメータの変更が制限されているようです。
su echo 1 > /proc/sys/vm/drop_caches bash: echo: write error: 許可されていない操作です
Matlabでポインタ(間接参照)を実現する
私は、このブログではPythonやRubyのことばかり書いていますが、仕事ではMatlabという数値計算ソフトを使っています。
このMatlabというソフトは、ツールとしては非常に便利ですが、ことプログラミング言語として見ると、非常に大きな欠陥があります。
それは、間接参照、C言語でいうポインタに相当するものが無いということです。
Python・Ruby・Javaなどの言語では、プリミティブ型を除くと、変数は全てオブジェクトへの参照です。
そのため、関数やメソッド内で変数の状態を操作した場合、呼び出し側の変数も状態が更新されます。
# Pythonの例 def change_array(a, value): a[0] = value a = [1] assert(a[0] == 1) change_array(a, 2) assert(a[0] == 2) print 'OK'
しかし、Matlabの場合、変数は値なので、関数内で変数を操作しても呼び出し側の変数は更新されません。
function change_array_by_func() a = [1]; assert(a(1) == 1, 'a(1) == 1'); change_array(a, 2); assert(a(1) == 2, 'a(1) == 2'); % ここでエラー disp('OK'); end function change_array(a, value) a(1) = value; end
一応、Matlabでもhandleというクラスから派生したクラスのオブジェクトは参照型となるらしいですが、その方法の場合、いちいち新しいクラスを作らなければいけないので非常に不便です。
また、比較的新しいバージョンのMatlabでないと、その機能すら使えないようです。
ここで、以前読んだLET OVER LAMBDAという本に書いてある、クロージャでポインタを模倣するテクニックを思い出しました。
LET OVER LAMBDAに書いてある方法とは少し違いますが、以下のプログラムでMatlabで間接参照を実現することが出来ます。
function self = Pointer(pointee) pointer = pointee; self = struct; function retval = get() retval = pointer; end function retval = set(new_pointee) pointer = new_pointee; retval = self; end self.get = @get; self.set = @set; end
次にPointerのデモプログラムを示します。
function test_Pointer() p1 = Pointer(1); p2 = p1; assert(p1.get() == 1, 'p1.get() == 1'); assert(p2.get() == 1, 'p2.get() == 1'); change_pointer(p1, 2); assert(p1.get() == 2, 'p1.get() == 2'); assert(p2.get() == 2, 'p2.get() == 2'); disp('OK'); end function change_pointer(p, value) p.set(value); end
関数内で行った変更が呼び出し元でも反映されており、なおかつp1のコピーであるp2にも変更が反映されています。
ちなみに、先ほども紹介したLET OVER LAMBDAという本はLispのクロージャとマクロについて書かれた本です。
プログラミングの書籍としては難解な部類に入ると思いますが、第1章のクロージャの部分だけでも十分元がとれるほどの内容です。