作者选择了 自由和开源基金作为 写给捐款计划的一部分接受捐款。
介绍
Apache Kafka是一个流行的分布式消息经纪商,旨在处理大量的实时数据。一个Kafka集群是高度可扩展和容忍错误的,而且与其他消息经纪商(如ActiveMQ和RabbitMQ相比,其传输量也更高。尽管它通常被用作 publish/subscribe 消息系统,但许多组织也使用它用于日志集合,因为它为发布的消息提供持久的存储。
发布/订阅消息系统允许一个或多个生产商发布消息,而不考虑消费者的数量或他们将如何处理消息。订阅客户自动通知更新和创建新消息。
在本教程中,您将安装和配置Apache Kafka 2.8.2在Ubuntu 20.04上。
前提条件
要跟上,你需要:
- Ubuntu 20.04 伺服器具有至少 4 GB 的 RAM 和具有
sudo
特权的非根用户。如果您没有根用户,您可以按照我们的 初始伺服器设置指南设置此设置。 RAM 少于 4 GB 的安装可能会导致 Kafka 服务失败。 - OpenJDK 11 安装在您的服务器上。 要安装此版本,请遵循我们关于 如何在 Ubuntu 20.04 上使用 APT 安装 Java 的教程的教程。 Kafka 是用 Java 编写的,因此需要 JVM。
步骤 1 — 创建卡夫卡的用户
因为卡夫卡可以通过网络处理请求,你的第一步是为该服务创建一个专用用户,这将最大限度地减少对你的Ubuntu机器的损害,如果有人破坏卡夫卡服务器。
登录您的服务器作为您的非根sudo
用户,然后创建一个名为kafka
的用户:
1sudo adduser kafka
按照提示设置密码并创建kafka
用户。
接下来,用adduser
命令将kafka
用户添加到sudo
组中。
1sudo adduser kafka sudo
你的kafka
用户现在已经准备好了. 使用su
登录到kafka
帐户:
1su -l kafka
现在你已经创建了一个卡夫卡特定的用户,你已经准备好下载和提取卡夫卡二进制。
步骤2 —下载和提取卡夫卡二进制
在此步骤中,您将下载并将卡夫卡二进制提取到您的kafka
用户主目录中的专用文件夹中。
首先,在/home/kafka
中创建一个名为下载
的目录,以存储下载:
1mkdir ~/Downloads
使用弯曲
下载卡夫卡二进制:
1curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz
创建一个名为kafka
的目录,然后移动到此目录. 您将使用此目录作为 Kafka 安装的基本目录:
1mkdir ~/kafka && cd ~/kafka
使用tar
命令提取您下载的档案:
1tar -xvzf ~/Downloads/kafka.tgz --strip 1
您指定--条1
旗,以确保档案的内容被提取到~/kafka/
,而不是在其内部的其他目录(如~/kafka/kafka_2.13-2.8.2/
)中。
现在你已经下载并成功提取了二进制,你可以开始配置你的卡夫卡服务器。
步骤 3 — 配置卡夫卡服务器
卡夫卡 topic是可以发布消息的类别、组或源名称,然而,卡夫卡的默认行为不会允许您删除主题。
卡夫卡的配置选项在server.properties
中指定。 使用nano
或您最喜欢的编辑器打开此文件:
1nano ~/kafka/config/server.properties
首先,添加一个设置,允许您删除Kafka主题. 将下列行添加到文件底部:
1[label ~/kafka/config/server.properties]
2delete.topic.enable = true
其次,你会通过修改log.dirs
属性来更改存储卡夫卡日志的目录,找到log.dirs
属性,并用突出的路线替换现有路线:
1[label ~/kafka/config/server.properties]
2log.dirs=/home/kafka/logs
保存并关闭文件。
现在你已经配置了Kafka,你可以创建systemd
单元文件来运行并启用Kafka服务器在启动。
步骤 4 —创建systemd
单元文件并启动卡夫卡服务器
在本节中,您将为 Kafka 服务创建 systemd
单元文件,这些文件将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,如启动、停止和重新启动 Kafka。
Kafka 使用 Zookeeper来管理其集群状态和配置. 它用于许多分布式系统,您可以在 官方 Zookeeper 文件中阅读更多有关该工具的信息。
创建Zookeeper
的单元文件:
1sudo nano /etc/systemd/system/zookeeper.service
将下列单位定义输入到文件中:
1[label /etc/systemd/system/zookeeper.service]
2[Unit]
3Requires=network.target remote-fs.target
4After=network.target remote-fs.target
5
6[Service]
7Type=simple
8User=kafka
9ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
10ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target
该[单位]
部分规定 Zookeeper 需要网络和文件系统在启动之前准备好。
该[服务]
部分规定systemd
应使用zookeeper-server-start.sh
和zookeeper-server-stop.sh
壳文件来启动和停止服务。
添加此内容后,保存并关闭文件。
接下来,创建kafka
的 systemd 服务文件:
1sudo nano /etc/systemd/system/kafka.service
将下列单位定义输入到文件中:
1[label /etc/systemd/system/kafka.service]
2[Unit]
3Requires=zookeeper.service
4After=zookeeper.service
5
6[Service]
7Type=simple
8User=kafka
9ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
10ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target
[单位]
部分规定这个单位文件取决于zookeeper.service
,这将确保zookeeper
在kafka
服务启动时自动启动。
[服务]
部分规定systemd
应该使用kafka-server-start.sh
和kafka-server-stop.sh
壳文件来启动和停止服务。
保存并关闭文件。
现在你已经定义了单位,开始卡夫卡用以下命令:
1sudo systemctl start kafka
要确保服务器成功启动,请检查kafka
单元的日志日志:
1sudo systemctl status kafka
你会得到这样的输出:
1[secondary_label Output]
2● kafka.service
3 Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset>
4 Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s ago
5 Main PID: 17770 (sh)
6 Tasks: 69 (limit: 4677)
7 Memory: 321.9M
8 CGroup: /system.slice/kafka.service
9 ├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho>
10 └─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>
你现在有一个卡夫卡服务器在端口9092
上听,这是卡夫卡服务器使用的默认端口。
您已经启动了kafka
服务,但如果您重新启动服务器,Kafka 将不会自动重新启动。
1sudo systemctl enable zookeeper
您将收到一个响应,即创建了一个 symlink:
1[secondary_label Output]
2Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.
然后运行这个命令:
1sudo systemctl enable kafka
您将收到一个响应,即创建了一个 symlink:
1[secondary_label Output]
2Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
在此步骤中,您启动了kafka
和zookeeper
服务,在下一步,您将检查Kafka的安装。
步骤5 —测试卡夫卡安装
在此步骤中,您将测试您的 Kafka 安装. 您将发布和消耗一个 Hello World 消息,以确保 Kafka 服务器的行为如预期。
在卡夫卡中发布信息需要:
- 一个 producer,谁可以将记录和数据发布到主题。
- 一个 consumer,谁读取消息和数据从主题。
首先,创建一个名为TutorialTopic
的主题:
1~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
您可以使用kafka-console-producer.sh
脚本从命令行创建一个制作者,它将 Kafka 服务器的主机名、端口和主题作为参数。
您将收到一个答案,即主题已创建:
1[secondary_label Output]
2Created topic TutorialTopic.
现在将字符串Hello, World
发布到TutorialTopic
主题:
1echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
接下来,使用kafka-console-consumer.sh
脚本创建一个Kafka消费者,它预计ZooKeeper服务器的主机名和端口以及主题名称作为论点。
1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
如果没有配置问题,您将在您的终端收到Hello, World
响应:
1[secondary_label Output]
2Hello, World
脚本将继续运行,等待更多的消息发布。 要测试这一点,请打开一个新的终端窗口并登录您的服务器。
1[environment second]
2su -l kafka
在这个新终端中,开始制作人发布第二条消息:
1[environment second]
2echo "Hello World from Sammy at DigitalOcean!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
此消息将在原始终端中加载到消费者输出中:
1[secondary_label Output]
2Hello, World
3Hello World from Sammy at DigitalOcean!
当您完成测试时,请按CTRL+C
,以停止原始终端中的消费者脚本。
您现在已经安装并配置了Kafka服务器在Ubuntu 20.04. 在下一步,您将执行一些快速的任务来加强Kafka服务器的安全性。
步骤 6 — 硬化卡夫卡服务器
安装完成后,您可以删除kafka
用户的管理权限,并硬化Kafka服务器。
在这样做之前,请退出并重新登录为任何其他非根sudo
用户. 如果您仍在运行您开始本教程的相同壳会话,请键入退出
。
从 sudo 组中删除kafka
用户:
1sudo deluser kafka sudo
为了进一步提高 Kafka 服务器的安全性,请使用passwd
命令锁定kafka
用户的密码。
1sudo passwd kafka -l
「-l」旗锁定了更改用户密码的命令(「passwd」)。
在此时,只有root
或sudo
用户可以使用以下命令登录为kafka
:
1sudo su - kafka
在未来,如果您想解锁更改密码的能力,请使用passwd
与-u
选项:
1sudo passwd kafka -u
您现在已经成功限制了kafka
用户的管理权限,您已经准备好开始使用 Kafka。
步骤 7 — 安装 KafkaT (可选)
KafkaT旨在提高您查看卡夫卡集群的细节和执行指令行中的某些管理任务的能力,因为它是卢比宝石,您将需要卢比来使用它,您还需要构建必不可少
包来构建卡夫卡T
所依赖的其他宝石。
使用apt
安装 Ruby 和build-essential
包:
1sudo apt install ruby ruby-dev build-essential
您现在可以使用gem
命令安装 KafkaT:
1sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
需要Wno-error=format-overflow
编译标志来抑制 Zookeeper 在kafkat
安装过程中出现的警告和错误。
当安装完成后,您将收到一个响应,即完成:
1[secondary_label Output]
2...
3Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds
48 gems installed
KafkaT 使用 .kafkatcfg
作为配置文件来确定您的 Kafka 服务器的安装和日志目录. 它还应该有一个指向 KafkaT 的输入到您的 ZooKeeper 实例。
创建一个名为「.kafkatcfg」的新文件:
1nano ~/.kafkatcfg
添加以下行来指定关于 Kafka 服务器和 Zookeeper 实例所需的信息:
1[label ~/.kafkatcfg]
2{
3 "kafka_path": "~/kafka",
4 "log_path": "/home/kafka/logs",
5 "zk_path": "localhost:2181"
6}
保存并关闭文件. 您现在已经准备好使用 KafkaT。
要查看所有卡夫卡分区的详细信息,请尝试运行此命令:
1kafkat partitions
您将获得以下输出:
1[secondary_label Output]
2[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible.
3/var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated
4...
5Topic Partition Leader Replicas ISRs
6TutorialTopic 0 0 [0] [0]
7__consumer_offsets 0 0 [0] [0]
8...
9...
输出将包括TutorialTopic
和__consumer_offsets
,这是卡夫卡用于存储与客户相关的信息的内部主题,您可以安全地忽略从__consumer_offsets
开始的行。
要了解更多关于 KafkaT 的信息,请参阅其 GitHub 存储库。
结论
你现在可以在 Ubuntu 服务器上安全地运行 Apache Kafka. 你可以使用 Kafka 客户端将 Kafka 集成到你最喜欢的编程语言中。
要了解更多关于卡夫卡的信息,您也可以参阅其文件(https://kafka.apache.org/documentation.html)。