大数据技术之电影推荐系统5


第 6 部分 冷启动问题处理

​ 整个推荐系统更多的是依赖于用于的偏好信息进行电影的推荐,那么就会存在一个问题,对于新注册的用户是没有任何偏好信息记录的,那这个时候推荐就会出现问题,导致没有任何推荐的项目出现。
​ 处理这个问题一般是通过当用户首次登陆时,为用户提供交互式的窗口来获取用户对于物品的偏好。
在本项目中,当用户第一次登陆的时候,系统会询问用户对于影片类别的偏好。

​ 当获取用户的偏好之后,对应于需要通过用户偏好信息获取的推荐结果,则更改为通过 对影片的类型的偏好的推荐。

第 7 部分 基于内容的推荐服务

7.1 基于内容的推荐服务

​ 原始数据中的 tag 文件,是用户给电影打上的标签,这部分内容想要直接转成 评分并不容易,不过我们可以将标签内容进行提取,得到电影的内容特征向量,进 而可以通过求取相似度矩阵。这部分可以与实时推荐系统直接对接,计算出与用户 当前评分电影的相似电影,实现基于内容的实时推荐。为了避免热门标签对特征提 取的影响,我们还可以通过 TF-IDF 算法对标签的权重进行调整,从而尽可能地接近 用户偏好。

7.2 基于内容推荐的实现

在 recommender 下新建 module为ContentRecommender,主要用来做内容的推荐处理。pom.xml 文件需要引入依赖:

<dependencies>

        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>jblas</artifactId>
            <version>${jblas.version}</version>
        </dependency>

        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
        </dependency>

        <!-- 引入Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- 加入MongoDB的驱动 -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>

在 src/main/scala下新建 java 类 com.bigworldxld.content.ContentRecommender

package com.bigworldxld.content
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
/**
  * @author bigworldxld
  * @create 2022-04-15 15:19
  */
// 需要的数据源是电影内容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
                 shoot: String, language: String, genres: String, actors: String, directors: String)

case class MongoConfig(uri:String, db:String)

// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )

// 定义电影内容信息提取出的特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
object ContentRecommender {
  // 定义表名和常量
  val MONGODB_MOVIE_COLLECTION = "Movie"

  val CONTENT_MOVIE_RECS = "ContentMovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop99:27017/recommender",
      "mongo.db" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")

    // 创建一个SparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    // 加载数据,并作预处理
    val movieTagsDF = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Movie]
      .map(
        // 提取mid,name,genres三项作为原始内容特征,分词器默认按照空格做分词
        x => ( x.mid, x.name, x.genres.map(c=> if(c=='|') ' ' else c) )
      )
      .toDF("mid", "name", "genres")
      .cache()

    // 核心部分: 用TF-IDF从内容信息中提取电影特征向量

    // 创建一个分词器,默认按空格分词
    val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")

    // 用分词器对原始数据做转换,生成新的一列words
    val wordsData = tokenizer.transform(movieTagsDF)

    // 引入HashingTF工具,可以把一个词语序列转化成对应的词频
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
    val featurizedData = hashingTF.transform(wordsData)

    // 引入IDF工具,可以得到idf模型
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // 训练idf模型,得到每个词的逆文档频率
    val idfModel = idf.fit(featurizedData)
    // 用模型对原数据进行处理,得到文档中每个词的tf-idf,作为新的特征向量
    val rescaledData = idfModel.transform(featurizedData)

    //    rescaledData.show(truncate = false)

    val movieFeatures = rescaledData.map(
      row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
    )
      .rdd
      .map(
        x => ( x._1, new DoubleMatrix(x._2) )
      )
    movieFeatures.collect().foreach(println)

    // 对所有电影两两计算它们的相似度,先做笛卡尔积
    val movieRecs = movieFeatures.cartesian(movieFeatures)
      .filter{
        // 把自己跟自己的配对过滤掉
        case (a, b) => a._1 != b._1
      }
      .map{
        case (a, b) => {
          val simScore = this.consinSim(a._2, b._2)
          ( a._1, ( b._1, simScore ) )
        }
      }
      .filter(_._2._2 > 0.6)    // 过滤出相似度大于0.6的
      .groupByKey()
      .map{
        case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
      }
      .toDF()
    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", CONTENT_MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }
  // 求向量余弦相似度
  def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
    movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
  }
}

用TF-IDF从内容信息中提取电影特征向量,feature为稀疏向量:

将movieFeatures特征向量,转换为稠密矩阵

基于以上思想,加入 TF-IDF 算法的求取电影特征向量的核心代码如下:

// 载入电影数据集
val movieTagsDF = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.map(x => (x.mid, x.name, x.genres.map(c => if(c == '|') ' ' else c)))
.toDF("mid", "name", "genres").cache()
// 实例化一个分词器,默认按空格分
val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")
// 用分词器做转换
val wordsData = tokenizer.transform(movieTagsDF)
// 定义一个 HashingTF 工具
val hashingTF = new 
HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
// 用 HashingTF 做处理
val featurizedData = hashingTF.transform(wordsData)
// 定义一个 IDF 工具
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 将词频数据传入,得到 idf 模型(统计文档)
val idfModel = idf.fit(featurizedData)
// 用 tf-idf 算法得到新的特征矩阵
val rescaledData = idfModel.transform(featurizedData)
// 从计算得到的 rescaledData 中提取特征向量
val movieFeatures = rescaledData.map{
    case row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
}
.rdd
.map(x => {
    (x._1, new DoubleMatrix(x._2) )
})

​ 然后通过电影特征向量进而求出相似度矩阵,就可以为实时推荐提供基础,得 到用户推荐列表了。可以看出,基于内容和基于隐语义模型,目的都是为了提取出 物品的特征向量,从而可以计算出相似度矩阵。而我们的实时推荐系统算法正是基 于相似度来定义的。

第8部分 构建 Kafka Streaming 程序

在 recommender 下新建 module,KafkaStream,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml 文件需要引入依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>
<build>
    <finalName>kafkastream</finalName>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <mainClass>com.bigworldxld.kafkastream.Application</mainClass>
                    </manifest>
                </archive>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在 src/main/java 下新建 java 类com.bigworldxld.kafkastream.Application

package com.bigworldxld.kafkastream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

import java.util.Properties;
/**
 * @author bigworldxld
 * @create 2022-04-15 13:46
 */
public class Application {
    public static void main(String[] args) {
        String brokers = "hadoop99:9092";
        String zookeepers = "hadoop99:2181";

        // 输入和输出的topic
        String from = "log";
        String to = "recommender";

        // 定义kafka streaming的配置
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);

        // 创建 kafka stream 配置对象
        StreamsConfig config = new StreamsConfig(settings);

        // 创建一个拓扑建构器
        TopologyBuilder builder = new TopologyBuilder();

        // 定义流处理的拓扑结构
        builder.addSource("SOURCE", from)
                .addProcessor("PROCESSOR", ()->new LogProcessor(), "SOURCE")
                .addSink("SINK", to, "PROCESSOR");

        KafkaStreams streams = new KafkaStreams( builder, config );

        streams.start();

        System.out.println("Kafka stream started!>>>>>>>>>>>");
    }
}

这个程序会将 topic 为“log”的信息流获取来做处理,并以“recommender”为 新的 topic 转发出去。 流处理程序LogProcess.java

package com.bigworldxld.kafkastream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
 * @author bigworldxld
 * @create 2022-04-15 13:47
 */
public class LogProcessor implements Processor<byte[], byte[]>{
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
    }

    @Override
    public void process(byte[] dummy, byte[] line) {
        // 把收集到的日志信息用string表示
        String input = new String(line);
        // 根据前缀MOVIE_RATING_PREFIX:从日志信息中提取评分数据
        if( input.contains("MOVIE_RATING_PREFIX:") ){
            System.out.println("movie rating data coming!>>>>>>>>>>>" + input);

            input = input.split("MOVIE_RATING_PREFIX:")[1].trim();
            context.forward( "logProcessor".getBytes(), input.getBytes() );
        }
    }

    @Override
    public void punctuate(long l) {

    }

    @Override
    public void close() {
    }
}

完成代码后,启动 Application。

配置 flume

在 flume 的 conf 目录下新建 log-kafka.properties,对 flume 连接 kafka 做配置:

agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail -f /root/cluster/apache-flume-1.8.0-bin/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = hadoop99:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000

启动 flume:

./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console


文章作者: 读序
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 读序 !
  目录