commit c0baf28992d17c4c084e0c74e72270f44224f7fe Author: wartana Date: Thu Feb 5 16:25:38 2026 +0000 Initial commit: Speedtest MCP Server diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..93526df --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +venv/ +__pycache__/ diff --git a/mcp_server.py b/mcp_server.py new file mode 100644 index 0000000..5c07e5b --- /dev/null +++ b/mcp_server.py @@ -0,0 +1,116 @@ +from mcp.server.fastmcp import FastMCP +import subprocess +import json +import logging + +import xml.etree.ElementTree as ET +import httpx + +# Initialize FastMCP server +mcp = FastMCP("Speedtest") + +@mcp.tool() +async def list_servers(search: str = None) -> str: + """ + List available speedtest servers globally. + Args: + search: Optional search term to filter servers (e.g. "Jakarta", "Singapore", "US"). + If omitted, returns a small sample of servers. + Returns: + JSON string of available servers. + """ + url = "https://www.speedtest.net/speedtest-servers-static.php" + try: + # Fetch XML list + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + + root = ET.fromstring(response.content) + + results = [] + # XML structure: + # or sometimes just depending on version, + # but usually it's nested in servers. + + servers_list = root.find("servers") + if servers_list is None: + # Fallback if structure is flat + iterator = root.findall("server") + else: + iterator = servers_list.findall("server") + + for server in iterator: + # Attributes: url, lat, lon, name, country, cc, sponsor, id, host + data = server.attrib + + if search: + search_lower = search.lower() + # Search in country, name, Sponsor + if (search_lower in data.get('country', '').lower() or + search_lower in data.get('name', '').lower() or + search_lower in data.get('sponsor', '').lower()): + results.append(data) + else: + # If no search, we don't want to return 8000 servers. Return top 20. + if len(results) < 20: + results.append(data) + + if len(results) > 50: + results = results[:50] + + # Hardcoded International Servers (Curated for Bandwidth Testing) + # These will always be appended to search results or default list + international_servers = [ + {"id": "13623", "name": "Singtel", "country": "Singapore", "sponsor": "Singtel", "host": "Singapore"}, + {"id": "4871", "name": "M1 Limited", "country": "Singapore", "sponsor": "M1", "host": "Singapore"}, + {"id": "60667", "name": "DigitalOcean", "country": "Singapore", "sponsor": "DigitalOcean", "host": "Singapore"}, + {"id": "21569", "name": "Google Cloud", "country": "Japan", "sponsor": "Google", "host": "Tokyo"}, + {"id": "15047", "name": "AT&T", "country": "United States", "sponsor": "AT&T", "host": "New York, NY"}, + {"id": "18335", "name": "Cloudflare", "country": "United States", "sponsor": "Cloudflare", "host": "San Francisco, CA"}, + ] + + # Merge international servers (avoid duplicates) + existing_ids = {s.get('id') for s in results} + for s in international_servers: + if str(s['id']) not in existing_ids: + # Simple filter matching + if not search: + results.append(s) + elif search: + search_lower = search.lower() + if (search_lower in s['country'].lower() or + search_lower in s['name'].lower() or + search_lower in s['sponsor'].lower()): + results.append(s) + + return json.dumps(results, indent=2) + + except Exception as e: + return f"Error listing servers: {str(e)}" + +@mcp.tool() +def run_speedtest(server_id: int = None) -> str: + """ + Run a speedtest. + Args: + server_id: Optional ID of the server to test against. If omitted, uses auto-selection. + Returns: + JSON string containing the speedtest results (download, upload, ping, etc). + """ + cmd = ["/usr/bin/speedtest", "--accept-license", "--accept-gdpr", "--format=json"] + if server_id: + cmd.extend(["-s", str(server_id)]) + + try: + # This might take a while (15-30s) + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + return result.stdout + except subprocess.CalledProcessError as e: + return f"Error running speedtest: {e.stderr}" + except Exception as e: + return f"Error: {str(e)}" + +if __name__ == "__main__": + # fastmcp runs on stdio by default when called this way + mcp.run() diff --git a/start-browser.sh b/start-browser.sh new file mode 100755 index 0000000..56888aa --- /dev/null +++ b/start-browser.sh @@ -0,0 +1,26 @@ +#!/bin/bash +export DISPLAY=:1 +export HOST_IP=$(hostname -I | awk '{print $1}') + +# Cleanup previous locks +rm -rf /tmp/.X1-lock /tmp/.X11-unix/X1 + +echo "Starting Xvnc on :1..." +Xvnc :1 -geometry 1280x720 -depth 24 -rfbauth ~/.vnc/passwd -rfbport 5901 & +PID_XVNC=$! +sleep 2 + +echo "Starting Chromium..." +# Run Chromium in standard maximized mode (allows address bar) +chromium --no-sandbox --display=:1 --window-position=0,0 --window-size=1280,720 --start-maximized --no-first-run --no-default-browser-check --test-type https://google.com & + +echo "Starting noVNC..." +# noVNC (websockify) maps port 6080 to VNC port 5901 +websockify -D --web=/usr/share/novnc 6080 localhost:5901 + +echo "=================================================" +echo "Access your browser at: http://$HOST_IP:6080/vnc.html" +echo "VNC Password: (configured)" +echo "=================================================" + +wait $PID_XVNC diff --git a/test_mcp.py b/test_mcp.py new file mode 100644 index 0000000..2e709ee --- /dev/null +++ b/test_mcp.py @@ -0,0 +1,10 @@ +import asyncio +from mcp_server import list_servers + +async def main(): + print("Searching for Singapore...") + result = await list_servers("Singapore") + print(result[:500] + "...") # Print first 500 chars + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test_mcp_full.py b/test_mcp_full.py new file mode 100644 index 0000000..c30c7be --- /dev/null +++ b/test_mcp_full.py @@ -0,0 +1,50 @@ +import asyncio +import json +from mcp_server import list_servers, run_speedtest + +async def main(): + print("--- 1. Testing Discovery: list_servers('Singapore') ---") + servers_json = await list_servers("Singapore") + servers = json.loads(servers_json) + print(f"Found {len(servers)} servers.") + + if not servers: + print("No servers found!") + return + + # Print found servers + for s in servers: + print(f"ID: {s['id']} | Name: {s['name']} | Sponsor: {s['sponsor']}") + + # Pick the Singtel server (ID 13623) if available, otherwise the first one + target_server = next((s for s in servers if str(s['id']) == "13623"), servers[0]) + target_id = target_server['id'] + target_name = target_server['sponsor'] + + print(f"\n--- 2. Testing Execution: run_speedtest({target_id}) [{target_name}] ---") + print("Running speedtest... (Please wait ~30 seconds)...") + + # Run the speedtest + result_json = run_speedtest(int(target_id)) + + try: + data = json.loads(result_json) + if "error" in data: + print(f"Error from speedtest: {data['error']}") + else: + # Convert bytes/sec to Mbps (1 Mbps = 125,000 bytes/sec) + dl_mbps = data['download']['bandwidth'] / 125000 + ul_mbps = data['upload']['bandwidth'] / 125000 + ping = data['ping']['latency'] + + print(f"\nSUCCESS! Results for {target_name}:") + print(f"Ping: {ping} ms") + print(f"Download: {dl_mbps:.2f} Mbps") + print(f"Upload: {ul_mbps:.2f} Mbps") + print(f"Link: {data['result']['url']}") + except Exception as e: + print(f"Failed to parse results: {e}") + print(f"Raw Output: {result_json}") + +if __name__ == "__main__": + asyncio.run(main())