Debugging a Prompt Chain

Logging, tracing, and fixing failures in multi-step workflows. Tools and techniques to diagnose why your chain broke and how to prevent it from happening again.

June 9, 2026
Tags: prompt-chaining, debugging, logging, tracing, error-handling, tutorial

A prompt chain fails silently by default. An LLM call returns something — it might be wrong, incomplete, or hallucinated, but it's rarely an exception you can catch. Debugging means making failures loud and traceable.

This tutorial covers the debugging toolkit: structured logging, step-by-step tracing, gate checks, and recovery strategies. By the end, you'll have a chain that tells you exactly where and why it broke.

Why Chains Fail

Before you instrument, understand the failure modes:

| Failure | Symptom | Impact |
|---|---|---|
| Empty output | Step N returns "" or None | Step N+1 has no input, chain crashes |
| Too-short output | Step returns 10 chars when 500 were expected | Downstream steps produce garbage from garbage |
| Format mismatch | Step 2 expects JSON, Step 1 returned prose | Parsing error or hallucinated data |
| Context drift | By step 4, the model forgot the original task | Output is coherent but on the wrong topic |
| Hallucinated data | Step invents facts or sources | Propagation: the hallucination becomes "truth" for later steps |
| Infinite retry loop | A step retries, fails, retries, fails... | Token waste, eventual timeout |
| Silent quality drop | Output degrades gradually across steps | Hardest to detect, because everything "looks fine" |

Instrumenting a Chain

Start with structured logging. Every step logs its input size, output size, latency, and any anomalies.

import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from openai import OpenAI

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(name)s] %(levelname)s %(message)s'
)
logger = logging.getLogger("chain")

@dataclass
class StepTrace:
    step_name: str
    started_at: str = ""
    completed_at: str = ""
    duration_ms: float = 0
    input_size: int = 0
    output_size: int = 0
    model: str = ""
    temperature: float = 0.0
    tokens_used: int = 0
    gate_check_passed: bool = True
    error: str = ""
    retries: int = 0

@dataclass
class ChainTrace:
    chain_name: str
    started_at: str = ""
    completed_at: str = ""
    steps: list[StepTrace] = field(default_factory=list)
    total_tokens: int = 0
    total_cost: float = 0.0
    success: bool = False
    error: str = ""

class TracedChain:
    def __init__(self, name: str = "untitled"):
        self.name = name
        self.client = OpenAI()
        self.trace = ChainTrace(
            chain_name=name,
            started_at=datetime.now(timezone.utc).isoformat(),
        )

    def step(self, name: str, model: str = "gpt-4o", temperature: float = 0.3,
             max_retries: int = 3, **kwargs) -> str:
        trace = StepTrace(
            step_name=name,
            model=model,
            temperature=temperature,
            started_at=datetime.now(timezone.utc).isoformat(),
        )

        for attempt in range(1, max_retries + 1):
            try:
                messages = kwargs.get("messages", [])
                trace.input_size = sum(len(m.get("content", "")) for m in messages)

                start = time.time()
                response = self.client.chat.completions.create(
                    model=model,
                    temperature=temperature,
                    **{k: v for k, v in kwargs.items() if k != "gate_check"},
                )
                trace.duration_ms = (time.time() - start) * 1000

                content = response.choices[0].message.content or ""
                trace.output_size = len(content)
                trace.tokens_used = (
                    response.usage.total_tokens if response.usage else 0
                )

                # Run gate check if provided
                gate = kwargs.get("gate_check")
                if gate and not gate(content):
                    trace.gate_check_passed = False
                    trace.error = "Gate check failed"
                    if attempt < max_retries:
                        logger.warning(
                            f"[{name}] Gate check failed, retrying ({attempt}/{max_retries})"
                        )
                        trace.retries = attempt
                        continue
                    else:
                        raise ValueError(
                            f"[{name}] Gate check failed after {max_retries} attempts"
                        )

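                # Clear any error left over from an earlier attempt that was retried,
                # so a step that eventually succeeded is not reported as failed.
                trace.error = ""
                trace.completed_at = datetime.now(timezone.utc).isoformat()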
                self.trace.steps.append(trace)
                self.trace.total_tokens += trace.tokens_used

                logger.info(
                    f"[{name}] Complete | {trace.output_size} chars | "
                    f"{trace.duration_ms:.0f}ms | {trace.tokens_used} tokens"
                )
                return content

            except Exception as e:
                trace.error = str(e)
                if attempt < max_retries:
                    logger.warning(
                        f"[{name}] Error, retrying ({attempt}/{max_retries}): {e}"
                    )
                    trace.retries = attempt
                    time.sleep(2 ** (attempt - 1))  # Exponential backoff: 1s, 2s, 4s, ...
                else:
                    trace.completed_at = datetime.now(timezone.utc).isoformat()
                    self.trace.steps.append(trace)
                    self.trace.error = str(e)
                    logger.error(f"[{name}] Failed after {max_retries} attempts: {e}")
                    raise

    def finalize(self, success: bool = True):
        self.trace.completed_at = datetime.now(timezone.utc).isoformat()
        self.trace.success = success
        total_ms = sum(s.duration_ms for s in self.trace.steps)
        # Rough estimate: gpt-4o is ~$2.50/1M input, ~$10/1M output tokens.
        # total_tokens does not separate the two, so a blended ~$6/1M rate is assumed.
        self.trace.total_cost = (self.trace.total_tokens / 1_000_000) * 6.00

        logger.info(
            f"[{self.name}] {'✓' if success else '✗'} "
            f"{len(self.trace.steps)} steps | "
            f"{total_ms:.0f}ms | "
            f"{self.trace.total_tokens} tokens | "
            f"${self.trace.total_cost:.4f}"
        )

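    def dump_trace(self) -> str:
        # asdict() recurses into the nested StepTrace objects. Using __dict__ with
        # default=str would serialize each step as an opaque repr string instead.
        from dataclasses import asdict
        return json.dumps(asdict(self.trace), indent=2)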

Note:

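The TracedChain class is reusable. Drop it into any chain and get per-step metrics for free. The gate_check parameter accepts a callable that receives the step's output. If it returns False, the step retries, and once max_retries attempts are used up the step raises a ValueError.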

Gate Checks: Fail Fast

The most common chain bug: step 2 produces garbage, step 3 amplifies it, and you only notice at step 5. Gate checks stop the cascade early.

def content_pipeline_with_gates(topic: str, audience: str) -> str:
    chain = TracedChain("content-pipeline")

    # Step 1: Outline
    outline = chain.step(
        "outline",
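        messages=[{
            "role": "user",
            "content": (
                f"Create a detailed outline for an article about {topic} for {audience}. "
                # The gate below requires "##", so ask for that format explicitly.
                "Use a Markdown '##' heading for each section."
            )
        }],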
        gate_check=lambda x: len(x) > 100 and "##" in x,
    )

    # Step 2: Draft (the gate only checks that the draft is longer than 500 characters)
    draft = chain.step(
        "draft",
        temperature=0.7,
        messages=[{
            "role": "user",
            "content": f"Write a full draft following this outline:\n\n{outline}"
        }],
        gate_check=lambda x: len(x) > 500,
    )

    # Step 3: Polish — must not be shorter than draft (data loss check)
    polished = chain.step(
        "polish",
        temperature=0.2,
        messages=[{
            "role": "user",
            "content": f"Polish this draft for clarity and grammar:\n\n{draft}"
        }],
        gate_check=lambda x: len(x) >= len(draft) * 0.7,
    )

    chain.finalize(success=True)
    logger.info(f"\n{chain.dump_trace()}")
    return polished

Gate check patterns by step type:

# Structural: output must contain expected markers
lambda x: "## " in x and len(x) > 200

# Length: output must be substantive
lambda x: 500 < len(x) < 5000

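# JSON: parseable and has required keys
# (a named function, because a lambda cannot catch the decode error)
def has_required_keys(x: str) -> bool:
    try:
        data = json.loads(x)
    except json.JSONDecodeError:
        return False
    return isinstance(data, dict) and all(k in data for k in ["title", "body"])

# No regression: output must not lose data
# (previous_output_length is len() of the prior step's output)
lambda x: len(x) >= previous_output_length * 0.8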

# Keyword presence: must mention specific terms
lambda x: all(k in x.lower() for k in ["security", "performance"])

# Error marker: output must not open with an error message
lambda x: "error" not in x[:100].lower()
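
These patterns can be combined. The following is a minimal sketch, assuming a hypothetical helper named all_of (it is not part of TracedChain) that folds several single-argument gates into one callable you could pass as gate_check:

```python
from typing import Callable

Gate = Callable[[str], bool]

def all_of(*gates: Gate) -> Gate:
    """Combine gates: the result passes only if every gate passes."""
    def combined(text: str) -> bool:
        # all() short-circuits, so later gates are skipped after the first failure
        return all(gate(text) for gate in gates)
    return combined

# Two of the patterns above, named so they can be reused
has_heading: Gate = lambda x: "## " in x
is_substantive: Gate = lambda x: 200 < len(x) < 5000

outline_gate = all_of(has_heading, is_substantive)
```

Naming each gate also makes it easier to log which specific check failed, rather than a generic "Gate check failed".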

Tracing: Reconstruct What Happened

When a chain fails in production, you need the full trace to diagnose it. The dump_trace() method above gives you this. Save traces to a file or ship them to your observability platform.

import os

def save_trace(chain: TracedChain, directory: str = "traces"):
    os.makedirs(directory, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    filename = f"{directory}/{chain.name}_{timestamp}.json"

    with open(filename, "w") as f:
        f.write(chain.dump_trace())

    logger.info(f"Trace saved to {filename}")

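# Usage: run the steps on the same chain object you save. Wrapping
# content_pipeline_with_gates() here would save an empty trace, because
# that function builds its own TracedChain internally.
chain = TracedChain("debug-example")
try:
    outline = chain.step(
        "outline",
        messages=[{"role": "user", "content": "Outline an article on Rust vs Zig for systems programmers."}],
        gate_check=lambda x: len(x) > 100,
    )
    chain.finalize(success=True)
except Exception:
    chain.finalize(success=False)
    raise
finally:
    save_trace(chain)  # runs on both success and failure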

A trace file reveals:

  • Which step failed: trace.steps[-1].error has the exception
  • Where time went: sort steps by duration_ms
  • Cost breakdown: tokens_used per step, total_cost
  • Gate violations: gate_check_passed: false with retry count
  • Input/output sizes: spot data loss between steps
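
The checks in this list can be pulled out of a saved trace programmatically. This is a rough sketch, assuming each step is serialized as a nested JSON object with the StepTrace field names (for example via dataclasses.asdict). The summarize_trace function, the shrink_ratio threshold, and the sample data are all illustrative, not part of the chain above.

```python
import json

def summarize_trace(trace_json: str, shrink_ratio: float = 0.5) -> dict:
    """Extract the slowest step, failures, gate violations, and size drops."""
    trace = json.loads(trace_json)
    steps = trace["steps"]
    slowest = max(steps, key=lambda s: s["duration_ms"])["step_name"] if steps else None
    return {
        "slowest_step": slowest,
        "failed_steps": [s["step_name"] for s in steps if s["error"]],
        "gate_violations": [s["step_name"] for s in steps if not s["gate_check_passed"]],
        # A step whose output is much smaller than its input may have lost data
        "shrunk_steps": [
            s["step_name"] for s in steps
            if s["input_size"] and s["output_size"] < shrink_ratio * s["input_size"]
        ],
        "tokens_by_step": {s["step_name"]: s["tokens_used"] for s in steps},
    }

# Hand-written sample trace, for illustration only
sample = json.dumps({
    "chain_name": "content-pipeline",
    "steps": [
        {"step_name": "outline", "duration_ms": 900.0, "input_size": 80,
         "output_size": 450, "tokens_used": 300,
         "gate_check_passed": True, "error": ""},
        {"step_name": "draft", "duration_ms": 4200.0, "input_size": 500,
         "output_size": 62, "tokens_used": 180,
         "gate_check_passed": False, "error": "Gate check failed"},
    ],
})
summary = summarize_trace(sample)
print(summary)
```

The size-drop heuristic only makes sense for steps that should preserve length, such as polishing. A summarization step shrinks its input by design.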

Common Debugging Scenarios

Scenario 1: Chain Produces Too-Short Output

Symptom: Final output is 50 words when you expected 500.

Diagnosis with trace:

for step in trace.steps:
    print(f"{step.step_name}: {step.output_size} chars (input: {step.input_size})")
# Step 1 (outline): 450 chars ← fine
# Step 2 (draft): 62 chars   ← PROBLEM: draft collapsed
# Step 3 (polish): 58 chars  ← polishing garbage

Root cause: Step 2's prompt didn't specify expected length. The model produced a summary instead of a draft.

Fix:

chain.step(
    "draft",
    messages=[{
        "role": "user",
        "content": f"""Write a full draft. Each section must be 2-3 paragraphs.
        Minimum 500 words. Do not summarize — expand each point in detail.

        Outline:
        {outline}"""
    }],
    gate_check=lambda x: len(x.split()) > 400,  # Word count, not char count
)

Scenario 2: Context Drift by Step 4

Symptom: The chain starts writing about Kubernetes but by step 4 it's writing about Docker.

Diagnosis: Compare the original task to each step's output. Look for the step where the topic shifts.

import re

def detect_topic_drift(original_topic: str, step_output: str) -> float:
    """Return the share of topic words found in the output: 0.0 (drifted) to 1.0 (on topic)."""
    # Simple keyword overlap. Tokenizing on word characters keeps trailing
    # punctuation ("Kubernetes,") from hiding a match.
    topic_words = set(re.findall(r"\w+", original_topic.lower()))
    output_words = set(re.findall(r"\w+", step_output.lower()))
    overlap = topic_words & output_words
    return len(overlap) / len(topic_words) if topic_words else 1.0

# In your chain:
drift_score = detect_topic_drift("kubernetes", draft)
if drift_score < 0.3:
    logger.warning(f"Topic drift detected! Score: {drift_score:.2f}")

Fix: Re-inject the original goal into every step's prompt:

messages=[{
    "role": "user",
    "content": f"""Remember: the goal is a {tone} article about '{topic}' for {audience}.

    Current step: {step_description}
    Previous output: {previous_output}"""
}]
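
One way to make the re-injection hard to forget is to build every step's prompt through a single helper. The sketch below assumes a hypothetical build_step_messages function. The parameter names mirror the placeholders in the fragment above and are not defined elsewhere in this tutorial.

```python
def build_step_messages(topic: str, audience: str, tone: str,
                        step_description: str, previous_output: str) -> list[dict]:
    """Build the messages list for one step, restating the original goal first."""
    content = (
        f"Remember: the goal is a {tone} article about '{topic}' for {audience}.\n\n"
        f"Current step: {step_description}\n"
        f"Previous output:\n{previous_output}"
    )
    return [{"role": "user", "content": content}]

messages = build_step_messages(
    topic="Kubernetes",
    audience="platform engineers",
    tone="practical",
    step_description="Polish the draft for clarity and grammar.",
    previous_output="(draft text from the previous step)",
)
```

Building the string with explicit newlines, rather than an indented triple-quoted f-string, also keeps stray indentation out of the prompt.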

Scenario 3: Hallucinated Data Propagation

Symptom: Step 1 invents a statistic. Steps 2-4 cite it as fact. By step 5, the output references a study that doesn't exist.

Diagnosis: Manual — read each step's output. Hard to automate without a knowledge base to verify against.

Mitigation:

# After steps that produce factual claims, add a verification prompt
claims = chain.step(
    "fact_check",
    temperature=0.0,
    messages=[{
        "role": "user",
        "content": f"""Review these claims. For each, mark as:
        - VERIFIED: widely known, easily confirmable
        - PLAUSIBLE: reasonable but I'm not 100% certain
        - UNVERIFIABLE: specific numbers or studies I can't confirm

        Claims to check:
        {step_output}"""
    }]
)
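
The fact-check output still has to be acted on. As a rough sketch, and assuming the model writes the labels in upper case exactly as the prompt spells them (which is not guaranteed), a hypothetical count_verdicts helper could tally them so the chain can stop or flag the run when anything is UNVERIFIABLE:

```python
import re
from collections import Counter

LABELS = ("VERIFIED", "PLAUSIBLE", "UNVERIFIABLE")

def count_verdicts(fact_check_output: str) -> Counter:
    """Count how often each upper-case verdict label appears in the output."""
    # Word boundaries and case sensitivity keep ordinary prose such as
    # "this can be verified" from being counted as a verdict.
    pattern = r"\b(" + "|".join(LABELS) + r")\b"
    return Counter(re.findall(pattern, fact_check_output))

def has_unverifiable_claims(fact_check_output: str) -> bool:
    return count_verdicts(fact_check_output)["UNVERIFIABLE"] > 0

# Placeholder output, for illustration only
sample_output = (
    "1. Claim A: VERIFIED\n"
    "2. Claim B: PLAUSIBLE\n"
    "3. Claim C: UNVERIFIABLE\n"
)
verdicts = count_verdicts(sample_output)
```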

Scenario 4: Cost Explosion

Symptom: A chain that should cost $0.02 costs $0.50.

Diagnosis with trace:

for step in trace.steps:
    print(f"{step.step_name}: {step.tokens_used} tokens, {step.retries} retries")

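# draft: 12000 tokens, 2 retries ← three attempts, roughly 36K tokens for one step

Root cause: The draft step ran three times (two retries) because the gate check was too strict. Each attempt re-sent the full context. TracedChain overwrites tokens_used on every attempt, so the trace shows only the final attempt. Actual spend is roughly tokens_used × (retries + 1).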

Fix: Tune gate checks so they are no stricter than downstream steps need. As a fallback, relax the check progressively on each retry:

def adaptive_gate(content: str, attempt: int) -> bool:
    if attempt == 1:
        return len(content) > 500 and "##" in content
    elif attempt == 2:
        return len(content) > 300  # Relax structural requirement
    else:
        return len(content) > 100  # Last resort: just get something
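
TracedChain.step calls its gate with a single argument, so adaptive_gate cannot be passed as gate_check directly. One possible adapter, sketched here under the assumption of a hypothetical attempt_aware wrapper, counts calls in a closure and supplies the attempt number itself:

```python
from typing import Callable

def adaptive_gate(content: str, attempt: int) -> bool:
    # Same thresholds as the function above, repeated so this block runs on its own
    if attempt == 1:
        return len(content) > 500 and "##" in content
    elif attempt == 2:
        return len(content) > 300
    else:
        return len(content) > 100

def attempt_aware(gate: Callable[[str, int], bool]) -> Callable[[str], bool]:
    """Wrap a (content, attempt) gate as a single-argument gate."""
    calls = 0

    def wrapped(content: str) -> bool:
        nonlocal calls
        calls += 1  # counts gate evaluations, starting at 1
        return gate(content, calls)

    return wrapped

gate = attempt_aware(adaptive_gate)
first = gate("x" * 350)   # evaluated as attempt 1
second = gate("x" * 350)  # evaluated as attempt 2
```

Create a fresh wrapper for every step, since the counter lives in the closure. The count tracks gate evaluations rather than attempts: an attempt that fails with an API error never reaches the gate.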

Visualizing Chain Performance

Pipe traces into a simple HTML report for pattern spotting:

def trace_report(traces: list[ChainTrace], output_path: str = "report.html"):
    rows = []
    for t in traces:
        total_ms = sum(s.duration_ms for s in t.steps)
        failed_steps = [s.step_name for s in t.steps if s.error]
        rows.append(f"""
        <tr>
            <td>{t.chain_name}</td>
            <td>{'✓' if t.success else '✗'}</td>
            <td>{len(t.steps)}</td>
            <td>{total_ms:.0f}ms</td>
            <td>{t.total_tokens}</td>
            <td>${t.total_cost:.4f}</td>
            <td>{', '.join(failed_steps) or '—'}</td>
        </tr>""")

    html = f"""<html><body><table border="1">
    <tr><th>Chain</th><th>OK</th><th>Steps</th><th>Latency</th>
    <th>Tokens</th><th>Cost</th><th>Failures</th></tr>
    {''.join(rows)}
    </table></body></html>"""

    with open(output_path, "w", encoding="utf-8") as f:  # the ✓/✗ marks need UTF-8
        f.write(html)
    logger.info(f"Report written to {output_path}")

Debugging Checklist

When a chain breaks, work through this:

  1. Check traces — which step failed? What was the error?
  2. Inspect that step's output — was it empty? Wrong format? Too short?
  3. Check the gate — did the gate check fail and retry exhaust?
  4. Look upstream — did the previous step produce bad input?
  5. Check for drift — is the chain still on the original topic?
  6. Review temperature — is it too high (random) or too low (repetitive)?
  7. Check token limits — did the accumulated context exceed the model's window?
  8. Test the step in isolation — run just that step with hand-crafted input
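
For item 7, a cheap pre-flight estimate can catch oversized context before the API call. This is a sketch only: the 4-characters-per-token ratio is a rough rule of thumb for English text, not an exact tokenizer count, and context_window must be set to whatever your model actually supports. Both function names are hypothetical.

```python
def estimate_tokens(messages: list[dict], chars_per_token: float = 4.0) -> int:
    """Rough token estimate from character counts (heuristic, not a tokenizer)."""
    total_chars = sum(len(m.get("content") or "") for m in messages)
    return int(total_chars / chars_per_token)

def fits_context(messages: list[dict], context_window: int,
                 reserved_for_output: int = 1024) -> bool:
    """True if the estimated prompt plus an output reserve fits the window."""
    return estimate_tokens(messages) + reserved_for_output <= context_window

messages = [{"role": "user", "content": "x" * 4000}]
estimated = estimate_tokens(messages)
```

For exact counts, use the tokenizer that matches your model instead of the character heuristic.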

Note:

The most expensive debugging technique is running the full chain repeatedly. Test individual steps in isolation first. If step 3 is the problem, don't re-run steps 1 and 2.
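
Testing a step in isolation can look like the sketch below. It assumes the step's prompt is built by a small function and that the model is passed in as a plain callable, so a stand-in can be used for a dry run. The names run_step_isolated, polish_prompt, and echo_llm are hypothetical. With a real model, pass a callable that wraps your client instead of echo_llm.

```python
from typing import Callable, Tuple

def polish_prompt(draft: str) -> str:
    """Prompt for the step under test (mirrors the polish step above)."""
    return f"Polish this draft for clarity and grammar:\n\n{draft}"

def run_step_isolated(
    build_prompt: Callable[[str], str],
    fixture_input: str,
    llm: Callable[[str], str],
    gate: Callable[[str], bool],
) -> Tuple[str, bool]:
    """Run one step against hand-crafted input and report whether its gate passes."""
    output = llm(build_prompt(fixture_input))
    return output, gate(output)

def echo_llm(prompt: str) -> str:
    """Stand-in model for a dry run: returns the draft portion of the prompt unchanged."""
    return prompt.split("\n\n", 1)[1]

fixture = "Rust and Zig both target systems programming. They differ in memory-safety guarantees."
output, passed = run_step_isolated(
    polish_prompt, fixture, echo_llm,
    gate=lambda x: len(x) >= len(fixture) * 0.7,
)
```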

Production-Grade Tracing

For chains running in production, ship structured traces to your observability stack:

def ship_to_observability(trace: ChainTrace):
    """Example: ship trace spans to an OTLP-compatible collector"""
    spans = []
    chain_start = datetime.fromisoformat(trace.started_at)

    for step in trace.steps:
        step_start = datetime.fromisoformat(step.started_at)
        spans.append({
            "name": f"{trace.chain_name}.{step.step_name}",
            "start": (step_start - chain_start).total_seconds(),
            "duration_ms": step.duration_ms,
            "attributes": {
                "model": step.model,
                "tokens": step.tokens_used,
                "input_size": step.input_size,
                "output_size": step.output_size,
                "success": not bool(step.error),
                "retries": step.retries,
            }
        })

    # Ship spans to your collector (Datadog, Grafana, Honeycomb, etc.)
    # otlp_exporter.export(spans)
    logger.info(f"Shipped {len(spans)} spans for {trace.chain_name}")

Note:

Pro tip: Add a unique chain_id (UUID) to every trace. Tag it in your logs, spans, and any downstream systems the chain touches. When a user reports "the report was wrong," you can pull the full trace by chain ID.
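
A minimal sketch of that tip, using the standard library's uuid module and logging.LoggerAdapter. The logger name "chain.traced" and the new_chain_logger helper are assumptions for illustration, and the format string is one possible layout.

```python
import logging
import uuid

# A dedicated logger whose format includes the chain ID on every line
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(chain_id)s] %(levelname)s %(message)s")
)
base_logger = logging.getLogger("chain.traced")
base_logger.addHandler(handler)
base_logger.setLevel(logging.INFO)
base_logger.propagate = False  # avoid duplicate lines from the root logger

def new_chain_logger(chain_name: str) -> tuple[str, logging.LoggerAdapter]:
    """Create a fresh chain ID and a logger that stamps it on each record."""
    chain_id = str(uuid.uuid4())
    adapter = logging.LoggerAdapter(
        base_logger, {"chain_id": chain_id, "chain_name": chain_name}
    )
    return chain_id, adapter

chain_id, log = new_chain_logger("content-pipeline")
log.info("chain started")
```

Storing the same chain_id on the trace object and in the saved trace's filename would let a single search find the logs, spans, and trace file for one run.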