Pyspark: Parse a column of json strings
Problem Overview
I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of JSON. I'd like to parse each row and return a new dataframe where each row is the parsed JSON.
# Sample DataFrame (sql_context is an existing SQLContext)
from pyspark.sql import Row

jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'

df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])
I've tried mapping over each row with json.loads:
(df
.select('json')
.rdd
.map(lambda x: json.loads(x))
.toDF()
).show()
But this returns a TypeError: expected string or buffer.
I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually entering in the schema info:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
But I get the same TypeError.
Looking at this answer, it looks like flattening out the rows with flatMap might be useful here, but I'm not having success with that either:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
I get this error: AttributeError: 'unicode' object has no attribute 'get'.
Python Solutions
Solution 1 - Python
For Spark 2.1+, you can use from_json, which preserves the other non-JSON columns of the dataframe, as follows:
from pyspark.sql.functions import from_json, col

# Let Spark infer the schema from the JSON strings themselves,
# then decode the column in place.
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df = df.withColumn('json', from_json(col('json'), json_schema))
This lets Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType but the correctly decoded JSON structure (a nested StructType), and all the other columns of df are preserved as-is.
You can access the json content as follows:
df.select(col('json.header').alias('header'))
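For example, with the sample data from the question (and a SparkSession named spark, as assumed above), a quick check of the decoded column might look like this:

df.select(col('json.header.id').alias('header_id'),
          col('json.body.name').alias('name')).show()

# +---------+------+
# |header_id|  name|
# +---------+------+
# |    12345|foobar|
# |    12346|barfoo|
# |    43256|bazbar|
# +---------+------+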
Solution 2 - Python
Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets).
For example:
>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
|-- body: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
| |-- sub_json: struct (nullable = true)
| | |-- id: long (nullable = true)
| | |-- sub_sub_json: struct (nullable = true)
| | | |-- col1: long (nullable = true)
| | | |-- col2: string (nullable = true)
|-- header: struct (nullable = true)
| |-- foo: string (nullable = true)
| |-- id: long (nullable = true)
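Once parsed, nested fields can be selected with dot notation. A small sketch against the schema above:

new_df.select('header.foo', 'body.sub_json.sub_sub_json.col2').show()

# +------+--------------+
# |   foo|          col2|
# +------+--------------+
# |   bar|     somethong|
# |   baz|something else|
# |foobaz| another thing|
# +------+--------------+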
Solution 3 - Python
Existing answers do not work if your JSON is anything but perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly braces {} and will infer an incorrect schema (resulting in null values) if, for example, your data looks like:
[ { "a": 1.0, "b": 1 }, { "a": 0.0, "b": 2 }]
I wrote a function to work around this issue by sanitizing JSON such that it lives in another JSON object:
import pyspark.sql.functions as psf

def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.
    """
    res = df
    for i in cols:
        # sanitize if requested.
        if sanitize:
            res = (
                res.withColumn(
                    i,
                    psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                )
            )
        # infer schema and apply it
        schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
        res = res.withColumn(i, psf.from_json(psf.col(i), schema))
        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(i, psf.col(i).data)
    return res
Note: psf = pyspark.sql.functions, as imported above.
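For example, applied to the sample dataframe from the question (assuming a SparkSession named spark and the import above), a usage sketch:

# Decode the 'json' column of the question's sample dataframe.
parsed = parseJSONCols(df, 'json', sanitize=True)
parsed.printSchema()
parsed.select('json.header.foo', 'json.body.name').show()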
Solution 4 - Python
Here's a concise (Spark SQL) version of @nolan-conaway's parseJSONCols function.
SELECT
explode(
from_json(
concat('{"data":',
'[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
'}'),
'data array<struct<a:DOUBLE, b:INT>>'
).data) as data;
P.S. I've added the explode function as well :P
You'll need to know some Hive SQL types to write the schema string.
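The same statement can also be run from PySpark, e.g. (a sketch assuming a SparkSession named spark):

# Run the Spark SQL version of the sanitize-and-parse trick from Python.
spark.sql("""
    SELECT explode(
        from_json(
            concat('{"data":', '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]', '}'),
            'data array<struct<a:DOUBLE, b:INT>>'
        ).data
    ) AS data
""").show()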
Solution 5 - Python
This solution goes in the opposite direction: it serializes a map column to a JSON string with a UDF.
import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

def map2json(d):
    # Serialize a Python dict (e.g. the value of a Spark map column) to a JSON string.
    return json.dumps(d)

spark.udf.register("map2json", map2json, StringType())
spark.sql("select map2json(map('a', '1'))").show()
Solution 6 - Python
If you don't know the schema of each JSON string (and they can all be different), you can use:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# ... here you get your DF
# Assuming the first column of your DF is the JSON to parse
my_df = spark.read.json(my_df.rdd.map(lambda x: x[0]))
Note that it won't keep any other columns present in your dataset. From: https://github.com/apache/spark/pull/22775