The AI Orchestrator is designed to be easily extensible. You can add support for new AI coding assistants by creating a new adapter that implements the standard interface.
graph TD
A[Orchestrator Core] --> B[Base Adapter Interface]
B --> C[Your Custom Adapter]
C --> D[AI Agent CLI]
sequenceDiagram
participant O as Orchestrator
participant A as Your Adapter
participant C as CLI Communicator
participant AI as AI Agent CLI
O->>A: execute(task, context)
A->>A: validate_input()
A->>A: build_command()
A->>C: execute_command(cmd)
C->>AI: spawn_process()
AI-->>C: response
C-->>A: raw_output
A->>A: parse_response()
A->>A: extract_files()
A-->>O: TaskResult
Create a new file in the adapters/ directory:
touch adapters/your_agent_adapter.py
from typing import Dict, Any, List, Optional
from adapters.base import BaseAdapter, TaskResult
from adapters.cli_communicator import CLICommunicator
from orchestrator.exceptions import AdapterError, ValidationError
import logging
import re
import json
logger = logging.getLogger(__name__)
class YourAgentAdapter(BaseAdapter):
"""
Adapter for Your AI Agent CLI.
This adapter integrates Your AI Agent with the orchestrator,
enabling it to participate in collaborative workflows.
"""
def __init__(self, config: Dict[str, Any]):
"""
Initialize the adapter with configuration.
Args:
config: Agent configuration dictionary
"""
super().__init__(config)
self.name = "your_agent"
self.command = config.get("command", "your-agent-cli")
self.timeout = config.get("timeout", 300)
self.communicator = CLICommunicator()
logger.info(f"Initialized {self.name} adapter")
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
"""
Execute a task using Your AI Agent.
Args:
task: The task description
context: Optional context from previous tasks
Returns:
TaskResult containing output and generated files
Raises:
AdapterError: If execution fails
"""
try:
# Validate input
if not self.validate_input(task):
raise ValidationError(f"Invalid task: {task}")
# Build command
cmd = self._build_command(task, context)
# Execute with CLI communicator
logger.info(f"Executing task with {self.name}")
response = self.communicator.execute(
cmd,
timeout=self.timeout,
capture_output=True
)
# Parse response
parsed_output = self._parse_response(response.stdout)
# Extract generated files
files = self._extract_files(parsed_output)
# Validate response
if not self.validate_response(parsed_output):
raise AdapterError("Invalid response from agent")
return TaskResult(
agent=self.name,
output=parsed_output,
files=files,
success=True,
metadata={
"duration": response.duration,
"exit_code": response.returncode
}
)
except Exception as e:
logger.error(f"Error executing task with {self.name}: {e}")
return TaskResult(
agent=self.name,
output="",
files=[],
success=False,
error=str(e)
)
def validate_input(self, task: str) -> bool:
"""
Validate task input before execution.
Args:
task: Task description to validate
Returns:
True if valid, False otherwise
"""
if not task or not task.strip():
logger.error("Empty task provided")
return False
if len(task) > 10000: # Max task length
logger.error("Task too long")
return False
# Add agent-specific validation
# e.g., check for required keywords, format, etc.
return True
def validate_response(self, response: str) -> bool:
"""
Validate agent response.
Args:
response: Agent response to validate
Returns:
True if valid, False otherwise
"""
if not response:
return False
# Add agent-specific validation
# e.g., check for expected format, required fields, etc.
return True
def _build_command(self, task: str, context: Optional[Dict[str, Any]] = None) -> List[str]:
"""
Build CLI command for the agent.
Args:
task: Task description
context: Optional context from previous tasks
Returns:
Command as list of strings
"""
cmd = [self.command]
# Add agent-specific flags
cmd.extend(["--task", task])
# Add context if available
if context:
if "previous_output" in context:
cmd.extend(["--context", context["previous_output"]])
if "files" in context:
cmd.extend(["--files", ",".join(context["files"])])
# Add any additional flags
cmd.extend([
"--format", "json", # Request structured output
"--verbose"
])
return cmd
def _parse_response(self, raw_output: str) -> str:
"""
Parse agent's raw output into structured format.
Args:
raw_output: Raw output from agent CLI
Returns:
Parsed and formatted output
"""
try:
# If agent returns JSON
data = json.loads(raw_output)
return data.get("output", raw_output)
except json.JSONDecodeError:
# Fallback to raw output
return raw_output.strip()
def _extract_files(self, output: str) -> List[str]:
"""
Extract file paths from agent output.
Args:
output: Agent output to parse
Returns:
List of file paths mentioned in output
"""
files = []
# Look for common file path patterns
# Adjust regex based on your agent's output format
patterns = [
r'(?:Created|Generated|Wrote)\s+(?:file\s+)?["\']?([^\s"\']+\.\w+)["\']?',
r'```[\w]*\s*#\s*([^\n]+\.\w+)',
r'File:\s*([^\s]+\.\w+)'
]
for pattern in patterns:
matches = re.findall(pattern, output, re.MULTILINE)
files.extend(matches)
# Remove duplicates and validate
files = list(set(files))
files = [f for f in files if self._is_valid_filepath(f)]
return files
def _is_valid_filepath(self, filepath: str) -> bool:
"""Validate that a string looks like a valid file path."""
import os
# Check it's not too long, doesn't have dangerous characters, etc.
if len(filepath) > 255:
return False
if any(c in filepath for c in ['<', '>', '|', '\0']):
return False
return True
Edit adapters/__init__.py:
from adapters.your_agent_adapter import YourAgentAdapter
ADAPTER_REGISTRY = {
"claude": ClaudeAdapter,
"codex": CodexAdapter,
"gemini": GeminiAdapter,
"copilot": CopilotAdapter,
"your_agent": YourAgentAdapter, # Add your adapter
}
def get_adapter(agent_name: str, config: Dict[str, Any]) -> BaseAdapter:
"""Get adapter instance by name."""
adapter_class = ADAPTER_REGISTRY.get(agent_name)
if not adapter_class:
raise ValueError(f"Unknown agent: {agent_name}")
return adapter_class(config)
agents:
your_agent:
enabled: true
command: "your-agent-cli"
role: "implementation" # or "review", "refinement", etc.
timeout: 300
description: "Your AI Agent description"
# Agent-specific configuration
options:
model: "your-model-name"
temperature: 0.7
max_tokens: 2000
workflows:
with_your_agent:
- agent: "codex"
task: "implement"
- agent: "your_agent"
task: "enhance"
- agent: "gemini"
task: "review"
Create tests/test_your_agent_adapter.py:
import pytest
from unittest.mock import Mock, patch
from adapters.your_agent_adapter import YourAgentAdapter
class TestYourAgentAdapter:
"""Test suite for Your Agent Adapter."""
@pytest.fixture
def adapter(self):
"""Create adapter instance for testing."""
config = {
"command": "your-agent-cli",
"timeout": 300,
"role": "implementation"
}
return YourAgentAdapter(config)
def test_initialization(self, adapter):
"""Test adapter initializes correctly."""
assert adapter.name == "your_agent"
assert adapter.command == "your-agent-cli"
assert adapter.timeout == 300
def test_validate_input_valid(self, adapter):
"""Test input validation with valid input."""
assert adapter.validate_input("Create a function")
def test_validate_input_empty(self, adapter):
"""Test input validation rejects empty input."""
assert not adapter.validate_input("")
assert not adapter.validate_input(" ")
def test_build_command(self, adapter):
"""Test command building."""
task = "Create a function"
cmd = adapter._build_command(task)
assert "your-agent-cli" in cmd
assert "--task" in cmd
assert "Create a function" in cmd
@patch('adapters.your_agent_adapter.CLICommunicator')
def test_execute_success(self, mock_communicator, adapter):
"""Test successful task execution."""
# Mock CLI response
mock_response = Mock()
mock_response.stdout = '{"output": "Function created"}'
mock_response.returncode = 0
mock_response.duration = 1.5
mock_communicator.return_value.execute.return_value = mock_response
adapter.communicator = mock_communicator.return_value
# Execute task
result = adapter.execute("Create a function")
assert result.success
assert result.agent == "your_agent"
assert "Function created" in result.output
def test_parse_response_json(self, adapter):
"""Test parsing JSON response."""
json_output = '{"output": "Test output"}'
parsed = adapter._parse_response(json_output)
assert parsed == "Test output"
def test_parse_response_plain_text(self, adapter):
"""Test parsing plain text response."""
text_output = "Plain text output"
parsed = adapter._parse_response(text_output)
assert parsed == "Plain text output"
def test_extract_files(self, adapter):
"""Test file extraction from output."""
output = """
Created file main.py
Generated test.py
Wrote docs.md
"""
files = adapter._extract_files(output)
assert "main.py" in files
assert "test.py" in files
assert "docs.md" in files
# Run your adapter tests
pytest tests/test_your_agent_adapter.py -v
# Run with coverage
pytest tests/test_your_agent_adapter.py --cov=adapters.your_agent_adapter
# Run all adapter tests
pytest tests/test_adapters.py -v
Create tests/test_integration_your_agent.py:
import pytest
from orchestrator.core import Orchestrator
@pytest.mark.integration
class TestYourAgentIntegration:
"""Integration tests for Your Agent."""
def test_full_workflow_with_your_agent(self):
"""Test complete workflow including your agent."""
orchestrator = Orchestrator(config_path="config/agents.yaml")
result = orchestrator.execute_task(
task="Create a simple function",
workflow="with_your_agent"
)
assert result.success
assert len(result.files) > 0
If your agent supports streaming:
def execute_streaming(self, task: str, callback=None):
"""Execute task with streaming output."""
cmd = self._build_command(task)
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
output = []
for line in process.stdout:
output.append(line)
if callback:
callback(line) # Real-time callback
process.wait()
return "".join(output)
def _parse_response(self, raw_output: str, format_type: str = "auto") -> str:
"""Parse response supporting multiple formats."""
if format_type == "json" or (format_type == "auto" and raw_output.strip().startswith("{")):
return self._parse_json(raw_output)
elif format_type == "xml":
return self._parse_xml(raw_output)
elif format_type == "markdown":
return self._parse_markdown(raw_output)
else:
return raw_output.strip()
class YourAgentError(AdapterError):
"""Custom exception for Your Agent specific errors."""
pass
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
try:
# ... execution code ...
except YourAgentSpecificException as e:
raise YourAgentError(f"Agent-specific error: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise AdapterError(f"Failed to execute: {e}")
from functools import lru_cache
import hashlib
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
# Generate cache key
cache_key = self._generate_cache_key(task, context)
# Check cache
if cached_result := self._get_from_cache(cache_key):
logger.info(f"Cache hit for task: {task[:50]}...")
return cached_result
# Execute and cache
result = self._execute_task(task, context)
self._save_to_cache(cache_key, result)
return result
def _generate_cache_key(self, task: str, context: Optional[Dict[str, Any]]) -> str:
"""Generate cache key from task and context."""
data = f"{task}:{json.dumps(context, sort_keys=True) if context else ''}"
return hashlib.sha256(data.encode()).hexdigest()
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
try:
# Execution code
pass
except ValidationError as e:
logger.error(f"Validation failed: {e}")
return TaskResult(success=False, error=str(e))
except TimeoutError as e:
logger.error(f"Timeout: {e}")
return TaskResult(success=False, error="Agent timed out")
except Exception as e:
logger.exception(f"Unexpected error: {e}")
return TaskResult(success=False, error=f"Unexpected error: {e}")
import logging
logger = logging.getLogger(__name__)
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
logger.info(f"Executing task with {self.name}", extra={
"task_length": len(task),
"has_context": context is not None
})
# ... execution ...
logger.info(f"Task completed successfully", extra={
"duration": duration,
"files_generated": len(files)
})
def _sanitize_task(self, task: str) -> str:
"""Sanitize task input."""
# Remove control characters
task = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', task)
# Limit length
if len(task) > MAX_TASK_LENGTH:
task = task[:MAX_TASK_LENGTH]
# Escape special characters for shell
task = shlex.quote(task)
return task
from pydantic import BaseModel, validator
class YourAgentConfig(BaseModel):
"""Configuration schema for Your Agent."""
command: str
timeout: int = 300
role: str
model: Optional[str] = None
@validator('timeout')
def validate_timeout(cls, v):
if v < 1 or v > 3600:
raise ValueError('Timeout must be between 1 and 3600 seconds')
return v
@validator('role')
def validate_role(cls, v):
allowed_roles = ['implementation', 'review', 'refinement', 'suggestions']
if v not in allowed_roles:
raise ValueError(f'Role must be one of {allowed_roles}')
return v
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
temp_files = []
try:
# Create temp files if needed
temp_file = self._create_temp_input(task)
temp_files.append(temp_file)
# Execute
result = self._execute_task(temp_file)
return result
finally:
# Always cleanup
for temp_file in temp_files:
if os.path.exists(temp_file):
os.remove(temp_file)
class SimpleTextAdapter(BaseAdapter):
"""Adapter for a simple text-based AI agent."""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.name = "simple_text"
self.command = config.get("command", "simple-ai")
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
# Build simple command
cmd = [self.command, task]
# Execute
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
return TaskResult(
agent=self.name,
output=result.stdout.strip(),
files=[],
success=result.returncode == 0
)
def validate_input(self, task: str) -> bool:
return bool(task and task.strip())
def validate_response(self, response: str) -> bool:
return bool(response)
import requests
class APIBasedAdapter(BaseAdapter):
"""Adapter for an AI agent with REST API."""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.api_url = config.get("api_url")
self.api_key = config.get("api_key")
def execute(self, task: str, context: Optional[Dict[str, Any]] = None) -> TaskResult:
payload = {
"task": task,
"context": context
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
response = requests.post(
f"{self.api_url}/execute",
json=payload,
headers=headers,
timeout=self.timeout
)
response.raise_for_status()
data = response.json()
return TaskResult(
agent=self.name,
output=data.get("output", ""),
files=data.get("files", []),
success=True
)
Problem: Agent 'your_agent' not found in registry
Solution:
# Check adapters/__init__.py
from adapters.your_agent_adapter import YourAgentAdapter
ADAPTER_REGISTRY = {
# ...
"your_agent": YourAgentAdapter, # Add this line
}
Problem: Agent CLI doesn’t respond or times out
Solution:
# Add debug logging
logger.debug(f"Executing command: {cmd}")
logger.debug(f"Timeout: {self.timeout}")
# Increase timeout
timeout: 600 # in config
Problem: Can’t parse agent output
Solution:
def _parse_response(self, raw_output: str) -> str:
logger.debug(f"Raw output: {raw_output}")
try:
# Try JSON first
return json.loads(raw_output)
except:
# Fallback to text
logger.warning("Failed to parse JSON, using raw text")
return raw_output
Enable detailed logging:
import logging
# In your adapter
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Add detailed logs
logger.debug(f"Built command: {cmd}")
logger.debug(f"Context provided: {context}")
logger.debug(f"Raw response: {response.stdout}")
your-agent-cli --task "test task"
from unittest.mock import Mock, patch
@patch('subprocess.run')
def test_execute(self, mock_run):
mock_run.return_value = Mock(stdout="output", returncode=0)
# Test your adapter
@pytest.mark.integration
@pytest.mark.skipif(not has_agent(), reason="Agent not installed")
def test_real_execution(self):
# Test with actual agent
After creating your adapter:
Need help? Open an issue or discussion on GitHub!