Pipelines

Flexibility powered by DAGs

To build modern search pipelines, you need two things: powerful building blocks and an easy way to stick them together. The Pipeline class is precisely built for this purpose and enables many search scenarios beyond QA. The core idea is to build a Directed Acyclic Graph (DAG) where each Node is one building block (Reader, Retriever, Generator ...). Here's a simple example for a standard Open-Domain QA Pipeline:

from haystack import Pipeline
p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?")

Initialize a Pipeline

To start building your custom pipeline, you’ll need to initialize an object of the base Pipeline class:

from haystack import Pipeline
pipeline = Pipeline()

By default, a new pipeline receives a root node called Query or File depending on whether it's a Query or Indexing Pipeline, as the entry point to the pipeline graph. You need to manually define how the information flows from one node to the next from that point on.

Add Nodes to a Pipeline

Use the add_node() method to add new components to the pipeline graph. You may initialize the modules either before or during the call to add_node(). When you add a node to the pipeline, give it a name and a list of inputs containing one or more items. Note how the default Query node acts as the input node to the first explicitly defined node.

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])

Here's an example of a node with several input sources:

pipeline.add_node(component=JoinNode(), name='Joiner',
                  inputs=['Retriever1', 'Retriever2'])

If the predecessor node has more than one output, you’ll need to specify the output number in the inputs list. For example:

pipeline.add_node(component=Branch1(), name='Branch1',
                  inputs=['TopicClassifier.output_1'])
pipeline.add_node(component=Branch2(), name='Branch2',
                  inputs=['TopicClassifier.output_2'])
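
The naming convention for inputs can be illustrated with a few lines of plain Python. This is only a sketch, not Haystack's own parsing code: parse_input is a hypothetical helper, and the fallback to output_1 for entries without a suffix is an assumption made for the example.

```python
def parse_input(spec):
    """Split an inputs entry into (node name, edge name).

    Assumption: an entry without a ".output_N" suffix refers to the
    predecessor's first (and only) output edge.
    """
    node, separator, edge = spec.partition(".")
    return node, (edge if separator else "output_1")


print(parse_input("TopicClassifier.output_2"))
print(parse_input("Retriever"))
```

Written this way, 'Retriever' and 'Retriever.output_1' name the same edge, which is why the suffix is only needed for nodes with several outputs.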

Under the hood, the nodes are placed in a queue and executed one by one when the run() method is invoked. The output of the last node in the queue is the output of the entire pipeline.
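
To make the execution order concrete, here is a minimal sketch in plain Python. It is not the library's implementation: MiniPipeline, Upper, and Exclaim are hypothetical names, and the sketch assumes that nodes are added in a valid order and that every node returns a tuple of an output dictionary and an edge name.

```python
class MiniPipeline:
    """Toy stand-in for a pipeline: runs nodes in the order they were added."""

    def __init__(self):
        self.nodes = []  # queue of (name, component, inputs)

    def add_node(self, component, name, inputs):
        self.nodes.append((name, component, inputs))

    def run(self, **kwargs):
        # "Query" is the root node; its output is whatever was passed to run().
        outputs = {"Query": kwargs}
        last = kwargs
        for name, component, inputs in self.nodes:
            # Merge the outputs of all predecessor nodes into one input dict.
            node_input = {}
            for source in inputs:
                node_input.update(outputs[source])
            result, _edge = component.run(**node_input)
            # Pass everything on, so later nodes still see earlier values.
            outputs[name] = {**node_input, **result}
            last = outputs[name]
        # The output of the last node in the queue is the pipeline output.
        return last


class Upper:
    def run(self, query, **kwargs):
        return {"query": query.upper()}, "output_1"


class Exclaim:
    def run(self, query, **kwargs):
        return {"query": query + "!"}, "output_1"


p = MiniPipeline()
p.add_node(component=Upper(), name="Upper", inputs=["Query"])
p.add_node(component=Exclaim(), name="Exclaim", inputs=["Upper"])
result = p.run(query="hello")
print(result)
```

The sketch ignores branching and decision nodes; it only shows how each node's output becomes the next node's input.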

When you create a custom pipeline, take extra care that each node's output is compatible with the input of the next node in the chain. Otherwise, your system will throw an error at runtime.

Arguments

Each node in a Pipeline defines the arguments that its run() method accepts. The Pipeline class takes care of passing the relevant arguments to each node. In addition to mandatory inputs like query, the pipeline's run() method accepts optional node parameters like top_k through the params argument. For instance, params={"top_k": 5} sets the top_k of all nodes to 5. To target a specific node, use the node name as the key, as in params={"Retriever": {"top_k": 5}}.

res = pipeline.run(
query="What did Einstein work on?",
params={"Retriever": {"top_k": 5}, "Reader": {"top_k": 3}}
)
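
The difference between global and node-specific parameters can be sketched in plain Python. This is an illustration under stated assumptions, not Haystack's dispatch code: select_params, retriever_run, and reader_run are hypothetical, and letting node-specific values take precedence over global ones is an assumption of the sketch.

```python
import inspect


def select_params(node_name, run_method, params):
    """Pick the entries of `params` that apply to one node.

    Assumptions: a key equal to the node's name holds node-specific values;
    any other key is a global value that is used only if the node's run()
    accepts an argument of that name.
    """
    accepted = inspect.signature(run_method).parameters
    selected = {}
    for key, value in params.items():
        if key == node_name:
            continue  # applied last so that targeted values take precedence
        if key in accepted:
            selected[key] = value
    selected.update(params.get(node_name, {}))
    return selected


def retriever_run(query, top_k=10):
    return {"documents": []}, "output_1"


def reader_run(query, documents=None, top_k=10):
    return {"answers": []}, "output_1"


params = {"Retriever": {"top_k": 5}, "Reader": {"top_k": 3}}
print(select_params("Retriever", retriever_run, params))
print(select_params("Reader", reader_run, {"top_k": 5}))
```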

Run a Pipeline

The run() function is the single command that triggers the execution of the entire pipeline:

query = "What's the history of Quidditch?"
pipeline.run(query=query)

Every node has its own run() method, and the pipeline's run() call invokes each node, one after the other. Arguments passed to the pipeline's run() are propagated to the nodes in the graph. To give, say, the retriever and the ranker different top_k values, address each node in the params argument by the name it was given in add_node(). This lets you modify these parameters dynamically in each call to the pipeline:

pipeline.run(query=query, params={"Retriever": {"top_k": 28}, "Ranker": {"top_k": 9}})

Inspect a Pipeline

Using draw()

The pipeline.draw() method generates a sketch of your pipeline. By looking at a drawing of your pipeline, you may be able to confirm that the graph is indeed structured in the way that you intended. This is especially true for customized graphs that may branch out at some point.

Accessing Pipeline Nodes

If your custom pipeline is not working as intended, try running your nodes in isolation. You may access any pipeline node by using the get_node() method and specifying the component's name:

retriever_node = pipeline.get_node('Retriever')

Add debug information

Nodes in a Pipeline can add debug information that gets propagated to the final output of a Pipeline. For instance, a decision Node can append details on the decision made.

To return debug data from a Node, add a _debug key in the output dict. The value can be a primitive or a dict. For instance:

def run(self, query: str):
    if "?" in query:
        return {"_debug": "The query contains a question mark"}, "output_1"
    else:
        return {"_debug": "The query does not contain a question mark"}, "output_2"

This _debug gets appended to a "global" _debug dict storing per Node debug data that gets returned with the final output. The final output may look like: {"answers": ..., "_debug": {"node_a": "my debug info", "node_b": {"key": "value"}}}
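
The merging step can be pictured with a short, self-contained sketch. It is not the Pipeline class's actual code: collect_debug is a hypothetical helper, and the node outputs are made-up values chosen to reproduce the shape of the final output shown above.

```python
def collect_debug(node_outputs):
    """Merge per-node `_debug` values into one dict keyed by node name."""
    final_output = {}
    global_debug = {}
    for node_name, output in node_outputs:
        output = dict(output)  # copy so the caller's dict is left untouched
        if "_debug" in output:
            global_debug[node_name] = output.pop("_debug")
        final_output.update(output)
    if global_debug:
        final_output["_debug"] = global_debug
    return final_output


merged = collect_debug([
    ("node_a", {"_debug": "my debug info"}),
    ("node_b", {"answers": ["42"], "_debug": {"key": "value"}}),
])
print(merged)
```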

A Node in a Pipeline can access the global _debug from preceding nodes by adding _debug in the run() method:

def run(self, query: str, _debug: dict):
    debug_info = _debug["PrecedingNodeA"]
    ...

Running a Node in Isolation

When you execute a pipeline with run(), it successively invokes the run() methods of all nodes in the queue. However, you can also use a given node's run() method in isolation.

retriever_node.run(query=query, pipeline_type='Query')

What happens during an individual run depends entirely on the given node's definition. For example, the retriever's run() method calls run_query(), which in turn calls retrieve() and a few other methods. Once you have extracted your node from the pipeline with the get_node() method, you're free to run any one of that node's class methods:

retriever_node.run_query(query=query)
retriever_node.retrieve(query=query)
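
The call chain described above, where run() calls run_query(), which in turn calls retrieve(), can be mimicked with a toy class. ToyRetriever is hypothetical and uses a naive keyword match in place of a document store; the method names follow the ones mentioned in the text, but the signatures are assumptions of this sketch.

```python
class ToyRetriever:
    """Hypothetical retriever that mimics the run() -> run_query() -> retrieve() chain."""

    def __init__(self, documents):
        self.documents = documents

    def retrieve(self, query, top_k=10):
        # Naive keyword match; a real retriever would query a document store.
        words = query.lower().split()
        hits = [d for d in self.documents if any(w in d.lower() for w in words)]
        return hits[:top_k]

    def run_query(self, query, top_k=10):
        documents = self.retrieve(query=query, top_k=top_k)
        return {"documents": documents}, "output_1"

    def run(self, pipeline_type, query=None, top_k=10):
        if pipeline_type == "Query":
            return self.run_query(query=query, top_k=top_k)
        raise ValueError(f"Unsupported pipeline_type: {pipeline_type}")


toy = ToyRetriever(["Quidditch is played on broomsticks.", "Owls deliver mail."])
output, edge = toy.run(pipeline_type="Query", query="quidditch rules")
print(output, edge)
```

Each layer can be called on its own, which is what makes it possible to debug one step at a time.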

If you want to find out which class methods are called by a component's run() function, we recommend that you take a look at the definitions (e.g., this one) in the source code.

YAML File Definitions

For your convenience, there is also the option of defining and loading pipelines in YAML files. Having your pipeline available in a YAML is particularly useful when you move between experimentation and production environments. Just export the YAML from your notebook / IDE and import it into your production environment. It also helps with version control of pipelines, allows you to share your pipeline easily with colleagues, and simplifies the configuration of pipeline parameters in production.

For example, you can define and save a simple Retriever Reader pipeline by saving the following to a file:

version: "0.9"
components:    # define all the building-blocks for Pipeline
  - name: MyReader       # custom-name for the component; helpful for visualization & debugging
    type: FARMReader     # Haystack Class name for the component
    params:
      no_ans_boost: -10
      model_name_or_path: deepset/roberta-base-squad2
  - name: MyESRetriever
    type: ElasticsearchRetriever
    params:
      document_store: MyDocumentStore    # params can reference other components defined in the YAML
      custom_query: null
  - name: MyDocumentStore
    type: ElasticsearchDocumentStore
    params:
      index: haystack_test
pipelines:    # multiple Pipelines can be defined using the components from above
  - name: my_query_pipeline    # a simple extractive-qa Pipeline
    nodes:
      - name: MyESRetriever
        inputs: [Query]
      - name: MyReader
        inputs: [MyESRetriever]

To load, simply call:

from pathlib import Path
pipeline = Pipeline.load_from_yaml(Path("sample.yaml"))
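
The comment in the YAML above notes that params can reference other components. One way to picture how such references might be resolved is the following sketch, which works on an already parsed config. It is not Haystack's loader: build_components, FakeStore, and FakeRetriever are hypothetical, and treating any string that equals a component name as a reference is an assumption of the example.

```python
def build_components(definitions, factories):
    """Instantiate components, resolving params that name other components."""
    by_name = {d["name"]: d for d in definitions}
    built = {}

    def build(name):
        if name in built:
            return built[name]
        definition = by_name[name]
        params = {}
        for key, value in definition.get("params", {}).items():
            # Assumption: a string equal to a component name is a reference.
            if isinstance(value, str) and value in by_name:
                value = build(value)
            params[key] = value
        built[name] = factories[definition["type"]](**params)
        return built[name]

    for name in by_name:
        build(name)
    return built


class FakeStore:
    def __init__(self, index):
        self.index = index


class FakeRetriever:
    def __init__(self, document_store, custom_query=None):
        self.document_store = document_store
        self.custom_query = custom_query


definitions = [
    {"name": "MyESRetriever", "type": "ElasticsearchRetriever",
     "params": {"document_store": "MyDocumentStore", "custom_query": None}},
    {"name": "MyDocumentStore", "type": "ElasticsearchDocumentStore",
     "params": {"index": "haystack_test"}},
]
factories = {
    "ElasticsearchRetriever": FakeRetriever,
    "ElasticsearchDocumentStore": FakeStore,
}
components = build_components(definitions, factories)
print(components["MyESRetriever"].document_store.index)
```

Because references are resolved on demand, the order in which components appear in the file does not matter in this sketch. Circular references are not handled.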

For another example YAML config, check out this file.

Custom Nodes

Thanks to the modularity of pipelines, you can create your own nodes and comfortably integrate them into your system. You should define a run() function at the core of each node class that accepts a flexible number of mandatory or optional keyword arguments. That's where the entire functionality of your node will be defined. Let's look at a node class template:

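from typing import Optional
from haystack import BaseComponent

class NodeTemplate(BaseComponent):
    outgoing_edges = 1

    def run(self, query: str, my_arg: Optional[int] = 10):
        # Insert code here to manipulate the input & produce an output
        output = {}  # placeholder for the node's output dictionary
        return output, "output_1"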

Usually, your node will have one outgoing edge and thus one return value. A node's return value should come in the form of a Python dictionary. That value is returned within a tuple, which also contains the outgoing edge name e.g., output_1.

It's also possible to have more than one outgoing edge, typically in a decision node. A decision node's run() method contains a decision function that determines which path in the graph its input is sent down. Such a function has more than one possible return value, and these are named accordingly, i.e. output_1, output_2, and so forth.
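
The return convention for decision nodes can be tried out without a pipeline. The sketch below is plain Python, not Haystack code: LengthRouter, dispatch, and the two branch functions are hypothetical, and routing by word count is just an example rule.

```python
class LengthRouter:
    """Hypothetical decision node: short queries take edge 1, longer ones edge 2."""

    outgoing_edges = 2

    def run(self, query):
        edge = "output_1" if len(query.split()) <= 3 else "output_2"
        return {"query": query}, edge


def dispatch(node, branches, query):
    """Run the decision node, then only the branch selected by its edge name."""
    output, edge = node.run(query=query)
    return branches[edge](**output)


calls = []  # records which branches were actually executed


def keyword_branch(query):
    calls.append("keyword")
    return f"keyword search for: {query}"


def question_branch(query):
    calls.append("question")
    return f"question answering for: {query}"


branches = {"output_1": keyword_branch, "output_2": question_branch}
answer = dispatch(LengthRouter(), branches, "Who invented the golden snitch?")
print(answer, calls)
```

Only the branch named by the returned edge runs; the other one is never called.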

When defining your own custom nodes, you must inherit from haystack.BaseComponent. This registers the node as a Component that can later be added to a Pipeline.

from haystack import BaseComponent

class CustomNode(BaseComponent):
    pass

Decision nodes

You can add decision nodes where only one "branch" is executed afterwards. This lets you, for example, classify an incoming query and, depending on the result, route it to different modules. To find a ready-made example of a decision node, have a look at the page about the QueryClassifier.

If you'd like to define your own, you'll need to create a class that looks something like this:

from haystack import BaseComponent, Pipeline
from haystack.pipeline import JoinDocuments

class QueryClassifier(BaseComponent):
    outgoing_edges = 2

    def run(self, query):
        # Queries with a question mark go to output_1, all others to output_2
        if "?" in query:
            return {}, "output_1"
        else:
            return {}, "output_2"

pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults",
              inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = pipe.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})

Evaluation nodes

There are nodes in Haystack that are used to evaluate the performance of readers, retrievers, and combined systems. To get hands-on experience with this kind of node, have a look at the evaluation tutorial.

Ready-Made Pipelines

Last but not least, we added some ready-made pipelines that allow you to run standard patterns with very few lines of code. See the ready-made pipelines page and pipelines API documentation to learn more about these.

Examples:

from haystack.pipeline import DocumentSearchPipeline, ExtractiveQAPipeline, FAQPipeline, GenerativeQAPipeline
# Extractive QA
qa_pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)
res = qa_pipe.run(query="When was Kant born?", params={"Retriever": {"top_k": 3}, "Reader": {"top_k": 5}})
# Document Search
doc_pipe = DocumentSearchPipeline(retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", params={"Retriever": {"top_k": 3}})
# Generative QA
gen_pipe = GenerativeQAPipeline(generator=rag_generator, retriever=retriever)
res = gen_pipe.run(query="Physics Einstein", params={"Retriever": {"top_k": 3}})
# FAQ based QA
faq_pipe = FAQPipeline(retriever=retriever)
res = faq_pipe.run(query="How can I change my address?", params={"Retriever": {"top_k": 3}})

Example: Multiple retrievers

You can now also use multiple Retrievers and join their results:

from haystack import Pipeline
from haystack.pipeline import JoinDocuments
p = Pipeline()
p.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["Query"])
p.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
p.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})
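
To give an idea of what joining results by concatenation can look like, here is a small sketch in plain Python. It is an illustration, not the JoinDocuments implementation: concatenate_documents is a hypothetical helper, documents are assumed to be plain dicts with an id key, and collapsing duplicates by id is an assumption of the sketch.

```python
def concatenate_documents(*result_lists):
    """Join several result lists, keeping one entry per document id."""
    joined = {}
    for results in result_lists:
        for doc in results:
            # A later list overwrites an earlier entry with the same id,
            # but the entry keeps its original position in the output.
            joined[doc["id"]] = doc
    return list(joined.values())


es_results = [{"id": "a", "text": "Einstein and relativity"}]
dpr_results = [
    {"id": "a", "text": "Einstein and relativity"},
    {"id": "b", "text": "The photoelectric effect"},
    {"id": "c", "text": "Brownian motion"},
]
joined_docs = concatenate_documents(es_results, dpr_results)
print([doc["id"] for doc in joined_docs])
```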

Example: Creating a Retriever-Ranker-Summarizer Pipeline

In this example, we'll look at how to establish a custom Retriever-Ranker-Summarizer pipeline. It's useful to add a Ranker to a summarization pipeline because the output of the Summarizer depends on the order of the documents that it receives.

from haystack import Pipeline
pipeline = Pipeline()

To create new pipeline nodes, we initialize the modules first. For our use case, we need a retriever, a ranker, and a summarizer. We tell the summarizer to return a single summary per query (instead of one summary for each document), and that its length should be somewhere between 10 and 300 words:

from haystack.retriever import ElasticsearchRetriever
from haystack.ranker import FARMRanker
from haystack.summarizer import TransformersSummarizer
retriever = ElasticsearchRetriever(document_store, top_k=10)
ranker = FARMRanker(model_name_or_path="sentence-transformers/distilbert-multilingual-nli-stsb-quora-ranking", top_k=10)
summarizer = TransformersSummarizer(model_name_or_path='t5-large', min_length=10, max_length=300, generate_single_summary=True)

We add the nodes to the pipeline:

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])
pipeline.add_node(component=ranker, name='Ranker', inputs=['Retriever'])
pipeline.add_node(component=summarizer, name='Summarizer', inputs=['Ranker'])

Let's now run our custom pipeline on the Harry Potter Wiki dataset. A typical application for this pipeline would be a situation where we want some high-level information about our corpus that is not necessarily contained within one document. We therefore retrieve multiple documents, rank them, and let the summarizer return a single summary of all the texts.

query = "What's the history of Quidditch?"
result = pipeline.run(query=query)

The pipeline returns a dictionary that contains the query, the name of the last node, and a list of documents:

result.keys()
>>> dict_keys(['documents', 'query', 'node_id'])

Since we requested a single summary of all the texts passed to the summarizer, the list of documents contains only one item. We access the summary through the text attribute:

result['documents'][0].text
>>> "the first record of a primitive form of Quidditch (''Kwidditch'') dates to c. 1050. the first known reference to wizards using broomsticks as a means of conveyance dates to A.D. 963. a variant of the game, Quodpot, was invented in the eighteenth century. in the middle of the 14th century it was made a protected species by the wizards council."

Example: Creating a Custom Translation Node

Let's say that we wanted to add a special translation module to our pipeline. Instead of just translating into one predefined language, our node should be able to return a summary in any language we want (i.e., for which we have a trained model). To that end, we define a CustomTranslator class. Since there's no decision function involved, we set outgoing_edges = 1:

class CustomTranslator(BaseComponent):
    outgoing_edges = 1

Within a pipeline node, the run() function is where all the action happens. Our run function receives a language argument that tells the translator which translation model to initialize:

    def run(self, language='fr', **kwargs):
        translator = TransformersTranslator(model_name_or_path=f'Helsinki-NLP/opus-mt-en-{language}')

We run the translator with the specified model and return its output.

        translation = translator.run(documents=kwargs['documents'])
        return translation
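
Put together, the fragments above form a single class. The sketch below assembles them in a form that runs without Haystack or any downloaded model: StubTranslator is a hypothetical stand-in for TransformersTranslator, the translator_cls argument is added only so the stand-in can be swapped in, and the class is left as a plain Python class here rather than a Haystack component.

```python
class StubTranslator:
    """Stand-in for TransformersTranslator so the sketch runs without models."""

    def __init__(self, model_name_or_path):
        self.model_name_or_path = model_name_or_path

    def run(self, documents):
        # Tag each text with the model name instead of translating it.
        tagged = [f"[{self.model_name_or_path}] {text}" for text in documents]
        return {"documents": tagged}, "output_1"


class CustomTranslator:
    outgoing_edges = 1

    def __init__(self, translator_cls=StubTranslator):
        # Hypothetical hook: lets a real translator class replace the stub.
        self.translator_cls = translator_cls

    def run(self, language="fr", **kwargs):
        # Choose the translation model from the requested target language.
        translator = self.translator_cls(
            model_name_or_path=f"Helsinki-NLP/opus-mt-en-{language}"
        )
        # The wrapped translator already returns (output, edge name).
        return translator.run(documents=kwargs["documents"])


output, edge = CustomTranslator().run(language="uk", documents=["hello"])
print(output, edge)
```

Note that this version creates a new translator on every call, which mirrors the fragments above; with real models that means loading the model each time run() is invoked.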

We initialize this node directly when adding it to the pipeline. As usual, we specify a name and the inputs for this node:

pipeline.add_node(component=CustomTranslator(), name='CustomTranslator', inputs=['Summarizer'])

We can now call the pipeline with any Helsinki-NLP translation model from HuggingFace with English as a source language. Pipeline arguments are simply propagated through the pipeline. This means that if we want to pass a language value to our custom node, we simply specify it in our call to the pipeline. Let's look at the French summary of a popular wizard sport:

query = "What's the history of Quidditch?"
result = pipeline.run(query=query, params={"Retriever": {"top_k": 30}, "Ranker": {"top_k": 20}, "language": "fr"})
result['documents'][0].text
>>> "''Quidditch'' a obtenu son nom du marais queerditch, l'emplacement du premier jeu enregistré. le jeu a été basé sur un jeu joué par une sorcière au 11ème siècle. un snitch d'or a été introduit à la suite d'un jeu 1269 joué en kent. on pense qu'une version balai du jeu peut avoir inspiré le mouvement du jeu moderne 'harlem shuffle'"

Now, how about Ukrainian?

result = pipeline.run(query=query, params={"Retriever": {"top_k": 30}, "Ranker": {"top_k": 30}, "language": "uk"})
result['documents'][0].text
>>> '" Quuiditch " отримала свою назву від дивного болота, місця першої в історії записаної гри. Гру було засновано на грі, яку грала відьма у XI столітті. Золотий стукач було введено у гру 1269 гри в кенті. Вважається, що версія мітла у грі, можливо, надихнула сучасну гру на " заплутування " move " гри'

Distributed Pipelines with Ray

Ray (https://ray.io) is a framework for distributed computing.

Ray allows distributing a Pipeline's components across a cluster of machines. The individual components of a Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas of the Reader and a single replica for the Retriever. It enables efficient resource utilization by horizontally scaling Components.

To set the number of replicas, add replicas in the YAML config for the node in a pipeline:

components:
    ...
pipelines:
  - name: ray_query_pipeline
    type: RayPipeline
    nodes:
      - name: ESRetriever
        replicas: 2  # number of replicas to create on the Ray cluster
        inputs: [ Query ]
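
As a rough illustration of how the replicas setting could be read from a parsed config, consider the sketch below. It does not use Ray or Haystack: replica_counts is a hypothetical helper, the Reader node is invented for the example, and defaulting to one replica when the key is missing is an assumption.

```python
def replica_counts(pipeline_config, default=1):
    """Map each node name to its requested number of replicas."""
    return {
        node["name"]: node.get("replicas", default)
        for node in pipeline_config["nodes"]
    }


# Parsed form of a pipeline entry like the one above, plus a made-up Reader node.
config = {
    "name": "ray_query_pipeline",
    "type": "RayPipeline",
    "nodes": [
        {"name": "ESRetriever", "replicas": 2, "inputs": ["Query"]},
        {"name": "Reader", "inputs": ["ESRetriever"]},
    ],
}
counts = replica_counts(config)
print(counts)
```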

A RayPipeline can only be created with a YAML Pipeline config:

from haystack.pipeline import RayPipeline
pipeline = RayPipeline.load_from_yaml(path="my_pipelines.yaml", pipeline_name="ray_query_pipeline")
pipeline.run(query="What is the capital of Germany?")

By default, a RayPipeline creates an instance of Ray Serve locally. To connect to an existing Ray instance, set the address parameter when creating the RayPipeline instance.