From 6b6e56a082df0205cdefb3bf19def8e69dc586b4 Mon Sep 17 00:00:00 2001 From: wartana Date: Tue, 3 Feb 2026 22:41:07 +0800 Subject: [PATCH] Feat: Implement live traffic monitoring via monitor-traffic command and optimize interface fetching --- app/__pycache__/update_data.cpython-311.pyc | Bin 8384 -> 9088 bytes app/mcp_server.py | 116 ++++++++++++++++---- app/update_data.py | 21 +++- 3 files changed, 110 insertions(+), 27 deletions(-) diff --git a/app/__pycache__/update_data.cpython-311.pyc b/app/__pycache__/update_data.cpython-311.pyc index 405d7def1f8406ce4faa78996fa467211cfbab7d..d3d76927a6b2e65d72d71f8bab6dbec8f4078f29 100644 GIT binary patch delta 2394 zcmZ`)Z)_Ar6rb7MyWP9(^{&0U-nFIe7W&5nf5%@)H`x1tpCJCw^8P^RnCfW*Y$1+1{ z)0UFs=Afj-EZX#E-jDk zPvB>fU9PdN zP2|hU@}HZP)8(cLVe&N0KdnQSV_`{P<5~V8v$7=*b7&3`?+AwoHL+BhRfwAP01kQB z!?n(@%X-rC8idh)lopSP`_Tj@MV12Kg{;?7h%XP=ZFx!YaUViiZ#n{U{}Op1FIPcc zl9#*Sd}&T@d1wH3LCZrh3B4(l^Vpm^5f76)Wb#{>C(FCMl1||QVt#;DE#1wi?5j|S zU2^%is`=0n_|D<{S{OpTXsPd&J%Ib3Da!jOX0p4a5t7W&aB*Q(hiCvV_Y-05?yBa8 zXZ=gg)@9XYF{5T}L5q;3vT1jHyZX91fz9Yc`EW^L4bij}(aM+R%s(Uh%U!!aRIoq@ zY}z1d{X>bRF8_+d!GbPZv=p6H1oyRK1;UJ6ztKTBfpV83R(ZU3PukRJI+@Z2_0*V7 zwSf%PcK7x56snf}EbnQf{P*4~0u~dULbT;UJxO$G*s?h^s%OScs8^>#V{pipcrjDn zGgw?n3AC17QNG3@W_rdY3F?*o=4o)zbzC3xb~X8I_o!|TW{6Yr4=bIn=FUXuYt(V8 zP~8~Iqz#>JVQ0K+8~Bgnjo<+}ZIANmcNjM}rrBL@@6PqErXSRjGcUf?KGptHq~<+q zX2&eK8o#x7KGL%g=~;~QOzoc9eHS#@nci9R>b_g8^RFJ7k0ci&$;C)=YWFW{kiD#a zxnAS}h0;!Uv));FJSj&Bmb z-Q2^Fcce|8f`~pjaqIfSm7CMhNvrssW z-(?5GJ@^p&JY0njvfJT_`Xrck(|+zupod|yv0?D1H{V{V4r|sDuEXcpJ7sSNHo}%} z;_PwGo?wlUZ5z7ylJ_?HEHGPg9-INbKg9R4o=*0s^g;_~??=|U#zFDQ+>OXf(sTS8 zfX(>m7)`^{Ne?nJxL6q&N`dvLZsP$b&6UmHNFLR5&Hp5v|7dPt)!%rH*xlV7lYn){ zFg*(LSeY#+5+rjtk)TaH#yXa*UOlif8Lr{aqw`o&{+i_O5(i7Uf7;F&M`?@6RLa44 z5joF^bM^jFo*K=N@s$1yJq?F24C5aAxBA950k_>p%b6nJZp?)%xYCN7?xW@GDEqGF G_x}JuT~;iTyJ~afR0D4`Cex;i7{0OGMc4*M10|(==;Z<_qUN=Y8I< z=e)VO_wC%Zao^`&uL`If{i;^XN6-7(;pX}NU!H77zO!(~i#m{>?miSl8r}VR00kE~ z7LR8+plc|!1_8K4S3@A6Xpl_7__Bt=_wi{cau7w_ahByVPu_v`oT zp`o__siUDN^R%^+VlCK8$3NpZO{PtFy8gLH^+&*lpbF-fLxz&bwL57B1V^a8`=UAFH36zMq+=lA0Eh7gz^6rB<{qBBd@VfMSfp zj?^VX>ID!?1A~8)p9V`1skTUab9LDkk$ZRAqAerOX)gfFb=inghquMyV-AYDDem9J zT{I!kz-r!u8*=_4@ zEp+hdV1>w*lp;A<>-RJpmMHm#qyP>;u&+XxEX#@PUaYSrHdR;l#YU{IrWXKPA=`4_ zm~uv0f>zreUJ5kNv|I8(x3<=sv0;&{(mfq)W&l8QhH2N{J;YY_536)9x~{gevsL<3 zMGr8%qV#Dy43|LTw@-v*jJIy;np=QlH(!5?kIyN22DpvGr%_8U2 zGP$c>gCTOq9U?!uXGzwR5*?vrRt<8}^ZdhZN7fCrSS=bBet_0;)6o}DfzD9<;$CGJS=GRdC!_ig#i_}%=hv1FS z ze+b=-86IFtqbe=n3_BlTid`2!M!b;;cyfIzLSXD!79u%vMYjs5U=N1dTa{UuH!) z{0d8erpexn$s~@@4bMY8Wf9M@95x9$gn0(br;25oUC|wQ3Lhaq zb|zeJQ2fIBU!8Yhr*Mo)8sY_(5q6|}9#!Y^dHf)?@gR}9d!9JRmwDLV2=5G?vB0b# qev~QJ-r)=7GM*<3-7)nHwb?hp{FZ#sy%|0XGq=HhwMa list[TextConten # 1. Handle list_routers (no router connection needed) if name == "list_routers": devices = load_devices() - summary = [{"name": d["name"], "host": d["host"]} for d in devices] + billing_routers = load_billing_routers() + + summary = [] + seen_names = set() + + # Add devices from devices.json + for d in devices: + summary.append({"name": d["name"], "host": d["host"], "source": "devices.json"}) + seen_names.add(d["name"].lower()) + + # Add devices from billing config if not duplicate + for r in billing_routers: + if r["name"].lower() not in seen_names: + summary.append({"name": r["name"], "host": r["host"], "source": "billing-mcp"}) + return [TextContent(type="text", text=json.dumps(summary, indent=2))] # 2. Get target router @@ -325,18 +339,42 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten return [TextContent(type="text", text=f"❌ Gagal mengambil status router: {str(e)}\n\nTips: Pastikan user memiliki izin untuk '/system/resource' dan '/system/identity'.")] elif name == "get_interfaces": - interfaces = fetch_data("interface", auth, base_url) + # Parsing arguments + running_only = arguments.get("running_only", False) if arguments else False + full_data = arguments.get("full_data", False) if arguments else False + user_limit = arguments.get("limit", 50) if arguments else 50 + if user_limit > 200: user_limit = 200 + + # Optimizing fetch with server-side filtering and field selection + endpoint = "interface/print" + payload = {} - # 1. Handle Running Only filter - if arguments and arguments.get("running_only"): - interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True] + # 1. Server-side Query/Filter + query = [] + if running_only: + query.append("running=true") - # 2. Handle Limit - limit = arguments.get("limit", 50) if arguments else 50 - if limit > 200: limit = 200 # Cap at 200 + if query: + payload[".query"] = query + + # 2. Server-side Field Selection (Proplist) + # If full_data is False, request only specific fields to save bandwidth + if not full_data: + payload[".proplist"] = [".id", "name", "type", "running", "disabled", "comment", "default-name", "actual-mtu", "mac-address"] + try: + # Use POST for query/proplist support + interfaces = post_data(endpoint, payload, auth, base_url) + except Exception as e: + # Fallback to standard GET if POST fails (compatibility) + print(f"POST interface/print failed: {e}. Falling back to GET.") + interfaces = fetch_data("interface", auth, base_url) + # Apply client-side filtering since server-side failed + if running_only: + interfaces = [i for i in interfaces if i.get("running") == "true" or i.get("running") is True] + total_found = len(interfaces) - interfaces = interfaces[:limit] + interfaces = interfaces[:user_limit] # 3. Handle Field Filtering (Default: Minimal fields) full_data = arguments.get("full_data", False) if arguments else False @@ -671,6 +709,7 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten return [TextContent(type="text", text=json.dumps(result, indent=2))] elif name == "get_interface_traffic": + # 1. Get List of Interfaces (to know names) interfaces = fetch_data("interface", auth, base_url) # Filter by running only (default true) @@ -683,12 +722,43 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten if interface_name: interfaces = [i for i in interfaces if i.get("name", "").lower() == interface_name.lower()] - def bytes_to_mbps(bytes_val): - """Convert bytes to Mbps (bits per second / 1,000,000)""" - try: - return round(float(bytes_val) * 8 / 1_000_000, 2) - except (ValueError, TypeError): - return 0.0 + # If no interfaces found, return early + if not interfaces: + return [TextContent(type="text", text=f"⚠️ No interfaces found matching criteria.")] + + # 2. Monitor Traffic (Real-time Stats) + target_names = [i.get("name") for i in interfaces] + target_str = ",".join(target_names) + + # Mikrotik monitor-traffic can be heavy if many interfaces. + # If > 10 interfaces, we might want to batch or limit? + # REST API usually handles handle comma separated list fine. + + monitor_data = {} + try: + # Use monitor-traffic command via POST + # Payload: interface=name1,name2&once=true + payload = { + "interface": target_str, + "once": "true" + } + + mon_result = post_data("interface/monitor-traffic", payload, auth, base_url) + + # Map results by name + if isinstance(mon_result, list): + for m in mon_result: + monitor_data[m.get("name")] = m + elif isinstance(mon_result, dict) and mon_result.get("name"): + monitor_data[mon_result.get("name")] = mon_result + + except Exception as e: + # Fallback or log error + # If monitor-traffic fails (e.g. some interface type doesn't support it), + # we might get partial data or error. + # Let's try to continue with basic stats if available, or just report error. + print(f"Monitor traffic error: {e}") + # We will just use empty stats if monitor fails def format_rate(bps_val): """Format bits per second to human readable Mbps/Kbps""" @@ -703,16 +773,18 @@ async def handle_call_tool(name: str, arguments: dict | None) -> list[TextConten except (ValueError, TypeError): return "0 bps" - # Extract traffic info + # Extract traffic info combining config + monitor data traffic_data = [] for iface in interfaces: - # MikroTik provides tx-bits-per-second and rx-bits-per-second for real-time rate - # For total traffic, it provides tx-byte and rx-byte - tx_bps = iface.get("tx-bits-per-second", iface.get("tx-bit", 0)) - rx_bps = iface.get("rx-bits-per-second", iface.get("rx-bit", 0)) + name = iface.get("name") + mon = monitor_data.get(name, {}) + + # Get stats from monitor data (preferred) or interface data (fallback) + tx_bps = mon.get("tx-bits-per-second", iface.get("tx-bit", 0)) + rx_bps = mon.get("rx-bits-per-second", iface.get("rx-bit", 0)) traffic_data.append({ - "name": iface.get("name"), + "name": name, "type": iface.get("type"), "tx_rate": format_rate(tx_bps), "rx_rate": format_rate(rx_bps), diff --git a/app/update_data.py b/app/update_data.py index a520c29..4d8a660 100644 --- a/app/update_data.py +++ b/app/update_data.py @@ -24,7 +24,15 @@ def get_base_url(host, port): return f"{protocol}://{host}:{port}/rest" def fetch_data(endpoint, auth=None, base_url=None): - """Fetch data from MikroTik REST API with provided credentials""" + """Fetch data from MikroTik REST API with provided credentials (GET)""" + return _send_request("GET", endpoint, auth, base_url) + +def post_data(endpoint, payload, auth=None, base_url=None): + """Send POST request to MikroTik REST API""" + return _send_request("POST", endpoint, auth, base_url, json_data=payload) + +def _send_request(method, endpoint, auth=None, base_url=None, json_data=None): + """Internal helper for HTTP requests""" if auth is None: auth = (DEFAULT_MIKROTIK_USER, DEFAULT_MIKROTIK_PASSWORD) if base_url is None: @@ -37,10 +45,10 @@ def fetch_data(endpoint, auth=None, base_url=None): } try: - response = requests.get(url, auth=auth, headers=headers, timeout=12) - - # Log basic info for debugging if needed (uncomment if very stuck) - # print(f"DEBUG: GET {url} -> Status {response.status_code}") + if method == "GET": + response = requests.get(url, auth=auth, headers=headers, timeout=12) + else: + response = requests.post(url, auth=auth, headers=headers, json=json_data, timeout=12) if response.status_code != 200: status_code = response.status_code @@ -64,6 +72,9 @@ def fetch_data(endpoint, auth=None, base_url=None): raise Exception(error_msg) if not response.text or response.text.strip() == "": + # POST requests might return empty body if successful but no content? + # Usually monitor-traffic returns data. + if method == "POST": return {} raise Exception("Empty Response from Router") return response.json()