Concatenate columns in Apache Spark DataFrame

Tags: Sql, Apache Spark, Dataframe, Apache Spark-Sql

Sql Problem Overview


How do we concatenate two columns in an Apache Spark DataFrame? Is there any function in Spark SQL which we can use?

Sql Solutions


Solution 1 - Sql

With raw SQL you can use CONCAT:

  • In Python:

      df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
      df.registerTempTable("df")
      sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • In Scala:

      import sqlContext.implicits._
    
      val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
      df.registerTempTable("df")
      sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

Since Spark 1.5.0 you can use the concat function with the DataFrame API:

  • In Python:

      from pyspark.sql.functions import concat, col, lit
    
      df.select(concat(col("k"), lit(" "), col("v")))
    
  • In Scala:

      import org.apache.spark.sql.functions.{concat, lit}
    
      df.select(concat($"k", lit(" "), $"v"))
    

There is also the concat_ws function, which takes a string separator as its first argument.
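
For instance, a minimal PySpark sketch of concat_ws, reusing the df from the Python example above (the alias k_v is just for display):

from pyspark.sql.functions import concat_ws, col

# the separator comes first; cast the integer column to string explicitly
df.select(concat_ws(" ", col("k"), col("v").cast("string")).alias("k_v")).show()

# +-----+
# |  k_v|
# +-----+
# |foo 1|
# |bar 2|
# +-----+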

Solution 2 - Sql

Here's how you can do the concatenation with a custom name for the new column:

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

gives,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

Create a new column by concatenating:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

Solution 3 - Sql

One option for concatenating string columns in Spark Scala is to use concat.

It is necessary to check for null values, because if any of the columns is null, the result will be null even when the other columns do contain information.

Using concat and withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Using concat and select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

With both approaches you will have a NEW_COLUMN whose value is the concatenation of the columns COL1 and COL2 from your original df.
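
For comparison, a minimal PySpark sketch (assuming a spark session and a made-up df with columns COL1 and COL2) showing that concat propagates nulls while concat_ws skips them:

from pyspark.sql.functions import concat, concat_ws, col

df = spark.createDataFrame([("a", None), ("c", "d")], ["COL1", "COL2"])

# concat propagates nulls: the first row becomes null
df.select(concat(col("COL1"), col("COL2")).alias("via_concat")).show()

# concat_ws silently skips null inputs: the first row becomes "a"
df.select(concat_ws("_", col("COL1"), col("COL2")).alias("via_concat_ws")).show()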

Solution 4 - Sql

concat(*cols)

v1.5 and higher

Concatenates multiple input columns together into a single column. The function works with strings, binary and compatible array columns.

Eg: new_df = df.select(concat(df.a, df.b, df.c))


concat_ws(sep, *cols)

v1.5 and higher

Similar to concat but uses the specified separator.

Eg: new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat(*cols)

v2.4 and higher

Concatenates maps, returning the union of all the given maps.

Eg: new_df = df.select(map_concat("map1", "map2"))
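
A minimal PySpark sketch of map_concat; the map columns here are built with create_map purely for illustration:

from pyspark.sql.functions import create_map, lit, map_concat

df = spark.createDataFrame([(1,)], ["id"]).select(
    create_map(lit("a"), lit(1)).alias("map1"),
    create_map(lit("b"), lit(2)).alias("map2"))

# returns the union of both maps: {a -> 1, b -> 2}
df.select(map_concat("map1", "map2").alias("merged")).show(truncate=False)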


Using the concat operator (||):

v2.3 and higher

Eg: df = spark.sql("select col_a || col_b || col_c as abc from table_x")
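
A runnable PySpark sketch of the || operator; the view name table_x and its columns are made up for illustration:

df = spark.createDataFrame([("a", "b", "c")], ["col_a", "col_b", "col_c"])
df.createOrReplaceTempView("table_x")

spark.sql("select col_a || col_b || col_c as abc from table_x").show()

# +---+
# |abc|
# +---+
# |abc|
# +---+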

Reference: Spark sql doc

Solution 5 - Sql

If you want to do it using the DataFrame API, you could use a UDF to add a new column based on existing columns.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()

Solution 6 - Sql

From Spark 2.3 (SPARK-22771), Spark SQL supports the concatenation operator ||.

For example:

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")

Solution 7 - Sql

Here is another way of doing this for pyspark:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+

Solution 8 - Sql

Here is a suggestion for when you don't know the number or names of the columns in the DataFrame.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
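
The same idea as a PySpark sketch, assuming every column in dfSource can be cast to a string:

from pyspark.sql.functions import concat_ws, col

# unpack whatever columns the DataFrame happens to have
dfResults = dfSource.select(concat_ws(",", *[col(c) for c in dfSource.columns]))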

Solution 9 - Sql

Is there equivalent Java syntax for the process below? (Solution 11 shows a Java example of concat.)

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

Solution 10 - Sql

In Spark 2.3.0, you may do:

spark.sql( """ select '1' || column_a from table_a """)

Solution 11 - Sql

In Java you can do this to concatenate multiple columns. The sample code provides a scenario and shows how to use it, for better understanding.

// requires the static import: import static org.apache.spark.sql.functions.*;
SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
        .withColumn("concatenatedCol",
                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
	private static transient SparkSession instance = null;

	public static SparkSession getInstance(SparkConf sparkConf) {
		if (instance == null) {
			instance = SparkSession.builder().config(sparkConf)
					.getOrCreate();
		}
		return instance;
	}
}

The above code concatenates col1, col2, and col3, separated by "_", to create a column named "concatenatedCol".

Solution 12 - Sql

In my case, I wanted a pipe ('|') delimited row.

from pyspark.sql import functions as F
df.select(F.concat_ws('|','_c1','_c2','_c3','_c4')).show()

This worked like a hot knife through butter.

Solution 13 - Sql

Another way to do it in pySpark using sqlContext...

from pyspark.sql.functions import concat

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))

Solution 14 - Sql

Indeed, there are some beautiful built-in abstractions for accomplishing your concatenation without implementing a custom function. Since you mentioned Spark SQL, I am guessing you are trying to pass it as a declarative command through spark.sql(). If so, you can pass a SQL command in a straightforward manner, like: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

Also, from Spark 2.3.0, you can use commands along the lines of: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

Here, <delimiter> is your preferred delimiter (it can be an empty space as well) and <table_name> is the temporary or permanent table you are trying to read from.
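
As a concrete sketch of this declarative route (the view name people and its columns are made up for illustration):

df = spark.createDataFrame([("John", "Doe")], ["first_name", "last_name"])
df.createOrReplaceTempView("people")

spark.sql(
    "SELECT CONCAT(first_name, ' ', last_name) AS concat_column_name FROM people").show()

# Spark 2.3.0+: the same result with the || operator
spark.sql(
    "SELECT first_name || ' ' || last_name AS concat_column_name FROM people").show()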

Solution 15 - Sql

We can simply use selectExpr as well.

df1.selectExpr("*","upper(_2||_3) as new")
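
For context, a minimal sketch: a DataFrame built from tuples without explicit names gets the default columns _1, _2, _3 that the expression above refers to:

# without explicit names, columns default to _1, _2, _3
df1 = spark.createDataFrame([("x", "foo", "bar")])

# keep every column and add an upper-cased concatenation of _2 and _3
df1.selectExpr("*", "upper(_2||_3) as new").show()

# +---+---+---+------+
# | _1| _2| _3|   new|
# +---+---+---+------+
# |  x|foo|bar|FOOBAR|
# +---+---+---+------+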

Solution 16 - Sql

Use the concat method like this:

Dataset<Row> DF2 = DF1
        .withColumn("NEW_COLUMN", concat(col("ADDR1"), col("ADDR2"), col("ADDR3")));

Solution 17 - Sql

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Note: for this code to work, you need to add the parentheses "()" to the "isNotNull" function; the correct form is "isNotNull()".

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull(), col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull(), col("COL2")).otherwise(lit("null"))))

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type      | Original Author    | Original Content on Stackoverflow
------------------|--------------------|----------------------------------
Question          | Nipun              | View Question on Stackoverflow
Solution 1 - Sql  | zero323            | View Answer on Stackoverflow
Solution 2 - Sql  | muon               | View Answer on Stackoverflow
Solution 3 - Sql  | Ignacio Alorre     | View Answer on Stackoverflow
Solution 4 - Sql  | Ani Menon          | View Answer on Stackoverflow
Solution 5 - Sql  | Danish Shrestha    | View Answer on Stackoverflow
Solution 6 - Sql  | Krishas            | View Answer on Stackoverflow
Solution 7 - Sql  | Teddy Belay        | View Answer on Stackoverflow
Solution 8 - Sql  | wones0120          | View Answer on Stackoverflow
Solution 9 - Sql  | Roopesh MB         | View Answer on Stackoverflow
Solution 10 - Sql | Charlie 木匠       | View Answer on Stackoverflow
Solution 11 - Sql | wandermonk         | View Answer on Stackoverflow
Solution 12 - Sql | vijayraj34         | View Answer on Stackoverflow
Solution 13 - Sql | Gur                | View Answer on Stackoverflow
Solution 14 - Sql | user11768920       | View Answer on Stackoverflow
Solution 15 - Sql | Deepak Saxena      | View Answer on Stackoverflow
Solution 16 - Sql | Davoud Malekahmadi | View Answer on Stackoverflow
Solution 17 - Sql | joibonfim          | View Answer on Stackoverflow