简单搭建 Apache Kafka 分布式消息系统

早先都是用的基于 JMS 规范的消息系统, 像 ActiveMQ, IBM MQSeries 等. 随着互联网的发展, 大约是要适应当今大数据, 高可用性, 高效的需求, 于是诞生了 Apache Kafka 这一新时代的分布式消息系统. Apache Kafka 也是发布-订阅式的消息系统, 用 Scala 语言写的, 它最初由 LinedIn 开发并贡献到 Apache 基金会.

Kafka 的集群实质是依赖于 ZooKeeper 的集群来协同管理, 所以这里可以参照之前的 ZooKeeper 快速搭建与体验 来搭建一个 ZooKeeper 集群(其实这是一个伪集群, 实际产品中应该把 ZooKeeper 集群分布在不同的机器上).

本文主要是参考官方的 Kafka Quickstart 来快速体验 Kafka 消息系统, 下载的 Kafka 自带了 ZooKeeper, 默认只启动了一个  ZooKeeper 节点. 如需 ZooKeeper 集群可以不依赖于 Kafka 自带的 ZooKeeper 而单独搭建.

下面开始演示建立一个最简单的 Kafka 系统

1,  下载并解压

当前最新版本是 0.10.01, 我选择的是 Scala 2.11 的 kafka_2.11-0.10.0.1.tgz, 下载并解压

> tar xzvf kafka_2.11-0.10.0.1.tgz
> cd kafka_2.11-0.10.0.1

接下来的命令都从 kafka_2.11-0.10.0.1 目录执行. 配置和可执行文件分别在 config 和  bin 目录

2, 启动 Server

包括要启动 ZooKeeper 和 Kafka, Kafka 依赖于 ZooKeeper, 所以 ZooKeeper 须先行启动.

启动 ZooKeeper

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2016-10-06 15:06:35,515] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
.....................
[2016-10-06 15:07:05,603] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-10-06 15:07:07,207] INFO Accepted socket connection from /0:0:0:0:0:0:0:1:65185 (org.apache.zookeeper.server.NIOServerCnxnFactory)
....................

从控制台输出或是查看 config/zookeeper.properties 文件, 可以发现 ZooKeeper 启动在端口号 2181 上, 如果带上参数 -daemon 会在后台启动. 可以执行 bin/zookeeper-server-stop.sh 来关闭 ZooKeeper.

启动 Kafka

> bin/kafka-server-start.sh config/server.properties
[2016-10-06 15:16:46,169] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1579b9b40dc0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
...............
[2016-10-06 15:16:46,593] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-10-06 15:16:46,596] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
..............
[2016-10-06 15:16:46,695] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
..............
[2016-10-06 15:17:46,959] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
.............

也可以加 -daemon 在后台启动, 从上面的输出可以看出 Kafka 连接到了 127.0.0.1:2181 上的 ZooKeeper, 并创建了 znode /controller 和 /brokers/ids/0. 如果需要让 Kafka 连接到集群中的 ZooKeeper, 可以修改 config/server.properties 文件, 其中的

zookeeper.connect=localhost:2181

改成像

zookeeper.connect=localhost:2181,192.168.0.1:2181,192.168.0.2:2182

bin/kafka-server-stop.sh 关闭 Kafka 服务.

这时候可以用 ZooKeeper 和客户端 zkCli.sh 或 bin/zookeeper-shell.sh 连接 ZooKeeper 服务, 查看有些什么 znode

> zkCli.sh -server localhost:2181
# 或 bin/zookeeper-shell.sh localhost:2181
[zk: localhost:2181(CONNECTED) 7] ls /
[controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]
[zk: localhost:2181(CONNECTED) 8] ls /brokers/topics
[]
[zk: localhost:2181(CONNECTED) 10] get /controller
{"version":1,"brokerid":0,"timestamp":"1475785006684"}
cZxid = 0x62
........

现在还没有 topic, 这是一下步的事情

3, 创建主题

我们将创建一个名为 test 的 topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

这个命令也可以列出所以的 topic

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Topic 是一个活生生的存在于 ZooKeeper 中的 znode, 所以在 ZooKeeper 客户端 zkCli.sh 中能够找到

[zk: localhost:2181(CONNECTED) 13] ls /brokers/topics
[test]

4, 发送消息

我们还是用 Kafka 自带的命令来演示

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

5, 消费消息

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

我们看到在前一步发送的消息这里可以读取到.

如果我们把 producer 和 consumer 窗口同时打开, 那就是一个单工的实时聊天程序了

kafka-talk

字符集应该是 UTF-8 了, 中文得了毫无意外的支持. 要是每一端同时实现了 producer 和 consumer 的话, 一个实时全双工的聊天工具就这么简单.

消息上跑通了, 用程序代码来写 Producer 和 Consumer 就不难了.

链接: 

  1. Kafka实战-Kafka Cluster
  2. Kafka部署与代码实例

类别: Java/JEE. 标签: . 阅读(213). 订阅评论. TrackBack.

Leave a Reply

Be the First to Comment!

avatar
wpDiscuz