The "Resource changed on src filesystem" Problem When Using Hadoop with S3

This article gives a brief overview of the "Resource changed on src filesystem" problem encountered when using S3 with Hadoop, along with a temporary workaround. For how to use S3 with Hadoop, see Using an S3 (Object Storage) File System in a Hadoop Cluster.

Problem Overview

When using S3 with Hadoop, you may run into the "Resource changed on src filesystem" error. The simplest way to reproduce it is to submit a DistributedShell job that uses a shell script stored on S3:

$ echo "ls" > distributed-shell
$ hdfs dfs -put distributed-shell
$ hadoop jar \
    $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar \
    org.apache.hadoop.yarn.applications.distributedshell.Client \
    --jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-*.jar \
    --shell_script distributed-shell \
    --num_containers 10 \
    --container_memory 350 \
    --master_memory 350 \
    --priority 10

You will then hit the following error:

java.io.IOException: Resource s3a://yarn/user/root/DistributedShell/application_1641533299713_0002/ExecScript.sh changed on src filesystem (expected 1641534006000, was 1641534011000
    at org.apache.hadoop.yarn.util.FSDownload.verifyAndCopy(FSDownload.java:273)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:67)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:414)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:411)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:411)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:242)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:235)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:223)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Cause

The exception is thrown from org.apache.hadoop.yarn.util.FSDownload in the hadoop-yarn-common module:

FSDownload.java
  private void verifyAndCopy(Path destination)
      throws IOException, YarnException {
    final Path sCopy;
    try {
      sCopy = resource.getResource().toPath();
    } catch (URISyntaxException e) {
      throw new IOException("Invalid resource", e);
    }
    FileSystem sourceFs = sCopy.getFileSystem(conf);
    FileStatus sStat = sourceFs.getFileStatus(sCopy);
    if (sStat.getModificationTime() != resource.getTimestamp()) {
      throw new IOException("Resource " + sCopy + " changed on src filesystem" +
          " - expected: " +
          "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
          ", was: " +
          "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
          ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"");
    }
    if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
      if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
        throw new IOException("Resource " + sCopy +
            " is not publicly accessible and as such cannot be part of the" +
            " public cache.");
      }
    }

    downloadAndUnpack(sCopy, destination);
  }

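For context, the "expected" timestamp is recorded by the client at submission time: the client stats the resource once and stores its modification time in the LocalResource, which verifyAndCopy later compares against a fresh getFileStatus call on the NodeManager. A minimal sketch of that client-side step (the class name and S3 path are hypothetical; LocalResource.newInstance and URL.fromPath are the actual YARN APIs):

LocalResourceTimestampSketch.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;

public class LocalResourceTimestampSketch {
  // Hypothetical client-side step: stat the file once at submission time and
  // bake its modification time into the LocalResource. verifyAndCopy() later
  // re-stats the file on the NodeManager and requires the two values to match.
  static LocalResource register(Configuration conf, String uri) throws IOException {
    Path script = new Path(uri); // e.g. "s3a://my-bucket/user/root/distributed-shell"
    FileStatus stat = script.getFileSystem(conf).getFileStatus(script);
    return LocalResource.newInstance(
        URL.fromPath(script),                 // where the NM will fetch the file
        LocalResourceType.FILE,
        LocalResourceVisibility.APPLICATION,
        stat.getLen(),                        // expected length
        stat.getModificationTime());          // the "expected" timestamp
  }
}
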
For a DistributedShell job, the ApplicationMaster renames the ExecScript file to ExecScript.sh at run time (ExecScript is copied from the distributed-shell file). In the S3 implementation a rename changes the file's modification timestamp, whereas HDFS leaves it untouched; this is because s3a implements "rename" as a copy to a new object followed by a delete of the old one. See the official Hadoop documentation for a detailed explanation of this point, and the short demo after the excerpt below. The relevant part of the hadoop-yarn-applications-distributedshell source:

ApplicationMaster.java
    // The container for the eventual shell commands needs its own local
    // resources too.
    // In this scenario, if a shell script is specified, we need to have it
    // copied and made available to the container.
    if (!scriptPath.isEmpty()) {
      Path renamedScriptPath = null;
      if (Shell.WINDOWS) {
        renamedScriptPath = new Path(scriptPath + ".bat");
      } else {
        renamedScriptPath = new Path(scriptPath + ".sh");
      }

      try {
        // rename the script file based on the underlying OS syntax.
        renameScriptFile(renamedScriptPath);
      } catch (Exception e) {
        LOG.error(
            "Not able to add suffix (.bat/.sh) to the shell script filename",
            e);
        // We know we cannot continue launching the container
        // so we should release it.
        numCompletedContainers.incrementAndGet();
        numFailedContainers.incrementAndGet();
        return;
      }

      URL yarnUrl = null;
      try {
        yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
      } catch (URISyntaxException e) {
        LOG.error("Error when trying to use shell script path specified"
            + " in env, path=" + renamedScriptPath, e);
        // A failure scenario on bad input such as invalid shell script path
        // We know we cannot continue launching the container
        // so we should release it.
        // TODO
        numCompletedContainers.incrementAndGet();
        numFailedContainers.incrementAndGet();
        return;
      }
      LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
          shellScriptPathLen, shellScriptPathTimestamp);
      localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
          EXEC_SHELL_STRING_PATH, shellRsrc);
      shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
    }

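The effect of the rename on s3a is easy to observe directly. The following sketch (bucket and object names are hypothetical) prints the modification time before and after a rename; on s3a the two values differ because the "rename" writes a new object, while on HDFS a rename preserves the file's timestamp:

RenameTimestampDemo.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameTimestampDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path src = new Path("s3a://my-bucket/tmp/demo-script");    // hypothetical object
    Path dst = new Path("s3a://my-bucket/tmp/demo-script.sh");
    FileSystem fs = src.getFileSystem(conf);

    long before = fs.getFileStatus(src).getModificationTime();
    fs.rename(src, dst);  // on s3a: copy to a new object, then delete the old one
    long after = fs.getFileStatus(dst).getModificationTime();

    // On s3a the two values differ; on HDFS they are identical.
    System.out.println("before=" + before + ", after=" + after);
  }
}
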
Workaround

Parts of this section are based on this page.

One option is to replace the exception-throwing branch in the code above with a warning log:

FSDownload.java
  private void verifyAndCopy(Path destination)
      throws IOException, YarnException {
    final Path sCopy;
    try {
      sCopy = resource.getResource().toPath();
    } catch (URISyntaxException e) {
      throw new IOException("Invalid resource", e);
    }
    FileSystem sourceFs = sCopy.getFileSystem(conf);
    FileStatus sStat = sourceFs.getFileStatus(sCopy);
    if (sStat.getModificationTime() != resource.getTimestamp()) {
      /*
      throw new IOException("Resource " + sCopy + " changed on src filesystem" +
          " - expected: " +
          "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
          ", was: " +
          "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
          ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"");
      */
      LOG.warn("Resource " + sCopy + " changed on src filesystem" +
          " - expected: " +
          "\"" + Times.formatISO8601(resource.getTimestamp()) + "\"" +
          ", was: " +
          "\"" + Times.formatISO8601(sStat.getModificationTime()) + "\"" +
          ", current time: " + "\"" + Times.formatISO8601(Time.now()) + "\"" +
          ". Stop showing exception here, use a warning instead.");
    }
    if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
      if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
        throw new IOException("Resource " + sCopy +
            " is not publicly accessible and as such cannot be part of the" +
            " public cache.");
      }
    }

    downloadAndUnpack(sCopy, destination);
  }

Then recompile hadoop-yarn-common and replace the deployed jar with the rebuilt one.

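The jar can be rebuilt from the matching Hadoop source release. A minimal build sketch (the module path follows the Hadoop repository layout; -am also rebuilds in-tree dependencies, and the checkout directory name is hypothetical):

cd hadoop-rel-release-3.3.1
mvn package -pl hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common -am -DskipTests

Then swap the rebuilt jar into place: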
export HADOOP_VERSION=3.3.1
mv $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-$HADOOP_VERSION.jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-$HADOOP_VERSION.jar.bak
cp hadoop-yarn-common-$HADOOP_VERSION.jar $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-$HADOOP_VERSION.jar
chown 1000 $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-$HADOOP_VERSION.jar
chgrp 1000 $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-$HADOOP_VERSION.jar
chmod 644 $HADOOP_HOME/share/hadoop/yarn/hadoop-yarn-common-$HADOOP_VERSION.jar

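Note that NodeManagers only pick up the replaced jar after the YARN daemons are restarted, for example:

$HADOOP_HOME/bin/yarn --daemon stop nodemanager
$HADOOP_HOME/bin/yarn --daemon start nodemanager
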
If you run into Java version problems while compiling, try modifying the pom.xml under hadoop-yarn-common:

pom.xml
  <properties>
+   <java.version>1.8</java.version>
+   <java.releaseVersion>8</java.releaseVersion>
  </properties>

  <profiles>
    <profile>
      <id>java9</id>
      <activation>
        <jdk>[9,)</jdk>
      </activation>
      <build>
        <plugins>
          <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
+             <release>${java.releaseVersion}</release>
              <excludes>
                <exclude>org/apache/hadoop/yarn/webapp/hamlet/**</exclude>
              </excludes>
              <testExcludes>
                <exclude>org/apache/hadoop/yarn/webapp/hamlet/**</exclude>
              </testExcludes>
            </configuration>
          </plugin>
          <plugin>
            <artifactId>maven-javadoc-plugin</artifactId>
            <configuration>
              <excludePackageNames>org.apache.hadoop.yarn.webapp.hamlet</excludePackageNames>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>