Procesamiento por lotes (batch) Apache Spark en Python

por | Nov 23, 2017 | Python, Spark | 2 Comentarios

Datos json usados para el análisis

%fs head /databricks-datasets/structured-streaming/events/file-0.json

{«time»:1469501107,»action»:»Open»}
{«time»:1469501147,»action»:»Open»}
{«time»:1469501202,»action»:»Open»}
{«time»:1469501219,»action»:»Open»}
{«time»:1469501225,»action»:»Open»}
{«time»:1469501234,»action»:»Open»}

Fuente: www.batabricks.com

 

Definir DataFrame especifico de Spark

# El DataFrame propio de Spark "pyspark" acelera el procesamiento de los datos
from pyspark.sql.types import *

pathText = "/databricks-datasets/structured-streaming/events/"
jsonSchema = StructType([ 
                 StructField("time", TimestampType(), True), 
                 StructField("action", StringType(), True) 
             ])

# DataFrame que representa datos en los archivos JSON
df = (
  spark
    .read
    .schema(jsonSchema)
    .json(pathText)
)

# Visualizar el DataFrame
display(df)
timeaction
2016-07-28T04:19:28.000+0000Close
2016-07-28T04:19:28.000+0000Close
2016-07-28T04:19:29.000+0000Open
2016-07-28T04:19:31.000+0000Close

 

Agrupar un DataFrame por minuto y tipo

# Importar librería para usar la función window
from pyspark.sql.functions import *

# Definir el DataFrame agrupado por minuto a partir del original
df_by_min2 = (
  df
    .groupBy(
       df.action, 
       window(df.time, "1 minute"))   # 1 hour ...   
    .count()
)
# Visualizar el DataFrame
display(df_by_min2)


Realizar consultas SQL

# Almacenar el DataFrame como una tabla de SparkQL
df.createOrReplaceTempView("view_df")

# Realizar la consulta
result = spark.sql("SELECT action, COUNT(time) as total FROM view_df group by action")

# Visualizar el resultado de la consulta SQL
result.show()

+------+-----+ 
|action|total| 
+------+-----+ 
|  Open|50000| 
| Close|50000| 
+------+-----+

2 Comentarios

  1. Gabriela

    Hola,

    Oye ya tengo los datos en spark, pero no se ven «bonitos» para presentarlos a la dirección, ¿cómo hago que tengan el formato # DataFrame que representa datos en los archivos JSON?

    Saludos y gracias
    Gaby

    Responder
  2. Ivanny

    Hola,
    Estoy aprendiendo con spark 2.4.5 y la instrucción display(df) no muestra el contenido del df como a vosotros, sino el tipo de datos. ejm
    Code:

    df = spark.read.csv(pathFile, inferSchema=True, header=True)
    display(dfPostventa)

    Resultado:
    DataFrame[ID: string, Comentario: string]

    Es posible que se deba a la versión de spark? o estoy haciendo algo mal?

    Un saludo

    Responder

Enviar un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *