Agrupar por clave – groupByKey()
Agrupa los elementos de un RDD por clave.
val words = sc.parallelize(List("avion", "tren", "barco", "coche", "moto", "bici"), 2) val rdd_with_key = words.keyBy(_.length) // se usa la longitud de la palabra como clave rdd_with_key.groupByKey.collect()
res: Array[(Int, Iterable[String])] = Array((4,CompactBuffer(tren, moto, bici)), (5,CompactBuffer(avion, barco, coche)))
Reducir por clave – reduceByKey()
Agrega elementos de un RDD por clave de tal manera que estos queden aplanados
val rdd = sc.parallelize(List("avion", "tren", "barco", "coche", "moto", "bici"), 2) val map_rdd = rdd.map(x => (x.length, x)) map_rdd.reduceByKey(_ + ";" + _).collect()
res: Array[(Int, String)] = Array((4,tren;moto;bici), (5,avion;barco;coche))
Agregar por clave – aggregateByKey()
Nos devuelve un conjunto de datos de pares (K, V) donde los valores de cada clave se agregan utilizando las funciones combinadas dadas y donde tenemos un valor por defecto de inicio. Ejemplo empezar a contar desde cero.
val rdd = sc.parallelize(List(("avion", 1), ("avion", 2), ("barco", 4), ("barco", 3))) rdd.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect()
res: Array[(String, Int)] = Array((avion,3), (barco,7))
Agrupamiento múltiple – cogroup()
Nos devuelve un RDD fruto del agurpamiento por clave hasta 3 RDDs.
val rdd1 = sc.parallelize(List((1, "avion"), (1, "tren"), (1, "barco"), (2, "coche"), (3, "moto"))) val rdd2 = sc.parallelize(List((1, "rojo"), (2, "azul"), (3, "verde"))) val rdd3 = sc.parallelize(List((1, "grande"), (2, "mediano"), (3, "pequeño"))) rdd1.cogroup(rdd2, rdd3).collect()
res: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(avion, tren, barco),CompactBuffer(rojo),CompactBuffer(grande))), (2,(CompactBuffer(coche),CompactBuffer(azul),CompactBuffer(mediano))), (3,(CompactBuffer(moto),CompactBuffer(verde),CompactBuffer(pequeño))))
0 comentarios