作者选择了 自由和开源基金作为 写给捐款计划的一部分接受捐款。
介绍
Apache Kafka是一个流行的分布式消息经纪商,旨在处理大量的实时数据。一个Kafka集群是高度可扩展和耐错误的,并且与其他消息经纪商(如 ActiveMQ和 RabbitMQ相比,其传输量也更高。
发布/订阅消息系统允许一个或多个生产商发布消息,而不考虑消费者的数量或他们将如何处理消息。订阅客户自动通知更新和创建新消息。
在本教程中,您将安全地在 Debian 10 服务器上安装和配置 Apache Kafka 2.1.1,然后通过生成和消耗一个Hello World
消息来测试您的设置,然后您将可选地安装 KafkaT来监控 Kafka,并设置一个 Kafka 多节点集群。
前提条件
要跟上,你需要:
- 一个 Debian 10 服务器,至少有 4 GB 的 RAM 和一个具有 sudo 特权的非根用户。如果您没有一个非根用户,请遵循我们 Debian 10 初始服务器设置指南中的步骤。
- OpenJDK 11 已安装在您的服务器上。 要安装此版本,请遵循 如何在 Debian 10 上使用 Apt 安装 Java中的说明。
<$>[注]
注: 没有4GB的RAM的安装可能会导致卡夫卡服务失败,而Java虚拟机(JVM)(https://en.wikipedia.org/wiki/Java_virtual_machine)在启动时会抛出一个内存漏洞
例外。
步骤 1 — 为 Kafka 创建用户
由于Kafka可以通过网络处理请求,所以为其创建一个专用用户是最好的做法,这将最大限度地减少对您的Debian机器的损害,如果Kafka服务器受到威胁。
登录为非 root sudo 用户,使用useradd
命令创建一个名为kafka
的用户:
1sudo useradd kafka -m
-m
标志确保为用户创建一个主目录. 此主目录, /home/kafka
,将作为您的工作区目录,以便在以后执行命令。
使用passwd
设置密码:
1sudo passwd kafka
输入您想为此用户使用的密码。
接下来,用adduser
命令将 kafka 用户添加到sudo
组中,使其具有安装 Kafka 的依赖性所需的权限:
1sudo adduser kafka sudo
你的 kafka 用户已经准备好了. 使用su
登录这个帐户:
1su -l kafka
现在你已经创建了卡夫卡特定的用户,你可以继续下载和提取卡夫卡二进制。
步骤2:下载和提取卡夫卡二进制
在此步骤中,您将下载并提取 Kafka 二进制文本到您的 kafka 用户主目录中的专用文件夹中。
首先,在/home/kafka
中创建一个名为下载
的目录,以存储下载:
1mkdir ~/Downloads
接下来,使用apt-get
安装curl
,以便您可以下载远程文件:
1sudo apt-get update && sudo apt-get install curl
当被提示时,键入Y
来确认弯曲
下载。
一旦安装了‘curl’,使用它来下载卡夫卡二进制:
1curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz
创建一个名为kafka
的目录,并切换到这个目录,这将是 Kafka 安装的基本目录:
1mkdir ~/kafka && cd ~/kafka
使用tar
命令提取您下载的档案:
1tar -xvzf ~/Downloads/kafka.tgz --strip 1
您指定了--条1
旗,以确保档案的内容被提取到~/kafka/
,而不是在其内部的其他目录中,例如~/kafka/kafka_2.12-2.1.1/
。
现在你已经下载并成功提取了二进制,你可以继续配置卡夫卡以允许主题删除。
步骤 3 – 配置 Kafka 服务器
Kafka 的默认行为不会允许我们删除可发布消息的 topic、类别、群组或信号名称。
卡夫卡的配置选项在server.properties
中指定。 使用nano
或您最喜欢的编辑器打开此文件:
1nano ~/kafka/config/server.properties
让我们添加一个设置,允许我们删除Kafka主题. 将下面的突出行添加到文件底部:
1[label ~/kafka/config/server.properties]
2...
3group.initial.rebalance.delay.ms
4
5delete.topic.enable = true
保存文件,然后退出nano
。现在你已经配置了Kafka,你可以创建systemd
单元文件来运行并启用Kafka在启动。
步骤 4 — 创建 Systemd 单元文件并启动 Kafka 服务器
在本节中,您将为 Kafka 服务创建 systemd
单元文件,这将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,如启动、停止和重新启动 Kafka。
ZooKeeper 是 Kafka 用来管理其集群状态和配置的服务,它通常在分布式系统中被用作组成部分,在本教程中,您将使用 Zookeeper 来管理 Kafka 的这些方面。
首先,创建zookeeper
的单元文件:
1sudo nano /etc/systemd/system/zookeeper.service
将下列单位定义输入到文件中:
1[label /etc/systemd/system/zookeeper.service]
2[Unit]
3Requires=network.target remote-fs.target
4After=network.target remote-fs.target
5
6[Service]
7Type=simple
8User=kafka
9ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
10ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target
该[单位]
部分规定,ZooKeeper需要网络和文件系统在启动之前做好准备。
该[服务]
部分规定systemd
应使用zookeeper-server-start.sh
和zookeeper-server-stop.sh
壳文件来启动和停止服务。
接下来,为kafka
创建systemd
服务文件:
1sudo nano /etc/systemd/system/kafka.service
将下列单位定义输入到文件中:
1[label /etc/systemd/system/kafka.service]
2[Unit]
3Requires=zookeeper.service
4After=zookeeper.service
5
6[Service]
7Type=simple
8User=kafka
9ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
10ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target
[单位]
部分规定这个单元文件取决于zookeeper.service
。这将确保zookeeper
在kafka
服务开始时自动启动。
该[服务]
部分规定systemd
应使用kafka-server-start.sh
和kafka-server-stop.sh
壳文件来启动和停止服务。
现在单位已经定义了,开始卡夫卡用以下命令:
1sudo systemctl start kafka
要确保服务器成功启动,请检查kafka
单元的日志日志:
1sudo journalctl -u kafka
您将看到类似于以下的输出:
1[secondary_label Output]
2Mar 23 13:31:48 kafka systemd[1]: Started kafka.service.
你现在有一个卡夫卡服务器在端口9092
上聆听,这是卡夫卡的默认端口。
你已经启动了kafka
服务,但如果你要重新启动你的服务器,它还不会自动启动。
1sudo systemctl enable kafka
现在你已经开始并启用了服务,是时候检查安装了。
步骤5:测试安装
让我们发布和消耗一个Hello World
消息,以确保卡夫卡服务器的行为是正确的。
- 一个 producer,它允许对主题发布记录和数据。
- 一个 consumer,它读取来自主题的消息和数据。
首先,创建一个名为TutorialTopic
的主题,键入:
1~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用kafka-console-producer.sh
脚本从命令行创建一个制作者,它将 Kafka 服务器的主机名、端口和主题名称作为参数。
将字符串Hello, World
发布到TutorialTopic
主题,键入:
1echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
--经纪人列表
的旗帜决定了要发送消息的邮件经纪人列表,在这种情况下是localhost:9092
。
接下来,您可以使用kafka-console-consumer.sh
脚本创建一个 Kafka 消费者,它将 ZooKeeper 服务器的主机名称和端口以及主题名称作为参数。
下面的命令消耗了来自TutorialTopic
的消息,请注意使用从开始
的旗帜,允许消耗在消费者开始之前发布的消息:
1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server `localhost:9092` --topic TutorialTopic --from-beginning
--bootstrap-server
提供了 Kafka 集群中的入口列表. 在这种情况下,您正在使用 localhost:9092
。
您将在您的终端中看到Hello, World
:
1[secondary_label Output]
2Hello, World
脚本将继续运行,等待更多消息发布到主题上。 请自由打开一个新的终端,并开始制作人发布一些更多消息。 你应该能够在消费者输出中看到它们。
完成测试后,请按CTRL+C
来停止用户脚本. 现在您已经测试了安装,您可以继续安装 KafkaT,以便更好地管理您的 Kafka 集群。
步骤 6 — 安装 KafkaT (可选)
KafkaT是Airbnb的一种工具,可以让您更轻松地查看卡夫卡集群的细节,并从命令行执行某些管理任务,因为它是卢比宝石,所以您需要使用Ruby(https://www.ruby-lang.org/en/)。
1sudo apt install ruby ruby-dev build-essential
您现在可以使用gem
命令安装 KafkaT:
1sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
CFLAGS=-Wno-error=format-overflow
选项禁用格式过流警告,并且对ZooKeeper宝石是必需的,这是一种对KafkaT的依赖。
KafkaT 使用 .kafkatcfg
作为配置文件来确定您的 Kafka 服务器的安装和日志目录. 它还应该有一个指向 KafkaT 的输入到您的 ZooKeeper 实例。
创建一个名为「.kafkatcfg」的新文件:
1nano ~/.kafkatcfg
添加以下行来指定关于 Kafka 服务器和 Zookeeper 实例所需的信息:
1[label ~/.kafkatcfg]
2{
3 "kafka_path": "~/kafka",
4 "log_path": "/tmp/kafka-logs",
5 "zk_path": "localhost:2181"
6}
您现在已经准备好使用 KafkaT. 首先,这里是如何使用它来查看所有 Kafka 分区的详细信息:
1kafkat partitions
您将看到以下结果:
1[secondary_label Output]
2Topic Partition Leader Replicas ISRs
3TutorialTopic 0 0 [0] [0]
4__consumer_offsets 0 0 [0] [0]
5...
此输出显示了TutorialTopic
,以及__consumer_offsets
,这是卡夫卡用于存储与客户相关的信息的内部主题,您可以安全地忽略从__consumer_offsets
开始的行。
要了解更多关于 KafkaT 的信息,请参阅其 GitHub 存储库。
现在你已经安装了 KafkaT,你可以选择在 Debian 10 服务器群集上设置 Kafka,以创建一个多节点群集。
步骤 7 — 设置一个多节点集群(可选)
如果您想使用多个 Debian 10 服务器创建一个多经纪人集群,请在每台新机器上重复步骤 1,步骤 4 和步骤 5. 此外,在每个机器的 ~/kafka/config/server.properties
文件中进行以下更改:
- 更改
broker.id
属性的值,使其在整个集群中独特。该属性独特地识别集群中的每个服务器,并且可以有任何字符串作为其值。例如,server1
、server2
等,将有用作为标识符。 - 更改
zookeeper.connect
属性的值,使所有节点都指向相同的ZooKeeper实例。 此属性指定了ZooKeeper实例的地址,并遵循了<HOSTNAME/IP_ADDRESS>:<PORT>
格式。 在本教程中,您将使用your_first_server_IP:2181′′,以您已经设置的 Debian 10 服务器的 IP 地址代替
your
如果您想为您的集群拥有多个 ZooKeeper 实例,则每个节点上的zookeeper.connect
属性值应该是列出所有 ZooKeeper 实例的 IP 地址和端口号的相同、单元格分离的字符串。
<$>[注]
注: 如果您在安装 Zookeeper 的 Debian 10 服务器上启用了防火墙,请确保打开端口 2181
,以允许从群集中的其他节点接收请求。
第8步:限制卡夫卡用户
现在所有安装都完成了,您可以删除kafka
用户的管理权限。在您这样做之前,请退出并作为其他非 root sudo 用户重新登录。
从 sudo 组中删除kafka
用户:
1sudo deluser kafka sudo
为了进一步提高 Kafka 服务器的安全性,请使用passwd
命令锁定kafka
用户的密码,以确保没有人可以直接使用此帐户登录服务器:
1sudo passwd kafka -l
在此时,只有 root 或 sudo 用户可以通过键入以下命令作为kafka
登录:
1sudo su - kafka
在未来,如果你想解锁它,使用passwd
与-u
选项:
1sudo passwd kafka -u
您现在已经成功限制了kafka
用户的管理权限。
结论
您现在可以在您的 Debian 服务器上安全地运行 Apache Kafka. 您可以通过使用 Kafka 客户端来创建 Kafka 生产者和消费者,在您的项目中使用它。