With the latest releases of kedro 0.17.x
, it is now possible to run kedro
pipelines from within scripts. While I would not start a project with this
technique, it will be an excellent tool to keep in my back pocket when I want to
sprinkle in a bit of kedro goodness in existing projects.
New to Kedro
If you are just learning about kedro, check out this post walking through it
No More Rabbit Hole of Errors
as of 0.17.2
I've tried to do this in kedro 0.16.x,
which turned into a rabbit hole of
errors. First, kedro needed a conf
directory. If you tried to fake one in, kedro
would then ask for a logging setup. These errors just kept coming to the point
it wasn't worth doing, and I might as well use a proper template for real
projects and stick to simple function calls for things that are not a kedro
project.
Kedro in a script
To get kedro running, you will need a pipeline, catalog, and
runner at a minimum. If you have used kedro before, the pipeline will
look very similar to what you are familiar with, but the catalog will not be
loaded from yaml, and you will need to bring your runner.
from kedro.pipeline import Pipeline, node
from kedro.io import DataCatalog
from kedro.runner.sequential_runner import SequentialRunner
# additional datasets you want to use
from kedro.extras.datasets.pandas.csv_dataset import CSVDataSet
from kedro.extras.datasets.pandas.parquet_dataset import ParquetDataSet
# the sequential runner is the simplest. It runs one node at a time.
runner = SequentialRunner()
# this is a super simple example pipeline
pipeline = Pipeline(
[
node(lambda: range(100), None, "range"),
node(lambda x: [i ** 2 for i in x], "range", "range**2"),
node(lambda x: [i for i in x if i > 5000], "range**2", "range>5k"),
node(lambda x: x[:5], "range>5k", "range>5k-head"),
node(lambda x: sum(x) / len(x), "range>5k", "range>5k-mean"),
]
)
# to get up and running, you can use an empty catalog
catalog = DataCatalog()
runner.run(pipeline, catalog)
π Above is the minimal setup to get a kedro pipeline running
more practically
More often, your kedro pipelines are going to use a function rather than a
lambda, and pandas DataFrames.
def clean_columns(df: pd.DataFrame):
df.columns = [col.lower().strip() for col in df.columns]
pipeline = Pipeline(
[
node(clean_columns, "raw_data", "clean_columns", name="create_clean_columns"),
]
)
catalog = DataCatalog(
{
"raw_data": ParquetDataSet(filepath=f"data/raw_data.parquet")
"clean_columns": ParquetDataSet(filepath=f"data/clean_columns.parquet")
}
)
One single node pipeline to get you started
Semi-automatic catalog
For some reason, when I tried to use the DataCatalogWithDefault it did not pick
up my datasets right. I suspect this has something to do with not setting up a
proper session, so this is what I did in a pinch to get that catalog goodness
for my DataFrames without setting up each one manually.
catalog = DataCatalog(
{
name: ParquetDataSet(filepath=f"data/{name}.parquet")
for name in pipeline.all_outputs()
}
)
β If all of your datasets are pandas DataFrames
For the example above that does not use DataFrames, I would pickle all of my
outputs to enable re-loading them later.
catalog = DataCatalog(
{
name: PickleDataSet(filepath=f"data/{name}.pkl")
for name in pipeline.all_outputs()
}
)
π₯ for use with non-pandas datasets
Logging
Once you explicitly add datasets, kedro will start logging when it's
loading, running, or saving each node. Things will begin to look a
bit more familiar to anyone who has used kedro before.
ww3 βͺmain Β©kedro-in-scripts v3.8.8 ipython
β― runner.run(pipeline, catalog)
2021-04-18 09:30:58,099 - kedro.pipeline.node - INFO - Running node: <lambda>(None) -> [range]
2021-04-18 09:30:58,100 - kedro.io.data_catalog - INFO - Saving data to `range` (PickleDataSet)...
2021-04-18 09:30:58,104 - kedro.runner.sequential_runner - INFO - Completed 1 out of 5 tasks
2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Loading data from `range` (PickleDataSet)...
2021-04-18 09:30:58,105 - kedro.pipeline.node - INFO - Running node: <lambda>([range]) -> [range**2]
2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Saving data to `range**2` (PickleDataSet)...
2021-04-18 09:30:58,111 - kedro.runner.sequential_runner - INFO - Completed 2 out of 5 tasks
2021-04-18 09:30:58,111 - kedro.io.data_catalog - INFO - Loading data from `range**2` (PickleDataSet)...
2021-04-18 09:30:58,112 - kedro.pipeline.node - INFO - Running node: <lambda>([range**2]) -> [range>5k]
2021-04-18 09:30:58,112 - kedro.io.data_catalog - INFO - Saving data to `range>5k` (PickleDataSet)...
2021-04-18 09:30:58,115 - kedro.runner.sequential_runner - INFO - Completed 3 out of 5 tasks
2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)...
2021-04-18 09:30:58,115 - kedro.pipeline.node - INFO - Running node: <lambda>([range>5k]) -> [range>5k-mean]
2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Saving data to `range>5k-mean` (PickleDataSet)...
2021-04-18 09:30:58,118 - kedro.runner.sequential_runner - INFO - Completed 4 out of 5 tasks
2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)...
2021-04-18 09:30:58,119 - kedro.pipeline.node - INFO - Running node: <lambda>([range>5k]) -> [range>5k-head]
2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Saving data to `range>5k-head` (PickleDataSet)...
2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Completed 5 out of 5 tasks
2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
Kedro Viz
I was not able to quickly get kedro viz up and running for my use case. If you
really wanted to you could start modifying their format_pipelines_data function
in
server.py.
Or you could render a new template and put your pipeline there for viz
purposes.
It's possible, but might as well stick to the template
cli
For something that I would be using this on, I will probably not put much
effort into the cli as it is not likely something that we will have a
team of developers interacting with constantly. I would just put together the
minimum necessary to run my application how I need it.
if __name__ == "__main__":
import sys
if '--skip-raw' in sys.argv:
runner.run(pipeline.from_inputs('range**2'), catalog)
else:
runner.run(pipeline, catalog)
Keeping it simple
If I want to go down the route of having a full cli built out, I am probably
going to use the full kedro template or something very similar.
It's a bit Rough
While I might use this in production somewhere, it will be inside some
other not kedro application. I will still be using something quite similar to
their template for my pipelining projects. It misses some excellent
things that bring me to kedro like hooks, plugins, credentials, catalog,
logging config, cli, and viz.
Top comments (0)