Delta Lake DDL Cheat Sheet

This post collects a handful of Spark SQL statements that focus on creating and loading Delta Lake tables, followed by a group of handy statements for enriching loaded data, explicitly specifying parsing options, and setting advanced options to control data location, quality enforcement, and partitioning.

Schemas and Tables

Create Managed Table

USE ${da.schema_name}_default_location;

CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT);
INSERT INTO managed_table 
VALUES (3, 2, 1);

SELECT * FROM managed_table;
DROP TABLE managed_table;

Create External Table

USE ${da.schema_name}_default_location;

CREATE OR REPLACE TEMPORARY VIEW temp_delays USING CSV OPTIONS (
  path = '${da.paths.datasets}/flights/departuredelays.csv',
  header = "true",
  mode = "FAILFAST" -- abort file parsing with a RuntimeException if any malformed lines are encountered
);
CREATE OR REPLACE TABLE external_table LOCATION '${da.paths.working_dir}/external_table' AS
  SELECT * FROM temp_delays;

SELECT * FROM external_table;
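
To verify where a table's files actually live and whether it is managed or external, DESCRIBE EXTENDED reports the table type and location. A minimal sketch against the table created above (the exact output layout varies by Spark/Databricks version):

DESCRIBE EXTENDED external_table;
-- The Type row should read EXTERNAL and the Location row should point at
-- '${da.paths.working_dir}/external_table'; a managed table would instead show MANAGED
-- with a location under the schema's default directory.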

Delta Tables

Create Table as Select (CTAS)

CREATE OR REPLACE TABLE sales AS
SELECT * FROM parquet.`${DA.paths.datasets}/ecommerce/raw/sales-historical`;
  • CTAS statements automatically infer schema information from query results and do not support manual schema declaration.
  • This means that CTAS statements are useful for external data ingestion from sources with well-defined schema, such as Parquet files and tables.
  • CTAS statements also do not support specifying additional file options.
  • We can see how this would present significant limitations when trying to ingest data from CSV files.
-- Creating a TEMP VIEW first lets us specify parsing options when reading CSVs
CREATE OR REPLACE TEMP VIEW sales_tmp_vw
  (order_id LONG, email STRING, transactions_timestamp LONG, total_item_quantity INTEGER, purchase_revenue_in_usd DOUBLE, unique_items INTEGER, items STRING)
USING CSV
OPTIONS (
  path = "${da.paths.datasets}/ecommerce/raw/sales-csv",
  header = "true",
  delimiter = "|"
);

CREATE TABLE sales_delta AS
  SELECT * FROM sales_tmp_vw;
  
SELECT * FROM sales_delta

Filter and Rename Columns from Existing Tables/Views

-- The same pattern works with CREATE OR REPLACE VIEW
CREATE OR REPLACE TABLE purchases AS
SELECT order_id AS id, transaction_timestamp, purchase_revenue_in_usd AS price
FROM sales;

SELECT * FROM purchases

Declare Schema with Generated Columns

CREATE OR REPLACE TABLE purchase_dates (
  id STRING, 
  transaction_timestamp STRING, 
  price STRING,
  date DATE GENERATED ALWAYS AS (
    cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE))
    COMMENT "generated based on `transactions_timestamp` column")


-- Enables schema auto-merge so the MERGE below can INSERT * even though the
-- source rows do not include the generated date column
SET spark.databricks.delta.schema.autoMerge.enabled=true;

MERGE INTO purchase_dates a
USING purchases b
ON a.id = b.id
WHEN NOT MATCHED THEN
  INSERT *

Add Constraint

ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2020-01-01');
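
To confirm the constraint was registered, one option is to inspect the table properties, where Delta Lake records CHECK constraints. A minimal sketch (the property key format noted below is an assumption to verify against your Delta Lake version):

SHOW TBLPROPERTIES purchase_dates;
-- CHECK constraints typically show up under keys prefixed with delta.constraints.,
-- e.g. delta.constraints.valid_date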

Enrich Tables with Additional Options and Metadata

CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
LOCATION "${da.paths.working_dir}/tmp/users_pii"
PARTITIONED BY (first_touch_date)
AS
  SELECT *, 
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() updated,
    input_file_name() source_file
  FROM parquet.`${da.paths.datasets}/ecommerce/raw/users-historical/`;
  
SELECT * FROM users_pii;
  • LOCATION is specified, which will result in an external (rather than managed) table.

NOTE: Partitioning is shown here primarily to demonstrate syntax and impact. Most Delta Lake tables (especially small-to-medium sized data) will not benefit from partitioning. Because partitioning physically separates data files, this approach can result in a small files problem and prevent file compaction and efficient data skipping. The benefits observed in Hive or HDFS do not translate to Delta Lake, and you should consult with an experienced Delta Lake architect before partitioning tables.

As a best practice, you should default to non-partitioned tables for most use cases when working with Delta Lake.

Clone Delta Lake Tables

CREATE OR REPLACE TABLE purchases_clone
DEEP CLONE purchases

CREATE OR REPLACE TABLE purchases_shallow_clone
SHALLOW CLONE purchases
  • DEEP CLONE fully copies data and metadata from a source table to a target. This copy occurs incrementally, so executing this command again can sync changes from the source to the target location.
  • If you wish to create a copy of a table quickly to test out applying changes without the risk of modifying the current table, SHALLOW CLONE can be a good option. Shallow clones just copy the Delta transaction logs, meaning that the data doesn't move.

Complete Overwrite

CREATE OR REPLACE TABLE ... AS SELECT (CRAS) statements fully replace the contents of a table each time they execute.

INSERT OVERWRITE provides a nearly identical outcome to CRAS, but it can 1) only overwrite an existing table, not create a new one; 2) only overwrite with new records that match the current table schema, so it cannot disrupt downstream consumers; and 3) overwrite individual partitions.

CREATE OR REPLACE TABLE events AS
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/events-historical`;

INSERT OVERWRITE sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-historical/`;

Whereas a CRAS statement will allow us to completely redefine the contents of our target table, INSERT OVERWRITE will fail if we try to change our schema (unless we provide optional settings)

Append Rows

We can use INSERT INTO to atomically append new rows to an existing Delta table. This allows for incremental updates to existing tables, which is much more efficient than overwriting each time.

INSERT INTO sales
SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/sales-30m`

Note that INSERT INTO does not have any built-in guarantees to prevent inserting the same records multiple times. Re-executing the above cell would write the same records to the target table, resulting in duplicate records.

Upsert/Merge Updates

MERGE INTO target a 
USING source b 
ON {merge_condition} 
WHEN MATCHED THEN {matched_action} 
WHEN NOT MATCHED THEN {not_matched_action}

The benefits of using MERGE: 1) updates, inserts, and deletes are completed as a single transaction; 2) multiple conditions can be added beyond the matching fields; and 3) it provides extensive options for implementing custom logic.

-- Below, we'll only update records if the current row has a NULL email
-- and the new row does not.
-- All unmatched records from the new batch will be inserted.
MERGE INTO users a
USING users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
  UPDATE SET email = b.email, updated = b.updated
WHEN NOT MATCHED THEN INSERT *

Insert Only Merge for Deduplication

A common ETL use case is to collect logs into a Delta table through append operations.

Many source systems can generate duplicate records. With merge, you can avoid inserting the duplicate records by performing an insert-only merge.

MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN 
  INSERT *

Load Incrementally

COPY INTO gives SQL engineers an idempotent option for incrementally ingesting data from external systems. The operation is retriable: files in the source location that have already been loaded are skipped.

COPY INTO sales
FROM "${da.paths.datasets}/ecommerce/raw/sales-30m"
FILEFORMAT = PARQUET

Compacting Small Files and Indexing

OPTIMIZE will replace existing data files by combining records and rewriting the results.

When executing OPTIMIZE, users can optionally specify one or several fields for ZORDER indexing, which speeds up data retrieval when filtering on those fields by colocating data with similar values within data files.

OPTIMIZE students
ZORDER BY id

Rollback Versions

RESTORE rolls a table back to a previous state of the data, for example if you accidentally delete or overwrite records. The command is itself recorded as a transaction in the log.
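
A minimal sketch of the syntax, assuming the students table used in the other examples and that the version number (or timestamp) comes from DESCRIBE HISTORY:

RESTORE TABLE students TO VERSION AS OF 3;
-- or, using a point in time instead of a version number:
-- RESTORE TABLE students TO TIMESTAMP AS OF '2024-01-01'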

Cleaning Up Stale Files

Databricks will automatically clean up stale log files (> 30 days by default) in Delta Lake tables. Each time a checkpoint is written, Databricks automatically cleans up log entries older than this retention interval.
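
If you need a different window, the log retention can be set per table as a table property; a minimal sketch against the students table (check the property name and behaviour against your Delta Lake version):

ALTER TABLE students
SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 30 days');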

While Delta Lake versioning and time travel are great for querying recent versions and rolling back queries, keeping the data files for all versions of large production tables around indefinitely is very expensive (and can lead to compliance issues if PII is present).

If you wish to manually purge old data files, this can be performed with the VACUUM operation.

-- Execute VACUUM with a retention of 0 HOURS to
-- keep only the current version:
VACUUM students RETAIN 0 HOURS

By default, VACUUM will prevent you from deleting files less than 7 days old, just to ensure that no long-running operations are still referencing any of the files to be deleted. If you run VACUUM on a Delta table, you lose the ability to time travel back to a version older than the specified data retention period. In our demos, you may see Databricks executing code that specifies a retention of 0 HOURS. This is simply to demonstrate the feature and is not typically done in production.

-- Turn off the check that prevents premature deletion of data files
-- (required when using a retention below the default threshold)
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
-- Make sure that logging of VACUUM commands is enabled
SET spark.databricks.delta.vacuum.logging.enabled = true;
-- Use the DRY RUN version of VACUUM to print out all files to be deleted
VACUUM students RETAIN 0 HOURS DRY RUN

Reference

  1. Databricks Academy Notebook: DE 3.2 – Set Up Delta Tables, DE 3.3 – Load Data into Delta Lake, DE 3.5 – Version and Optimize Delta Tables

By Hang
