如何在 Ubuntu 16.04 上使用 Transporter 将转换后的数据从 MongoDB 同步到 Elasticsearch

介绍

Transporter 是一个开源工具,用于在不同的数据存储中移动数据. 开发人员通常会为任务编写一次性脚本,例如将数据移动到数据库中,将数据从文件移动到数据库中,或者反之亦然,但使用像 Transporter 这样的工具有几个优点。

在Transporter中,您构建 pipelines,这定义了从 source (数据读取) 到 sink (数据写入) 的数据流程。 源和 sinks 可以是 SQL 或 NoSQL 数据库、平板文件或其他资源。

除了移动数据外,Transporter 还允许您使用 transformer 通过管道移动数据时更改数据. 像适配器一样,默认情况下包含 若干变换器

在本教程中,我们将通过将数据从MongoDB数据库移动和处理到Elasticsearch的例子,使用Transporter的内置适配器和用JavaScript编写的自定义转换器。

前提条件

要遵循本教程,您将需要:

您不需要任何先前的JavaScript知识或经验来跟随本教程,但您可以在 这些JavaScript教程中了解更多。

步骤 1 - 安装运输

Transporter为大多数常见操作系统提供二进制,Ubuntu的安装过程包括两个步骤:下载Linux二进制并使其可执行。

首先,从 Transporter 的 GitHub 上的最新版本页面获取最新版本的链接。 复制以 -linux-amd64 结束的链接。

下载二进制到您的家庭目录。

1cd
2wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64

将其移动到 /usr/local/bin或您偏好的安装目录。

1mv transporter-*-linux-amd64 /usr/local/bin/transporter

然后使它可执行,以便您可以运行它。

1chmod +x /usr/local/bin/transporter

您可以通过运行二进制来测试 Transporter 是否正确设置。

1transporter

您将看到使用帮助输出和版本号:

 1[secondary_label Output]
 2USAGE
 3  transporter <command> [flags]
 4
 5COMMANDS
 6  run run pipeline loaded from a file
 7  . . .
 8
 9VERSION
10  0.5.2

为了使用 Transporter 将数据从 MongoDB 移动到 Elasticsearch,我们需要两件事:我们想要移动的 MongoDB 中的数据和一条管道,告诉 Transporter 如何移动它。

步骤 2 — 将示例数据添加到 MongoDB (可选)

在此步骤中,我们将创建一个单一集合的示例数据库在MongoDB中,并将一些文档添加到该集合中,然后,在教程的其余部分中,我们将使用Transporter管道迁移和转换此示例数据。

首先,连接到您的 MongoDB 数据库。

1mongo

这将改变您的提示为mongo>,表示您正在使用MongoDB壳。

从这里,选择一个数据库来工作,我们将称之为my_application

1use my_application

MongoDB中,您不需要明确创建数据库或集合,一旦您开始将数据添加到您以名义选择的数据库中,该数据库将自动创建。

因此,要创建my_application数据库,请将两个文档保存到其用户集合中:一个代表Sammy Shark,一个代表Gilly Glowfish。

1db.users.save({"firstName": "Sammy", "lastName": "Shark"});
2db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});

添加文档后,您可以查询用户集合以查看您的记录。

1db.users.find().pretty();

输出将与下面的输出类似,但_id列将不同。

 1[secondary_label output]
 2{
 3  "_id" : ObjectId("59299ac7f80b31254a916456"),
 4  "firstName" : "Sammy",
 5  "lastName" : "Shark"
 6}
 7{
 8  "_id" : ObjectId("59299ac7f80b31254a916457"),
 9  "firstName" : "Gilly",
10  "lastName" : "Glowfish"
11}

CTRL+C来退出 MongoDB 壳。

接下来,让我们创建一个 Transporter 管道,将这些数据从 MongoDB 移动到 Elasticsearch。

步骤三:建立基础管道

在Transporter中,一个管道是由一个名为pipeline.js的JavaScript文件默认定义的。内置的init命令在正确的目录中创建了一个基本的 配置文件,给出一个源和分解。

以 MongoDB 作为源和 Elasticsearch 作为污点为初始化起始 `pipeline.js 。

1transporter init mongodb elasticsearch

您将看到以下输出:

1[secondary_label Output]
2Writing pipeline.js...

您不需要为此步骤修改pipeline.js,但让我们看看它是如何工作的。

文件看起来是这样的,但你也可以使用命令cat pipeline.js,less pipeline.js查看文件的内容(通过按q来输出less)或使用你最喜欢的文本编辑器打开它。

 1[label pipeline.js]
 2var source = mongodb({
 3  "uri": "${MONGODB_URI}"
 4  // "timeout": "30s",
 5  // "tail": false,
 6  // "ssl": false,
 7  // "cacerts": ["/path/to/cert.pem"],
 8  // "wc": 1,
 9  // "fsync": false,
10  // "bulk": false,
11  // "collection_filters": "{}",
12  // "read_preference": "Primary"
13})
14
15var sink = elasticsearch({
16  "uri": "${ELASTICSEARCH_URI}"
17  // "timeout": "10s", // defaults to 30s
18  // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
19  // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
20  // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
21})
22
23t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

var sourcevar sink开头的行分别为 MongoDB 和 Elasticsearch 适配器定义了 JavaScript 变量

//开头的行是评论,它们突出了你可以为你的管道设置的一些常见配置选项,但我们不使用它们为我们在这里创建的基本管道。

变量transportert允许我们访问我们的管道. 我们使用.Source().Save() 函数添加源并使用文件中先前定义的sourcesink变量。

至於「Source()」和「Save()」函數的第三個論點是「namespace.' Passing `/.*/'」,因為最後一個論點意味著我們要從 MongoDB 轉移所有資料,並在 Elasticsearch 中儲存在相同的名稱空間之下。

在我们可以运行此管道之前,我们需要为 MongoDB URIElasticsearch URI设置 环境变量

1export MONGODB_URI='mongodb://localhost/my_application'
2export ELASTICSEARCH_URI='http://localhost:9200/my_application'

现在我们已经准备好运行管道了。

1transporter run pipeline.js

你会看到输出以这样的方式结束:

1[secondary_label Output]
2. . .
3INFO[0001] metrics source records: 2 path=source ts=1522942118483391242
4INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960
5INFO[0001] exit map[source:mongodb sink:elasticsearch]   ts=1522942118483396878

在第二行和第三行到最后一行中,此输出表明源中存在2个记录,并将2个记录移动到水槽中。

要确认这两个记录都已被处理,您可以查询Elasticsearch的my_application数据库的内容,该数据库现在应该存在。

1curl $ELASTICSEARCH_URI/_search?pretty=true

?pretty=true参数使输出更容易阅读:

 1[secondary_label Output]
 2{
 3  "took" : 5,
 4  "timed_out" : false,
 5  "_shards" : {
 6    "total" : 5,
 7    "successful" : 5,
 8    "skipped" : 0,
 9    "failed" : 0
10  },
11  "hits" : {
12    "total" : 2,
13    "max_score" : 1.0,
14    "hits" : [
15      {
16        "_index" : "my_application",
17        "_type" : "users",
18        "_id" : "5ac63e9c6687d9f638ced4fe",
19        "_score" : 1.0,
20        "_source" : {
21          "firstName" : "Gilly",
22          "lastName" : "Glowfish"
23        }
24      },
25      {
26        "_index" : "my_application",
27        "_type" : "users",
28        "_id" : "5ac63e986687d9f638ced4fd",
29        "_score" : 1.0,
30        "_source" : {
31          "firstName" : "Sammy",
32          "lastName" : "Shark"
33        }
34      }
35    ]
36  }
37}

MongoDB 中的数据库和集合类似于 Elasticsearch 中的索引和类型。

  • _index 字段设置为 my_application,原始 MongoDB 数据库的名称。
  • _type 字段设置为 users, MongoDB 集合的名称。

这证实了MongoDB的两个记录都通过Transporter成功处理,并加载到Elasticsearch,以建立这个基本的管道,我们将添加一个中间处理步骤,可以转换输入数据。

步骤 4 – 创建一个转换器

正如其名称所示,transformers 在将其加载到水槽之前修改源数据,例如,它们允许您添加一个新的字段,删除一个字段,或更改一个字段的数据。

通常,自定义转换器被写成JavaScript函数,并存储在一个单独的文件中。 为了使用它们,你在pipeline.js中添加了转换器文件的参考。 Transporter包括Otto和Goja的JavaScript引擎。 因为Goja更新,通常更快,我们在这里使用它。 唯一的功能差异是语法。

创建一个名为transform.js的文件,我们将使用它来编写我们的转换函数。

1nano transform.js

以下是我们将使用的函数,该函数将创建一个名为fullName的新字段,其值将是firstNamelastName字段连接在一起,分开一个空间(如Sammy Shark)。

1[label transform.js]
2function transform(msg) {
3    msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
4    return msg
5}

让我们通过这个文件的行列行走:

保存并关闭文件。

接下来,我们需要修改管道以使用这种转换器. 打开pipeline.js文件进行编辑。

1nano pipeline.js

在最后一行中,我们需要添加一个调用到Transform()函数,以便将变换器添加到Source()Save()的调用之间的管道中,如下:

1[label ~/transporter/pipeline.js]
2. . .
3t.Source("source", source, "/.*/")
4.Transform(goja({"filename": "transform.js"}))
5.Save("sink", sink, "/.*/")

通过使用goja函数,我们使用它的 相对路径来指定转换器的文件名。

在重新启动管道以测试变压器之前,让我们从以前的测试中清除 Elasticsearch 中的现有数据。

1curl -XDELETE $ELASTICSEARCH_URI

您将看到此输出承认命令的成功。

1[secondary_label Output]
2{"acknowledged":true}

现在再开管道了。

1transporter run pipeline.js

输出将看起来非常类似于以前的测试,你可以在最后几行中看到管道是否像以前一样成功完成。

1curl $ELASTICSEARCH_URI/_search?pretty=true

您可以在新输出中看到fullName字段:

 1[secondary_label Output]
 2{
 3  "took" : 9,
 4  "timed_out" : false,
 5  "_shards" : {
 6    "total" : 5,
 7    "successful" : 5,
 8    "skipped" : 0,
 9    "failed" : 0
10  },
11  "hits" : {
12    "total" : 2,
13    "max_score" : 1.0,
14    "hits" : [
15      {
16        "_index" : "my_application",
17        "_type" : "users",
18        "_id" : "5ac63e9c6687d9f638ced4fe",
19        "_score" : 1.0,
20        "_source" : {
21          "firstName" : "Gilly",
22          "fullName" : "Gilly Glowfish",
23          "lastName" : "Glowfish"
24        }
25      },
26      {
27        "_index" : "my_application",
28        "_type" : "users",
29        "_id" : "5ac63e986687d9f638ced4fd",
30        "_score" : 1.0,
31        "_source" : {
32          "firstName" : "Sammy",
33          "fullName" : "Sammy Shark",
34          "lastName" : "Shark"
35        }
36      }
37    ]
38  }
39}

请注意,两个文档中都添加了fullName字段,值设置正确,现在我们知道如何将自定义转换添加到 Transporter 管道中。

结论

你已经构建了一个基本的Transporter管道,使用一个变压器来复制和修改数据从MongoDB到Elasticsearch. 你可以以相同的方式应用更复杂的转换,在同一个管道中链接多个转换,等等。

您可以查看 GitHub 上的 Transporter 项目以获取 API 的最新更改,并访问 Transporter 维基以获取有关如何使用适配器、变压器和 Transformer 的其他功能的详细信息。

Published At
Categories with 技术
comments powered by Disqus