Creating a Pipeline with LineaPy¶
Intro¶
Scenario¶
As notebooks become more mature, they may get used like pipelines. For instance, notebooks that do the following things:
- Process latest data and update dashboards/graphs/charts/reports
- Preprocess new data and dump it to the filesystem or database for downstream model development
- Create updated versions of models using latest data
- Run inference on new data using a standard model
may be expected to be re-executed as a pipeline on a regular or ad-hoc basis.
However, you may not have the proper engineering support to set up these pipelines for production.
What might happen next?¶
If this is the case, any of the following may happen:
- You spend a lot of time copying and pasting and cleaning your code to make your pipeline work in orchestration systems or job schedulers (cron, Apache Airflow, Prefect, etc.).
- It takes forever to make your pipeline work, so you end up running it manually whenever you need to or are asked to.
- Your runtime environment changes due to other experiments you are running, and your pipeline stops working.
- You make some changes to your pipeline and find that you need to repeat the above process all over again.
As more notebooks and pipelines need to be maintained, data scientists must spend more time on operations to make sure every notebook runs successfully at the right time. This operational burden consumes a large share of their time, decreasing productivity and innovation.
How can LineaPy help here?¶
LineaPy can help to set up and maintain pipelines with minimal effort.
import lineapy
........................
.
. your original notebook
.
........................
lineapy.save(object, 'artifact name')
lineapy.to_pipeline(['artifact name', 'other artifact name', ... ], framework, pipeline_name, output_dir)
With just these three lines of code, LineaPy enables data scientists to produce runnable pipelines. For some orchestration systems, like Apache Airflow, LineaPy is even able to upload the runnable pipeline without any manual interaction.
The rest of this notebook will use a concrete example to walk you through how this can be done.
Demo¶
Info
If you encounter issues you cannot resolve, simply ask in our Slack community's #support
channel. We are always happy and ready to help you!
Note
You can ignore # NBVAL_*
comments in certain cell blocks. They are for passing unit tests only, which we do to make sure the examples are always functional as we update the codebase.
#NBVAL_SKIP
%%capture
!pip -q install lineapy~=0.2 scikit-learn pandas matplotlib
#NBVAL_SKIP
%load_ext lineapy
# NBVAL_IGNORE_OUTPUT
import os
import lineapy
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
lineapy.tag("Pipeline End-to-End Demo")
Exploration and Pre-processing¶
Say we are botanists working in an ecology lab, and we have recently obtained a data set to explore. Let's see what it contains.
# Load data
url = "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
df = pd.read_csv(url)
# View data
df
 | sepal.length | sepal.width | petal.length | petal.width | variety
---|---|---|---|---|---
0 | 5.1 | 3.5 | 1.4 | 0.2 | Setosa |
1 | 4.9 | 3.0 | 1.4 | 0.2 | Setosa |
2 | 4.7 | 3.2 | 1.3 | 0.2 | Setosa |
3 | 4.6 | 3.1 | 1.5 | 0.2 | Setosa |
4 | 5.0 | 3.6 | 1.4 | 0.2 | Setosa |
... | ... | ... | ... | ... | ... |
145 | 6.7 | 3.0 | 5.2 | 2.3 | Virginica |
146 | 6.3 | 2.5 | 5.0 | 1.9 | Virginica |
147 | 6.5 | 3.0 | 5.2 | 2.0 | Virginica |
148 | 6.2 | 3.4 | 5.4 | 2.3 | Virginica |
149 | 5.9 | 3.0 | 5.1 | 1.8 | Virginica |
150 rows × 5 columns
It is a simple data set containing different iris species and their physical measurements. Common sense suggests that petal length and width may be associated. Let's quickly check.
# NBVAL_IGNORE_OUTPUT
# Plot petal length vs. petal width
df.plot.scatter("petal.length", "petal.width")
plt.show()
We visually confirm that there is a strong positive relationship between petal length and width.
More interestingly, we see a distinct cluster of observations at the lower left side of the plot, which suggests that these observations may correspond to a specific species. Let's plot the observations by species.
# Check species and their counts
df["variety"].value_counts()
Setosa        50
Versicolor    50
Virginica     50
Name: variety, dtype: int64
# NBVAL_IGNORE_OUTPUT
# Map each species to a color
color_map = {"Setosa": "green", "Versicolor": "blue", "Virginica": "red"}
df["variety_color"] = df["variety"].map(color_map)
# Plot petal length vs. width by species
df.plot.scatter("petal.length", "petal.width", c="variety_color")
plt.show()
We confirm that each species indeed forms a fairly distinct cluster based on its petal traits.
To make our analysis more interesting, let's try some formal modeling, where we will try to identify the species of a given iris based on its petal traits.
# For modeling, map each species to a number
num_map = {"Setosa": 0, "Versicolor": 1, "Virginica": 2}
df["variety_num"] = df["variety"].map(num_map)
# View data
df
 | sepal.length | sepal.width | petal.length | petal.width | variety | variety_color | variety_num
---|---|---|---|---|---|---|---
0 | 5.1 | 3.5 | 1.4 | 0.2 | Setosa | green | 0 |
1 | 4.9 | 3.0 | 1.4 | 0.2 | Setosa | green | 0 |
2 | 4.7 | 3.2 | 1.3 | 0.2 | Setosa | green | 0 |
3 | 4.6 | 3.1 | 1.5 | 0.2 | Setosa | green | 0 |
4 | 5.0 | 3.6 | 1.4 | 0.2 | Setosa | green | 0 |
... | ... | ... | ... | ... | ... | ... | ... |
145 | 6.7 | 3.0 | 5.2 | 2.3 | Virginica | red | 2 |
146 | 6.3 | 2.5 | 5.0 | 1.9 | Virginica | red | 2 |
147 | 6.5 | 3.0 | 5.2 | 2.0 | Virginica | red | 2 |
148 | 6.2 | 3.4 | 5.4 | 2.3 | Virginica | red | 2 |
149 | 5.9 | 3.0 | 5.1 | 1.8 | Virginica | red | 2 |
150 rows × 7 columns
Let's store the pre-processed data as an artifact since we may want to reuse/recompute it later.
# NBVAL_IGNORE_OUTPUT
# Save the pre-processed data as an artifact
lineapy.save(df, "iris_preprocessed")
LineaArtifact(name='iris_preprocessed', _version=0)
Info
In LineaPy, an artifact is both code and value. That is, when storing an artifact, LineaPy not only records the state (i.e., value) of the variable but also traces and saves all relevant operations leading to this state — as code. Such a complete development history or *lineage* then allows LineaPy to fully reproduce the given artifact. Learn more about LineaPy artifacts here.
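For example, the artifact we just saved can be retrieved later, even in a new session, to inspect both its value and the cleaned-up code that produces it. A minimal sketch, assuming the lineapy.get API available in recent LineaPy versions:
import lineapy
# Retrieve the artifact by name (assumes it exists in the local LineaPy database)
artifact = lineapy.get("iris_preprocessed")
df_reloaded = artifact.get_value()  # the stored DataFrame
print(artifact.get_code())          # the recorded code that recreates it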
Modeling¶
For model building and evaluation, we need to split the data into training and test sets.
# Identify predictor and outcome variables
X = df[["petal.length", "petal.width"]]
y = df["variety_num"]
# Specify parameters of data split
test_size = 0.33
random_state = 42
# Split the data
split_samples = train_test_split(X, y, test_size=test_size, random_state=random_state)
X_train = split_samples[0]
X_test = split_samples[1]
y_train = split_samples[2]
y_test = split_samples[3]
With the data split, we can now proceed to model training. Ours is a classification problem, so we can use multinomial logistic regression.
# NBVAL_IGNORE_OUTPUT
# Initiate the model
mod = LogisticRegression(multi_class="multinomial")
# Fit the model
mod.fit(X_train, y_train)
LogisticRegression(multi_class='multinomial')
Let's examine the model's accuracy, first on the training data and then on the test data.
# NBVAL_IGNORE_OUTPUT
# Examine model accuracy on training data
y_train_pred = mod.predict(X_train)
mod_eval_train = classification_report(y_train, y_train_pred, digits=3)
print(mod_eval_train)
              precision    recall  f1-score   support

           0      1.000     1.000     1.000        31
           1      0.941     0.914     0.928        35
           2      0.914     0.941     0.928        34

    accuracy                          0.950       100
   macro avg      0.952     0.952     0.952       100
weighted avg      0.950     0.950     0.950       100
# NBVAL_IGNORE_OUTPUT
# Examine model accuracy on test data
y_test_pred = mod.predict(X_test)
mod_eval_test = classification_report(y_test, y_test_pred, digits=3)
print(mod_eval_test)
              precision    recall  f1-score   support

           0      1.000     1.000     1.000        19
           1      1.000     1.000     1.000        15
           2      1.000     1.000     1.000        16

    accuracy                          1.000        50
   macro avg      1.000     1.000     1.000        50
weighted avg      1.000     1.000     1.000        50
Overall, the results indicate that our model is very accurate, with all predictions on the test data being correct!
We are quite content with the model, so let's save it as an artifact.
# NBVAL_IGNORE_OUTPUT
# Save the fitted model as an artifact
lineapy.save(mod, "iris_model")
LineaArtifact(name='iris_model', _version=0)
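Because the artifact stores the fitted model itself, we (or a colleague) could later reload it in a fresh session and score new measurements without rerunning the notebook. A minimal, hedged sketch, assuming the same local LineaPy database and the lineapy.get API:
import pandas as pd
import lineapy
# Reload the fitted model from its artifact (assumes the artifact exists locally)
mod_reloaded = lineapy.get("iris_model").get_value()
# Score a couple of hypothetical petal measurements
new_flowers = pd.DataFrame({"petal.length": [1.4, 5.1], "petal.width": [0.2, 1.8]})
print(mod_reloaded.predict(new_flowers))  # likely [0, 2], i.e., Setosa and Virginica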
Let's also save the model evaluation result so we can reference it later as needed. For instance, this information may help with model comparison and selection.
# NBVAL_IGNORE_OUTPUT
# Save the model evaluation result as an artifact
lineapy.save(mod_eval_test, "iris_model_evaluation")
LineaArtifact(name='iris_model_evaluation', _version=0)
Pipeline Building¶
Say we expect to rerun the above processes in the future. For instance, our source data (iris.csv) may get new observations added. Or, we may want to train the model with different source data. Or, we may want to train the model with a different randomization setup (i.e., a different random_state). For these, we could simply update and rerun our original development scripts/notebooks, of course. However, this is a brittle process that is prone to errors, so we may want to set up a proper pipeline instead.
Normally, this would involve finding our old code, cleaning it up, and transforming it into a deployable pipeline — all manually. Thankfully, we had already captured the relevant development history across LineaPy artifacts, so, for us, building a pipeline reduces to “stitching” these artifacts, like so:
# NBVAL_IGNORE_OUTPUT
# Build an Airflow pipeline using artifacts
lineapy.to_pipeline(
pipeline_name="iris_pipeline",
artifacts=["iris_preprocessed", "iris_model", "iris_model_evaluation"],
dependencies={
"iris_model_evaluation": {"iris_model"},
"iris_model": {"iris_preprocessed"}
},
input_parameters=["url", "test_size", "random_state"],
output_dir="./pipeline_example/",
framework="AIRFLOW",
)
Generated module file: pipeline_example/iris_pipeline_module.py
Generated requirements file: pipeline_example/iris_pipeline_requirements.txt
Generated DAG file: pipeline_example/iris_pipeline_dag.py
Generated Docker file: pipeline_example/iris_pipeline_Dockerfile
PosixPath('pipeline_example')
where
- artifacts is the list of artifact names to be used for the pipeline
- pipeline_name is the name of the pipeline
- dependencies is the dependency graph among artifacts
  - If artifact A depends on artifacts B and C, then the graph is specified as { A: { B, C } }
  - If A depends on B and B depends on C, then the graph is specified as { A: { B }, B: { C } }
- input_parameters is the list of variable names (in the code) to be turned into tunable parameters of the pipeline
- output_dir is the location to put the files for running the pipeline
- framework is the name of the orchestration framework to use
  - LineaPy currently supports "AIRFLOW" and "SCRIPT"
  - If "AIRFLOW", it will generate files that can be run as an Airflow DAG
  - If "SCRIPT", it will generate files that can run the pipeline as a Python script
The API call above generates several files including:
# NBVAL_IGNORE_OUTPUT
# Check the generated files for running the pipeline
os.listdir("./pipeline_example/")
['iris_pipeline_dag.py', 'iris_pipeline_Dockerfile', 'iris_pipeline_module.py', 'iris_pipeline_requirements.txt']
where
- [pipeline_name]_module.py contains the artifact code packaged as a function module
- [pipeline_name]_dag.py uses the packaged function(s) to define the pipeline
- [pipeline_name]_requirements.txt lists any package dependencies for running the pipeline
- [pipeline_name]_Dockerfile contains commands to set up the environment to run the pipeline
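Note that the same artifacts can also target a different framework. For instance, a hedged variant of the call above with framework="SCRIPT" (and a hypothetical output directory) would generate a plain-Python version of the pipeline instead:
# Same artifacts and dependencies as above; only framework and output_dir change
lineapy.to_pipeline(
    pipeline_name="iris_pipeline",
    artifacts=["iris_preprocessed", "iris_model", "iris_model_evaluation"],
    dependencies={
        "iris_model_evaluation": {"iris_model"},
        "iris_model": {"iris_preprocessed"},
    },
    input_parameters=["url", "test_size", "random_state"],
    output_dir="./pipeline_example_script/",  # hypothetical directory
    framework="SCRIPT",
)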
Module File¶
Specifically, in the current case, iris_pipeline_module.py looks as follows:
# Uncomment and run next line to view full file content
# %cat ./pipeline_example/iris_pipeline_module.py
# ./pipeline_example/iris_pipeline_module.py
[...]
def get_iris_preprocessed(url):
df = pd.read_csv(url)
color_map = {"Setosa": "green", "Versicolor": "blue", "Virginica": "red"}
df["variety_color"] = df["variety"].map(color_map)
num_map = {"Setosa": 0, "Versicolor": 1, "Virginica": 2}
df["variety_num"] = df["variety"].map(num_map)
return df
def get_split_samples_for_artifact_iris_model_and_downstream(
df, random_state, test_size
):
X = df[["petal.length", "petal.width"]]
y = df["variety_num"]
split_samples = train_test_split(
X, y, test_size=test_size, random_state=random_state
)
return split_samples
def get_iris_model(split_samples):
X_train = split_samples[0]
y_train = split_samples[2]
mod = LogisticRegression(multi_class="multinomial")
mod.fit(X_train, y_train)
return mod
def get_iris_model_evaluation(mod, split_samples):
X_test = split_samples[1]
y_test = split_samples[3]
y_test_pred = mod.predict(X_test)
mod_eval_test = classification_report(y_test, y_test_pred, digits=3)
return mod_eval_test
[...]
We can see that LineaPy used artifacts to automatically 1) clean up their code to retain only essential operations and 2) package the cleaned-up code into importable functions. For instance, we see that get_iris_preprocessed()
can re-calculate the iris_preprocessed
artifact, but in a more succinct way (e.g., irrelevant plotting code is now gone).
Note that some function signatures have parameters (e.g., def get_iris_preprocessed(url)
), and this is to support the pipeline parametrization we requested during our pipeline API call earlier.
Note also that LineaPy has smartly identified the "common" computation between the iris_model and iris_model_evaluation artifacts and automatically factored it out into its own function, i.e., get_split_samples_for_artifact_iris_model_and_downstream(); if this were a step involving heavy computation, this modularization would save considerable time and resources. This way, LineaPy minimizes duplicate processes in a pipeline, making it more efficient to run.
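Because the refactored functions live in an ordinary Python module, we can also import and chain them directly, e.g., to spot-check the pipeline logic outside of any orchestrator. A minimal sketch, assuming ./pipeline_example/ is on the Python path and the packages in the requirements file are installed:
import sys
sys.path.append("./pipeline_example/")
import iris_pipeline_module
# Re-run the pipeline steps in order, using the generated functions
url = "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
df = iris_pipeline_module.get_iris_preprocessed(url)
split_samples = iris_pipeline_module.get_split_samples_for_artifact_iris_model_and_downstream(
    df, random_state=42, test_size=0.33
)
mod = iris_pipeline_module.get_iris_model(split_samples)
print(iris_pipeline_module.get_iris_model_evaluation(mod, split_samples))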
Info
In case you are wondering how LineaPy does this "magic": LineaPy represents the user's code as a graph, where each node is a certain execution (e.g., a variable assignment). Thanks to this graph representation, LineaPy can traverse the user's code to identify dependency relations between parts of the code, which is the basis for code cleanup as well as "smart" refactoring (i.e., factoring out common code blocks between artifacts into a separate function). For illustration, the following shows an example of how user code may map onto LineaPy's graph representation:
As shown, agg_df (red) and summary_df (blue) share a common set of operations (green), so LineaPy will detect this overlap and factor it out into a separate function during code refactoring.
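To make the idea concrete, here is a hedged toy example of the kind of user code the illustration describes (the file name and the "group" column are hypothetical):
import pandas as pd
import lineapy
df = pd.read_csv("data.csv")   # hypothetical input file
clean_df = df.dropna()         # common operations shared by both artifacts
agg_df = clean_df.groupby("group").sum()  # operations specific to agg_df
summary_df = clean_df.describe()          # operations specific to summary_df
lineapy.save(agg_df, "agg_df")
lineapy.save(summary_df, "summary_df")
# When both artifacts go into one pipeline, LineaPy can factor the shared
# clean_df computation into its own function so it runs only once.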
DAG File¶
These functions in the module file are then imported and used by iris_pipeline_dag.py
to compose an Airflow DAG:
# Uncomment and run next line to view full file content
# %cat ./pipeline_example/iris_pipeline_dag.py
# ./pipeline_example/iris_pipeline_dag.py
[...]
import iris_pipeline_module
[...]
def task_iris_preprocessed(url):
url = str(url)
df = iris_pipeline_module.get_iris_preprocessed(url)
pickle.dump(df, open("/tmp/iris_pipeline/variable_df.pickle", "wb"))
def task_split_samples_for_artifact_iris_model_and_downstream(random_state, test_size):
random_state = int(random_state)
test_size = float(test_size)
df = pickle.load(open("/tmp/iris_pipeline/variable_df.pickle", "rb"))
split_samples = (
iris_pipeline_module.get_split_samples_for_artifact_iris_model_and_downstream(
df, random_state, test_size
)
)
pickle.dump(
split_samples, open("/tmp/iris_pipeline/variable_split_samples.pickle", "wb")
)
[...]
default_dag_args = {
"owner": "airflow",
"retries": 2,
"start_date": days_ago(1),
"params": {
"url": "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv",
"test_size": 0.33,
"random_state": 42,
},
}
with DAG(
dag_id="iris_pipeline_dag",
schedule_interval="*/15 * * * *",
max_active_runs=1,
catchup=False,
default_args=default_dag_args,
) as dag:
[...]
iris_preprocessed = PythonOperator(
task_id="iris_preprocessed_task",
python_callable=task_iris_preprocessed,
op_kwargs={"url": "{{ params.url }}"},
)
split_samples_for_artifact_iris_model_and_downstream = PythonOperator(
task_id="split_samples_for_artifact_iris_model_and_downstream_task",
python_callable=task_split_samples_for_artifact_iris_model_and_downstream,
op_kwargs={
"random_state": "{{ params.random_state }}",
"test_size": "{{ params.test_size }}",
},
)
[...]
iris_preprocessed >> split_samples_for_artifact_iris_model_and_downstream
[...]
As shown, the DAG file defines "tasks" (e.g., iris_preprocessed_task
) using functions from the module file, and then puts them together into an executable graph (dependency relationships are expressed through >>
).
Note that these tasks in the DAG file also reflect our requested pipeline parameters (e.g., url
), which will eventually allow us to run the pipeline with different values.
Requirements File¶
To ensure that the code runs as it did during development, iris_pipeline_requirements.txt
has captured the relevant package dependencies, like so:
# NBVAL_IGNORE_OUTPUT
%cat ./pipeline_example/iris_pipeline_requirements.txt
pandas==1.3.5
scikit-learn==1.0.2
Note that this list is much shorter than the full set of imports we made during development (e.g., it does not include matplotlib). This is because LineaPy has identified and kept only the necessary package dependencies, making the setup as lean and efficient as possible.
Dockerfile¶
Finally, iris_pipeline_Dockerfile
further facilitates pipeline execution by containerizing/automating environment setup and launch.
# NBVAL_IGNORE_OUTPUT
%cat ./pipeline_example/iris_pipeline_Dockerfile
FROM apache/airflow:latest-python3.10

RUN mkdir /tmp/installers
WORKDIR /tmp/installers

# copy all the requirements to run the current dag
COPY ./iris_pipeline_requirements.txt ./

# install the required libs
RUN pip install -r ./iris_pipeline_requirements.txt

WORKDIR /opt/airflow/dags
COPY . .

WORKDIR /opt/airflow

CMD [ "standalone" ]
Running Pipelines Locally¶
With these automatically generated files, we can quickly test running the pipeline locally. First, we run the following command to build a Docker image:
docker build -t [image_name] . -f iris_pipeline_Dockerfile
where [image_name]
is the image name of our choice.
We then stand up a container instance with the following command:
docker run -it -p 8080:8080 [image_name]
In the current example where we set framework="AIRFLOW"
, this will result in an Airflow instance with an executable DAG in it.
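Once the container is up, the Airflow UI served by the standalone entrypoint should be reachable on the mapped port. A quick, hedged sanity check from Python, assuming the container above is running and Airflow's standard /health endpoint is exposed:
import urllib.request
# Expect HTTP 200 once the Airflow webserver inside the container is ready
with urllib.request.urlopen("http://localhost:8080/health") as resp:
    print(resp.status)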
Recap¶
Data science workflows revolve around building and refining pipelines, i.e., series of processes that transform data into useful information or products. Traditionally, this is manual and time-consuming work: data scientists (or other engineers) need to clean up messy development code and transform it into deployable scripts for the target system (e.g., Airflow). This tutorial demonstrated that LineaPy, having the complete development process stored in artifacts, can automate such code transformation, accelerating the transition from development to production.
Info
If you want to learn more about LineaPy's pipeline support, check out the project documentation.