I am trying to convert a Spark RDD to a DataFrame. I have seen the documentation and examples where the schema is passed to the
sqlContext.createDataFrame(rdd, schema) function.
But I have 38 columns (fields), and this number will grow. Specifying the schema manually, field by field, would be very tedious.
Is there another way to specify the schema without knowing the column information in advance?
Answers:
Method 1
There are two ways to convert an RDD to a DataFrame in Spark:
toDF() and createDataFrame(rdd, schema)
I will show how to use each of them dynamically.
toDF()
The toDF() method converts an RDD[Row] to a DataFrame. The key point is that Row() accepts **kwargs, so each Row can be built from a dict whose keys become the column names.
from pyspark.sql import Row

# Map each positional index (as a string) to the value at that position
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

# Turn every record into a Row, then convert the RDD[Row] to a DataFrame
df = rdd.map(lambda x: Row(**f(x))).toDF()
This way you can create a DataFrame dynamically.
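One caveat with the Row(**kwargs) approach: Spark versions before 3.0 sort keyword-argument field names alphabetically, so a column named "10" would land ahead of "2". A minimal sketch that sidesteps this by zero-padding the generated names. The helper names row_to_dict and rdd_to_df and the "col_" prefix are hypothetical, and the sketch assumes every record has the same length:

```python
def row_to_dict(values, prefix="col_"):
    """Map a tuple/list of values to a dict keyed by zero-padded column names.

    Zero-padding keeps alphabetical order equal to positional order, which
    matters on Spark versions that sort Row keyword arguments by name.
    """
    width = len(str(len(values) - 1))  # digits needed for the largest index
    return {
        "{}{}".format(prefix, str(i).zfill(width)): value
        for i, value in enumerate(values)
    }


def rdd_to_df(rdd):
    """Convert an RDD of equal-length tuples to a DataFrame via Row(**kwargs).

    Assumes a SparkSession or SQLContext has already been created, since
    that is what makes RDD.toDF() available.
    """
    from pyspark.sql import Row  # local import keeps row_to_dict Spark-free

    return rdd.map(lambda values: Row(**row_to_dict(values))).toDF()
```

With 38 columns this would produce names from col_00 to col_37, so the column order matches the order of the values in each record.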
createDataFrame(rdd, schema)
The other way is to build the schema dynamically:
from pyspark.sql.types import StructType, StructField, StringType

# 32 is the column count used in this example; set it to match your RDD
schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])
df = sqlContext.createDataFrame(rdd, schema)
This second way is cleaner.
Either way, this is how you can create DataFrames dynamically.
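Rather than hard-coding the column count, the schema can be sized from the data itself. A minimal sketch, assuming a non-empty RDD of equal-length tuples whose values are already strings (createDataFrame checks values against the schema by default). The names column_names and rdd_to_df_with_schema and the "col_" prefix are hypothetical:

```python
def column_names(num_cols, prefix="col_"):
    """Generate placeholder column names: col_0, col_1, ..."""
    return ["{}{}".format(prefix, i) for i in range(num_cols)]


def rdd_to_df_with_schema(session, rdd):
    """Build an all-string schema sized from the first record of the RDD.

    `session` may be a SparkSession or a SQLContext; both expose
    createDataFrame(data, schema).
    """
    from pyspark.sql.types import StructType, StructField, StringType

    num_cols = len(rdd.first())  # assumption: RDD is non-empty, rows are uniform
    schema = StructType(
        [StructField(name, StringType(), True) for name in column_names(num_cols)]
    )
    return session.createDataFrame(rdd, schema)
```

Because the count comes from rdd.first(), the same code keeps working when the number of columns grows.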
Method 2
I liked Arun’s answer better, but there is a small problem, and I could not comment on or edit that answer: sparkContext does not have createDataFrame; sqlContext does (as Thiago mentioned). So:
from pyspark.sql import SQLContext
# assuming the Spark environment is set up and sc is spark.sparkContext
sqlContext = SQLContext(sc)
schemaPeople = sqlContext.createDataFrame(RDDName)
schemaPeople.createOrReplaceTempView("RDDName")
Method 3
Try this and see whether it works:
sc = spark.sparkContext
# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(RddName)
schemaPeople.createOrReplaceTempView("RddName")
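When only the column types are unknown, createDataFrame also accepts a plain list of column names as its schema argument and infers the types from the data. A sketch under that assumption; the function name rdd_to_df_inferred and the default names c0, c1, ... are hypothetical:

```python
def rdd_to_df_inferred(spark, rdd, names=None):
    """Create a DataFrame from an RDD of tuples, letting Spark infer the types.

    `spark` is a SparkSession (or SQLContext). If `names` is omitted,
    placeholder names are generated from the length of the first record.
    """
    if names is None:
        # assumption: the RDD is non-empty and all records have equal length
        names = ["c{}".format(i) for i in range(len(rdd.first()))]
    # A list of column names as the schema leaves type inference to Spark
    return spark.createDataFrame(rdd, names)
```

A call such as rdd_to_df_inferred(spark, RddName) would then name the columns c0, c1, and so on, without any field having to be typed by hand.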
All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.