Building a Data Pipeline with Prefect


In this tutorial, we will learn about Prefect, a modern workflow orchestration tool. We will start by building a data pipeline with Pandas and then compare it with a Prefect workflow to gain a better understanding. In the end, we will deploy our workflow and view the run logs on the dashboard.

 

What Is Prefect?

 

Prefect is a workflow management system designed to orchestrate and manage complex data workflows, including machine learning (ML) pipelines. It provides a framework for building, scheduling, and monitoring workflows, making it an essential tool for managing ML operations (MLOps).

Prefect offers task and flow management, allowing users to define dependencies and execute workflows efficiently. With features like state management and observability, Prefect provides insights into task status and history, which aids debugging and optimization. It comes with a highly interactive dashboard that lets you schedule and monitor workflows and integrate various other features to improve your MLOps pipeline. You can even set up notifications and integrate other ML frameworks with a few clicks.

Prefect is available as an open-source framework and as a managed cloud service, which simplifies your workflow even more.
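To make the task and flow concepts concrete, here is a minimal, hypothetical sketch (the function and flow names are illustrative, not part of this tutorial's pipeline): every function decorated with `@task` becomes a tracked unit of work, and the `@flow` function orchestrates those tasks.

from prefect import task, flow

@task
def say_hello(name: str) -> str:
    # A task is the smallest unit of work that Prefect tracks and can retry.
    return f"Hello, {name}!"

@flow(name="Hello Flow")
def hello_flow(name: str = "Prefect"):
    # A flow orchestrates tasks and records their states for observability.
    print(say_hello(name))

if __name__ == "__main__":
    hello_flow()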

 

Building a Data Pipeline with Pandas

 

We will replicate the data pipeline that I used in a previous tutorial (Building Data Science Pipelines Using Pandas – KDnuggets) to give you an idea of how each task works in a pipeline and how to combine them. I am mentioning it here so that you can clearly compare how an orchestrated data pipeline differs from a normal one.

import pandas as pd

def load_data(path):
    return pd.read_csv(path)

def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    # convert the date column to datetime
    data["Date"] = pd.to_datetime(data["Date"])
    return data

def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    return new_df

path = "Online Sales Data.csv"
df = (
    pd.DataFrame()
    .pipe(lambda x: load_data(path))
    .pipe(data_cleaning)
    .pipe(convert_dtypes, {"Product Category": "str", "Product Name": "str"})
    .pipe(data_analysis)
    .pipe(data_visualization, "line")
)

 

When we run the above code, each task runs sequentially and generates the data visualization. Apart from that, it does not do anything else: we cannot schedule it, view the run logs, or integrate third-party tools for notification or monitoring.

 


 

Building a Data Pipeline with Prefect

 

Now we will build the same pipeline, using the same dataset (Online Sales Dataset – Popular Marketplace Data), but with Prefect. First, install the Prefect library using the pip command:
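
$ pip install prefect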

 

If you review the code below, you will notice that nothing has really changed. The functions are the same, but with the addition of Python decorators: each step in the pipeline has the `@task` decorator, and the pipeline combining these steps has the `@flow` decorator. Additionally, we are saving the generated figure.

import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow

@task
def load_data(path):
    return pd.read_csv(path)

@task
def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

@task
def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data

@task
def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

@task
def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@flow(name="Data Pipeline")
def data_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    new_df = data_visualization(analysis_result, "line")
    return new_df

# Run the flow!
if __name__ == "__main__":
    new_df = data_pipeline("Online Sales Data.csv")
    print(new_df)

 

We will run our data pipeline by providing the CSV file location. It will perform all of the steps in sequence and generate logs with the run states.
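
Assuming we saved the script above as prefect_pipeline.py (the file name here is just an example), we run it like any other Python script:

$ python prefect_pipeline.py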

14:18:48.649 | INFO    | prefect.engine - Created flow run 'enlightened-dingo' for flow 'Data Pipeline'
14:18:48.816 | INFO    | Flow run 'enlightened-dingo' - Created task run 'load_data-0' for task 'load_data'
14:18:48.822 | INFO    | Flow run 'enlightened-dingo' - Executing 'load_data-0' immediately...
14:18:48.990 | INFO    | Task run 'load_data-0' - Finished in state Completed()
14:18:49.052 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_cleaning-0' for task 'data_cleaning'
14:18:49.053 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_cleaning-0' immediately...
14:18:49.226 | INFO    | Task run 'data_cleaning-0' - Finished in state Completed()
14:18:49.283 | INFO    | Flow run 'enlightened-dingo' - Created task run 'convert_dtypes-0' for task 'convert_dtypes'
14:18:49.288 | INFO    | Flow run 'enlightened-dingo' - Executing 'convert_dtypes-0' immediately...
14:18:49.441 | INFO    | Task run 'convert_dtypes-0' - Finished in state Completed()
14:18:49.506 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_analysis-0' for task 'data_analysis'
14:18:49.510 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_analysis-0' immediately...
14:18:49.684 | INFO    | Task run 'data_analysis-0' - Finished in state Completed()
14:18:49.753 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_visualization-0' for task 'data_visualization'
14:18:49.760 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_visualization-0' immediately...
14:18:50.087 | INFO    | Task run 'data_visualization-0' - Finished in state Completed()
14:18:50.144 | INFO    | Flow run 'enlightened-dingo' - Finished in state Completed()

 

In the end, you will get the transformed data frame and the visualization.

 


 

Deploying the Prefect Pipeline

 

In order to deploy the Prefect pipeline, we need to start by moving our codebase to the Python file `data_pipe.py`. After that, we will modify how we run our pipeline: we will use the `.serve` method to deploy the pipeline and pass the CSV file as an argument to the function.

data_pipe.py:

import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow

@task
def load_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@task
def data_cleaning(data: pd.DataFrame) -> pd.DataFrame:
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

@task
def convert_dtypes(data: pd.DataFrame, types_dict: dict = None) -> pd.DataFrame:
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data

@task
def data_analysis(data: pd.DataFrame) -> pd.DataFrame:
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

@task
def data_visualization(new_df: pd.DataFrame, vis_type: str = "bar") -> pd.DataFrame:
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@task
def save_to_csv(df: pd.DataFrame, filename: str):
    df.to_csv(filename, index=False)
    return filename

@flow(name="Data Pipeline")
def run_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    data_visualization(analysis_result, "line")
    save_to_csv(analysis_result, "average_units_sold_by_month.csv")

# Serve the flow as a deployment
if __name__ == "__main__":
    run_pipeline.serve(
        name="pass-params-deployment",
        parameters=dict(path="Online Sales Data.csv"),
    )

 

 

When we run the Python file, we will receive a message telling us that, to run the deployed pipeline, we have to use the following command:

 


 

Launch a new Terminal window and type the command to trigger a run of this flow.

$ prefect deployment run 'Data Pipeline/pass-params-deployment'

 

As we can see, the flow run has been initiated, meaning the pipeline is now running in the background. We can always return to the first Terminal window to view the logs.
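
As a side note: if you want the deployment to run on a schedule rather than only on demand, the `.serve` method also accepts scheduling arguments. Here is a minimal sketch, assuming a daily schedule (the deployment name and cron expression are just examples):

# Serve the flow with an automatic daily schedule (example values).
run_pipeline.serve(
    name="scheduled-deployment",  # hypothetical deployment name
    parameters=dict(path="Online Sales Data.csv"),
    cron="0 0 * * *",  # assumed example: run every day at midnight
)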

 


 

To view the logs in the dashboard, we have to launch the Prefect dashboard by typing the following command:
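
$ prefect server start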

 

Click on the dashboard link to launch the dashboard in your web browser.

 


 

The dashboard consists of various tabs and information related to your pipelines, workflows, and runs. To view the current run, navigate to the “Flow Runs” tab and select the most recent flow run.

 


 

All of the source code, data, and information are available in the kingabzpro/Data-Pipeline-with-Prefect GitHub repository. Please don't forget to star ⭐ it.

 

Conclusion

 

Building a pipeline with the right tools is necessary if you want to scale your data workflows and avoid unnecessary hiccups. By using Prefect, you can schedule your runs, debug the pipeline, and integrate it with the third-party tools you are already using. It is easy to use and comes with tons of features that you will love. If you are new to Prefect, I highly recommend checking out Prefect Cloud; they offer free hours for users to experience the cloud platform and become familiar with the workflow management system.
 
 

Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master's degree in technology management and a bachelor's degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.
