How to loop through each row of dataFrame in pyspark

Apache SparkDataframeFor LoopPysparkApache Spark-Sql

Apache Spark Problem Overview


E.g

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

The above statement prints theentire table on terminal. But I want to access each row in that table using for or while to perform further calculations.

Apache Spark Solutions


Solution 1 - Apache Spark

You simply cannot. DataFrames, same as other distributed data structures, are not iterable and can be accessed using only dedicated higher order function and / or SQL methods.

You can of course collect

for row in df.rdd.collect():
    do_something(row)

or convert toLocalIterator

for row in df.rdd.toLocalIterator():
    do_something(row)

and iterate locally as shown above, but it beats all purpose of using Spark.

Solution 2 - Apache Spark

To "loop" and take advantage of Spark's parallel computation framework, you could define a custom function and use map.

def customFunction(row):

   return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

or

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

The custom function would then be applied to every row of the dataframe. Note that sample2 will be a RDD, not a dataframe.

Map may be needed if you are going to perform more complex computations. If you just need to add a simple derived column, you can use the withColumn, with returns a dataframe.

sample3 = sample.withColumn('age2', sample.age + 2)

Solution 3 - Apache Spark

Using list comprehensions in python, you can collect an entire column of values into a list using just two lines:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

In the above example, we return a list of tables in database 'default', but the same can be adapted by replacing the query used in sql().

Or more abbreviated:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

And for your example of three columns, we can create a list of dictionaries, and then iterate through them in a for loop.

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))

Solution 4 - Apache Spark

Give A Try Like this

    result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]); 
    for f in result.collect(): 
        print (f.col_name)

Solution 5 - Apache Spark

It might not be the best practice, but you can simply target a specific column using collect(), export it as a list of Rows, and loop through the list.

Assume this is your df:

+----------+----------+-------------------+-----------+-----------+------------------+ 
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+ 
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03| 51148            | 
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04| -35252           |
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04| 20963548         |
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21| 22173148         |
+----------+----------+-------------------+-----------+-----------+------------------+

to loop through rows in Date column:

rows = df3.select('Date').collect()

final_list = []
for i in rows:
    final_list.append(i[0])

print(final_list)

Solution 6 - Apache Spark

If you want to do something to each row in a DataFrame object, use map. This will allow you to perform further calculations on each row. It's the equivalent of looping across the entire dataset from 0 to len(dataset)-1.

Note that this will return a PipelinedRDD, not a DataFrame.

Solution 7 - Apache Spark

above

tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 

should be

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]} 

for name, age, and city are not variables but simply keys of the dictionary.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionArti BerdeView Question on Stackoverflow
Solution 1 - Apache Sparkzero323View Answer on Stackoverflow
Solution 2 - Apache SparkDavidView Answer on Stackoverflow
Solution 3 - Apache SparkaaronsteersView Answer on Stackoverflow
Solution 4 - Apache SparkBala cseView Answer on Stackoverflow
Solution 5 - Apache SparkSRHView Answer on Stackoverflow
Solution 6 - Apache SparkKatya WillardView Answer on Stackoverflow
Solution 7 - Apache Sparkten2the6View Answer on Stackoverflow