I am using Spark 1.6.1 and Python. How can I enable Kryo serialization when working with PySpark?
I have the following settings in the spark-defaults.conf file:
spark.eventLog.enabled true
spark.eventLog.dir //local_drive/sparkLogs
spark.default.parallelism 8
spark.locality.wait.node 5s
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryo.classesToRegister Timing, Join, Select, Predicate, Timeliness, Project, Query2, ScanSelect
spark.shuffle.compress true
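For reference, I believe the programmatic equivalent of the Kryo-related settings above (using the same class names) would look roughly like the sketch below; I mention it only to show what I think these options mean, since in practice I set them in spark-defaults.conf:

from pyspark import SparkConf

# Sketch: the same Kryo settings applied through SparkConf instead of spark-defaults.conf
conf = (SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.classesToRegister",
             "Timing,Join,Select,Predicate,Timeliness,Project,Query2,ScanSelect"))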
And I get the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: org.apache.spark.SparkException: Failed to register classes with Kryo
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:128)
at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:273)
at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:258)
at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:174)
Caused by: java.lang.ClassNotFoundException: Timing
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:120)
at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:120)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:120)
The main script (Query2.py) contains:
import sys

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

from Timing import Timing
from Predicate import Predicate
from Join import Join
from ScanSelect import ScanSelect
from Select import Select
from Timeliness import Timeliness
from Project import Project
conf = SparkConf().setMaster(master).setAppName(sys.argv[1]).setSparkHome("$SPARK_HOME")
sc = SparkContext(conf=conf)
conf.set("spark.kryo.registrationRequired", "true")
sqlContext = SQLContext(sc)
I know that "Kryo won’t make a major impact on PySpark because it just stores data as byte[] objects, which are fast to serialize even with Java. But it may be worth a try to set spark.serializer and not try to register any classes" (Matei Zaharia, 2014). However, I need to register the classes.
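If I took that advice literally, I assume the whole Kryo setup would reduce to something like the sketch below (with the master and app name supplied as in Query2.py or by spark-submit), but that is not enough for my case because the classes must be registered:

from pyspark import SparkConf, SparkContext

# Serializer-only setup, as the quote suggests: Kryo enabled, no classes registered
conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=conf)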
Thanks in advance.