I’ve been searching for a while for a way to use a Scala class in PySpark, and I haven’t found any documentation or guide on the subject.
Let’s say I create a simple class in Scala that uses some Apache Spark libraries, something like:
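    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.functions.col

    class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
      // Returns a DataFrame containing only the requested column.
      def exe(): DataFrame = {
        import sqlContext.implicits._
        df.select(col(column))
      }
    }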
- Is there any possible way to use this class in PySpark?
- Is it too tough?
- Do I have to create a .py file?
- Is there any guide that shows how to do that?
By the way, I also looked at the Spark source code and felt a bit lost; I was unable to replicate its functionality for my own purposes.
Answers:
Method 1
Yes, it is possible, although it can be far from trivial. Typically you want a Java-friendly wrapper so you don’t have to deal with Scala features which cannot be easily expressed in plain Java and, as a result, don’t play well with the Py4J gateway.
Assuming your class is in the package com.example and you have a Python DataFrame called df
    df = ... # Python DataFrame
you’ll have to:
- Build a jar using your favorite build tool.
- Include it in the driver classpath, for example using the --driver-class-path argument for the PySpark shell / spark-submit. Depending on the exact code, you may have to pass it using --jars as well.
- Extract the JVM instance from a Python SparkContext instance:
    jvm = sc._jvm
- Extract the Scala SQLContext from a SQLContext instance:
    ssqlContext = sqlContext._ssql_ctx
- Extract the Java DataFrame from df:
    jdf = df._jdf
- Create a new instance of SimpleClass:
    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
- Call the exe method and wrap the result in a Python DataFrame, passing the Python-side sqlContext (the Python wrapper expects the Python context, not the Scala one):
    from pyspark.sql import DataFrame
    DataFrame(simpleObject.exe(), sqlContext)
The result should be a valid PySpark DataFrame. You can of course combine all the steps into a single call.
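The steps above could be combined into one helper along these lines. This is only a sketch: the function name run_simple_class is made up for illustration, and it assumes the jar containing com.example.SimpleClass is already on the driver classpath and that sc, sqlContext and df are the usual PySpark objects. It passes the Python-side sqlContext to the DataFrame wrapper.

```python
def run_simple_class(sc, sqlContext, df, column):
    """Run com.example.SimpleClass.exe() on the JVM and return a PySpark DataFrame.

    Driver-side only: sc._jvm cannot be used inside actions or transformations.
    """
    # Imported here so the helper can be defined before Spark is set up.
    from pyspark.sql import DataFrame

    jvm = sc._jvm                        # Py4J view of the driver JVM
    ssql_context = sqlContext._ssql_ctx  # Scala SQLContext behind the Python one
    jdf = df._jdf                        # Java DataFrame behind the Python one

    # Instantiate the Scala class through the Py4J gateway.
    simple_object = jvm.com.example.SimpleClass(ssql_context, jdf, column)

    # Wrap the returned Java DataFrame so it can be used from Python again.
    return DataFrame(simple_object.exe(), sqlContext)
```

With a live context, run_simple_class(sc, sqlContext, df, "v") would then stand in for the last five steps of the list.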
Important: This approach works only if the Python code is executed solely on the driver. It cannot be used inside a Python action or transformation. See “How to use Java/Scala function from an action or a transformation?” for details.
Method 2
As an update to @zero323’s answer, given that Spark’s APIs have evolved over the last six years, a recipe that works in Spark 3.2 is as follows:
- Compile your Scala code into a JAR file (e.g. using sbt assembly)
- Include the JAR file in the --jars argument to spark-submit, together with any --py-files arguments needed for local package definitions
- Extract the JVM instance within Python:
    jvm = spark._jvm
- Extract a Java representation of the SparkSession:
    jSess = spark._jsparkSession
- Extract the Java representation of the PySpark DataFrame:
    jdf = df._jdf
- Create a new instance of SimpleClass (this assumes its constructor takes a SparkSession as the first argument; the class as written in the question takes a SQLContext):
    simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
- Call the exe method and convert its output into a PySpark DataFrame:
    from pyspark.sql import DataFrame
    result = DataFrame(simpleObject.exe(), spark)
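As a rough sketch, the recipe above could be wrapped in a single function. The name run_simple_class_spark3 and the pass_sql_context flag are hypothetical, added here only for illustration. The flag covers the case where SimpleClass still takes a SQLContext, as in the question: the Scala SparkSession exposes one through sqlContext(). The sketch assumes the JAR was supplied via --jars and that spark is an active SparkSession.

```python
def run_simple_class_spark3(spark, df, column, pass_sql_context=False):
    """Call com.example.SimpleClass.exe() from a SparkSession-based PySpark job.

    Driver-side only, like any code that goes through spark._jvm.
    """
    # Imported here so the helper can be defined before Spark is set up.
    from pyspark.sql import DataFrame

    jvm = spark._jvm                 # Py4J view of the driver JVM
    j_session = spark._jsparkSession # Java/Scala SparkSession behind the Python one

    # Hypothetical switch: hand over a SQLContext if the Scala constructor
    # expects one, otherwise pass the SparkSession as in the recipe.
    first_arg = j_session.sqlContext() if pass_sql_context else j_session

    simple_object = jvm.com.example.SimpleClass(first_arg, df._jdf, column)

    # Wrap the returned Java DataFrame for use from Python, as the recipe does.
    return DataFrame(simple_object.exe(), spark)
```

Calling run_simple_class_spark3(spark, df, "v") follows the recipe as written; adding pass_sql_context=True is the variant for the unmodified class from the question.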
All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.