#!/usr/bin/env python3 """ FastAPI Server untuk MikroTik Gemini Integration """ from fastapi import FastAPI, HTTPException, Security, status, Depends from fastapi.security import APIKeyHeader from pydantic import BaseModel from datetime import datetime import uvicorn import os from dotenv import load_dotenv # Import our assistant from gemini_mikrotik_integration import MikroTikGeminiAssistant # Load environment load_dotenv() # Security API_KEY = os.getenv("API_KEY", "mkmcp_secret_key_2026") API_KEY_NAME = "X-API-Key" api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) async def get_api_key(api_key: str = Security(api_key_header)): if api_key == API_KEY: return api_key raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="API Key tidak valid atau tidak ditemukan", ) # Initialize FastAPI app = FastAPI( title="MikroTik Gemini API", description="API untuk query MikroTik dengan Gemini AI", version="1.0.0", docs_url="/docs", redoc_url="/redoc", dependencies=[Depends(get_api_key)] ) # Initialize assistant assistant = None try: assistant = MikroTikGeminiAssistant() print("✅ MikroTik Gemini Assistant initialized") except Exception as e: print(f"⚠️ Assistant initialization failed: {e}") assistant = None # Request/Response models class QuestionRequest(BaseModel): question: str detailed: bool = False class AnswerResponse(BaseModel): question: str answer: str timestamp: str success: bool class HealthResponse(BaseModel): status: str service: str version: str timestamp: str mikrotik_device: str = "N/A" data_age: str = "N/A" # API Endpoints @app.get("/") def root(): return { "service": "MikroTik Gemini API", "version": "1.0.0", "endpoints": { "docs": "/docs", "health": "/health", "ask": "/ask (POST)" } } @app.get("/health", response_model=HealthResponse) def health_check(): """Check API and MikroTik status""" status = "healthy" device_info = "N/A" data_age = "N/A" if assistant: try: summary = assistant.get_summary() device_info = summary["device"] data_age = summary["timestamp"] except: status = "degraded" else: status = "degraded" return HealthResponse( status=status, service="MikroTik Gemini API", version="1.0.0", timestamp=datetime.now().isoformat(), mikrotik_device=device_info, data_age=data_age ) @app.post("/ask", response_model=AnswerResponse) def ask_question(request: QuestionRequest): """Ask Gemini about MikroTik""" if not assistant: raise HTTPException(status_code=503, detail="Assistant not available") try: answer = assistant.ask(request.question) return AnswerResponse( question=request.question, answer=answer, timestamp=datetime.now().isoformat(), success=True ) except Exception as e: raise HTTPException(status_code=500, detail=f"Error: {str(e)}") @app.get("/summary") def get_summary(): """Get MikroTik device summary""" if not assistant: raise HTTPException(status_code=503, detail="Assistant not available") try: return assistant.get_summary() except Exception as e: raise HTTPException(status_code=500, detail=f"Error: {str(e)}") # Run server if __name__ == "__main__": host = os.getenv("API_HOST", "0.0.0.0") port = int(os.getenv("API_PORT", "8000")) debug = os.getenv("API_DEBUG", "false").lower() == "true" print(f"🚀 Starting MikroTik Gemini API on {host}:{port}") print(f"📚 Documentation: http://{host}:{port}/docs") uvicorn.run( app, host=host, port=port, log_level="info" )