-
Notifications
You must be signed in to change notification settings - Fork 1
/
prob329.py
215 lines (170 loc) · 6.23 KB
/
prob329.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Solve Project Euler Problem 329
Find the probability of a sequence coming from the prime frog.
"""
import fractions
import itertools
import sys
import numpy as np
## {{{ http://code.activestate.com/recipes/117119/ (r2)
# Sieve of Eratosthenes
# David Eppstein, UC Irvine, 28 Feb 2002
def eratosthenes():
'''Yields the sequence of prime numbers via the Sieve of Eratosthenes.'''
D = {} # map composite integers to primes witnessing their compositeness
q = 2 # first integer to test for primality
while 1:
if q not in D:
yield q # not marked composite, must be prime
D[q*q] = [q] # first multiple of q not already marked
else:
for p in D[q]: # move each witness to its next multiple
D.setdefault(p+q,[]).append(p)
del D[q] # no longer need D[q], free memory
q += 1
## end of http://code.activestate.com/recipes/117119/ }}}
## {{{ Generate prime numbers below a certain value
def primes_below(n):
sieve = eratosthenes()
return list(itertools.takewhile(lambda p: p<n, sieve))
## }}}
def compute_probabilities(n, state):
"""
Compute the probability of seeing a P or N come from the frog for a
given state.
"""
primes_below_n = primes_below(n)
prime_indices = np.array([i+1 in primes_below_n for i in range(n)])
notprime_indices = np.array([i+1 not in primes_below_n for i in range(n)])
# Conditional probabilities for getting a P or an N if the frog is
# on a prime or non-prime square
prob_P_prime = fractions.Fraction(2, 3)
prob_N_prime = fractions.Fraction(1, 3)
prob_P_notprime = fractions.Fraction(1, 3)
prob_N_notprime = fractions.Fraction(2, 3)
assert(np.sum(state) == 1)
prob_prime = np.sum(state[prime_indices])
prob_notprime = np.sum(state[notprime_indices])
assert(prob_prime + prob_notprime == 1)
prob_P = prob_P_prime*prob_prime + prob_P_notprime*prob_notprime
prob_N = prob_N_prime*prob_prime + prob_N_notprime*prob_notprime
assert(prob_P + prob_N == 1)
return prob_P, prob_N
def make_transition_matrix(n):
# Off-diagonal means it's 1 shorter than the main diagonal
diag = [fractions.Fraction(1, 2) for i in range(n-1)]
# Upper diagonal is .5 .5 .5 .5 ... 1.
# Lower diagonal is 1. .5 .5 .5 ... .5
upper_diag = diag[:]
upper_diag[-1] = fractions.Fraction(1, 1)
lower_diag = diag[:]
lower_diag[0] = fractions.Fraction(1, 1)
return np.diag(upper_diag, k=1) + np.diag(lower_diag, k=-1)
def make_initial_condition(n, nonzero=None):
if nonzero is None:
x = [fractions.Fraction(1, n) for i in range(n)]
else:
x = [fractions.Fraction(0, 1) for i in range(n)]
x[nonzero] = fractions.Fraction(1, 1)
return np.array(x)
def solve(outcome='PPPPNNPPPNPPNPN'):
board_size = 500
A = make_transition_matrix(board_size)
x = make_initial_condition(board_size)
#outcome =
#outcome = 'PP'
prob = {}
p = fractions.Fraction(1, 1)
croak_result = ''
for croak in outcome:
prob['P'], prob['N'] = compute_probabilities(board_size, x)
print 'P(%s) = %s' % (croak, prob[croak])
p *= prob[croak]
croak_result += croak
print 'P(%s) = %s' % (croak_result, p)
x = np.dot(A, x)
print float(p)
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
solve()
#### Monte Carlo Simulation
import random
class Frog:
# The frog croaks the correct prime/nonprime with probability 2/3
CROAK_ON_PRIME = ('P', 'P', 'N')
CROAK_ON_NONPRIME = ('N', 'N', 'P')
def __init__(self, board_size=500):
self.primes = primes_below(board_size)
self.position = random.randrange(board_size)
# Each square has a possible move to the adjacent squares, but
# the endpoints only have a single possible move.
self.possible_moves = [(-1, +1) for _ in range(board_size)]
self.possible_moves[0] = (+1,)
self.possible_moves[-1] = (-1,)
def jump(self):
move = random.choice(self.possible_moves[self.position])
self.position += move
def croak(self):
square_num = self.position + 1
if square_num in self.primes:
return random.choice(Frog.CROAK_ON_PRIME)
else:
return random.choice(Frog.CROAK_ON_NONPRIME)
def __iter__(self):
while True:
yield self.croak()
self.jump()
# Serial Version
def run(ref_string, num_trials=50000):
results = [trial(ref_string) for _ in xrange(num_trials)]
num_correct = results.count(True)
prob_correct = float(num_correct) / float(num_trials)
print ('%d correct out of %d trials: p(%s) = %f' %
(num_correct, num_trials, ref_string, prob_correct))
return prob_correct
def trial(croak_string):
frog = Frog()
for (ref_croak , croak) in zip(croak_string, frog):
if ref_croak != croak:
return False
return True
# Parallel Version
import multiprocessing
import Queue
def run_parallel(ref_string, num_trials=50000):
result_queue = multiprocessing.Queue()
pool = multiprocessing.Pool(initializer=parallel_init,
initargs=(result_queue,))
ref_strings = (ref_string for _ in xrange(num_trials))
pool.imap(parallel_trial, ref_strings, chunksize=100)
pool.close()
results = process_queue(result_queue)
pool.join()
num_correct = results.count(True)
prob_correct = float(num_correct) / float(num_trials)
print ('%d correct out of %d trials: p(%s) = %f' %
(num_correct, num_trials, ref_string, prob_correct))
return prob_correct
def process_queue(q):
results = []
try:
while True:
results.append(q.get(block=True, timeout=0.5))
except Queue.Empty:
pass
return results
def parallel_trial(croak_string):
result = trial(croak_string)
parallel_trial.q.put(result)
def parallel_init(q):
parallel_trial.q = q
if __name__ == "__main__":
#outcome = 'PPPPNNPPPNPPNPN'
outcome = 'PP'
#probs = [run(outcome, num_trials=50000) for _ in range(10)]
probs = [run_parallel(outcome, num_trials=500000) for _ in range(500)]
total_prob = sum(probs) / len(probs)
print total_prob
solve(outcome='PP')
#main()