PipeParallel
The `PipeParallel` controller runs multiple pipes concurrently, each in its own isolated branch.
How it works
`PipeParallel` runs a list of sub-pipes in concurrent branches.
- Isolation: Before execution, `PipeParallel` creates a deep copy of the current `WorkingMemory` for each branch. Every branch therefore starts from exactly the same state, but the branches run in complete isolation: a change made in one branch does not affect any other.
- Concurrent execution: All branches are run concurrently using `asyncio.gather`.
- Output handling: After all branches have finished, their results are collected and added back to the main working memory. Two parameters control how this happens:
  - `add_each_output`: If `true`, each branch's result is added to the working memory individually, under the name given in that branch's `result` key.
  - `combined_output`: If you provide an output concept here, the results of all branches are bundled into a single structured object whose field names correspond to the branches' `result` names.
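The isolation and concurrency steps can be sketched in plain Python. This is a minimal illustration, not Pipelex's implementation: the working memory is assumed to be a plain `dict`, and each sub-pipe is a hypothetical async function (`tagger`, `counter`) that receives its branch's memory. It shows that `copy.deepcopy` gives each branch a private state and that `asyncio.gather` returns results in branch order.

```python
import asyncio
import copy


async def run_parallel(pipes, working_memory):
    """Run each pipe against its own deep copy of working_memory."""
    # One independent copy per branch: all branches start from the same state,
    # but a mutation in one branch is invisible to the others and to the original.
    branch_memories = [copy.deepcopy(working_memory) for _ in pipes]
    # asyncio.gather runs the coroutines concurrently on one event loop and
    # returns their results in the same order as the pipes were given.
    return await asyncio.gather(
        *(pipe(memory) for pipe, memory in zip(pipes, branch_memories))
    )


async def tagger(memory):
    # Hypothetical branch that mutates its own copy of the memory.
    memory["tags"].append("from-tagger")
    await asyncio.sleep(0.01)
    return list(memory["tags"])


async def counter(memory):
    # Hypothetical branch that reads its copy after tagger has mutated its own.
    await asyncio.sleep(0.02)
    return len(memory["tags"])


memory = {"tags": ["original"]}
results = asyncio.run(run_parallel([tagger, counter], memory))
print(results)  # [['original', 'from-tagger'], 1]
print(memory)   # {'tags': ['original']}
```

`counter` still sees one tag even though it runs after `tagger` has appended to its list, and the caller's `memory` is untouched.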
You must enable `add_each_output`, set `combined_output`, or both.
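The output-handling rules can be sketched the same way, under the same assumptions (dict-based working memory; `collect_outputs` and `Analysis` are hypothetical names). The function takes the branch results keyed by their `result` names, applies the two options, and raises if neither is set. A dataclass stands in for the combined-output concept. Where the real controller stores the combined object is not modelled here, so the function simply returns it.

```python
from dataclasses import dataclass


def collect_outputs(working_memory, branch_results, add_each_output=False, combined_output=None):
    """Merge branch results (keyed by `result` name) back into working_memory.

    Returns the combined object if combined_output is given, else None.
    """
    if not add_each_output and combined_output is None:
        raise ValueError("set add_each_output, combined_output, or both")
    if add_each_output:
        # Each result is stored under the name from its branch's `result` key.
        working_memory.update(branch_results)
    if combined_output is not None:
        # The combined concept's field names must match the `result` names;
        # with a dataclass, a mismatch raises TypeError from its constructor.
        return combined_output(**branch_results)
    return None


@dataclass
class Analysis:
    # Hypothetical combined-output concept with one field per branch.
    features: list
    sentiment: str


memory = {}
results = {"features": ["waterproof"], "sentiment": "positive"}
combined = collect_outputs(memory, results, add_each_output=True, combined_output=Analysis)
print(memory)    # {'features': ['waterproof'], 'sentiment': 'positive'}
print(combined)  # Analysis(features=['waterproof'], sentiment='positive')
```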
Configuration
`PipeParallel` is configured in your pipeline's `.toml` file.
TOML Parameters
| Parameter | Type | Description | Required |
|---|---|---|---|
| `PipeParallel` | string | A short description of the parallel operation. | Yes |
| `inputs` | dictionary | The input concept(s) for the parallel operation, as a dictionary mapping input names to concept codes. | Yes |
| `output` | string | The output concept produced by the parallel operation. | Yes |
| `parallels` | array of tables | The branches to run in parallel, one table per branch. Each table names a pipe and its result (see Parallel Step Configuration). | Yes |
| `add_each_output` | boolean | If `true`, adds the output of each parallel pipe to the working memory individually. Defaults to `false`. | No |
| `combined_output` | string | The concept to use for a single, combined output object. Its structure must have fields that match the `result` names in the `parallels` array. | No |
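To make these parameter rules concrete, here is a hypothetical validator for one parsed `[pipe.<name>]` table. It checks only what the table above states (required keys, their types, and that at least one output mode is set); it is a sketch, not Pipelex's own validation logic.

```python
# Required keys and their expected Python types after TOML parsing
# (TOML strings -> str, inline tables -> dict, arrays -> list).
REQUIRED_KEYS = {"PipeParallel": str, "inputs": dict, "output": str, "parallels": list}


def validate_pipe_parallel(spec: dict) -> list[str]:
    """Return problems found in one parsed PipeParallel table (empty if valid)."""
    problems = []
    for key, expected_type in REQUIRED_KEYS.items():
        if key not in spec:
            problems.append(f"missing required key '{key}'")
        elif not isinstance(spec[key], expected_type):
            problems.append(f"'{key}' should be a {expected_type.__name__}")
    # Optional keys: check types only when present.
    if "add_each_output" in spec and not isinstance(spec["add_each_output"], bool):
        problems.append("'add_each_output' should be a bool")
    if "combined_output" in spec and not isinstance(spec["combined_output"], str):
        problems.append("'combined_output' should be a str")
    # At least one output mode must be enabled.
    if not spec.get("add_each_output", False) and not spec.get("combined_output"):
        problems.append("set add_each_output = true, combined_output, or both")
    return problems


spec = {"PipeParallel": "Demo", "inputs": {"text": "Text"}, "parallels": []}
print(validate_pipe_parallel(spec))
# ["missing required key 'output'", 'set add_each_output = true, combined_output, or both']
```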
Parallel Step Configuration
Each entry in the `parallels` array is a table with the following keys:

| Key | Type | Description | Required |
|---|---|---|---|
| `pipe` | string | The name of the pipe to execute for this branch. | Yes |
| `result` | string | The name for this branch's output. Must be unique within the `PipeParallel` definition. | Yes |
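A matching hypothetical check for the `parallels` entries covers the two required keys and the uniqueness of `result` names. The pipe name `summarize` in the usage example is made up for illustration.

```python
def validate_parallels(parallels: list) -> list[str]:
    """Return problems found in a parsed `parallels` array (empty if valid)."""
    problems = []
    seen_results = set()
    for i, step in enumerate(parallels):
        if not isinstance(step, dict):
            problems.append(f"parallels[{i}]: entry must be a table")
            continue
        # Both keys are required and must be strings.
        for key in ("pipe", "result"):
            if not isinstance(step.get(key), str):
                problems.append(f"parallels[{i}]: '{key}' must be a string")
        # `result` names must be unique within one PipeParallel definition.
        result = step.get("result")
        if isinstance(result, str):
            if result in seen_results:
                problems.append(f"parallels[{i}]: duplicate result name '{result}'")
            seen_results.add(result)
    return problems


steps = [
    {"pipe": "extract_features", "result": "features"},
    {"pipe": "analyze_sentiment", "result": "features"},  # duplicate name
    {"pipe": "summarize"},                                # missing result
]
for problem in validate_parallels(steps):
    print(problem)
# parallels[1]: duplicate result name 'features'
# parallels[2]: 'result' must be a string
```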
Example: Extracting different details from a text
Imagine you have a product description and you want to extract the product features and the product sentiment at the same time.
```toml
[concept.ProductAnalysis]
structure = "ProductAnalysis" # A Pydantic model with 'features' and 'sentiment' fields

[pipe.extract_features]
PipeLLM = "Extract features from text"
inputs = { description = "ProductDescription" }
output = "ProductFeatures"

[pipe.analyze_sentiment]
PipeLLM = "Analyze sentiment of text"
inputs = { description = "ProductDescription" }
output = "ProductSentiment"

# The PipeParallel definition
[pipe.analyze_product_in_parallel]
PipeParallel = "Extract features and sentiment at the same time"
inputs = { description = "ProductDescription" }
output = "ProductAnalysis" # The pipe's output concept, same as combined_output below
add_each_output = false
combined_output = "ProductAnalysis"
parallels = [
    { pipe = "extract_features", result = "features" },
    { pipe = "analyze_sentiment", result = "sentiment" },
]
```
How this works:
1. The `analyze_product_in_parallel` pipe starts and receives a `ProductDescription`.
2. Two parallel branches are created, both with access to the `ProductDescription`.
3. The `extract_features` pipe runs in one branch and the `analyze_sentiment` pipe runs in the other, concurrently.
4. After both complete, `PipeParallel` collects the results. The output from `extract_features` is named `features`, and the output from `analyze_sentiment` is named `sentiment`.
5. Because `combined_output` is set to `ProductAnalysis`, a new structured object of type `ProductAnalysis` is created and populated with the results, like `{"features": ..., "sentiment": ...}`.
6. This single `ProductAnalysis` object becomes the final output of the `analyze_product_in_parallel` pipe.
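The six steps above can be simulated end to end with stub pipes. This is a hedged sketch: `extract_features` and `analyze_sentiment` here are hypothetical keyword-matching stand-ins rather than LLM calls, a dataclass replaces the Pydantic model, and working-memory copying is omitted because the input is an immutable string. The `events` log shows both branches starting before either one finishes.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ProductAnalysis:
    # Stand-in for the Pydantic model: one field per branch `result` name.
    features: list[str]
    sentiment: str


events: list[str] = []


async def extract_features(description: str) -> list[str]:
    # Hypothetical stub for the "extract_features" PipeLLM; no LLM is called.
    events.append("start:extract_features")
    await asyncio.sleep(0.01)
    events.append("end:extract_features")
    return [word for word in ("waterproof", "lightweight") if word in description]


async def analyze_sentiment(description: str) -> str:
    # Hypothetical stub for the "analyze_sentiment" PipeLLM.
    events.append("start:analyze_sentiment")
    await asyncio.sleep(0.02)
    events.append("end:analyze_sentiment")
    return "positive" if "love" in description else "neutral"


async def analyze_product_in_parallel(description: str) -> ProductAnalysis:
    # Steps 2-3: both branches run concurrently.
    features, sentiment = await asyncio.gather(
        extract_features(description), analyze_sentiment(description)
    )
    # Steps 4-5: results are named after each branch's `result` key
    # and bundled into the combined_output concept.
    results = {"features": features, "sentiment": sentiment}
    return ProductAnalysis(**results)


analysis = asyncio.run(analyze_product_in_parallel("A waterproof jacket I love"))
print(analysis)  # ProductAnalysis(features=['waterproof'], sentiment='positive')
print(events)
```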