Prerequisitos
Tener configurado la paquetería de Spark para IntelliJ IDEA
Tener un Elastic con Searchguard instalado
Incluir en el fichero pon la paquetería propia de Elastic:
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-20 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.4.2</version> </dependency>
Ejemplo de como escribir un dataframe transformado a Mapa en un Elastic 5.2.0 local en Scala 2.1.1.
package com.scala import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.spark._ import org.apache.spark.sql.SparkSession import scala.util.Random object App { def main(args : Array[String]) { val conf = new SparkConf() .setAppName("ElasticSearch") .setMaster("local") .set("es.index.auto.create", "true") .set("es.nodes", "servidor_elastic_remoto") .set("es.port","9200") .set("es.net.http.auth.user","admin") .set("es.net.http.auth.pass","admin") .set("es.net.ssl","true") .set("es.net.ssl.cert.allow.self.signed","true") .set("es.http.timeout","5m") .set("es.net.ssl.truststore.location","truststore.jks") .set("es.net.ssl.truststore.pass","changeit") val sc = new SparkContext(conf) val spark = SparkSession .builder() .config(conf) .getOrCreate() // Create a RDD val rdd = spark.sparkContext.parallelize( Seq.fill(10000) { (Math.abs(Random.nextLong % 100L), Math.abs(Random.nextLong % 1000000000000L)) }) import spark.implicits._ val df = rdd.toDF("duration", "timestamp").cache() df.show() val mapa = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*)) sc.makeRDD(mapa).saveToEs("elastic_df/docs") } }
Compilar
Generar la claves ssl segura «truststore.jks» siguiendo las instrucciones de Elastic para Searchguard
Incluir la clave generada «truststore.jks» al nivel del fichero App.scala
Ejecutar el jar generado
Incluir el fichero «truststore.jks» en el jar a generar
Ejecutar el comando siguiente donde se indica las librerías necesarias que no existan en una maquina con spark estandar:
spark-submit --jars "/tmp/elasticsearch-spark-13_2.11-5.2.2.jar" --class com.scala.App /tmp/elastic_conexion.jar
Comprobar datos escritos en Elastic
Mostrar todos los indices almacenados en Elastic
http://localhost:9200/_cat/indices?v
Mostrar datos del índice “elastic_df” en concreto
http://localhost:9200/elastic_df/_search?pretty=true&q=*
Si quisieramos borrar el indice generado, tendriamos que ir a la linea de comandos y ejecutar:
curl --insecure -u usuario:password -XDELETE 'https://localhost:9200/nombre_de_indice'
0 comentarios