storm客户端提交topology失败:
java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:141)
at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:176)
at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:158)
at cn.com.tiza.dataquality.service.service.JobService.start(JobService.java:93)
at cn.com.tiza.dataquality.service.service.JobService.submit(JobService.java:149)
at cn.com.tiza.dataquality.service.resource.JobResource.execute(JobResource.java:33)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:160)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)
at org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)
nimbus.log:
2017-11-17T16:12:27.038+0800 b.s.d.nimbus [WARN] Topology submission exception. (topology name='dq23') #<IllegalArgumentException java.lang.IllegalArgumentException: storm-local/nimbus/inb
ox/stormjar-bc004b80-5f6e-4b94-9505-911511e5cc1f.jar to copy to storm-local/nimbus/stormdist/dq23-94-1510906347 does not exist!>
2017-11-17T16:12:27.038+0800 o.a.t.s.TNonblockingServer [ERROR] Unexpected exception while invoking!
java.lang.IllegalArgumentException: storm-local/nimbus/inbox/stormjar-bc004b80-5f6e-4b94-9505-911511e5cc1f.jar to copy to storm-local/nimbus/stormdist/dq23-94-1510906347 does not exist!
at backtype.storm.daemon.nimbus$fn__4364.invoke(nimbus.clj:1173) ~[storm-core-0.9.7.jar:0.9.7]
at clojure.lang.MultiFn.invoke(MultiFn.java:236) ~[clojure-1.5.1.jar:na]
at backtype.storm.daemon.nimbus$setup_storm_code.invoke(nimbus.clj:307) ~[storm-core-0.9.7.jar:0.9.7]
at backtype.storm.daemon.nimbus$fn__4261$exec_fn__1104__auto__$reify__4274.submitTopologyWithOpts(nimbus.clj:953) ~[storm-core-0.9.7.jar:0.9.7]
at backtype.storm.daemon.nimbus$fn__4261$exec_fn__1104__auto__$reify__4274.submitTopology(nimbus.clj:966) ~[storm-core-0.9.7.jar:0.9.7]
at backtype.storm.generated.Nimbus$Processor$submitTopology.getResult(Nimbus.java:1240) ~[storm-core-0.9.7.jar:0.9.7]
at backtype.storm.generated.Nimbus$Processor$submitTopology.getResult(Nimbus.java:1228) ~[storm-core-0.9.7.jar:0.9.7]
at org.apache.thrift7.ProcessFunction.process(ProcessFunction.java:32) ~[storm-core-0.9.7.jar:0.9.7]
at org.apache.thrift7.TBaseProcessor.process(TBaseProcessor.java:34) ~[storm-core-0.9.7.jar:0.9.7]
at org.apache.thrift7.server.TNonblockingServer$FrameBuffer.invoke(TNonblockingServer.java:632) ~[storm-core-0.9.7.jar:0.9.7]
at org.apache.thrift7.server.THsHaServer$Invocation.run(THsHaServer.java:201) [storm-core-0.9.7.jar:0.9.7]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
经过分析nimbus有个clean-inbox的机制来定时清理inbox中的jar文件,并有两个配置项来设置定时策略:
/**
* How often nimbus should wake the cleanup thread to clean the inbox.
* @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
*/
public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
/**
* The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
*
* Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS.
* Note that the time it takes to delete an inbox jar file is going to be somewhat more than
* NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS
* is set to).
* @see NIMBUS_CLEANUP_FREQ_SECS
*/
public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
NIMBUS_CLEANUP_INBOX_FREQ_SECS: 表示nimbus多久唤醒一次清理线程去进行清理;
NIMBUS_INBOX_JAR_EXPIRATION_SECS:表示jar文件在inbox中存活的时长,在清理线程清理之前如果到期了就会被清理
另一方面,通过storm-core提供的StormSubmitter.submitTopology的方法进行提交任务时,上传jar包的逻辑如下:
private static String submittedJar = null;
private static void submitJar(Map conf, ProgressListener listener) {
if(submittedJar==null) {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
String localJar = System.getProperty("storm.jar");
submittedJar = submitJar(conf, localJar, listener);
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}
只要客户端进程不停,jar包就只上传一次。
所以等一个小时后,jar会被清除,重新提交任务就找不到inbox中的jar文件。