An end-to-end, production-ready Agentic RAG pipeline powered by Google Gemini (planning, writing, critique), FAISS (vector search), Google Programmable Search (hybrid web retrieval), and a simple, file-backed session memory. It performs intent recognition, task decomposition, dynamic retrieval planning, multi-agent verification, tool/API calls, and guardrailed finalization.
```mermaid
flowchart TD
  U[User]
  IR["Intent Router<br/>(Gemini Flash)<br/><br/>JSON: intents, safety, urgency"]
  PD["Planner / Decomposer<br/>(Gemini Pro)<br/><br/>JSON: sub-goals with sources & done-tests"]
  RP["Retrieval Planner<br/>(Gemini Pro)<br/><br/>JSON: diverse queries, k"]
  MEM["Memory<br/>(session)"]
  RET["Retrievers<br/>(parallel per query)"]
  VR["VectorRetriever<br/>(FAISS)"]
  WR["WebRetriever<br/>(Google CSE + page reader)"]
  WRT["Writer / Synthesizer<br/>(Gemini Pro)<br/><br/>JSON: {status, draft, missing}"]
  CRT["Critic / Verifier<br/>(Gemini Pro)<br/><br/>JSON: {ok, issues, followup_queries}"]
  GR["Guardrails<br/>(PII masking)"]
  FA["Final Answer<br/>+ Evidence Trace"]

  %% Main flow
  U --> IR --> PD
  PD --> RP
  PD --> MEM
  RP --> RET
  RET --> VR
  RET --> WR
  VR --> WRT
  WR --> WRT
  MEM --> WRT
  WRT --> CRT
  CRT -- if gaps --> RP
  CRT --> GR --> FA
```
**Do agents share the same LLM instance?** Each agent runs its own LLM session (distinct system prompt, temperature, token budget) while pointing to the same Gemini family (e.g., 1.5 Pro/Flash). This isolates roles, enables parallelism, and simplifies telemetry/cost control.
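A minimal sketch of that pattern with the google-generativeai SDK. The `AGENT_CONFIGS` table and `make_agent_model` helper are illustrative names, not the pipeline's actual API:

```python
# Sketch: one GenerativeModel per agent role, all sharing the same API key.
# AGENT_CONFIGS / make_agent_model are illustrative, not the pipeline's real names.
import google.generativeai as genai

genai.configure(api_key="YOUR_GOOGLE_API_KEY")

AGENT_CONFIGS = {
    # role: (model, system prompt, temperature)
    "intent_router": ("gemini-1.5-flash", "Classify intents; reply in JSON.", 0.1),
    "planner":       ("gemini-1.5-pro",   "Decompose the task into sub-goals.", 0.2),
    "writer":        ("gemini-1.5-pro",   "Synthesize a cited draft.", 0.3),
    "critic":        ("gemini-1.5-pro",   "Verify the draft; reply in JSON.", 0.1),
}

def make_agent_model(role: str) -> genai.GenerativeModel:
    model_name, system_prompt, temperature = AGENT_CONFIGS[role]
    return genai.GenerativeModel(
        model_name,
        system_instruction=system_prompt,  # distinct role prompt per agent
        generation_config=genai.GenerationConfig(temperature=temperature),
    )

planner = make_agent_model("planner")  # independent session, own budget/telemetry
```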
Accounts/keys:

- GOOGLE_API_KEY (required) for Gemini.
- CSE_API_KEY and CSE_ENGINE_ID for Google Programmable Search (web retrieval).

Optional dependencies for multimodal ingest:

- pypdf or pdfminer.six for PDF extraction
- python-docx for DOCX extraction
- pillow + pytesseract for image OCR

Install:

```bash
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install --upgrade pip
pip install google-generativeai faiss-cpu httpx requests beautifulsoup4 pydantic python-dotenv

# Optional (multimodal ingest)
pip install pypdf pdfminer.six python-docx pillow pytesseract
```
Create a `.env` file in the project root:

```bash
# Required
GOOGLE_API_KEY=your_gemini_api_key

# Optional (enable web retrieval)
CSE_API_KEY=your_google_cse_key
CSE_ENGINE_ID=your_google_cse_engine_id

# Optional (where to ingest local docs)
CORPUS_DIR=corpus
```
Drop `.txt` or `.md` files into `corpus/`; they'll be chunked and embedded on startup.

Environment variables quick reference:
| Name | Required | Purpose |
|---|---|---|
| GOOGLE_API_KEY | Yes | Gemini API key |
| CSE_API_KEY | No | Google Programmable Search API key |
| CSE_ENGINE_ID | No | Google CSE Engine ID |
| CORPUS_DIR | No | Directory of local documents to index |
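For reference, here is roughly how the pipeline can pick these up at startup with python-dotenv (already in the dependency list). The variable names mirror the table; the actual loading code lives under `core/` and may differ:

```python
# Sketch: loading the variables above with python-dotenv.
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the project root

GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]      # required: raises KeyError if missing
CSE_API_KEY = os.getenv("CSE_API_KEY")             # optional
CSE_ENGINE_ID = os.getenv("CSE_ENGINE_ID")         # optional
CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus")     # optional, defaults to ./corpus

WEB_ENABLED = bool(CSE_API_KEY and CSE_ENGINE_ID)  # web retrieval needs both keys
```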
Run:

```bash
python app.py
```

You'll see:

```
[ingest] Loading corpus from: corpus
[ingest] Added N chunks.
[web] Google Programmable Search enabled.   # if keys provided
```
Then type questions at the prompt:
```
>>> Compare the two documents in the corpus and list actionable next steps.
```
The system will plan, retrieve (vector + web if enabled), synthesize, critique, and output a grounded answer with a Sources list.
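For orientation, a condensed, illustrative sketch of that loop. The real control flow lives in `graph/orchestrator.py`; every agent here is modeled as a plain callable so the sketch stays self-contained:

```python
# Illustrative control flow only; see graph/orchestrator.py for the real loop.
from typing import Callable

def run_pipeline(
    question: str,
    route_intent: Callable,    # Intent Router (Flash): intents, safety, urgency
    decompose: Callable,       # Planner (Pro): sub-goals with sources & done-tests
    plan_retrieval: Callable,  # Retrieval Planner (Pro): diverse queries, k
    retrieve: Callable,        # Retrievers: FAISS + optional web, in parallel
    write: Callable,           # Writer (Pro): {status, draft, missing}
    critique: Callable,        # Critic (Pro): {ok, issues, followup_queries}
    guard: Callable,           # Guardrails: PII masking
    max_repairs: int = 1,
) -> str:
    plan = decompose(question, route_intent(question))
    draft = ""
    for _ in range(max_repairs + 1):
        evidence = retrieve(plan_retrieval(plan))
        draft = write(question, plan, evidence)["draft"]
        verdict = critique(draft, plan)
        if verdict["ok"]:
            break
        # One targeted repair pass: feed the critic's follow-ups back into the plan.
        plan = {**plan, "followups": verdict["followup_queries"]}
    return guard(draft)
```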
A zero-build Vue UI is included and mounted by the root FastAPI server.
```bash
uvicorn agentic_ai.app:app --reload
# Then open http://127.0.0.1:8000/rag
```
Features:
Files:
- Agentic-RAG-Pipeline/ui/index.html
- Agentic-RAG-Pipeline/ui/app.js
- Agentic-RAG-Pipeline/ui/styles.css

Endpoints (served by root FastAPI):
- GET /api/rag/new_session → { "session_id": "uuid" }
- POST /api/rag/ask (SSE) with { "session_id": "uuid", "question": "..." }; streams events log, answer (markdown), sources (JSON array), and done.
- POST /api/rag/ingest_text to add text or a URL: { "text": "...", "id?": "doc-id", "title?": "...", "tags?": [ ... ] } or { "url": "https://...", "title?": "...", "tags?": [ ... ] }
- POST /api/rag/ingest_file (multipart) with fields file, title?, and tags? (comma-separated)

All ingestion routes chunk text and add it to the in-memory FAISS index with metadata for later retrieval.
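To exercise the endpoints without the SDKs, a minimal httpx sketch (httpx is already a dependency; this assumes the server is running on port 8000):

```python
# Sketch: calling the RAG endpoints above directly with httpx.
import httpx

BASE = "http://127.0.0.1:8000"

with httpx.Client(base_url=BASE, timeout=60) as client:
    # Start a session and ingest a URL.
    session_id = client.get("/api/rag/new_session").json()["session_id"]
    client.post("/api/rag/ingest_text", json={"url": "https://example.com"})

    # Ask a question and stream the SSE events (log, answer, sources, done).
    payload = {"session_id": session_id, "question": "Summarize topic X"}
    with client.stream("POST", "/api/rag/ask", json=payload) as resp:
        for line in resp.iter_lines():
            if line:
                print(line)  # raw SSE lines: "event: ..." / "data: ..."
```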
Call the RAG endpoints from the monorepo SDKs:
TypeScript:

```ts
import { AgenticAIClient } from "../clients/ts/src/client";

const c = new AgenticAIClient({ baseUrl: "http://127.0.0.1:8000" });
const { session_id } = await c.ragNewSession();
await c.ragAskStream({ session_id, question: "Summarize topic X", onEvent: (ev) => console.log(ev.event, ev.data) });
await c.ragIngestText({ url: "https://example.com" });
```
Python:

```python
from clients.python.agentic_ai_client import AgenticAIClient
import anyio

async def run():
    async with AgenticAIClient("http://127.0.0.1:8000") as c:
        sess = await c.rag_new_session()
        await c.rag_ask_stream(
            "Summarize topic X",
            session_id=sess["session_id"],
            on_event=lambda ev, d: print(ev, d),
        )
        await c.rag_ingest_text(url="https://example.com")

anyio.run(run)
```
See root README “Client SDKs” for more capabilities and examples.
Retrievers run hybrid search, merging FAISS vector hits with Google web results when web retrieval is enabled.

Project layout:
```
agentic-rag/
  app.py
  services.py            # UI/API glue: shared index, ingestion, streaming
  core/
    llm.py               # Gemini client, embeddings, JSON helpers
    vector.py            # FAISS index + corpus ingestion
    tools.py             # Web search + page fetcher
    memory.py            # File-backed session memory
    structs.py           # Pydantic data contracts
  agents/
    base.py
    intent.py
    planner.py
    retrieval_planner.py
    retrievers.py
    writer.py
    critic.py
    guardrails.py
  graph/
    orchestrator.py      # The control flow / loop
  eval/
    harness.py           # Optional quick smoke tests
  corpus/                # (your .txt/.md docs)
  .session_memory/       # (generated)
  ui/                    # Browser UI mounted at /rag
    index.html
    app.js
    styles.css
```
Agent contracts (sketched as Pydantic models below):

- Intent Router → { intents[], safety[], urgency, notes }
- Planner → sub-goals, each with sources and a done_test
- Retrieval Planner → { queries[], k } per sub-goal
- Writer → { status, draft, missing } with bracketed citations [ #1 ]
- Critic → { ok, issues, followup_queries }; triggers one targeted repair loop

LLM instances: each agent uses its own Gemini session & parameters, with the same model family (e.g., gemini-1.5-pro) but different prompts and budgets.
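As a rough sketch, the contracts above might be declared like this. Field names follow the JSON shapes listed; the actual definitions live in `core/structs.py` and may differ:

```python
# Sketch of the JSON contracts above as Pydantic models (cf. core/structs.py).
from pydantic import BaseModel

class IntentResult(BaseModel):
    intents: list[str]
    safety: list[str]
    urgency: str
    notes: str = ""

class RetrievalPlan(BaseModel):
    queries: list[str]
    k: int

class WriterResult(BaseModel):
    status: str
    draft: str
    missing: list[str] = []

class CriticResult(BaseModel):
    ok: bool
    issues: list[str] = []
    followup_queries: list[str] = []
```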
Vector corpus:

- Ingests `.txt` / `.md` files in `corpus/`.
- Embeddings use text-embedding-004 (768-dim); FAISS uses inner product on normalized vectors (see the sketch below).
- Tip: create a file like corpus/knowledge.md with key facts, glossaries, or SOPs for stronger grounding.
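A minimal sketch of that embedding-plus-index flow (cf. `core/vector.py`; the `embed()` helper is illustrative, and `genai.configure(api_key=...)` must have been called first):

```python
# Sketch: text-embedding-004 vectors in a FAISS inner-product index.
import faiss
import numpy as np
import google.generativeai as genai  # assumes genai.configure(api_key=...) was called

def embed(texts: list[str]) -> np.ndarray:
    vecs = [
        genai.embed_content(model="models/text-embedding-004", content=t)["embedding"]
        for t in texts
    ]
    arr = np.asarray(vecs, dtype="float32")
    faiss.normalize_L2(arr)  # unit vectors: inner product == cosine similarity
    return arr

index = faiss.IndexFlatIP(768)  # 768-dim, inner-product metric
index.add(embed(["chunk one", "chunk two"]))
scores, ids = index.search(embed(["a query"]), k=2)
```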
Web (optional) broadens coverage and provides fresh/public context:
Enable it by setting CSE_API_KEY and CSE_ENGINE_ID.

Tuning knobs (see graph/orchestrator.py):

- k per sub-goal from the Retrieval Planner (bounded to 4–12)
- Deduplication of retrieved chunks by (uri, chunk_id)
- Session memory persisted to .session_memory/SESSION_ID.jsonl
The orchestrator appends user and assistant messages and can generate a short summary window for context.

Each sub-goal carries a done_test; the critic evaluates whether the draft meets it and, if not, proposes follow-up queries.

Temperatures:

- Writer: 0.1–0.3 for determinism
- Critic: ~0.1 to keep follow-ups targeted

Latency & cost:
Observability:
Security:
Scaling:
**GOOGLE_API_KEY is required**
Set it in .env or your shell environment.

**Web search always disabled**
Set both CSE_API_KEY and CSE_ENGINE_ID (and ensure your CSE is configured to search the web or your desired domains).

**Empty/weak answers**
Add more documents to corpus/, or increase k or the chunk size in core/vector.py.

**"JSON parsing" warnings (rare)**
The pipeline is resilient and attempts to coerce malformed JSON. If it recurs, lower the temperatures.

**Slow runs**
Disable web search, reduce k, or remove the critic loop in orchestrator.py.
Q: Does each agent use its own instance of the LLM?
A: Yes. Each agent maintains its own Gemini session & config (system prompt, temperature, token limits). They use the same model family (Gemini 1.5 Pro for planning/writing/critique; Gemini 1.5 Flash for routing/guardrails).
Q: Can I run without web search?
A: Yes. The system runs vector-only if CSE_API_KEY/CSE_ENGINE_ID aren’t set.
Q: How do I add a custom tool/API (e.g., SQL, Jira, GitHub)?
A: Add a client in core/tools.py, create a dedicated Agent (e.g., DataAgent) with restricted prompts/permissions, and call it from the orchestrator based on sub-goal sources or intent routing.
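A minimal sketch of that pattern, assuming a hypothetical read-only SQL client. The DataAgent name comes from the answer above; `sql_client.execute()` is illustrative, not a real interface in core/tools.py:

```python
# Sketch: a restricted tool-calling agent (wire it up from graph/orchestrator.py).
import google.generativeai as genai  # assumes genai.configure(api_key=...) was called

class DataAgent:
    """Wraps one tool client behind a narrowly scoped Gemini session."""

    def __init__(self, sql_client):
        self.sql = sql_client  # hypothetical client added in core/tools.py
        self.model = genai.GenerativeModel(
            "gemini-1.5-flash",
            system_instruction="Translate the sub-goal into ONE read-only SQL query.",
        )

    def run(self, sub_goal: str) -> str:
        query = self.model.generate_content(sub_goal).text.strip()
        if not query.lower().startswith("select"):  # crude permission guard
            raise ValueError("DataAgent may only run read-only queries")
        return self.sql.execute(query)  # illustrative: depends on your client's API
```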
Q: How do I swap FAISS for pgvector/Pinecone?
A: Replace FAISSIndex with your client; keep the add()/search() signatures. Most adapters are a few dozen lines.
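For illustration, a pgvector-backed stand-in. The add()/search() shapes below are assumptions; match them to the real FAISSIndex in core/vector.py, and note that passing raw lists as vectors depends on your Postgres driver (e.g., pgvector's register_vector adapter):

```python
# Sketch: drop-in stand-in for FAISSIndex backed by pgvector (assumed signatures).
import json
import numpy as np

class PgVectorIndex:
    def __init__(self, conn):
        # psycopg connection; assumes the pgvector extension and a `chunks`
        # table with (embedding vector, meta text) columns already exist.
        self.conn = conn

    def add(self, vectors: np.ndarray, metadatas: list[dict]) -> None:
        with self.conn.cursor() as cur:
            for vec, meta in zip(vectors, metadatas):
                cur.execute(
                    "INSERT INTO chunks (embedding, meta) VALUES (%s, %s)",
                    (vec.tolist(), json.dumps(meta)),
                )
        self.conn.commit()

    def search(self, query: np.ndarray, k: int = 8) -> list[dict]:
        with self.conn.cursor() as cur:
            # <=> is pgvector's cosine-distance operator.
            cur.execute(
                "SELECT meta FROM chunks ORDER BY embedding <=> %s LIMIT %s",
                (query.tolist(), k),
            )
            return [json.loads(row[0]) for row in cur.fetchall()]
```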
Q: How do I change models or parameters?
A: Edit core/llm.py (GEMINI_PRO, GEMINI_FLASH, temperatures, max tokens). You can also route some roles to Flash for cost/latency.
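For orientation, the knobs there might look like this; the constant names come from this README, and the values shown are illustrative defaults, not the file's actual contents:

```python
# Sketch of the knobs in core/llm.py (illustrative values).
GEMINI_PRO = "gemini-1.5-pro"      # planning, writing, critique
GEMINI_FLASH = "gemini-1.5-flash"  # routing, guardrails

TEMPERATURES = {"writer": 0.2, "critic": 0.1}
MAX_OUTPUT_TOKENS = 2048
```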
```bash
# 1) Install deps
pip install google-generativeai faiss-cpu httpx requests beautifulsoup4 pydantic python-dotenv

# 2) Configure keys
echo "GOOGLE_API_KEY=sk-..." >> .env
# Optional web:
echo "CSE_API_KEY=..." >> .env
echo "CSE_ENGINE_ID=..." >> .env

# 3) Add local docs (optional)
mkdir -p corpus
echo "Your internal SOPs or notes go here." > corpus/notes.md

# 4) Run
python app.py
```
This pipeline is designed to be a solid foundation for building advanced, agentic RAG systems with Gemini. It can be extended with more agents, tools, and retrieval methods as needed. Happy coding!