I have a Spark DataFrame loaded up in memory, and I want to take the mean (or any aggregate operation) over the columns. How would I do that? (In numpy, this is known as taking an operation over axis=1).
If one were calculating the mean of the DataFrame down the rows (axis=0), then this is already built in:
from pyspark.sql import functions as F
F.mean(...)
But is there a way to programmatically do this against the entries in the columns? For example, from the DataFrame below
+--+--+---+---+
|id|US| UK|Can|
+--+--+---+---+
| 1|50|  0|  0|
| 1| 0|100|  0|
| 1| 0|  0|125|
| 2|75|  0|  0|
+--+--+---+---+
Omitting id, the means would be
+------+
|  mean|
+------+
| 16.67|
| 33.33|
| 41.67|
| 25.00|
+------+
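As a sanity check on the expected values, the same row-wise mean can be computed in plain Python without Spark. This is only a sketch: the `rows` list is a hand-copied version of the example table, and the result is rounded to two decimals for display.

```python
# Hand-copied rows from the example table: (id, US, UK, Can).
rows = [
    (1, 50, 0, 0),
    (1, 0, 100, 0),
    (1, 0, 0, 125),
    (2, 75, 0, 0),
]

# Skip the first field (id) and average the remaining values of each row.
row_means = [round(sum(values) / len(values), 2) for _id, *values in rows]
print(row_means)
```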
Answers:
Method 1
All you need here is standard SQL like this:
SELECT (US + UK + CAN) / 3 AS mean FROM df
This can be run directly with SQLContext.sql (once the DataFrame is registered as a table named df) or expressed using the DataFrame DSL:
df.select(((col("UK") + col("US") + col("CAN")) / lit(3)).alias("mean"))
If you have a larger number of columns, you can generate the expression as follows:
from functools import reduce
from operator import add
from pyspark.sql.functions import col, lit
n = lit(len(df.columns) - 1.0)
rowMean = (reduce(add, (col(x) for x in df.columns[1:])) / n).alias("mean")
df.select(rowMean)
or
rowMean = (sum(col(x) for x in df.columns[1:]) / n).alias("mean")
df.select(rowMean)
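The `sum` variant relies on Python's built-in `sum`, which starts from `0` and therefore needs the operands to support `0 + x` through `__radd__`. It stops working if `sum` has been shadowed, for example by `from pyspark.sql.functions import *`. The sketch below illustrates the mechanism with a toy `Expr` class. `Expr` and `_text` are hypothetical stand-ins invented for this example, not part of Spark, and the strings they build are not Spark's own column names.

```python
from functools import reduce
from operator import add


def _text(value):
    """Return the expression text of an Expr, or str() of a plain value."""
    return value.text if isinstance(value, Expr) else str(value)


class Expr:
    """Toy stand-in for a column expression: '+' builds a string."""

    def __init__(self, text):
        self.text = text

    def __add__(self, other):
        return Expr("({} + {})".format(self.text, _text(other)))

    def __radd__(self, other):
        # Invoked for `0 + Expr`, which is the first step of built-in sum().
        return Expr("({} + {})".format(_text(other), self.text))


cols = [Expr(name) for name in ("US", "UK", "Can")]

via_reduce = reduce(add, cols)  # no initial value, so no leading 0
via_sum = sum(cols)             # starts from 0, so it needs __radd__

print(via_reduce.text)  # ((US + UK) + Can)
print(via_sum.text)     # (((0 + US) + UK) + Can)
```

Both forms describe the same total. The `reduce` version simply avoids the extra `0` term.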
Finally, the equivalent in Scala:
df.select(df.columns
.drop(1)
.map(col)
.reduce(_ + _)
.divide(df.columns.size - 1)
.alias("mean"))
In a more complex scenario, you can combine the columns using the array function and use a UDF to compute statistics:
import numpy as np
from pyspark.sql.functions import array, udf
from pyspark.sql.types import FloatType
combined = array(*(col(x) for x in df.columns[1:]))
median_udf = udf(lambda xs: float(np.median(xs)), FloatType())
df.select(median_udf(combined).alias("median"))
The same operation expressed using the Scala API:
val combined = array(df.columns.drop(1).map(col).map(_.cast(DoubleType)): _*)
val median_udf = udf((xs: Seq[Double]) =>
breeze.stats.DescriptiveStats.percentile(xs, 0.5))
df.select(median_udf(combined).alias("median"))
Since Spark 2.4, an alternative approach is to combine the values into an array and apply the aggregate expression. See, for example, "Spark Scala row-wise average by handling null".
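A minimal sketch of that array-plus-aggregate idea, assuming Spark 2.4 or later and no nulls in the selected columns. The helper `row_mean_expr` is a hypothetical name introduced here. It only builds the SQL expression string, so the block runs without a Spark session.

```python
def row_mean_expr(columns):
    """Build a Spark SQL expression string for the row-wise mean of `columns`.

    Hypothetical helper: it assumes the columns are numeric and non-null.
    """
    if not columns:
        raise ValueError("need at least one column")
    # Backtick-quote each name so unusual column names still parse.
    quoted = ", ".join("`{}`".format(c) for c in columns)
    return (
        "aggregate(array({cols}), CAST(0 AS DOUBLE), (acc, x) -> acc + x) / {n}"
        .format(cols=quoted, n=len(columns))
    )


mean_expr = row_mean_expr(["US", "UK", "Can"])
print(mean_expr)

# Intended use with PySpark (not executed here):
#   from pyspark.sql.functions import expr
#   df.select(expr(row_mean_expr(df.columns[1:])).alias("mean"))
```

Starting the accumulator as a double keeps the running total and the final division in floating point. Handling nulls would need an extra step, such as counting only the non-null elements, which this sketch leaves out.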
Method 2
In Scala, something like this would do it:
val cols = Seq("US","UK","Can")
df.map(r => (r.getAs[Int]("id"),r.getValuesMap(cols).values.fold(0.0)(_+_)/cols.length)).toDF
All methods were sourced from stackoverflow.com or stackexchange.com and are licensed under CC BY-SA 2.5, CC BY-SA 3.0, and CC BY-SA 4.0.