Prerequisitos
Tener configurado la paquetería de Spark para IntelliJ IDEA
Incluir en el fichero pon la paquetería propia de Elastic:
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.4.2</version> </dependency>
Ejemplo de como escribir un dataframe transformado a Mapa en un Elastic 6.4.2 local en Scala 2.1.1.
package com.scala import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.spark._ import scala.util.Random object App { def main(args : Array[String]) { val conf = new SparkConf() .setAppName("App") .setMaster("local") .set("es.nodes", "localhost") .set("es.port", "9200") .set("es.scheme", "http") val sc = new SparkContext(conf) val spark = SparkSession .builder() .config(conf) .getOrCreate() // Create a RDD val rdd = spark.sparkContext.parallelize( Seq.fill(10000) { (Math.abs(Random.nextLong % 100L), Math.abs(Random.nextLong % 1000000000000L)) }) import spark.implicits._ val df = rdd.toDF("duration", "timestamp").cache() df.show() val mapa = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*)) sc.makeRDD(mapa).saveToEs("elastic_df/docs") } }
Comprobar datos escritos en Elastic
Mostrar todos los indices almacenados en Elastic
http://localhost:9200/_cat/indices?v
Mostrar datos del índice “elastic_df” en concreto
http://localhost:9200/elastic_df/_search?pretty=true&q=*
Si quisieramos borrar el indice generado, tendríamos que ir a la linea de comandos y ejecutar:
curl -XDELETE 'https://localhost:9200/elastic_df'
0 comentarios