
This is not a duplicate of Usage of custom Python object in Pyspark UDF: that question troubleshoots the use of an outside file, whereas this is a request for best practices.

To begin, I'm sure this would be more efficient in Scala, but adding a new language to my ecosystem isn't feasible.

My problem is that I have a Hive database containing IP addresses, and I would like to map each of them to its parent Autonomous System Number (ASN).

MaxMind offers several downloadable databases containing various IP metadata, including geolocation, ASNs, etc., as well as several reference implementations of code to read these databases. The databases themselves are efficiently packed binary trees in the MaxMind DB format and are difficult to unroll into something suitable for a database join.

My plan to do this mapping is to create a PySpark UDF around their Python database reader, like so:

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T


def mmdbv(x):
    # Imported inside the UDF so the egg shipped via addPyFile is available on the workers.
    import maxminddb

    # Open the MaxMind database that was distributed to each executor via addFile.
    reader = maxminddb.open_database(SparkFiles.get("GeoIP2-ISP.mmdb"))

    try:
        foo = reader.get(x)['autonomous_system_number']
    except Exception:
        foo = None
    finally:
        reader.close()

    return foo


if __name__ == "__main__":

    spark = (SparkSession
            .builder
            .enableHiveSupport()
            .config("hive.exec.dynamic.partition", "true")
            .config("hive.exec.dynamic.partition.mode", "nonstrict")
            .getOrCreate())

    spark.sparkContext.setLogLevel("WARN")

    # Ship the maxminddb package and the database file to every executor.
    spark.sparkContext.addPyFile('maxminddb-1.3.0-py3.6.egg')
    spark.sparkContext.addFile('GeoIP2-ISP.mmdb')

    mf = F.udf(mmdbv, T.IntegerType())

    main()  # the rest of the job lives in main(), omitted here

    spark.stop()
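
For context, the UDF is then applied roughly like this (the table and column names below are placeholders, not my real schema):

# Placeholder usage: "ip_table" and "ip_address" are illustrative names.
df = spark.table("ip_table")
df = df.withColumn("asn", mf(F.col("ip_address")))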

This is very inefficient: for every single element it creates a file handle, opens the database, reads one record, and closes it again. Is there a better way?

Patrick McCarthy
  • Is unpacking the binary tree into a flat file containing all its paths feasible? Spark works with such data well. – 9000 Jan 31 '18 at 16:00
  • Not really, I tried that first. The DB is small because it's a tree, but to unroll it I'd need to have a row for every possible IPv4 and IPv6 address, which would be titanic even for IPv4 alone. – Patrick McCarthy Jan 31 '18 at 16:03
  • https://stackoverflow.com/a/46694426/8371915 – Alper t. Turker Jan 31 '18 at 16:04
  • 1
    Or just use `RDD.mapPartitions`. – Alper t. Turker Jan 31 '18 at 16:06
  • This post has very little in common with https://stackoverflow.com/questions/46692370/usage-of-custom-python-object-in-pyspark-udf – Patrick McCarthy Jan 31 '18 at 16:37
  • @user8371915 what do you mean mapPartitions? Used how? – Patrick McCarthy Jan 31 '18 at 18:16
  • `df.rdd.mapPartitions(your_function)` where `your_function` opens the file, and then iterates over all elements in the partition (see [Design Patterns for using foreachRDD](https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd)) And questions have exactly the same problem - how to initialize object once for interpreter. Serialization is a secondary thing. Please read linked answer carefully - it does solve your problem. – Alper t. Turker Jan 31 '18 at 19:17
  • I see what makes you suggest that they're similar (to the extent that we assume mapPartitions, run once per executor, is ideal), although I'm not positive this is a duplicate. In any case, I reimplemented my approach with a function run via mapPartitions containing a closure to do the lookup (sketched below), but I find that mapPartitions is in fact an order of magnitude slower across my tests (2min15sec vs 13sec, on average). Identical partitioning, fixed executor size, etc. – Patrick McCarthy Feb 01 '18 at 17:58
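
A minimal sketch of the mapPartitions variant discussed above, assuming a DataFrame df with an ip_address column (both names are illustrative, not my exact code):

def lookup_partition(rows):
    # Open the database once per partition rather than once per row.
    import maxminddb
    reader = maxminddb.open_database(SparkFiles.get("GeoIP2-ISP.mmdb"))
    try:
        for row in rows:
            try:
                asn = reader.get(row.ip_address)['autonomous_system_number']
            except Exception:
                asn = None
            yield (row.ip_address, asn)
    finally:
        reader.close()

asn_rdd = df.rdd.mapPartitions(lookup_partition)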

0 Answers