Runtime Bridge & Transport
This page is for contributors working on Pipelex internals. For the capability overview, see the user-facing Distributed Execution page instead.
Pipelex runs a pipe either direct (everything in one Python process) or distributed on a host runtime (a separate worker process, potentially on another machine). Both paths reach pipe execution through the same framework-agnostic runtime bridge. This page documents that bridge backend-neutrally: the boundary data types, how a library snapshot and working memory cross a process boundary, and how concurrent runs stay isolated. The concrete realizations — Temporal-backed durable execution and the Mistral Workflows integration — are commercial Pipelex platform capabilities; their backend-specific topology lives in their own plugin repos.
A commercial capability rides on this seam
Embedding a Pipelex pipe run into a host runtime — so it executes as durable, distributed workflows — is part of Pipelex's workflow-orchestration offer. This page documents the open seam and the open boundary types; the cross-process plumbing that drives them lives in the closed pipelex-transport library (see Where open core ends below). For the orchestrator extension point a third-party backend compiles against, see Orchestrator Plugins.
Direct vs distributed, one bridge
| Mode | Where pipes run | How the PipeJob travels |
|---|---|---|
| Direct | All pipes in the same process. Library, class registry, and pipe resolution are shared in-memory. | Stays in-process; no serialization. |
| Distributed | The PipeJob is serialized and sent to a worker — a separate process. | Crosses the boundary as structured JSON, carrying a serializable library snapshot. |
Both modes share the same pipe definitions, library loading, and controller logic. The only difference is where a pipe runs and how its PipeJob gets there. Distributing execution across separate processes introduces problems that don't exist in-process — those problems, and their solutions, are this page's subject. For the routing layer that sits above the bridge (how a controller dispatches its child pipes in each mode), see Pipe Routing & Execution.
The boundary data types
Three open-core Pydantic models are everything that crosses a process boundary.
PipeJob — the unit of execution
PipeJob (pipelex/pipe_run/pipe_job.py) carries everything needed to run a single pipe:
| Field | Purpose |
|---|---|
| pipe | The resolved pipe object (concrete operator or controller). |
| working_memory | Runtime data store: typed Stuff objects keyed by variable name. |
| working_memory_raw | Raw JSON dict of working memory for deferred hydration (cross-process only). |
| pipe_run_params | Execution config: run mode (LIVE/DRY), output multiplicity, pipe stack for cycle detection. |
| job_metadata | Pipeline run ID, user ID, tracing context. |
| library_crate | Serializable library snapshot for distributed execution. |
PipeOutput — the result that comes back
PipeOutput (pipelex/core/pipes/pipe_output.py) carries the run result. Like PipeJob, it has both a typed working_memory and a working_memory_raw dict — the raw field is how a result travels back across a boundary without requiring class resolution at every hop (see Deferred hydration).
LibraryCrate — a serializable library snapshot
A LibraryCrate (pipelex/libraries/library_crate.py) is a flat, serializable snapshot of the library — the pipes and concepts a worker needs to run the job:
```python
from pydantic import BaseModel

# ConceptBlueprint, PipeBlueprintUnion, DomainBlueprint: Pipelex blueprint types (imports omitted)


class LibraryCrate(BaseModel):
    concepts: dict[str, ConceptBlueprint | str]  # concept ref -> blueprint or description
    pipes: dict[str, PipeBlueprintUnion]  # pipe ref -> blueprint
    domains: dict[str, DomainBlueprint]  # domain code -> domain metadata
    source_map: dict[str, str]  # ref -> source file (error traceability)
    fingerprint: str  # SHA256 of serialized content
```
Domain is encoded in the dictionary keys (e.g., scoring.WeightedScore, scoring.compute_score), not in a structural container. source_map lets error messages trace back to origin files. The fingerprint enables idempotent loading — a worker that already loaded a crate with the same fingerprint skips the reload. The crate is a plain Pydantic model, so a host runtime serializes it as structured JSON in the job input — fully inspectable, not opaque bytes.
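Fingerprint-based idempotency can be illustrated with the standard library alone. This is a minimal sketch, assuming the fingerprint is a SHA-256 over a canonical JSON dump with sorted keys; the real LibraryCrate is a Pydantic model and its exact serialization may differ. The helper crate_fingerprint and the plain-dict crate contents are hypothetical.

```python
from __future__ import annotations

import hashlib
import json
from typing import Any


def crate_fingerprint(content: dict[str, Any]) -> str:
    """Hypothetical helper: SHA-256 hex digest of a canonical JSON dump of crate content.

    sort_keys=True (applied recursively by json.dumps) makes the digest
    independent of dict insertion order.
    """
    canonical = json.dumps(content, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Toy crate content keyed by domain-qualified refs (values are placeholders,
# not real Pipelex blueprints).
crate_a = {
    "concepts": {"scoring.WeightedScore": "A score with weights"},
    "pipes": {"scoring.compute_score": {"description": "Compute a weighted score"}},
    "domains": {"scoring": {"description": "Scoring domain"}},
}
# Same content, different insertion order.
crate_b = {
    "domains": {"scoring": {"description": "Scoring domain"}},
    "pipes": {"scoring.compute_score": {"description": "Compute a weighted score"}},
    "concepts": {"scoring.WeightedScore": "A score with weights"},
}

fp_a = crate_fingerprint(crate_a)
fp_b = crate_fingerprint(crate_b)
print(fp_a == fp_b)  # True: key order does not affect the fingerprint
print(len(fp_a))     # 64 hex characters
```

Canonical key ordering is what makes the digest usable as a cache key: two processes that loaded the same bundles in a different order still agree on the fingerprint.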
The three cross-process challenges
Sending a pipe run to a separate worker process introduces three challenges that single-process execution never faces:
- Pipe resolution: controllers call get_required_pipe() to resolve child pipes, so the worker's library must contain those pipes. → Solved by LibraryCrate propagation.
- Dynamic class deserialization: .mthds bundles generate Python classes at runtime (e.g., RawText inheriting from TextContent). A receiving process must have those classes registered before it can deserialize WorkingMemory. → Solved by deferred hydration.
- Concurrent isolation: one worker may run many jobs at once. Two jobs that each define a concept named Result with different structures must not share a class registry or library. → Solved by per-call scoping.
LibraryCrate propagation
The submitter builds a LibraryCrate from its loaded library (pipelex/libraries/library_crate_factory.py) and attaches it to the PipeJob. On the worker, the job's entry point loads the crate into a scoped library, registering the bundle's dynamic concept classes, so get_required_pipe() resolves every pipe at every level of nesting. Because a distributed run can fan out into nested jobs (a controller's child pipes), every nested job carries the same crate; loading is idempotent via the fingerprint, so re-receiving a crate already loaded on a worker is a no-op.
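The propagation rule (every nested job carries the same crate, and a worker loads each fingerprint at most once) can be sketched as a toy single-worker model. ToyCrate, ToyWorker and their methods are hypothetical stand-ins, not Pipelex APIs; get_required_pipe here only mimics the contract of the real method. In a real distributed run, nested jobs may land on different workers, each of which loads the crate once.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyCrate:
    """Hypothetical stand-in for LibraryCrate: pipe refs -> toy pipe definitions."""
    pipes: dict[str, dict[str, Any]]
    fingerprint: str


@dataclass
class ToyWorker:
    """Hypothetical worker that loads each crate at most once, keyed by fingerprint."""
    loaded: dict[str, dict[str, dict[str, Any]]] = field(default_factory=dict)
    load_count: int = 0

    def load_crate(self, crate: ToyCrate) -> dict[str, dict[str, Any]]:
        # Idempotent: re-receiving a crate with a known fingerprint is a no-op.
        if crate.fingerprint not in self.loaded:
            self.loaded[crate.fingerprint] = dict(crate.pipes)
            self.load_count += 1
        return self.loaded[crate.fingerprint]

    def get_required_pipe(self, crate: ToyCrate, pipe_ref: str) -> dict[str, Any]:
        # Mimics the contract of get_required_pipe(): fail loudly on an unknown ref.
        library = self.load_crate(crate)
        if pipe_ref not in library:
            raise KeyError(f"pipe {pipe_ref!r} not in crate {crate.fingerprint[:8]}")
        return library[pipe_ref]

    def run_job(self, crate: ToyCrate, pipe_ref: str) -> list[str]:
        """Run a pipe; a toy controller fans out into nested jobs that carry the same crate."""
        pipe = self.get_required_pipe(crate, pipe_ref)
        trace = [pipe_ref]
        for child_ref in pipe.get("children", []):
            trace.extend(self.run_job(crate, child_ref))  # nested job, same crate
        return trace


crate = ToyCrate(
    pipes={
        "scoring.main": {"children": ["scoring.extract", "scoring.compute_score"]},
        "scoring.extract": {},
        "scoring.compute_score": {},
    },
    fingerprint="f" * 64,
)
worker = ToyWorker()
trace = worker.run_job(crate, "scoring.main")
print(trace)              # ['scoring.main', 'scoring.extract', 'scoring.compute_score']
print(worker.load_count)  # 1: nested jobs re-received the crate without reloading it
```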
Deferred hydration
The chicken-and-egg problem
A host runtime's data converter deserializes a payload (a job input, a return value) before any bundle code has run — but reconstructing a typed WorkingMemory needs the bundle's dynamic concept classes registered, and those only get registered when the LibraryCrate is loaded inside the job. Two deserialization gaps follow:
- Input: the PipeJob input contains a WorkingMemory whose dynamic classes don't exist yet on the worker.
- Output: a nested job's PipeOutput return value contains a WorkingMemory with dynamic classes, and the parent's data converter typically runs outside the job's scope, so it can't reach the per-job class registry.
The solution: carry working memory raw
Both PipeJob and PipeOutput carry a working_memory_raw field — a plain JSON dict — alongside the typed working_memory. The raw dict needs no class resolution to cross a boundary, which decouples transport from class registration.
- Input dehydration: before dispatch, the working memory is moved into working_memory_raw, a plain dict produced by WorkingMemory.dump_for_transport() (which stays in open core at pipelex/core/memory/working_memory.py). After the worker loads the crate, the job's entry point hydrates the raw dict back into a typed WorkingMemory, inside the job's scope.
- Output dehydration: before a nested job returns, its working memory is dehydrated to working_memory_raw. The raw dict then flows end-to-end (nested job → parent → delivery) without intermediate rehydration, because parents and delivery workers generally have not loaded the crate. Only the top-level submitter rehydrates.
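The round trip described above can be sketched with stdlib dataclasses standing in for Stuff content. dump_for_transport_sketch and hydrate_sketch are hypothetical simplified analogs of WorkingMemory.dump_for_transport() and the worker-side hydration helper; the real raw format, StuffContentFactory, and alias handling are not reproduced. The point is ordering: the raw dict crosses the wire as plain JSON, and hydration only succeeds once the dynamic class is registered.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, make_dataclass
from typing import Any


@dataclass
class TextContent:
    """Base content class, available in every process (globally registered)."""
    text: str


def generate_raw_text_class() -> type:
    # Stands in for a .mthds bundle generating a concept class at runtime.
    return make_dataclass("RawText", [("source", str)], bases=(TextContent,))


def dump_for_transport_sketch(memory: dict[str, Any]) -> dict[str, Any]:
    """Typed -> plain dict: record each item's class *name* and field values."""
    return {
        name: {"class_name": type(content).__name__, "content": asdict(content)}
        for name, content in memory.items()
    }


def hydrate_sketch(raw: dict[str, Any], registry: dict[str, type]) -> dict[str, Any]:
    """Plain dict -> typed: rebuild each item from a class looked up by name."""
    memory = {}
    for name, item in raw.items():
        cls = registry.get(item["class_name"])
        if cls is None:
            raise LookupError(f"class {item['class_name']!r} not registered; load the crate first")
        memory[name] = cls(**item["content"])
    return memory


# Submitter side: the bundle's dynamic class exists here.
SubmitterRawText = generate_raw_text_class()
memory = {"doc": SubmitterRawText(text="hello", source="upload.txt")}
wire = json.dumps(dump_for_transport_sketch(memory))  # plain JSON, no class objects

# Worker side: before the crate loads, only global classes are known.
worker_registry: dict[str, type] = {"TextContent": TextContent}
raw = json.loads(wire)
try:
    hydrate_sketch(raw, worker_registry)
except LookupError as exc:
    print(exc)  # class 'RawText' not registered; load the crate first

# Loading the crate registers the dynamic class; hydration now succeeds in scope.
worker_registry["RawText"] = generate_raw_text_class()
hydrated = hydrate_sketch(raw, worker_registry)
print(hydrated["doc"].text, hydrated["doc"].source)  # hello upload.txt
```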
The worker-side hydration helper (pipelex/runtime_bridge/primitives/hydration.py) is open core: it iterates the raw dict, uses StuffContentFactory to rebuild typed content from the registered classes, and preserves aliases. It also guards against cross-exec class-identity mismatches (_validate_as_known_class round-trips items through model_dump()): the dynamic classes a worker re-execs from the crate are new Python identities sharing a name with whatever a data converter eagerly rebuilt, and a naive model_validate would reject the older instance.
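The class-identity problem is easy to reproduce without Pydantic: two executions of the same class-generating code yield two distinct class objects with the same name, so a strict instance check rejects values built from the older one. validate_as_known_class_sketch is a hypothetical simplified analog of _validate_as_known_class, using asdict in place of model_dump() and stdlib dataclasses in place of Pydantic models.

```python
from __future__ import annotations

from dataclasses import asdict, is_dataclass, make_dataclass
from typing import Any


def exec_concept_class() -> type:
    """Each call builds a brand-new class object named "Result", like re-exec'ing crate code."""
    return make_dataclass("Result", [("score", float)])


def validate_as_known_class_sketch(item: Any, known_class: type) -> Any:
    """Accept item as known_class, tolerating a same-named class with a different identity.

    Instances of the currently registered class pass through unchanged. Instances of
    an older class with the same name are round-tripped through a plain dict and
    rebuilt as known_class. Anything else is rejected.
    """
    if isinstance(item, known_class):
        return item
    if is_dataclass(item) and type(item).__name__ == known_class.__name__:
        return known_class(**asdict(item))
    raise TypeError(f"cannot validate {type(item).__name__} as {known_class.__name__}")


# A data converter eagerly rebuilt a value with one "Result" class...
ConverterResult = exec_concept_class()
eager = ConverterResult(score=0.9)

# ...then the worker re-exec'd the crate and registered a different "Result".
WorkerResult = exec_concept_class()
print(ConverterResult is WorkerResult)  # False: same name, new identity
print(isinstance(eager, WorkerResult))  # False: a strict check would reject it

fixed = validate_as_known_class_sketch(eager, WorkerResult)
print(isinstance(fixed, WorkerResult), fixed.score)  # True 0.9
```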
Boundary-by-boundary (backend-neutral)
| Boundary | Direction | What crosses | Hydrate on receive? |
|---|---|---|---|
| Submitter → top worker job | input | PipeJob (working_memory_raw + library_crate) | Yes: the worker loads the crate, then hydrates in scope |
| Parent job → nested job | input | Same PipeJob shape (crate re-loaded idempotently) | Yes, in the nested job's own scope |
| Nested job → parent | output | PipeOutput (working_memory_raw) | No: the parent forwards the raw dict |
| Parent job → delivery | result arg | PipeOutput (working_memory_raw, no crate) | No: delivery renders from raw, with best-effort local hydration of the main stuff using only globally registered classes |
| Top worker job → submitter | output | PipeOutput (working_memory_raw) | Yes: the submitter rehydrates, opening a fresh scoped library if a crate is available |
Because delivery renders from the raw dict (pipelex/pipe_run/delivery_executor.py branches on which field is populated), a delivery worker never needs the crate loaded — which is the precondition for true distributed execution. Dynamic concepts lose typed rendering on the delivery worker (a generic field-walking fallback still produces readable JSON/HTML), but the worker stays crate-free.
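A delivery-side renderer that branches on which field is populated might look like the following. ToyPipeOutput, GLOBAL_CLASSES, render_delivery and the raw-entry shape ({"class_name", "content"}) are hypothetical; the sketch only shows the decision order: typed memory if present, otherwise best-effort hydration of the main stuff from globally registered classes, otherwise a generic JSON rendering that needs no crate.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TextContent:
    """A globally registered content class, available on every worker."""
    text: str

    def render(self) -> str:
        return self.text


# Only globally registered classes: no crate has been loaded on the delivery worker.
GLOBAL_CLASSES: dict[str, type] = {"TextContent": TextContent}


@dataclass
class ToyPipeOutput:
    """Hypothetical stand-in for PipeOutput: typed or raw working memory is populated."""
    working_memory: Optional[dict[str, Any]] = None
    working_memory_raw: Optional[dict[str, Any]] = None
    main_stuff_name: str = "main_stuff"


def render_delivery(output: ToyPipeOutput) -> str:
    # Branch on which field is populated.
    if output.working_memory is not None:
        return output.working_memory[output.main_stuff_name].render()
    if output.working_memory_raw is None:
        raise ValueError("output carries neither typed nor raw working memory")
    main = output.working_memory_raw[output.main_stuff_name]
    cls = GLOBAL_CLASSES.get(main["class_name"])
    if cls is not None:
        try:
            # Best-effort local hydration using only globally registered classes.
            return cls(**main["content"]).render()
        except TypeError:
            pass  # content does not fit the global class; fall back below
    # Generic fallback: readable JSON from the plain fields, no dynamic class needed.
    return json.dumps(main["content"], indent=2, sort_keys=True)


known = ToyPipeOutput(
    working_memory_raw={"main_stuff": {"class_name": "TextContent", "content": {"text": "Done."}}}
)
dynamic = ToyPipeOutput(
    working_memory_raw={"main_stuff": {"class_name": "WeightedScore", "content": {"score": 0.8, "weight": 2}}}
)
print(render_delivery(known))    # Done.
print(render_delivery(dynamic))  # generic JSON rendering of the dynamic concept
```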
Where the transport prep lives
Moving working memory into working_memory_raw for a job, dehydrating a result before return, and the submitter-side rehydration are part of the closed pipelex-transport library — they are the cross-process plumbing. The raw field itself, dump_for_transport, and the worker-side hydration helper stay open core, because they are host-agnostic and are also exercised by the open pipelex-api runner.
Per-call scoping
A single worker may process many concurrent jobs. If two jobs define a concept named Result with different fields, they must not share a ClassRegistry or Library — otherwise the second registration overwrites the first and silently corrupts data. Each job therefore creates its own scoped state:
- ClassRegistry: a fresh ClassRegistry pre-seeded from the global registry (which holds the base classes from PIPELEXPATH). The crate's dynamic classes register here, never in the global registry.
- Library: a fresh Library (library_manager.open_fresh_library()) with that ClassRegistry attached, set as current via a ContextVar keyed by the run id. Run-id keying (not job/workflow id) is deliberate: a workflow id reused across retries or resets could otherwise let a closed predecessor's cleanup tear down a live successor's library on the same worker.
- Lookup chain: hub.get_class_registry() reads the ContextVar, returns the scoped library's registry, and falls back to the global registry when no library is set (e.g., a data converter running outside the job scope, which is exactly why output uses deferred hydration).
- Cleanup: library_manager.teardown(library_id) drops the library and its registry, and the ContextVar is reset. No manual GC is needed.
The hub's class-registry and library scoping (pipelex/hub.py, pipelex/libraries/library.py, pipelex/libraries/library_manager.py) is open core — the same machinery serves direct execution, the open runner, and any host-runtime plugin.
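Why the scoped library is keyed by run id rather than workflow id can be shown with a toy retry scenario on a single worker. ToyLibraryManager, open_library, close_library and simulate are hypothetical names; the sketch assumes the predecessor's cleanup runs after the successor has opened its library, which is the race that run-id keying guards against.

```python
from __future__ import annotations


class ToyLibraryManager:
    """Hypothetical manager: open and close scoped libraries under a caller-chosen key."""

    def __init__(self) -> None:
        self.libraries: dict[str, dict[str, object]] = {}

    def open_library(self, key: str) -> dict[str, object]:
        library: dict[str, object] = {}
        self.libraries[key] = library
        return library

    def close_library(self, key: str) -> None:
        self.libraries.pop(key, None)  # drop the library (in Pipelex, also its registry)


def simulate(key_by: str) -> bool:
    """Return True if the live successor's library survives the predecessor's late cleanup."""
    manager = ToyLibraryManager()
    predecessor = {"workflow_id": "wf-1", "run_id": "run-a"}
    successor = {"workflow_id": "wf-1", "run_id": "run-b"}  # a retry/reset reuses the workflow id

    manager.open_library(predecessor[key_by])
    manager.open_library(successor[key_by])   # successor starts on the same worker
    manager.close_library(predecessor[key_by])  # predecessor's cleanup runs late
    return successor[key_by] in manager.libraries


print(simulate("workflow_id"))  # False: the predecessor tore down the successor's library
print(simulate("run_id"))       # True: run ids differ per attempt, so keys never collide
```

With workflow-id keys both runs share one slot, so the late cleanup removes the live library; run ids are unique per attempt, so each run only ever tears down its own.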
Where open core ends and pipelex-transport begins
The boundary types and the host-agnostic helpers are open (MIT, in the pipelex distribution):
| Concern | Open core (pipelex) |
|---|---|
| Boundary DTOs | PipeJob, PipeOutput, LibraryCrate, runtime_bridge/payloads.py |
| Serialization | runtime_bridge/serialization.py |
| Mode / delivery model | orchestration_mode, DeliveryMode, the orchestrator registry / SPI |
| Working-memory raw + dump | working_memory_raw, WorkingMemory.dump_for_transport |
| Worker-side hydration | runtime_bridge/primitives/hydration.py |
The cross-process plumbing that drives a pipe run across the boundary — the host-side bridge entry point, the transport-prep step, and the dispatch primitives (submitter rehydration, delivery, trace flush, scoped-library, pipe classification, and the submit-arg envelope) — is closed (the proprietary pipelex-transport library, installed via git+ssh, pinned to an exact core rev). Embedding Pipelex into a host runtime is a commercial capability, not an open extension seam.
A third-party orchestrator compiles against the open surface only (the Orchestrator SPI); Pipelex's own host-runtime plugins (pipelex-temporal, pipelex-mistralai-workflows) additionally compile against pipelex-transport.
Next steps
- Orchestrator Plugins — the orchestration-mode seam, the Orchestrator SPI, and the open/closed boundary in contract terms.
- Pipe Routing & Execution — how PipeJobs are created and routed in direct and distributed modes.
- Content Generation Across Boundaries — how dynamic Pydantic models and large binary payloads cross a worker boundary.
- Distributed Execution — the user-facing capability overview, and the workflow-orchestration offer (Temporal-backed durable execution, the Mistral Workflows integration).