How to Use DataFrame Columns in Different Ways in Azure Databricks
DataFrame columns in Azure Databricks support a variety of operations for data manipulation. Here's a step-by-step guide with examples showcasing different ways to work with them:
1. Creating a DataFrame:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("ColumnOperations").getOrCreate()
# Sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
# Create a DataFrame with columns: 'Name' and 'Age'
df = spark.createDataFrame(data, ["Name", "Age"])
2. Accessing Columns:
# Accessing a single column (this returns a Column object, not data)
name_col = df["Name"]
# A Column cannot be shown directly; select it into a DataFrame first
df.select(name_col).show()
# Accessing multiple columns
selected_cols = df.select("Name", "Age")
# Show the selected columns
selected_cols.show()
3. Adding and Renaming Columns:
from pyspark.sql.functions import col
# Adding a new column
df_with_new_col = df.withColumn("NewColumn", col("Age") * 2)
# Renaming a column
df_renamed_col = df.withColumnRenamed("Age", "Years")
# Show DataFrame with new and renamed columns
df_with_new_col.show()
df_renamed_col.show()
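withColumn also pairs well with conditional expressions when a new column should be derived from existing values. Here is a minimal sketch using when/otherwise (the AgeGroup column name is purely illustrative):
from pyspark.sql.functions import col, when
# Derive a categorical column from Age with a conditional expression
df_with_group = df.withColumn(
    "AgeGroup", when(col("Age") >= 30, "30+").otherwise("under 30")
)
df_with_group.show()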
4. Filtering and Modifying Columns:
from pyspark.sql.functions import expr
# Filtering rows based on column values
filtered_df = df.filter(df["Age"] > 30)
# Modifying column values using an expression
modified_df = df.withColumn("UpdatedAge", expr("Age + 5"))
# Show the filtered and modified DataFrames
filtered_df.show()
modified_df.show()
5. Aggregations and Calculations:
from pyspark.sql.functions import avg
# Aggregating column values
average_age = df.select(avg("Age").alias("AverageAge"))
# Calculating statistics on columns
statistics = df.describe("Age")
# Show the aggregation and statistics results
average_age.show()
statistics.show()
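For per-group statistics rather than a single overall value, groupBy pairs naturally with agg. The sketch below adds a hypothetical Country column to the sample data purely for illustration:
from pyspark.sql.functions import avg, count, max
# Hypothetical data with a grouping column, for illustration only
people = spark.createDataFrame(
    [("Alice", 25, "USA"), ("Bob", 30, "UK"), ("Charlie", 35, "USA")],
    ["Name", "Age", "Country"],
)
# Several aggregates per country in a single pass
people.groupBy("Country").agg(
    avg("Age").alias("AvgAge"),
    max("Age").alias("MaxAge"),
    count("*").alias("People"),
).show()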
6. Joining DataFrames and Other Operations:
# Joining two DataFrames based on a common column
another_df = spark.createDataFrame([(25, "USA"), (30, "UK")], ["Age", "Country"])
joined_df = df.join(another_df, "Age", "inner")
# union requires both DataFrames to have the same schema
more_people = spark.createDataFrame([("Dave", 40), ("Alice", 25)], ["Name", "Age"])
union_df = df.union(more_people)
# distinct removes the duplicate Alice row introduced by the union
distinct_df = union_df.distinct()
# Show the joined, union, and distinct DataFrames
joined_df.show()
union_df.show()
distinct_df.show()
List of Action Functions in Azure Databricks Spark
Function | Description | Example |
---|---|---|
show(numRows) | Displays a specified number of rows of the DataFrame. | df.show(5) |
write.format(format).save(path) | Writes the DataFrame to a specified format and path. | df.write.format("csv").save("data.csv") |
write.saveAsTable(tableName) | Saves the DataFrame as a table in the Spark catalog. | df.write.saveAsTable("myTable") |
foreach(function) | Applies a user-defined function to each row of the DataFrame. | df.foreach(lambda row: print(row)) |
collect() | Collects all rows of the DataFrame into a list on the driver. | data = df.collect() |
head(numRows) | Returns the first numRows rows of the DataFrame as a list. | data = df.head(2) |
count() | Returns the total number of rows in the DataFrame. | numRows = df.count() |
save(path) (MLlib models) | Saves a trained machine learning model to a specified path; this is a method of the fitted model, not of the DataFrame. | from pyspark.ml.classification import LogisticRegression; model = LogisticRegression().fit(df); model.save("model") |
writeStream.trigger(...) | Sets how often a Structured Streaming query processes new data; DataFrames have no built-in action for HTTP requests. | df.writeStream.trigger(processingTime="10 seconds").format("console").start() |
foreachPartition(function) | Applies a user-defined function to each partition of the DataFrame. | df.foreachPartition(lambda rows: [print(row) for row in rows]) |
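As a quick illustration of how these actions behave, the sketch below runs a few of them against the df created in step 1 (the output path is only a placeholder):
# Actions trigger execution and return results to the driver
df.show(2)                  # print the first two rows
rows = df.collect()         # all rows as a list of Row objects on the driver
first_two = df.head(2)      # first two rows as a list
total = df.count()          # row count, computed on the cluster
# Writing is also an action; the output path below is only a placeholder
df.write.format("parquet").mode("overwrite").save("/tmp/people_parquet")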
List of Transformation Functions in Azure Databricks Spark
Here’s a list of common Transformation Functions in Azure Databricks Spark, along with descriptions and examples:
Function | Description | Example |
---|---|---|
filter(condition) | Filters rows based on a specified condition | df.filter(col("age") > 25) |
where(condition) | Alias for filter | df.where(col("city") == "Seattle") |
orderBy(col1, …, colN) | Sorts the DataFrame by one or more columns | df.orderBy(col("name").desc(), col("age").asc()) |
sort(col1, …, colN) | Alias for orderBy | df.sort(col("city").asc()) |
select(expr1, …, exprN) | Selects specific columns or expressions for the output DataFrame | df.select("name", "age", col("city").alias("town")) |
groupBy(col1, …, colN) | Groups rows by one or more columns | df.groupBy("city").count() |
agg(expr1, …, exprN) | Aggregates data using built-in functions like avg, sum, max, etc. | df.groupBy("city").agg(avg("age"), max("age")) |
join(otherDF, joinExpr, joinType) | Joins two DataFrames based on a join expression and type | df.join(otherDF, df["id"] == otherDF["id"], "inner") |
crossJoin(otherDF) | Performs a Cartesian product join, joining every row of one DataFrame with every row of the other | df.crossJoin(otherDF) |
distinct() | Removes duplicate rows from the DataFrame | df.distinct() |
dropDuplicates(subset) | Removes duplicate rows based on a specified subset of columns | df.dropDuplicates(["name", "city"]) |
withColumn(colName, expr) | Adds a new column to the DataFrame based on an expression | df.withColumn("fullName", concat(col("name"), lit(" "), col("lastName"))) |
drop(col1, …, colN) | Drops specified columns from the DataFrame | df.drop("id", "age") |
sample(withReplacement, fraction) | Samples a fraction of rows from the DataFrame with or without replacement | df.sample(withReplacement=False, fraction=0.1) |
limit(numRows) | Limits the number of rows in the DataFrame | df.limit(10) |
map(func) | Applies a user-defined function to each row; in PySpark this is an RDD operation, accessed via df.rdd | df.rdd.map(lambda row: row.age + 1) |
flatMap(func) | Applies a user-defined function that produces a sequence of elements for each row; also an RDD operation via df.rdd | df.rdd.flatMap(lambda row: [row.name, row.city]) |
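Transformations are lazy: each call only builds up a query plan, and nothing runs until an action such as show() is invoked. A minimal sketch chaining several of the functions above against the df from step 1:
from pyspark.sql.functions import col
# Chain transformations lazily; only show() at the end triggers execution
result = (
    df.filter(col("Age") > 20)                    # keep rows with Age > 20
      .withColumn("AgeNextYear", col("Age") + 1)  # derive a new column
      .select("Name", "AgeNextYear")              # project only needed columns
      .orderBy(col("AgeNextYear").desc())         # sort descending
      .limit(10)                                  # cap the output size
)
result.show()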