Agentic-AI-Pipeline

Agentic RAG Pipeline (with Google Gemini LLM)

An end-to-end, production-ready Agentic RAG pipeline powered by Google Gemini (planning, writing, critique), FAISS (vector search), Google Programmable Search (hybrid web retrieval), and a simple, file-backed session memory. It performs intent recognition, task decomposition, dynamic retrieval planning, multi-agent verification, tool/API calls, and guardrailed finalization.

Python · Google Gemini · FAISS · Google Programmable Search (CSE) · httpx · Requests · BeautifulSoup · Pydantic · python-dotenv



What you get


Architecture

flowchart TD
  U[User] --> IR[Intent Router - Flash]
  IR --> PL[Planner - Pro]
  PL --> RP[Retrieval Planner - Pro]
  RP --> R1[VectorRetriever - FAISS]
  RP --> R2[WebRetriever - Google CSE + Fetch]
  R1 --> W[Writer - Pro]
  R2 --> W
  W --> C[Critic - Pro]
  C -->|follow-ups| RP
  W --> G[Guardrails]
  G --> A[Answer + Evidence]
  PL -. session .-> M[(File-backed Memory)]

The same flow, in plain text:

User
  │
  ▼
Intent Router (Gemini Flash)
  │  JSON: intents, safety, urgency
  ▼
Planner / Decomposer (Gemini Pro)
  │  JSON: sub-goals with sources & done-tests
  ├─────────────┐
  │             │
  ▼             ▼
Retrieval Planner (Gemini Pro)     Memory (session)
  │  JSON: diverse queries, k
  ▼
Retrievers (parallel per query)
  ├─ VectorRetriever (FAISS)
  └─ WebRetriever (Google CSE + page reader)
  │
  ▼
Writer / Synthesizer (Gemini Pro)
  │  JSON: {status, draft, missing}
  ▼
Critic / Verifier (Gemini Pro)
  │  JSON: {ok, issues, followup_queries}
  ├─(if gaps)→ targeted re-retrieval → Writer
  ▼
Guardrails (PII masking)
  ▼
Final Answer + Evidence Trace

Do agents share the same LLM instance? No. Each agent runs its own LLM session (distinct system prompt, temperature, and token budget) while pointing to the same Gemini family (e.g., 1.5 Pro/Flash). This isolates roles, enables parallelism, and simplifies telemetry and cost control.


Prerequisites


Install

python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install --upgrade pip
pip install google-generativeai faiss-cpu httpx requests beautifulsoup4 pydantic python-dotenv

Configure

  1. Environment variables (create a .env in the project root):
# Required
GOOGLE_API_KEY=your_gemini_api_key

# Optional (enable web retrieval)
CSE_API_KEY=your_google_cse_key
CSE_ENGINE_ID=your_google_cse_engine_id

# Optional (where to ingest local docs)
CORPUS_DIR=corpus

  2. Corpus (optional but recommended): Drop .txt or .md files into corpus/. They’ll be chunked and embedded on startup.

Environment variables quick reference

Name              Required  Purpose
GOOGLE_API_KEY    Yes       Gemini API key
CSE_API_KEY       No        Google Programmable Search API key
CSE_ENGINE_ID     No        Google CSE Engine ID
CORPUS_DIR        No        Directory of local documents to index
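
For orientation, here is a minimal sketch of how these variables might be read at startup (python-dotenv plus os.getenv; the exact code in core/llm.py may differ, and the hard failure on a missing key mirrors the troubleshooting note below):

# Sketch: loading configuration from .env / the environment (illustrative)
import os
from dotenv import load_dotenv

load_dotenv()  # picks up .env from the project root, if present

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise RuntimeError("GOOGLE_API_KEY is required")

# Web retrieval is enabled only when both CSE values are set
CSE_API_KEY = os.getenv("CSE_API_KEY")
CSE_ENGINE_ID = os.getenv("CSE_ENGINE_ID")
WEB_ENABLED = bool(CSE_API_KEY and CSE_ENGINE_ID)

CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus")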

Run

python app.py

You’ll see:

[ingest] Loading corpus from: corpus
[ingest] Added N chunks.
[web] Google Programmable Search enabled.   # if keys provided

Then type questions at the prompt:

>>> Compare the two documents in the corpus and list actionable next steps.

The system will plan, retrieve (vector + web if enabled), synthesize, critique, and output a grounded answer with a Sources list.


How it works (step-by-step)

  1. Intent Router classifies the task (answer/plan/code/etc.) and flags safety concerns.
  2. Planner breaks the request into sub-goals with suggested sources (vector/web/db/tools) and a completion test.
  3. Retrieval Planner generates diverse queries for each sub-goal and sets k.
  4. Retrievers run hybrid search:
    • Vector: FAISS over your local corpus.
    • Web: Google CSE + HTML fetch + text extraction (optional).
  5. Writer produces a grounded draft using only the provided evidence and adds bracketed citations.
  6. Critic checks for unsupported claims or gaps and proposes follow-up queries; the orchestrator can re-retrieve and revise the draft once.
  7. Guardrails redact emails/phones (extendable).
  8. Answer + evidence trace is returned; session memory is updated.
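
For readers who prefer code, here is a condensed sketch of that loop as it might appear in graph/orchestrator.py. The agents dict, method names, and dict keys are illustrative assumptions; the real orchestrator may differ:

# graph/orchestrator.py (sketch) -- one pass through the loop described above
def run(question: str, agents: dict, memory, session_id: str) -> dict:
    intent = agents["intent"].classify(question)                    # 1. intents, safety, urgency
    plan = agents["planner"].decompose(question, intent)            # 2. sub-goals + done-tests
    queries = agents["retrieval_planner"].plan(plan)                # 3. diverse queries, k

    evidence = []
    for q in queries:                                               # 4. hybrid retrieval
        evidence += agents["vector"].search(q["text"], k=q["k"])
        if "web" in agents:                                         #    only when CSE keys are set
            evidence += agents["web"].search(q["text"], k=q["k"])

    draft = agents["writer"].synthesize(question, plan, evidence)   # 5. grounded draft + citations
    review = agents["critic"].verify(draft, evidence)               # 6. unsupported claims / gaps
    if not review["ok"]:
        for fq in review.get("followup_queries", []):               #    one targeted re-retrieval
            evidence += agents["vector"].search(fq, k=4)
        draft = agents["writer"].synthesize(question, plan, evidence)

    answer = agents["guardrails"].redact(draft)                     # 7. PII masking
    memory.update(session_id, question, answer)                     # 8. persist session state
    return {"answer": answer, "evidence": evidence}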

Project structure

agentic-rag/
  app.py
  core/
    llm.py           # Gemini client, embeddings, JSON helpers
    vector.py        # FAISS index + corpus ingestion
    tools.py         # Web search + page fetcher
    memory.py        # File-backed session memory
    structs.py       # Pydantic data contracts
  agents/
    base.py
    intent.py
    planner.py
    retrieval_planner.py
    retrievers.py
    writer.py
    critic.py
    guardrails.py
  graph/
    orchestrator.py  # The control flow / loop
  eval/
    harness.py       # Optional quick smoke tests
  corpus/            # (your .txt/.md docs)
  .session_memory/   # (generated)

Agents (roles & prompts)

LLM instances: each agent uses its own Gemini session & parameters. Same model family (e.g., gemini-1.5-pro), different prompts and budgets.
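
As a rough illustration of “own session & parameters” with the google-generativeai SDK (the model names match the FAQ below; the prompts, temperatures, and budgets here are made-up examples):

# agents/base.py (sketch) -- each agent owns its model handle and generation config
import os
import google.generativeai as genai

genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

class Agent:
    def __init__(self, system_prompt: str, model: str = "gemini-1.5-pro",
                 temperature: float = 0.2, max_output_tokens: int = 2048):
        # system_instruction requires a recent google-generativeai release
        self.model = genai.GenerativeModel(
            model_name=model,
            system_instruction=system_prompt,
            generation_config={"temperature": temperature,
                               "max_output_tokens": max_output_tokens},
        )

    def ask(self, prompt: str) -> str:
        return self.model.generate_content(prompt).text

# Same model family, different prompts and budgets per role (examples only)
planner = Agent("Decompose the request into sub-goals with done-tests.", temperature=0.3)
critic = Agent("Check the draft against the evidence; list unsupported claims.", temperature=0.0)
router = Agent("Classify intent, safety, urgency as JSON.", model="gemini-1.5-flash", max_output_tokens=256)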


Corpus ingestion

Tip: Create a file like corpus/knowledge.md with key facts, glossaries, or SOPs for stronger grounding.
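
A minimal sketch of that startup ingestion: fixed-size chunking plus Gemini embeddings pushed into a flat FAISS index. Chunk size, overlap, and the embedding model name are assumptions, not the exact core/vector.py code:

# core/vector.py (sketch) -- chunk .txt/.md files and index them with FAISS
import pathlib

import faiss
import numpy as np
import google.generativeai as genai

def chunk(text: str, size: int = 1200, overlap: int = 200) -> list[str]:
    step = size - overlap
    return [text[i:i + size] for i in range(0, len(text), step)]

def embed(texts: list[str]) -> np.ndarray:
    # Batch embedding call; returns one vector per input string
    resp = genai.embed_content(model="models/text-embedding-004", content=texts)
    return np.array(resp["embedding"], dtype="float32")

def build_index(corpus_dir: str = "corpus") -> tuple[faiss.IndexFlatL2, list[str]]:
    chunks: list[str] = []
    for path in pathlib.Path(corpus_dir).glob("*"):
        if path.suffix in {".txt", ".md"}:
            chunks += chunk(path.read_text(encoding="utf-8"))
    vectors = embed(chunks)
    index = faiss.IndexFlatL2(vectors.shape[1])  # exact L2 search over all chunks
    index.add(vectors)
    print(f"[ingest] Added {len(chunks)} chunks.")
    return index, chunks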


Hybrid retrieval (vector + web)
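
A minimal sketch of the web leg: the Google Programmable Search JSON API for results, then httpx + BeautifulSoup to pull readable text from each page. The endpoint and response fields follow the public CSE API; everything else (limits, helper names) is an assumption, not the exact core/tools.py code:

# core/tools.py (sketch) -- Google CSE search plus page text extraction
import os

import httpx
from bs4 import BeautifulSoup

CSE_ENDPOINT = "https://www.googleapis.com/customsearch/v1"

def web_search(query: str, k: int = 5) -> list[dict]:
    params = {"key": os.environ["CSE_API_KEY"], "cx": os.environ["CSE_ENGINE_ID"],
              "q": query, "num": k}
    items = httpx.get(CSE_ENDPOINT, params=params, timeout=15).json().get("items", [])
    return [{"title": i["title"], "url": i["link"], "snippet": i.get("snippet", "")}
            for i in items]

def fetch_text(url: str, max_chars: int = 4000) -> str:
    html = httpx.get(url, timeout=15, follow_redirects=True).text
    soup = BeautifulSoup(html, "html.parser")
    return " ".join(soup.get_text(" ").split())[:max_chars]  # collapse whitespace, truncate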

The main tuning knobs (retrieval depth k, number of queries per sub-goal, whether the critic loop runs) live in graph/orchestrator.py.


Memory & context
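
The pipeline keeps per-session state in plain files under .session_memory/ (see the project structure above). A minimal sketch of what core/memory.py might look like; the one-JSON-file-per-session layout and the schema are assumptions:

# core/memory.py (sketch) -- per-session JSON files under .session_memory/
import json
import pathlib

MEMORY_DIR = pathlib.Path(".session_memory")

def load(session_id: str) -> dict:
    path = MEMORY_DIR / f"{session_id}.json"
    return json.loads(path.read_text()) if path.exists() else {"turns": []}

def update(session_id: str, question: str, answer: str) -> None:
    MEMORY_DIR.mkdir(exist_ok=True)
    state = load(session_id)
    state["turns"].append({"question": question, "answer": answer})
    (MEMORY_DIR / f"{session_id}.json").write_text(json.dumps(state, indent=2))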


Quality control
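
Quality control is the Critic pass (step 6 above) plus the Guardrails redaction (step 7). A minimal sketch of the email/phone masking; the regex patterns are illustrative, not the exact agents/guardrails.py code:

# agents/guardrails.py (sketch) -- mask emails and phone numbers before returning the answer
import re

EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def redact(text: str) -> str:
    text = EMAIL.sub("[email redacted]", text)
    return PHONE.sub("[phone redacted]", text)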


Tuning & production notes


Troubleshooting

GOOGLE_API_KEY is required: set it in .env or your shell environment.

Web search always disabled: set both CSE_API_KEY and CSE_ENGINE_ID (and make sure your CSE is configured to search the web or your desired domains).

Empty/weak answers: add grounding documents to corpus/ (see the corpus tip above) or enable web retrieval.

“JSON parsing” warnings (rare): the pipeline is resilient and attempts to coerce malformed JSON. If it recurs, lower the agent temperatures.

Slow runs: disable web search, reduce k, or remove the critic loop in orchestrator.py.


FAQ

Q: Does each agent use its own instance of the LLM? A: Yes. Each agent maintains its own Gemini session and config (system prompt, temperature, token limits). They share the same model family: Gemini 1.5 Pro for planning/writing/critique, Gemini 1.5 Flash for routing/guardrails.

Q: Can I run without web search? A: Yes. The system runs vector-only if CSE_API_KEY/CSE_ENGINE_ID aren’t set.

Q: How do I add a custom tool/API (e.g., SQL, Jira, GitHub)? A: Add a client in core/tools.py, create a dedicated Agent (e.g., DataAgent) with restricted prompts/permissions, and call it from the orchestrator based on sub-goal sources or intent routing.

Q: How do I swap FAISS for pgvector/Pinecone? A: Replace FAISSIndex with your client; keep the add()/search() signatures. Most adapters are a few dozen lines.
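
For reference, the surface an adapter needs to match can be as small as this (a sketch of the assumed interface; check the actual FAISSIndex signatures before wiring in a new backend):

# Minimal vector-store interface a pgvector/Pinecone adapter would implement (sketch)
from typing import Protocol

class VectorStore(Protocol):
    def add(self, texts: list[str], vectors: list[list[float]]) -> None: ...
    def search(self, vector: list[float], k: int = 5) -> list[tuple[str, float]]: ...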

Q: How do I change models or parameters? A: Edit core/llm.py (GEMINI_PRO, GEMINI_FLASH, temperatures, max tokens). You can also route some roles to Flash for cost/latency.


Quick commands

# 1) Install deps
pip install google-generativeai faiss-cpu httpx requests beautifulsoup4 pydantic python-dotenv

# 2) Configure keys
echo "GOOGLE_API_KEY=sk-..." >> .env
# Optional web:
echo "CSE_API_KEY=..." >> .env
echo "CSE_ENGINE_ID=..." >> .env

# 3) Add local docs (optional)
mkdir -p corpus
echo "Your internal SOPs or notes go here." > corpus/notes.md

# 4) Run
python app.py

This pipeline is designed to be a solid foundation for building advanced, agentic RAG systems with Gemini. It can be extended with more agents, tools, and retrieval methods as needed. Happy coding!