离线推荐服务
离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。
离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。
离线推荐服务主要分为统计性算法、基于ALS的协同过滤推荐算法以及基于ElasticSearch的内容推荐算法。
在recommender下新建子项目StatisticsRecommender,pom.xml文件中只需引入spark、scala和mongodb的相关依赖:
<dependencies>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-core_2.11artifactId>
dependency>
<dependency>
<groupId>org.apache.sparkgroupId>
<artifactId>spark-sql_2.11artifactId>
dependency>
<dependency>
<groupId>org.scala-langgroupId>
<artifactId>scala-libraryartifactId>
dependency>
<dependency>
<groupId>org.mongodbgroupId>
<artifactId>casbah-core_2.11artifactId>
<version>${casbah.version}version>
dependency>
<dependency>
<groupId>org.mongodb.sparkgroupId>
<artifactId>mongo-spark-connector_2.11artifactId>
<version>${mongodb-spark.version}version>
dependency>
dependencies>
在resources文件夹下引入log4j.properties,然后在src/main/scala下新建scala 单例对象com.atguigu.statistics.StatisticsRecommender。
同样,我们应该先建好样例类,在main()方法中定义配置、创建SparkSession并加载数据,最后关闭spark。代码如下:
src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String, shoot: String, language: String, genres: String, actors: String, directors: String)
case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int)
case class MongoConfig(uri:String, db:String)
case class Recommendation(mid:Int, score:Double)
case class GenresRecommendation(genres:String, recs:Seq[Recommendation])
object StatisticsRecommender {
val MONGODB_RATING_COLLECTION = “Rating”
val MONGODB_MOVIE_COLLECTION = “Movie”
//统计的表的名称
val RATE_MORE_MOVIES = “RateMoreMovies”
val RATE_MORE_RECENTLY_MOVIES = “RateMoreRecentlyMovies”
val AVERAGE_MOVIES = “AverageMovies”
val GENRES_TOP_MOVIES = “GenresTopMovies”
// 入口方法
def main(args: Array[String]): Unit = {
val config = Map(
“spark.cores” -> “local[*]”,
“mongo.uri” -> “mongodb://localhost:27017/recommender”,
“mongo.db” -> “recommender”
)
//创建SparkConf配置
val sparkConf = new SparkConf().setAppName(“StatisticsRecommender”).setMaster(config(“spark.cores”))
//创建SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val mongoConfig = MongoConfig(config(“mongo.uri”),config(“mongo.db”))
//加入隐式转换
import spark.implicits._
//数据加载进来
val ratingDF = spark
.read
.option(“uri”,mongoConfig.uri)
.option(“collection”,MONGODB_RATING_COLLECTION)
.format(“com.mongodb.spark.sql”)
.load()
.as[Rating]
.toDF()
val movieDF = spark
.read
.option(“uri”,mongoConfig.uri)
.option(“collection”,MONGODB_MOVIE_COLLECTION)
.format(“com.mongodb.spark.sql”)
.load()
.as[Movie]
.toDF()
//创建一张名叫ratings的表
ratingDF.createOrReplaceTempView(“ratings”)
//TODO: 不同的统计推荐结果
spark.stop()
}
留言与评论(共有 0 条评论) “” |