How to export a table dataframe in PySpark to csv?

I am using Spark 1.3.1 (PySpark) and I have generated a table using a SQL query. I now have an object that is a DataFrame. I want to export this DataFrame object (I have called it “table”) to a csv file so I can manipulate it and plot the columns. How do I export the DataFrame “table” to a csv file?

Thanks!

Answers:


Method 1

If the data frame fits in driver memory and you want to save to the local file system, you can convert the Spark DataFrame to a local Pandas DataFrame using the toPandas method and then simply use to_csv:

df.toPandas().to_csv('mycsv.csv')

Otherwise you can use spark-csv:

  • Spark 1.3
    df.save('mycsv.csv', 'com.databricks.spark.csv')
  • Spark 1.4+
    df.write.format('com.databricks.spark.csv').save('mycsv.csv')

In Spark 2.0+ you can use the csv data source directly:

df.write.csv('mycsv.csv')

Method 2

For Apache Spark 2+, to save the DataFrame into a single csv file, use the following command:

query.repartition(1).write.csv("cc_out.csv", sep='|')

Here 1 means that the data is written as a single partition, and therefore as a single csv part file. You can change it according to your requirements.

Method 3

If you cannot use spark-csv, you can do the following:

df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")

That will not work if you need to handle strings that contain line breaks or commas. In that case, use this:

import csv
import io

def row2csv(row):
    # Python 3 version: io.StringIO replaces the Python 2 cStringIO module.
    # csv.writer quotes fields that contain commas, quotes or line breaks.
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow([str(s) for s in row])
    return buffer.getvalue().strip()

df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
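
To see why the plain join breaks, here is a minimal sketch that uses only the standard library (Python 3). The sample row is made up for illustration. It compares a naive comma join with csv.writer, which quotes any field containing a comma.

```python
import csv
import io

def naive_line(row):
    # Joins fields with commas and does no quoting at all.
    return ",".join(map(str, row))

def quoted_line(row):
    # csv.writer quotes fields that contain the delimiter, a quote or a line break.
    buffer = io.StringIO()
    csv.writer(buffer).writerow([str(s) for s in row])
    return buffer.getvalue().rstrip("\r\n")

# Hypothetical row: the second field contains a comma.
row = (1, "Smith, John", "NY")

# Parse both lines back to count the fields a CSV reader would see.
naive_fields = next(csv.reader([naive_line(row)]))
quoted_fields = next(csv.reader([quoted_line(row)]))

print(len(naive_fields))   # 4: the name was split into two fields
print(len(quoted_fields))  # 3: the original fields survive
```

The naive line gains an extra field on the way back in, while the quoted line round-trips to the original three values.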

Method 4

You need to repartition the DataFrame into a single partition and then pass the format, path and other parameters when saving. The path is given in Unix file system format:

df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')

Read more about the repartition function
Read more about the save function

However, repartition is a costly function and toPandas() is worse. Try using .coalesce(1) instead of .repartition(1) in the previous syntax for better performance.

Read more on repartition vs coalesce functions.

Method 5

How about this (in case you don't want a one-liner)?

for row in df.collect():
    d = row.asDict()
    s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
    f.write(s)

f is an open file object. The separator is a TAB character, but it's easy to change to whatever you want.
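
A variation on the loop above, sketched with the standard csv module so that tabs or line breaks inside values are quoted instead of corrupting the output. The dictionaries stand in for the results of row.asDict() on df.collect(); the values and the output path are hypothetical.

```python
import csv
import os
import tempfile

# Stand-ins for [row.asDict() for row in df.collect()]; the values are made up.
rows = [
    {"int_column": 1, "string_column": "alpha"},
    {"int_column": 2, "string_column": "beta\tgamma"},  # value containing a TAB
]

# Hypothetical output location in a fresh temporary directory.
out_path = os.path.join(tempfile.mkdtemp(), "table.tsv")

# newline="" lets the csv module control line endings itself.
with open(out_path, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(
        f, fieldnames=["int_column", "string_column"], delimiter="\t"
    )
    writer.writeheader()
    writer.writerows(rows)

# Reading the file back recovers the values, including the embedded TAB.
with open(out_path, newline="", encoding="utf-8") as f:
    read_back = list(csv.DictReader(f, delimiter="\t"))
```

As with the original loop, this collects every row on the driver, so it only suits tables that fit in driver memory.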

Method 6

Using PySpark

The easiest way to write to csv in Spark 3.0+:

sdf.write.csv("/path/to/csv/data.csv")

This can generate multiple files, one per partition of the DataFrame. If you want a single file, use repartition:

sdf.repartition(1).write.csv("/path/to/csv/data.csv")
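
Spark writes the target path as a directory of part files. If moving all the data into one partition is too expensive, another option is to let Spark write several parts and concatenate them afterwards. The sketch below assumes the parts were written with header=True, that their names start with part-, and that the directory is on a local file system. merge_csv_parts is a hypothetical helper, not a Spark API, and the demo fakes the output directory so that it runs without Spark.

```python
import glob
import os
import tempfile

def merge_csv_parts(spark_dir, target_file):
    """Concatenate part-*.csv files into one CSV, keeping only the first header."""
    parts = sorted(glob.glob(os.path.join(spark_dir, "part-*.csv")))
    header_written = False
    with open(target_file, "w", encoding="utf-8", newline="") as out:
        for part in parts:
            with open(part, encoding="utf-8", newline="") as src:
                header = src.readline()
                if header and not header_written:
                    out.write(header)
                    header_written = True
                # Copy the remaining data lines unchanged.
                out.writelines(src)
    return len(parts)

# Demo: build a fake Spark output directory with two part files.
work = tempfile.mkdtemp()
spark_dir = os.path.join(work, "data.csv")
os.mkdir(spark_dir)
fake_parts = [
    ("part-00000-abc.csv", "id,name\n1,a\n"),
    ("part-00001-abc.csv", "id,name\n2,b\n"),
]
for name, body in fake_parts:
    with open(os.path.join(spark_dir, name), "w", encoding="utf-8", newline="") as f:
        f.write(body)
open(os.path.join(spark_dir, "_SUCCESS"), "w").close()  # marker file Spark also writes

merged_path = os.path.join(work, "merged.csv")
n_parts = merge_csv_parts(spark_dir, merged_path)
with open(merged_path, encoding="utf-8", newline="") as f:
    merged = f.read()
```

The merge happens line by line, so it does not need to hold the whole table in memory.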

Using Pandas

If your data is small enough to be held in local Python memory, you can use pandas too:

sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)

Using Koalas

sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)

Method 7

'''
I am late to the party, but this lets me rename the file, move it to a desired directory and delete the unwanted additional directory Spark made.
'''

import shutil
import os
import glob

path = 'test_write'
#write single csv
students.repartition(1).write.csv(path)

#rename and relocate the csv
shutil.move(glob.glob(os.path.join(os.getcwd(), path, '*.csv'))[0], os.path.join(os.getcwd(), path + '.csv'))

#remove additional directory
shutil.rmtree(os.path.join(os.getcwd(), path))
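
The same rename-and-clean-up idea can be sketched in a platform-independent way with pathlib. promote_single_csv is a hypothetical helper name. It assumes the directory was written with repartition(1) or coalesce(1) to a local file system, so that it holds exactly one part-*.csv file. The demo fakes Spark's output directory so that it runs without a cluster.

```python
import shutil
import tempfile
from pathlib import Path

def promote_single_csv(spark_dir):
    """Move the only part-*.csv out of spark_dir to '<spark_dir>.csv' and delete the directory."""
    spark_dir = Path(spark_dir)
    parts = list(spark_dir.glob("part-*.csv"))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    target = spark_dir.with_name(spark_dir.name + ".csv")
    shutil.move(str(parts[0]), str(target))
    # Remove the leftover directory, including marker files such as _SUCCESS.
    shutil.rmtree(spark_dir)
    return target

# Demo: fake the directory that students.repartition(1).write.csv('test_write') would create.
work = Path(tempfile.mkdtemp())
fake_out = work / "test_write"
fake_out.mkdir()
(fake_out / "part-00000-abc.csv").write_text("1,alice\n", encoding="utf-8")
(fake_out / "_SUCCESS").write_text("", encoding="utf-8")

result = promote_single_csv(fake_out)
```

Raising an error when more than one part file is found avoids silently keeping only part of the data.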

Method 8

In a Databricks notebook, try display(df) and use the download option in the results. Please note: only 1 million rows can be downloaded with this option, but it's really quick.

Method 9

I used the method with pandas and it gave me horrible performance. In the end it took so long that I stopped it to look for another method.

If you are looking for a way to write to one csv file instead of several, this is what you need:

df.coalesce(1).write.csv("train_dataset_processed", header=True)

It reduced the processing time for my dataset from 2+ hours to 2 minutes.


All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0.
