第11部分 测试实时系统联调
我们的系统实时推荐的数据流向是:业务系统 -> 日志 -> flume 日志采集 -> kafka streaming 数据清洗和预处理 -> spark streaming 流式计算。在我们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。
1 启动实时系统的基本组件
启动实时推荐系统 StreamingRecommender以及 mongodb、redis
2 启动 zookeeper
bin/zkServer.sh start
3 启动 kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
4 启动kafkaStream的Application
出现问题:
KafkaStream 消费log出现 has invalid (negative) timestamp 异常详解
Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(XXXX)has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
分析:
用recommender为消费者,log为生产者,kafkastream消费topic数据时候出现以上异常。以为log的时间戳格式不对(log的默认时间戳是10位时间戳),改了时间戳格式后还是异常。json和influx两种格式都报此错误,但是console-producer.sh 生成的数据就不报异常,只要符合json和influx格式就会报错
原因为,log发送过来的数据没有时间戳,而我的kafka为0.11.0.0版本,所以报以上异常
解决方案:
在com.bigworldxld.kafkastream包下新建MyEventTimeExtractor.class
package com.bigworldxld.kafkastream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
/**
* @author bigworldxld
* @create 2022-04-16 22:23
*/
public class MyEventTimeExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
return 0;
}
}
并在Application.class中添加配置
settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,MyEventTimeExtractor.class.getName());
问题解决
5 启动 flume:
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
6 启动业务系统后台
将业务代码加入系统中。注意在 src/main/resources/ 下的 log4j.properties 中,log4j.appender.file.File 的值应该替换为自己windows下的日志目录
启动业务系统后台,访问 localhost:8088/index.html;
注册两个用户后,redis中出现两个对应的uid
出现问题:
hadoop99的flume无法获取windows的idea产生的日志文件信息,导致点击评分无法实现StreamingRecommender和kafkaStream的数据接收
分析,开发时一般的平台都是windows,但windows对开发极其不友好,一般都会在本地开启虚拟机,安装上linux环境进行项目的部署测试。需要在IDEA中配置SFTP远程hadoop99服务器系统,来将businessServer产生的日志文件自动实时同步到hadoop99指定的目录下给flume采集采集日志信息,但是要注意安装的IDE必须是专业版的,社区版的IDE是没有这个代码同步功能。
解决问题:
前提条件:
在windows与Hadoop99系统下都有着项目的同一份代码。
Hadoop99系统项目代码地址:/root/MovieRecommendSystem
windows系统项目地址:E:\idea\project\MovieRecommendSystem
这两个文件夹里的项目文件要完全一样
同时,修改flume的config的log-kafka.properties
agent.sources.exectail.command = tail -f /root/MovieRecommendSystem/businessServer/src/main/log/agent.log
1、打开windows主机的idea(我这里是PhpStorm),打开上面windows系统项目地址:E:\idea\project\MovieRecommendSystem,然后选择tools->deployment->brower remote host。
2、点击右上方的三个点,新建一个连接。
3、选择添加服务器连接的类型,然后依次填写服务器信息:这里选择SFTP
4、切换到Mapping面板,两个路径都要一样
5、代码修改后自动上传配置:
然后在本地项目新建文件、修改文件、删除文件等操作,都会自动实时同步到Linux系统上了。
6、运行程序搜集日志后,在agent.log,右键Upload to hadoop99,加快时实同步
7 点击某个电影进行评分,查看实时推荐列表是否会发生变化。
后台变化:
前台变化:
搜索电影
添加标签
点击5颗星后,后台自动推荐相关内容