作者选择了 自由和开源基金作为 写给捐款计划的一部分接受捐款。
介绍
Apache Kafka是一个流行的分布式消息经纪商,旨在高效地处理大量的实时数据。卡夫卡集群不仅高度可扩展和耐错误,而且与其他消息经纪商如 ActiveMQ和 RabbitMQ相比,其传输量也更高。
发布/订阅消息系统允许一个或多个生产商发布消息,而不考虑消费者的数量或他们将如何处理消息。订阅客户自动通知更新和创建新消息。
在本教程中,您将安装并在Ubuntu 18.04上使用Apache Kafka 2.1.1。
前提条件
要跟上,你需要:
- 一个 Ubuntu 18.04 服务器和一个非 root 用户具有 sudo 特权。如果您没有一个非 root 用户,请按照本指南所述步骤(https://andsky.com/tech/tutorials/initial-server-setup-with-ubuntu-18-04)设置。
- 至少在您的服务器上安装了 4GB 的 RAM. 没有此数量的 RAM 的安装可能会导致 Kafka 服务失败,而 Java 虚拟机 (JVM)在启动过程中抛出
内存
例外。 - OpenJDK 8 安装在您的服务器上。 为了安装此版本,请遵循 这些指令在安装特定版本的 OpenJDK。 Kafka 是用 Java 编写的,所以它需要一个 JVM;然而,其启
步骤 1 — 为 Kafka 创建用户
由于Kafka可以通过网络处理请求,所以你应该为它创建一个专用用户,这样才能最大限度地减少对Ubuntu机器的损害,如果Kafka服务器受到破坏,我们将在这个步骤中创建一个专用kafka 用户,但一旦你完成了Kafka的设置,你应该创建一个不同的非根用户,在这个服务器上执行其他任务。
登录为非 root sudo 用户,使用useradd
命令创建一个名为 kafka 的用户:
1sudo useradd kafka -m
这个主目录,‘/home/kafka’,将作为我们的工作区目录来执行下面的部分中的命令。
使用passwd
设置密码:
1sudo passwd kafka
用adduser
命令将 kafka 用户添加到sudo
组中,以便它具有安装 Kafka 的依赖性所需的权限:
1sudo adduser kafka sudo
你的 kafka 用户已经准备好了. 使用su
登录这个帐户:
1su -l kafka
现在我们已经创建了卡夫卡特定的用户,我们可以继续下载和提取卡夫卡二进制。
步骤2:下载和提取卡夫卡二进制
让我们下载并将卡夫卡二进制提取到我们的 kafka 用户主目录的专用文件夹中。
首先,在/home/kafka
中创建一个名为下载
的目录,以存储下载:
1mkdir ~/Downloads
使用弯曲
下载卡夫卡二进制:
1curl "https://www.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
创建一个名为kafka
的目录,并切换到这个目录,这将是 Kafka 安装的基本目录:
1mkdir ~/kafka && cd ~/kafka
使用tar
命令提取您下载的档案:
1tar -xvzf ~/Downloads/kafka.tgz --strip 1
我们指定―条1
旗,以确保档案的内容被提取到~/kafka/
,而不是在其内部的其他目录(如~/kafka/kafka_2.11-2.1.1/
)中。
现在我们已经下载并成功提取了二进制,我们可以继续配置到Kafka以允许主题删除。
步骤 3 – 配置 Kafka 服务器
卡夫卡的默认行为不允许我们删除可发布消息的 topic、类别、群组或信号名称。
卡夫卡的配置选项在server.properties
中指定。 使用nano
或您最喜欢的编辑器打开此文件:
1nano ~/kafka/config/server.properties
让我们添加一个设置,允许我们删除Kafka主题. 将下列内容添加到文件底部:
1[label ~/kafka/config/server.properties]
2delete.topic.enable = true
保存文件,然后退出nano
。现在我们已经配置了Kafka,我们可以继续创建系统d单元文件来运行并在启动时启用它。
步骤 4 — 创建 Systemd 单元文件并启动 Kafka 服务器
在本节中,我们将为 Kafka 服务创建 systemd 单元文件,这将有助于我们以与其他 Linux 服务一致的方式执行常见的服务操作,如启动、停止和重新启动 Kafka。
Zookeeper 是 Kafka 用来管理其集群状态和配置的服务,它通常在许多分布式系统中被用作组成部分,如果您想了解更多信息,请访问官方 Zookeeper docs。
创建Zookeeper
的单元文件:
1sudo nano /etc/systemd/system/zookeeper.service
将下列单位定义输入到文件中:
1[label /etc/systemd/system/zookeeper.service]
2[Unit]
3Requires=network.target remote-fs.target
4After=network.target remote-fs.target
5
6[Service]
7Type=simple
8User=kafka
9ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
10ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target
该[单位]
部分规定 Zookeeper 需要网络和文件系统在启动之前准备好。
该[服务]
部分规定 systemd 应该使用zookeeper-server-start.sh
和zookeeper-server-stop.sh
壳文件来启动和停止服务。
接下来,创建kafka
的 systemd 服务文件:
1sudo nano /etc/systemd/system/kafka.service
将下列单位定义输入到文件中:
1[label /etc/systemd/system/kafka.service]
2[Unit]
3Requires=zookeeper.service
4After=zookeeper.service
5
6[Service]
7Type=simple
8User=kafka
9ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
10ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target
[单位]
部分规定这个单元文件取决于zookeeper.service
。这将确保zookeeper
在kafka
服务开始时自动启动。
该[服务]
部分规定 systemd 应该使用kafka-server-start.sh
和kafka-server-stop.sh
壳文件来启动和停止服务。
现在单位已经定义了,开始卡夫卡用以下命令:
1sudo systemctl start kafka
要确保服务器成功启动,请检查kafka
单元的日志日志:
1sudo journalctl -u kafka
你应该看到类似于以下的输出:
1[secondary_label Output]
2Jul 17 18:38:59 kafka-ubuntu systemd[1]: Started kafka.service.
你现在有一个卡夫卡服务器在端口9092
上听。
虽然我们已经启动了kafka
服务,如果我们要重新启动我们的服务器,它不会自动启动。
1sudo systemctl enable kafka
现在我们已经开始并启用了服务,让我们检查安装。
步骤5:测试安装
让我们发布和消耗一个 Hello World
消息,以确保卡夫卡服务器的行为是正确的。
- 一个 producer,它允许对主题发布记录和数据。
- 一个 consumer,它读取来自主题的消息和数据。
首先,创建一个名为TutorialTopic
的主题,键入:
1~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用kafka-console-producer.sh
脚本从命令行创建一个制作者,它将 Kafka 服务器的主机名、端口和主题名称作为参数。
将字符串Hello, World
发布到TutorialTopic
主题,键入:
1echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,您可以使用kafka-console-consumer.sh
脚本创建一个Kafka消费者,它预计ZooKeeper服务器的主机名和端口,以及主题名称作为参数。
下面的命令消耗了来自TutorialTopic
的消息,请注意使用从开始
的旗帜,允许消耗在消费者开始之前发布的消息:
1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您应该在您的终端中看到你好,世界
:
1[secondary_label Output]
2Hello, World
脚本将继续运行,等待更多消息发布到主题. 请自由打开一个新的终端,并开始一个生产者发布一些更多消息. 你应该能够在消费者输出中看到它们。
当您完成测试时,请按CTRL+C
来停止用户脚本. 现在我们已经测试了安装,让我们继续安装 KafkaT。
步骤 6 — 安装 KafkaT (可选)
KafkaT是Airbnb的一种工具,可以让您更轻松地查看卡夫卡集群的细节,并从命令行执行某些管理任务,因为它是卢比宝石,所以您需要Ruby来使用它。
1sudo apt install ruby ruby-dev build-essential
您现在可以使用宝石命令安装 KafkaT:
1sudo gem install kafkat
KafkaT 使用 .kafkatcfg
作为配置文件来确定您的 Kafka 服务器的安装和日志目录. 它还应该有一个指向 KafkaT 的输入到您的 ZooKeeper 实例。
创建一个名为「.kafkatcfg」的新文件:
1nano ~/.kafkatcfg
添加以下行来指定关于 Kafka 服务器和 Zookeeper 实例所需的信息:
1[label ~/.kafkatcfg]
2{
3 "kafka_path": "~/kafka",
4 "log_path": "/tmp/kafka-logs",
5 "zk_path": "localhost:2181"
6}
您现在已经准备好使用 KafkaT. 首先,这里是如何使用它来查看所有 Kafka 分区的详细信息:
1kafkat partitions
您将看到以下结果:
1[secondary_label Output]
2Topic Partition Leader Replicas ISRs
3TutorialTopic 0 0 [0] [0]
4__consumer_offsets 0 0 [0] [0]
5...
6...
您将看到TutorialTopic
,以及__consumer_offsets
,这是卡夫卡用于存储客户相关信息的内部主题,您可以安全地忽略从__consumer_offsets
开始的行。
要了解更多关于 KafkaT 的信息,请参阅其 GitHub 存储库。
步骤 7 — 设置一个多节点集群(可选)
如果你想使用更多的 Ubuntu 18.04 机器创建一个多经纪人群,你应该在每个新机器上重复步骤 1,步骤 4 和步骤 5. 此外,你应该对每个机器的 server.properties
文件进行以下更改:
- 該「broker.id」屬性的值應變更,以使其在集群中獨一無二。 該屬性可獨特地識別集群中的每個伺服器,並且可以有任何串值。 例如,「server1」、「server2」等。
- 「zookeeper.connect」屬性的值應變更,以便所有節點指向相同的 ZooKeeper 實例。 此屬性指定 Zookeeper 實例的地址,並遵循「<HOSTNAME/IP_ADDRESS>:
」格式。 例如,「203.0.113.03:2181」、「203.0.113.1:2181」等。
如果您想为您的集群拥有多个 ZooKeeper 实例,则每个节点上的zookeeper.connect
属性值应该是列出所有 ZooKeeper 实例的 IP 地址和端口号的相同、单元格分离的字符串。
第8步:限制卡夫卡用户
现在所有安装都完成了,您可以删除 kafka 用户的管理权限. 在您这样做之前,请退出并作为任何其他非 root sudo 用户重新登录。
从 sudo 组中删除 kafka 用户:
1sudo deluser kafka sudo
为了进一步提高 Kafka 服务器的安全性,请使用passwd
命令锁定 kafka 用户的密码,以确保没有人可以直接使用此帐户登录服务器:
1sudo passwd kafka -l
在此时,只有 root 或 sudo 用户可以通过键入以下命令作为kafka
登录:
1sudo su - kafka
在未来,如果你想解锁它,使用passwd
与-u
选项:
1sudo passwd kafka -u
您现在已经成功限制了 kafka 用户的管理权限。
结论
您现在可以在 Ubuntu 服务器上安全地运行 Apache Kafka. 您可以通过使用 Kafka 客户端来创建 Kafka 生产者和消费者,在您的项目中使用它。