Mohammad Gufran Jahangir · August 13, 2025

Databricks Auto Loader is the cloudFiles source for Structured Streaming that incrementally discovers and ingests new files from ADLS/S3/GCS into Delta Lake—with built‑in deduplication, schema management, and exactly‑once delivery to Delta sinks. This guide explains how Auto Loader works, when to use it, the two file detection strategies, and the schema evolution modes, with practical code you can paste into a notebook or job.


TL;DR

  • Use Auto Loader for continuous or frequent micro‑batch file ingestion at scale.
  • Pick a file detection mode:
    • Directory Listing (default): zero setup; scans the path incrementally.
    • File Notification: event‑driven discovery via S3/SQS, ADLS Event Grid→Queue, or GCS Pub/Sub—faster and cheaper at scale.
  • Choose a schema evolution mode:
    • addNewColumns → table evolves by adding new nullable columns.
    • rescue → unexpected fields go into _rescued_data JSON; table schema stays stable.
    • failOnNewColumns → stop the stream on drift and force you to update the contract.
  • For exactly‑once to Delta: keep a checkpoint, write to Delta, and use MERGE for upserts in Silver.

When to use Auto Loader vs. COPY INTO

| Scenario | Auto Loader | COPY INTO |
| --- | --- | --- |
| File arrival | Continuous/near‑real‑time | Periodic batch |
| Directory size | Huge / deep | Small→medium |
| Schema drift | Built‑in inference, evolution, rescued data | Basic; manual control |
| Cost of discovery | Lowest with notifications | Directory listing every run |
| Semantics | Streaming/micro‑batch | Batch, idempotent per file |

Rule of thumb: If producers drop files throughout the day or you have millions of files, go Auto Loader. If you run a daily batch on a modest folder, COPY INTO is simpler.


Architecture at a glance

  • Source path: abfss://…, s3://…, gs://…, or a UC Volume.
  • cloudFiles.schemaLocation: where the canonical schema and its evolution history are persisted. Treat as critical infra.
  • Checkpoint: Structured Streaming progress store (including the discovered‑file state) for exactly‑once.
  • Detection mode: directory listing or file notifications queue.
  • Sink: Delta table (Bronze), then MERGE to Silver for dedup/upserts.
[Producers] → [Landing Storage] → (Auto Loader: discover + parse)
                                     │
                                     ├── schemaLocation (state + schema)
                                     ├── checkpoint (stream progress)
                                     └── Delta Bronze (append, exactly‑once)
                                               ↓ MERGE (keys)
                                         Delta Silver (clean)

Quick starts

Python

from pyspark.sql.functions import current_timestamp

src_path   = "/Volumes/dev/bronze/landing/input"     # or abfss://, s3://, gs://
schema_loc = "/Volumes/dev/ops/_schemas/invoice"    # stable
chkpt      = "/Volumes/dev/ops/_checkpoints/invoice_autoloader"

df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", schema_loc)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        #.option("cloudFiles.useNotifications", "true")  # enable when infra is ready
        .load(src_path)
        .withColumn("ingestion_ts", current_timestamp()))

(df.writeStream
   .option("checkpointLocation", chkpt)
   .trigger(availableNow=True)  # or processingTime="5 minutes"
   .toTable("dev.bronze.invoice_al"))

SQL

-- Streaming tables manage the checkpoint and schema location for you.
CREATE OR REFRESH STREAMING TABLE dev.bronze.invoice_al
AS SELECT *, current_timestamp() AS ingestion_ts
FROM STREAM read_files(
  '/Volumes/dev/bronze/landing/input',
  format => 'csv',
  inferColumnTypes => true,
  schemaEvolutionMode => 'addNewColumns'
);

File detection modes

1) Directory Listing (default)

  • How: Periodically lists the path, tracking already‑discovered files in the checkpoint state.
  • Pros: No external setup; portable across clouds.
  • Cons: For very large trees, listings can add latency and cost.
  • Use: Do nothing (default) or set .option("cloudFiles.useNotifications","false").

2) File Notification (event‑driven)

  • How: Storage publishes file‑create events to a queue/topic; Auto Loader consumes them.
  • Why: Faster discovery, fewer list calls, better for spiky throughput.
  • Cloud wiring (overview)
    • AWS: S3 → Event Notification → SQS. Grant the cluster role sqs:ReceiveMessage, etc.
    • Azure: ADLS/Blob → Event Grid → Queue Storage. Grant Storage Queue permissions.
    • GCP: GCS → Pub/Sub. Grant subscriber role to the service principal.
  • Use: .option("cloudFiles.useNotifications","true") (plus provider‑specific options/permissions).
  • Fallback: If events are misconfigured, either fix IAM or temporarily disable notifications to keep loading via listing.
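
Conceptually, notification mode swaps the listing loop for a queue consumer. A minimal plain‑Python sketch, using queue.Queue as a stand‑in for SQS / Queue Storage / Pub/Sub (names and event shape are illustrative):

```python
from queue import Queue, Empty

def drain_file_events(q, max_events=100):
    """Sketch of notification-mode discovery: instead of listing the
    path, consume file-created events published by the storage
    service, up to max_events per micro-batch."""
    files = []
    while len(files) < max_events:
        try:
            files.append(q.get_nowait()["path"])
        except Empty:
            break  # queue drained; wait for the next trigger
    return files

# The storage service would publish these events; here we fake two.
events = Queue()
events.put({"path": "landing/input/f1.csv"})
events.put({"path": "landing/input/f2.csv"})
new_files = drain_file_events(events)
```

No directory scan happens at all, which is why this mode stays cheap as the tree grows.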

Schema inference & evolution

Auto Loader maintains a canonical schema and supports several strategies to handle drift.

Key options

  • cloudFiles.inferColumnTypes = true|false → infer proper types instead of defaulting every column to string.
  • cloudFiles.schemaHints = "id LONG, ts TIMESTAMP, amount DECIMAL(18,2)" → pin critical columns.
  • cloudFiles.rescuedDataColumn = '_rescued_data' (default name) → captures unexpected columns/fields as JSON.
  • mode = PERMISSIVE|DROPMALFORMED|FAILFAST → standard CSV/JSON parser behavior for malformed records.

Evolution modes

| Mode | Setting | Behavior on new columns | Notes |
| --- | --- | --- | --- |
| addNewColumns | cloudFiles.schemaEvolutionMode='addNewColumns' | Adds columns to the table (nullable). | Good for known growth; still control types via hints. |
| rescue | cloudFiles.schemaEvolutionMode='rescue' | Keeps table schema stable; new fields land in _rescued_data. | Safest for volatile payloads; parse later in Silver. |
| failOnNewColumns | cloudFiles.schemaEvolutionMode='failOnNewColumns' | Fails the stream on drift. | Enforce strict contracts; update hints intentionally. |
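
The three modes are easy to see in a toy simulation. This plain‑Python sketch (illustrative only, not Auto Loader internals) applies each mode to one incoming record against a canonical column list:

```python
import json

def apply_evolution(schema, record, mode):
    """Simulate the three schemaEvolutionMode behaviors for one record.
    `schema` is the canonical column list; `extras` are fields the
    schema does not yet know about."""
    extras = {k: v for k, v in record.items() if k not in schema}
    row = {k: record.get(k) for k in schema}
    if not extras:
        return schema, row
    if mode == "addNewColumns":
        schema = schema + sorted(extras)           # table gains nullable columns
        row.update(extras)
    elif mode == "rescue":
        row["_rescued_data"] = json.dumps(extras)  # schema stays stable
    elif mode == "failOnNewColumns":
        raise ValueError(f"schema drift: {sorted(extras)}")
    return schema, row
```

Feeding the same drifting record through each mode makes the trade‑off concrete: grow the table, quarantine the extras, or stop the stream.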

Examples

Strict contract

(spark.readStream.format('cloudFiles')
  .option('cloudFiles.format','json')
  .option('cloudFiles.schemaLocation', schema_loc)
  .option('cloudFiles.schemaHints', 'id LONG, event_ts TIMESTAMP, amount DECIMAL(18,2)')
  .option('cloudFiles.schemaEvolutionMode','failOnNewColumns')
  .load(src_path))

Rescue and parse later

from pyspark.sql.functions import from_json, col
raw = (spark.readStream.format('cloudFiles')
        .option('cloudFiles.format','json')
        .option('cloudFiles.schemaLocation', schema_loc)
        .option('cloudFiles.schemaEvolutionMode','rescue')
        .load(src_path))

parsed = raw.withColumn('extra', from_json(col('_rescued_data'), 'map<string,string>'))

Exactly‑once delivery and dedup

  • Checkpointing and the Delta sink provide exactly‑once at the micro‑batch level.
  • For row‑level idempotency, use foreachBatch + MERGE with a stable key.
def upsert_to_silver(batch_df, batch_id):
    # Use the batch's own session so the temp view is visible to the MERGE.
    batch_df.createOrReplaceTempView('v_batch')
    batch_df.sparkSession.sql('''
      MERGE INTO dev.silver.invoice s
      USING v_batch b
      ON s.invoice_id = b.invoice_id AND s.invoice_date = b.invoice_date
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    ''')

(df.writeStream
  .option('checkpointLocation', chkpt + '/silver')
  .foreachBatch(upsert_to_silver)
  .start())

Add a hash column for change detection, and consider watermarks + dropDuplicates for event streams.
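
The hash‑and‑key idea can be sketched in plain Python (illustrative; in Spark you would compute the hash over business columns and do the upsert via MERGE as above):

```python
import hashlib

def row_hash(row, cols):
    """Stable hash over the business columns, for change detection."""
    payload = "|".join(str(row.get(c)) for c in cols)
    return hashlib.sha256(payload.encode()).hexdigest()

def dedup_upsert(target, batch, key, cols):
    """MERGE-like upsert: touch a key only when its content hash changed,
    so replayed or duplicate rows are no-ops."""
    for row in batch:
        h = row_hash(row, cols)
        if target.get(row[key], (None, None))[1] != h:
            target[row[key]] = (row, h)
    return target
```

Replaying the same batch leaves the target untouched, which is exactly the idempotency you want when a micro‑batch is retried.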


Operations: scaling, cost, and monitoring

  • Throughput: tune maxFilesPerTrigger / maxBytesPerTrigger; scale cluster cores.
  • Latency: prefer file notifications; reduce deep nesting; use availableNow=True to backfill.
  • Cost: notifications minimize list calls; compact small Bronze files with OPTIMIZE as needed.
  • Observability: Streaming UI, Delta DESCRIBE HISTORY, Unity Catalog system tables (query & job history). Persist per‑batch metrics to an ops table.
  • Backfills: Run a separate job with a different checkpoint and availableNow=True against historical folders.

Common pitfalls & fixes

  • Misconfigured events → stream won’t start. Fix IAM/queue wiring or disable notifications temporarily.
  • Schema thrash → define schemaHints; pick failOnNewColumns for strictness or rescue to defer.
  • Duplicate business rows → MERGE on deterministic keys in Silver; treat landing as append‑only (don’t overwrite files in place).
  • Huge directories → split by date prefixes; enable notifications; scale cluster.

Quick reference: the key terms from above, with copy‑paste snippets.

Detection mode

| Mode | What it does | When to use | How to set |
| --- | --- | --- | --- |
| Directory listing (default) | Periodically lists the source path to discover new files. | Simple setups, small/medium directories. | Omit the setting, or .option("cloudFiles.useNotifications","false") |
| File notification | Event-driven discovery via storage events (S3→SQS, ADLS→Event Grid→Queue, GCS→Pub/Sub). | Huge trees, high arrival rate, lower listing cost/latency. | .option("cloudFiles.useNotifications","true") (plus cloud IAM/queue setup) |

Checkpoint (exactly-once)

  • Stores Structured Streaming progress/offsets for the job.
  • One unique path per stream; don’t share between jobs; don’t delete while a stream is active.
  • Moving/clearing a checkpoint causes reprocessing from the last durable commit.
chkpt = "/Volumes/dev/ops/_checkpoints/invoice_al"
(df.writeStream
   .option("checkpointLocation", chkpt)
   .toTable("dev.bronze.invoice_al"))

Schema evolution modes (Auto Loader)

| Mode | Effect when new columns appear | How to set | Notes |
| --- | --- | --- | --- |
| addNewColumns | Adds the new columns to the table (nullable). | .option("cloudFiles.schemaEvolutionMode","addNewColumns") | Also allow sink evolution with .option("mergeSchema","true") on the writeStream, or enable Delta auto‑merge on the cluster if needed. |
| rescue | Keeps table schema unchanged; unexpected columns land in _rescued_data JSON. | .option("cloudFiles.schemaEvolutionMode","rescue") and optionally .option("cloudFiles.rescuedDataColumn","_rescued_data") | Safest when producers are noisy; you can parse later in Silver. |
| failOnNewColumns | Stream fails on drift; forces a contract update. | .option("cloudFiles.schemaEvolutionMode","failOnNewColumns") | Pair with schemaHints for strict typing. |
| none | Does not evolve the schema; new columns are ignored (and not rescued unless rescuedDataColumn is set). | .option("cloudFiles.schemaEvolutionMode","none") | With an inferred schema the default mode is addNewColumns; when you supply a schema, the default is none. |

Minimal end-to-end example

from pyspark.sql.functions import current_timestamp

src          = "/Volumes/dev/bronze/landing/input"
schema_loc   = "/Volumes/dev/ops/_schemas/invoice"
checkpoint   = "/Volumes/dev/ops/_checkpoints/invoice_al"

df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format","csv")
        .option("cloudFiles.schemaLocation", schema_loc)
        .option("cloudFiles.inferColumnTypes","true")
        # Detection mode:
        # .option("cloudFiles.useNotifications","true")   # enable when queue/events wired
        # Evolution mode (pick one):
        # .option("cloudFiles.schemaEvolutionMode","addNewColumns")
        # .option("cloudFiles.schemaEvolutionMode","rescue")
        # .option("cloudFiles.schemaEvolutionMode","failOnNewColumns")
        .load(src)
        .withColumn("ingestion_ts", current_timestamp()))

(df.writeStream
   .option("checkpointLocation", checkpoint)
   # Needed if you chose addNewColumns and expect the Delta table to grow:
   # .option("mergeSchema","true")
   .toTable("dev.bronze.invoice_al"))


Cheat‑sheet

  • Source: .format('cloudFiles') + cloudFiles.format = csv|json|parquet|...
  • State: cloudFiles.schemaLocation
  • Detection: cloudFiles.useNotifications = true|false
  • Evolution: cloudFiles.schemaEvolutionMode = addNewColumns|rescue|failOnNewColumns
  • Hints: cloudFiles.schemaHints = 'col TYPE, ...'
  • Catch‑up: trigger(availableNow=True) + cloudFiles.includeExistingFiles (default true)
  • Exactly‑once: Delta sink + checkpoints; MERGE in Silver

Auto Loader gives you streaming‑grade ingestion with the simplicity of a few options. Choose your detection mode, lock your schema strategy, and you’re production‑ready.
