1044 lines
39 KiB
Python
1044 lines
39 KiB
Python
"""
|
|
Flask Web Server untuk OCR KTP/KK
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
import uuid
|
|
import requests
|
|
import difflib
|
|
from functools import lru_cache
|
|
from flask import Flask, render_template, request, jsonify, send_from_directory, session, send_file
|
|
from werkzeug.utils import secure_filename
|
|
from PIL import Image
|
|
import numpy as np
|
|
import numpy as np
|
|
import math
|
|
import io
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Font, Alignment
|
|
|
|
from ocr_engine import get_ocr_engine
|
|
from ktp_extractor import KTPExtractor
|
|
from kk_extractor import KKExtractor
|
|
from database import db, init_db
|
|
from models import KTPRecord, KKRecord
|
|
|
|
app = Flask(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
_BASE_DIR = os.path.dirname(__file__)

UPLOAD_FOLDER = os.path.join(_BASE_DIR, 'uploads')  # transient OCR uploads
KTP_FOLDER = os.path.join(_BASE_DIR, 'KTP')         # archived KTP images

ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'bmp', 'webp'}
MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16MB max
ALLOWED_DOC_TYPES = {'ktp', 'kk'}

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['KTP_FOLDER'] = KTP_FOLDER
app.config['KK_FOLDER'] = os.path.join(_BASE_DIR, 'KK')  # archived KK images
app.config['MAX_CONTENT_LENGTH'] = MAX_CONTENT_LENGTH

# Session signing key. The literal fallback keeps existing deployments
# working, but it is committed to source control: set FLASK_SECRET_KEY in any
# real deployment.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'secure-key-ocr-ktp-app')

# Simple single-user credentials. The defaults are the historical values and
# are NOT safe; override them through the environment.
ADMIN_USERNAME = os.environ.get('ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', '123')

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Production mode flag (drives error-message sanitising)
PRODUCTION_MODE = os.environ.get('FLASK_ENV', 'development').lower() == 'production'

# Create the storage folders if they do not exist yet
for _folder in (UPLOAD_FOLDER, KTP_FOLDER, app.config['KK_FOLDER']):
    os.makedirs(_folder, exist_ok=True)
|
|
|
|
# Helper for the perspective transform: solves the 8x8 linear system with NumPy.
def find_coeffs(pa, pb):
    """Return the 8 perspective coefficients that map each point of *pa* to *pb*.

    For every pair ``(x, y)`` in *pa* and ``(u, v)`` in *pb* the coefficients
    ``(a, b, c, d, e, f, g, h)`` satisfy::

        u = (a*x + b*y + c) / (g*x + h*y + 1)
        v = (d*x + e*y + f) / (g*x + h*y + 1)

    Args:
        pa: four ``(x, y)`` points.
        pb: the four corresponding ``(u, v)`` points.

    Returns:
        A 1-D ``numpy.ndarray`` of length 8.

    Raises:
        numpy.linalg.LinAlgError: if the point configuration is degenerate
            (the system is singular).
    """
    rows = []
    for p1, p2 in zip(pa, pb):
        rows.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0] * p1[0], -p2[0] * p1[1]])
        rows.append([0, 0, 0, p1[0], p1[1], 1, -p2[1] * p1[0], -p2[1] * p1[1]])

    A = np.array(rows, dtype=float)
    B = np.array(pb, dtype=float).reshape(8)

    # Four point pairs give a square 8x8 system, so solve it directly instead
    # of forming the normal equations with an explicit inverse (less accurate,
    # and np.matrix is deprecated).
    return np.linalg.solve(A, B)
|
|
|
|
@app.route('/api/transform-perspective', methods=['POST'])
def transform_perspective():
    """Rectify an uploaded image from four corner points.

    Form fields:
        image: the uploaded image file.
        points: JSON list ``[[x, y], ...]`` of four corners ordered
            top-left, top-right, bottom-right, bottom-left.
        doc_type: ``'kk'`` selects a 297:210 output ratio; anything else
            selects the 85.6:53.98 ratio.

    Returns:
        JSON with ``image_url`` and ``filename`` of the transformed image
        saved in the upload folder, 400 for invalid input, 500 otherwise.
    """
    import json

    try:
        if 'image' not in request.files:
            return jsonify({'success': False, 'error': 'No image uploaded'}), 400

        file = request.files['image']

        # Malformed point data is a client error, not a server error.
        try:
            raw_points = json.loads(request.form.get('points', '[]'))
            points = [(float(p[0]), float(p[1])) for p in raw_points]
        except (ValueError, TypeError, IndexError, KeyError):
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        if len(points) != 4 or not all(math.isfinite(c) for pt in points for c in pt):
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        # Load image
        try:
            img = Image.open(file.stream)
        except OSError:
            return jsonify({'success': False, 'error': 'Invalid image'}), 400

        # Keep the source format when it is one we write; otherwise use JPEG.
        fmt = img.format if img.format in ('JPEG', 'PNG', 'WEBP') else 'JPEG'

        # Size estimate: the longer of each pair of opposite edges.
        width = max(
            math.hypot(points[1][0] - points[0][0], points[1][1] - points[0][1]),
            math.hypot(points[2][0] - points[3][0], points[2][1] - points[3][1])
        )
        height = max(
            math.hypot(points[3][0] - points[0][0], points[3][1] - points[0][1]),
            math.hypot(points[2][0] - points[1][0], points[2][1] - points[1][1])
        )
        if width <= 0 or height <= 0:
            # Collapsed quadrilateral: would divide by zero below.
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        # Force the aspect ratio for the document type.
        doc_type = request.form.get('doc_type', 'ktp').lower()
        if doc_type == 'kk':
            target_ratio = 297.0 / 210.0  # A4 landscape
        else:
            target_ratio = 85.6 / 53.98   # ID-1 card

        # Keep the dominant measured dimension and derive the other one.
        if width / height > target_ratio:
            target_width = int(width)
            target_height = int(width / target_ratio)
        else:
            target_height = int(height)
            target_width = int(height * target_ratio)

        if target_width < 1 or target_height < 1:
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        # Destination rectangle corners, same order as the source points.
        dst_points = [(0, 0), (target_width, 0), (target_width, target_height), (0, target_height)]

        # PIL's PERSPECTIVE transform looks up, for each OUTPUT pixel, the
        # source position, so the coefficients must map destination -> source.
        try:
            coeffs = find_coeffs(dst_points, points)
        except np.linalg.LinAlgError:
            return jsonify({'success': False, 'error': 'Invalid points'}), 400

        new_img = img.transform((target_width, target_height), Image.PERSPECTIVE, coeffs, Image.BICUBIC)

        # JPEG cannot store alpha or palette images; convert before saving.
        if fmt == 'JPEG' and new_img.mode not in ('L', 'RGB', 'CMYK'):
            new_img = new_img.convert('RGB')

        # Save into the upload folder so the client can fetch it by URL.
        filename = f"temp_transformed_{secure_filename(file.filename)}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        new_img.save(filepath, format=fmt)

        return jsonify({
            'success': True,
            'image_url': f"/uploads/{filename}",
            'filename': filename
        })

    except Exception as e:
        logger.error(f"Perspective transform error: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal memproses gambar')
        }), 500
|
|
|
|
@app.route('/uploads/<filename>')
def serve_uploaded_file(filename):
    """Send *filename* from the configured upload folder."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename)
|
|
|
|
# ============================================
|
|
# Helper Functions
|
|
# ============================================
|
|
|
|
def sanitize_error_message(error, default_message="Terjadi kesalahan pada server"):
    """Return a client-safe error string.

    In production mode the generic *default_message* is returned so internal
    details are not exposed; otherwise the text of *error* is returned.
    """
    return default_message if PRODUCTION_MODE else str(error)
|
|
|
|
|
|
def validate_pagination(page, per_page, max_per_page=100):
    """Coerce raw pagination parameters into a safe ``(page, per_page)`` pair.

    Falsy inputs default to page 1 and 10 items. The page is clamped to at
    least 1 and the page size to the range 1..max_per_page. If either value
    cannot be converted to ``int`` the defaults ``(1, 10)`` are returned.
    """
    try:
        page_number = 1 if not page else int(page)
        page_size = 10 if not per_page else int(per_page)

        if page_number < 1:
            page_number = 1

        # Apply the upper cap first, then the lower bound.
        page_size = min(page_size, max_per_page)
        if page_size < 1:
            page_size = 1
    except (ValueError, TypeError):
        return 1, 10  # default values

    return page_number, page_size
|
|
|
|
|
|
def validate_nik(nik):
    """Return True only if *nik* is a string of exactly 16 ASCII digits.

    ``str.isdigit`` also accepts non-ASCII digit characters (for example
    Arabic-Indic digits or superscripts), so each character is checked
    against ``0-9`` explicitly. Non-string input returns False instead of
    raising.
    """
    if not isinstance(nik, str):
        return False
    return len(nik) == 16 and all(ch in '0123456789' for ch in nik)
|
|
|
|
|
|
def validate_no_kk(no_kk):
    """Return True only if *no_kk* is a string of exactly 16 ASCII digits.

    ``str.isdigit`` also accepts non-ASCII digit characters, so each
    character is checked against ``0-9`` explicitly. Non-string input
    returns False instead of raising.
    """
    if not isinstance(no_kk, str):
        return False
    return len(no_kk) == 16 and all(ch in '0123456789' for ch in no_kk)
|
|
|
|
# ============================================
|
|
# Error Handlers
|
|
# ============================================
|
|
|
|
@app.errorhandler(404)
def not_found(error):
    """Answer 404 errors with a JSON body."""
    body = {'success': False, 'error': 'Resource not found'}
    return jsonify(body), 404
|
|
|
|
|
|
@app.errorhandler(500)
def internal_error(error):
    """Log a 500 error and answer with a sanitised JSON body."""
    logger.error(f"Internal server error: {error}", exc_info=True)
    message = sanitize_error_message(error, 'Terjadi kesalahan pada server')
    body = {'success': False, 'error': message}
    return jsonify(body), 500
|
|
|
|
|
|
@app.errorhandler(413)
def request_too_large(error):
    """Answer 413 (payload too large) errors with a JSON body."""
    body = {'success': False, 'error': 'File terlalu besar. Maksimal 16MB'}
    return jsonify(body), 413
|
|
|
|
# Field extractors, created once and shared by all requests
ktp_extractor = KTPExtractor()
kk_extractor = KKExtractor()

# Database initialisation is best-effort: a failure is logged and the app
# keeps running.
try:
    init_db(app)
except Exception as db_init_error:
    logger.warning(f"Database connection failed: {db_init_error}")
    logger.warning("The app will work but data won't be saved to MySQL.")
|
|
|
|
|
|
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    _, dot, extension = filename.rpartition('.')
    # rpartition leaves ``dot`` empty when the name has no '.' at all
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
|
|
|
|
|
|
@app.route('/')
def index():
    """Render the main page."""
    landing_template = 'index.html'
    return render_template(landing_template)
|
|
|
|
|
|
def _save_ocr_record(model, id_field, doc_label, id_label, extracted, raw_text):
    """Insert or update the row for one OCR result and return its ``id``.

    Args:
        model: record class (``KTPRecord`` or ``KKRecord``).
        id_field: name of the identifier field in *extracted* / on the model.
        doc_label: document label used in log messages (``'KTP'`` / ``'KK'``).
        id_label: identifier label used in log messages.
        extracted: dict of extracted fields.
        raw_text: raw OCR text stored alongside the record.

    Database exceptions propagate; the caller is responsible for rollback.
    """
    identifier = extracted.get(id_field)

    # Only look for an existing row when OCR found an identifier. With
    # SQLAlchemy, filter_by(field=None) matches rows whose identifier is
    # NULL, which would overwrite an unrelated record.
    existing = None
    if identifier:
        existing = model.query.filter_by(**{id_field: identifier}).first()

    if existing:
        # Overwrite only fields that have a non-empty new value.
        for key, value in extracted.items():
            if hasattr(existing, key) and value:
                setattr(existing, key, value)
        existing.raw_text = raw_text
        db.session.commit()
        logger.info(f"Updated {doc_label} record: {id_label} {identifier}")
        return existing.id

    record = model.from_ocr_data(extracted, raw_text)
    db.session.add(record)
    db.session.commit()
    logger.info(f"Created new {doc_label} record: {id_label} {identifier}")
    return record.id


@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle an upload, run OCR on it and optionally store the result.

    Form fields:
        file: the image to process.
        doc_type: ``'ktp'`` (default) or ``'kk'``.
        save_to_db: ``'true'`` (default) to persist the extracted data.

    Returns:
        JSON with the extracted data, validation metadata (KTP only), raw
        text and the record id when saved. A database failure does not fail
        the request. The uploaded file is always removed afterwards.
    """
    try:
        # Check the file part
        if 'file' not in request.files:
            return jsonify({'success': False, 'error': 'Tidak ada file yang diupload'}), 400

        file = request.files['file']
        doc_type = request.form.get('doc_type', 'ktp').lower()

        # Validate doc_type (sorted so the message is deterministic)
        if doc_type not in ALLOWED_DOC_TYPES:
            return jsonify({
                'success': False,
                'error': f'Jenis dokumen tidak valid. Gunakan: {", ".join(sorted(ALLOWED_DOC_TYPES))}'
            }), 400

        if file.filename == '':
            return jsonify({'success': False, 'error': 'Nama file kosong'}), 400

        if not allowed_file(file.filename):
            return jsonify({'success': False, 'error': 'Format file tidak didukung. Gunakan PNG, JPG, JPEG, BMP, atau WEBP'}), 400

        # Store under a unique name so concurrent uploads cannot collide.
        original_filename = secure_filename(file.filename)
        file_ext = os.path.splitext(original_filename)[1]
        unique_filename = f"{uuid.uuid4().hex}{file_ext}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        file.save(filepath)

        try:
            # Run OCR
            ocr_engine = get_ocr_engine()
            ocr_results = ocr_engine.extract_text(filepath)

            if not ocr_results:
                return jsonify({
                    'success': False,
                    'error': 'Tidak dapat membaca teks dari gambar. Pastikan gambar jelas dan tidak blur.'
                }), 400

            # Extract fields according to the document type
            validation_meta = None
            if doc_type == 'ktp':
                extracted = ktp_extractor.extract(ocr_results)
                # Auto-correct and validate the region fields
                extracted, validation_meta = validate_and_correct_regions(extracted)
            else:
                extracted = kk_extractor.extract(ocr_results)

            # Raw text for debugging
            raw_text = '\n'.join(r['text'] for r in ocr_results)

            logger.debug("Raw OCR Results:")
            for i, r in enumerate(ocr_results):
                logger.debug(f"[{i}] {r['text']}")

            # Save to the database unless save_to_db is disabled
            record_id = None
            save_to_db = request.form.get('save_to_db', 'true').lower() == 'true'

            if save_to_db:
                try:
                    if doc_type == 'ktp':
                        record_id = _save_ocr_record(
                            KTPRecord, 'nik', 'KTP', 'NIK', extracted, raw_text)
                    else:
                        record_id = _save_ocr_record(
                            KKRecord, 'no_kk', 'KK', 'No KK', extracted, raw_text)
                except Exception as db_error:
                    db.session.rollback()
                    logger.error(f"Database save error: {db_error}", exc_info=True)
                    # Continue without saving - don't fail the OCR request

            return jsonify({
                'success': True,
                'doc_type': doc_type,
                'data': extracted,
                'validation': validation_meta,
                'raw_text': raw_text,
                'ocr_count': len(ocr_results),
                'record_id': record_id,
                'saved_to_db': record_id is not None
            })

        finally:
            # Remove the upload after processing (it contains personal data).
            try:
                os.remove(filepath)
            except FileNotFoundError:
                pass
            except Exception as cleanup_error:
                logger.warning(f"Failed to cleanup file {filepath}: {cleanup_error}")

    except Exception as e:
        logger.error(f"Error in upload_file: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal memproses file. Pastikan file valid dan coba lagi.')
        }), 500
|
|
|
|
|
|
# ============================================
|
|
# Region Data API (using wilayah.id)
|
|
# ============================================
|
|
|
|
WILAYAH_API_BASE = "https://wilayah.id/api"


@lru_cache(maxsize=500)
def _fetch_region_data_cached(endpoint):
    """Fetch and decode one region endpoint; raise on any failure.

    Failures raise instead of returning None so that ``lru_cache`` only
    memoises successful responses.
    """
    response = requests.get(f"{WILAYAH_API_BASE}/{endpoint}", timeout=10)
    if response.status_code != 200:
        raise RuntimeError(f"unexpected HTTP status {response.status_code}")
    return response.json()


def fetch_region_data(endpoint):
    """Fetch region data for *endpoint* with caching.

    Returns:
        The decoded JSON payload, or None when the request fails. Failed
        lookups are not cached, so a transient error is retried on the next
        call.
    """
    try:
        return _fetch_region_data_cached(endpoint)
    except Exception as e:
        logger.error(f"Error fetching region data from {endpoint}: {e}")
        return None


# Keep the cache-management helpers the lru_cache-decorated function exposed.
fetch_region_data.cache_clear = _fetch_region_data_cached.cache_clear
fetch_region_data.cache_info = _fetch_region_data_cached.cache_info
|
|
|
|
|
|
def normalize_name(name):
    """Return *name* upper-cased, trimmed, and without dots or spaces.

    A falsy *name* yields the empty string.
    """
    if not name:
        return ""
    trimmed = name.upper().strip()
    return "".join(ch for ch in trimmed if ch not in (".", " "))
|
|
|
|
|
|
def find_best_match(search_name, items, key='name', cutoff=0.6):
    """Return the item whose *key* value is the closest fuzzy match, or None.

    Comparison is case-insensitive (both sides are upper-cased) and uses
    ``difflib.get_close_matches``. When several items share the same
    upper-cased name, the first one in *items* is returned.

    Args:
        search_name: text to look for; falsy values yield None.
        items: list of dicts to search; an empty list yields None.
        key: dict key holding each item's name.
        cutoff: minimum similarity ratio for a match.
    """
    if not (search_name and items):
        return None

    # Upper-cased name -> first item carrying that name.
    first_by_name = {}
    for entry in items:
        first_by_name.setdefault(entry.get(key, '').upper(), entry)

    closest = difflib.get_close_matches(
        search_name.upper(), list(first_by_name), n=1, cutoff=cutoff
    )
    return first_by_name[closest[0]] if closest else None
|
|
|
|
def _region_code_digits(code):
    """Return *code* as a string with any dot separators removed."""
    return str(code).replace('.', '')


def _find_region_by_code(regions, digits):
    """Return the first region whose code equals *digits* once dots are stripped, else None."""
    for region in regions:
        if _region_code_digits(region['code']) == digits:
            return region
    return None


def validate_and_correct_regions(ocr_data):
    """Validate and correct region fields in cascade.

    Order: province -> regency/city -> district -> village. Each level is
    looked up under the parent matched at the previous level; validation
    stops at the first level that cannot be resolved.

    Fallbacks when a name is missing or does not match:
      1. NIK prefix (first 2 characters = province code, first 4 = regency
         code).
      2. Reverse lookup: find the regency through the district name, and the
         district through the village name (both bounded for performance).

    Args:
        ocr_data: dict of extracted fields; it is not modified.

    Returns:
        Tuple ``(updated_data, validation_codes)`` where ``validation_codes``
        maps each level to ``{'valid', 'code', 'suggestion'}``.
    """
    result = ocr_data.copy()
    validation_codes = {
        level: {'valid': False, 'code': None, 'suggestion': None}
        for level in ('provinsi', 'kabupaten_kota', 'kecamatan', 'kel_desa')
    }

    # ------------------------------------------------------------------
    # Region codes embedded in the NIK (format PPKKDD-DDMMYY-XXXX)
    # ------------------------------------------------------------------
    nik = result.get('nik', '')
    prov_code_from_nik = None
    kab_code_from_nik = None
    # isinstance guard: the data may come straight from a JSON request body
    if isinstance(nik, str) and len(nik) >= 6:
        prov_code_from_nik = nik[:2]  # 2-digit province code
        kab_code_from_nik = nik[:4]   # 4-digit regency code
        logger.debug(f"NIK LOOKUP: Provinsi code: {prov_code_from_nik}, Kabupaten code: {kab_code_from_nik}")

    # ------------------------------------------------------------------
    # 1. Province
    # ------------------------------------------------------------------
    provinces_data = fetch_region_data("provinces.json")
    if not provinces_data or 'data' not in provinces_data:
        return result, validation_codes

    prov_match = None
    if result.get('provinsi'):
        prov_match = find_best_match(result.get('provinsi'), provinces_data['data'])

    if not prov_match and prov_code_from_nik:
        # Compare digits only: the API codes may be dot-separated
        # (e.g. "11.01"), while the NIK prefix never is.
        prov_match = _find_region_by_code(provinces_data['data'], prov_code_from_nik)
        if prov_match:
            logger.debug(f"NIK LOOKUP: Found province from NIK: {prov_match['name']}")

    if not prov_match:
        return result, validation_codes

    result['provinsi'] = prov_match['name']
    validation_codes['provinsi'] = {'valid': True, 'code': prov_match['code'], 'suggestion': prov_match['name']}

    # ------------------------------------------------------------------
    # 2. Regency (within the matched province)
    # ------------------------------------------------------------------
    regencies_data = fetch_region_data(f"regencies/{prov_match['code']}.json")
    if not regencies_data or 'data' not in regencies_data:
        return result, validation_codes

    reg_match = None
    if result.get('kabupaten_kota'):
        reg_match = find_best_match(result.get('kabupaten_kota'), regencies_data['data'])

    if not reg_match and kab_code_from_nik:
        reg_match = _find_region_by_code(regencies_data['data'], kab_code_from_nik)
        if reg_match:
            logger.debug(f"NIK LOOKUP: Found regency from NIK: {reg_match['name']}")

    # Reverse lookup: search each regency's districts for the district name.
    if not reg_match and result.get('kecamatan'):
        logger.debug(f"REVERSE LOOKUP: Searching regency by kecamatan: {result.get('kecamatan')}")
        max_reverse_lookup = 50  # bound the number of API calls
        for i, reg in enumerate(regencies_data['data']):
            if i >= max_reverse_lookup:
                logger.warning(f"Reverse lookup stopped at limit {max_reverse_lookup}")
                break
            districts_data = fetch_region_data(f"districts/{reg['code']}.json")
            if districts_data and 'data' in districts_data:
                dist_match = find_best_match(result.get('kecamatan'), districts_data['data'], cutoff=0.7)
                if dist_match:
                    reg_match = reg
                    logger.debug(f"REVERSE LOOKUP: Found regency: {reg['name']} (via kecamatan {dist_match['name']})")
                    break

    if not reg_match:
        return result, validation_codes

    result['kabupaten_kota'] = reg_match['name']
    validation_codes['kabupaten_kota'] = {'valid': True, 'code': reg_match['code'], 'suggestion': reg_match['name']}

    # ------------------------------------------------------------------
    # 3. District (within the matched regency)
    # ------------------------------------------------------------------
    districts_data = fetch_region_data(f"districts/{reg_match['code']}.json")
    if not districts_data or 'data' not in districts_data:
        return result, validation_codes

    dist_match = find_best_match(result.get('kecamatan'), districts_data['data'])

    # Reverse lookup: search each district's villages for the village name.
    if not dist_match and result.get('kel_desa'):
        logger.debug(f"REVERSE LOOKUP: Searching kecamatan by kel_desa: {result.get('kel_desa')}")
        max_reverse_lookup = 30  # bound the number of API calls
        for i, dist in enumerate(districts_data['data']):
            if i >= max_reverse_lookup:
                logger.warning(f"Reverse lookup stopped at limit {max_reverse_lookup}")
                break
            villages_data = fetch_region_data(f"villages/{dist['code']}.json")
            if villages_data and 'data' in villages_data:
                vil_match = find_best_match(result.get('kel_desa'), villages_data['data'], cutoff=0.7)
                if vil_match:
                    dist_match = dist
                    logger.debug(f"REVERSE LOOKUP: Found kecamatan: {dist['name']} (via desa {vil_match['name']})")
                    break

    if not dist_match:
        return result, validation_codes

    result['kecamatan'] = dist_match['name']
    validation_codes['kecamatan'] = {'valid': True, 'code': dist_match['code'], 'suggestion': dist_match['name']}

    # ------------------------------------------------------------------
    # 4. Village (within the matched district)
    # ------------------------------------------------------------------
    villages_data = fetch_region_data(f"villages/{dist_match['code']}.json")
    if villages_data and 'data' in villages_data:
        vil_match = find_best_match(result.get('kel_desa'), villages_data['data'])
        if vil_match:
            result['kel_desa'] = vil_match['name']
            validation_codes['kel_desa'] = {'valid': True, 'code': vil_match['code'], 'suggestion': vil_match['name']}

    return result, validation_codes
|
|
|
|
@app.route('/api/validate-region', methods=['POST'])
def validate_region():
    """Validate the region fields of a JSON body against the region data.

    Returns:
        JSON ``{'success': True, 'validation': ...}`` on success; 400 when
        the body is not JSON, is empty/malformed, or is not a JSON object;
        500 on unexpected errors.
    """
    try:
        if not request.is_json:
            return jsonify({
                'success': False,
                'error': 'Request harus berupa JSON'
            }), 400

        # silent=True: a malformed body yields None (handled as a 400 below)
        # instead of raising inside this try block and becoming a 500.
        ocr_data = request.get_json(silent=True)
        if not ocr_data:
            return jsonify({
                'success': False,
                'error': 'Data tidak boleh kosong'
            }), 400

        # The validator uses dict methods; reject lists, strings and numbers.
        if not isinstance(ocr_data, dict):
            return jsonify({
                'success': False,
                'error': 'Data harus berupa objek JSON'
            }), 400

        _, validation_result = validate_and_correct_regions(ocr_data)
        return jsonify({'success': True, 'validation': validation_result})

    except Exception as e:
        logger.error(f"Error in validate_region: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal memvalidasi data wilayah')
        }), 500
|
|
|
|
|
|
@app.route('/health')
def health():
    """Liveness probe: always answers with status ``ok``."""
    return jsonify(status='ok')
|
|
|
|
@app.route('/api/login', methods=['POST'])
def login():
    """Check the submitted credentials and mark the session as logged in.

    Expects a JSON object with ``username`` and ``password``.

    Returns:
        ``{'success': True}`` on success, 400 when the body is not a JSON
        object, 401 when the credentials do not match.
    """
    import hmac

    payload = request.get_json(silent=True) if request.is_json else None
    if not isinstance(payload, dict):
        # Covers a non-JSON content type, a malformed body, and JSON values
        # that are not objects (which have no .get()).
        return jsonify({'success': False}), 400

    username = payload.get('username')
    password = payload.get('password')

    if isinstance(username, str) and isinstance(password, str):
        # Constant-time comparison; both checks always run so the response
        # time does not reveal which field was wrong.
        user_ok = hmac.compare_digest(username.encode('utf-8'), ADMIN_USERNAME.encode('utf-8'))
        pass_ok = hmac.compare_digest(password.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8'))
        if user_ok & pass_ok:
            session['logged_in'] = True
            return jsonify({'success': True})

    return jsonify({'success': False, 'error': 'Username atau Password salah'}), 401
|
|
|
|
@app.route('/api/check-auth')
def check_auth():
    """Report whether the current session is logged in (200) or not (401)."""
    authenticated = bool(session.get('logged_in'))
    status_code = 200 if authenticated else 401
    return jsonify({'authenticated': authenticated}), status_code
|
|
|
|
|
|
|
|
# ============================================
# KTP/KK Archive Endpoints
# ============================================
|
|
|
|
@app.route('/api/save-ktp', methods=['POST'])
def save_ktp():
    """Store edited KTP data and its cropped image via the shared save routine."""
    response = save_document('ktp')
    return response
|
|
|
|
@app.route('/api/save-kk', methods=['POST'])
def save_kk():
    """Store edited KK data and its cropped image via the shared save routine."""
    response = save_document('kk')
    return response
|
|
|
|
def save_document(doc_type):
    """Save an edited document record together with its image.

    Reads the ``image`` file part and the ``data`` form field (a JSON
    object) from the current request. The image is written to the KTP or KK
    folder, named after the record identifier, and the matching record is
    updated or created.

    Args:
        doc_type: ``'ktp'`` selects KTPRecord / ``nik``; any other value
            selects KKRecord / ``no_kk``.

    Returns:
        A Flask JSON response: success with ``record_id`` and
        ``image_path``, 400 for invalid input, 500 when saving fails.
    """
    import json

    try:
        if 'image' not in request.files:
            return jsonify({'success': False, 'error': 'Tidak ada gambar'}), 400

        image_file = request.files['image']

        try:
            data = json.loads(request.form.get('data', '{}'))
        except (ValueError, TypeError):
            return jsonify({'success': False, 'error': 'Data tidak valid'}), 400
        if not isinstance(data, dict):
            # Valid JSON, but not an object: there are no fields to read.
            return jsonify({'success': False, 'error': 'Data tidak valid'}), 400

        # Pick folder, model and identifier field for the document type
        if doc_type == 'ktp':
            folder = app.config['KTP_FOLDER']
            Model = KTPRecord
            id_field = 'nik'
        else:
            folder = app.config['KK_FOLDER']
            Model = KKRecord
            id_field = 'no_kk'

        identifier = data.get(id_field, '')
        if not identifier:
            return jsonify({'success': False, 'error': f'{id_field.upper()} tidak boleh kosong'}), 400

        # Build the file name from untrusted input: accept only known image
        # extensions and strip path separators / traversal sequences.
        file_ext = os.path.splitext(image_file.filename or '')[1] or '.jpg'
        if file_ext.lstrip('.').lower() not in ALLOWED_EXTENSIONS:
            return jsonify({'success': False, 'error': 'Format file tidak didukung. Gunakan PNG, JPG, JPEG, BMP, atau WEBP'}), 400

        image_filename = secure_filename(f"{identifier}{file_ext}")
        if not image_filename:
            return jsonify({'success': False, 'error': f'{id_field.upper()} tidak valid'}), 400

        image_path = os.path.join(folder, image_filename)
        image_file.save(image_path)

        # NOTE(review): the loops below set any attribute the client names
        # (mass assignment); consider restricting them to known columns.
        existing = Model.query.filter_by(**{id_field: identifier}).first()
        if existing:
            # Update: only overwrite with non-empty values
            for k, v in data.items():
                if hasattr(existing, k) and v:
                    setattr(existing, k, v)
            existing.image_path = image_filename
            db.session.commit()
            record_id = existing.id
        else:
            # Create: start from an empty record and copy matching fields
            record = Model()
            for k, v in data.items():
                if hasattr(record, k):
                    setattr(record, k, v)
            setattr(record, id_field, identifier)
            record.image_path = image_filename
            db.session.add(record)
            db.session.commit()
            record_id = record.id

        return jsonify({'success': True, 'record_id': record_id, 'image_path': image_filename})

    except Exception as e:
        db.session.rollback()
        logger.error(f"Save error: {e}", exc_info=True)
        return jsonify({'success': False, 'error': 'Gagal menyimpan'}), 500
|
|
|
|
|
|
@app.route('/ktp-images/<filename>')
def serve_ktp_image(filename):
    """Send a KTP image to a logged-in session; answer 401 otherwise."""
    if session.get('logged_in'):
        return send_from_directory(app.config['KTP_FOLDER'], filename)
    return "Unauthorized", 401
|
|
|
|
@app.route('/kk-images/<filename>')
def serve_kk_image(filename):
    """Send a KK image to a logged-in session; answer 401 otherwise."""
    if session.get('logged_in'):
        return send_from_directory(app.config['KK_FOLDER'], filename)
    return "Unauthorized", 401
|
|
|
|
|
|
|
|
@app.route('/api/ktp-archive', methods=['GET'])
def list_ktp_archive():
    """Return a page of KTP records that have a saved image (login required).

    Query parameters ``page`` and ``per_page`` (default 20) go through
    ``validate_pagination``; results are ordered newest first.
    """
    if not session.get('logged_in'):
        return jsonify({'success': False, 'error': 'Unauthorized'}), 401

    try:
        raw_page = request.args.get('page', 1)
        raw_per_page = request.args.get('per_page', 20)
        page, per_page = validate_pagination(raw_page, raw_per_page)

        # Restrict to records with a stored image, newest first
        archived = KTPRecord.query.filter(KTPRecord.image_path.isnot(None))
        newest_first = archived.order_by(KTPRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)

        rows = [record.to_dict() for record in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page
        })
    except Exception as e:
        logger.error(f"Error listing KTP archive: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil arsip KTP')
        return jsonify({'success': False, 'error': message}), 500
|
|
|
|
|
|
@app.route('/api/kk-archive', methods=['GET'])
def list_kk_archive():
    """Return a page of KK records that have a saved image (login required).

    Query parameters ``page`` and ``per_page`` (default 20) go through
    ``validate_pagination``; results are ordered newest first.
    """
    if not session.get('logged_in'):
        return jsonify({'success': False, 'error': 'Unauthorized'}), 401

    try:
        raw_page = request.args.get('page', 1)
        raw_per_page = request.args.get('per_page', 20)
        page, per_page = validate_pagination(raw_page, raw_per_page)

        # Restrict to records with a stored image, newest first
        archived = KKRecord.query.filter(KKRecord.image_path.isnot(None))
        newest_first = archived.order_by(KKRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)

        rows = [record.to_dict() for record in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page
        })
    except Exception as e:
        logger.error(f"Error listing KK archive: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil arsip KK')
        return jsonify({'success': False, 'error': message}), 500
|
|
|
|
@app.route('/api/export-excel', methods=['POST'])
def export_excel():
    """Export a JSON object of fields as a two-column XLSX download.

    The sheet has an ``ATRIBUT`` / ``NILAI`` header row, the known KTP
    fields in a fixed order (non-empty values only), then any remaining
    fields except internal ones.

    Returns:
        The workbook as an attachment, 400 for a missing/invalid body, 500
        on unexpected errors.
    """
    try:
        if not request.is_json:
            return jsonify({'error': 'Format data tidak valid'}), 400

        # silent=True: a malformed body becomes None -> 400 rather than 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'Data kosong'}), 400
        if not isinstance(data, dict):
            return jsonify({'error': 'Format data tidak valid'}), 400

        wb = Workbook()
        ws = wb.active
        ws.title = "Data KTP"

        # Header row, bold and centred
        ws.append(["ATRIBUT", "NILAI"])
        for cell in ws[1]:
            cell.font = Font(bold=True)
            cell.alignment = Alignment(horizontal='center')

        # Internal fields that are never exported
        exclude = {'raw_text', 'image_path', 'updated_at', 'created_at', 'id'}

        # Preferred field order
        order = ['nik', 'nama', 'tempat_lahir', 'tanggal_lahir', 'jenis_kelamin', 'gol_darah',
                 'alamat', 'rt_rw', 'kel_desa', 'kecamatan', 'kabupaten_kota', 'provinsi',
                 'agama', 'status_perkawinan', 'pekerjaan', 'kewarganegaraan', 'berlaku_hingga']

        # Main fields, in order, skipping empty values
        for key in order:
            val = data.get(key)
            if val:
                ws.append([key.replace('_', ' ').upper(), str(val)])

        # Remaining fields; None is skipped so the sheet never shows "None"
        for k, v in data.items():
            if k in exclude or k in order or v is None:
                continue
            ws.append([k.replace('_', ' ').upper(), str(v)])

        # Size each column to its longest cell
        for column_cells in ws.columns:
            longest = max((len(str(cell.value)) for cell in column_cells), default=0)
            column_letter = column_cells[0].column_letter
            ws.column_dimensions[column_letter].width = (longest + 2) * 1.2

        # Serialise into an in-memory buffer
        output = io.BytesIO()
        wb.save(output)
        output.seek(0)

        # The NIK comes from the request body; sanitise it for the file name.
        name_part = secure_filename(str(data.get('nik') or 'Export')) or 'Export'
        filename = f"Data_KTP_{name_part}.xlsx"

        return send_file(
            output,
            as_attachment=True,
            download_name=filename,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        )

    except Exception as e:
        logger.error(f"Export Excel Error: {e}", exc_info=True)
        return jsonify({'error': 'Gagal export excel'}), 500
|
|
|
|
|
|
|
|
# ============================================
|
|
# Database CRUD API Endpoints
|
|
# ============================================
|
|
|
|
@app.route('/api/ktp', methods=['GET'])
def list_ktp_records():
    """Return one page of KTP records, newest first.

    Query parameters ``page`` and ``per_page`` (default 10) go through
    ``validate_pagination``.
    """
    try:
        raw_page = request.args.get('page', 1)
        raw_per_page = request.args.get('per_page', 10)
        page, per_page = validate_pagination(raw_page, raw_per_page)

        newest_first = KTPRecord.query.order_by(KTPRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)

        rows = [record.to_dict() for record in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page,
            'per_page': per_page
        })
    except Exception as e:
        logger.error(f"Error in list_ktp_records: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil data KTP')
        return jsonify({'success': False, 'error': message}), 500
|
|
|
|
|
|
@app.route('/api/ktp/<int:id>', methods=['GET'])
def get_ktp_record(id):
    """Return the KTP record with primary key *id*.

    Returns:
        The record as JSON, 404 when it does not exist, 500 on any other
        failure (previously every failure was reported as 404).
    """
    from werkzeug.exceptions import NotFound

    try:
        record = KTPRecord.query.get_or_404(id)
        return jsonify({'success': True, 'data': record.to_dict()})
    except NotFound:
        # Expected outcome for an unknown id: not an error worth a traceback.
        return jsonify({'success': False, 'error': 'Data KTP tidak ditemukan'}), 404
    except Exception as e:
        logger.error(f"Error in get_ktp_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal mengambil data KTP')
        }), 500
|
|
|
|
|
|
@app.route('/api/ktp/nik/<nik>', methods=['GET'])
def get_ktp_by_nik(nik):
    """Look up a KTP record by NIK.

    Returns 400 when *nik* fails ``validate_nik``, 404 when no record
    matches, and the record as JSON otherwise.
    """
    try:
        if not validate_nik(nik):
            invalid = {
                'success': False,
                'error': 'Format NIK tidak valid. NIK harus 16 digit angka'
            }
            return jsonify(invalid), 400

        record = KTPRecord.query.filter_by(nik=nik).first()
        if not record:
            return jsonify({'success': False, 'error': 'NIK tidak ditemukan'}), 404

        return jsonify({'success': True, 'data': record.to_dict()})
    except Exception as e:
        logger.error(f"Error in get_ktp_by_nik: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mencari data KTP')
        return jsonify({'success': False, 'error': message}), 500
|
|
|
|
|
|
@app.route('/api/ktp/<int:id>', methods=['DELETE'])
def delete_ktp_record(id):
    """Delete the KTP record with primary key *id*.

    Returns:
        A success message, 404 when the record does not exist (previously
        reported as 500), 500 when the deletion fails.
    """
    from werkzeug.exceptions import NotFound

    try:
        record = KTPRecord.query.get_or_404(id)
        db.session.delete(record)
        db.session.commit()
        logger.info(f"Deleted KTP record: ID {id}")
        return jsonify({'success': True, 'message': f'Record {id} berhasil dihapus'})
    except NotFound:
        # Nothing was changed, so no rollback is needed.
        return jsonify({'success': False, 'error': 'Data KTP tidak ditemukan'}), 404
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error in delete_ktp_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal menghapus data KTP')
        }), 500
|
|
|
|
|
|
@app.route('/api/kk', methods=['GET'])
def list_kk_records():
    """Return one page of KK records, newest first.

    Query parameters ``page`` and ``per_page`` (default 10) go through
    ``validate_pagination``.
    """
    try:
        raw_page = request.args.get('page', 1)
        raw_per_page = request.args.get('per_page', 10)
        page, per_page = validate_pagination(raw_page, raw_per_page)

        newest_first = KKRecord.query.order_by(KKRecord.created_at.desc())
        pagination = newest_first.paginate(page=page, per_page=per_page, error_out=False)

        rows = [record.to_dict() for record in pagination.items]
        return jsonify({
            'success': True,
            'data': rows,
            'total': pagination.total,
            'pages': pagination.pages,
            'current_page': page,
            'per_page': per_page
        })
    except Exception as e:
        logger.error(f"Error in list_kk_records: {e}", exc_info=True)
        message = sanitize_error_message(e, 'Gagal mengambil data KK')
        return jsonify({'success': False, 'error': message}), 500
|
|
|
|
|
|
@app.route('/api/kk/<int:id>', methods=['GET'])
def get_kk_record(id):
    """Return the KK record with primary key *id*.

    Returns:
        The record as JSON, 404 when it does not exist, 500 on any other
        failure (previously every failure was reported as 404).
    """
    from werkzeug.exceptions import NotFound

    try:
        record = KKRecord.query.get_or_404(id)
        return jsonify({'success': True, 'data': record.to_dict()})
    except NotFound:
        # Expected outcome for an unknown id: not an error worth a traceback.
        return jsonify({'success': False, 'error': 'Data KK tidak ditemukan'}), 404
    except Exception as e:
        logger.error(f"Error in get_kk_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal mengambil data KK')
        }), 500
|
|
|
|
|
|
@app.route('/api/kk/<int:id>', methods=['DELETE'])
def delete_kk_record(id):
    """Delete the KK record with primary key *id*.

    Returns:
        A success message, 404 when the record does not exist (previously
        reported as 500), 500 when the deletion fails.
    """
    from werkzeug.exceptions import NotFound

    try:
        record = KKRecord.query.get_or_404(id)
        db.session.delete(record)
        db.session.commit()
        logger.info(f"Deleted KK record: ID {id}")
        return jsonify({'success': True, 'message': f'Record {id} berhasil dihapus'})
    except NotFound:
        # Nothing was changed, so no rollback is needed.
        return jsonify({'success': False, 'error': 'Data KK tidak ditemukan'}), 404
    except Exception as e:
        db.session.rollback()
        logger.error(f"Error in delete_kk_record: {e}", exc_info=True)
        return jsonify({
            'success': False,
            'error': sanitize_error_message(e, 'Gagal menghapus data KK')
        }), 500
|
|
|
|
|
|
if __name__ == '__main__':
    # Runtime settings come from environment variables.
    host = os.environ.get('FLASK_HOST', '0.0.0.0')
    port = int(os.environ.get('FLASK_PORT', 5000))

    # Debug stays on by default in development, but is off by default in
    # production mode: the server binds to all interfaces by default, and
    # debug mode must not be exposed there. FLASK_DEBUG still overrides.
    default_debug = 'False' if PRODUCTION_MODE else 'True'
    debug = os.environ.get('FLASK_DEBUG', default_debug).lower() == 'true'

    banner = "=" * 50
    logger.info(banner)
    logger.info("OCR KTP/KK Application")
    logger.info(banner)
    logger.info(f"Membuka: http://{host}:{port}")
    logger.info("Tekan Ctrl+C untuk berhenti")
    logger.info(banner)

    app.run(host=host, port=port, debug=debug)
|