257 lines
9.7 KiB
Python
257 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Context Optimizer untuk MikroTik Gemini Integration
|
|
|
|
Optimasi data context untuk Gemini AI agar sesuai dengan token limits
|
|
"""
|
|
|
|
import json
|
|
from typing import Dict, List, Any
|
|
|
|
class ContextOptimizer:
    """Reduce raw MikroTik data to a compact, prioritised context for Gemini.

    Each ``optimize_*`` method keeps only a few fields per record and caps the
    number of records returned. ``create_optimized_context`` combines their
    results into one dictionary together with size metadata.
    """

    # Destination address that identifies a default route.
    _DEFAULT_ROUTE_DST = "0.0.0.0/0"
    # Maximum number of default routes reported ahead of other routes.
    _DEFAULT_ROUTE_SLOTS = 2
    # Substrings of a log entry's "topics" value that mark it as high priority.
    _HIGH_PRIORITY_TOPICS = ("critical", "error", "warning", "caps")

    def __init__(self, max_tokens: int = 8000):
        """Create an optimizer.

        Args:
            max_tokens: Token budget callers can compare the estimate against.
                It is stored but not enforced by this class.
        """
        self.max_tokens = max_tokens
        self.estimated_tokens_per_char = 0.25  # rough estimate: ~4 chars per token

    def _is_true(self, value: Any) -> bool:
        """Interpret a flag that may be a real bool or the string ``'true'``.

        Strings are compared case-insensitively; any other type is ``False``.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() == "true"
        return False

    def estimate_tokens(self, text: str) -> int:
        """Return a rough token count for *text* based on its character length."""
        return int(len(text) * self.estimated_tokens_per_char)

    def optimize_system_info(self, system_data: Dict) -> Dict:
        """Extract the key fields from the ``resource`` entry of *system_data*.

        Missing fields are reported as ``"N/A"``; memory values are formatted
        as human-readable strings. A missing or ``None`` ``resource`` entry is
        treated as empty.
        """
        resource = (system_data or {}).get("resource") or {}

        # Keep only the important fields.
        return {
            "board_name": resource.get("board-name", "N/A"),
            "cpu_load": resource.get("cpu-load", "N/A"),
            "cpu_count": resource.get("cpu-count", "N/A"),
            "free_memory": self._format_memory(resource.get("free-memory", 0)),
            "total_memory": self._format_memory(resource.get("total-memory", 0)),
            "uptime": resource.get("uptime", "N/A"),
            "version": resource.get("version", "N/A"),
        }

    def optimize_interfaces(self, interfaces: List[Dict], max_interfaces: int = 20) -> List[Dict]:
        """Return a reduced sample of interfaces, running ones first.

        At most ``max_interfaces // 2`` running interfaces (with traffic
        fields) are followed by at most ``max_interfaces // 4`` non-running
        interfaces (with their ``disabled`` value).

        Args:
            interfaces: Interface records (dicts); may be empty or ``None``.
            max_interfaces: Size budget the two quotas are derived from.

        Returns:
            A list of trimmed interface dicts.
        """
        if not interfaces:
            return []

        # Clamp at zero: a negative bound would otherwise become a
        # "drop the last N" slice and return almost the whole list.
        running_limit = max(0, max_interfaces // 2)
        idle_limit = max(0, max_interfaces // 4)

        # Partition in a single pass instead of scanning the list twice.
        running = []
        idle = []
        for iface in interfaces:
            if self._is_true(iface.get("running")):
                running.append(iface)
            else:
                idle.append(iface)

        # Running interfaces (high priority).
        optimized = [
            {
                "name": iface.get("name", "N/A"),
                "type": iface.get("type", "N/A"),
                "mtu": iface.get("mtu", "N/A"),
                "rx": iface.get("rx-byte", "N/A"),
                "tx": iface.get("tx-byte", "N/A"),
                "status": "running",
            }
            for iface in running[:running_limit]
        ]

        # Non-running interfaces.
        optimized.extend(
            {
                "name": iface.get("name", "N/A"),
                "type": iface.get("type", "N/A"),
                "status": "not running",
                "disabled": iface.get("disabled", "N/A"),
            }
            for iface in idle[:idle_limit]
        )

        return optimized

    def optimize_ppp_secrets(self, secrets: List[Dict], max_secrets: int = 10) -> List[Dict]:
        """Return the first *max_secrets* PPP secrets with only the key fields.

        The ``disabled`` value is normalised to a real bool.
        """
        if not secrets:
            return []

        # Keep only the important fields; clamp so a negative limit yields [].
        return [
            {
                "name": secret.get("name", "N/A"),
                "service": secret.get("service", "N/A"),
                "disabled": self._is_true(secret.get("disabled")),
                "profile": secret.get("profile", "N/A"),
            }
            for secret in secrets[:max(0, max_secrets)]
        ]

    def optimize_routes(self, routes: List[Dict], max_routes: int = 10) -> List[Dict]:
        """Return at most *max_routes* routes, default routes first.

        Up to two default routes (``dst-address == "0.0.0.0/0"``) are listed
        with distance and scope. The remaining budget is filled with other
        routes, reduced to destination and gateway.

        Args:
            routes: Route records (dicts); may be empty or ``None``.
            max_routes: Hard cap on the number of returned routes. Values
                below 1 yield an empty list.

        Returns:
            A list of trimmed route dicts, never longer than *max_routes*.
        """
        if not routes:
            return []

        budget = max(0, max_routes)

        default_routes = []
        other_routes = []
        for route in routes:
            if route.get("dst-address", "") == self._DEFAULT_ROUTE_DST:
                default_routes.append(route)
            else:
                other_routes.append(route)

        # Default routes, never exceeding the overall budget.
        default_limit = min(self._DEFAULT_ROUTE_SLOTS, budget)
        optimized = [
            {
                "dst": route.get("dst-address", "N/A"),
                "gateway": route.get("gateway", "N/A"),
                "distance": route.get("distance", "N/A"),
                "scope": route.get("scope", "N/A"),
            }
            for route in default_routes[:default_limit]
        ]

        # Other routes fill whatever is left. The bound is computed from what
        # was actually emitted, so it can never go negative (a negative slice
        # bound would return all but the last N routes).
        remaining = budget - len(optimized)
        optimized.extend(
            {
                "dst": route.get("dst-address", "N/A"),
                "gateway": route.get("gateway", "N/A"),
            }
            for route in other_routes[:remaining]
        )

        return optimized

    def optimize_logs(self, logs: List[Dict], max_logs: int = 40) -> List[Dict]:
        """Filter logs, prioritising critical, error, warning and caps topics.

        The input is walked in reverse order (assumes the device returns logs
        oldest first, so the newest entries come out first). Entries whose
        ``topics`` value contains a high-priority topic are emitted first;
        other entries fill the remaining slots up to *max_logs*.

        Args:
            logs: Log records (dicts); may be empty or ``None``.
            max_logs: Maximum number of entries to return.

        Returns:
            A list of ``{"time", "topics", "msg"}`` dicts.
        """
        if not logs:
            return []

        limit = max(0, max_logs)
        important = []
        others = []

        for log in reversed(logs):
            topics = log.get("topics", "")
            if topics is None:
                # An explicit null would make the membership test below raise.
                topics = ""

            entry = {
                "time": log.get("time", "N/A"),
                "topics": topics,
                "msg": log.get("message", ""),
            }

            # Check whether the entry carries a priority topic.
            if any(topic in topics for topic in self._HIGH_PRIORITY_TOPICS):
                important.append(entry)
            else:
                others.append(entry)

        # Combine: important entries first, then others up to the limit.
        optimized = important[:limit]
        optimized.extend(others[:limit - len(optimized)])
        return optimized

    def optimize_hotspot(self, active_users: List[Dict], max_users: int = 15) -> List[Dict]:
        """Return the first *max_users* active hotspot users, trimmed.

        Byte counters are formatted as human-readable strings.
        """
        if not active_users:
            return []

        return [
            {
                "user": user.get("user", "N/A"),
                "address": user.get("address", "N/A"),
                "uptime": user.get("uptime", "N/A"),
                "bytes_in": self._format_memory(user.get("bytes-in", 0)),
                "bytes_out": self._format_memory(user.get("bytes-out", 0)),
            }
            for user in active_users[:max(0, max_users)]
        ]

    @staticmethod
    def _nested_list(data: Dict, section: str, key: str) -> List[Dict]:
        """Return ``data[section][key]`` if it is a list, otherwise ``[]``."""
        section_data = data.get(section)
        if not isinstance(section_data, dict):
            return []
        items = section_data.get(key)
        return items if isinstance(items, list) else []

    def create_optimized_context(self, mikrotik_data: Dict, question: str = "") -> Dict:
        """Build the optimized context for Gemini.

        Args:
            mikrotik_data: Collected data. The sections read are ``system``,
                ``interfaces.interfaces``, ``ppp.secrets``, ``network.routes``,
                ``logs`` and ``hotspot.active``; missing or malformed sections
                are treated as empty. Must be JSON-serializable.
            question: The user question; only its first 50 characters are
                recorded in the metadata.

        Returns:
            A dict with ``metadata``, ``summary`` and ``details`` entries.
            ``estimated_tokens`` and ``compression_ratio`` are computed from
            the serialized context before those two fields are added to it.
        """
        question = question or ""

        # Look every section up once instead of repeating the nested lookups.
        interfaces = self._nested_list(mikrotik_data, "interfaces", "interfaces")
        secrets = self._nested_list(mikrotik_data, "ppp", "secrets")
        routes = self._nested_list(mikrotik_data, "network", "routes")
        hotspot_active = self._nested_list(mikrotik_data, "hotspot", "active")
        logs = mikrotik_data.get("logs")
        if not isinstance(logs, list):
            logs = []

        if len(question) > 50:
            optimized_for = question[:50] + "..."
        else:
            optimized_for = question

        optimized = {
            "metadata": {
                "original_size": len(json.dumps(mikrotik_data)),
                "optimized_for": optimized_for,
                "strategy": "priority_based_compression",
            },
            "summary": {
                "system": self.optimize_system_info(mikrotik_data.get("system")),
                "interfaces_count": len(interfaces),
                "running_interfaces_count": sum(
                    1 for iface in interfaces if self._is_true(iface.get("running"))
                ),
                "ppp_secrets_count": len(secrets),
                "routes_count": len(routes),
            },
            "details": {
                "interfaces_sample": self.optimize_interfaces(interfaces),
                "ppp_secrets_sample": self.optimize_ppp_secrets(secrets),
                "routes_sample": self.optimize_routes(routes),
                "recent_logs": self.optimize_logs(logs),
                "hotspot_active": self.optimize_hotspot(hotspot_active),
            },
        }

        # Estimate tokens from the serialized context.
        context_text = json.dumps(optimized)
        metadata = optimized["metadata"]
        metadata["estimated_tokens"] = self.estimate_tokens(context_text)
        metadata["compression_ratio"] = (
            metadata["original_size"] / len(context_text) if context_text else 1
        )

        return optimized

    def _format_memory(self, bytes_value: Any) -> str:
        """Format a byte count as a human-readable string.

        Accepts numbers or numeric strings. Values that cannot be converted
        to a float, and zero, are rendered as ``"0 B"``.
        """
        try:
            val = float(bytes_value)
        except (ValueError, TypeError):
            return "0 B"

        if val == 0:
            return "0 B"

        for unit in ("B", "KB", "MB", "GB"):
            if val < 1024.0:
                return f"{val:.1f} {unit}"
            val /= 1024.0
        # Anything left after dividing through GB is reported in TB.
        return f"{val:.1f} TB"
|
|
|
|
# Example usage
|
|
if __name__ == "__main__":
    import sys

    # Load sample data. Only I/O and parse failures are handled here, so that
    # Ctrl-C and SystemExit are not swallowed the way a bare ``except`` would.
    try:
        with open("../mikrotik_mcp_data.json", "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print(f"Error loading data: {exc}", file=sys.stderr)
        sys.exit(1)

    optimizer = ContextOptimizer()
    optimized = optimizer.create_optimized_context(data, "test question")
    metadata = optimized["metadata"]

    print(f"Original size: {metadata['original_size']} bytes")
    print(f"Optimized size: {len(json.dumps(optimized))} bytes")
    print(f"Compression ratio: {metadata['compression_ratio']:.1f}x")
    print(f"Estimated tokens: {metadata['estimated_tokens']}")
    print(f"Max tokens: {optimizer.max_tokens}")

    # Compare the rough estimate against the configured budget.
    if metadata["estimated_tokens"] > optimizer.max_tokens:
        print("⚠️ Warning: Context mungkin terlalu besar")
    else:
        print("✅ Context size OK")
|