How to Build a RAG Agent with LangGraph and Needle

Today, let’s explore how to build sophisticated Retrieval-Augmented Generation (RAG) agents with state management and complex workflows using two powerful tools: Needle and LangGraph.


Jan Heimes

CEO @ Needle AI

In this tutorial, we build an agent that:

  1. Uploads documents to a Needle collection.
  2. Waits for the documents to finish indexing.
  3. Searches those documents based on a user’s query.
  4. Responds with an answer using a Large Language Model (LLM).


We’ll build this agent using two tools:


Needle, a “RAG API and Knowledge Threading” platform for document ingestion, chunking, embedding, and semantic search.


LangGraph, a library that lets you create stateful, multi-step LLM workflows (or “agents”) that can maintain state between steps.


By the end of this tutorial, you’ll have a fully functional RAG agent that uses Needle for document retrieval and LangGraph for orchestrating a multi-step process.


Why Needle and LangGraph?


Needle handles the heavy lifting of storing and indexing your documents. Its built-in chunking, embedding, and semantic search features make it easy to build RAG applications without managing complex infrastructure.


LangGraph specializes in creating stateful workflows for LLM-powered agents. It helps you chain together steps, like uploading files, waiting for indexing, searching, and more, while keeping track of important context along the way.


Together, these tools provide a clean, modular approach to building powerful RAG workflows. The accompanying GitHub repository demonstrates everything we’ll cover.


Prerequisites


Before you begin, make sure you have:


Python 3.9+ installed.

A Needle account and API key.

An OpenAI account and API key.

Dependencies installed:

pip install needle-python langgraph langchain langchain-community langchain-openai


A Needle collection already created (or you can create one). You’ll need the COLLECTION_ID for that collection.
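Before running anything, it can help to check all required settings in one place. The following is a minimal sketch, not part of the original script: the `Settings` class, the `load_settings` helper, and the `NEEDLE_COLLECTION_ID` variable name are assumptions made for illustration (the tutorial itself hardcodes `COLLECTION_ID`).

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    needle_api_key: str
    openai_api_key: str
    collection_id: str


def load_settings(env=os.environ) -> Settings:
    """Read required settings, reporting every missing name at once."""
    # NEEDLE_COLLECTION_ID is a hypothetical variable name chosen for this sketch.
    names = ("NEEDLE_API_KEY", "OPENAI_API_KEY", "NEEDLE_COLLECTION_ID")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    return Settings(
        needle_api_key=env["NEEDLE_API_KEY"],
        openai_api_key=env["OPENAI_API_KEY"],
        collection_id=env["NEEDLE_COLLECTION_ID"],
    )
```

Passing a plain dictionary instead of `os.environ` makes the helper easy to exercise without touching your real environment.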


Demo Preview

By the end of this tutorial, your agent will:


Add a file (for example, documentation) to the Needle collection.

Pause until Needle indexes that file.

Perform a semantic search against that document.

Return an answer to a user’s question.



The Code (Step-by-Step)


Below is a complete, annotated example showing how to build and run this workflow. Feel free to copy and adapt it to your own needs.


Step 1: Initialize the Environment

import os
import time
from typing import List, Union, Literal, TypedDict

# LangChain & OpenAI (ChatOpenAI requires: pip install langchain-openai)
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

# LangChain Community
from langchain_community.document_loaders.needle import NeedleLoader
from langchain_community.retrievers.needle import NeedleRetriever

# LangGraph
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from typing_extensions import Annotated


#####################################################
# 1) Set up API Keys and Collection
#####################################################

NEEDLE_API_KEY = os.environ.get("NEEDLE_API_KEY")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

if not NEEDLE_API_KEY or not OPENAI_API_KEY:
    raise ValueError(
        "Required environment variables NEEDLE_API_KEY and OPENAI_API_KEY must be set"
    )

COLLECTION_ID = "clt_01JGVAHK2V00KEJJNR84BS1FKA"  # Replace with your own collection ID

print("✅ API keys validated and collection ID set.")


#####################################################
# 2) Define Our Agent State
#####################################################

class AgentState(TypedDict):
    messages: Annotated[List[Union[HumanMessage, AIMessage, ToolMessage]], add_messages]
    file_added: bool
    indexed: bool


#####################################################
# 3) Initialize Needle Loader and Retriever
#####################################################

needle_loader = NeedleLoader(
    needle_api_key=NEEDLE_API_KEY,
    collection_id=COLLECTION_ID
)

retriever = NeedleRetriever(
    needle_api_key=NEEDLE_API_KEY,
    collection_id=COLLECTION_ID
)


#####################################################
# 4) Define Tools for Needle Operations
#####################################################

@tool
def add_file_to_collection() -> str:
    """
    Add a file to the Needle collection.
    Replace 'docs.needle-ai.com' and the URL with your own file or website.
    """
    print("\n📥 Adding documentation to collection...")
    files = {
        "docs.needle-ai.com": "https://docs.needle-ai.com"
    }
    needle_loader.add_files(files=files)
    print("✅ File added successfully")
    return "File added successfully"


@tool
def search_collection(query: str) -> str:
    """
    Search the Needle collection using a retrieval chain.
    Returns a direct answer from the retrieved context.
    """
    print(f"\n🔍 Searching collection for: '{query}'")

    llm = ChatOpenAI(temperature=0)

    system_prompt = """
    You are an assistant for question-answering tasks.
    Use the following pieces of retrieved context to answer the question.
    If you don't know, say so concisely.\n\n{context}
    """

    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        ("human", "{input}")
    ])

    # Create a chain that retrieves documents and then answers the question
    question_answer_chain = create_stuff_documents_chain(llm, prompt)
    rag_chain = create_retrieval_chain(retriever, question_answer_chain)

    print("⚡ Executing RAG chain...")
    response = rag_chain.invoke({"input": query})
    print("✨ Search complete")

    # Return the final answer
    return str(response.get('answer', response))


#####################################################
# 5) Bind Tools to the Model
#####################################################

model = ChatOpenAI(model="gpt-4", temperature=0).bind_tools([
    add_file_to_collection,
    search_collection
])


Explanation


NeedleLoader and NeedleRetriever: These classes handle file ingestion and retrieval for the collection you specify in COLLECTION_ID.

Tools: We create two “tools” that the agent can call:

add_file_to_collection() – loads a file into Needle.

search_collection(query) – uses a retrieval chain to answer a question with context from the newly added file.
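To make the retrieval chain less of a black box, here is a rough, library-free sketch of the idea behind "stuffing" documents: the retrieved chunks are joined into one context string and placed in the system prompt, followed by the user's question. The helper names `stuff_context` and `build_messages` are made up for this illustration, and the real LangChain chain does more than this (document formatting, prompt templating, the model call itself).

```python
from typing import Dict, List

SYSTEM_TEMPLATE = (
    "You are an assistant for question-answering tasks.\n"
    "Use the following pieces of retrieved context to answer the question.\n"
    "If you don't know, say so concisely.\n\n{context}"
)


def stuff_context(chunks: List[str], separator: str = "\n\n") -> str:
    """Join retrieved chunks into one context string, skipping empty ones."""
    return separator.join(chunk.strip() for chunk in chunks if chunk.strip())


def build_messages(chunks: List[str], question: str) -> List[Dict[str, str]]:
    """Return role/content messages with the context placed in the system prompt."""
    system = SYSTEM_TEMPLATE.format(context=stuff_context(chunks))
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": question},
    ]
```

Because every chunk lands in a single prompt, this approach works best when the retriever returns a small number of focused passages.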



Step 2: Define the Workflow Logic

A workflow manages the sequence of steps. We’ll use LangGraph to either:

Ask the AI model what to do next,

Wait for indexing,


Or run a tool to add/search files.


#####################################################
# 6) Define Workflow Logic
#####################################################

def should_continue(state: AgentState) -> Literal["tools", "agent", END]:
    # Routing functions only read the state; updates are returned by nodes (see call_model).
    messages = state["messages"]

    if not messages:
        # If no messages yet, prompt the AI
        return "agent"

    last_message = messages[-1]

    # If the last message came from a tool
    if isinstance(last_message, ToolMessage):
        # After a successful upload, hand control back to the agent node
        if last_message.content == "File added successfully":
            print("\n📌 File addition confirmed")
            return "agent"
        print("\n🏁 Search complete, ending workflow")
        return END

    # If the last message was from the AI
    if isinstance(last_message, AIMessage):
        # If a search result is already in the history, we can end
        if any(
            isinstance(m, ToolMessage) and m.content != "File added successfully"
            for m in messages[:-1]
        ):
            return END

        # If the AI asks to call a tool
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            # If the file is added but not indexed, we wait
            if state.get("file_added") and not state.get("indexed"):
                print("\n⏳ Waiting for indexing to complete...")
                return "agent"
            print("\n🛠️ Executing tool calls...")
            return "tools"

    return "agent"


def call_model(state: AgentState):
    messages = state["messages"]

    if not messages:
        print("\n🤖 Starting conversation...")
        return {
            "messages": [
                HumanMessage(content="Let's add the file to our Needle collection and then search for what it says about Needle.")
            ]
        }

    last_message = messages[-1]

    # Wait for indexing right after the upload tool reports success.
    # The flags are returned as part of the update: mutating `state` in place
    # would not be persisted by LangGraph.
    if (
        isinstance(last_message, ToolMessage)
        and last_message.content == "File added successfully"
        and not state.get("indexed")
    ):
        print("\n⏳ Waiting 30 seconds for file indexing...")
        time.sleep(30)
        print("✅ Indexing complete")
        return {
            "messages": [
                HumanMessage(content="File has been indexed. Now searching for information about Needle...")
            ],
            "file_added": True,
            "indexed": True,
        }

    # Otherwise, let the AI respond
    response = model.invoke(messages)
    return {"messages": [response]}

Explanation

should_continue: Checks the conversation’s latest message to decide whether to:


Call a tool ("tools"),


Have the AI speak again ("agent"),


End the workflow (END).


call_model: Either starts the conversation, waits for indexing, or calls the bound model (model) to generate the next message.
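A detail worth spelling out: a LangGraph node reports changes by returning a partial update, and the graph merges that update into the state. With the `add_messages` annotation on `messages`, new messages are appended rather than replacing the list. The sketch below is a simplified, library-free stand-in for that merge, assuming plain strings as messages; `apply_update` and `wait_node` are hypothetical names, and the real reducer also handles details such as message IDs.

```python
from typing import Any, Dict


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new state: 'messages' is appended to, other keys are overwritten."""
    merged = dict(state)
    for key, value in update.items():
        if key == "messages":
            merged["messages"] = list(state.get("messages", [])) + list(value)
        else:
            merged[key] = value
    return merged


def wait_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """A node returns its changes; it does not edit `state` in place."""
    return {"messages": ["File has been indexed."], "indexed": True}


state = {"messages": ["File added successfully"], "file_added": True, "indexed": False}
new_state = apply_update(state, wait_node(state))
```

Here `new_state` carries both messages and the updated `indexed` flag, while the original `state` dictionary is left untouched.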


Step 3: Compile and Run the Workflow

Now we’ll tie everything together and execute our multi-step process.


#####################################################
# 7) Compile and Execute the Graph
#####################################################

# Create a graph of states and transitions
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode([add_file_to_collection, search_collection]))

workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {
    "tools": "tools",
    "agent": "agent",
    END: END
})
workflow.add_edge("tools", "agent")

app = workflow.compile()


#####################################################
# 8) Initialize the State and Run
#####################################################

initial_state = {
    "messages": [],
    "file_added": False,
    "indexed": False
}

print("\n🚀 Starting Needle operations...")
print("=" * 50)

for step in app.stream(initial_state, {"recursion_limit": 10}):
    # Print messages from each step
    if "messages" in step.get("agent", {}):
        for msg in step["agent"]["messages"]:
            msg.pretty_print()
    elif "messages" in step.get("tools", {}):
        for msg in step["tools"]["messages"]:
            msg.pretty_print()

print("\n✨ Workflow complete!")
print("=" * 50)

Explanation

StateGraph: Manages transitions between the "agent" (AI decisions) and "tools" (tool execution).


app.stream: Steps through the workflow until we reach the end state.
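To build intuition for what stepping through a graph with a step limit means, here is a toy, library-free loop. It is a sketch under simplifying assumptions, not LangGraph's implementation: `run_graph`, the counter-based state, and the local `END` sentinel are all invented for this example.

```python
from typing import Callable, Dict, Iterator, Tuple

State = Dict[str, int]
Node = Callable[[State], State]
END = "__end__"  # local sentinel used only by this sketch


def run_graph(
    nodes: Dict[str, Node],
    router: Callable[[str, State], str],
    state: State,
    start: str,
    step_limit: int = 10,
) -> Iterator[Tuple[str, State]]:
    """Yield (node_name, update) for each step until the router returns END."""
    current = start
    for _ in range(step_limit):
        update = nodes[current](state)
        state = {**state, **update}  # merge the node's partial update
        yield current, update
        current = router(current, state)
        if current == END:
            return
    raise RuntimeError(f"No END reached within {step_limit} steps")


def agent(state: State) -> State:
    return {"turns": state["turns"] + 1}


def tools(state: State) -> State:
    return {"tool_runs": state["tool_runs"] + 1}


def router(current: str, state: State) -> str:
    if current == "tools":
        return "agent"  # tools always hand control back to the agent
    # After the agent: run tools until two tool runs have happened, then stop
    return "tools" if state["tool_runs"] < 2 else END


steps = [
    name
    for name, _ in run_graph(
        {"agent": agent, "tools": tools}, router, {"turns": 0, "tool_runs": 0}, "agent"
    )
]
```

The step limit plays the same protective role as `recursion_limit` in the script above: if the routing logic never reaches the end state, the loop stops with an error instead of running forever.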


Trying It Out

Set environment variables:


export NEEDLE_API_KEY="your-needle-api-key"

export OPENAI_API_KEY="your-openai-api-key"

Run the script:


python rag_workflow.py


Observe:


The script prompts, “Let’s add the file to our Needle collection…”


It calls add_file_to_collection.


Waits 30 seconds to allow indexing.


Searches for content about Needle.


Ends the workflow once the model has produced its answer from the search results.


How It Works (In a Nutshell)


Initialize: You specify your API keys, collection ID, and create a loader/retriever.

Add a File: The agent calls add_file_to_collection.

Indexing Delay: We wait 30 seconds to let Needle index the file.

Search: The agent queries the newly added file for the user’s question.

Multi-Step: LangGraph tracks these steps so each one executes at the right time.
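The fixed 30-second wait is simple, but it either wastes time or is too short for large files. One possible refinement is to poll until indexing finishes or a timeout expires. The sketch below assumes you supply `is_indexed`, a hypothetical callable that reports whether your documents are ready; this tutorial does not show a Needle call for that, so the check itself is left to you.

```python
import time
from typing import Callable


def wait_until_indexed(
    is_indexed: Callable[[], bool],
    timeout: float = 120.0,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll `is_indexed` until it returns True or `timeout` seconds pass.

    Returns True if indexing finished in time, False otherwise.
    `sleep` and `clock` are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout
    while True:
        if is_indexed():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)
```

Inside `call_model`, a call such as `wait_until_indexed(my_check)` could stand in for `time.sleep(30)`, where `my_check` is whatever status check you implement.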


Key Takeaways


Minimal Dependencies:

  1. Needle: For document ingestion and retrieval.
  2. LangGraph: For orchestrating a stateful workflow.
  3. OpenAI: For LLM responses.


Stateful Workflows: LangGraph helps you split your process into small, manageable steps.


Beginner-Friendly: The code waits for indexing automatically, so you don’t have to remember to do it manually.


Easily Extensible: You can add more steps (like transformations or additional data checks) without altering the rest of the workflow.


Conclusion


Combining Needle and LangGraph allows you to build RAG agents that gracefully handle multi-step processes, such as uploading documents, waiting for indexing, and searching. This example is just a starting point: customize the wait time, build more elaborate logic, or add new tools to your workflow.


Next Steps


Check out the Needle documentation for more details on advanced indexing options.

Explore LangGraph on GitHub for building more sophisticated multi-step LLM workflows.

Browse LangChain Community integrations for more ways to enhance your agent.



Happy building!
