#!/usr/bin/env python3
"""MCP server exposing MikroTik router monitoring and registry tools over stdio.

Router definitions are read from ``DEVICES_FILE`` first, with the billing-mcp
configuration (``BILLING_CONFIG_FILE``) as a fallback.  Large API responses are
written to timestamped snapshot files under ``SNAPSHOTS_DIR`` and only a short
preview is returned to the caller.
"""
import asyncio
import json
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional

from mcp.server.stdio import stdio_server
from mcp.server import Server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from dotenv import load_dotenv

# Import our helper from update_data
from update_data import gather_all_info, fetch_data, post_data, save_data, get_base_url

load_dotenv()

# Configuration
DEVICES_FILE = "/home/wartana/myApp/mikrotik-mcp/devices.json"
BILLING_CONFIG_FILE = "/home/wartana/myApp/billing-mcp/config.json"
DATA_DIR = "/home/wartana/myApp/mikrotik-mcp/data"
SNAPSHOTS_DIR = os.path.join(DATA_DIR, "snapshots")

# Hard cap (in characters) applied to the largest text replies.
MAX_MESSAGE_CHARS = 30000

# Create the MCP server
server = Server("mikrotik")


# ---------------------------------------------------------------------------
# Small private helpers
# ---------------------------------------------------------------------------

def _log(message: str) -> None:
    """Write a diagnostic line to stderr.

    ``main`` runs the server on the stdio transport, so stdout must stay free
    of anything that is not a protocol message.
    """
    print(message, file=sys.stderr)


def _text(message: str) -> list[TextContent]:
    """Wrap *message* in the single-item content list the tool handler returns."""
    return [TextContent(type="text", text=message)]


def _name_key(entry: Dict) -> str:
    """Return the lower-cased router name of *entry* ('' when it has none)."""
    return str(entry.get("name") or "").lower()


def _safe_component(text: str) -> str:
    """Turn *text* into a lower-case file-name component.

    Spaces become underscores (as before); path separators are replaced as
    well so a router name cannot point outside the data directory.
    """
    for ch in (" ", "/", "\\"):
        text = text.replace(ch, "_")
    return text.lower()


def _as_list(data: Any) -> list:
    """Normalise an API reply to a list (a lone dict becomes a 1-item list)."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]
    return []


def _is_true(value: Any) -> bool:
    """Return True for the boolean ``True`` or the string ``"true"``."""
    return value is True or value == "true"


def _clamp_limit(value: Any, default: int, maximum: Optional[int] = None) -> int:
    """Coerce a user-supplied limit to a positive int.

    Missing, non-numeric or non-positive values fall back to *default*;
    values above *maximum* (when given) are reduced to it.
    """
    try:
        limit = int(value)
    except (TypeError, ValueError):
        return default
    if limit < 1:
        return default
    if maximum is not None and limit > maximum:
        return maximum
    return limit


def _to_float(value: Any) -> float:
    """Best-effort float conversion used for sorting; unparsable values are 0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _format_rate(bps_val: Any) -> str:
    """Format bits per second to human readable Mbps/Kbps."""
    try:
        bps = float(bps_val)
    except (ValueError, TypeError):
        return "0 bps"
    if bps >= 1_000_000:
        return f"{bps / 1_000_000:.2f} Mbps"
    if bps >= 1_000:
        return f"{bps / 1_000:.2f} Kbps"
    return f"{bps:.0f} bps"


def _cap_message(msg: str) -> str:
    """Truncate *msg* to ``MAX_MESSAGE_CHARS`` characters and mark the cut."""
    if len(msg) > MAX_MESSAGE_CHARS:
        return msg[:MAX_MESSAGE_CHARS] + "\n\n... [TRUNCATED DUE TO EXTREME SIZE] ..."
    return msg


def _save_devices(devices: List[Dict]) -> None:
    """Persist the router registry to ``DEVICES_FILE``."""
    os.makedirs(os.path.dirname(DEVICES_FILE), exist_ok=True)
    with open(DEVICES_FILE, 'w', encoding="utf-8") as f:
        json.dump(devices, f, indent=2)


# ---------------------------------------------------------------------------
# Configuration loading
# ---------------------------------------------------------------------------

def load_devices() -> List[Dict]:
    """Load list of routers from devices.json.

    Returns an empty list when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON array.
    """
    try:
        with open(DEVICES_FILE, 'r', encoding="utf-8") as f:
            devices = json.load(f)
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as e:
        _log(f"Error loading devices: {e}")
        return []
    if not isinstance(devices, list):
        _log(f"Error loading devices: expected a list in {DEVICES_FILE}")
        return []
    return devices


def load_billing_routers() -> List[Dict]:
    """Load routers from billing-mcp config.json as fallback.

    Every router found under ``isps.<isp>.routers.<name>`` is flattened into
    a dict with the keys name/host/port/user/pass/isp.  Any problem reading or
    walking the file yields an empty list.
    """
    try:
        with open(BILLING_CONFIG_FILE, 'r', encoding="utf-8") as f:
            config = json.load(f)
        routers = []
        for isp_key, isp_data in config.get("isps", {}).items():
            for router_name, router_config in isp_data.get("routers", {}).items():
                routers.append({
                    "name": router_name,
                    "host": router_config.get("host"),
                    "port": router_config.get("port", 80),
                    "user": router_config.get("user"),
                    "pass": router_config.get("pass"),
                    "isp": isp_key,
                })
        return routers
    except FileNotFoundError:
        return []
    except (OSError, ValueError, AttributeError, TypeError) as e:
        # AttributeError/TypeError cover a config whose shape is not the
        # nested-dict layout walked above.
        _log(f"Error loading billing config: {e}")
        return []


def get_router(router_name: str | None) -> Dict | None:
    """Get router config by name, with fallback to billing config.

    With no name, the first entry of devices.json is returned, or the first
    billing router when devices.json has no entries.  Name matching is
    case-insensitive; entries without a name are skipped rather than raising.
    """
    devices = load_devices()

    if not router_name:
        if devices:
            return devices[0]
        billing_routers = load_billing_routers()
        return billing_routers[0] if billing_routers else None

    wanted = router_name.lower()

    # Try devices.json first
    for device in devices:
        if _name_key(device) == wanted:
            return device

    # Fallback to billing config
    for billing_router in load_billing_routers():
        if _name_key(billing_router) == wanted:
            return billing_router

    return None


def get_router_data_path(router_name: str) -> str:
    """Get path to the snapshot data file for a specific router."""
    return os.path.join(DATA_DIR, f"{_safe_component(router_name)}.json")


def save_snapshot(router_name: str, endpoint: str, data: Any) -> str:
    """Save raw data to a timestamped snapshot file and return the path.

    The file name is ``<router>_<endpoint>_<YYYYmmdd_HHMMSS>.json`` inside
    ``SNAPSHOTS_DIR`` (created on demand).
    """
    os.makedirs(SNAPSHOTS_DIR, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_router = _safe_component(router_name)
    endpoint_part = endpoint.replace("/", "_")
    filename = f"{safe_router}_{endpoint_part}_{timestamp}.json"
    filepath = os.path.join(SNAPSHOTS_DIR, filename)
    with open(filepath, 'w', encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    return filepath


def ensure_first_item(data: Any) -> Dict:
    """Helper to ensure we get a dictionary, even if MikroTik returns a list or a single object."""
    if isinstance(data, list):
        return data[0] if data else {}
    if isinstance(data, dict):
        return data
    return {}


# ---------------------------------------------------------------------------
# Tool catalogue
# ---------------------------------------------------------------------------

def _schema(properties: Dict[str, Dict], required: Optional[List[str]] = None) -> Dict:
    """Build a JSON-schema object description for a tool's input."""
    schema: Dict[str, Any] = {"type": "object", "properties": properties}
    if required:
        schema["required"] = required
    return schema


def _router_prop(description: str = "Nama router.") -> Dict[str, Dict]:
    """Return the ``router_name`` property shared by most tools."""
    return {"router_name": {"type": "string", "description": description}}


@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """List available MikroTik tools."""
    return [
        Tool(
            name="list_routers",
            description="Melihat daftar router MikroTik yang terdaftar di konfigurasi.",
            inputSchema=_schema({}),
        ),
        Tool(
            name="get_status",
            description="Mendapatkan informasi dasar router (CPU, Memori, Uptime).",
            inputSchema=_schema(
                _router_prop("Nama router (lihat list_routers). Opsional, default ke router pertama.")
            ),
        ),
        Tool(
            name="get_interfaces",
            description="Mendapatkan daftar interface network dan status traffic.",
            inputSchema=_schema({
                **_router_prop(),
                "running_only": {"type": "boolean", "description": "Hanya tampilkan interface yang aktif."},
                "full_data": {"type": "boolean", "description": "Tampilkan semua field data (Hati-hati: bisa menyebabkan error context length jika data sangat besar)."},
                "limit": {"type": "integer", "description": "Batas jumlah interface yang ditampilkan (Default 50, Max 200)."},
            }),
        ),
        Tool(
            name="get_hotspot_active",
            description="Mendapatkan daftar pengguna hotspot yang sedang aktif.",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="get_logs",
            description="Mendapatkan baris log terakhir dari router.",
            inputSchema=_schema({
                **_router_prop(),
                "limit": {"type": "integer", "description": "Jumlah log (default 20)."},
            }),
        ),
        Tool(
            name="refresh_data",
            description="Sinkronisasi data terbaru langsung dari router ke snapshot file.",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="test_permissions",
            description="Mengecek izin akses user MikroTik ke berbagai menu API (Diagnostik).",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="add_router",
            description="Menambahkan router MikroTik baru ke dalam daftar konfigurasi.",
            inputSchema=_schema(
                {
                    "name": {"type": "string", "description": "Nama unik untuk router (misal: Batuaji)."},
                    "host": {"type": "string", "description": "IP atau Hostname router."},
                    "port": {"type": "integer", "description": "Port REST API (default 80)."},
                    "user": {"type": "string", "description": "Username admin."},
                    "pass": {"type": "string", "description": "Password admin."},
                },
                required=["name", "host", "user", "pass"],
            ),
        ),
        Tool(
            name="get_ppp_active",
            description="Mendapatkan daftar sesi PPP (PPPoE/L2TP/VPN) yang saat ini aktif.",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="get_ppp_secrets",
            description="Mendapatkan daftar user/secrets yang terdaftar di layanan PPP.",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="get_queues",
            description="Mendapatkan daftar Simple Queues untuk melihat limitasi bandwidth user.",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="get_queue_tree",
            description="Mendapatkan daftar Queue Tree untuk melihat manajemen bandwidth yang lebih kompleks.",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="remove_router",
            description="Menghapus router dari daftar konfigurasi.",
            inputSchema=_schema(
                {"name": {"type": "string", "description": "Nama router yang ingin dihapus."}},
                required=["name"],
            ),
        ),
        Tool(
            name="get_vlans",
            description="Mendapatkan daftar interface VLAN (filter by type=vlan).",
            inputSchema=_schema(_router_prop()),
        ),
        Tool(
            name="get_gateway_interfaces",
            description="Mendapatkan daftar interface yang berfungsi sebagai gateway (dari routing table).",
            inputSchema=_schema({
                **_router_prop(),
                "active_only": {"type": "boolean", "description": "Hanya tampilkan gateway yang aktif (default: true)."},
            }),
        ),
        Tool(
            name="get_interface_traffic",
            description="Mendapatkan traffic rate (tx/rx) interface dalam format Mbps yang mudah dibaca.",
            inputSchema=_schema({
                **_router_prop(),
                "interface_name": {"type": "string", "description": "Nama interface spesifik (opsional, jika kosong tampilkan semua yang running)."},
                "running_only": {"type": "boolean", "description": "Hanya tampilkan interface yang aktif (default: true)."},
            }),
        ),
    ]


# ---------------------------------------------------------------------------
# Tool execution
# ---------------------------------------------------------------------------

# Tools that fetch one endpoint, snapshot it and return a short preview:
# tool name -> (endpoint, snapshot key, total key, sample key, label, display limit)
# NOTE(review): "get_ppp_secrets" returns whatever the router sends for
# ppp/secret, which may include credentials - confirm this is intended.
_LISTING_TOOLS = {
    "get_hotspot_active": ("ip/hotspot/active", "ip_hotspot_active", "total_active", "sample_users", "user aktif", 10),
    "get_ppp_active": ("ppp/active", "ppp_active", "total_active_ppp", "sample_active", "PPP active", 10),
    "get_ppp_secrets": ("ppp/secret", "ppp_secret", "total_secrets", "sample_secrets", "PPP secrets", 10),
    "get_queues": ("queue/simple", "queue_simple", "total_queues", "sample_queues", "simple queues", 15),
    "get_queue_tree": ("queue/tree", "queue_tree", "total_queue_tree", "sample_queue_tree", "entries di queue tree", 15),
}


def _summarize_listing(tool: str, router_label: str, auth: tuple, base_url: str) -> list[TextContent]:
    """Fetch the endpoint configured for *tool*, snapshot it, return a preview."""
    endpoint, snapshot_key, total_key, sample_key, label, display_limit = _LISTING_TOOLS[tool]
    rows = fetch_data(endpoint, auth, base_url)
    is_list = isinstance(rows, list)
    total = len(rows) if is_list else 0
    snapshot_path = save_snapshot(router_label, snapshot_key, rows)

    # Summarize for LLM
    result = {
        "router": router_label,
        total_key: total,
        "snapshot_file": snapshot_path,
        sample_key: rows[:display_limit] if is_list else rows,
    }
    text_res = json.dumps(result, indent=2)
    if total > display_limit:
        text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} {label}.\n"
        text_res += f"📂 Data lengkap: {snapshot_path}"
    return _text(text_res)


def _handle_registry_tool(name: str, args: dict) -> Optional[list[TextContent]]:
    """Handle the tools that only touch the router registry.

    Returns the reply for ``list_routers``, ``add_router`` and
    ``remove_router``; returns ``None`` for every other tool name.  These run
    before any router lookup so the first router can be added to an empty
    registry.
    """
    if name == "list_routers":
        summary = []
        seen_names = set()

        # Add devices from devices.json
        for device in load_devices():
            summary.append({"name": device.get("name"), "host": device.get("host"), "source": "devices.json"})
            seen_names.add(_name_key(device))

        # Add devices from billing config if not duplicate
        for billing_router in load_billing_routers():
            if _name_key(billing_router) not in seen_names:
                summary.append({"name": billing_router.get("name"), "host": billing_router.get("host"), "source": "billing-mcp"})

        return _text(json.dumps(summary, indent=2))

    if name == "add_router":
        missing = [key for key in ("name", "host", "user", "pass") if not args.get(key)]
        if missing:
            return _text(f"❌ Parameter wajib belum diisi: {', '.join(missing)}")

        devices = load_devices()
        new_device = {
            "name": args["name"],
            "host": args["host"],
            "port": args.get("port", 80),
            "user": args["user"],
            "pass": args["pass"],
        }

        # Check if name already exists
        if any(_name_key(d) == new_device["name"].lower() for d in devices):
            return _text(f"❌ Router dengan nama '{new_device['name']}' sudah ada.")

        devices.append(new_device)
        _save_devices(devices)
        return _text(f"✅ Router '{new_device['name']}' berhasil ditambahkan.")

    if name == "remove_router":
        target = args.get("name")
        if not target:
            return _text("❌ Parameter wajib belum diisi: name")

        devices = load_devices()
        remaining = [d for d in devices if _name_key(d) != target.lower()]
        if len(remaining) == len(devices):
            return _text(f"❌ Router '{target}' tidak ditemukan.")

        _save_devices(remaining)
        return _text(f"✅ Router '{target}' berhasil dihapus.")

    return None


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict | None) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool execution requests.

    Registry tools are served first; every other tool resolves the target
    router via ``get_router`` and then talks to it through the ``update_data``
    helpers.  Any exception is converted into an error text reply instead of
    propagating.

    NOTE(review): ``fetch_data``/``post_data`` are called without ``await``,
    so they presumably block the event loop while a request is in flight.
    """
    args = arguments or {}
    try:
        # 1. Registry tools (no router connection needed)
        registry_reply = _handle_registry_tool(name, args)
        if registry_reply is not None:
            return registry_reply

        # 2. Get target router
        router_name = args.get("router_name")
        router = get_router(router_name)
        if not router:
            return _text(f"❌ Router '{router_name or 'Default'}' tidak ditemukan.")

        # Prepare connection info
        auth = (router["user"], router["pass"])
        base_url = get_base_url(router["host"], router["port"])
        actual_name = router["name"]

        if name in _LISTING_TOOLS:
            return _summarize_listing(name, actual_name, auth, base_url)

        if name == "get_status":
            try:
                resource_data = ensure_first_item(fetch_data("system/resource", auth, base_url))
                identity_data = ensure_first_item(fetch_data("system/identity", auth, base_url))
                data = {
                    "router_name": actual_name,
                    "identity": identity_data.get("name", "Unknown"),
                    "resource": resource_data,
                }
                return _text(json.dumps(data, indent=2))
            except Exception as e:
                return _text(f"❌ Gagal mengambil status router: {str(e)}\n\nTips: Pastikan user memiliki izin untuk '/system/resource' dan '/system/identity'.")

        if name == "get_interfaces":
            running_only = args.get("running_only", False)
            full_data = args.get("full_data", False)
            user_limit = _clamp_limit(args.get("limit"), default=50, maximum=200)

            # Ask the router to filter and trim fields where possible.
            payload: Dict[str, Any] = {}
            if running_only:
                payload[".query"] = ["running=true"]
            if not full_data:
                # Request only specific fields to save bandwidth
                payload[".proplist"] = [".id", "name", "type", "running", "disabled", "comment", "default-name", "actual-mtu", "mac-address"]

            try:
                # Use POST for query/proplist support
                interfaces = _as_list(post_data("interface/print", payload, auth, base_url))
            except Exception as e:
                # Fallback to standard GET if POST fails (compatibility)
                _log(f"POST interface/print failed: {e}. Falling back to GET.")
                interfaces = _as_list(fetch_data("interface", auth, base_url))
                # Apply client-side filtering since server-side failed
                if running_only:
                    interfaces = [i for i in interfaces if _is_true(i.get("running"))]

            total_found = len(interfaces)
            interfaces = interfaces[:user_limit]

            # Snapshot what was fetched (after the user limit) before the
            # reply is trimmed further.
            snapshot_path = save_snapshot(actual_name, "interface", interfaces)

            if not full_data:
                # Essential fields only to save context/tokens
                essential_fields = ["name", "type", "running", "disabled", "comment", "default-name"]
                interfaces = [
                    {k: iface.get(k) for k in essential_fields if k in iface}
                    for iface in interfaces
                ]

            # LIMIT RESPONSE SIZE FOR LLM
            display_limit = 15
            preview_interfaces = interfaces[:display_limit]
            # Compare against what is really shown, so a small user limit is
            # reported as truncation too.
            is_truncated = total_found > len(preview_interfaces)

            result = {
                "router": actual_name,
                "count": total_found,
                "showing": len(preview_interfaces),
                "snapshot_file": snapshot_path,
                "status": "Truncated for readability." if is_truncated else "Full list.",
                "interfaces": preview_interfaces,
            }
            msg = json.dumps(result, indent=2)
            if is_truncated:
                msg += f"\n\n⚠️ DATA TRUNCATED ({total_found} items). Only showing {len(preview_interfaces)} to save context."
                msg += f"\n📂 Full snapshot: {snapshot_path}"
            return _text(_cap_message(msg))

        if name == "get_logs":
            limit = _clamp_limit(args.get("limit"), default=20)
            raw_logs = fetch_data("log", auth, base_url)
            snapshot_path = save_snapshot(actual_name, "log", raw_logs)
            logs = _as_list(raw_logs)

            summary = {
                "router": actual_name,
                "total_logs_available": len(logs),
                "snapshot_file": snapshot_path,
                "recent_entries": logs[-limit:],
            }
            msg = json.dumps(summary, indent=2)
            msg += f"\n\n📂 Log lengkap disimpan di: {snapshot_path}"
            return _text(_cap_message(msg))

        if name == "refresh_data":
            new_data = gather_all_info(router["host"], router["port"], router["user"], router["pass"])
            if not new_data:
                return _text(f"❌ Failed to refresh data for '{actual_name}'.")
            data_path = get_router_data_path(actual_name)
            save_data(new_data, data_path)
            return _text(f"✅ Data for router '{actual_name}' successfully updated in {data_path}.")

        if name == "test_permissions":
            endpoints = [
                "system/resource",
                "system/identity",
                "interface",
                "ip/address",
                "ip/hotspot/active",
                "log",
                "user",
                "ppp/secret",
            ]
            results = {}
            failures = []
            for ep in endpoints:
                try:
                    fetch_data(ep, auth, base_url)
                    results[ep] = "✅ OK"
                except Exception as e:
                    results[ep] = f"❌ {str(e)}"
                    failures.append(f"{ep}: {str(e)}")

            troubleshooting = ""
            if failures:
                troubleshooting = "\n\n🔍 ANALISIS & SOLUSI:\n"
                if any("system/resource" in f for f in failures):
                    troubleshooting += "- Endpoint 'system/resource' gagal. Jika user memiliki akses FULL, coba cek apakah service 'www' (port 80) di router memiliki filter IP atau jika ada firewall yang memblokir request besar.\n"
                if any("HTTP 403" in f for f in failures):
                    troubleshooting += "- Beberapa menu memerlukan izin spesifik. Walaupun user group-nya 'full', pastikan tidak ada kebijakan 'skin' yang membatasi API.\n"
                if any("Connection Error" in f or "Timeout" in f for f in failures):
                    troubleshooting += "- Masalah konektivitas. Pastikan IP Router dapat dijangkau dari server ini dan service API MikroTik aktif.\n"

            summary = {
                "router": actual_name,
                "host": router["host"],
                "results": results,
                "troubleshooting": troubleshooting,
            }
            return _text(json.dumps(summary, indent=2) + troubleshooting)

        if name == "get_vlans":
            interfaces = _as_list(fetch_data("interface", auth, base_url))

            # Filter VLAN only
            vlans = [i for i in interfaces if i.get("type") == "vlan"]
            total = len(vlans)
            snapshot_path = save_snapshot(actual_name, "interface_vlan", vlans)

            # Display limit for LLM
            display_limit = 30
            result = {
                "router": actual_name,
                "total_vlan_interfaces": total,
                "snapshot_file": snapshot_path,
                "vlans": vlans[:display_limit],
            }
            text_res = json.dumps(result, indent=2)
            if total > display_limit:
                text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} VLAN interfaces.\n"
                text_res += f"📂 Data lengkap: {snapshot_path}"
            return _text(text_res)

        if name == "get_gateway_interfaces":
            routes = _as_list(fetch_data("ip/route", auth, base_url))
            active_only = args.get("active_only", True)

            # Extract gateway interfaces from immediate-gw field (format: IP%interface)
            gateway_interfaces: Dict[str, Dict] = {}
            for route in routes:
                immediate_gw = route.get("immediate-gw") or ""
                if "%" not in immediate_gw:
                    continue
                parts = immediate_gw.split("%")
                gw_ip, iface = parts[0], parts[1]

                # Filter active only
                is_active = _is_true(route.get("active"))
                if active_only and not is_active:
                    continue

                entry = gateway_interfaces.setdefault(
                    iface, {"interface": iface, "gateway_ips": [], "routes": []}
                )
                # Keep gateway IPs unique while preserving first-seen order.
                if gw_ip not in entry["gateway_ips"]:
                    entry["gateway_ips"].append(gw_ip)
                entry["routes"].append({
                    "dst": route.get("dst-address"),
                    "gateway": route.get("gateway"),
                    "distance": route.get("distance"),
                    "routing_table": route.get("routing-table"),
                    "active": is_active,
                })

            gateways = list(gateway_interfaces.values())
            snapshot_path = save_snapshot(actual_name, "gateway_interfaces", gateways)

            result = {
                "router": actual_name,
                "filter": "active_only" if active_only else "all",
                "total_gateway_interfaces": len(gateways),
                "snapshot_file": snapshot_path,
                "gateway_interfaces": gateways,
            }
            return _text(json.dumps(result, indent=2))

        if name == "get_interface_traffic":
            # 1. Get List of Interfaces (to know names)
            interfaces = _as_list(fetch_data("interface", auth, base_url))

            # Filter by running only (default true)
            running_only = args.get("running_only", True)
            if running_only:
                interfaces = [i for i in interfaces if _is_true(i.get("running"))]

            # Filter by specific interface name if provided
            interface_name = args.get("interface_name")
            if interface_name:
                wanted_iface = interface_name.lower()
                interfaces = [i for i in interfaces if (i.get("name") or "").lower() == wanted_iface]

            # If no interfaces found, return early
            if not interfaces:
                return _text("⚠️ No interfaces found matching criteria.")

            # 2. Monitor Traffic (Real-time Stats)
            # All names go out in one comma-separated monitor-traffic request;
            # interfaces without a name cannot be addressed and are left out.
            target_names = [i["name"] for i in interfaces if i.get("name")]

            monitor_data = {}
            try:
                # Use monitor-traffic command via POST
                payload = {"interface": ",".join(target_names), "once": "true"}
                mon_result = post_data("interface/monitor-traffic", payload, auth, base_url)

                # Map results by name
                if isinstance(mon_result, list):
                    for sample in mon_result:
                        monitor_data[sample.get("name")] = sample
                elif isinstance(mon_result, dict) and mon_result.get("name"):
                    monitor_data[mon_result.get("name")] = mon_result
            except Exception as e:
                # Best effort: when monitoring fails the per-interface
                # fallback values below are used instead.
                _log(f"Monitor traffic error: {e}")

            # Extract traffic info combining config + monitor data
            traffic_data = []
            for iface in interfaces:
                iface_name = iface.get("name")
                mon = monitor_data.get(iface_name, {})

                # Get stats from monitor data (preferred) or interface data (fallback)
                tx_bps = mon.get("tx-bits-per-second", iface.get("tx-bit", 0))
                rx_bps = mon.get("rx-bits-per-second", iface.get("rx-bit", 0))

                traffic_data.append({
                    "name": iface_name,
                    "type": iface.get("type"),
                    "tx_rate": _format_rate(tx_bps),
                    "rx_rate": _format_rate(rx_bps),
                    "tx_rate_raw_bps": tx_bps,
                    "rx_rate_raw_bps": rx_bps,
                    "running": iface.get("running"),
                    "disabled": iface.get("disabled"),
                })

            # Sort by tx_rate descending (highest traffic first)
            traffic_data.sort(key=lambda row: _to_float(row.get("tx_rate_raw_bps")), reverse=True)

            snapshot_path = save_snapshot(actual_name, "interface_traffic", traffic_data)

            # Limit display
            display_limit = 20
            result = {
                "router": actual_name,
                "filter": "running_only" if running_only else "all",
                "total_interfaces": len(traffic_data),
                "snapshot_file": snapshot_path,
                "interfaces": traffic_data[:display_limit],
            }
            text_res = json.dumps(result, indent=2)
            if len(traffic_data) > display_limit:
                text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {len(traffic_data)} interfaces.\n"
                text_res += f"📂 Data lengkap: {snapshot_path}"
            return _text(text_res)

        raise ValueError(f"Unknown tool: {name}")

    except Exception as e:
        # Top-level boundary: report the failure to the caller as text.
        return _text(f"Error executing tool {name}: {str(e)}")


async def main():
    """Run the MCP server over the stdio transport until the streams close."""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())