how to filter out a null value from spark dataframe
ScalaApache SparkApache Spark-SqlSpark DataframeScala Problem Overview
I created a dataframe in spark with the following schema:
root
|-- user_id: long (nullable = false)
|-- event_id: long (nullable = false)
|-- invited: integer (nullable = false)
|-- day_diff: long (nullable = true)
|-- interested: integer (nullable = false)
|-- event_owner: long (nullable = false)
|-- friend_id: long (nullable = false)
And the data is shown below:
+----------+----------+-------+--------+----------+-----------+---------+
| user_id| event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
| 4236494| 110357109| 0| -1| 0| 937597069| null|
| 78065188| 498404626| 0| 0| 0| 2904922087| null|
| 282487230|2520855981| 0| 28| 0| 3749735525| null|
| 335269852|1641491432| 0| 2| 0| 1490350911| null|
| 437050836|1238456614| 0| 2| 0| 991277599| null|
| 447244169|2095085551| 0| -1| 0| 1579858878| null|
| 516353916|1076364848| 0| 3| 1| 3597645735| null|
| 528218683|1151525474| 0| 1| 0| 3433080956| null|
| 531967718|3632072502| 0| 1| 0| 3863085861| null|
| 627948360|2823119321| 0| 0| 0| 4092665803| null|
| 811791433|3513954032| 0| 2| 0| 415464198| null|
| 830686203| 99027353| 0| 0| 0| 3549822604| null|
|1008893291|1115453150| 0| 2| 0| 2245155244| null|
|1239364869|2824096896| 0| 2| 1| 2579294650| null|
|1287950172|1076364848| 0| 0| 0| 3597645735| null|
|1345896548|2658555390| 0| 1| 0| 2025118823| null|
|1354205322|2564682277| 0| 3| 0| 2563033185| null|
|1408344828|1255629030| 0| -1| 1| 804901063| null|
|1452633375|1334001859| 0| 4| 0| 1488588320| null|
|1625052108|3297535757| 0| 3| 0| 1972598895| null|
+----------+----------+-------+--------+----------+-----------+---------+
I want to filter out the rows have null values in the field of "friend_id".
scala> val aaa = test.filter("friend_id is null")
scala> aaa.count
I got :res52: Long = 0 which is obvious not right. What is the right way to get it?
One more question, I want to replace the values in the friend_id field. I want to replace null with 0 and 1 for any other value except null. The code I can figure out is:
val aaa = train_friend_join.select($"user_id", $"event_id", $"invited", $"day_diff", $"interested", $"event_owner", ($"friend_id" != null)?1:0)
This code also doesn't work. Can anyone tell me how can I fix it? Thanks
Scala Solutions
Solution 1 - Scala
Let's say you have this data setup (so that results are reproducible):
// declaring data types
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)
// setting up example data
val e1 = Employee("n1", null, "[email protected]", Company("c1", "1", "d1"))
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1"))
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1"))
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2"))
val e5 = Employee("n5", null, "[email protected]", Company("c2", "2", "d2"))
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2"))
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3"))
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3"))
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8)
val df = sc.parallelize(employees).toDF
Data is:
+----+----+---------+---------+
|name| id| email| company|
+----+----+---------+---------+
| n1|null|n1@c1.com|[c1,1,d1]|
| n2| 2|n2@c1.com|[c1,1,d1]|
| n3| 3|n3@c1.com|[c1,1,d1]|
| n4| 4|n4@c2.com|[c2,2,d2]|
| n5|null|n5@c2.com|[c2,2,d2]|
| n6| 6|n6@c2.com|[c2,2,d2]|
| n7| 7|n7@c3.com|[c3,3,d3]|
| n8| 8|n8@c3.com|[c3,3,d3]|
+----+----+---------+---------+
Now to filter employees with null
ids, you will do --
df.filter("id is null").show
which will correctly show you following:
+----+----+---------+---------+
|name| id| email| company|
+----+----+---------+---------+
| n1|null|n1@c1.com|[c1,1,d1]|
| n5|null|n5@c2.com|[c2,2,d2]|
+----+----+---------+---------+
Coming to the second part of your question, you can replace the null
ids with 0 and other values with 1 with this --
df.withColumn("id", when($"id".isNull, 0).otherwise(1)).show
This results in:
+----+---+---------+---------+
|name| id| email| company|
+----+---+---------+---------+
| n1| 0|n1@c1.com|[c1,1,d1]|
| n2| 1|n2@c1.com|[c1,1,d1]|
| n3| 1|n3@c1.com|[c1,1,d1]|
| n4| 1|n4@c2.com|[c2,2,d2]|
| n5| 0|n5@c2.com|[c2,2,d2]|
| n6| 1|n6@c2.com|[c2,2,d2]|
| n7| 1|n7@c3.com|[c3,3,d3]|
| n8| 1|n8@c3.com|[c3,3,d3]|
+----+---+---------+---------+
Solution 2 - Scala
Or like df.filter($"friend_id".isNotNull)
Solution 3 - Scala
df.where(df.col("friend_id").isNull)
Solution 4 - Scala
There are two ways to do it: creating filter condition 1) Manually 2) Dynamically.
Sample DataFrame:
val df = spark.createDataFrame(Seq(
(0, "a1", "b1", "c1", "d1"),
(1, "a2", "b2", "c2", "d2"),
(2, "a3", "b3", null, "d3"),
(3, "a4", null, "c4", "d4"),
(4, null, "b5", "c5", "d5")
)).toDF("id", "col1", "col2", "col3", "col4")
+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
| 0| a1| b1| c1| d1|
| 1| a2| b2| c2| d2|
| 2| a3| b3|null| d3|
| 3| a4|null| c4| d4|
| 4|null| b5| c5| d5|
+---+----+----+----+----+
1) Creating filter condition manually i.e. using DataFrame where
or filter
function
df.filter(col("col1").isNotNull && col("col2").isNotNull).show
or
df.where("col1 is not null and col2 is not null").show
Result:
+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
| 0| a1| b1| c1| d1|
| 1| a2| b2| c2| d2|
| 2| a3| b3|null| d3|
+---+----+----+----+----+
2) Creating filter condition dynamically: This is useful when we don't want any column to have null value and there are large number of columns, which is mostly the case.
To create the filter condition manually in these cases will waste a lot of time. In below code we are including all columns dynamically using map
and reduce
function on DataFrame columns:
val filterCond = df.columns.map(x=>col(x).isNotNull).reduce(_ && _)
How filterCond
looks:
filterCond: org.apache.spark.sql.Column = (((((id IS NOT NULL) AND (col1 IS NOT NULL)) AND (col2 IS NOT NULL)) AND (col3 IS NOT NULL)) AND (col4 IS NOT NULL))
Filtering:
val filteredDf = df.filter(filterCond)
Result:
+---+----+----+----+----+
| id|col1|col2|col3|col4|
+---+----+----+----+----+
| 0| a1| b1| c1| d1|
| 1| a2| b2| c2| d2|
+---+----+----+----+----+
Solution 5 - Scala
A good solution for me was to drop the rows with any null values:
Dataset<Row> filtered = df.filter(row => !row.anyNull);
In case one is interested in the other case, just call row.anyNull
.
(Spark 2.1.0 using Java API)
Solution 6 - Scala
The following lines work well:
test.filter("friend_id is not null")
Solution 7 - Scala
From the hint from Michael Kopaniov, below works
df.where(df("id").isNotNull).show
Solution 8 - Scala
Here is a solution for spark in Java. To select data rows containing nulls. When you have Dataset
Dataset<Row> containingNulls = data.where(data.col("COLUMN_NAME").isNull())
To filter out data without nulls you do:
Dataset<Row> withoutNulls = data.where(data.col("COLUMN_NAME").isNotNull())
Often dataframes contain columns of type String where instead of nulls we have empty strings like "". To filter out such data as well we do:
Dataset<Row> withoutNullsAndEmpty = data.where(data.col("COLUMN_NAME").isNotNull().and(data.col("COLUMN_NAME").notEqual("")))
Solution 9 - Scala
for the first question, it is correct you are filtering out nulls and hence count is zero.
for the second replacing: use like below:
val options = Map("path" -> "...\\ex.csv", "header" -> "true")
val dfNull = spark.sqlContext.load("com.databricks.spark.csv", options)
scala> dfNull.show
+----------+----------+-------+--------+----------+-----------+---------+
| user_id| event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
| 4236494| 110357109| 0| -1| 0| 937597069| null|
| 78065188| 498404626| 0| 0| 0| 2904922087| null|
| 282487230|2520855981| 0| 28| 0| 3749735525| null|
| 335269852|1641491432| 0| 2| 0| 1490350911| null|
| 437050836|1238456614| 0| 2| 0| 991277599| null|
| 447244169|2095085551| 0| -1| 0| 1579858878| a|
| 516353916|1076364848| 0| 3| 1| 3597645735| b|
| 528218683|1151525474| 0| 1| 0| 3433080956| c|
| 531967718|3632072502| 0| 1| 0| 3863085861| null|
| 627948360|2823119321| 0| 0| 0| 4092665803| null|
| 811791433|3513954032| 0| 2| 0| 415464198| null|
| 830686203| 99027353| 0| 0| 0| 3549822604| null|
|1008893291|1115453150| 0| 2| 0| 2245155244| null|
|1239364869|2824096896| 0| 2| 1| 2579294650| d|
|1287950172|1076364848| 0| 0| 0| 3597645735| null|
|1345896548|2658555390| 0| 1| 0| 2025118823| null|
|1354205322|2564682277| 0| 3| 0| 2563033185| null|
|1408344828|1255629030| 0| -1| 1| 804901063| null|
|1452633375|1334001859| 0| 4| 0| 1488588320| null|
|1625052108|3297535757| 0| 3| 0| 1972598895| null|
+----------+----------+-------+--------+----------+-----------+---------+
dfNull.withColumn("friend_idTmp", when($"friend_id".isNull, "1").otherwise("0")).drop($"friend_id").withColumnRenamed("friend_idTmp", "friend_id").show
+----------+----------+-------+--------+----------+-----------+---------+
| user_id| event_id|invited|day_diff|interested|event_owner|friend_id|
+----------+----------+-------+--------+----------+-----------+---------+
| 4236494| 110357109| 0| -1| 0| 937597069| 1|
| 78065188| 498404626| 0| 0| 0| 2904922087| 1|
| 282487230|2520855981| 0| 28| 0| 3749735525| 1|
| 335269852|1641491432| 0| 2| 0| 1490350911| 1|
| 437050836|1238456614| 0| 2| 0| 991277599| 1|
| 447244169|2095085551| 0| -1| 0| 1579858878| 0|
| 516353916|1076364848| 0| 3| 1| 3597645735| 0|
| 528218683|1151525474| 0| 1| 0| 3433080956| 0|
| 531967718|3632072502| 0| 1| 0| 3863085861| 1|
| 627948360|2823119321| 0| 0| 0| 4092665803| 1|
| 811791433|3513954032| 0| 2| 0| 415464198| 1|
| 830686203| 99027353| 0| 0| 0| 3549822604| 1|
|1008893291|1115453150| 0| 2| 0| 2245155244| 1|
|1239364869|2824096896| 0| 2| 1| 2579294650| 0|
|1287950172|1076364848| 0| 0| 0| 3597645735| 1|
|1345896548|2658555390| 0| 1| 0| 2025118823| 1|
|1354205322|2564682277| 0| 3| 0| 2563033185| 1|
|1408344828|1255629030| 0| -1| 1| 804901063| 1|
|1452633375|1334001859| 0| 4| 0| 1488588320| 1|
|1625052108|3297535757| 0| 3| 0| 1972598895| 1|
+----------+----------+-------+--------+----------+-----------+---------+
Solution 10 - Scala
Another easy way to filter out null values from multiple columns in spark dataframe. Please pay attention there is AND between columns.
df.filter(" COALESCE(col1, col2, col3, col4, col5, col6) IS NOT NULL")
If you need to filter out rows that contain any null (OR connected) please use
df.na.drop()
Solution 11 - Scala
val df = Seq(
("1001", "1007"),
("1002", null),
("1003", "1005"),
(null, "1006")
).toDF("user_id", "friend_id")
Data is:
+-------+---------+
|user_id|friend_id|
+-------+---------+
| 1001| 1007|
| 1002| null|
| 1003| 1005|
| null| 1006|
+-------+---------+
Drop rows containing any null or NaN values in the specified columns of the Seq:
df.na.drop(Seq("friend_id"))
.show()
Output:
+-------+---------+
|user_id|friend_id|
+-------+---------+
| 1001| 1007|
| 1003| 1005|
| null| 1006|
+-------+---------+
If do not specify columns, drop row as long as any column of a row contains null or NaN values:
df.na.drop()
.show()
Output:
+-------+---------+
|user_id|friend_id|
+-------+---------+
| 1001| 1007|
| 1003| 1005|
+-------+---------+
Solution 12 - Scala
I use the following code to solve my question. It works. But as we all know, I work around a country's mile to solve it. So, is there a short cut for that? Thanks
def filter_null(field : Any) : Int = field match {
case null => 0
case _ => 1
}
val test = train_event_join.join(
user_friends_pair,
train_event_join("user_id") === user_friends_pair("user_id") &&
train_event_join("event_owner") === user_friends_pair("friend_id"),
"left"
).select(
train_event_join("user_id"),
train_event_join("event_id"),
train_event_join("invited"),
train_event_join("day_diff"),
train_event_join("interested"),
train_event_join("event_owner"),
user_friends_pair("friend_id")
).rdd.map{
line => (
line(0).toString.toLong,
line(1).toString.toLong,
line(2).toString.toLong,
line(3).toString.toLong,
line(4).toString.toLong,
line(5).toString.toLong,
filter_null(line(6))
)
}.toDF("user_id", "event_id", "invited", "day_diff", "interested", "event_owner", "creator_is_friend")