What Auto Loader does (one line)
Watches a folder for new files and appends them exactly once to a Delta table, even if the job restarts.
Minimal example (Python)
Replace the 4 paths/names at the top. Works for ADLS/S3/GCS or a UC Volume path.
```python
from pyspark.sql.functions import current_timestamp

# === 1) Fill these in ===
SRC_PATH   = "/Volumes/dev/bronze/landing/invoices"       # where files arrive
SCHEMA_LOC = "/Volumes/dev/ops/_schemas/invoices"         # inferred-schema location
CHECKPOINT = "/Volumes/dev/ops/_checkpoints/invoices_al"  # stream progress
TABLE_NAME = "dev.bronze.invoices_al"                     # Delta sink table

# Detection mode: "listing" (default) or "notification"
USE_NOTIFICATIONS = "false"  # set to "true" after wiring S3→SQS, ADLS→Event Grid→Queue, or GCS→Pub/Sub

# Schema evolution mode: "addNewColumns" | "rescue" | "failOnNewColumns" | "none"
EVOLUTION_MODE = "addNewColumns"  # try "rescue" or "failOnNewColumns" to see the behavior

# If you expect new columns to be added to the Delta sink, enable schema merging:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", SCHEMA_LOC)
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.useNotifications", USE_NOTIFICATIONS)
      .option("cloudFiles.schemaEvolutionMode", EVOLUTION_MODE)  # <-- key line
      .load(SRC_PATH)
      .withColumn("ingestion_ts", current_timestamp()))

(df.writeStream
   .option("checkpointLocation", CHECKPOINT)  # <-- exactly-once checkpoint
   .toTable(TABLE_NAME))                      # creates/appends the Delta table
```
What each setting means (in plain words)
File detection mode
- Directory listing (default): Auto Loader scans the folder.
  How to use: do nothing, or set `.option("cloudFiles.useNotifications", "false")`.
- File notification (event-driven): storage sends "new file" events to a queue; no scanning.
  How to use: set `.option("cloudFiles.useNotifications", "true")` and wire S3→SQS, ADLS→Event Grid→Queue, or GCS→Pub/Sub with the correct IAM permissions.
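The difference between the two modes can be sketched in plain Python (a toy illustration, not the Databricks implementation): listing rescans the folder every cycle, while notification mode only drains a queue of events.

```python
import os
import tempfile
from collections import deque

def discover_by_listing(path, seen):
    """Listing mode: rescan the whole folder, keep only unseen files."""
    new = [f for f in sorted(os.listdir(path)) if f not in seen]
    seen.update(new)
    return new

def discover_by_notification(event_queue, seen):
    """Notification mode: drain queued 'file created' events, no scanning."""
    new = []
    while event_queue:
        f = event_queue.popleft()
        if f not in seen:
            seen.add(f)
            new.append(f)
    return new

# Demo: listing rescans the folder; notifications only touch what arrived.
tmp = tempfile.mkdtemp()
open(os.path.join(tmp, "invoices_1.csv"), "w").close()
seen = set()
print(discover_by_listing(tmp, seen))   # ['invoices_1.csv']
print(discover_by_listing(tmp, seen))   # [] - nothing new on rescan

events = deque(["invoices_2.csv"])
print(discover_by_notification(events, seen))  # ['invoices_2.csv']
```

Listing gets slower as the folder grows (every cycle pays for a full scan); notifications stay cheap but need the extra cloud wiring.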
Checkpoint
- Path that stores stream progress. One unique checkpoint per stream.
- Deleting it means the job will reprocess from scratch.
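The bookmark behavior can be illustrated with a toy Python sketch (a JSON file standing in for the real checkpoint; the actual on-disk format is Spark-internal):

```python
import json
import os
import tempfile

def run_stream(batches, checkpoint_path):
    """Process batches exactly once by persisting the last committed index."""
    done = -1
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            done = json.load(f)["last_batch"]
    out = []
    for i, batch in enumerate(batches):
        if i <= done:
            continue  # committed before the restart: skip, don't duplicate
        out.append(batch)
        with open(checkpoint_path, "w") as f:
            json.dump({"last_batch": i}, f)  # commit progress after each batch
    return out

ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
print(run_stream(["b0", "b1"], ckpt))        # ['b0', 'b1']  first run
print(run_stream(["b0", "b1", "b2"], ckpt))  # ['b2']        restart resumes
os.remove(ckpt)                              # "deleting the checkpoint"
print(run_stream(["b0", "b1", "b2"], ckpt))  # everything reprocessed
```

The last line is exactly why the rules above matter: no checkpoint, no memory of what already ran.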
Schema evolution modes
- `addNewColumns` → if new columns appear in incoming files, they are added to the table (nullable). This is the default when no schema is provided.
- `rescue` → new/unknown columns go into `_rescued_data` (a JSON string column); the table schema stays the same.
- `failOnNewColumns` → the stream fails when a new column appears (forces you to update the contract).
- `none` → the schema is never evolved and data in unexpected columns is ignored; it is the default when you supply an explicit schema. If you want "don't change the table" with a softer landing, use `rescue`; for a hard stop, use `failOnNewColumns`.
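In `rescue` mode the unexpected column shows up later as JSON text inside `_rescued_data`. A minimal sketch of promoting such a field in a Silver step (plain Python for illustration; in Spark you would use `from_json` or `get_json_object`, and the row shape shown here is simplified):

```python
import json

# Illustrative bronze row after `rescue` mode saw an unexpected "currency"
# column (hypothetical shape; the real column is a JSON string):
row = {
    "invoice_id": 3,
    "customer": "C",
    "amount": 300,
    "_rescued_data": '{"currency": "USD"}',
}

# Promote the rescued field explicitly; None-safe because rows that matched
# the schema have a null _rescued_data.
rescued = json.loads(row["_rescued_data"]) if row["_rescued_data"] else {}
print(rescued.get("currency"))  # USD
```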
Tiny demo you can run
- Drop this file as `invoices_1.csv` into `SRC_PATH`:

```csv
invoice_id,customer,amount
1,A,100
2,B,200
```
- Result: the table has 3 columns plus `ingestion_ts`.
- Now drop `invoices_2.csv` with a new column:

```csv
invoice_id,customer,amount,currency
3,C,300,USD
```
- What happens by mode:
| Mode | Table change | What you’ll see |
|---|---|---|
| addNewColumns | `currency` column is added (nullable); old rows have NULL. | Stream keeps running. |
| rescue | No new table column; `currency` lands inside `_rescued_data` JSON. | Stream keeps running; parse later in Silver. |
| failOnNewColumns | No change. | Stream throws an error about the unexpected column. |
Silver upsert (optional, for row-level exactly-once)
```python
def upsert_to_silver(batch_df, batch_id):
    # Register the micro-batch as a view, then MERGE on the batch's own session
    # (batch_df.sparkSession, so the view is visible inside foreachBatch)
    batch_df.createOrReplaceTempView("v_batch")
    batch_df.sparkSession.sql("""
        MERGE INTO dev.silver.invoices s
        USING v_batch b
        ON s.invoice_id = b.invoice_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

(spark.readStream.table(TABLE_NAME)
 .writeStream
 .option("checkpointLocation", "/Volumes/dev/ops/_checkpoints/invoices_silver")  # its own checkpoint, not nested under the bronze one
 .foreachBatch(upsert_to_silver)
 .start())
```
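What makes `foreachBatch` + MERGE idempotent is the key-based upsert: replaying the same micro-batch leaves the table unchanged. A toy model of those MERGE semantics, with a dict keyed by `invoice_id` standing in for the Delta table:

```python
def upsert(table, batch):
    """MERGE semantics: WHEN MATCHED UPDATE, WHEN NOT MATCHED INSERT."""
    for row in batch:
        table[row["invoice_id"]] = row  # keyed write = update-or-insert
    return table

silver = {}
batch = [{"invoice_id": 1, "amount": 100}, {"invoice_id": 2, "amount": 200}]
upsert(silver, batch)
upsert(silver, batch)                               # replayed batch: no duplicates
print(len(silver))                                  # 2
upsert(silver, [{"invoice_id": 1, "amount": 150}])  # correction updates row 1
print(silver[1]["amount"])                          # 150
```

A plain append would have produced four rows after the replay; the MERGE key is what turns at-least-once delivery into effectively-exactly-once rows.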
Quick cheat-sheet
- Paths: set `SRC_PATH`, `SCHEMA_LOC`, `CHECKPOINT`, `TABLE_NAME`.
- Detection: `useNotifications=false` (default) or `true` (event-driven).
- Evolution: pick one of `addNewColumns` / `rescue` / `failOnNewColumns` / `none`.
- Exactly-once: keep a stable checkpoint and write to Delta.
- New columns at the sink: turn on `spark.databricks.delta.schema.autoMerge.enabled=true`.
What's a checkpoint in Auto Loader?
- Think of it as the bookmark for a streaming job.
- Stored in a folder you choose (on ADLS/S3/GCS/Volumes).
- It saves:
- which batches already ran,
- source offsets/progress,
- sink commit info (so retries don’t duplicate writes),
- state for any streaming ops (aggregations, dedup, etc.).
- If the cluster dies or you restart, Spark reads the checkpoint and resumes exactly where it left off.
Rules of thumb
- Use one unique checkpoint path per stream.
- Don’t delete/overwrite it while the stream is running.
- Moving or deleting it makes the job “forget” progress (it will behave like a new stream).
What’s RocksDB here?
- RocksDB is a fast embedded key-value store (a small database on disk).
- Databricks uses it under the hood as the state store for streaming, and Auto Loader uses it to keep file-discovery state.
- In Auto Loader, this RocksDB state lives under the stream's checkpoint location and remembers which files were already seen/processed, so discovery scales to millions or billions of files without rescanning everything.
Why it matters
- Lets Auto Loader do incremental discovery efficiently.
- Prevents re-ingesting the same file when jobs restart.
- Handles big “state” without needing tons of driver memory.
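The role RocksDB plays can be approximated with Python's stdlib `dbm` module, another on-disk key-value store: the seen-files state survives restarts and never has to fit in memory. A conceptual sketch, not how Databricks actually stores it:

```python
import dbm
import os
import tempfile

state_dir = tempfile.mkdtemp()
state_path = os.path.join(state_dir, "file_state")

def ingest(files, state_path):
    """Only process files not already recorded in the on-disk KV store."""
    processed = []
    with dbm.open(state_path, "c") as db:   # "c": create if missing
        for f in files:
            if f.encode() not in db:
                db[f.encode()] = b"done"    # state persists on disk, not in RAM
                processed.append(f)
    return processed

print(ingest(["a.csv", "b.csv"], state_path))           # both are new
print(ingest(["a.csv", "b.csv", "c.csv"], state_path))  # only c.csv after a "restart"
```

Because lookups hit the on-disk store, the driver's memory footprint stays flat no matter how many files have ever been ingested.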
How they fit together in Auto Loader
- `cloudFiles.schemaLocation`: stores the inferred schema so the stream starts with a stable schema and can evolve it across restarts.
- `checkpointLocation`: stores stream progress plus the RocksDB file-discovery state, i.e. which files were seen and which micro-batches were committed → enables exactly-once delivery to Delta.

If you wipe the checkpoint, Auto Loader forgets both its resume position and which files it has already ingested, so the stream starts over and re-ingests old files. Best practice: don't delete either location.
Minimal setup (copy/paste)
```python
SRC_PATH   = "/Volumes/dev/bronze/landing/invoices"
SCHEMA_LOC = "/Volumes/dev/ops/_schemas/invoices"        # inferred schema
CHECKPOINT = "/Volumes/dev/ops/_checkpoints/invoices_al" # progress + RocksDB file state

df = (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", SCHEMA_LOC)   # <-- inferred schema lives here
      .load(SRC_PATH))

(df.writeStream
 .option("checkpointLocation", CHECKPOINT)               # <-- checkpoint + file state live here
 .toTable("dev.bronze.invoices_al"))
```
Quick FAQ
- Do I need both paths? Yes: `schemaLocation` for the inferred schema, `checkpointLocation` for the stream's progress and file state. (A common convention is to point `schemaLocation` at the checkpoint folder.)
- Does RocksDB mean I install anything? No; Databricks manages it. You just provide stable folders.
- Directory listing vs file notification? Either way, Auto Loader still uses RocksDB in the checkpoint location to track discovered files.