PipeBatch

The PipeBatch controller provides a powerful "map" operation for your pipelines. It takes a list of items as input and runs the same pipe on each item in the list, executing the branches concurrently for efficiency.

This is the ideal controller for processing collections of documents, images, or any other data records where the same logic needs to be applied to each one independently.

How it works

PipeBatch orchestrates a parallel, per-item execution of a single "branch pipe".

  1. Input List: It identifies an input list from the working memory.
  2. Branching: For each item in the input list, it creates a new, isolated execution branch.
  3. Isolation & Injection: Each branch gets a deep copy of the WorkingMemory. The specific item for that branch is injected into this memory with a defined name.
  4. Concurrent Execution: The specified branch_pipe_code runs across the branches concurrently, in bounded chunks of at most max_concurrency branches at a time (8 by default). Each branch pipe operates only on its own item.
  5. Aggregation: After all branches have completed, PipeBatch collects the individual output from each one and aggregates them into a single new list. This list becomes the final output of the PipeBatch pipe.
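The five steps above can be sketched in plain Python with asyncio. This is a minimal illustration, not Pipelex's implementation: the function run_batch, the stand-in branch pipe summarize, and the use of a plain dict as the working memory are all hypothetical names chosen for the example.

```python
import asyncio
import copy
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for a branch pipe: receives one branch's memory, returns one output.
BranchPipe = Callable[[dict[str, Any]], Awaitable[Any]]


async def run_batch(
    working_memory: dict[str, Any],
    input_list_name: str,
    input_item_name: str,
    branch_pipe: BranchPipe,
    max_concurrency: int = 8,
) -> list[Any]:
    items = working_memory[input_list_name]  # 1. find the input list

    async def run_branch(item: Any) -> Any:
        branch_memory = copy.deepcopy(working_memory)  # 3. isolated deep copy
        branch_memory[input_item_name] = item          # 3. inject the item under its name
        return await branch_pipe(branch_memory)        # 4. run the branch pipe

    results: list[Any] = []
    # 2 + 4. one branch per item, started in chunks of at most max_concurrency
    for start in range(0, len(items), max_concurrency):
        chunk = items[start : start + max_concurrency]
        # gather returns outputs in the order the branches were passed in
        results.extend(await asyncio.gather(*(run_branch(item) for item in chunk)))
    return results  # 5. aggregated list, in input order


async def summarize(memory: dict[str, Any]) -> str:
    await asyncio.sleep(0)  # placeholder for real work such as an LLM call
    return memory["article"][:10]


memory = {"articles": ["first article text", "second article text", "third"]}
summaries = asyncio.run(run_batch(memory, "articles", "article", summarize, max_concurrency=2))
print(summaries)
```

Because each branch works on its own deep copy, the injected item never appears in the caller's memory, which is what keeps branches from interfering with one another.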

Concurrency

PipeBatch does not spawn every branch at once. Branches run in bounded concurrent chunks, capped by the max_concurrency setting under [pipelex.pipeline_execution_config] (default 8). This keeps a large batch — one pipe over thousands of items — from overwhelming memory, the asyncio event loop, and provider rate limits.

To restore unbounded fan-out (every branch started at once), set max_concurrency = "unbounded".

Results always preserve input order regardless of the concurrency bound. If a branch fails, the failure propagates to the caller; when several branches fail, the error from the lowest input index is the one reported.
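The ordering and error rules can be illustrated with a small asyncio sketch. It assumes a chunked gather and a hypothetical function ordered_map; whether Pipelex stops launching later chunks after a failure is not stated in this page, so that behaviour here is an assumption of the example.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def ordered_map(
    items: list[Any],
    branch: Callable[[Any], Awaitable[Any]],
    max_concurrency: int = 8,
) -> list[Any]:
    results: list[Any] = []
    for start in range(0, len(items), max_concurrency):
        chunk = items[start : start + max_concurrency]
        # return_exceptions=True keeps every failure at its input position
        outcomes = await asyncio.gather(
            *(branch(item) for item in chunk), return_exceptions=True
        )
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                # Scanning in input order means the lowest index wins,
                # even if another branch failed earlier in wall-clock time.
                raise outcome
        results.extend(outcomes)
    return results


async def double(n: int) -> int:
    await asyncio.sleep(0.01 * (5 - n))  # later items finish sooner
    return n * 2


async def flaky(n: int) -> int:
    await asyncio.sleep(0.01 * (5 - n))  # item 3 fails before item 1 in time
    if n in (1, 3):
        raise ValueError(f"item {n} failed")
    return n * 10


print(asyncio.run(ordered_map([0, 1, 2, 3, 4], double)))

try:
    asyncio.run(ordered_map([0, 1, 2, 3, 4], flaky))
except ValueError as error:
    print(error)
```

In the second call, item 3 raises first in time, yet the reported error is the one for item 1, because failures are examined in input order rather than completion order.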

For durable, rate-limited execution of very large batches, run the pipeline on the Temporal track.

Configuration

PipeBatch is configured in your pipeline's .mthds file.

MTHDS Parameters

| Parameter | Type | Description | Required |
|---|---|---|---|
| type | string | The type of the pipe: PipeBatch | Yes |
| description | string | A description of the batch operation. | Yes |
| inputs | dictionary | The input concept(s) for the batch operation, as a dictionary mapping input names to concept codes. | Yes |
| output | string | The output concept produced by the batch operation. | Yes |
| branch_pipe_code | string | The name of the single pipe to execute for each item in the input list. | Yes |
| batch_params | table (dict) | An optional table to provide more specific names for the batch operation. | No |

Batch Parameters (batch_params)

| Key | Type | Description | Required |
|---|---|---|---|
| input_list_stuff_name | string | The name of the list in the WorkingMemory to iterate over. If not provided, it defaults to the name of the PipeBatch's main input. | No |
| input_item_stuff_name | string | The name that an individual item from the list will have inside its execution branch. This is how the branch pipe finds its input. | Yes |

Example: Summarizing a list of articles

Suppose you have a list of articles and you want to generate a summary for each one.

# The pipe that knows how to summarize one article
[pipe.summarize_one_article]
type = "PipeLLM"
description = "Summarize a single article"
inputs = { article = "ArticleText" }
output = "ArticleSummary"
prompt = "Please provide a one-sentence summary of the following article:\n\n@article"

# The PipeBatch definition
[pipe.summarize_all_articles]
type = "PipeBatch"
description = "Summarize a batch of articles in parallel"
inputs = { articles = "ArticleList" }  # This is the list to iterate over
output = "SummaryList" # This will be the list of summaries
branch_pipe_code = "summarize_one_article"

input_item_name = "article" # Name of an item within the branch; matches the input name declared by summarize_one_article

How this works:

  1. The summarize_all_articles pipe receives an ArticleList. Let's say it contains 10 articles.
  2. PipeBatch creates 10 branches, one per article.
  3. In branch #1, it takes the first article from the list, puts it into the branch's isolated working memory, and gives it the name article (as specified by input_item_name).
  4. The summarize_one_article pipe is then executed in branch #1. It looks for its input named article, finds the injected item, and produces a summary.
  5. Steps 3 and 4 run concurrently across branches, up to max_concurrency at a time (8 by default), until all 10 articles are processed.
  6. Once all summarize_one_article pipes are done, PipeBatch collects the 10 ArticleSummary outputs and bundles them into a single SummaryList. This list is the final result.