
Resolving Out-of-Memory (OOM) Errors in Databricks and Apache Spark

Introduction

Out-of-Memory (OOM) errors are a frequent headache in Databricks and Apache Spark workflows. Whether your Spark driver crashes unexpectedly or executors repeatedly fail, OOM errors can derail jobs, inflate cloud costs, and leave teams scrambling for fixes. In this guide, we’ll dissect why OOM errors occur, how to diagnose them, and actionable strategies to prevent these issues in the future.


What Causes OOM Errors?

Spark applications run in a distributed environment where memory is split between the driver (orchestrates tasks) and executors (process data). OOM errors arise when:

  1. Driver OOM: The driver node exhausts memory (e.g., collecting large datasets or mishandling metadata).
  2. Executor OOM: Individual executors run out of memory due to data skew, improper configuration, or large shuffles.

Common Causes of OOM Errors

1. Driver Node Failures

  • Collecting Large Datasets: Using df.collect() to retrieve massive results to the driver.
  • Broadcast Joins with Huge Tables: Broadcasting a table larger than driver memory.
  • Inefficient Metadata Handling: Processing complex schemas (e.g., deeply nested JSON).

2. Executor Failures

  • Data Skew: A few tasks process oversized partitions.
  • Insufficient Memory Allocation: Under-provisioned spark.executor.memory or spark.executor.memoryOverhead.
  • Shuffle Overload: Large shuffles spilling to disk (e.g., groupBy, join).
  • Memory Leaks: Unbounded accumulators or unmanaged off-heap memory in UDFs.

Troubleshooting OOM Errors

Step 1: Identify the Culprit

  • Check Logs:
    • Driver OOM: Look for java.lang.OutOfMemoryError: Java heap space in driver logs.
    • Executor OOM: Search executor logs for ExecutorLostFailure or Exit code: 52.
  • Spark UI:
    • Stages Tab: Identify tasks with high shuffle spill or GC time (a quick notebook-side skew check follows this list).
    • Storage Tab: Check cached RDDs/DataFrames consuming excess memory.
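
If the Stages tab hints at skew, a quick notebook check can confirm it. A minimal sketch, assuming a DataFrame df and a hypothetical key column user_id:

from pyspark.sql.functions import spark_partition_id

# Rows per key: a handful of very large counts points to key skew
df.groupBy("user_id").count().orderBy("count", ascending=False).show(10)

# Rows per partition: uneven counts point to skewed partitioning
df.groupBy(spark_partition_id().alias("partition_id")).count() \
  .orderBy("count", ascending=False).show(10)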

Step 2: Analyze Memory Usage

  • Garbage Collection (GC) Logs: Enable GC logging to detect frequent full GC cycles (see the config snippet after this list):
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
  • Heap Dumps: Generate heap dumps for deep analysis (use tools like Eclipse MAT).
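
These flags are not set in notebook code; they are attached to the driver and executor JVMs via the cluster's Spark config. A minimal sketch (newer JVMs may translate these legacy flags to -Xlog:gc* automatically):

spark.driver.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
spark.executor.extraJavaOptions -XX:+PrintGCDetails -XX:+PrintGCTimeStamps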

Step 3: Reproduce in Isolation

Test with a smaller dataset or a single executor to isolate memory issues.
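
One way to do this is to sample the input and rerun the suspect transformation on a small single-node cluster. A minimal sketch, assuming a Databricks notebook where spark is the active session and the table path is a placeholder:

# Sample ~1% of the data to see whether the memory profile scales with input size
sample_df = spark.read.format("delta").load("/mnt/data/events").sample(fraction=0.01, seed=42)

# Run the suspect transformation; the "noop" sink (Spark 3.0+) executes it without writing output
sample_df.groupBy("user_id").count().write.format("noop").mode("overwrite").save()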


Solutions to Fix OOM Errors

For Driver OOM

  1. Avoid collect(): Write results to cloud storage (e.g., S3, ADLS) instead of pulling data to the driver.
  2. Increase Driver Memory: Set spark.driver.memory in the cluster's Spark config (read at cluster startup), or choose a larger driver node type:
# Cluster Spark config
spark.driver.memory 16g
spark.driver.cores 4

  3. Optimize Broadcasts: Use spark.sql.autoBroadcastJoinThreshold to limit broadcast table size (see the sketch after this list).
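
A minimal sketch of both ideas, assuming a Databricks notebook where spark is the active session and the table name and output path are placeholders:

# Instead of collect(), let executors write results directly to cloud storage
result_df = spark.table("analytics.daily_summary")  # placeholder table
result_df.write.mode("overwrite").parquet("s3://my-bucket/results/daily_summary/")

# Cap automatic broadcast joins at ~64 MB (runtime-settable SQL config); -1 disables auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(64 * 1024 * 1024))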

For Executor OOM

  1. Tune Memory Settings:
    • Executor Memory: Allocate roughly 75% of the instance’s RAM to Spark (leave room for OS/overhead).
    • Overhead Memory: Increase spark.executor.memoryOverhead (e.g., 2g).
    These are static settings, so put them in the cluster’s Spark config rather than spark.conf.set():
# Cluster Spark config
spark.executor.memory 8g
spark.executor.memoryOverhead 2g

  2. Handle Data Skew:
    • Salting: Redistribute skewed keys (see Data Skew Guide).
    • Increase Partitions: Raise spark.sql.shuffle.partitions (default 200) so each shuffle partition is smaller.

  3. Optimize Shuffles (see the sketch after this list):
    • Enable spark.sql.adaptive.enabled (Spark 3.0+) to auto-tune shuffle partitions.
    • Use repartition() before heavy operations like groupBy.
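
A minimal sketch of the adaptive settings, assuming Spark 3.0+ where these configs are available:

# Adaptive Query Execution: coalesces small shuffle partitions and splits skewed ones at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")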

Advanced Fixes

  • Off-Heap Memory: Store serialized data off-heap with spark.memory.offHeap.enabled=true (see the sketch after this list).
  • Photon Acceleration: Use Databricks Runtime with Photon for optimized vectorized execution.
  • Delta Lake Caching: Cache frequently used Delta tables to reduce recomputation.
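
Off-heap settings are static and must be in place before the Spark context starts. On Databricks that means the cluster's Spark config; on vanilla Spark, a sketch at session creation (the 4g size is illustrative):

from pyspark.sql import SparkSession

# Static configs: set when the session/cluster is created, not via spark.conf.set() afterwards
spark = (SparkSession.builder
         .config("spark.memory.offHeap.enabled", "true")
         .config("spark.memory.offHeap.size", "4g")
         .getOrCreate())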

Best Practices to Prevent OOM Errors

  1. Right-Size Clusters:
    • Use memory-optimized instances (e.g., AWS r5 / Azure Ev3) for memory-heavy workloads.
    • Enable autoscaling to handle dynamic workloads.
  2. Monitor Proactively:
    • Use the Databricks Ganglia UI or Metrics API to track memory usage.
    • Set alerts for memory usage and GC time.
  3. Code Optimizations:
    • Avoid cache() unless necessary.
    • Use select() to prune unused columns before shuffles.
  4. Leverage Delta Lake (see the sketch after this list):
    • Use ZORDER BY to colocate data and minimize scans.
    • Compact small files with OPTIMIZE.
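
Both Delta maintenance commands can be run from SQL or a notebook. A minimal sketch, assuming a hypothetical events table with a frequently filtered user_id column (other column names are illustrative):

# Compact small files and colocate rows on the join/filter column
spark.sql("OPTIMIZE events ZORDER BY (user_id)")

# Prune columns early so shuffles move only what downstream steps need
slim_df = spark.table("events").select("user_id", "event_type", "event_ts")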

Real-World Example: Fixing Executor OOM in a Skewed Join

Scenario: A JOIN operation on a skewed user_id column caused repeated executor crashes.

Steps Taken:

  1. Identified skew via the Spark UI (one task processed 10M records; the others <100K).
  2. Salted the skewed key (the other join side must carry matching salt values; see the sketch after these steps):
from pyspark.sql.functions import concat, lit, rand
salted_df = df.withColumn("salted_user_id", concat("user_id", lit("_"), (rand() * 20).cast("int")))

  3. Increased shuffle partitions at runtime and raised executor memory in the cluster’s Spark config:

spark.conf.set("spark.sql.shuffle.partitions", 400)
# spark.executor.memory: set to 12g in the cluster's Spark config (static setting, applied on cluster restart)

  4. Result: Executor OOM errors resolved; job runtime reduced by 60%.
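
For completeness: salting only works if both join sides agree on the salted key. A sketch of the other side, assuming a hypothetical dimension table dim_df and the same 0-19 salt range:

from pyspark.sql.functions import array, col, concat, explode, lit

# Replicate each dimension row once per salt value so every salted fact key finds a match
salts = array(*[lit(i) for i in range(20)])
dim_salted = (dim_df
              .withColumn("salt", explode(salts))
              .withColumn("salted_user_id", concat(col("user_id"), lit("_"), col("salt").cast("string"))))

joined = salted_df.join(dim_salted, on="salted_user_id")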

Conclusion

OOM errors in Spark are rarely insurmountable. By understanding memory management, tuning configurations, and addressing root causes like data skew, you can keep your Databricks pipelines running smoothly.
