Transforming Data with Databricks PySpark

  

Azure Databricks is a collaborative, notebook-style platform that simplifies big data tasks with Apache Spark. Using PySpark, you can easily manipulate large datasets for ETL processes. The notebooks allow you to switch between SQL for quick queries and Python for flexible coding, all while visualizing your data with built-in charts. This integration makes for an efficient and interactive data workflow.

In this blog, we’ll dive into key PySpark functionalities essential for effective ETL processes. We’ll start with how to read Delta tables, which provide efficient data storage and retrieval. Next, we’ll explore data profiling to help you understand your dataset better. We’ll then cover filtering techniques to refine your data, address missing values to ensure data integrity, and handle duplicates to maintain clean datasets. After that, we’ll discuss grouping data for meaningful insights and finish with pivoting to reshape your data for better analysis. Finally, we’ll conclude by demonstrating how to create a new Databricks Delta table for efficient data management.

To kick things off, let’s look at how to read data from Hive Metastore tables using PySpark. This is a crucial first step in your ETL process, allowing you to load and manipulate your datasets easily. Here’s a simple line of code to get started:

df = spark.table("your_database.your_table")

Caching: You can cache the DataFrame after reading it to improve performance for subsequent actions:

df = spark.table("your_database.your_table").cache()

Selecting Specific Columns: To load only specific columns, you can chain the select() method:

df = spark.table("your_database.your_table").select("column1", "column2")

Filtering Rows: You can also filter rows immediately after reading the table:

df = spark.table("your_database.your_table").filter(df.column1 > 100)

Using SQL Queries: Instead of directly using spark.table(), you can create a temporary view and run SQL queries:

spark.sql("CREATE OR REPLACE TEMP VIEW temp_view AS
SELECT * FROM your_database.your_table")
df = spark.sql("SELECT column1, column2 FROM temp_view WHERE column1 > 100")

These customizations can help tailor the data loading process to meet your specific needs.

After successfully reading your Hive Metastore table, the next critical step in the ETL process is data profiling. Data profiling allows you to gain a deeper understanding of your dataset, uncovering insights about its structure, quality, and content. By examining aspects such as data types, null values, unique counts, and distribution, you can identify potential issues and make informed decisions on how to handle them. Let’s explore the key techniques and tools in PySpark that will help you effectively profile your data.

To get the total number of rows in the DataFrame, you can use the count() method:

row_count = df.count()

To check if there are duplicates in your DataFrame, use this custom function which prints out the results 

from pyspark.sql import functions as F

def check_duplicate_rows(df, cols):

    """
    To check if there are duplicates: Count the number of distinct rows in a subset and compare it with the number of total rows.
    If they're the same, there are no duplicates in the selected subset. Otherwise, duplicates exist.

    # Call the function
    cols_to_check = ['col1', 'col2', 'col3']  # Replace with the actual column names
    check_duplicate_rows(df, cols_to_check)

    """

    counts_df = df.groupBy(cols).count().filter(F.col('count') > 1)

    if counts_df.count() == 0:
        print("Total Row {} and Unique Row {} are equal. Therefore, there are no duplicate rows for the given
subset".format(df.count(), df.distinct().count()))
    else:
        print("Total Row {} and Unique Row {} are not equal. Therefore, there are {} duplicate rows for the given
subset \nDuplicate rows below:".format(df.count(), df.distinct().count(), counts_df.count()))
        display(counts_df)


# Example usage:
# cols_to_check = ['col1', 'col2', 'col3']  # Replace with the actual column names
# check_duplicate_rows(df, cols_to_check)


To check how many null values in any subset of columns you specify, as well as what is the percentage of nulls in total row count, use this custom function 


from pyspark.sql.functions import col, round, sum as spark_sum

def null_check(df, columns):
    """
    Count the number of null values in the specified columns of a PySpark DataFrame.

    Parameters:
    - df: PySpark DataFrame
        The DataFrame to check for null values.
    - columns: list of str
        A list of column names to check for null values.

    Returns:
    - PySpark DataFrame
        A DataFrame with three columns: "column_name", "null_count", and "null_percent".

    Call Example:
      cols = test_df.columns
      result_df = null_check(test_df, cols)
      result_df.show()

    """
    row_count = df.count()

    null_counts = []
    for column in columns:
        null_count = df.select(spark_sum(col(column).isNull().cast("int")).alias(column)).collect()[0][column]
        null_percent = null_count / row_count
        column_info = (column, null_count, null_percent)
        null_counts.append(column_info)

    # Create a PySpark DataFrame to hold the results
    result_df = spark.createDataFrame(null_counts, ["column_name", "null_count", "null_percent"])

    # Round the null_percent column
    result_df = result_df.withColumn("null_percent", round(col("null_percent"), 2))

    return result_df

To visualize the distribution of a specific column, you can create a histogram using Plotly Express. First, ensure you have the required libraries installed:

pip install plotly

Then, convert your DataFrame to a Pandas DataFrame and create the histogram:

import plotly.express as px

# Convert to Pandas DataFrame
pandas_df = df.toPandas()

# Create a histogram for a specific column (e.g., "column_name")
fig = px.histogram(pandas_df, x="column_name", title="Histogram of Column Name")
fig.show()

By obtaining the row count, null counts, and duplicate counts, along with visualizing your data through a histogram, you can gain valuable insights into your dataset, helping you make informed decisions for the next steps in your ETL process.


After profiling your data, the next step in the ETL process is filtering, which allows you to refine your dataset by keeping only the relevant rows. Filtering is crucial for data quality and ensuring that your analysis focuses on the most pertinent information.

In PySpark, you can filter data using the filter() or where() methods. Both methods work similarly, allowing you to specify conditions for selecting rows.

Basic Filtering Example

Here’s a simple example of filtering rows where a specific column meets a certain condition. For instance, to filter rows where the value in the "age" column is greater than 30:

filtered_df = df.filter(df.age > 30)

Alternatively, you can use SQL-like syntax for filtering:

filtered_df = df.filter('age > 30')

You can also use the where() method in a similar way:

filtered_df = df.where(df.age > 30)

You can filter using multiple conditions with logical operators like & (and) and | (or). For example, to filter for rows where "age" is greater than 30 and "salary" is less than 60,000:

filtered_df = df.filter((df.age > 30) & (df.salary < 60000))
filtered_df = df.filter('age > 30 AND salary < 60000')

Filtering your dataset is an essential part of the ETL process that allows you to focus on the relevant data for analysis. Using the filter() or where() methods in PySpark makes it easy to refine your DataFrame based on specific conditions, whether using method chaining or SQL-like syntax, helping you prepare for the next steps in your data journey.

Once you've filtered your dataset, the next crucial task is to handle any null values that may still be present. Nulls can significantly impact your analysis, so it's vital to address them appropriately. Let's explore some effective techniques for managing null values in PySpark, including filtering them out, backfilling, forward filling, and imputing with the mean.

You can easily filter out rows with null values in a specific column:

cleaned_df = df.filter(df.column_name.isNotNull())

To backfill null values in a column, you can use the fillna() method with a specific value:

(Backfilling replaces null values with the next valid observation in the column)

backfilled_df = df.fillna(method='backfill')

To forward fill null values, use:

(forward filling fills nulls with the most recent valid observation before the null.)

forward_filled_df = df.fillna(method='ffill')

 To replace null values with the mean of a specific column, first calculate the mean and then use fillna():

mean_value = df.select(mean(df.column_name)).first()[0]
imputed_df = df.fillna(mean_value, subset=['column_name'])

After handling null values, another important step in the ETL process is to address duplicates within your dataset. Duplicates can skew your analysis and lead to inaccurate results, making it essential to identify and remove them. In PySpark, you can easily drop duplicate rows from your DataFrame to ensure data integrity.

To remove duplicate rows from your DataFrame, you can use the dropDuplicates() method. This method removes all rows that have the same values across all specified columns. Here’s how to do it:

To drop duplicates across all columns, simply call:
cleaned_df = df.dropDuplicates()

If you want to drop duplicates based on specific columns, you can pass those column names as a list:
cleaned_df = df.dropDuplicates(['column1', 'column2'])

After ensuring your dataset is clean and free of duplicates, the next steps involve grouping and reshaping your data for deeper analysis. Grouping data allows you to aggregate values based on specific categories, while pivoting provides a way to reorganize your data for better readability and insights. Let’s explore basic examples of both techniques using PySpark.

To group your data and perform aggregate functions, you can use the groupBy() method. This method enables you to summarize your dataset based on one or more columns. Here’s a basic example:

Suppose you want to calculate the average salary by department. You can achieve this as follows:

average_salary_df = df.groupBy("department").agg(avg("salary").alias("average_salary"))

Once you have grouped your data, you may find that certain aggregations could be more insightful if presented in a different format. This is where pivoting comes into play. Pivoting allows you to transform unique values from one column into multiple columns in the resulting DataFrame, providing a clearer view of your data relationships.

For instance, if you want to see the total sales by product category and region, you can do this:

pivot_df = df.groupBy("region").pivot("product_category").agg(sum("sales"))


Grouping and pivoting are powerful techniques that enable you to gain valuable insights from your data. By using the groupBy() method for aggregation and pivot() for reshaping your DataFrame, you can enhance your analysis and make data-driven decisions with greater clarity.

After processing your data, saving it as a Delta table is a crucial step for efficient storage and retrieval. Delta tables provide ACID transactions, scalable metadata handling, and unifies batch and streaming data processing.

To write a DataFrame as a Delta table in PySpark, you can use the built-in write method. Here's how to do it:

df.write.format("delta") \
    .mode("append") \  # Use "overwrite" to replace existing data
    .option("overwriteSchema", "true") \
    .partitionBy("partition_column") \  # Specify your partition column
    .saveAsTable("schema.table_name")  # Provide schema and table name


Key Options

  • format("delta"): Specifies that the data is stored in Delta format.
  • mode("append"): Determines the write mode; use "overwrite" to replace existing data.
  • option("overwriteSchema", "true"): Allows updates to the schema if it changes.
  • partitionBy("partition_column"): Specifies the column for partitioning the table.
  • saveAsTable("schema.table_name"): Saves the DataFrame as a table in the given schema

In this blog, we explored essential PySpark functionalities for effective ETL processes, including reading Delta tables, performing data profiling, filtering, handling null values, removing duplicates, and leveraging grouping and pivoting techniques. Finally, we discussed how to write your processed data as a Delta table for efficient management.

If you found this blog helpful, please like, share, and leave your comments below! We’d love to hear your thoughts and experiences with PySpark and Delta tables.


Share:

No comments:

Post a Comment

We'd like to hear your comments!

Recent Posts