- Add get_vlans tool to filter interfaces by type=vlan - Add get_gateway_interfaces tool to identify gateway interfaces from routes - Add fallback to billing-mcp/config.json when router not in devices.json - Update .gitignore to exclude venv, pycache, snapshots
677 lines
28 KiB
Python
677 lines
28 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import json
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
from mcp.server.stdio import stdio_server
|
|
from mcp.server import Server
|
|
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
|
|
from dotenv import load_dotenv
|
|
|
|
# Import our helper from update_data
|
|
from update_data import gather_all_info, fetch_data, save_data, get_base_url
|
|
|
|
load_dotenv()
|
|
|
|
# Configuration
|
|
DEVICES_FILE = "/home/wartana/myApp/mikrotik-mcp/devices.json"
|
|
BILLING_CONFIG_FILE = "/home/wartana/myApp/billing-mcp/config.json"
|
|
DATA_DIR = "/home/wartana/myApp/mikrotik-mcp/data"
|
|
SNAPSHOTS_DIR = os.path.join(DATA_DIR, "snapshots")
|
|
|
|
# Create the MCP server
|
|
server = Server("mikrotik")
|
|
|
|
def load_devices() -> List[Dict]:
|
|
"""Load list of routers from devices.json"""
|
|
try:
|
|
if os.path.exists(DEVICES_FILE):
|
|
with open(DEVICES_FILE, 'r') as f:
|
|
return json.load(f)
|
|
return []
|
|
except Exception as e:
|
|
print(f"Error loading devices: {e}")
|
|
return []
|
|
|
|
def load_billing_routers() -> List[Dict]:
|
|
"""Load routers from billing-mcp config.json as fallback"""
|
|
try:
|
|
if os.path.exists(BILLING_CONFIG_FILE):
|
|
with open(BILLING_CONFIG_FILE, 'r') as f:
|
|
config = json.load(f)
|
|
|
|
routers = []
|
|
for isp_key, isp_data in config.get("isps", {}).items():
|
|
for router_name, router_config in isp_data.get("routers", {}).items():
|
|
routers.append({
|
|
"name": router_name,
|
|
"host": router_config.get("host"),
|
|
"port": router_config.get("port", 80),
|
|
"user": router_config.get("user"),
|
|
"pass": router_config.get("pass"),
|
|
"isp": isp_key
|
|
})
|
|
return routers
|
|
return []
|
|
except Exception as e:
|
|
print(f"Error loading billing config: {e}")
|
|
return []
|
|
|
|
def get_router(router_name: str | None) -> Dict | None:
|
|
"""Get router config by name, with fallback to billing config"""
|
|
devices = load_devices()
|
|
|
|
# Try devices.json first
|
|
if devices:
|
|
if not router_name:
|
|
return devices[0]
|
|
for d in devices:
|
|
if d.get("name").lower() == router_name.lower():
|
|
return d
|
|
|
|
# Fallback to billing config
|
|
billing_routers = load_billing_routers()
|
|
if billing_routers:
|
|
if not router_name and not devices:
|
|
return billing_routers[0]
|
|
for r in billing_routers:
|
|
if r.get("name").lower() == router_name.lower():
|
|
return r
|
|
|
|
return None
|
|
|
|
def get_router_data_path(router_name: str) -> str:
|
|
"""Get path to the snapshot data file for a specific router"""
|
|
safe_name = router_name.replace(" ", "_").lower()
|
|
return os.path.join(DATA_DIR, f"{safe_name}.json")
|
|
|
|
def save_snapshot(router_name: str, endpoint: str, data: Any) -> str:
|
|
"""Save raw data to a timestamped snapshot file and return the path"""
|
|
os.makedirs(SNAPSHOTS_DIR, exist_ok=True)
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
safe_router = router_name.replace(" ", "_").lower()
|
|
endpoint_part = endpoint.replace("/", "_")
|
|
filename = f"{safe_router}_{endpoint_part}_{timestamp}.json"
|
|
filepath = os.path.join(SNAPSHOTS_DIR, filename)
|
|
|
|
with open(filepath, 'w') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
return filepath
|
|
|
|
def ensure_first_item(data: Any) -> Dict:
|
|
"""Helper to ensure we get a dictionary, even if MikroTik returns a list or a single object."""
|
|
if isinstance(data, list):
|
|
return data[0] if len(data) > 0 else {}
|
|
if isinstance(data, dict):
|
|
return data
|
|
return {}
|
|
|
|
@server.list_tools()
|
|
async def handle_list_tools() -> list[Tool]:
|
|
"""List available MikroTik tools."""
|
|
return [
|
|
Tool(
|
|
name="list_routers",
|
|
description="Melihat daftar router MikroTik yang terdaftar di konfigurasi.",
|
|
inputSchema={"type": "object", "properties": {}},
|
|
),
|
|
Tool(
|
|
name="get_status",
|
|
description="Mendapatkan informasi dasar router (CPU, Memori, Uptime).",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router (lihat list_routers). Opsional, default ke router pertama."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_interfaces",
|
|
description="Mendapatkan daftar interface network dan status traffic.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
"running_only": {"type": "boolean", "description": "Hanya tampilkan interface yang aktif."},
|
|
"full_data": {"type": "boolean", "description": "Tampilkan semua field data (Hati-hati: bisa menyebabkan error context length jika data sangat besar)."},
|
|
"limit": {"type": "integer", "description": "Batas jumlah interface yang ditampilkan (Default 50, Max 200)."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_hotspot_active",
|
|
description="Mendapatkan daftar pengguna hotspot yang sedang aktif.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_logs",
|
|
description="Mendapatkan baris log terakhir dari router.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
"limit": {"type": "integer", "description": "Jumlah log (default 20)."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="refresh_data",
|
|
description="Sinkronisasi data terbaru langsung dari router ke snapshot file.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="test_permissions",
|
|
description="Mengecek izin akses user MikroTik ke berbagai menu API (Diagnostik).",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="add_router",
|
|
description="Menambahkan router MikroTik baru ke dalam daftar konfigurasi.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"name": {"type": "string", "description": "Nama unik untuk router (misal: Batuaji)."},
|
|
"host": {"type": "string", "description": "IP atau Hostname router."},
|
|
"port": {"type": "integer", "description": "Port REST API (default 80)."},
|
|
"user": {"type": "string", "description": "Username admin."},
|
|
"pass": {"type": "string", "description": "Password admin."},
|
|
},
|
|
"required": ["name", "host", "user", "pass"],
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_ppp_active",
|
|
description="Mendapatkan daftar sesi PPP (PPPoE/L2TP/VPN) yang saat ini aktif.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_ppp_secrets",
|
|
description="Mendapatkan daftar user/secrets yang terdaftar di layanan PPP.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_queues",
|
|
description="Mendapatkan daftar Simple Queues untuk melihat limitasi bandwidth user.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_queue_tree",
|
|
description="Mendapatkan daftar Queue Tree untuk melihat manajemen bandwidth yang lebih kompleks.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="remove_router",
|
|
description="Menghapus router dari daftar konfigurasi.",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"name": {"type": "string", "description": "Nama router yang ingin dihapus."},
|
|
},
|
|
"required": ["name"],
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_vlans",
|
|
description="Mendapatkan daftar interface VLAN (filter by type=vlan).",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
},
|
|
},
|
|
),
|
|
Tool(
|
|
name="get_gateway_interfaces",
|
|
description="Mendapatkan daftar interface yang berfungsi sebagai gateway (dari routing table).",
|
|
inputSchema={
|
|
"type": "object",
|
|
"properties": {
|
|
"router_name": {"type": "string", "description": "Nama router."},
|
|
"active_only": {"type": "boolean", "description": "Hanya tampilkan gateway yang aktif (default: true)."},
|
|
},
|
|
},
|
|
)
|
|
]
|
|
|
|
@server.call_tool()
|
|
async def handle_call_tool(name: str, arguments: dict | None) -> list[TextContent | ImageContent | EmbeddedResource]:
|
|
"""Handle tool execution requests."""
|
|
|
|
try:
|
|
# 1. Handle list_routers (no router connection needed)
|
|
if name == "list_routers":
|
|
devices = load_devices()
|
|
summary = [{"name": d["name"], "host": d["host"]} for d in devices]
|
|
return [TextContent(type="text", text=json.dumps(summary, indent=2))]
|
|
|
|
# 2. Get target router
|
|
router_name = arguments.get("router_name") if arguments else None
|
|
router = get_router(router_name)
|
|
|
|
if not router:
|
|
return [TextContent(type="text", text=f"❌ Router '{router_name or 'Default'}' tidak ditemukan.")]
|
|
|
|
# Prepare connection info
|
|
auth = (router["user"], router["pass"])
|
|
base_url = get_base_url(router["host"], router["port"])
|
|
actual_name = router["name"]
|
|
|
|
if name == "get_status":
|
|
try:
|
|
resource_raw = fetch_data("system/resource", auth, base_url)
|
|
identity_raw = fetch_data("system/identity", auth, base_url)
|
|
|
|
resource_data = ensure_first_item(resource_raw)
|
|
identity_data = ensure_first_item(identity_raw)
|
|
|
|
identity_name = identity_data.get("name", "Unknown")
|
|
|
|
data = {
|
|
"router_name": actual_name,
|
|
"identity": identity_name,
|
|
"resource": resource_data
|
|
}
|
|
return [TextContent(type="text", text=json.dumps(data, indent=2))]
|
|
except Exception as e:
|
|
return [TextContent(type="text", text=f"❌ Gagal mengambil status router: {str(e)}\n\nTips: Pastikan user memiliki izin untuk '/system/resource' dan '/system/identity'.")]
|
|
|
|
elif name == "get_interfaces":
|
|
interfaces = fetch_data("interface", auth, base_url)
|
|
|
|
# 1. Handle Running Only filter
|
|
if arguments and arguments.get("running_only"):
|
|
interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True]
|
|
|
|
# 2. Handle Limit
|
|
limit = arguments.get("limit", 50) if arguments else 50
|
|
if limit > 200: limit = 200 # Cap at 200
|
|
|
|
total_found = len(interfaces)
|
|
interfaces = interfaces[:limit]
|
|
|
|
# 3. Handle Field Filtering (Default: Minimal fields)
|
|
full_data = arguments.get("full_data", False) if arguments else False
|
|
|
|
# SAVE FULL DATA SNAPSHOT ALWAYS
|
|
snapshot_path = save_snapshot(actual_name, "interface", interfaces)
|
|
|
|
if not full_data:
|
|
# Essential fields only to save context/tokens
|
|
essential_fields = ["name", "type", "running", "disabled", "comment", "default-name"]
|
|
optimized_interfaces = []
|
|
for iface in interfaces:
|
|
optimized_interfaces.append({k: iface.get(k) for k in essential_fields if k in iface})
|
|
interfaces = optimized_interfaces
|
|
|
|
# LIMIT RESPONSE SIZE FOR LLM
|
|
display_limit = 15
|
|
is_truncated = total_found > display_limit
|
|
preview_interfaces = interfaces[:display_limit]
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"count": total_found,
|
|
"showing": len(preview_interfaces),
|
|
"snapshot_file": snapshot_path,
|
|
"status": "Truncated for readability." if is_truncated else "Full list.",
|
|
"interfaces": preview_interfaces
|
|
}
|
|
|
|
msg = json.dumps(result, indent=2)
|
|
if is_truncated:
|
|
msg += f"\n\n⚠️ DATA TRUNCATED ({total_found} items). Only showing {display_limit} to save context."
|
|
msg += f"\n📂 Full snapshot: {snapshot_path}"
|
|
|
|
# FINAL SAFETY CAP (30KB)
|
|
if len(msg) > 30000:
|
|
msg = msg[:30000] + "\n\n... [TRUNCATED DUE TO EXTREME SIZE] ..."
|
|
|
|
return [TextContent(type="text", text=msg)]
|
|
|
|
elif name == "get_hotspot_active":
|
|
hotspot = fetch_data("ip/hotspot/active", auth, base_url)
|
|
total = len(hotspot) if isinstance(hotspot, list) else 0
|
|
|
|
snapshot_path = save_snapshot(actual_name, "ip_hotspot_active", hotspot)
|
|
|
|
# Summarize for LLM
|
|
display_limit = 10
|
|
preview = hotspot[:display_limit] if isinstance(hotspot, list) else hotspot
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"total_active": total,
|
|
"snapshot_file": snapshot_path,
|
|
"sample_users": preview
|
|
}
|
|
|
|
text_res = json.dumps(result, indent=2)
|
|
if total > display_limit:
|
|
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} user aktif.\n"
|
|
text_res += f"📂 Data lengkap: {snapshot_path}"
|
|
|
|
return [TextContent(type="text", text=text_res)]
|
|
|
|
elif name == "get_logs":
|
|
limit = arguments.get("limit", 20) if arguments else 20
|
|
logs = fetch_data("log", auth, base_url)
|
|
|
|
snapshot_path = save_snapshot(actual_name, "log", logs)
|
|
|
|
recent_logs = logs[-limit:] if logs else []
|
|
|
|
summary = {
|
|
"router": actual_name,
|
|
"total_logs_available": len(logs) if logs else 0,
|
|
"snapshot_file": snapshot_path,
|
|
"recent_entries": recent_logs
|
|
}
|
|
|
|
msg = json.dumps(summary, indent=2)
|
|
msg += f"\n\n📂 Log lengkap disimpan di: {snapshot_path}"
|
|
|
|
# FINAL SAFETY CAP (30KB)
|
|
if len(msg) > 30000:
|
|
msg = msg[:30000] + "\n\n... [TRUNCATED DUE TO EXTREME SIZE] ..."
|
|
|
|
return [TextContent(type="text", text=msg)]
|
|
|
|
elif name == "get_ppp_active":
|
|
ppp_active = fetch_data("ppp/active", auth, base_url)
|
|
total = len(ppp_active) if isinstance(ppp_active, list) else 0
|
|
|
|
snapshot_path = save_snapshot(actual_name, "ppp_active", ppp_active)
|
|
|
|
# Summarize for LLM
|
|
display_limit = 10
|
|
preview = ppp_active[:display_limit] if isinstance(ppp_active, list) else ppp_active
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"total_active_ppp": total,
|
|
"snapshot_file": snapshot_path,
|
|
"sample_active": preview
|
|
}
|
|
|
|
text_res = json.dumps(result, indent=2)
|
|
if total > display_limit:
|
|
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} PPP active.\n"
|
|
text_res += f"📂 Data lengkap: {snapshot_path}"
|
|
|
|
return [TextContent(type="text", text=text_res)]
|
|
|
|
elif name == "get_ppp_secrets":
|
|
ppp_secrets = fetch_data("ppp/secret", auth, base_url)
|
|
total = len(ppp_secrets) if isinstance(ppp_secrets, list) else 0
|
|
|
|
snapshot_path = save_snapshot(actual_name, "ppp_secret", ppp_secrets)
|
|
|
|
# Summarize for LLM
|
|
display_limit = 10
|
|
preview = ppp_secrets[:display_limit] if isinstance(ppp_secrets, list) else ppp_secrets
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"total_secrets": total,
|
|
"snapshot_file": snapshot_path,
|
|
"sample_secrets": preview
|
|
}
|
|
|
|
text_res = json.dumps(result, indent=2)
|
|
if total > display_limit:
|
|
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} PPP secrets.\n"
|
|
text_res += f"📂 Data lengkap: {snapshot_path}"
|
|
|
|
return [TextContent(type="text", text=text_res)]
|
|
|
|
elif name == "get_queues":
|
|
queues = fetch_data("queue/simple", auth, base_url)
|
|
total = len(queues) if isinstance(queues, list) else 0
|
|
|
|
snapshot_path = save_snapshot(actual_name, "queue_simple", queues)
|
|
|
|
# Summarize for LLM
|
|
display_limit = 15
|
|
preview = queues[:display_limit] if isinstance(queues, list) else queues
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"total_queues": total,
|
|
"snapshot_file": snapshot_path,
|
|
"sample_queues": preview
|
|
}
|
|
|
|
text_res = json.dumps(result, indent=2)
|
|
if total > display_limit:
|
|
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} simple queues.\n"
|
|
text_res += f"📂 Data lengkap: {snapshot_path}"
|
|
|
|
return [TextContent(type="text", text=text_res)]
|
|
|
|
elif name == "get_queue_tree":
|
|
qtree = fetch_data("queue/tree", auth, base_url)
|
|
total = len(qtree) if isinstance(qtree, list) else 0
|
|
|
|
snapshot_path = save_snapshot(actual_name, "queue_tree", qtree)
|
|
|
|
# Summarize for LLM
|
|
display_limit = 15
|
|
preview = qtree[:display_limit] if isinstance(qtree, list) else qtree
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"total_queue_tree": total,
|
|
"snapshot_file": snapshot_path,
|
|
"sample_queue_tree": preview
|
|
}
|
|
|
|
text_res = json.dumps(result, indent=2)
|
|
if total > display_limit:
|
|
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} entries di queue tree.\n"
|
|
text_res += f"📂 Data lengkap: {snapshot_path}"
|
|
|
|
return [TextContent(type="text", text=text_res)]
|
|
|
|
elif name == "refresh_data":
|
|
new_data = gather_all_info(router["host"], router["port"], router["user"], router["pass"])
|
|
if new_data:
|
|
data_path = get_router_data_path(actual_name)
|
|
save_data(new_data, data_path)
|
|
return [TextContent(type="text", text=f"✅ Data for router '{actual_name}' successfully updated in {data_path}.")]
|
|
else:
|
|
return [TextContent(type="text", text=f"❌ Failed to refresh data for '{actual_name}'.")]
|
|
|
|
elif name == "test_permissions":
|
|
endpoints = [
|
|
"system/resource", "system/identity", "interface",
|
|
"ip/address", "ip/hotspot/active", "log", "user", "ppp/secret"
|
|
]
|
|
results = {}
|
|
failures = []
|
|
for ep in endpoints:
|
|
try:
|
|
fetch_data(ep, auth, base_url)
|
|
results[ep] = "✅ OK"
|
|
except Exception as e:
|
|
results[ep] = f"❌ {str(e)}"
|
|
failures.append(f"{ep}: {str(e)}")
|
|
|
|
troubleshooting = ""
|
|
if failures:
|
|
troubleshooting = "\n\n🔍 ANALISIS & SOLUSI:\n"
|
|
if any("system/resource" in f for f in failures):
|
|
troubleshooting += "- Endpoint 'system/resource' gagal. Jika user memiliki akses FULL, coba cek apakah service 'www' (port 80) di router memiliki filter IP atau jika ada firewall yang memblokir request besar.\n"
|
|
if any("HTTP 403" in f for f in failures):
|
|
troubleshooting += "- Beberapa menu memerlukan izin spesifik. Walaupun user group-nya 'full', pastikan tidak ada kebijakan 'skin' yang membatasi API.\n"
|
|
if any("Connection Error" in f or "Timeout" in f for f in failures):
|
|
troubleshooting += "- Masalah konektivitas. Pastikan IP Router dapat dijangkau dari server ini dan service API MikroTik aktif.\n"
|
|
|
|
summary = {
|
|
"router": actual_name,
|
|
"host": router["host"],
|
|
"results": results,
|
|
"troubleshooting": troubleshooting
|
|
}
|
|
return [TextContent(type="text", text=json.dumps(summary, indent=2) + troubleshooting)]
|
|
|
|
elif name == "add_router":
|
|
devices = load_devices()
|
|
new_device = {
|
|
"name": arguments["name"],
|
|
"host": arguments["host"],
|
|
"port": arguments.get("port", 80),
|
|
"user": arguments["user"],
|
|
"pass": arguments["pass"]
|
|
}
|
|
# Check if name already exists
|
|
if any(d["name"].lower() == new_device["name"].lower() for d in devices):
|
|
return [TextContent(type="text", text=f"❌ Router dengan nama '{new_device['name']}' sudah ada.")]
|
|
|
|
devices.append(new_device)
|
|
with open(DEVICES_FILE, 'w') as f:
|
|
json.dump(devices, f, indent=2)
|
|
return [TextContent(type="text", text=f"✅ Router '{new_device['name']}' berhasil ditambahkan.")]
|
|
|
|
elif name == "remove_router":
|
|
devices = load_devices()
|
|
name_to_remove = arguments["name"].lower()
|
|
new_devices = [d for d in devices if d["name"].lower() != name_to_remove]
|
|
|
|
if len(new_devices) == len(devices):
|
|
return [TextContent(type="text", text=f"❌ Router '{arguments['name']}' tidak ditemukan.")]
|
|
|
|
with open(DEVICES_FILE, 'w') as f:
|
|
json.dump(new_devices, f, indent=2)
|
|
return [TextContent(type="text", text=f"✅ Router '{arguments['name']}' berhasil dihapus.")]
|
|
|
|
elif name == "get_vlans":
|
|
interfaces = fetch_data("interface", auth, base_url)
|
|
|
|
# Filter VLAN only
|
|
vlans = [i for i in interfaces if i.get("type") == "vlan"]
|
|
total = len(vlans)
|
|
|
|
snapshot_path = save_snapshot(actual_name, "interface_vlan", vlans)
|
|
|
|
# Display limit for LLM
|
|
display_limit = 30
|
|
preview = vlans[:display_limit]
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"total_vlan_interfaces": total,
|
|
"snapshot_file": snapshot_path,
|
|
"vlans": preview
|
|
}
|
|
|
|
text_res = json.dumps(result, indent=2)
|
|
if total > display_limit:
|
|
text_res += f"\n\n⚠️ Hanya menampilkan {display_limit} dari {total} VLAN interfaces.\n"
|
|
text_res += f"📂 Data lengkap: {snapshot_path}"
|
|
|
|
return [TextContent(type="text", text=text_res)]
|
|
|
|
elif name == "get_gateway_interfaces":
|
|
routes = fetch_data("ip/route", auth, base_url)
|
|
|
|
active_only = arguments.get("active_only", True) if arguments else True
|
|
|
|
# Extract gateway interfaces from immediate-gw field (format: IP%interface)
|
|
gateway_interfaces = {}
|
|
for r in routes:
|
|
immediate_gw = r.get("immediate-gw", "")
|
|
if "%" in immediate_gw:
|
|
parts = immediate_gw.split("%")
|
|
gw_ip = parts[0]
|
|
iface = parts[1]
|
|
|
|
# Filter active only
|
|
if active_only and r.get("active") != "true":
|
|
continue
|
|
|
|
if iface not in gateway_interfaces:
|
|
gateway_interfaces[iface] = {
|
|
"interface": iface,
|
|
"gateway_ips": [],
|
|
"routes": []
|
|
}
|
|
|
|
gateway_interfaces[iface]["gateway_ips"].append(gw_ip)
|
|
gateway_interfaces[iface]["routes"].append({
|
|
"dst": r.get("dst-address"),
|
|
"gateway": r.get("gateway"),
|
|
"distance": r.get("distance"),
|
|
"routing_table": r.get("routing-table"),
|
|
"active": r.get("active") == "true"
|
|
})
|
|
|
|
# Deduplicate gateway IPs
|
|
for iface in gateway_interfaces:
|
|
gateway_interfaces[iface]["gateway_ips"] = list(set(gateway_interfaces[iface]["gateway_ips"]))
|
|
|
|
snapshot_path = save_snapshot(actual_name, "gateway_interfaces", list(gateway_interfaces.values()))
|
|
|
|
result = {
|
|
"router": actual_name,
|
|
"filter": "active_only" if active_only else "all",
|
|
"total_gateway_interfaces": len(gateway_interfaces),
|
|
"snapshot_file": snapshot_path,
|
|
"gateway_interfaces": list(gateway_interfaces.values())
|
|
}
|
|
|
|
return [TextContent(type="text", text=json.dumps(result, indent=2))]
|
|
|
|
else:
|
|
raise ValueError(f"Unknown tool: {name}")
|
|
|
|
except Exception as e:
|
|
return [TextContent(type="text", text=f"Error executing tool {name}: {str(e)}")]
|
|
|
|
async def main():
|
|
async with stdio_server() as (read_stream, write_stream):
|
|
await server.run(
|
|
read_stream,
|
|
write_stream,
|
|
server.create_initialization_options()
|
|
)
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|