""" Flask Web Server untuk OCR KTP/KK """ import os import logging import uuid import requests import difflib from functools import lru_cache from flask import Flask, render_template, request, jsonify, send_from_directory, session, send_file from werkzeug.utils import secure_filename from PIL import Image import numpy as np import numpy as np import math import io from openpyxl import Workbook from openpyxl.styles import Font, Alignment from ocr_engine import get_ocr_engine from ktp_extractor import KTPExtractor from kk_extractor import KKExtractor from database import db, init_db from models import KTPRecord, KKRecord app = Flask(__name__) # Konfigurasi UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), 'uploads') KTP_FOLDER = os.path.join(os.path.dirname(__file__), 'KTP') # Defines KTP_FOLDER from previous steps (if not already there check context) # Wait, let's make sure KTP_FOLDER is consistently defined. # It was added in Step 94. # Current view shows it might be there. I will ensure it's correct. ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'bmp', 'webp'} MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB max ALLOWED_DOC_TYPES = {'ktp', 'kk'} app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['KTP_FOLDER'] = KTP_FOLDER app.config['KK_FOLDER'] = os.path.join(os.path.dirname(__file__), 'KK') # New KK Folder app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH app.secret_key = 'secure-key-ocr-ktp-app' # Required for session # Simple Security ADMIN_USERNAME = 'admin' ADMIN_PASSWORD = '123' # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Production mode flag PRODUCTION_MODE = os.environ.get('FLASK_ENV', 'development').lower() == 'production' # Buat folder jika belum ada os.makedirs(UPLOAD_FOLDER, exist_ok=True) # Buat folder jika belum ada os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(KTP_FOLDER, exist_ok=True) os.makedirs(app.config['KK_FOLDER'], exist_ok=True) # Helper untuk Perspective Transform menggunakan Numpy (Inverse Matrix) def find_coeffs(pa, pb): matrix = [] for p1, p2 in zip(pa, pb): matrix.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0]*p1[0], -p2[0]*p1[1]]) matrix.append([0, 0, 0, p1[0], p1[1], 1, -p2[1]*p1[0], -p2[1]*p1[1]]) A = np.matrix(matrix, dtype=float) B = np.array(pb).reshape(8) res = np.dot(np.linalg.inv(A.T * A) * A.T, B) return np.array(res).reshape(8) @app.route('/api/transform-perspective', methods=['POST']) def transform_perspective(): try: if 'image' not in request.files: return jsonify({'success': False, 'error': 'No image uploaded'}), 400 file = request.files['image'] points_json = request.form.get('points', '[]') import json points = json.loads(points_json) # Expecting [[x1,y1], [x2,y2], [x3,y3], [x4,y4]] (TL, TR, BR, BL) if len(points) != 4: return jsonify({'success': False, 'error': 'Invalid points'}), 400 # Load Image img = Image.open(file.stream) # Determine format fmt = img.format if img.format else 'JPEG' if fmt not in ['JPEG', 'PNG', 'WEBP']: fmt = 'JPEG' # Target Dimensions (KTP Aspect Ratio) # We can estimate width based on distance between top points or default to something high res width = max( math.hypot(points[1][0] - points[0][0], points[1][1] - points[0][1]), math.hypot(points[2][0] - points[3][0], points[2][1] - points[3][1]) ) height = max( math.hypot(points[3][0] - points[0][0], points[3][1] - points[0][1]), math.hypot(points[2][0] - points[1][0], points[2][1] - points[1][1]) ) # Force Aspect Ratio based on Doc Type doc_type = request.form.get('doc_type', 'ktp') if doc_type == 'kk': target_ratio = 297.0 / 210.0 # A4 Landscape else: target_ratio = 85.6 / 53.98 # KTP ID-1 # Use the calculated width/height as baseline, but adjust to match ratio # Ideally, take the larger dimension and derive the other if width / height > target_ratio: target_width = int(width) target_height = int(width / target_ratio) else: target_height = int(height) target_width = int(height * target_ratio) # Destination Points (Rectangular) # [0,0], [w,0], [w,h], [0,h] dst_points = [(0, 0), (target_width, 0), (target_width, target_height), (0, target_height)] # Calculate coeffs # Note: PIL transform perspective method uses INVERSE logic? # "transform(size, PERSPECTIVE, data, method, fill, resample)" # "data is an 8-tuple (a, b, c, d, e, f, g, h) which contains the first 8 coefficients of the perspective transform matrix." # "For each pixel (x, y) in the output image, the new value is taken from a position (P a x + b y + c) / (g x + h y + 1), (d x + e y + f) / (g x + h y + 1) in the input image" # So we map DST -> SRC coeffs = find_coeffs(dst_points, points) # Dst -> Src mapping # Perform transform new_img = img.transform((target_width, target_height), Image.PERSPECTIVE, coeffs, Image.BICUBIC) # Save to temporary buffer/file to return URL filename = f"temp_transformed_{secure_filename(file.filename)}" filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) new_img.save(filepath, format=fmt) return jsonify({ 'success': True, 'image_url': f"/uploads/{filename}", 'filename': filename }) except Exception as e: logger.error(f"Perspective transform error: {e}", exc_info=True) return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/uploads/') def serve_uploaded_file(filename): return send_from_directory(app.config['UPLOAD_FOLDER'], filename) # ============================================ # Helper Functions # ============================================ def sanitize_error_message(error, default_message="Terjadi kesalahan pada server"): """Sanitize error messages untuk production - jangan expose detail""" if PRODUCTION_MODE: return default_message return str(error) def validate_pagination(page, per_page, max_per_page=100): """Validate and sanitize pagination parameters""" try: page = int(page) if page else 1 per_page = int(per_page) if per_page else 10 # Ensure positive values page = max(1, page) per_page = max(1, min(per_page, max_per_page)) # Cap at max_per_page return page, per_page except (ValueError, TypeError): return 1, 10 # Default values def validate_nik(nik): """Validate NIK format (16 digits)""" if not nik: return False # NIK should be 16 digits return nik.isdigit() and len(nik) == 16 def validate_no_kk(no_kk): """Validate No KK format (16 digits)""" if not no_kk: return False # No KK should be 16 digits return no_kk.isdigit() and len(no_kk) == 16 # ============================================ # Error Handlers # ============================================ @app.errorhandler(404) def not_found(error): """Handle 404 errors""" return jsonify({ 'success': False, 'error': 'Resource not found' }), 404 @app.errorhandler(500) def internal_error(error): """Handle 500 errors""" logger.error(f"Internal server error: {error}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(error, 'Terjadi kesalahan pada server') }), 500 @app.errorhandler(413) def request_too_large(error): """Handle file too large errors""" return jsonify({ 'success': False, 'error': 'File terlalu besar. Maksimal 16MB' }), 413 # Inisialisasi extractors ktp_extractor = KTPExtractor() kk_extractor = KKExtractor() # Inisialisasi database try: init_db(app) except Exception as e: logger.warning(f"Database connection failed: {e}") logger.warning("The app will work but data won't be saved to MySQL.") def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @app.route('/') def index(): """Halaman utama""" return render_template('index.html') @app.route('/upload', methods=['POST']) def upload_file(): """Handle upload dan proses OCR""" try: # Cek file if 'file' not in request.files: return jsonify({'success': False, 'error': 'Tidak ada file yang diupload'}), 400 file = request.files['file'] doc_type = request.form.get('doc_type', 'ktp').lower() # Validasi doc_type if doc_type not in ALLOWED_DOC_TYPES: return jsonify({ 'success': False, 'error': f'Jenis dokumen tidak valid. Gunakan: {", ".join(ALLOWED_DOC_TYPES)}' }), 400 if file.filename == '': return jsonify({'success': False, 'error': 'Nama file kosong'}), 400 if not allowed_file(file.filename): return jsonify({'success': False, 'error': 'Format file tidak didukung. Gunakan PNG, JPG, JPEG, BMP, atau WEBP'}), 400 # Simpan file dengan unique filename untuk menghindari race condition original_filename = secure_filename(file.filename) file_ext = os.path.splitext(original_filename)[1] unique_filename = f"{uuid.uuid4().hex}{file_ext}" filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename) file.save(filepath) try: # Jalankan OCR ocr_engine = get_ocr_engine() ocr_results = ocr_engine.extract_text(filepath) if not ocr_results: return jsonify({ 'success': False, 'error': 'Tidak dapat membaca teks dari gambar. Pastikan gambar jelas dan tidak blur.' }), 400 # Ekstrak field berdasarkan jenis dokumen validation_meta = None if doc_type == 'ktp': extracted = ktp_extractor.extract(ocr_results) # Auto-correct and validate regions extracted, validation_meta = validate_and_correct_regions(extracted) else: extracted = kk_extractor.extract(ocr_results) # Raw text untuk debugging raw_text = '\n'.join([r['text'] for r in ocr_results]) # Log raw OCR results untuk debugging logger.debug("Raw OCR Results:") for i, r in enumerate(ocr_results): logger.debug(f"[{i}] {r['text']}") # Simpan ke database (optional - jika save_to_db=true) record_id = None save_to_db = request.form.get('save_to_db', 'true').lower() == 'true' if save_to_db: try: if doc_type == 'ktp': # Cek apakah NIK sudah ada existing = KTPRecord.query.filter_by(nik=extracted.get('nik')).first() if existing: # Update existing record for key, value in extracted.items(): if hasattr(existing, key) and value: setattr(existing, key, value) existing.raw_text = raw_text db.session.commit() record_id = existing.id logger.info(f"Updated KTP record: NIK {extracted.get('nik')}") else: # Create new record record = KTPRecord.from_ocr_data(extracted, raw_text) db.session.add(record) db.session.commit() record_id = record.id logger.info(f"Created new KTP record: NIK {extracted.get('nik')}") else: # KK existing = KKRecord.query.filter_by(no_kk=extracted.get('no_kk')).first() if existing: for key, value in extracted.items(): if hasattr(existing, key) and value: setattr(existing, key, value) existing.raw_text = raw_text db.session.commit() record_id = existing.id logger.info(f"Updated KK record: No KK {extracted.get('no_kk')}") else: record = KKRecord.from_ocr_data(extracted, raw_text) db.session.add(record) db.session.commit() record_id = record.id logger.info(f"Created new KK record: No KK {extracted.get('no_kk')}") except Exception as db_error: db.session.rollback() logger.error(f"Database save error: {db_error}", exc_info=True) # Continue without saving - don't fail the OCR request return jsonify({ 'success': True, 'doc_type': doc_type, 'data': extracted, 'validation': validation_meta, 'raw_text': raw_text, 'ocr_count': len(ocr_results), 'record_id': record_id, 'saved_to_db': record_id is not None }) finally: # Hapus file upload setelah proses (untuk keamanan data pribadi) if os.path.exists(filepath): try: os.remove(filepath) except Exception as cleanup_error: logger.warning(f"Failed to cleanup file {filepath}: {cleanup_error}") except Exception as e: logger.error(f"Error in upload_file: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal memproses file. Pastikan file valid dan coba lagi.') }), 500 # ============================================ # Region Data API (using wilayah.id) # ============================================ WILAYAH_API_BASE = "https://wilayah.id/api" @lru_cache(maxsize=500) # Optimized: increased from 100 def fetch_region_data(endpoint): """Fetch region data with caching""" try: response = requests.get(f"{WILAYAH_API_BASE}/{endpoint}", timeout=10) if response.status_code == 200: return response.json() return None except Exception as e: logger.error(f"Error fetching region data from {endpoint}: {e}") return None def normalize_name(name): """Normalize name for comparison""" if not name: return "" return name.upper().strip().replace(".", "").replace(" ", "") def find_best_match(search_name, items, key='name', cutoff=0.6): """Find best matching item using difflib for better fuzzy matching""" if not search_name or not items: return None # Optimized: Create map of uppercase names to items directly # This avoids the inefficient iteration at the end names_upper = [] name_map_upper = {} for item in items: name = item.get(key, '') name_upper = name.upper() names_upper.append(name_upper) # Store both original name and item for quick lookup if name_upper not in name_map_upper: name_map_upper[name_upper] = item # Python's difflib is good for typos search_upper = search_name.upper() matches = difflib.get_close_matches(search_upper, names_upper, n=1, cutoff=cutoff) if matches: # Direct lookup - much more efficient matched_name_upper = matches[0] return name_map_upper.get(matched_name_upper) return None def validate_and_correct_regions(ocr_data): """ Validate and correct region data cascadingly: Prov -> Kab/Kota -> Kec -> Kel/Desa ENHANCED: Jika Provinsi/Kabupaten kosong tapi Kecamatan/Desa terdeteksi, lakukan reverse lookup menggunakan: 1. Kode NIK (2 digit pertama = provinsi, 4 digit = kabupaten) 2. Search di database wilayah.id Returns tuple (updated_data, validation_codes) """ result = ocr_data.copy() validation_codes = { 'provinsi': {'valid': False, 'code': None, 'suggestion': None}, 'kabupaten_kota': {'valid': False, 'code': None, 'suggestion': None}, 'kecamatan': {'valid': False, 'code': None, 'suggestion': None}, 'kel_desa': {'valid': False, 'code': None, 'suggestion': None} } # ============================================ # STRATEGY 1: Extract from NIK if available # ============================================ # NIK format: PPKKDD-DDMMYY-XXXX # PP = Provinsi, PPKK = Kabupaten, PPKKDD = Kecamatan nik = result.get('nik', '') prov_code_from_nik = None kab_code_from_nik = None if nik and len(nik) >= 6: prov_code_from_nik = nik[:2] # 2 digit provinsi kab_code_from_nik = nik[:4] # 4 digit kabupaten logger.debug(f"NIK LOOKUP: Provinsi code: {prov_code_from_nik}, Kabupaten code: {kab_code_from_nik}") provinces_data = fetch_region_data("provinces.json") # ============================================ # STRATEGY 2: Forward validation (jika data ada) # ============================================ if provinces_data and 'data' in provinces_data: prov_match = None # Coba match dari nama provinsi terlebih dahulu if result.get('provinsi'): prov_match = find_best_match(result.get('provinsi'), provinces_data['data']) # Jika tidak ada nama provinsi, coba dari NIK if not prov_match and prov_code_from_nik: for prov in provinces_data['data']: if prov['code'] == prov_code_from_nik: prov_match = prov logger.debug(f"NIK LOOKUP: Found province from NIK: {prov['name']}") break if prov_match: result['provinsi'] = prov_match['name'] validation_codes['provinsi'] = {'valid': True, 'code': prov_match['code'], 'suggestion': prov_match['name']} # 2. Validate Regency (using Prov Code) regencies_data = fetch_region_data(f"regencies/{prov_match['code']}.json") if regencies_data and 'data' in regencies_data: reg_match = None # Coba match dari nama kabupaten if result.get('kabupaten_kota'): reg_match = find_best_match(result.get('kabupaten_kota'), regencies_data['data']) # Jika tidak ada, coba dari NIK if not reg_match and kab_code_from_nik: for reg in regencies_data['data']: if reg['code'] == kab_code_from_nik: reg_match = reg logger.debug(f"NIK LOOKUP: Found regency from NIK: {reg['name']}") break # REVERSE LOOKUP: Jika masih tidak ada, cari dari kecamatan (dengan limit untuk performa) if not reg_match and result.get('kecamatan'): logger.debug(f"REVERSE LOOKUP: Searching regency by kecamatan: {result.get('kecamatan')}") max_reverse_lookup = 50 # Limit untuk performa for i, reg in enumerate(regencies_data['data']): if i >= max_reverse_lookup: logger.warning(f"Reverse lookup stopped at limit {max_reverse_lookup}") break districts_data = fetch_region_data(f"districts/{reg['code']}.json") if districts_data and 'data' in districts_data: dist_match = find_best_match(result.get('kecamatan'), districts_data['data'], cutoff=0.7) if dist_match: reg_match = reg logger.debug(f"REVERSE LOOKUP: Found regency: {reg['name']} (via kecamatan {dist_match['name']})") break if reg_match: result['kabupaten_kota'] = reg_match['name'] validation_codes['kabupaten_kota'] = {'valid': True, 'code': reg_match['code'], 'suggestion': reg_match['name']} # 3. Validate District (using Kab Code) districts_data = fetch_region_data(f"districts/{reg_match['code']}.json") if districts_data and 'data' in districts_data: dist_match = find_best_match(result.get('kecamatan'), districts_data['data']) # REVERSE LOOKUP: Jika kecamatan kosong tapi kel_desa ada, cari dari desa (dengan limit) if not dist_match and result.get('kel_desa'): logger.debug(f"REVERSE LOOKUP: Searching kecamatan by kel_desa: {result.get('kel_desa')}") max_reverse_lookup = 30 # Limit untuk performa for i, dist in enumerate(districts_data['data']): if i >= max_reverse_lookup: logger.warning(f"Reverse lookup stopped at limit {max_reverse_lookup}") break villages_data = fetch_region_data(f"villages/{dist['code']}.json") if villages_data and 'data' in villages_data: vil_match = find_best_match(result.get('kel_desa'), villages_data['data'], cutoff=0.7) if vil_match: dist_match = dist logger.debug(f"REVERSE LOOKUP: Found kecamatan: {dist['name']} (via desa {vil_match['name']})") # Juga update result untuk kecamatan result['kecamatan'] = dist['name'] break if dist_match: result['kecamatan'] = dist_match['name'] validation_codes['kecamatan'] = {'valid': True, 'code': dist_match['code'], 'suggestion': dist_match['name']} # 4. Validate Village (using Kec Code) villages_data = fetch_region_data(f"villages/{dist_match['code']}.json") if villages_data and 'data' in villages_data: vil_match = find_best_match(result.get('kel_desa'), villages_data['data']) if vil_match: result['kel_desa'] = vil_match['name'] validation_codes['kel_desa'] = {'valid': True, 'code': vil_match['code'], 'suggestion': vil_match['name']} return result, validation_codes @app.route('/api/validate-region', methods=['POST']) def validate_region(): """Validate OCR region data against official database""" try: if not request.is_json: return jsonify({ 'success': False, 'error': 'Request harus berupa JSON' }), 400 ocr_data = request.json if not ocr_data: return jsonify({ 'success': False, 'error': 'Data tidak boleh kosong' }), 400 _, validation_result = validate_and_correct_regions(ocr_data) return jsonify({'success': True, 'validation': validation_result}) except Exception as e: logger.error(f"Error in validate_region: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal memvalidasi data wilayah') }), 500 @app.route('/health') def health(): """Health check endpoint""" return jsonify({'status': 'ok'}) @app.route('/api/login', methods=['POST']) def login(): """Simple login""" if not request.is_json: return jsonify({'success': False}), 400 data = request.json if data.get('username') == ADMIN_USERNAME and data.get('password') == ADMIN_PASSWORD: session['logged_in'] = True return jsonify({'success': True}) return jsonify({'success': False, 'error': 'Username atau Password salah'}), 401 @app.route('/api/check-auth') def check_auth(): """Check session""" if session.get('logged_in'): return jsonify({'authenticated': True}) return jsonify({'authenticated': False}), 401 # ============================================ # KTP Archive Endpoints # ============================================ # ============================================ # KTP Archive Endpoints # ============================================ @app.route('/api/save-ktp', methods=['POST']) def save_ktp(): """Save edited KTP data with cropped image""" return save_document('ktp') @app.route('/api/save-kk', methods=['POST']) def save_kk(): """Save edited KK data with cropped image""" return save_document('kk') def save_document(doc_type): """Generic save function""" try: if 'image' not in request.files: return jsonify({'success': False, 'error': 'Tidak ada gambar'}), 400 image_file = request.files['image'] data_json = request.form.get('data', '{}') import json try: data = json.loads(data_json) except: return jsonify({'success': False, 'error': 'Data tidak valid'}), 400 # Determine config based on type if doc_type == 'ktp': identifier = data.get('nik', '') folder = app.config['KTP_FOLDER'] Model = KTPRecord id_field = 'nik' else: identifier = data.get('no_kk', '') folder = app.config['KK_FOLDER'] Model = KKRecord id_field = 'no_kk' if not identifier: return jsonify({'success': False, 'error': f'{id_field.upper()} tidak boleh kosong'}), 400 # Save image file_ext = os.path.splitext(image_file.filename)[1] or '.jpg' image_filename = f"{identifier}{file_ext}" image_path = os.path.join(folder, image_filename) image_file.save(image_path) # DB Update existing = Model.query.filter_by(**{id_field: identifier}).first() if existing: for k, v in data.items(): if hasattr(existing, k) and v: setattr(existing, k, v) existing.image_path = image_filename db.session.commit() record_id = existing.id else: # New Record # Only pass fields that exist in model # Filter data to match model columns (naive approach or using from_ocr_data if appropriate) # Actually, direct generic init is risky if fields mismatch. # Best to use specific logic or robust setattr? # Let's use simple generic approach: Create empty, then update. record = Model() for k, v in data.items(): if hasattr(record, k): setattr(record, k, v) setattr(record, id_field, identifier) record.image_path = image_filename db.session.add(record) db.session.commit() record_id = record.id return jsonify({'success': True, 'record_id': record_id, 'image_path': image_filename}) except Exception as e: db.session.rollback() logger.error(f"Save error: {e}") return jsonify({'success': False, 'error': 'Gagal menyimpan'}), 500 @app.route('/ktp-images/') def serve_ktp_image(filename): """Serve KTP images - Protected""" if not session.get('logged_in'): return "Unauthorized", 401 return send_from_directory(app.config['KTP_FOLDER'], filename) @app.route('/kk-images/') def serve_kk_image(filename): """Serve KK images - Protected""" if not session.get('logged_in'): return "Unauthorized", 401 return send_from_directory(app.config['KK_FOLDER'], filename) @app.route('/api/ktp-archive', methods=['GET']) def list_ktp_archive(): """List all KTP records with images - Protected""" if not session.get('logged_in'): return jsonify({'success': False, 'error': 'Unauthorized'}), 401 try: page, per_page = validate_pagination( request.args.get('page', 1), request.args.get('per_page', 20) ) # Only get records that have images saved pagination = KTPRecord.query.filter( KTPRecord.image_path.isnot(None) ).order_by(KTPRecord.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) return jsonify({ 'success': True, 'data': [r.to_dict() for r in pagination.items], 'total': pagination.total, 'pages': pagination.pages, 'current_page': page }) except Exception as e: logger.error(f"Error listing KTP archive: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal mengambil arsip KTP') }), 500 @app.route('/api/kk-archive', methods=['GET']) def list_kk_archive(): """List all KK records with images - Protected""" if not session.get('logged_in'): return jsonify({'success': False, 'error': 'Unauthorized'}), 401 try: page, per_page = validate_pagination( request.args.get('page', 1), request.args.get('per_page', 20) ) # Only get records that have images saved pagination = KKRecord.query.filter( KKRecord.image_path.isnot(None) ).order_by(KKRecord.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) return jsonify({ 'success': True, 'data': [r.to_dict() for r in pagination.items], 'total': pagination.total, 'pages': pagination.pages, 'current_page': page }) except Exception as e: logger.error(f"Error listing KK archive: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal mengambil arsip KK') }), 500 @app.route('/api/export-excel', methods=['POST']) def export_excel(): """Export data to Excel (XLSX)""" try: if not request.is_json: return jsonify({'error': 'Format data tidak valid'}), 400 data = request.json if not data: return jsonify({'error': 'Data kosong'}), 400 wb = Workbook() ws = wb.active ws.title = "Data KTP" # Headers headers = ["ATRIBUT", "NILAI"] ws.append(headers) # Style Headers (Bold, Center, Gray Bg optional) for cell in ws[1]: cell.font = Font(bold=True) cell.alignment = Alignment(horizontal='center') # Data exclude = ['raw_text', 'image_path', 'updated_at', 'created_at', 'id'] # Specific Order if possible order = ['nik', 'nama', 'tempat_lahir', 'tanggal_lahir', 'jenis_kelamin', 'gol_darah', 'alamat', 'rt_rw', 'kel_desa', 'kecamatan', 'kabupaten_kota', 'provinsi', 'agama', 'status_perkawinan', 'pekerjaan', 'kewarganegaraan', 'berlaku_hingga'] start_row = 2 # Main Fields in Order for key in order: val = data.get(key) if val: ws.append([key.replace('_', ' ').upper(), str(val)]) # Other fields not in order for k, v in data.items(): if k not in exclude and k not in order: ws.append([k.replace('_', ' ').upper(), str(v)]) # Auto-adjust columns for col in ws.columns: max_length = 0 column = col[0].column_letter # Get the column name for cell in col: try: if len(str(cell.value)) > max_length: max_length = len(str(cell.value)) except: pass adjusted_width = (max_length + 2) * 1.2 ws.column_dimensions[column].width = adjusted_width # Save to buffer output = io.BytesIO() wb.save(output) output.seek(0) filename = f"Data_KTP_{data.get('nik', 'Export')}.xlsx" return send_file( output, as_attachment=True, download_name=filename, mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' ) except Exception as e: logger.error(f"Export Excel Error: {e}") return jsonify({'error': 'Gagal export excel'}), 500 # ============================================ # Database CRUD API Endpoints # ============================================ @app.route('/api/ktp', methods=['GET']) def list_ktp_records(): """List all KTP records with pagination""" try: page, per_page = validate_pagination( request.args.get('page', 1), request.args.get('per_page', 10) ) pagination = KTPRecord.query.order_by(KTPRecord.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) return jsonify({ 'success': True, 'data': [r.to_dict() for r in pagination.items], 'total': pagination.total, 'pages': pagination.pages, 'current_page': page, 'per_page': per_page }) except Exception as e: logger.error(f"Error in list_ktp_records: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal mengambil data KTP') }), 500 @app.route('/api/ktp/', methods=['GET']) def get_ktp_record(id): """Get KTP record by ID""" try: record = KTPRecord.query.get_or_404(id) return jsonify({'success': True, 'data': record.to_dict()}) except Exception as e: logger.error(f"Error in get_ktp_record: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Data KTP tidak ditemukan') }), 404 @app.route('/api/ktp/nik/', methods=['GET']) def get_ktp_by_nik(nik): """Get KTP record by NIK""" try: # Validasi format NIK if not validate_nik(nik): return jsonify({ 'success': False, 'error': 'Format NIK tidak valid. NIK harus 16 digit angka' }), 400 record = KTPRecord.query.filter_by(nik=nik).first() if record: return jsonify({'success': True, 'data': record.to_dict()}) return jsonify({'success': False, 'error': 'NIK tidak ditemukan'}), 404 except Exception as e: logger.error(f"Error in get_ktp_by_nik: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal mencari data KTP') }), 500 @app.route('/api/ktp/', methods=['DELETE']) def delete_ktp_record(id): """Delete KTP record by ID""" try: record = KTPRecord.query.get_or_404(id) db.session.delete(record) db.session.commit() logger.info(f"Deleted KTP record: ID {id}") return jsonify({'success': True, 'message': f'Record {id} berhasil dihapus'}) except Exception as e: db.session.rollback() logger.error(f"Error in delete_ktp_record: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal menghapus data KTP') }), 500 @app.route('/api/kk', methods=['GET']) def list_kk_records(): """List all KK records with pagination""" try: page, per_page = validate_pagination( request.args.get('page', 1), request.args.get('per_page', 10) ) pagination = KKRecord.query.order_by(KKRecord.created_at.desc()).paginate( page=page, per_page=per_page, error_out=False ) return jsonify({ 'success': True, 'data': [r.to_dict() for r in pagination.items], 'total': pagination.total, 'pages': pagination.pages, 'current_page': page, 'per_page': per_page }) except Exception as e: logger.error(f"Error in list_kk_records: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal mengambil data KK') }), 500 @app.route('/api/kk/', methods=['GET']) def get_kk_record(id): """Get KK record by ID""" try: record = KKRecord.query.get_or_404(id) return jsonify({'success': True, 'data': record.to_dict()}) except Exception as e: logger.error(f"Error in get_kk_record: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Data KK tidak ditemukan') }), 404 @app.route('/api/kk/', methods=['DELETE']) def delete_kk_record(id): """Delete KK record by ID""" try: record = KKRecord.query.get_or_404(id) db.session.delete(record) db.session.commit() logger.info(f"Deleted KK record: ID {id}") return jsonify({'success': True, 'message': f'Record {id} berhasil dihapus'}) except Exception as e: db.session.rollback() logger.error(f"Error in delete_kk_record: {e}", exc_info=True) return jsonify({ 'success': False, 'error': sanitize_error_message(e, 'Gagal menghapus data KK') }), 500 if __name__ == '__main__': # Konfigurasi dari environment variables host = os.environ.get('FLASK_HOST', '0.0.0.0') port = int(os.environ.get('FLASK_PORT', 5000)) debug = os.environ.get('FLASK_DEBUG', 'True').lower() == 'true' logger.info("="*50) logger.info("OCR KTP/KK Application") logger.info("="*50) logger.info(f"Membuka: http://{host}:{port}") logger.info("Tekan Ctrl+C untuk berhenti") logger.info("="*50) app.run(host=host, port=port, debug=debug)