Image by Author | Canva
In this tutorial, we will learn about Prefect, a modern workflow orchestration tool. We will start by building a data pipeline with Pandas and then compare it with a Prefect workflow to gain a better understanding. In the end, we will deploy our workflow and review the run logs on the dashboard.
What is Prefect?
Prefect is a workflow management system designed to orchestrate and manage complex data workflows, including machine learning (ML) pipelines. It provides a framework for building, scheduling, and monitoring workflows, making it an essential tool for managing ML operations (MLOps).
Prefect offers task and flow management, allowing users to define dependencies and execute workflows efficiently. With features like state management and observability, Prefect provides insights into task status and history, which aids debugging and optimization. It comes with a highly interactive dashboard that lets you schedule and monitor runs and integrate various other features that can improve your MLOps pipeline. You can even set up notifications and integrate other ML frameworks with a few clicks.
Prefect is available as an open-source framework and as a managed cloud service, simplifying your workflow even further.
Building a Data Pipeline with Pandas
We will replicate the data pipeline that I used in a previous tutorial (Building Data Science Pipelines Using Pandas on KDnuggets) to give you an idea of how each task works in the pipeline and how to combine them. I am mentioning it here so that you can clearly compare how proper data pipelines differ from regular ones.
import pandas as pd


def load_data(path):
    return pd.read_csv(path)


def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data


def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    # convert the date column to datetime
    data["Date"] = pd.to_datetime(data["Date"])
    return data


def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df


def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    return new_df


path = "Online Sales Data.csv"
df = (
    pd.DataFrame()
    .pipe(lambda x: load_data(path))
    .pipe(data_cleaning)
    .pipe(convert_dtypes, {"Product Category": "str", "Product Name": "str"})
    .pipe(data_analysis)
    .pipe(data_visualization, "line")
)
When we run the above code, each task runs sequentially and generates the data visualization. Apart from that, it doesn't do anything: we cannot schedule it, view the run logs, or integrate third-party tools for notification or monitoring.
Building a Data Pipeline with Prefect
Now we will build the same pipeline, with the same dataset (Online Sales Dataset - Popular Marketplace Data), but with Prefect. We will first install the Prefect library using the pip command:
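$ pip install prefect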
If you review the code below, you will notice that nothing has really changed. The functions are the same, but with the addition of Python decorators: each step in the pipeline gets the `@task` decorator, and the pipeline combining these steps gets the `@flow` decorator. Additionally, we are now saving the generated figure.
import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow


@task
def load_data(path):
    return pd.read_csv(path)


@task
def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data


@task
def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data


@task
def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df


@task
def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df


@flow(name="Data Pipeline")
def data_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    new_df = data_visualization(analysis_result, "line")
    return new_df


# Run the flow!
if __name__ == "__main__":
    new_df = data_pipeline("Online Sales Data.csv")
    print(new_df)
We run our data pipeline by providing the CSV file location. It performs all the steps in sequence and generates logs with the run states.
14:18:48.649 | INFO | prefect.engine - Created flow run 'enlightened-dingo' for flow 'Data Pipeline'
14:18:48.816 | INFO | Flow run 'enlightened-dingo' - Created task run 'load_data-0' for task 'load_data'
14:18:48.822 | INFO | Flow run 'enlightened-dingo' - Executing 'load_data-0' immediately...
14:18:48.990 | INFO | Task run 'load_data-0' - Finished in state Completed()
14:18:49.052 | INFO | Flow run 'enlightened-dingo' - Created task run 'data_cleaning-0' for task 'data_cleaning'
14:18:49.053 | INFO | Flow run 'enlightened-dingo' - Executing 'data_cleaning-0' immediately...
14:18:49.226 | INFO | Task run 'data_cleaning-0' - Finished in state Completed()
14:18:49.283 | INFO | Flow run 'enlightened-dingo' - Created task run 'convert_dtypes-0' for task 'convert_dtypes'
14:18:49.288 | INFO | Flow run 'enlightened-dingo' - Executing 'convert_dtypes-0' immediately...
14:18:49.441 | INFO | Task run 'convert_dtypes-0' - Finished in state Completed()
14:18:49.506 | INFO | Flow run 'enlightened-dingo' - Created task run 'data_analysis-0' for task 'data_analysis'
14:18:49.510 | INFO | Flow run 'enlightened-dingo' - Executing 'data_analysis-0' immediately...
14:18:49.684 | INFO | Task run 'data_analysis-0' - Finished in state Completed()
14:18:49.753 | INFO | Flow run 'enlightened-dingo' - Created task run 'data_visualization-0' for task 'data_visualization'
14:18:49.760 | INFO | Flow run 'enlightened-dingo' - Executing 'data_visualization-0' immediately...
14:18:50.087 | INFO | Task run 'data_visualization-0' - Finished in state Completed()
14:18:50.144 | INFO | Flow run 'enlightened-dingo' - Finished in state Completed()
In the end, you will get the transformed data frame and the visualization.
Deploying the Prefect Pipeline
In order to deploy the Prefect pipeline, we start by moving our codebase into the Python file `data_pipe.py`. After that, we modify how we run the pipeline: we use the `.serve` method to deploy the flow and pass the CSV file as a parameter.
data_pipe.py:
import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow


@task
def load_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


@task
def data_cleaning(data: pd.DataFrame) -> pd.DataFrame:
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data


@task
def convert_dtypes(data: pd.DataFrame, types_dict: dict = None) -> pd.DataFrame:
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data


@task
def data_analysis(data: pd.DataFrame) -> pd.DataFrame:
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df


@task
def data_visualization(new_df: pd.DataFrame, vis_type: str = "bar") -> pd.DataFrame:
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df


@task
def save_to_csv(df: pd.DataFrame, filename: str):
    df.to_csv(filename, index=False)
    return filename


@flow(name="Data Pipeline")
def run_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    data_visualization(analysis_result, "line")
    save_to_csv(analysis_result, "average_units_sold_by_month.csv")


# Serve the flow as a deployment
if __name__ == "__main__":
    run_pipeline.serve(
        name="pass-params-deployment",
        parameters=dict(path="Online Sales Data.csv"),
    )
When we run the Python file, we receive a message telling us that to run the deployed pipeline we have to use the following command.
Launch a new Terminal window and type the command to trigger a run of this flow:
$ prefect deployment run 'Data Pipeline/pass-params-deployment'
As we can see, the flow run has been initiated, meaning the pipeline is running in the background. We can always go back to the first Terminal window to view the logs.
To view the logs in the dashboard, we have to launch the Prefect dashboard by typing the following command:
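$ prefect server start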
Click on the dashboard link to open the dashboard in your web browser.
The dashboard consists of various tabs and information related to your pipeline, workflows, and runs. To view the current run, navigate to the "Flow Runs" tab and select the most recent flow run.
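One more thing worth noting: the same `.serve` call that deployed our flow can also attach a schedule, so runs are triggered automatically instead of manually. The snippet below is only a minimal sketch, assuming a Prefect version whose `.serve` method accepts a `cron` argument; the deployment name is made up for illustration.

# Sketch: serve the flow with a daily schedule (assumes .serve supports `cron`)
if __name__ == "__main__":
    run_pipeline.serve(
        name="daily-sales-pipeline",  # hypothetical deployment name
        parameters=dict(path="Online Sales Data.csv"),
        cron="0 0 * * *",  # run every day at midnight
    )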
All of the source code, data, and information are available in the kingabzpro/Data-Pipeline-with-Prefect GitHub repository. Please don't forget to star ⭐ it.
Conclusion
Building a pipeline with the proper tools is necessary if you want to scale your data workflow and avoid unnecessary hiccups. By using Prefect, you can schedule your runs, debug the pipeline, and integrate it with multiple third-party tools that you are already using. It is easy to use and comes with tons of features that you will love. If you are new to Prefect, I highly recommend checking out Prefect Cloud. They offer free hours for users to experience the cloud platform and become familiar with the workflow management system.
Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master's degree in technology management and a bachelor's degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.