7

I have a situation where I want to execute a system process on each worker within Spark. I want this process to be run on each machine once. Specifically, this process starts a daemon which needs to be running before the rest of my program executes. Ideally this should execute before I've read any data in.

I'm on Spark 2.0.2 and using dynamic allocation.

Jon
  • Duplicate of: http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark – T. Gawęda Nov 29 '16 at 22:11

3 Answers

7

You may be able to achieve this with a combination of a lazy val and a Spark broadcast variable. It would be something like the following. (I have not compiled this code; you may have to change a few things.)

object ProcessManager {
  // A lazy val in an object is initialized at most once per JVM.
  lazy val start = {
    // start your process here.
  }
}

You can broadcast this object at the start of your application before you do any transformations.

val pm = sc.broadcast(ProcessManager)

Now, you can access this object inside your transformations as you would any other broadcast variable, and invoke the lazy val.

rdd.mapPartitions { itr =>
  pm.value.start
  // Other stuff here.
  itr
}
Jegan
    Won't this execute the process once per partition and not once per worker? – Jon Nov 29 '16 at 22:41
  • You are right, that is just an example. But since it is a lazy val and ProcessManager is an "object", it runs only once in an executor. – Jegan Nov 29 '16 at 23:02
  • Broadcasting that object is a little odd. You should broadcast data, not code. Just having the object and accessing the start variable will be enough. That way you don't need the ProcessManager object to be serializable. – Atreys Dec 11 '16 at 02:30
  • @Jegan could you please help me figure out the Java analogy? – VB_ Feb 12 '18 at 13:06
  • @Jegan now your code will run not on all workers, but only on the workers that hold partitions of `rdd`. Am I wrong? – VB_ Feb 12 '18 at 13:38
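
Following up on the comment above about broadcasting data rather than code, a minimal sketch of the non-broadcast variant might look like the below (not compiled; the daemon command is a placeholder you would replace with your own):

object ProcessManager {
  import sys.process._
  // Placeholder command: replace with whatever actually starts your daemon.
  // A lazy val in an object is evaluated at most once per JVM, i.e. once per executor.
  lazy val start: Int = Seq("bash", "-c", "my-daemon --start").!
}

rdd.mapPartitions { itr =>
  ProcessManager.start   // first access in each executor JVM launches the daemon
  itr
}

Since the object lives on each executor's classpath, nothing needs to be serialized or broadcast for this to work.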
2

An object with static initialization which invokes your system process should do the trick.

object SparkStandIn extends App {
  object invokeSystemProcess {
    import sys.process._
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".!

    def doIt(): Unit = {
      // This object is constructed once per JVM, but object initialization is lazy;
      // another way to make sure instantiation happened is to check that errorCode
      // does not represent an error.
    }
  }
  invokeSystemProcess.doIt()
  invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once
}
Atreys
  • But how do you ensure it is actually initialized without repeating calls on every transformation? –  Nov 29 '16 at 20:40
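
One hedged way to address that comment (essentially combining this answer with the broadcast/lazy val answer above) is to reference the object once inside a task at the start of the job, so each executor JVM triggers the static initialization. A sketch, assuming invokeSystemProcess is reachable from the closure (e.g. as SparkStandIn.invokeSystemProcess):

sc.parallelize(1 to 1000, 1000).foreachPartition { _ =>
  // The object's initializer runs once per executor JVM,
  // however many partitions that executor processes.
  invokeSystemProcess.doIt()
}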
0

A specific answer for a specific use case: I have a cluster with 50 nodes and I wanted to know which ones have the CET timezone set:

import spark.implicits._  // needed for .toDS and the String encoder

(1 until 100).toSeq.toDS
  .mapPartitions { _ =>
    sys.process.Process(
      Seq("bash", "-c", "echo $(hostname && date)")
    ).lines.toIterator
  }
  .collect()
  .filter(_.contains(" CET "))
  .distinct
  .sorted
  .foreach(println)

Note that I don't think it's guaranteed you'll get a partition on every node, so the command might not run on every node, even when using a 100-element Dataset on a cluster with 50 nodes as in the example above.
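
If you need better (though still not guaranteed) coverage, one option is to spread far more partitions than nodes across the cluster. A sketch along the same lines, with the element and partition counts chosen arbitrarily:

sc.parallelize(1 to 10000, 1000)   // many more partitions than nodes
  .mapPartitions { _ =>
    Iterator(sys.process.Process(Seq("bash", "-c", "echo $(hostname && date)")).!!.trim)
  }
  .collect()
  .distinct
  .sorted
  .foreach(println)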

jpgerek