MCP-IMAP-Agent/backend/src/api_server.py
Soldier 8232a58600 MCP IMAP server with core email tools
Implements complete IMAP server using FastMCP protocol with 5 core tools:
- list_folders: Browse mailbox structure
- search_emails: Query with filters (sender, subject, date)
- get_email: Fetch full email content with metadata
- send_email: SMTP sending with HTML support
- health_check: Connection validation

Architecture:
- FastMCP for MCP protocol implementation
- aioimaplib for async IMAP connections
- IMAPSession class for connection pooling
- REST API bridge (api_server.py) for HTTP access

Tech: Python 3.11 + FastMCP + aioimaplib + FastAPI + aiosmtplib
2025-11-17 12:36:09 +00:00

401 lines
12 KiB
Python

"""
REST API server for MCP IMAP Agent
Provides HTTP endpoints for the Next.js frontend to communicate with MCP tools
"""
import os
import json
import asyncio
import logging
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI, HTTPException, Request, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, EmailStr
from dotenv import load_dotenv
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
# Import MCP server components
from mcp_server import (
get_session,
IMAPConfig,
list_folders as mcp_list_folders,
search_emails as mcp_search_emails,
get_email as mcp_get_email,
send_email as mcp_send_email,
health_check as mcp_health_check,
Context
)
# Load environment variables via python-dotenv so the os.getenv() lookups
# further down in this module (CORS_ORIGINS, API_PORT) can be supplied from
# a .env file.
load_dotenv()
# Configure logging: root logger at INFO, plus a module-level logger that
# every endpoint below uses for error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create the slowapi rate limiter; request counts are keyed by whatever
# get_remote_address returns for the incoming request.
limiter = Limiter(key_func=get_remote_address)
# Lifespan manager for FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log the start and the end of the application's serving phase.

    Passed to ``FastAPI(lifespan=...)``. Code before the ``yield`` runs once
    at startup, code after it runs once at shutdown. No resources are
    acquired or released here; the hook only emits log lines.

    Args:
        app: The FastAPI application being started (unused).
    """
    logger.info("Starting API server...")  # startup phase
    yield
    logger.info("Shutting down API server...")  # shutdown phase
# Create FastAPI app
app = FastAPI(
    title="MCP IMAP Agent API",
    version="1.0.0",
    description="REST API for MCP IMAP Agent",
    lifespan=lifespan,
)

# Register the limiter on app.state and install the handler that turns
# RateLimitExceeded into an HTTP error response.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Configure CORS.
# CORS_ORIGINS is a comma-separated list. Each entry is trimmed and empty
# entries are dropped, so values such as "http://a.com, http://b.com" or a
# trailing comma do not produce origins that can never match a browser's
# Origin header.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ORIGINS", "http://localhost:3000").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request models
class ConnectionConfig(BaseModel):
    """Mailbox credentials and server coordinates sent with each request.

    Only ``host``, ``username``, ``password`` and ``smtpHost`` are read by
    the endpoints in this file; ``port`` and ``smtpPort`` are accepted and
    validated but not forwarded to the MCP tools here.
    """

    # IMAP server hostname.
    host: str
    # IMAP port; defaults to 993.
    port: int = 993
    # Login name; must validate as an e-mail address.
    username: EmailStr
    # Account password.
    password: str
    # Explicit SMTP host; when omitted, the send endpoint derives one from
    # the IMAP host.
    smtpHost: Optional[str] = None
    # SMTP port; defaults to 587.
    smtpPort: Optional[int] = 587
class ValidateConnectionRequest(BaseModel):
    """Body of ``POST /api/validate-connection``.

    Carries the same IMAP fields as ``ConnectionConfig`` without the SMTP
    settings.
    """

    # IMAP server hostname.
    host: str
    # IMAP port; defaults to 993.
    port: int = 993
    # Login name; must validate as an e-mail address.
    username: EmailStr
    # Account password.
    password: str
class ChatRequest(BaseModel):
    """Body of ``POST /api/chat``: one user message plus mailbox credentials."""

    # Free-text user message; the chat endpoint keyword-matches it.
    message: str
    # Mailbox to run the resulting tool call against.
    connection: ConnectionConfig
    # Prior conversation turns; accepted but not read by the chat endpoint
    # in this file.
    history: Optional[list] = []
class SearchRequest(BaseModel):
    """Body of ``POST /api/search``: credentials plus optional filters.

    Every filter left as ``None`` is still forwarded to the search tool as
    ``None``.
    """

    # Mailbox to search.
    connection: ConnectionConfig
    # Folder to search; defaults to "INBOX".
    folder: str = "INBOX"
    # Optional sender filter.
    sender: Optional[str] = None
    # Optional subject filter.
    subject: Optional[str] = None
    # Optional body-text filter.
    body: Optional[str] = None
    # Optional lower date bound, passed through as a string.
    since: Optional[str] = None
    # Maximum number of results requested; defaults to 20.
    limit: int = 20
class GetEmailRequest(BaseModel):
    """Body of ``POST /api/get-email``: identifies one message to fetch."""

    # Mailbox to read from.
    connection: ConnectionConfig
    # Folder containing the message; defaults to "INBOX".
    folder: str = "INBOX"
    # Identifier of the message, forwarded unchanged to the get-email tool.
    email_id: str
class SendEmailRequest(BaseModel):
    """Body of ``POST /api/send``: credentials plus the message to send."""

    # Account used to send the message.
    connection: ConnectionConfig
    # Recipient; must validate as an e-mail address.
    to: EmailStr
    # Subject line.
    subject: str
    # Message body.
    body: str
    # Optional CC value, forwarded unchanged to the send tool.
    cc: Optional[str] = None
    # Whether `body` should be sent as HTML; defaults to False.
    is_html: bool = False
# Endpoints
@app.get("/")
async def root():
    """Return a static banner naming the service and its API version."""
    banner = {"message": "MCP IMAP Agent API", "version": "1.0.0"}
    return banner
@app.get("/health")
@limiter.limit("10/minute")
async def health(request: Request):
    """Report liveness of the API process.

    No IMAP/SMTP connection is attempted; the response only states that the
    process is up, together with the API version and the current local time.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.

    Returns:
        Dict with ``status``, ``version`` and an ISO-8601 ``timestamp``.
    """
    now_iso = datetime.now().isoformat()
    return dict(status="healthy", version="1.0.0", timestamp=now_iso)
@app.post("/api/validate-connection")
@limiter.limit("5/minute")
async def validate_connection(request: Request, conn: ValidateConnectionRequest):
    """Validate IMAP credentials by attempting to list the mailbox folders.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        conn: Host and credentials to test.

    Returns:
        Dict with ``success`` (True), a ``message`` and the ``folders``
        reported by the folder-listing tool.

    Raises:
        HTTPException: 400 when the tool reports failure (detail is the
            tool's ``error`` value, or "Connection failed") or when the
            call itself raises (detail is the exception text).
    """
    try:
        # Test connection by listing folders
        result = await mcp_list_folders(
            None,  # Context not needed for direct calls
            username=conn.username,
            password=conn.password,
            host=conn.host,
        )
        if result.get("success"):
            return {
                "success": True,
                "message": "Connection successful",
                "folders": result.get("folders", []),
            }
        raise HTTPException(
            status_code=400,
            detail=result.get("error", "Connection failed"),
        )
    except HTTPException:
        # Already the response we want; without this clause the blanket
        # handler below would re-wrap it and replace the tool's error
        # detail with str(HTTPException).
        raise
    except Exception as e:
        logger.error(f"Connection validation error: {e}")
        raise HTTPException(status_code=400, detail=str(e)) from e
def _first_word_after(message: str, keyword: str) -> Optional[str]:
    """Return the first word following the first *keyword* in *message*.

    Only the text between the first and second occurrence of *keyword* is
    considered. Returns None when *keyword* is absent or nothing follows it.
    """
    parts = message.split(keyword)
    if len(parts) < 2:
        return None
    words = parts[1].split()
    return words[0] if words else None


@app.post("/api/chat")
@limiter.limit("30/minute")
async def chat(request: Request, chat_req: ChatRequest):
    """Handle a chat message by keyword-matching it to an MCP tool call.

    The lower-cased message is matched against fixed keywords, checked in
    this order:

    * "list folder" / "show folder": list folders (``type: "folders"``)
    * "search" / "find" / "show": search INBOX (``type: "search"``), with
      optional "from <word>", "subject <word>" and "last week" filters
    * "send" / "draft" / "reply": return an empty draft template
      (``type: "draft"``); nothing is sent
    * anything else: the 10 most recent INBOX results (``type: "info"``)

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        chat_req: The user message and mailbox credentials.

    Returns:
        A ``StreamingResponse`` of newline-delimited JSON objects, each with
        ``type`` and ``data`` keys. Failures are reported in-stream as
        ``{"type": "error", ...}`` rather than through the HTTP status.
    """

    async def generate():
        try:
            # Parse the message to determine action
            message = chat_req.message.lower()
            conn = chat_req.connection
            # Context not needed for REST API calls; None matches what
            # validate_connection passes to the same tools.
            ctx = None
            credentials = {
                "username": conn.username,
                "password": conn.password,
                "host": conn.host,
            }

            # Simple intent detection (in production, use LLM)
            if "list folder" in message or "show folder" in message:
                result = await mcp_list_folders(ctx, **credentials)
                yield json.dumps({"type": "folders", "data": result}) + "\n"

            elif "search" in message or "find" in message or "show" in message:
                # Extract search parameters from message
                search_params: Dict[str, Any] = {"folder": "INBOX", "limit": 20}

                # Simple extraction (in production, use NLP). A keyword with
                # nothing after it (e.g. "search from") adds no filter
                # instead of aborting the whole request.
                sender = _first_word_after(message, "from")
                if sender is not None:
                    search_params["sender"] = sender

                subject = _first_word_after(message, "subject")
                if subject is not None:
                    search_params["subject"] = subject

                if "last week" in message:
                    from datetime import timedelta

                    week_ago = datetime.now() - timedelta(days=7)
                    search_params["since"] = week_ago.strftime("%Y-%m-%d")

                result = await mcp_search_emails(ctx, **credentials, **search_params)
                yield json.dumps({"type": "search", "data": result}) + "\n"

            elif "send" in message or "draft" in message or "reply" in message:
                # For demo, just return a template
                yield json.dumps({
                    "type": "draft",
                    "data": {
                        "success": True,
                        "message": "Ready to compose email. Please provide recipient, subject, and body.",
                        "template": {
                            "to": "",
                            "subject": "",
                            "body": "",
                        },
                    },
                }) + "\n"

            else:
                # Default: search recent emails
                result = await mcp_search_emails(
                    ctx,
                    **credentials,
                    folder="INBOX",
                    limit=10,
                )
                yield json.dumps({
                    "type": "info",
                    "data": {
                        "message": "Here are your recent emails. You can ask me to search, read, draft, or send emails.",
                        "emails": result.get("emails", []),
                    },
                }) + "\n"
        except Exception as e:
            # The generator runs while the response is being streamed, so
            # the error is delivered as one more NDJSON line.
            logger.error(f"Chat error: {e}")
            yield json.dumps({"type": "error", "data": {"error": str(e)}}) + "\n"

    return StreamingResponse(generate(), media_type="application/x-ndjson")
@app.post("/api/search")
@limiter.limit("20/minute")
async def search_emails(request: Request, search_req: SearchRequest):
    """Search a mailbox folder using the filters in the request body.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        search_req: Credentials, folder, optional filters and result limit.

    Returns:
        Whatever the search tool returns, passed through unchanged.

    Raises:
        HTTPException: 500 with the exception text if the tool call raises.
    """
    conn = search_req.connection
    try:
        result = await mcp_search_emails(
            # Context not needed for REST API calls. The original passed an
            # undefined name `ctx`, which raised NameError on every request.
            None,
            username=conn.username,
            password=conn.password,
            host=conn.host,
            folder=search_req.folder,
            sender=search_req.sender,
            subject=search_req.subject,
            body=search_req.body,
            since=search_req.since,
            limit=search_req.limit,
        )
        return result
    except Exception as e:
        logger.error(f"Search error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/get-email")
@limiter.limit("30/minute")
async def get_email(request: Request, email_req: GetEmailRequest):
    """Fetch one message, identified by folder and email id.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        email_req: Credentials, folder and the id of the message to fetch.

    Returns:
        Whatever the get-email tool returns, passed through unchanged.

    Raises:
        HTTPException: 500 with the exception text if the tool call raises.
    """
    conn = email_req.connection
    try:
        result = await mcp_get_email(
            # Context not needed for REST API calls. The original passed an
            # undefined name `ctx`, which raised NameError on every request.
            None,
            username=conn.username,
            password=conn.password,
            host=conn.host,
            folder=email_req.folder,
            email_id=email_req.email_id,
        )
        return result
    except Exception as e:
        logger.error(f"Get email error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
def _derive_smtp_host(imap_host: str) -> str:
    """Guess an SMTP hostname from an IMAP hostname.

    Gmail and Outlook/Office365 hosts map to their fixed SMTP servers; any
    other host has "imap." replaced by "smtp." (and is returned unchanged
    if it contains no "imap.").
    """
    if "gmail" in imap_host:
        return "smtp.gmail.com"
    if "outlook" in imap_host or "office365" in imap_host:
        return "smtp.office365.com"
    # Try replacing imap with smtp
    return imap_host.replace("imap.", "smtp.")


@app.post("/api/send")
@limiter.limit("10/minute")
async def send_email_endpoint(request: Request, send_req: SendEmailRequest):
    """Send an email through the account given in the request.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        send_req: Credentials plus recipient, subject, body, optional CC
            and the HTML flag.

    Returns:
        Whatever the send tool returns, passed through unchanged.

    Raises:
        HTTPException: 500 with the exception text if anything raises.
    """
    try:
        conn = send_req.connection
        # Use SMTP host if provided, otherwise derive from IMAP host
        smtp_host = conn.smtpHost or _derive_smtp_host(conn.host)
        result = await mcp_send_email(
            # Context not needed for REST API calls. The original passed an
            # undefined name `ctx`, which raised NameError on every request.
            None,
            username=conn.username,
            password=conn.password,
            smtp_host=smtp_host,
            to=send_req.to,
            subject=send_req.subject,
            body=send_req.body,
            cc=send_req.cc,
            is_html=send_req.is_html,
        )
        return result
    except Exception as e:
        logger.error(f"Send email error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/folders")
@limiter.limit("10/minute")
async def list_folders(request: Request, conn: ConnectionConfig):
    """List the folders of the mailbox described by the request body.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        conn: Host and credentials of the mailbox.

    Returns:
        Whatever the folder-listing tool returns, passed through unchanged.

    Raises:
        HTTPException: 500 with the exception text if the tool call raises.
    """
    try:
        result = await mcp_list_folders(
            # Context not needed for REST API calls. The original passed an
            # undefined name `ctx`, which raised NameError on every request.
            None,
            username=conn.username,
            password=conn.password,
            host=conn.host,
        )
        return result
    except Exception as e:
        logger.error(f"List folders error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/demo-request")
@limiter.limit("5/minute")
async def demo_request(request: Request, data: dict):
    """Accept a demo inquiry and record it in the log.

    The inquiry is intended to be forwarded to max@maxtheweb.ai; for now it
    is only logged and acknowledged.

    Args:
        request: Incoming request; not read here, present for the rate
            limiter decorator.
        data: JSON body. ``email`` is the requester's address (may be
            absent) and ``message`` defaults to "Demo request".

    Returns:
        Dict with ``success`` (True) and a confirmation ``message``.

    Raises:
        HTTPException: 500 with the exception text if handling fails.
    """
    try:
        requester = data.get("email")
        inquiry = data.get("message", "Demo request")
        # TODO: Send email to max@maxtheweb.ai with user's demo request
        # For now, just log it. In production, you would:
        # 1. Send email to max@maxtheweb.ai with user details
        # 2. Queue AI processing job
        # 3. Send results back to the requester's address
        logger.info(f"Demo request from {requester}: {inquiry}")
    except Exception as e:
        logger.error(f"Demo request error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    return {"success": True, "message": "Demo request received"}
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces.
    import uvicorn

    # API_PORT overrides the default listening port of 8001.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("API_PORT", 8001)))