大数据技术之电影推荐系统7


第11部分 测试实时系统联调

我们的系统实时推荐的数据流向是:业务系统 -> 日志 -> flume 日志采集 -> kafka streaming 数据清洗和预处理 -> spark streaming 流式计算。在我们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。

1 启动实时系统的基本组件

启动实时推荐系统 StreamingRecommender以及 mongodb、redis

2 启动 zookeeper

bin/zkServer.sh start

3 启动 kafka

bin/kafka-server-start.sh -daemon ./config/server.properties

4 启动kafkaStream的Application

出现问题:

KafkaStream 消费log出现 has invalid (negative) timestamp 异常详解

Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread)

org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(XXXX)has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
分析:

用recommender为消费者,log为生产者,kafkastream消费topic数据时候出现以上异常。以为log的时间戳格式不对(log的默认时间戳是10位时间戳),改了时间戳格式后还是异常。json和influx两种格式都报此错误,但是console-producer.sh 生成的数据就不报异常,只要符合json和influx格式就会报错

原因为,log发送过来的数据没有时间戳,而我的kafka为0.11.0.0版本,所以报以上异常

解决方案:

在com.bigworldxld.kafkastream包下新建MyEventTimeExtractor.class

package com.bigworldxld.kafkastream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * @author bigworldxld
 * @create 2022-04-16 22:23
 */
public class MyEventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return 0;
    }
}

并在Application.class中添加配置

settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,MyEventTimeExtractor.class.getName());

问题解决

5 启动 flume:

./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console

6 启动业务系统后台

将业务代码加入系统中。注意在 src/main/resources/ 下的 log4j.properties 中,log4j.appender.file.File 的值应该替换为自己windows下的日志目录
启动业务系统后台,访问 localhost:8088/index.html;

注册两个用户后,redis中出现两个对应的uid

出现问题:

hadoop99的flume无法获取windows的idea产生的日志文件信息,导致点击评分无法实现StreamingRecommender和kafkaStream的数据接收

分析,开发时一般的平台都是windows,但windows对开发极其不友好,一般都会在本地开启虚拟机,安装上linux环境进行项目的部署测试。需要在IDEA中配置SFTP远程hadoop99服务器系统,来将businessServer产生的日志文件自动实时同步到hadoop99指定的目录下给flume采集采集日志信息,但是要注意安装的IDE必须是专业版的,社区版的IDE是没有这个代码同步功能。

解决问题:

前提条件:

在windows与Hadoop99系统下都有着项目的同一份代码。

Hadoop99系统项目代码地址:/root/MovieRecommendSystem

windows系统项目地址:E:\idea\project\MovieRecommendSystem

这两个文件夹里的项目文件要完全一样

同时,修改flume的config的log-kafka.properties

agent.sources.exectail.command = tail -f /root/MovieRecommendSystem/businessServer/src/main/log/agent.log

1、打开windows主机的idea(我这里是PhpStorm),打开上面windows系统项目地址:E:\idea\project\MovieRecommendSystem,然后选择tools->deployment->brower remote host。

2、点击右上方的三个点,新建一个连接。

3、选择添加服务器连接的类型,然后依次填写服务器信息:这里选择SFTP

4、切换到Mapping面板,两个路径都要一样

5、代码修改后自动上传配置:

然后在本地项目新建文件、修改文件、删除文件等操作,都会自动实时同步到Linux系统上了。

6、运行程序搜集日志后,在agent.log,右键Upload to hadoop99,加快时实同步

7 点击某个电影进行评分,查看实时推荐列表是否会发生变化。

后台变化:

前台变化:

搜索电影

添加标签

点击5颗星后,后台自动推荐相关内容


文章作者: 读序
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 读序 !
  目录