Removing duplicate columns after a DF join in Spark

Python, Apache Spark, Pyspark, Apache Spark-Sql

Python Problem Overview


When you join two DFs that share a column name:

df = df1.join(df2, df1['id'] == df2['id'])

The join works fine, but you can't reference the id column afterwards because it is ambiguous, and you get the following exception:

> pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"

This makes id unusable.

The following function solves the problem:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

What I don't like about it is that I have to iterate over the column names and delete them one by one. This looks really clunky...

Do you know of any other solution that will either join and remove duplicates more elegantly or delete multiple columns without iterating over each of them?

Python Solutions


Solution 1 - Python

If the join columns in both data frames have the same names and you only need an equi-join, you can specify the join columns as a list, in which case the result keeps only one copy of each join column:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

Otherwise, you need to give the joined data frames aliases and refer to the duplicated columns by alias later:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

Solution 2 - Python

With df.join(other, on, how), when on is a column name string or a list of column name strings, the returned dataframe has no duplicate columns. When on is a join expression, the result contains duplicate columns, and we can use .drop(df.a) to drop them. Example:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)
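When an expression join leaves several same-named columns, the individual drops can be folded into a single expression with functools.reduce. The sketch below is not from the original answer: drop_right_duplicates is a hypothetical helper name, and it assumes the right-hand copy of each shared column is the one you want to discard.

```python
from functools import reduce


def drop_right_duplicates(joined, left, right):
    """Return `joined` without the columns of `right` whose names also occur in `left`.

    Hypothetical helper: `joined` is assumed to be the result of
    left.join(right, <join expression>, ...).
    """
    shared = [name for name in right.columns if name in left.columns]
    # right[name] is a Column reference, so only the right-hand copy is dropped
    return reduce(lambda acc, name: acc.drop(right[name]), shared, joined)


# Possible usage with the names from the example above:
# result = drop_right_duplicates(df.join(other, cond, 'inner'), df, other)
```

Passing one Column reference per drop call keeps the sketch independent of how many Column arguments a given Spark version accepts in a single call.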

Solution 3 - Python

Assuming 'a' is a dataframe with column 'id' and 'b' is another dataframe with column 'id'

I use the following two methods to remove duplicates:

Method 1: Use a string join expression instead of a boolean expression. This automatically removes the duplicate column for you.

a.join(b, 'id')

Method 2: Renaming the column before the join and dropping it after

# DataFrames are immutable, so keep the renamed result
b = b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')
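If more than one column name is shared, Method 2 can be generalized by prefixing every clashing name on the right-hand side in one step. This is only a sketch: prefix_shared_columns and the "b_" prefix are hypothetical choices, and the function works on plain lists of names so it can be checked without a Spark session.

```python
def prefix_shared_columns(right_columns, left_columns, prefix="b_"):
    """Return right_columns with `prefix` added to every name that also occurs in left_columns."""
    left = set(left_columns)
    return [prefix + name if name in left else name for name in right_columns]


# Possible usage with the dataframes 'a' and 'b' from above:
# b_renamed = b.toDF(*prefix_shared_columns(b.columns, a.columns))
# renamed = [c for c in b_renamed.columns if c not in b.columns]
# a.join(b_renamed, a['id'] == b_renamed['b_id']).drop(*renamed)
```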


Solution 4 - Python

The code below works with Spark 1.6.0 and above.

salespeople_df.show()
+---+------+-----+
|Num|  Name|Store|
+---+------+-----+
|  1| Henry|  100|
|  2| Karen|  100|
|  3|  Paul|  101|
|  4| Jimmy|  102|
|  5|Janice|  103|
+---+------+-----+

storeaddress_df.show()
+-----+--------------------+
|Store|             Address|
+-----+--------------------+
|  100|    64 E Illinos Ave|
|  101|         74 Grand Pl|
|  102|          2298 Hwy 7|
|  103|No address available|
+-----+--------------------+

Assuming, as in this example, that the shared column has the same name in both dataframes:

joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()

+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+

Passing the column name as a list to .join prevents the duplication of the shared column.

If you also want to remove a column, such as Num in this example, you can just use .drop('colname'):

joined=joined.drop('Num')
joined.show()

+-----+------+--------------------+
|Store|  Name|             Address|
+-----+------+--------------------+
|  103|Janice|No address available|
|  100| Henry|    64 E Illinos Ave|
|  100| Karen|    64 E Illinos Ave|
|  101|  Paul|         74 Grand Pl|
|  102| Jimmy|          2298 Hwy 7|
+-----+------+--------------------+

Solution 5 - Python

After I've joined multiple tables together, I run the result through a simple function that walks the columns from left to right and drops any column whose name it has already encountered. Alternatively, you could rename these columns instead.

Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after being joined.

Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.write.saveAsTable("...", format="parquet", mode="overwrite", path="...")

Where dropDupeDfCols is defined as:

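def dropDupeDfCols(df):
    newcols = []  # names kept, in their original order
    dupcols = []  # positions of columns whose name was already seen

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # Rename every column to its position so each one has a unique name
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    # Restore the original names on the columns that remain
    return df.toDF(*newcols)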

The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Date'].
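The same idea can be written without the per-column drop loop, since drop accepts several column names at once. The following is a sketch under that assumption, not the original author's code: split_positions and drop_dupe_cols are hypothetical names, and the position bookkeeping is kept in a pure function so it can be verified on a plain list.

```python
def split_positions(columns):
    """Return (keep, dupes): positions of first occurrences and of later repeats."""
    seen = set()
    keep, dupes = [], []
    for position, name in enumerate(columns):
        if name in seen:
            dupes.append(position)
        else:
            seen.add(name)
            keep.append(position)
    return keep, dupes


def drop_dupe_cols(df):
    """Drop every column whose name already appeared further to the left."""
    names = df.columns
    keep, dupes = split_positions(names)
    if not dupes:
        return df
    # Temporary positional names ("0", "1", ...) make every column addressable
    positional = df.toDF(*[str(i) for i in range(len(names))])
    deduped = positional.drop(*[str(i) for i in dupes])
    return deduped.toDF(*[names[i] for i in keep])
```

For the joined Names and Dates columns described above, split_positions keeps positions 0, 1, 2, 3 and 5 and marks positions 4 and 6 (the second Id and Description) for removal.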

Solution 6 - Python

In my case I had a dataframe with multiple duplicate columns after joins, and I was trying to save that dataframe in CSV format, but I was getting an error because of the duplicate columns. I followed the steps below to drop them. The code is in Scala.

1) Rename all the duplicate columns and make a new dataframe
2) Make a separate list of all the renamed columns
3) Make a new dataframe with all columns (including the renamed ones from step 1)
4) Drop all the renamed columns

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.DataFrame

private def removeDuplicateColumns(dataFrame: DataFrame): DataFrame = {
  val allColumns = ListBuffer[String]()
  val dupColumns = ListBuffer[String]()
  dataFrame.columns.foreach { (name: String) =>
    if (allColumns.contains(name)) {
      // Repeated name: prefix this copy with "dup_" and remember it for removal
      allColumns += "dup_" + name
      dupColumns += "dup_" + name
    } else {
      allColumns += name
    }
  }
  // Rename positionally, then drop every column that received the prefix
  val df = dataFrame.toDF(allColumns.toList: _*)
  df.drop(dupColumns.toList: _*)
}

To call the above function, pass it your dataframe that contains duplicate columns:

val uniColDF = removeDuplicateColumns(df)

Solution 7 - Python

If you join on a list or string, duplicate columns are automatically removed. This is a Scala solution; you could translate the same idea into any language.

// get a list of duplicate columns or use a list/seq 
// of columns you would like to join on (note that this list
// should include columns for which you do not want duplicates)
val duplicateCols = df1.columns.intersect(df2.columns) 

// no duplicate columns in resulting DF
// join expects a Seq of column names, not a Set
df1.join(df2, duplicateCols.distinct.toSeq)
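A rough PySpark translation of the same idea might look like the sketch below. The helper name join_on_shared_columns is hypothetical, and note the assumption it bakes in: every column name the two dataframes share becomes a join key, which is only appropriate when that is the equi-join you actually want.

```python
def join_on_shared_columns(df1, df2, how="inner"):
    """Join df1 and df2 on every column name they have in common."""
    shared = [name for name in df1.columns if name in df2.columns]
    if not shared:
        raise ValueError("the two DataFrames have no column names in common")
    # Joining on a list of names keeps a single copy of each join column
    return df1.join(df2, on=shared, how=how)
```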

Solution 8 - Python

Here is a simple solution for removing the duplicate column:

# In a left join, drop the right-hand key: it is null for unmatched rows
final_result=df1.join(df2,(df1['subjectid']==df2['subjectid']),"left").drop(df2['subjectid'])

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | thecheech | View Question on Stackoverflow
Solution 1 - Python | Psidom | View Answer on Stackoverflow
Solution 2 - Python | jerrytim | View Answer on Stackoverflow
Solution 3 - Python | Heapify | View Answer on Stackoverflow
Solution 4 - Python | hussam | View Answer on Stackoverflow
Solution 5 - Python | QA Collective | View Answer on Stackoverflow
Solution 6 - Python | Santosh Kumar | View Answer on Stackoverflow
Solution 7 - Python | Anthony Awuley | View Answer on Stackoverflow
Solution 8 - Python | Suresh Kumar | View Answer on Stackoverflow