如何在 CentOS 7 上备份、导入和迁移 Apache Kafka 数据

作者选择了(https://www.brightfunds.org/funds/tech-education)作为 写给捐款计划的一部分的捐款。

介绍

备份你的 Apache Kafka数据是一个重要的做法,将有助于你从意外数据丢失或坏数据添加到集群由于用户错误的恢复。

在因服务器硬件或网络故障而无法使用的情况下,将备份数据导入和迁移到单独的服务器非常有用,并且您需要使用旧数据创建一个新的 Kafka 实例。

在本教程中,您将备份、导入和迁移您的Kafka数据到单一的CentOS 7安装以及多个CentOS 7安装在单独的服务器上。ZooKeeper是Kafka操作的关键组成部分。它存储有关群集状态的信息,如消费者数据、分区数据和群集中的其他经纪人的状态。

前提条件

要跟上,你需要:

  • 一个 CentOS 7 服务器,至少有 4GB RAM ,并通过跟踪此教程而设置的非根 sudo 用户.
  • 安装了Apache Kafka的 CentOS 7 服务器,作为备份的来源。 遵循 [如何在 CentOS 7 上安装 Apache Kafka (https://andsky.com/tech/tutorials/how-to-install-apache-kafka-on-centos-7) 指南来设置您的 Kafka 安装, 如果 Kafka 尚未安装在源服务器上 。
  • [OpenJDK] (http://openjdk.java.net/) 8被安装在服务器上. 要安装此版本, 请在安装 OpenJDK 的特定版本时遵循这些 [指示 (https://andsky.com/tech/tutorials/how-to-install-java-on-centos-and-fedora# install-openjdk-8).
  • 第7步的可选性——安装了Apache Kafka的另一台CentOS 7服务器,作为备份的目的地. 遵循之前在目的地服务器上安装Kafka的前提条件中的文章链接. 只有当您正在将您的 Kafka 数据从一个服务器移动到另一个服务器时,才需要这个先决条件 。 如果您想要备份并导入您的 Kafka 数据到单个服务器, 您可以跳过此先决条件 。 .

步骤 1 — 创建测试主题和添加消息

一个卡夫卡 message 是卡夫卡最基本的数据存储单位,也是您将向卡夫卡发布和订阅的实体. 卡夫卡** 专题** 类似于一组相关信息的容器。 当您订阅特定主题时, 您只能收到针对该特定主题发布的信息 。 在此区域中, 您将登录到您想要备份的服务器( 源服务器) , 并添加一个 Kafka 话题和一个消息, 这样您就可以为备份 .

本教程假定您已经安装了 Kafka 在用户的 kafka 主目录中 (/home/kafka/kafka)。 如果您的安装是在不同的目录中,请在下面的命令中修改 ~/kafka 部分与您的 Kafka 安装路径,并在本教程的其余时间内对命令进行修改。

SSH 进入源服务器,执行:

1[environment local]
2ssh sammy@source_server_ip

运行以下命令以作为 kafka 用户登录:

1sudo -iu kafka

使用 Kafka 安装的 Bin 目录中的 Kafka-topics.sh 壳工具文件创建一个名为 BackupTopic 的主题,键入:

1~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BackupTopic

BackupTopic主题中,使用~/kafka/bin/kafka-console-producer.sh壳实用程序来发布字符串Test Message 1

如果您想在这里添加额外的消息,您可以现在这样做。

1echo "Test Message 1" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic BackupTopic > /dev/null

~/kafka/bin/kafka-console-producer.sh文件允许您直接从命令行发布消息. 通常,您会使用Kafka客户端库在您的程序内发布消息,但由于这涉及不同编程语言的不同设置,您可以使用壳脚本作为测试或执行管理任务时发布消息的语言独立的方式。

接下来,通过运行以下命令来验证‘kafka-console-producer.sh’脚本发布了消息(s):

1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

______________________________________________________________________ 贝壳脚本启动消费者. 一旦开始,它将订阅您在上一个命令中的`"Test Message 1' 消息中发表的主题的信息。 命令中的 " 从开始 " 旗帜允许消费在消费者开始前发布的信息。 没有启用此旗帜, 只有消费者开始后发布的消息才会出现 。 在运行命令时,您会在终端中看到以下输出:

1[secondary_label Output]
2Test Message 1

CTRL+C来阻止消费者。

您已创建了一些测试数据,并验证其持久性,现在您可以在下一节中备份状态数据。

步骤2 - 备份 ZooKeeper 状态数据

在备份实际的 Kafka 数据之前,您需要备份存储在 ZooKeeper 中的集群状态。

ZooKeeper 将其数据存储在设置文件 ~/kafka/config/zookeeper.properties 中的 dataDir字段所指定的目录中. 您需要读取该字段的值来确定要备份的目录。 默认情况下,dataDir指向/tmp/zookeeper' 目录。 如果在您的安装中值不同,请在下面的命令中用该值替换 `/tmp/zookeeper。

以下是 ~/kafka/config/zookeeper.properties 文件的示例输出:

 1[label ~/kafka/config/zookeeper.properties]
 2...
 3...
 4...
 5# the directory where the snapshot is stored.
 6dataDir=/tmp/zookeeper
 7# the port at which the clients will connect
 8clientPort=2181
 9# disable the per-ip limit on the number of connections since this is a non-production config
10maxClientCnxns=0
11...
12...
13...

现在你有通往目录的路径,你可以创建一个压缩的档案文件其内容. 压缩的档案文件是一个更好的选择比普通的档案文件来节省磁盘空间。

1tar -czf /home/kafka/zookeeper-backup.tar.gz /tmp/zookeeper/*

命令的输出 tar: 删除引导 / 从成员名称 你可以安全地忽略。

-c-z 旗表示 tar 创建档案,并将 gzip 压缩应用到档案中. -f 旗指明输出压缩档案的名称,在这种情况下是 zookeeper-backup.tar.gz

您可以在当前目录中运行ls,以查看zookeeper-backup.tar.gz作为输出的一部分。

您现在已经成功地备份了 ZooKeeper 数据,在下一部分,您将备份实际的 Kafka 数据。

第3步:备份卡夫卡主题和信息

在本节中,您将将卡夫卡的数据目录备份为压缩的文件,就像您在上一个步骤中对ZooKeeper做的那样。

卡夫卡将主题、消息和内部文件存储在log.dirs字段在~/kafka/config/server.properties配置文件中指定的目录中. 您需要读取该字段的值来确定要备份的目录。 默认情况下,在当前的安装中,log.dirs指向/tmp/kafka-logs目录。 如果在安装中值不同,请在下面的命令中用正确的值替换/tmp/kafka-logs

以下是 ~/kafka/config/server.properties 文件的示例输出:

 1[label ~/kafka/config/server.properties]
 2...
 3...
 4...
 5############################# Log Basics #############################
 6
 7# A comma separated list of directories under which to store log files
 8log.dirs=/tmp/kafka-logs
 9
10# The default number of log partitions per topic. More partitions allow greater
11# parallelism for consumption, but this will also result in more files across
12# the brokers.
13num.partitions=1
14
15# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
16# This value is recommended to be increased for installations with data dirs located in RAID array.
17num.recovery.threads.per.data.dir=1
18...
19...
20...

首先,停止卡夫卡服务,以便在创建tar档案时在log.dirs目录中的数据处于一致状态。

1sudo systemctl stop kafka

停止 Kafka 服务后,请再次登录作为您的 kafka 用户:

1sudo -iu kafka

必须停止/启动 Kafka 和 ZooKeeper 服务作为您的非 root sudo 用户,因为在 Apache Kafka 安装前提条件中,您限制了 kafka 用户作为安全预防措施。

现在,通过运行以下命令创建文件目录内容的压缩档案文件:

1tar -czf /home/kafka/kafka-backup.tar.gz /tmp/kafka-logs/*

再次,您可以安全地忽略命令的输出(‘tar:删除引导 / 成员名称’)。

您可以在当前目录中运行ls,以查看kafka-backup.tar.gz作为输出的一部分。

您可以重新启动 Kafka 服务 - 如果您不想立即恢复数据 - 通过键入退出,切换到非根 sudo 用户,然后运行:

1sudo systemctl start kafka

再次登录为您的 * kafka 用户:

1sudo -iu kafka

您已成功备份卡夫卡数据,您现在可以进入下一部分,在那里您将恢复存储在ZooKeeper中的集群状态数据。

第4步:恢复 ZooKeeper 数据

在本节中,您将恢复卡夫卡在用户创建主题、添加/删除额外节点、添加和消耗消息等操作时内部创建和管理的集群状态数据。

您需要停止 Kafka 和 ZooKeeper 服务,以防止数据目录在恢复过程中收到无效数据。

首先,通过键入退出来停止卡夫卡服务,切换到非 root sudo 用户,然后运行:

1sudo systemctl stop kafka

接下来,停止 ZooKeeper 服务:

1sudo systemctl stop zookeeper

再次登录为您的 * kafka 用户:

1sudo -iu kafka

然后,您可以使用以下命令安全地删除现有集群数据目录:

1rm -r /tmp/zookeeper/*

现在,恢复您在步骤 2 中备份的数据:

1tar -C /tmp/zookeeper -xzf /home/kafka/zookeeper-backup.tar.gz --strip-components 2

在提取数据之前,)中。

您已成功恢复群集状态数据,现在您可以在下一节中继续进行卡夫卡数据恢复过程。

第5步:恢复卡夫卡数据

在本节中,您将通过删除 Kafka 数据目录并恢复压缩的档案文件,将备份的 Kafka 数据恢复到现有源安装(或目标服务器,如果您遵循了可选步骤 7)。

您可以使用以下命令安全地删除现有 Kafka 数据目录:

1rm -r /tmp/kafka-logs/*

现在你已经删除了数据,你的Kafka安装看起来像一个新的安装,没有主题或消息在其中。

1tar -C /tmp/kafka-logs -xzf /home/kafka/kafka-backup.tar.gz --strip-components 2

在提取数据之前,)里面。

现在您已经成功提取了数据,您可以通过键入退出来重新启动 Kafka 和 ZooKeeper 服务,切换到非 root sudo 用户,然后执行:

1sudo systemctl start kafka

开始使用 ZooKeeper 服务:

1sudo systemctl start zookeeper

再次登录为您的 * kafka 用户:

1sudo -iu kafka

您已经恢复了kafka数据,您可以在下一节中继续验证恢复是否成功。

步骤6 - 检查恢复

要测试卡夫卡数据的恢复,您将从您在步骤 1 中创建的主题中消耗消息。

等待几分钟,卡夫卡启动,然后执行以下命令读取来自备份主题的消息:

1~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

如果您收到以下警告,则需要等待卡夫卡完全启动:

1[secondary_label Output]
2[2018-09-13 15:52:45,234] WARN [Consumer clientId=consumer-1, groupId=console-consumer-87747] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

在几分钟内重复上一个命令,或者运行sudo systemctl restart kafka作为您的非 root sudo 用户。

1[secondary_label Output]
2Test Message 1

如果您看不到此消息,您可以检查在上一节中是否错过了任何命令,并执行它们。

现在您已经验证了恢复的 Kafka 数据,这意味着您在单个 Kafka 安装中成功地备份和恢复了数据。

步骤 7 — 将备份迁移和恢复到另一个 Kafka 服务器(可选)

在本节中,您将从源卡夫卡服务器迁移备份数据到目的地卡夫卡服务器。 要做到这一点,您将首先使用scp命令将压缩的tar.gz文件下载到本地系统,然后再使用scp将文件推到目的地服务器。

您正在本地下载备份文件,然后将其上传到目标服务器,而不是直接从源到目的地服务器复制,因为目的地服务器在其 /home/sammy/.ssh/authorized_keys 文件中不会有源服务器的 SSH 密钥,并且无法连接到和从源服务器。

zookeeper-backup.tar.gzkafka-backup.tar.gz文件下载到本地计算机,执行:

1[environment local]
2scp sammy@source_server_ip:/home/kafka/zookeeper-backup.tar.gz .

你会看到类似的输出:

1[environment local]
2[secondary_label Output]
3zookeeper-backup.tar.gz 100%   68KB 128.0KB/s 00:00

现在运行以下命令,将kafka-backup.tar.gz文件下载到您的本地机器:

1[environment local]
2scp sammy@source_server_ip:/home/kafka/kafka-backup.tar.gz .

您将看到以下结果:

1[environment local]
2[secondary_label Output]
3kafka-backup.tar.gz 100% 1031KB 488.3KB/s 00:02

在本地机器的当前目录中运行ls,你会看到两个文件:

1[environment local]
2[secondary_label Output]
3kafka-backup.tar.gz zookeeper.tar.gz

运行以下命令,将 zookeeper-backup.tar.gz 文件传输到目标服务器的 /home/kafka/:

1[environment local]
2scp zookeeper-backup.tar.gz sammy@destination_server_ip:/home/sammy/zookeeper-backup.tar.gz

现在运行以下命令将kafka-backup.tar.gz文件转移到目标服务器的/home/kafka/文件:

1[environment local]
2scp kafka-backup.tar.gz sammy@destination_server_ip:/home/sammy/kafka-backup.tar.gz

由于文件位于 /home/sammy/ 目录中,并且没有对 kafka 用户的正确访问权限,您可以将文件移动到 /home/kafka/ 目录并更改其权限。

将 SSH 输入到目的地服务器,执行:

1[environment local]
2ssh sammy@destination_server_ip

现在将zookeeper-backup.tar.gz移动到/home/kafka/通过执行:

1[environment second]
2sudo mv zookeeper-backup.tar.gz /home/sammy/zookeeper-backup.tar.gz

同样,运行以下命令来复制 kafka-backup.tar.gz/home/kafka/:

1[environment second]
2sudo mv kafka-backup.tar.gz /home/kafka/kafka-backup.tar.gz

通过运行以下命令更改备份文件的所有者:

1[environment second]
2sudo chown kafka /home/kafka/zookeeper-backup.tar.gz /home/kafka/kafka-backup.tar.gz

之前的mvchown命令不会显示任何输出。

现在备份文件在正确的目录中存在于目标服务器中,请遵循本教程的步骤4至6中列出的命令来恢复和验证目标服务器的数据。

结论

在本教程中,您已备份、导入和迁移您的 Kafka 主题和消息,从相同的安装和单独的服务器上的安装。如果您想了解更多关于 Kafka 中的其他有用的管理任务,您可以参阅 Kafka 官方文档的 操作部分。

要远程存储备份的文件,如 zookeeper-backup.tar.gzkafka-backup.tar.gz,您可以浏览 Digital Ocean Spaces. 如果 Kafka 是您服务器上唯一运行的服务,您也可以浏览其他备份方法,如完整实例 备份

Published At
Categories with 技术
comments powered by Disqus