diff --git a/app/__pycache__/update_data.cpython-311.pyc b/app/__pycache__/update_data.cpython-311.pyc index 405d7de..d3d7692 100644 Binary files a/app/__pycache__/update_data.cpython-311.pyc and b/app/__pycache__/update_data.cpython-311.pyc differ diff --git a/app/mcp_server.py b/app/mcp_server.py index 58ef030..4b4cc32 100644 --- a/app/mcp_server.py +++ b/app/mcp_server.py @@ -10,7 +10,7 @@ from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource from dotenv import load_dotenv # Import our helper from update_data -from update_data import gather_all_info, fetch_data, save_data, get_base_url +from update_data import gather_all_info, fetch_data, post_data, save_data, get_base_url load_dotenv() @@ -290,7 +290,21 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten # 1. Handle list_routers (no router connection needed) if name == "list_routers": devices = load_devices() - summary = [{"name": d["name"], "host": d["host"]} for d in devices] + billing_routers = load_billing_routers() + + summary = [] + seen_names = set() + + # Add devices from devices.json + for d in devices: + summary.append({"name": d["name"], "host": d["host"], "source": "devices.json"}) + seen_names.add(d["name"].lower()) + + # Add devices from billing config if not duplicate + for r in billing_routers: + if r["name"].lower() not in seen_names: + summary.append({"name": r["name"], "host": r["host"], "source": "billing-mcp"}) + return [TextContent(type="text", text=json.dumps(summary, indent=2))] # 2. Get target router @@ -325,18 +339,42 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten return [TextContent(type="text", text=f"❌ Gagal mengambil status router: {str(e)}\n\nTips: Pastikan user memiliki izin untuk '/system/resource' dan '/system/identity'.")] elif name == "get_interfaces": - interfaces = fetch_data("interface", auth, base_url) + # Parsing arguments + running_only = arguments.get("running_only", False) if arguments else False + full_data = arguments.get("full_data", False) if arguments else False + user_limit = arguments.get("limit", 50) if arguments else 50 + if user_limit > 200: user_limit = 200 + + # Optimizing fetch with server-side filtering and field selection + endpoint = "interface/print" + payload = {} - # 1. Handle Running Only filter - if arguments and arguments.get("running_only"): - interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True] + # 1. Server-side Query/Filter + query = [] + if running_only: + query.append("running=true") - # 2. Handle Limit - limit = arguments.get("limit", 50) if arguments else 50 - if limit > 200: limit = 200 # Cap at 200 + if query: + payload[".query"] = query + + # 2. Server-side Field Selection (Proplist) + # If full_data is False, request only specific fields to save bandwidth + if not full_data: + payload[".proplist"] = [".id", "name", "type", "running", "disabled", "comment", "default-name", "actual-mtu", "mac-address"] + try: + # Use POST for query/proplist support + interfaces = post_data(endpoint, payload, auth, base_url) + except Exception as e: + # Fallback to standard GET if POST fails (compatibility) + print(f"POST interface/print failed: {e}. Falling back to GET.") + interfaces = fetch_data("interface", auth, base_url) + # Apply client-side filtering since server-side failed + if running_only: + interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True] + total_found = len(interfaces) - interfaces = interfaces[:limit] + interfaces = interfaces[:user_limit] # 3. Handle Field Filtering (Default: Minimal fields) full_data = arguments.get("full_data", False) if arguments else False @@ -671,6 +709,7 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "get_interface_traffic": + # 1. Get List of Interfaces (to know names) interfaces = fetch_data("interface", auth, base_url) # Filter by running only (default true) @@ -683,12 +722,43 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten if interface_name: interfaces = [i for i in interfaces if i.get("name", "").lower() == interface_name.lower()] - def bytes_to_mbps(bytes_val): - """Convert bytes to Mbps (bits per second / 1,000,000)""" - try: - return round(float(bytes_val) * 8 / 1_000_000, 2) - except (ValueError, TypeError): - return 0.0 + # If no interfaces found, return early + if not interfaces: + return [TextContent(type="text", text=f"⚠️ No interfaces found matching criteria.")] + + # 2. Monitor Traffic (Real-time Stats) + target_names = [i.get("name") for i in interfaces] + target_str = ",".join(target_names) + + # Mikrotik monitor-traffic can be heavy if many interfaces. + # If > 10 interfaces, we might want to batch or limit? + # REST API usually handles handle comma separated list fine. + + monitor_data = {} + try: + # Use monitor-traffic command via POST + # Payload: interface=name1,name2&once=true + payload = { + "interface": target_str, + "once": "true" + } + + mon_result = post_data("interface/monitor-traffic", payload, auth, base_url) + + # Map results by name + if isinstance(mon_result, list): + for m in mon_result: + monitor_data[m.get("name")] = m + elif isinstance(mon_result, dict) and mon_result.get("name"): + monitor_data[mon_result.get("name")] = mon_result + + except Exception as e: + # Fallback or log error + # If monitor-traffic fails (e.g. some interface type doesn't support it), + # we might get partial data or error. + # Let's try to continue with basic stats if available, or just report error. + print(f"Monitor traffic error: {e}") + # We will just use empty stats if monitor fails def format_rate(bps_val): """Format bits per second to human readable Mbps/Kbps""" @@ -703,16 +773,18 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten except (ValueError, TypeError): return "0 bps" - # Extract traffic info + # Extract traffic info combining config + monitor data traffic_data = [] for iface in interfaces: - # MikroTik provides tx-bits-per-second and rx-bits-per-second for real-time rate - # For total traffic, it provides tx-byte and rx-byte - tx_bps = iface.get("tx-bits-per-second", iface.get("tx-bit", 0)) - rx_bps = iface.get("rx-bits-per-second", iface.get("rx-bit", 0)) + name = iface.get("name") + mon = monitor_data.get(name, {}) + + # Get stats from monitor data (preferred) or interface data (fallback) + tx_bps = mon.get("tx-bits-per-second", iface.get("tx-bit", 0)) + rx_bps = mon.get("rx-bits-per-second", iface.get("rx-bit", 0)) traffic_data.append({ - "name": iface.get("name"), + "name": name, "type": iface.get("type"), "tx_rate": format_rate(tx_bps), "rx_rate": format_rate(rx_bps), diff --git a/app/update_data.py b/app/update_data.py index a520c29..4d8a660 100644 --- a/app/update_data.py +++ b/app/update_data.py @@ -24,7 +24,15 @@ def get_base_url(host, port): return f"{protocol}://{host}:{port}/rest" def fetch_data(endpoint, auth=None, base_url=None): - """Fetch data from MikroTik REST API with provided credentials""" + """Fetch data from MikroTik REST API with provided credentials (GET)""" + return _send_request("GET", endpoint, auth, base_url) + +def post_data(endpoint, payload, auth=None, base_url=None): + """Send POST request to MikroTik REST API""" + return _send_request("POST", endpoint, auth, base_url, json_data=payload) + +def _send_request(method, endpoint, auth=None, base_url=None, json_data=None): + """Internal helper for HTTP requests""" if auth is None: auth = (DEFAULT_MIKROTIK_USER, DEFAULT_MIKROTIK_PASSWORD) if base_url is None: @@ -37,10 +45,10 @@ def fetch_data(endpoint, auth=None, base_url=None): } try: - response = requests.get(url, auth=auth, headers=headers, timeout=12) - - # Log basic info for debugging if needed (uncomment if very stuck) - # print(f"DEBUG: GET {url} -> Status {response.status_code}") + if method == "GET": + response = requests.get(url, auth=auth, headers=headers, timeout=12) + else: + response = requests.post(url, auth=auth, headers=headers, json=json_data, timeout=12) if response.status_code != 200: status_code = response.status_code @@ -64,6 +72,9 @@ def fetch_data(endpoint, auth=None, base_url=None): raise Exception(error_msg) if not response.text or response.text.strip() == "": + # POST requests might return empty body if successful but no content? + # Usually monitor-traffic returns data. + if method == "POST": return {} raise Exception("Empty Response from Router") return response.json()