# Execution Graph Tracing

Pipelex captures pipeline executions as directed graphs for visualization and debugging. The `pipelex/graph/` module provides tracing infrastructure, a canonical data model (`GraphSpec`), and multiple rendering backends (Mermaid, ReactFlow).
## Design Principle

Pipeline executions form implicit graphs: pipes call other pipes, and data flows between them. The graph module makes this structure explicit:

- Trace at runtime: Instrument pipe execution to capture nodes (pipes) and edges (relationships)
- Store canonically: `GraphSpec` is a versioned, renderer-agnostic JSON model
- Render as needed: Transform `GraphSpec` into Mermaid diagrams or ReactFlow visualizations

```
Pipe Execution → GraphTracer → GraphSpec → Renderers → HTML/Mermaid
```
### Non-Intrusive Design

Graph tracing is opt-in. When disabled, a no-op tracer is used with zero overhead. The tracer is injected via a `GraphContext` carried in `JobMetadata`, not global state.
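For orientation, here is a minimal sketch of what that protocol pair could look like; the real definitions live in `pipelex/graph/graph_tracer_protocol.py` and carry more parameters (see the Tracing Flow section below), so treat the names and signatures here as illustrative:

```python
from datetime import datetime
from typing import Protocol


class GraphTracerLike(Protocol):
    """Illustrative subset of the tracer interface (simplified signatures)."""

    def on_pipe_start(self, pipe_code: str, started_at: datetime) -> str: ...
    def on_pipe_end_success(self, node_id: str, ended_at: datetime) -> None: ...


class NoOpTracer:
    """When tracing is disabled, every hook does nothing, so pipes pay no cost."""

    def on_pipe_start(self, pipe_code: str, started_at: datetime) -> str:
        return ""  # no node is recorded

    def on_pipe_end_success(self, node_id: str, ended_at: datetime) -> None:
        pass  # nothing to record
```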
## Usage Variants

| Scenario | CLI | API | Result |
|---|---|---|---|
| Generate execution graph | `pipelex run my_pipe --graph` | `execute_pipeline(..., execution_config.is_generate_graph=True)` | GraphSpec JSON + HTML viewers |
| Force include full data | `--graph --graph-full-data` | `data_inclusion.stuff_json_content=True` | Data embedded in IOSpec |
| Force exclude data | `--graph --graph-no-data` | All `data_inclusion.*=False` | Previews only |
| Dry run with graph | `--dry-run --graph` | `dry_run_pipe_with_graph(pipe)` | Graph of mock execution |
### Full Data Included by Default

The default configuration includes full data in graphs (`stuff_json_content`, `stuff_text_content`, `stuff_html_content`, and `error_stack_traces` are all true). Use `--graph-full-data` or `--graph-no-data` only to override project-specific settings.
## Interfaces

### CLI Commands

```bash
# Run pipeline and generate graph
pipelex run my_pipe --graph

# Include full serialized data
pipelex run my_pipe --graph --graph-full-data

# Exclude data (previews only)
pipelex run my_pipe --graph --graph-no-data

# Dry run with graph tracing
pipelex run my_pipe --dry-run --graph --mock-inputs
```
### API

```python
from pipelex.pipeline.execute import execute_pipeline
from pipelex.pipe_run.dry_run_with_graph import dry_run_pipe_with_graph

# Execute with graph tracing via config
result = await execute_pipeline(
    pipe_code="my_pipe",
    execution_config=config.with_graph_config_overrides(generate_graph=True),
)

# Dry run directly returns GraphSpec
graph_spec = await dry_run_pipe_with_graph(pipe)
```
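Once you hold a `GraphSpec` (here, the dry-run result), persisting it is plain JSON serialization via the `to_json()` method covered below; the output path is illustrative:

```python
from pathlib import Path

# Save the canonical graph next to other pipeline outputs.
Path("output/my_pipe_graphspec.json").write_text(graph_spec.to_json(), encoding="utf-8")
```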
## Outputs

| Output | File | Purpose |
|---|---|---|
| `graphspec_json` | `_graphspec.json` | Canonical graph representation |
| `mermaidflow_mmd` | `_mermaid.mmd` | Mermaid flowchart code |
| `mermaidflow_html` | `_mermaid.html` | Standalone Mermaid viewer |
| `reactflow_viewspec` | `_viewspec.json` | ViewSpec for ReactFlow |
| `reactflow_html` | `_reactflow.html` | Interactive ReactFlow viewer |
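All of these outputs can be produced in one call via the factory; a hedged sketch based on the `generate_graph_outputs(g, config, pipe_code)` pattern from the quick reference at the end of this page (the import path is an assumption based on the files reference, and `graph_config` is assumed to be the `GraphConfig` instance described under Configuration):

```python
from pipelex.graph.graph_factory import generate_graph_outputs

# Emit every output enabled under graphs_inclusion in the graph config:
# GraphSpec JSON, Mermaid code/HTML, ViewSpec JSON, ReactFlow HTML.
generate_graph_outputs(graph_spec, graph_config, "my_pipe")
```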
## Architecture

```mermaid
flowchart TB
    subgraph EXECUTION["Pipeline Execution"]
        direction TB
        PIPE["PipeAbstract.run_pipe()"]
        JOB["JobMetadata"]
        PIPE --> JOB
    end
    subgraph TRACING["Graph Tracing"]
        direction TB
        MGR["GraphTracerManager<br/>(singleton)"]
        TRACER["GraphTracer"]
        CTX["GraphContext"]
        MGR --> TRACER
        TRACER --> CTX
    end
    subgraph MODEL["Canonical Model"]
        direction TB
        SPEC["GraphSpec"]
        NODE["NodeSpec"]
        EDGE["EdgeSpec"]
        IO["IOSpec"]
        SPEC --> NODE
        SPEC --> EDGE
        NODE --> IO
    end
    subgraph ANALYSIS["Pre-computed Analysis"]
        direction TB
        GA["GraphAnalysis"]
        TREE["containment_tree"]
        STUFF["stuff_registry"]
        GA --> TREE
        GA --> STUFF
    end
    subgraph RENDER["Renderers"]
        direction TB
        MF["MermaidflowFactory"]
        RF["ViewSpec Transformer"]
        HTML1["Mermaid HTML"]
        HTML2["ReactFlow HTML"]
        MF --> HTML1
        RF --> HTML2
    end
    JOB --> CTX
    CTX --> TRACER
    TRACER --> SPEC
    SPEC --> GA
    GA --> MF
    GA --> RF
```
## Core Components

### GraphSpec

The canonical, versioned data model for execution graphs. Designed for JSON serialization and renderer-agnostic storage.

```python
class GraphSpec(BaseModel):
    graph_id: str
    created_at: datetime
    pipeline_ref: PipelineRef
    nodes: list[NodeSpec]
    edges: list[EdgeSpec]
    meta: dict[str, Any]

    def to_json(self) -> str:
        return self.model_dump_json(by_alias=True, indent=2)
```
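Because `GraphSpec` is a Pydantic model, a saved graph round-trips with standard Pydantic APIs; a minimal sketch (the file path is illustrative, and the import path follows the files reference below):

```python
from pathlib import Path

from pipelex.graph.graphspec import GraphSpec

# Load a previously generated graph back into the canonical model.
raw = Path("output/my_pipe_graphspec.json").read_text(encoding="utf-8")
graph_spec = GraphSpec.model_validate_json(raw)
print(f"{len(graph_spec.nodes)} nodes, {len(graph_spec.edges)} edges")
```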
### Node Types

| NodeKind | Description |
|---|---|
| `PIPE_CALL` | Generic pipe invocation |
| `CONTROLLER` | PipeController (Sequence, Parallel, etc.) |
| `OPERATOR` | PipeOperator (LLM, Extract, etc.) |
| `INPUT` | Pipeline input node |
| `OUTPUT` | Pipeline output node |
| `ARTIFACT` | Generated artifact |
| `ERROR` | Error node |
### Edge Types

| EdgeKind | Description |
|---|---|
| `CONTROL` | Execution flow between pipes |
| `DATA` | Data flow (stuff passed between pipes) |
| `CONTAINS` | Parent-child containment (controller → children) |
| `SELECTED_OUTCOME` | Condition outcome selection |
### Node Status

| NodeStatus | Description |
|---|---|
| `SCHEDULED` | Not yet started |
| `RUNNING` | Currently executing |
| `SUCCEEDED` | Completed successfully |
| `FAILED` | Execution failed |
| `SKIPPED` | Skipped during execution |
| `CANCELED` | Canceled before completion |
## Implementation

### Tracing Flow

```python
# 1. Manager opens tracer for pipeline run
manager = GraphTracerManager.get_or_create_instance()
graph_context = manager.open_tracer(
    graph_id=pipeline_run_id,
    data_inclusion=config.data_inclusion,
    pipeline_ref_domain="my_domain",
    pipeline_ref_main_pipe="my_pipe",
)

# 2. Context flows through JobMetadata to each pipe
job_metadata = JobMetadata(
    pipeline_run_id=pipeline_run_id,
    graph_context=graph_context,
)

# 3. Each pipe reports start/end to tracer
node_id, child_context = manager.on_pipe_start(
    graph_context=graph_context,
    pipe_code="extract_text",
    pipe_type="PipeExtract",
    node_kind=NodeKind.OPERATOR,
    started_at=datetime.now(timezone.utc),
    input_specs=[...],
)

# 4. On completion, report success with output
manager.on_pipe_end_success(
    graph_id=graph_context.graph_id,
    node_id=node_id,
    ended_at=datetime.now(timezone.utc),
    output_spec=IOSpec(...),
)

# 5. Manager closes tracer and returns GraphSpec
graph_spec = manager.close_tracer(pipeline_run_id)
```
### GraphContext Propagation

`GraphContext` is a serializable context that flows through pipe execution:

```python
class GraphContext(BaseModel):
    graph_id: str                        # Unique graph identifier
    parent_node_id: str | None           # Parent pipe's node ID
    node_sequence: int                   # Counter for generating node IDs
    data_inclusion: DataInclusionConfig  # What data to capture

    def copy_for_child(self, child_node_id: str, next_sequence: int) -> GraphContext:
        """Create context for nested pipe execution."""
        return GraphContext(
            graph_id=self.graph_id,
            parent_node_id=child_node_id,
            node_sequence=next_sequence,
            data_inclusion=self.data_inclusion,
        )
```
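In practice, a parent pipe derives the context it hands to a nested pipe from its own; a short illustration of the method above (the concrete IDs and sequence values are made up):

```python
# The parent just registered its child as node "node_4":
child_context = graph_context.copy_for_child(
    child_node_id="node_4",
    next_sequence=graph_context.node_sequence + 1,
)
assert child_context.parent_node_id == "node_4"  # nested pipes attach under node_4
```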
### Producer/Consumer Paradigm

Data flow in the graph follows a producer/consumer model:

- Producers: Nodes that create or output data (stuff). When a pipe outputs a value, it becomes the producer of that data item.
- Consumers: Nodes that receive or use data as input. When a pipe takes a value as input, it becomes a consumer of that data item.

Each piece of data is identified by a digest (a unique hash). This allows the tracer to track where data originates and where it flows, even when the same value passes through multiple pipes.

```
Producer Node ──(outputs)──► Stuff (digest: abc123) ──(inputs)──► Consumer Node
```
This paradigm enables:
- Automatic DATA edge generation without explicit wiring
- Visualization of data lineage across the execution graph
- Debugging by tracing which pipe produced unexpected output
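The source does not specify the digest algorithm, but the principle is content addressing; a minimal sketch assuming a SHA-256 hash over a canonical JSON serialization:

```python
import hashlib
import json
from typing import Any


def stuff_digest(value: Any) -> str:
    """Illustrative content digest; the actual scheme in Pipelex may differ.

    Hashing the canonical JSON form means two pipes exchanging the same
    value necessarily reference the same digest.
    """
    canonical = json.dumps(value, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


# Identical values yield identical digests, which is what lets the tracer
# correlate a producer's output with a consumer's input.
assert stuff_digest({"page": 1}) == stuff_digest({"page": 1})
```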
### Data Flow Edge Generation

`DATA` edges are generated at teardown by correlating input/output digests:

```python
def _generate_data_edges(self) -> None:
    for consumer_node_id, node_data in self._nodes.items():
        for input_spec in node_data.input_specs:
            if input_spec.digest is None:
                continue
            producer_node_id = self._stuff_producer_map.get(input_spec.digest)
            if producer_node_id and producer_node_id != consumer_node_id:
                self.add_edge(
                    source_node_id=producer_node_id,
                    target_node_id=consumer_node_id,
                    edge_kind=EdgeKind.DATA,
                    label=input_spec.name,
                )
```
### GraphAnalysis

Pre-computed analysis layer that extracts common information for renderers:

```python
analysis = GraphAnalysis.from_graphspec(graph_spec)

# Lookups
node = analysis.nodes_by_id["node_123"]
children = analysis.get_children("controller_node")
is_root = analysis.is_root("node_123")

# Data flow
stuff_info = analysis.get_stuff_info(digest="abc123")
producer = analysis.get_producer(digest="abc123")
consumers = analysis.get_consumers(digest="abc123")
```
| Attribute | Purpose |
|---|---|
| `nodes_by_id` | Fast node lookup by ID |
| `containment_tree` | Parent → children mapping |
| `child_node_ids` | All nodes that have parents |
| `controller_node_ids` | Nodes with children |
| `root_nodes` | Top-level nodes |
| `stuff_registry` | Digest → StuffInfo |
| `stuff_producers` | Digest → producer node ID |
| `stuff_consumers` | Digest → consumer node IDs |
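These lookups make lineage questions cheap to answer. A sketch of a transitive upstream walk, assuming node specs expose their `input_specs` and that `get_producer` returns a node ID (both consistent with the accessors above but not confirmed by the source; the import path follows the files reference below):

```python
from pipelex.graph.graph_analysis import GraphAnalysis


def upstream_producers(analysis: GraphAnalysis, node_id: str) -> set[str]:
    """Collect every node whose output feeds, directly or transitively, into node_id."""
    seen: set[str] = set()
    frontier = [node_id]
    while frontier:
        node = analysis.nodes_by_id[frontier.pop()]
        for input_spec in node.input_specs:
            if input_spec.digest is None:
                continue
            producer = analysis.get_producer(digest=input_spec.digest)
            if producer is not None and producer not in seen:
                seen.add(producer)
                frontier.append(producer)
    return seen
```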
## Rendering

### Mermaidflow

Converts a `GraphSpec` to Mermaid flowchart syntax with controller subgraphs:

```python
from pipelex.graph.mermaidflow.mermaidflow_factory import MermaidflowFactory

mermaidflow = MermaidflowFactory.make_from_graphspec(
    graph_spec,
    graph_config,
    direction=FlowchartDirection.TOP_DOWN,
    include_subgraphs=True,
)
print(mermaidflow.mermaid_code)
```
Output structure:
- Controllers rendered as subgraphs
- Operators rendered as rectangles inside subgraphs
- Stuff nodes (data items) rendered as stadium shapes
- DATA edges connect producers → stuff → consumers
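The generated code is plain text, so it drops straight into any Mermaid-aware viewer; for example (the path is illustrative):

```python
from pathlib import Path

# Save the flowchart source; paste into mermaid.live or embed in Markdown.
Path("output/my_pipe_mermaid.mmd").write_text(mermaidflow.mermaid_code, encoding="utf-8")
```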
### ViewSpec (ReactFlow)

Transforms a `GraphSpec` into a viewer-oriented model:

```python
from pipelex.graph.reactflow.viewspec_transformer import graphspec_to_viewspec

analysis = GraphAnalysis.from_graphspec(graph_spec)
viewspec = graphspec_to_viewspec(
    graph_spec,
    analysis,
    options={"show_data_edges": True},
)
```

ViewSpec provides:

- `ViewNode` with UI metadata (badges, classes, inspector data)
- `ViewEdge` with ReactFlow properties (animated, hidden)
- `ViewIndex` for fast client-side lookups
- Layout configuration for Dagre
## Configuration

### GraphConfig

```toml
# pipelex.toml (default values)
[pipelex.pipeline_execution_config.graph_config]

[pipelex.pipeline_execution_config.graph_config.data_inclusion]
stuff_json_content = true   # Include full serialized data
stuff_text_content = true   # Include ASCII text representation
stuff_html_content = true   # Include HTML representation
error_stack_traces = true   # Include full stack traces

[pipelex.pipeline_execution_config.graph_config.graphs_inclusion]
graphspec_json = true       # Generate GraphSpec JSON
mermaidflow_mmd = true      # Generate Mermaid code
mermaidflow_html = true     # Generate Mermaid HTML
reactflow_viewspec = true   # Generate ViewSpec JSON
reactflow_html = true       # Generate ReactFlow HTML
```
### MermaidRenderingConfig

| Option | Description |
|---|---|
| `direction` | Flowchart direction (TB, LR, BT, RL) |
| `is_include_data_edges` | Show data flow edges |
| `is_include_contains_edges` | Show containment edges |
| `is_show_stuff_codes` | Show digest in stuff labels |
| `style.theme` | Mermaid theme (default, dark, forest, neutral) |
### ReactFlowRenderingConfig

| Option | Description |
|---|---|
| `layout_direction` | Dagre layout direction (TB, LR) |
| `nodesep` | Node separation in pixels |
| `ranksep` | Rank separation in pixels |
| `edge_type` | Edge style (bezier, smoothstep, step, straight) |
| `initial_zoom` | Initial viewport zoom |
| `style.theme` | UI theme (light, dark, system) |
## IOSpec Data Capture

`IOSpec` captures input/output data for nodes:

```python
class IOSpec(BaseModel):
    name: str                 # Variable name
    concept: str | None       # Concept code
    content_type: str | None  # MIME type
    preview: str | None       # Truncated preview (max 200 chars)
    size: int | None          # Content size
    digest: str | None        # Unique identifier for data flow
    data: Any | None          # Full serialized content
    data_text: str | None     # ASCII text representation
    data_html: str | None     # HTML representation
```
### Preview Truncation

Previews are automatically truncated to 200 characters; stack traces are truncated to 2,000 characters. Use `--graph-full-data` to capture complete content.
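A sketch of the documented truncation rule (the limits come from this page; the helper itself is hypothetical, not part of the Pipelex API):

```python
PREVIEW_MAX_CHARS = 200       # IOSpec.preview limit, per this page
STACK_TRACE_MAX_CHARS = 2000  # stack-trace limit, per this page


def truncate_preview(text: str, max_chars: int = PREVIEW_MAX_CHARS) -> str:
    """Hypothetical helper mirroring the truncation described above."""
    if len(text) <= max_chars:
        return text
    return text[: max_chars - 3] + "..."
```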
## Validation

`GraphSpec` validation enforces invariants:

```python
from pipelex.graph.validation import validate_graphspec

validate_graphspec(graph_spec)
```

| Invariant | Error |
|---|---|
| Duplicate node IDs | `GraphSpecValidationError` |
| Duplicate edge IDs | `GraphSpecValidationError` |
| Edge references non-existent node | `GraphSpecValidationError` |
| Failed node without error spec | `GraphSpecValidationError` |
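In practice, validation runs right after tracing so inconsistencies surface early; a short sketch using the exception named above (the `pipelex.graph.exceptions` import path is an assumption based on the files reference below):

```python
from pipelex.graph.exceptions import GraphSpecValidationError
from pipelex.graph.validation import validate_graphspec

try:
    validate_graphspec(graph_spec)
except GraphSpecValidationError as exc:
    # e.g. an edge referencing a node that was never registered
    print(f"Invalid graph: {exc}")
    raise
```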
## Syntax Quick Reference

| Pattern | Purpose |
|---|---|
| `GraphSpec.to_json()` | Serialize to JSON string |
| `GraphAnalysis.from_graphspec(g)` | Pre-compute analysis |
| `MermaidflowFactory.make_from_graphspec(...)` | Generate Mermaid |
| `graphspec_to_viewspec(g, analysis)` | Generate ViewSpec |
| `generate_graph_outputs(g, config, pipe_code)` | Generate all outputs |
## Files Reference

| File | Purpose |
|---|---|
| `pipelex/graph/graphspec.py` | Canonical GraphSpec model |
| `pipelex/graph/graph_tracer.py` | GraphTracer implementation |
| `pipelex/graph/graph_tracer_manager.py` | Singleton manager for tracers |
| `pipelex/graph/graph_tracer_protocol.py` | Protocol + NoOp implementation |
| `pipelex/graph/graph_context.py` | Serializable tracing context |
| `pipelex/graph/graph_analysis.py` | Pre-computed graph analysis |
| `pipelex/graph/graph_factory.py` | Output generation factory |
| `pipelex/graph/graph_config.py` | Configuration models |
| `pipelex/graph/validation.py` | GraphSpec validation |
| `pipelex/graph/exceptions.py` | Graph-specific exceptions |
| `pipelex/graph/mermaidflow/` | Mermaid rendering |
| `pipelex/graph/reactflow/` | ReactFlow rendering |