如何在 Debian 10 上安装 Apache Kafka

作者选择了 自由和开源基金作为 写给捐款计划的一部分接受捐款。

介绍

Apache Kafka是一个流行的分布式消息经纪商,旨在处理大量的实时数据。一个Kafka集群是高度可扩展和耐错误的,并且与其他消息经纪商(如 ActiveMQRabbitMQ相比,其传输量也更高。

发布/订阅消息系统允许一个或多个生产商发布消息,而不考虑消费者的数量或他们将如何处理消息。订阅客户自动通知更新和创建新消息。

在本教程中,您将安全地在 Debian 10 服务器上安装和配置 Apache Kafka 2.1.1,然后通过生成和消耗一个Hello World消息来测试您的设置,然后您将可选地安装 KafkaT来监控 Kafka,并设置一个 Kafka 多节点集群。

前提条件

要跟上,你需要:

<$>[注] 注: 没有4GB的RAM的安装可能会导致卡夫卡服务失败,而Java虚拟机(JVM)(https://en.wikipedia.org/wiki/Java_virtual_machine)在启动时会抛出一个内存漏洞例外。

步骤 1 — 为 Kafka 创建用户

由于Kafka可以通过网络处理请求,所以为其创建一个专用用户是最好的做法,这将最大限度地减少对您的Debian机器的损害,如果Kafka服务器受到威胁。

登录为非 root sudo 用户,使用useradd命令创建一个名为kafka的用户:

1sudo useradd kafka -m

-m 标志确保为用户创建一个主目录. 此主目录, /home/kafka,将作为您的工作区目录,以便在以后执行命令。

使用passwd设置密码:

1sudo passwd kafka

输入您想为此用户使用的密码。

接下来,用adduser命令将 kafka 用户添加到sudo组中,使其具有安装 Kafka 的依赖性所需的权限:

1sudo adduser kafka sudo

你的 kafka 用户已经准备好了. 使用su登录这个帐户:

1su -l kafka

现在你已经创建了卡夫卡特定的用户,你可以继续下载和提取卡夫卡二进制。

步骤2:下载和提取卡夫卡二进制

在此步骤中,您将下载并提取 Kafka 二进制文本到您的 kafka 用户主目录中的专用文件夹中。

首先,在/home/kafka中创建一个名为下载的目录,以存储下载:

1mkdir ~/Downloads

接下来,使用apt-get安装curl,以便您可以下载远程文件:

1sudo apt-get update && sudo apt-get install curl

当被提示时,键入Y来确认弯曲下载。

一旦安装了‘curl’,使用它来下载卡夫卡二进制:

1curl "https://archive.apache.org/dist/kafka/2.1.1/kafka_2.11-2.1.1.tgz" -o ~/Downloads/kafka.tgz

创建一个名为kafka的目录,并切换到这个目录,这将是 Kafka 安装的基本目录:

1mkdir ~/kafka && cd ~/kafka

使用tar命令提取您下载的档案:

1tar -xvzf ~/Downloads/kafka.tgz --strip 1

您指定了--条1旗,以确保档案的内容被提取到~/kafka/,而不是在其内部的其他目录中,例如~/kafka/kafka_2.12-2.1.1/

现在你已经下载并成功提取了二进制,你可以继续配置卡夫卡以允许主题删除。

步骤 3 – 配置 Kafka 服务器

Kafka 的默认行为不会允许我们删除可发布消息的 topic、类别、群组或信号名称。

卡夫卡的配置选项在server.properties中指定。 使用nano或您最喜欢的编辑器打开此文件:

1nano ~/kafka/config/server.properties

让我们添加一个设置,允许我们删除Kafka主题. 将下面的突出行添加到文件底部:

1[label ~/kafka/config/server.properties]
2...
3group.initial.rebalance.delay.ms
4
5delete.topic.enable = true

保存文件,然后退出nano。现在你已经配置了Kafka,你可以创建systemd单元文件来运行并启用Kafka在启动。

步骤 4 — 创建 Systemd 单元文件并启动 Kafka 服务器

在本节中,您将为 Kafka 服务创建 systemd 单元文件,这将帮助您以与其他 Linux 服务一致的方式执行常见的服务操作,如启动、停止和重新启动 Kafka。

ZooKeeper 是 Kafka 用来管理其集群状态和配置的服务,它通常在分布式系统中被用作组成部分,在本教程中,您将使用 Zookeeper 来管理 Kafka 的这些方面。

首先,创建zookeeper的单元文件:

1sudo nano /etc/systemd/system/zookeeper.service

将下列单位定义输入到文件中:

 1[label /etc/systemd/system/zookeeper.service]
 2[Unit]
 3Requires=network.target remote-fs.target
 4After=network.target remote-fs.target
 5
 6[Service]
 7Type=simple
 8User=kafka
 9ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
10ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target

[单位]部分规定,ZooKeeper需要网络和文件系统在启动之前做好准备。

[服务]部分规定systemd应使用zookeeper-server-start.shzookeeper-server-stop.sh壳文件来启动和停止服务。

接下来,为kafka创建systemd服务文件:

1sudo nano /etc/systemd/system/kafka.service

将下列单位定义输入到文件中:

 1[label /etc/systemd/system/kafka.service]
 2[Unit]
 3Requires=zookeeper.service
 4After=zookeeper.service
 5
 6[Service]
 7Type=simple
 8User=kafka
 9ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
10ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
11Restart=on-abnormal
12
13[Install]
14WantedBy=multi-user.target

[单位]部分规定这个单元文件取决于zookeeper.service。这将确保zookeeperkafka服务开始时自动启动。

[服务]部分规定systemd应使用kafka-server-start.shkafka-server-stop.sh壳文件来启动和停止服务。

现在单位已经定义了,开始卡夫卡用以下命令:

1sudo systemctl start kafka

要确保服务器成功启动,请检查kafka单元的日志日志:

1sudo journalctl -u kafka

您将看到类似于以下的输出:

1[secondary_label Output]
2Mar 23 13:31:48 kafka systemd[1]: Started kafka.service.

你现在有一个卡夫卡服务器在端口9092上聆听,这是卡夫卡的默认端口。

你已经启动了kafka服务,但如果你要重新启动你的服务器,它还不会自动启动。

1sudo systemctl enable kafka

现在你已经开始并启用了服务,是时候检查安装了。

步骤5:测试安装

让我们发布和消耗一个Hello World消息,以确保卡夫卡服务器的行为是正确的。

  • 一个 producer,它允许对主题发布记录和数据。
  • 一个 consumer,它读取来自主题的消息和数据。

首先,创建一个名为TutorialTopic的主题,键入:

1~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用kafka-console-producer.sh脚本从命令行创建一个制作者,它将 Kafka 服务器的主机名、端口和主题名称作为参数。

将字符串Hello, World发布到TutorialTopic主题,键入:

1echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

--经纪人列表的旗帜决定了要发送消息的邮件经纪人列表,在这种情况下是localhost:9092

接下来,您可以使用kafka-console-consumer.sh脚本创建一个 Kafka 消费者,它将 ZooKeeper 服务器的主机名称和端口以及主题名称作为参数。

下面的命令消耗了来自TutorialTopic的消息,请注意使用从开始的旗帜,允许消耗在消费者开始之前发布的消息:

1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server `localhost:9092` --topic TutorialTopic --from-beginning

--bootstrap-server 提供了 Kafka 集群中的入口列表. 在这种情况下,您正在使用 localhost:9092

您将在您的终端中看到Hello, World:

1[secondary_label Output]
2Hello, World

脚本将继续运行,等待更多消息发布到主题上。 请自由打开一个新的终端,并开始制作人发布一些更多消息。 你应该能够在消费者输出中看到它们。

完成测试后,请按CTRL+C来停止用户脚本. 现在您已经测试了安装,您可以继续安装 KafkaT,以便更好地管理您的 Kafka 集群。

步骤 6 — 安装 KafkaT (可选)

KafkaT是Airbnb的一种工具,可以让您更轻松地查看卡夫卡集群的细节,并从命令行执行某些管理任务,因为它是卢比宝石,所以您需要使用Ruby(https://www.ruby-lang.org/en/)。

1sudo apt install ruby ruby-dev build-essential

您现在可以使用gem命令安装 KafkaT:

1sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

CFLAGS=-Wno-error=format-overflow选项禁用格式过流警告,并且对ZooKeeper宝石是必需的,这是一种对KafkaT的依赖。

KafkaT 使用 .kafkatcfg 作为配置文件来确定您的 Kafka 服务器的安装和日志目录. 它还应该有一个指向 KafkaT 的输入到您的 ZooKeeper 实例。

创建一个名为「.kafkatcfg」的新文件:

1nano ~/.kafkatcfg

添加以下行来指定关于 Kafka 服务器和 Zookeeper 实例所需的信息:

1[label ~/.kafkatcfg]
2{
3  "kafka_path": "~/kafka",
4  "log_path": "/tmp/kafka-logs",
5  "zk_path": "localhost:2181"
6}

您现在已经准备好使用 KafkaT. 首先,这里是如何使用它来查看所有 Kafka 分区的详细信息:

1kafkat partitions

您将看到以下结果:

1[secondary_label Output]
2Topic Partition Leader Replicas ISRs    
3TutorialTopic 0 0         [0]             [0]
4__consumer_offsets	  0		        0		  [0]			  [0]
5...

此输出显示了TutorialTopic,以及__consumer_offsets,这是卡夫卡用于存储与客户相关的信息的内部主题,您可以安全地忽略从__consumer_offsets开始的行。

要了解更多关于 KafkaT 的信息,请参阅其 GitHub 存储库

现在你已经安装了 KafkaT,你可以选择在 Debian 10 服务器群集上设置 Kafka,以创建一个多节点群集。

步骤 7 — 设置一个多节点集群(可选)

如果您想使用多个 Debian 10 服务器创建一个多经纪人集群,请在每台新机器上重复步骤 1,步骤 4 和步骤 5. 此外,在每个机器的 ~/kafka/config/server.properties 文件中进行以下更改:

  • 更改broker.id属性的值,使其在整个集群中独特。该属性独特地识别集群中的每个服务器,并且可以有任何字符串作为其值。例如,server1server2等,将有用作为标识符。
  • 更改zookeeper.connect属性的值,使所有节点都指向相同的ZooKeeper实例。 此属性指定了ZooKeeper实例的地址,并遵循了<HOSTNAME/IP_ADDRESS>:<PORT>格式。 在本教程中,您将使用your_first_server_IP:2181′′,以您已经设置的 Debian 10 服务器的 IP 地址代替your

如果您想为您的集群拥有多个 ZooKeeper 实例,则每个节点上的zookeeper.connect属性值应该是列出所有 ZooKeeper 实例的 IP 地址和端口号的相同、单元格分离的字符串。

<$>[注] 注: 如果您在安装 Zookeeper 的 Debian 10 服务器上启用了防火墙,请确保打开端口 2181,以允许从群集中的其他节点接收请求。

第8步:限制卡夫卡用户

现在所有安装都完成了,您可以删除kafka用户的管理权限。在您这样做之前,请退出并作为其他非 root sudo 用户重新登录。

从 sudo 组中删除kafka用户:

1sudo deluser kafka sudo

为了进一步提高 Kafka 服务器的安全性,请使用passwd命令锁定kafka用户的密码,以确保没有人可以直接使用此帐户登录服务器:

1sudo passwd kafka -l

在此时,只有 root 或 sudo 用户可以通过键入以下命令作为kafka登录:

1sudo su - kafka

在未来,如果你想解锁它,使用passwd-u选项:

1sudo passwd kafka -u

您现在已经成功限制了kafka用户的管理权限。

结论

您现在可以在您的 Debian 服务器上安全地运行 Apache Kafka. 您可以通过使用 Kafka 客户端来创建 Kafka 生产者和消费者,在您的项目中使用它。

Published At
Categories with 技术
comments powered by Disqus