134 lines
4.5 KiB
Python
134 lines
4.5 KiB
Python
import json
|
|
import os
|
|
import uuid
|
|
import base64
|
|
|
|
import sys
|
|
|
|
class ConfigManager:
    """Load, migrate and persist the backup application's configuration.

    The configuration is a JSON object stored Base64-encoded in ``config.dat``
    in the directory returned by :meth:`_get_base_dir`. A legacy plain-text
    ``config.json`` found in the same directory is migrated on load.

    NOTE(security): Base64 is an encoding, not encryption. Job passwords
    written to ``config.dat`` are trivially recoverable by anyone who can
    read the file.
    """

    # Template for one backup job. create_new_job() copies it and assigns a
    # fresh id. All values are immutable, so a shallow copy is sufficient.
    DEFAULT_JOB = {
        "id": "",
        "name": "New Backup Job",
        "type": 0,  # 0: File, 1: MySQL, 2: Postgres
        "dest": "",
        # File
        "source": "",
        "zip": True,
        # DB
        "host": "localhost",
        "port": "",
        "user": "",
        "password": "",
        "database": "",
        "retention_days": 0,  # 0 means forever
    }

    # Template for the whole configuration. It contains a mutable list, so
    # never hand out DEFAULT_CONFIG.copy(); use _default_config() instead.
    DEFAULT_CONFIG = {
        "jobs": [],
        "schedule_enabled": False,
        "schedule_interval": 60,
        "schedule_unit": "Menit",
    }

    @staticmethod
    def _get_base_dir():
        """Return the directory that holds the configuration files.

        When ``sys.frozen`` is truthy (presumably a bundled executable) this
        is the directory of ``sys.executable``; otherwise it is the directory
        containing this source file.
        """
        if getattr(sys, 'frozen', False):
            return os.path.dirname(sys.executable)
        return os.path.dirname(os.path.abspath(__file__))

    @staticmethod
    def _get_config_path(filename="config.dat"):
        """Return the absolute path of *filename* inside the base directory."""
        return os.path.join(ConfigManager._get_base_dir(), filename)

    @staticmethod
    def _default_config():
        """Return a fresh, fully independent copy of ``DEFAULT_CONFIG``.

        A shallow ``dict.copy()`` would share the ``"jobs"`` list with the
        class attribute, so appending a job to a returned config would
        silently change the defaults for every later caller.
        """
        import copy  # local import: only this helper needs it

        return copy.deepcopy(ConfigManager.DEFAULT_CONFIG)

    @staticmethod
    def create_new_job():
        """Return a new job dict based on ``DEFAULT_JOB`` with a unique id."""
        job = dict(ConfigManager.DEFAULT_JOB)
        job["id"] = str(uuid.uuid4())
        return job

    @staticmethod
    def load_config():
        """Load the configuration, migrating a legacy ``config.json`` if present.

        Returns:
            dict: The stored configuration merged over the defaults, or a
            fresh default configuration when nothing usable is stored. The
            returned dict never shares mutable state with ``DEFAULT_CONFIG``.

        This method does not raise: failures are printed and the defaults
        (or the next available source) are used instead.
        """
        config_dat = ConfigManager._get_config_path("config.dat")
        config_json = ConfigManager._get_config_path("config.json")

        # Check for old plain-text config.json and migrate it.
        if os.path.exists(config_json):
            try:
                with open(config_json, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if not isinstance(data, dict):
                    raise ValueError("config.json does not contain a JSON object")
                print("Migrasi config.json...")
                config = ConfigManager._migrate_old_config(data)
                # Only retire the old file once the new one is safely on
                # disk; otherwise the next start would fall back to defaults.
                if ConfigManager.save_config(config):
                    # os.replace overwrites an existing .bak on every
                    # platform; os.rename fails on Windows in that case,
                    # which left config.json in place to be re-migrated
                    # (clobbering config.dat) on every load.
                    os.replace(config_json, config_json + ".bak")
                return config
            except Exception as e:
                # Deliberate best-effort boundary: fall through to config.dat.
                print("Error migrasi config: {}".format(e))

        if not os.path.exists(config_dat):
            return ConfigManager._default_config()

        try:
            with open(config_dat, 'rb') as f:
                encoded_data = f.read()

            # Simple Base64 Decode
            decoded_data = base64.b64decode(encoded_data)
            data = json.loads(decoded_data.decode('utf-8'))
            if not isinstance(data, dict):
                raise ValueError("config.dat does not contain a JSON object")

            config = ConfigManager._default_config()
            config.update(data)
            return config
        except Exception as e:
            # Deliberate best-effort boundary: a corrupt or unreadable file
            # must not crash the caller.
            print("Error loading config: {}".format(e))
            return ConfigManager._default_config()

    @staticmethod
    def _migrate_old_config(data):
        """Migrate old config format to new format.

        Args:
            data: Parsed contents of a legacy ``config.json``.

        Returns:
            dict: A configuration in the current format. Data containing a
            ``"backup_type"`` key is treated as the old single-job layout and
            converted into one job named "Default Backup"; any other data is
            merged over the defaults unchanged.
        """
        config = ConfigManager._default_config()

        # Anything without "backup_type" is already in the new format.
        if "backup_type" not in data:
            config.update(data)
            return config

        # Very old single-job format: fold the flat keys into one job.
        job = ConfigManager.create_new_job()
        job["name"] = "Default Backup"
        job["type"] = data.get("backup_type", 0)
        job["dest"] = data.get("dest_path", "")
        job["source"] = data.get("file_source", "")
        job["zip"] = data.get("zip_enabled", True)

        if job["type"] == 1:  # MySQL
            job["host"] = data.get("mysql_host", "localhost")
            job["port"] = data.get("mysql_port", "3306")
            job["user"] = data.get("mysql_user", "root")
            job["password"] = data.get("mysql_pass", "")
            job["database"] = data.get("mysql_db", "")
        elif job["type"] == 2:  # Postgres
            job["host"] = data.get("pg_host", "localhost")
            job["port"] = data.get("pg_port", "5432")
            job["user"] = data.get("pg_user", "postgres")
            job["password"] = data.get("pg_pass", "")
            job["database"] = data.get("pg_db", "")

        config["jobs"] = [job]
        config["schedule_enabled"] = data.get("schedule_enabled", False)
        config["schedule_interval"] = data.get("schedule_interval", 60)
        config["schedule_unit"] = data.get("schedule_unit", "Menit")
        return config

    @staticmethod
    def save_config(config):
        """Write *config* to ``config.dat`` as Base64-encoded JSON.

        The data is first written to ``config.dat.tmp`` and then moved over
        the real file, so an interrupted write cannot leave a truncated
        ``config.dat`` behind.

        Args:
            config: JSON-serialisable configuration dict.

        Returns:
            bool: ``True`` when the file was written, ``False`` when an error
            occurred (the error is printed, never raised).
        """
        try:
            json_data = json.dumps(config, indent=4)
            # Simple Base64 Encode
            encoded_data = base64.b64encode(json_data.encode('utf-8'))

            config_path = ConfigManager._get_config_path("config.dat")
            tmp_path = config_path + ".tmp"
            with open(tmp_path, 'wb') as f:
                f.write(encoded_data)
            os.replace(tmp_path, config_path)
            return True
        except Exception as e:
            print("Error saving config: {}".format(e))
            return False