feat: Add get_interface_traffic tool to retrieve and format router interface traffic, and include a new router in devices.json.

This commit is contained in:
2026-02-03 07:53:20 +08:00
parent 00b61c3ab5
commit 3f6c955310
2 changed files with 95 additions and 0 deletions

View File

@@ -267,6 +267,18 @@ async def handle_list_tools() -> list[Tool]:
"active_only": {"type": "boolean", "description": "Hanya tampilkan gateway yang aktif (default: true)."},
},
},
),
Tool(
name="get_interface_traffic",
description="Mendapatkan traffic rate (tx/rx) interface dalam format Mbps yang mudah dibaca.",
inputSchema={
"type": "object",
"properties": {
"router_name": {"type": "string", "description": "Nama router."},
"interface_name": {"type": "string", "description": "Nama interface spesifik (opsional, jika kosong tampilkan semua yang running)."},
"running_only": {"type": "boolean", "description": "Hanya tampilkan interface yang aktif (default: true)."},
},
},
)
]
@@ -658,6 +670,82 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_interface_traffic":
interfaces = fetch_data("interface", auth, base_url)
# Filter by running only (default true)
running_only = arguments.get("running_only", True) if arguments else True
if running_only:
interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True]
# Filter by specific interface name if provided
interface_name = arguments.get("interface_name") if arguments else None
if interface_name:
interfaces = [i for i in interfaces if i.get("name", "").lower() == interface_name.lower()]
def bytes_to_mbps(bytes_val):
"""Convert bytes to Mbps (bits per second / 1,000,000)"""
try:
return round(float(bytes_val) * 8 / 1_000_000, 2)
except (ValueError, TypeError):
return 0.0
def format_rate(bps_val):
"""Format bits per second to human readable Mbps/Kbps"""
try:
bps = float(bps_val)
if bps >= 1_000_000:
return f"{bps / 1_000_000:.2f} Mbps"
elif bps >= 1_000:
return f"{bps / 1_000:.2f} Kbps"
else:
return f"{bps:.0f} bps"
except (ValueError, TypeError):
return "0 bps"
# Extract traffic info
traffic_data = []
for iface in interfaces:
# MikroTik provides tx-bits-per-second and rx-bits-per-second for real-time rate
# For total traffic, it provides tx-byte and rx-byte
tx_bps = iface.get("tx-bits-per-second", iface.get("tx-bit", 0))
rx_bps = iface.get("rx-bits-per-second", iface.get("rx-bit", 0))
traffic_data.append({
"name": iface.get("name"),
"type": iface.get("type"),
"tx_rate": format_rate(tx_bps),
"rx_rate": format_rate(rx_bps),
"tx_rate_raw_bps": tx_bps,
"rx_rate_raw_bps": rx_bps,
"running": iface.get("running"),
"disabled": iface.get("disabled")
})
# Sort by tx_rate descending (highest traffic first)
traffic_data.sort(key=lambda x: float(x.get("tx_rate_raw_bps", 0) or 0), reverse=True)
snapshot_path = save_snapshot(actual_name, "interface_traffic", traffic_data)
# Limit display
display_limit = 20
preview = traffic_data[:display_limit]
result = {
"router": actual_name,
"filter": "running_only" if running_only else "all",
"total_interfaces": len(traffic_data),
"snapshot_file": snapshot_path,
"interfaces": preview
}
text_res = json.dumps(result, indent=2)
if len(traffic_data) > display_limit:
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {len(traffic_data)} interfaces.\n"
text_res += f"📂 Data lengkap: {snapshot_path}"
return [TextContent(type="text", text=text_res)]
else:
raise ValueError(f"Unknown tool: {name}")

View File

@@ -5,5 +5,12 @@
"port": 80,
"user": "admin11",
"pass": "Dua75316__"
},
{
"name": "Router-Batuaji",
"host": "10.8.0.15",
"port": 80,
"user": "admin11",
"pass": "Dua75316__"
}
]