Палец вверх 2
Перевод
Перевод

Как рассчитать среднее и стандартное отклонение с учетом PySpark DataFrame?

У меня есть PySpark DataFrame ( не pandas ) с именем df который достаточно велик для использования collect() . Поэтому приведенный ниже код неэффективен. Это работало с меньшим количеством данных, однако теперь это терпит неудачу.

import numpy as np

myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)

Есть ли способ получить mean и стандартное значение как две переменные, используя pyspark.sql.functions или аналогичные?

from pyspark.sql.functions import mean as mean_, std as std_

Я мог бы использовать withColumn , однако, этот подход применяет вычисления withColumn , и он не возвращает единственную переменную.

ОБНОВИТЬ:

Пример содержания df :

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|

Я должен рассчитать среднее и стандартное отклонение значений score , например, значение 1 в [691,1] является одним из баллов.

python apache-spark pyspark apache-spark-sql
задан Markus 27 дек. 2017 г., 19:05:04
источник

1 ответ

Решение 8
Перевод
Перевод

Вы можете использовать встроенные функции для получения совокупной статистики. Вот как можно получить среднее и стандартное отклонение.

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

Обратите внимание, что есть три разные функции стандартного отклонения. Из документов, которые я использовал ( stddev ), возвращается следующее:

Агрегатная функция: возвращает несмещенное стандартное отклонение выборки выражения в группе

Вы также можете использовать метод describe() :

df.describe().show()

Обратитесь к этой ссылке для получения дополнительной информации: pyspark.sql.functions

ОБНОВЛЕНИЕ : Это, как вы можете работать с вложенными данными.

Используйте explode для извлечения значений в отдельные строки, затем вызовите mean и stddev как показано выше.

Вот MWE:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

Какие выводы:

[2.3333333333333335, 1.505545305418162]

Вы можете проверить правильность этих значений, используя numpy :

vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])

Пояснение: Ваш столбец "products" является list list s. Вызов explode создаст новую строку для каждого элемента внешнего list . Затем возьмите значение "score" из каждой разнесенной строки, которое вы определили как второй элемент в list из 2 элементов. Наконец, вызовите агрегатные функции в этом новом столбце.

ответ дан pault 27 дек. 2017 г., 19:27:15
источник