Files
Mikrotik-MCP/app/mcp_server.py

837 lines
35 KiB
Python

#!/usr/bin/env python3
import asyncio
import json
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional

from dotenv import load_dotenv
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource

# Import our helper from update_data
from update_data import gather_all_info, fetch_data, post_data, save_data, get_base_url
load_dotenv()

# Configuration
# Each location can be overridden with an environment variable (for example
# from the .env file read by load_dotenv() above). When a variable is unset or
# empty, the original hard-coded location is used, so existing deployments
# behave exactly as before.
DEVICES_FILE = (
    os.environ.get("MIKROTIK_MCP_DEVICES_FILE")
    or "/home/wartana/myApp/mikrotik-mcp/devices.json"
)
BILLING_CONFIG_FILE = (
    os.environ.get("MIKROTIK_MCP_BILLING_CONFIG_FILE")
    or "/home/wartana/myApp/billing-mcp/config.json"
)
DATA_DIR = (
    os.environ.get("MIKROTIK_MCP_DATA_DIR")
    or "/home/wartana/myApp/mikrotik-mcp/data"
)
# Timestamped snapshots live in a sub-directory of DATA_DIR.
SNAPSHOTS_DIR = os.path.join(DATA_DIR, "snapshots")

# Create the MCP server
server = Server("mikrotik")
def load_devices() -> List[Dict]:
    """Load the list of routers from devices.json.

    Returns:
        The parsed list of router entries. An empty list is returned when the
        file does not exist, cannot be read or parsed, or does not contain a
        JSON array.
    """
    try:
        if not os.path.exists(DEVICES_FILE):
            return []
        with open(DEVICES_FILE, "r", encoding="utf-8") as f:
            devices = json.load(f)
    except Exception as e:
        # This server talks MCP over stdio, so stdout is reserved for protocol
        # messages; diagnostics have to go to stderr instead.
        print(f"Error loading devices: {e}", file=sys.stderr)
        return []

    if not isinstance(devices, list):
        # Callers index and iterate the result as a list of dicts.
        print(
            f"Error loading devices: expected a JSON array in {DEVICES_FILE}",
            file=sys.stderr,
        )
        return []
    return devices
def load_billing_routers() -> List[Dict]:
    """Load routers from the billing-mcp config.json as a fallback source.

    The config is read as ``{"isps": {<isp>: {"routers": {<name>: {...}}}}}``
    and flattened into one entry per router with the keys ``name``, ``host``,
    ``port`` (default 80), ``user``, ``pass`` and ``isp``.

    Returns:
        The flattened router list, or an empty list when the file is missing
        or cannot be read or interpreted.
    """
    try:
        if not os.path.exists(BILLING_CONFIG_FILE):
            return []
        with open(BILLING_CONFIG_FILE, "r", encoding="utf-8") as f:
            config = json.load(f)

        routers = []
        for isp_key, isp_data in config.get("isps", {}).items():
            for router_name, router_config in isp_data.get("routers", {}).items():
                routers.append({
                    "name": router_name,
                    "host": router_config.get("host"),
                    "port": router_config.get("port", 80),
                    "user": router_config.get("user"),
                    "pass": router_config.get("pass"),
                    "isp": isp_key,
                })
        return routers
    except Exception as e:
        # Best-effort loader: any read/parse/shape problem yields an empty list.
        # stdout is reserved for the MCP stdio protocol, so report on stderr.
        print(f"Error loading billing config: {e}", file=sys.stderr)
        return []
def get_router(router_name: str | None) -> Dict | None:
"""Get router config by name, with fallback to billing config"""
devices = load_devices()
# Try devices.json first
if devices:
if not router_name:
return devices[0]
for d in devices:
if d.get("name").lower() == router_name.lower():
return d
# Fallback to billing config
billing_routers = load_billing_routers()
if billing_routers:
if not router_name and not devices:
return billing_routers[0]
for r in billing_routers:
if r.get("name").lower() == router_name.lower():
return r
return None
def get_router_data_path(router_name: str) -> str:
    """Get the path of the data file for a specific router.

    Spaces and path separators in the name are replaced with underscores and
    the result is lower-cased, so the file always lands directly in DATA_DIR.
    Names without path separators map to the same file as before.

    Args:
        router_name: Router name as stored in the configuration.

    Returns:
        ``<DATA_DIR>/<safe_name>.json``.
    """
    # Router names can be supplied through the add_router tool, so separators
    # must not be allowed to redirect the write outside DATA_DIR.
    unsafe_chars = str.maketrans({" ": "_", "/": "_", "\\": "_"})
    safe_name = router_name.translate(unsafe_chars).lower()
    return os.path.join(DATA_DIR, f"{safe_name}.json")
def save_snapshot(router_name: str, endpoint: str, data: Any) -> str:
    """Save raw data to a timestamped snapshot file and return its path.

    The file is named ``<router>_<endpoint>_<YYYYmmdd_HHMMSS>.json`` inside
    SNAPSHOTS_DIR, which is created when missing. Spaces and path separators
    in the router name, and path separators in the endpoint, are replaced with
    underscores so the file is always created directly in SNAPSHOTS_DIR.

    Args:
        router_name: Router name used as the file name prefix.
        endpoint: Endpoint or label describing the data (e.g. "ip/route").
        data: JSON-serialisable payload to store.

    Returns:
        The path of the written snapshot file.
    """
    os.makedirs(SNAPSHOTS_DIR, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # A "/" in the router name used to point the write at a sub-directory that
    # does not exist (or outside SNAPSHOTS_DIR); neutralise separators here.
    safe_router = router_name.translate(
        str.maketrans({" ": "_", "/": "_", "\\": "_"})
    ).lower()
    endpoint_part = endpoint.translate(str.maketrans({"/": "_", "\\": "_"}))

    filename = f"{safe_router}_{endpoint_part}_{timestamp}.json"
    filepath = os.path.join(SNAPSHOTS_DIR, filename)
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    return filepath
def ensure_first_item(data: Any) -> Dict:
    """Return a dictionary whether the API answered with a list or one object.

    Args:
        data: A response that may be a list of objects, a single object, or
            anything else.

    Returns:
        ``data`` itself when it is a dict; the first element when ``data`` is
        a non-empty list whose first element is a dict; otherwise ``{}``.
    """
    if isinstance(data, dict):
        return data
    # Callers use .get() on the result, so a non-dict first element must not
    # leak through.
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return data[0]
    return {}
@server.list_tools()
async def handle_list_tools() -> list[Tool]:
    """Return the catalogue of MikroTik tools offered by this server."""

    def field(kind: str, text: str) -> dict:
        """Build one input-schema property of the given JSON type."""
        return {"type": kind, "description": text}

    def router_field(text: str = "Nama router.") -> dict:
        """Build the ``router_name`` property shared by most tools."""
        return field("string", text)

    def schema(properties: dict, required: list | None = None) -> dict:
        """Build an object schema; ``required`` is only emitted when given."""
        spec = {"type": "object", "properties": properties}
        if required is not None:
            spec["required"] = required
        return spec

    # (tool name, description, input schema) in the order they are advertised.
    catalogue = [
        (
            "list_routers",
            "Melihat daftar router MikroTik yang terdaftar di konfigurasi.",
            schema({}),
        ),
        (
            "get_status",
            "Mendapatkan informasi dasar router (CPU, Memori, Uptime).",
            schema({
                "router_name": router_field("Nama router (lihat list_routers). Opsional, default ke router pertama."),
            }),
        ),
        (
            "get_interfaces",
            "Mendapatkan daftar interface network dan status traffic.",
            schema({
                "router_name": router_field(),
                "running_only": field("boolean", "Hanya tampilkan interface yang aktif."),
                "full_data": field("boolean", "Tampilkan semua field data (Hati-hati: bisa menyebabkan error context length jika data sangat besar)."),
                "limit": field("integer", "Batas jumlah interface yang ditampilkan (Default 50, Max 200)."),
            }),
        ),
        (
            "get_hotspot_active",
            "Mendapatkan daftar pengguna hotspot yang sedang aktif.",
            schema({"router_name": router_field()}),
        ),
        (
            "get_logs",
            "Mendapatkan baris log terakhir dari router.",
            schema({
                "router_name": router_field(),
                "limit": field("integer", "Jumlah log (default 20)."),
            }),
        ),
        (
            "refresh_data",
            "Sinkronisasi data terbaru langsung dari router ke snapshot file.",
            schema({"router_name": router_field()}),
        ),
        (
            "test_permissions",
            "Mengecek izin akses user MikroTik ke berbagai menu API (Diagnostik).",
            schema({"router_name": router_field()}),
        ),
        (
            "add_router",
            "Menambahkan router MikroTik baru ke dalam daftar konfigurasi.",
            schema(
                {
                    "name": field("string", "Nama unik untuk router (misal: Batuaji)."),
                    "host": field("string", "IP atau Hostname router."),
                    "port": field("integer", "Port REST API (default 80)."),
                    "user": field("string", "Username admin."),
                    "pass": field("string", "Password admin."),
                },
                ["name", "host", "user", "pass"],
            ),
        ),
        (
            "get_ppp_active",
            "Mendapatkan daftar sesi PPP (PPPoE/L2TP/VPN) yang saat ini aktif.",
            schema({"router_name": router_field()}),
        ),
        (
            "get_ppp_secrets",
            "Mendapatkan daftar user/secrets yang terdaftar di layanan PPP.",
            schema({"router_name": router_field()}),
        ),
        (
            "get_queues",
            "Mendapatkan daftar Simple Queues untuk melihat limitasi bandwidth user.",
            schema({"router_name": router_field()}),
        ),
        (
            "get_queue_tree",
            "Mendapatkan daftar Queue Tree untuk melihat manajemen bandwidth yang lebih kompleks.",
            schema({"router_name": router_field()}),
        ),
        (
            "remove_router",
            "Menghapus router dari daftar konfigurasi.",
            schema(
                {"name": field("string", "Nama router yang ingin dihapus.")},
                ["name"],
            ),
        ),
        (
            "get_vlans",
            "Mendapatkan daftar interface VLAN (filter by type=vlan).",
            schema({"router_name": router_field()}),
        ),
        (
            "get_gateway_interfaces",
            "Mendapatkan daftar interface yang berfungsi sebagai gateway (dari routing table).",
            schema({
                "router_name": router_field(),
                "active_only": field("boolean", "Hanya tampilkan gateway yang aktif (default: true)."),
            }),
        ),
        (
            "get_interface_traffic",
            "Mendapatkan traffic rate (tx/rx) interface dalam format Mbps yang mudah dibaca.",
            schema({
                "router_name": router_field(),
                "interface_name": field("string", "Nama interface spesifik (opsional, jika kosong tampilkan semua yang running)."),
                "running_only": field("boolean", "Hanya tampilkan interface yang aktif (default: true)."),
            }),
        ),
    ]

    return [
        Tool(name=tool_name, description=summary, inputSchema=input_schema)
        for tool_name, summary, input_schema in catalogue
    ]
# Character cap applied to the largest text responses (the "30KB" safety cap).
_MAX_RESPONSE_CHARS = 30000

# REST port assumed when a router entry does not specify one; the same default
# is used by add_router and by the billing-config loader.
_DEFAULT_PORT = 80

# Tools that fetch one endpoint and answer with a short preview of the list:
# tool name -> (endpoint, snapshot label, total key, preview key, display limit, noun)
_LIST_PREVIEW_TOOLS = {
    "get_hotspot_active": ("ip/hotspot/active", "ip_hotspot_active", "total_active", "sample_users", 10, "user aktif"),
    "get_ppp_active": ("ppp/active", "ppp_active", "total_active_ppp", "sample_active", 10, "PPP active"),
    # NOTE(review): PPP secret entries may carry password fields, which end up
    # in both the preview and the snapshot file - confirm this is intended.
    "get_ppp_secrets": ("ppp/secret", "ppp_secret", "total_secrets", "sample_secrets", 10, "PPP secrets"),
    "get_queues": ("queue/simple", "queue_simple", "total_queues", "sample_queues", 15, "simple queues"),
    "get_queue_tree": ("queue/tree", "queue_tree", "total_queue_tree", "sample_queue_tree", 15, "entries di queue tree"),
}


def _log(message: str) -> None:
    """Write a diagnostic line to stderr (stdout carries the MCP stdio stream)."""
    print(message, file=sys.stderr)


def _text(message: str) -> list:
    """Wrap a string in the single-item TextContent list returned to the client."""
    return [TextContent(type="text", text=message)]


def _cap(message: str) -> str:
    """Cut a response down to _MAX_RESPONSE_CHARS, marking the truncation."""
    if len(message) > _MAX_RESPONSE_CHARS:
        return message[:_MAX_RESPONSE_CHARS] + "\n\n... [TRUNCATED DUE TO EXTREME SIZE] ..."
    return message


def _is_true(value: Any) -> bool:
    """Return True for a boolean True or the string "true"."""
    return value is True or value == "true"


def _as_list(value: Any) -> list:
    """Normalise a response to a list (a single object becomes a one-item list)."""
    if isinstance(value, list):
        return value
    if isinstance(value, dict):
        return [value]
    return []


def _bounded_int(value: Any, default: int, minimum: int, maximum: int | None = None) -> int:
    """Convert ``value`` to an int within [minimum, maximum], using ``default`` if unusable."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        number = default
    number = max(number, minimum)
    if maximum is not None:
        number = min(number, maximum)
    return number


def _to_float(value: Any) -> float:
    """Convert ``value`` to float, treating missing or non-numeric values as 0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _format_rate(bps_val: Any) -> str:
    """Format bits per second as a human readable Mbps/Kbps/bps string."""
    try:
        bps = float(bps_val)
    except (ValueError, TypeError):
        return "0 bps"
    if bps >= 1_000_000:
        return f"{bps / 1_000_000:.2f} Mbps"
    if bps >= 1_000:
        return f"{bps / 1_000:.2f} Kbps"
    return f"{bps:.0f} bps"


def _list_preview(router_label: str, items: Any, snapshot_path: str, total_key: str,
                  items_key: str, display_limit: int, noun: str) -> list:
    """Build the JSON summary plus truncation notice shared by the list tools."""
    is_list = isinstance(items, list)
    total = len(items) if is_list else 0
    result = {
        "router": router_label,
        total_key: total,
        "snapshot_file": snapshot_path,
        items_key: items[:display_limit] if is_list else items,
    }
    text_res = json.dumps(result, indent=2)
    if total > display_limit:
        text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} {noun}.\n"
        text_res += f"📂 Data lengkap: {snapshot_path}"
    return _text(text_res)


def _tool_list_routers() -> list:
    """List routers from devices.json plus non-duplicate billing-config routers."""
    summary = []
    seen_names = set()
    for d in load_devices():
        summary.append({"name": d.get("name"), "host": d.get("host"), "source": "devices.json"})
        seen_names.add(str(d.get("name") or "").lower())
    for r in load_billing_routers():
        if str(r.get("name") or "").lower() not in seen_names:
            summary.append({"name": r.get("name"), "host": r.get("host"), "source": "billing-mcp"})
    return _text(json.dumps(summary, indent=2))


def _tool_edit_router_list(tool: str, args: dict) -> list:
    """Handle add_router / remove_router by rewriting devices.json."""
    devices = load_devices()

    if tool == "add_router":
        new_device = {
            "name": args["name"],
            "host": args["host"],
            "port": args.get("port", _DEFAULT_PORT),
            "user": args["user"],
            "pass": args["pass"],
        }
        # Check if name already exists
        wanted = new_device["name"].lower()
        if any(str(d.get("name") or "").lower() == wanted for d in devices):
            return _text(f"❌ Router dengan nama '{new_device['name']}' sudah ada.")
        devices.append(new_device)
        with open(DEVICES_FILE, "w") as f:
            json.dump(devices, f, indent=2)
        return _text(f"✅ Router '{new_device['name']}' berhasil ditambahkan.")

    # remove_router
    name_to_remove = args["name"].lower()
    new_devices = [d for d in devices if str(d.get("name") or "").lower() != name_to_remove]
    if len(new_devices) == len(devices):
        return _text(f"❌ Router '{args['name']}' tidak ditemukan.")
    with open(DEVICES_FILE, "w") as f:
        json.dump(new_devices, f, indent=2)
    return _text(f"✅ Router '{args['name']}' berhasil dihapus.")


def _tool_get_interfaces(args: dict, auth: tuple, base_url: str, actual_name: str) -> list:
    """Fetch interfaces, snapshot them, and return a size-limited preview."""
    running_only = args.get("running_only", False)
    full_data = args.get("full_data", False)
    # Non-numeric, zero or negative limits fall back to sane bounds instead of
    # raising or slicing from the wrong end.
    user_limit = _bounded_int(args.get("limit"), 50, 1, 200)

    payload = {}
    # 1. Server-side filter
    if running_only:
        payload[".query"] = ["running=true"]
    # 2. Server-side field selection: request only specific fields unless the
    #    caller asked for everything.
    if not full_data:
        payload[".proplist"] = [".id", "name", "type", "running", "disabled", "comment", "default-name", "actual-mtu", "mac-address"]

    try:
        # POST is needed for query/proplist support
        interfaces = post_data("interface/print", payload, auth, base_url)
    except Exception as e:
        # Fall back to a plain GET and filter on the client side instead.
        _log(f"POST interface/print failed: {e}. Falling back to GET.")
        interfaces = fetch_data("interface", auth, base_url)
        if running_only:
            interfaces = [i for i in _as_list(interfaces) if _is_true(i.get("running"))]
    interfaces = _as_list(interfaces)
    total_found = len(interfaces)

    # Snapshot everything that was fetched BEFORE applying any limit, so the
    # "Full snapshot" the response points to is not itself truncated.
    snapshot_path = save_snapshot(actual_name, "interface", interfaces)

    interfaces = interfaces[:user_limit]
    if not full_data:
        # Essential fields only to save context/tokens
        essential_fields = ["name", "type", "running", "disabled", "comment", "default-name"]
        interfaces = [
            {k: iface.get(k) for k in essential_fields if k in iface}
            for iface in interfaces
        ]

    # Limit response size for the LLM
    display_limit = 15
    preview_interfaces = interfaces[:display_limit]
    # Truncated whenever fewer items are shown than were found, whichever
    # limit (user or display) caused it.
    is_truncated = total_found > len(preview_interfaces)

    result = {
        "router": actual_name,
        "count": total_found,
        "showing": len(preview_interfaces),
        "snapshot_file": snapshot_path,
        "status": "Truncated for readability." if is_truncated else "Full list.",
        "interfaces": preview_interfaces,
    }
    msg = json.dumps(result, indent=2)
    if is_truncated:
        msg += f"\n\n⚠️ DATA TRUNCATED ({total_found} items). Only showing {len(preview_interfaces)} to save context."
        msg += f"\n📂 Full snapshot: {snapshot_path}"
    return _text(_cap(msg))


def _tool_get_logs(args: dict, auth: tuple, base_url: str, actual_name: str) -> list:
    """Fetch the router log, snapshot it, and return the most recent entries."""
    limit = _bounded_int(args.get("limit"), 20, 0)
    logs = fetch_data("log", auth, base_url)
    snapshot_path = save_snapshot(actual_name, "log", logs)

    entries = logs if isinstance(logs, list) else []
    # entries[-0:] would be the WHOLE list, so a limit of 0 is handled explicitly.
    recent_logs = entries[-limit:] if limit else []

    summary = {
        "router": actual_name,
        "total_logs_available": len(entries),
        "snapshot_file": snapshot_path,
        "recent_entries": recent_logs,
    }
    msg = json.dumps(summary, indent=2)
    msg += f"\n\n📂 Log lengkap disimpan di: {snapshot_path}"
    return _text(_cap(msg))


def _tool_test_permissions(router: dict, auth: tuple, base_url: str, actual_name: str) -> list:
    """Probe a fixed set of endpoints and report which ones the user can read."""
    endpoints = [
        "system/resource", "system/identity", "interface",
        "ip/address", "ip/hotspot/active", "log", "user", "ppp/secret",
    ]
    results = {}
    failures = []
    for ep in endpoints:
        try:
            fetch_data(ep, auth, base_url)
            results[ep] = "✅ OK"
        except Exception as e:
            results[ep] = f"{str(e)}"
            failures.append(f"{ep}: {str(e)}")

    troubleshooting = ""
    if failures:
        troubleshooting = "\n\n🔍 ANALISIS & SOLUSI:\n"
        if any("system/resource" in f for f in failures):
            troubleshooting += "- Endpoint 'system/resource' gagal. Jika user memiliki akses FULL, coba cek apakah service 'www' (port 80) di router memiliki filter IP atau jika ada firewall yang memblokir request besar.\n"
        if any("HTTP 403" in f for f in failures):
            troubleshooting += "- Beberapa menu memerlukan izin spesifik. Walaupun user group-nya 'full', pastikan tidak ada kebijakan 'skin' yang membatasi API.\n"
        if any("Connection Error" in f or "Timeout" in f for f in failures):
            troubleshooting += "- Masalah konektivitas. Pastikan IP Router dapat dijangkau dari server ini dan service API MikroTik aktif.\n"

    summary = {
        "router": actual_name,
        "host": router["host"],
        "results": results,
        "troubleshooting": troubleshooting,
    }
    return _text(json.dumps(summary, indent=2) + troubleshooting)


def _tool_gateway_interfaces(args: dict, auth: tuple, base_url: str, actual_name: str) -> list:
    """Group routes by the interface named in their ``immediate-gw`` field."""
    routes = _as_list(fetch_data("ip/route", auth, base_url))
    active_only = args.get("active_only", True)

    # immediate-gw is parsed as "<gateway ip>%<interface>".
    gateway_interfaces = {}
    for r in routes:
        immediate_gw = r.get("immediate-gw") or ""
        if "%" not in immediate_gw:
            continue
        gw_ip, iface = immediate_gw.split("%")[:2]

        # Accept both the string "true" and a real boolean, matching how the
        # "running" flag is tested elsewhere in this file.
        is_active = _is_true(r.get("active"))
        if active_only and not is_active:
            continue

        entry = gateway_interfaces.setdefault(
            iface, {"interface": iface, "gateway_ips": [], "routes": []}
        )
        # De-duplicate while appending so the order stays deterministic.
        if gw_ip not in entry["gateway_ips"]:
            entry["gateway_ips"].append(gw_ip)
        entry["routes"].append({
            "dst": r.get("dst-address"),
            "gateway": r.get("gateway"),
            "distance": r.get("distance"),
            "routing_table": r.get("routing-table"),
            "active": is_active,
        })

    gateways = list(gateway_interfaces.values())
    snapshot_path = save_snapshot(actual_name, "gateway_interfaces", gateways)
    result = {
        "router": actual_name,
        "filter": "active_only" if active_only else "all",
        "total_gateway_interfaces": len(gateways),
        "snapshot_file": snapshot_path,
        "gateway_interfaces": gateways,
    }
    return _text(json.dumps(result, indent=2))


def _tool_interface_traffic(args: dict, auth: tuple, base_url: str, actual_name: str) -> list:
    """Report per-interface tx/rx rates, highest tx rate first."""
    # 1. Get the list of interfaces (to know their names)
    interfaces = _as_list(fetch_data("interface", auth, base_url))

    running_only = args.get("running_only", True)
    if running_only:
        interfaces = [i for i in interfaces if _is_true(i.get("running"))]

    interface_name = args.get("interface_name")
    if interface_name:
        wanted = interface_name.lower()
        interfaces = [i for i in interfaces if (i.get("name") or "").lower() == wanted]

    if not interfaces:
        return _text("⚠️ No interfaces found matching criteria.")

    # 2. Monitor traffic once for all selected interfaces (comma separated)
    target_str = ",".join(i.get("name") for i in interfaces if i.get("name"))
    monitor_data = {}
    try:
        payload = {"interface": target_str, "once": "true"}
        mon_result = post_data("interface/monitor-traffic", payload, auth, base_url)
        if isinstance(mon_result, list):
            for m in mon_result:
                monitor_data[m.get("name")] = m
        elif isinstance(mon_result, dict) and mon_result.get("name"):
            monitor_data[mon_result.get("name")] = mon_result
    except Exception as e:
        # Best effort: without monitor data the interface counters are used.
        _log(f"Monitor traffic error: {e}")

    # Combine interface config with monitor data
    traffic_data = []
    for iface in interfaces:
        iface_name = iface.get("name")
        mon = monitor_data.get(iface_name, {})
        # Monitor data is preferred; interface data is the fallback.
        tx_bps = mon.get("tx-bits-per-second", iface.get("tx-bit", 0))
        rx_bps = mon.get("rx-bits-per-second", iface.get("rx-bit", 0))
        traffic_data.append({
            "name": iface_name,
            "type": iface.get("type"),
            "tx_rate": _format_rate(tx_bps),
            "rx_rate": _format_rate(rx_bps),
            "tx_rate_raw_bps": tx_bps,
            "rx_rate_raw_bps": rx_bps,
            "running": iface.get("running"),
            "disabled": iface.get("disabled"),
        })

    # Highest tx rate first
    traffic_data.sort(key=lambda x: _to_float(x.get("tx_rate_raw_bps")), reverse=True)
    snapshot_path = save_snapshot(actual_name, "interface_traffic", traffic_data)

    display_limit = 20
    result = {
        "router": actual_name,
        "filter": "running_only" if running_only else "all",
        "total_interfaces": len(traffic_data),
        "snapshot_file": snapshot_path,
        "interfaces": traffic_data[:display_limit],
    }
    text_res = json.dumps(result, indent=2)
    if len(traffic_data) > display_limit:
        text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {len(traffic_data)} interfaces.\n"
        text_res += f"📂 Data lengkap: {snapshot_path}"
    return _text(text_res)


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict | None) -> list[TextContent | ImageContent | EmbeddedResource]:
    """Handle tool execution requests.

    Args:
        name: Tool name as advertised by handle_list_tools.
        arguments: Tool arguments; None is treated as no arguments.

    Returns:
        A single-item list with the textual result. Failures, including an
        unknown tool name, are reported as text rather than raised.
    """
    # NOTE(review): the update_data helpers are called without await and so
    # presumably block the event loop while talking to the router - confirm.
    args = arguments or {}
    try:
        # 1. Tools that only touch the configuration files. These must run
        #    before a router is resolved: otherwise the first router could
        #    never be added to an empty configuration.
        if name == "list_routers":
            return _tool_list_routers()
        if name in ("add_router", "remove_router"):
            return _tool_edit_router_list(name, args)

        # 2. Get target router
        router_name = args.get("router_name")
        router = get_router(router_name)
        if not router:
            return _text(f"❌ Router '{router_name or 'Default'}' tidak ditemukan.")

        # Prepare connection info
        port = router.get("port", _DEFAULT_PORT)
        auth = (router["user"], router["pass"])
        base_url = get_base_url(router["host"], port)
        actual_name = router["name"]

        if name == "get_status":
            try:
                resource_data = ensure_first_item(fetch_data("system/resource", auth, base_url))
                identity_data = ensure_first_item(fetch_data("system/identity", auth, base_url))
                data = {
                    "router_name": actual_name,
                    "identity": identity_data.get("name", "Unknown"),
                    "resource": resource_data,
                }
                return _text(json.dumps(data, indent=2))
            except Exception as e:
                return _text(f"❌ Gagal mengambil status router: {str(e)}\n\nTips: Pastikan user memiliki izin untuk '/system/resource' dan '/system/identity'.")

        if name == "get_interfaces":
            return _tool_get_interfaces(args, auth, base_url, actual_name)

        if name in _LIST_PREVIEW_TOOLS:
            endpoint, label, total_key, items_key, display_limit, noun = _LIST_PREVIEW_TOOLS[name]
            items = fetch_data(endpoint, auth, base_url)
            snapshot_path = save_snapshot(actual_name, label, items)
            return _list_preview(actual_name, items, snapshot_path, total_key, items_key, display_limit, noun)

        if name == "get_logs":
            return _tool_get_logs(args, auth, base_url, actual_name)

        if name == "refresh_data":
            new_data = gather_all_info(router["host"], port, router["user"], router["pass"])
            if not new_data:
                return _text(f"❌ Failed to refresh data for '{actual_name}'.")
            data_path = get_router_data_path(actual_name)
            save_data(new_data, data_path)
            return _text(f"✅ Data for router '{actual_name}' successfully updated in {data_path}.")

        if name == "test_permissions":
            return _tool_test_permissions(router, auth, base_url, actual_name)

        if name == "get_vlans":
            interfaces = _as_list(fetch_data("interface", auth, base_url))
            # Filter VLAN only
            vlans = [i for i in interfaces if i.get("type") == "vlan"]
            snapshot_path = save_snapshot(actual_name, "interface_vlan", vlans)
            return _list_preview(actual_name, vlans, snapshot_path, "total_vlan_interfaces", "vlans", 30, "VLAN interfaces")

        if name == "get_gateway_interfaces":
            return _tool_gateway_interfaces(args, auth, base_url, actual_name)

        if name == "get_interface_traffic":
            return _tool_interface_traffic(args, auth, base_url, actual_name)

        raise ValueError(f"Unknown tool: {name}")
    except Exception as e:
        # Top-level boundary: report every failure to the client as text.
        return _text(f"Error executing tool {name}: {str(e)}")
async def main():
    """Serve MCP requests for the module-level ``server`` over stdio."""
    async with stdio_server() as streams:
        # stdio_server yields the (read, write) stream pair for the transport.
        incoming, outgoing = streams
        init_options = server.create_initialization_options()
        await server.run(incoming, outgoing, init_options)


if __name__ == "__main__":
    # Script entry point: run the server on a fresh event loop.
    asyncio.run(main())