🆕 Build and deploy Haystack pipelines with deepset Studio

Extract Metadata Filters from a Query


Notebook by David Batista

This is part one of the Advanced Use Cases series:

1️⃣ Extract Metadata from Queries to Improve Retrieval & the full article

2️⃣ Query Expansion cookbook & full article

3️⃣ Query Decomposition cookbook & the full article

4️⃣ Automated Metadata Enrichment

In this notebook, we’ll discuss how to implement a custom component, QueryMetadataExtractor, that extracts entities from the query and formulates the corresponding metadata filter.

Useful Sources

Setup the Development Environment

!pip install haystack-ai
!pip install sentence-transformers

Enter your OPENAI_API_KEY. Get your OpenAI API key here:

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Enter OpenAI API key:··········

Implement QueryMetadataExtractor

Create a custom component, QueryMetadataExtractor, which takes query and metadata_fields as inputs and outputs filters. This component encapsulates a generative pipeline, made up of PromptBuilder and OpenAIGenerator. The pipeline instructs the LLM to extract keywords, phrases, or entities from a given query which can then be used as metadata filters. In the prompt, we include instructions to ensure the output format is in JSON and provide metadata_fields along with the query to ensure the correct entities are extracted from the query.

Once the pipeline is initialized in the init method of the component, we post-process the LLM output in the run method. This step ensures the extracted metadata is correctly formatted to be used as a metadata filter.

import json
from typing import Dict, List

from haystack import Pipeline, component
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator

@component()
class QueryMetadataExtractor:

    def __init__(self):
        prompt = """
        You are part of an information system that processes users queries.
        Given a user query you extract information from it that matches a given list of metadata fields.
        The information to be extracted from the query must match the semantics associated with the given metadata fields.
        The information that you extracted from the query will then be used as filters to narrow down the search space
        when querying an index.
        Just include the value of the extracted metadata without including the name of the metadata field.
        The extracted information in 'Extracted metadata' must be returned as a valid JSON structure.
        ###
        Example 1:
        Query: "What was the revenue of Nvidia in 2022?"
        Metadata fields: {"company", "year"}
        Extracted metadata fields: {"company": "nvidia", "year": 2022}
        ###
        Example 2:
        Query: "What were the most influential publications in 2023 regarding Alzheimer's disease?"
        Metadata fields: {"disease", "year"}
        Extracted metadata fields: {"disease": "Alzheimer", "year": 2023}
        ###
        Example 3:
        Query: "{{query}}"
        Metadata fields: "{{metadata_fields}}"
        Extracted metadata fields:
        """
        self.pipeline = Pipeline()
        self.pipeline.add_component(name="builder", instance=PromptBuilder(prompt))
        self.pipeline.add_component(name="llm", instance=OpenAIGenerator(model="gpt-4o-mini"))
        self.pipeline.connect("builder", "llm")

    @component.output_types(filters=Dict[str, str])
    def run(self, query: str, metadata_fields: List[str]):
        result = self.pipeline.run({'builder': {'query': query, 'metadata_fields': metadata_fields}})
        metadata = json.loads(result['llm']['replies'][0])

        # this can be done with specific data structures and in a more sophisticated way
        filters = []
        for key, value in metadata.items():
            field = f"meta.{key}"
            filters.append({f"field": field, "operator": "==", "value": value})

        return {"filters": {"operator": "AND", "conditions": filters}}

First, let’s test the QueryMetadataExtractor in isolation, passing a query and a list of metadata fields.

extractor = QueryMetadataExtractor()

query = "What were the most influential publications in 2022 regarding Parkinson's disease?"
metadata_fields = {"disease", "year"}

result = extractor.run(query, metadata_fields)
print(result)
{'filters': {'operator': 'AND', 'conditions': [{'field': 'meta.year', 'operator': '==', 'value': 2022}, {'field': 'meta.disease', 'operator': '==', 'value': 'Parkinson'}]}}

Notice that the QueryMetadataExtractor has extracted the metadata fields from the query and returned them in a format that can be used as filters passed directly to a Retriever. By default, the QueryMetadataExtractor will use all metadata fields as conditions together with an AND operator.

Use QueryMetadataExtractor in a Pipeline

Now, let’s plug the QueryMetadataExtractor into a Pipeline with a Retriever connected to a DocumentStore to see how it works in practice.

We start by creating a InMemoryDocumentStore and adding some documents to it. We include info about “year” and “disease” in the “meta” field of each document.

from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

documents = [
    Document(
        content="some publication about Alzheimer prevention research done over 2023 patients study",
        meta={"year": 2022, "disease": "Alzheimer", "author": "Michael Butter"}),
    Document(
        content="some text about investigation and treatment of Alzheimer disease",
        meta={"year": 2023, "disease": "Alzheimer", "author": "John Bread"}),
    Document(
        content="A study on the effectiveness of new therapies for Parkinson's disease",
        meta={"year": 2022, "disease": "Parkinson", "author": "Alice Smith"}
    ),
    Document(
        content="An overview of the latest research on the genetics of Parkinson's disease and its implications for treatment",
        meta={"year": 2023, "disease": "Parkinson", "author": "David Jones"}
    )
]

document_store = InMemoryDocumentStore(bm25_algorithm="BM25Plus")
document_store.write_documents(documents=documents, policy=DuplicatePolicy.OVERWRITE)
4

We then create a pipeline consisting of the QueryMetadataExtractor and a InMemoryBM25Retriever connected to the InMemoryDocumentStore created above.

Learn about connecting components and creating pipelines in Docs: Creating Pipelines.

from haystack import Pipeline, Document
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever


retrieval_pipeline = Pipeline()
metadata_extractor = QueryMetadataExtractor()
retriever = InMemoryBM25Retriever(document_store=document_store)

retrieval_pipeline.add_component(instance=metadata_extractor, name="metadata_extractor")
retrieval_pipeline.add_component(instance=retriever, name="retriever")
retrieval_pipeline.connect("metadata_extractor.filters", "retriever.filters")
<haystack.core.pipeline.pipeline.Pipeline object at 0x789b1bba1900>
🚅 Components
  - metadata_extractor: LLMMetadataQueryExtractor
  - retriever: InMemoryBM25Retriever
🛤️ Connections
  - metadata_extractor.filters -> retriever.filters (Dict[str, str])

Now define a query and metadata fields and pass them to the pipeline:

query = "publications 2023 Alzheimer's disease"
metadata_fields = {"year", "author", "disease"}

retrieval_pipeline.run(data={"metadata_extractor": {"query": query, "metadata_fields": metadata_fields}, "retriever":{"query": query}})
Ranking by BM25...:   0%|          | 0/1 [00:00<?, ? docs/s]





{'retriever': {'documents': [Document(id=e3b0bfd497a9f83397945583e77b293429eb5bdead5680cc8f58dd4337372aa3, content: 'some text about investigation and treatment of Alzheimer disease', meta: {'year': 2023, 'disease': 'Alzheimer', 'author': 'John Bread'}, score: 2.772588722239781)]}}