介绍
Transporter 是一个开源工具,用于在不同的数据存储中移动数据. 开发人员通常会为任务编写一次性脚本,例如将数据移动到数据库中,将数据从文件移动到数据库中,或者反之亦然,但使用像 Transporter 这样的工具有几个优点。
在Transporter中,您构建 pipelines,这定义了从 source (数据读取) 到 sink (数据写入) 的数据流程。 源和 sinks 可以是 SQL 或 NoSQL 数据库、平板文件或其他资源。
除了移动数据外,Transporter 还允许您使用 transformer 通过管道移动数据时更改数据. 像适配器一样,默认情况下包含 若干变换器。
在本教程中,我们将通过将数据从MongoDB数据库移动和处理到Elasticsearch的例子,使用Transporter的内置适配器和用JavaScript编写的自定义转换器。
前提条件
要遵循本教程,您将需要:
- 一个 Ubuntu 16.04 服务器通过遵循 此 Ubuntu 16.04 初始服务器设置教程设置,包括一个 sudo 非根用户和一个防火墙
- MongoDB 通过遵循 此 MongoDB 在 Ubuntu 16.04 教程安装,或现有 MongoDB 安装
- Elasticsearch 通过遵循 此 Elasticsearch 在 Ubuntu 16.04 教程安装,或现有 Elasticsearch 安装
您不需要任何先前的JavaScript知识或经验来跟随本教程,但您可以在 这些JavaScript教程中了解更多。
步骤 1 - 安装运输
Transporter为大多数常见操作系统提供二进制,Ubuntu的安装过程包括两个步骤:下载Linux二进制并使其可执行。
首先,从 Transporter 的 GitHub 上的最新版本页面获取最新版本的链接。 复制以 -linux-amd64
结束的链接。
下载二进制到您的家庭目录。
1cd
2wget https://github.com/compose/transporter/releases/download/v0.5.2/transporter-0.5.2-linux-amd64
将其移动到 /usr/local/bin
或您偏好的安装目录。
1mv transporter-*-linux-amd64 /usr/local/bin/transporter
然后使它可执行,以便您可以运行它。
1chmod +x /usr/local/bin/transporter
您可以通过运行二进制来测试 Transporter 是否正确设置。
1transporter
您将看到使用帮助输出和版本号:
1[secondary_label Output]
2USAGE
3 transporter <command> [flags]
4
5COMMANDS
6 run run pipeline loaded from a file
7 . . .
8
9VERSION
10 0.5.2
为了使用 Transporter 将数据从 MongoDB 移动到 Elasticsearch,我们需要两件事:我们想要移动的 MongoDB 中的数据和一条管道,告诉 Transporter 如何移动它。
步骤 2 — 将示例数据添加到 MongoDB (可选)
在此步骤中,我们将创建一个单一集合的示例数据库在MongoDB中,并将一些文档添加到该集合中,然后,在教程的其余部分中,我们将使用Transporter管道迁移和转换此示例数据。
首先,连接到您的 MongoDB 数据库。
1mongo
这将改变您的提示为mongo>
,表示您正在使用MongoDB壳。
从这里,选择一个数据库来工作,我们将称之为my_application
。
1use my_application
在MongoDB
中,您不需要明确创建数据库或集合,一旦您开始将数据添加到您以名义选择的数据库中,该数据库将自动创建。
因此,要创建my_application
数据库,请将两个文档保存到其用户
集合中:一个代表Sammy Shark,一个代表Gilly Glowfish。
1db.users.save({"firstName": "Sammy", "lastName": "Shark"});
2db.users.save({"firstName": "Gilly", "lastName": "Glowfish"});
添加文档后,您可以查询用户
集合以查看您的记录。
1db.users.find().pretty();
输出将与下面的输出类似,但_id
列将不同。
1[secondary_label output]
2{
3 "_id" : ObjectId("59299ac7f80b31254a916456"),
4 "firstName" : "Sammy",
5 "lastName" : "Shark"
6}
7{
8 "_id" : ObjectId("59299ac7f80b31254a916457"),
9 "firstName" : "Gilly",
10 "lastName" : "Glowfish"
11}
按CTRL+C
来退出 MongoDB 壳。
接下来,让我们创建一个 Transporter 管道,将这些数据从 MongoDB 移动到 Elasticsearch。
步骤三:建立基础管道
在Transporter中,一个管道是由一个名为pipeline.js
的JavaScript文件默认定义的。内置的init
命令在正确的目录中创建了一个基本的 配置文件,给出一个源和分解。
以 MongoDB 作为源和 Elasticsearch 作为污点为初始化起始 `pipeline.js 。
1transporter init mongodb elasticsearch
您将看到以下输出:
1[secondary_label Output]
2Writing pipeline.js...
您不需要为此步骤修改pipeline.js
,但让我们看看它是如何工作的。
文件看起来是这样的,但你也可以使用命令cat pipeline.js
,less pipeline.js
查看文件的内容(通过按q
来输出less
)或使用你最喜欢的文本编辑器打开它。
1[label pipeline.js]
2var source = mongodb({
3 "uri": "${MONGODB_URI}"
4 // "timeout": "30s",
5 // "tail": false,
6 // "ssl": false,
7 // "cacerts": ["/path/to/cert.pem"],
8 // "wc": 1,
9 // "fsync": false,
10 // "bulk": false,
11 // "collection_filters": "{}",
12 // "read_preference": "Primary"
13})
14
15var sink = elasticsearch({
16 "uri": "${ELASTICSEARCH_URI}"
17 // "timeout": "10s", // defaults to 30s
18 // "aws_access_key": "ABCDEF", // used for signing requests to AWS Elasticsearch service
19 // "aws_access_secret": "ABCDEF" // used for signing requests to AWS Elasticsearch service
20 // "parent_id": "elastic_parent" // defaults to "elastic_parent" parent identifier for Elasticsearch
21})
22
23t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
以var source
和var sink
开头的行分别为 MongoDB 和 Elasticsearch 适配器定义了 JavaScript 变量。
以//
开头的行是评论,它们突出了你可以为你的管道设置的一些常见配置选项,但我们不使用它们为我们在这里创建的基本管道。
变量transporter
或t
允许我们访问我们的管道. 我们使用.Source()
和.Save()
函数添加源并使用文件中先前定义的source
和sink
变量。
至於「Source()」和「Save()」函數的第三個論點是「namespace.' Passing `/.*/'」,因為最後一個論點意味著我們要從 MongoDB 轉移所有資料,並在 Elasticsearch 中儲存在相同的名稱空間之下。
在我们可以运行此管道之前,我们需要为 MongoDB URI和 Elasticsearch URI设置 环境变量。
1export MONGODB_URI='mongodb://localhost/my_application'
2export ELASTICSEARCH_URI='http://localhost:9200/my_application'
现在我们已经准备好运行管道了。
1transporter run pipeline.js
你会看到输出以这样的方式结束:
1[secondary_label Output]
2. . .
3INFO[0001] metrics source records: 2 path=source ts=1522942118483391242
4INFO[0001] metrics source/sink records: 2 path="source/sink" ts=1522942118483395960
5INFO[0001] exit map[source:mongodb sink:elasticsearch] ts=1522942118483396878
在第二行和第三行到最后一行中,此输出表明源中存在2个记录,并将2个记录移动到水槽中。
要确认这两个记录都已被处理,您可以查询Elasticsearch的my_application
数据库的内容,该数据库现在应该存在。
1curl $ELASTICSEARCH_URI/_search?pretty=true
?pretty=true
参数使输出更容易阅读:
1[secondary_label Output]
2{
3 "took" : 5,
4 "timed_out" : false,
5 "_shards" : {
6 "total" : 5,
7 "successful" : 5,
8 "skipped" : 0,
9 "failed" : 0
10 },
11 "hits" : {
12 "total" : 2,
13 "max_score" : 1.0,
14 "hits" : [
15 {
16 "_index" : "my_application",
17 "_type" : "users",
18 "_id" : "5ac63e9c6687d9f638ced4fe",
19 "_score" : 1.0,
20 "_source" : {
21 "firstName" : "Gilly",
22 "lastName" : "Glowfish"
23 }
24 },
25 {
26 "_index" : "my_application",
27 "_type" : "users",
28 "_id" : "5ac63e986687d9f638ced4fd",
29 "_score" : 1.0,
30 "_source" : {
31 "firstName" : "Sammy",
32 "lastName" : "Shark"
33 }
34 }
35 ]
36 }
37}
MongoDB 中的数据库和集合类似于 Elasticsearch 中的索引和类型。
_index
字段设置为my_application,
原始 MongoDB 数据库的名称。_type
字段设置为users,
MongoDB 集合的名称。
这证实了MongoDB的两个记录都通过Transporter成功处理,并加载到Elasticsearch,以建立这个基本的管道,我们将添加一个中间处理步骤,可以转换输入数据。
步骤 4 – 创建一个转换器
正如其名称所示,transformers 在将其加载到水槽之前修改源数据,例如,它们允许您添加一个新的字段,删除一个字段,或更改一个字段的数据。
通常,自定义转换器被写成JavaScript函数,并存储在一个单独的文件中。 为了使用它们,你在pipeline.js中添加了转换器文件的参考。 Transporter包括Otto和Goja的JavaScript引擎。 因为Goja更新,通常更快,我们在这里使用它。 唯一的功能差异是语法。
创建一个名为transform.js
的文件,我们将使用它来编写我们的转换函数。
1nano transform.js
以下是我们将使用的函数,该函数将创建一个名为fullName
的新字段,其值将是firstName
和lastName
字段连接在一起,分开一个空间(如Sammy Shark
)。
1[label transform.js]
2function transform(msg) {
3 msg.data.fullName = msg.data.firstName + " " + msg.data.lastName;
4 return msg
5}
让我们通过这个文件的行列行走:
- 文件的第一个行,
函数转换(msg),
是 函数定义。 msg
是包含源文档的详细信息的 JavaScript 对象 我们使用此对象(https://andsky.com/tech/tutorials/understanding-objects-in-javascript#accessing-object-properties)通过管道 通过管道- 函数的第一个行(https://andsky.com/tech/tutorials/how-to-work-with-strings-in-javascript#string-concatenation)连接(https://andsky.com/tech/tutorials/how-to-work-with-strings-in-javascript#string-concatenation)两个现有字段,并(https://andsky.com/tech/tutorials/understanding-objects-in-javascript#adding-and-modifying-object-properties)将该值分配(https://andsky.com/tech/tutorials/understanding-objects-in-javascript#adding-and-modifying-object-properties)到新的
完整名称
字段 - 函数的最后一行返回新修改的
msg
对象用于管道的其余部分
保存并关闭文件。
接下来,我们需要修改管道以使用这种转换器. 打开pipeline.js
文件进行编辑。
1nano pipeline.js
在最后一行中,我们需要添加一个调用到Transform()
函数,以便将变换器添加到Source()
和Save()
的调用之间的管道中,如下:
1[label ~/transporter/pipeline.js]
2. . .
3t.Source("source", source, "/.*/")
4.Transform(goja({"filename": "transform.js"}))
5.Save("sink", sink, "/.*/")
通过使用goja
函数,我们使用它的 相对路径来指定转换器的文件名。
在重新启动管道以测试变压器之前,让我们从以前的测试中清除 Elasticsearch 中的现有数据。
1curl -XDELETE $ELASTICSEARCH_URI
您将看到此输出承认命令的成功。
1[secondary_label Output]
2{"acknowledged":true}
现在再开管道了。
1transporter run pipeline.js
输出将看起来非常类似于以前的测试,你可以在最后几行中看到管道是否像以前一样成功完成。
1curl $ELASTICSEARCH_URI/_search?pretty=true
您可以在新输出中看到fullName
字段:
1[secondary_label Output]
2{
3 "took" : 9,
4 "timed_out" : false,
5 "_shards" : {
6 "total" : 5,
7 "successful" : 5,
8 "skipped" : 0,
9 "failed" : 0
10 },
11 "hits" : {
12 "total" : 2,
13 "max_score" : 1.0,
14 "hits" : [
15 {
16 "_index" : "my_application",
17 "_type" : "users",
18 "_id" : "5ac63e9c6687d9f638ced4fe",
19 "_score" : 1.0,
20 "_source" : {
21 "firstName" : "Gilly",
22 "fullName" : "Gilly Glowfish",
23 "lastName" : "Glowfish"
24 }
25 },
26 {
27 "_index" : "my_application",
28 "_type" : "users",
29 "_id" : "5ac63e986687d9f638ced4fd",
30 "_score" : 1.0,
31 "_source" : {
32 "firstName" : "Sammy",
33 "fullName" : "Sammy Shark",
34 "lastName" : "Shark"
35 }
36 }
37 ]
38 }
39}
请注意,两个文档中都添加了fullName
字段,值设置正确,现在我们知道如何将自定义转换添加到 Transporter 管道中。
结论
你已经构建了一个基本的Transporter管道,使用一个变压器来复制和修改数据从MongoDB到Elasticsearch. 你可以以相同的方式应用更复杂的转换,在同一个管道中链接多个转换,等等。
您可以查看 GitHub 上的 Transporter 项目以获取 API 的最新更改,并访问 Transporter 维基以获取有关如何使用适配器、变压器和 Transformer 的其他功能的详细信息。