Using pyspark’s pandas integration via apply_batch and transform_batch is very powerful, but the sparse documentation can lead to hard-to-trace bugs – hopefully my experience (below) can save you some debugging when you use it yourself.
It has been something like 3 years since I last used pyspark, and a lot has changed in that time. The error messages got much better, and the Scala engine doesn’t pop up as aggressively as it used to.
Spark’s pandas API (formerly project Koalas) is also a very nice and welcome addition that makes the experience more pythonic (pandas-like), but like any “compatibility layer” it has its limits, and sometimes there are places where Spark’s pandas API is lacking and you want to use real pandas for the job.
pandas all the things
Luckily Spark has a built-in escape route to enable that – without requiring the developer to define UDFs. In fact there are two: apply_batch and transform_batch. Both are pretty similar and let you call a function that accepts and returns a pandas dataframe. When you use either one, Spark iterates over the (large) Spark dataframe, breaks it into batches, and calls the function you specify once per batch.
As an aside, the difference between apply_batch and transform_batch is that the former is treated as returning a new dataframe and can change the number of rows returned, whereas the latter is expected to return the same number of rows.
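To make that concrete, here is a minimal sketch of both entry points (not my actual project code – the column names and values are made up, and it assumes a Spark 3.2+ session so the pandas-on-Spark API is available):

import pandas as pd
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2, 3, 4], "b": [10.0, 20.0, 30.0, 40.0]})

def add_ratio(pdf: pd.DataFrame) -> pd.DataFrame:
    # called once per batch with a plain pandas dataframe
    pdf["ratio"] = pdf["b"] / pdf["a"]
    return pdf

# transform_batch: the result must keep the same number of rows as the input batch
with_ratio = psdf.pandas_on_spark.transform_batch(add_ratio)

def drop_small(pdf: pd.DataFrame) -> pd.DataFrame:
    # apply_batch: free to return a different number of rows
    return pdf[pdf["b"] > 15.0]

filtered = psdf.pandas_on_spark.apply_batch(drop_small)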
So far so good. I had a good use case for these functions: I wanted to create a new pandas dataframe and then add its columns to the batch. I wrote code similar to the snippet below:
import pandas as pd  # plus an existing SparkSession available as spark

def process_rows(df, new_columns_list):
    new_df = pd.DataFrame(...)
    # do some work
    df[new_columns_list] = new_df[new_columns_list]
    return df
.
.
df = spark.read.parquet(...)
psdf = df.pandas_api()
psdf = psdf.pandas_on_spark.apply_batch(process_rows, new_columns_list=['whatever'])
psdf.to_spark().write...
I ran the code on a local pyspark instance and everything ran great. Then I ran it on a Kubernetes cluster with a small dataset (1 million rows) to see how it behaves in a distributed environment (I’ve written enough Spark to know that local runs don’t mean much), and at first glance everything looked just fine there as well.
But then something bugged me – there were too many rows where the new columns were null. There were a lot of good records as well, but the ratio of faulty (null) rows seemed too big, so I started to investigate the data. [A few hours later] I found out the data was just fine. I thought maybe some data wasn’t being serialized properly, so that only one (or a few) executor(s) got it right and the others didn’t. [More hours passed] Nope. Maybe there was something wrong with my pandas code? [Yet more hours] I rewrote it in a less efficient way… and that seemed to solve the problem.
end game
Crisis averted, Spark is annoying (what else is new), and the pandas thing is, well, not as cool as advertised… or is it? The damn thing kept nagging at me, so I took a pause, a few sips of whiskey (Bushmills 21 y/o single malt, in case you’re wondering) and then it finally hit me:
df[new_columns_list] = new_df[new_columns_list]
would only work if the indexes of the two dataframes are aligned. In the local tests it was fine: the dataset was small, Spark used a single batch, and both dataframes had an index starting at 0, so everything lined up. In the cluster, the first batch (maybe even the first batch on each executor) also started at 0, but the batches after that came out of Spark with indexes that didn’t match the freshly created new_df – so the assignment silently produced nulls.
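To see the pitfall in isolation, here’s a tiny pure-pandas sketch (made-up values, nothing Spark-specific) of what happens when the batch’s index doesn’t start at 0:

import pandas as pd

batch = pd.DataFrame({"x": [1, 2, 3]}, index=[1000, 1001, 1002])  # e.g. a later batch
new_df = pd.DataFrame({"y": [10, 20, 30]})                        # freshly built, index 0..2

batch["y"] = new_df["y"]        # pandas aligns on index labels, not position
print(batch["y"].isna().all())  # True – no labels match, so every value becomes NaN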
There are many ways to solve index misalignment – here’s the one I went with (but again, there are other options):
df[new_columns_list] = new_df[new_columns_list].to_numpy()
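For completeness, a couple of other sketches that would also have worked here (assuming, as in my case, new_df has exactly one row per row of df):

# option 1: copy the batch's index onto the new frame, then let pandas align normally
new_df.index = df.index
df[new_columns_list] = new_df[new_columns_list]

# option 2: build new_df with the batch's index in the first place
new_df = pd.DataFrame(..., index=df.index)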
To summarize: apply_batch/transform_batch do work. The documentation doesn’t specify what indexes the dataframe batches will have – but don’t make the same mistake I made and assume they’d just start at 0.