Scala Spark - Task not serializable
I have the following code, which fails at sc.parallelize():
val pairs = ret.cartesian(ret).map {
  case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
}

for (pair <- pairs) {
  val test = sc.parallelize(pair._2._1.map(_._1))
}
where
- k1, k2 are Strings
- v1, v2 are Lists of Doubles
I get the following error whenever I try to access sc. What am I doing wrong here?
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
    at CorrelationCalc$.main(CorrelationCalc.scala:33)
    at CorrelationCalc.main(CorrelationCalc.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@40bee8c5)
    - field (class: CorrelationCalc$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class CorrelationCalc$$anonfun$main$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
    ... 20 more
The for-comprehension is just doing a pairs.map().
RDD operations are performed by the workers, and to have them do that work, whatever you send them must be serializable. The SparkContext, however, is attached to the master: it is responsible for managing the entire cluster.

If you want to create an RDD, you have to be aware of the whole cluster (that's the second "D" --- distributed), so you can't create a new RDD on the workers. And you probably don't want to turn each row in pairs into an RDD (each with the same name!) anyway.
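To make that concrete, here is a minimal, self-contained sketch (the object name, master setting, and sample data are made up for illustration, not taken from the question) contrasting a closure that captures sc with one that only uses serializable values:

import org.apache.spark.{SparkConf, SparkContext}

object ClosureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-example").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq(1.0, 2.0, 3.0))

    // Fails with "Task not serializable": the closure captures sc,
    // and SparkContext cannot be shipped to the workers.
    // rdd.foreach(x => sc.parallelize(Seq(x)).count())

    // Works: the closure only references serializable values, and the
    // computation stays inside an ordinary RDD transformation.
    val doubled = rdd.map(_ * 2)
    println(doubled.collect().mkString(", "))

    sc.stop()
  }
}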
It's difficult to tell from your code exactly what you'd like to do, but it is probably something like
val test = pairs.map(r => r._2._1)
which would be an RDD where each row is whatever was in the v1.toList's.
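For completeness, a sketch of how the whole thing might look without touching sc inside a closure. The element type of ret is not shown in the question, so RDD[(String, Iterable[(Double, Double)])] is assumed here (the .map(_._1) in the question suggests the values are pairs):

// Assumption: ret: RDD[(String, Iterable[(Double, Double)])]; only
// k1/k2: String and v1/v2: Lists of Doubles are stated in the question.
val pairs = ret.cartesian(ret).map {
  case ((k1, v1), (k2, v2)) => ((k1, k2), (v1.toList, v2.toList))
}

// No SparkContext inside the closure: each row of `firsts` is the plain
// Scala List of first elements that the question tried to parallelize.
val firsts = pairs.map { case (_, (v1, _)) => v1.map(_._1) }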