Files
local-ocr/app.py
2025-12-31 01:38:01 +08:00

1044 lines
39 KiB
Python

"""
Flask Web Server untuk OCR KTP/KK
"""
import os
import logging
import uuid
import requests
import difflib
from functools import lru_cache
from flask import Flask, render_template, request, jsonify, send_from_directory, session, send_file
from werkzeug.utils import secure_filename
from PIL import Image
import numpy as np
import numpy as np
import math
import io
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
from ocr_engine import get_ocr_engine
from ktp_extractor import KTPExtractor
from kk_extractor import KKExtractor
from database import db, init_db
from models import KTPRecord, KKRecord
app = Flask(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
_BASE_DIR = os.path.dirname(__file__)
UPLOAD_FOLDER = os.path.join(_BASE_DIR, 'uploads')  # transient OCR uploads
KTP_FOLDER = os.path.join(_BASE_DIR, 'KTP')  # archived KTP images
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'bmp', 'webp'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB max
ALLOWED_DOC_TYPES = {'ktp', 'kk'}

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['KTP_FOLDER'] = KTP_FOLDER
app.config['KK_FOLDER'] = os.path.join(_BASE_DIR, 'KK')  # archived KK images
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# Session signing key and admin credentials. Each can be overridden through
# the environment; the fallbacks are the historical hard-coded values so
# existing deployments keep working unchanged.
_DEFAULT_SECRET_KEY = 'secure-key-ocr-ktp-app'
_DEFAULT_ADMIN_PASSWORD = '123'
app.secret_key = os.environ.get('FLASK_SECRET_KEY', _DEFAULT_SECRET_KEY)
ADMIN_USERNAME = os.environ.get('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', _DEFAULT_ADMIN_PASSWORD)

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Production mode flag (controls how much error detail is exposed)
PRODUCTION_MODE = os.environ.get('FLASK_ENV', 'development').lower() == 'production'

if PRODUCTION_MODE and (
    app.secret_key == _DEFAULT_SECRET_KEY
    or ADMIN_PASSWORD == _DEFAULT_ADMIN_PASSWORD
):
    logger.warning(
        "Default secret key or admin password in use; set FLASK_SECRET_KEY "
        "and ADMIN_PASSWORD in the environment."
    )

# Create the storage folders if they do not exist yet
for _folder in (UPLOAD_FOLDER, KTP_FOLDER, app.config['KK_FOLDER']):
    os.makedirs(_folder, exist_ok=True)
# Helper untuk Perspective Transform menggunakan Numpy (Inverse Matrix)
def find_coeffs(pa, pb):
    """Return the 8 perspective coefficients that map points ``pa`` to ``pb``.

    Builds the 8x8 linear system for four point correspondences and solves
    it directly. Solving the system avoids the deprecated ``np.matrix`` type
    and the squared condition number of the normal-equation inverse.

    Args:
        pa: Four ``(x, y)`` points; coefficients are applied to these.
        pb: Four ``(x, y)`` points that ``pa`` should be mapped onto.

    Returns:
        A float ``numpy.ndarray`` of shape ``(8,)``: ``(a, b, c, d, e, f, g, h)``.

    Raises:
        numpy.linalg.LinAlgError: If the system is singular (degenerate points).
    """
    rows = []
    for p1, p2 in zip(pa, pb):
        rows.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0] * p1[0], -p2[0] * p1[1]])
        rows.append([0, 0, 0, p1[0], p1[1], 1, -p2[1] * p1[0], -p2[1] * p1[1]])
    A = np.array(rows, dtype=float)
    B = np.array(pb, dtype=float).reshape(8)
    return np.linalg.solve(A, B)
@app.route('/api/transform-perspective', methods=['POST'])
def transform_perspective():
    """Rectify a quadrilateral region of an uploaded image.

    Form fields:
        image: The uploaded image file.
        points: JSON list of four ``[x, y]`` corners ordered TL, TR, BR, BL.
        doc_type: ``'kk'`` for an A4-landscape target ratio; anything else
            uses the ID-1 card ratio.

    Returns:
        JSON with ``image_url`` and ``filename`` of the rectified image saved
        in the upload folder; 400 for missing or invalid input, 500 otherwise.
    """
    import json  # local import: json is not imported at file level

    try:
        if 'image' not in request.files:
            return jsonify({'success': False, 'error': 'No image uploaded'}), 400
        file = request.files['image']

        # Malformed JSON or malformed point entries are client errors (400),
        # not server errors.
        try:
            raw_points = json.loads(request.form.get('points', '[]'))
            points = [(float(p[0]), float(p[1])) for p in raw_points]
        except (ValueError, TypeError, IndexError, KeyError):
            return jsonify({'success': False, 'error': 'Invalid points'}), 400
        if len(points) != 4 or not all(math.isfinite(c) for p in points for c in p):
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        # Load image and pick an output format we can write
        img = Image.open(file.stream)
        fmt = img.format if img.format in ('JPEG', 'PNG', 'WEBP') else 'JPEG'

        # Estimate the source quad size from its longest opposite edges
        width = max(
            math.hypot(points[1][0] - points[0][0], points[1][1] - points[0][1]),
            math.hypot(points[2][0] - points[3][0], points[2][1] - points[3][1])
        )
        height = max(
            math.hypot(points[3][0] - points[0][0], points[3][1] - points[0][1]),
            math.hypot(points[2][0] - points[1][0], points[2][1] - points[1][1])
        )
        # A degenerate quad would cause a division by zero or an empty image
        if width < 1 or height < 1:
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        # Force aspect ratio based on document type
        doc_type = request.form.get('doc_type', 'ktp')
        if doc_type == 'kk':
            target_ratio = 297.0 / 210.0  # A4 Landscape
        else:
            target_ratio = 85.6 / 53.98  # KTP ID-1

        # Keep the larger measured dimension and derive the other from the ratio
        if width / height > target_ratio:
            target_width = int(width)
            target_height = max(1, int(width / target_ratio))
        else:
            target_height = int(height)
            target_width = max(1, int(height * target_ratio))

        # Destination rectangle corners: TL, TR, BR, BL
        dst_points = [(0, 0), (target_width, 0), (target_width, target_height), (0, target_height)]

        # PIL samples the input for each OUTPUT pixel, so the coefficients
        # must describe the destination -> source mapping.
        coeffs = find_coeffs(dst_points, points)
        new_img = img.transform((target_width, target_height), Image.PERSPECTIVE, coeffs, Image.BICUBIC)

        # JPEG cannot store alpha/palette modes; convert before saving
        if fmt == 'JPEG' and new_img.mode not in ('L', 'RGB', 'CMYK'):
            new_img = new_img.convert('RGB')

        filename = f"temp_transformed_{secure_filename(file.filename or '')}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        new_img.save(filepath, format=fmt)
        return jsonify({
            'success': True,
            'image_url': f"/uploads/{filename}",
            'filename': filename
        })
    except Exception as e:
        logger.error(f"Perspective transform error: {e}", exc_info=True)
        # Hide internal details in production, consistent with other endpoints
        return jsonify({'success': False, 'error': sanitize_error_message(e)}), 500
@app.route('/uploads/<filename>')
def serve_uploaded_file(filename):
    """Send ``filename`` from the configured upload folder."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename)
# ============================================
# Helper Functions
# ============================================
def sanitize_error_message(error, default_message="Terjadi kesalahan pada server"):
    """Return a client-safe error string.

    In production mode the generic ``default_message`` is returned so that
    internal details are not exposed; otherwise ``str(error)`` is returned.
    """
    return default_message if PRODUCTION_MODE else str(error)
def validate_pagination(page, per_page, max_per_page=100):
    """Coerce pagination parameters into a safe ``(page, per_page)`` pair.

    Falsy inputs fall back to page 1 and 10 items per page. The page is
    clamped to at least 1 and the page size to ``1..max_per_page``. If either
    value cannot be converted to ``int`` the defaults ``(1, 10)`` are returned.
    """
    try:
        page_number = max(1, int(page) if page else 1)
        page_size = int(per_page) if per_page else 10
        capped_size = min(page_size, max_per_page)  # upper bound
        return page_number, max(1, capped_size)  # lower bound
    except (ValueError, TypeError):
        return 1, 10  # Default values
def validate_nik(nik):
    """Return True when ``nik`` is a string of exactly 16 ASCII digits.

    ``str.isdigit`` alone also accepts non-ASCII digit characters (for
    example full-width or superscript digits), so every character is checked
    against the ASCII range explicitly. Non-string input is rejected instead
    of raising.
    """
    if not isinstance(nik, str):
        return False
    return len(nik) == 16 and all('0' <= ch <= '9' for ch in nik)
def validate_no_kk(no_kk):
    """Return True when ``no_kk`` is a string of exactly 16 ASCII digits.

    ``str.isdigit`` alone also accepts non-ASCII digit characters, so every
    character is checked against the ASCII range explicitly. Non-string
    input is rejected instead of raising.
    """
    if not isinstance(no_kk, str):
        return False
    return len(no_kk) == 16 and all('0' <= ch <= '9' for ch in no_kk)
# ============================================
# Error Handlers
# ============================================
@app.errorhandler(404)
def not_found(error):
    """Reply to 404 errors with a JSON body."""
    payload = {'success': False, 'error': 'Resource not found'}
    return jsonify(payload), 404
@app.errorhandler(500)
def internal_error(error):
    """Log the failure and reply with a sanitized JSON 500 body."""
    logger.error(f"Internal server error: {error}", exc_info=True)
    message = sanitize_error_message(error, 'Terjadi kesalahan pada server')
    return jsonify({'success': False, 'error': message}), 500
@app.errorhandler(413)
def request_too_large(error):
    """Reply to oversized uploads (HTTP 413) with a JSON body."""
    payload = {'success': False, 'error': 'File terlalu besar. Maksimal 16MB'}
    return jsonify(payload), 413
# Field extractors, created once at import time
ktp_extractor = KTPExtractor()
kk_extractor = KKExtractor()

# Database initialisation: a failure is logged as a warning, not raised,
# so the application still starts without a database.
try:
    init_db(app)
except Exception as db_init_error:
    logger.warning(f"Database connection failed: {db_init_error}")
    logger.warning("The app will work but data won't be saved to MySQL.")
def allowed_file(filename):
    """Return True when ``filename`` ends in one of ``ALLOWED_EXTENSIONS``."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in ALLOWED_EXTENSIONS
@app.route('/')
def index():
    """Render the main page."""
    template_name = 'index.html'
    return render_template(template_name)
def _upsert_ocr_record(model, id_field, label, extracted, raw_text):
    """Insert or update the record keyed by ``extracted[id_field]``; return its id."""
    identifier = extracted.get(id_field)
    # Only look up an existing row when an identifier was actually extracted;
    # querying with an empty value could match (and overwrite) an unrelated
    # record whose identifier is NULL.
    existing = None
    if identifier:
        existing = model.query.filter_by(**{id_field: identifier}).first()

    if existing:
        # Update only fields the model has and OCR produced a value for
        for key, value in extracted.items():
            if hasattr(existing, key) and value:
                setattr(existing, key, value)
        existing.raw_text = raw_text
        db.session.commit()
        logger.info(f"Updated {label} {identifier}")
        return existing.id

    record = model.from_ocr_data(extracted, raw_text)
    db.session.add(record)
    db.session.commit()
    logger.info(f"Created new {label} {identifier}")
    return record.id


@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle an upload, run OCR on it and optionally persist the result.

    Form fields:
        file: The image to process.
        doc_type: ``'ktp'`` (default) or ``'kk'``.
        save_to_db: ``'true'`` (default) to insert/update the database record.

    Returns:
        JSON with the extracted fields, region validation metadata (KTP
        only), the raw OCR text and the saved record id (``None`` when the
        record was not saved). The uploaded file is always removed afterwards.
    """
    try:
        # Validate the request
        if 'file' not in request.files:
            return jsonify({'success': False, 'error': 'Tidak ada file yang diupload'}), 400
        file = request.files['file']
        doc_type = request.form.get('doc_type', 'ktp').lower()
        if doc_type not in ALLOWED_DOC_TYPES:
            return jsonify({
                'success': False,
                'error': f'Jenis dokumen tidak valid. Gunakan: {", ".join(ALLOWED_DOC_TYPES)}'
            }), 400
        if file.filename == '':
            return jsonify({'success': False, 'error': 'Nama file kosong'}), 400
        if not allowed_file(file.filename):
            return jsonify({'success': False, 'error': 'Format file tidak didukung. Gunakan PNG, JPG, JPEG, BMP, atau WEBP'}), 400

        # Store under a unique name to avoid collisions between requests.
        # The extension comes from the already-validated raw filename:
        # secure_filename() can drop it entirely (e.g. non-ASCII basenames).
        file_ext = '.' + file.filename.rsplit('.', 1)[1].lower()
        unique_filename = f"{uuid.uuid4().hex}{file_ext}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)

        try:
            file.save(filepath)

            # Run OCR
            ocr_results = get_ocr_engine().extract_text(filepath)
            if not ocr_results:
                return jsonify({
                    'success': False,
                    'error': 'Tidak dapat membaca teks dari gambar. Pastikan gambar jelas dan tidak blur.'
                }), 400

            # Extract fields for the requested document type
            validation_meta = None
            if doc_type == 'ktp':
                extracted = ktp_extractor.extract(ocr_results)
                # Auto-correct and validate regions
                extracted, validation_meta = validate_and_correct_regions(extracted)
            else:
                extracted = kk_extractor.extract(ocr_results)

            # Raw text, kept for debugging
            raw_text = '\n'.join(r['text'] for r in ocr_results)
            logger.debug("Raw OCR Results:")
            for i, r in enumerate(ocr_results):
                logger.debug(f"[{i}] {r['text']}")

            # Optional persistence; a database failure must not fail the OCR request
            record_id = None
            save_to_db = request.form.get('save_to_db', 'true').lower() == 'true'
            if save_to_db:
                try:
                    if doc_type == 'ktp':
                        record_id = _upsert_ocr_record(
                            KTPRecord, 'nik', 'KTP record: NIK', extracted, raw_text)
                    else:
                        record_id = _upsert_ocr_record(
                            KKRecord, 'no_kk', 'KK record: No KK', extracted, raw_text)
                except Exception as db_error:
                    db.session.rollback()
                    logger.error(f"Database save error: {db_error}", exc_info=True)

            return jsonify({
                'success': True,
                'doc_type': doc_type,
                'data': extracted,
                'validation': validation_meta,
                'raw_text': raw_text,
                'ocr_count': len(ocr_results),
                'record_id': record_id,
                'saved_to_db': record_id is not None
            })
        finally:
            # Remove the upload after processing (it contains personal data)
            if os.path.exists(filepath):
                try:
                    os.remove(filepath)
                except Exception as cleanup_error:
                    logger.warning(f"Failed to cleanup file {filepath}: {cleanup_error}")
    except Exception as e:
        logger.error(f"Error in upload_file: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal memproses file. Pastikan file valid dan coba lagi.')
        }), 500
# ============================================
# Region Data API (using wilayah.id)
# ============================================
WILAYAH_API_BASE = "https://wilayah.id/api"


class _RegionFetchError(Exception):
    """Raised internally when the region API answers with a non-200 status."""


@lru_cache(maxsize=500)
def _fetch_region_data_cached(endpoint):
    """Fetch and decode one endpoint; raises on failure so failures are not cached."""
    response = requests.get(f"{WILAYAH_API_BASE}/{endpoint}", timeout=10)
    if response.status_code != 200:
        raise _RegionFetchError(f"HTTP {response.status_code}")
    return response.json()


def fetch_region_data(endpoint):
    """Fetch region data from the wilayah.id API with caching.

    Only successful responses are cached. ``lru_cache`` does not store a call
    that raised, so a transient network error or non-200 reply is retried on
    the next call instead of being remembered as ``None``.

    Args:
        endpoint: Path below ``WILAYAH_API_BASE``, e.g. ``"provinces.json"``.

    Returns:
        The decoded JSON payload, or ``None`` when the request failed.
    """
    try:
        return _fetch_region_data_cached(endpoint)
    except _RegionFetchError:
        return None
    except Exception as e:
        logger.error(f"Error fetching region data from {endpoint}: {e}")
        return None


# Keep the lru_cache helper attributes available on the public function
fetch_region_data.cache_info = _fetch_region_data_cached.cache_info
fetch_region_data.cache_clear = _fetch_region_data_cached.cache_clear
def normalize_name(name):
    """Return ``name`` upper-cased and trimmed, with dots and spaces removed.

    Falsy input yields an empty string.
    """
    if not name:
        return ""
    trimmed = name.upper().strip()
    return trimmed.translate({ord("."): None, ord(" "): None})
def find_best_match(search_name, items, key='name', cutoff=0.6):
    """Return the item whose ``key`` value is the closest fuzzy match.

    Names are compared case-insensitively with ``difflib.get_close_matches``.
    When several items share the same upper-cased name, the first one wins.
    Returns ``None`` for empty input or when nothing reaches ``cutoff``.
    """
    if not search_name or not items:
        return None

    candidates = []
    first_by_name = {}
    for entry in items:
        label = entry.get(key, '').upper()
        candidates.append(label)
        # setdefault keeps the first entry seen for a given name
        first_by_name.setdefault(label, entry)

    hits = difflib.get_close_matches(search_name.upper(), candidates, n=1, cutoff=cutoff)
    return first_by_name.get(hits[0]) if hits else None
def _region_items(endpoint):
    """Return the ``data`` list served at ``endpoint``, or None when unavailable."""
    payload = fetch_region_data(endpoint)
    if payload and 'data' in payload:
        return payload['data']
    return None


def _find_region_by_code(items, code):
    """Return the first item whose code equals ``code``, ignoring dot separators."""
    for item in items:
        if str(item.get('code', '')).replace('.', '') == code:
            return item
    return None


def _reverse_lookup_parent(parents, child_endpoint, child_name, limit):
    """Find the parent region that owns a child fuzzy-matching ``child_name``.

    At most ``limit`` parents are inspected. Returns ``(parent, child)`` or
    ``(None, None)`` when nothing matched.
    """
    for index, parent in enumerate(parents):
        if index >= limit:
            logger.warning(f"Reverse lookup stopped at limit {limit}")
            break
        children = _region_items(f"{child_endpoint}/{parent['code']}.json")
        if children:
            child = find_best_match(child_name, children, cutoff=0.7)
            if child:
                return parent, child
    return None, None


def validate_and_correct_regions(ocr_data):
    """Validate and correct region fields level by level.

    The cascade is province -> regency (kabupaten/kota) -> district
    (kecamatan) -> village (kel/desa). Each level is matched by fuzzy name
    first. Province and regency fall back to the codes embedded in the NIK
    (first 2 and first 4 digits). Regency and district additionally fall back
    to a reverse lookup through the next lower level (limited to 50 and 30
    parents respectively).

    Region codes are compared with dot separators removed, so a dotted code
    such as ``"32.01"`` matches the NIK prefix ``"3201"``.

    Args:
        ocr_data: Dict of extracted fields; it is copied, not mutated.

    Returns:
        Tuple ``(updated_data, validation_codes)`` where ``validation_codes``
        maps each level to ``{'valid', 'code', 'suggestion'}``.
    """
    result = ocr_data.copy()
    validation_codes = {
        level: {'valid': False, 'code': None, 'suggestion': None}
        for level in ('provinsi', 'kabupaten_kota', 'kecamatan', 'kel_desa')
    }

    def mark_valid(level, match):
        """Store the corrected name and mark ``level`` as validated."""
        result[level] = match['name']
        validation_codes[level] = {
            'valid': True, 'code': match['code'], 'suggestion': match['name']
        }

    # Region codes embedded in the NIK (layout per the original notes:
    # PP = province, PPKK = regency, PPKKDD = district)
    nik = result.get('nik') or ''
    if not isinstance(nik, str):
        nik = str(nik)  # tolerate numeric NIK values from JSON clients
    prov_code_from_nik = None
    kab_code_from_nik = None
    if len(nik) >= 6:
        prov_code_from_nik = nik[:2]
        kab_code_from_nik = nik[:4]
        logger.debug(f"NIK LOOKUP: Provinsi code: {prov_code_from_nik}, Kabupaten code: {kab_code_from_nik}")

    # 1. Province: by name, then by NIK code
    provinces = _region_items("provinces.json")
    if not provinces:
        return result, validation_codes
    prov_match = None
    if result.get('provinsi'):
        prov_match = find_best_match(result.get('provinsi'), provinces)
    if not prov_match and prov_code_from_nik:
        prov_match = _find_region_by_code(provinces, prov_code_from_nik)
        if prov_match:
            logger.debug(f"NIK LOOKUP: Found province from NIK: {prov_match['name']}")
    if not prov_match:
        return result, validation_codes
    mark_valid('provinsi', prov_match)

    # 2. Regency: by name, then NIK code, then reverse lookup via district
    regencies = _region_items(f"regencies/{prov_match['code']}.json")
    if not regencies:
        return result, validation_codes
    reg_match = None
    if result.get('kabupaten_kota'):
        reg_match = find_best_match(result.get('kabupaten_kota'), regencies)
    if not reg_match and kab_code_from_nik:
        reg_match = _find_region_by_code(regencies, kab_code_from_nik)
        if reg_match:
            logger.debug(f"NIK LOOKUP: Found regency from NIK: {reg_match['name']}")
    if not reg_match and result.get('kecamatan'):
        logger.debug(f"REVERSE LOOKUP: Searching regency by kecamatan: {result.get('kecamatan')}")
        reg_match, via_district = _reverse_lookup_parent(
            regencies, "districts", result.get('kecamatan'), limit=50)
        if reg_match:
            logger.debug(f"REVERSE LOOKUP: Found regency: {reg_match['name']} (via kecamatan {via_district['name']})")
    if not reg_match:
        return result, validation_codes
    mark_valid('kabupaten_kota', reg_match)

    # 3. District: by name, then reverse lookup via village
    districts = _region_items(f"districts/{reg_match['code']}.json")
    if not districts:
        return result, validation_codes
    dist_match = find_best_match(result.get('kecamatan'), districts)
    if not dist_match and result.get('kel_desa'):
        logger.debug(f"REVERSE LOOKUP: Searching kecamatan by kel_desa: {result.get('kel_desa')}")
        dist_match, via_village = _reverse_lookup_parent(
            districts, "villages", result.get('kel_desa'), limit=30)
        if dist_match:
            logger.debug(f"REVERSE LOOKUP: Found kecamatan: {dist_match['name']} (via desa {via_village['name']})")
    if not dist_match:
        return result, validation_codes
    mark_valid('kecamatan', dist_match)

    # 4. Village: by name only
    villages = _region_items(f"villages/{dist_match['code']}.json")
    if not villages:
        return result, validation_codes
    vil_match = find_best_match(result.get('kel_desa'), villages)
    if vil_match:
        mark_valid('kel_desa', vil_match)
    return result, validation_codes
@app.route('/api/validate-region', methods=['POST'])
def validate_region():
    """Validate OCR region data against the region database.

    Expects a JSON object of extracted fields. Returns the per-level
    validation result; 400 for a non-JSON, empty or non-object body.
    """
    try:
        if not request.is_json:
            return jsonify({
                'success': False,
                'error': 'Request harus berupa JSON'
            }), 400
        # silent=True: malformed JSON yields None instead of raising, so it is
        # reported as a 400 below rather than being caught as a 500.
        ocr_data = request.get_json(silent=True)
        if not ocr_data:
            return jsonify({
                'success': False,
                'error': 'Data tidak boleh kosong'
            }), 400
        # The validator needs a dict; a JSON array or scalar is a client error
        if not isinstance(ocr_data, dict):
            return jsonify({
                'success': False,
                'error': 'Request harus berupa JSON'
            }), 400
        _, validation_result = validate_and_correct_regions(ocr_data)
        return jsonify({'success': True, 'validation': validation_result})
    except Exception as e:
        logger.error(f"Error in validate_region: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal memvalidasi data wilayah')
        }), 500
@app.route('/health')
def health():
    """Liveness probe: always reports status ``ok``."""
    status_payload = {'status': 'ok'}
    return jsonify(status_payload)
@app.route('/api/login', methods=['POST'])
def login():
    """Log the admin in by setting ``session['logged_in']``.

    Expects a JSON object with ``username`` and ``password``. Returns 400 for
    a non-JSON or non-object body and 401 for wrong credentials.
    """
    import hmac  # local import: hmac is not imported at file level

    if not request.is_json:
        return jsonify({'success': False}), 400
    data = request.get_json(silent=True)
    # A JSON array/scalar or malformed body would crash on .get() below
    if not isinstance(data, dict):
        return jsonify({'success': False}), 400

    def matches(supplied, expected):
        """Constant-time equality; non-string input never matches."""
        if not isinstance(supplied, str):
            return False
        return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))

    # Evaluate both comparisons unconditionally so response timing does not
    # reveal which field was wrong.
    username_ok = matches(data.get('username'), ADMIN_USERNAME)
    password_ok = matches(data.get('password'), ADMIN_PASSWORD)
    if username_ok and password_ok:
        session['logged_in'] = True
        return jsonify({'success': True})
    return jsonify({'success': False, 'error': 'Username atau Password salah'}), 401
@app.route('/api/check-auth')
def check_auth():
    """Report whether the current session is logged in (401 when it is not)."""
    if not session.get('logged_in'):
        return jsonify({'authenticated': False}), 401
    return jsonify({'authenticated': True})
# ============================================
# KTP/KK Archive Endpoints
# ============================================
@app.route('/api/save-ktp', methods=['POST'])
def save_ktp():
    """Save edited KTP data with its image by delegating to ``save_document``."""
    response = save_document('ktp')
    return response
@app.route('/api/save-kk', methods=['POST'])
def save_kk():
    """Save edited KK data with its image by delegating to ``save_document``."""
    response = save_document('kk')
    return response
def save_document(doc_type):
    """Save an edited document record together with its image.

    Reads the ``image`` file and the JSON ``data`` form field from the
    current request. The image is stored as ``<identifier><ext>`` in the KTP
    or KK folder and the matching database record is updated or created.

    Args:
        doc_type: ``'ktp'`` keys records by ``nik``; anything else is
            treated as KK and keys by ``no_kk``.

    Returns:
        JSON with ``record_id`` and ``image_path``; 400 for invalid input and
        500 when saving fails.
    """
    import json  # local import: json is not imported at file level

    # Fields managed by the server; clients must not set them via the payload
    protected_fields = {'id', 'image_path', 'created_at', 'updated_at'}

    def assignable(record, field):
        """True when ``field`` is a plain data attribute a client may set."""
        return (
            isinstance(field, str)
            and not field.startswith('_')
            and field not in protected_fields
            and hasattr(record, field)
            and not callable(getattr(record, field))
        )

    try:
        if 'image' not in request.files:
            return jsonify({'success': False, 'error': 'Tidak ada gambar'}), 400
        image_file = request.files['image']

        try:
            data = json.loads(request.form.get('data', '{}'))
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Data tidak valid'}), 400
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Data tidak valid'}), 400

        # Determine config based on type
        if doc_type == 'ktp':
            folder = app.config['KTP_FOLDER']
            Model = KTPRecord
            id_field = 'nik'
        else:
            folder = app.config['KK_FOLDER']
            Model = KKRecord
            id_field = 'no_kk'
        identifier = data.get(id_field, '')
        if not identifier:
            return jsonify({'success': False, 'error': f'{id_field.upper()} tidak boleh kosong'}), 400

        # The identifier is client-supplied and becomes part of a file path:
        # sanitise it so values such as "../x" cannot escape the folder.
        safe_identifier = secure_filename(str(identifier))
        if not safe_identifier:
            return jsonify({'success': False, 'error': f'{id_field.upper()} tidak valid'}), 400

        # Accept only known image extensions; default to .jpg when absent
        file_ext = os.path.splitext(image_file.filename or '')[1] or '.jpg'
        if file_ext[1:].lower() not in ALLOWED_EXTENSIONS:
            return jsonify({'success': False, 'error': 'Format file tidak didukung'}), 400

        # Save image
        image_filename = f"{safe_identifier}{file_ext}"
        image_path = os.path.join(folder, image_filename)
        image_file.save(image_path)

        # Update the existing record, or create a new one
        existing = Model.query.filter_by(**{id_field: identifier}).first()
        if existing:
            for k, v in data.items():
                if assignable(existing, k) and v:
                    setattr(existing, k, v)
            existing.image_path = image_filename
            db.session.commit()
            record_id = existing.id
        else:
            # Create an empty record, then copy over the fields the model knows
            record = Model()
            for k, v in data.items():
                if assignable(record, k):
                    setattr(record, k, v)
            setattr(record, id_field, identifier)
            record.image_path = image_filename
            db.session.add(record)
            db.session.commit()
            record_id = record.id
        return jsonify({'success': True, 'record_id': record_id, 'image_path': image_filename})
    except Exception as e:
        db.session.rollback()
        logger.error(f"Save error: {e}", exc_info=True)
        return jsonify({'success': False, 'error': 'Gagal menyimpan'}), 500
@app.route('/ktp-images/<filename>')
def serve_ktp_image(filename):
    """Send an archived KTP image; requires a logged-in session."""
    if session.get('logged_in'):
        return send_from_directory(app.config['KTP_FOLDER'], filename)
    return "Unauthorized", 401
@app.route('/kk-images/<filename>')
def serve_kk_image(filename):
    """Send an archived KK image; requires a logged-in session."""
    if session.get('logged_in'):
        return send_from_directory(app.config['KK_FOLDER'], filename)
    return "Unauthorized", 401
@app.route('/api/ktp-archive', methods=['GET'])
def list_ktp_archive():
    """Return a page of KTP records that have a saved image (login required).

    Query parameters ``page`` and ``per_page`` (default 20) are sanitised by
    ``validate_pagination``. Records are ordered newest first.
    """
    if not session.get('logged_in'):
        return jsonify({'success': False, 'error': 'Unauthorized'}), 401
    try:
        page, per_page = validate_pagination(
            request.args.get('page', 1),
            request.args.get('per_page', 20)
        )
        # Restrict to records with an archived image, newest first
        with_images = KTPRecord.query.filter(KTPRecord.image_path.isnot(None))
        newest_first = with_images.order_by(KTPRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)
        rows = [r.to_dict() for r in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page
        })
    except Exception as e:
        logger.error(f"Error listing KTP archive: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil arsip KTP')
        return jsonify({'success': False, 'error': message}), 500
@app.route('/api/kk-archive', methods=['GET'])
def list_kk_archive():
    """Return a page of KK records that have a saved image (login required).

    Query parameters ``page`` and ``per_page`` (default 20) are sanitised by
    ``validate_pagination``. Records are ordered newest first.
    """
    if not session.get('logged_in'):
        return jsonify({'success': False, 'error': 'Unauthorized'}), 401
    try:
        page, per_page = validate_pagination(
            request.args.get('page', 1),
            request.args.get('per_page', 20)
        )
        # Restrict to records with an archived image, newest first
        with_images = KKRecord.query.filter(KKRecord.image_path.isnot(None))
        newest_first = with_images.order_by(KKRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)
        rows = [r.to_dict() for r in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page
        })
    except Exception as e:
        logger.error(f"Error listing KK archive: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil arsip KK')
        return jsonify({'success': False, 'error': message}), 500
@app.route('/api/export-excel', methods=['POST'])
def export_excel():
    """Export a JSON object of fields as a two-column XLSX attachment.

    Known KTP fields are written first in a fixed order, followed by any
    remaining fields except internal ones. Empty values are skipped.
    Returns 400 for a non-JSON, empty or non-object body.
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Format data tidak valid'}), 400
        # silent=True: malformed JSON becomes None (400) instead of a 500
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'Data kosong'}), 400
        if not isinstance(data, dict):
            return jsonify({'error': 'Format data tidak valid'}), 400

        wb = Workbook()
        ws = wb.active
        ws.title = "Data KTP"

        # Header row, bold and centred
        ws.append(["ATRIBUT", "NILAI"])
        for cell in ws[1]:
            cell.font = Font(bold=True)
            cell.alignment = Alignment(horizontal='center')

        # Internal fields that are never exported
        exclude = ['raw_text', 'image_path', 'updated_at', 'created_at', 'id']
        # Preferred order of the main fields
        order = ['nik', 'nama', 'tempat_lahir', 'tanggal_lahir', 'jenis_kelamin', 'gol_darah',
                 'alamat', 'rt_rw', 'kel_desa', 'kecamatan', 'kabupaten_kota', 'provinsi',
                 'agama', 'status_perkawinan', 'pekerjaan', 'kewarganegaraan', 'berlaku_hingga']

        # Main fields in order
        for key in order:
            val = data.get(key)
            if val:
                ws.append([key.replace('_', ' ').upper(), str(val)])

        # Remaining fields; skip missing values instead of writing "None"
        for k, v in data.items():
            if k in exclude or k in order or v is None or v == '':
                continue
            ws.append([k.replace('_', ' ').upper(), str(v)])

        # Size each column to its longest cell
        for col in ws.columns:
            max_length = max(len(str(cell.value)) for cell in col)
            ws.column_dimensions[col[0].column_letter].width = (max_length + 2) * 1.2

        # Write the workbook to an in-memory buffer and send it
        output = io.BytesIO()
        wb.save(output)
        output.seek(0)
        filename = f"Data_KTP_{data.get('nik', 'Export')}.xlsx"
        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )
    except Exception as e:
        logger.error(f"Export Excel Error: {e}", exc_info=True)
        return jsonify({'error': 'Gagal export excel'}), 500
# ============================================
# Database CRUD API Endpoints
# ============================================
@app.route('/api/ktp', methods=['GET'])
def list_ktp_records():
    """Return a page of KTP records, newest first.

    Query parameters ``page`` and ``per_page`` (default 10) are sanitised by
    ``validate_pagination``.
    """
    try:
        page, per_page = validate_pagination(
            request.args.get('page', 1),
            request.args.get('per_page', 10)
        )
        newest_first = KTPRecord.query.order_by(KTPRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)
        rows = [r.to_dict() for r in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page,
            'per_page': per_page
        })
    except Exception as e:
        logger.error(f"Error in list_ktp_records: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil data KTP')
        return jsonify({'success': False, 'error': message}), 500
@app.route('/api/ktp/<int:id>', methods=['GET'])
def get_ktp_record(id):
    """Return one KTP record by primary key.

    Returns 404 only when the record does not exist. Other failures (for
    example a database error) are reported as 500 instead of being
    disguised as "not found".
    """
    try:
        record = KTPRecord.query.filter_by(id=id).first()
        if record is None:
            return jsonify({'success': False, 'error': 'Data KTP tidak ditemukan'}), 404
        return jsonify({'success': True, 'data': record.to_dict()})
    except Exception as e:
        logger.error(f"Error in get_ktp_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal mengambil data KTP')
        }), 500
@app.route('/api/ktp/nik/<nik>', methods=['GET'])
def get_ktp_by_nik(nik):
    """Return the KTP record with the given NIK.

    Returns 400 when ``validate_nik`` rejects the value and 404 when no
    record matches.
    """
    try:
        # Reject malformed NIK values before querying
        if not validate_nik(nik):
            invalid = {
                'success': False,
                'error': 'Format NIK tidak valid. NIK harus 16 digit angka'
            }
            return jsonify(invalid), 400
        record = KTPRecord.query.filter_by(nik=nik).first()
        if not record:
            return jsonify({'success': False, 'error': 'NIK tidak ditemukan'}), 404
        return jsonify({'success': True, 'data': record.to_dict()})
    except Exception as e:
        logger.error(f"Error in get_ktp_by_nik: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mencari data KTP')
        return jsonify({'success': False, 'error': message}), 500
@app.route('/api/ktp/<int:id>', methods=['DELETE'])
def delete_ktp_record(id):
    """Delete one KTP record by primary key.

    Returns 404 when the record does not exist (previously this surfaced as
    a 500) and 500 when the deletion itself fails.
    """
    # NOTE(review): unlike the archive endpoints, this route does not check
    # session['logged_in'] - confirm whether deletion should require login.
    try:
        record = KTPRecord.query.filter_by(id=id).first()
        if record is None:
            return jsonify({'success': False, 'error': 'Data KTP tidak ditemukan'}), 404
        db.session.delete(record)
        db.session.commit()
        logger.info(f"Deleted KTP record: ID {id}")
        return jsonify({'success': True, 'message': f'Record {id} berhasil dihapus'})
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error in delete_ktp_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal menghapus data KTP')
        }), 500
@app.route('/api/kk', methods=['GET'])
def list_kk_records():
    """Return a page of KK records, newest first.

    Query parameters ``page`` and ``per_page`` (default 10) are sanitised by
    ``validate_pagination``.
    """
    try:
        page, per_page = validate_pagination(
            request.args.get('page', 1),
            request.args.get('per_page', 10)
        )
        newest_first = KKRecord.query.order_by(KKRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)
        rows = [r.to_dict() for r in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page,
            'per_page': per_page
        })
    except Exception as e:
        logger.error(f"Error in list_kk_records: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil data KK')
        return jsonify({'success': False, 'error': message}), 500
@app.route('/api/kk/<int:id>', methods=['GET'])
def get_kk_record(id):
    """Return one KK record by primary key.

    Returns 404 only when the record does not exist. Other failures (for
    example a database error) are reported as 500 instead of being
    disguised as "not found".
    """
    try:
        record = KKRecord.query.filter_by(id=id).first()
        if record is None:
            return jsonify({'success': False, 'error': 'Data KK tidak ditemukan'}), 404
        return jsonify({'success': True, 'data': record.to_dict()})
    except Exception as e:
        logger.error(f"Error in get_kk_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal mengambil data KK')
        }), 500
@app.route('/api/kk/<int:id>', methods=['DELETE'])
def delete_kk_record(id):
    """Delete one KK record by primary key.

    Returns 404 when the record does not exist (previously this surfaced as
    a 500) and 500 when the deletion itself fails.
    """
    # NOTE(review): unlike the archive endpoints, this route does not check
    # session['logged_in'] - confirm whether deletion should require login.
    try:
        record = KKRecord.query.filter_by(id=id).first()
        if record is None:
            return jsonify({'success': False, 'error': 'Data KK tidak ditemukan'}), 404
        db.session.delete(record)
        db.session.commit()
        logger.info(f"Deleted KK record: ID {id}")
        return jsonify({'success': True, 'message': f'Record {id} berhasil dihapus'})
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error in delete_kk_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal menghapus data KK')
        }), 500
if __name__ == '__main__':
    # Runtime settings come from environment variables
    host = os.environ.get('FLASK_HOST', '0.0.0.0')
    port = int(os.environ.get('FLASK_PORT', 5000))
    # Debug mode enables the interactive debugger, which must not be on by
    # default in production. It stays on by default in development, and
    # FLASK_DEBUG still overrides the default in either mode.
    default_debug = 'False' if PRODUCTION_MODE else 'True'
    debug = os.environ.get('FLASK_DEBUG', default_debug).lower() == 'true'

    logger.info("="*50)
    logger.info("OCR KTP/KK Application")
    logger.info("="*50)
    logger.info(f"Membuka: http://{host}:{port}")
    logger.info("Tekan Ctrl+C untuk berhenti")
    logger.info("="*50)
    app.run(host=host, port=port, debug=debug)