大数据技术之电影推荐系统4


第 4 部分 离线推荐服务建设

4.1 离线推荐服务

离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。
离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。
离线推荐服务主要分为统计性算法、基于 ALS 的协同过滤推荐算法以及基于ElasticSearch 的内容推荐算法。
在 recommender 下新建子项目 StatisticsRecommender,pom.xml 文件中只需引入spark、scala 和 mongodb 的相关依赖:

<dependencies>
    <!-- Spark 的依赖引入 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <!-- 引入 Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <!-- 加入 MongoDB 的驱动 -->
    <!-- 用于代码方式连接 MongoDB -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>${casbah.version}</version>
    </dependency>
    <!-- 用于 Spark 和 MongoDB 的对接 -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>${mongodb-spark.version}</version>
    </dependency>
</dependencies>

在 resources 文件夹下引入 log4j.properties,然后在 src/main/scala 下新建 scala 单例对象 com.bigworldxld.statistics.StatisticsRecommender。
同样,我们应该先建好样例类,在 main()方法中定义配置、创建 SparkSession并加载数据,最后关闭 spark。代码如下:
src/main/scala/com.bigworldxld.statistics/StatisticsRecommender.scala

package com.bigworldxld.statistics
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @author bigworldxld
  * @create 2022-04-13 14:13
  */
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                 shoot: String, language: String, genres: String, actors: String, directors: String)

case class Rating(uid: Int, mid: Int, score: Double, timestamp: Int )

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义电影类别top10推荐对象
case class GenresRecommendation( genres: String, recs: Seq[Recommendation] )
object StatisticsRecommender {
  // 定义表名
  val MONGODB_MOVIE_COLLECTION = "Movie"
  val MONGODB_RATING_COLLECTION = "Rating"

  //统计的表的名称
  val RATE_MORE_MOVIES = "RateMoreMovies"
  val RATE_MORE_RECENTLY_MOVIES = "RateMoreRecentlyMovies"
  val AVERAGE_MOVIES = "AverageMovies"
  val GENRES_TOP_MOVIES = "GenresTopMovies"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    // 创建一个sparkConf
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommeder")

    // 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 从mongodb加载数据
    val ratingDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Rating]
      .toDF()

    val movieDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .toDF()

    // 创建名为ratings的临时表
    ratingDF.createOrReplaceTempView("ratings")

    // 不同的统计推荐结果
    // 1. 历史热门统计,历史评分数据最多,mid,count
    val rateMoreMoviesDF = spark.sql("select mid, count(mid) as count from ratings group by mid")
    // 把结果写入对应的mongodb表中
    storeDFInMongoDB( rateMoreMoviesDF, RATE_MORE_MOVIES )

    // 2. 近期热门统计,按照“yyyyMM”格式选取最近的评分数据,统计评分个数
    // 创建一个日期格式化工具
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    // 注册udf,把时间戳转换成年月格式
    spark.udf.register("changeDate", (x: Int)=>simpleDateFormat.format(new Date(x * 1000L)).toInt )

    // 对原始数据做预处理,去掉uid
    val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth from ratings")
    ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")

    // 从ratingOfMonth中查找电影在各个月份的评分,mid,count,yearmonth
    val rateMoreRecentlyMoviesDF = spark.sql("select mid, count(mid) as count, yearmonth from ratingOfMonth group by yearmonth, mid order by yearmonth desc, count desc")

    // 存入mongodb
    storeDFInMongoDB(rateMoreRecentlyMoviesDF, RATE_MORE_RECENTLY_MOVIES)

    // 3. 优质电影统计,统计电影的平均评分,mid,avg
    val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by mid")
    storeDFInMongoDB(averageMoviesDF, AVERAGE_MOVIES)

    // 4. 各类别电影Top统计
    // 定义所有类别
    val genres = List("Action","Adventure","Animation","Comedy","Crime","Documentary","Drama","Family","Fantasy","Foreign","History","Horror","Music","Mystery"
      ,"Romance","Science","Tv","Thriller","War","Western")

    // 把平均评分加入movie表里,加一列,inner join
    val movieWithScore = movieDF.join(averageMoviesDF, "mid")

    // 为做笛卡尔积,把genres转成rdd
    val genresRDD = spark.sparkContext.makeRDD(genres)

    // 计算类别top10,首先对类别和电影做笛卡尔积
    val genresTopMoviesDF = genresRDD.cartesian(movieWithScore.rdd)
      .filter{
        // 条件过滤,找出movie的字段genres值(Action|Adventure|Sci-Fi)包含当前类别genre(Action)的那些
        case (genre, movieRow) => movieRow.getAs[String]("genres").toLowerCase.contains( genre.toLowerCase )
      }
      .map{
        case (genre, movieRow) => ( genre, ( movieRow.getAs[Int]("mid"), movieRow.getAs[Double]("avg") ) )
      }
      .groupByKey()
      .map{
        case (genre, items) => GenresRecommendation( genre, items.toList.sortWith(_._2>_._2).take(10).map( item=> Recommendation(item._1, item._2)) )
      }
      .toDF()
    storeDFInMongoDB(genresTopMoviesDF, GENRES_TOP_MOVIES)
    spark.stop()
  }

  def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit ={
    df.write
      .option("uri", mongoConfig.uri)
      .option("collection", collection_name)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
  }
}

查看mongodb数据库生成RateMoreMovies,RateMoreRecentlyMovies,AverageMovies,GenresTopMovies的结果:

4.2 离线统计服务

4.2.4 历史热门电影统计

根据所有历史评分数据,计算历史评分次数最多的电影。
实现思路:
通过 Spark SQL 读取评分数据集,统计所有评分中评分数最多的电影,然后按照从大到小排序,将最终结果写入 MongoDB 的 RateMoreMovies 数据集中。

//统计所有历史数据中每个电影的评分数
//数据结构 -》 mid,count
val rateMoreMoviesDF = spark.sql("select mid, count(mid) as count from ratings 
group by mid")
rateMoreMoviesDF
 .write
 .option("uri",mongoConfig.uri)
 .option("collection",RATE_MORE_MOVIES)
 .mode("overwrite")
 .format("com.mongodb.spark.sql")
 .save()

4.2.2 最近热门电影统计

根据评分,按月为单位计算最近时间的月份里面评分数最多的电影集合。
实现思路:
通过 Spark SQL 读取评分数据集,通过 UDF 函数将评分的数据时间修改为月,然后统计每月电影的评分数 。 统 计 完 成 之 后 将 数 据 写 入 到 MongoDB 的RateMoreRecentlyMovies 数据集中。

//统计以月为单位拟每个电影的评分数
//数据结构 -》 mid,count,time
//创建一个日期格式化工具
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
//注册一个 UDF 函数,用于将 timestamp 装换成年月格式 1260759144000 => 201605
spark.udf.register("changeDate",(x:Int) => simpleDateFormat.format(new Date(x * 
                                                                            1000L)).toInt)
// 将原来的 Rating 数据集中的时间转换成年月的格式
val ratingOfYearMonth = spark.sql("select mid, score, changeDate(timestamp) as yearmonth 
from ratings")
// 将新的数据集注册成为一张表
ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyMovies = spark.sql("select mid, count(mid) as count ,yearmonth from 
ratingOfMonth group by yearmonth,mid")
rateMoreRecentlyMovies
.write
.option("uri",mongoConfig.uri)
.option("collection",RATE_MORE_RECENTLY_MOVIES)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()

4.2.3 电影平均得分统计

根据历史数据中所有用户对电影的评分,周期性的计算每个电影的平均得分。
实现思路:
通过 Spark SQL 读取保存在 MongDB 中的 Rating 数据集,通过执行以下 SQL 语句实现对于电影的平均分统计:

//统计每个电影的平均评分
val averageMoviesDF = spark.sql("select mid, avg(score) as avg from ratings group by 
mid")
averageMoviesDF
 .write
 .option("uri",mongoConfig.uri)
 .option("collection",AVERAGE_MOVIES)
 .mode("overwrite")
 .format("com.mongodb.spark.sql")
 .save()

统计完成之后将生成的新的 DataFrame 写出到 MongoDB 的AverageMoviesScore集合中。

4.2.4 每个类别优质电影统计

根据提供的所有电影类别,分别计算每种类型的电影集合中评分最高的 10 个电影。
实现思路:
在计算完整个电影的平均得分之后,将影片集合与电影类型做笛卡尔积,然后过滤掉电影类型不符合的条目,将 DataFrame 输出到 MongoDB 的 GenresTopMovies集合中。

//统计每种电影类型中评分最高的 10 个电影
val movieWithScore = movieDF.join(averageMoviesDF,Seq("mid")) 
//所有的电影类别
val genres = 
List("Action","Adventure","Animation","Comedy","Crime","Documentary","Drama","Famil
y","Fantasy","Foreign","History","Horror","Music","Mystery"
     ,"Romance","Science","Tv","Thriller","War","Western")
//将电影类别转换成 RDD
val genresRDD = spark.sparkContext.makeRDD(genres)
//计算电影类别 top10
val genrenTopMovies = genresRDD.cartesian(movieWithScore.rdd) 
.filter{
    case (genres,row) => 
    row.getAs[String]("genres").toLowerCase.contains(genres.toLowerCase)
}
.map{
    // 将整个数据集的数据量减小,生成 RDD[String,Iter[mid,avg]]
    //统计每种电影类型中评分最高的 10 个电影
    val movieWithScore = movieDF.join(averageMoviesDF,Seq("mid")) 
    //所有的电影类别
    val genres = 
    List("Action","Adventure","Animation","Comedy","Crime","Documentary","Drama","Famil
y","Fantasy","Foreign","History","Horror","Music","Mystery"
         ,"Romance","Science","Tv","Thriller","War","Western")
    //将电影类别转换成 RDD
    val genresRDD = spark.sparkContext.makeRDD(genres)
    //计算电影类别 top10
    val genrenTopMovies = genresRDD.cartesian(movieWithScore.rdd) 
    .filter{
        case (genres,row) => 
        row.getAs[String]("genres").toLowerCase.contains(genres.toLowerCase)
    }
    .map{
        // 将整个数据集的数据量减小,生成 RDD[String,Iter[mid,avg]]
        case (genres,row) => {
            (genres,(row.getAs[Int]("mid"), row.getAs[Double]("avg")))
        }
    }.groupByKey()
    .map{
        case (genres, items) => GenresRecommendation(genres,items.toList.sortWith(_._2 > 
                                                                                  _._2).take(10).map(item => Recommendation(item._1,item._2)))
    }.toDF()

4.3 基于隐语义模型的协同过滤推荐

项目采用 ALS 作为协同过滤算法,分别根据 MongoDB 中的用户评分表和电影数据集计算用户电影推荐矩阵以及电影相似度矩阵。

4.3.1 用户电影推荐矩阵

通过 ALS 训练出来的 Model 来计算所有当前用户电影的推荐矩阵,主要思路如下:

  1. UserId 和 MovieID 做笛卡尔积,产生(uid,mid)的元组
  2. 通过模型预测(uid,mid)的元组。
  3. 将预测结果通过预测分值进行排序。
  4. 返回分值最大的 K 个电影,作为当前用户的推荐。
    最后生成的数据结构如下:将数据保存到 MongoDB 的 UserRecs 表中

新建 recommender 的子项目 OfflineRecommender,引入 spark、scala、mongo 和 jblas 的依赖:

<dependencies>
    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>jblas</artifactId>
        <version>${jblas.version}</version>
    </dependency>
    <!-- Spark 的依赖引入 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
    </dependency>
    <!-- 引入 Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <!-- 加入 MongoDB 的驱动 -->
    <!-- 用于代码方式连接 MongoDB -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>${casbah.version}</version>
    </dependency>
    <!-- 用于 Spark 和 MongoDB 的对接 -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>${mongodb-spark.version}</version>
    </dependency>
</dependencies>

同样经过前期的构建样例类、声明配置、创建 SparkSession 等步骤,可以加载数据开始计算模型了。
核心代码如下:
src/main/scala/com.bigworldxld.offline/OfflineRecommender.scala

package com.bigworldxld.offine

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

/**
  * @author bigworldxld
  * @create 2022-04-14 11:55
  */
// 基于评分数据的LFM,只需要rating数据
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int )

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义基于预测评分的用户推荐列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )

// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )


object OfflineRecommender {
  // 定义表名和常量
  val MONGODB_RATING_COLLECTION = "Rating"

  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"

  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop99:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    // 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))


    // 加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map( rating => ( rating.uid, rating.mid, rating.score ) )    // 转化成rdd,并且去掉时间戳
      .cache()

    // 从rating数据中提取所有的uid和mid,并去重
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    // 训练隐语义模型
    val trainData = ratingRDD.map( x => Rating(x._1, x._2, x._3) )

    val (rank, iterations, lambda) = (200, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    // 基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表
    // 计算user和movie的笛卡尔积,得到一个空评分矩阵
    val userMovies = userRDD.cartesian(movieRDD)

    // 调用model的predict方法预测评分
    val preRatings = model.predict(userMovies)

    val userRecs = preRatings
      .filter(_.rating > 0)    // 过滤出评分大于0的项
      .map(rating => ( rating.user, (rating.product, rating.rating) ) )
      .groupByKey()
      .map{
        case (uid, recs) => UserRecs( uid, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1, x._2)) )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    // 基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
    val movieFeatures = model.productFeatures.map{
      case (mid, features) => (mid, new DoubleMatrix(features))
    }

    // 对所有电影两两计算它们的相似度,先做笛卡尔积
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // 把自己跟自己的配对过滤掉
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) => {
          val simScore = this.consinSim(a._2, b._2)
          ( a._1, ( b._1, simScore ) )
        }
      }
      .filter(_._2._2 > 0.6)    // 过滤出相似度大于0.6的
      .groupByKey()
      .map{
        case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
      }
      .toDF()
    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }
  // 求向量余弦相似度
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
    movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
  }
}

4.3.2 电影相似度矩阵

通过 ALS 计算电影见相似度矩阵,该矩阵用于查询当前电影的相似电影并为实时推荐系统服务。
离线计算的 ALS 算法,算法最终会为用户、电影分别生成最终的特征矩阵,分别是表示用户特征矩阵的 U(m x k)矩阵,每个用户由 k 个特征描述;表示物品特征矩阵的 V(n x k)矩阵,每个物品也由 k 个特征描述。
V(n x k)表示物品特征矩阵,每一行是一个 k 维向量,虽然我们并不知道每一个维度的特征意义是什么,但是 k 个维度的数学向量表示了该行对应电影的特征。
所以,每个电影用 V(n x k)每一行的 向量表示其特征,于是任意
两个电影 p:特征向量为 ,电影 q:特征向量为 之
间的相似度 sim(p,q)可以使用 和 的余弦值来表示:

数据集中任意两个电影间相似度都可以由公式计算得到,电影与电影之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB 的 MovieRecs表中。

其中,consinSim 是求两个向量余弦相似度的函数,代码实现如下:

//计算两个电影之间的余弦相似度
//dot():如果处理的是一维数组,则得到的是两数组的內积.如果是二维数组(矩阵)之间的运算,则得到的是矩阵积(mastrix product)
def consinSim(movie1: DoubleMatrix, movie2:DoubleMatrix) : Double ={
 movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
}

查看生成的UserRecs,MovieRecs的结构:

4.3.3 模型评估和参数选取

​ 在上述模型训练的过程中,我们直接给定了隐语义模型的rank,iterations,lambda 三个参数。对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型 进行评估。通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之 间的误差。

​ 有了 RMSE,我们可以就可以通过多次调整参数值,来选取 RMSE 最小的一组 作为我们模型的优化选择。 在 scala/com.bigworldxld.offline/下新建单例对象 ALSTrainer,代码主体架构如下:

package com.bigworldxld.offline
import breeze.numerics.sqrt
import com.bigworldxld.offline.OfflineRecommender.MONGODB_RATING_COLLECTION
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
/**
  * @author bigworldxld
  * @create 2022-04-14 15:38
  */

object ALSTrainer {
  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://localhost:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")

    // 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 加载评分数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map( rating => Rating( rating.uid, rating.mid, rating.score ) )    // 转化成rdd,并且去掉时间戳
      .cache()

    // 随机切分数据集,生成训练集和测试集
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainingRDD = splits(0)
    val testRDD = splits(1)

    // 模型参数选择,输出最优参数
    adjustALSParam(trainingRDD, testRDD)

    spark.close()
  }

  def adjustALSParam(trainData: RDD[Rating], testData: RDD[Rating]): Unit ={
    val result = for( rank <- Array(50, 100, 200, 300); lambda <- Array( 0.01, 0.1, 1 ))
      yield { //每一次运算结果保存下来
        val model = ALS.train(trainData, rank, 5, lambda)
        // 计算当前参数对应模型的rmse,返回Double
        val rmse = getRMSE( model, testData )
        ( rank, lambda, rmse )
      }
    // 控制台打印输出最优参数
    println(result.minBy(_._3))
  }

  def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    // 计算预测评分
    val userProducts = data.map(item => (item.user, item.product))
    val predictRating = model.predict(userProducts)

    // 以uid,mid作为外键,inner join实际观测值和预测值
    val observed = data.map( item => ( (item.user, item.product), item.rating ) )
    val predict = predictRating.map( item => ( (item.user, item.product), item.rating ) )
    // 内连接得到(uid, mid),(actual, predict)
    sqrt(
      observed.join(predict).map{
        case ( (uid, mid), (actual, pre) ) =>
          val err = actual - pre
          err * err
      }.mean()
    )
  }
}

运行代码,可以得到目前数据的最优模型参数,包括 ( rank, lambda, rmse )。

第 5 部分 实时推荐服务建设

5.1 实时推荐服务

​ 实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该反映最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好。
所以对于实时推荐算法,主要有两点需求:
(1)用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果;
(2)计算量不大,满足响应时间上的实时或者准实时要求;

5.2 实时推荐算法设计

当用户 u 对电影 p 进行了评分,将触发一次对 u 的推荐结果的更新。由于用户 u 对电影 p 评分,对于用户 u 来说,他与 p 最相似的电影们之间的推荐强度将发生变化,所以选取与电影 p 最相似的 K 个电影作为候选电影。
每个候选电影按照“推荐优先级”这一权重作为衡量这个电影被推荐给用户 u 的优先级。
这些电影将根据用户 u 最近的若干评分计算出各自对用户 u 的推荐优先级,然后与上次对用户 u 的实时推荐结果的进行基于推荐优先级的合并、替换得到更新后的推荐结果。
具体来说:
首先,获取用户 u 按时间顺序最近的 K 个评分,记为 RK;获取电影 p 的最相似的 K 个电影集合,记为 S;
然后,对于每个电影 q S ,计算其推荐优先级 ,计算公式如下:

其中:
表示用户 u 对电影 r 的评分;
sim(q,r)表示电影 q 与电影 r 的相似度,设定最小相似度为 0.6,当电影 q 和
电影 r 相似度低于 0.6 的阈值,则视为两者不相关并忽略;
sim_sum 表示 q 与 RK 中电影相似度大于最小阈值的个数;
incount 表示 RK 中与电影 q 相似的、且本身评分较高(>=3)的电影个数;
recount 表示 RK 中与电影 q 相似的、且本身评分较低(<3)的电影个数;

公式的意义如下:
首先对于每个候选电影 q,从 u 最近的 K 个评分中,找出与 q 相似度较高(>=0.6)的 u 已评分电影们,对于这些电影们中的每个电影 r,将 r 与 q 的相似度乘以用户 u 对 r 的评分,将这些乘积计算平均数,作为用户 u 对电影 q 的评分预测即

​ 然后,将 u 最近的 K 个评分中与电影 q 相似的、且本身评分较高(>=3)的 电影个数记为 incount,计算 lgmax{incount,1}作为电影 q 的“增强因子”,意义 在于电影 q 与 u 的最近 K 个评分中的 n 个高评分(>=3)电影相似,则电影 q 的 优先级被增加 lgmax{incount,1}。如果电影 q 与 u 的最近 K 个评分中相似的高评 分电影越多,也就是说 n 越大,则电影 q 更应该被推荐,所以推荐优先级被增强 的幅度较大;如果电影 q 与 u 的最近 K 个评分中相似的高评分电影越少,也就是 n 越小,则推荐优先级被增强的幅度较小;

​ 而后,将 u 最近的 K 个评分中与电影 q 相似的、且本身评分较低(<3)的电 影个数记为 recount,计算 lgmax{recount,1}作为电影 q 的“削弱因子”,意义在 于电影 q 与 u 的最近 K 个评分中的 n 个低评分(<3)电影相似,则电影 q 的优先 级被削减 lgmax{incount,1}。如果电影 q 与 u 的最近 K 个评分中相似的低评分电 影越多,也就是说 n 越大,则电影 q 更不应该被推荐,所以推荐优先级被减弱的 幅度较大;如果电影 q 与 u 的最近 K 个评分中相似的低评分电影越少,也就是 n 越 小,则推荐优先级被减弱的幅度较小;

​ 最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的 q 电 影对于 u 的推荐优先级。在计算完每个候选电影 q 的 后,将生成一组<电影 q 的 ID, q 的推荐优先级>的列表 updatedList:

而在本次为用户 u 实时推荐之前的上一次实时推荐结果 Rec 也是一组<电影 m,m 的推荐优先级>的列表,其大小也为 K:

接下来,将 updated_S 与本次为 u 实时推荐之前的上一次实时推荐结果 Rec进行基于合并、替换形成新的推荐结果 NewRec:

​ 其中,i 表示 updated_S 与 Rec 的电影集合中的每个电影,topK 是一个函数, 表示从 Rec updated _ S 中选择出最大的 K 个电影,cmp = 表示 topK 函数 将推荐优先级 值最大的 K 个电影选出来。最终,NewRec 即为经过用户 u 对电 影 p 评分后触发的实时推荐得到的最新推荐结果。

总之,实时推荐算法流程流程基本如下:
(1)用户 u 对电影 p 进行了评分,触发了实时推荐的一次计算;
(2)选出电影 p 最相似的 K 个电影作为集合 S;
(3)获取用户 u 最近时间内的 K 条评分,包含本次评分,作为集合 RK;
(4)计算电影的推荐优先级,产生<qID,>集合 updated_S;

将 updated_S 与上次对用户 u 的推荐结果 Rec 利用公式(4-4)进行合并,产生新的推荐结果 NewRec;作为最终输出。
我们在 recommender 下新建子项目 StreamingRecommender,引入 spark、scala、mongo、redis 和 kafka 的依赖

<dependencies>
    <!-- Spark 的依赖引入 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
    </dependency>
    <!-- 引入 Scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <!-- 加入 MongoDB 的驱动 -->
    <!-- 用于代码方式连接 MongoDB -->
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah-core_2.11</artifactId>
        <version>${casbah.version}</version>
    </dependency>
    <!-- 用于 Spark 和 MongoDB 的对接 -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>${mongodb-spark.version}</version>
    </dependency>
    <!-- redis -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>

代码中首先定义样例类和一个连接助手对象(用于建立 redis 和 mongo 连接),
并在 StreamingRecommender 中定义一些常量:
src/main/scala/com.bigworldxld.streaming/StreamingRecommender.scala

package com.bigworldxld.streaming
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import kafka.Kafka
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
/**
  * @author bigworldxld
  * @create 2022-04-15 11:51
  */

// 定义连接助手对象,序列化
object ConnHelper extends Serializable{
  lazy val jedis = new Jedis("hadoop99")
  lazy val mongoClient = MongoClient( MongoClientURI("mongodb://hadoop99:27017/recommender") )
}

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义基于预测评分的用户推荐列表
case class UserRecs( uid: Int, recs: Seq[Recommendation] )

// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
object StreamingRecommender {
  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop99:27017/recommender",
      "mongo.db" -> "recommender",
      "kafka.topic" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")

    // 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // 拿到streaming context
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2))    // batch duration

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 加载电影相似度矩阵数据,把它广播出去
    val simMovieMatrix = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRecs]
      .rdd
      .map{ movieRecs => // 为了查询相似度方便,转换成map
        (movieRecs.mid, movieRecs.recs.map( x=> (x.mid, x.score) ).toMap )
      }.collectAsMap()

    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)

    // 定义kafka连接参数
    val kafkaParam = Map(
      "bootstrap.servers" -> "hadoop99:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"
    )
    // 通过kafka创建一个DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam )
    )

    // 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流
    val ratingStream = kafkaStream.map{
      msg =>
        val attr = msg.value().split("\\|")
        ( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )
    }

    // 继续做流式处理,核心实时算法部分
    ratingStream.foreachRDD{
      rdds => rdds.foreach{
        case (uid, mid, score, timestamp) => {
          println("rating data coming! >>>>>>>>>>>>>>>>")

          // 1. 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]
          val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )

          // 2. 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid]
          val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )

          // 3. 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
          val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )

          // 4. 把推荐数据保存到mongodb
          saveDataToMongoDB( uid, streamRecs )
        }
      }
    }
    // 开始接收和处理数据
    ssc.start()

    println(">>>>>>>>>>>>>>> streaming started!")

    ssc.awaitTermination()

  }

  // redis操作返回的是java类,为了用map操作需要引入转换类
  import scala.collection.JavaConversions._

  def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
    // 从redis读取数据,用户评分数据保存在 uid:UID 为key的队列里,value是 MID:SCORE
    jedis.lrange("uid:" + uid, 0, num-1)
      .map{
        item => // 具体每个评分又是以冒号分隔的两个值
          val attr = item.split("\\:")
          ( attr(0).trim.toInt, attr(1).trim.toDouble )
      }
      .toArray
  }

  /**
    * 获取跟当前电影做相似的num个电影,作为备选电影
    * @param num       相似电影的数量
    * @param mid       当前电影ID
    * @param uid       当前评分用户ID
    * @param simMovies 相似度矩阵
    * @return          过滤之后的备选电影列表
    */
  def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
                     (implicit mongoConfig: MongoConfig): Array[Int] ={
    // 1. 从相似度矩阵中拿到所有相似的电影
    val allSimMovies = simMovies(mid).toArray

    // 2. 从mongodb中查询用户已看过的电影
    val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
      .find( MongoDBObject("uid" -> uid) )
      .toArray
      .map{
        item => item.get("mid").toString.toInt
      }

    // 3. 把看过的过滤,得到输出列表
    allSimMovies.filter( x=> ! ratingExist.contains(x._1) )
      .sortWith(_._2>_._2)
      .take(num)
      .map(x=>x._1)
  }

  def computeMovieScores(candidateMovies: Array[Int],
                         userRecentlyRatings: Array[(Int, Double)],
                         simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={
    // 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    // 定义一个HashMap,保存每一个备选电影的增强减弱因子
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()

    for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){
      // 拿到备选电影和最近评分电影的相似度
      val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )

      if(simScore > 0.7){
        // 计算备选电影的基础推荐得分
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
        if( userRecentlyRating._2 > 3 ){//getOrDefault:如果不存在,设置默认值为0
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
        } else{
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    // 根据备选电影的mid做groupby,根据公式去求最后的推荐评分
    scores.groupBy(_._1).map{
      // groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] )
      case (mid, scoreList) =>
        ( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray.sortWith(_._2>_._2)
  }

  // 获取两个电影之间的相似度
  def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int,
    scala.collection.immutable.Map[Int, Double]]): Double ={

    simMovies.get(mid1) match {
      case Some(sims) => sims.get(mid2) match {
        case Some(score) => score
        case None => 0.0
      }
      case None => 0.0
    }
  }

  // 求一个数的对数,利用换底公式,底数默认为10
  def log(m: Int): Double ={
    val N = 10
    math.log(m)/ math.log(N)
  }

  def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
    // 定义到StreamRecs表的连接
    val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)

    // 如果表中已有uid对应的数据,则删除
    streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
    // 将streamRecs数据存入表中
    streamRecsCollection.insert( MongoDBObject( "uid"->uid,
      "recs"-> streamRecs.map(x=>MongoDBObject( "mid"->x._1, "score"->x._2 )) ) )
  }
}

5.3 实时推荐算法的实现

实时推荐算法的前提:

  1. 在 Redis 集群中存储了每一个用户最近对电影的 K 次评分。实时算法可以快速获取。
  2. 离线推荐算法已经将电影相似度矩阵提前计算到了 MongoDB 中。
  3. Kafka 已经获取到了用户实时的评分数据。
    算法过程如下:
    实时推荐算法输入为一个评分<userId, movieId, rate,timestamp>,而执行的核心
    内容包括:获取 userId 最近 K 次评分、获取 movieId 最相似 K 个电影、计算候选电影的推荐优先级、更新对 userId 的实时推荐结果。

5.3.1 获取用户的 K 次最近评分

业务服务器在接收用户评分的时候,默认会将该评分情况以 userId, movieId, rate, timestamp 的格式插入到 Redis 中该用户对应的队列当中,在实时算法中,只需要通过 Redis 客户端获取相对应的队列内容即可。

5.3.2 获取当前电影最相似的 K 个电影

在 离线算 法中 ,已经预 先将电 影的 相似度矩 阵进行 了计 算 ,所以 每个电 影movieId 的最相似的 K 个电影很容易获取:从 MongoDB 中读取 MovieRecs 数据,从 movieId 在 simHash 对应的子哈希表中获取相似度前 K 大的那些电影。输出是数 据 类 型 为 Array[Int]的 数 组 , 表 示 与 movieId 最 相 似 的 电 影 集 合 , 并 命 名 为candidateMovies 以作为候选电影集合。

5.3.3 电影推荐优先级计算

对于候选电影集合 simiHash 和 userId 的最近 K 个评分 recentRatings

其中,getMovieSimScore 是取候选电影和已评分电影的相似度,而 log 是对数运算,这里实现为取 10 的对数(常用对数):

5.3.4 将结果保存到 mongoDB

saveRecsToMongoDB 函数实现了结果的保存

5.3.5 更新实时推荐结果

​ 当计算出候选电影的推荐优先级的数组 updatedRecommends后,这 个数组将被发送到 Web 后台服务器,与后台服务器上 userId 的上次实时推荐结果 recentRecommends进行合并、替换并选出优先级 E 前 K 大的电影作为 本次新的实时推荐。具体而言:

​ a.合并:将 updatedRecommends 与 recentRecommends 并集合成为一个新的<movieId, E>数组;
​ b.替换(去重):当 updatedRecommends 与recentRecommends 有重复的电影movieId 时,recentRecommends 中 movieId 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的 updatedRecommends 的 movieId 的推荐优先级;
​ c.选取 TopK:在合并、替换后的<movieId, E>数组上,根据每个 movie 的推荐优先级,选择出前 K 大的电影,作为本次实时推荐的最终结果。

初步测试:

  1. 开启Redis,在Redis中添加uid:1和uid:2的两条评分数据
  1. 开启Mongodb,zookeeper,Kafka进程,启动kafka的topic为recommend的生产者
/usr/local/mongodb/bin/mongod
-config /usr/local/mongodb/data/mongodb.conf
bin/zkServer.sh start
bin/kafka-server-start.sh daemon./config/server.properties
 ./bin/kafka-console-producer.sh --broker-list hadoop99:9092 --topic recommender
  1. 在开启的生产者中输入一条uid为1的数据
  1. 执行StreamingRecommender.scala,查看mongodb中是否产生成StreamRecs,查看生成的uid:1的相似推荐电影mid和score

可能出现redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: connect错误

错误排查:

//1.1 检查reids是否启动
ps -aux|grep redis

// 1.2 检查虚拟机防火墙是否关闭
 service firewalld status

//关闭防火墙:
service firewalld stop

// 1.3 redis保护模式必须关闭
vim /myredis/redis.conf

 注释bind 127.0.0.1
protected-mode yes改为no

//关闭redis
redis-server shutdown

//重启redis服务
redis-server /myredis/redis.conf

文章作者: 读序
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 读序 !
  目录