第 6 部分 冷启动问题处理
整个推荐系统更多的是依赖于用于的偏好信息进行电影的推荐,那么就会存在一个问题,对于新注册的用户是没有任何偏好信息记录的,那这个时候推荐就会出现问题,导致没有任何推荐的项目出现。
处理这个问题一般是通过当用户首次登陆时,为用户提供交互式的窗口来获取用户对于物品的偏好。
在本项目中,当用户第一次登陆的时候,系统会询问用户对于影片类别的偏好。
当获取用户的偏好之后,对应于需要通过用户偏好信息获取的推荐结果,则更改为通过 对影片的类型的偏好的推荐。
第 7 部分 基于内容的推荐服务
7.1 基于内容的推荐服务
原始数据中的 tag 文件,是用户给电影打上的标签,这部分内容想要直接转成 评分并不容易,不过我们可以将标签内容进行提取,得到电影的内容特征向量,进 而可以通过求取相似度矩阵。这部分可以与实时推荐系统直接对接,计算出与用户 当前评分电影的相似电影,实现基于内容的实时推荐。为了避免热门标签对特征提 取的影响,我们还可以通过 TF-IDF 算法对标签的权重进行调整,从而尽可能地接近 用户偏好。
7.2 基于内容推荐的实现
在 recommender 下新建 module为ContentRecommender,主要用来做内容的推荐处理。pom.xml 文件需要引入依赖:
<dependencies>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入MongoDB的驱动 -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
在 src/main/scala下新建 java 类 com.bigworldxld.content.ContentRecommender
package com.bigworldxld.content
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix
/**
* @author bigworldxld
* @create 2022-04-15 15:19
*/
// 需要的数据源是电影内容信息
case class Movie(mid: Int, name: String, descri: String, timelong: String, issue: String,
shoot: String, language: String, genres: String, actors: String, directors: String)
case class MongoConfig(uri:String, db:String)
// 定义一个基准推荐对象
case class Recommendation( mid: Int, score: Double )
// 定义电影内容信息提取出的特征向量的电影相似度列表
case class MovieRecs( mid: Int, recs: Seq[Recommendation] )
object ContentRecommender {
// 定义表名和常量
val MONGODB_MOVIE_COLLECTION = "Movie"
val CONTENT_MOVIE_RECS = "ContentMovieRecs"
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop99:27017/recommender",
"mongo.db" -> "recommender"
)
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// 创建一个SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加载数据,并作预处理
val movieTagsDF = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.map(
// 提取mid,name,genres三项作为原始内容特征,分词器默认按照空格做分词
x => ( x.mid, x.name, x.genres.map(c=> if(c=='|') ' ' else c) )
)
.toDF("mid", "name", "genres")
.cache()
// 核心部分: 用TF-IDF从内容信息中提取电影特征向量
// 创建一个分词器,默认按空格分词
val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")
// 用分词器对原始数据做转换,生成新的一列words
val wordsData = tokenizer.transform(movieTagsDF)
// 引入HashingTF工具,可以把一个词语序列转化成对应的词频
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(50)
val featurizedData = hashingTF.transform(wordsData)
// 引入IDF工具,可以得到idf模型
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 训练idf模型,得到每个词的逆文档频率
val idfModel = idf.fit(featurizedData)
// 用模型对原数据进行处理,得到文档中每个词的tf-idf,作为新的特征向量
val rescaledData = idfModel.transform(featurizedData)
// rescaledData.show(truncate = false)
val movieFeatures = rescaledData.map(
row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
)
.rdd
.map(
x => ( x._1, new DoubleMatrix(x._2) )
)
movieFeatures.collect().foreach(println)
// 对所有电影两两计算它们的相似度,先做笛卡尔积
val movieRecs = movieFeatures.cartesian(movieFeatures)
.filter{
// 把自己跟自己的配对过滤掉
case (a, b) => a._1 != b._1
}
.map{
case (a, b) => {
val simScore = this.consinSim(a._2, b._2)
( a._1, ( b._1, simScore ) )
}
}
.filter(_._2._2 > 0.6) // 过滤出相似度大于0.6的
.groupByKey()
.map{
case (mid, items) => MovieRecs( mid, items.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)) )
}
.toDF()
movieRecs.write
.option("uri", mongoConfig.uri)
.option("collection", CONTENT_MOVIE_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
spark.stop()
}
// 求向量余弦相似度
def consinSim(movie1: DoubleMatrix, movie2: DoubleMatrix):Double ={
movie1.dot(movie2) / ( movie1.norm2() * movie2.norm2() )
}
}
用TF-IDF从内容信息中提取电影特征向量,feature为稀疏向量:
将movieFeatures特征向量,转换为稠密矩阵
基于以上思想,加入 TF-IDF 算法的求取电影特征向量的核心代码如下:
// 载入电影数据集
val movieTagsDF = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",MONGODB_MOVIE_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Movie]
.map(x => (x.mid, x.name, x.genres.map(c => if(c == '|') ' ' else c)))
.toDF("mid", "name", "genres").cache()
// 实例化一个分词器,默认按空格分
val tokenizer = new Tokenizer().setInputCol("genres").setOutputCol("words")
// 用分词器做转换
val wordsData = tokenizer.transform(movieTagsDF)
// 定义一个 HashingTF 工具
val hashingTF = new
HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
// 用 HashingTF 做处理
val featurizedData = hashingTF.transform(wordsData)
// 定义一个 IDF 工具
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
// 将词频数据传入,得到 idf 模型(统计文档)
val idfModel = idf.fit(featurizedData)
// 用 tf-idf 算法得到新的特征矩阵
val rescaledData = idfModel.transform(featurizedData)
// 从计算得到的 rescaledData 中提取特征向量
val movieFeatures = rescaledData.map{
case row => ( row.getAs[Int]("mid"), row.getAs[SparseVector]("features").toArray )
}
.rdd
.map(x => {
(x._1, new DoubleMatrix(x._2) )
})
然后通过电影特征向量进而求出相似度矩阵,就可以为实时推荐提供基础,得 到用户推荐列表了。可以看出,基于内容和基于隐语义模型,目的都是为了提取出 物品的特征向量,从而可以计算出相似度矩阵。而我们的实时推荐系统算法正是基 于相似度来定义的。
第8部分 构建 Kafka Streaming 程序
在 recommender 下新建 module,KafkaStream,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml 文件需要引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<finalName>kafkastream</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.bigworldxld.kafkastream.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在 src/main/java 下新建 java 类com.bigworldxld.kafkastream.Application
package com.bigworldxld.kafkastream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Properties;
/**
* @author bigworldxld
* @create 2022-04-15 13:46
*/
public class Application {
public static void main(String[] args) {
String brokers = "hadoop99:9092";
String zookeepers = "hadoop99:2181";
// 输入和输出的topic
String from = "log";
String to = "recommender";
// 定义kafka streaming的配置
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
// 创建 kafka stream 配置对象
StreamsConfig config = new StreamsConfig(settings);
// 创建一个拓扑建构器
TopologyBuilder builder = new TopologyBuilder();
// 定义流处理的拓扑结构
builder.addSource("SOURCE", from)
.addProcessor("PROCESSOR", ()->new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESSOR");
KafkaStreams streams = new KafkaStreams( builder, config );
streams.start();
System.out.println("Kafka stream started!>>>>>>>>>>>");
}
}
这个程序会将 topic 为“log”的信息流获取来做处理,并以“recommender”为 新的 topic 转发出去。 流处理程序LogProcess.java
package com.bigworldxld.kafkastream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
* @author bigworldxld
* @create 2022-04-15 13:47
*/
public class LogProcessor implements Processor<byte[], byte[]>{
private ProcessorContext context;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(byte[] dummy, byte[] line) {
// 把收集到的日志信息用string表示
String input = new String(line);
// 根据前缀MOVIE_RATING_PREFIX:从日志信息中提取评分数据
if( input.contains("MOVIE_RATING_PREFIX:") ){
System.out.println("movie rating data coming!>>>>>>>>>>>" + input);
input = input.split("MOVIE_RATING_PREFIX:")[1].trim();
context.forward( "logProcessor".getBytes(), input.getBytes() );
}
}
@Override
public void punctuate(long l) {
}
@Override
public void close() {
}
}
完成代码后,启动 Application。
配置 flume
在 flume 的 conf 目录下新建 log-kafka.properties,对 flume 连接 kafka 做配置:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail -f /root/cluster/apache-flume-1.8.0-bin/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = hadoop99:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
启动 flume:
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console