-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathMovieDatabase.scala
171 lines (130 loc) · 6.02 KB
/
MovieDatabase.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
* @author Aapo Kyrola <[email protected]>
* @version 1.0
*
* @section LICENSE
*
* Copyright [2014] [Aapo Kyrola / Carnegie Mellon University]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Publication to cite: http://arxiv.org/abs/1403.0701
*/
package edu.cmu.graphchidb.examples
import edu.cmu.graphchi.GraphChiEnvironment
import scala.io.Source
import java.io.File
import edu.cmu.graphchidb.Util._
import edu.cmu.graphchidb.{GraphChiDatabaseAdmin, GraphChiDatabase}
import edu.cmu.graphchidb.examples.computation.ALSMatrixFactorization
/**
 * Example use of GraphChi-DB. Imports Netflix's movie database and computes ALS matrix
 * factorization to produce recommendations.
 *
 * Note that merely referencing this object creates (if missing) and opens the database under
 * `baseFilename`, registers all columns and initializes the database.
 *
 * Typical session:
 * {{{
 *   import edu.cmu.graphchidb.examples.MovieDatabase._
 *   startIngest
 *   // After ingestion, run
 *   runALS
 *   // Then to recommend for a user id X
 *   recommendForUser(X)
 * }}}
 *
 * @author Aapo Kyrola
 */
object MovieDatabase {

  // Get data from http://www.select.cs.cmu.edu/code/graphlab/datasets/netflix_mm
  val sourceFile = System.getProperty("user.home") + "/graphs/netflix_mm"
  val movieNamesFile = System.getProperty("user.home") + "/graphs/movie_titles.txt"
  val baseFilename = System.getProperty("user.home") + "/graphs/DB/moviedb/netflix"

  // Users and movies share one vertex id space because GraphChi-DB does not support typed
  // vertices: a user vertex id is (original user id + userIdOffset), while a movie vertex
  // keeps its original id. Any original id below userIdOffset is therefore a movie.
  val userIdOffset = 200000
  val numShards = 32

  val RATING_EDGE = 0.toByte

  GraphChiDatabaseAdmin.createDatabaseIfNotExists(baseFilename, numShards = numShards)

  val DB = new GraphChiDatabase(baseFilename, numShards = numShards)

  // Edge column holding the rating value
  val ratingEdgeColumn = DB.createByteColumn("rating", DB.edgeIndexing)

  // Vertex type ("movie" or "user")
  val vertexTypeColumn = DB.createCategoricalColumn("vertextype", IndexedSeq("null", "movie", "user"), DB.vertexIndexing, temporary=false)

  // Auto-fill the vertex type from the original id. The names used here must match the
  // category names declared above exactly.
  vertexTypeColumn.autoFillVertexFunc = Some((vertexId) => vertexTypeColumn.indexForName(
    if (DB.internalToOriginalId(vertexId) < userIdOffset) { "movie" } else { "user" }))

  // Movie parameters
  val movieYearColumn = DB.createShortColumn("year", DB.vertexIndexing)

  // Movie name: one column for variable data (the name), and for vertex then a column
  // that contains pointer to the vardata log. This is awkward, but GraphChi-DB is not a full-blown DB...
  val movieNameColumn = DB.createVarDataColumn("moviename", DB.vertexIndexing)
  val movieNamePtrColumn = DB.createLongColumn("movienameptr", DB.vertexIndexing)

  val alsComputation = new ALSMatrixFactorization("alsfactor", ratingEdgeColumn, DB)

  DB.initialize()

  /**
   * Asynchronously reads the ratings file (`sourceFile`), inserts one rating edge per data line
   * (user -> movie, with the rating as the edge value) and then loads the movie information
   * via `insertMovieInfo`.
   */
  def startIngest = {
    async {
      val ingestMeter = GraphChiEnvironment.metrics.meter("edgeingest")
      val t = System.currentTimeMillis()
      val ratings = Source.fromFile(new File(sourceFile))
      try {
        ratings.getLines().zipWithIndex.foreach { case (ln, i) =>
          // Skip the first three lines (header) and any '%' comment line
          if (i >= 3 && !ln.startsWith("%")) {
            val toks = ln.split(" ")
            if (toks.length >= 3) {
              val user = userIdOffset + Integer.parseInt(toks(0))
              val movie = Integer.parseInt(toks(1))
              // The rating is taken from the last token of the line
              val rating = Integer.parseInt(toks(toks.length - 1))
              DB.addEdgeOrigId(RATING_EDGE, user, movie, rating.toByte)
              if (i % 1000 == 0) ingestMeter.mark(1000)
              if (i % 1000000 == 0) println((System.currentTimeMillis - t) / 1000 + " s. : Processed: %d".format(i) + " ;" + ingestMeter.getOneMinuteRate + " / sec"
                + "; mean=" + ingestMeter.getMeanRate + " edges/sec")
            }
          }
        }
      } finally {
        // Release the file handle even if a line fails to parse
        ratings.close()
      }
      DB.flushAllBuffers()
      println("Finished inserting ratings, now populate movie information")
      insertMovieInfo
      println("Done ingest.")
    }
  }

  /**
   * Reads the movie titles file (`movieNamesFile`, ISO-8859-1, one `id,year,title` record per
   * line) and stores the name pointer and the release year for each movie vertex.
   * A year of `NULL` is stored as 0. Lines with fewer than three fields are skipped.
   */
  def insertMovieInfo = {
    val titles = Source.fromFile(new File(movieNamesFile), "iso-8859-1")
    try {
      titles.getLines().foreach( ln => {
        println(ln)
        // Limit the split to three fields: the title itself may contain commas
        val toks = ln.split(",", 3)
        if (toks.length >= 3) {
          val origMovieId = Integer.parseInt(toks(0))
          val year = if (toks(1) == "NULL") { 0 } else { Integer.parseInt(toks(1)) }
          val name = toks(2)
          val namePtr = movieNameColumn.insert(name)
          // Movies are addressed by their original id here, not by the internal vertex id
          DB.setVertexColumnValueOrigId(origMovieId, movieNamePtrColumn, namePtr)
          DB.setVertexColumnValueOrigId(origMovieId, movieYearColumn, year.toShort)
        }
      })
    } finally {
      titles.close()
    }
    DB.flushAllBuffers()
  }

  /**
   * Run ALS to compute the model for recommendations.
   */
  def runALS = DB.runGraphChiComputation(alsComputation, numIterations=5, enableScheduler=false)

  val movieTypeId = vertexTypeColumn.indexForName("movie")

  // Placeholder returned for movie vertices that have no stored title
  private val unknownMovieName = "(unknown title)"

  /* Name of a movie given its internal vertex id, or a placeholder if no name pointer is stored */
  private def movieName(internalMovieId: Long) =
    movieNamePtrColumn.get(internalMovieId)
      .map(ptr => movieNameColumn.getString(ptr))
      .getOrElse(unknownMovieName)

  /**
   * @param origUserId user id as it appears in the original data
   * @return Top 20 top-predicted movies as tuples (rating, original-movieid, internal-movieid, movie name)
   */
  def recommendForUser(origUserId: Long) = {
    val userId = internalUserId(origUserId)
    val movieIds = vertexTypeColumn.select((vid, t) => t == movieTypeId).map(tup => tup._1).toIndexedSeq
    // Predict a rating for every movie and keep the 20 highest predictions
    val movieIdsAndRatings = movieIds.map(movieId => (movieId, alsComputation.predictRating(userId, movieId))).sortBy(-_._2).take(20)
    movieIdsAndRatings.map { case (mvid: Long, rating: Double) => (rating, DB.internalToOriginalId(mvid), mvid, movieName(mvid)) }
  }

  /**
   * @return all movie vertices as tuples (original-movieid, internal-movieid, movie name)
   */
  def movies = {
    vertexTypeColumn.select((vid, t) => t == movieTypeId).map(tup => tup._1).map(mvid => (DB.internalToOriginalId(mvid), mvid, movieName(mvid)))
  }

  /* Mapping from user ids in original data to the ids used by the database */
  def internalUserId(userId: Long) = DB.originalToInternalId(userIdOffset + userId)
}