Cargar datos
# Cargar un dataframe df = sqlContext.read.format("com.databricks.spark.csv").options(delimiter='\t',header='true',inferschema='true').load("/databricks-datasets/power-plant/data") display(df)
AT | V | AP | RH | PE |
---|---|---|---|---|
14.96 | 41.76 | 1024.07 | 73.17 | 463.26 |
25.18 | 62.96 | 1020.04 | 59.08 | 444.37 |
5.11 | 39.4 | 1012.16 | 92.14 | 488.56 |
20.86 | 57.32 | 1010.24 | 76.64 | 446.48 |
Generar conjunto de entrenamiento y test
#Definir una semilla seed = 1800009193L
# Generar un grupo de entrenamiento y otro de prueba con una proporción 80-20 (split20DF, split80DF) = df.randomSplit([.2, .8], seed=seed)
# Cachear los conjuntos de datos testSetDF = split20DF.cache() trainingSetDF = split80DF.cache()
display(trainingSetDF)
Generar el modelo
from pyspark.ml import Pipeline from pyspark.ml.regression import DecisionTreeRegressor from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.feature import VectorAssembler # Definir un vector de ensamblado para que las variables de entrada se queden en una sola "features" vectorizer = VectorAssembler() vectorizer.setInputCols(["AT", "V", "AP", "RH"]) vectorizer.setOutputCol("features")
# Definir molelo de arbol de regresión dt = DecisionTreeRegressor()
# Definir los parametros del modelo: # - Predicted_PE: columna que almacenará las predicciones estimadas # - features: columna que almacena el vector de variables predictoras # - PE: columna que almacena la predicción real # - 8 niveles de profundidad dt.setPredictionCol("Predicted_PE").setMaxBins(100).setFeaturesCol("features").setLabelCol("PE").setMaxDepth(8)
# Crear una 'pipeline' en la cual hay 2 elementos, un 'Vector Assembler' y un modelo 'Decision Tree', accesibles mediante el atributo 'stages'. pipeline = Pipeline(stages=[vectorizer, dt])
# Ajustar el modelo (Ejecutar) model = pipeline.fit(trainingSetDF)
# Visualizar los resultados vectAssembler = model.stages[0] dtModel = model.stages[1] print("Nodos: " + str(dtModel.numNodes)) print("Profundidad: "+ str(dtModel.depth)) # summary only print(dtModel.toDebugString)
Nodos: 503 Profundidad: 8 DecisionTreeRegressionModel (uid=DecisionTreeRegressor_4f21b2e2b1f92f4c08f3) of depth 8 with 503 nodes If (feature 0 <= 17.75) If (feature 0 <= 11.86) If (feature 0 <= 8.81) If (feature 0 <= 6.92) If (feature 1 <= 40.55) If (feature 1 <= 39.99) If (feature 0 <= 5.23) If (feature 2 <= 1003.91) Predict: 478.4114285714286 Else (feature 2 > 1003.91) Predict: 488.2392936802974 Else (feature 0 > 5.23) If (feature 1 <= 39.81) Predict: 485.9372307692308 Else (feature 1 > 39.81) ...
Predicción del modelo
predictions = model.transform(testSetDF) display(predictions)
AT | V | AP | RH | PE | features | Predicted_PE |
---|---|---|---|---|---|---|
1.81 | 39.42 | 1026.92 | 76.97 | 490.55 | [1,4,[],[1.81,39.42,1026.92,76.97]] | 488.2392936802974 |
3.2 | 41.31 | 997.67 | 98.84 | 489.86 | [1,4,[],[3.2,41.31,997.67,98.84]] | 488.81500000000005 |
3.38 | 41.31 | 998.79 | 97.76 | 489.11 | [1,4,[],[3.38,41.31,998.79,97.76]] | 488.81500000000005 |
3.4 | 39.64 | 1011.1 | 83.43 | 459.86 | [1,4,[],[3.4,39.64,1011.1,83.43]] | 488.2392936802974 |
3.51 | 35.47 | 1017.53 | 86.56 | 489.07 | [1,4,[],[3.51,35.47,1017.53,86.56]] | 488.2392936802974 |
3.63 | 38.44 | 1016.16 | 87.38 | 487.87 | [1,4,[],[3.63,38.44,1016.16,87.38]] | 488.2392936802974 |
Evaluar el modelo
# Cargar libreria de evaluación from pyspark.ml.evaluation import RegressionEvaluator
# Evaluación mediante el metodo de regresion regEval = RegressionEvaluator(predictionCol="Predicted_PE", labelCol="PE", metricName="rmse")
# Evaluación 1 - RMSE: Error cuadrático medio rmse = regEval.evaluate(predictions) print(" Error cuadrático medio (RMSE): %.2f" % rmse)
## Error cuadrático medio (RMSE): 3.60
# Evaluación 2 - r2: Coeficiente de determinación r2 = regEval.evaluate(predictions, {regEval.metricName: "r2"}) print("coeficiente de determinación (r2): {0:.2f}".format(r2))
## Coeficiente de determinación (r2): 0.96
# Almacenar los datos en una tabla para poder mostrar estadisticas sqlContext.sql("DROP TABLE IF EXISTS Power_Plant_RMSE_Evaluation") dbutils.fs.rm("dbfs:/user/hive/warehouse/Power_Plant_RMSE_Evaluation", True) predictions.selectExpr("PE", "Predicted_PE", "PE - Predicted_PE Residual_Error", "(PE - Predicted_PE) / {} Within_RSME".format(rmse)).registerTempTable("Power_Plant_RMSE_Evaluation")
0 comentarios