forked from wenkph/FGPGM
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMCMCSampler.py
118 lines (112 loc) · 4.78 KB
/
MCMCSampler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# -*- coding: utf-8 -*-
# Author: Philippe Wenk <[email protected]>
"""
Class implementing the MCMC sampling scheme used for FGPGM.
"""
import numpy as np
from scipy.stats import norm
def MCMCWithBounds(logTarget, xInit, proposalStds, lowerBounds, upperBounds,
nSamples, nBurnin):
"""
Creates MCMC samples respecting boundaries provided by the experiment.
Uses zero mean Gaussian distributions as proposal.
Parameters
----------
logTarget: function taking one vector of length nDim as inputs
logarithm of the unnormalized density that should be
inferred using the MCMC scheme
xInit: vector of length nDim
initial values of the chain
proposalStd: vector of length nDim
Standard deviations for the proposal distributions.
lowerBounds: vector of length nDim
lower bounds for the arguments of targetDensity
upperBounds: vector of length nDim
upper bounds for the arguments of targetDensity
nSamples: integer
amount of MCMC steps to be performed in total
nBurnin: integer, smaller than nSamples
amount of MCMC steps that should be discarded at the
beginning for burn-in.
Returns
----------
samples: vector of length nSamples-nBurnin
samples of the chain, describing the probability density
"""
xInit = np.asarray(xInit)
allSamples = []
xNew = xInit.copy()
logPNew = logTarget(xInit)
allSamples.append(xNew)
nAccepted=np.zeros_like(xInit)
nRejected=np.zeros_like(xInit)
for i in np.arange(nSamples+nBurnin):
currSamples = []
for dim in range(xInit.size):
# save state and probability of previous iteration
xOld = xNew.copy()
logPOld = logPNew
# create new state
proposal = getProposal(mean=xOld[dim],
std=proposalStds[dim],
lowerBound=lowerBounds[dim],
upperBound=upperBounds[dim])
xPotential = xOld.copy()
xPotential[dim] = proposal
# get normalization constantes for proposal distribution
normalizationOldGivenNew = getValidProbability(
mean=proposal,
std=proposalStds[dim],
lowerBound=lowerBounds[dim],
upperBound=upperBounds[dim]
)
normalizationNewGivenOld = getValidProbability(
mean=xOld[dim],
std=proposalStds[dim],
lowerBound=lowerBounds[dim],
upperBound=upperBounds[dim]
)
# get new function value
logPPotential = logTarget(xPotential)
# accept if acceptable
if logPPotential-logPOld > np.log(normalizationOldGivenNew /
normalizationNewGivenOld):
# catch potential overflows
acceptanceProb = 1
else:
acceptanceProb = np.exp(logPPotential-logPOld)
acceptanceProb *= normalizationNewGivenOld \
/ normalizationOldGivenNew
if np.random.rand(1) <= acceptanceProb:
xNew = xPotential.copy()
if proposal > upperBounds[dim] or proposal < lowerBounds[dim]:
pass
print("Bounds violating value accepted (!)")
logPNew = logPPotential
nAccepted[dim] += 1
else:
logPNew = logPOld
nRejected[dim] += 1
pass
currSamples.append(xNew)
allSamples.append(np.mean(currSamples, axis=0))
return np.asarray(allSamples[nBurnin:]), nAccepted, nRejected
def getProposal(mean, std, lowerBound, upperBound, maxTry=100):
"""
Samples the Gaussian proposal distribution with given mean and std until
a sample within the lowerBound and upperBound is found.
Will throw an error if this takes longer than maxTry tries.
"""
for i in range(maxTry):
newX = mean + std*np.random.randn(1)
if newX < upperBound and newX > lowerBound:
return np.squeeze(newX)
raise ValueError("Cannot find proper proposal. Maybe bounds are " +
"improperly chosen?")
def getValidProbability(mean, std, lowerBound, upperBound):
"""
Calculates the probability mass of the Gaussian proposal distribution
with given mean and std that is within the lower and upper bound.
"""
return norm.cdf(upperBound, loc=mean, scale=std) \
- norm.cdf(lowerBound, loc=mean, scale=std)