93 lines
3.3 KiB
Python
93 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
import sys
|
|
import os
|
|
import json
|
|
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
|
|
# Add this script's directory to sys.path so the `src` package (BillingDatabase) can be imported
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
from src.billing import BillingDatabase
|
|
|
|
# Connection helper
|
|
def fetch_router_secrets(host, port, user, password):
    """Fetch the PPP secrets exposed by a router's REST endpoint.

    Sends an HTTP GET to ``http://{host}:{port}/rest/ppp/secret`` with HTTP
    basic authentication and a 10-second timeout.

    Args:
        host: Router address used to build the URL.
        port: TCP port used to build the URL.
        user: Username for basic authentication.
        password: Password for basic authentication.

    Returns:
        Always a list: the decoded JSON body when it is already a list, a
        one-element list wrapping any other truthy body, or an empty list
        when the body is falsy. Any exception raised while connecting,
        checking the status, or decoding is printed and turned into an
        empty list instead of propagating.
    """
    endpoint = f"http://{host}:{port}/rest/ppp/secret"
    try:
        print(f"Connecting to {host}...")
        response = requests.get(
            endpoint,
            auth=HTTPBasicAuth(user, password),
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()
    except Exception as e:
        # Best-effort: report the failure and let the caller see "no secrets".
        print(f"Error fetching from {host}: {e}")
        return []
    else:
        # Normalise the payload so callers can always iterate over a list.
        if isinstance(payload, list):
            return payload
        if payload:
            return [payload]
        return []
|
|
|
|
def audit():
    """Print PPP secrets that exist on the routers but not in billing.

    Steps:
      1. Read ``config.json`` located next to this script and build a
         ``BillingDatabase`` from its ``billing_databases`` entry.
      2. Collect every non-empty ``user_mikrotik`` value from the customers
         returned by ``search_customers("", limit=10000)``.
      3. For each router in the built-in list, fetch its PPP secrets and
         print every secret name (with its profile) that is missing from
         the billing set.

    All results are written to stdout; nothing is returned. The function
    returns early when the billing lookup reports ``success`` as falsy.

    Raises:
        OSError: If ``config.json`` cannot be opened.
        json.JSONDecodeError: If ``config.json`` is not valid JSON.
        KeyError: If ``billing_databases`` is missing from the config.
    """
    # 1. Load Billing Data
    print("--- Loading Billing Data ---")
    config_path = os.path.join(os.path.dirname(__file__), 'config.json')
    with open(config_path, 'r') as f:
        config = json.load(f)

    billing = BillingDatabase(config['billing_databases'])

    # An empty query with a large limit is used to pull the full customer
    # list. NOTE(review): per the original author's notes this presumably
    # reads the default/active cached snapshot without refreshing it, so a
    # stale cache would skew the audit -- confirm against BillingDatabase.
    # Every user present in billing is treated as valid.
    res = billing.search_customers("", limit=10000)  # Get all
    if not res['success']:
        print(f"Failed to load billing data: {res.get('error')}")
        return

    billing_users = {
        c.get('user_mikrotik') for c in res['customers'] if c.get('user_mikrotik')
    }
    print(f"Loaded {len(billing_users)} users from Billing.")

    # 2. Define Routers
    # NOTE(review): credentials are hard-coded in source control; they should
    # be moved to config.json or the environment and rotated.
    routers = [
        {
            "name": "router-dimensi-dell",
            "host": "103.138.63.178",
            "port": 80,
            "user": "chatbot",
            "pass": "K0s0ng11@2026",
        },
        {
            "name": "ccr1036",
            "host": "103.138.63.184",
            "port": 80,
            "user": "chatbot",
            "pass": "K0s0ng11@2026",
        },
    ]

    # 3. Audit Each Router
    for r in routers:
        print(f"\n--- Auditing {r['name']} ({r['host']}) ---")
        secrets = fetch_router_secrets(r['host'], r['port'], r['user'], r['pass'])
        print(f"Found {len(secrets)} PPP secrets on router.")

        if not secrets:
            # fetch_router_secrets returns [] on any error, so an empty result
            # must not be reported as "everything is registered".
            print("⚠️ No PPP secrets retrieved; skipping comparison for this router.")
            continue

        # Single pass: remember the profile of the first secret seen for each
        # unregistered name, instead of rescanning `secrets` per name (which
        # was quadratic and raised KeyError on entries without a 'name' key).
        unregistered = []
        profiles = {}
        for s in secrets:
            name = s.get('name')
            if name and name not in billing_users:
                unregistered.append(name)
                profiles.setdefault(name, s.get('profile', '?'))

        if unregistered:
            print(f"⚠️ FOUND {len(unregistered)} UNREGISTERED USERS (Exist on Router but NOT in Billing):")
            for u in sorted(unregistered):
                # The profile helps judge whether this is a real subscriber.
                print(f" - {u} (Profile: {profiles[u]})")
        else:
            print("✅ All users on this router are registered in billing.")
|
|
|
|
if __name__ == "__main__":
    # Run the audit only when executed as a script, not when imported.
    audit()
|