import io import os import subprocess import threading import time import tkinter as tk from datetime import datetime import face_recognition import numpy as np import requests from PIL import Image, ImageTk BASE_DIR = os.path.expanduser("~/faces") SERVO_PI_IP = "10.71.203.135" TOLERANCE = 0.48 # Portrait preview PREVIEW_SIZE = (480, 640) PREVIEW_WIDGET_SIZE = (360, 480) WINDOW_SIZE = "480x800+0+0" CAPTURE_PATH = "/tmp/safe_capture.jpg" os.makedirs(BASE_DIR, exist_ok=True) KNOWN_ENCODINGS = [] SAFE_UNLOCKED = False STREAM_PROCESS = None STREAM_THREAD = None STREAM_STOP = threading.Event() LAST_FRAME = None FRAME_LOCK = threading.Lock() # ---------------- CAMERA ---------------- def _build_preview_command(): return [ "rpicam-vid", "-t", "0", "--inline", "--codec", "mjpeg", "--width", str(PREVIEW_SIZE[0]), "--height", str(PREVIEW_SIZE[1]), "--nopreview", "-o", "-", ] def init_camera_stream(): global STREAM_PROCESS, STREAM_THREAD if STREAM_PROCESS is not None: return STREAM_STOP.clear() STREAM_PROCESS = subprocess.Popen( _build_preview_command(), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=0, ) STREAM_THREAD = threading.Thread(target=_stream_reader_loop, daemon=True) STREAM_THREAD.start() def _stream_reader_loop(): global LAST_FRAME if STREAM_PROCESS is None or STREAM_PROCESS.stdout is None: return buffer = bytearray() stream = STREAM_PROCESS.stdout while not STREAM_STOP.is_set(): chunk = stream.read(4096) if not chunk: break buffer.extend(chunk) while True: start = buffer.find(b"\xff\xd8") if start == -1: if len(buffer) > 1024 * 1024: del buffer[:-2] break end = buffer.find(b"\xff\xd9", start + 2) if end == -1: if start > 0: del buffer[:start] break jpeg_bytes = bytes(buffer[start:end + 2]) del buffer[:end + 2] try: image = Image.open(io.BytesIO(jpeg_bytes)).convert("RGB") frame = np.array(image) with FRAME_LOCK: LAST_FRAME = frame except Exception: continue def get_current_frame(): with FRAME_LOCK: if LAST_FRAME is None: raise RuntimeError("No preview frame available yet") return LAST_FRAME.copy() def capture_still_image(path): command = [ "rpicam-still", "-o", path, "--nopreview", "--width", "1280", "--height", "720", "--timeout", "400", ] subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) def capture_face_frame(): return get_current_frame() def frame_to_photo(frame, size=PREVIEW_WIDGET_SIZE): image = Image.fromarray(frame) image = image.resize(size, Image.Resampling.LANCZOS) return ImageTk.PhotoImage(image) def update_preview(): try: frame = get_current_frame() photo = frame_to_photo(frame) camera_label.config(image=photo, text="") camera_label.image = photo except Exception: camera_label.config(image="", text="Waiting for camera...", fg="#9fb1d1") root.after(80, update_preview) def shutdown_camera_stream(): global STREAM_PROCESS STREAM_STOP.set() if STREAM_PROCESS is not None: try: STREAM_PROCESS.terminate() STREAM_PROCESS.wait(timeout=2) except Exception: try: STREAM_PROCESS.kill() except Exception: pass STREAM_PROCESS = None # ---------------- SERVO WIFI ---------------- def unlock_safe_remote(): requests.get(f"http://{SERVO_PI_IP}:5000/unlock", timeout=3) def lock_safe_remote(): requests.get(f"http://{SERVO_PI_IP}:5000/lock", timeout=3) # ---------------- FACES ---------------- def load_faces(): KNOWN_ENCODINGS.clear() for filename in sorted(os.listdir(BASE_DIR)): if not filename.lower().endswith((".jpg", ".jpeg", ".png")): continue path = os.path.join(BASE_DIR, filename) try: img = face_recognition.load_image_file(path) encodings = face_recognition.face_encodings(img) if len(encodings) == 1: KNOWN_ENCODINGS.append(encodings[0]) else: log(f"Skipped {filename}: expected 1 face") except Exception as e: log(f"Skipped {filename}: {e}") status_faces.config(text=str(len(KNOWN_ENCODINGS))) log(f"{len(KNOWN_ENCODINGS)} authorized face(s) loaded") update_controls() def add_face(): if not can_add_face(): log("Add face is unavailable right now") return try: log("Capturing authorized face...") frame = capture_face_frame() encodings = face_recognition.face_encodings(frame) if len(encodings) != 1: log("Error: exactly one face must be visible") return filename = datetime.now().strftime("face_%Y%m%d_%H%M%S.jpg") path = os.path.join(BASE_DIR, filename) Image.fromarray(frame).save(path, quality=95) KNOWN_ENCODINGS.append(encodings[0]) status_faces.config(text=str(len(KNOWN_ENCODINGS))) log("Authorized face added") update_controls() except Exception as e: log(f"Add face failed: {e}") # ---------------- VERIFY / STATE ---------------- def can_add_face(): return len(KNOWN_ENCODINGS) == 0 or SAFE_UNLOCKED def set_unlock_state(is_unlocked): global SAFE_UNLOCKED SAFE_UNLOCKED = is_unlocked if SAFE_UNLOCKED: status_value.config(text="Unlocked", fg=SUCCESS) else: status_value.config(text="Locked", fg=WARN) update_controls() def update_controls(): has_faces = len(KNOWN_ENCODINGS) > 0 add_btn.config(state=tk.NORMAL if can_add_face() else tk.DISABLED) unlock_btn.config(state=tk.NORMAL if (has_faces and not SAFE_UNLOCKED) else tk.DISABLED) lock_btn.config(state=tk.NORMAL if SAFE_UNLOCKED else tk.DISABLED) def unlock_flow(): if SAFE_UNLOCKED: log("Safe is already unlocked") return if not KNOWN_ENCODINGS: log("No authorized faces found. Add a face first") update_controls() return try: log("Scanning face...") frame = capture_face_frame() locations = face_recognition.face_locations(frame) if not locations: log("No face detected") return encodings = face_recognition.face_encodings(frame, locations) for encoding in encodings: distances = face_recognition.face_distance(KNOWN_ENCODINGS, encoding) if len(distances) and float(np.min(distances)) <= TOLERANCE: unlock_safe_remote() log("AUTHORIZED") set_unlock_state(True) return log("ACCESS DENIED") except Exception as e: log(f"Unlock failed: {e}") def lock_flow(): if not SAFE_UNLOCKED: log("Safe is already locked") return try: lock_safe_remote() log("SAFE LOCKED") set_unlock_state(False) except Exception as e: log(f"Lock failed: {e}") # ---------------- LOG ---------------- def log(message): timestamp = datetime.now().strftime("%H:%M:%S") log_box.insert(tk.END, f"[{timestamp}] {message}\n") log_box.see(tk.END) # ---------------- UI ---------------- # ---------------- UI ---------------- root = tk.Tk() root.title("AI Safe") root.geometry(WINDOW_SIZE) root.attributes("-fullscreen", True) root.configure(bg="#0b1220") root.resizable(False, False) HEADER_BG = "#111827" CARD_BG = "#172033" PANEL_BG = "#0f172a" TEXT = "#e5eefc" MUTED = "#9fb1d1" ACCENT = "#4f8cff" SUCCESS = "#10b981" WARN = "#f59e0b" DANGER = "#ef4444" root.grid_columnconfigure(0, weight=1) root.grid_rowconfigure(4, weight=1) # HEADER header = tk.Frame(root, bg=HEADER_BG, height=60) header.grid(row=0, column=0, sticky="ew") header.grid_propagate(False) brand = tk.Label(header, text="AI SAFE", font=("Arial", 20, "bold"), fg=TEXT, bg=HEADER_BG) brand.pack(side="left", padx=16) status_frame = tk.Frame(header, bg=HEADER_BG) status_frame.pack(side="right", padx=16) status_label = tk.Label(status_frame, text="Status", font=("Arial", 10), fg=MUTED, bg=HEADER_BG) status_label.pack(anchor="e") status_value = tk.Label(status_frame, text="Locked", font=("Arial", 14, "bold"), fg=WARN, bg=HEADER_BG) status_value.pack(anchor="e") # CAMERA PREVIEW preview_card = tk.Frame(root, bg=CARD_BG, highlightthickness=1, highlightbackground="#24324a") preview_card.grid(row=1, column=0, sticky="ew", padx=14, pady=(10,6)) preview_title = tk.Label(preview_card, text="Live Camera", font=("Arial", 14, "bold"), fg=TEXT, bg=CARD_BG) preview_title.pack(anchor="w", padx=16, pady=(10,4)) camera_label = tk.Label(preview_card, bg="#050b16", width=360, height=240, text="Starting camera...", font=("Arial", 12), fg=MUTED) camera_label.pack(padx=16, pady=(0,10)) # BUTTON CARD (MOVED ABOVE LOG) button_card = tk.Frame(root, bg=CARD_BG, highlightthickness=1, highlightbackground="#24324a") button_card.grid(row=2, column=0, sticky="ew", padx=14, pady=(0,6)) button_card.grid_columnconfigure(0, weight=1) button_card.grid_columnconfigure(1, weight=1) add_btn = tk.Button( button_card, text="Add Face", command=add_face, font=("Arial", 14, "bold"), bg=ACCENT, fg="white", activebackground="#3b73dd", activeforeground="white", bd=0, padx=12, pady=12, cursor="hand2", ) add_btn.grid(row=0, column=0, columnspan=2, sticky="ew", padx=16, pady=(12,8)) unlock_btn = tk.Button( button_card, text="Unlock", command=unlock_flow, font=("Arial", 13, "bold"), bg=SUCCESS, fg="#06120d", activebackground="#0ea371", bd=0, padx=12, pady=12, cursor="hand2", ) unlock_btn.grid(row=1, column=0, sticky="ew", padx=(16,8), pady=(0,12)) lock_btn = tk.Button( button_card, text="Lock", command=lock_flow, font=("Arial", 13, "bold"), bg=WARN, fg="#1f1300", activebackground="#db8b08", bd=0, padx=12, pady=12, cursor="hand2", ) lock_btn.grid(row=1, column=1, sticky="ew", padx=(8,16), pady=(0,12)) # SYSTEM OVERVIEW summary_card = tk.Frame(root, bg=CARD_BG, highlightthickness=1, highlightbackground="#24324a") summary_card.grid(row=3, column=0, sticky="ew", padx=14, pady=(0,6)) summary_title = tk.Label(summary_card, text="System Overview", font=("Arial", 14, "bold"), fg=TEXT, bg=CARD_BG) summary_title.pack(anchor="w", padx=16, pady=(10,4)) faces_label = tk.Label(summary_card, text="Authorized faces", font=("Arial", 10), fg=MUTED, bg=CARD_BG) faces_label.pack(anchor="w", padx=16) status_faces = tk.Label(summary_card, text="0", font=("Arial", 22, "bold"), fg=TEXT, bg=CARD_BG) status_faces.pack(anchor="w", padx=16, pady=(0,8)) ip_label = tk.Label(summary_card, text=f"Servo endpoint {SERVO_PI_IP}:5000", font=("Arial", 10), fg=MUTED, bg=CARD_BG) ip_label.pack(anchor="w", padx=16, pady=(0,10)) # ACTIVITY LOG (MOVED BELOW BUTTONS) log_card = tk.Frame(root, bg=CARD_BG, highlightthickness=1, highlightbackground="#24324a") log_card.grid(row=4, column=0, sticky="nsew", padx=14, pady=(0,12)) log_card.grid_rowconfigure(1, weight=1) log_card.grid_columnconfigure(0, weight=1) log_title = tk.Label(log_card, text="Activity Log", font=("Arial", 14, "bold"), fg=TEXT, bg=CARD_BG) log_title.grid(row=0, column=0, sticky="w", padx=16, pady=(10,4)) log_box = tk.Text( log_card, height=6, font=("Consolas", 10), bg=PANEL_BG, fg=TEXT, insertbackground=TEXT, bd=0, wrap="word", padx=12, pady=10, ) log_box.grid(row=1, column=0, sticky="nsew", padx=16, pady=(0,10)) def on_close(): shutdown_camera_stream() root.destroy() root.protocol("WM_DELETE_WINDOW", on_close) init_camera_stream() load_faces() set_unlock_state(False) log("System ready") update_preview() root.mainloop()