使用 YARN REST API 提交 MapReduce 任务时遇到的重复任务问题

最近在尝试使用 YARN REST API 提交 MapReduce PI 任务的时候,发现了一个现象:通过 REST API 提交的任务会失败,但是在提交的任务后面会额外多出两个任务,而这两个额外任务都会成功。本文针对这一现象进行深入探讨。

使用 YARN REST API 提交任务的流程

根据 Hadoop 官网提供的文档,使用 YARN REST API 提交任务需要两个步骤:

Cluster New Application API

这一步非常简单,只需要做出如下 POST 请求:

1
POST http://rm-http-address:port/ws/v1/cluster/apps/new-application

服务器返回 200 OK,并提供以下字段,包括请求得到的 application-id

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"application-id": "application_1634012636701_0009",
"maximum-resource-capability": {
"memory": 8192,
"vCores": 2,
"resourceInformations": {
"resourceInformation": [
{
"attributes": {},
"maximumAllocation": 9223372036854775807,
"minimumAllocation": 0,
"name": "memory-mb",
"resourceType": "COUNTABLE",
"units": "Mi",
"value": 8192
},
{
"attributes": {},
"maximumAllocation": 9223372036854775807,
"minimumAllocation": 0,
"name": "vcores",
"resourceType": "COUNTABLE",
"units": "",
"value": 2
}
]
}
}
}

Submit Application API

这一步需要准备一个 JSON,并做出如下请求:

1
POST http://rm-http-address:port/ws/v1/cluster/apps?user.name=root

注意,如果这里没有指定 ?user.name=root,则可能会以 dr.who 作为用户提交任务,可能会出现权限问题。

在这里我们尝试运行一个 MapReduce PI 程序。Request Body 如下,其中 command 表示需要运行的 MapReduce 程序命令,需要根据自己的 Hadoop 安装指定;CLASSPATH 也需要根据自己的 Hadoop 安装指定,以下设置并不十分通用。resource 表示请求的资源,unmanaged-AM 表示是否需要使用 Unmanaged Application Master,keep-containers-across-application-attempts 则表示是否需要回收此应用使用的 Container。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
"application-id":"application_1634012636701_0009",
"application-name":"pi",
"am-container-spec":
{
"commands":
{
"command": "{{HADOOP_HOME}}/bin/hadoop jar {{HADOOP_HOME}}/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi 10 10"
},
"environment":
{
"entry":
[
{
"key": "CLASSPATH",
"value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
}
]
}
},
"unmanaged-AM":false,
"max-app-attempts":"2",
"resource":
{
"memory":"512",
"vCores":"1"
},
"application-type":"MAPREDUCE",
"keep-containers-across-application-attempts":false
}

服务器返回 202 Accepted,也可以在 UI 看到任务已经开始运行。

使用 YARN REST API 提交 MapReduce 任务出现的重复任务问题

提交 MapReduce PI 任务后,发现了一个神奇的现象。以上面提交的任务举例,提交的 application-idapplication_1634012636701_0009,但在 UI 上却出现了两个额外的任务:application_1634012636701_0010application_1634012636701_0011,这两个额外的任务都会成功,提交的 application_1634012636701_0009 任务却会失败。

失败任务的诊断信息显示 Application application_1634012636701_0009 failed 2 times due to AM Container for appattempt_1634012636701_0009_000002 exited with exitCode: 0,去机器上运行 yarn logs -applicationId application_1634012636701_0009 查看日志,也没有发现报错信息,任务运行成功,也有正常的输出:

prelaunch.out (container_1634012636701_0009_01_000001)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Setting up env variables
Setting up job resources
Copying debugging information
Launching container
Number of Maps = 10
Samples per Map = 10
Wrote input for Map #0
Wrote input for Map #1
Wrote input for Map #2
Wrote input for Map #3
Wrote input for Map #4
Wrote input for Map #5
Wrote input for Map #6
Wrote input for Map #7
Wrote input for Map #8
Wrote input for Map #9
Starting Job
Job Finished in 39.979 seconds
Estimated value of Pi is 3.20000000000000000000

下面深入探讨这个问题出现的原因。

MapReduce 重复任务问题的原因

在网上也可以看到有人遇到同样的问题,追溯一下,最终发现 YARN-3084 里有人提到了这个问题,并且有一些详细的解释。这里我们简单总结一下。

首先,在提交任务时,202 Accepted 表示 RM 已经接受了这个任务,任务也已经开始运行。但是为什么任务会失败呢?首先我们看一下启动 Container 的时候做了什么:

launch_container.sh (container_1634012636701_0009_01_000001)
1
2
3
# ...
echo "Launching container"
exec /bin/bash -c "$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi 10 10"

从这里可以发现,application_1634012636701_0009 的 Container 实际运行的命令是一条 hadoop jar 命令,并不是 MapReduce 任务。也就是说,这个 Container 帮我们提交了 MapReduce 任务,并未实际启动任何 Application Master,随后就完成了它的使命。在 prelaunch.err 输出中,我们也可以看到这条命令实际提交的一个新的任务:

prelaunch.err (container_1634012636701_0009_01_000001)
1
2
3
4
5
6
7
8
9
10
11
12
...
2021-10-12 06:44:53,819 INFO mapreduce.JobSubmissionFiles: Permissions on staging directory /tmp/hadoop-yarn/staging/root/.staging are incorrect: rwxrwxrwx. Fixing permissions to correct value rwx------
2021-10-12 06:44:54,867 INFO input.FileInputFormat: Total input files to process : 10
2021-10-12 06:44:55,166 INFO mapreduce.JobSubmitter: number of splits:10
2021-10-12 06:44:55,579 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1634012636701_0010
2021-10-12 06:44:55,706 INFO mapreduce.JobSubmitter: Executing with tokens: [Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 9 cluster_timestamp: 1634012636701 } attemptId: 1 } keyId: 2035624994)]
2021-10-12 06:44:56,240 INFO conf.Configuration: resource-types.xml not found
2021-10-12 06:44:56,241 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2021-10-12 06:44:56,926 INFO impl.YarnClientImpl: Submitted application application_1634012636701_0010
2021-10-12 06:44:56,977 INFO mapreduce.Job: The url to track the job: http://resourcemanager:8088/proxy/application_1634012636701_0010/
2021-10-12 06:44:56,977 INFO mapreduce.Job: Running job: job_1634012636701_0010
...

所以,在第一次任务尝试中,Container 提交了一个新任务,任务 ID 为 job_1634012636701_0010,这也就是我们在 UI 上看到的第一个额外任务。在这个任务的日志中,我们才能看到实际的 Application Master 的启动:

launch_container.sh (container_1634012636701_0010_01_000001)
1
2
3
# ...
echo "Launching container"
exec /bin/bash -c "$JAVA_HOME/bin/java -Djava.io.tmpdir=$PWD/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=/data/yarn/log/application_1634012636701_0010/container_1634012636701_0010_01_000001 -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=INFO,CLA -Dhadoop.root.logfile=syslog -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1>/data/yarn/log/application_1634012636701_0010/container_1634012636701_0010_01_000001/stdout 2>/data/yarn/log/application_1634012636701_0010/container_1634012636701_0010_01_000001/stderr "
syslog (container_1634012636701_0010_01_000001)
1
2
2021-10-12 06:47:34,910 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1634012636701_0010_000001
2021-10-12 06:47:35,032 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster:

回到 application_1634012636701_0009 这个任务,因为这个任务只是运行了一句 hadoop jar 命令,并未实际启动 Application Master,RM 没有看到 AM 的启动,就会把这次任务尝试标记为失败。随后,第二次任务尝试开始,一个新的 Container 又运行了一遍 hadoop jar 命令,启动了一个新的任务 application_1634012636701_0011,随后退出。两次尝试均失败后,任务会被标记为失败。这就是 MapReduce 出现重复任务,并且原任务失败的原因。

由此我们可以看到,YARN REST API 类似于 RPC API,它的目的是启动 Application Master,而不是任务本身。因此,如果采用上面的方式提交 MapReduce 任务,都会出现这种现象。

使用 YARN REST API 提交 DistributedShell 任务

了解了这种现象发生的原因,我们现在研究一下如何正确提交任务。其实,Hadoop 官网给出的例子是一个 DistributedShell 任务,而不是 MapReduce。YARN-3084 里也提到,推荐使用 DistributedShell 而不是 MapReduce。

为什么不推荐提交 MapReduce 呢?因为 YARN REST API 的目的是启动 Application Master,而 MapReduce 任务在启动 Application Master 之前,JobClient 会做一些准备工作,例如计算作业分片,上传所需的 job.xmljob.jarjob.split 文件、检查作业输入输出位置等。所以,除非用户自己实现这些逻辑并且放入运行的命令中,否则仅仅启动 MRAppMaster 是不够的。因此通过 REST API 提交 MapReduce 任务的流程将会极其复杂。

在这里,我们尝试提交一次 DistributedShell 任务,该任务会使用 org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster

准备所需文件并上传至 HDFS

本文假设使用的文件系统为 HDFS,使用 S3 可能会出现文件时间戳不一致的问题,会另外写文章说明。

准备以下文件并上传至 HDFS:

  • AppMaster.jar,我们使用 $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar
  • distributed-shell,是一个 Shell 脚本,里面写入简单的内容:ls -al

它们在 HDFS 的位置如下:

1
2
3
4
$ hdfs dfs -ls /ds
Found 2 items
-rw-r--r-- 3 root hdfs 7 2021-10-13 16:32 /ds/distributed-shell
-rw-r--r-- 3 root hdfs 81129 2021-10-13 16:32 /ds/hadoop-yarn-applications-distributedshell-3.3.1.jar

提交 DistributedShell 任务

首先使用 POST http://rm-http-address:port/ws/v1/cluster/apps/new-application 获取新的 application-id,然后做出如下请求:

1
POST http://rm-http-address:port/ws/v1/cluster/apps?user.name=root
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
{
"application-id":"application_1632883941558_0024",
"application-name":"ds",
"am-container-spec":
{
"local-resources":
{
"entry":
[
{
"key":"AppMaster.jar",
"value":
{
"resource":"hdfs://master:9000/ds/hadoop-yarn-applications-distributedshell-3.3.1.jar",
"type":"FILE",
"visibility":"APPLICATION",
"size": 81129,
"timestamp": 1634113979705
}
}
]
},
"commands":
{
"command":"{{JAVA_HOME}}/bin/java -Xmx128m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 512 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
},
"environment":
{
"entry":
[
{
"key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
"value": "1634113979639"
},
{
"key": "CLASSPATH",
"value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
},
{
"key": "DISTRIBUTEDSHELLSCRIPTLEN",
"value": "7"
},
{
"key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
"value": "hdfs://master:9000/ds/distributed-shell"
}
]
}
},
"unmanaged-AM":false,
"max-app-attempts":2,
"resource":
{
"memory":1024,
"vCores":1
},
"application-type":"YARN",
"keep-containers-across-application-attempts":false
}

在这里,AppMaster.jarresourcesizetimestamp 需要根据 HDFS 上的文件自行设定,同理,DISTRIBUTEDSHELLSCRIPTTIMESTAMPDISTRIBUTEDSHELLSCRIPTLENDISTRIBUTEDSHELLSCRIPTLOCATION 也需要根据 HDFS 上的 Shell 文件自行修改。其中,文件的大小可以用 hdfs dfs -ls 查看,文件的修改时间可以用 hdfs dfs -stat %Y 查看,对应填写即可。

最后请注意,distributed-shell 文件会在运行时被动态改为 distributed-shell.sh 文件,所以在重复测试时,需要将文件名改回。