#!/usr/bin/env python3
"""Update the local MikroTik data snapshot from the router's REST API.

The module exposes small helpers for talking to the REST API
(``fetch_data`` / ``post_data``), a collector that gathers a fixed set of
endpoints into one dictionary (``gather_all_info``) and a JSON writer
(``save_data``).  Run as a script it gathers data from the default router
and writes it to ``data/mikrotik_mcp_data.json``.
"""

import json
import os
from datetime import datetime

import requests
from dotenv import load_dotenv

# Load environment variables from a .env file, if present.
load_dotenv()

# Default MikroTik connection settings; environment variables take precedence.
DEFAULT_MIKROTIK_HOST = os.getenv("MIKROTIK_HOST", "192.168.1.1")
# NOTE(review): 8728 is normally the RouterOS binary API port, while the REST
# API is served by the www / www-ssl service -- confirm this default.
DEFAULT_MIKROTIK_PORT = os.getenv("MIKROTIK_PORT", "8728")
DEFAULT_MIKROTIK_USER = os.getenv("MIKROTIK_USER", "admin")
DEFAULT_MIKROTIK_PASSWORD = os.getenv("MIKROTIK_PASSWORD", "password")

# Timeout (seconds) applied to every HTTP request sent to the router.
REQUEST_TIMEOUT_SECONDS = 12


def get_base_url(host, port):
    """Return the REST base URL for *host* and *port*.

    https is used only when the port is 443; every other port uses http.
    """
    protocol = "https" if str(port) == "443" else "http"
    return f"{protocol}://{host}:{port}/rest"


def fetch_data(endpoint, auth=None, base_url=None):
    """Send a GET request to *endpoint* and return the decoded JSON body.

    ``auth`` and ``base_url`` fall back to the module defaults when omitted.
    Raises ``Exception`` on any failure (see ``_send_request``).
    """
    return _send_request("GET", endpoint, auth, base_url)


def post_data(endpoint, payload, auth=None, base_url=None):
    """Send a POST request with *payload* as the JSON body.

    Returns the decoded JSON body, or ``{}`` when the router answers with an
    empty body.  Raises ``Exception`` on any failure (see ``_send_request``).
    """
    return _send_request("POST", endpoint, auth, base_url, json_data=payload)


def _send_request(method, endpoint, auth=None, base_url=None, json_data=None):
    """Perform one HTTP request against the REST API.

    Args:
        method: ``"GET"`` sends a GET request; any other value sends a POST.
        endpoint: Path appended to the base URL (no leading slash).
        auth: ``(user, password)`` tuple; defaults to the module credentials.
        base_url: REST base URL; defaults to the module host/port.
        json_data: JSON payload, used for POST requests only.

    Returns:
        The decoded JSON response, or ``{}`` for a POST with an empty body.

    Raises:
        Exception: for timeouts, connection errors, non-200 status codes,
            empty GET responses and invalid JSON.  The message describes the
            cause; the original exception is attached as ``__cause__``.
    """
    if auth is None:
        auth = (DEFAULT_MIKROTIK_USER, DEFAULT_MIKROTIK_PASSWORD)
    if base_url is None:
        base_url = get_base_url(DEFAULT_MIKROTIK_HOST, DEFAULT_MIKROTIK_PORT)

    url = f"{base_url}/{endpoint}"
    headers = {
        "Accept": "application/json",
        "User-Agent": "MikroTik-MCP-Assistant/1.0",
    }

    try:
        if method == "GET":
            response = requests.get(
                url, auth=auth, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
            )
        else:
            response = requests.post(
                url,
                auth=auth,
                headers=headers,
                json=json_data,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )

        if response.status_code != 200:
            status_code = response.status_code
            # Reading the body is best-effort: it only enriches the message.
            try:
                error_body = response.text
            except Exception:
                error_body = ""

            error_msg = f"HTTP {status_code}"
            if status_code == 403:
                error_msg += " (Forbidden/Permission Denied)"
            elif status_code == 401:
                error_msg += " (Unauthorized)"
            elif status_code == 404:
                error_msg += " (Not Found)"
            if error_body:
                error_msg += f" - Body: {error_body[:100]}"
            raise Exception(error_msg)

        if not response.text or response.text.strip() == "":
            # A successful POST may legitimately carry no content.
            if method == "POST":
                return {}
            raise Exception("Empty Response from Router")

        return response.json()

    except requests.exceptions.Timeout as exc:
        raise Exception(
            f"Timeout: Router tidak merespon dalam {REQUEST_TIMEOUT_SECONDS} detik."
        ) from exc
    except requests.exceptions.ConnectionError as exc:
        raise Exception(
            f"Connection Error: Tidak bisa terhubung ke router. {str(exc)}"
        ) from exc
    except json.JSONDecodeError as exc:
        raise Exception(
            "JSON Error: Response bukan format JSON yang valid. "
            f"Body: {response.text[:100]}"
        ) from exc
    except Exception as exc:
        # Catch-all.  Exceptions raised inside the try body above (HTTP
        # status / empty response) also land here, so their messages are
        # prefixed with the exception type name.
        error_type = type(exc).__name__
        raise Exception(f"{error_type}: {str(exc)}") from exc


def _flag_is(value, expected):
    """Return True when a flag value represents the boolean *expected*.

    Accepts real booleans as well as the strings "true"/"false", so the
    statistics work whether the router reports flags as JSON booleans or
    as strings.
    """
    if isinstance(value, str):
        return value.strip().lower() == ("true" if expected else "false")
    return value == expected


def _count_flag(items, key, expected):
    """Count the dict entries in *items* whose *key* flag equals *expected*."""
    return sum(
        1
        for item in items
        if isinstance(item, dict) and _flag_is(item.get(key), expected)
    )


def gather_all_info(host=None, port=None, user=None, password=None):
    """Gather information from one router into a single dictionary.

    Any argument left empty falls back to the corresponding module default.
    Each endpoint is fetched independently; a failing endpoint is recorded
    as ``{"error": "<message>"}`` instead of aborting the whole run.

    Returns:
        A dict with the sections ``metadata``, ``system``, ``interfaces``,
        ``network``, ``ppp``, ``logs`` and ``hotspot``.  ``interfaces`` and
        ``ppp`` get an extra ``statistics`` entry when their fetch returned
        a non-empty list.
    """
    host = host or DEFAULT_MIKROTIK_HOST
    port = port or DEFAULT_MIKROTIK_PORT
    user = user or DEFAULT_MIKROTIK_USER
    password = password or DEFAULT_MIKROTIK_PASSWORD

    base_url = get_base_url(host, port)
    auth = (user, password)

    print(f"🔍 Gathering MikroTik data from {host}...")

    def safe_fetch(endpoint):
        """Fetch *endpoint*, turning any failure into an error dict."""
        try:
            return fetch_data(endpoint, auth, base_url)
        except Exception as exc:
            return {"error": str(exc)}

    data = {
        "metadata": {
            "timestamp": datetime.now().isoformat(),
            "source": "MikroTik REST API",
            "host": host,
        },
        "system": {
            "resource": safe_fetch("system/resource"),
            "identity": safe_fetch("system/identity"),
            "license": safe_fetch("system/license"),
            "users": safe_fetch("user"),
        },
        "interfaces": {
            "interfaces": safe_fetch("interface"),
        },
        "network": {
            "ip_addresses": safe_fetch("ip/address"),
            "routes": safe_fetch("ip/route"),
        },
        "ppp": {
            "secrets": safe_fetch("ppp/secret"),
        },
        "logs": safe_fetch("log"),
        "hotspot": {
            "active": safe_fetch("ip/hotspot/active"),
            "users": safe_fetch("ip/hotspot/user"),
        },
    }

    # Statistics are computed only for real (non-empty) lists: a failed fetch
    # yields an {"error": ...} dict, which must not be iterated as records.
    interfaces = data["interfaces"]["interfaces"]
    if isinstance(interfaces, list) and interfaces:
        total = len(interfaces)
        running = _count_flag(interfaces, "running", True)
        data["interfaces"]["statistics"] = {
            "total": total,
            "running": running,
            "not_running": total - running,
        }

    secrets = data["ppp"]["secrets"]
    if isinstance(secrets, list) and secrets:
        data["ppp"]["statistics"] = {
            "total_secrets": len(secrets),
            "enabled_secrets": _count_flag(secrets, "disabled", False),
        }

    # The reported size is the length of the dict's string representation.
    print(f"✅ Data gathered: {len(str(data))} bytes")
    return data


def save_data(data, filename="data/mikrotik_mcp_data.json"):
    """Write *data* to *filename* as indented UTF-8 JSON.

    The parent directory is created when it does not exist yet.

    Returns:
        True on success, False when the file could not be written (the
        error is printed rather than raised).
    """
    try:
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"💾 Data saved to {filename}")
        return True
    except Exception as e:
        print(f"Error saving data: {e}")
        return False


def main():
    """Gather data from the default router and save it to the data file."""
    print("🔄 Updating MikroTik MCP data...")

    # Check credentials
    if not all([DEFAULT_MIKROTIK_HOST, DEFAULT_MIKROTIK_USER, DEFAULT_MIKROTIK_PASSWORD]):
        print("⚠️ MikroTik credentials not set in .env file")
        print(" Using existing data file only")
        return

    data = gather_all_info()
    if not data:
        print("❌ Failed to gather data")
        return

    # Report success only when the file was actually written; save_data
    # prints its own error message on failure.
    if save_data(data):
        print("✅ Update completed successfully")


if __name__ == "__main__":
    main()