Building a Data Pipeline with Prefect – KDnuggets


Image by Author | Canva

 

In this tutorial, we will learn about Prefect, a modern workflow orchestration tool. We will start by building a data pipeline with Pandas and then compare it with a Prefect workflow to gain a better understanding. In the end, we will deploy our workflow and view the run logs on the dashboard.

 

What Is Prefect?

 

Prefect is a workflow management system designed to orchestrate and manage complex data workflows, including machine learning (ML) pipelines. It provides a framework for building, scheduling, and monitoring workflows, making it an essential tool for managing ML operations (MLOps).

Prefect offers task and flow management, allowing users to define dependencies and execute workflows efficiently. With features like state management and observability, Prefect provides insights into task status and history, aiding debugging and optimization. It comes with a highly interactive dashboard that lets you schedule and monitor runs and integrate various other features that will improve your workflow for the MLOps pipeline. You can even set up notifications and integrate other ML frameworks with a few clicks.

Prefect is available as an open-source framework and as a managed cloud service, simplifying your workflow even more.

 

Building a Data Pipeline with Pandas

 

We will replicate the data pipeline that I used in the previous tutorial (Building Data Science Pipelines Using Pandas – KDnuggets) to give you an idea of how each task works in the pipeline and how to combine them. I am mentioning it here so that you can clearly compare how proper data pipelines differ from normal pipelines.

import pandas as pd

def load_data(path):
    return pd.read_csv(path)

def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    # convert the date column to datetime
    data["Date"] = pd.to_datetime(data["Date"])
    return data

def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    return new_df

path = "Online Sales Data.csv"
df = (
    pd.DataFrame()
    .pipe(lambda x: load_data(path))
    .pipe(data_cleaning)
    .pipe(convert_dtypes, {"Product Category": "str", "Product Name": "str"})
    .pipe(data_analysis)
    .pipe(data_visualization, "line")
)

 

When we run the above code, each task will run sequentially and generate the data visualization. Apart from that, it does not do anything else. We cannot schedule it, view the run logs, or integrate third-party tools for notifications or monitoring.
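To make the comparison concrete, here is a self-contained run of the same cleaning and aggregation steps on a toy frame (the column names match the tutorial's dataset; the values are invented):

```python
import pandas as pd

# Toy data mimicking the tutorial's "Online Sales Data" columns (values made up).
raw = pd.DataFrame({
    "Date": ["2024-01-05", "2024-01-20", "2024-02-10", "2024-02-10", None],
    "Units Sold": [10, 20, 30, 30, 5],
})

# Same steps as the pipeline: dedupe, drop NaNs, parse dates, aggregate by month.
clean = raw.drop_duplicates().dropna().reset_index(drop=True)
clean["Date"] = pd.to_datetime(clean["Date"])
clean["month"] = clean["Date"].dt.month
monthly = clean.groupby("month")["Units Sold"].mean()
print(monthly)  # month 1 -> 15.0 (mean of 10 and 20), month 2 -> 30.0
```

The duplicate February row and the row with a missing date are dropped before aggregation, which is exactly what `data_cleaning` does to the real CSV.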

 


 

Building a Data Pipeline with Prefect

 

Now we will build the same pipeline with the same dataset (Online Sales Dataset – Popular Marketplace Data), but with Prefect. First, install the Prefect library using the pip command:

pip install prefect

 

If you compare the code below, you will notice that nothing has really changed. The functions are the same, but with the addition of Python decorators. Each step in the pipeline has the `@task` decorator, and the pipeline combining these steps has the `@flow` decorator. Additionally, we are saving the generated figure too.

import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow

@task
def load_data(path):
    return pd.read_csv(path)

@task
def data_cleaning(data):
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

@task
def convert_dtypes(data, types_dict=None):
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data

@task
def data_analysis(data):
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

@task
def data_visualization(new_df, vis_type="bar"):
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@flow(name="Data Pipeline")
def data_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    new_df = data_visualization(analysis_result, "line")
    return new_df

# Run the flow
if __name__ == "__main__":
    new_df = data_pipeline("Online Sales Data.csv")
    print(new_df)

 

We will run our data pipeline by providing the CSV file location. It will perform all the steps in sequence and generate logs with the run states.

14:18:48.649 | INFO    | prefect.engine - Created flow run 'enlightened-dingo' for flow 'Data Pipeline'
14:18:48.816 | INFO    | Flow run 'enlightened-dingo' - Created task run 'load_data-0' for task 'load_data'
14:18:48.822 | INFO    | Flow run 'enlightened-dingo' - Executing 'load_data-0' immediately...
14:18:48.990 | INFO    | Task run 'load_data-0' - Finished in state Completed()
14:18:49.052 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_cleaning-0' for task 'data_cleaning'
14:18:49.053 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_cleaning-0' immediately...
14:18:49.226 | INFO    | Task run 'data_cleaning-0' - Finished in state Completed()
14:18:49.283 | INFO    | Flow run 'enlightened-dingo' - Created task run 'convert_dtypes-0' for task 'convert_dtypes'
14:18:49.288 | INFO    | Flow run 'enlightened-dingo' - Executing 'convert_dtypes-0' immediately...
14:18:49.441 | INFO    | Task run 'convert_dtypes-0' - Finished in state Completed()
14:18:49.506 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_analysis-0' for task 'data_analysis'
14:18:49.510 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_analysis-0' immediately...
14:18:49.684 | INFO    | Task run 'data_analysis-0' - Finished in state Completed()
14:18:49.753 | INFO    | Flow run 'enlightened-dingo' - Created task run 'data_visualization-0' for task 'data_visualization'
14:18:49.760 | INFO    | Flow run 'enlightened-dingo' - Executing 'data_visualization-0' immediately...
14:18:50.087 | INFO    | Task run 'data_visualization-0' - Finished in state Completed()
14:18:50.144 | INFO    | Flow run 'enlightened-dingo' - Finished in state Completed()

 

In the end, you will get the transformed data frame and visualizations.

 


 

Deploying the Prefect Pipeline

 

In order to deploy the Prefect pipeline, we need to start by moving our codebase to the Python file `data_pipe.py`. After that, we will modify how we run our pipeline: we will use the `.serve` method to deploy the pipeline and pass the CSV file location as a parameter.

data_pipe.py:

import pandas as pd
import matplotlib.pyplot as plt
from prefect import task, flow

@task
def load_data(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@task
def data_cleaning(data: pd.DataFrame) -> pd.DataFrame:
    data = data.drop_duplicates()
    data = data.dropna()
    data = data.reset_index(drop=True)
    return data

@task
def convert_dtypes(data: pd.DataFrame, types_dict: dict = None) -> pd.DataFrame:
    data = data.astype(dtype=types_dict)
    data["Date"] = pd.to_datetime(data["Date"])
    return data

@task
def data_analysis(data: pd.DataFrame) -> pd.Series:
    data["month"] = data["Date"].dt.month
    new_df = data.groupby("month")["Units Sold"].mean()
    return new_df

@task
def data_visualization(new_df: pd.Series, vis_type: str = "bar") -> pd.Series:
    new_df.plot(kind=vis_type, figsize=(10, 5), title="Average Units Sold by Month")
    plt.savefig("average_units_sold_by_month.png")
    return new_df

@task
def save_to_csv(df: pd.Series, filename: str):
    df.to_csv(filename, index=False)
    return filename

@flow(name="Data Pipeline")
def run_pipeline(path: str):
    df = load_data(path)
    df_cleaned = data_cleaning(df)
    df_converted = convert_dtypes(
        df_cleaned, {"Product Category": "str", "Product Name": "str"}
    )
    analysis_result = data_analysis(df_converted)
    data_visualization(analysis_result, "line")
    save_to_csv(analysis_result, "average_units_sold_by_month.csv")

# Serve the flow as a deployment
if __name__ == "__main__":
    run_pipeline.serve(
        name="pass-params-deployment",
        parameters=dict(path="Online Sales Data.csv"),
    )

 

 

When we run the Python file, we will receive a message saying that to run the deployed pipeline, we have to use the following command:

 


 

Launch a new Terminal window and type the command to trigger a run of this flow.

$ prefect deployment run 'Data Pipeline/pass-params-deployment'

 

As we can see, the flow run has initiated, meaning the pipeline is running in the background. We can always return to the first Terminal window to view the logs.

 


 

To view the logs in the dashboard, we have to launch the Prefect server by typing the following command:

$ prefect server start

 

Click on the dashboard link to launch the dashboard in your web browser.

 


 

The dashboard consists of various tabs and information related to your pipelines, workflows, and runs. To view the current run, navigate to the "Flow Runs" tab and select the latest flow run.

 


 

All of the source code, data, and information are available in the kingabzpro/Data-Pipeline-with-Prefect GitHub repository. Please don't forget to star ⭐ it.

 

Conclusion

 

Building a pipeline with the proper tools is necessary if you want to scale your data workflows and avoid unnecessary hiccups. By using Prefect, you can schedule your runs, debug the pipeline, and integrate it with multiple third-party tools that you are already using. It is easy to use and comes with tons of features that you will love. If you are new to Prefect, I highly recommend trying out Prefect Cloud: it offers free hours for users to experience the cloud platform and become familiar with the workflow management system.
 
 

Abid Ali Awan (@1abidaliawan) is a certified data scientist professional who loves building machine learning models. Currently, he is focusing on content creation and writing technical blogs on machine learning and data science technologies. Abid holds a Master's degree in technology management and a bachelor's degree in telecommunication engineering. His vision is to build an AI product using a graph neural network for students struggling with mental illness.
