Feat: Implement live traffic monitoring via monitor-traffic command and optimize interface fetching

This commit is contained in:
2026-02-03 22:41:07 +08:00
parent 3f6c955310
commit 6b6e56a082
3 changed files with 110 additions and 27 deletions

View File

@@ -10,7 +10,7 @@ from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from dotenv import load_dotenv
# Import our helper from update_data
from update_data import gather_all_info, fetch_data, save_data, get_base_url
from update_data import gather_all_info, fetch_data, post_data, save_data, get_base_url
load_dotenv()
@@ -290,7 +290,21 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten
# 1. Handle list_routers (no router connection needed)
if name == "list_routers":
devices = load_devices()
summary = [{"name": d["name"], "host": d["host"]} for d in devices]
billing_routers = load_billing_routers()
summary = []
seen_names = set()
# Add devices from devices.json
for d in devices:
summary.append({"name": d["name"], "host": d["host"], "source": "devices.json"})
seen_names.add(d["name"].lower())
# Add devices from billing config if not duplicate
for r in billing_routers:
if r["name"].lower() not in seen_names:
summary.append({"name": r["name"], "host": r["host"], "source": "billing-mcp"})
return [TextContent(type="text", text=json.dumps(summary, indent=2))]
# 2. Get target router
@@ -325,18 +339,42 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten
return [TextContent(type="text", text=f"❌ Gagal mengambil status router: {str(e)}\n\nTips: Pastikan user memiliki izin untuk '/system/resource' dan '/system/identity'.")]
elif name == "get_interfaces":
interfaces = fetch_data("interface", auth, base_url)
# Parsing arguments
running_only = arguments.get("running_only", False) if arguments else False
full_data = arguments.get("full_data", False) if arguments else False
user_limit = arguments.get("limit", 50) if arguments else 50
if user_limit > 200: user_limit = 200
# Optimizing fetch with server-side filtering and field selection
endpoint = "interface/print"
payload = {}
# 1. Handle Running Only filter
if arguments and arguments.get("running_only"):
interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True]
# 1. Server-side Query/Filter
query = []
if running_only:
query.append("running=true")
# 2. Handle Limit
limit = arguments.get("limit", 50) if arguments else 50
if limit > 200: limit = 200 # Cap at 200
if query:
payload[".query"] = query
# 2. Server-side Field Selection (Proplist)
# If full_data is False, request only specific fields to save bandwidth
if not full_data:
payload[".proplist"] = [".id", "name", "type", "running", "disabled", "comment", "default-name", "actual-mtu", "mac-address"]
try:
# Use POST for query/proplist support
interfaces = post_data(endpoint, payload, auth, base_url)
except Exception as e:
# Fallback to standard GET if POST fails (compatibility)
print(f"POST interface/print failed: {e}. Falling back to GET.")
interfaces = fetch_data("interface", auth, base_url)
# Apply client-side filtering since server-side failed
if running_only:
interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True]
total_found = len(interfaces)
interfaces = interfaces[:limit]
interfaces = interfaces[:user_limit]
# 3. Handle Field Filtering (Default: Minimal fields)
full_data = arguments.get("full_data", False) if arguments else False
@@ -671,6 +709,7 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten
return [TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_interface_traffic":
# 1. Get List of Interfaces (to know names)
interfaces = fetch_data("interface", auth, base_url)
# Filter by running only (default true)
@@ -683,12 +722,43 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten
if interface_name:
interfaces = [i for i in interfaces if i.get("name", "").lower() == interface_name.lower()]
def bytes_to_mbps(bytes_val):
"""Convert bytes to Mbps (bits per second / 1,000,000)"""
try:
return round(float(bytes_val) * 8 / 1_000_000, 2)
except (ValueError, TypeError):
return 0.0
# If no interfaces found, return early
if not interfaces:
return [TextContent(type="text", text=f"⚠️ No interfaces found matching criteria.")]
# 2. Monitor Traffic (Real-time Stats)
target_names = [i.get("name") for i in interfaces]
target_str = ",".join(target_names)
# Mikrotik monitor-traffic can be heavy if many interfaces.
# If > 10 interfaces, we might want to batch or limit?
# REST API usually handles handle comma separated list fine.
monitor_data = {}
try:
# Use monitor-traffic command via POST
# Payload: interface=name1,name2&once=true
payload = {
"interface": target_str,
"once": "true"
}
mon_result = post_data("interface/monitor-traffic", payload, auth, base_url)
# Map results by name
if isinstance(mon_result, list):
for m in mon_result:
monitor_data[m.get("name")] = m
elif isinstance(mon_result, dict) and mon_result.get("name"):
monitor_data[mon_result.get("name")] = mon_result
except Exception as e:
# Fallback or log error
# If monitor-traffic fails (e.g. some interface type doesn't support it),
# we might get partial data or error.
# Let's try to continue with basic stats if available, or just report error.
print(f"Monitor traffic error: {e}")
# We will just use empty stats if monitor fails
def format_rate(bps_val):
"""Format bits per second to human readable Mbps/Kbps"""
@@ -703,16 +773,18 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten
except (ValueError, TypeError):
return "0 bps"
# Extract traffic info
# Extract traffic info combining config + monitor data
traffic_data = []
for iface in interfaces:
# MikroTik provides tx-bits-per-second and rx-bits-per-second for real-time rate
# For total traffic, it provides tx-byte and rx-byte
tx_bps = iface.get("tx-bits-per-second", iface.get("tx-bit", 0))
rx_bps = iface.get("rx-bits-per-second", iface.get("rx-bit", 0))
name = iface.get("name")
mon = monitor_data.get(name, {})
# Get stats from monitor data (preferred) or interface data (fallback)
tx_bps = mon.get("tx-bits-per-second", iface.get("tx-bit", 0))
rx_bps = mon.get("rx-bits-per-second", iface.get("rx-bit", 0))
traffic_data.append({
"name": iface.get("name"),
"name": name,
"type": iface.get("type"),
"tx_rate": format_rate(tx_bps),
"rx_rate": format_rate(rx_bps),

View File

@@ -24,7 +24,15 @@ def get_base_url(host, port):
return f"{protocol}://{host}:{port}/rest"
def fetch_data(endpoint, auth=None, base_url=None):
"""Fetch data from MikroTik REST API with provided credentials"""
"""Fetch data from MikroTik REST API with provided credentials (GET)"""
return _send_request("GET", endpoint, auth, base_url)
def post_data(endpoint, payload, auth=None, base_url=None):
"""Send POST request to MikroTik REST API"""
return _send_request("POST", endpoint, auth, base_url, json_data=payload)
def _send_request(method, endpoint, auth=None, base_url=None, json_data=None):
"""Internal helper for HTTP requests"""
if auth is None:
auth = (DEFAULT_MIKROTIK_USER, DEFAULT_MIKROTIK_PASSWORD)
if base_url is None:
@@ -37,10 +45,10 @@ def fetch_data(endpoint, auth=None, base_url=None):
}
try:
response = requests.get(url, auth=auth, headers=headers, timeout=12)
# Log basic info for debugging if needed (uncomment if very stuck)
# print(f"DEBUG: GET {url} -> Status {response.status_code}")
if method == "GET":
response = requests.get(url, auth=auth, headers=headers, timeout=12)
else:
response = requests.post(url, auth=auth, headers=headers, json=json_data, timeout=12)
if response.status_code != 200:
status_code = response.status_code
@@ -64,6 +72,9 @@ def fetch_data(endpoint, auth=None, base_url=None):
raise Exception(error_msg)
if not response.text or response.text.strip() == "":
# POST requests might return empty body if successful but no content?
# Usually monitor-traffic returns data.
if method == "POST": return {}
raise Exception("Empty Response from Router")
return response.json()