-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrecommender_ALS_Spark_Python.py
123 lines (84 loc) · 3.57 KB
/
recommender_ALS_Spark_Python.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import findspark
findspark.init()
from pyspark.mllib.recommendation import *
import random
from operator import *
from collections import defaultdict
# In[430]:
# Initialize Spark Context
# YOUR CODE GOES HERE
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# ## Loading data
# artist_data_small.txt lines are "<artistId>\t<artistName>"; split each
# line once instead of twice per record.
def _parse_artist_line(line):
    """Parse one artist line into an (artist_id, artist_name) pair."""
    fields = line.split("\t")
    return (int(fields[0]), fields[1])

artistData = sc.textFile('./data_raw/artist_data_small.txt').map(_parse_artist_line)
# Raw alias and play-count files; parsed further below.
artistAlias = sc.textFile('./data_raw/artist_alias_small.txt')
userArtistData = sc.textFile('./data_raw/user_artist_data_small.txt')
# ## Data Exploration
# user_artist_data_small.txt lines are "userId artistId playCount";
# split each line once instead of three times per record.
def _parse_play_line(line):
    """Parse one play line into (user_id, artist_id, play_count)."""
    user_id, artist_id, play_count = line.split(" ")
    return (int(user_id), int(artist_id), int(play_count))

userArtistData = userArtistData.map(_parse_play_line)

# Alias lookup: misspelled/duplicate artist id -> canonical artist id.
# dict() over the collected pairs replaces the manual insertion loop.
entities = artistAlias.map(lambda s: s.split("\t")).map(lambda f: (int(f[0]), int(f[1])))
aliasDict = dict(entities.collect())

# Canonicalise artist ids; ids without an alias map to themselves.
userArtistData = userArtistData.map(lambda x: (x[0], aliasDict.get(x[1], x[1]), x[2]))

# Total and mean play count per user in ONE aggregation pass:
# value is (total_plays, record_count), merged pairwise — this replaces
# two separate reduceByKey jobs plus a leftOuterJoin.
playStats = userArtistData.map(lambda x: (x[0], (x[2], 1))) \
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
final = playStats.map(lambda x: (x[0], x[1][0], int(x[1][0] / x[1][1])))

# Report the three users with the highest total play counts.
for i in final.top(3, key=lambda x: x[1]):
    print('User ' + str(i[0]) + ' has a total play count of ' + str(i[1]) +
          ' and a mean play count of ' + str(i[2]) + '.')
# #### Splitting Data for Testing
# 40/40/20 train/validation/test split; the fixed seed keeps it reproducible.
trainData, validationData, testData = userArtistData.randomSplit((0.4, 0.4, 0.2), seed=13)

# Cache every split — each RDD is re-traversed several times below.
splits = (trainData, validationData, testData)
for split in splits:
    split.cache()

# Peek at the first few records of each split, then report their sizes.
for split in splits:
    print(split.take(3))
for split in splits:
    print(split.count())
# ## The Recommender Model
# ### Model Evaluation
def modelEval(model, dataset):
    """Score an implicit-feedback ALS model against a held-out dataset.

    For every user in `dataset`, the model ranks all artists the user has
    NOT already played in `trainData`; the top-k predictions are taken,
    where k is the number of artists that user actually played in
    `dataset`. The per-user score is the fraction of those k predictions
    the user really played; the printed (and returned) score is the mean
    over all users in `dataset`.

    Relies on module-level `sc`, `userArtistData` and `trainData`.
    """
    # Every distinct (canonicalised) artist id in the full dataset.
    allArtists = userArtistData.map(lambda x: x[1]).distinct().collect()

    # Group played-artist ids per user locally with ONE collect() each.
    # (The original launched a separate dataset.filter(...).collect()
    # Spark job per user — O(users) full passes over the data.)
    testDict = defaultdict(list)
    for userId, artistId, _ in dataset.collect():
        testDict[userId].append(artistId)
    trainDict = defaultdict(list)
    for userId, artistId, _ in trainData.collect():
        trainDict[userId].append(artistId)

    score = 0.0
    for userId in testDict:
        heldOut = set(testDict[userId])
        alreadyPlayed = set(trainDict[userId])
        # Only score artists the user has not played in training — filtering
        # BEFORE predictAll avoids predicting ratings we would discard.
        candidates = sc.parallelize(
            [(userId, artistId) for artistId in allArtists
             if artistId not in alreadyPlayed])
        # NOTE: one predictAll job per user is still expensive; kept to
        # preserve the original per-user ranking semantics.
        predictions = model.predictAll(candidates)
        topK = predictions.top(len(heldOut), key=lambda r: r[2])
        recommended = {r[1] for r in topK}
        score += len(recommended & heldOut) / len(heldOut)

    meanScore = score / len(testDict)
    print("The model score for rank " + str(model.rank) + " is ~" + str(meanScore))
    return meanScore
# ### Model Construction
# Try a few latent-factor ranks on the validation split, then evaluate
# the chosen rank (10) on the held-out test split.
for candidate_rank in [2, 10, 20]:
    candidate_model = ALS.trainImplicit(trainData, candidate_rank, seed=345)
    modelEval(candidate_model, validationData)

bestModel = ALS.trainImplicit(trainData, rank=10, seed=345)
modelEval(bestModel, testData)
# Find the top 5 artists for a particular user and list their names
top5 = bestModel.recommendProducts(1059637, 5)
# Resolve all five artist names in ONE Spark job instead of launching a
# separate artistData.filter(...).collect() job per recommendation.
recommendedIds = {rec[1] for rec in top5}
nameById = dict(artistData.filter(lambda x: x[0] in recommendedIds).collect())
for position, rec in enumerate(top5):
    print("Artist " + str(position) + ": " + nameById[rec[1]])