This is an original blog post; please credit the source when reposting: http://guoyunsky.iteye.com/blogs/1299770/
You are welcome to join the Hadoop QQ group: 180941958
Oozie is a workflow engine for Hadoop with its own XML syntax. Over the past couple of days I ran into an exception, and only after reading the source did I understand it: an Oozie join may only receive transitions from the paths of a fork (or from another join); otherwise it fails with the error below. The full exception:
WARN CallableQueueService$CallableWrapper:528 - USER[-] GROUP[-] TOKEN[-] APP[-] JOB[-] ACTION[-] exception callable [signal], E0720: Fork/join mismatch, node [join_node_name]
org.apache.oozie.command.CommandException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]
at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:213)
at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:305)
at org.apache.oozie.command.wf.SignalCommand.execute(SignalCommand.java:59)
at org.apache.oozie.command.Command.call(Command.java:202)
at org.apache.oozie.service.CallableQueueService$CallableWrapper.run(CallableQueueService.java:128)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.apache.oozie.workflow.WorkflowException: E0720: Fork/join mismatch, node [tianqi_sawlog_transformation_done]
at org.apache.oozie.workflow.lite.JoinNodeDef$JoinNodeHandler.loopDetection(JoinNodeDef.java:44)
at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:203)
at org.apache.oozie.workflow.lite.LiteWorkflowInstance.signal(LiteWorkflowInstance.java:284)
at org.apache.oozie.command.wf.SignalCommand.call(SignalCommand.java:120)
... 7 more
The check lives in org.apache.oozie.workflow.lite.JoinNodeDef; the code that enforces this rule is:
public void loopDetection(Context context) throws WorkflowException {
    String flag = getLoopFlag(context.getNodeDef().getName());
    if (context.getVar(flag) != null) {
        throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
    }
    String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
    String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
    if (forkCount == null) {
        throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
    }
    int count = Integer.parseInt(forkCount) - 1;
    if (count == 0) {
        context.setVar(flag, "true");
    }
}

public boolean enter(Context context) throws WorkflowException {
    String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
    String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
    if (forkCount == null) {
        throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
    }
    int count = Integer.parseInt(forkCount) - 1;
    if (count > 0) {
        context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
        context.deleteExecutionPath();
    } else {
        context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, null);
    }
    return (count == 0);
}
Notice that both methods look up the fork count for the node's parent execution path via String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath); — that variable is only set when a fork was entered on that path. If it is null, the join is not downstream of any fork, and throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); raises the exception shown above.
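To make that bookkeeping concrete, here is a minimal, self-contained Java sketch of the counter mechanism. The class name ForkJoinModel, the vars map, and the prefix value are illustrative assumptions, not the real Oozie API; the real logic lives in ForkNodeDef and JoinNodeDef. A fork stores its number of outgoing paths under its execution path; each path arriving at the join decrements the counter; a join whose parent path has no counter at all is exactly the E0720 case.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of Oozie's fork-count bookkeeping (hypothetical names).
public class ForkJoinModel {
    // Assumption: the prefix value is illustrative; only its role matters.
    static final String FORK_COUNT_PREFIX = "forkCount#";

    private final Map<String, String> vars = new HashMap<>();

    // A fork with n outgoing paths records n under its execution path.
    void enterFork(String executionPath, int paths) {
        vars.put(FORK_COUNT_PREFIX + executionPath, Integer.toString(paths));
    }

    // Each arrival at the join decrements the counter; the join fires
    // (returns true) when the last forked path arrives. A missing counter
    // means the join is not downstream of a fork -- the E0720 situation.
    boolean enterJoin(String parentExecutionPath) {
        String forkCount = vars.get(FORK_COUNT_PREFIX + parentExecutionPath);
        if (forkCount == null) {
            throw new IllegalStateException("E0720: Fork/join mismatch");
        }
        int count = Integer.parseInt(forkCount) - 1;
        if (count > 0) {
            vars.put(FORK_COUNT_PREFIX + parentExecutionPath, Integer.toString(count));
        } else {
            vars.remove(FORK_COUNT_PREFIX + parentExecutionPath);
        }
        return count == 0;
    }

    public static void main(String[] args) {
        ForkJoinModel wf = new ForkJoinModel();
        wf.enterFork("/fork1/", 2);
        System.out.println(wf.enterJoin("/fork1/")); // first path: join waits -> false
        System.out.println(wf.enterJoin("/fork1/")); // last path: join fires -> true

        // A join with no preceding fork has no counter -> E0720.
        try {
            new ForkJoinModel().enterJoin("/start/");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running this prints false, then true, then the E0720 message, mirroring the order of checks in JoinNodeDef.enter above.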
So whenever we use a join node, every transition into it must come from a fork path or from another join. A few examples:
1. A wrong example: the join does not follow a fork (or another join), so it triggers the error above.
<workflow-app xmlns="uri:oozie:workflow:0.1" name="workflow-test">
    <start to="action1" />
    <action name="action1">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <join name="join1" to="end" />
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
2. A correct example: the join follows a fork (join1 receives action1 and action2, both started by fork1).
<workflow-app xmlns="uri:oozie:workflow:0.1" name="workflow-test">
    <start to="fork1" />
    <fork name="fork1">
        <path start="action1" />
        <path start="action2" />
    </fork>
    <action name="action1">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <action name="action2">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <join name="join1" to="end" />
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
3. A correct example: a join follows another join (join2 receives join1).
<workflow-app xmlns="uri:oozie:workflow:0.1" name="workflow-test">
    <start to="fork1" />
    <fork name="fork1">
        <path start="action1" />
        <path start="action2" />
    </fork>
    <action name="action1">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <action name="action2">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <join name="join1" to="join2" />
    <join name="join2" to="end" />
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
4. A correct example: the join receives all nodes descending from the fork (here action3 runs after action1 on one fork path, and both action3 and action2 transition into join1).
<workflow-app xmlns="uri:oozie:workflow:0.1" name="workflow-test">
    <start to="fork1" />
    <fork name="fork1">
        <path start="action1" />
        <path start="action2" />
    </fork>
    <action name="action1">
        <!-- do some things-->
        <ok to="action3" />
        <error to="fail" />
    </action>
    <action name="action2">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <action name="action3">
        <!-- do some things-->
        <ok to="join1" />
        <error to="fail" />
    </action>
    <join name="join1" to="end" />
    <kill name="fail">
        <message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end" />
</workflow-app>
This is only a first look at Oozie; I hope it serves as a useful starting point for further discussion.