
Error Model

In Pipelex, an error is data, not a control-flow accident. Every failure is classified once — at the layer that knows the most about it — and that classification travels intact to every consumer: the human reading a Rich panel, the agent parsing JSON, the Temporal retry engine, and the HTTP adapter picking a status code.

This page covers the contract that makes that possible: the ErrorReport schema, the classification enums, how inference workers classify SDK exceptions, how classification survives every wrapping layer, and how it crosses the Temporal boundary.


Design Principle

Three rules hold across the codebase, and everything else builds on them.

Single-rooted hierarchy. Every custom exception inherits from PipelexError (pipelex/base_exceptions.py). There is one root, so one to_error_report() contract covers the whole tree.

Classify at the source, never lose it. The layer that catches a third-party exception knows the most about it. It classifies there. Every layer above is a wrapper — it adds context (pipe code, stack) but inherits the classification rather than re-deriving or discarding it.

No broad catches in business logic. except Exception is allowed only at CLI entry points and async task roots. Ruff rule BLE001 enforces this — an unexpected exception crashes loudly instead of being silently swallowed.

Why classify, instead of just propagating the exception?

A raw openai.RateLimitError tells a Python except clause what to catch, but it does not tell the Temporal retry engine whether to retry, the HTTP adapter which status to emit, or an agent whether the failure is the user's fault. Classification turns an exception into a decision input that every consumer can act on uniformly.


The Layer Model

An error rises through a series of layers. Each layer has exactly one job.

| Layer | Role | What it does with errors |
| --- | --- | --- |
| 5: CLI entry points | pipelex / pipelex-agent commands | Catch, format for human (Rich) / agent (JSON·MD) / HTTP |
| 4: CLI factories | cli_factory.py, agent_cli_factory.py | Catch setup errors, route to handlers |
| 3: Pipeline runner | PipelexRunner.execute_pipeline() | Catch + wrap as PipelineExecutionError |
| 2: Pipe router / operators | PipeRouter, pipe operators | Catch + wrap with pipe context (pipe_code, pipe_stack) |
| 1: Workers / SDK calls | pipelex/plugins/*/ | Catch the SDK exception → classify → raise CogtError |
| 0: Third-party SDKs | OpenAI, Anthropic, Google, … | Raise raw, untyped provider exceptions |

Classification happens once, at Layer 1. Layers 2–5 are wrappers: they attach context as they catch and re-raise, but the error_category, error_domain, model, and provider set at Layer 1 reach Layer 5 unchanged (see Cause-Chain Enrichment).


ErrorReport — the Serialization Schema

ErrorReport (pipelex/base_exceptions.py) is the single source of truth for error serialization. It is a frozen Pydantic dataclass with extra="forbid".

| Field | Type | Meaning |
| --- | --- | --- |
| error_type | str | The exception class name |
| message | str | Human-readable message |
| error_category | str \| None | InferenceErrorCategory value (inference errors only) |
| error_domain | str \| None | ErrorDomain value: input / config / runtime |
| retryable | bool \| None | Whether a retry could succeed |
| user_action | UserAction \| None | Typed advice: kind + free-form detail |
| model | str \| None | Model handle, when the failure is attributable to one |
| provider | str \| None | Backend name, when attributable |
| provider_metadata | ProviderErrorMetadata \| None | SDK metadata: status code, request id, retry_after |

PipelexError.to_error_report() is the entry point. to_dict() serializes, dropping None fields; from_dict() is its strict inverse.

report = exc.to_error_report()
report.to_dict()         # {"error_type": "LLMCompletionError", "message": "...", ...}
ErrorReport.from_dict(d) # strict inverse — raises ValidationError on a malformed dict
report.http_status       # 422 / 429 / 500 — for HTTP adapters

ErrorReport is extra="forbid"

from_dict() rejects unknown keys. When recovering a report across a version boundary (e.g. a Temporal payload from a newer worker), trim to known fields before calling from_dict(); recover_error_report() does exactly this.
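
The round-trip contract described above can be sketched with the standard library alone. MiniReport below is a hypothetical stand-in for ErrorReport: it keeps only three fields, uses a plain frozen dataclass rather than a Pydantic one, and raises ValueError where the real class raises ValidationError. It illustrates the None-dropping to_dict() and the strict from_dict(), not the actual implementation.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, fields
from typing import Any


@dataclass(frozen=True)
class MiniReport:
    """Hypothetical stand-in for ErrorReport, reduced to three fields."""

    error_type: str
    message: str
    error_category: str | None = None

    def to_dict(self) -> dict[str, Any]:
        # Drop None fields so the serialized payload stays compact.
        return {key: value for key, value in asdict(self).items() if value is not None}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> MiniReport:
        # Strict inverse: unknown keys are rejected, mirroring extra="forbid".
        known = {f.name for f in fields(cls)}
        unknown = set(data) - known
        if unknown:
            raise ValueError(f"unknown keys: {sorted(unknown)}")
        return cls(**data)


report = MiniReport(error_type="LLMCompletionError", message="rate limited")
payload = report.to_dict()
```

Because from_dict() is strict, a payload produced by a newer version with an extra key fails loudly unless the caller trims it first.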


Classification Enums

Two StrEnums drive every downstream decision.

InferenceErrorCategory

Defined in pipelex/cogt/exceptions.py. Drives retry decisions — is_retryable is True only for TRANSIENT.

| Category | Meaning | Retryable | Typical cause |
| --- | --- | --- | --- |
| TRANSIENT | A brief, self-correcting failure | Yes | Rate limit, 5xx, connection blip |
| CONFIGURATION | The setup is wrong | No | Bad API key, missing backend |
| CONTENT | The input or prompt is wrong | No | Content-policy violation, bad prompt |
| CAPACITY | Account quota / billing exhausted | No | insufficient_quota, HTTP 402 |
| AMBIGUOUS | Outcome unknown, may have committed | No | Connection dropped mid-request |
| UNKNOWN | Could not classify | No | Unrecognized inner exception |

class InferenceErrorCategory(StrEnum):
    TRANSIENT = "transient"
    # ... CONFIGURATION, CONTENT, CAPACITY, AMBIGUOUS ...
    UNKNOWN = "unknown"

    @property
    def is_retryable(self) -> bool:
        match self:
            case InferenceErrorCategory.TRANSIENT:
                return True
            case _:  # all other categories
                return False

AMBIGUOUS vs UNKNOWN

AMBIGUOUS means the error type is known but the operation may or may not have committed — a blind retry is unsafe for a non-idempotent call. UNKNOWN means classification itself failed. Both are non-retryable, for different reasons.
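
As an illustration of how a consumer might act on the category, here is a minimal sketch of a retry decision. Category is a reduced copy of InferenceErrorCategory written with (str, Enum) so it runs on older Python versions; the string values for CONFIGURATION, CONTENT and AMBIGUOUS are assumed, since the excerpt above elides them. should_retry and its max_attempts budget are hypothetical names, not part of Pipelex.

```python
from enum import Enum


class Category(str, Enum):
    """Reduced copy of InferenceErrorCategory, for illustration only."""

    TRANSIENT = "transient"
    CONFIGURATION = "configuration"  # assumed value
    CONTENT = "content"              # assumed value
    CAPACITY = "capacity"
    AMBIGUOUS = "ambiguous"          # assumed value
    UNKNOWN = "unknown"

    @property
    def is_retryable(self) -> bool:
        # Only TRANSIENT is safe to retry blindly.
        return self is Category.TRANSIENT


def should_retry(category: Category, attempt: int, max_attempts: int = 3) -> bool:
    """Hypothetical helper: retry only retryable categories, within a budget."""
    return category.is_retryable and attempt < max_attempts
```

AMBIGUOUS and UNKNOWN both return False here, which matches the table: neither is retried, even though the reasons differ.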

ErrorDomain

Defined in pipelex/base_exceptions.py. Set as a class-level attribute on the exception, it drives the HTTP status.

| Domain | Meaning | HTTP status | Who can fix it |
| --- | --- | --- | --- |
| INPUT | Caller sent something it can fix | 422 | The caller |
| CONFIG | Environment / configuration change needed | 500 | The operator |
| RUNTIME | A failure during execution | 500 | Depends on the cause |

error_domain_to_http_status() is the pure mapping table. ErrorReport.http_status layers one rule on top: a provider 429 (provider_metadata.status_code == 429) takes precedence over the domain, so the API can emit a Retry-After header.

class PipelexConfigError(PipelexError):
    error_domain = ErrorDomain.CONFIG     # class-level — every instance carries it
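
The two-step status rule (pure domain table, then the provider-429 override) can be sketched as follows. Domain is a reduced copy of ErrorDomain, and domain_to_status / http_status are illustrative function names. The fallback to 500 when no domain is set is an assumption; the source does not say what happens in that case.

```python
from __future__ import annotations

from enum import Enum


class Domain(str, Enum):
    """Reduced copy of ErrorDomain, for illustration only."""

    INPUT = "input"
    CONFIG = "config"
    RUNTIME = "runtime"


# Pure table, matching the ErrorDomain table above.
_STATUS_BY_DOMAIN = {Domain.INPUT: 422, Domain.CONFIG: 500, Domain.RUNTIME: 500}


def domain_to_status(domain: Domain | None) -> int:
    # Assumption: a report without a domain falls back to 500.
    if domain is None:
        return 500
    return _STATUS_BY_DOMAIN[domain]


def http_status(domain: Domain | None, provider_status_code: int | None) -> int:
    # A provider 429 takes precedence over the domain,
    # so the API layer can emit a Retry-After header.
    if provider_status_code == 429:
        return 429
    return domain_to_status(domain)
```

Keeping the table pure and layering the override separately means the mapping can be tested on its own, while adapters only ever read the final status.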

Worker Classification

Layer 0 → Layer 1. Every inference worker under pipelex/plugins/*/ catches its SDK's typed exceptions and re-raises a categorized CogtError.

The Uniform Shape — Extract / Classify / Render

Every inference worker's SDK-exception handler collapses to a three-step pipeline: Extract turns the SDK exception into a provider-blind ProviderErrorMetadata, Classify maps that metadata to a category + user-action, and Render picks the CogtError subclass to raise.

except (APIError, APIConnectionError, APITimeoutError) as exc:
    metadata = extract_openai_metadata(exc)
    classification = classify_inference_error(metadata)
    raise render_llm_error(
        family=InferenceErrorFamily.LLM_COMPLETION,
        metadata=metadata,
        classification=classification,
        model_desc=self.inference_model.desc,
    ) from exc

The three steps live in three modules. Only the per-provider Extract functions stay plugin-local; Classify and Render are single shared functions.

| Module | Step | What it owns |
| --- | --- | --- |
| pipelex/cogt/inference/error_classification.py | Extract | ProviderErrorMetadata, SDKErrorEnvelope, UserAction, UserActionKind, the 12 extract_*_metadata functions, plus pure discriminators (is_quota_exhaustion, is_content_policy_violation, is_network_error) exposed as @property on the metadata |
| pipelex/cogt/inference/error_classify.py | Classify | classify_inference_error(): provider-blind mapping from ProviderErrorMetadata → ClassificationResult(category, user_action_kind, is_model_not_found) |
| pipelex/cogt/inference/error_render.py | Render | render_llm_error() / render_img_gen_error() / render_extract_error() / render_search_error(): picks the CogtError subclass from InferenceErrorFamily plus is_model_not_found (e.g. LLMModelNotFoundError vs LLMCompletionError) |

Provider-specific nuance is normalized away in Extract (e.g. Google's code becomes status_code; AWS Bedrock error codes are mapped to HTTP statuses), so Classify has no provider branching. HTTP status drives classification; status-less errors dispatch on the SDK exception type name. The tests/unit/pipelex/cogt/inference/test_provider_classification_parity.py meta-test walks every ProviderName against the extract-fn registry so adding a new provider without wiring it fails fast.
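
To make the "status drives classification, type name is the fallback" idea concrete, here is a minimal sketch of a provider-blind classifier. Everything in it is an assumption for illustration: Metadata is a reduced stand-in for ProviderErrorMetadata, the 401/403 → configuration rule is a guess at how credential failures map, and the two exception type names in the status-less branch are examples rather than the real dispatch table.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Metadata:
    """Hypothetical, reduced stand-in for ProviderErrorMetadata."""

    sdk_exception_type: str
    status_code: int | None = None
    provider_error_code: str | None = None


def classify(metadata: Metadata) -> str:
    """Provider-blind sketch: no branch looks at the provider name."""
    status = metadata.status_code
    # Quota exhaustion is checked first: it can arrive with a 429 status.
    if metadata.provider_error_code == "insufficient_quota" or status == 402:
        return "capacity"
    if status is not None:
        if status == 429 or status >= 500:
            return "transient"
        if status in (401, 403):  # assumed mapping for credential failures
            return "configuration"
        return "unknown"
    # Status-less errors dispatch on the SDK exception type name.
    # The names below are examples only, not the real table.
    if metadata.sdk_exception_type == "APITimeoutError":
        return "transient"
    if metadata.sdk_exception_type == "APIConnectionError":
        return "ambiguous"
    return "unknown"
```

Since the function only reads normalized fields, a new provider needs a new Extract function but no change to this step.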

ProviderErrorMetadata and UserAction

Every raised inference error carries structured SDK metadata and typed advice.

class ProviderErrorMetadata(BaseModel):
    provider: str
    sdk_exception_type: str
    status_code: int | None = None
    request_id: str | None = None
    retry_after_seconds: float | None = None
    provider_error_code: str | None = None
    body: Any | None = Field(default=None, exclude=True)   # may carry secrets

body is excluded from serialization

The raw provider response body can carry account ids, billing details, or credential fragments. It is held in-process but excluded from every serialized form — CLI JSON, agent output, Temporal details.

UserAction pairs a discrete UserActionKind (WAIT_AND_RETRY, CHECK_BILLING, CHECK_CREDENTIALS, CHANGE_INPUT, CHANGE_MODEL, CONTACT_SUPPORT, UNKNOWN) with a free-form detail string — so the CLI can render consistent guidance while keeping provider-specific text.

The instructor Unwrap

On structured-generation paths, instructor wraps the real SDK exception in an InstructorRetryException. extract_underlying_sdk_exception() recovers it, so it routes through the same per-provider categorization as the plain-text path. A genuinely unrecognized inner exception (e.g. a pydantic.ValidationError from a schema mismatch) lands in UNKNOWN rather than being mis-labelled as a CONTENT-policy violation.

Model and Provider Attribution

Inference-failure leaf errors (LLMCompletionError, ImgGenGenerationError, …) are raised deep inside a plugin and do not know which model handle invoked them. Each worker family fills that in at its public-method chokepoint:

def fill_model_and_provider(self, model_handle: str | None, backend_name: str | None) -> None:
    """Fill model_handle / backend_name from the worker, only when still unset."""

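The "only when still unset" rule can be sketched as below. LeafError and run_with_attribution are hypothetical stand-ins: the sketch assumes the fill method lives on the exception and is called from the worker's public method, which is one reading of the signature above.

```python
from __future__ import annotations


class LeafError(Exception):
    """Hypothetical stand-in for an inference leaf error."""

    def __init__(self, message: str) -> None:
        super().__init__(message)
        self.model_handle: str | None = None
        self.backend_name: str | None = None

    def fill_model_and_provider(self, model_handle: str | None, backend_name: str | None) -> None:
        # Fill only fields that are still unset, so an earlier attribution wins.
        if self.model_handle is None:
            self.model_handle = model_handle
        if self.backend_name is None:
            self.backend_name = backend_name


def run_with_attribution(model_handle: str, backend_name: str) -> None:
    """Hypothetical public-method chokepoint of a worker."""
    try:
        raise LeafError("completion failed")  # stands in for the plugin call
    except LeafError as exc:
        exc.fill_model_and_provider(model_handle, backend_name)
        raise  # re-raise the same exception, now attributed
```

Filling at a single chokepoint keeps the plugin code free of attribution concerns, and the unset check makes repeated calls harmless.
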
Cause-Chain Enrichment

A wrapper exception (PipeRunError, PipeRouterError, PipelineExecutionError) carries no error_category of its own. to_error_report() enriches the report from the __cause__ chain, so the inference classification survives every wrapping layer.

def _enrich_error_report_from_cause(self, report: ErrorReport) -> ErrorReport:
    cause = self.__cause__
    if not isinstance(cause, PipelexError):
        return report
    cause_report = cause.to_error_report()
    return ErrorReport(
        error_type=report.error_type,                                  # keep own identity
        message=report.message,
        error_category=report.error_category or cause_report.error_category,
        error_domain=report.error_domain or cause_report.error_domain,
        # ... retryable, user_action, model, provider, provider_metadata ...
    )

A wrapper keeps its own error_type and message but inherits every classification field it does not set itself.
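
A self-contained sketch of this behavior, with toy classes in place of the real hierarchy: BaseError stands in for PipelexError, the report is a plain dict, and only error_category is enriched. All class names here are hypothetical.

```python
from __future__ import annotations


class BaseError(Exception):
    """Hypothetical stand-in for PipelexError, with one classification field."""

    error_category: str | None = None

    def to_report(self) -> dict[str, str | None]:
        report = {
            "error_type": type(self).__name__,
            "message": str(self),
            "error_category": self.error_category,
        }
        cause = self.__cause__
        if isinstance(cause, BaseError):
            # Keep own identity; inherit the classification only where unset.
            report["error_category"] = report["error_category"] or cause.to_report()["error_category"]
        return report


class WorkerError(BaseError):
    error_category = "transient"  # classified at the source


class WrapperError(BaseError):
    pass  # adds context, carries no category of its own


def run() -> None:
    try:
        try:
            raise WorkerError("rate limited")
        except WorkerError as exc:
            raise WrapperError("pipe failed") from exc
    except WrapperError as exc:
        raise WrapperError("pipeline failed") from exc
```

Calling to_report() on the outermost WrapperError yields its own error_type and message together with the "transient" category set two layers below.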

Overrides must call the enrichment helper

A to_error_report() override on a subclass must end with self._enrich_error_report_from_cause(report). Otherwise that subclass becomes a black hole that drops the cause's classification. A cyclic-__cause__ guard ensures a malformed chain can never turn error reporting into a RecursionError.
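
One common way to guard a cause-chain walk against cycles is to track visited exceptions by identity. The sketch below shows that technique in isolation; iter_cause_chain is a hypothetical helper, and the actual guard in Pipelex may be implemented differently.

```python
from __future__ import annotations

from collections.abc import Iterator


def iter_cause_chain(exc: BaseException) -> Iterator[BaseException]:
    """Yield exc and its __cause__ ancestors, stopping if the chain loops."""
    seen: set[int] = set()
    current: BaseException | None = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        yield current
        current = current.__cause__


# Build a malformed, cyclic chain by hand: first -> second -> first.
first = ValueError("first")
second = RuntimeError("second")
first.__cause__ = second
second.__cause__ = first

chain = list(iter_cause_chain(first))
```

The walk visits each exception once and then stops, so the cyclic chain produces a finite list instead of unbounded recursion.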


The Temporal Error Bridge

When a pipe runs on a Temporal worker, the error must survive serialization across the activity → workflow → submitter boundary. Temporal's default failure converter would wrap a raw PipelexError without packing the ErrorReport or deriving the retry decision. The bridge closes that gap.

Activity Side — convert_pipelex_errors

A decorator applied beneath @activity.defn on every in-scope activity converts a PipelexError into a TemporalError.

@activity.defn
@convert_pipelex_errors
async def act_llm_gen_text(llm_assignment: LLMAssignment) -> str:
    return await llm_gen_text(llm_assignment=llm_assignment)

TemporalError.from_message_exception() does two things:

  • Derives non_retryable from InferenceErrorCategory.is_retryable for category-carrying errors. (The configured non_retryable_error_types class-name list is the fallback for category-less exceptions.)
  • Packs the report: to_error_report().to_dict() goes into ApplicationError.details, so workflow code keeps the full classification, not just a message string.
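
The two responsibilities above can be sketched without a Temporal dependency. StandInApplicationError mimics only the shape used here (a message, positional details, a non_retryable flag); to_application_error is a hypothetical function name, and the report is passed as an already serialized dict.

```python
from __future__ import annotations

from typing import Any


class StandInApplicationError(Exception):
    """Stand-in for Temporal's ApplicationError: message, details, non_retryable."""

    def __init__(self, message: str, *details: Any, non_retryable: bool = False) -> None:
        super().__init__(message)
        self.details = details
        self.non_retryable = non_retryable


def to_application_error(
    message: str,
    report: dict[str, Any],
    non_retryable_error_types: frozenset[str],
) -> StandInApplicationError:
    category = report.get("error_category")
    if category is not None:
        # Category-carrying error: only "transient" may be retried.
        non_retryable = category != "transient"
    else:
        # Category-less error: fall back to the configured class-name list.
        non_retryable = report["error_type"] in non_retryable_error_types
    # Pack the whole report dict into details so it survives serialization.
    return StandInApplicationError(message, report, non_retryable=non_retryable)
```

With this shape, the retry engine reads non_retryable while the submitter later reads details, and neither has to parse the message string.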

Submitter Side — recover_error_report

Once the failure returns to the process that submitted the workflow, recover_error_report() walks the __cause__ chain for the ApplicationError, pulls the details-packed dict, and rebuilds the ErrorReport.

def recover_error_report(exc: BaseException) -> ErrorReport | None:
    report_dict = _find_error_report_dict(exc)
    if report_dict is None:
        return None
    known = {f.name for f in fields(ErrorReport)}
    trimmed = {k: v for k, v in report_dict.items() if k in known}  # tolerate skew
    try:
        return ErrorReport.from_dict(trimmed)
    except ValidationError:
        return None

The recovered report is carried on WorkflowExecutionError(error_report=...), whose to_error_report() override returns it. Since WorkflowExecutionError is a PipelexError, a PipelineExecutionError raised from it inherits the classification through the usual cause-chain enrichment, with no Temporal-specific handling.

Version skew is tolerated

During a rolling deploy a worker and a submitter may run different Pipelex versions. Unknown keys are trimmed before validation, and a dict that still fails validation yields None — the error path degrades gracefully instead of crashing.

Net effect: a pipe failing on a Temporal worker reaches the CLI and HTTP adapters with the same error_category / retryable / model / provider / user_action as the identical failure run locally.
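
The submitter-side recovery relies on a helper that locates the packed dict, which the excerpt above does not show. A minimal sketch of what such a lookup could look like, under stated assumptions: StandInApplicationError replaces the Temporal class, find_report_dict is a hypothetical analogue of _find_error_report_dict, and KNOWN_FIELDS is a shortened field list.

```python
from __future__ import annotations

from typing import Any


class StandInApplicationError(Exception):
    """Stand-in for Temporal's ApplicationError, reduced to .details."""

    def __init__(self, message: str, *details: Any) -> None:
        super().__init__(message)
        self.details = details


def find_report_dict(exc: BaseException) -> dict[str, Any] | None:
    """Walk __cause__ until an application error carrying a dict is found."""
    seen: set[int] = set()
    current: BaseException | None = exc
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        if isinstance(current, StandInApplicationError):
            for detail in current.details:
                if isinstance(detail, dict):
                    return detail
        current = current.__cause__
    return None


KNOWN_FIELDS = {"error_type", "message", "error_category", "retryable"}


def trim_to_known(report_dict: dict[str, Any]) -> dict[str, Any]:
    # Tolerate version skew: drop keys this version does not know.
    return {key: value for key, value in report_dict.items() if key in KNOWN_FIELDS}
```

Trimming before strict validation is what lets an older submitter read a payload from a newer worker without loosening the schema itself.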


Interfaces

CLI

The agent CLI (pipelex-agent) emits a structured error to stderr: markdown by default, JSON with --error-format json. When --error-format is omitted, it inherits the value of --format (the success-output flag), so --format json still flips both as it did before the split. In both formats the command exits with code 1.

| Command | Error output |
| --- | --- |
| run, validate, init, models, check-model, doctor | Markdown (default) or JSON via --error-format (or via --format, which --error-format inherits) |
| inputs, concept, pipe, accept-gateway-terms | JSON only |
| fmt, lint | Native plxt output (subprocess passthrough); falls back to JSON only when the plxt binary itself is missing |
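
The flag inheritance for the first row of commands can be expressed as a small resolver. This is a sketch only: resolve_error_format is a hypothetical name, and it assumes that markdown is the result when neither flag is given.

```python
from __future__ import annotations


def resolve_error_format(error_format: str | None, output_format: str | None) -> str:
    """Hypothetical resolver for --error-format, --format and the default."""
    if error_format is not None:
        return error_format   # an explicit --error-format wins
    if output_format is not None:
        return output_format  # otherwise inherit --format
    return "markdown"         # assumed default when neither flag is given
```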

The human CLI (pipelex) renders a Rich error panel — red banner, structured fields, the user_action tip, doc/Discord links — through the shared display_error_panel() helper in pipelex/cli/error_handlers.py.

API

pipelex is a library — there is no API server in the package. Downstream HTTP repos consume the ErrorReport:

  • error_domain_to_http_status(error_domain) — pure domain → status table.
  • ErrorReport.http_status — full property, layering the provider-429 passthrough on top.

A downstream FastAPI exception handler calls ErrorReport.http_status and is a trivial adapter — it must not redefine the mapping.
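
A framework-free sketch of such an adapter follows. to_http_response is a hypothetical function: it receives the status already computed by ErrorReport.http_status and only shapes the response, adding a Retry-After header when the provider metadata supplied a delay.

```python
from __future__ import annotations

import math
from typing import Any


def to_http_response(
    report_dict: dict[str, Any],
    http_status: int,
    retry_after_seconds: float | None = None,
) -> tuple[int, dict[str, str], dict[str, Any]]:
    """Hypothetical adapter: uses the status as given, never recomputes it."""
    headers: dict[str, str] = {}
    if http_status == 429 and retry_after_seconds is not None:
        # Retry-After takes whole seconds; round up so clients never retry early.
        headers["Retry-After"] = str(math.ceil(retry_after_seconds))
    return http_status, headers, {"error": report_dict}
```

In a FastAPI handler the returned tuple would map directly onto a JSON response's status code, headers and content.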

Inputs and Outputs

Inputs. to_error_report() takes a live PipelexError. recover_error_report() takes any BaseException and walks its __cause__ chain. ErrorReport.from_dict() takes a to_dict() payload — strictly, raising ValidationError on drift.

Outputs. to_error_report() returns an ErrorReport; to_dict() returns a None-free dict. Side effects: telemetry events are emitted on pipeline failure at Layer 3, and the agent CLI writes to stderr and raises typer.Exit(1).


Architecture

flowchart TB
    SDK["Layer 0 — SDK exception<br/>(openai.RateLimitError)"]
    W["Layer 1 — Worker classifies<br/>is_quota_exhaustion_*() → CogtError<br/>+ InferenceErrorCategory + ProviderErrorMetadata"]
    WRAP["Layers 2-3 — Wrappers<br/>PipeRouterError → PipelineExecutionError<br/>(attach pipe context)"]
    REPORT["ErrorReport<br/>via to_error_report() + cause-chain enrichment"]

    SDK -->|"raise ... from exc"| W
    W -->|"raise ... from exc"| WRAP
    WRAP --> REPORT

    REPORT --> RICH["Human CLI<br/>Rich panel"]
    REPORT --> AGENT["Agent CLI<br/>JSON / Markdown"]
    REPORT --> HTTP["HTTP adapters<br/>.http_status"]

    W -.->|"@convert_pipelex_errors"| TEMP["Temporal bridge<br/>TemporalError → ApplicationError.details"]
    TEMP -.->|"recover_error_report()"| REPORT

    classDef src fill:#fff3e0,stroke:#e65100,color:#000
    classDef cls fill:#e8eaf6,stroke:#3949ab,color:#000
    classDef out fill:#e8f5e9,stroke:#2e7d32,color:#000
    class SDK src
    class W,WRAP,REPORT,TEMP cls
    class RICH,AGENT,HTTP out

Implementation

Class Hierarchy

PipelexError is the single root. CogtError is the inference branch — it overrides to_error_report() to add error_category, retryable, user_action, provider_metadata, and reads model_handle / backend_name from the instance.

Exception
└── PipelexError                  base_exceptions.py — error_domain, user_action, to_error_report()
    ├── PipelexConfigError         → error_domain = CONFIG
    ├── PipelexSetupError          → error_domain = CONFIG
    ├── CogtError                  cogt/exceptions.py — error_category, provider_metadata
    │   ├── LLMCompletionError      ← per-instance category from the worker
    │   ├── ImgGenGenerationError   ← per-instance category
    │   ├── ModelNotFoundError      ← sibling family raised on provider HTTP 404
    │   │   ├── LLMModelNotFoundError / ImgGenModelNotFoundError
    │   │   └── ExtractModelNotFoundError / SearchModelNotFoundError
    │   └── ... (see worker classification) ...
    ├── PipelineExecutionError      pipeline/exceptions.py — error_domain = RUNTIME
    └── ... (one exceptions.py per package) ...

Factory-time vs Runtime

| When | What carries metadata | How |
| --- | --- | --- |
| Class definition | error_domain, error_category defaults, user_action defaults | Class-level attributes: one source of truth per exception type |
| Raise time | Per-instance error_category, user_action, provider_metadata | Constructor args, set by the worker that classified the failure |
| Report time | model, provider, cause-chain fields | fill_model_and_provider() at the worker chokepoint; _enrich_error_report_from_cause() on to_error_report() |

The "outcome" exceptions (LLMCompletionError, ImgGenGenerationError, ExtractJobFailureError, SearchJobFailureError) intentionally carry no class-level error_category — their category is genuinely per-instance, decided by the worker.
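
The split between class-level defaults and per-instance values can be sketched with plain Python attributes. The three classes below are hypothetical; in particular SketchCredentialsError is an invented example of an exception whose category is fixed by its type.

```python
from __future__ import annotations


class SketchCogtError(Exception):
    """Hypothetical stand-in for CogtError."""

    error_category: str | None = None  # class-level default

    def __init__(self, message: str, error_category: str | None = None) -> None:
        super().__init__(message)
        if error_category is not None:
            # A value passed at raise time shadows the class-level default.
            self.error_category = error_category


class SketchCredentialsError(SketchCogtError):
    error_category = "configuration"  # fixed for every instance of this type


class SketchCompletionError(SketchCogtError):
    pass  # no class-level category: the worker decides per instance
```

An "outcome" error raised without a category stays at None, which is what lets a wrapper or report builder tell "not classified" apart from a real category.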


Reference

Quick-Ref

# Produce a report from any PipelexError
report = exc.to_error_report()          # enriched from the __cause__ chain
payload = report.to_dict()              # None-free dict for serialization

# Consume a report
report.http_status                      # 422 / 429 / 500
report.user_action_detail()             # free-form advice text, or None
report.error_category                   # "transient" / "capacity" / ...

# Round-trip across a boundary
ErrorReport.from_dict(payload)           # strict inverse of to_dict()
recover_error_report(temporal_failure)   # walk __cause__ for an ApplicationError

# Retry decision
InferenceErrorCategory.TRANSIENT.is_retryable   # True — only TRANSIENT

File → Purpose

| File | Purpose |
| --- | --- |
| pipelex/base_exceptions.py | PipelexError, ErrorReport, ErrorDomain, error_domain_to_http_status() |
| pipelex/cogt/exceptions.py | CogtError, InferenceErrorCategory |
| pipelex/cogt/inference/error_classification.py | Extract: ProviderErrorMetadata, SDKErrorEnvelope, UserAction, UserActionKind, per-provider extract_*_metadata functions, pure discriminators |
| pipelex/cogt/inference/error_classify.py | Classify: classify_inference_error(), ClassificationResult |
| pipelex/cogt/inference/error_render.py | Render: render_llm_error() / render_img_gen_error() / render_extract_error() / render_search_error(), InferenceErrorFamily |
| pipelex/cogt/inference/provider_name.py | ProviderName enum keying the extract-fn registry |
| pipelex/plugins/*/ | Per-provider inference workers: Layer 0 → 1 classification |
| pipelex/pipeline/exceptions.py | PipelineExecutionError, PipeExecutionError |
| pipelex/temporal/tprl/temporal_error.py | TemporalError, from_message_exception, recover_error_report |
| pipelex/temporal/tprl/activity_error_boundary.py | convert_pipelex_errors decorator |
| pipelex/temporal/tprl/workflow_caller.py | WorkflowExecutor, WorkflowExecutionError recovery |
| pipelex/cli/error_handlers.py | Human CLI Rich panels: display_error_panel() |
| pipelex/cli/agent_cli/commands/agent_output.py | Agent CLI JSON / markdown delivery |

Behavior Summary

| Scenario | Behavior |
| --- | --- |
| Rate limit hit | TRANSIENT → retryable; transport retry honors Retry-After |
| Quota / billing exhausted | CAPACITY → non-retryable; UserAction(CHECK_BILLING) |
| Bad API key | CONFIGURATION → non-retryable; error_domain = CONFIG → HTTP 500 |
| Model or deployment not found (provider HTTP 404) | Raises a dedicated *ModelNotFoundError sibling (LLMModelNotFoundError, ImgGenModelNotFoundError, ExtractModelNotFoundError, SearchModelNotFoundError); operator re-raises PipeOperatorModelAvailabilityError |
| Content-policy violation | CONTENT → non-retryable; UserAction(CHANGE_INPUT) |
| LLM returns schema-mismatched JSON | instructor re-asks; if exhausted → UNKNOWN |
| Connection dropped mid-request | AMBIGUOUS → non-retryable (outcome unknown) |
| Wrapper exception (no own category) | Inherits cause's classification via enrichment |
| Failure on a Temporal worker | ErrorReport recovered from ApplicationError.details, with the same classification as local |
| Worker/submitter version skew | Unknown keys trimmed; unrecoverable dict yields None |

Next Steps