
I'm writing a process that needs to generate a UUID for certain groups that match based on some criteria. I got my code working, but I'm worried about potential issues from creating the UUID within my UDF (thus making it non-deterministic). Here's a simplified example of some code to illustrate:

from uuid import uuid1

from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf

spark = (
    SparkSession.builder.master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])


@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
    # One uuid1() call per group: every row in the group gets the same id
    df["uuid"] = str(uuid1())
    return df


>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age|                uuid|
+----+---+--------------------+
|   j|  3|1f8f48ac-0da8-430...|
|   h|  3|1f8f48ac-0da8-430...|
|   a|  2|d5206d03-bcce-445...|
+----+---+--------------------+

This currently works in a job processing over 200k records on AWS Glue, and I haven't found any bugs yet.

I use uuid1 since it incorporates node information when generating the UUID, thus ensuring that no two nodes generate the same id.
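For illustration only (this snippet is not part of the job itself), the node and time components of a uuid1 value can be inspected directly:

from uuid import getnode, uuid1, uuid4

# uuid1 embeds a MAC-derived node identifier plus a 60-bit timestamp,
# so two different machines should not produce the same value.
u = uuid1()
assert u.node == getnode()  # the node field comes from this machine
print(u.time)               # timestamp in 100-ns intervals since 1582-10-15

# uuid4, by contrast, is purely random and carries no node information.
print(uuid4())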

One thought I had was to register the UDF as non-deterministic with:

udf = pandas_udf(
    create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()

But that gave me the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
 `age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
               ;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
   +- LogicalRDD [name#0, age#1L], false

My questions are:

  • What are some potential issues this could encounter?
  • If it does have potential issues, what are some ways in which I could make this deterministic?
  • Why can't GROUPED_MAP functions be labeled as non-deterministic?
aiguofer
  • By the way, I have already read https://stackoverflow.com/questions/42960920/spark-dataframe-random-uuid-changes-after-every-transformation-action/42961574#42961574 but that only explains one potential issue. I'm looking for anything else. – aiguofer May 19 '20 at 16:43
  • To be sure: your pandas_udf wants to add a UUID to all the rows of the original df and keep the rest of the columns, and the only thing is that the UUID is conditional on some values. Am I right? – Alfilercio May 19 '20 at 21:55
  • @Alfilercio yeah for the most part. I don't necessarily need to return every column, I really just need the row_id (not in the above example) and the matched_id (uuid in above example). Btw, the comparison logic is pretty complex, so it probably has to be a pandas_udf. – aiguofer May 20 '20 at 15:42
  • Probably the biggest potential issue is the non-deterministic generation of the uuid, as you already read in the Stack Overflow post. In data manipulation it's really not good for your id to change several times during processing. By the way, in recent versions of Spark you can checkpoint the dataframe to avoid this kind of mistake. – gccodec May 21 '20 at 07:20
  • @gccodec cool yeah that's kinda what I was thinking... I tried a few things to produce deterministic IDs in the UDF and replace them with UUIDs after, but it made it horribly slow and still had some issues. In our case, we produce the group UUIDs and then save the results. We don't perform any actions between the UDF output and storing results, so I think we'll be fine. We'll just make sure to keep an eye out to make sure no records get duplicated or other unexpected behavior occurs. – aiguofer May 22 '20 at 17:30
  • would it be a solution to use [monotonically_increasing_id](https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id) instead of `uuid1`? – werner May 24 '20 at 15:14
  • @aiguofer Perhaps your title and second question should include 'deterministic' rather than 'non-deterministic', given that you are currently using a deterministic UDF. – Chris May 26 '20 at 15:47
  • @werner unfortunately no because I have to actually do some pretty complex logic to group matching items together (this is just a basic code example to illustrate). Unless I can call that function within my UDF it wouldn't really help. Unfortunately it's pretty complex to re-create my own monotonically increasing id within the UDF. – aiguofer May 26 '20 at 16:10
  • @Chris not sure what you mean... this is definitely a non-deterministic UDF, every time you run it it gives different results (if you have pyspark installed and set up, you can literally copy/paste the snippet of code and try it yourself). – aiguofer May 26 '20 at 16:11
  • Ah, I see what you are getting at :) I thought you were referring to whether it was treated as deterministic or non-deterministic by Spark! (it is currently treated as deterministic, but I think that is ok for your situation as I have explained in my answer). – Chris May 26 '20 at 16:16
  • uuid.uuid1() queries random.getrandbits, so it may change the internal state of random. This means that somewhere in the code you have a dependency on the state of random. To avoid touching random, you can specify the clock_seq argument when calling uuid1() (see the sketch after these comments). Or just simply use uuid4(). – Mahsa Hassankashi May 26 '20 at 21:20
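
A minimal sketch of the clock_seq idea from the last comment (illustrative only; the 14-bit value below is arbitrary):

from uuid import uuid1, uuid4

# Supplying clock_seq explicitly means uuid1 does not call
# random.getrandbits(14) internally, leaving the random module's
# state untouched. 0x3FFF is just an arbitrary 14-bit example.
print(uuid1(clock_seq=0x3FFF))

# Alternatively, uuid4 draws from os.urandom and is fully random.
print(uuid4())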

1 Answer


Your function is non-deterministic, but Spark treats it as deterministic, i.e. "due to optimization, duplicate invocations may be eliminated". However, each call to the pandas_udf receives a unique input (the rows grouped by a key), so the optimisation for duplicate calls to the pandas_udf is never triggered. Hence the asNondeterministic method, which suppresses such optimisations, is redundant for a pandas_udf of GROUPED_MAP type. In my opinion, this explains why the GroupedData.apply function has not been coded to accept a pandas_udf marked as non-deterministic: it would make no sense to, as there are no optimisation opportunities to suppress.
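
As an aside on the second question, here is a minimal sketch of one way to make the IDs deterministic, assuming the group key alone can seed them (which may not hold for the OP's more complex matching logic; the create_uuid_deterministic name is hypothetical). Hashing the key with uuid5 reproduces the same UUID on every run, unlike uuid1:

from uuid import NAMESPACE_URL, uuid5

from pyspark.sql.functions import PandasUDFType, pandas_udf


@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid_deterministic(pdf):
    # Every row in the group shares the same key, so hashing it with
    # uuid5 yields the same UUID no matter how often the UDF re-runs.
    key = str(pdf["age"].iloc[0])
    pdf["uuid"] = str(uuid5(NAMESPACE_URL, key))
    return pdf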

Chris
  • Thanks for the response... However, Question 1: I was referring to the fact that it's non-deterministic, not the uuid choice. uuid1 uses the MAC address, current time, and a random sequence to generate; although I suppose collisions could happen, I'm not too worried as it's highly unlikely. Question 2: Syntax doesn't matter; a decorated function is the same regardless of how you decorate it. Your code still gives the exact same error. Question 3: this contradicts your answer to #2 and doesn't really explain why. I was already aware it doesn't work for Pandas UDFs, but want to understand why. – aiguofer May 26 '20 at 16:01
  • Apologies, I thought I had it working, but I was wrong. On trying it now, it doesn't work, as you say :( – Chris May 26 '20 at 17:00
  • No worries, thanks for your answer! It certainly explains why `GROUPED_MAP` UDFs can't be marked as non-deterministic. Based on this, it does seem like the only issue that can arise is having a different value after each action. Since, in our case, we're immediately storing the results, this should be a non-issue. One thing we'll have to be careful about is what we log within the UDF (logging the generated UUID will likely be pointless, as it might not match what actually gets stored). – aiguofer May 27 '20 at 17:18