Azure Databricks – DataFrame Columns

How to use DataFrame columns in different ways in Azure Databricks

Using DataFrame columns in Azure Databricks offers various operations for data manipulation. Here’s a step-by-step guide with examples showcasing different ways to work with DataFrame columns:

1. Creating a DataFrame:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("ColumnOperations").getOrCreate()

# Sample data
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]

# Create a DataFrame with columns: 'Name' and 'Age'
df = spark.createDataFrame(data, ["Name", "Age"])
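
To verify what Spark inferred for these columns, you can print the schema. A quick check; the commented output assumes the sample data above, where Python ints are inferred as long:

# Inspect the inferred schema
df.printSchema()
# root
#  |-- Name: string (nullable = true)
#  |-- Age: long (nullable = true)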

2. Accessing Columns:

# Accessing a single column returns a Column object, not a DataFrame
name_col = df["Name"]

# A Column cannot be shown directly; select it into a DataFrame first
df.select(name_col).show()

# Accessing multiple columns
selected_cols = df.select("Name", "Age")

# Show the selected columns
selected_cols.show()
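
Columns can also be combined with expressions directly inside select. A small sketch using the same df (the alias AgeNextYear is just illustrative):

# Select a column alongside a derived expression
df.select(df["Name"], (df["Age"] + 1).alias("AgeNextYear")).show()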

3. Adding and Renaming Columns:

from pyspark.sql.functions import col

# Adding a new column
df_with_new_col = df.withColumn("NewColumn", col("Age") * 2)

# Renaming a column
df_renamed_col = df.withColumnRenamed("Age", "Years")

# Show DataFrame with new and renamed columns
df_with_new_col.show()
df_renamed_col.show()
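
To add a constant-valued column, wrap the literal in lit(); withColumn expects a Column, not a bare Python value. A minimal sketch (the column name Country is illustrative):

from pyspark.sql.functions import lit

# Add a constant column to every row
df_with_const = df.withColumn("Country", lit("Unknown"))
df_with_const.show()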

4. Filtering and Modifying Columns:

from pyspark.sql.functions import expr

# Filtering rows based on column values
filtered_df = df.filter(df["Age"] > 30)

# Modifying column values using an expression
modified_df = df.withColumn("UpdatedAge", expr("Age + 5"))

# Show the filtered and modified DataFrames
filtered_df.show()
modified_df.show()
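
For conditional modifications, the usual pattern is when/otherwise. A sketch on the same df (the AgeGroup labels are illustrative):

from pyspark.sql.functions import when

# Label each row based on a condition on Age
labeled_df = df.withColumn("AgeGroup", when(df["Age"] > 30, "Senior").otherwise("Junior"))
labeled_df.show()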

5. Aggregations and Calculations:

from pyspark.sql.functions import avg

# Aggregating column values (alias the aggregate expression, not the DataFrame)
average_age = df.select(avg("Age").alias("AverageAge"))

# Calculating statistics on columns
statistics = df.describe("Age")

# Show the aggregation and statistics results
average_age.show()
statistics.show()
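
Several aggregates can also be computed in one pass with agg(). A sketch (note that importing min and max shadows the Python built-ins, which is common in PySpark examples):

from pyspark.sql.functions import min, max, count

# Compute multiple aggregates in a single job
df.agg(min("Age").alias("MinAge"), max("Age").alias("MaxAge"), count("*").alias("Rows")).show()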

6. Joining Columns and DataFrame Operations:

# Joining two DataFrames based on a common column
another_df = spark.createDataFrame([(25, "USA"), (30, "UK")], ["Age", "Country"])
joined_df = df.join(another_df, "Age", "inner")

# Performing other DataFrame operations (e.g., union, distinct);
# union requires matching schemas, so union df with a DataFrame of the same shape
more_people = spark.createDataFrame([("Dave", 40)], ["Name", "Age"])
union_df = df.union(more_people)
distinct_df = df.distinct()

# Show the joined, union, and distinct DataFrames
joined_df.show()
union_df.show()
distinct_df.show()
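
Other join types use the same call shape; only the join-type argument changes. A sketch reusing another_df from above:

# A left join keeps all rows of df; unmatched rows get null for Country
left_joined = df.join(another_df, "Age", "left")
left_joined.show()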

List of Action Functions in Azure Databricks Spark

| Function | Description | Example |
| --- | --- | --- |
| show(numRows) | Displays the specified number of rows of the DataFrame. | df.show(5) |
| write.format(format).save(path) | Writes the DataFrame to the specified format and path. | df.write.format("csv").save("data.csv") |
| write.saveAsTable(tableName) | Saves the DataFrame as a table in the Spark catalog. | df.write.saveAsTable("myTable") |
| foreach(function) | Applies a user-defined function to each row of the DataFrame. | df.foreach(lambda row: print(row)) |
| collect() | Returns all rows of the DataFrame as a list on the driver. | data = df.collect() |
| head(numRows) | Returns the first numRows rows of the DataFrame as a list. | data = df.head(2) |
| count() | Returns the total number of rows in the DataFrame. | numRows = df.count() |
| save(path) (MLlib models) | Saves a trained machine learning model to the specified path; it is the fitted model returned by fit() that is saved, not the estimator. | lr = LogisticRegression(...); model = lr.fit(df); model.save("model") |
| writeStream.trigger(...) | Sets how often a Structured Streaming query processes new data. | df.writeStream.format("console").trigger(processingTime="10 seconds").start() |
| foreachPartition(function) | Applies a user-defined function to each partition of the DataFrame; the function receives an iterator of rows. | df.foreachPartition(lambda rows: [print(r) for r in rows]) |
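
A short sketch exercising a few of these actions on the df built in step 1 (the output path is a placeholder):

# Preview rows, count them, and pull them back to the driver
df.show(2)
print(df.count())        # 3
rows = df.collect()      # list of Row objects
print(rows[0]["Name"])   # Alice

# Write the DataFrame out as CSV; mode("overwrite") replaces existing output
df.write.format("csv").mode("overwrite").save("/tmp/people_csv")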

List of Transformation Functions in Azure Databricks Spark

Here’s a list of common Transformation Functions in Azure Databricks Spark, along with descriptions and examples:

| Function | Description | Example |
| --- | --- | --- |
| filter(condition) | Filters rows based on a specified condition. | df.filter(col("age") > 25) |
| where(condition) | Alias for filter. | df.where(col("city") == "Seattle") |
| orderBy(col1, ..., colN) | Sorts the DataFrame by one or more columns. | df.orderBy(col("name").desc(), col("age").asc()) |
| sort(col1, ..., colN) | Alias for orderBy. | df.sort(col("city").asc()) |
| select(expr1, ..., exprN) | Selects specific columns or expressions for the output DataFrame. | df.select("name", "age", col("city").alias("town")) |
| groupBy(col1, ..., colN) | Groups rows by one or more columns. | df.groupBy("city").count() |
| agg(expr1, ..., exprN) | Aggregates data using built-in functions like avg, sum, max, etc. | df.groupBy("city").agg(avg("age"), max("age")) |
| join(otherDF, joinExpr, joinType) | Joins two DataFrames based on a join expression and type. | df.join(otherDF, df["id"] == otherDF["id"], "inner") |
| crossJoin(otherDF) | Performs a Cartesian product join, pairing every row of one DataFrame with every row of the other. | df.crossJoin(otherDF) |
| distinct() | Removes duplicate rows from the DataFrame. | df.distinct() |
| dropDuplicates(subset) | Removes duplicate rows based on a specified subset of columns. | df.dropDuplicates(subset=["name", "city"]) |
| withColumn(colName, expr) | Adds a new column to the DataFrame based on an expression; string concatenation uses concat, not +. | df.withColumn("fullName", concat(col("name"), lit(" "), col("lastName"))) |
| drop(col1, ..., colN) | Drops specified columns from the DataFrame. | df.drop("id", "age") |
| sample(withReplacement, fraction) | Samples a fraction of rows from the DataFrame, with or without replacement. | df.sample(withReplacement=False, fraction=0.1) |
| limit(numRows) | Limits the number of rows in the DataFrame. | df.limit(10) |
| rdd.map(func) | Applies a function to each row via the underlying RDD (DataFrames have no map method in PySpark). | df.rdd.map(lambda row: row.age + 1) |
| rdd.flatMap(func) | Applies a function that produces a sequence of elements for each row, via the underlying RDD. | df.rdd.flatMap(lambda row: [row.name, row.city]) |
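
A short sketch chaining several of these transformations on the df from step 1 (column names follow that example):

from pyspark.sql.functions import col

# Filter, derive a column, sort, and cap the row count in one pipeline
result = (df
    .filter(col("Age") > 25)
    .withColumn("AgeDoubled", col("Age") * 2)
    .orderBy(col("Age").desc())
    .limit(10))
result.show()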