Hybrid RAG Pipeline with Breakpoints
Last Updated: April 16, 2025
This notebook demonstrates how to set up breakpoints in a Haystack pipeline. Here we place breakpoints in a hybrid retrieval-augmented generation (RAG) pipeline that combines BM25 and embedding-based retrieval, then uses a transformer-based reranker and an LLM to generate answers.
NOTE: this feature is part of haystack-experimental.
Install packages
!pip install "haystack-experimental"
!pip install "transformers[torch,sentencepiece]"
!pip install "sentence-transformers>=3.0.0"
Set up OpenAI API keys
import os
from getpass import getpass
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Import Required Libraries
First, let’s import all the necessary components from Haystack.
from haystack_experimental.core.pipeline.pipeline import Pipeline # Note that we need to import the pipeline from haystack-experimental
from haystack import Document
from haystack.components.builders import AnswerBuilder, ChatPromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
Document Store Initialization
Let’s create a simple document store with some sample documents and their embeddings.
def indexing():
    """
    Indexing documents in a DocumentStore.
    """
    print("Indexing documents...")

    # Create sample documents
    documents = [
        Document(content="My name is Jean and I live in Paris."),
        Document(content="My name is Mark and I live in Berlin."),
        Document(content="My name is Giorgio and I live in Rome."),
    ]

    # Initialize document store and components
    document_store = InMemoryDocumentStore()
    doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
    doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Build and run the ingestion pipeline
    ingestion_pipe = Pipeline()
    ingestion_pipe.add_component(instance=doc_embedder, name="doc_embedder")
    ingestion_pipe.add_component(instance=doc_writer, name="doc_writer")
    ingestion_pipe.connect("doc_embedder.documents", "doc_writer.documents")
    ingestion_pipe.run({"doc_embedder": {"documents": documents}})

    return document_store
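As a quick sanity check, the store should report the three sample documents once indexing completes; count_documents is part of the standard Haystack DocumentStore interface:

doc_store = indexing()
print(doc_store.count_documents())  # expected: 3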
A Hybrid Retrieval Pipeline
Now let’s build a hybrid RAG pipeline.
def hybrid_retrieval(doc_store):
    """
    A simple pipeline for hybrid retrieval using BM25 and embeddings.
    """
    # Initialize query embedder
    query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Define the prompt template for the LLM
    template = [
        ChatMessage.from_system(
            "You are a helpful AI assistant. Answer the following question based on the given context information only. If the context is empty or just a '\n' answer with None, example: 'None'."
        ),
        ChatMessage.from_user(
            """
            Context:
            {% for document in documents %}
                {{ document.content }}
            {% endfor %}

            Question: {{question}}
            """
        ),
    ]

    # Build the RAG pipeline
    rag_pipeline = Pipeline()

    # Add components to the pipeline
    rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=doc_store), name="bm25_retriever")
    rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
    rag_pipeline.add_component(instance=InMemoryEmbeddingRetriever(document_store=doc_store), name="embedding_retriever")
    rag_pipeline.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
    rag_pipeline.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5), name="ranker")
    rag_pipeline.add_component(instance=ChatPromptBuilder(template=template, required_variables=["question", "documents"]), name="prompt_builder")
    rag_pipeline.add_component(instance=OpenAIChatGenerator(), name="llm")
    rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")

    # Connect the components
    rag_pipeline.connect("query_embedder", "embedding_retriever.query_embedding")
    rag_pipeline.connect("embedding_retriever", "doc_joiner.documents")
    rag_pipeline.connect("bm25_retriever", "doc_joiner.documents")
    rag_pipeline.connect("doc_joiner", "ranker.documents")
    rag_pipeline.connect("ranker", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")
    rag_pipeline.connect("doc_joiner", "answer_builder.documents")

    return rag_pipeline
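Before wiring in breakpoints, it can help to eyeball the graph. An optional sketch, assuming the experimental Pipeline inherits the standard Haystack visualization helper (this is not required for breakpoints to work):

# Render the pipeline graph inline; works in a notebook environment.
preview_pipeline = hybrid_retrieval(indexing())
preview_pipeline.show()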
Running the pipeline with breakpoints
This example shows how to run a pipeline with breakpoints, which save its state at specific stages. Here we place a breakpoint at the query_embedder component with a visit count of 0, so execution breaks the first time that component runs.
# Initialize document store and pipeline
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# Define the query
question = "Where does Mark live?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question, "top_k": 10},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}

pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")
Indexing documents...
---------------------------------------------------------------------------
PipelineBreakpointException Traceback (most recent call last)
Cell In[12], line 16
6 question = "Where does Mark live?"
7 data = {
8 "query_embedder": {"text": question},
9 "bm25_retriever": {"query": question},
(...)
12 "answer_builder": {"query": question},
13 }
---> 16 pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")
File ~/haystack-experimental/haystack_experimental/core/pipeline/pipeline.py:321, in Pipeline.run(self, data, include_outputs_from, breakpoints, resume_state, debug_path)
319 # keep track of the original input to save it in case of a breakpoint when running the component
320 self.original_input_data = data
--> 321 component_outputs = self._run_component(
322 component,
323 inputs,
324 component_visits,
325 validated_breakpoints,
326 parent_span=span,
327 )
329 # Updates global input state with component outputs and returns outputs that should go to
330 # pipeline outputs.
331 component_pipeline_outputs = self._write_component_outputs(
332 component_name=component_name,
333 component_outputs=component_outputs,
(...)
336 include_outputs_from=include_outputs_from,
337 )
File ~/haystack-experimental/haystack_experimental/core/pipeline/pipeline.py:90, in Pipeline._run_component(self, component, inputs, component_visits, breakpoints, parent_span)
88 breakpoint_inputs[component_name] = Pipeline._remove_unserializable_data(component_inputs)
89 if breakpoints and not self.resume_state:
---> 90 self._check_breakpoints(breakpoints, component_name, component_visits, breakpoint_inputs)
92 with tracing.tracer.trace(
93 "haystack.component.run",
94 tags={
(...)
115 # We deepcopy the inputs otherwise we might lose that information
116 # when we delete them in case they're sent to other Components
117 span.set_content_tag("haystack.component.input", deepcopy(component_inputs))
File ~/haystack-experimental/haystack_experimental/core/pipeline/pipeline.py:425, in Pipeline._check_breakpoints(self, breakpoints, component_name, component_visits, inputs)
423 logger.info(msg)
424 state = self.save_state(inputs, str(component_name), component_visits)
--> 425 raise PipelineBreakpointException(msg, component=component_name, state=state)
PipelineBreakpointException: Breaking at component query_embedder visit count 0
This run breaks with a PipelineBreakpointException: Breaking at component query_embedder visit count 0. It also generates a JSON file with a date and time stamp (e.g. query_embedder_2025_04_15_15_00_20.json) in a new saved_states directory, containing the pipeline's running state just before the query_embedder component executed. This file can be explored and used to inspect the pipeline at that execution point.
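For example, you can locate the newest state file and peek at its top-level structure. A minimal sketch, assuming the file is plain JSON; its exact schema is an implementation detail of haystack-experimental and may change:

import glob
import json
import os

# Grab the most recently written state file; the timestamped filename
# differs on every run.
latest_state = max(glob.glob("saved_states/query_embedder_*.json"), key=os.path.getmtime)

with open(latest_state) as f:
    state = json.load(f)

# The top-level keys show what the snapshot contains.
print(latest_state)
print(list(state.keys()))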
Resuming from a breakpoint
We can then resume the pipeline from a saved state by passing it to the Pipeline.run() method. This will run the pipeline to the end.
# Load the saved state and continue execution
resume_state = pipeline.load_state("saved_states/query_embedder_2025_04_15_15_00_20.json")
result = pipeline.run(data={}, resume_state=resume_state)
# Print the results
print(result['answer_builder']['answers'][0].data)
print(result['answer_builder']['answers'][0].meta)
Mark lives in Berlin.
{'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 6, 'prompt_tokens': 74, 'total_tokens': 80, 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0, cached_tokens=0)}}
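Finally, when running pipelines with breakpoints outside a notebook, you will usually want to catch the breakpoint exception rather than let it surface as a traceback. A sketch of that pattern; it matches the exception by class name so it does not depend on a version-specific import path:

try:
    pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")
except Exception as exc:
    # Re-raise anything that is not the breakpoint signal.
    if type(exc).__name__ != "PipelineBreakpointException":
        raise
    # The exception message names the component and visit count, e.g.
    # "Breaking at component query_embedder visit count 0".
    print(f"Pipeline paused: {exc}")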