A shared control plane exposing common tools and pipeline dispatch over HTTP, so all agentic subsystems (Research/Outreach agent, Agentic‑RAG, Agentic‑Coding) reuse the same toolbox and can be orchestrated from one place.
- Pipelines: `/pipeline/{name}` dispatch, with streaming adapters for Coding (`/pipeline/coding/stream`) and RAG (`/pipeline/rag/ask`)
- LLM: `/llm/{provider}` (OpenAI, Claude, Gemini) and `/llm/summarize` for short summaries
- Web: `/search` (DuckDuckGo), `/browse` (fetch + extract), `/research` (search + fetch top results)
- KB: `/kb/add` and `/kb/search`
- Files: `/fs/write` and `/fs/read` under `data/agent_output`
```mermaid
flowchart LR
  %% Clients
  subgraph Clients
    SDK[SDKs / Apps]
    CLI[CLI]
    UI[Browser UI]
  end

  %% MCP Core
  subgraph MCP[Control Plane]
    API[HTTP API]
    REG[Pipeline Registry]
    LLM[LLM Adapter]
    WEB[Web Tools]
    KB[KB Tools]
    FS[FS Sandbox]
  end

  %% Pipelines
  subgraph Pipelines
    ROOT[Research & Outreach]
    RAG[RAG]
    COD[Coding]
    DAT[Data]
  end

  %% Client → API
  SDK --> API
  CLI --> API
  UI --> API

  %% API → MCP internals
  API --> REG
  API --> LLM
  API --> WEB
  API --> KB
  API --> FS

  %% Registry → Pipelines
  REG --> ROOT
  REG --> RAG
  REG --> COD
  REG --> DAT
```
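Conceptually, the Pipeline Registry is just a mapping from pipeline names to handlers, which is what `/pipelines` and `/pipeline/{name}` expose. A minimal sketch of that idea (the class and names below are illustrative, not the actual `server.py` internals):

```python
# Illustrative registry sketch: name -> handler, backing /pipelines and /pipeline/{name}.
from typing import Any, Callable, Dict

Handler = Callable[[str], Dict[str, Any]]

class PipelineRegistry:
    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, name: str, handler: Handler) -> None:
        self._handlers[name] = handler

    def names(self) -> list:
        # Backs GET /pipelines
        return sorted(self._handlers)

    def dispatch(self, name: str, task: str) -> Dict[str, Any]:
        # Unknown names surface as HTTP 404 at the API layer
        if name not in self._handlers:
            raise KeyError(f"unknown pipeline: {name}")
        return self._handlers[name](task)

registry = PipelineRegistry()
registry.register("echo", lambda task: {"echo": task})
print(registry.dispatch("echo", "hi"))  # {'echo': 'hi'}
```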
```mermaid
sequenceDiagram
  participant App as Client
  participant MCP as MCP API
  participant COD as Coding Pipeline
  App->>MCP: POST /pipeline/coding/stream
  MCP->>COD: run_pipeline_stream(...)
  COD-->>MCP: events (log, done)
  MCP-->>App: SSE (log, done)
```
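The adapter's job is to turn pipeline events into `event:`/`data:` blocks. A rough sketch of how that can be done with FastAPI's `StreamingResponse`; the pipeline generator and payloads below are stand-ins, not the project's actual implementation:

```python
# Hypothetical SSE adapter sketch: forward pipeline events as "event:/data:" blocks.
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def fake_pipeline_events(task: str):
    # Stand-in for run_pipeline_stream(...); yields (event, payload) pairs.
    yield "log", {"message": f"starting: {task}"}
    yield "done", {"ok": True}

@app.post("/pipeline/coding/stream")
async def coding_stream(body: dict):
    async def sse():
        async for event, payload in fake_pipeline_events(body.get("task", "")):
            yield f"event: {event}\ndata: {json.dumps(payload)}\n\n"
    return StreamingResponse(sse(), media_type="text/event-stream")
```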
```mermaid
sequenceDiagram
  participant App as Client
  participant MCP as MCP API
  participant RAG as RAG Pipeline
  App->>MCP: POST /pipeline/rag/ask
  MCP->>RAG: run_rag_stream(...)
  RAG-->>MCP: events (log, answer, sources, done)
  MCP-->>App: SSE (log, answer, sources, done)
```
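On the wire, each streamed block pairs an `event:` line with a `data:` line. The event names below match the diagram; the JSON payloads are only illustrative placeholders, not a documented schema:

```text
event: log
data: {"message": "retrieving context"}

event: answer
data: {"text": "..."}

event: sources
data: [{"id": "doc-1"}]

event: done
data: {}
```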
Run in‑process for quick checks:
```bash
python - <<'PY'
from mcp import MCPServer
from fastapi.testclient import TestClient
srv = MCPServer(); client = TestClient(srv.app)
print(client.get('/status').json())
PY
```
Serve as an ASGI app:
```bash
uvicorn mcp.server:create_app --factory --reload
# Then: http://127.0.0.1:8000/pipelines
```
Endpoints:

- `/pipeline/{name}` — call a registered handler with `{ task }`
- `/pipelines` — list available pipeline names
- `/pipeline/coding/stream` — stream logs/results (SSE)
- `/pipeline/rag/ask` — stream answer/sources (SSE)
- `/llm/{provider}` — `{ prompt, model? }`
- `/llm/summarize` — `{ text, provider?, model? }`
- `/search?q=&max_results=`
- `/browse?url=`
- `/research?q=&max_results=`
- `/kb/add` — `{ id?, text, metadata? }`
- `/kb/search?q=&k=`
- `/fs/write` — `{ path, content }`
- `/fs/read?path=`
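The request bodies above translate directly into plain HTTP calls. A small sketch using `httpx` against a locally running server (the port and payload values are just examples):

```python
# Assumes the MCP server is running locally; bodies follow the endpoint list above.
import httpx

with httpx.Client(base_url="http://127.0.0.1:8000", timeout=30.0) as client:
    print(client.post("/llm/summarize", json={"text": "Long text...", "provider": "openai"}).json())
    print(client.post("/kb/add", json={"text": "notes", "metadata": {"tags": ["kb"]}}).json())
    print(client.get("/kb/search", params={"q": "notes", "k": 2}).json())
    print(client.post("/fs/write", json={"path": "demo/out.txt", "content": "hello"}).json())
    print(client.get("/fs/read", params={"path": "demo/out.txt"}).json())
```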
Register an in‑process pipeline and call it:
```python
from mcp import MCPServer
from fastapi.testclient import TestClient

srv = MCPServer()
srv.register("echo", lambda task: {"echo": task})
client = TestClient(srv.app)
print(client.post("/pipeline/echo", json={"task": "hi"}).json())
```
SSE adapter (Coding):
```bash
curl -N -X POST http://127.0.0.1:8000/pipeline/coding/stream \
  -H 'Content-Type: application/json' \
  -d '{"repo": "/path/to/repo", "task": "Add pagination"}'
```
SSE adapter (RAG):
```bash
curl -N -X POST http://127.0.0.1:8000/pipeline/rag/ask \
  -H 'Content-Type: application/json' \
  -d '{"question": "Summarize topic X"}'
```
Web + KB + FS:
```bash
curl 'http://127.0.0.1:8000/search?q=python&max_results=2'
curl 'http://127.0.0.1:8000/browse?url=https://example.com'
curl -X POST 'http://127.0.0.1:8000/kb/add' -H 'Content-Type: application/json' \
  -d '{"text":"notes","metadata":{"tags":["kb"]}}'
curl 'http://127.0.0.1:8000/kb/search?q=notes&k=2'
curl -X POST 'http://127.0.0.1:8000/fs/write' -H 'Content-Type: application/json' \
  -d '{"path":"demo/out.txt","content":"hello"}'
curl 'http://127.0.0.1:8000/fs/read?path=demo/out.txt'
```
All file writes land under `data/agent_output` (the FS sandbox root).
Layout:

```
mcp/
  README.md          # This guide
  __init__.py        # Entry point (MCPServer)
  server.py          # FastAPI app + endpoints
  schemas.py         # Pydantic request models
  tools/
    web.py           # search/fetch helpers
    kb.py            # KB add/search (Chroma-backed via root project)
    files.py         # Safe read/write under data/agent_output
  requirements.txt   # FastAPI, httpx, trafilatura, bs4, duckduckgo-search, etc.
```

Provider API keys for the `/llm/*` endpoints: `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, `GOOGLE_API_KEY`.
```bash
python -m venv .venv
. .venv/bin/activate
pip install -U pip
pip install -r requirements.txt

# Run MCP as ASGI app
uvicorn mcp.server:create_app --factory --host 0.0.0.0 --port 8080 --reload

# Explore
curl -s http://127.0.0.1:8080/pipelines | jq .
```
To run inside the existing Docker image (which defaults to the root research server), override the command:
```bash
docker build -t agentic-ai .
docker run -p 8080:8080 --rm \
  -e OPENAI_API_KEY=sk-... \
  agentic-ai \
  python -m uvicorn mcp.server:create_app --factory --host 0.0.0.0 --port 8080
```
TypeScript (Node 18+):
```ts
import { AgenticAIClient } from "../clients/ts/src/client";

const c = new AgenticAIClient({ baseUrl: "http://127.0.0.1:8080" });

// LLM (plain fetch against the summarize endpoint)
const sum = await fetch("http://127.0.0.1:8080/llm/summarize", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ text: "Long text here", provider: "openai" }),
}).then((r) => r.json());

// Coding pipeline (SSE)
await c.codingStream({ repo: "/path/to/repo", task: "Add pagination", onEvent: (ev) => console.log(ev.event, ev.data) });

// RAG pipeline (SSE)
await c.ragAskStream({ question: "Summarize this topic", onEvent: (ev) => console.log(ev.event, ev.data) });
```
Python (async):
```python
import anyio
import httpx

async def main():
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8080", timeout=60.0) as client:
        # LLM summarize
        r = await client.post("/llm/summarize", json={"text": "Long text...", "provider": "openai"})
        print(r.json())

        # Coding stream: use client.stream so SSE events arrive incrementally
        async with client.stream(
            "POST", "/pipeline/coding/stream", json={"repo": "/repo", "task": "Implement X"}
        ) as resp:
            async for chunk in resp.aiter_text():
                # Naive SSE parse; events split across chunk boundaries are not re-joined
                for block in chunk.split("\n\n"):
                    if not block.strip():
                        continue
                    ev = next((ln[6:].strip() for ln in block.splitlines() if ln.startswith("event:")), None)
                    data = next((ln[5:] for ln in block.splitlines() if ln.startswith("data:")), None)
                    if ev and data:
                        print(ev, data)

anyio.run(main)
```
Error handling:

- `404` — unknown pipeline (`/pipeline/{name}` not registered)
- `400` — invalid arguments (e.g., missing `question`, unknown LLM provider)
- `500` — upstream provider/tool failures (network, parsing)

Responses are always JSON (or SSE blocks with `event:` + `data:`), with a clear `detail` on error.
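A client can branch on these codes and surface the `detail` field; a minimal sketch (assuming `httpx`, with the handling policy left to the caller):

```python
# Hypothetical error-handling sketch for the status codes listed above.
import httpx

def call_pipeline(name: str, task: str) -> dict:
    r = httpx.post(f"http://127.0.0.1:8000/pipeline/{name}", json={"task": task})
    if r.status_code == 404:
        raise ValueError(f"pipeline not registered: {name}")
    if r.status_code == 400:
        raise ValueError(f"bad arguments: {r.json().get('detail')}")
    r.raise_for_status()  # 500 and other failures -> HTTPStatusError
    return r.json()
```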
To extend the control plane:

- Add new tools in `mcp/tools/<name>.py` with small, testable functions.
- Expose them from `mcp/server.py` (keep input/output JSON minimal and consistent).
- Register new pipelines with `srv.register("my_pipeline", lambda task: {...})` (see the sketch below).
- Keep file writes under `data/agent_output` to avoid accidental writes elsewhere.
- Add tests against `/llm/*` and the streaming endpoints.
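A minimal sketch of those steps end to end; the tool module, function, and extra endpoint below are hypothetical placeholders, not part of the current codebase:

```python
# mcp/tools/wordcount.py (hypothetical tool module)
def word_count(text: str) -> dict:
    """Small, testable helper: count words in a string."""
    return {"words": len(text.split())}

# In mcp/server.py (or wherever the app is wired), the tool and a pipeline
# could be exposed roughly like this -- illustrative, not the actual code:
from mcp import MCPServer

srv = MCPServer()
srv.register("wordcount", lambda task: word_count(task))

@srv.app.post("/tools/wordcount")  # hypothetical endpoint
def wordcount_endpoint(body: dict) -> dict:
    return word_count(body.get("text", ""))
```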