-
Notifications
You must be signed in to change notification settings - Fork 31
/
hmm.py
107 lines (84 loc) · 3.19 KB
/
hmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import numpy as np
from scipy.sparse import coo_matrix
import matplotlib.pyplot as plt
np.random.seed(42)
class HMM():
def __init__(self, d=3, k=2, n=10000):
self.d = d #dimension of data
self.k = k #dimension of latent state
self.n = n #number of data points
self.A = np.zeros((k,k)) #transition matrix
self.E = np.zeros((k,d)) #emission matrix
self.s = np.zeros(k) #initial state vector
self.x = np.zeros(self.n) #emitted observations
def normalize_mat(self, X, dim=1):
z = np.sum(X, axis=dim)
Xnorm = X/z.reshape(-1,1)
return Xnorm
def normalize_vec(self, v):
z = sum(v)
u = v / z
return u, z
def init_hmm(self):
#initialize matrices at random
self.A = self.normalize_mat(np.random.rand(self.k,self.k))
self.E = self.normalize_mat(np.random.rand(self.k,self.d))
self.s, _ = self.normalize_vec(np.random.rand(self.k))
#generate markov observations
z = np.random.choice(self.k, size=1, p=self.s)
self.x[0] = np.random.choice(self.d, size=1, p=self.E[z,:].ravel())
for i in range(1, self.n):
z = np.random.choice(self.k, size=1, p=self.A[z,:].ravel())
self.x[i] = np.random.choice(self.d, size=1, p=self.E[z,:].ravel())
#end for
def forward_backward(self):
#construct sparse matrix X of emission indicators
data = np.ones(self.n)
row = self.x
col = np.arange(self.n)
X = coo_matrix((data, (row, col)), shape=(self.d, self.n))
M = self.E * X
At = np.transpose(self.A)
c = np.zeros(self.n) #normalization constants
alpha = np.zeros((self.k, self.n)) #alpha = p(z_t = j | x_{1:T})
alpha[:,0], c[0] = self.normalize_vec(self.s * M[:,0])
for t in range(1, self.n):
alpha[:,t], c[t] = self.normalize_vec(np.dot(At, alpha[:,t-1]) * M[:,t])
#end for
beta = np.ones((self.k, self.n))
for t in range(self.n-2, 0, -1):
beta[:,t] = np.dot(self.A, beta[:,t+1] * M[:,t+1])/c[t+1]
#end for
gamma = alpha * beta
return gamma, alpha, beta, c
def viterbi(self):
#construct sparse matrix X of emission indicators
data = np.ones(self.n)
row = self.x
col = np.arange(self.n)
X = coo_matrix((data, (row, col)), shape=(self.d, self.n))
#log scale for numerical stability
s = np.log(self.s)
A = np.log(self.A)
M = np.log(self.E * X)
Z = np.zeros((self.k, self.n))
Z[:,0] = np.arange(self.k)
v = s + M[:,0]
for t in range(1, self.n):
Av = A + v.reshape(-1,1)
v = np.max(Av, axis=0)
idx = np.argmax(Av, axis=0)
v = v.reshape(-1,1) + M[:,t].reshape(-1,1)
Z = Z[idx,:]
Z[:,t] = np.arange(self.k)
#end for
llh = np.max(v)
idx = np.argmax(v)
z = Z[idx,:]
return z, llh
if __name__ == "__main__":
hmm = HMM()
hmm.init_hmm()
gamma, alpha, beta, c = hmm.forward_backward()
z, llh = hmm.viterbi()
import pdb; pdb.set_trace()