This is not a duplicate of Usage of custom Python object in Pyspark UDF: that question troubleshoots the use of an external file, whereas this one is a request for best practices.
To begin, I'm sure this would be more efficient in Scala, but adding a new language to my ecosystem isn't feasible.
My problem is that I have a Hive database containing IP addresses, and I would like to map each of them to its parent Autonomous System Number (ASN).
MaxMind offers several downloadable databases containing various IP metadata (geolocation, ASNs, etc.), along with reference implementations of readers for them. The databases themselves are efficiently packed binary trees in the MaxMind DB format and are difficult to unroll into something suitable for a database join.
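For reference, here is roughly how the Python reader is used on its own; the database path and the sample IP below are just placeholders, and each lookup is a dict access against an open database handle:

import maxminddb

# Standalone lookup against the MaxMind DB file (path and IP are placeholders)
reader = maxminddb.open_database("GeoIP2-ISP.mmdb")
record = reader.get("1.2.3.4")  # dict of metadata, or None if the IP is not found
asn = record.get("autonomous_system_number") if record else None
reader.close()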
My plan to do this mapping is to create a PySpark UDF around their Python database reader, like so:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T

def mmdbv(x):
    # Look up a single IP address in the MaxMind ISP database
    import maxminddb
    reader = maxminddb.open_database(SparkFiles.get("GeoIP2-ISP.mmdb"))
    try:
        foo = reader.get(x)['autonomous_system_number']
    except Exception:
        foo = None
    finally:
        reader.close()
    return foo
if __name__ == "__main__":
    spark = (SparkSession
             .builder
             .enableHiveSupport()
             .config("hive.exec.dynamic.partition", "true")
             .config("hive.exec.dynamic.partition.mode", "nonstrict")
             .getOrCreate())
    spark.sparkContext.setLogLevel("WARN")
    # Ship the maxminddb package and the database file to every executor
    spark.sparkContext.addPyFile('maxminddb-1.3.0-py3.6.egg')
    spark.sparkContext.addFile('GeoIP2-ISP.mmdb')
    # Register the lookup as a UDF returning the ASN as an integer
    mf = F.udf(mmdbv, T.IntegerType())
    main()
    spark.stop()
This is very inefficient: for every single row, the UDF creates a file handle on the database, opens it, does one lookup, and closes it again. Is there a better way?
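For example, one alternative I've considered is dropping to the RDD API so the reader is opened once per partition instead of once per row, roughly like this (ip_df and its ip column are placeholders for my actual Hive table), but I don't know whether this is the idiomatic approach:

from pyspark import SparkFiles

def lookup_partition(rows):
    # Open the reader once per partition and reuse it for every row
    import maxminddb
    reader = maxminddb.open_database(SparkFiles.get("GeoIP2-ISP.mmdb"))
    try:
        for row in rows:
            rec = reader.get(row["ip"])
            yield (row["ip"], rec.get("autonomous_system_number") if rec else None)
    finally:
        reader.close()

asn_df = ip_df.rdd.mapPartitions(lookup_partition).toDF(["ip", "asn"])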