Component specification#
Each Fondant component is defined by a component specification which describes its interface. The component specification is used for a couple of things:
- To define which input data Fondant should provide to the component, and which output data it should write to storage.
- To validate compatibility with other components.
- To execute the component with the correct parameters.
The component specification should be defined by the author of the component.
Contents#
A component specification consists of the following sections:
Metadata#
The metadata section describes the component itself: its name, description, and the URL of the Docker image used to run it.
Consumes & produces#
The consumes and produces sections describe which data the component consumes and produces.
The specification below, for instance, defines a component that creates an embedding from an image-caption combination.
...
consumes:
  images:
    type: binary
  text:
    type: utf8

produces:
  embeddings:
    type: array
    items:
      type: float32
The consumes and produces sections follow the schema below:
Fields#
Each component specification defines a list of fields, where the fields are the columns of the dataset.
- Only those fields defined in the consumes section of the component specification are read and passed to the component implementation.
- Only those fields defined in the produces section of the component specification are written to storage.
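The two rules above can be illustrated with a small, dependency-free sketch. It is not Fondant's implementation: rows are plain dictionaries instead of dataframe columns, and select_fields is a hypothetical helper introduced only for this example.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def select_fields(row: Row, field_names: List[str]) -> Row:
    """Return only the named fields of a row (hypothetical helper)."""
    missing = [name for name in field_names if name not in row]
    if missing:
        raise KeyError(f"Fields missing from the data: {missing}")
    return {name: row[name] for name in field_names}


# Field names taken from the specification example above.
consumes = ["images", "text"]
produces = ["embeddings"]

# A dataset row with one extra field the component does not declare.
dataset_row = {"images": b"raw-bytes", "text": "a cat", "license": "cc-by"}

# Only the consumed fields reach the component implementation.
component_input = select_fields(dataset_row, consumes)

# Only the produced fields are kept when the output is written.
component_output = {"embeddings": [0.1, 0.2], "debug_info": "not declared"}
written = select_fields(component_output, produces)
```

In this sketch the license field never reaches the component, and the undeclared debug_info field is dropped before writing.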
Each field defines its expected data type, which must be one of the types defined by Fondant. These types correspond to Arrow data types.
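As a rough illustration of what a field type check could look like, the sketch below validates field definitions against a set of type names. KNOWN_TYPES lists only the type names that appear on this page and is not Fondant's full type list, and validate_field is a hypothetical helper, not a Fondant function.

```python
from typing import Any, Dict

# Illustrative subset: only the type names used on this page.
KNOWN_TYPES = {"binary", "utf8", "string", "float32", "array"}


def validate_field(name: str, definition: Dict[str, Any]) -> None:
    """Raise ValueError if a field definition uses an unknown type."""
    field_type = definition.get("type")
    if field_type not in KNOWN_TYPES:
        raise ValueError(f"Field '{name}' has unknown type: {field_type!r}")
    if field_type == "array":
        # Array fields describe their element type under `items`.
        if "items" not in definition:
            raise ValueError(f"Array field '{name}' must define 'items'")
        validate_field(f"{name}.items", definition["items"])


fields = {
    "images": {"type": "binary"},
    "embeddings": {"type": "array", "items": {"type": "float32"}},
}
for field_name, field_definition in fields.items():
    validate_field(field_name, field_definition)  # raises on invalid input
```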
Note that you can always map a dataset field with a different name to the field name expected by the component, provided they have the same data type. For example, suppose we have a component spec that consumes a text field:
If your dataset has a field called custom_text with type string, you can map it to the text field in the component spec as follows:
dataset = Dataset.read(...)
dataset = dataset.apply(
    "example_component",
    consumes={
        "text": "custom_text"
    }
)
In this example, the custom_text field of the dataset is mapped to the text field expected by the component.
Similarly, you can also map the output field of a component to a specific field name in the dataset. Suppose we have a component spec that produces a width field:
If you want to map the output field to a field called custom_width in the dataset, you can do so as follows:
dataset = Dataset.read(...)
dataset = dataset.apply(
    "example_component",
    produces={
        "width": "custom_width"
    }
)
In this example, the component produces a field called width. This field name is mapped to the custom field name custom_width, which can be referenced in later components or used to change the field name of the final written dataset.
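The direction of the two mappings is easy to mix up, so here is a small sketch using plain dictionaries. It only illustrates the renaming and makes no claim about how Fondant performs it internally; apply_consumes and apply_produces are hypothetical helpers, not part of the Fondant API.

```python
from typing import Any, Dict

Row = Dict[str, Any]


def apply_consumes(row: Row, consumes: Dict[str, str]) -> Row:
    """Rename dataset fields to the names the component expects.

    `consumes` maps component field name -> dataset field name.
    """
    return {
        component_name: row[dataset_name]
        for component_name, dataset_name in consumes.items()
    }


def apply_produces(output: Row, produces: Dict[str, str]) -> Row:
    """Rename component output fields to the names used in the dataset.

    `produces` maps component field name -> dataset field name.
    Fields without a mapping keep their name.
    """
    return {produces.get(name, name): value for name, value in output.items()}


# The component reads `text`, but the dataset calls the field `custom_text`.
dataset_row = {"custom_text": "hello"}
component_input = apply_consumes(dataset_row, {"text": "custom_text"})

# The component writes `width`, but the dataset should store `custom_width`.
component_output = {"width": 640}
dataset_update = apply_produces(component_output, {"width": "custom_width"})
```

In both mappings the key is the name used by the component and the value is the name used in the dataset.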
Dynamic fields#
The schema also defines the additionalProperties keyword. When set to true, it can be used to define dynamic fields that should be produced or consumed. This is useful in many scenarios. Here are a few examples:
- Components that load/write general fields from/to an external source (e.g. a CSV file, HuggingFace dataset, ...) can use this to define dynamic fields that should be loaded/written.
- Components that consume or produce optional fields. For example, a component that queries a vector database can accept either a text passage or optionally precalculated text embeddings.
- Components that can work on a dynamic number of fields.
Let's take an example of a component that loads a dataset from a CSV file. The CSV file can contain any number of columns, so we set additionalProperties to true to allow any column to be loaded.
Note that the schema of the fields to be produced is not defined in the component specification as it usually would be, so we need to specify the schema of the fields when defining the component:
import pyarrow as pa  # `pa` refers to PyArrow, used to declare the field types

dataset = Dataset.create(
    "load_from_csv",
    arguments={
        "dataset_uri": "path/to/dataset.csv",
    },
    produces={
        "image": pa.binary(),
        "embedding": pa.list_(pa.binary())
    }
)
Here we define the schema of the image and embedding fields which will be produced by the component.
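One way to picture the effect of additionalProperties is as a gate on fields that are not declared in the specification. The sketch below is an assumption about the general idea, not Fondant's validation code: resolve_produces is a hypothetical helper, and type names are plain strings rather than Arrow types.

```python
from typing import Dict


def resolve_produces(
    declared: Dict[str, str],
    dynamic: Dict[str, str],
    additional_properties: bool,
) -> Dict[str, str]:
    """Combine fields declared in the spec with user-supplied dynamic fields."""
    if dynamic and not additional_properties:
        raise ValueError(
            f"Dynamic fields {sorted(dynamic)} are not allowed because "
            "additionalProperties is not set to true"
        )
    return {**declared, **dynamic}


# A CSV loader declares no fixed fields and relies entirely on dynamic ones.
schema = resolve_produces(
    declared={},
    dynamic={"image": "binary", "embedding": "list<binary>"},
    additional_properties=True,
)
```

With additional_properties=False, the same call raises an error in this sketch, because the spec would then be the only source of field definitions.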
Now that we know how to define dynamic fields to be produced, let's take a look at how we can use additionalProperties to define additional fields to be consumed. Building on the previous example, let's take a component that takes either an image or embedding field as input to query a certain vector database. The specification for such a component can be defined as follows:
Here additionalProperties is set to true to allow the component to accept dynamic fields. This gives us the flexibility to choose which field the next component consumes. We can either consume the image field:
dataset = Dataset.create(
    "load_from_csv",
    arguments={
        "dataset_uri": "path/to/dataset.csv",
    },
    produces={
        "my_image": pa.binary(),
        "my_embedding": pa.list_(pa.binary())
    }
)

dataset = dataset.apply(
    "query_vector_database",
    consumes={
        "image": "my_image"
    }
)
or the embedding field:
dataset = Dataset.create(
    "load_from_csv",
    arguments={
        "dataset_uri": "path/to/dataset.csv",
    },
    produces={
        "my_image": pa.binary(),
        "my_embedding": pa.list_(pa.binary())
    }
)

dataset = dataset.apply(
    "query_vector_database",
    consumes={
        "embedding": "my_embedding"
    }
)
Here, my_image and my_embedding are the fields produced by the previous component, and image and embedding are the field names that can be consumed by the query_vector_database component. The data type of the consumed field does not need to be specified here since it can be inferred from the previous component.
Note that the implementation of the component needs custom logic to handle the different cases, depending on which field name is passed.
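A sketch of such custom logic is shown below. To stay dependency-free it uses a dictionary of columns in place of a pandas dataframe, and embed_image and search are hypothetical stand-ins for a real embedding model and a real vector database client.

```python
from typing import Any, Dict, List

Columns = Dict[str, List[Any]]


def embed_image(image: bytes) -> List[float]:
    """Hypothetical stand-in for an image embedding model."""
    return [float(len(image))]


def search(embedding: List[float]) -> str:
    """Hypothetical stand-in for a vector database query."""
    return f"nearest-to-{embedding[0]}"


def query_vector_database(columns: Columns) -> Columns:
    """Branch on whichever optional field was passed to the component."""
    if "embedding" in columns:
        # Precalculated embeddings can be used directly.
        embeddings = columns["embedding"]
    elif "image" in columns:
        # Images have to be embedded before querying.
        embeddings = [embed_image(image) for image in columns["image"]]
    else:
        raise ValueError("Expected an 'image' or 'embedding' field")
    return {"result": [search(embedding) for embedding in embeddings]}


from_images = query_vector_database({"image": [b"abc", b"de"]})
from_embeddings = query_vector_database({"embedding": [[0.5]]})
```

The same branching would apply inside a transform() method by checking which column names are present in the dataframe.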
For a practical example of using dynamic fields, check the guide on implementing your own custom component below.
Args#
The args section describes which arguments the component takes. Each argument is defined by a description and a type, which should be one of the builtin Python types. Additionally, you can set an optional default value for each argument.
args:
  custom_argument:
    description: A custom argument
    type: str
  default_argument:
    description: A default argument
    type: str
    default: bar
These arguments are passed in when the component is instantiated. If an argument is not explicitly provided, the default value will be used instead if available.
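The fallback to default values can be sketched as follows. This is an illustration of the behaviour described above under stated assumptions, not Fondant's own argument handling: resolve_arguments and BUILTIN_TYPES are hypothetical, and the spec is written as a Python dictionary mirroring the args section shown earlier.

```python
from typing import Any, Dict

# Maps type names as written in the `args` section to Python types.
BUILTIN_TYPES = {"str": str, "int": int, "float": float, "bool": bool,
                 "list": list, "dict": dict}


def resolve_arguments(spec_args: Dict[str, Dict[str, Any]],
                      provided: Dict[str, Any]) -> Dict[str, Any]:
    """Build the keyword arguments for a component (hypothetical helper)."""
    unknown = set(provided) - set(spec_args)
    if unknown:
        raise TypeError(f"Unknown arguments: {sorted(unknown)}")

    resolved = {}
    for name, definition in spec_args.items():
        if name in provided:
            value = provided[name]
        elif "default" in definition:
            # Fall back to the default from the specification.
            value = definition["default"]
        else:
            raise TypeError(f"Missing required argument: {name}")
        if not isinstance(value, BUILTIN_TYPES[definition["type"]]):
            raise TypeError(f"Argument '{name}' must be of type {definition['type']}")
        resolved[name] = value
    return resolved


spec_args = {
    "custom_argument": {"description": "A custom argument", "type": "str"},
    "default_argument": {"description": "A default argument", "type": "str",
                         "default": "bar"},
}
kwargs = resolve_arguments(spec_args, {"custom_argument": "foo"})
```

Here kwargs contains both arguments, with default_argument taking the value bar from the specification.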
Afterwards, we pass all keyword arguments to the __init__() method of the component.
import pandas as pd
from fondant.component import PandasTransformComponent


class ExampleComponent(PandasTransformComponent):

    def __init__(self, *, custom_argument, default_argument) -> None:
        """
        Args:
            custom_argument: An argument passed to the component
            default_argument: An argument with a default value in the component spec
        """
        # Initialize your component here based on the arguments

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Implement your custom logic in this single method

        Args:
            dataframe: A Pandas dataframe containing the data

        Returns:
            A pandas dataframe containing the transformed data
        """
You can also use the setup() and teardown() methods to set up and clean up component configuration.
The setup() method is useful to set up any configuration that is not directly used by your component, but by some of the underlying dependencies such as Dask. The advantage compared to __init__() is that you can return a state which will be injected into the teardown() method, so you don't need to store everything as an instance attribute, which can be a problem for unpickleable objects when running in parallel across processes.
You can use the teardown() method to clean up both instance variables from __init__() and state from setup(), for example by closing open connections or files.
import typing as t

import pandas as pd
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from fondant.component import PandasTransformComponent
from my_library import HTTPClient


class MyComponent(PandasTransformComponent):

    def __init__(self, *, client_url) -> None:
        """
        Args:
            client_url: An argument passed to the component
        """
        # Initialize your component here based on the arguments
        self.http_client = HTTPClient(client_url)

    def setup(self) -> t.Any:
        # Instantiate the cluster; the returned client is injected into teardown()
        return Client(LocalCUDACluster())

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Implement your custom logic in this single method

        Args:
            dataframe: A Pandas dataframe containing the data

        Returns:
            A pandas dataframe containing the transformed data
        """

    def teardown(self, dask_client):
        """Perform any cleanup after the component has been executed"""
        self.http_client.shutdown()
        dask_client.shutdown()
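The order in which these methods are called, and how the state returned by setup() reaches teardown(), can be sketched without any dependencies. This is an assumption about the lifecycle as described above, not Fondant's executor code: run_component and RecordingComponent are hypothetical, and calling teardown() in a finally block is a design choice of this sketch.

```python
from typing import Any, Dict, List


class RecordingComponent:
    """Toy component that records the order of its lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def setup(self) -> Dict[str, str]:
        self.events.append("setup")
        # The returned state is not stored on the instance.
        return {"connection": "open"}

    def transform(self, rows: List[int]) -> List[int]:
        self.events.append("transform")
        return [value * 2 for value in rows]

    def teardown(self, state: Dict[str, str]) -> None:
        state["connection"] = "closed"
        self.events.append("teardown")


def run_component(component: Any, partitions: List[List[int]]) -> List[List[int]]:
    """Call setup once, transform per partition, then teardown with the state."""
    state = component.setup()
    try:
        return [component.transform(partition) for partition in partitions]
    finally:
        # Runs even if a transform raises, so resources are always released.
        component.teardown(state)


component = RecordingComponent()
results = run_component(component, [[1, 2], [3]])
```

Because the state travels through the return value of setup() rather than an instance attribute, the component instance itself stays free of objects that may not be picklable.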