Transforming complex data structures efficiently is a pivotal aspect of modern data processing. In this guide, we explore how to leverage SQL and Python in Databricks notebooks to work with nested data, derive JSON schemas, manipulate arrays, and reshape datasets, enabling streamlined data transformations for advanced analytics and insights.
## Work with Nested Data
- Spark SQL has built-in functionality to directly interact with nested data stored as JSON strings or struct types.
- Use `:` syntax to access subfields in JSON strings (a deeper-path sketch follows the code cells below).
- Use `.` syntax to access subfields in struct types (illustrated once `parsed_events` is created below).
```sql
-- Prepare Dataset
CREATE OR REPLACE TEMP VIEW events_strings AS
SELECT string(key), string(value) FROM events_raw;

SELECT * FROM events_strings
WHERE value:event_name = "finalize"
ORDER BY key
LIMIT 1
```
*Sample entries from `events_strings`*
```python
%python
from pyspark.sql.functions import col

events_stringsDF = (spark
    .table("events_raw")
    .select(col("key").cast("string"), col("value").cast("string"))
)
display(events_stringsDF)
```

```python
%python
display(events_stringsDF
    .where("value:event_name = 'finalize'")
    .orderBy("key")
    .limit(1)
)
```
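The `:` syntax also reaches deeper into a JSON payload. Here is a minimal sketch against the same `events_strings` view, assuming only the nested `geo` and `items` fields that appear in the sample payloads in the next section:

```sql
-- Sketch: deeper JSON path access on the raw JSON strings.
-- geo and items are nested fields present in this dataset's payloads.
SELECT
  value:geo.city AS city,               -- nested object field
  value:items[0].item_id AS first_item  -- first element of a JSON array
FROM events_strings
LIMIT 3
```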
## Derive JSON Schema from the Given Data Entries
- `schema_of_json()` returns the schema derived from an example JSON string.
- `from_json()` parses a column containing a JSON string into a struct type using the specified schema.
```sql
-- content can be a JSON payload from any row in the table
SELECT schema_of_json('{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1075.5,"total_item_quantity":1,"unique_items":1},"event_name":"finalize","event_previous_timestamp":1593879231210816,"event_timestamp":1593879335779563,"geo":{"city":"Houston","state":"TX"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_K","item_name":"Standard King Mattress","item_revenue_in_usd":1075.5,"price_in_usd":1195.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593454417513109,"user_id":"UA000000106116176"}') AS schema

-- Result: STRUCT<device: STRING, ecommerce: STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name: STRING, event_previous_timestamp: BIGINT, event_timestamp: BIGINT, geo: STRUCT<city: STRING, state: STRING>, items: ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source: STRING, user_first_touch_timestamp: BIGINT, user_id: STRING>

CREATE OR REPLACE TEMP VIEW parsed_events AS
SELECT json.*
FROM (
  SELECT from_json(value, 'STRUCT<device: STRING, ecommerce: STRUCT<purchase_revenue_in_usd: DOUBLE, total_item_quantity: BIGINT, unique_items: BIGINT>, event_name: STRING, event_previous_timestamp: BIGINT, event_timestamp: BIGINT, geo: STRUCT<city: STRING, state: STRING>, items: ARRAY<STRUCT<coupon: STRING, item_id: STRING, item_name: STRING, item_revenue_in_usd: DOUBLE, price_in_usd: DOUBLE, quantity: BIGINT>>, traffic_source: STRING, user_first_touch_timestamp: BIGINT, user_id: STRING>') AS json
  FROM events_strings);

SELECT * FROM parsed_events
```
*Result of querying `parsed_events`*
```python
%python
from pyspark.sql.functions import from_json, schema_of_json

json_string = """
{"device":"Linux","ecommerce":{"purchase_revenue_in_usd":1047.6,"total_item_quantity":2,"unique_items":2},"event_name":"finalize","event_previous_timestamp":1593879787820475,"event_timestamp":1593879948830076,"geo":{"city":"Huntington Park","state":"CA"},"items":[{"coupon":"NEWBED10","item_id":"M_STAN_Q","item_name":"Standard Queen Mattress","item_revenue_in_usd":940.5,"price_in_usd":1045.0,"quantity":1},{"coupon":"NEWBED10","item_id":"P_DOWN_S","item_name":"Standard Down Pillow","item_revenue_in_usd":107.10000000000001,"price_in_usd":119.0,"quantity":1}],"traffic_source":"email","user_first_touch_timestamp":1593583891412316,"user_id":"UA000000106459577"}
"""

parsed_eventsDF = (events_stringsDF
    .select(from_json("value", schema_of_json(json_string)).alias("json"))
    .select("json.*")
)
display(parsed_eventsDF)
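With `parsed_events` in place, the `.` syntax mentioned at the top applies directly, since the parsed columns are now struct types rather than JSON strings. A minimal sketch:

```sql
-- Sketch: dot notation on struct columns (no JSON parsing needed)
SELECT geo.city, geo.state, ecommerce.purchase_revenue_in_usd
FROM parsed_events
WHERE geo.state = "CA"
```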
## Manipulate Arrays
- `explode()` separates the elements of an array into multiple rows, creating a new row for each element.
- `size()` returns the number of elements in an array for each row.
```sql
CREATE OR REPLACE TEMP VIEW exploded_events AS
SELECT *, explode(items) AS item
FROM parsed_events;

SELECT * FROM exploded_events
WHERE size(items) > 2
```
```python
%python
from pyspark.sql.functions import explode, size

exploded_eventsDF = (parsed_eventsDF
    .withColumn("item", explode("items"))
)
display(exploded_eventsDF.where(size("items") > 2))
```
- `collect_set()` collects unique values for a field, including fields within arrays.
- `flatten()` combines multiple arrays into a single array.
- `array_distinct()` removes duplicate elements from an array.
```sql
SELECT user_id,
       collect_set(event_name) AS event_history,
       array_distinct(flatten(collect_set(items.item_id))) AS cart_history
FROM exploded_events
GROUP BY user_id
```
```python
%python
from pyspark.sql.functions import array_distinct, collect_set, flatten

display(exploded_eventsDF
    .groupBy("user_id")
    .agg(collect_set("event_name").alias("event_history"),
         array_distinct(flatten(collect_set("items.item_id"))).alias("cart_history"))
)
```
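As a follow-on, the aggregated arrays can themselves be filtered. A hedged sketch using `array_contains()`, a standard Spark SQL built-in not covered above (`'M_STAN_Q'` is one of the item IDs appearing in this dataset):

```sql
-- Sketch: keep only users whose cart_history contains a given item
SELECT user_id, cart_history
FROM (
  SELECT user_id,
         array_distinct(flatten(collect_set(items.item_id))) AS cart_history
  FROM exploded_events
  GROUP BY user_id
) AS carts
WHERE array_contains(cart_history, 'M_STAN_Q')
```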
## Combine and Reshape Data
- Spark SQL supports standard `JOIN` operations (inner, outer, left, right, anti, cross, semi); the inner join is shown here, and a sketch of an anti join follows the Python cell below.

```sql
CREATE OR REPLACE TEMP VIEW item_purchases AS
SELECT *
FROM (SELECT *, explode(items) AS item FROM sales) a
INNER JOIN item_lookup b
  ON a.item.item_id = b.item_id;

SELECT * FROM item_purchases
```
```python
%python
from pyspark.sql.functions import explode

exploded_salesDF = (spark
    .table("sales")
    .withColumn("item", explode("items"))
)
itemsDF = spark.table("item_lookup")

item_purchasesDF = (exploded_salesDF
    .join(itemsDF, exploded_salesDF.item.item_id == itemsDF.item_id)
)
display(item_purchasesDF)
```
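For the other join types, only the join keyword changes. A hedged sketch of a left anti join, reusing the same tables, would return exploded sale items that have no match in `item_lookup`:

```sql
-- Sketch: left anti join keeps rows from the left side that have
-- no matching item_id in item_lookup
SELECT a.item.item_id
FROM (SELECT *, explode(items) AS item FROM sales) a
LEFT ANTI JOIN item_lookup b
  ON a.item.item_id = b.item_id
```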
- The `PIVOT` clause follows the table name or subquery specified in a `FROM` clause, which serves as the input for the pivot table.
```sql
SELECT *
FROM item_purchases
PIVOT (
  sum(item.quantity) FOR item_id IN (
    'P_FOAM_K', 'M_STAN_Q', 'P_FOAM_S', 'M_PREM_Q',
    'M_STAN_F', 'M_STAN_T', 'M_PREM_K', 'M_PREM_F',
    'M_STAN_K', 'M_PREM_T', 'P_DOWN_S', 'P_DOWN_K')
)
```
```python
%python
transactionsDF = (item_purchasesDF
    .groupBy("order_id", "email", "transaction_timestamp",
             "total_item_quantity", "purchase_revenue_in_usd",
             "unique_items", "items", "item", "name", "price")
    .pivot("item_id")
    .sum("item.quantity")
)
display(transactionsDF)
```
These SQL and Python transformations in Databricks provide powerful tools to handle complex data structures and reshape them for analysis and further processing.