Spark es un motor ultrarrápido para el almacenamiento, procesamiento y análisis de grandes volúmenes de datos.
Es de código abierto y se encuentra gestionado por la Apache Software Foundation.
Apache Spark está especialmente diseñado para su implementación en Big data y Machine Learning.Pues su potencia de procesamiento agiliza la detección de patrones en los datos, la clasificación organizada de la información, la ejecución de cómputo intensivo sobre los datos y el procesamiento paralelo en clústers.
RStudio ha publicado sparklyr, un nuevo paquete R que ofrece un interfaz entre R y pache Spark.
1. Instalación y carga de librerías
#Instalar paquetes
install.packages("sparklyr")
install.packages("tidyverse")
install.packages("nycflights13")
#Cargar librerias
library(sparklyr)
library(nycflights13)
library(tidyverse)
spark_install("3.0.0")
2. Conexión al entorno Spark
#Nos conectamos al entorno de Spark en local
sesion_spark <- spark_connect(master="local", version="3.0.0")
3. Operaciones con datos
#Guardamos los datos del dataframe en una variable en el entorno Spark
vuelos <- sdf_copy_to(sesion_spark,flights)
#Extraemos el numero de registros que tiene
vuelos %>%
tally()
## Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 336776
#Extraemos el número de registros que tiene en función al aeropuerto desde donde sale
vuelos %>%
group_by(origin)%>%
tally()
## Source: spark<?> [?? x 2]
## origin n
## <chr> <dbl>
## 1 JFK 111279
## 2 EWR 120835
## 3 LGA 104662
#Obtenemos el tiempo medio en horas de retraso en los vuelos por aeropuerto
vuelos %>%
group_by(origin)%>%
summarise(retraso=mean(dep_delay))
## Source: spark<?> [?? x 2]
## origin retraso
## <chr> <dbl>
## 1 JFK 12.1
## 2 EWR 15.1
## 3 LGA 10.3
Extrae si un vuelo ha llegado tarde o no, de manera que todos los que reflejen un numero de horas en la columna arr_delay mayor a 0 será que han llegado tarde
Todo el que refleje un número 1 en la columna tarde será una afirmación de que ha llegado tarde
vuelos %>%
ft_binarizer(
input_col = "arr_delay",
output_col= "tarde",
threshold= 0
)%>%
select(
arr_delay,
tarde
) #Comprobamos que se realiza de forma correcta
## Source: spark<?> [?? x 2]
## arr_delay tarde
## <dbl> <dbl>
## 1 11 1
## 2 20 1
## 3 33 1
## 4 -18 0
## 5 -25 0
## 6 12 1
## 7 19 1
## 8 -14 0
## 9 -8 0
## 10 8 1
## ... with more rows
Ahora vamos a extraer los rangos de las horas en las que llegaron los vuelos
vuelos %>%
mutate(sched_dep_time=as.numeric(sched_dep_time)) %>% #Trasformamos a dato numérico las horas de llegadas
ft_bucketizer(
input_col = "sched_dep_time",
output_col= "hora",
splits= c(0,800,1600,2400)
)%>%
select(
sched_dep_time,
hora
)#Comprobamos que se realiza de forma correcta
## Source: spark<?> [?? x 2]
## sched_dep_time hora
## <dbl> <dbl>
## 1 515 0
## 2 529 0
## 3 540 0
## 4 545 0
## 5 600 0
## 6 558 0
## 7 600 0
## 8 600 0
## 9 600 0
## 10 600 0
## ... with more rows
Ahora Extraemos el número de registros en cada rango de horas en las que llegaron los vuelos
vuelos %>%
mutate(sched_dep_time=as.numeric(sched_dep_time)) %>% #Trasformamos a dato numérico las horas de llegadas
ft_bucketizer(
input_col = "sched_dep_time",
output_col= "hora",
splits= c(0,800,1600,2400)
)%>%
group_by(hora)%>%
tally()%>%
arrange(hora)
## Source: spark<?> [?? x 2]
## Ordered by: hora
## hora n
## <dbl> <dbl>
## 1 0 50726
## 2 1 164026
## 3 2 122024
El rango 0 sera de 0 a 800, que se traduce como desde 00:00 hasta las 8:00, el rango 1 es de 800 a 1600 y el rango 2 es el que va de 1600 a 2400
#Obtenemos número de vuelos por meses
Datos_meses <- vuelos %>%
group_by(month) %>%
tally() %>%
collect()
Datos_meses
## A tibble: 12 x 2
## month n
## <int> <dbl>
## 1 12 28135
## 2 10 28889
## 3 5 28796
## 4 1 27004
## 5 3 28834
## 6 2 24951
## 7 6 28243
## 8 9 27574
## 9 11 27268
## 10 7 29425
## 11 4 28330
## 12 8 29327
4. Modelización
#Entrenamos un modelo
muestra_vuelos <- vuelos %>%
filter(!is.na(arr_delay))%>%
mutate(sched_dep_time=as.numeric(sched_dep_time)) %>% #Trasformamos a dato numérico las horas de llegadas
ft_binarizer(
input_col = "arr_delay",
output_col= "tarde",
threshold= 0
)%>%
ft_bucketizer(
input_col = "sched_dep_time",
output_col= "hora",
splits= c(0,800,1600,2400)
)%>%
mutate(dephour=paste0("h", as.integer(hora)))%>%
sdf_random_split(entrenar=0.07, examinar=0.03, otros=0.9)
muestra_vuelos$entrenar
## Source: spark<?> [?? x 22]
## year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay carrier flight
## <int> <int> <int> <int> <dbl> <dbl> <int> <int> <dbl> <chr> <int>
## 1 2013 1 1 558 600 -2 849 851 -2 B6 49
## 2 2013 1 1 613 610 3 925 921 4 B6 135
## 3 2013 1 1 646 645 1 1023 1030 -7 UA 1496
## 4 2013 1 1 656 659 -3 949 959 -10 AA 1815
## 5 2013 1 1 658 700 -2 944 939 5 DL 1547
## 6 2013 1 1 659 705 -6 907 913 -6 DL 831
## 7 2013 1 1 743 749 -6 1043 1054 -11 B6 341
## 8 2013 1 1 804 810 -6 1103 1116 -13 DL 1959
## 9 2013 1 1 810 815 -5 1100 1128 -28 DL 2395
## 10 2013 1 1 828 830 -2 1027 1012 15 B6 905
## ... with more rows, and 11 more variables: tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>,
## distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>, tarde <dbl>, hora <dbl>, dephour <chr>
Modelizamos con una regresión logística
modelo <- muestra_vuelos$entrenar %>%
ml_logistic_regression(tarde ~.)#Predecimos si llegará tarde un vuelo en función del resto de variables
Seleccionamos las columnas que más nos interesan para nuestro modelo de entrenamiento
entrenar <- muestra_vuelos$entrenar %>%
mutate(
arr_delay= ifelse(arr_delay=="NaN",0,arr_delay)
)%>%
select(
month,
sched_dep_time,
arr_delay,
distance
)%>%
mutate_all(as.numeric)
Creamos un Pipeline desarrollando los parámetros que vamos a utilizar en nuestro modelo
Modelo_ML_vuelos <- ml_pipeline(sesion_spark) %>%
ft_dplyr_transformer(
tbl=entrenar
) %>%
ft_binarizer(
input_col = "arr_delay",
output_col= "tarde",
threshold= 0
)%>%
ft_bucketizer(
input_col = "sched_dep_time",
output_col= "hora",
splits= c(0,800,1600,2400)
)%>%
ft_r_formula(tarde ~ hora + distance + arr_delay)%>%
ml_logistic_regression()
Modelo_ML_vuelos
## Pipeline (Estimator) with 5 stages
## <pipeline_428c5e382011>
## Stages
|--1 SQLTransformer (Transformer)
| <dplyr_transformer_428c8fa56a0>
| (Parameters -- Column Names)
|--2 Binarizer (Transformer)
| <binarizer_428c792442a7>
| (Parameters -- Column Names)
| input_col: arr_delay
| output_col: tarde
|--3 Bucketizer (Transformer)
| <bucketizer_428c5d3e6a14>
| (Parameters -- Column Names)
| input_col: sched_dep_time
| output_col: hora
|--4 RFormula (Estimator)
| <r_formula_428c366f683f>
| (Parameters -- Column Names)
| features_col: features
| label_col: label
| (Parameters)
| force_index_label: FALSE
| formula: tarde ~ hora + distance + arr_delay
| handle_invalid: error
| stringIndexerOrderType: frequencyDesc
|--5 LogisticRegression (Estimator)
| <logistic_regression_428c7fff6838>
| (Parameters -- Column Names)
| features_col: features
| label_col: label
| prediction_col: prediction
| probability_col: probability
| raw_prediction_col: rawPrediction
| (Parameters)
| aggregation_depth: 2
| elastic_net_param: 0
| family: auto
| fit_intercept: TRUE
| max_iter: 100
| reg_param: 0
| standardization: TRUE
| threshold: 0.5
| tol: 1e-06
Ajustamos el modelo a los datos
Modelo_fit_vuelos <- ml_fit(Modelo_ML_vuelos,muestra_vuelos$entrenar)
Modelo_fit_vuelos
## PipelineModel (Transformer) with 5 stages
## <pipeline_428c5e382011>
## Stages
|--1 SQLTransformer (Transformer)
| <dplyr_transformer_428c8fa56a0>
| (Parameters -- Column Names)
|--2 Binarizer (Transformer)
| <binarizer_428c792442a7>
| (Parameters -- Column Names)
| input_col: arr_delay
| output_col: tarde
|--3 Bucketizer (Transformer)
| <bucketizer_428c5d3e6a14>
| (Parameters -- Column Names)
| input_col: sched_dep_time
| output_col: hora
|--4 RFormulaModel (Transformer)
| <r_formula_428c366f683f>
| (Parameters -- Column Names)
| features_col: features
| label_col: label
| (Transformer Info)
| formula: chr "tarde ~ hora + distance + arr_delay"
|--5 LogisticRegressionModel (Transformer)
| <logistic_regression_428c7fff6838>
| (Parameters -- Column Names)
| features_col: features
| label_col: label
| prediction_col: prediction
| probability_col: probability
| raw_prediction_col: rawPrediction
| (Transformer Info)
| coefficient_matrix: num [1, 1:3] -0.5802 -0.0003 28.7768
| coefficients: num [1:3] -0.5802 -0.0003 28.7768
| intercept: num -13.1
| intercept_vector: num -13.1
| num_classes: int 2
| num_features: int 3
| threshold: num 0.5
| thresholds: num [1:2] 0.5 0.5
5. Predicciones
#Hacemos las predicciones
prediccion <- ml_transform(Modelo_fit_vuelos,muestra_vuelos$examinar)
prediccion
## Source: spark<?> [?? x 11]
## month sched_dep_time arr_delay distance tarde hora features label rawPrediction probability prediction
## <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <list> <dbl> <list> <list> <dbl>
## 1 1 545 -18 1576 0 0 <dbl [3~ 0 <dbl [2]> <dbl [2]> 0
## 2 1 615 -9 746 0 0 <dbl [3~ 0 <dbl [2]> <dbl [2]> 0
## 3 1 730 31 1389 1 0 <dbl [3~ 1 <dbl [2]> <dbl [2]> 1
## 4 1 810 11 1029 1 1 <dbl [3~ 1 <dbl [2]> <dbl [2]> 1
## 5 1 830 2 888 1 1 <dbl [3~ 1 <dbl [2]> <dbl [2]> 1
## 6 1 851 -25 2402 0 1 <dbl [3~ 0 <dbl [2]> <dbl [2]> 0
## 7 1 733 123 200 1 0 <dbl [3~ 1 <dbl [2]> <dbl [2]> 1
## 8 1 1155 8 397 1 1 <dbl [3~ 1 <dbl [2]> <dbl [2]> 1
## 9 1 1200 -3 213 0 1 <dbl [3~ 0 <dbl [2]> <dbl [2]> 0
## 10 1 1159 -8 1608 0 1 <dbl [3~ 0 <dbl [2]> <dbl [2]> 0
Se puede observar como las predicciones realizadas coinciden con la realidad, es decir, que la columna tarde coincide con la columna prediction, esto quiere decir que nuestro modelo de predicción tiene un alto porcentaje de acertar.
Vamos a ver el resultado total de predicción con una matriz de confusión
#Creamos la matriz de confusión
prediccion%>%
group_by(tarde,prediction)%>%
tally()
## Source: spark<?> [?? x 3]
## Groups: tarde
## tarde prediction n
## <dbl> <dbl> <dbl>
## 1 1 1 3973
## 2 0 0 5744
0 comentarios