Spark DataFrame: Computing row-wise mean (or any aggregate operation)

I have a Spark DataFrame loaded up in memory, and I want to take the mean (or any aggregate operation) over the columns. How would I do that? (In numpy, this is known as taking an operation over axis=1).

If one were calculating the mean of the DataFrame down the rows (axis=0), then this is already built in:

from pyspark.sql import functions as F
F.mean(...)

But is there a way to programmatically do this against the entries in the columns? For example, from the DataFrame below

+--+--+---+---+
|id|US| UK|Can|
+--+--+---+---+
| 1|50|  0|  0|
| 1| 0|100|  0|
| 1| 0|  0|125|
| 2|75|  0|  0|
+--+--+---+---+

Omitting id, the means would be

+------+
|  mean|
+------+
| 16.67|
| 33.33|
| 41.67|
| 25.00|
+------+
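As a plain-Python sanity check of the expected output (not Spark code), the row-wise means can be reproduced from the sample rows. The helper name `row_means` is made up for this illustration.

```python
# Sample rows from the table above: (id, US, UK, Can)
rows = [
    (1, 50, 0, 0),
    (1, 0, 100, 0),
    (1, 0, 0, 125),
    (2, 75, 0, 0),
]


def row_means(rows):
    """Mean of every value in a row except the leading id (the axis=1 operation)."""
    return [sum(values) / len(values) for _id, *values in rows]


print([round(m, 2) for m in row_means(rows)])  # [16.67, 33.33, 41.67, 25.0]
```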

Answers:


Method 1

All you need here is standard SQL like this:

SELECT (US + UK + CAN) / 3 AS mean FROM df

which can be used directly with SQLContext.sql (once df is registered as a table) or expressed using the DataFrame DSL:

from pyspark.sql.functions import col, lit
df.select(((col("UK") + col("US") + col("CAN")) / lit(3)).alias("mean"))

If you have a larger number of columns, you can generate the expression as follows:

from functools import reduce
from operator import add
from pyspark.sql.functions import col, lit

n = lit(len(df.columns) - 1.0)
rowMean  = (reduce(add, (col(x) for x in df.columns[1:])) / n).alias("mean")

df.select(rowMean)

or

rowMean  = (sum(col(x) for x in df.columns[1:]) / n).alias("mean")
df.select(rowMean)

Finally, its equivalent in Scala:

df.select(df.columns
  .drop(1)
  .map(col)
  .reduce(_ + _)
  .divide(df.columns.size - 1)
  .alias("mean"))

In a more complex scenario, you can combine the columns using the array function and use a UDF to compute statistics:

import numpy as np
from pyspark.sql.functions import array, udf
from pyspark.sql.types import FloatType

combined = array(*(col(x) for x in df.columns[1:]))
median_udf = udf(lambda xs: float(np.median(xs)), FloatType())

df.select(median_udf(combined).alias("median"))

The same operation expressed using Scala API:

val combined = array(df.columns.drop(1).map(col).map(_.cast(DoubleType)): _*)
val median_udf = udf((xs: Seq[Double]) => 
    breeze.stats.DescriptiveStats.percentile(xs, 0.5))

df.select(median_udf(combined).alias("median"))
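The Python UDF above can also be sketched without NumPy and with explicit handling of null entries. This is only an illustration: the names `row_median` and `median_column` are hypothetical helpers, and the choice to skip nulls (rather than return null for the whole row) is an assumption, not something the original answer specifies.

```python
import statistics


def row_median(values):
    """Median of the non-null entries of one row, or None if every entry is null."""
    present = [float(v) for v in values if v is not None]
    return statistics.median(present) if present else None


def median_column(columns, name="median"):
    """Wrap row_median as a Spark UDF over the given columns.

    Calling this needs PySpark and an active Spark session.
    """
    # Imported lazily so row_median itself can be used and tested without Spark.
    from pyspark.sql.functions import array, col, udf
    from pyspark.sql.types import DoubleType

    combined = array(*(col(c) for c in columns))
    return udf(row_median, DoubleType())(combined).alias(name)
```

With a DataFrame like the one in the question, this would be used as df.select(median_column(df.columns[1:])).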

Since Spark 2.4, an alternative approach is to combine the values into an array and apply the aggregate expression. See, for example, "Spark Scala row-wise average by handling null".
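One way the array-plus-aggregate idea might look from PySpark is to build a Spark SQL expression string and pass it to expr. The sketch below is an assumption-laden illustration, not the linked answer's code: `row_mean_sql` and `with_row_mean` are hypothetical helper names, nulls are skipped via the SQL filter function, and column names are assumed not to contain backticks.

```python
def row_mean_sql(columns):
    """Build a Spark SQL (2.4+) expression for the row-wise mean, skipping nulls."""
    if not columns:
        raise ValueError("at least one column is required")
    # Cast every column to DOUBLE so the array has a single element type.
    values = ", ".join("CAST(`{}` AS DOUBLE)".format(c) for c in columns)
    # Keep only the non-null entries of the row.
    non_null = "filter(array({}), x -> x IS NOT NULL)".format(values)
    # Sum the remaining entries with the higher-order aggregate function.
    total = "aggregate({}, CAST(0 AS DOUBLE), (acc, x) -> acc + x)".format(non_null)
    # Divide by the number of non-null entries.
    return "{} / size({})".format(total, non_null)


def with_row_mean(df, columns, name="mean"):
    """Append the row-wise mean of `columns` to `df` (needs PySpark 2.4 or later)."""
    # Imported lazily so the string builder can be used without Spark installed.
    from pyspark.sql.functions import expr

    return df.withColumn(name, expr(row_mean_sql(columns)))
```

For the DataFrame in the question, with_row_mean(df, df.columns[1:]) would add a mean column computed over US, UK and Can.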

Method 2

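In Scala, something like this would do it:

val cols = Seq("US", "UK", "Can")
// Needs import spark.implicits._ for the tuple encoder and toDF; assumes the value columns are Int
df.map(r => (r.getAs[Int]("id"), r.getValuesMap[Int](cols).values.map(_.toDouble).sum / cols.length)).toDF("id", "mean")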


All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.
