Files
billing-MCP/audit_users.py

93 lines
3.3 KiB
Python

#!/usr/bin/env python3
import sys
import os
import json
import requests
from requests.auth import HTTPBasicAuth
# Add this script's directory to sys.path so the `src` package (which provides BillingDatabase) is importable
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from src.billing import BillingDatabase
# Connection helper
def fetch_router_secrets(host, port, user, password):
    """Fetch the PPP secret entries from a router's REST endpoint.

    Issues a GET request to ``http://{host}:{port}/rest/ppp/secret`` using
    HTTP Basic authentication and a 10-second timeout.

    Args:
        host: Router hostname or IP address.
        port: TCP port of the router's REST service.
        user: Username for HTTP Basic authentication.
        password: Password for HTTP Basic authentication.

    Returns:
        A list of the decoded JSON entries. A non-list, truthy JSON payload
        is wrapped in a single-element list; a falsy payload yields ``[]``.
        On any error (connection, HTTP status, JSON decoding, ...) the error
        is printed and ``[]`` is returned, so callers cannot distinguish a
        failed fetch from a router without secrets.
    """
    endpoint = f"http://{host}:{port}/rest/ppp/secret"
    try:
        print(f"Connecting to {host}...")
        response = requests.get(
            endpoint,
            auth=HTTPBasicAuth(user, password),
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()

        # Normalise the payload so the caller always receives a list.
        if isinstance(payload, list):
            return payload
        if payload:
            return [payload]
        return []
    except Exception as e:
        # Best-effort helper: report the problem and fall back to "no data".
        print(f"Error fetching from {host}: {e}")
        return []
def _unregistered_profiles(secrets, billing_users):
    """Map each router secret name missing from billing to its profile.

    Args:
        secrets: Iterable of dict-like PPP secret entries from a router.
        billing_users: Set of user names known to billing.

    Returns:
        Dict of ``name -> profile`` for every secret whose ``name`` is
        non-empty and not in ``billing_users``. A missing profile is
        reported as ``'?'``. If a name occurs more than once, the first
        occurrence wins.
    """
    missing = {}
    for secret in secrets:
        name = secret.get('name')
        if name and name not in billing_users:
            missing.setdefault(name, secret.get('profile', '?'))
    return missing


def audit():
    """Report PPP secrets that exist on the routers but not in billing.

    Loads ``config.json`` from this script's directory, builds the set of
    billing user names (``user_mikrotik`` field of each customer), then
    queries each configured router and prints every secret name that is
    not present in that set, together with its profile.

    Returns:
        None. All results are printed to stdout.

    Raises:
        OSError: If ``config.json`` cannot be opened.
        json.JSONDecodeError: If ``config.json`` is not valid JSON.
        KeyError: If ``config.json`` lacks the ``billing_databases`` key.
    """
    # 1. Load Billing Data
    print("--- Loading Billing Data ---")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_dir, 'config.json')
    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)
    billing = BillingDatabase(config['billing_databases'])

    # An empty query is used to list every customer from the current billing
    # snapshot; every user found there is treated as valid.
    # NOTE(review): presumably `limit` caps the result size, so with more
    # than 10000 customers the audit would report false positives - confirm
    # against BillingDatabase.search_customers.
    res = billing.search_customers("", limit=10000)
    if not res['success']:
        print(f"Failed to load billing data: {res.get('error')}")
        return
    billing_users = {
        c.get('user_mikrotik') for c in res['customers'] if c.get('user_mikrotik')
    }
    print(f"Loaded {len(billing_users)} users from Billing.")

    # 2. Define Routers
    # NOTE(security): these credentials are hard-coded in source control.
    # They should be moved to config.json or environment variables and
    # rotated; left in place here to keep behavior unchanged.
    routers = [
        {
            "name": "router-dimensi-dell",
            "host": "103.138.63.178",
            "port": 80,
            "user": "chatbot",
            "pass": "K0s0ng11@2026",
        },
        {
            "name": "ccr1036",
            "host": "103.138.63.184",
            "port": 80,
            "user": "chatbot",
            "pass": "K0s0ng11@2026",
        },
    ]

    # 3. Audit Each Router
    for r in routers:
        print(f"\n--- Auditing {r['name']} ({r['host']}) ---")
        secrets = fetch_router_secrets(r['host'], r['port'], r['user'], r['pass'])
        print(f"Found {len(secrets)} PPP secrets on router.")

        if not secrets:
            # fetch_router_secrets returns [] both on error and for an empty
            # router, so an "all registered" verdict would be misleading.
            print("⚠️ No PPP secrets retrieved (fetch failed or router has none); skipping comparison.")
            continue

        unregistered = _unregistered_profiles(secrets, billing_users)
        if unregistered:
            print(f"⚠️ FOUND {len(unregistered)} UNREGISTERED USERS (Exist on Router but NOT in Billing):")
            for name in sorted(unregistered):
                # The profile helps judge whether this is a real user.
                print(f" - {name} (Profile: {unregistered[name]})")
        else:
            print("✅ All users on this router are registered in billing.")
# Run the audit only when this file is executed as a script (not on import).
if __name__ == "__main__":
    audit()