Example of pipeline concatenation
This example shows how stages can be chained in a pipeline so that all of them finally converge on a single output column, which we call “features”. Note that a `Pipeline` can itself be used as a stage of another pipeline.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Tiny demo DataFrame: a string "label" column plus four numeric columns x1..x4.
# `spark` is assumed to be the SparkSession provided by the notebook.
df = spark.createDataFrame(
    [
        ("line_1", 1, 2, 3, 4),
        ("line_2", 5, 6, 7, 8),
        ("line_3", 9, 9, 9, 9),
    ],
    ("label", "x1", "x2", "x3", "x4"),
)

# Branch 1: pack x1 and x2 into the vector column "features1",
# wrapped in its own single-stage pipeline.
assembler12 = VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
pipeline12 = Pipeline().setStages([assembler12])  # setStages returns the pipeline itself

# Branch 2: pack x3 and x4 into "features2", also in its own pipeline.
assembler34 = VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
pipeline34 = Pipeline().setStages([assembler34])

# Merge step: concatenate both intermediate vectors into the final "features" column.
assemblerResult = VectorAssembler(
    inputCols=["features1", "features2"],
    outputCol="features",
)

# A Pipeline is itself an Estimator, so whole pipelines can be stages of an
# outer pipeline; the stages run in list order, so both branches produce their
# columns before the merge step reads them.
pipelineResult = Pipeline().setStages([pipeline12, pipeline34, assemblerResult])

# fit() trains the outer pipeline on df and returns a fitted model;
# transform() applies it, adding features1, features2 and features to df.
modelResult = pipelineResult.fit(df)
result_df = modelResult.transform(df)

# `display` is assumed to be the notebook's (e.g. Databricks) table renderer.
display(result_df)
Example of pipeline use (the same stages in a single flat pipeline)
# Reuses Pipeline, df, assembler12, assembler34 and assemblerResult from the
# previous cell. The three assemblers are listed directly as the stages of one
# flat pipeline instead of nesting sub-pipelines; the stage order still
# guarantees "features1" and "features2" exist before the merge stage runs.
pipelineResult = Pipeline().setStages([assembler12, assembler34, assemblerResult])

# Fit on df, then use the fitted model to add the assembled vector columns.
modelResult = pipelineResult.fit(df)
result_df2 = modelResult.transform(df)

display(result_df2)
Example of pipeline concatenation 2 (more concise statements)
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Same demo data as before; `spark` is assumed to be the notebook's SparkSession.
df = spark.createDataFrame(
    [
        ("line_1", 1, 2, 3, 4),
        ("line_2", 5, 6, 7, 8),
        ("line_3", 9, 9, 9, 9),
    ],
    ("label", "x1", "x2", "x3", "x4"),
)

# Each sub-pipeline is built in a single expression by passing `stages=`
# to the constructor instead of calling setStages afterwards.
pipeline1 = Pipeline(
    stages=[VectorAssembler(outputCol="features1", inputCols=["x1", "x2"])]
)
pipeline2 = Pipeline(
    stages=[VectorAssembler(outputCol="features2", inputCols=["x3", "x4"])]
)

# Nest both sub-pipelines in an outer pipeline whose last stage merges their
# outputs into "features", then fit and transform in one chained call.
result = Pipeline(
    stages=[
        pipeline1,
        pipeline2,
        VectorAssembler(outputCol="features", inputCols=["features1", "features2"]),
    ]
).fit(df).transform(df)

display(result)
0 Comments