An end-to-end, production-ready Agentic RAG pipeline powered by Google Gemini (planning, writing, critique), FAISS (vector search), Google Programmable Search (hybrid web retrieval), and a simple, file-backed session memory. It performs intent recognition, task decomposition, dynamic retrieval planning, multi-agent verification, tool/API calls, and guardrailed finalization.
```mermaid
flowchart TD
    U[User]
    IR["Intent Router<br/>(Gemini Flash)<br/><br/>JSON: intents, safety, urgency"]
    PD["Planner / Decomposer<br/>(Gemini Pro)<br/><br/>JSON: sub-goals with sources & done-tests"]
    RP["Retrieval Planner<br/>(Gemini Pro)<br/><br/>JSON: diverse queries, k"]
    MEM["Memory<br/>(session)"]
    RET["Retrievers<br/>(parallel per query)"]
    VR["VectorRetriever<br/>(FAISS)"]
    WR["WebRetriever<br/>(Google CSE + page reader)"]
    WRT["Writer / Synthesizer<br/>(Gemini Pro)<br/><br/>JSON: {status, draft, missing}"]
    CRT["Critic / Verifier<br/>(Gemini Pro)<br/><br/>JSON: {ok, issues, followup_queries}"]
    GR["Guardrails<br/>(PII masking)"]
    FA["Final Answer<br/>+ Evidence Trace"]

    %% Main flow
    U --> IR --> PD
    PD --> RP
    PD --> MEM
    RP --> RET
    RET --> VR
    RET --> WR
    VR --> WRT
    WR --> WRT
    MEM --> WRT
    WRT --> CRT
    CRT -- if gaps --> RP
    CRT --> GR --> FA
```
Do agents share the same LLM instance? Each agent runs its own LLM session (distinct system prompt, temperature, token budget) while pointing to the same Gemini family (e.g., 1.5 Pro/Flash). This isolates roles, enables parallelism, and simplifies telemetry/cost control.
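To make that concrete, here is a minimal sketch of one Gemini session per agent role; the role table and settings below are illustrative assumptions, not the repository's exact configuration (see `core/llm.py` and `agents/` for the real values).

```python
# Sketch: one isolated Gemini session per agent role.
# The ROLES table (models, prompts, temperatures, budgets) is illustrative, not the repo's config.
import os
import google.generativeai as genai

genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

ROLES = {
    "intent_router": ("gemini-1.5-flash", "Classify intents, safety, urgency. Reply in JSON.", 0.1, 512),
    "planner":       ("gemini-1.5-pro",   "Decompose the task into sub-goals with done-tests.", 0.2, 1024),
    "writer":        ("gemini-1.5-pro",   "Synthesize a grounded draft with bracketed citations.", 0.3, 2048),
    "critic":        ("gemini-1.5-pro",   "Verify the draft; list issues and follow-up queries.", 0.1, 1024),
}

def make_agent_session(role: str) -> genai.GenerativeModel:
    """Build an isolated model instance (own system prompt, temperature, token budget)."""
    model_name, system_prompt, temperature, max_tokens = ROLES[role]
    return genai.GenerativeModel(
        model_name=model_name,
        system_instruction=system_prompt,
        generation_config={"temperature": temperature, "max_output_tokens": max_tokens},
    )

# Same model family, separate sessions: roles stay isolated and costs are attributable per agent.
planner = make_agent_session("planner")
critic = make_agent_session("critic")
```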
Accounts/keys:

- `GOOGLE_API_KEY` (required) for Gemini.
- `CSE_API_KEY` and `CSE_ENGINE_ID` for Google Programmable Search (web retrieval).

Optional dependencies (multimodal ingest):

- `pypdf` or `pdfminer.six` for PDF extraction
- `python-docx` for DOCX extraction
- `pillow` + `pytesseract` for image OCR

Install:

```bash
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install --upgrade pip
pip install google-generativeai faiss-cpu httpx requests beautifulsoup4 pydantic python-dotenv

# Optional (multimodal ingest)
pip install pypdf pdfminer.six python-docx pillow pytesseract
```
Create a `.env` file in the project root:

```bash
# Required
GOOGLE_API_KEY=your_gemini_api_key

# Optional (enable web retrieval)
CSE_API_KEY=your_google_cse_key
CSE_ENGINE_ID=your_google_cse_engine_id

# Optional (where to ingest local docs)
CORPUS_DIR=corpus
```

Drop `.txt` or `.md` files into `corpus/`. They'll be chunked and embedded on startup.

Environment variables quick reference:
| Name | Required | Purpose |
|---|---|---|
| `GOOGLE_API_KEY` | Yes | Gemini API key |
| `CSE_API_KEY` | No | Google Programmable Search API key |
| `CSE_ENGINE_ID` | No | Google CSE Engine ID |
| `CORPUS_DIR` | No | Directory of local documents to index |
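For reference, this is roughly how these variables can be picked up with python-dotenv (already in the install list); the exact handling in `core/llm.py` and `core/tools.py` may differ, so treat this as a sketch.

```python
# Sketch: load .env configuration and decide whether web retrieval is available.
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the project root

GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]        # required for Gemini
CSE_API_KEY = os.getenv("CSE_API_KEY")               # optional
CSE_ENGINE_ID = os.getenv("CSE_ENGINE_ID")           # optional
CORPUS_DIR = os.getenv("CORPUS_DIR", "corpus")       # local docs to index

WEB_ENABLED = bool(CSE_API_KEY and CSE_ENGINE_ID)    # web retrieval needs both keys
if WEB_ENABLED:
    print("[web] Google Programmable Search enabled.")
```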
Run the CLI:

```bash
python app.py
```

You'll see:

```
[ingest] Loading corpus from: corpus
[ingest] Added N chunks.
[web] Google Programmable Search enabled.   # if keys provided
```

Then type questions at the prompt:

```
>>> Compare the two documents in the corpus and list actionable next steps.
```

The system will plan, retrieve (vector + web if enabled), synthesize, critique, and output a grounded answer with a Sources list.
A zero-build Vue UI is included and mounted by the root FastAPI server.
```bash
uvicorn agentic_ai.app:app --reload
# Then open http://127.0.0.1:8000/rag
```

Features:

Files:

- `Agentic-RAG-Pipeline/ui/index.html`
- `Agentic-RAG-Pipeline/ui/app.js`
- `Agentic-RAG-Pipeline/ui/styles.css`
Endpoints (served by the root FastAPI):

- `GET /api/rag/new_session` → `{ "session_id": "uuid" }`
- `POST /api/rag/ask` (SSE) with `{ "session_id": "uuid", "question": "..." }`; streams `log`, `answer` (markdown), `sources` (JSON array), and `done` events.
- `POST /api/rag/ingest_text` to add text or a URL:
  - `{ "text": "...", "id?": "doc-id", "title?": "...", "tags?": [ ... ] }`
  - `{ "url": "https://...", "title?": "...", "tags?": [ ... ] }`
- `POST /api/rag/ingest_file` (multipart) with `file`, `title?`, `tags?` (comma-separated).

All ingestion routes chunk text and add it to the in-memory FAISS index with metadata for later retrieval.
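If you prefer not to use the SDKs below, the endpoints can be called directly; here is a sketch with httpx, assuming the request/response shapes listed above and standard SSE framing (`event:`/`data:` lines).

```python
# Sketch: call the RAG endpoints directly with httpx (shapes per the endpoint list above).
import httpx

BASE = "http://127.0.0.1:8000"

with httpx.Client(base_url=BASE, timeout=None) as client:
    # Create a session.
    session_id = client.get("/api/rag/new_session").json()["session_id"]

    # Ingest a URL into the in-memory index.
    client.post("/api/rag/ingest_text", json={"url": "https://example.com", "title": "Example"})

    # Ask a question and print the raw SSE stream (log / answer / sources / done events).
    payload = {"session_id": session_id, "question": "Summarize topic X"}
    with client.stream("POST", "/api/rag/ask", json=payload) as resp:
        for line in resp.iter_lines():
            if line.startswith(("event:", "data:")):
                print(line)
```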
Call the RAG endpoints from the monorepo SDKs.

TypeScript:

```ts
import { AgenticAIClient } from "../clients/ts/src/client";

const c = new AgenticAIClient({ baseUrl: "http://127.0.0.1:8000" });
const { session_id } = await c.ragNewSession();
await c.ragAskStream({ session_id, question: "Summarize topic X", onEvent: (ev) => console.log(ev.event, ev.data) });
await c.ragIngestText({ url: "https://example.com" });
```
Python:

```python
from clients.python.agentic_ai_client import AgenticAIClient
import anyio

async def run():
    async with AgenticAIClient("http://127.0.0.1:8000") as c:
        sess = await c.rag_new_session()
        await c.rag_ask_stream(
            "Summarize topic X",
            session_id=sess["session_id"],
            on_event=lambda ev, d: print(ev, d),
        )
        await c.rag_ingest_text(url="https://example.com")

anyio.run(run)
```
See root README “Client SDKs” for more capabilities and examples.
The Retrieval Planner emits diverse queries and a per-sub-goal `k`. Retrievers then run hybrid search: FAISS vector search over the local corpus plus Google Programmable Search when enabled, with results merged and deduplicated (see the sketch below).
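The merge step can be pictured like this; the retriever interface and hit shape below are hypothetical placeholders, while the real implementations live in `agents/retrievers.py` and the loop in `graph/orchestrator.py`.

```python
# Sketch: fan queries out to both retrievers and deduplicate hits by (uri, chunk_id).
# The Retriever protocol and hit shape are hypothetical placeholders, not the repo's exact types.
from typing import Protocol

class Retriever(Protocol):
    def search(self, query: str, k: int) -> list[dict]: ...   # each hit: {uri, chunk_id, text, score}

def hybrid_search(queries: list[str], k: int, retrievers: list[Retriever]) -> list[dict]:
    seen: set[tuple[str, str]] = set()
    merged: list[dict] = []
    for query in queries:
        for retriever in retrievers:
            for hit in retriever.search(query, k):
                key = (hit["uri"], hit["chunk_id"])
                if key not in seen:                # drop duplicates across queries and retrievers
                    seen.add(key)
                    merged.append(hit)
    # Strongest evidence first for the Writer.
    return sorted(merged, key=lambda h: h.get("score", 0.0), reverse=True)
```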
Project layout:

```
agentic-rag/
  app.py
  services.py            # UI/API glue: shared index, ingestion, streaming
  core/
    llm.py               # Gemini client, embeddings, JSON helpers
    vector.py            # FAISS index + corpus ingestion
    tools.py             # Web search + page fetcher
    memory.py            # File-backed session memory
    structs.py           # Pydantic data contracts
  agents/
    base.py
    intent.py
    planner.py
    retrieval_planner.py
    retrievers.py
    writer.py
    critic.py
    guardrails.py
  graph/
    orchestrator.py      # The control flow / loop
  eval/
    harness.py           # Optional quick smoke tests
  corpus/                # (your .txt/.md docs)
  .session_memory/       # (generated)
  ui/                    # Browser UI mounted at /rag
    index.html
    app.js
    styles.css
```
Agent contracts (JSON outputs):

- Intent Router → `{intents[], safety[], urgency, notes}`.
- Planner / Decomposer → sub-goals, each with `sources` and a `done_test`.
- Retrieval Planner → `{queries[], k}` per sub-goal.
- Writer / Synthesizer → `{status, draft, missing}` with bracketed citations `[ #1 ]`.
- Critic / Verifier → `{ok, issues, followup_queries}`; triggers one targeted repair loop.

LLM instances: each agent uses its own Gemini session & parameters. Same model family (e.g., `gemini-1.5-pro`), different prompts and budgets.
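These contracts map naturally onto Pydantic models; `core/structs.py` holds the authoritative definitions, so treat the classes below as an assumed shape that simply mirrors the fields listed above.

```python
# Sketch: Pydantic data contracts mirroring the agent outputs above.
# Field names follow this README; the real definitions live in core/structs.py and may differ.
from pydantic import BaseModel

class IntentResult(BaseModel):
    intents: list[str]
    safety: list[str]
    urgency: str
    notes: str = ""

class SubGoal(BaseModel):
    goal: str
    sources: list[str]            # e.g. ["vector", "web"]
    done_test: str

class RetrievalPlan(BaseModel):
    queries: list[str]
    k: int

class WriterOutput(BaseModel):
    status: str
    draft: str
    missing: list[str] = []

class CriticVerdict(BaseModel):
    ok: bool
    issues: list[str] = []
    followup_queries: list[str] = []
```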
Vector store: ingests `.txt` / `.md` files in `corpus/`. Embeddings use `text-embedding-004` (768-dim), and FAISS uses inner-product on normalized vectors.

Tip: create a file like `corpus/knowledge.md` with key facts, glossaries, or SOPs for stronger grounding.
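For orientation, embedding and indexing roughly work as follows; this sketch uses `text-embedding-004` and an inner-product FAISS index over normalized vectors as described above, while the real implementation lives in `core/vector.py`.

```python
# Sketch: embed chunks with text-embedding-004 and search an inner-product FAISS index
# over L2-normalized vectors (real code: core/vector.py).
import os
import numpy as np
import faiss
import google.generativeai as genai

genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
DIM = 768  # text-embedding-004 output size

def embed(texts: list[str]) -> np.ndarray:
    vecs = [
        genai.embed_content(model="models/text-embedding-004", content=t)["embedding"]
        for t in texts
    ]
    arr = np.asarray(vecs, dtype="float32")
    faiss.normalize_L2(arr)        # normalized vectors => inner product behaves like cosine
    return arr

index = faiss.IndexFlatIP(DIM)
chunks = ["First chunk of knowledge.md ...", "Second chunk ..."]
index.add(embed(chunks))

scores, ids = index.search(embed(["What does knowledge.md cover?"]), 2)
print(ids[0], scores[0])           # indices into `chunks` and their similarity scores
```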
Web retrieval (optional) broadens coverage and provides fresh/public context; enable it by setting `CSE_API_KEY` and `CSE_ENGINE_ID`.
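The web retriever is essentially the Custom Search JSON API plus a page fetcher. A minimal sketch of the search half follows; the endpoint and the `key`/`cx`/`q`/`num` parameters are Google's public API, the helper itself is illustrative, and the real client lives in `core/tools.py`.

```python
# Sketch: query the Google Custom Search JSON API (illustrative; real client in core/tools.py).
import os
import httpx

def web_search(query: str, num: int = 5) -> list[dict]:
    resp = httpx.get(
        "https://www.googleapis.com/customsearch/v1",
        params={
            "key": os.environ["CSE_API_KEY"],
            "cx": os.environ["CSE_ENGINE_ID"],
            "q": query,
            "num": num,                        # the API allows at most 10 results per request
        },
        timeout=20.0,
    )
    resp.raise_for_status()
    items = resp.json().get("items", [])
    return [
        {"title": i["title"], "uri": i["link"], "snippet": i.get("snippet", "")}
        for i in items
    ]
```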
Tuning knobs (see `graph/orchestrator.py`):
- `k` per sub-goal comes from the Retrieval Planner (bounded to 4–12).
- Retrieved chunks are deduplicated by `(uri, chunk_id)`.
- Session memory lives at `.session_memory/SESSION_ID.jsonl`. The orchestrator appends user and assistant messages and can generate a short summary window for context (a sketch of this mechanism follows the list).
- Each sub-goal carries a `done_test`. The critic evaluates whether the draft meets it; if not, it proposes follow-ups.
- Temperatures: 0.1–0.3 for determinism, and around 0.1 for the critic to keep follow-ups targeted.
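Session memory is an append-only JSONL file per session; here is a minimal sketch of that mechanism, assuming the record shape shown (the actual helpers are in `core/memory.py`).

```python
# Sketch: file-backed session memory, one JSONL file per session (real code: core/memory.py).
import json
from pathlib import Path

MEMORY_DIR = Path(".session_memory")

def append_message(session_id: str, role: str, content: str) -> None:
    """Append one user/assistant turn to .session_memory/SESSION_ID.jsonl."""
    MEMORY_DIR.mkdir(exist_ok=True)
    path = MEMORY_DIR / f"{session_id}.jsonl"
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps({"role": role, "content": content}) + "\n")

def recent_messages(session_id: str, limit: int = 10) -> list[dict]:
    """Return the last few turns to build a short summary window for context."""
    path = MEMORY_DIR / f"{session_id}.jsonl"
    if not path.exists():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()
    return [json.loads(line) for line in lines[-limit:]]
```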
Latency & cost:
Observability:
Security:
Scaling:
Troubleshooting:

- GOOGLE_API_KEY is required: set it in `.env` or your shell environment.
- Web search always disabled: set both `CSE_API_KEY` and `CSE_ENGINE_ID` (and ensure your CSE is configured to search the web or the desired domains).
- Empty or weak answers: add documents to `corpus/`, and increase `k` or the chunk size in `core/vector.py`.
- "JSON parsing" warnings (rare): the pipeline is resilient and attempts to coerce malformed JSON. If it recurs, lower the temperatures.
- Slow runs: disable web search, reduce `k`, or remove the critic loop in `orchestrator.py`.
Q: Does each agent use its own instance of the LLM?
A: Yes. Each agent maintains its own Gemini session & config (system prompt, temperature, token limits). They typically use the same base model (Gemini 1.5 Pro for planning/writing/critique; Gemini 1.5 Flash for routing/guardrails).
Q: Can I run without web search?
A: Yes. The system runs vector-only if `CSE_API_KEY`/`CSE_ENGINE_ID` aren't set.
Q: How do I add a custom tool/API (e.g., SQL, Jira, GitHub)?
A: Add a client in `core/tools.py`, create a dedicated agent (e.g., `DataAgent`) with restricted prompts/permissions, and call it from the orchestrator based on sub-goal `sources` or intent routing. A rough sketch of that shape follows.
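The `JiraClient`/`DataAgent` names, endpoint, and JQL below are purely illustrative assumptions, not part of this repo; they only show the shape of the extension.

```python
# Sketch: a hypothetical read-only tool client plus a dedicated agent wrapping it.
# JiraClient / DataAgent are illustrative; register the real client in core/tools.py and
# call the agent from graph/orchestrator.py based on sub-goal `sources` or intent routing.
import httpx

class JiraClient:
    """Minimal, read-only Jira search client (hypothetical)."""
    def __init__(self, base_url: str, token: str):
        self._http = httpx.Client(base_url=base_url, headers={"Authorization": f"Bearer {token}"})

    def search_issues(self, jql: str, limit: int = 10) -> list[dict]:
        resp = self._http.get("/rest/api/2/search", params={"jql": jql, "maxResults": limit})
        resp.raise_for_status()
        return resp.json().get("issues", [])

class DataAgent:
    """Restricted-scope agent: only turns tool results into evidence chunks for the Writer."""
    def __init__(self, jira: JiraClient):
        self._jira = jira

    def run(self, sub_goal: str) -> list[dict]:
        issues = self._jira.search_issues(jql=f'text ~ "{sub_goal}"')
        return [
            {"uri": i["self"], "chunk_id": i["key"], "text": i["fields"]["summary"]}
            for i in issues
        ]
```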
Q: How do I swap FAISS for pgvector/Pinecone?
A: Replace `FAISSIndex` with your own client while keeping the `add()`/`search()` signatures. Most adapters are a few dozen lines.
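To see what "keep the signatures" means in practice, here is a sketch of the interface plus a tiny in-memory stand-in; the exact `add()`/`search()` signatures in this repo may differ, so treat these as placeholders that a pgvector or Pinecone adapter would imitate.

```python
# Sketch: the vector-store interface an adapter would satisfy, with a brute-force stand-in.
# Signatures are placeholders; mirror whatever FAISSIndex actually exposes in core/vector.py.
from typing import Protocol
import numpy as np

class VectorStore(Protocol):
    def add(self, ids: list[str], vectors: np.ndarray, metadata: list[dict]) -> None: ...
    def search(self, vector: np.ndarray, k: int) -> list[tuple[str, float, dict]]: ...

class InMemoryStore:
    """Brute-force cosine search; a pgvector/Pinecone adapter keeps the same two methods."""
    def __init__(self) -> None:
        self._ids: list[str] = []
        self._meta: list[dict] = []
        self._vecs: list[np.ndarray] = []

    def add(self, ids, vectors, metadata):
        self._ids.extend(ids)
        self._meta.extend(metadata)
        self._vecs.extend(np.asarray(vectors, dtype="float32"))

    def search(self, vector, k):
        mat = np.vstack(self._vecs)
        q = np.asarray(vector, dtype="float32")
        scores = mat @ q / (np.linalg.norm(mat, axis=1) * np.linalg.norm(q) + 1e-9)
        top = np.argsort(-scores)[:k]
        return [(self._ids[i], float(scores[i]), self._meta[i]) for i in top]
```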
Q: How do I change models or parameters?
A: Edit `core/llm.py` (`GEMINI_PRO`, `GEMINI_FLASH`, temperatures, max tokens). You can also route some roles to Flash for lower cost/latency.
Quickstart:

```bash
# 1) Install deps
pip install google-generativeai faiss-cpu httpx requests beautifulsoup4 pydantic python-dotenv

# 2) Configure keys
echo "GOOGLE_API_KEY=your_gemini_api_key" >> .env
# Optional web:
echo "CSE_API_KEY=..." >> .env
echo "CSE_ENGINE_ID=..." >> .env

# 3) Add local docs (optional)
mkdir -p corpus
echo "Your internal SOPs or notes go here." > corpus/notes.md

# 4) Run
python app.py
```
This pipeline is designed to be a solid foundation for building advanced, agentic RAG systems with Gemini. It can be extended with more agents, tools, and retrieval methods as needed. Happy coding!