如何在 Ubuntu 20.04 上安装 Apache Kafka

作者选择了 自由和开源基金作为 写给捐款计划的一部分接受捐款。

介绍

Apache Kafka是一个流行的分布式消息经纪商,旨在处理大量的实时数据。一个Kafka集群是高度可扩展和容忍错误的,而且与其他消息经纪商(如ActiveMQRabbitMQ相比,其传输量也更高。尽管它通常被用作 publish/subscribe 消息系统,但许多组织也使用它用于日志集合,因为它为发布的消息提供持久的存储。

发布/订阅消息系统允许一个或多个生产商发布消息,而不考虑消费者的数量或他们将如何处理消息。订阅客户自动通知更新和创建新消息。

在本教程中,您将安装和配置Apache Kafka 2.8.2在Ubuntu 20.04上。

前提条件

要跟上,你需要:

步骤 1 — 创建卡夫卡的用户

因为卡夫卡可以通过网络处理请求,你的第一步是为该服务创建一个专用用户,这将最大限度地减少对你的Ubuntu机器的损害,如果有人破坏卡夫卡服务器。

登录您的服务器作为您的非根sudo用户,然后创建一个名为kafka的用户:

1sudo adduser kafka

按照提示设置密码并创建kafka用户。

接下来,用adduser命令将kafka用户添加到sudo组中。

1sudo adduser kafka sudo

你的kafka用户现在已经准备好了. 使用su登录到kafka帐户:

1su -l kafka

现在你已经创建了一个卡夫卡特定的用户,你已经准备好下载和提取卡夫卡二进制。

步骤2 —下载和提取卡夫卡二进制

在此步骤中,您将下载并将卡夫卡二进制提取到您的kafka用户主目录中的专用文件夹中。

首先,在/home/kafka中创建一个名为下载的目录,以存储下载:

1mkdir ~/Downloads

使用弯曲下载卡夫卡二进制:

1curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz

创建一个名为kafka的目录,然后移动到此目录. 您将使用此目录作为 Kafka 安装的基本目录:

1mkdir ~/kafka && cd ~/kafka

使用tar命令提取您下载的档案:

1tar -xvzf ~/Downloads/kafka.tgz --strip 1

您指定--条1旗,以确保档案的内容被提取到~/kafka/,而不是在其内部的其他目录(如~/kafka/kafka_2.13-2.8.2/)中。

现在你已经下载并成功提取了二进制,你可以开始配置你的卡夫卡服务器。

步骤 3 — 配置卡夫卡服务器

卡夫卡 topic是可以发布消息的类别、组或源名称,然而,卡夫卡的默认行为不会允许您删除主题。

卡夫卡的配置选项在server.properties中指定。 使用nano或您最喜欢的编辑器打开此文件:

1nano ~/kafka/config/server.properties

首先,添加一个设置,允许您删除Kafka主题. 将下列行添加到文件底部:

1[label ~/kafka/config/server.properties]
2delete.topic.enable = true

其次,你会通过修改log.dirs属性来更改存储卡夫卡日志的目录,找到log.dirs属性,并用突出的路线替换现有路线:

1[label ~/kafka/config/server.properties]
2log.dirs=/home/kafka/logs

保存并关闭文件。

现在你已经配置了Kafka,你可以创建systemd单元文件来运行并启用Kafka服务器在启动。

步骤 4 —创建systemd单元文件并启动卡夫卡服务器

在本节中,您将为 Kafka 服务创建 systemd 单元文件,这些文件将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,如启动、停止和重新启动 Kafka。

Kafka 使用 Zookeeper来管理其集群状态和配置. 它用于许多分布式系统,您可以在 官方 Zookeeper 文件中阅读更多有关该工具的信息。

创建Zookeeper的单元文件:

1sudo nano /etc/systemd/system/zookeeper.service

将下列单位定义输入到文件中:

 1[label /etc/systemd/system/zookeeper.service]
 2[Unit]
 3Requires=network.target remote-fs.target
 4After=network.target remote-fs.target
 5
 6[Service]
 7Type=simple
 8User=kafka
 9ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
10ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target

[单位]部分规定 Zookeeper 需要网络和文件系统在启动之前准备好。

[服务]部分规定systemd应使用zookeeper-server-start.shzookeeper-server-stop.sh壳文件来启动和停止服务。

添加此内容后,保存并关闭文件。

接下来,创建kafka的 systemd 服务文件:

1sudo nano /etc/systemd/system/kafka.service

将下列单位定义输入到文件中:

 1[label /etc/systemd/system/kafka.service]
 2[Unit]
 3Requires=zookeeper.service
 4After=zookeeper.service
 5
 6[Service]
 7Type=simple
 8User=kafka
 9ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
10ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target

[单位]部分规定这个单位文件取决于zookeeper.service,这将确保zookeeperkafka服务启动时自动启动。

[服务]部分规定systemd应该使用kafka-server-start.shkafka-server-stop.sh壳文件来启动和停止服务。

保存并关闭文件。

现在你已经定义了单位,开始卡夫卡用以下命令:

1sudo systemctl start kafka

要确保服务器成功启动,请检查kafka单元的日志日志:

1sudo systemctl status kafka

你会得到这样的输出:

 1[secondary_label Output]
 2 kafka.service
 3     Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset>
 4     Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s ago
 5   Main PID: 17770 (sh)
 6      Tasks: 69 (limit: 4677)
 7     Memory: 321.9M
 8     CGroup: /system.slice/kafka.service
 9             ├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho>
10             └─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>

你现在有一个卡夫卡服务器在端口9092上听,这是卡夫卡服务器使用的默认端口。

您已经启动了kafka服务,但如果您重新启动服务器,Kafka 将不会自动重新启动。

1sudo systemctl enable zookeeper

您将收到一个响应,即创建了一个 symlink:

1[secondary_label Output]
2Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.

然后运行这个命令:

1sudo systemctl enable kafka

您将收到一个响应,即创建了一个 symlink:

1[secondary_label Output]
2Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.

在此步骤中,您启动了kafkazookeeper服务,在下一步,您将检查Kafka的安装。

步骤5 —测试卡夫卡安装

在此步骤中,您将测试您的 Kafka 安装. 您将发布和消耗一个 Hello World 消息,以确保 Kafka 服务器的行为如预期。

在卡夫卡中发布信息需要:

  • 一个 producer,谁可以将记录和数据发布到主题。
  • 一个 consumer,谁读取消息和数据从主题。

首先,创建一个名为TutorialTopic的主题:

1~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用kafka-console-producer.sh脚本从命令行创建一个制作者,它将 Kafka 服务器的主机名、端口和主题作为参数。

您将收到一个答案,即主题已创建:

1[secondary_label Output]
2Created topic TutorialTopic.

现在将字符串Hello, World发布到TutorialTopic主题:

1echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

接下来,使用kafka-console-consumer.sh脚本创建一个Kafka消费者,它预计ZooKeeper服务器的主机名和端口以及主题名称作为论点。

1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

如果没有配置问题,您将在您的终端收到Hello, World响应:

1[secondary_label Output]
2Hello, World

脚本将继续运行,等待更多的消息发布。 要测试这一点,请打开一个新的终端窗口并登录您的服务器。

1[environment second]
2su -l kafka

在这个新终端中,开始制作人发布第二条消息:

1[environment second]
2echo "Hello World from Sammy at DigitalOcean!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

此消息将在原始终端中加载到消费者输出中:

1[secondary_label Output]
2Hello, World
3Hello World from Sammy at DigitalOcean!

当您完成测试时,请按CTRL+C,以停止原始终端中的消费者脚本。

您现在已经安装并配置了Kafka服务器在Ubuntu 20.04. 在下一步,您将执行一些快速的任务来加强Kafka服务器的安全性。

步骤 6 — 硬化卡夫卡服务器

安装完成后,您可以删除kafka用户的管理权限,并硬化Kafka服务器。

在这样做之前,请退出并重新登录为任何其他非根sudo用户. 如果您仍在运行您开始本教程的相同壳会话,请键入退出

从 sudo 组中删除kafka用户:

1sudo deluser kafka sudo

为了进一步提高 Kafka 服务器的安全性,请使用passwd命令锁定kafka用户的密码。

1sudo passwd kafka -l

「-l」旗锁定了更改用户密码的命令(「passwd」)。

在此时,只有rootsudo用户可以使用以下命令登录为kafka:

1sudo su - kafka

在未来,如果您想解锁更改密码的能力,请使用passwd-u选项:

1sudo passwd kafka -u

您现在已经成功限制了kafka用户的管理权限,您已经准备好开始使用 Kafka。

步骤 7 — 安装 KafkaT (可选)

KafkaT旨在提高您查看卡夫卡集群的细节和执行指令行中的某些管理任务的能力,因为它是卢比宝石,您将需要卢比来使用它,您还需要构建必不可少包来构建卡夫卡T所依赖的其他宝石。

使用apt安装 Ruby 和build-essential包:

1sudo apt install ruby ruby-dev build-essential

您现在可以使用gem命令安装 KafkaT:

1sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

需要Wno-error=format-overflow编译标志来抑制 Zookeeper 在kafkat安装过程中出现的警告和错误。

当安装完成后,您将收到一个响应,即完成:

1[secondary_label Output]
2...
3Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds
48 gems installed

KafkaT 使用 .kafkatcfg 作为配置文件来确定您的 Kafka 服务器的安装和日志目录. 它还应该有一个指向 KafkaT 的输入到您的 ZooKeeper 实例。

创建一个名为「.kafkatcfg」的新文件:

1nano ~/.kafkatcfg

添加以下行来指定关于 Kafka 服务器和 Zookeeper 实例所需的信息:

1[label ~/.kafkatcfg]
2{
3  "kafka_path": "~/kafka",
4  "log_path": "/home/kafka/logs",
5  "zk_path": "localhost:2181"
6}

保存并关闭文件. 您现在已经准备好使用 KafkaT。

要查看所有卡夫卡分区的详细信息,请尝试运行此命令:

1kafkat partitions

您将获得以下输出:

1[secondary_label Output]
2[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible.
3/var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated
4...
5Topic Partition Leader Replicas ISRs
6TutorialTopic 0 0         [0]             [0]
7__consumer_offsets	  0		          0		      [0]							[0]
8...
9...

输出将包括TutorialTopic__consumer_offsets,这是卡夫卡用于存储与客户相关的信息的内部主题,您可以安全地忽略从__consumer_offsets开始的行。

要了解更多关于 KafkaT 的信息,请参阅其 GitHub 存储库

结论

你现在可以在 Ubuntu 服务器上安全地运行 Apache Kafka. 你可以使用 Kafka 客户端将 Kafka 集成到你最喜欢的编程语言中。

要了解更多关于卡夫卡的信息,您也可以参阅其文件(https://kafka.apache.org/documentation.html)。

Published At
Categories with 技术
comments powered by Disqus