This post lists a few Spark SQL statements that focus on creating and loading Delta Lake tables, followed by a group of handy statements to enrich loaded data and explicitly specify parsing options, as well as set advanced options to control data location, quality enforcement, and partitioning.
Schemas and Tables
Create Managed Table
USE ${da.schema_name}_default_location;
CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT);
INSERT INTO managed_table
VALUES (3, 2, 1);
SELECT * FROM managed_table;
DROP TABLE managed_table;
Create External Table
USE ${da.schema_name}_default_location;
CREATE OR REPLACE TEMPORARY VIEW temp_delays USING CSV OPTIONS (
path = '${da.paths.datasets}/flights/departuredelays.csv',
header = "true",
mode = "FAILFAST" -- abort file parsing with a RuntimeException if any malformed lines are encountered
);
CREATE OR REPLACE TABLE external_table LOCATION '${da.paths.working_dir}/external_table' AS
SELECT * FROM temp_delays;
SELECT * FROM external_table;
Delta Tables
Create Table as Select (CTAS)
CREATE OR REPLACE TABLE sales AS
SELECT * FROM parquet.`${DA.paths.datasets}/ecommerce/raw/sales-historical`;
- CTAS statements automatically infer schema information from query results and do not support manual schema declaration.
- This means that CTAS statements are useful for external data ingestion from sources with well-defined schema, such as Parquet files and tables.
- CTAS statements also do not support specifying additional file options.
- We can see how this would present significant limitations when trying to ingest data from CSV files.
-- Creating a TEMP VIEW lets us specify the parsing options when reading CSVs
CREATE OR REPLACE TEMP VIEW sales_tmp_vw
(order_id LONG, email STRING, transactions_timestamp LONG, total_item_quantity INTEGER, purchase_revenue_in_usd DOUBLE, unique_items INTEGER, items STRING)
USING CSV
OPTIONS (
path = "${da.paths.datasets}/ecommerce/raw/sales-csv",
header = "true",
delimiter = "|"
);
CREATE TABLE sales_delta AS
SELECT * FROM sales_tmp_vw;
SELECT * FROM sales_delta
Filter and Rename Columns from Existing Tables/Views
CREATE OR REPLACE TABLE purchases AS  -- a VIEW can be created with the same pattern
SELECT order_id AS id, transaction_timestamp, purchase_revenue_in_usd AS price
FROM sales;
SELECT * FROM purchases
Declare Schema with Generated Columns
CREATE OR REPLACE TABLE purchase_dates (
id STRING,
transaction_timestamp STRING,
price STRING,
date DATE GENERATED ALWAYS AS (
cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE))
COMMENT "generated based on `transactions_timestamp` column")
SET spark.databricks.delta.schema.autoMerge.enabled=true;
MERGE INTO purchase_dates a
USING purchases b
ON a.id = b.id
WHEN NOT MATCHED THEN
INSERT *
Add Constraint
ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2020-01-01');
Enrich Tables with Additional Options and Metadata
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
LOCATION "${da.paths.working_dir}/tmp/users_pii"
PARTITIONED BY (first_touch_date)
AS
SELECT *,
cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date,
current_timestamp() updated,
input_file_name() source_file
FROM parquet.`${da.paths.datasets}/ecommerce/raw/users-historical/`;
SELECT * FROM users_pii;
- A LOCATION is specified, which will result in an external (rather than managed) table.
NOTE: Partitioning is shown here primarily to demonstrate syntax and impact. Most Delta Lake tables (especially small-to-medium sized data) will not benefit from partitioning. Because partitioning physically separates data files, this approach can result in a small files problem and prevent file compaction and efficient data skipping. The benefits observed in Hive or HDFS do not translate to Delta Lake, and you should consult with an experienced Delta Lake architect before partitioning tables.
As a best practice, you should default to non-partitioned tables for most use cases when working with Delta Lake.
Clone Delta Lake Tables
CREATE OR REPLACE TABLE purchases_clone
DEEP CLONE purchases;
CREATE OR REPLACE TABLE purchases_shallow_clone
SHALLOW CLONE purchases;
- DEEP CLONE fully copies data and metadata from a source table to a target. This copy occurs incrementally, so executing this command again can sync changes from the source to the target location.
- If you wish to create a copy of a table quickly to test out applying changes without the risk of modifying the current table, SHALLOW CLONE can be a good option. Shallow clones just copy the Delta transaction logs, meaning that the data doesn't move.
Complete Overwrite
CREATE OR REPLACE TABLE ... AS SELECT (CRAS) statements fully replace the contents of a table each time they execute.
INSERT OVERWRITE provides a nearly identical outcome to CRAS, but it can 1) only overwrite an existing table, not create a new one; 2) only overwrite with new records that match the current table schema, which makes it a safer technique that won't disrupt downstream consumers; 3) overwrite individual partitions.
CREATE OR REPLACE TABLE events AS
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/events-historical`;
INSERT OVERWRITE sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-historical/`
Whereas a CRAS statement will allow us to completely redefine the contents of our target table, INSERT OVERWRITE will fail if we try to change our schema (unless we provide optional settings).
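For example, a minimal sketch of such a schema change (the extra current_timestamp() column is not part of the sales schema, so this statement is expected to fail with a schema-mismatch AnalysisException):
INSERT OVERWRITE sales
SELECT *, current_timestamp() FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-historical/`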
Append Rows
We can use INSERT INTO to atomically append new rows to an existing Delta table. This allows for incremental updates to existing tables, which is much more efficient than overwriting each time.
INSERT INTO sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-30m`
Note that INSERT INTO does not have any built-in guarantees to prevent inserting the same records multiple times. Re-executing the above cell would write the same records to the target table, resulting in duplicate records.
Upsert/Merge Updates
MERGE INTO target a
USING source b
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}
The benefits of using MERGE: 1) updates, inserts, and deletes are completed as a single transaction; 2) multiple conditions can be added in addition to the matching fields; 3) it provides extensive options for implementing custom logic.
-- Below, we'll only update records if the current row has a NULL email
-- and the new row does not.
-- All unmatched records from the new batch will be inserted.
MERGE INTO users a
USING users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
UPDATE SET email = b.email, updated = b.updated
WHEN NOT MATCHED THEN INSERT *
Insert Only Merge for Deduplication
A common ETL use case is to collect logs into a Delta table through append operations.
Many source systems can generate duplicate records. With merge, you can avoid inserting the duplicate records by performing an insert-only merge.
MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN
INSERT *
Load Incrementally
COPY INTO provides SQL engineers an idempotent option to incrementally ingest data from external systems. The operation is retriable and idempotent; files in the source location that have already been loaded are skipped.
COPY INTO sales
FROM "${da.paths.datasets}/ecommerce/raw/sales-30m"
FILEFORMAT = PARQUET
Compacting Small Files and Indexing
OPTIMIZE will replace existing data files by combining records and rewriting the results.
When executing OPTIMIZE, users can optionally specify one or several fields for ZORDER indexing, which speeds up data retrieval when filtering on the provided fields by colocating data with similar values within data files.
OPTIMIZE students
ZORDER BY id
Rollback Versions
RESTORE can roll back to a previous state of the data in case you accidentally delete a table or data. This command is itself recorded as a transaction in the log.
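For example (a minimal sketch; the version number is illustrative, and available versions can be listed with DESCRIBE HISTORY):
DESCRIBE HISTORY students;
RESTORE TABLE students TO VERSION AS OF 8;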
Cleaning Up Stale Files
Databricks will automatically clean up stale log files (> 30 days by default) in Delta Lake tables. Each time a checkpoint is written, Databricks automatically cleans up log entries older than this retention interval.
While Delta Lake versioning and time travel are great for querying recent versions and rolling back queries, keeping the data files for all versions of large production tables around indefinitely is very expensive (and can lead to compliance issues if PII is present).
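Both the log retention above and the default threshold VACUUM applies to data files can be tuned per table. A minimal sketch using Delta Lake's documented table properties with their default values (students is just the running example table):
ALTER TABLE students SET TBLPROPERTIES (
  'delta.logRetentionDuration' = 'interval 30 days',          -- how long transaction log entries are kept
  'delta.deletedFileRetentionDuration' = 'interval 7 days'    -- default retention threshold used by VACUUM
);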
If you wish to manually purge old data files, this can be performed with the VACUUM operation.
-- Execute the following with a retention of 0 HOURS to
-- keep only the current version:
VACUUM students RETAIN 0 HOURS
By default, VACUUM will prevent you from deleting files less than 7 days old, just to ensure that no long-running operations are still referencing any of the files to be deleted. If you run VACUUM on a Delta table, you lose the ability to time travel back to a version older than the specified data retention period. In our demos, you may see Databricks executing code that specifies a retention of 0 HOURS. This is simply to demonstrate the feature and is not typically done in production.
-- Turn off the check that guards against premature deletion of data files (required to allow RETAIN 0 HOURS)
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
-- Make sure that logging of VACUUM commands is enabled
SET spark.databricks.delta.vacuum.logging.enabled = true;
-- Use the DRY RUN version of VACUUM to print out all files to be deleted
VACUUM students RETAIN 0 HOURS DRY RUN
Reference
- Databricks Academy Notebooks: DE 3.2 – Set Up Delta Tables, DE 3.3 – Load Data into Delta Lake, DE 3.5 – Version and Optimize Delta Tables