Datos json usados para el análisis
%fs head /databricks-datasets/structured-streaming/events/file-0.json
{«time»:1469501107,»action»:»Open»}
{«time»:1469501147,»action»:»Open»}
{«time»:1469501202,»action»:»Open»}
{«time»:1469501219,»action»:»Open»}
{«time»:1469501225,»action»:»Open»}
{«time»:1469501234,»action»:»Open»}
Fuente: www.batabricks.com
Definir DataFrame especifico de Spark
# El DataFrame propio de Spark "pyspark" acelera el procesamiento de los datos from pyspark.sql.types import * pathText = "/databricks-datasets/structured-streaming/events/" jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ]) # DataFrame que representa datos en los archivos JSON df = ( spark .read .schema(jsonSchema) .json(pathText) ) # Visualizar el DataFrame display(df)
time | action |
2016-07-28T04:19:28.000+0000 | Close |
2016-07-28T04:19:28.000+0000 | Close |
2016-07-28T04:19:29.000+0000 | Open |
2016-07-28T04:19:31.000+0000 | Close |
Agrupar un DataFrame por minuto y tipo
# Importar librería para usar la función window from pyspark.sql.functions import * # Definir el DataFrame agrupado por minuto a partir del original df_by_min2 = ( df .groupBy( df.action, window(df.time, "1 minute")) # 1 hour ... .count() ) # Visualizar el DataFrame display(df_by_min2)
Realizar consultas SQL
# Almacenar el DataFrame como una tabla de SparkQL df.createOrReplaceTempView("view_df") # Realizar la consulta result = spark.sql("SELECT action, COUNT(time) as total FROM view_df group by action") # Visualizar el resultado de la consulta SQL result.show() +------+-----+ |action|total| +------+-----+ | Open|50000| | Close|50000| +------+-----+
Hola,
Oye ya tengo los datos en spark, pero no se ven «bonitos» para presentarlos a la dirección, ¿cómo hago que tengan el formato # DataFrame que representa datos en los archivos JSON?
Saludos y gracias
Gaby
Hola,
Estoy aprendiendo con spark 2.4.5 y la instrucción display(df) no muestra el contenido del df como a vosotros, sino el tipo de datos. ejm
Code:
df = spark.read.csv(pathFile, inferSchema=True, header=True)
display(dfPostventa)
Resultado:
DataFrame[ID: string, Comentario: string]
Es posible que se deba a la versión de spark? o estoy haciendo algo mal?
Un saludo