Files
iix/sync_routing.py

110 lines
4.4 KiB
Python

import os
import requests
import json
import ipaddress
from dotenv import load_dotenv
# Load configuration dari file .env
load_dotenv()
BGP_ROUTER_IP = os.getenv("BGP_ROUTER_IP")
BGP_ROUTER_USER = os.getenv("BGP_ROUTER_USER")
BGP_ROUTER_PASSWORD = os.getenv("BGP_ROUTER_PASSWORD")
# URL API RouterOS v7 untuk membaca tabel routing
API_URL = f"http://{BGP_ROUTER_IP}/rest/routing/route"
# Kredensial basic auth
auth = (BGP_ROUTER_USER, BGP_ROUTER_PASSWORD)
def get_local_bgp_routes():
"""Mengambil rentang IP Lokal/CDN dari tabel routing BGP secara efisien menggunakan parameter query API MikroTik"""
print(f"Menghubungi router {BGP_ROUTER_IP} via REST API...")
all_local_ips = []
try:
# PENGAMBILAN RUTE KONTEN / CDN
# Di RouterOS v7, rute CDN diset distance-nya menjadi -5 dari base eBGP (20), sehingga menjadi 15.
# Kita menggunakan query string "?distance=15&.proplist=dst-address" agar routeros
# hanya me-return rute yang bersangkutan secara instan tanpa perlu meloop 900.000 rute.
print("Mengambil rute Konten / CDN (Distance 15)... Ini dapat memakan waktu hingga satu menit.")
res_cdn = requests.get(
API_URL,
auth=auth,
params={"distance": "15", ".proplist": "dst-address"},
verify=False,
timeout=120
)
if res_cdn.status_code == 200:
cdn_routes = res_cdn.json()
ids = [r.get("dst-address") for r in cdn_routes if r.get("dst-address")]
print(f" -> Berhasil mengambil {len(ids)} rute CDN.")
all_local_ips.extend(ids)
else:
print(f" -> Error CDN: {res_cdn.status_code} - {res_cdn.text}")
# PENGAMBILAN RUTE NIX / LOKAL OpenIXP
# Berdasarkan konfigurasi bgp-router.rsc, peer NIX menggunakan local.role=ibgp (Distance base: 200).
print("Mengambil rute NIX / OpenIXP (Distance 200)... Ini dapat memakan waktu hingga satu menit.")
res_nix = requests.get(
API_URL,
auth=auth,
params={"distance": "200", ".proplist": "dst-address"},
verify=False,
timeout=120
)
if res_nix.status_code == 200:
nix_routes = res_nix.json()
ids = [r.get("dst-address") for r in nix_routes if r.get("dst-address")]
print(f" -> Berhasil mengambil {len(ids)} rute NIX.")
all_local_ips.extend(ids)
else:
print(f" -> Error NIX: {res_nix.status_code} - {res_nix.text}")
# Hapus default routes (0.0.0.0/0 dan ::/0) jika ada
all_local_ips = [ip for ip in all_local_ips if ip not in ("0.0.0.0/0", "::/0")]
return list(set(all_local_ips)) # Kembalikan list unik
except requests.exceptions.RequestException as e:
print(f"Gagal mengambil data dari router via API: {e}")
return []
def generate_address_list_script(route_list, list_name, filename):
"""Membentuk file .rsc yang berisi perintah create address-list MikroTik"""
if not route_list:
print(f"List kosong, tidak ada rute LOKAL yang ditemukan.")
return
print(f"Membuat file {filename} dengan {len(route_list)} subnet IP...")
with open(filename, "w") as f:
f.write(f"# Dibuat otomatis via Script API Python\n")
f.write(f"/ip firewall address-list\n")
# Bersihkan list lama sebelum memasukkan yang baru
f.write(f"remove [find list=\"{list_name}\"]\n")
for ip in route_list:
f.write(f"add list={list_name} address=\"{ip}\"\n")
print(f"File {filename} berhasil dibuat.")
if __name__ == "__main__":
if not all([BGP_ROUTER_IP, BGP_ROUTER_USER, BGP_ROUTER_PASSWORD]):
print("Error: Variabel kredensial/BGP_ROUTER_IP belum diset dengan benar di file .env")
exit(1)
print("--- Memulai Proses Ekstraksi Ringan Routing LOKAL ---")
local_ips = get_local_bgp_routes()
if local_ips:
print(f"\nRingkasan:")
print(f"Total Subnet IP Lokal & CDN: {len(local_ips)}")
# Generate script address-list khusus untuk trafik Lokal
generate_address_list_script(local_ips, "ip-lokal", "routing-lokal.rsc")
print("\nSelesai! File routing-lokal.rsc siap di-upload ke router distribusi.")
print("Di Router Distribusi jalankan perintah: /import file-name=routing-lokal.rsc")