Building your own pipeline#

This guide will teach you how to build and run your own pipeline using components available on the Fondant hub.

Overview#

In this guide, we will build a pipeline that downloads images from the fondant-cc-25m dataset and filters them.

It consists of three steps:

  • Loading the dataset from the HuggingFace Hub
  • Downloading the images
  • Filtering them based on the language of their alt text

Setting up the environment#

We will be using the local runner to run this pipeline. To set up your local environment, please refer to our installation documentation.

Building the pipeline#

Start by creating a pipeline.py file and adding the following code.

from fondant.pipeline import Pipeline

pipeline = Pipeline(
    name="creative_commons_pipline",
    base_path="./data"
)

IMPORTANT

Make sure the provided base_path already exists.
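
If the directory does not exist yet, you can create it before constructing the pipeline. The snippet below is a minimal sketch using Python's standard library; ./data matches the base_path used above.

from pathlib import Path

# Create the base_path directory used by the pipeline if it doesn't exist yet.
Path("./data").mkdir(parents=True, exist_ok=True)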

View a detailed reference of the options accepted by the Pipeline class

Parameters:

  • name (str, required): The name of the pipeline.
  • base_path (str, required): The base path for the pipeline to use to store artifacts and data. This can be a local path or a remote path on one of the supported cloud storage services. The path should already exist.
  • description (Optional[str], default: None): Optional description of the pipeline.

Adding components#

Now it's time to incrementally build our pipeline by adding different execution steps or components. Components are executable elements of a pipeline that consume and produce data.

You can use two types of components with Fondant:

  • Reusable components: A bunch of reusable components are available on our hub, which you can easily add to your pipeline.
  • Custom components: You can also implement your own custom component.

If you want to learn more about components, you can check out the components documentation.

1. A reusable load component#

As a first step, we want to read data into our pipeline. In this case, we will load a dataset from the HuggingFace Hub. For this, we can use the reusable load_from_hf_hub component.

We can read data into our pipeline using the Pipeline.read() method, which returns a (lazy) Dataset.

import pyarrow as pa

dataset = pipeline.read(
    "load_from_hf_hub",
    arguments={
        "dataset_name": "fondant-ai/fondant-cc-25m",
        "n_rows_to_load": 100,
    },
    produces={
      "alt_text": pa.string(),
      "url": pa.string(),
      "license_location": pa.string(),
      "license_type": pa.string(),
      "webpage_url": pa.string(),
    }
)

We provide three arguments to the .read() method:

  • The name of the reusable component
  • Some arguments to configure the component. Check the component's documentation for the supported arguments
  • The schema of the data the component will produce. This is necessary for this specific component since the output is dynamic based on the dataset being loaded. You can see this defined in the component documentation with additionalProperties: true under the produces section.
View a detailed reference of the Pipeline.read() method

Read data using the provided component.

Parameters:

  • name_or_path (Union[str, Path], required): The name of a reusable component, or the path to the directory containing a custom component.
  • produces (Optional[Dict[str, Union[str, DataType]]], default: None): A mapping to update the fields produced by the operation as defined in the component spec. The keys are the names of the fields to be received by the component, while the values are the type of the field, or the name of the field to map from the dataset.
  • arguments (Optional[Dict[str, Any]], default: None): A dictionary containing the argument name and value for the operation.
  • input_partition_rows (Optional[Union[int, str]], default: None): The number of rows to load per partition. Set to override the automatic partitioning.
  • resources (Optional[Resources], default: None): The resources to assign to the operation.
  • cache (Optional[bool], default: True): Set to False to disable caching; True by default.
  • cluster_type (Optional[str], default: 'default'): The type of cluster to use for distributed execution (default is "local").
  • client_kwargs (Optional[dict], default: None): Keyword arguments used to initialise the Dask client.

Returns:

  • Dataset: An intermediate dataset.

To test the pipeline, you can execute the following command within the pipeline directory:

fondant run local pipeline.py

The pipeline execution will start, initiating the download of the dataset from HuggingFace. After the pipeline has completed, you can explore the pipeline result using the fondant explorer:

fondant explore start --base_path ./data

You can open your browser at localhost:8501 to explore the loaded data.

2. A reusable transform component#

Our pipeline has successfully loaded the dataset from HuggingFace. One of its columns, url, directs us to the original source of the images. To access and utilise these images directly, we must download each of them.

Downloading images is a common requirement across various use cases, which is why Fondant provides a reusable component specifically for this purpose. This component is appropriately named download_images.

We can add this component to our pipeline as follows:

images = dataset.apply(
    "download_images",
)

Looking at the component documentation, we can see that it expects an "image_url" field, which was generated by our previous component. This means that we can simply chain the components as-is.

3. A reusable transform component with non-matching fields#

This won't always be the case though. We now want to filter our dataset for images with English alt text. For this, we leverage the filter_language component. Looking at the component documentation, we can see that it expects a "text" field, while we would like to apply it to the "alt_text" field in our dataset.

We can easily achieve this using the consumes argument, which lets us map the fields that the component will consume. Here we indicate that the component should read the "alt_text" field instead of the "text" field.

english_images = images.apply(
  "filter_language",
  arguments={
    "language": "en"
  },
  consumes={
    "text": "alt_text"
  }
)
View a detailed reference of the Dataset.apply() method

Apply the provided component on the dataset.

Parameters:

  • name_or_path (Union[str, Path], required): The name of a reusable component, or the path to the directory containing a custom component.

  • consumes (Optional[Dict[str, Union[str, DataType]]], default: None): A mapping to update the fields consumed by the operation as defined in the component spec. The keys are the names of the fields to be received by the component, while the values are the type of the field, or the name of the field to map from the input dataset.

    Suppose we have a component spec that expects the following fields:

    ...
    consumes:
        text:
            type: string
        image:
            type: binary
    ...

    To override the default mapping and specify that the 'text' field should be sourced from the 'custom_text' field in the input dataset, the 'consumes' mapping can be defined as follows:

    consumes = {
        "text": "custom_text"
    }

    In this example, the 'text' field will be sourced from 'custom_text', while 'image' will be sourced from the 'image' field by default, since it is not specified in the custom mapping.

  • produces (Optional[Dict[str, Union[str, DataType]]], default: None): A mapping to update the fields produced by the operation as defined in the component spec. The keys are the names of the fields to be produced by the component, while the values are the type of the field, or the name that should be used to write the field to the output dataset.

    Suppose we have a component spec that produces the following fields:

    ...
    produces:
        text:
            type: string
        width:
            type: int

    To customise the field names during the production step, the 'produces' mapping can be defined as follows:

    produces = {
        "width": "custom_width",
    }

    In this example, the 'text' field is kept as-is since it is not specified in the custom mapping, while the 'width' field will be stored under the name 'custom_width' in the output dataset.

    Alternatively, 'produces' can define the data type of the output data:

    produces = {
        "width": pa.float32(),
    }

    In this example, the 'text' field retains its type 'string', while the 'width' field will be produced as type float in the output dataset.

  • arguments (Optional[Dict[str, Any]], default: None): A dictionary containing the argument name and value for the operation.

  • input_partition_rows (Optional[Union[int, str]], default: None): The number of rows to load per partition. Set to override the automatic partitioning.

  • resources (Optional[Resources], default: None): The resources to assign to the operation.

  • cache (Optional[bool], default: True): Set to False to disable caching; True by default.

  • cluster_type (Optional[str], default: 'default'): The type of cluster to use for distributed execution (default is "local").

  • client_kwargs (Optional[dict], default: None): Keyword arguments used to initialise the Dask client.

Returns:

  • Dataset: An intermediate dataset.

Inspecting your data#

Now, you can proceed to execute your pipeline once more and explore the results. In the explorer, you will be able to view the images that have been downloaded.

(Screenshot: the Fondant data explorer showing the downloaded images.)
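
Putting the pieces together, your complete pipeline.py should now look roughly like this, using the same components, arguments, and mappings introduced above:

import pyarrow as pa

from fondant.pipeline import Pipeline

pipeline = Pipeline(
    name="creative_commons_pipeline",
    base_path="./data",
)

# Load a small sample of the dataset from the HuggingFace Hub.
dataset = pipeline.read(
    "load_from_hf_hub",
    arguments={
        "dataset_name": "fondant-ai/fondant-cc-25m",
        "n_rows_to_load": 100,
    },
    produces={
        "alt_text": pa.string(),
        "url": pa.string(),
        "license_location": pa.string(),
        "license_type": pa.string(),
        "webpage_url": pa.string(),
    },
)

# Download the images referenced by the dataset.
images = dataset.apply("download_images")

# Keep only images with English alt text, mapping the component's
# "text" input to our "alt_text" field.
english_images = images.apply(
    "filter_language",
    arguments={"language": "en"},
    consumes={"text": "alt_text"},
)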

Well done! You have now acquired the skills to construct a simple Fondant pipeline by leveraging reusable components. In the next tutorial, we'll demonstrate how you can customise the pipeline by implementing a custom component.