I am trying to use the Kryo serializer in Spark Streaming. I read in the Spark tuning docs that:
Finally, if you don’t register your custom classes, Kryo will still work, but it will have to store the full class name with each object, which is wasteful.
So I am trying to register all of my classes. My case classes are:
trait Message extends java.io.Serializable

object MutableTypes {
  type Childs = scala.collection.mutable.Map[Int, (Long, Boolean)]
  type Parents = scala.collection.mutable.Map[Int, Childs]
}

case class IncomingRecord(
    id_1: String,
    id_raw: String,
    parents_to_add: MutableTypes.Parents,
    parents_to_delete: MutableTypes.Parents) extends Message
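For context, this is roughly how a record gets built; the ids and map contents below are made-up sample values, not real data. Note that the innermost values are (Long, Boolean) tuples:

import scala.collection.mutable

// Made-up sample values, only to show the shape of the data.
val parents: MutableTypes.Parents =
  mutable.Map(1 -> mutable.Map(10 -> (123456789L, true)))

val record = IncomingRecord(
  id_1 = "a1",
  id_raw = "raw-a1",
  parents_to_add = parents,
  parents_to_delete = mutable.Map.empty)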
And I am registering the classes like this:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")

sparkConf.registerKryoClasses(Array(classOf[Tuple2[Long, Boolean]], classOf[IncomingRecord]))
I got this exception:
com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: scala.Tuple2$mcJZ$sp
Note: To register this class use: kryo.register(scala.Tuple2$mcJZ$sp.class);
Serialization trace:
parents_to_add (com.test.IncomingRecord)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
    at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
As far as I understand, scala.Tuple2$mcJZ$sp is the specialized variant of Tuple2 that the compiler generates for a (Long, Boolean) pair, so registering classOf[Tuple2[Long, Boolean]] (which erases to plain scala.Tuple2) apparently does not cover it. How can I register this class? How do I solve this?
Update:
I know that setting spark.kryo.registrationRequired to false makes the exception go away, but then Kryo has to store the full class name with each object, which adds overhead and defeats the point of using it. I want to know how I can register this class.
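Based on the Note in the exception, I am guessing that I need to register the specialized class by name, since there is no classOf syntax that names scala.Tuple2$mcJZ$sp directly. Is something like this sketch the right way to do it?

sparkConf.registerKryoClasses(Array[Class[_]](
  classOf[IncomingRecord],
  // Guess: look up the specialized (Long, Boolean) tuple class by name,
  // because classOf[Tuple2[Long, Boolean]] only registers plain Tuple2.
  Class.forName("scala.Tuple2$mcJZ$sp")))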