Kafka简介与安装使用
# 一、kafka简介
Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等。
- 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反 馈,比如报警和报告。
# 二、快速开始 (opens new window)
# 2.1、安装前环境准备
# 2.1.1、安装JDK
Kafka是用Scala语言开发的,运行在JVM上,所以需要先安装JDK (opens new window)
# 2.1.2、安装Zookeeper
kafka依赖zookeeper,所以需要先安装Zookeeper
# 2.2、安装Kafka
# 2.2.1、下载安装包
#下载2.4.1 release版本,并解压
wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz #2.11是scala的版本,2.4.1是kafka的版本
tar -zxvf kafka_2.11‐2.4.1.tgz
1
2
3
2
3
# 2.2.2、修改配置文件config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://node0:9092 #kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=node0:2181
1
2
3
4
5
6
7
2
3
4
5
6
7
# 2.2.3、启动服务
# 启动kafka,运行日志在logs目录的server.log文件里
bin/kafka-server-start.sh -daemon config/server.properties#后台启动,不会打印日志到控制台或者用
bin/kafka-server-start.sh config/server.properties&
# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh
ls / #查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点
# 停止kafka
bin/kafka-server-stop.sh
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
kafka在zk节点数据图
# 三、Kafka集群
由于服务器资源有限,现在我们在一台机器上同时启动三个broker实例。
# 3.1、创建配置文件并修改内容
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
1
2
2
修改配置文件vi config/server-1.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://node0:9093 #kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址
zookeeper.connect=node0:2181
1
2
3
4
5
6
7
2
3
4
5
6
7
修改配置文件vi config/server-2.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=2
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://node0:9094 #kafka的消息存储文件
log.dir=/usr/local/data/kafka-logs-2
#kafka连接zookeeper的地址
zookeeper.connect=node0:2181
1
2
3
4
5
6
7
2
3
4
5
6
7
# 3.2、启动节点
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
#查看zookeeper确认集群节点是否都注册成功
[zk: localhost:2181(CONNECTED) 13] ls /brokers/ids
[0,1,2]
1
2
3
4
5
6
2
3
4
5
6
# 四、Kafka基本使用
# 4.1、创建主题
#创建一个test-topic的主题
bin/kafka-topics.sh --create --zookeeper node0:2181 --replication-factor 1 --partitions 2 --topic test-topic
#查看所有主题
bin/kafka-topics.sh --list --zookeeper node0:2181
#增加分区数量(目前kafka不支持减少分区)
bin/kafka-topics.sh -alter --partitions 3 --zookeeper node0:2181 --topic test-topic
#删除主题
bin/kafka-topics.sh --delete --topic test-topic --zookeeper node0:2181
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
查看某个topic的情况
bin/kafka-topics.sh --describe --topic test-topic --zookeeper node0:2181
1
第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息
Leader
节点负责给定partition的所有读写请求,同一个主题不同分区leader副本一般不一样(为了容灾)Replicas
表示某个partition在哪几个broker上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。Isr
是replicas的一个子集,它只列出当前还存活着的,并且已同步备份了该partition的节点。
# 4.2、发送消息
bin/kafka-console-producer.sh --broker-list node0:9092,node1:9093,node2:9094 --topic test-topic
>this is a msg1
>this is a msg2
1
2
3
2
3
# 4.3、消费消息
#默认是消费最新的消息
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic
#消费之前的消息`--from-beginning`
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic --from-beginning
#消费多主题
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --whitelist "test-topic|test-2"
#单播消费:一条消息只能被某一个消费者消费的模式,只需让所有消费者在同一个消费组里即可
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic --consumer-property group.id=testGroup1
#多播消费:一条消息能被多个消费者消费的模式,类似publish-subscribe模式,只要保证这些消费者属于不同的消费组即可
bin/kafka-console-consumer.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --topic test-topic --consumer-property group.id=testGroup2
#查看消费组名
bin/kafka-consumer-groups.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
查看消费组的消费偏移量
bin/kafka-consumer-groups.sh --bootstrap-server node0:9092,node1:9093,node2:9094 --describe --group testGroup
1
current-offset
:当前消费组的已消费偏移量
log-end-offset
:主题对应分区消息的结束偏移量(HW)
lag
:当前消费组未消费的消息数
# 五、kafka可视化管理工具
# 5.1、下载解压
unzip kafka-manager-1.3.3.7.zip
cd kafka-manager-1.3.3.7
1
2
2
# 5.2、修改配置文件vi conf/application.conf
#kafka-manager.zkhosts="localhost:2181" ##注释这一行,下面换成zk集群地址
kafka-manager.zkhosts="node0:2181,node1:2181,node2:2181"
1
2
2
# 5.3、启动
#bin/kafka-manager
#kafka-manager 默认的端口是9000,可通过 -Dhttp.port,指定端口; -Dconfig.file=conf/application.conf指定配置文件:
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &
1
2
3
4
2
3
4
更多安装使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html (opens new window)