# Temporal Integration
This page is for contributors working on Pipelex internals. If you're deploying or operating Pipelex on Temporal, see the user-facing Distributed Execution with Temporal guide instead.
This page covers the Temporal-specific mechanisms that enable distributed pipe execution. For the general pipe routing architecture (shared between direct and distributed modes), see Pipe Routing & Execution.
## The Three Challenges
Distributing pipe execution across separate worker processes introduces three challenges that don't exist in direct (single-process) mode:
- Pipe resolution: Controllers call `get_required_pipe()` to resolve child pipes, so the worker's library must contain those pipes.
- Dynamic class deserialization: `.mthds` bundles generate Python classes at runtime (e.g., `RawText` inheriting from `TextContent`). The worker must have those classes registered before deserializing `WorkingMemory`.
- Concurrent isolation: Multiple workflows running on the same worker may define conflicting class and pipe names, so each workflow needs its own scoped registry and library.
These are solved by three mechanisms: LibraryCrate propagation, deferred hydration, and per-workflow scoping.
## LibraryCrate
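A `LibraryCrate` is a serializable snapshot of the library. At its core are two flat dicts of blueprints (`concepts` and `pipes`), alongside per-domain metadata, a source map, and a content fingerprint: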
```python
from pydantic import BaseModel

# ConceptBlueprint, PipeBlueprintUnion and DomainBlueprint are Pipelex blueprint types (imports omitted).

class LibraryCrate(BaseModel):
    concepts: dict[str, ConceptBlueprint | str]  # concept_ref -> blueprint or description
    pipes: dict[str, PipeBlueprintUnion]  # pipe_ref -> blueprint
    domains: dict[str, DomainBlueprint]  # domain_code -> domain metadata
    source_map: dict[str, str]  # ref -> source file (error traceability)
    fingerprint: str  # SHA256 of serialized content
```
Domain is encoded in the dictionary keys (e.g., `scoring.WeightedScore`, `scoring.compute_score`), not in a structural container. `domains` carries per-domain metadata (description, system prompt). `source_map` enables error messages that trace back to origin files. The `fingerprint` enables idempotent loading: if a worker already loaded a crate with the same fingerprint, it skips the reload.
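A minimal sketch of fingerprint-based idempotent loading, assuming the fingerprint is a SHA256 over canonical JSON (sorted keys, compact separators) of the crate content. The exact serialization Pipelex hashes is not specified on this page, and `crate_fingerprint`, `ScopedLibrarySketch` and the toy blueprint values are hypothetical.

```python
import hashlib
import json
from typing import Any


def crate_fingerprint(content: dict[str, Any]) -> str:
    """SHA256 over canonical JSON (assumption: sorted keys, compact separators)."""
    canonical = json.dumps(content, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class ScopedLibrarySketch:
    """Hypothetical per-workflow library that remembers the last loaded crate."""

    def __init__(self) -> None:
        self.loaded_fingerprint: str | None = None
        self.load_count = 0

    def load_from_crate(self, content: dict[str, Any]) -> bool:
        """Load the crate unless identical content was already loaded; True if loaded."""
        fingerprint = crate_fingerprint(content)
        if fingerprint == self.loaded_fingerprint:
            return False  # idempotent skip: nothing to re-register
        # A real loader would register concepts and pipes from the blueprints here.
        self.loaded_fingerprint = fingerprint
        self.load_count += 1
        return True


crate = {
    "concepts": {"scoring.WeightedScore": "A score with weights"},
    "pipes": {"scoring.compute_score": {"type": "PipeLLM"}},
    "domains": {"scoring": {"description": "Scoring domain"}},
}
library = ScopedLibrarySketch()
first_load = library.load_from_crate(crate)
# Same content with a different key order hashes identically, so the reload is skipped.
second_load = library.load_from_crate(dict(reversed(list(crate.items()))))
```

Canonicalizing before hashing is what makes the skip safe: two crates built independently from the same bundles compare equal regardless of dict ordering.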
### How it propagates
```mermaid
sequenceDiagram
    participant S as Submitter
    participant W as Worker
    S->>S: Load library, build LibraryCrate
    S->>W: PipeJob (crate attached)
    Note over W: WfPipeRouter.run()
    W->>W: load_from_crate(crate)
    W->>W: pipe.run_pipe()
    Note over W: Controller dispatches child
    W->>W: PipeRouterChild → child workflow
    Note over W: Child WfPipeRouter.run()
    W->>W: load_from_crate(same crate)
    W->>W: child pipe.run_pipe()
    W->>S: PipeOutput (dehydrated)
    S->>S: Hydrate PipeOutput
```
Every child workflow receives the crate through its PipeJob. Each WfPipeRouter loads the crate into its own scoped library. This ensures every workflow — parent or child, on any worker — can resolve all pipes.
### Visibility
The LibraryCrate is a Pydantic model. Temporal serializes it as structured JSON in the workflow input, making it fully visible in the Temporal dashboard — not opaque bytes.
## Deferred Hydration
### The chicken-and-egg problem
Temporal's data converter deserializes payloads (workflow inputs and return values) using Kajson, which needs dynamic concept classes registered to reconstruct typed WorkingMemory. But those classes only get registered when the LibraryCrate is loaded inside the workflow. This creates two deserialization gaps:
- Input: The `PipeJob` workflow input contains `WorkingMemory` with dynamic classes that don't exist yet on the worker.
- Output: A child workflow's `PipeOutput` return value contains `WorkingMemory` with dynamic classes. The parent's data converter runs in the Temporal SDK's event processing context (outside the workflow coroutine's `ContextVar` scope), so it can't access the parent's per-workflow class registry.
### The solution
Both PipeJob and PipeOutput carry a working_memory_raw field for deferred hydration:
```python
from typing import Any
from pydantic import BaseModel, Field

# WorkingMemory, LibraryCrate and PipeOutputAbstract are Pipelex types (imports omitted).

class PipeJob(BaseModel):
    working_memory: WorkingMemory | None = None
    working_memory_raw: dict[str, Any] | None = None
    library_crate: LibraryCrate | None = None

class PipeOutput(PipeOutputAbstract[WorkingMemory]):
    working_memory: WorkingMemory = Field(default_factory=WorkingMemory)
    working_memory_raw: dict[str, Any] | None = None
```
Input dehydration/hydration cycle:
- Before dispatch, `PipeJob.prepare_for_temporal()` moves `working_memory` to `working_memory_raw` (a plain dict, via `dump_for_temporal()`).
- After the worker loads the crate, `WfPipeRouter` hydrates the raw dict back into a typed `WorkingMemory`.
Output dehydration/hydration cycle:
- Before returning from a workflow, `WfPipeRouter` calls `PipeOutput.prepare_for_temporal()` to dehydrate `WorkingMemory` into `working_memory_raw`.
- The raw dict flows end-to-end: child workflow → parent `WfPipeRun` → `act_deliver` activity, all without intermediate rehydration. Parents do not own a per-workflow registry that would let them rehydrate the child's output, and the activity worker is typically a different process that has not loaded the crate.
- Only the top-level submitter rehydrates, via `rehydrate_pipe_output_with_crate()` (`pipelex/temporal/tprl_pipe/submitter_hydration.py`). When the submitter has not loaded the bundle locally (e.g., a remote API client that built the `PipeJob` from a serialized `LibraryCrate`), this function opens a fresh per-call scoped `Library`, loads the crate into a scoped `ClassRegistry` pre-seeded from the global one, hydrates inside the scope, then tears it down, leaving the submitter's global registry untouched.
The hydration function (pipelex/temporal/tprl_pipe/hydration.py) iterates the raw dict, uses StuffContentFactory to reconstruct typed content objects using the registered classes, and preserves aliases. It also includes a small but unintuitive guard (_validate_as_known_class) that round-trips items through model_dump() to defeat cross-exec class-identity mismatches: kajson may eagerly rebuild instances using one class identity, then load_from_crate re-execs the source and replaces the registry entry with a new identity (same name, different id()), which would otherwise cause Pydantic's model_validate to reject the older instance.
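The class-identity mismatch can be reproduced without Pydantic. In this stdlib-only sketch, `strict_validate` stands in for `model_validate` (which, per the paragraph above, rejects an instance of a same-named class with a different identity) and `vars()` stands in for `model_dump()`. All function names here are hypothetical; this is not the body of `_validate_as_known_class`.

```python
def load_class_from_source(namespace_name: str) -> type:
    """Simulate re-executing bundle source: same class name, new class identity."""
    namespace = {"__name__": namespace_name}
    exec(
        "class Result:\n"
        "    def __init__(self, score, label):\n"
        "        self.score = score\n"
        "        self.label = label\n",
        namespace,
    )
    return namespace["Result"]


def strict_validate(target: type, value: object) -> object:
    """Stand-in for a strict validator: accept instances of target or plain dicts."""
    if isinstance(value, target):
        return value
    if isinstance(value, dict):
        return target(**value)
    raise TypeError(f"expected {target.__name__} or dict, got {type(value).__name__}")


def validate_with_round_trip(target: type, value: object) -> object:
    """Guard: dump same-named instances of a stale class to a dict before validating."""
    if not isinstance(value, target) and type(value).__name__ == target.__name__:
        value = dict(vars(value))  # plays the role of model_dump()
    return strict_validate(target, value)


OldResult = load_class_from_source("bundle")  # identity used by the eager rebuild
NewResult = load_class_from_source("bundle")  # identity after load_from_crate re-execs
stale_instance = OldResult(0.9, "good")

try:
    strict_validate(NewResult, stale_instance)
    rejected = False
except TypeError:
    rejected = True  # same name, different id(): rejected

recovered = validate_with_round_trip(NewResult, stale_instance)
```

The round-trip discards the stale class identity and keeps only field data, which the current class can always accept.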
### Why output dehydration goes end-to-end
Earlier designs had the parent WfPipeRun rehydrate the child's PipeOutput before scheduling act_deliver, then propagated dynamic classes from the per-workflow registry into the worker's global KajsonManager registry so the activity worker could decode the typed payload. This worked on a single worker but broke under distributed Temporal: any worker polling the task queue can pick up the activity, and ContextVars / per-worker globals do not cross processes. The current design eliminates that hack — act_deliver receives the raw dict directly, never needs class resolution for transport, and the activity worker does not need the crate loaded.
### Activity-side rendering of the raw working memory
act_deliver cannot assume dynamic concept classes are registered on its worker. DeliveryExecutor.generate_result_files() (pipelex/pipe_run/delivery_executor.py) therefore branches on which field is populated:
- Typed path (`working_memory` set; in-process or same-worker): the existing typed renderers run unchanged.
- Raw path (`working_memory_raw` set; cross-process Temporal): `working_memory.json` is written directly from the raw dict. For the main stuff, `try_local_hydrate_stuff()` attempts a single-`Stuff` hydration using only globally registered classes (built-in `StuffContent` subclasses are always present). On success, typed renderers produce `main_stuff.{md,html}` and the viewer; on a miss (the dynamic concept class is not registered locally), a generic field-walking fallback renders JSON-in-markdown and `<pre>`-in-HTML so users still get readable output. Misses log a warning so that silent regressions in built-in hydration surface.
Tradeoff: dynamic concepts lose typed rendering on the activity worker, but the activity worker no longer needs the crate, which is the precondition for true distributed execution.
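A sketch of the raw-path decision for the main stuff, assuming a raw stuff shape of `{"class_name": ..., "content": {...}}` (the real raw layout is not documented on this page). `TextContentSketch`, `try_local_hydrate` and `render_main_stuff` are hypothetical, and the fallback here simply dumps JSON instead of walking fields.

```python
import json
import logging
from html import escape
from typing import Any

logger = logging.getLogger("delivery_sketch")


class TextContentSketch:
    """Stand-in for a built-in StuffContent subclass with typed renderers."""

    def __init__(self, text: str) -> None:
        self.text = text

    def render_markdown(self) -> str:
        return self.text

    def render_html(self) -> str:
        return f"<p>{escape(self.text)}</p>"


# Only built-in classes are guaranteed to be registered on the activity worker.
GLOBAL_REGISTRY: dict[str, type] = {"TextContent": TextContentSketch}


def try_local_hydrate(raw_stuff: dict[str, Any]) -> Any:
    """Best-effort hydration with globally registered classes; None on a miss."""
    content_class = GLOBAL_REGISTRY.get(raw_stuff["class_name"])
    if content_class is None:
        return None
    return content_class(**raw_stuff["content"])


def render_main_stuff(raw_stuff: dict[str, Any]) -> tuple[str, str]:
    """Return (markdown, html): typed renderers on a hit, generic fallback on a miss."""
    content = try_local_hydrate(raw_stuff)
    if content is not None:
        return content.render_markdown(), content.render_html()
    logger.warning("no global class for %s; using generic rendering", raw_stuff["class_name"])
    dumped = json.dumps(raw_stuff["content"], indent=2, sort_keys=True)
    fence = "`" * 3
    return f"{fence}json\n{dumped}\n{fence}", f"<pre>{escape(dumped)}</pre>"


typed_md, typed_html = render_main_stuff({"class_name": "TextContent", "content": {"text": "a<b"}})
raw_md, raw_html = render_main_stuff({"class_name": "WeightedScore", "content": {"score": 0.8}})
```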
### Boundary-by-boundary summary
| Boundary | Direction | What crosses | Hydration on receive? |
|---|---|---|---|
| Submitter → top `WfPipeRouter` | input | `PipeJob` with `working_memory_raw` + `library_crate` | Yes: the worker loads the crate, then hydrates inside its scope |
| Parent `WfPipeRouter` → child `WfPipeRouter` | input | Same `PipeJob` shape (crate re-loaded idempotently via fingerprint) | Yes: the child worker loads the crate and hydrates inside its own scope |
| Child `WfPipeRouter` → parent `WfPipeRun` | output | `PipeOutput` with `working_memory_raw` | No: the parent forwards the raw dict |
| Parent `WfPipeRun` → `act_deliver` | activity arg | `PipeOutput` with `working_memory_raw` (no crate) | No: the activity renders from raw, with best-effort local hydration of the main stuff using only globally registered classes |
| Top `WfPipeRouter` → submitter | output | `PipeOutput` with `working_memory_raw` | Yes: `rehydrate_pipe_output_with_crate()` opens a fresh scoped `Library` if a crate is available, otherwise falls back to the global registry |
### Dashboard visibility
working_memory_raw is a plain JSON dict — fully readable in the Temporal dashboard without needing class resolution.
## Per-Workflow Scoping
### Why per-workflow isolation matters
A single Temporal worker may process multiple concurrent workflows. If two workflows define a concept named Result with different structures (e.g., Result(score, label) vs Result(value, confidence, is_valid)), they must not share the same ClassRegistry or Library. Without isolation, the second workflow's class registration would overwrite the first's, causing silent data corruption.
### How it works
Each `WfPipeRouter.run()` creates its own scoped state:
- `ClassRegistry`: A new `ClassRegistry` pre-seeded from the global registry (which holds the base classes from `PIPELEXPATH`). Dynamic classes from the crate are registered here, not in the global registry.
- `Library`: A new `Library` instance opened via `library_manager.open_fresh_library()`, with the `ClassRegistry` attached as a `PrivateAttr`. The library is set as the current library via the `_library_id` `ContextVar`.
    - The library id is keyed by the run id (`wf_{run_id}`): replay-stable, but unique per run. Keying by workflow id would collide across runs of a reused workflow id (workflow-level retry policy, Temporal reset, resubmission of the same `pipeline_run_id`): a closed predecessor run's late eviction cleanup could tear down a live successor run's library on the same worker.
    - With run-id keying, a pre-existing library under the id can only be this same run's own leftover (an interrupted execution whose cleanup never ran), so `open_fresh_library` tears it down before opening. Reusing it would fingerprint-skip the crate load against the fresh registry, so the crate's dynamic classes would never land in that registry.
    - The stale teardown is best-effort: the manager forgets the entry pop-first, and a raising teardown is logged but never fails the fresh open (worker-local leak state must not decide setup success).
    - The same run-id keying applies to the rest of the per-run worker-local state: the graph tracer key and the report-delegate event-log context key.
- `ClassRegistry` lookup chain: `hub.get_class_registry()` reads `_library_id` from the `ContextVar` and returns the `ClassRegistry` attached to that library. It falls back to the global registry if no library is set.
- Cleanup: `library_manager.teardown(library_id)` deletes the library and its `ClassRegistry`, and the `ContextVar` is reset via `clear_current_library()`. No manual GC is needed: the `ClassRegistry` is garbage-collected with the `Library`. In the workflow's `finally` block, this worker-local cleanup runs BEFORE the awaited `act_flush_trace_events` activity. The await is a suspension point where an eviction-time `BaseException` can abort the rest of the block, so ordering cleanup first guarantees that an eviction can only skip the flush itself, never the teardown.
### Kajson integration
The Temporal data converter passes the active ClassRegistry to kajson.loads():
```python
pydantic_gizmo = kajson.loads(data, class_registry=get_class_registry())
```
Inside a workflow coroutine, get_class_registry() returns the per-workflow scoped registry (via _library_id ContextVar). However, the data converter is also called from the Temporal SDK's event processing context (e.g., when deserializing child workflow return values), where the ContextVar is not set. In that case, it falls back to the global registry. This is why PipeOutput uses deferred hydration — the working_memory_raw dict doesn't require class resolution during deserialization.
## Workflow Topology
A single pipe execution can create many Temporal workflows. Each level of nesting dispatches child workflows:
```text
PipeSequence (3 steps)
  → 1 parent WfPipeRouter
    → 3 child WfPipeRouters (one per step)
      Each child step (if PipeLLM):
        → 1 act_llm_gen_text activity (dispatched directly from WfPipeRouter
          via ContentGeneratorInWorkflow.make_llm_text)

PipeParallel (2 branches, then summary)
  → 1 parent WfPipeRouter
    → 3 child WfPipeRouters (2 branches concurrent + 1 summary)

PipeBatch (3 items)
  → 1 parent WfPipeRouter
    → 1 child WfPipeRouter for the PipeBatch controller
      → 3 child WfPipeRouters (one per item)
```
Each WfPipeRouter loads the LibraryCrate (idempotent via fingerprint), so pipe resolution works at every level.
## Controller Dispatch Patterns on Temporal
Each controller type creates a different child workflow pattern:
| Controller | Dispatch pattern | Child workflows |
|---|---|---|
| PipeSequence | Sequential: each step via `SubPipe.run_pipe()` → `PipeRouterChild` | N children (one per step), executed sequentially |
| PipeParallel | Concurrent: all branches via `SubPipe.run_pipe()` + `asyncio.gather()` | N children (one per branch), executed concurrently |
| PipeBatch | Fan-out: each list item via `SubPipe.run_pipe()` | N children (one per item), executed concurrently |
| PipeCondition | Conditional: evaluates the expression, then dispatches the selected outcome via `SubPipe.run_pipe()` | 1 child (the chosen outcome pipe), plus an internal `act_jinja2_gen_text` activity for expression evaluation |
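The four dispatch shapes differ mainly in how child dispatches are awaited. A hedged asyncio sketch: `run_child` is a hypothetical stand-in for the `SubPipe.run_pipe()` → `PipeRouterChild` path, and `asyncio.sleep(0)` stands in for the child workflow doing its work.

```python
import asyncio


async def run_child(pipe_code: str, log: list[str]) -> str:
    """Hypothetical stand-in for SubPipe.run_pipe() -> PipeRouterChild -> child workflow."""
    log.append(f"start {pipe_code}")
    await asyncio.sleep(0)  # the child workflow's execution happens here
    log.append(f"end {pipe_code}")
    return f"output of {pipe_code}"


async def run_sequence(steps: list[str], log: list[str]) -> list[str]:
    # One child at a time: step N+1 starts only after step N has finished.
    return [await run_child(step, log) for step in steps]


async def run_parallel(branches: list[str], log: list[str]) -> list[str]:
    # All children dispatched together and awaited with asyncio.gather().
    return list(await asyncio.gather(*(run_child(branch, log) for branch in branches)))


async def run_batch(pipe_code: str, items: list[object], log: list[str]) -> list[str]:
    # Fan-out: one child per list item, run concurrently.
    children = (run_child(f"{pipe_code}[{index}]", log) for index in range(len(items)))
    return list(await asyncio.gather(*children))


async def run_condition(outcome_key: str, outcomes: dict[str, str], log: list[str]) -> str:
    # outcome_key would come from the act_jinja2_gen_text expression evaluation.
    return await run_child(outcomes[outcome_key], log)


async def main() -> dict[str, list[str]]:
    logs: dict[str, list[str]] = {"sequence": [], "parallel": [], "batch": [], "condition": []}
    await run_sequence(["step_a", "step_b"], logs["sequence"])
    await run_parallel(["branch_a", "branch_b"], logs["parallel"])
    await run_batch("score_item", ["x", "y", "z"], logs["batch"])
    await run_condition("yes", {"yes": "on_yes", "no": "on_no"}, logs["condition"])
    return logs


logs = asyncio.run(main())
```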
### Internal templating activities
Some controllers dispatch internal act_jinja2_gen_text activities in addition to pipe routing:
- PipeCondition: `act_jinja2_gen_text` evaluates the expression template against working memory.
- PipeCompose: `act_jinja2_gen_text` resolves construct field references (e.g., `{ from = "title_text.text" }`).
Both call ContentGeneratorInWorkflow.make_templated_text from inside WfPipeRouter, which dispatches the activity directly via workflow.execute_activity(act_jinja2_gen_text, ...). These activities carry working memory contents through the Temporal data converter, which creates a serialization dependency on working memory content types.
## Validation Dispatch: One In-Process Activity
Pipeline execution fans out across workflows and activities as described above — but validation deliberately does the opposite. When Temporal is enabled, a /validate job (validation sweep + graph-producing dry-run) dispatches as the one-step wrapper workflow wf_dry_validate, which runs a single act_dry_validate activity and returns everything the canonical validation report needs that must be computed worker-side, in one round-trip: the per-pipe status map, the best-effort GraphSpec, the library-wide pending_signatures, and the pipe_io_contracts IO contracts (built inside the worker's library window — their JSON-Schema rendering resolves bundle-defined structure classes through the loaded library's class registry, so the API side never needs to re-acquire a library).
Inside the activity, everything is in-process and in-memory:
- The library is loaded once. The sweep half is `validate_bundle` itself (the same function the direct-mode route calls, so both backends share the categorized `ValidateBundleError` contract), which leaves the library loaded; the graph dry-run (`dry_run_pipe_in_process`) runs against that same library; teardown happens once in the activity's `finally`.
- ContextVar scopes pin the run in-process: `scoped_pipe_router` (nested controller sub-pipes resolve a local router, not the `TemporalPipeRouter`), `scoped_content_generator` (inference leaves resolve an inline dry generator, not `ContentGeneratorInWorkflow`), and `scoped_event_log` (the graph traces into a shared `InMemoryEventLog`: no NDJSON file, no DynamoDB round-trip; the `GraphSpec` rides back on the activity result). No usage/cost events are emitted.
- The graph is best-effort: an expected dry-run failure (the run-failure wrappers, or a mock-input mint failure) returns `graph_spec=None` while validation still succeeds; any other error propagates and fails the activity.
- Validation failures cross the boundary as structured `ErrorReport`s via the activity error boundary. A `ValidateBundleError` keeps its categorized per-error data (blueprint/factory/pipe/dry-run), so the API can render meaningful diagnostics. Signatures are not a failure: an unsatisfied `PipeSignature` is a runnability fact carried on the report's `pending_signatures`/`is_runnable`, not an error that crosses the boundary.
Submitters call dispatch_dry_validate (pipelex/temporal/tprl_pipe/dry_validate_dispatch.py) for the whole round-trip. The wrapper workflow exists so the dispatch works on the current temporalio SDK; replacing it with a true standalone activity is a deferred optimization.
## Known Limitation: StuffArtefact Serialization in Dry-Run
In dry-run mode, PipeLLM produces `StuffArtefact` debug objects as working memory content. When controllers dispatch internal templating activities that carry working memory contents through the Temporal data converter, Kajson fails with `TypeError: Type <class 'StuffArtefact'> is not JSON serializable`.
StuffArtefact is not a Pydantic model — it's a plain object used for dry-run tracing that was never designed for cross-process serialization.
Confirmed affected:
- PipeCondition: expression evaluation dispatches `act_jinja2_gen_text`, which serializes working memory to evaluate the Jinja2 expression. It fails with the `StuffArtefact` `TypeError`.
- PipeCompose: construct field resolution dispatches `act_jinja2_gen_text` similarly.
Likely affected (hangs in testing, root cause not fully diagnosed):
- PipeBatch: fan-out creates child PipeJobs with copies of working memory. The hang may be caused by `StuffArtefact` in the serialized child PipeJob, or by a different serialization issue in the list content extraction.
Not affected:
- PipeSequence: no internal templating activities; child pipes are dispatched via `SubPipe.run_pipe()` → `PipeRouterChild`, which creates clean child PipeJobs.
- PipeParallel: same dispatch pattern as PipeSequence; branches go through `SubPipe.run_pipe()` without internal templating activities.
- All controllers in live mode: `StuffArtefact` is only produced in dry-run. Live execution uses real content objects that serialize correctly.
Fix direction: either make `StuffArtefact` serializable (implement `__json__` or convert it to a Pydantic model), or strip non-serializable objects from working memory before passing it to internal templating activities.
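A sketch of the second fix direction, using the standard `json` module as a stand-in for Kajson's encoder. `StuffArtefactLike` and `strip_non_serializable` are hypothetical. Kajson handles types that plain `json` does not (Pydantic models, for example), so a real filter would need to probe with the same encoder the data converter uses rather than `json.dumps`.

```python
import json
from typing import Any


class StuffArtefactLike:
    """Plain debug object, like a dry-run StuffArtefact: not JSON serializable."""

    def __init__(self, note: str) -> None:
        self.note = note


def is_json_serializable(value: Any) -> bool:
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


def strip_non_serializable(contents: dict[str, Any]) -> tuple[dict[str, Any], list[str]]:
    """Drop entries the encoder cannot handle; return kept contents and dropped names."""
    kept = {name: value for name, value in contents.items() if is_json_serializable(value)}
    dropped = [name for name in contents if name not in kept]
    return kept, dropped


contents = {"title_text": {"text": "Quarterly report"}, "draft": StuffArtefactLike("dry-run")}
kept, dropped = strip_non_serializable(contents)
payload = json.dumps(kept)  # now safe to hand to a templating activity
```

Returning the dropped names lets the caller log them, so a template that later fails on a missing key can be traced back to the stripped dry-run artefact.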
## File Reference
| Component | File |
|---|---|
| LibraryCrate model | pipelex/libraries/library_crate.py |
| LibraryCrate factory | pipelex/libraries/library_crate_factory.py |
| PipeJob (crate + raw WM fields) | pipelex/pipe_run/pipe_job.py |
| PipeOutput (raw WM for deferred hydration) | pipelex/core/pipes/pipe_output.py |
| PipeRouterTop (submitter-side dispatch) | pipelex/temporal/tprl_pipe/pipe_router_top.py |
| PipeRouterChild (worker-side child dispatch) | pipelex/temporal/tprl_pipe/pipe_router_child.py |
| WfPipeRouter (workflow entry point) | pipelex/temporal/tprl_pipe/wf_pipe_router.py |
| Deferred hydration (worker-side) | pipelex/temporal/tprl_pipe/hydration.py |
| Submitter-side rehydration | pipelex/temporal/tprl_pipe/submitter_hydration.py |
| Activity-side delivery rendering (typed/raw dual path) | pipelex/pipe_run/delivery_executor.py |
| WorkingMemory dehydration helper | pipelex/core/memory/working_memory.py (dump_for_temporal) |
| Kajson data converter | pipelex/temporal/temporal_data_converter.py |
| Hub (ClassRegistry + Library scoping) | pipelex/hub.py |
| Library (PrivateAttr ClassRegistry) | pipelex/libraries/library.py |
| Worker CLI | pipelex/temporal/worker_cli.py |
| Temporal task manager | pipelex/temporal/temporal_task_manager.py |