Pyspark: Split multiple array columns into rows

I have a dataframe which has one row, and several columns. Some of the columns are single values, and others are lists. All list columns are the same length. I want to split each list column into a separate row, while keeping any non-list column as is.

Sample DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

What I want:

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

If I only had one list column, this would be easy by just doing an explode:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

However, if I try to also explode the c column, I end up with a dataframe with a length the square of what I want:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

What I want is – for each column, take the nth element of the array in that column and add that to a new row. I’ve tried mapping an explode accross all columns in the dataframe, but that doesn’t seem to work either:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

Answers:

Thank you for visiting the Q&A section on Magenaut. Please note that all the answers may not help you solve the issue immediately. So please treat them as advisements. If you found the post helpful (or not), leave a comment & I’ll get back to you as soon as possible.

Method 1

Spark >= 2.4

You can replace zip_ udf with arrays_zip function

from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))

Spark < 2.4

With DataFrames and UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

With RDDs:

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

Both solutions are inefficient due to Python communication overhead. If data size is fixed you can do something like this:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

or even:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

This should be significantly faster compared to UDF or RDD. Generalized to support an arbitrary number of columns:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))

Method 2

You’d need to use flatMap, not map as you want to make multiple output rows out of each input row.

from pyspark.sql import Row
def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b,c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))

Method 3

One liner (for Spark>=2.4.0):

df.withColumn("bc", arrays_zip("b","c"))
  .select("a", explode("bc").alias("tbc"))
  .select("a", col"tbc.b", "tbc.c").show()

Import required:

from pyspark.sql.functions import arrays_zip


Steps –

  1. Create a column bc which is an array_zip of columns b and c
  2. Explode bc to get a struct tbc
  3. Select the required columns a, b and c (all exploded as required).

Output:

> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+


All methods was sourced from stackoverflow.com or stackexchange.com, is licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0

0 0 votes
Article Rating
Subscribe
Notify of
guest

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x