
Hybrid RAG Pipeline with Breakpoints


This notebook demonstrates how to set up breakpoints in a Haystack pipeline. Here we set breakpoints in a hybrid retrieval-augmented generation (RAG) pipeline that combines BM25 and embedding-based retrieval, then uses a transformer-based reranker and an LLM to generate answers.

NOTE: this feature is part of haystack-experimental

Install packages

!pip install "haystack-experimental"
!pip install "transformers[torch,sentencepiece]"
!pip install "sentence-transformers>=3.0.0"

Set up the OpenAI API key

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Import Required Libraries

First, let’s import all the necessary components from Haystack.

from haystack_experimental.core.pipeline.pipeline import Pipeline # Note that we need to import the pipeline from haystack-experimental

from haystack import Document
from haystack.components.builders import AnswerBuilder, ChatPromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

Document Store Initialization

Let’s create a simple document store with some sample documents and their embeddings.

def indexing():
    """
    Indexing documents in a DocumentStore.
    """

    print("Indexing documents...")

    # Create sample documents
    documents = [
        Document(content="My name is Jean and I live in Paris."),
        Document(content="My name is Mark and I live in Berlin."),
        Document(content="My name is Giorgio and I live in Rome."),
    ]

    # Initialize document store and components
    document_store = InMemoryDocumentStore()
    doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
    doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Build and run the ingestion pipeline
    ingestion_pipe = Pipeline()
    ingestion_pipe.add_component(instance=doc_embedder, name="doc_embedder")
    ingestion_pipe.add_component(instance=doc_writer, name="doc_writer")

    ingestion_pipe.connect("doc_embedder.documents", "doc_writer.documents")
    ingestion_pipe.run({"doc_embedder": {"documents": documents}})

    return document_store

A Hybrid Retrieval Pipeline

Now let’s build a hybrid RAG pipeline.

def hybrid_retrieval(doc_store):
    """
    A simple pipeline for hybrid retrieval using BM25 and embeddings.
    """

    # Initialize query embedder
    query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Define the prompt template for the LLM
    template = [
        ChatMessage.from_system(
            "You are a helpful AI assistant. Answer the following question based on the given context information only. If the context is empty or just a '\n' answer with None, example: 'None'."
        ),
        ChatMessage.from_user(
            """
            Context:
            {% for document in documents %}
                {{ document.content }}
            {% endfor %}
    
            Question: {{question}}
            """
        )
    ]

    
    # Build the RAG pipeline
    rag_pipeline = Pipeline()
    
    # Add components to the pipeline
    rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=doc_store), name="bm25_retriever")
    rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
    rag_pipeline.add_component(instance=InMemoryEmbeddingRetriever(document_store=doc_store), name="embedding_retriever")
    rag_pipeline.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
    rag_pipeline.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5), name="ranker")
    rag_pipeline.add_component(instance=ChatPromptBuilder(template=template, required_variables=["question", "documents"]), name="prompt_builder")
    rag_pipeline.add_component(instance=OpenAIChatGenerator(), name="llm")
    rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")

    # Connect the components
    rag_pipeline.connect("query_embedder", "embedding_retriever.query_embedding")
    rag_pipeline.connect("embedding_retriever", "doc_joiner.documents")
    rag_pipeline.connect("bm25_retriever", "doc_joiner.documents")
    rag_pipeline.connect("doc_joiner", "ranker.documents")
    rag_pipeline.connect("ranker", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")    
    rag_pipeline.connect("doc_joiner", "answer_builder.documents")

    return rag_pipeline

Running the pipeline with breakpoints

This example shows how to run a pipeline with breakpoints that save its state at specific stages. Each breakpoint is a (component_name, visit_count) tuple; here we place one at the query_embedder component with a visit count of 0, which breaks execution the first time that component runs.

# Initialize document store and pipeline
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# Define the query
question = "Where does Mark live?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question, "top_k": 10},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}


pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")
Indexing documents...



---------------------------------------------------------------------------

PipelineBreakpointException               Traceback (most recent call last)

Cell In[12], line 16
      6 question = "Where does Mark live?"
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)
     12     "answer_builder": {"query": question},
     13 }
---> 16 pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")


File ~/haystack-experimental/haystack_experimental/core/pipeline/pipeline.py:321, in Pipeline.run(self, data, include_outputs_from, breakpoints, resume_state, debug_path)
    319 # keep track of the original input to save it in case of a breakpoint when running the component
    320 self.original_input_data = data
--> 321 component_outputs = self._run_component(
    322     component,
    323     inputs,
    324     component_visits,
    325     validated_breakpoints,
    326     parent_span=span,
    327 )
    329 # Updates global input state with component outputs and returns outputs that should go to
    330 # pipeline outputs.
    331 component_pipeline_outputs = self._write_component_outputs(
    332     component_name=component_name,
    333     component_outputs=component_outputs,
   (...)
    336     include_outputs_from=include_outputs_from,
    337 )


File ~/haystack-experimental/haystack_experimental/core/pipeline/pipeline.py:90, in Pipeline._run_component(self, component, inputs, component_visits, breakpoints, parent_span)
     88 breakpoint_inputs[component_name] = Pipeline._remove_unserializable_data(component_inputs)
     89 if breakpoints and not self.resume_state:
---> 90     self._check_breakpoints(breakpoints, component_name, component_visits, breakpoint_inputs)
     92 with tracing.tracer.trace(
     93     "haystack.component.run",
     94     tags={
   (...)
    115     # We deepcopy the inputs otherwise we might lose that information
    116     # when we delete them in case they're sent to other Components
    117     span.set_content_tag("haystack.component.input", deepcopy(component_inputs))


File ~/haystack-experimental/haystack_experimental/core/pipeline/pipeline.py:425, in Pipeline._check_breakpoints(self, breakpoints, component_name, component_visits, inputs)
    423 logger.info(msg)
    424 state = self.save_state(inputs, str(component_name), component_visits)
--> 425 raise PipelineBreakpointException(msg, component=component_name, state=state)


PipelineBreakpointException: Breaking at component query_embedder visit count 0

This run breaks with a PipelineBreakpointException: Breaking at component query_embedder visit count 0 and generates a timestamped JSON file (e.g. query_embedder_2025_04_15_15_00_20.json) in a new saved_states directory, containing the pipeline's state just before the query_embedder component ran.

This file can be explored and used to inspect the pipeline at that execution point.
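
Since the saved state is plain JSON, you can open it with the standard library to see exactly what was captured. A minimal sketch (the file name below is the one from this run and will differ on your machine; the exact top-level keys depend on the haystack-experimental version):

import json

# Hypothetical file name: use the one generated by your own run
with open("saved_states/query_embedder_2025_04_15_15_00_20.json") as f:
    state = json.load(f)

# Show the top-level structure of the saved state
print(list(state.keys()))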

Resuming from a breakpoint

We can then resume the pipeline from a saved state by loading it with Pipeline.load_state() and passing it to the Pipeline.run() method. This runs the pipeline to completion.
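
Since each state file name carries a timestamp, you may prefer to pick the most recent one programmatically instead of hardcoding it. A minimal sketch using only the standard library, assuming the debug_path="saved_states" used above:

from pathlib import Path

# Find the most recent saved state for the query_embedder component
state_files = sorted(Path("saved_states").glob("query_embedder_*.json"))
latest_state_file = str(state_files[-1])
print(f"Resuming from {latest_state_file}")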

# Load the saved state and continue execution
resume_state = pipeline.load_state("saved_states/query_embedder_2025_04_15_15_00_20.json")
result = pipeline.run(data={}, resume_state=resume_state)

# Print the results
print(result['answer_builder']['answers'][0].data)
print(result['answer_builder']['answers'][0].meta)
Mark lives in Berlin.
{'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 6, 'prompt_tokens': 74, 'total_tokens': 80, 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0, cached_tokens=0)}}