Overview

In this phase you assemble everything you have learned into production-style applications. You will:

  • Build a robust RAG assistant orchestrated by LangGraph.
  • Design multi-agent systems with clear roles and shared state.
  • Learn configuration, testing, deployment, monitoring, and security basics.
  • Explore advanced topics and directions for deeper study.

Module 6 – Building a Production-style RAG Assistant with LangGraph

This module combines LangChain (for RAG) with LangGraph (for orchestration) and wraps the result in a simple API server.

6.1 Architecture Overview

At a high level, your system will have:

  • Ingestion pipeline – load, split, embed, and index documents.
  • RAG graph – query refinement, retrieval, answer generation.
  • API server – FastAPI endpoint that calls the graph.
  • Logging & evaluation – record interactions and test queries.

6.2 Ingestion Script

Create an ingestion script that can be rerun whenever your content changes:

# src/phase3/ingest_corpus.py
from pathlib import Path

from dotenv import load_dotenv
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS


DATA_DIR = Path("corpus")
INDEX_DIR = Path("indexes")
INDEX_DIR.mkdir(exist_ok=True)


def load_documents():
    docs = []
    for path in DATA_DIR.glob("**/*.md"):
        loader = TextLoader(str(path))
        docs.extend(loader.load())
    return docs


def main():
    load_dotenv()
    docs = load_documents()
    print(f"Loaded {len(docs)} documents.")

    splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=150)
    chunks = splitter.split_documents(docs)
    print(f"Split into {len(chunks)} chunks.")

    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.from_documents(chunks, embedding=embeddings)

    # save_local writes course_index.faiss and course_index.pkl into INDEX_DIR.
    vectorstore.save_local(folder_path=str(INDEX_DIR), index_name="course_index")
    print(f"Saved FAISS index to {INDEX_DIR}")


if __name__ == "__main__":
    main()

6.3 RAG Graph Orchestration

Next, create a LangGraph graph that encapsulates the query flow. We’ll reuse the same index and RAG logic, but introduce nodes for logging and error handling.

# src/phase3/rag_graph.py
from typing import TypedDict, List, Optional
from datetime import datetime

from dotenv import load_dotenv

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END


class RAGSessionState(TypedDict, total=False):
    question: str
    refined_question: str
    retrieved_docs: List[str]
    answer: str
    error: str
    log: List[str]


def build_vectorstore():
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.load_local(
        folder_path="indexes",
        index_name="course_index",
        embeddings=embeddings,
        allow_dangerous_deserialization=True,  # local, trusted environment
    )
    return vectorstore.as_retriever(search_kwargs={"k": 4})


def init_state(state: RAGSessionState) -> RAGSessionState:
    log_entry = f"[{datetime.utcnow().isoformat()}] Received question."
    log = state.get("log", [])
    log.append(log_entry)
    return {"log": log}


def refine_question(state: RAGSessionState, llm: ChatOpenAI) -> RAGSessionState:
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "Rewrite the question to be specific and self-contained."),
            ("human", "{question}"),
        ]
    )
    chain = prompt | llm | StrOutputParser()
    refined = chain.invoke({"question": state["question"]})
    log = state.get("log", [])
    log.append(f"Refined question: {refined}")
    return {"refined_question": refined, "log": log}


def retrieve(state: RAGSessionState, retriever) -> RAGSessionState:
    query = state.get("refined_question") or state["question"]
    # .invoke() supersedes the deprecated get_relevant_documents API.
    docs = retriever.invoke(query)
    contents = [d.page_content for d in docs]
    log = state.get("log", [])
    log.append(f"Retrieved {len(contents)} docs.")
    return {"retrieved_docs": contents, "log": log}


def generate_answer(state: RAGSessionState, llm: ChatOpenAI) -> RAGSessionState:
    context = "\n\n".join(state.get("retrieved_docs", []))
    if not context.strip():
        return {"error": "No relevant context found.", "answer": ""}

    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful assistant for this course. Use only the given context.",
            ),
            ("human", "Context:\n{context}\n\nQuestion: {question}\n\nAnswer:"),
        ]
    )
    chain = prompt | llm | StrOutputParser()
    answer = chain.invoke({"context": context, "question": state["question"]})
    log = state.get("log", [])
    log.append("Generated answer.")
    return {"answer": answer, "log": log}


def build_rag_app():
    load_dotenv()
    retriever = build_vectorstore()
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.2)

    workflow = StateGraph(RAGSessionState)

    workflow.add_node("init", lambda s: init_state(s))
    workflow.add_node("refine", lambda s: refine_question(s, llm))
    workflow.add_node("retrieve", lambda s: retrieve(s, retriever))
    workflow.add_node("answer", lambda s: generate_answer(s, llm))

    workflow.set_entry_point("init")
    workflow.add_edge("init", "refine")
    workflow.add_edge("refine", "retrieve")
    workflow.add_edge("retrieve", "answer")
    workflow.set_finish_point("answer")

    app = workflow.compile()
    return app
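
One refinement worth knowing: instead of each node reading and re-appending the log list, LangGraph can merge list updates for you via a reducer annotation on the state field. A minimal sketch:

```python
import operator
from typing import Annotated, List, TypedDict


# Fields annotated with a reducer are merged automatically: when a node returns
# {"log": [...]}, LangGraph applies operator.add to concatenate the new entries
# onto the existing list instead of replacing it.
class RAGSessionState(TypedDict, total=False):
    question: str
    refined_question: str
    retrieved_docs: List[str]
    answer: str
    error: str
    log: Annotated[List[str], operator.add]
```

With this annotation, nodes can simply return {"log": ["Retrieved 4 docs."]} and drop the state.get("log", []) boilerplate.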

6.4 Exposing the Graph via FastAPI

Now build a small FastAPI server that calls the graph:

# src/phase3/api_server.py
from fastapi import FastAPI
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware

from rag_graph import build_rag_app


class QuestionRequest(BaseModel):
    question: str


class AnswerResponse(BaseModel):
    answer: str
    log: list[str]
    error: str | None = None


app = FastAPI(title="LangGraph RAG Assistant")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

rag_app = build_rag_app()


@app.post("/ask", response_model=AnswerResponse)
async def ask(req: QuestionRequest):
    state = rag_app.invoke({"question": req.question})
    return AnswerResponse(
        answer=state.get("answer", ""),
        log=state.get("log", []),
        error=state.get("error"),
    )

Run the server and test:

uvicorn src.phase3.api_server:app --reload
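
You can also exercise the endpoint from Python using only the standard library (the URL and question here are placeholders for your own):

```python
import json
from urllib.request import Request, urlopen


def ask(question: str, url: str = "http://127.0.0.1:8000/ask") -> dict:
    """POST a question to the /ask endpoint and return the parsed JSON body."""
    payload = json.dumps({"question": question}).encode("utf-8")
    req = Request(url, data=payload, headers={"Content-Type": "application/json"})
    with urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


# With the server running:
# print(ask("What is LangGraph?")["answer"])
```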

6.5 Evaluation Basics

Before deployment, you should evaluate the assistant on a small set of test questions. You can start with a simple script that:

  • Loads a list of test questions (optionally paired with expected notes).
  • Calls the /ask endpoint or rag_app.invoke directly.
  • Logs answers for manual review and measures latency.

# src/phase3/eval_rag.py
import time
from rag_graph import build_rag_app


TEST_QUERIES = [
    "What is LangChain?",
    "What is LangGraph and when should I use it?",
    "Explain what RAG is in this course.",
]


def main():
    app = build_rag_app()
    for q in TEST_QUERIES:
        start = time.time()
        state = app.invoke({"question": q})
        elapsed = time.time() - start

        print("=" * 60)
        print("Question:", q)
        print("Answer:", state.get("answer", ""))
        print("Error:", state.get("error"))
        print(f"Latency: {elapsed:.2f}s")


if __name__ == "__main__":
    main()

6.6 Streaming & Async

For a better user experience, you often want to stream tokens from the model to the client as they are generated. LangGraph apps support streaming just like LCEL chains.

You can expose streaming over HTTP using Server‑Sent Events (SSE) in FastAPI:

# src/phase3/api_server_streaming.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from rag_graph import build_rag_app


app = FastAPI(title="LangGraph RAG Assistant (Streaming)")
rag_app = build_rag_app()


def format_stream(question: str):
    # app.stream yields intermediate state updates as the graph runs.
    for step in rag_app.stream({"question": question}):
        # Each `step` is a dict like {"node_name": state_update}.
        # Send each answer fragment as a Server-Sent Events frame.
        for node_state in step.values():
            answer = node_state.get("answer")
            if answer:
                yield f"data: {answer}\n\n"


@app.get("/stream")
def stream_answer(question: str):
    return StreamingResponse(
        format_stream(question),
        media_type="text/event-stream",
    )

For heavier workloads, you can make your nodes async and call async-friendly tools or models, then iterate with async for over app.astream() in an async FastAPI endpoint. The design is the same: the graph encapsulates orchestration, and the web layer simply streams or returns results.
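
The async variant follows the same shape. The sketch below swaps the real graph for a stub async generator so the wiring is visible in isolation; in your app you would iterate over rag_app.astream(...) instead of fake_astream:

```python
import asyncio
from typing import AsyncIterator


# Stub standing in for rag_app.astream(...): yields {node_name: state_update}.
async def fake_astream(question: str) -> AsyncIterator[dict]:
    for node, update in [("retrieve", {}), ("answer", {"answer": f"Echo: {question}"})]:
        await asyncio.sleep(0)  # simulate awaiting a model call
        yield {node: update}


async def sse_events(question: str):
    # Wrap each answer fragment in a Server-Sent Events frame.
    async for step in fake_astream(question):
        for update in step.values():
            if update.get("answer"):
                yield f"data: {update['answer']}\n\n"


async def collect(question: str) -> list:
    return [frame async for frame in sse_events(question)]


frames = asyncio.run(collect("What is LangGraph?"))
print(frames)  # ['data: Echo: What is LangGraph?\n\n']
```

In the FastAPI endpoint you would pass sse_events(question) straight to StreamingResponse with media_type="text/event-stream" rather than collecting the frames.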

Module 7 – Multi-Agent Systems with LangGraph

Multi-agent systems use multiple specialized agents that collaborate on a task: for example, a planner, a researcher, a writer, and a reviewer.

7.1 Design Patterns

Common patterns include:

  • Planner–worker – planner breaks down tasks, workers execute them.
  • Specialists – different agents for search, code, writing, etc.
  • Hierarchy – a controller agent routes tasks to sub-agents.

7.2 Shared State for Multi-Agent Graphs

All agents operate over a shared state structure. Define it carefully:

# src/phase3/multi_agent_graph.py
from typing import TypedDict, List


class MultiAgentState(TypedDict, total=False):
    goal: str
    plan: List[str]
    research_notes: str
    draft_report: str
    reviewed_report: str
    messages: List[str]

7.3 Planner Agent Node

The planner takes a goal and produces a list of concrete steps:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser


def build_llm():
    return ChatOpenAI(model="gpt-4o-mini", temperature=0.2)


def planner_node(state: MultiAgentState) -> MultiAgentState:
    llm = build_llm()
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a project planner. Break the goal into 3-6 numbered steps."),
            ("human", "{goal}"),
        ]
    )
    chain = prompt | llm | StrOutputParser()
    plan_text = chain.invoke({"goal": state["goal"]})

    steps = []
    for line in plan_text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Strip leading numbering from lines like "1. Do X"; leave other periods alone.
        if line[0].isdigit() and "." in line:
            line = line.split(".", 1)[1].strip()
        steps.append(line)

    messages = state.get("messages", [])
    messages.append("Planner produced steps.")
    return {"plan": steps, "messages": messages}

7.4 Research Agent Node

The research agent could call tools (web search, RAG) to gather information; here we will simulate with the LLM:

def research_node(state: MultiAgentState) -> MultiAgentState:
    llm = build_llm()
    plan = state.get("plan", [])
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a research assistant. For each step, write concise notes.",
            ),
            ("human", "Goal: {goal}\n\nSteps:\n{steps}"),
        ]
    )
    chain = prompt | llm | StrOutputParser()
    steps_text = "\n".join(f"- {s}" for s in plan)
    notes = chain.invoke({"goal": state["goal"], "steps": steps_text})

    messages = state.get("messages", [])
    messages.append("Researcher produced notes.")
    return {"research_notes": notes, "messages": messages}

7.5 Writer & Reviewer Agents

def writer_node(state: MultiAgentState) -> MultiAgentState:
    llm = build_llm()
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "You are a technical writer. Draft a clear, structured report."),
            ("human", "Goal:\n{goal}\n\nResearch notes:\n{notes}"),
        ]
    )
    chain = prompt | llm | StrOutputParser()
    draft = chain.invoke({"goal": state["goal"], "notes": state["research_notes"]})
    messages = state.get("messages", [])
    messages.append("Writer produced draft.")
    return {"draft_report": draft, "messages": messages}


def reviewer_node(state: MultiAgentState) -> MultiAgentState:
    llm = build_llm()
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a strict reviewer. Fix style, structure, and obvious hallucinations. "
                "If information seems unsupported, flag it.",
            ),
            ("human", "Draft report:\n{draft}"),
        ]
    )
    chain = prompt | llm | StrOutputParser()
    reviewed = chain.invoke({"draft": state["draft_report"]})
    messages = state.get("messages", [])
    messages.append("Reviewer edited draft.")
    return {"reviewed_report": reviewed, "messages": messages}

7.6 Moderation Node

Add a simple moderation node that checks for dangerous content:

def moderation_node(state: MultiAgentState) -> MultiAgentState:
    report = state.get("reviewed_report") or state.get("draft_report", "")
    # Simple heuristic check; in production, integrate with a moderation API.
    banned_keywords = ["password", "credit card", "ssn"]
    flags = [kw for kw in banned_keywords if kw.lower() in report.lower()]

    messages = state.get("messages", [])
    if flags:
        messages.append(f"Moderation warning: found {flags}.")
        safe_report = report + "\n\n[WARNING: Potential sensitive data removed.]"
        return {"reviewed_report": safe_report, "messages": messages}

    messages.append("Moderation passed.")
    return {"messages": messages}

7.7 Wiring the Multi-Agent Graph

from langgraph.graph import StateGraph


def build_multi_agent_app():
    workflow = StateGraph(MultiAgentState)

    workflow.add_node("planner", planner_node)
    workflow.add_node("researcher", research_node)
    workflow.add_node("writer", writer_node)
    workflow.add_node("reviewer", reviewer_node)
    workflow.add_node("moderation", moderation_node)

    workflow.set_entry_point("planner")
    workflow.add_edge("planner", "researcher")
    workflow.add_edge("researcher", "writer")
    workflow.add_edge("writer", "reviewer")
    workflow.add_edge("reviewer", "moderation")
    workflow.set_finish_point("moderation")

    return workflow.compile()

A small driver script runs the graph end to end:

# src/phase3/run_multi_agent.py
from multi_agent_graph import build_multi_agent_app


def main():
    app = build_multi_agent_app()
    state = app.invoke(
        {"goal": "Write a short report explaining LangChain and LangGraph to a new engineer."}
    )
    print("Messages:")
    for msg in state.get("messages", []):
        print("-", msg)

    print("\nFinal report:\n")
    print(state.get("reviewed_report") or state.get("draft_report", "No report"))


if __name__ == "__main__":
    main()

Module 8 – Hardening & Deploying LangChain + LangGraph Apps

This module covers configuration management, testing, deployment, monitoring, and basic security concerns.

8.1 Configuration & Environment Management

Avoid hard-coding model names, temperatures, and service URLs. Instead, centralize them:

# src/phase3/config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv


@dataclass
class ModelConfig:
    chat_model: str = "gpt-4o-mini"
    temperature: float = 0.2
    embedding_model: str = "text-embedding-3-small"


@dataclass
class AppConfig:
    environment: str
    model: ModelConfig


def load_config() -> AppConfig:
    load_dotenv()
    env = os.getenv("APP_ENV", "dev")
    model = ModelConfig(
        chat_model=os.getenv("CHAT_MODEL", "gpt-4o-mini"),
        temperature=float(os.getenv("CHAT_TEMPERATURE", "0.2")),
        embedding_model=os.getenv("EMBEDDING_MODEL", "text-embedding-3-small"),
    )
    return AppConfig(environment=env, model=model)

8.2 Testing

You should test both individual components and end-to-end flows:

  • Unit tests – nodes, utility functions, prompt formatters.
  • Golden tests – fixed input and expected output snapshots.
  • Regression tests – re-run tests when you change prompts or models.

# tests/test_rewrite_query.py
from phase2.basic_graph import rewrite_query, RAGState


def test_rewrite_query_makes_nonempty_string():
    state: RAGState = {"question": "What is LangChain?"}
    new_state = rewrite_query(state)
    assert "rewritten_question" in new_state
    assert isinstance(new_state["rewritten_question"], str)
    assert new_state["rewritten_question"].strip()
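
For the golden tests mentioned above, a minimal snapshot helper might look like this (the paths are illustrative):

```python
# tests/golden_utils.py (sketch)
from pathlib import Path

GOLDEN_DIR = Path("tests/golden")


def check_golden(name: str, actual: str, update: bool = False) -> bool:
    """Compare `actual` with the stored snapshot; (re)write the snapshot when updating."""
    path = GOLDEN_DIR / f"{name}.txt"
    if update or not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(actual)
        return True
    return path.read_text() == actual
```

Run once with update=True to record a snapshot, then assert check_golden(name, answer) in the regression suite, deliberately re-recording whenever you change prompts or models.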

8.3 Deployment Options

Common deployment patterns include:

  • Containerized FastAPI app behind a load balancer.
  • Serverless function (for short-lived requests).
  • Background workers for long-running agents (with queues).

A minimal Dockerfile might look like:

# Dockerfile (sketch)
FROM python:3.11-slim

WORKDIR /app
# Copy dependency manifests first so the install layer can be cached
# (swap in requirements.txt if you are not using Poetry)
COPY pyproject.toml poetry.lock ./
RUN pip install --no-cache-dir fastapi uvicorn langchain langgraph ...

COPY src ./src

CMD ["uvicorn", "src.phase3.api_server:app", "--host", "0.0.0.0", "--port", "8000"]

8.4 Monitoring & Observability

For real applications, you should:

  • Log inputs, outputs, and errors with correlation IDs.
  • Track latency and cost per request.
  • Use traces (e.g. via LangSmith or similar tools) to inspect model calls and graph steps.

A minimal logging setup:

import logging
import uuid
from datetime import datetime

logger = logging.getLogger("rag_app")
logging.basicConfig(level=logging.INFO)


def log_request(question: str):
    request_id = str(uuid.uuid4())
    logger.info("request_id=%s question=%s", request_id, question)
    return request_id


def log_response(request_id: str, answer: str, error: str | None):
    logger.info(
        "request_id=%s answer_length=%d error=%s",
        request_id,
        len(answer),
        error,
    )

8.5 Security & Guardrails

Key concerns:

  • Secure API keys using environment variables or a secret manager.
  • Validate and sanitize tool inputs to avoid command injection.
  • Limit what tools can access (e.g. file system, network).
  • Use moderation (Module 7) to filter harmful content.

Treat any model output as untrusted input. Tools that perform side effects (sending emails, modifying files, making payments) must have extra validation layers.
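
As one concrete illustration of sanitizing tool inputs, a file-reading tool can be confined to an allow-listed directory. The corpus path is an example, and this is a single layer, not a complete defense:

```python
from pathlib import Path

ALLOWED_ROOT = Path("corpus").resolve()


def safe_read(path_str: str) -> str:
    """Read a file only if it resolves inside the allowed root directory."""
    path = (ALLOWED_ROOT / path_str).resolve()
    if not path.is_relative_to(ALLOWED_ROOT):  # blocks ../ traversal
        raise ValueError(f"refusing to read outside {ALLOWED_ROOT}: {path_str}")
    return path.read_text()
```

The same pattern applies to shell commands, SQL, and URLs: resolve or normalize the model-supplied value first, then check it against an explicit allow-list before acting on it.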

Module 9 – Advanced Topics & Next Steps (Optional)

This final module offers directions for deeper specialization. Choose the areas that best fit your goals.

9.1 Advanced Retrieval

Beyond basic vector search:

  • Hybrid search – combine BM25 (keyword) with dense vectors.
  • Reranking – use a cross-encoder or LLM to rerank retrieved chunks.
  • Hierarchical indexing – parent/child documents, multi-vector per document.

In LangChain, you can often swap out the retriever with minimal changes to your RAG chain/graph.
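
Hybrid search needs a way to combine the two ranked lists; reciprocal rank fusion (RRF) is a common choice. A self-contained sketch of the fusion step (doc ids are placeholders):

```python
def rrf_merge(rankings: list, k: int = 60) -> list:
    """Merge ranked doc-id lists; a doc at rank r contributes 1 / (k + r + 1)."""
    scores: dict = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)


bm25_hits = ["doc_a", "doc_b", "doc_c"]   # keyword ranking
dense_hits = ["doc_b", "doc_d", "doc_a"]  # vector ranking
print(rrf_merge([bm25_hits, dense_hits]))  # doc_b first: ranked highly in both lists
```

LangChain's EnsembleRetriever applies weighted fusion of this kind across multiple retrievers, so in practice you rarely need to hand-roll it.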

9.2 Domain-Specific Agents

Examples:

  • Code agents – tools to read/write files, run tests, and edit code.
  • Data analysis agents – tools over Python, pandas, SQL databases.
  • DevOps agents – tools to inspect logs, metrics, and configs.

The patterns you learned in Modules 3 and 7 apply: define tools carefully, give the agent a clear role, and use LangGraph to orchestrate multi-step flows.

9.3 Performance & Cost Tuning

Practical tips:

  • Use smaller, cheaper models where quality allows; reserve larger models for critical steps.
  • Reduce context size by improving retrieval and prompt design.
  • Cache embeddings and model outputs where appropriate.
  • Run steps in parallel when they don’t depend on each other (async and LCEL parallelism).
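
To make the caching point concrete: repeated texts can skip the embeddings API entirely with a small cache layer. The embed function below is a stand-in; you would wrap your real embeddings client the same way:

```python
from functools import lru_cache

CALLS = {"n": 0}


@lru_cache(maxsize=10_000)
def embed_cached(text: str) -> tuple:
    """Pretend embedding call; the counter shows how often the body actually runs."""
    CALLS["n"] += 1
    return tuple(float(ord(c)) for c in text[:8])  # placeholder vector


embed_cached("What is LangChain?")
embed_cached("What is LangChain?")  # cache hit: the function body does not run
print(CALLS["n"])  # 1
```

For persistent caching across processes, the same idea extends to a key-value store keyed by a hash of the text and the embedding model name.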

9.4 Reading Source Code

To deeply understand LangChain and LangGraph, spend time reading selected parts of their source code:

  • Core runnable and chain implementations.
  • Retrievers and vector store integrations.
  • StateGraph and execution engine in LangGraph.

Reading real-world Python code is a powerful way to level up your skills.

9.5 Staying Up to Date

The LLM ecosystem changes fast. To stay current:

  • Follow LangChain and LangGraph release notes.
  • Study example repositories and community projects.
  • Periodically revisit your prompts, tools, and graphs as new models appear.

You now have a full stack: from basic model calls to orchestrated, production-style applications. The next step is to build something real for your own use case and iterate based on real feedback.