python - Save PySpark DataFrame to HBase
Hi, I have followed the linked guide to write a PySpark DataFrame to HBase. I am getting the error below and am not sure what I am doing wrong; it seems I am missing something on the configuration side or a JAR.
Below is the code I am trying to use:
df = sc.parallelize([('a', '1.0'), ('b', '2.0')]).toDF(schema=['col0', 'col1'])

catalog = ''.join("""{
    "table":{"namespace":"default", "name":"testtable"},
    "rowkey":"key",
    "columns":{
        "col0":{"cf":"rowkey", "col":"key", "type":"string"},
        "col1":{"cf":"cf", "col":"col1", "type":"string"}
    }
}""".split())

# write to HBase
df.write \
    .options(catalog=catalog) \
    .format('org.apache.spark.sql.execution.datasources.hbase') \
    .mode("overwrite") \
    .option("zkUrl", "host1,host2,host3:2181") \
    .save()

# read back
df_read = spark.read.options(catalog=catalog) \
    .format('org.apache.spark.sql.execution.datasources.hbase') \
    .load()
df_read.show()
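For readability, the same catalog string can also be built as a Python dict and serialized with json.dumps; this is only an equivalent way of producing the JSON above for the same testtable mapping:

import json

# Equivalent catalog definition: same table, rowkey, and column mapping as above,
# just built as a dict and serialized to a JSON string.
catalog = json.dumps({
    "table": {"namespace": "default", "name": "testtable"},
    "rowkey": "key",
    "columns": {
        "col0": {"cf": "rowkey", "col": "key", "type": "string"},
        "col1": {"cf": "cf", "col": "col1", "type": "string"}
    }
})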
I have started PySpark using a kernel.json; the PySpark submit arguments are:
"pyspark_submit_args": "--master yarn --jars hbase_spark_jar/hbase-0.90.2.jar,/hbase_spark_jar/hbase-client-1.3.1.jar,hbase_spark_jar/spark-avro_2.11-3.0.1.jar,/hbase_spark_jar/hbase-spark-1.2.0-cdh5.7.3.jar,/hbase_spark_jar/shc-1.0.0-2.0-s_2.11.jar --files /etc/hbase/2.5.0.0-1245/0/hbase-site.xml --executor-memory 8g --executor-cores 4 --num-executors 4 pyspark-shell"
I am getting the error below:
Py4JJavaError: An error occurred while calling o90.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 5.0 failed 4 times, most recent failure: Lost task 3.3 in stage 5.0 (TID 73, hostname): java.lang.NullPointerException
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1124)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1343)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1078)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1078)
    at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:160)
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.close(TableOutputFormat.java:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$5.apply$mcV$sp(PairRDDFunctions.scala:1124)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1343)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1124)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more