Moving average in a Scala DataFrame
Computes the moving average for each sensor over a 3-row window: the previous row, the current row and the next one, ordered by date.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// spark-shell already imports the implicits that .toDF needs; in a compiled application add
// import spark.implicits._ (Spark 2.x) or import sqlContext.implicits._ (Spark 1.x)
val df = sc.parallelize(List(
  ("Sensor1", "2016-05-01", 50.00),
  ("Sensor1", "2016-05-03", 45.00),
  ("Sensor1", "2016-05-04", 55.00),
  ("Sensor2", "2016-05-01", 25.00),
  ("Sensor2", "2016-05-04", 29.00),
  ("Sensor2", "2016-05-06", 27.00)
)).toDF("sensor", "fecha", "valor")
// Window of 3 rows per sensor, ordered by date: previous row, current row and next row
val window = Window.partitionBy("sensor").orderBy("fecha").rowsBetween(-1, 1)
// Compute the moving average over that window
df.withColumn("media_movil", avg(df("valor")).over(window)).show()
+-------+----------+-----+-----------+
| sensor|     fecha|valor|media_movil|
+-------+----------+-----+-----------+
|Sensor2|2016-05-01| 25.0|       27.0|
|Sensor2|2016-05-04| 29.0|       27.0|
|Sensor2|2016-05-06| 27.0|       28.0|
|Sensor1|2016-05-01| 50.0|       47.5|
|Sensor1|2016-05-03| 45.0|       50.0|
|Sensor1|2016-05-04| 55.0|       50.0|
+-------+----------+-----+-----------+
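The frame `rowsBetween(-1, 1)` counts rows, not days: each average uses the previous row, the current row and the next row of the same sensor, and the first and last rows of a partition simply have one neighbour fewer. The following plain-Scala sketch (no Spark; `MovingAverageSketch` and its methods are hypothetical names used only for illustration) reproduces that calculation on the same sample data, which can help when checking the expected figures by hand.

```scala
// Plain-Scala sketch of a centered moving average over a 3-row frame, mimicking
// Window.partitionBy("sensor").orderBy("fecha").rowsBetween(-1, 1).
// Hypothetical helper for illustration only; it does not use Spark.
object MovingAverageSketch {
  // (sensor, fecha, valor), the same rows as the example DataFrame
  val data: Seq[(String, String, Double)] = Seq(
    ("Sensor1", "2016-05-01", 50.00),
    ("Sensor1", "2016-05-03", 45.00),
    ("Sensor1", "2016-05-04", 55.00),
    ("Sensor2", "2016-05-01", 25.00),
    ("Sensor2", "2016-05-04", 29.00),
    ("Sensor2", "2016-05-06", 27.00)
  )

  // Average of the rows at offsets -1, 0 and +1 around each position,
  // clipped at the partition boundaries.
  def centeredAverage(values: Seq[Double]): Seq[Double] =
    values.indices.map { i =>
      val frame = values.slice(math.max(0, i - 1), math.min(values.size, i + 2))
      frame.sum / frame.size
    }

  // "Partition" by sensor, order by date, then apply the frame inside each partition.
  def movingAverage(rows: Seq[(String, String, Double)]): Map[String, Seq[(String, Double, Double)]] =
    rows.groupBy(_._1).map { case (sensor, group) =>
      val sorted = group.sortBy(_._2) // ISO-8601 dates sort correctly as strings
      val averages = centeredAverage(sorted.map(_._3))
      sensor -> sorted.zip(averages).map { case ((_, fecha, valor), avg) => (fecha, valor, avg) }
    }

  def main(args: Array[String]): Unit =
    // Sorted by sensor name for a stable printout; Spark's show() does not guarantee row order
    movingAverage(data).toSeq.sortBy(_._1).foreach { case (sensor, rows) =>
      rows.foreach { case (fecha, valor, avg) => println(s"$sensor $fecha $valor $avg") }
    }
}
```

If the intended window is really a number of days rather than a number of rows (the sample dates are not consecutive), Spark's `rangeBetween` over a numeric ordering column, such as the date converted to a day count, is the usual alternative; that variant is not shown here.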
Cumulative sum in a Scala DataFrame
Computes a cumulative sum, using the sensor identifier as the window partition.
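// Frame from the first row of the partition up to the current row
// (from Spark 2.1 on, rowsBetween(Window.unboundedPreceding, Window.currentRow) is equivalent)
val window = Window.partitionBy("sensor").orderBy("fecha").rowsBetween(Long.MinValue, 0)
df.withColumn("suma_acumulada", sum(df("valor")).over(window)).show()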
+-------+----------+-----+--------------+
| sensor|     fecha|valor|suma_acumulada|
+-------+----------+-----+--------------+
|Sensor2|2016-05-01| 25.0|          25.0|
|Sensor2|2016-05-04| 29.0|          54.0|
|Sensor2|2016-05-06| 27.0|          81.0|
|Sensor1|2016-05-01| 50.0|          50.0|
|Sensor1|2016-05-03| 45.0|          95.0|
|Sensor1|2016-05-04| 55.0|         150.0|
+-------+----------+-----+--------------+
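A frame that starts at the first row of the partition and ends at the current row yields a running total per sensor. In plain Scala the same idea is a `scanLeft` over each partition's ordered values. The sketch below is only an illustration (no Spark; `RunningSumSketch` and its methods are hypothetical names) and reproduces the figures in the table above.

```scala
// Plain-Scala sketch of a per-sensor running total, analogous to
// sum("valor") over rowsBetween(Long.MinValue, 0). Hypothetical helper, not Spark.
object RunningSumSketch {
  // (sensor, fecha, valor), the same rows as the example DataFrame
  val data: Seq[(String, String, Double)] = Seq(
    ("Sensor1", "2016-05-01", 50.00),
    ("Sensor1", "2016-05-03", 45.00),
    ("Sensor1", "2016-05-04", 55.00),
    ("Sensor2", "2016-05-01", 25.00),
    ("Sensor2", "2016-05-04", 29.00),
    ("Sensor2", "2016-05-06", 27.00)
  )

  // scanLeft emits the seed 0.0 followed by every partial sum; tail drops the seed.
  def runningSum(values: Seq[Double]): Seq[Double] =
    values.scanLeft(0.0)(_ + _).tail

  // Group by sensor, order each group by date and accumulate its values.
  def bySensor(rows: Seq[(String, String, Double)]): Map[String, Seq[Double]] =
    rows.groupBy(_._1).map { case (sensor, group) =>
      sensor -> runningSum(group.sortBy(_._2).map(_._3))
    }

  def main(args: Array[String]): Unit =
    bySensor(data).toSeq.sortBy(_._1).foreach { case (sensor, sums) =>
      println(s"$sensor: ${sums.mkString(", ")}")
    }
}
```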
Previous value in a Scala DataFrame
Gets the previous value of each reading with lag, using the sensor identifier as the window partition. The first row of each sensor has no predecessor, so its value is null.
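val window = Window.partitionBy("sensor").orderBy("fecha")
// lag(column, 1) returns the value from one row earlier in the same partition (null if there is none)
df.withColumn("valor_previo", lag(df("valor"), 1).over(window)).show()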
+-------+----------+-----+------------+
| sensor|     fecha|valor|valor_previo|
+-------+----------+-----+------------+
|Sensor2|2016-05-01| 25.0|        null|
|Sensor2|2016-05-04| 29.0|        25.0|
|Sensor2|2016-05-06| 27.0|        29.0|
|Sensor1|2016-05-01| 50.0|        null|
|Sensor1|2016-05-03| 45.0|        50.0|
|Sensor1|2016-05-04| 55.0|        45.0|
+-------+----------+-----+------------+
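To make the behaviour of `lag` explicit, here is a plain-Scala sketch (no Spark; `LagSketch` and its methods are hypothetical names) that shifts each sensor's ordered values by a given offset. `None` plays the role of Spark's `null` for rows without a predecessor.

```scala
// Plain-Scala sketch of lag(valor, offset) within each sensor partition.
// Hypothetical helper for illustration; None stands in for Spark's null.
object LagSketch {
  // (sensor, fecha, valor), the same rows as the example DataFrame
  val data: Seq[(String, String, Double)] = Seq(
    ("Sensor1", "2016-05-01", 50.00),
    ("Sensor1", "2016-05-03", 45.00),
    ("Sensor1", "2016-05-04", 55.00),
    ("Sensor2", "2016-05-01", 25.00),
    ("Sensor2", "2016-05-04", 29.00),
    ("Sensor2", "2016-05-06", 27.00)
  )

  // Value `offset` rows earlier in an already ordered partition, or None if it does not exist.
  def lag(values: Seq[Double], offset: Int = 1): Seq[Option[Double]] = {
    require(offset >= 0, "offset must be non-negative")
    val v = values.toIndexedSeq
    v.indices.map(i => if (i >= offset) Some(v(i - offset)) else None)
  }

  // Group by sensor, order each group by date and shift its values by one row.
  def bySensor(rows: Seq[(String, String, Double)]): Map[String, Seq[Option[Double]]] =
    rows.groupBy(_._1).map { case (sensor, group) =>
      sensor -> lag(group.sortBy(_._2).map(_._3))
    }

  def main(args: Array[String]): Unit =
    bySensor(data).toSeq.sortBy(_._1).foreach { case (sensor, previous) =>
      println(s"$sensor: ${previous.map(p => p.map(_.toString).getOrElse("null")).mkString(", ")}")
    }
}
```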
Ranking in a Scala DataFrame
Ranks each reading by date, using the sensor identifier as the window partition.
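val window = Window.partitionBy("sensor").orderBy("fecha")
// rank() numbers the rows of each partition in date order; rows with the same date would share a rank
df.withColumn("ranking", rank().over(window)).show()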
+-------+----------+-----+-------+
| sensor|     fecha|valor|ranking|
+-------+----------+-----+-------+
|Sensor2|2016-05-01| 25.0|      1|
|Sensor2|2016-05-04| 29.0|      2|
|Sensor2|2016-05-06| 27.0|      3|
|Sensor1|2016-05-01| 50.0|      1|
|Sensor1|2016-05-03| 45.0|      2|
|Sensor1|2016-05-04| 55.0|      3|
+-------+----------+-----+-------+
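Because every date within a sensor is distinct, `rank()` here gives the same result as `row_number()`. The difference only shows up with ties: `rank()` gives tied rows the same rank and then skips positions, while `dense_rank()` does not skip. The plain-Scala sketch below (no Spark; `RankSketch` and its methods are hypothetical names, and the repeated date in `withTie` is invented to show the tie and is not part of the original data) illustrates both rules on an already ordered partition.

```scala
// Plain-Scala sketch of rank() and dense_rank() semantics over an ordered partition.
// Hypothetical helper for illustration only; it does not use Spark.
object RankSketch {
  // rank(): 1 + number of rows with a strictly smaller key. For sorted keys that is
  // the index of the key's first occurrence plus one (O(n^2), fine for a sketch).
  def rank[K](sortedKeys: Seq[K]): Seq[Int] = {
    val keys = sortedKeys.toIndexedSeq
    keys.indices.map(i => keys.indexOf(keys(i)) + 1)
  }

  // dense_rank(): number of distinct keys seen up to and including the current row.
  def denseRank[K](sortedKeys: Seq[K]): Seq[Int] = {
    val keys = sortedKeys.toIndexedSeq
    keys.indices.map(i => keys.take(i + 1).distinct.size)
  }

  def main(args: Array[String]): Unit = {
    // Sensor2 dates from the example: all distinct, so ranks are 1, 2, 3
    val sensor2 = Seq("2016-05-01", "2016-05-04", "2016-05-06")
    // Hypothetical partition with a repeated date, used only to show tie handling
    val withTie = Seq("2016-05-01", "2016-05-03", "2016-05-03", "2016-05-04")
    println(rank(sensor2).mkString(", "))
    println(rank(withTie).mkString(", "))
    println(denseRank(withTie).mkString(", "))
  }
}
```

In Spark itself, `dense_rank()` and `row_number()` from `org.apache.spark.sql.functions` can replace `rank()` in the `withColumn` call above when that tie behaviour is preferred.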