Mohammad Gufran Jahangir — August 17, 2025

What Auto Loader does (one line)

Watches a folder for new files and appends them exactly once to a Delta table, even if the job restarts.


Minimal example (Python)

Replace the 4 paths/names at the top. Works for ADLS/S3/GCS or a UC Volume path.

from pyspark.sql.functions import current_timestamp

# === 1) Fill these in ===
SRC_PATH   = "/Volumes/dev/bronze/landing/invoices"      # where files arrive
SCHEMA_LOC = "/Volumes/dev/ops/_schemas/invoices"        # Auto Loader state
CHECKPOINT = "/Volumes/dev/ops/_checkpoints/invoices_al" # stream progress
TABLE_NAME = "dev.bronze.invoices_al"                    # Delta sink table

# Detection mode: "listing" (default) or "notification"
USE_NOTIFICATIONS = "false"   # set to "true" after wiring S3/SQS, ADLS/EventGrid→Queue, or GCS/PubSub

# Schema evolution mode: "addNewColumns" | "rescue" | "failOnNewColumns" | "none"
EVOLUTION_MODE = "addNewColumns"   # try "rescue" or "failOnNewColumns" to see the behavior

# If you expect new columns to be added to the Delta table, enable sink auto-merge:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", SCHEMA_LOC)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.useNotifications", USE_NOTIFICATIONS)
        .option("cloudFiles.schemaEvolutionMode", EVOLUTION_MODE)  # <-- key line
        .load(SRC_PATH)
        .withColumn("ingestion_ts", current_timestamp()))

(df.writeStream
   .option("checkpointLocation", CHECKPOINT)   # <-- exactly-once checkpoint
   .toTable(TABLE_NAME))                       # creates/appends Delta table

What each setting means (in plain words)

File detection mode

  • Directory listing (default): Auto Loader scans the folder.
    How to use: do nothing or set .option("cloudFiles.useNotifications","false").
  • File notification (event-driven): Storage sends “new file” events to a queue; no scanning.
    How to use: set .option("cloudFiles.useNotifications","true") and wire S3→SQS, ADLS→Event Grid→Queue, or GCS→Pub/Sub with correct IAM.
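
Once the cloud wiring exists, switching to notification mode is a small reader-side change. A minimal sketch (this needs a Databricks cluster to run; the commented-out cloudFiles.queueUrl option is the AWS-specific way to point at a pre-created SQS queue instead of letting Auto Loader provision one):

```python
# Notification-mode reader: same pipeline, only the detection options change.
# Assumes the S3->SQS / ADLS->Event Grid->Queue / GCS->Pub/Sub wiring and IAM
# already exist in your cloud account.
df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", SCHEMA_LOC)
        .option("cloudFiles.useNotifications", "true")
        # Optional (AWS only): use a queue you created yourself:
        # .option("cloudFiles.queueUrl", "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
        .load(SRC_PATH))
```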

Checkpoint

  • Path that stores stream progress. One unique checkpoint per stream.
  • Deleting it means the job will reprocess from scratch.

Schema evolution modes

  • addNewColumns → If new columns appear in incoming files, they are added to the table (nullable).
  • rescue → New/unknown columns go into _rescued_data (a JSON map). Table schema stays the same.
  • failOnNewColumns → The stream fails when a new column appears (forces you to update the contract).
  • none → The table schema never changes and new columns are silently ignored (their data is dropped unless you also configure a rescued-data column). If you leave the mode unset while letting Auto Loader infer the schema, Databricks defaults to addNewColumns.

Tiny demo you can run

  1. Drop this file as invoices_1.csv into SRC_PATH:

invoice_id,customer,amount
1,A,100
2,B,200

  2. Result: the table has 3 columns + ingestion_ts.
  3. Now drop invoices_2.csv with a new column:

invoice_id,customer,amount,currency
3,C,300,USD

  4. What happens by mode:

Mode             | Table change                                              | What you'll see
addNewColumns    | currency column added (nullable); old rows get NULL       | Stream keeps running.
rescue           | No new table column; currency lands in _rescued_data JSON | Stream keeps running; parse it later in Silver.
failOnNewColumns | No change                                                 | Stream fails with an error about the unexpected column.
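
In rescue mode, the unexpected currency value from the second file travels as a JSON string per row. A plain-Python sketch of what a rescued row roughly looks like and how you'd parse it downstream (the real payload may also carry metadata such as the source file path):

```python
import json

# Roughly what a row from invoices_2.csv looks like in rescue mode:
# the unknown column is packed into _rescued_data as a JSON string.
row = {
    "invoice_id": 3,
    "customer": "C",
    "amount": 300,
    "_rescued_data": '{"currency": "USD"}',
}

# Downstream (e.g. in a Silver job) you parse it back out:
rescued = json.loads(row["_rescued_data"]) if row["_rescued_data"] else {}
currency = rescued.get("currency")
print(currency)  # -> USD
```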

Silver upsert (optional, for row-level exactly-once)

def upsert_to_silver(batch_df, batch_id):
    # Dedup within the micro-batch so MERGE never sees two rows with the same key
    batch_df = batch_df.dropDuplicates(["invoice_id"])
    batch_df.createOrReplaceTempView("v_batch")
    # Use the batch's own session so the temp view is visible to the SQL
    batch_df.sparkSession.sql("""
      MERGE INTO dev.silver.invoices s
      USING v_batch b
      ON s.invoice_id = b.invoice_id
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    """)

(spark.readStream.table(TABLE_NAME)
  .writeStream
  .option("checkpointLocation", "/Volumes/dev/ops/_checkpoints/invoices_silver")  # its own checkpoint, not nested in the bronze one
  .foreachBatch(upsert_to_silver)
  .start())
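
The MERGE above is a key-based upsert, which is why replays are safe: re-applying the same batch converges to the same table state. In plain-Python terms (an illustrative toy, not the Delta implementation):

```python
# Toy model of MERGE ... ON s.invoice_id = b.invoice_id:
# the table is a dict keyed by invoice_id, so re-applying the same
# batch (e.g. after a retry) cannot create duplicate rows.
def merge_upsert(table, batch):
    for row in batch:
        table[row["invoice_id"]] = row  # UPDATE if key exists, INSERT if not
    return table

silver = {}
batch = [{"invoice_id": 1, "amount": 100}, {"invoice_id": 2, "amount": 200}]
merge_upsert(silver, batch)
merge_upsert(silver, batch)  # replayed batch: still no duplicates
print(len(silver))  # -> 2
```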

Quick cheat-sheet

  • Paths: set SRC_PATH, SCHEMA_LOC, CHECKPOINT, TABLE_NAME.
  • Detection: useNotifications=false (default) or true (event-driven).
  • Evolution: pick one of addNewColumns / rescue / failOnNewColumns.
  • Exactly-once: keep a stable checkpoint and write to Delta.
  • New columns to sink: turn on spark.databricks.delta.schema.autoMerge.enabled=true.

Here’s the simple, no-nonsense version:

What’s a checkpoint in Autoloader?

  • Think of it as the bookmark for a streaming job.
  • Stored in a folder you choose (on ADLS/S3/GCS/Volumes).
  • It saves:
    • which batches already ran,
    • source offsets/progress,
    • sink commit info (so retries don’t duplicate writes),
    • state for any streaming ops (aggregations, dedup, etc.).
  • If the cluster dies or you restart, Spark reads the checkpoint and resumes exactly where it left off.
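
The "bookmark" behavior can be sketched in plain Python (illustrative only; a real checkpoint stores offsets, sink commits, and operator state, not just an index):

```python
import json, os, tempfile

# Toy checkpoint: persist the index of the last processed item so a
# restarted job resumes instead of starting over.
def run_job(items, checkpoint_path, out):
    done = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            done = json.load(f)["done"]        # read the bookmark
    for i in range(done, len(items)):
        out.append(items[i])                   # "process" item i
        with open(checkpoint_path, "w") as f:
            json.dump({"done": i + 1}, f)      # advance the bookmark

ckpt = os.path.join(tempfile.mkdtemp(), "ckpt.json")
processed = []
run_job(["a", "b", "c"], ckpt, processed)  # first run processes everything
run_job(["a", "b", "c"], ckpt, processed)  # "restart": reprocesses nothing
print(processed)  # -> ['a', 'b', 'c']
```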

Rules of thumb

  • Use one unique checkpoint path per stream.
  • Don’t delete/overwrite it while the stream is running.
  • Moving or deleting it makes the job “forget” progress (it will behave like a new stream).

What’s RocksDB here?

  • RocksDB is a very fast embedded key-value store (a tiny database on disk).
  • Databricks uses it under the hood as the state store for Structured Streaming, and Auto Loader uses it to keep the file-discovery state.
  • That file state lives under the stream's checkpointLocation and remembers which files were already seen/processed, so discovery scales to millions or billions of files without rescanning everything.

Why it matters

  • Lets Auto Loader do incremental discovery efficiently.
  • Prevents re-ingesting the same file when jobs restart.
  • Handles big “state” without needing tons of driver memory.
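
Conceptually, the file-state store behaves like a persistent on-disk set of already-seen paths. A toy sketch using sqlite3 as a stand-in for RocksDB (illustrative only):

```python
import os, sqlite3, tempfile

# Toy stand-in for the RocksDB file-state store: a persistent key set on disk.
db = os.path.join(tempfile.mkdtemp(), "seen_files.db")

def discover(paths):
    """Return only paths never seen before, and remember them on disk."""
    conn = sqlite3.connect(db)
    conn.execute("CREATE TABLE IF NOT EXISTS seen (path TEXT PRIMARY KEY)")
    new = []
    for p in paths:
        if conn.execute("SELECT 1 FROM seen WHERE path = ?", (p,)).fetchone() is None:
            conn.execute("INSERT INTO seen VALUES (?)", (p,))
            new.append(p)
    conn.commit()
    conn.close()
    return new

new1 = discover(["f1.csv", "f2.csv"])  # first listing: both files are new
new2 = discover(["f2.csv", "f3.csv"])  # after a "restart": only f3 is new
print(new1, new2)  # -> ['f1.csv', 'f2.csv'] ['f3.csv']
```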

How they fit together in Auto Loader

  • cloudFiles.schemaLocation: stores the inferred schema and its evolution history (a _schemas directory) → lets Auto Loader apply a consistent schema across restarts.
  • checkpointLocation: stores stream progress plus the RocksDB file-discovery state (which files were seen, which micro-batches were committed to the sink) → enables scalable incremental discovery (listing or notifications) and exactly-once delivery to Delta.

If you wipe the checkpoint, Auto Loader forgets both its resume position and which files it has already seen, so the stream behaves like a brand-new one and re-ingests everything. Wiping the schemaLocation instead resets the schema inference and evolution history. Best practice: don't delete either.

Minimal setup (copy/paste)

SRC_PATH     = "/Volumes/dev/bronze/landing/invoices"
SCHEMA_LOC   = "/Volumes/dev/ops/_schemas/invoices"         # inferred schema + evolution history
CHECKPOINT   = "/Volumes/dev/ops/_checkpoints/invoices_al"  # streaming bookmark + RocksDB file state

df = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", SCHEMA_LOC)    # <-- schema tracking lives here
        .load(SRC_PATH))

(df.writeStream
   .option("checkpointLocation", CHECKPOINT)                # <-- checkpoint + file state live here
   .toTable("dev.bronze.invoices_al"))

Quick FAQ

  • Do I need both paths? Yes. schemaLocation for schema inference and evolution; checkpointLocation for the stream's progress and Auto Loader's RocksDB file state.
  • Does RocksDB mean I install anything? No—Databricks manages it; you just provide stable folders.
  • Directory Listing vs File Notification? Either way, Auto Loader still tracks discovered files in RocksDB under the checkpoint location; the modes only differ in how new files are detected.
