Problem Description
I'm working on a Spark SQL program and I'm receiving the following exception:
16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
at com.somecompany.ml.modeling.FlowForNewModel.run(FlowForNewModel.scala:15)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
at scala.Option.getOrElse(Option.scala:121)
at com.somecompany.ml.Main$.main(Main.scala:46)
at com.somecompany.ml.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])
The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56),
which gets me to this line: profilesDF.cache()
Before the caching, I perform a union between two DataFrames. I've seen an answer here about persisting both DataFrames before the join, but I still need to cache the unioned DataFrame since I use it in several of my transformations.
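The flow described above could be sketched roughly as follows (Spark 1.x DataFrame API, matching the stack trace; `trainDF`, `testDF`, and the column names are hypothetical stand-ins — only `profilesDF` appears in the question):

```scala
// Hypothetical reconstruction of the failing flow.
// The stack trace shows cache() -> CacheManager.cacheQuery -> InMemoryRelation,
// which in turn executes the plan containing the BroadcastHashJoin.
val profilesDF = trainDF.unionAll(testDF)   // union of the two DataFrames

profilesDF.cache()                          // FlowForNewModel.scala:56 — the line in the trace

// Several later transformations reuse the cached union, e.g.:
val features = profilesDF.select("userId", "score")
```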
I was wondering what may cause this exception to be thrown. Searching for it led me to links dealing with RPC timeout exceptions or security issues, neither of which is my problem. If you have any idea how to solve it, I'd obviously appreciate it, but even just understanding the problem would help me solve it.
Thanks in advance.
Recommended Answer
Question: I was wondering what may cause this exception to be thrown?
Answer:

spark.sql.broadcastTimeout
300 — Timeout in seconds for the broadcast wait time in broadcast joins

spark.network.timeout
120s — Default timeout for all network interactions.

spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max (if you are using Kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust accordingly to your SQL workloads.
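As a concrete illustration, those properties could be raised when constructing the context (a sketch for the Spark 1.x API seen in the stack trace; the specific values are placeholders to tune for your own workload — note your trace already shows a 3000-second broadcast timeout, so the value may need to go higher still, or the join strategy may need to change):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("FlowForNewModel")
  .set("spark.sql.broadcastTimeout", "3600")       // seconds; default is 300
  .set("spark.network.timeout", "600s")            // default is 120s
  .set("spark.kryoserializer.buffer.max", "512m")  // only relevant with Kryo

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```

The same settings can be passed at submit time instead, e.g. `spark-submit --conf spark.sql.broadcastTimeout=3600 ...`.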
Note: the docs say
The following options (see the spark.sql.* properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in future releases as more optimizations are performed automatically.
Also, for a better understanding you can look at BroadcastHashJoin, whose execute method is the trigger point for the above stack trace.
protected override def doExecute(): RDD[Row] = {
  // Blocks for up to spark.sql.broadcastTimeout waiting for the broadcast
  // side of the join to be built and shipped; this Await.result is what
  // throws the TimeoutException seen in the stack trace.
  val broadcastRelation = Await.result(broadcastFuture, timeout)
  streamedPlan.execute().mapPartitions { streamedIter =>
    hashJoin(streamedIter, broadcastRelation.value)
  }
}
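If raising the timeouts is not enough, one common workaround (not part of the quoted answer, so treat it as an assumption to verify against your workload) is to stop the optimizer from choosing a broadcast join at all, so the blocking Await above is never reached:

```scala
// Setting the threshold to -1 disables automatic broadcast joins;
// Spark falls back to a shuffle-based join instead, which is slower
// but does not wait on a broadcast future.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
```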