build: Install Python dependencies and initialize the virtual environment.
This commit is contained in:
92
audit_users.py
Normal file
92
audit_users.py
Normal file
@@ -0,0 +1,92 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
# Add src to path to import BillingDatabase
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from src.billing import BillingDatabase
|
||||
|
||||
# Connection helper
|
||||
def fetch_router_secrets(host, port, user, password):
|
||||
url = f"http://{host}:{port}/rest/ppp/secret"
|
||||
try:
|
||||
print(f"Connecting to {host}...")
|
||||
resp = requests.get(url, auth=HTTPBasicAuth(user, password), timeout=10)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
# Ensure list
|
||||
return data if isinstance(data, list) else [data] if data else []
|
||||
except Exception as e:
|
||||
print(f"Error fetching from {host}: {e}")
|
||||
return []
|
||||
|
||||
def audit():
|
||||
# 1. Load Billing Data
|
||||
print("--- Loading Billing Data ---")
|
||||
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
|
||||
with open(config_path, 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
billing = BillingDatabase(config['billing_databases'])
|
||||
# Force refresh or just load? Let's use search with empty query to get all from cache
|
||||
# If cache is old, audit might be wrong. But user didn't ask to refresh.
|
||||
# However, for safety, let's try to get all from current snapshot server.
|
||||
# Assuming 'dimensi' is the main one.
|
||||
|
||||
# We need to know which billing server to compare against. Assuming ALL users in billing are valid.
|
||||
# So we get all users from the default/active snapshot.
|
||||
res = billing.search_customers("", limit=10000) # Get all
|
||||
if not res['success']:
|
||||
print(f"Failed to load billing data: {res.get('error')}")
|
||||
return
|
||||
|
||||
billing_users = {c.get('user_mikrotik') for c in res['customers'] if c.get('user_mikrotik')}
|
||||
print(f"Loaded {len(billing_users)} users from Billing.")
|
||||
|
||||
# 2. Define Routers
|
||||
routers = [
|
||||
{
|
||||
"name": "router-dimensi-dell",
|
||||
"host": "103.138.63.178",
|
||||
"port": 80,
|
||||
"user": "chatbot",
|
||||
"pass": "K0s0ng11@2026"
|
||||
},
|
||||
{
|
||||
"name": "ccr1036",
|
||||
"host": "103.138.63.184",
|
||||
"port": 80,
|
||||
"user": "chatbot",
|
||||
"pass": "K0s0ng11@2026"
|
||||
}
|
||||
]
|
||||
|
||||
# 3. Audit Each Router
|
||||
for r in routers:
|
||||
print(f"\n--- Auditing {r['name']} ({r['host']}) ---")
|
||||
secrets = fetch_router_secrets(r['host'], r['port'], r['user'], r['pass'])
|
||||
print(f"Found {len(secrets)} PPP secrets on router.")
|
||||
|
||||
unregistered = []
|
||||
for s in secrets:
|
||||
name = s.get('name')
|
||||
# Ignore auto-generated or system users if any? usually simple check
|
||||
if name and name not in billing_users:
|
||||
unregistered.append(name)
|
||||
|
||||
if unregistered:
|
||||
print(f"⚠️ FOUND {len(unregistered)} UNREGISTERED USERS (Exist on Router but NOT in Billing):")
|
||||
for u in sorted(unregistered):
|
||||
# Maybe print profile too to see if it's a real user
|
||||
# finding the secret object again for detail
|
||||
sec_obj = next((x for x in secrets if x['name'] == u), {})
|
||||
profile = sec_obj.get('profile', '?')
|
||||
print(f" - {u} (Profile: {profile})")
|
||||
else:
|
||||
print("✅ All users on this router are registered in billing.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
audit()
|
||||
Reference in New Issue
Block a user