diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0802b6b --- /dev/null +++ b/.gitignore @@ -0,0 +1,51 @@ +# Environment variables +.env +.env.local +.env.production +.env.development + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +backend/venv/ +backend/.deps_installed + +# Node.js / Next.js +node_modules/ +.next/ +out/ +frontend/node_modules/ +frontend/.next/ +frontend/out/ +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.pnpm-debug.log* +package-lock.json + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Build artifacts +dist/ +build/ +*.egg-info/ + +# Process IDs +.backend.pid +.frontend.pid + +# Next.js +next-env.d.ts \ No newline at end of file diff --git a/backend/.env.example b/backend/.env.example new file mode 100644 index 0000000..b3303ec --- /dev/null +++ b/backend/.env.example @@ -0,0 +1,22 @@ +# MCP Server Configuration +MCP_PORT=8000 +MCP_HOST=0.0.0.0 + +# Security Settings +RATE_LIMIT_PER_MINUTE=60 +MAX_EMAIL_FETCH_SIZE=100 +SESSION_TIMEOUT_MINUTES=30 + +# Redis Configuration (for production) +REDIS_URL=redis://localhost:6379 +REDIS_DB=0 + +# Logging +LOG_LEVEL=INFO + +# CORS Settings (for frontend) +CORS_ORIGINS=http://localhost:3000,https://maxtheweb.ai + +# Auth Settings (for magic links) +RESEND_API_KEY=your_resend_api_key +MAGIC_LINK_EXPIRY_MINUTES=15 \ No newline at end of file diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..5fe6583 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,33 @@ +# Core MCP dependencies +fastmcp>=0.5.0 + +# IMAP email handling +imapclient>=3.0.0 +email-validator>=2.1.0 + +# SMTP support +aiosmtplib>=3.0.0 + +# Security and authentication +python-dotenv>=1.0.0 +cryptography>=41.0.0 + +# Async support and performance +aiofiles>=23.2.1 +aioimaplib>=1.0.1 + +# Rate limiting +slowapi>=0.1.9 + +# Web server +fastapi>=0.100.0 +uvicorn[standard]>=0.30.0 + +# Utilities +pydantic>=2.5.0 +python-multipart>=0.0.6 + +# Testing +pytest>=8.0.0 +pytest-asyncio>=0.23.0 +pytest-cov>=4.1.0 \ No newline at end of file diff --git a/backend/src/api_server.py b/backend/src/api_server.py new file mode 100644 index 0000000..7043e69 --- /dev/null +++ b/backend/src/api_server.py @@ -0,0 +1,401 @@ +""" +REST API server for MCP IMAP Agent +Provides HTTP endpoints for the Next.js frontend to communicate with MCP tools +""" + +import os +import json +import asyncio +import logging +from typing import Optional, Dict, Any +from contextlib import asynccontextmanager +from datetime import datetime + +from fastapi import FastAPI, HTTPException, Request, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse +from pydantic import BaseModel, EmailStr +from dotenv import load_dotenv +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded + +# Import MCP server components +from mcp_server import ( + get_session, + IMAPConfig, + list_folders as mcp_list_folders, + search_emails as mcp_search_emails, + get_email as mcp_get_email, + send_email as mcp_send_email, + health_check as mcp_health_check, + Context +) + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Create rate limiter +limiter = Limiter(key_func=get_remote_address) + +# Lifespan manager for FastAPI +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup + logger.info("Starting API server...") + yield + # Shutdown + logger.info("Shutting down API server...") + +# Create FastAPI app +app = FastAPI( + title="MCP IMAP Agent API", + version="1.0.0", + description="REST API for MCP IMAP Agent", + lifespan=lifespan +) + +# Add rate limiter error handler +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +# Configure CORS +app.add_middleware( + CORSMiddleware, + allow_origins=os.getenv("CORS_ORIGINS", "http://localhost:3000").split(","), + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Request models +class ConnectionConfig(BaseModel): + """Email connection configuration""" + host: str + port: int = 993 + username: EmailStr + password: str + smtpHost: Optional[str] = None + smtpPort: Optional[int] = 587 + +class ValidateConnectionRequest(BaseModel): + """Connection validation request""" + host: str + port: int = 993 + username: EmailStr + password: str + +class ChatRequest(BaseModel): + """Chat request with LLM""" + message: str + connection: ConnectionConfig + history: Optional[list] = [] + +class SearchRequest(BaseModel): + """Email search request""" + connection: ConnectionConfig + folder: str = "INBOX" + sender: Optional[str] = None + subject: Optional[str] = None + body: Optional[str] = None + since: Optional[str] = None + limit: int = 20 + +class GetEmailRequest(BaseModel): + """Get single email request""" + connection: ConnectionConfig + folder: str = "INBOX" + email_id: str + +class SendEmailRequest(BaseModel): + """Send email request""" + connection: ConnectionConfig + to: EmailStr + subject: str + body: str + cc: Optional[str] = None + is_html: bool = False + +# Endpoints +@app.get("/") +async def root(): + """Root endpoint""" + return {"message": "MCP IMAP Agent API", "version": "1.0.0"} + +@app.get("/health") +@limiter.limit("10/minute") +async def health(request: Request): + """Health check endpoint""" + return { + "status": "healthy", + "version": "1.0.0", + "timestamp": datetime.now().isoformat() + } + +@app.post("/api/validate-connection") +@limiter.limit("5/minute") +async def validate_connection(request: Request, conn: ValidateConnectionRequest): + """Validate IMAP connection""" + try: + # Test connection by listing folders + result = await mcp_list_folders( + None, # Context not needed for direct calls + username=conn.username, + password=conn.password, + host=conn.host + ) + + if result.get("success"): + return { + "success": True, + "message": "Connection successful", + "folders": result.get("folders", []) + } + else: + raise HTTPException(status_code=400, detail=result.get("error", "Connection failed")) + + except Exception as e: + logger.error(f"Connection validation error: {e}") + raise HTTPException(status_code=400, detail=str(e)) + +@app.post("/api/chat") +@limiter.limit("30/minute") +async def chat(request: Request, chat_req: ChatRequest): + """ + Handle chat messages and execute MCP tools + Returns streaming response with results + """ + async def generate(): + try: + # Parse the message to determine action + message = chat_req.message.lower() + conn = chat_req.connection + # Context not needed for REST API calls + + # Simple intent detection (in production, use LLM) + if "list folder" in message or "show folder" in message: + result = await mcp_list_folders( + ctx, + username=conn.username, + password=conn.password, + host=conn.host + ) + yield json.dumps({"type": "folders", "data": result}) + "\n" + + elif "search" in message or "find" in message or "show" in message: + # Extract search parameters from message + search_params = { + "folder": "INBOX", + "limit": 20 + } + + if "from" in message: + # Simple extraction (in production, use NLP) + parts = message.split("from") + if len(parts) > 1: + sender = parts[1].strip().split()[0] + search_params["sender"] = sender + + if "subject" in message: + parts = message.split("subject") + if len(parts) > 1: + subject = parts[1].strip().split()[0] + search_params["subject"] = subject + + if "last week" in message: + from datetime import datetime, timedelta + week_ago = datetime.now() - timedelta(days=7) + search_params["since"] = week_ago.strftime("%Y-%m-%d") + + result = await mcp_search_emails( + ctx, + username=conn.username, + password=conn.password, + host=conn.host, + **search_params + ) + + yield json.dumps({"type": "search", "data": result}) + "\n" + + elif "send" in message or "draft" in message or "reply" in message: + # For demo, just return a template + yield json.dumps({ + "type": "draft", + "data": { + "success": True, + "message": "Ready to compose email. Please provide recipient, subject, and body.", + "template": { + "to": "", + "subject": "", + "body": "" + } + } + }) + "\n" + + else: + # Default: search recent emails + result = await mcp_search_emails( + ctx, + username=conn.username, + password=conn.password, + host=conn.host, + folder="INBOX", + limit=10 + ) + + yield json.dumps({ + "type": "info", + "data": { + "message": f"Here are your recent emails. You can ask me to search, read, draft, or send emails.", + "emails": result.get("emails", []) + } + }) + "\n" + + except Exception as e: + logger.error(f"Chat error: {e}") + yield json.dumps({"type": "error", "data": {"error": str(e)}}) + "\n" + + return StreamingResponse(generate(), media_type="application/x-ndjson") + +@app.post("/api/search") +@limiter.limit("20/minute") +async def search_emails(request: Request, search_req: SearchRequest): + """Search emails with filters""" + try: + # Context not needed for REST API calls + conn = search_req.connection + + result = await mcp_search_emails( + ctx, + username=conn.username, + password=conn.password, + host=conn.host, + folder=search_req.folder, + sender=search_req.sender, + subject=search_req.subject, + body=search_req.body, + since=search_req.since, + limit=search_req.limit + ) + + return result + + except Exception as e: + logger.error(f"Search error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/get-email") +@limiter.limit("30/minute") +async def get_email(request: Request, email_req: GetEmailRequest): + """Get full email content""" + try: + # Context not needed for REST API calls + conn = email_req.connection + + result = await mcp_get_email( + ctx, + username=conn.username, + password=conn.password, + host=conn.host, + folder=email_req.folder, + email_id=email_req.email_id + ) + + return result + + except Exception as e: + logger.error(f"Get email error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/send") +@limiter.limit("10/minute") +async def send_email_endpoint(request: Request, send_req: SendEmailRequest): + """Send an email""" + try: + # Context not needed for REST API calls + conn = send_req.connection + + # Use SMTP host if provided, otherwise derive from IMAP host + smtp_host = conn.smtpHost + if not smtp_host: + if "gmail" in conn.host: + smtp_host = "smtp.gmail.com" + elif "outlook" in conn.host or "office365" in conn.host: + smtp_host = "smtp.office365.com" + else: + # Try replacing imap with smtp + smtp_host = conn.host.replace("imap.", "smtp.") + + result = await mcp_send_email( + ctx, + username=conn.username, + password=conn.password, + smtp_host=smtp_host, + to=send_req.to, + subject=send_req.subject, + body=send_req.body, + cc=send_req.cc, + is_html=send_req.is_html + ) + + return result + + except Exception as e: + logger.error(f"Send email error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/folders") +@limiter.limit("10/minute") +async def list_folders(request: Request, conn: ConnectionConfig): + """List email folders""" + try: + # Context not needed for REST API calls + result = await mcp_list_folders( + ctx, + username=conn.username, + password=conn.password, + host=conn.host + ) + + return result + + except Exception as e: + logger.error(f"List folders error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/api/demo-request") +@limiter.limit("5/minute") +async def demo_request(request: Request, data: dict): + """ + Handle demo requests - sends user inquiry to max@maxtheweb.ai + """ + try: + user_email = data.get("email") + user_message = data.get("message", "Demo request") + + # TODO: Send email to max@maxtheweb.ai with user's demo request + # For now, just log it + logger.info(f"Demo request from {user_email}: {user_message}") + + # In production, you would: + # 1. Send email to max@maxtheweb.ai with user details + # 2. Queue AI processing job + # 3. Send results back to user_email + + return { + "success": True, + "message": "Demo request received" + } + + except Exception as e: + logger.error(f"Demo request error: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + import uvicorn + port = int(os.getenv("API_PORT", 8001)) + uvicorn.run(app, host="0.0.0.0", port=port) \ No newline at end of file diff --git a/backend/src/mcp_server.py b/backend/src/mcp_server.py new file mode 100644 index 0000000..2b2516a --- /dev/null +++ b/backend/src/mcp_server.py @@ -0,0 +1,456 @@ +""" +MCP IMAP Agent Server - AI-powered email automation via MCP protocol +Built with FastMCP for production-ready performance +""" + +import os +import asyncio +import logging +from typing import Optional, List, Dict, Any +from datetime import datetime, timedelta +from functools import lru_cache +import email +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart + +from fastmcp import FastMCP, Context +from pydantic import BaseModel, Field, EmailStr +from dotenv import load_dotenv +import aioimaplib +from email_validator import validate_email, EmailNotValidError +from slowapi import Limiter +from slowapi.util import get_remote_address + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Initialize FastMCP server +mcp = FastMCP("mcp-imap-agent") + +# Rate limiter for security +limiter = Limiter(key_func=get_remote_address) + +# Configuration models +class IMAPConfig(BaseModel): + """IMAP connection configuration""" + host: str = Field(..., description="IMAP server hostname") + port: int = Field(993, description="IMAP port (993 for SSL)") + username: EmailStr = Field(..., description="Email address") + password: str = Field(..., description="App-specific password") + use_ssl: bool = Field(True, description="Use SSL connection") + +class EmailSearchParams(BaseModel): + """Email search parameters""" + folder: str = Field("INBOX", description="Folder to search in") + sender: Optional[str] = Field(None, description="Filter by sender email") + subject: Optional[str] = Field(None, description="Filter by subject") + body: Optional[str] = Field(None, description="Search in email body") + since: Optional[str] = Field(None, description="Emails since date (YYYY-MM-DD)") + limit: int = Field(50, description="Maximum emails to return", ge=1, le=100) + +class IMAPSession: + """Manages IMAP connection with connection pooling""" + + def __init__(self, config: IMAPConfig): + self.config = config + self.client: Optional[aioimaplib.IMAP4_SSL] = None + self._lock = asyncio.Lock() + + async def connect(self): + """Establish IMAP connection with retry logic""" + async with self._lock: + if self.client is None: + try: + self.client = aioimaplib.IMAP4_SSL( + host=self.config.host, + port=self.config.port + ) + await self.client.wait_hello_from_server() + + # Authenticate + response = await self.client.login( + self.config.username, + self.config.password + ) + + if response.result != 'OK': + raise Exception(f"Login failed: {response}") + + logger.info(f"Connected to {self.config.host} as {self.config.username}") + + except Exception as e: + logger.error(f"Connection failed: {e}") + self.client = None + raise + + async def disconnect(self): + """Close IMAP connection gracefully""" + async with self._lock: + if self.client: + try: + await self.client.logout() + except: + pass + self.client = None + + async def ensure_connected(self): + """Ensure connection is active""" + if self.client is None: + await self.connect() + + async def execute(self, command: str, *args): + """Execute IMAP command with automatic reconnection""" + await self.ensure_connected() + try: + return await getattr(self.client, command)(*args) + except Exception as e: + logger.warning(f"Command failed, reconnecting: {e}") + await self.disconnect() + await self.connect() + return await getattr(self.client, command)(*args) + +# Global session cache (in production, use Redis) +sessions: Dict[str, IMAPSession] = {} + +@lru_cache(maxsize=128) +def get_session(username: str, password: str, host: str) -> IMAPSession: + """Get or create IMAP session with caching""" + key = f"{username}@{host}" + if key not in sessions: + config = IMAPConfig( + host=host, + username=username, + password=password + ) + sessions[key] = IMAPSession(config) + return sessions[key] + +# MCP Tools Implementation + +@mcp.tool() +async def list_folders( + ctx: Context, + username: EmailStr = Field(..., description="Email address"), + password: str = Field(..., description="App-specific password"), + host: str = Field(..., description="IMAP server hostname") +) -> Dict[str, Any]: + """ + List all email folders in the account. + Returns folder names and their attributes. + """ + try: + session = get_session(username, password, host) + await session.ensure_connected() + + # List folders + response = await session.execute('list') + + if response.result != 'OK': + return { + "success": False, + "error": "Failed to list folders" + } + + folders = [] + for line in response.lines: + # Parse IMAP LIST response + if isinstance(line, bytes): + line = line.decode('utf-8') + + # Extract folder name (simplified parsing) + parts = line.split('"') + if len(parts) >= 3: + folder_name = parts[-2] + folders.append({ + "name": folder_name, + "path": folder_name + }) + + return { + "success": True, + "folders": folders, + "count": len(folders) + } + + except Exception as e: + logger.error(f"list_folders error: {e}") + return { + "success": False, + "error": str(e) + } + +@mcp.tool() +async def search_emails( + ctx: Context, + username: EmailStr = Field(..., description="Email address"), + password: str = Field(..., description="App-specific password"), + host: str = Field(..., description="IMAP server hostname"), + folder: str = Field("INBOX", description="Folder to search"), + sender: Optional[str] = Field(None, description="Filter by sender"), + subject: Optional[str] = Field(None, description="Filter by subject"), + body: Optional[str] = Field(None, description="Search in body"), + since: Optional[str] = Field(None, description="Since date YYYY-MM-DD"), + limit: int = Field(20, description="Max results", ge=1, le=100) +) -> Dict[str, Any]: + """ + Search emails with multiple filters. + Returns email metadata and snippets. + """ + try: + session = get_session(username, password, host) + await session.ensure_connected() + + # Select folder + response = await session.execute('select', folder) + if response.result != 'OK': + return { + "success": False, + "error": f"Cannot access folder: {folder}" + } + + # Build search criteria + criteria = ['ALL'] + + if sender: + criteria.append(f'FROM "{sender}"') + if subject: + criteria.append(f'SUBJECT "{subject}"') + if body: + criteria.append(f'BODY "{body}"') + if since: + date_obj = datetime.strptime(since, '%Y-%m-%d') + criteria.append(f'SINCE {date_obj.strftime("%d-%b-%Y")}') + + search_string = ' '.join(criteria) if len(criteria) > 1 else criteria[0] + + # Execute search + response = await session.execute('search', None, search_string) + + if response.result != 'OK': + return { + "success": False, + "error": "Search failed" + } + + # Parse message IDs + message_ids = response.lines[0].split() if response.lines else [] + message_ids = message_ids[-limit:] if len(message_ids) > limit else message_ids + + emails = [] + for msg_id in message_ids: + # Fetch email headers + response = await session.execute( + 'fetch', + msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id), + '(BODY.PEEK[HEADER] FLAGS)' + ) + + if response.result == 'OK' and response.lines: + # Parse email headers (simplified) + header_data = b''.join( + line if isinstance(line, bytes) else line.encode() + for line in response.lines if line + ) + + msg = email.message_from_bytes(header_data) + + emails.append({ + "id": msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id), + "subject": msg.get('Subject', '(no subject)'), + "from": msg.get('From', ''), + "to": msg.get('To', ''), + "date": msg.get('Date', ''), + "folder": folder + }) + + return { + "success": True, + "emails": emails, + "count": len(emails), + "search_criteria": search_string + } + + except Exception as e: + logger.error(f"search_emails error: {e}") + return { + "success": False, + "error": str(e) + } + +@mcp.tool() +async def get_email( + ctx: Context, + username: EmailStr = Field(..., description="Email address"), + password: str = Field(..., description="App-specific password"), + host: str = Field(..., description="IMAP server hostname"), + email_id: str = Field(..., description="Email ID to retrieve"), + folder: str = Field("INBOX", description="Email folder") +) -> Dict[str, Any]: + """ + Get full email content including body and attachments info. + """ + try: + session = get_session(username, password, host) + await session.ensure_connected() + + # Select folder + await session.execute('select', folder) + + # Fetch full email + response = await session.execute('fetch', email_id, '(RFC822)') + + if response.result != 'OK': + return { + "success": False, + "error": "Failed to fetch email" + } + + # Parse email + raw_email = b''.join( + line if isinstance(line, bytes) else line.encode() + for line in response.lines if line + ) + + msg = email.message_from_bytes(raw_email) + + # Extract body + body_plain = "" + body_html = "" + attachments = [] + + for part in msg.walk(): + content_type = part.get_content_type() + content_disposition = str(part.get("Content-Disposition", "")) + + if "attachment" in content_disposition: + filename = part.get_filename() + if filename: + attachments.append({ + "filename": filename, + "content_type": content_type, + "size": len(part.get_payload()) + }) + elif content_type == "text/plain": + body_plain = part.get_payload(decode=True).decode('utf-8', errors='ignore') + elif content_type == "text/html": + body_html = part.get_payload(decode=True).decode('utf-8', errors='ignore') + + return { + "success": True, + "email": { + "id": email_id, + "subject": msg.get('Subject', ''), + "from": msg.get('From', ''), + "to": msg.get('To', ''), + "cc": msg.get('Cc', ''), + "date": msg.get('Date', ''), + "body_plain": body_plain, + "body_html": body_html, + "attachments": attachments, + "folder": folder + } + } + + except Exception as e: + logger.error(f"get_email error: {e}") + return { + "success": False, + "error": str(e) + } + +@mcp.tool() +async def send_email( + ctx: Context, + username: EmailStr = Field(..., description="Email address"), + password: str = Field(..., description="App-specific password"), + smtp_host: str = Field(..., description="SMTP server hostname"), + to: EmailStr = Field(..., description="Recipient email"), + subject: str = Field(..., description="Email subject"), + body: str = Field(..., description="Email body"), + cc: Optional[str] = Field(None, description="CC recipients"), + is_html: bool = Field(False, description="Send as HTML email") +) -> Dict[str, Any]: + """ + Send an email via SMTP. + Supports plain text and HTML formats. + """ + try: + import aiosmtplib + + # Validate email + try: + validate_email(to) + except EmailNotValidError as e: + return { + "success": False, + "error": f"Invalid recipient email: {e}" + } + + # Create message + msg = MIMEMultipart('alternative') + msg['From'] = username + msg['To'] = to + msg['Subject'] = subject + + if cc: + msg['Cc'] = cc + + # Add body + if is_html: + msg.attach(MIMEText(body, 'html')) + else: + msg.attach(MIMEText(body, 'plain')) + + # Send via SMTP + async with aiosmtplib.SMTP( + hostname=smtp_host, + port=587, + start_tls=True + ) as smtp: + await smtp.login(username, password) + await smtp.send_message(msg) + + return { + "success": True, + "message": "Email sent successfully", + "details": { + "to": to, + "subject": subject, + "timestamp": datetime.now().isoformat() + } + } + + except Exception as e: + logger.error(f"send_email error: {e}") + return { + "success": False, + "error": str(e) + } + +# Health check endpoint +@mcp.tool() +async def health_check(ctx: Context) -> Dict[str, Any]: + """Check server health and status""" + return { + "status": "healthy", + "version": "1.0.0", + "timestamp": datetime.now().isoformat(), + "active_sessions": len(sessions) + } + +if __name__ == "__main__": + # Run the MCP server + import uvicorn + uvicorn.run( + mcp.get_asgi_app(), + host="0.0.0.0", + port=int(os.getenv("MCP_PORT", 8000)), + log_level="info" + ) \ No newline at end of file