Agentic-AI-Pipeline

MCP Server (Monorepo Control Plane)

Stack: Python · FastAPI · Uvicorn · SSE · DuckDuckGo · BeautifulSoup · Trafilatura · OpenAI · Anthropic · Google Gemini

A shared control plane exposing common tools and pipeline dispatch over HTTP, so all agentic subsystems (Research/Outreach agent, Agentic‑RAG, Agentic‑Coding) reuse the same toolbox and can be orchestrated from one place.

Contents

What You Get
Architecture
Sequence Examples
Quick Start
Endpoints
Examples
Security & Limits
Project Structure
Requirements
Install & Run
SDK Examples
Error Handling & Codes
Extending MCP
Observability & Security

What You Get

One HTTP surface for the shared tools (web search/browse, KB, sandboxed FS, LLM adapter) and for pipeline dispatch.
SSE streaming adapters for the Coding and RAG pipelines.
A registry for registering in-process pipelines and invoking them by name.

Architecture

flowchart LR
  %% Clients
  subgraph Clients
    SDK[SDKs / Apps]
    CLI[CLI]
    UI[Browser UI]
  end

  %% MCP Core
  subgraph MCP[Control Plane]
    API[HTTP API]
    REG[Pipeline Registry]
    LLM[LLM Adapter]
    WEB[Web Tools]
    KB[KB Tools]
    FS[FS Sandbox]
  end

  %% Pipelines
  subgraph Pipelines
    ROOT[Research & Outreach]
    RAG[RAG]
    COD[Coding]
    DAT[Data]
  end

  %% Client → API
  SDK --> API
  CLI --> API
  UI  --> API

  %% API → MCP internals
  API --> REG
  API --> LLM
  API --> WEB
  API --> KB
  API --> FS

  %% Registry → Pipelines
  REG --> ROOT
  REG --> RAG
  REG --> COD
  REG --> DAT

Sequence Examples

sequenceDiagram
  participant App as Client
  participant MCP as MCP API
  participant COD as Coding Pipeline

  App->>MCP: POST /pipeline/coding/stream
  MCP->>COD: run_pipeline_stream(...)
  COD-->>MCP: events (log, done)
  MCP-->>App: SSE (log, done)

sequenceDiagram
  participant App as Client
  participant MCP as MCP API
  participant RAG as RAG Pipeline

  App->>MCP: POST /pipeline/rag/ask
  MCP->>RAG: run_rag_stream(...)
  RAG-->>MCP: events (log, answer, sources, done)
  MCP-->>App: SSE (log, answer, sources, done)
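
On the wire, each stream uses standard text/event-stream framing: an event: line naming the event type, a data: line carrying a JSON payload, and a blank line terminating the block. The event names below come from the diagrams above; the payload shapes are illustrative assumptions, not captured output:

event: log
data: {"message": "retrieving sources"}

event: answer
data: {"text": "..."}

event: sources
data: [{"url": "https://example.com", "title": "Example"}]

event: done
data: {}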

Quick Start

Run in‑process for quick checks:

python - <<'PY'
from mcp import MCPServer
from fastapi.testclient import TestClient
srv = MCPServer(); client = TestClient(srv.app)
print(client.get('/status').json())
PY

Serve as an ASGI app (the --factory flag tells Uvicorn to call create_app to build the app):

uvicorn mcp.server:create_app --factory --reload
# Then:  http://127.0.0.1:8000/pipelines

Endpoints

The routes exercised in this guide:

GET  /status                  — health/info
GET  /pipelines               — list registered pipelines
POST /pipeline/{name}         — invoke a registered pipeline (JSON in/out)
POST /pipeline/coding/stream  — coding pipeline over SSE
POST /pipeline/rag/ask        — RAG pipeline over SSE
POST /llm/summarize           — LLM adapter
GET  /search, GET /browse     — web tools
POST /kb/add, GET /kb/search  — KB tools
POST /fs/write, GET /fs/read  — sandboxed file tools

Examples

Register an in‑process pipeline and call it:

from mcp import MCPServer
from fastapi.testclient import TestClient

srv = MCPServer()
srv.register("echo", lambda task: {"echo": task})
client = TestClient(srv.app)
print(client.post("/pipeline/echo", json={"task": "hi"}).json())

SSE adapter (Coding):

curl -N -X POST http://127.0.0.1:8000/pipeline/coding/stream \
  -H 'Content-Type: application/json' \
  -d '{"repo": "/path/to/repo", "task": "Add pagination"}'

SSE adapter (RAG):

curl -N -X POST http://127.0.0.1:8000/pipeline/rag/ask \
  -H 'Content-Type: application/json' \
  -d '{"question": "Summarize topic X"}'

Web + KB + FS:

curl 'http://127.0.0.1:8000/search?q=python&max_results=2'
curl 'http://127.0.0.1:8000/browse?url=https://example.com'
curl -X POST 'http://127.0.0.1:8000/kb/add' -H 'Content-Type: application/json' \
     -d '{"text":"notes","metadata":{"tags":["kb"]}}'
curl 'http://127.0.0.1:8000/kb/search?q=notes&k=2'
curl -X POST 'http://127.0.0.1:8000/fs/write' -H 'Content-Type: application/json' \
     -d '{"path":"demo/out.txt","content":"hello"}'
curl 'http://127.0.0.1:8000/fs/read?path=demo/out.txt'
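
The same tool endpoints can be driven from Python; a minimal sketch with httpx, mirroring the curl calls above (base URL and payloads as in those examples):

import httpx

with httpx.Client(base_url="http://127.0.0.1:8000") as client:
    # add a note to the knowledge base, then search it back
    client.post("/kb/add", json={"text": "notes", "metadata": {"tags": ["kb"]}})
    print(client.get("/kb/search", params={"q": "notes", "k": 2}).json())

    # round-trip a file through the sandboxed FS tools
    client.post("/fs/write", json={"path": "demo/out.txt", "content": "hello"})
    print(client.get("/fs/read", params={"path": "demo/out.txt"}).json())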

Security & Limits

File tools read and write only inside the data/agent_output sandbox (see tools/files.py); paths outside it are not served.

Project Structure

mcp/
  README.md        # This guide
  __init__.py      # Entry (MCPServer)
  server.py        # FastAPI app + endpoints
  schemas.py       # Pydantic request models
  tools/
    web.py         # search/fetch helpers
    kb.py          # KB add/search (Chroma-backed via root project)
    files.py       # Safe read/write under data/agent_output

Requirements

Python plus the packages pinned in requirements.txt (FastAPI, Uvicorn, and the tool dependencies from the stack listed above).

Install & Run

python -m venv .venv
. .venv/bin/activate
pip install -U pip
pip install -r requirements.txt

# Run MCP as ASGI app
uvicorn mcp.server:create_app --factory --host 0.0.0.0 --port 8080 --reload

# Explore
curl -s http://127.0.0.1:8080/pipelines | jq .

To run inside the existing Docker image (which defaults to the root research server), override the command:

docker build -t agentic-ai .
docker run -p 8080:8080 --rm \
  -e OPENAI_API_KEY=sk-... \
  agentic-ai \
  python -m uvicorn mcp.server:create_app --factory --host 0.0.0.0 --port 8080

SDK Examples

TypeScript (Node 18+):

import { AgenticAIClient } from "../clients/ts/src/client";
const c = new AgenticAIClient({ baseUrl: "http://127.0.0.1:8080" });

// LLM summarize via a raw fetch (reads the client's private base URL field)
const sum = await fetch(`${c["base"]}/llm/summarize`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ text: "Long text here", provider: "openai" }),
}).then((r) => r.json());

// Coding pipeline (SSE)
await c.codingStream({ repo: "/path/to/repo", task: "Add pagination", onEvent: (ev) => console.log(ev.event, ev.data) });

// RAG pipeline (SSE)
await c.ragAskStream({ question: "Summarize\n this topic", onEvent: (ev) => console.log(ev.event, ev.data) });

Python (async):

import anyio, httpx

async def main():
    async with httpx.AsyncClient(base_url="http://127.0.0.1:8080", timeout=60.0) as client:
        # LLM summarize
        r = await client.post("/llm/summarize", json={"text":"Long text...","provider":"openai"})
        print(r.json())

        # Coding stream: use client.stream() so SSE events arrive incrementally
        # (a plain post() buffers the whole response before returning)
        async with client.stream("POST", "/pipeline/coding/stream",
                                 json={"repo": "/repo", "task": "Implement X"}) as r:
            async for chunk in r.aiter_text():
                # naive SSE parse: assumes whole events arrive per chunk
                for block in chunk.split("\n\n"):
                    if not block.strip():
                        continue
                    lines = block.splitlines()
                    ev = next((ln[6:].strip() for ln in lines if ln.startswith("event:")), None)
                    data = next((ln[5:].strip() for ln in lines if ln.startswith("data:")), None)
                    if ev and data:
                        print(ev, data)

anyio.run(main)

Error Handling & Codes

Responses are always JSON (or SSE blocks framed as event: + data: lines), and errors carry a clear detail message.
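
Since the server is a FastAPI app, error bodies follow FastAPI's convention of a JSON object with a detail field. A minimal sketch of surfacing it, assuming an unregistered pipeline name yields an HTTP error status:

from mcp import MCPServer
from fastapi.testclient import TestClient

client = TestClient(MCPServer().app)
r = client.post("/pipeline/not-registered", json={"task": "hi"})
if r.status_code >= 400:
    # FastAPI error payloads carry a "detail" message
    print(r.status_code, r.json().get("detail"))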

Extending MCP
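
The extension point shown in this guide is MCPServer.register, which binds a name to a callable (see the echo example under Examples). A minimal sketch of a custom pipeline; triage and its payload handling are hypothetical, not part of the documented API:

from mcp import MCPServer
from fastapi.testclient import TestClient

srv = MCPServer()

def triage(task):
    # hypothetical handler: receives the task payload,
    # returns a JSON-serializable dict
    return {"status": "queued", "received": task}

srv.register("triage", triage)

client = TestClient(srv.app)
print(client.post("/pipeline/triage", json={"task": "label new issues"}).json())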

Observability & Security