"""backup_manager.py — backup strategies for ProBackup.

Provides file/folder, MySQL/MariaDB and PostgreSQL backup implementations
with dated destination folders (JobName/YYYY/MM/DD) and count-based
retention cleanup.
"""
import os
import sys
import shutil
import zipfile
import time
import datetime
import subprocess
from abc import ABC, abstractmethod
# Optional database drivers: an import failure is tolerated at startup so the
# UI can still load; DB backup features report the missing driver when used.
try:
    import pymysql
except ImportError:
    pymysql = None  # "driver unavailable" marker checked by get_mysql_info()
try:
    import psycopg2
except ImportError:
    psycopg2 = None  # "driver unavailable" marker checked by get_postgres_info()
def get_mysql_info(host, port, user, password, skip_ssl=True):
    """
    Connect to a MySQL/MariaDB server and fetch its version and database list.

    Args:
        host: Server hostname or IP address.
        port: Server port (int or numeric string).
        user: Login user name.
        password: Login password.
        skip_ssl: Disable SSL for this connection when True (default True).

    Returns:
        tuple: (version_string, [user_database_list], None) on success,
               (None, [], error_message) on failure.
    """
    if pymysql is None:
        return None, [], "pymysql library not installed"
    conn = None
    try:
        conn_kwargs = {
            'host': host,
            'port': int(port),
            'user': user,
            'password': password,
            'connect_timeout': 10,
        }
        if skip_ssl:
            # Both spellings are set for compatibility across pymysql versions.
            conn_kwargs['ssl'] = None
            conn_kwargs['ssl_disabled'] = True
        conn = pymysql.connect(**conn_kwargs)
        cursor = conn.cursor()
        cursor.execute("SELECT VERSION()")
        version = cursor.fetchone()[0]
        cursor.execute("SHOW DATABASES")
        databases = [row[0] for row in cursor.fetchall()]
        # Hide system schemas; only user databases are backup candidates.
        system_dbs = ['information_schema', 'performance_schema', 'mysql', 'sys']
        user_dbs = [db for db in databases if db.lower() not in system_dbs]
        cursor.close()
        return version, user_dbs, None
    except Exception as e:
        return None, [], str(e)
    finally:
        # Release the connection even when a query raised mid-way.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def get_postgres_info(host, port, user, password):
    """
    Connect to a PostgreSQL server and fetch its version and database list.

    Args:
        host: Server hostname or IP address.
        port: Server port (int or numeric string).
        user: Login user name.
        password: Login password.

    Returns:
        tuple: (version_string, [user_database_list], None) on success,
               (None, [], error_message) on failure.
    """
    if psycopg2 is None:
        return None, [], "psycopg2 library not installed"
    conn = None
    try:
        conn = psycopg2.connect(
            host=host,
            port=int(port),
            user=user,
            password=password,
            dbname='postgres',  # Connect to the default maintenance DB first
            connect_timeout=10
        )
        cursor = conn.cursor()
        cursor.execute("SELECT version()")
        full_version = cursor.fetchone()[0]
        # Trim the build banner down to e.g. "PostgreSQL 15.2".
        version = full_version.split(',')[0] if ',' in full_version else full_version.split(' on ')[0]
        cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname")
        databases = [row[0] for row in cursor.fetchall()]
        # Hide the default maintenance database from the user-facing list.
        system_dbs = ['postgres']
        user_dbs = [db for db in databases if db.lower() not in system_dbs]
        cursor.close()
        return version, user_dbs, None
    except Exception as e:
        return None, [], str(e)
    finally:
        # Release the connection even when a query raised mid-way.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def format_size(size_bytes):
    """Return *size_bytes* formatted as a human-readable B/KB/MB/GB string."""
    kib = 1024
    mib = 1024 * 1024
    gib = 1024 * 1024 * 1024
    if size_bytes < kib:
        return "{} B".format(size_bytes)
    if size_bytes < mib:
        return "{:.1f} KB".format(size_bytes / kib)
    if size_bytes < gib:
        return "{:.2f} MB".format(size_bytes / mib)
    return "{:.2f} GB".format(size_bytes / gib)
def find_sql_tool(tool_name, server_type=None, server_version=None):
    """
    Locate a SQL dump tool executable, preferring version-matched copies.

    Search order:
      1. sql_tools/{server_type}/{major.minor}/ (app dir, then bundle dir)
      2. sql_tools/ root (app dir, then bundle dir) as legacy fallback

    Args:
        tool_name: Canonical executable name, e.g. 'mysqldump.exe'.
        server_type: 'mysql', 'mariadb' or 'postgres' (optional).
        server_version: Full version string, e.g. '10.6.12' (optional).

    Returns:
        Full path to the first matching executable, or None if not found.
    """
    # Alternate filenames accepted for each canonical tool name.
    variants = {
        'mysqldump.exe': ['mysqldump.exe', 'mariadb-dump.exe', 'mariadump.exe', 'mysql_dump.exe'],
        'pg_dump.exe': ['pg_dump.exe', 'pgdump.exe'],
        'pg_dumpall.exe': ['pg_dumpall.exe', 'pgdumpall.exe'],
    }
    candidates = variants.get(tool_name, [tool_name])

    # Resolve the application directory (PyInstaller-aware).
    if getattr(sys, 'frozen', False):
        app_dir = os.path.dirname(sys.executable)
        bundle_dir = getattr(sys, '_MEIPASS', app_dir)
    else:
        app_dir = os.path.dirname(os.path.abspath(__file__))
        bundle_dir = app_dir
    roots = [
        os.path.join(app_dir, 'sql_tools'),
        os.path.join(bundle_dir, 'sql_tools'),
    ]

    def first_hit(directory):
        """Return the first candidate present in *directory*, else None."""
        for candidate in candidates:
            candidate_path = os.path.join(directory, candidate)
            if os.path.exists(candidate_path):
                return candidate_path
        return None

    # Pass 1: version-specific folder such as sql_tools/mysql/8.0/
    if server_type and server_version:
        import re
        match = re.match(r'(\d+\.\d+)', str(server_version))
        if match:
            short_ver = match.group(1)
            for root in roots:
                hit = first_hit(os.path.join(root, server_type, short_ver))
                if hit:
                    return hit

    # Pass 2: legacy/generic location at the sql_tools root.
    for root in roots:
        hit = first_hit(root)
        if hit:
            return hit
    return None
class BackupStrategy(ABC):
    """
    Abstract base for all backup jobs.

    Provides progress/log signal plumbing, cooperative cancellation,
    dated destination-folder layout and count-based retention cleanup.
    Subclasses implement run_backup().
    """

    def __init__(self, update_signal=None, log_signal=None):
        # Qt-style signals are optional; without them log() falls back to print().
        self.update_signal = update_signal
        self.log_signal = log_signal
        self._cancel_flag = False

    def log(self, message):
        """Emit a timestamped log line via the signal, or print as fallback."""
        if self.log_signal:
            timestamp = datetime.datetime.now().strftime("%H:%M:%S")
            self.log_signal.emit("[{}] {}".format(timestamp, message))
        else:
            print(message)

    def progress(self, value):
        """Report progress percentage (0-100) if a progress signal is connected."""
        if self.update_signal:
            self.update_signal.emit(value)

    def cancel(self):
        """Request cooperative cancellation; honored at subclass checkpoints."""
        self._cancel_flag = True
        self.log("Pembatalan (diminta/tertunda)...")

    def is_cancelled(self):
        """Return True once cancel() has been requested."""
        return self._cancel_flag

    def cleanup_old_backups(self, target_folder, retention_limit):
        """
        Count-based retention, scoped to a single folder (non-recursive):
        keep the 'retention_limit' newest items (files or folders) inside
        'target_folder' and delete the rest. A limit <= 0 disables cleanup.
        """
        if retention_limit <= 0:
            return
        if not os.path.exists(target_folder):
            return
        self.log("Menjalankan pembersihan di {} (Batasan: {} item terbaru)...".format(
            os.path.basename(target_folder), retention_limit))
        all_backups = []
        try:
            # Collect (path, mtime) for direct children only.
            for entry in os.listdir(target_folder):
                full_path = os.path.join(target_folder, entry)
                try:
                    all_backups.append((full_path, os.path.getmtime(full_path)))
                except OSError:
                    # Item vanished (or is unreadable) between listdir and stat.
                    pass
            # Newest first; everything past the limit is pruned.
            all_backups.sort(key=lambda item: item[1], reverse=True)
            if len(all_backups) > retention_limit:
                count = 0
                for path, _ in all_backups[retention_limit:]:
                    try:
                        if os.path.isdir(path):
                            shutil.rmtree(path)
                        else:
                            os.remove(path)
                        self.log("Pruning item lama: {}".format(os.path.basename(path)))
                        count += 1
                    except Exception as e:
                        self.log("Gagal menghapus {}: {}".format(os.path.basename(path), e))
                if count > 0:
                    self.log("Pembersihan selesai. Dihapus: {} item.".format(count))
            else:
                self.log("Jumlah item ({}) aman (Batas: {}).".format(
                    len(all_backups), retention_limit))
        except Exception as e:
            self.log("Error saat pembersihan: {}".format(e))

    def _get_dated_dest_dir(self, base_dest, job_name=None):
        """Create and return base_dest[/JobName]/YYYY/MM/DD."""
        if job_name:
            # Keep only filesystem-safe characters from the job name.
            safe_job_name = "".join(
                c for c in job_name if c.isalnum() or c in (' ', '.', '_', '-')).strip()
            base_dest = os.path.join(base_dest, safe_job_name)
        now = datetime.datetime.now()
        dated_path = os.path.join(
            base_dest, now.strftime("%Y"), now.strftime("%m"), now.strftime("%d"))
        # exist_ok=True makes a separate existence pre-check unnecessary.
        os.makedirs(dated_path, exist_ok=True)
        return dated_path

    @abstractmethod
    def run_backup(self, config):
        """Execute the backup described by 'config'. Raise on failure."""
        pass
class FileBackup(BackupStrategy):
    """File/folder backup: plain copy or ZIP archive into a dated folder."""

    def run_backup(self, config):
        """
        Copy or archive config['source'] into a dated folder under
        config['dest'].

        Config keys: source, dest, archive (bool, ZIP when True),
        name (job name for the folder layout), retention_days
        (count-based retention limit; 0 disables cleanup).

        Raises:
            FileNotFoundError: if the source path does not exist.
        """
        source = config.get('source')
        dest = config.get('dest')
        archive = config.get('archive', False)
        if not os.path.exists(source):
            raise FileNotFoundError("Sumber tidak ditemukan: {}".format(source))
        # Dated destination: dest/JobName/YYYY/MM/DD
        job_name = config.get('name')
        target_dest = self._get_dated_dest_dir(dest, job_name)
        self.log("Memulai Backup File: {} -> {}".format(source, target_dest))
        name = os.path.basename(source.rstrip(os.sep))
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H")
        if archive:
            # --- ZIP MODE ---
            zip_filename = "{}_{}.zip".format(name, timestamp)
            zip_path = os.path.join(target_dest, zip_filename)
            self.log("Membuat arsip: {}".format(zip_path))
            # Pre-count files so progress can be reported as a percentage.
            total_files = sum(len(files) for _, _, files in os.walk(source))
            processed_files = 0
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
                if os.path.isfile(source):
                    zf.write(source, os.path.basename(source))
                    self.progress(100)
                else:
                    for root, dirs, files in os.walk(source):
                        if self.is_cancelled():
                            self.log("Backup dibatalkan.")
                            return
                        for file in files:
                            file_path = os.path.join(root, file)
                            arcname = os.path.relpath(file_path, os.path.dirname(source))
                            zf.write(file_path, arcname)
                            processed_files += 1
                            if total_files > 0:
                                self.progress(int(processed_files / total_files * 100))
            self.log("Arsip berhasil dibuat.")
        else:
            # --- COPY MODE ---
            dest_path = os.path.join(target_dest, "{}_{}".format(name, timestamp))
            self.log("Menyalin ke: {}".format(dest_path))
            if os.path.isfile(source):
                shutil.copy2(source, dest_path)
                self.progress(100)
            else:
                # Manual walk (instead of shutil.copytree) to support
                # cancellation checks and granular progress reporting.
                total_files = sum(len(files) for _, _, files in os.walk(source))
                processed_files = 0
                for root, dirs, files in os.walk(source):
                    if self.is_cancelled():
                        self.log("Backup dibatalkan.")
                        return
                    # Mirror the directory structure under dest_path.
                    rel_path = os.path.relpath(root, source)
                    current_dest_dir = os.path.join(dest_path, rel_path)
                    os.makedirs(current_dest_dir, exist_ok=True)
                    for file in files:
                        shutil.copy2(os.path.join(root, file),
                                     os.path.join(current_dest_dir, file))
                        processed_files += 1
                        if total_files > 0:
                            self.progress(int(processed_files / total_files * 100))
        # Count-based retention inside the dated folder only.
        retention_days = config.get('retention_days', 0)
        if retention_days > 0:
            self.cleanup_old_backups(target_dest, retention_days)
        self.progress(100)
class MySQLBackup(BackupStrategy):
    """MySQL/MariaDB backup via mysqldump (or mariadb-dump), with
    version-matched tool lookup and optional ZIP compression."""

    def run_backup(self, config):
        """
        Dump one database (config['database']) or all databases
        (config['all_databases']) into a dated .sql file.

        Config keys: host, port, user, password, database, dest, name,
        all_databases, skip_ssl, compress, retention_days.

        Raises:
            FileNotFoundError: if no mysqldump executable can be located.
            Exception: if mysqldump exits with a non-zero code.
        """
        host = config.get('host', 'localhost')
        port = str(config.get('port', 3306))
        user = config.get('user', 'root')
        password = config.get('password', '')
        database = config.get('database')
        dest = config.get('dest')
        all_databases = config.get('all_databases', False)
        # Default True for backward compatibility with older job configs.
        skip_ssl = config.get('skip_ssl', True)

        # Detect server flavor/version so a matching dump tool can be chosen.
        self.log("Mendeteksi versi server MySQL di {}:{}...".format(host, port))
        version, _, error = get_mysql_info(host, port, user, password, skip_ssl)
        server_type = 'mysql'  # Default flavor
        server_version = None
        if error:
            self.log("Warning: Gagal mendeteksi versi server ({}). Menggunakan tool default.".format(error))
        else:
            self.log("Versi server terdeteksi: {}".format(version))
            server_version = version  # find_sql_tool parses major.minor itself
            if 'mariadb' in version.lower():
                server_type = 'mariadb'

        # Priority: custom sql_tools next to the exe, then bundled sql_tools.
        mysqldump_path = find_sql_tool('mysqldump.exe', server_type, server_version)
        if not mysqldump_path:
            raise FileNotFoundError(
                "mysqldump.exe tidak ditemukan!\n"
                "Letakkan di folder 'sql_tools' di samping ProBackup.exe\n"
                "atau pastikan sql_tools ada di dalam bundle.")

        # Dated destination: dest/JobName/YYYY/MM/DD
        job_name = config.get('name')
        target_dest = self._get_dated_dest_dir(dest, job_name)
        self.log("Menggunakan mysqldump: {}".format(mysqldump_path))

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H")
        if all_databases:
            self.log("Menghubungkan ke MySQL: {}:{} (SEMUA DATABASE)".format(host, port))
            dump_file_path = os.path.join(target_dest, "all_databases_{}.sql".format(timestamp))
        else:
            self.log("Menghubungkan ke MySQL: {}:{} / {}".format(host, port, database))
            dump_file_path = os.path.join(target_dest, "{}_{}.sql".format(database, timestamp))

        # Command construction.
        cmd = [mysqldump_path, '-h', host, '-P', port, '-u', user]
        if password:
            # NOTE(review): the password is visible in the OS process list;
            # consider --defaults-extra-file in a future revision.
            cmd.append('--password={}'.format(password))
        if skip_ssl:
            # MariaDB and MySQL clients spell the "disable SSL" option differently.
            tool_basename = os.path.basename(mysqldump_path).lower()
            if 'mariadb' in tool_basename or 'mariadump' in tool_basename:
                cmd.append('--ssl=0')
                self.log("Menggunakan opsi SSL MariaDB: --ssl=0")
            else:
                cmd.append('--skip-ssl')
                self.log("Menggunakan opsi SSL MySQL: --skip-ssl")
        cmd.extend([
            '--quick',
            '--lock-tables=false',
            '--result-file=' + dump_file_path,
        ])
        if all_databases:
            cmd.append('--all-databases')
            self.log("Mode: Backup semua database (--all-databases)")
        else:
            cmd.append(database)

        self.log("Menjalankan mysqldump ke: {}".format(dump_file_path))
        self.log("Mohon tunggu...")
        try:
            # Hide the console window on Windows.
            startupinfo = None
            if sys.platform == 'win32':
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                       text=True, startupinfo=startupinfo)
            # Poll so cancellation is honored and dump-file growth is reported.
            last_log_time = time.time()
            log_interval = 2  # Seconds between size reports
            while True:
                retcode = process.poll()
                if retcode is not None:
                    break
                if self.is_cancelled():
                    process.terminate()
                    # Wait for exit before deleting: on Windows the partial
                    # file cannot be removed while mysqldump still holds it.
                    process.wait()
                    self.log("Backup dibatalkan.")
                    if os.path.exists(dump_file_path):
                        os.remove(dump_file_path)
                    return
                current_time = time.time()
                if current_time - last_log_time >= log_interval:
                    if os.path.exists(dump_file_path):
                        file_size = os.path.getsize(dump_file_path)
                        self.log("Progress: {} written...".format(format_size(file_size)))
                    last_log_time = current_time
                time.sleep(0.5)
            stdout, stderr = process.communicate()
            if retcode != 0:
                # Exit code decides success; "Using a password..." on stderr
                # is only a warning, so stderr alone is not treated as failure.
                raise Exception("mysqldump error (Code {}): {}".format(retcode, stderr))
            if os.path.exists(dump_file_path):
                final_size = os.path.getsize(dump_file_path)
                self.log("Dump selesai. Total: {}".format(format_size(final_size)))
            else:
                self.log("Dump selesai.")
            self.progress(80)
            if config.get('compress', False):
                # splitext only swaps the trailing extension, so a '.sql'
                # substring inside the database name is left intact.
                zip_path = os.path.splitext(dump_file_path)[0] + '.zip'
                self.log("Mengompres ke: {}".format(zip_path))
                with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
                    zf.write(dump_file_path, os.path.basename(dump_file_path))
                os.remove(dump_file_path)
                self.log("Kompresi selesai.")
            self.log("Backup MySQL berhasil diselesaikan.")
            self.progress(100)
            self.cleanup_old_backups(target_dest, config.get('retention_days', 0))
        except Exception as e:
            self.log("Gagal melakukan backup: {}".format(e))
            raise
class PostgresBackup(BackupStrategy):
    """PostgreSQL backup via pg_dump (single DB) or pg_dumpall (all DBs),
    with version-matched tool lookup and optional ZIP compression."""

    def run_backup(self, config):
        """
        Dump one database (config['database']) or the whole cluster
        (config['all_databases']) into a dated .sql file.

        Config keys: host, port, user, password, database, dest, name,
        all_databases, compress, retention_days.

        Raises:
            FileNotFoundError: if the required pg_dump/pg_dumpall is missing.
            Exception: if the dump tool exits with a non-zero code.
        """
        host = config.get('host', 'localhost')
        port = str(config.get('port', 5432))
        user = config.get('user', 'postgres')
        password = config.get('password', '')
        database = config.get('database')
        dest = config.get('dest')
        all_databases = config.get('all_databases', False)

        # Dated destination: dest/JobName/YYYY/MM/DD
        job_name = config.get('name')
        target_dest = self._get_dated_dest_dir(dest, job_name)

        # Detect server version so a matching dump tool can be chosen.
        self.log("Mendeteksi versi server Postgres di {}:{}...".format(host, port))
        version, _, error = get_postgres_info(host, port, user, password)
        server_version = None
        if error:
            self.log("Warning: Gagal mendeteksi versi server ({}). Menggunakan tool default.".format(error))
        else:
            self.log("Versi server terdeteksi: {}".format(version))
            server_version = version

        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H")
        if all_databases:
            # pg_dumpall covers the whole cluster.
            pg_tool_path = find_sql_tool('pg_dumpall.exe', 'postgres', server_version)
            if not pg_tool_path:
                raise FileNotFoundError(
                    "pg_dumpall.exe tidak ditemukan!\n"
                    "Letakkan di folder 'sql_tools' di samping ProBackup.exe\n"
                    "atau pastikan sql_tools ada di dalam bundle.")
            self.log("Menggunakan pg_dumpall: {}".format(pg_tool_path))
            self.log("Menghubungkan ke Postgres: {}:{} (SEMUA DATABASE)".format(host, port))
            dump_file_path = os.path.join(target_dest, "all_databases_{}.sql".format(timestamp))
        else:
            # pg_dump covers a single database.
            pg_tool_path = find_sql_tool('pg_dump.exe', 'postgres', server_version)
            if not pg_tool_path:
                raise FileNotFoundError(
                    "pg_dump.exe tidak ditemukan!\n"
                    "Letakkan di folder 'sql_tools' di samping ProBackup.exe\n"
                    "atau pastikan sql_tools ada di dalam bundle.")
            self.log("Menggunakan pg_dump: {}".format(pg_tool_path))
            self.log("Menghubungkan ke Postgres: {}:{} / {}".format(host, port, database))
            dump_file_path = os.path.join(target_dest, "{}_{}.sql".format(database, timestamp))

        # Password is passed via environment, not the command line.
        env = os.environ.copy()
        env['PGPASSWORD'] = password

        # Command construction.
        if all_databases:
            cmd = [
                pg_tool_path,
                '-h', host,
                '-p', port,
                '-U', user,
                '--no-owner',
                '--no-acl',
                '-f', dump_file_path,
            ]
            self.log("Mode: Backup semua database (pg_dumpall)")
        else:
            cmd = [
                pg_tool_path,
                '-h', host,
                '-p', port,
                '-U', user,
                '--no-owner',
                '--no-acl',
                '-F', 'p',  # Plain text format
                '-f', dump_file_path,
                database
            ]

        self.log("Menjalankan backup ke: {}".format(dump_file_path))
        self.log("Mohon tunggu, proses ini mungkin memakan waktu...")
        try:
            # Hide the console window on Windows.
            startupinfo = None
            if sys.platform == 'win32':
                startupinfo = subprocess.STARTUPINFO()
                startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
                startupinfo.wShowWindow = subprocess.SW_HIDE
            process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE, text=True,
                                       startupinfo=startupinfo)
            # Poll so cancellation is honored and dump-file growth is reported.
            last_log_time = time.time()
            log_interval = 2  # Seconds between size reports
            while True:
                retcode = process.poll()
                if retcode is not None:
                    break
                if self.is_cancelled():
                    process.terminate()
                    # Wait for exit before deleting: on Windows the partial
                    # file cannot be removed while pg_dump still holds it.
                    process.wait()
                    self.log("Backup dibatalkan (Process Terminated).")
                    if os.path.exists(dump_file_path):
                        os.remove(dump_file_path)  # Cleanup partial dump
                    return
                current_time = time.time()
                if current_time - last_log_time >= log_interval:
                    if os.path.exists(dump_file_path):
                        file_size = os.path.getsize(dump_file_path)
                        self.log("Progress: {} written...".format(format_size(file_size)))
                    last_log_time = current_time
                time.sleep(0.5)
            stdout, stderr = process.communicate()
            if retcode != 0:
                raise Exception("pg_dump error (Code {}): {}".format(retcode, stderr))
            if os.path.exists(dump_file_path):
                final_size = os.path.getsize(dump_file_path)
                self.log("Dump selesai. Total: {}".format(format_size(final_size)))
            else:
                self.log("Dump selesai.")
            self.progress(80)
            if config.get('compress', False):
                # splitext only swaps the trailing extension, so a '.sql'
                # substring inside the database name is left intact.
                zip_path = os.path.splitext(dump_file_path)[0] + '.zip'
                self.log("Mengompres ke: {}".format(zip_path))
                with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
                    zf.write(dump_file_path, os.path.basename(dump_file_path))
                os.remove(dump_file_path)
                self.log("Kompresi selesai.")
            self.log("Backup PostgreSQL berhasil diselesaikan.")
            self.progress(100)
            self.cleanup_old_backups(target_dest, config.get('retention_days', 0))
        except Exception as e:
            self.log("Gagal melakukan backup: {}".format(e))
            raise