The data looks like this –
+-----------+-----------+-----------------------------+
| id| point| data|
+-----------------------------------------------------+
| abc| 6|{"key1":"124", "key2": "345"}|
| dfl| 7|{"key1":"777", "key2": "888"}|
| 4bd| 6|{"key1":"111", "key2": "788"}|
I am trying to break it into the following format.
+-----------+-----------+-----------+-----------+ | id| point| key1| key2| +------------------------------------------------ | abc| 6| 124| 345| | dfl| 7| 777| 888| | 4bd| 6| 111| 788|
The explode function explodes the dataframe into multiple rows. But that is not the desired solution.
Note: This solution does not answers my questions.
PySpark “explode” dict in column
Answers:
Thank you for visiting the Q&A section on Magenaut. Please note that all the answers may not help you solve the issue immediately. So please treat them as advisements. If you found the post helpful (or not), leave a comment & I’ll get back to you as soon as possible.
Method 1
As long as you are using Spark version 2.1 or higher, pyspark.sql.functions.from_json should get you your desired result, but you would need to first define the required schema
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType(
[
StructField('key1', StringType(), True),
StructField('key2', StringType(), True)
]
)
df.withColumn("data", from_json("data", schema))
.select(col('id'), col('point'), col('data.*'))
.show()
which should give you
+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc| 6| 124| 345|
|df1| 7| 777| 888|
|4bd| 6| 111| 788|
+---+-----+----+----+
Method 2
As suggested by @pault, the data field is a string field. since the keys are the same (i.e. ‘key1’, ‘key2’) in the JSON string over rows, you might also use json_tuple() (this function is New in version 1.6 based on the documentation)
from pyspark.sql import functions as F
df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()
Below is My original post: which is most likely WRONG if the original table is from df.show(truncate=False) and thus the data field is NOT a python data structure.
Since you have exploded the data into rows, I supposed the column data is a Python data structure instead of a string:
from pyspark.sql import functions as F
df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()
Method 3
As mentioned by @jxc, json_tuple should work fine if you were not able to define the schema beforehand and you only needed to deal with a single level of json string. I think it’s more straight forward and easier to use. Strangely, I didn’t find anyone else mention this function before.
In my use case, original dataframe schema: StructType(List(StructField(a,StringType,true))), json string column shown as:
+---------------------------------------+
|a |
+---------------------------------------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
|{"k1": "v11", "k3": "v33"} |
|{"k1": "v13", "k2": "23"} |
+---------------------------------------+
Expand json fields into new columns with json_tuple:
from pyspark.sql import functions as F
df = df.select(F.col('a'),
F.json_tuple(F.col('a'), 'k1', 'k2', 'k3')
.alias('k1', 'k2', 'k3'))
df.schema
df.show(truncate=False)
The document doesn’t say much about it, but at least in my use case, new columns extracted by json_tuple are StringType, and it only extract single depth of JSON string.
StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))
+---------------------------------------+---+----+-------+
|a |k1 |k2 |k3 |
+---------------------------------------+---+----+-------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2 |{"m":1}|
|{"k1": "v11", "k3": "v33"} |v11|null|v33 |
|{"k1": "v13", "k2": "23"} |v13|23 |null |
+---------------------------------------+---+----+-------+
Method 4
This works for my use case
data1 = spark.read.parquet(path)
json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
data2 = data1.withColumn("data", from_json("json_col", json_schema))
col1 = data2.columns
col1.remove("data")
col2 = data2.select("data.*").columns
append_str ="data."
col3 = [append_str + val for val in col2]
col_list = col1 + col3
data3 = data2.select(*col_list).drop("json_col")
Method 5
All credits to Shrikant Prabhu
You can simply use SQL
SELECT id, point, data.* FROM original_table
Like this the schema of the new table will adapt if the data changes and you won’t have to do anything in your pipelin.
All methods was sourced from stackoverflow.com or stackexchange.com, is licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0