twitterStream.py — 95 lines (71 loc) · 2.84 KB
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import operator
import numpy as np
import matplotlib.pyplot as plt
def main():
    """Entry point: set up Spark Streaming, stream word counts from Kafka,
    then plot the positive/negative totals per batch.
    """
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)

    # Silence Spark's chatty INFO output; keep only errors.
    log4j = sc._jvm.org.apache.log4j
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

    # Streaming context with a 10-second batch interval; checkpointing is
    # required by updateStateByKey in stream().
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")

    positive_words = load_wordlist("positive.txt")
    negative_words = load_wordlist("negative.txt")

    # Run the stream for 100 seconds, then plot the per-batch counts.
    batch_counts = stream(ssc, positive_words, negative_words, 100)
    make_plot(batch_counts)
def make_plot(counts):
    """Plot positive vs. negative word counts for each streaming batch.

    Parameters
    ----------
    counts : list[list[tuple[str, int]]]
        One inner list per batch; each inner list holds up to two
        ("positive", n) / ("negative", n) pairs as produced by stream().

    Uses plt.show() so the plot pops up. Does nothing if no batch
    produced any counts (previously this raised ValueError from max()
    on an empty sequence).
    """
    positive_count = []
    negative_count = []
    for count in counts:
        for word in count:
            # Anything not labelled "positive" is treated as negative,
            # matching the two-label output of stream().
            if word[0] == "positive":
                positive_count.append(word[1])
            else:
                negative_count.append(word[1])

    # Guard: nothing to plot (e.g. no tweets matched either word list).
    if not positive_count and not negative_count:
        return

    # max(..., default=0) keeps this safe when one of the two series is empty.
    y_top = max(max(positive_count, default=0), max(negative_count, default=0)) + 5000
    plt.axis([-1, len(positive_count), 0, y_top])
    pos, = plt.plot(positive_count, 'y:', marker='x', markersize=10)
    neg, = plt.plot(negative_count, 'r:', marker='x', markersize=10)
    plt.legend((pos, neg), ('Positive', 'negative'), loc=2)
    plt.xticks(np.arange(0, len(positive_count), 1))
    plt.xlabel('Time')
    plt.ylabel('Word Count')
    plt.show()
def load_wordlist(filename):
    """Return the set of words from the given file, one word per line.

    Parameters
    ----------
    filename : str
        Path to a newline-delimited word list (e.g. "positive.txt").

    Returns
    -------
    set[str]
        Unique words with trailing newlines stripped. A set gives O(1)
        membership tests in the streaming filters.
    """
    # `with` closes the file even on error; the original leaked the handle.
    with open(filename, 'r') as f:
        # rstrip('\n') matches the original x.split("\n")[0] behaviour:
        # only the newline is removed, other whitespace is preserved.
        return {line.rstrip('\n') for line in f}
def updateFunction(newValues, runningCount):
    """State updater for updateStateByKey: add this batch's values to the
    running total for a key. `runningCount` is None the first time a key
    is seen, which counts as zero.
    """
    previous = 0 if runningCount is None else runningCount
    return previous + sum(newValues)
def stream(ssc, pwords, nwords, duration):
    """Count positive/negative sentiment words from a Kafka tweet stream.

    Parameters
    ----------
    ssc : StreamingContext
        Checkpointed streaming context (updateStateByKey needs it).
    pwords, nwords : set[str]
        Positive and negative word lists.
    duration : int
        Seconds to let the stream run before stopping.

    Returns
    -------
    list[list[tuple[str, int]]]
        Per-batch [("positive", n), ("negative", m)] counts.
    """
    raw = KafkaUtils.createDirectStream(
        ssc, topics=['twitterstream'],
        kafkaParams={"metadata.broker.list": 'localhost:9092'})

    # Kafka records arrive as (key, value); the tweet text is the value.
    words = raw.map(lambda record: record[1]).flatMap(lambda text: text.split(" "))

    # Keep the separate filter + union shape: a word present in BOTH lists
    # is emitted twice (and labelled "positive" both times), exactly as before.
    positives = words.filter(lambda w: w in pwords)
    negatives = words.filter(lambda w: w in nwords)
    labelled = positives.union(negatives).map(
        lambda w: ('positive', 1) if (w in pwords) else ('negative', 1))

    per_batch = labelled.reduceByKey(lambda a, b: a + b)

    # Running totals across all batches, printed each interval.
    running_totals = per_batch.updateStateByKey(updateFunction)
    running_totals.pprint()

    # Collect each batch's counts on the driver via closure capture.
    counts = []
    per_batch.foreachRDD(lambda _time, rdd: counts.append(rdd.collect()))

    ssc.start()
    ssc.awaitTerminationOrTimeout(duration)
    # stopGraceFully is PySpark's actual (misspelled) keyword name.
    ssc.stop(stopGraceFully=True)
    return counts
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()