前段时间尝试了爬虫,就想着这些数据在我学习大数据的路上能派上什么用场,于是构思了一个小项目来练手,大概的逻辑就是尝试将数据存入分布式数仓中,然后展示

基础结构

pyhton爬虫 -> csv -> filebeat -> kafka -> spark -> hive -> mysql -> python可视化

组件

Hadoop 、Filebeat、Kafka、Spark、Hive、MySQL

第一步:爬虫编写

参考我之前写的爬虫

爬虫初体验| 62bit的秘密基地

第二步:数据接入

模拟数据实时写入脚本(生产者脚本)

写个shell脚本模拟数据实时写入

1
2
3
4
5
6
7
8
9
10
11
12
#!/bin/bash

IFS=$'\n' # 设置换行符为字段分隔符
data=($(cat /home/myproject/data/bili.csv)) # 使用 cat 命令读取文件内容,并将其赋值给 data 数组

for line in "${data[@]}"
do
echo $line >> /home/myproject/data/bilibili.csv
# sleep 1
done

echo "Done!"

搭建Kafka集群

kafka简述与集群配置_Relian哈哈的博客-CSDN博客_kafka集群配置

编写Kafka集群启动脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/bin/bash
case $1 in
"start"){
for i in bigdata01 bigdata02 bigdata03
do
ssh $1 "/opt/install/apache-zookeeper-3.6.2-bin/bin/zkServer.sh start"
echo "$1 zookeeper start ..."
done

for i in bigdata01 bigdata02 bigdata03
do
ssh $1 "/opt/install/kafka_2.12-2.6.3/bin/kafka-server-start.sh -daemon /opt/install/kafka_2.12-2.6.3/config/server.properties"
echo "$1 kafka start ..."
done
}
;;
"stop"){
for i in bigdata01 bigdata02 bigdata03
do
ssh $1 "/opt/install/kafka_2.12-2.6.3/bin/kafka-server-stop.sh "
echo "$1 kafka stop ..."
done

for i in bigdata01 bigdata02 bigdata03
do
ssh $1 "/opt/install/apache-zookeeper-3.6.2-bin/bin/zkServer.sh stop"
echo "$1 zookeeper stop ..."
done

}
;;
esac

创建topic(测试用)

1
./bin/kafka-topics.sh --create --zookeeper bigdata01:2181,bigdata02:2181,bigdata03:2181 --topic bili

或者

1
./bin/kafka-topics.sh --create --bootstrap-server bigdata01:9092 --topic bili

这里可以指定多个分区多个副本来保证高可用

安装filebeat连接kafka

修改filebeat.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
filebeat.inputs:

- type: log

enabled: true

paths:
- /home/myproject/data/bilibili.csv

output.kafka:
hosts: ["bigdata01:9092","bigdata02:9092","bigdata03:9092"]
topic: test

测试filebeat是否能将数据传入kafka

启动集群(在kfk.sh所在目录下)

1
sh kfk.sh start

后台启动filebeat(在filebeat目录中)

1
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

以上命令的解释:linux重定向及/dev/null 2>&1详解 - 少说点话 - 博客园 (cnblogs.com)

开启kafka-console-consumer(在bigdata02)

1
/bin/kafka-console-consumer.sh --bootstrap-server bigdata02:9092 --topic bili --from-beginning

启动模拟数据实时写入的shell脚本(在bigdata01)

1
sh simulate_data_access.sh

可以在bigdata02中看到数据,但是这个好像把csv的第一行也当成数据给生产了。。

ppVpjw8.png

遇到的坑

zookeeper集群启动有时候会失败

有时候会起不来,发现zookeeper启动需要一些时间

在脚本中加入sleep 5后就可以稳定启动了

header被当作数据

最后查看消息时发现csv文件的第一行被当成数据给生产了于是我又重建了topic,删除了数据源的第一行,重新生产数据,正常了。