Ejemplo de regresión lineal sobre segmentos de datos
Realiza diferentes regresiones de datos tomando como grupo para segmentar la clave formado por el identificador y el tipo
import org.apache.spark.ml.Pipeline import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{IntegerType, StructField, StructType, StringType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.{SparkConf, SparkContext} object App{ def lineal_regresion (df: DataFrame) = { // Definir características val features = new VectorAssembler() .setInputCols(Array("timestamp")) .setOutputCol("features") // Definir modelo a utilizar val lr = new LinearRegression().setLabelCol("duration") // Crear una tuberia que asocie el modelo con la secuencia de tratamiento de datos val pipeline = new Pipeline().setStages(Array(features, lr)) //Ejecutar el modelo val model = pipeline.fit(df) //Mostrar resultados del modelo val linRegModel = model.stages(1).asInstanceOf[LinearRegressionModel] linRegModel.coefficients(0) :: linRegModel.intercept :: Nil } def calculation(id:Row, main_df: DataFrame)= List{ var df = main_df.filter(col("id") === id(0) && col("type") === id(1) ) val regresion = lineal_regresion(df) (df.agg(max(df("duration")), avg(df("duration")), variance(df("duration")), min(df("timestamp")), max(df("timestamp"))).head() , regresion(0), regresion(1), id(0), id(1)) } def main(args: Array[String]) { val conf = new SparkConf() .setAppName("DiegoWeb") .setMaster("local[2]") .set("spark.executor.memory", "1g") val sc = new SparkContext(conf) val sparkSession = SparkSession .builder() .config(conf) .getOrCreate() val data = List( Row("1","A",100,1541113010), Row("1","A",200,1541123010), Row("1","B",300,1541133010), Row("2","A",400,1541143010), Row("2","A",500,1541153010), Row("2","B",600,1541163010) ) val rdd = sc.parallelize(data) val schema = StructType( List( StructField("id", StringType), StructField("type", StringType), StructField("duration", IntegerType), StructField("timestamp", IntegerType) ) ) val df = sparkSession.createDataFrame(rdd,schema) df.show() val regression = lineal_regresion(df) println(regression(0)) println(regression(1)) val ids = df.select(df("id"),df("type")).distinct.rdd.collect().toList val x = ids.flatMap( ids => calculation(ids, df)) println(x) } }
List(([200,150.0,5000.0,1541113010,1541123010],0.00999966655456395,-1.5410516221233152E7,1,A), ([600,600.0,NaN,1541163010,1541163010],0.0,600.0,2,B), ([300,300.0,NaN,1541133010,1541133010],0.0,300.0,1,B), ([500,450.0,5000.0,1541143010,1541153010],0.010000331975887176,-1.5411541723977894E7,2,A))
Configuración del fichero SBT
name := "regression" version := "0.1" scalaVersion := "2.10.6" mainClass := Some("com.example.Main_Example") libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1" libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.1.1"
0 comentarios