#!/usr/bin/env python3 import sys import os import json import requests from requests.auth import HTTPBasicAuth # Add src to path to import BillingDatabase sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from src.billing import BillingDatabase # Connection helper def fetch_router_secrets(host, port, user, password): url = f"http://{host}:{port}/rest/ppp/secret" try: print(f"Connecting to {host}...") resp = requests.get(url, auth=HTTPBasicAuth(user, password), timeout=10) resp.raise_for_status() data = resp.json() # Ensure list return data if isinstance(data, list) else [data] if data else [] except Exception as e: print(f"Error fetching from {host}: {e}") return [] def audit(): # 1. Load Billing Data print("--- Loading Billing Data ---") config_path = os.path.join(os.path.dirname(__file__), 'config.json') with open(config_path, 'r') as f: config = json.load(f) billing = BillingDatabase(config['billing_databases']) # Force refresh or just load? Let's use search with empty query to get all from cache # If cache is old, audit might be wrong. But user didn't ask to refresh. # However, for safety, let's try to get all from current snapshot server. # Assuming 'dimensi' is the main one. # We need to know which billing server to compare against. Assuming ALL users in billing are valid. # So we get all users from the default/active snapshot. res = billing.search_customers("", limit=10000) # Get all if not res['success']: print(f"Failed to load billing data: {res.get('error')}") return billing_users = {c.get('user_mikrotik') for c in res['customers'] if c.get('user_mikrotik')} print(f"Loaded {len(billing_users)} users from Billing.") # 2. Define Routers routers = [ { "name": "router-dimensi-dell", "host": "103.138.63.178", "port": 80, "user": "chatbot", "pass": "K0s0ng11@2026" }, { "name": "ccr1036", "host": "103.138.63.184", "port": 80, "user": "chatbot", "pass": "K0s0ng11@2026" } ] # 3. Audit Each Router for r in routers: print(f"\n--- Auditing {r['name']} ({r['host']}) ---") secrets = fetch_router_secrets(r['host'], r['port'], r['user'], r['pass']) print(f"Found {len(secrets)} PPP secrets on router.") unregistered = [] for s in secrets: name = s.get('name') # Ignore auto-generated or system users if any? usually simple check if name and name not in billing_users: unregistered.append(name) if unregistered: print(f"⚠️ FOUND {len(unregistered)} UNREGISTERED USERS (Exist on Router but NOT in Billing):") for u in sorted(unregistered): # Maybe print profile too to see if it's a real user # finding the secret object again for detail sec_obj = next((x for x in secrets if x['name'] == u), {}) profile = sec_obj.get('profile', '?') print(f" - {u} (Profile: {profile})") else: print("✅ All users on this router are registered in billing.") if __name__ == "__main__": audit()