Implement Logging in Databricks

  1. Introduction to Python Logging: Python’s built-in logging module enables writing status messages to files or other output streams, which can include details about code execution and issues encountered.
  2. Logging Module Features: The logging module is feature-rich and makes it straightforward to record events to a file, letting you create custom log messages and direct them to a desired output location. (On the JVM side, Spark and Databricks use the log4j framework for their own logs; the examples below use Python's logging module.)
  3. Setting Up Logging:
    • Import the Logging Module: Start by importing the logging module from Python’s standard library.
    • Create and Configure the Logger: Configure the logger with various parameters, such as the file name where the logs should be recorded.
    • Set Logger Format and File Mode: Adjust the format of the log messages as needed. By default, the log file is opened in append mode, but this can be changed to write mode so the file is overwritten on each run.
    • Set Logging Level: Determine the logging level, which sets the threshold for which messages are recorded (debug, info, warning, error, critical). A minimal configuration sketch follows below.
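
For reference, a minimal file-based setup using logging.basicConfig might look like the sketch below; the file name app.log, the format string, and the chosen level are illustrative assumptions, not values required by the logging module.

import logging

# Minimal file-based configuration (illustrative values).
# filemode defaults to 'a' (append); use 'w' to overwrite the file on each run.
logging.basicConfig(
    filename='app.log',
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

logging.info("Logger configured; this message is written to app.log")

Note that in environments where the root logger is already configured (some notebooks fall into this category), basicConfig may have no effect unless force=True is passed (Python 3.8+).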

Logging is a means of tracking events that happen while software runs. It is important for software development, debugging, and operations. The standard logging levels, in increasing order of severity, are:

  • Debug
  • Info
  • Warning
  • Error
  • Critical
import logging

# Lower the root logger's threshold so that DEBUG and INFO messages are also emitted
# (the default threshold is WARNING).
logging.basicConfig(level=logging.DEBUG)

# Logging at different levels
logging.debug("Used to give detailed information, typically of interest only when diagnosing problems.")
logging.info("Used to confirm that things are working as expected.")
logging.warning("An indication that something unexpected happened, or that some problem may occur in the near future.")
logging.error("Indicates that, due to a more serious problem, the software has not been able to perform some function.")
logging.critical("Indicates a serious error, suggesting that the program itself may be unable to continue running.")
logging.fatal("This is logger fatal.")  # Note: `fatal` is an alias for `critical` in Python's logging module.

Below is the complete logging example:

import logging
from pyspark.sql import SparkSession

# Set up logging
logger = logging.getLogger('DataProcessingLogger')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# Create or get the Spark session
spark = SparkSession.builder.appName("CSV Data Processing").getOrCreate()

# Read the CSV file into a DataFrame
logger.info("Source data file read started...")
df = spark.read.csv('dbfs:/FileStore/tables/jaffle_shop_customers.csv', header=True)

# Count the number of records
try:
    count = df.count()
    logger.info(f"Total number of {count} records present.")
except Exception as e:
    logger.error("Error counting records", exc_info=True)

# Attempt to add a column that might not exist in the DataFrame
try:
    df1 = df.withColumn('testColumn', df['column_not_present'].cast('string'))
    df1.show()
except Exception as e:
    logger.error("Logging Error: {}".format(e), exc_info=True)

Breaking down the above code:

1. Import Libraries

import logging
from pyspark.sql import SparkSession
  • import logging: This imports Python's built-in logging module, which allows you to write status messages to a file or other output streams. Logging is useful for tracking events that happen while software runs.
  • from pyspark.sql import SparkSession: This imports SparkSession from the pyspark.sql module, the entry point for programming Spark with the DataFrame and SQL APIs used for working with structured data.

2. Set Up Logging

logger = logging.getLogger('DataProcessingLogger')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
  • getLogger('DataProcessingLogger'): Creates a new logger or retrieves an existing one named ‘DataProcessingLogger’.
  • setLevel(logging.INFO): Sets the logger’s level to INFO, which means the logger will handle only INFO level and higher messages (i.e., INFO, WARNING, ERROR, CRITICAL).
  • logging.StreamHandler(): Creates a stream handler that will write logging output to stderr (standard error stream).
  • logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'): Specifies the format of the log messages which includes the time (asctime), the level of the log message (levelname), and the log message itself (message).
  • setFormatter(formatter): Attaches the formatter to the handler.
  • addHandler(handler): Adds the specified handler to the logger. Additional handlers, such as a file handler, can be attached the same way, as sketched below.
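
If you also want the messages written to a file, another handler can be attached to the same logger. The sketch below reuses the logger and formatter defined above; the path /tmp/data_processing.log and the WARNING threshold are illustrative assumptions.

# Sketch: also write log records to a file (hypothetical path on the driver's local disk).
file_handler = logging.FileHandler('/tmp/data_processing.log', mode='a')
file_handler.setFormatter(formatter)       # reuse the same message format
file_handler.setLevel(logging.WARNING)     # only WARNING and above go to the file
logger.addHandler(file_handler)

logger.warning("This message goes to both the console handler and the log file")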

3. Create or Get the Spark Session

spark = SparkSession.builder.appName("CSV Data Processing").getOrCreate()
  • SparkSession.builder: Accesses the builder for constructing a SparkSession.
  • appName("CSV Data Processing"): Sets the name of the application as seen on the Spark web UI.
  • getOrCreate(): Retrieves an existing Spark session if there is one, or creates a new one if none exists. In a Databricks notebook a session named spark is normally already available; a log4j-based sketch that reuses this session follows below.
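
As noted in the introduction, the Spark/JVM side uses log4j. If you want your messages to land in the cluster's driver logs alongside Spark's own output, one common pattern is to obtain a log4j logger through the SparkSession's JVM gateway. Treat this as an illustrative sketch: spark._jvm is an internal interface and its behaviour can vary across Databricks runtimes.

# Sketch: write through Spark's log4j logging via the JVM gateway (internal API).
log4j = spark._jvm.org.apache.log4j
log4j_logger = log4j.LogManager.getLogger("DataProcessingLogger")
log4j_logger.info("Message written through log4j to the driver logs")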

4. Read the CSV File into a DataFrame

logger.info("Source data file read started...")
df = spark.read.csv('dbfs:/FileStore/tables/jaffle_shop_customers.csv', header=True)
  • logger.info(...): Logs an informational message indicating that reading of the source data file is starting.
  • spark.read.csv(...): Reads a CSV file and returns the result as a DataFrame. Here it is given the path to the CSV file and the header option indicating that the first row contains column names. A small sketch that logs details about the loaded DataFrame follows below.
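
If you also want to record what was read, values can be passed to the logger as arguments instead of pre-building the string, so the formatting only happens when the message is actually emitted. This sketch reuses the df and logger defined above.

# Sketch: log details about the DataFrame that was just read.
logger.info("Read file with %d columns: %s", len(df.columns), df.columns)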

5. Count the Number of Records

try:
    count = df.count()
    logger.info(f"Total number of {count} records present.")
except Exception as e:
    logger.error("Error counting records", exc_info=True)
  • df.count(): Counts the number of rows in the DataFrame.
  • logger.info(...): Logs the count of records in the DataFrame.
  • except Exception as e: Catches any exception that might occur during the counting of records.
  • logger.error(..., exc_info=True): Logs the error along with a traceback to help diagnose what went wrong; an equivalent shorthand, logger.exception, is sketched below.
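
Inside an except block, logger.exception(...) gives the same result as logger.error(..., exc_info=True): it always logs at ERROR level and appends the current traceback automatically. A sketch, reusing df and logger from above:

try:
    count = df.count()
    logger.info(f"Total of {count} records present.")
except Exception:
    # Equivalent to logger.error(..., exc_info=True)
    logger.exception("Error counting records")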

6. Attempt to Add a Column That Might Not Exist in the DataFrame

try:
    df1 = df.withColumn('testColumn', df['column_not_present'].cast('string'))
    df1.show()
except Exception as e:
    logger.error("Logging Error: {}", format(e), exc_info=True)
  • df.withColumn(...): Attempts to add a new column named testColumn to the DataFrame by casting a non-existent column column_not_present to a string type. This is likely to fail.
  • df1.show(): Displays the contents of the modified DataFrame.
  • except Exception as e: Catches any exception that occurs when trying to add the column.
  • logger.error(..., exc_info=True): Logs an error message indicating what went wrong and includes a traceback. A defensive alternative that checks for the column before adding it is sketched below.
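
Instead of relying on the exception, you could also check whether the column exists up front and log a warning when it does not. This is an alternative approach rather than part of the original example; it reuses df and logger from above.

# Sketch: check for the column before adding it, and log a WARNING instead of raising.
if 'column_not_present' in df.columns:
    df1 = df.withColumn('testColumn', df['column_not_present'].cast('string'))
    df1.show()
else:
    logger.warning("Column 'column_not_present' not found; skipping testColumn")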

To shut down logging

logging.shutdown()  # flush and close all logging handlers

# Any failure in this read is surfaced by the notebook but not recorded by the logger,
# since nothing routes it through the (now shut down) logging setup
df = spark.read.text('file:/dbfs/test')
display(df)

logging.shutdown() flushes and closes all handlers, so call it only when the application has finished logging. Because nothing after this point is routed through the logger, any error raised by the read above will not show up in the log output.
