I want to take a function name as a parameter in the parent class so that a child class can set it. This variable will be used in one of the parent class's methods.

abstract class Parent[T: TypeInformation] {
   val myfun: T => Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   val myfun: User => Unit = service.callme
}

class Service {
   def callme(user: User): Unit = {
      println("We are here for user")
   }
}

I'm new to Scala, but this looked okay. The compiler doesn't complain, but I get a runtime exception and the job won't start:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:250)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:427)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field Child1.myfun of type scala.Function1 in instance of Child1
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1431)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2372)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:235)
    ... 11 more

Any idea how I can set the function name as a variable value in the child so that the parent can use it?

Mital Pritmani
  • This is going through object serialization. Not sure if it is possible to serialize closures/functions this way. Is it possible to do something simpler? (Note that you are not setting a "function name" to the variable, `myfun` contains the actual function itself). – Thilo Oct 22 '20 at 07:58
  • Maybe an abstract `def myFun(t:T): Unit` that the subclass can implement would work? That way, the function becomes part of the class definition itself and does not need to be serialized as data. – Thilo Oct 22 '20 at 07:59
  • Could it be that Spark doesn't have the matching jar? I get serialization issues when the jars are missing or out of date. See https://stackoverflow.com/questions/28186607/java-lang-classcastexception-using-lambda-expressions-in-spark-job-on-remote-ser/28367602#28367602 – joseph Mar 28 '22 at 20:06

1 Answer


You talk about passing a function name, but what you are actually passing is the function itself. It looks like that function value doesn't survive serialization through Flink.

An alternative encoding is to not use a function value at all, but a plain old method:

abstract class Parent[T: TypeInformation] {
   def myfun(t: T): Unit

   // A different method uses myfun
}

class Child1 extends Parent[User] {
   val service = new Service()

   def myfun(t: User): Unit = service.callme(t)
}

class Service {
   def callme(user: User): Unit = {
      println("We are here for user")
   }
}

If this doesn't work out of the box, it should at least give you a clearer error message (possibly about the availability of Service on the classpath).
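
One way to narrow this down locally is a plain Java-serialization round trip of the child class, which is roughly what happens when the job is shipped to the task managers. The following is only a sketch under a few assumptions: `User` is a hypothetical stand-in case class, `process` and `SerializationCheck.roundTrip` are made-up names, `Service` is marked `Serializable` because `Child1` holds it in a field, and the `TypeInformation` context bound is left out so the example runs without Flink on the classpath.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the User type from the question.
case class User(name: String)

// Assumption: Service must be serializable, because Child1 keeps it in a field.
class Service extends Serializable {
  def callme(user: User): Unit = println(s"We are here for user ${user.name}")
}

// The TypeInformation context bound is omitted so the sketch needs no Flink dependency.
abstract class Parent[T] extends Serializable {
  def myfun(t: T): Unit

  // Hypothetical "different method" that uses myfun.
  def process(t: T): Unit = myfun(t)
}

class Child1 extends Parent[User] {
  val service = new Service()

  def myfun(t: User): Unit = service.callme(t)
}

object SerializationCheck {
  // Writes the value to a byte array and reads it back with plain Java serialization.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Child1)
    // The deserialized copy should still be able to call through to Service.
    copy.process(User("alice"))
  }
}
```

If the round trip fails here with a `NotSerializableException`, the message names the offending class. If it succeeds locally but the job still fails on the cluster, that points more towards a classpath or classloader difference than towards the code itself.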

Fixing that issue might then allow you to go back to your previous encoding.

Martijn