# Designing and Running Pipelines
In Pipelex, a pipeline is not just a rigid sequence of steps; it's a dynamic and intelligent workflow built by composing individual, reusable components called pipes. This approach allows you to break down complex AI tasks into manageable, testable, and reliable units.
This guide provides an overview of how to design your pipelines and execute them.
## The Building Blocks: Pipes
A pipeline is composed of pipes. There are two fundamental types of pipes you will use to build your workflows:
- **Pipe Operators**: These are the "workers" of your pipeline. They perform concrete actions like calling an LLM (`PipeLLM`), extracting text from a document (`PipeOcr`), or running a Python function (`PipeFunc`). Each operator is a specialized tool designed for a specific task.
- **Pipe Controllers**: These are the "managers" of your pipeline. They don't perform tasks themselves but orchestrate the execution flow of other pipes. They define the logic of your workflow, such as running pipes in sequence (`PipeSequence`), in parallel (`PipeParallel`), or based on a condition (`PipeCondition`).
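To make the operator/controller distinction concrete, here is a minimal illustrative sketch in plain Python (a toy model, not the Pipelex API): operators are functions that transform data, while a controller only decides how operators are chained.

```python
from typing import Callable, List

# An "operator" transforms data. These stand in for PipeLLM calls.
def extract_keywords(description: str) -> List[str]:
    # Toy heuristic: keep the longer words.
    return [word for word in description.split() if len(word) > 4]

def make_tagline(keywords: List[str]) -> str:
    # Toy tagline: join the first three keywords.
    return " ".join(keywords[:3]) + "!"

# A "controller" performs no task itself; it only orchestrates operators.
def run_sequence(steps: List[Callable], data):
    for step in steps:
        data = step(data)
    return data

tagline = run_sequence([extract_keywords, make_tagline], "durable waterproof hiking boots")
print(tagline)
```

The controller stays generic: swapping in different operators changes the workflow without touching the orchestration logic, which is the same separation Pipelex enforces between operators and controllers.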
## Designing a Pipeline: Composition in TOML
The most common way to design a pipeline is by defining and composing pipes in a `.toml` configuration file. This provides a clear, declarative way to see the structure of your workflow.

Each pipe, whether it's an operator or a controller, is defined in its own `[pipe.<pipe_name>]` table. The `<pipe_name>` becomes the unique identifier for that pipe.
Let's look at a simple example. Imagine we want a workflow that:

1. Takes a product description.
2. Generates a short, catchy marketing tagline for it.

We can achieve this with a `PipeLLM` operator.
```toml
# Filename: marketing_pipeline.toml
domain = "marketing"
definition = "Marketing content generation domain"

# 1. Define the concepts used in our pipes
[concept]
ProductDescription = "A description of a product's features and benefits"
Tagline = "A catchy marketing tagline"

# 2. Define the pipe that does the work
[pipe.generate_tagline]
PipeLLM = "Generate a catchy tagline for a product"
inputs = { description = "ProductDescription" }
output = "Tagline"
prompt_template = """
Product Description:
@description

Generate a catchy tagline based on the above description.
The tagline should be memorable, concise, and highlight the key benefit.
"""
```
This defines a single-step pipeline. The pipe `generate_tagline` takes a `ProductDescription` as input and outputs a `Tagline`.
To create a multi-step workflow, you use a controller. The `PipeSequence` controller is the most common one. It executes a series of pipes in a specific order.
```toml
# Filename: marketing_pipeline.toml
domain = "marketing"
definition = "Marketing content generation domain"

# 1. Define concepts
[concept]
ProductDescription = "A description of a product's features and benefits"
Keyword = "A keyword extracted from a text"
Tagline = "A catchy marketing tagline"

# 2. Define operator pipes
[pipe.extract_keywords]
PipeLLM = "Extract keywords from a product description"
inputs = { description = "ProductDescription" }
output = "Keyword"
multiple_output = true
prompt_template = """
Please extract the most relevant keywords from the following product description:
@description

Focus on features, benefits, and unique selling points.
"""

[pipe.generate_tagline_from_keywords]
PipeLLM = "Generate a tagline from keywords"
inputs = { keywords = "Keyword" }
output = "Tagline"
prompt_template = """
Here are the key product keywords:
@keywords

Generate a catchy marketing tagline based on these keywords.
The tagline should be memorable, concise (under 10 words), and highlight the main benefit.
"""

# 3. This controller pipe defines the two-step pipeline
[pipe.description_to_tagline]
PipeSequence = "From product description to tagline"
inputs = { description = "ProductDescription" }
output = "Tagline"
steps = [
    { pipe = "extract_keywords", result = "extracted_keywords" },
    { pipe = "generate_tagline_from_keywords", result = "tagline" },
]
```
## Data Flow: The Working Memory
How does data get from `extract_keywords` to `generate_tagline_from_keywords`? This is handled by the Working Memory.
The Working Memory is a temporary storage space that exists for the duration of a single pipeline run.
- When a pipe in a sequence executes, its output is given a name using the `result` key (e.g., `result = "extracted_keywords"`).
- This named result is placed into the Working Memory.
- Subsequent pipes can then reference this data by its name in their `inputs` field (e.g., `inputs = { keywords = "Keyword" }`).
This mechanism allows you to chain pipes together, creating a flow of information through your pipeline.
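Conceptually, the Working Memory behaves like a per-run dictionary of named results. The sketch below is a simplified illustration of that mechanism in plain Python, not the actual Pipelex implementation:

```python
# Illustrative model of the Working Memory: a per-run dict of named results.
def run_pipeline(steps, working_memory):
    """Each step reads its inputs from memory and stores its output under a name."""
    for step in steps:
        inputs = {name: working_memory[key] for name, key in step["inputs"].items()}
        working_memory[step["result"]] = step["pipe"](**inputs)
    return working_memory

# Two toy "pipes" standing in for LLM calls.
def extract_keywords(description):
    return description.split()

def generate_tagline(keywords):
    return " ".join(keywords).title() + "!"

memory = {"description": "eco friendly cleaner"}
steps = [
    {"pipe": extract_keywords, "inputs": {"description": "description"}, "result": "extracted_keywords"},
    {"pipe": generate_tagline, "inputs": {"keywords": "extracted_keywords"}, "result": "tagline"},
]
print(run_pipeline(steps, memory)["tagline"])
```

The second step never talks to the first directly: it only reads `extracted_keywords` out of the shared memory, which is what lets pipes stay independent and reusable.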
## Running a Pipeline
Once your pipes are defined, you can execute them from your Python code. Pipelex provides two main functions for this: `start_pipeline` and `execute_pipeline`.

To run the `description_to_tagline` pipeline we defined above, you would call it by its unique name:
```python
import asyncio

from pipelex.core.working_memory_factory import WorkingMemoryFactory
from pipelex.pipelex import Pipelex
from pipelex.pipeline.execute import execute_pipeline


async def main():
    # First, initialize Pipelex (this loads all pipeline definitions)
    Pipelex.make()

    # Create working memory with the pipeline's input
    working_memory = WorkingMemoryFactory.make_from_text(
        text="EcoClean Pro is a revolutionary biodegradable cleaning solution that removes 99.9% of germs while being completely safe for children and pets. Made from plant-based ingredients.",
        concept_code="ProductDescription",
        name="description",
    )

    # Execute the pipeline and wait for the result
    pipe_output, _ = await execute_pipeline(
        pipe_code="description_to_tagline",
        working_memory=working_memory,
    )

    # Get the final output
    tagline = pipe_output.main_stuff_as(content_type=str)
    print(f"Generated tagline: {tagline}")


if __name__ == "__main__":
    asyncio.run(main())
```
- `execute_pipeline`: Runs the specified pipe and waits for it to complete, returning the final output. This is useful for simple, synchronous-style interactions.
- `start_pipeline`: Immediately returns a `pipeline_run_id` and an `asyncio.Task`. This allows you to run pipelines in the background and manage them asynchronously, which is essential for complex, long-running, or parallel workflows.
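The difference between the two calls mirrors a standard asyncio pattern: awaiting a coroutine directly versus wrapping it in a task you can manage later. This generic sketch uses plain asyncio (not the Pipelex API) to illustrate the trade-off:

```python
import asyncio


async def long_running_pipeline(name: str) -> str:
    # Stand-in for a pipeline run (e.g. a slow LLM call).
    await asyncio.sleep(0.1)
    return f"{name}: done"


async def main():
    # execute_pipeline-style: await directly, blocking until the result is ready.
    blocking_result = await long_running_pipeline("blocking run")

    # start_pipeline-style: get a Task handle immediately, await it later.
    task = asyncio.create_task(long_running_pipeline("background run"))
    # ...other work could happen here while the task runs in the background...
    background_result = await task

    return blocking_result, background_result


results = asyncio.run(main())
print(results)
```

Direct awaiting keeps the code simple; holding a task handle lets you start several pipelines at once, poll or cancel them, and gather results when they finish.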
By combining declarative TOML definitions with a powerful Python execution model, Pipelex gives you a robust framework for building and running reliable AI workflows.