Numpy

公式HP:https://numpy.org/

wikiによれば

Pythonは動的型付け言語であるため、プログラムを柔軟に記述できる一方で、 純粋にPythonのみを使って数値計算を行うと、 ほとんどの場合C言語やJavaなどの静的型付き言語で書いたコードに比べて大幅に計算時間がかかる。 そこでNumPyは、Pythonに対して型付きの多次元配列オブジェクト (numpy.ndarray) と、 その配列に対する多数の演算関数や操作関数を提供することにより、 この問題を解決しようとしている。NumPyの内部はC言語 (およびFortran)によって実装されているため 非常に高速に動作する。 したがって、目的の処理を、大きな多次元配列(ベクトル・行列など)に対する演算として記述できれば (ベクトル化できれば)、計算時間の大半はPythonではなく C言語によるネイティブコードで実行されるようになり大幅に高速化する。
ということで、Pythonで解析処理を行うならNumpyは必須である。 また、Scipyやpandasライブラリの動作にはNumpyライブラリが必須であり(依存関係がある)、 Numpyを知っておくことには意味があるだろう。

ndarray配列の生成

「ndarray」は「N-dimensional array」(N次元配列)の略であり、 NumPyが提供する配列形式のオブジェクトである。 Pythonが標準でもつリストは、あらゆるオブジェクトを格納できたが ndarrayは(原則)1種類の型しか格納できない。 ndarrayは生成時に格納する型の指定が必要になる。 (数値型なら省略可能。というより数値型しか格納させない。)

import numpy as np

# np.array(リスト)で生成
a0 = np.array([1, 2, 3])        # 1次元配列生成
print(a0)
print(f'shape:{a0.shape}, size:{a0.size}, ndim:{a0.ndim}')

a1 = np.array([[1, 2], [3, 4]]) # 2次元配列生成
print(a1)
print(f'shape:{a1.shape}, size:{a1.size}, ndim:{a1.ndim}')

# その他の配列生成方法(いろいろある)
print(np.zeros(5))          # 初期値0、shape(5,)の配列生成
print(np.zeros((2, 5)))     # 初期値0、shape(2, 5)の配列生成
print(np.ones(3))           # 初期値1、shape(5,)の配列生成
print(np.empty(3))          # 初期値未定、shape(5,)の配列生成
print(np.arange(5))         # -> [0, 1, 2, 3, 4]
print(np.arange(2, 10, 3))  # -> [2, 5, 8]
print(np.linspace(0,5,100)) # 0~5までの間に等間隔に100個の要素

# 要素を取得するにはインデックスを指定する
print(a0[1])
print(a1[1, 0])     # a1[1][0]でも可(リストではa1[1, 0]は不可である)

ndarray配列の演算

ndarrayとリストでは+演算子や*演算子を作用させたときの振舞いが異なる。 ndarrayは行列だと思っておくと良い。

import numpy as np

list0 = [1, 3, 5, 7, 9, 11]
list1 = [0, 2, 4, 6, 8, 10]
list3 = [2, 3, 5, 7, 11]
arr0 = np.array(list0)  # 1次元配列(テンソル)なのでベクトルと同じ
arr1 = np.array(list1)
arr3 = np.array(list3)

# 四則演算
print(list0 + list1)    # リストの連結
print(arr0 + arr1)      # 各要素ごとに足す
print(arr0 + 5)         # 全要素に+5
print(arr0 * 5)         # 全要素にx5
print(arr0 * arr1)      # 各要素ごとに掛ける
#print(arr0 + arr3)      # -> Error shapeが異なる
#print(arr0 * arr3)      # -> Error shapeが異なる

# 2次元リスト
list4 = [
    [11, 12, 13, 14, 15],
    [21, 22, 23, 24, 25],
    [31, 32, 33, 34, 35],
    [41, 42, 43, 44, 45],
    [51, 52, 53, 54, 55]]
arr4 = np.array(list4)

print(array2 + 5)       # 全要素に+5
print(array2 * 5)       # 全要素に*5
print(arr4 + arr3)      # 各行に+ arr3
print(arr4 * arr3)      # 各行に* arr3

16、17行目はshape(6, )とshape(5, )の演算なのでエラーとなるが、 30、31行目はshape(5, 5)とshape(5, )演算で列数が一致しているためエラーはでない。

ファイル(csv)読み込みとスライス

Pythonの標準だけでcsvデータの解析は面倒であった。 NumPyはファイル読み込み機能をもち、その際にデータ型を指定したり、 読み込む行や列を指定することができる。 さらにndarrayは多次元配列のスライスが容易に行える。 (リストなら内包表記で書くしかない。) 以下のテストデータ「sample_data2.txt」を読み込んでみる。 (sample_data2.txtはsample_data1.txtの生徒数を増やしたもの。)

import numpy as np

path = 'sample_data2.txt'
data = np.loadtxt(              # loadtxt()でファイル読み込み
    path,                       # ファイルパス
    delimiter = '\t',           # 区切り文字
    dtype = 'int64',            # データ型(int64、float64、float128など)
    skiprows = 1,               # 読み込まない行(先頭から何行skipするか)
    usecols  = [3, 4, 5, 6, 7], # 読み込む列
    encoding = 'utf8')          # エンコード指定
#print(data)

# スライス
mats = data[:, 0]   # dataの0列目(1d配列)
engs = data[:, 1]   # dataの1列目(1d配列)
scis = data[:, 2]   # dataの2列目(1d配列)
socs = data[:, 3]   # dataの3列目(1d配列)
jpns = data[:, 4]   # dataの4列目(1d配列)
#print(mats)

# 平均点と標準偏差
print(f'数学の平均点:{mats.mean():.2f} +- {mats.std():5.2f}')
print(f'英語の平均点:{engs.mean():.2f} +- {engs.std():5.2f}')
print(f'理科の平均点:{scis.mean():.2f} +- {scis.std():5.2f}')
print(f'社会の平均点:{socs.mean():.2f} +- {socs.std():5.2f}')
print(f'国語の平均点:{jpns.mean():.2f} +- {jpns.std():5.2f}')

4行目でファイルを読み込んでいる。 (今回は見やすくするため改行したが、1行で書いても問題ない。) "dtype"はデータ型を指定するが、これは"skiplows"および"usecols"で指定した 行と列を考慮した後に読み込むデータ型を指定する。 (skiplowsは省略するとデフォルトが0のため、 ヘッダー('id', '氏名', 'クラス',…)の情報も読み込む。 この状態でdtype='int64'は文字列型を整数型で読み込もうとしているためエラーがでる。)

14~18行目でスライス処理して、科目ごとのデータを抽出している。 基本的な書き方は「ndarray[rowstart:rowstop, colstart:colstop]」である。 さらに、22行目以降は、平均を求めるmean()と標準偏差を求めるstd()を利用している。 ndarrayには他にもmax()やmin()はもちろん、分散や最頻値などの統計量を求める関数がある。

import numpy as np

list0 = [
    [11, 12, 13, 14, 15],
    [21, 22, 23, 24, 25],
    [31, 32, 33, 34, 35],
    [41, 42, 43, 44, 45],
    [51, 52, 53, 54, 55]]
arr0 = np.array(list0)

# スライスの例
print(arr0[0:2, 1:3])   # 0~1行目、1~2列目を取得 -> 2x2行列
print(arr0[0:2, 1: ])   # 0~1行目、1~最終列目を取得 -> 2x4行列
print(arr0[ :2, 1:4])   # 先頭~1行目、1~3列目を取得 -> 2x3行列
print(arr0[0:2,  : ])   # 0~1行目、全列を取得 -> 2x5行列
print(arr0[ : , 1:3])   # 全行、1~2列目を取得 -> 5x2行列
print(arr0[0:2, 3])     # 0~1行目、3列目を取得 -> 1次元配列(要素数2)
print(arr0[4, 1:4])     # 4行目、1~3列目を取得 -> 1次元配列(要素数3)
print(arr0[:, 3])       # 3列目を取得 -> 1次元配列(要素数5)
print(arr0[4, :])       # 4行目を取得 -> 1次元配列(要素数5)

numpy_loadtext.pyではファイル読み込みの際に用いる行と列を指定したが、 別の方法もあり、それは一旦全部をオブジェクト型として読み込み (型の指定をせずに読み込み) 、その後astype()メソッドで数値型に変換することもできる。 この方法だと型変換の手間が掛かるが、ヘッダーの情報を捨てずに読み込めるという利点もある。 以下その例である。

import numpy as np

path = 'sample_data2.txt'
data = np.loadtxt(path, delimiter = '\t', dtype = 'object', encoding = 'utf8')

subject_names = list(data[0, 3:])
subject_data_list = [data[1:, col_idx].astype(np.int32) for col_idx in range(3,8)]

for subject_name, subject_data in zip(subject_names, subject_data_list):
    print(f'{subject_name}の平均点:{subject_data.mean():.2f} +- {subject_data.std():5.2f}')

# 折角なので相関係数計算
print('相関係数計算\n 英,数,国,理,社')
print(np.corrcoef(subject_data_list))

上記コードではnumpy_loadtext.pyに追加して、相関係数を「np.corrcoef()」で求めている。 2x2行列の形式で数値が返ってくると思うがこれの見方は
英語数学国語理科社会
英語1英-数英-国英-理英-社
数学英-数1数-国数-理数-社
国語英-国数-国1国-理国-社
理科英-理数-理国-理1理-社
社会英-社数-社国-社理-社1
のペアでの相関係数である。(同じ科目の相関係数は1である。)

ndarrayの関数の適用について

numpyには三角関数や指数関数、対数関数が実装されている。 例えば、「np.sin()」の引数にはndarrayを渡せるし、数値型を格納したリスト型を渡すこともできる。

import numpy as np

x = np.linspace(-5, 5, 100)
y0 = np.sin(x)
y1 = np.sin(x) + x 
y2 = x**2 + 2*x -4 

l = [1, 2, 3, 4, 5, 6, 7, 8, 10]
print(np.sin(l))

もし、Pythonのリストだけで実装しようとするなら、 for文を回して各要素に対して演算を行う必要があり、非常に手間である。 (NumPyの関数便利ですねという話。)

上記で書いた「x**2 + 2*x -4」程度の単純な関数であれば問題ないのだが、 何か自作した関数をndarrayの全要素に適用したい場合もあるだろう。 そのようなときは、「np.frompyfunc(関数名, 引数の数, 戻りの数)」使うとよい。 (lambda式という選択肢もある。)

import numpy as np

def myfunc(x):
    if x < 0:
        return x
    elif x >= 0 and x < 3:
        return x**2 - x
    else:
        return 2*x

new_func = np.frompyfunc(myfunc, 1, 1)

x = np.linspace(-2, 5, 100)
y  = new_func(x)
print(y)

行列計算

NumPyでは行列の計算(内積・外積・行列式・逆行列など)が容易に行える。 以下のコードはその例である。

import numpy as np

A = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 2]])
 
B =np.array([
    [2, 3, 5],
    [7, 2, 9],
    [1, 8, 7]])
# 内積
print(A.dot(B))         # オブジェクト指向っぽい書き方
print(np.dot(A, B))     # 分かりやすいのはこっちだろうか

# 外積
print(np.cross(A, B))

#行列式
A_det = np.linalg.det(A)
print(A_det)

# 逆行列
A_inv = np.linalg.inv(A)
print(A_inv)
print(A.dot(A_inv))

逆行列が求まるということは、連立方程式が解けることに等しい。 折角なので、連立方程式の解を求めるプログラムを書いてみよう。 \begin{align*} \begin{cases} a_{11}x_1+a_{12}x_2+a_{13}x_3=b_1\\ a_{21}x_1+a_{22}x_2+a_{23}x_3=b_2\\ a_{31}x_1+a_{32}x_3+a_{33}x_3=b_3 \end{cases} \quad\rm{or}\quad \begin{pmatrix} a_{11}&a_{12}&a_{13}\\ a_{21}&a_{22}&a_{23}\\ a_{31}&a_{32}&a_{33} \end{pmatrix} \begin{pmatrix} x_1\\ x_2\\ x_3 \end{pmatrix} = \begin{pmatrix} b_1\\ b_2\\ b_3 \end{pmatrix} \end{align*} \(a_{ij}\) (\(i,j=1,2,3\))を要素に持つ行列を\(A\)、\(x_{i}\)を要素に持つ行列を\(X\)、 \(b_{i}\)を要素に持つ行列を\(B\)とおけば、 \(A\)に逆行列\(A^{-1}\)が存在するなら、 \begin{align*} X=A^{-1}B \end{align*} で解が求まる。

import numpy as np
import mymoduleinput as myinput

print('3元連立方程式を求める AX = B')
while True:
    print('xの係数を入力して下さい')
    a1 = myinput.input_nums_N(3, 'a11, a12, a13 >')
    a2 = myinput.input_nums_N(3, 'a21, a22, a23 >')
    a3 = myinput.input_nums_N(3, 'a31, a32, a33 >')
    A = np.array([a1, a2, a3]) 

    try:
        A_inv = np.linalg.inv(A)
    except np.linalg.LinAlgError as err:
        print('Aの逆行列が存在しません(解が一意に定まらない)')
        if myinput.input_int('終了する場合は-1を入力> ') == -1:
            break
    else:
        b = myinput.input_nums(3, 'b1, b2, b3 >')
        B = np.array(b)
        X = A_inv.dot(B)
        print(f'解はX = {X}\n')
        if myinput.input_int('終了する場合は-1を入力> ') == -1:
            break

print('---Finish---')

行の抽出(付録)

'sample_data2.txt'を解析する際にまだやっていないことがあり、 それはクラスごとの解析である。 (面倒なので意図的に避けていた。pandasでグループ化して計算した方が楽である。) 条件に一致した行の抽出について紹介する。

import time
import numpy as np
ut_np = time.perf_counter()

########## Numpyを利用した処理 ##########

# 読み込み
path = 'sample_data2.txt'
data = np.loadtxt(path, delimiter='\t', dtype='object', encoding='utf8')
# 0行目なくした配列
noheader_data = data[1:,]
# class列をkeyにしてソート
sorted_data = noheader_data[noheader_data[:, 2].argsort()]
# ユニークな配列とその開始インデックス
uniques, indices = np.unique(sorted_data[:, 2], return_index=True)
# 科目名
subject_names = list(data[0, 3:])
# split()してクラスごとに分ける
data_groupby_class = np.split(sorted_data, indices[1:])
# クラスごとに計算
for u, data_class in zip(uniques, data_groupby_class):
    print('CLASS', u)
    subject_data_list = [data_class[0:, col_idx].astype(np.int32) for col_idx in range(3, 8)]
    for subject_name, subject_data in zip(subject_names, subject_data_list):
        print(f'{subject_name}の平均点:{subject_data.mean():.2f} +- {subject_data.std():5.2f}')

time_np = time.perf_counter() - ut_np

########## Python標準での処理 ##########

def append_dict(k, l):
    class_dict[k]['id'].append(int(l[0].strip()))
    class_dict[k]['name'].append(l[1].strip())
    class_dict[k]['英語'].append(int(l[3].strip()))
    class_dict[k]['数学'].append(int(l[4].strip()))
    class_dict[k]['国語'].append(int(l[5].strip()))
    class_dict[k]['理科'].append(int(l[6].strip()))
    class_dict[k]['社会'].append(int(l[7].strip()))

# 平均、分散、標準偏差計算
def calc_stat(l):
    ave = sum(l) / len(l)
    var = sum([(x-ave)**2 for x in l]) / len(l)
    return ave, var, var**(1/2)

ut_py = time.perf_counter()

with open(path, encoding="UTF-8") as f:
    list_data = f.readlines()


_SUBJECTS_ = ['英語','数学','国語','理科', '社会']
class_dict = {
    'A': {'id':[],'name':[],'英語':[],'数学':[],'国語':[],'理科':[], '社会':[]},
    'B': {'id':[],'name':[],'英語':[],'数学':[],'国語':[],'理科':[], '社会':[]},
    'C': {'id':[],'name':[],'英語':[],'数学':[],'国語':[],'理科':[], '社会':[]},
    'D': {'id':[],'name':[],'英語':[],'数学':[],'国語':[],'理科':[], '社会':[]},
    'E': {'id':[],'name':[],'英語':[],'数学':[],'国語':[],'理科':[], '社会':[]}
    }

for i, row in enumerate(list_data):
    if i != 0:
        rows = row.split('\t')
        if rows[2] == 'A':
            append_dict('A', rows)
        elif rows[2] =='B':
            append_dict('B', rows)
        elif rows[2] =='C':
            append_dict('C', rows)
        elif rows[2] =='D':
            append_dict('D', rows)
        elif rows[2] =='E':
            append_dict('E', rows)

for k in class_dict:    # クラスループ
    print('CLASS', k)
    for sub in _SUBJECTS_:  # 科目ループ
        ave, var, std = calc_stat(class_dict[k][sub])
        print(f'{sub}の平均点:{ave:.2f} +- {std:5.2f}')
time_py = time.perf_counter() - ut_py

print('###############\n処理速度')
print(f'{Numpy利用:10}:{time_np:5.4f}')
print(f'Python標準:\t{time_py:5.4f}')

2~23行目までがNumPyの処理である。上記コードが何をやっているかを理解する必要はあまりない。 pnadasのgroupby()が便利だということを言いたいがための布石である。 分かりやすさを優先するならPythonの標準のfor文で書いた方が良い(29~76行目)が、 処理速度とコード数が犠牲になる。

処理速度が2倍近く異なってくると思う(私の環境だとそのぐらい。500行のデータなので許容できるが、これが1000、10000とデータ数が増えてくると流石にPython標準のみだと嫌になってくる)。 コードを見て分かると思うが、NumPyは利用バージョンは任意個のクラス分けに対応しているが、 Python標準のみで書いたコードはA~Eにしか対応していない。 もし、追加のクラスがあるのならさらにコードが増えることになる。

回帰分析

Numpyではpolyfit(x, y, deg)で多項式回帰(最小二乗法)が行える。 なお、下記コードではmatplotlibも用いて可視化している。 また、読み込んでいるファイルは「「sample_date_approx.txt」」である。

import numpy as np 
import matplotlib.pyplot as plt

path = 'sample_date_approx.txt'
data = np.loadtxt(path, delimiter='\t', skiprows = 1, encoding='utf8')
print(data)

x = data[:,0]
y1 = data[:,1]
y2 = data[:,2]

# フィッティング
a1, a0 = np.polyfit(x, y1, 1)
b2, b1, b0 = np.polyfit(x, y2, 2)

fig = plt.figure()
ax1 = fig.add_subplot(121)
ax2 = fig.add_subplot(122)

ax1.scatter(x, y1, marker='.')
ax2.scatter(x, y2, marker='.')

ax1.plot(x, a1*x+a0, color='r')
ax2.plot(x, b2*x**2+b1*x+b0, color='r')

plt.show()

print(a1, a0)
print(b2, b1, b0)

polyfit(x, y, deg)では多項式近似しか扱えない。 その為、指数関数や三角関数でfittingを行いたいときはscipyのoptimize.curve_fitを使用すると良い。