Parking Finder
The Parking Finder system is a smart parking solution designed to help drivers efficiently locate available parking spaces in real time.
Loading...
Searching...
No Matches
find_distance.py
Go to the documentation of this file.
1
44
45import traceback
46import atexit
47import cv2
48from ultralytics import YOLO
49from flask import Flask, jsonify
50from flask_cors import CORS
51from multiprocessing import Process, Lock, Event, Value
52from multiprocessing.shared_memory import SharedMemory
53import numpy as np
54import time
55from threading import Thread
56import torch
57import os
58import re
59import threading
60from datetime import datetime
61import argparse
62import signal
63import subprocess
64
# ── Env / torch tuning ────────────────────────────────────────────────────────
# Keep CUDA kernel launches asynchronous (setting this to '1' is a debug aid).
os.environ['CUDA_LAUNCH_BLOCKING'] = '0'
# De-prioritize the GStreamer backend for cv2.VideoCapture source selection.
os.environ['OPENCV_VIDEOIO_PRIORITY_GSTREAMER'] = '0'
torch.backends.cudnn.benchmark = True   # let cuDNN auto-tune conv algorithms for fixed input sizes
torch.backends.cudnn.enabled = True
# Limit torch/OpenCV internal threading; this app parallelizes via processes/threads itself.
torch.set_num_threads(1)
cv2.setNumThreads(1)
73
def set_jetson_clocks(enable: bool):
    """Lock or restore Jetson clock frequencies via the `jetson_clocks` tool.

    When enabling, the current clock configuration is stored first so it can
    be restored later. Best-effort: any failure (missing binary, no sudo
    rights, ...) is reported on stdout and never raised to the caller.
    """
    try:
        if enable:
            # Snapshot current clocks, then pin them to max.
            for cmd in (['sudo', 'jetson_clocks', '--store', JETSON_CLOCKS_CONF],
                        ['sudo', 'jetson_clocks']):
                subprocess.run(cmd, check=True)
            print("\033[1;91mWarning: [SYSTEM] JETSON CLOCKS LOCKED\033[0m")
        else:
            subprocess.run(['sudo', 'jetson_clocks', '--restore', JETSON_CLOCKS_CONF], check=True)
            print("\033[1;91mWarning: [SYSTEM] JETSON CLOCKS RESTORED\033[0m")
    except Exception as e:
        print(f"\033[1;91mWarning: [SYSTEM] JETSON CLOCKS FAILED: {e}\033[0m")
92
93
def parse_args():
    """Parse and return the Parking Finder command-line flags."""
    parser = argparse.ArgumentParser(description="Parking Finder")
    flag_specs = [
        (("-t", "-T", "--test"), "Enable display"),
        (("-R", "-r", "--record"), "Record"),
        (("-A", "-a", "--annotate"), "Records the annotated version (requires -r)"),
    ]
    for flags, help_text in flag_specs:
        parser.add_argument(*flags, action="store_true", help=help_text)
    return parser.parse_args()
107
108
# Path where `jetson_clocks --store` saves the clock state for later restore.
JETSON_CLOCKS_CONF = '/tmp/jetson_clocks_backup.conf'

# Camera capture resolution.
CAP_W = 640
CAP_H = 480

# Display / processing resolution (detection boxes and ROI masks live here).
DISP_W = 640
DISP_H = 480

# ndarray shapes (H, W, 3 channels) for the shared-memory frame buffers.
CAP_SHAPE = (CAP_H, CAP_W, 3)
DISP_SHAPE = (DISP_H, DISP_W, 3)

# Video sources: file paths for playback (capture_worker treats int as a live device).
# NOTE(review): 'right_side_ccam.mp4' (double 'c') looks like a typo — confirm actual filename.
SOURCES = ['./recordings/left_side_cam.mp4', './recordings/right_side_ccam.mp4']

# Target capture rate (also the fallback when a backend reports 0 FPS).
CAPTURE_FPS = 30

# Upper bound on detector invocations per second.
INFERENCEFPS = 15

# Square input side length (px) fed to the YOLO model.
IMGSZ = 128

# Detection confidence threshold.
CONF = 0.25

# TensorRT engine by default; alternatives kept for reference.
MODEL_PATH = "./models/yolo11n.engine"
# MODEL_PATH = "./yolo11n.pt"
# MODEL_PATH = "./models/yolo26x.engine"

# Inference batches are zero-padded up to this size (fixed-size engine batch).
MAX_BATCH = 3

# Class ids kept for detection — presumably COCO vehicle classes
# (2=car, 3=motorcycle, 5=bus, 7=truck); TODO confirm against the model's names.
CLASSES = [2, 3, 5, 7]

# Camera index -> human-readable name.
CAM_ORDER = ['Left', 'Right']

# Panel order in the stitched display window (indices into CAM_ORDER).
DISPLAY_ORDER = [0, 1]

# Camera indices that are run through inference.
INF_IDX = [0, 1]

# Cameras that have parking-spot ROIs / occupancy tracking.
SPOT_CAMS = {'Left', 'Right'}

# Minimum detection-box height (px) per camera; shorter boxes are ignored.
MIN_BOX_H = {'Left': 30, 'Right': 30}

# Fraction of a spot's ROI area a box must cover to claim the spot.
INTERSECT_ALLOWANCE = 0.15

# Auto-calibration interval — 0 presumably disables a periodic trigger;
# NOTE(review): no consumer visible in this chunk, confirm where it's used.
AUTO_CALIBRATE_INTERVAL = 0

# ROI polygons per camera. Start empty; filled by auto-calibration or by
# pasting output from the interactive ROI editor.
ROIS = {
    'Left': [
    ],
    'Right': [
    ],
}

# Per-camera boolean pixel masks rasterized from ROIS (built below).
SPOT_MASK: dict[str, list[np.ndarray]] = {}

# Seed parameters for ROI auto-generation / calibration, per camera.
CALIB_PARAMS = {
    'Left': {'vanishing_point': (381, 214), 'near_y': 406, 'far_y': 274,
             'left_x': 0, 'right_x': 640, 'n_spots': 6, 'n_rows': 1, 'fisheye_distortion': 0.0,
             'perspective_strength': 0.16, 'min_roi_height': 63},
    'Right': {'vanishing_point': (277, 223), 'near_y': 451, 'far_y': 337,
              'left_x': 0, 'right_x': 640, 'n_spots': 5, 'n_rows': 1, 'fisheye_distortion': 0.03,
              'perspective_strength': 0.32, 'min_roi_height': 70},
}

# Rasterize the initial ROI polygons into boolean masks (display space).
for cam, spots in ROIS.items():
    SPOT_MASK[cam] = []
    for spot in spots:
        m = np.zeros((DISP_H, DISP_W), dtype=np.uint8)
        cv2.fillPoly(m, [np.array(spot['poly'], dtype=np.int32)], 1)
        SPOT_MASK[cam].append(m.astype(bool))

# Overlay colors (BGR) and blend weight for the spot visualization.
OCCUPIED_COLOR = (0, 0, 220)
EMPTY_COLOR = (0, 220, 80)
SPOT_ALPHA = 0.25

# Guards spot_states and the _last_* detection caches across threads.
store_lock = threading.Lock()

# Current 'Car'/'Empty' state per spot, per camera (served via /detections).
spot_states: dict[str, list[str]] = {
    cam: ['Empty'] * len(spots) for cam, spots in ROIS.items()
}

# Shared-memory frame plumbing, one entry per camera (populated at startup,
# outside this chunk):
RAW_SHM_OBJS: list[SharedMemory] = []   # raw-frame shared memory segments

ANN_SHM_OBJS: list[SharedMemory] = []   # annotated-frame shared memory segments

RAW_BUFS: list[np.ndarray] = []         # ndarray views over the raw segments

ANN_BUFS: list[np.ndarray] = []         # ndarray views over the annotated segments

ANN_LOCKS: list = []                    # per-camera locks guarding ANN_BUFS

RAW_LOCKS: list = []                    # per-camera locks guarding RAW_BUFS

# Per-camera shared frame counters (multiprocessing Value) bumped by capture workers.
RAW_FRAME_ID: list = []

# Capture worker Process handles.
PROCESSES: list = []

# Presumably set to the parent process PID at startup — confirm against main().
_MAIN_PID = 0

# Global shutdown signal shared across processes/threads.
STOP_EVENT = Event()

# Latest display-space boxes per camera (for calibration snapshots).
_last_boxes: dict[str, list] = {cam: [] for cam in CAM_ORDER}

# Latest (box, score, class-name) triples per camera (for display overlay).
_last_ann_boxes: dict[str, list] = {cam: [] for cam in CAM_ORDER}

# Consecutive unclaimed frames per spot — debounce before flipping to 'Empty'.
_empty_streak: dict[str, list[int]] = {cam: [] for cam in CAM_ORDER}

# User-supplied spot boundaries blended into auto-calibration results.
_manual_bounds: dict[str, list[float]] = {cam: [] for cam in CAM_ORDER}

# Debug visualizations shown when the CALIB_DEBUG env var is set.
_debug_frames: dict[str, np.ndarray] = {}

# Frames a spot must stay unclaimed before it is marked 'Empty' again.
EMPTY_CONFIRM_FRAMES = 3
306
def shm_ndarray(shm: SharedMemory, shape: tuple) -> np.ndarray:
    """Return a zero-copy uint8 ndarray view of `shm`'s buffer with the given shape."""
    return np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
316
317
# Flask app serving the /detections endpoint; CORS enabled so browser
# frontends on other origins can poll it.
app = Flask(__name__)
CORS(app)
320
321
@app.route('/detections', methods=['GET'])
def get_detections():
    """Return current per-camera spot occupancy as JSON with a wall-clock timestamp."""
    # Snapshot the shared state under the lock; copy each list so later
    # mutations by the inference thread can't leak into the response.
    with store_lock:
        snapshot = {cam: list(states) for cam, states in spot_states.items()}
    payload = {
        "timestamp": datetime.now().strftime('%H:%M:%S'),
        'spots': snapshot,
    }
    return jsonify(payload)
347
348
def check_parking_spots(cam_name: str, disp_boxes: list):
    """
    Update spot_states[cam_name] from the latest detection boxes.

    A spot is claimed when a single box overlaps at least INTERSECT_ALLOWANCE
    (fraction) of the spot's ROI mask area; each box claims only the spot it
    overlaps most. A spot flips back to 'Empty' only after
    EMPTY_CONFIRM_FRAMES consecutive frames without a claim (debounce).

    Fixes vs. previous version:
    - docstring said "10%" but the threshold is INTERSECT_ALLOWANCE (0.15);
    - overlap is now counted on the spot-mask slice inside each box instead
      of allocating a full DISP_H x DISP_W boolean frame per box (same
      result, far less work/memory).
    """
    masks = SPOT_MASK.get(cam_name, [])
    min_h = MIN_BOX_H.get(cam_name, 0)
    if not masks:
        return

    # (Re)size the per-spot empty-frame counters if the ROI count changed.
    if len(_empty_streak[cam_name]) != len(masks):
        _empty_streak[cam_name] = [0] * len(masks)

    # Clip each sufficiently tall box to the display frame.
    boxes = []
    for x1, y1, x2, y2 in disp_boxes:
        if (y2 - y1) < min_h:
            continue  # too short: likely distant / partial detection
        x1, y1 = max(0, int(x1)), max(0, int(y1))
        x2, y2 = min(DISP_W, int(x2)), min(DISP_H, int(y2))
        if x2 > x1 and y2 > y1:
            boxes.append((x1, y1, x2, y2))

    # Each box claims at most one spot: the spot it overlaps the most,
    # provided the overlap clears the per-spot area threshold. Counting
    # nonzero pixels of the mask slice equals the old full-frame AND.
    spot_claimed_by = {}
    for x1, y1, x2, y2 in boxes:
        best_spot = -1
        best_overlap = 0
        for idx, mask in enumerate(masks):
            overlap = np.count_nonzero(mask[y1:y2, x1:x2])
            if overlap > best_overlap:
                best_overlap = overlap
                best_spot = idx
        if best_spot >= 0 and best_overlap >= masks[best_spot].sum() * INTERSECT_ALLOWANCE:
            if best_spot not in spot_claimed_by or best_overlap > spot_claimed_by[best_spot]:
                spot_claimed_by[best_spot] = best_overlap

    with store_lock:
        current_states = list(spot_states.get(cam_name, ['Empty'] * len(masks)))
        new_states = list(current_states)

        for idx in range(len(masks)):
            if idx in spot_claimed_by:
                new_states[idx] = 'Car'
                _empty_streak[cam_name][idx] = 0
            else:
                _empty_streak[cam_name][idx] += 1
                # Only downgrade to 'Empty' after the debounce window.
                if _empty_streak[cam_name][idx] >= EMPTY_CONFIRM_FRAMES:
                    new_states[idx] = 'Empty'
        spot_states[cam_name] = new_states
417
418
def annotate_frame(frame: np.ndarray, disp_boxes: list, scores: list, names: list, cam_name: str):
    """
    Draw detection boxes and score labels directly on `frame` (in place,
    no copy). Boxes shorter than the camera's MIN_BOX_H are skipped.
    """
    min_h = MIN_BOX_H.get(cam_name, 0)
    color = (0, 150, 150)
    for (bx1, by1, bx2, by2), score, name in zip(disp_boxes, scores, names):
        if (by2 - by1) < min_h:
            continue
        x1, y1, x2, y2 = int(bx1), int(by1), int(bx2), int(by2)

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Filled label background sized to the text, then the text itself.
        label = f'{name} {score:.2f}'
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX, 0.5, 1)
        cv2.rectangle(frame, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
        cv2.putText(frame, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_COMPLEX, 0.5, (15, 15, 15), 1, cv2.LINE_AA)
448
449
def draw_spot_overlays(frame: np.ndarray, cam_name: str) -> np.ndarray:
    """
    Return `frame` with the camera's ROI polygons alpha-blended on top,
    colored by occupancy state and labeled "id: state" at each centroid.
    """
    spots = ROIS.get(cam_name, [])
    states = spot_states.get(cam_name, ['Empty'] * len(spots))
    overlay = frame.copy()

    for spot, state in zip(spots, states):
        occupied = state == 'Car'
        pts = np.array(spot['poly'], dtype=np.int32)
        fill = OCCUPIED_COLOR if occupied else EMPTY_COLOR
        border = (255, 80, 80) if occupied else (80, 255, 130)

        cv2.fillPoly(overlay, [pts], fill)
        cv2.polylines(overlay, [pts], isClosed=True, color=border, thickness=2)

        # Label centered at the polygon's mean vertex.
        cx = int(np.mean([p[0] for p in spot['poly']]))
        cy = int(np.mean([p[1] for p in spot['poly']]))
        label = f"{spot['id']}: {state}"
        (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1)

        # Dark backdrop so the label stays readable over any fill color.
        cv2.rectangle(overlay,
                      (cx - tw // 2 - 3, cy - th - 6),
                      (cx + tw // 2 + 3, cy + 4),
                      (20, 20, 20), -1)
        cv2.putText(overlay, label, (cx - tw // 2, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.45, border, 1, cv2.LINE_AA)

    return cv2.addWeighted(overlay, SPOT_ALPHA, frame, 1 - SPOT_ALPHA, 0)
490
491
def generate_rois(
        cam_id: str,
        vanishing_point: tuple,
        near_y: int,
        far_y: int,
        left_x: int,
        right_x: int,
        n_spots: int = 5,
        n_rows: int = 1,
        fisheye_distortion: float = 0.0,
        perspective_strength: float = 1.0,
        name: str = None,
        spot_boundaries: list = None,
        **kwargs,
) -> list[dict]:
    """
    Build a grid of quadrilateral parking-spot ROIs in display coordinates.

    Spots span [left_x, right_x] horizontally and [near_y, far_y] vertically,
    with edges pulled toward `vanishing_point` by `perspective_strength`
    (0 = rectangular, 1 = full perspective) and optionally warped by a radial
    `fisheye_distortion` term around the frame center. `spot_boundaries`,
    when given with n_spots + 1 entries, overrides the uniform horizontal
    split: either plain x values at near_y (projected to each row's depth)
    or explicit (x_near, x_far) tuples.

    Returns a list of {'id': str, 'poly': [(x, y) x4]} dicts; ids are
    '<prefix><index>' where prefix is `name` or the camera's first letter.
    Extra keyword arguments (e.g. whole calib-param dicts) are ignored.
    """
    vpx, vpy = vanishing_point
    # BUGFIX: width/height were swapped (`W, H = DISP_H, DISP_W`), which put
    # the fisheye distortion center at (240, 320) instead of the true frame
    # center (320, 240). Only affects fisheye_distortion != 0.
    W, H = DISP_W, DISP_H
    cx, cy = W / 2, H / 2
    prefix = name or cam_id[0].upper()

    def horizontal_bounds(y):
        # Interpolate the left/right edges at height y toward the vanishing
        # point, then blend with the rectangular edges by perspective_strength.
        denominator = vpy - near_y
        if denominator == 0:
            denominator = far_y - near_y
        if denominator == 0:
            return left_x, right_x
        t = (y - near_y) / denominator
        t = max(0.0, min(t, 1.0))
        xl_perspective = left_x + (vpx - left_x) * t
        xr_perspective = right_x + (vpx - right_x) * t
        xl = left_x + (xl_perspective - left_x) * perspective_strength
        xr = right_x + (xr_perspective - right_x) * perspective_strength
        return xl, xr

    def scale_x_to_y(x_at_near, y):
        # Project an x given at near_y up to height y along its vanishing ray.
        denominator = vpy - near_y
        if denominator == 0:
            denominator = far_y - near_y
        if denominator == 0:
            return x_at_near
        t = (y - near_y) / denominator
        t = max(0.0, min(t, 1.0))
        return x_at_near + (vpx - x_at_near) * t * perspective_strength

    def fisheye(x, y):
        # Simple radial (barrel/pincushion) warp around the frame center.
        if fisheye_distortion == 0.0:
            return x, y
        nx, ny = (x - cx) / cx, (y - cy) / cy
        r2 = nx * nx + ny * ny
        f = 1 + fisheye_distortion * r2
        return cx + nx * f * cx, cy + ny * f * cy

    rois = []
    for row in range(n_rows):
        # Each row occupies an equal share of the near_y..far_y band.
        t_near = row / n_rows
        t_far = (row + 1) / n_rows
        row_near_y = near_y + (far_y - near_y) * t_near
        row_far_y = near_y + (far_y - near_y) * t_far

        for s in range(n_spots):
            if spot_boundaries and len(spot_boundaries) == n_spots + 1:
                if isinstance(spot_boundaries[0], tuple):
                    # Boundaries supplied per depth: (x_near, x_far) pairs.
                    x0_near, x0_far = spot_boundaries[s]
                    x1_near, x1_far = spot_boundaries[s + 1]
                else:
                    # Boundaries supplied at near_y; project to this row.
                    x0_near = scale_x_to_y(spot_boundaries[s], row_near_y)
                    x1_near = scale_x_to_y(spot_boundaries[s + 1], row_near_y)
                    x0_far = scale_x_to_y(spot_boundaries[s], row_far_y)
                    x1_far = scale_x_to_y(spot_boundaries[s + 1], row_far_y)
            else:
                # Uniform split of the perspective-corrected row edges.
                near_xl, near_xr = horizontal_bounds(row_near_y)
                far_xl, far_xr = horizontal_bounds(row_far_y)
                t0, t1 = s / n_spots, (s + 1) / n_spots
                x0_near = near_xl + (near_xr - near_xl) * t0
                x1_near = near_xl + (near_xr - near_xl) * t1
                x0_far = far_xl + (far_xr - far_xl) * t0
                x1_far = far_xl + (far_xr - far_xl) * t1

            pts_raw = [
                (x0_near, row_near_y),
                (x1_near, row_near_y),
                (x1_far, row_far_y),
                (x0_far, row_far_y),
            ]

            poly = [
                (int(round(fx)), int(round(fy)))
                for x, y in pts_raw
                for fx, fy in [fisheye(x, y)]
            ]
            rois.append({'id': f'{prefix}{row * n_spots + s + 1}', 'poly': poly})
    return rois
624
625
def auto_caliberate_rois(cam_name: str, frame: np.ndarray, current_params: dict, detections: list = None) -> list[dict]:
    """
    Re-estimate one camera's parking ROIs from the current frame.

    Pipeline: Canny edges in the lower 2/3 of the frame -> Hough line
    segments -> cluster near-horizontal lines into y-bands to pick a new
    far_y -> intersect oblique lines to refine the vanishing point ->
    detect per-spot boundaries -> regenerate polygons via generate_rois().

    Returns the new ROI list, or None when the frame lacks enough line
    structure (callers then keep the existing ROIs). Side effects: updates
    CALIB_PARAMS[cam_name] and, when the calibration window is open for
    this camera, syncs its trackbar positions.

    `detections` are display-space [x1, y1, x2, y2] boxes used to suppress
    edges caused by parked cars.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blur, 30, 100)

    # Only consider the lower two thirds of the frame.
    roi_mask = np.zeros_like(edges)
    roi_mask[frame.shape[0] // 3:, :] = 255
    edges = cv2.bitwise_and(edges, roi_mask)
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=60, minLineLength=80, maxLineGap=20)
    left_x = current_params['left_x']
    right_x = current_params['right_x']
    near_y = current_params['near_y']
    far_y_ref = current_params.get('far_y', 0)
    if lines is None or len(lines) < 4:
        print(f"\033[1;91mWarning: [Auto Caliberation] Not enough lines detected, keeping current ROIS\033[0m")
        return None

    # Build y-bands around the top/bottom edges of each in-ROI detection so
    # horizontal lines from car bumpers/roofs don't skew the row estimate.
    car_y_exclusions = []
    if detections:
        for x1, y1, x2, y2 in detections:
            car_cx = (x1 + x2) / 2
            car_cy = (y1 + y2) / 2
            if not (left_x <= car_cx <= right_x and far_y_ref <= car_cy <= near_y):
                continue
            pad = 15
            car_y_exclusions.append((int(y1) - pad, int(y1) + pad))
            car_y_exclusions.append((int(y2) - pad, int(y2) + pad))

    def is_near_car_edge(y):
        # True when y falls inside any excluded car-edge band.
        for low, hi in car_y_exclusions:
            if low <= y <= hi:
                return True
        return False

    # Keep near-horizontal segments (within 20 deg of horizontal) that are
    # not attributable to car edges.
    h_lines = []
    for l in lines:
        x1, y1, x2, y2 = l[0]
        angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
        if angle < 20 or angle > 160:
            mid_y = (y1 + y2) / 2
            if not is_near_car_edge(mid_y):
                h_lines.append((x1, y1, x2, y2))

    if len(h_lines) < 2:
        print(f"\033[1;91mWarning: [Auto Caliberation] Not enough horizontal lines, keeping current ROIS\033[0m")
        return None

    # Cluster the horizontal lines by mid-y; a gap > 20 px starts a new cluster.
    ys = np.array([(y1 + y2) / 2 for x1, y1, x2, y2 in h_lines])
    ys_sorted = np.sort(ys)

    gaps = np.diff(ys_sorted)
    big_gaps = np.where(gaps > 20)[0]

    cluster_centers = []
    prev = 0
    for g in big_gaps:
        cluster_centers.append(np.mean(ys_sorted[prev:g + 1]))
        prev = g + 1
    cluster_centers.append(np.mean(ys_sorted[prev:]))

    if len(cluster_centers) < 2:
        print(f"\033[1;91mWarning: [Auto Caliberation] Could not find row boundaries\033[0m")
        return None

    # far_y = the cluster closest below near_y, searched within 250 px.
    min_far_y = near_y - 250
    valid_clusters = [c for c in cluster_centers if min_far_y < c < near_y]
    if not valid_clusters:
        print(f"\033[1;91mWarning: [Auto Caliberation] no valid far_y found\033[0m")
        return None
    far_y = int(min(valid_clusters, key=lambda c: near_y - c))

    n_spots = current_params.get('n_spots', 5)
    n_rows = current_params.get('n_rows', 1)
    distortion = current_params.get('fisheye_distortion', 0.0)
    strength = current_params.get('perspective_strength', 1.0)

    MIN_ROI_HEIGHT = current_params.get('min_roi_height', 60)

    # Enforce a minimum ROI depth so the spot band can't collapse.
    if near_y - far_y < MIN_ROI_HEIGHT:
        far_y = near_y - MIN_ROI_HEIGHT
        print(f"\033[1;93mWarning: [Auto Caliberation] far_y too close to near_y, clamped to {far_y}\033[0m")

    # Clamp drift to +/- 50 px around the stored far_y baseline.
    far_y = max(0, far_y)
    original_far_y = CALIB_PARAMS[cam_name].get('far_y', far_y)
    max_drift = 50
    far_y = max(far_y, original_far_y - max_drift)
    far_y = min(far_y, original_far_y + max_drift)

    # Oblique segments (10-80 or 100-170 deg) vote for the vanishing point.
    vanish_lines = []
    for l in lines[:]:
        x1, y1, x2, y2 = l[0]
        angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
        if 10 < angle < 80 or 100 < angle < 170:
            vanish_lines.append((x1, y1, x2, y2))

    vanishing_point = current_params['vanishing_point']

    if len(vanish_lines) >= 2:
        # Pairwise intersections of the first 8 oblique lines; keep those
        # inside the frame and above the spot band (iy < far_y + 20).
        intersections = []
        for i in range(min(len(vanish_lines), 8)):
            for j in range(i + 1, min(len(vanish_lines), 8)):
                x1, y1, x2, y2 = vanish_lines[i]
                x3, y3, x4, y4 = vanish_lines[j]
                d = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
                if abs(d) < 1e-6:
                    continue  # (near-)parallel pair, no stable intersection
                t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / d
                ix = x1 + t * (x2 - x1)
                iy = y1 + t * (y2 - y1)
                if 0 < ix < DISP_W and 0 < iy < far_y + 20:
                    intersections.append((ix, iy))

        if intersections:
            # Median is robust against a few spurious intersections.
            vanishing_point = (
                int(np.median([p[0] for p in intersections])),
                int(np.median([p[1] for p in intersections])))

    CALIB_PARAMS[cam_name]['far_y'] = far_y
    CALIB_PARAMS[cam_name]['vanishing_point'] = vanishing_point

    # Keep the interactive calibration window's sliders in sync.
    if calib_window.open and calib_window.active_cam == cam_name:
        cv2.setTrackbarPos('Far Y', CalibWindow.WIN, far_y)
        cv2.setTrackbarPos('VP X', CalibWindow.WIN, vanishing_point[0])
        cv2.setTrackbarPos('VP Y', CalibWindow.WIN, vanishing_point[1])

    # Restrict boundary detection to detections centered in the refreshed band.
    roi_detections = [
        box for box in (detections or [])
        if left_x <= (box[0] + box[2]) / 2 <= right_x
        and far_y <= (box[1] + box[3]) / 2 <= near_y
    ]
    boundaries, detected_n_spots = detect_spot_boundaries(frame, near_y, far_y, left_x, right_x, n_spots,
                                                          detections=roi_detections, cam_name=cam_name)
    # Nudge auto-detected boundaries toward any user-specified ones.
    manual = _manual_bounds.get(cam_name, [])
    if manual:
        boundaries = blend_boundaries(boundaries, manual, manual_weight=0.15)

    return generate_rois(cam_id=cam_name, vanishing_point=vanishing_point, near_y=near_y, far_y=far_y, left_x=left_x,
                         right_x=right_x, n_spots=detected_n_spots, n_rows=n_rows, fisheye_distortion=distortion,
                         perspective_strength=strength, spot_boundaries=boundaries)
787
788
def rebuild_spot_masks(cam_name: str, new_rois: list[dict], reset_states: bool = False):
    """
    Install a freshly generated ROI list for `cam_name` and rebuild the
    boolean pixel masks used for overlap tests. States and empty-streak
    counters are reset when requested or when the spot count changed.
    """
    global ROIS, SPOT_MASK, spot_states

    # Rasterize each polygon outside the lock; only the swap is guarded.
    rebuilt = []
    for spot in new_rois:
        canvas = np.zeros((DISP_H, DISP_W), dtype=np.uint8)
        cv2.fillPoly(canvas, [np.array(spot['poly'], dtype=np.int32)], 1)
        rebuilt.append(canvas.astype(bool))

    with store_lock:
        ROIS[cam_name] = new_rois
        SPOT_MASK[cam_name] = rebuilt

        if reset_states or len(new_rois) != len(spot_states.get(cam_name, [])):
            spot_states[cam_name] = ['Empty'] * len(new_rois)
            _empty_streak[cam_name] = [0] * len(new_rois)
    # else:
    #     _empty_streak[cam_name] = [0] * len(new_rois)
    print(f"\033[1;91mWarning: [Auto Caliberation] {cam_name}: {len(new_rois)} spots rebuilt\033[0m")
817
818
833 """
834 Interactive polygon editor. Lives entirely in the display thread.
835
836 Controls (active when editor_mode=True):
837 Left click - add point
838 Right click - undo last point
839 Enter - finish polygon, print to terminal
840 Backspace - clear current polygon
841 Tab - cycle active camera
842 I - toggle editor on/off
843 """
844
    # Editor drawing palette (BGR tuples).
    POINT_COLOR = (0, 255, 255)  # cyan dots
    LINE_COLOR = (0, 200, 255)  # cyan-ish edges
    CLOSE_COLOR = (180, 0, 255)  # purple closing edge
    FILL_COLOR = (0, 180, 255)  # translucent fill
    CURSOR_COLOR = (200, 200, 200)  # crosshair
    TEXT_COLOR = (255, 255, 255)  # HUD / tooltip text
    SHADOW_COLOR = (0, 0, 0)  # text outline for contrast
852
    def __init__(self):
        """Editor starts disabled with no in-progress polygon."""
        self.editor_mode: bool = False
        self.points: list[tuple[int, int]] = []  # panel-local vertices of the polygon being drawn
        self.mouse_pos: tuple[int, int] = (0, 0)  # last cursor position in stitched-window coords
        # Which camera panel the user is editing (index into DISPLAY_ORDER)
        self.cam_panel_idx: int = 0
        # Running count per camera so auto-IDs don't repeat within a session
        self._spot_counters: dict[str, int] = {c: len(ROIS.get(c, [])) for c in CAM_ORDER}
861
862 # ── property helpers ──────────────────────────────────────────────────────
863
    @property
    def active_cam(self) -> str:
        """Name of the camera whose panel is currently being edited."""
        return CAM_ORDER[DISPLAY_ORDER[self.cam_panel_idx]]
867
868 def _next_id(self, cam: str) -> str:
869 self._spot_counters[cam] += 1
870 prefix = cam[0].upper()
871 return f"{prefix}{self._spot_counters[cam]}"
872
873 # ── mouse callback ────────────────────────────────────────────────────────
874
875 def mouse_cb(self, event, x, y, flags, param):
876 """OpenCV mouse callback (runs in display thread)."""
877 if not self.editor_mode:
878 return
879
880 # x is in the stitched window; clamp to the active panel column
881 panel_w = DISP_W
882 # left panel = DISPLAY_ORDER[0], right panel = DISPLAY_ORDER[1]
883 # we only care about the panel that matches self.cam_panel_idx
884 col_start = self.cam_panel_idx * panel_w
885 col_end = col_start + panel_w
886
887 # Track mouse regardless of which panel (for crosshair)
888 self.mouse_pos = (x, y)
889
890 if event == cv2.EVENT_LBUTTONDOWN:
891 if col_start <= x < col_end:
892 # Convert to panel-local coordinates
893 lx = x - col_start
894 self.points.append((lx, y))
895
896 elif event == cv2.EVENT_RBUTTONDOWN:
897 if self.points:
898 self.points.pop()
899
900 # ── keyboard handling ─────────────────────────────────────────────────────
901
902 def handle_key(self, key: int) -> bool:
903 """
904 Process a keypress.
905 Returns True if the key was consumed (caller should not process further).
906 Returns False if the key should be handled by the normal display loop.
907 """
908 if key == ord('i') or key == ord('I'):
909 self.editor_mode = not self.editor_mode
910 if self.editor_mode:
911 print(f"\n[ROI Editor] ON — camera: {self.active_cam}")
912 print(" Left-click to add points | Right-click to undo")
913 print(" Enter = finish polygon | Backspace = clear")
914 print(" Tab = switch camera | I = exit editor\n")
915 else:
916 print("[ROI Editor] OFF")
917 self.points.clear()
918 return True
919
920 if not self.editor_mode:
921 return False
922
923 # Enter — finish polygon
924 if key in (13, 10): # CR or LF
925 self._finish_polygon()
926 return True
927
928 # Backspace — clear
929 if key == 8:
930 self.points.clear()
931 print("[ROI Editor] Points cleared")
932 return True
933
934 # Tab — cycle camera
935 if key == 9:
936 self.cam_panel_idx = (self.cam_panel_idx + 1) % len(DISPLAY_ORDER)
937 self.points.clear()
938 print(f"[ROI Editor] Active camera: {self.active_cam} (points cleared)")
939 return True
940
941 return False
942
943 def _finish_polygon(self):
944 if len(self.points) < 3:
945 print(f"[ROI Editor] Need at least 3 points (have {len(self.points)})")
946 return
947
948 cam = self.active_cam
949 sid = self._next_id(cam)
950 poly = list(self.points)
951
952 print(f"\n# ── ROI output ──────────────────────────────────────────")
953 print(f"# Camera : {cam} ID : {sid}")
954 print(f"{{'id': '{sid}', 'poly': {poly}}},")
955 print(f"# ─────────────────────────────────────────────────────────\n")
956
957 self.points.clear()
958
    def draw_overlay(self, canvas: np.ndarray):
        """Draw the editor UI (panel highlight, crosshair, in-progress polygon,
        HUD, cursor tooltip) onto the stitched `canvas` in place. No-op when
        the editor is off."""
        if not self.editor_mode:
            return

        num_panels = len(DISPLAY_ORDER)  # NOTE(review): currently unused
        panel_w = DISP_W
        col_start = self.cam_panel_idx * panel_w

        # ── translucent panel highlight ───────────────────────────────────────
        overlay = canvas.copy()
        cv2.rectangle(overlay, (col_start, 0), (col_start + panel_w, DISP_H),
                      (0, 60, 100), -1)
        cv2.addWeighted(overlay, 0.15, canvas, 0.85, 0, canvas)

        # Full-window crosshair at the cursor.
        mx, my = self.mouse_pos
        cv2.line(canvas, (mx, 0), (mx, DISP_H), self.CURSOR_COLOR, 1, cv2.LINE_AA)
        cv2.line(canvas, (0, my), (canvas.shape[1], my), self.CURSOR_COLOR, 1, cv2.LINE_AA)

        # Points are stored panel-local; shift into stitched-canvas coords.
        canvas_pts = [(col_start + px, py) for (px, py) in self.points]

        # Translucent fill once the polygon is closable (>= 3 vertices).
        if len(canvas_pts) >= 3:
            fill_overlay = canvas.copy()
            cv2.fillPoly(fill_overlay,
                         [np.array(canvas_pts, dtype=np.int32)],
                         self.FILL_COLOR)
            cv2.addWeighted(fill_overlay, 0.25, canvas, 0.75, 0, canvas)

        # Solid edges between consecutive vertices.
        for i in range(1, len(canvas_pts)):
            cv2.line(canvas, canvas_pts[i - 1], canvas_pts[i],
                     self.LINE_COLOR, 2, cv2.LINE_AA)

        # Dashed "closing" edge from the last vertex back to the first.
        if len(canvas_pts) >= 3:
            p0, pn = canvas_pts[0], canvas_pts[-1]

            dx = p0[0] - pn[0];
            dy = p0[1] - pn[1]
            dist = max(1, int((dx ** 2 + dy ** 2) ** 0.5))
            segs = max(4, dist // 10)
            for s in range(segs):
                if s % 2 == 0:  # draw every other segment for the dash effect
                    t0 = s / segs
                    t1 = (s + 1) / segs
                    pt0 = (int(pn[0] + dx * t0), int(pn[1] + dy * t0))
                    pt1 = (int(pn[0] + dx * t1), int(pn[1] + dy * t1))
                    cv2.line(canvas, pt0, pt1, self.CLOSE_COLOR, 2, cv2.LINE_AA)

        # Numbered vertex markers (shadowed text for contrast).
        for idx, (cx, cy) in enumerate(canvas_pts):
            cv2.circle(canvas, (cx, cy), 5, self.POINT_COLOR, -1, cv2.LINE_AA)
            cv2.circle(canvas, (cx, cy), 5, (0, 0, 0), 1, cv2.LINE_AA)
            lbl = str(idx)
            (tw, th), _ = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
            cv2.putText(canvas, lbl, (cx + 7, cy + 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.SHADOW_COLOR, 2, cv2.LINE_AA)
            cv2.putText(canvas, lbl, (cx + 7, cy + 4),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.POINT_COLOR, 1, cv2.LINE_AA)

        # Bottom-left HUD with status and key help, anchored to the panel.
        hud_lines = [
            f"ROI EDITOR | cam: {self.active_cam} | pts: {len(self.points)}",
            "LClick=add RClick=undo Enter=done Bksp=clear Tab=cam I=exit",
        ]
        for li, line in enumerate(hud_lines):
            yy = DISP_H - 12 - (len(hud_lines) - 1 - li) * 18
            (tw, th), _ = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1)
            cv2.rectangle(canvas,
                          (col_start + 4, yy - th - 4),
                          (col_start + tw + 10, yy + 4),
                          (10, 10, 10), -1)
            cv2.putText(canvas, line,
                        (col_start + 7, yy),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.TEXT_COLOR, 1, cv2.LINE_AA)

        # Panel-local cursor-coordinate tooltip when inside the active panel.
        if col_start <= mx < col_start + panel_w:
            lx = mx - col_start
            tip = f"({lx}, {my})"
            (tw, th), _ = cv2.getTextSize(tip, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
            tx = min(mx + 10, canvas.shape[1] - tw - 6)
            ty = max(my - 10, th + 4)
            cv2.rectangle(canvas, (tx - 2, ty - th - 2), (tx + tw + 2, ty + 2),
                          (20, 20, 20), -1)
            cv2.putText(canvas, tip, (tx, ty),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.TEXT_COLOR, 1, cv2.LINE_AA)
1040
1041
# Module-level editor instance shared by the display loop's mouse/key handlers.
roi_editor = ROIEditor()
1043
1044
def capture_worker(cam_id: int, src, raw_shm_name: str, raw_lock, raw_frame_id, frame_ready_event, stop_event):
    """
    Per-camera capture process.

    Reads frames from `src` — an int device index (live camera) or a video
    file path (looped playback) — into the shared-memory buffer named
    `raw_shm_name`, bumping `raw_frame_id` and signalling
    `frame_ready_event` for every new frame. Runs until `stop_event` is set.

    SIGINT is ignored here so shutdown is coordinated solely by the parent
    via `stop_event`.

    Fixes vs. previous version: failed VideoCapture handles are released
    between retry attempts (no leak), the camera/shared memory are released
    in a `finally` even if the loop raises, and the misleading `src: int`
    annotation is dropped (SOURCES holds file paths).
    """
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    os.environ['OPENBLAS_NUM_THREADS'] = '2'
    os.environ['MALLOC_TRIM_THRESHOLD_'] = '100000'

    # Retry opening the source a few times, releasing each failed handle.
    # cap = cv2.VideoCapture(src, cv2.CAP_V4L2)
    cap = None
    for _ in range(5):
        cap = cv2.VideoCapture(src)
        if cap.isOpened():
            break
        cap.release()
        time.sleep(0.2)

    if cap is None or not cap.isOpened():
        print(f"[CAM {cam_id}] Failed to open source {src}")
        return

    is_live = isinstance(src, int)  # int => live device, otherwise file playback

    if is_live:
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'YUYV'))
        cap.set(cv2.CAP_PROP_FPS, CAPTURE_FPS)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # only keep most recent frame

    cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAP_W)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAP_H)

    actual_fps = cap.get(cv2.CAP_PROP_FPS)
    if actual_fps <= 0:
        actual_fps = CAPTURE_FPS  # some backends report 0; fall back to config

    frame_delay = 1.0 / actual_fps
    print(f"[cam {cam_id}] Video FPS detected: {actual_fps}, frame_delay: {frame_delay:.4f}s")

    print(f'[cam {cam_id}] {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x'
          f'{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} '
          f'@ {cap.get(cv2.CAP_PROP_FPS):.0f} FPS')

    shm = SharedMemory(name=raw_shm_name)
    buf = shm_ndarray(shm, CAP_SHAPE)

    local_id = 0
    last_frame_time = time.perf_counter()

    try:
        while not stop_event.is_set():
            # Pace reads to the source FPS.
            now = time.perf_counter()
            elapsed = now - last_frame_time
            sleep_time = frame_delay - elapsed
            if sleep_time > 0.001:
                time.sleep(sleep_time)
            last_frame_time = time.perf_counter()

            ok, frame = cap.read()
            if not ok:
                if not is_live:
                    # File playback: rewind and loop forever.
                    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    last_frame_time = time.perf_counter()
                    continue
                print(f"[cam {cam_id}] read failed")
                break

            if frame.shape[:2] != (CAP_H, CAP_W):
                frame = cv2.resize(frame, (CAP_W, CAP_H), interpolation=cv2.INTER_NEAREST)

            # Publish frame and its id atomically with respect to readers.
            with raw_lock:
                np.copyto(buf, frame)

                local_id += 1
                raw_frame_id.value = local_id

            frame_ready_event.set()  # Wake inference thread immediately to work on the frame

            if is_live:
                # Resync the pacing clock if we fell badly behind.
                drift = time.perf_counter() - last_frame_time
                if drift > frame_delay * 2:
                    last_frame_time = time.perf_counter()
    finally:
        # Always release the camera and detach from shared memory.
        cap.release()
        shm.close()
1145
def inference_loop(need_annotations: bool, frame_ready_event, stop_event):
    """
    Detector thread: batches the newest frame from each camera through YOLO
    at up to INFERENCEFPS, updates spot occupancy and the shared detection
    caches, and — when `need_annotations` — writes annotated frames into
    ANN_BUFS. Runs until `stop_event` is set.
    """
    # Pin this thread to core 4 where supported; best effort (Linux only).
    try:
        os.sched_setaffinity(0, {4})
    except Exception:
        pass

    model = YOLO(MODEL_PATH, task='detect')
    if not MODEL_PATH.endswith('.engine'):
        model.fuse()  # conv+bn fusion applies only to non-TensorRT models

    # Warm up CUDA/TensorRT with dummy batches so the first real frame
    # doesn't pay initialization latency.
    dummy = np.zeros((IMGSZ, IMGSZ, 3), dtype=np.uint8)
    # model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF,device='cuda', verbose=False, half=True)
    for _ in range(5):
        model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF, device='cuda', verbose=False, half=True)
    # model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF, verbose=False, half=True)

    print('[Inference] warm-up done')

    # Scale factors from model input space back to display space.
    sx = DISP_W / IMGSZ
    sy = DISP_H / IMGSZ

    last_seen = [0] * 2  # last processed frame id per camera
    frame_time = 1.0 / INFERENCEFPS
    last_time = 0.0

    while not stop_event.is_set():
        now = time.time()

        # Rate limit: park on the frame-ready event instead of spinning.
        if now - last_time < frame_time:
            frame_ready_event.wait(timeout=0.05)
            frame_ready_event.clear()
            continue

        frames = []
        raw_full_list = []
        valid_indices = []

        # Grab each camera's newest frame, skipping unchanged/unset ones.
        for i in INF_IDX:
            fid = RAW_FRAME_ID[i].value

            if fid == 0 or fid == last_seen[i]:
                continue

            with RAW_LOCKS[i]:
                raw_full = RAW_BUFS[i].copy()

            raw_full_list.append(raw_full)

            frames.append(cv2.resize(raw_full, (IMGSZ, IMGSZ), interpolation=cv2.INTER_NEAREST))
            valid_indices.append(i)

        if not frames:
            time.sleep(0.005)
            continue

        # Pad with black frames so the (fixed-size) engine batch is full.
        # padded = frames
        padded = list(frames)
        while len(padded) < MAX_BATCH:
            padded.append(np.zeros((IMGSZ, IMGSZ, 3), dtype=np.uint8))

        try:
            results = model.predict(padded, imgsz=IMGSZ, conf=CONF, device='cuda', classes=CLASSES, verbose=False,
                                    half=True)

            # results = model.predict(padded, imgsz=IMGSZ, conf=CONF, classes=CLASSES, verbose=False,
            # half=True)

            for result, i, raw_full in zip(results, valid_indices, raw_full_list):
                cam_name = CAM_ORDER[i]

                # Boxes scaled from model space back to display coordinates.
                disp_boxes = [
                    [x1 * sx, y1 * sy, x2 * sx, y2 * sy]
                    for x1, y1, x2, y2 in result.boxes.xyxy.tolist()
                ]

                if cam_name in SPOT_CAMS:
                    check_parking_spots(cam_name, disp_boxes)
                with store_lock:
                    _last_boxes[cam_name] = list(disp_boxes)

                scores = result.boxes.conf.tolist()
                names = [model.names[int(c)] for c in result.boxes.cls]
                with store_lock:
                    _last_ann_boxes[cam_name] = list(zip(disp_boxes, scores, names))

                if need_annotations:
                    # Draw on a copy so the raw buffer stays pristine.
                    disp = raw_full.copy()
                    annotate_frame(disp, disp_boxes, scores, names, cam_name)

                    if cam_name in SPOT_CAMS:
                        disp = draw_spot_overlays(disp, cam_name)

                    with ANN_LOCKS[i]:
                        np.copyto(ANN_BUFS[i], disp)

                # NOTE(review): re-reads the live counter rather than the id
                # of the copied frame — a frame landing mid-batch is skipped.
                last_seen[i] = RAW_FRAME_ID[i].value

        except Exception:
            traceback.print_exc()
            time.sleep(0.1)

        last_time = now
1271
def display_loop(stop_event):
    """
    Render every camera's panel side by side in one OpenCV window (~30 FPS).

    Key bindings:
      I  toggle the interactive ROI editor (delegated to roi_editor)
      U  force an auto-calibration pass on every camera
      C  toggle the calibration trackbar window for the active camera
      Q  stop: sets stop_event and exits

    Blocks until stop_event is set, then closes all OpenCV windows.
    """

    try:
        # Best-effort: pin this thread to core 5 so it doesn't contend with
        # the capture/inference cores (not available on all platforms).
        os.sched_setaffinity(0, {5})
    except Exception:
        pass

    win = 'Parking Finder'
    cv2.namedWindow(win, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(win, DISP_W * len(DISPLAY_ORDER), DISP_H)
    cv2.setMouseCallback(win, roi_editor.mouse_cb)

    frame_time = 1.0 / 30
    last_time = 0.0

    while not stop_event.is_set():
        now = time.time()
        if now - last_time < frame_time:
            time.sleep(0.005)
            continue

        panels = []
        for i in DISPLAY_ORDER:
            cam_name = CAM_ORDER[i]
            with RAW_LOCKS[i]:
                panel = RAW_BUFS[i].copy()

            if panel.shape[:2] != (DISP_H, DISP_W):
                panel = cv2.resize(panel, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)

            if cam_name in SPOT_CAMS:
                panel = draw_spot_overlays(panel, cam_name)

            with store_lock:
                ann_boxes = list(_last_ann_boxes.get(cam_name, []))

            min_h = MIN_BOX_H.get(cam_name, 0)
            for box, score, name in ann_boxes:
                x1, y1, x2, y2 = box
                if (y2 - y1) < min_h:
                    continue  # suppress boxes below the per-cam height floor
                x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
                color = (0, 150, 150)
                cv2.rectangle(panel, (x1, y1), (x2, y2), color, 2)
                label = f'{name} {score:.2f}'
                (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX, 0.5, 1)
                cv2.rectangle(panel, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
                cv2.putText(panel, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_COMPLEX, 0.5, (15, 15, 15), 1, cv2.LINE_AA)

            cv2.putText(panel, cam_name, (10, 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 230, 200), 2, cv2.LINE_AA)
            if cam_name in SPOT_CAMS:
                with store_lock:
                    states = spot_states.get(cam_name, [])
                occ = sum(1 for s in states if s == 'Car')
                cv2.putText(panel, f'{occ}/{len(states)} occupied', (10, 56), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                            (0, 230, 200), 1, cv2.LINE_AA)

            panels.append(panel)

        # Stitch panels side by side
        canvas = np.hstack(panels)

        roi_editor.draw_overlay(canvas)

        cv2.imshow(win, canvas)

        # Optional debug mosaic with boundary-detection internals.
        # NOTE(review): _debug_frames is read here without store_lock while
        # detect_spot_boundaries writes it from other threads; looks benign
        # under the GIL, but confirm.
        if os.environ.get('CALIB_DEBUG') and _debug_frames:
            debug_panels = []
            for cam in CAM_ORDER:
                p = _debug_frames.get(cam)
                if p is None:
                    p = np.zeros((DISP_H, DISP_W, 3), dtype=np.uint8)
                if p.shape[:2] != (DISP_H, DISP_W):
                    p = cv2.resize(p, (DISP_W, DISP_H))
                cv2.putText(p, f'DEBUG: {cam}', (10, 25),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
                debug_panels.append(p)
            cv2.imshow('boundary_debug', np.hstack(debug_panels))

        key = cv2.waitKey(1) & 0xFF

        if roi_editor.handle_key(key):
            continue

        if key == ord('u') or key == ord('U'):
            for active_idx in range(len(SOURCES)):
                cam_name = CAM_ORDER[active_idx]
                with RAW_LOCKS[active_idx]:
                    snap = RAW_BUFS[active_idx].copy()
                snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
                with store_lock:
                    boxes = _last_boxes.get(cam_name, [])
                if len(boxes) > 2:
                    # BUG FIX: the warning was missing its ANSI reset code,
                    # which left the terminal colored for all later output.
                    print(f"\033[1;93mWarning: [Auto Caliberation] {cam_name} has {len(boxes)} detections\033[0m")
                new_rois = auto_caliberate_rois(cam_name, snap_disp, CALIB_PARAMS[cam_name], boxes)
                if new_rois:
                    rebuild_spot_masks(cam_name, new_rois)
            continue

        if key == ord('c') or key == ord('C'):
            calib_window.toggle(roi_editor.active_cam)
            continue
        # BUG FIX: accept 'Q' as well, consistent with the U/C handlers above.
        if key == ord('q') or key == ord('Q'):
            stop_event.set()
            break

        calib_window.tick()
        last_time = now

    calib_window.close()
    cv2.destroyAllWindows()
1405
1406# @brief Blend detected spot boundaries with manually set ones from the CalibWindow.
1407#
1408# Performs a weighted average between auto-detected and manually specified boundary
1409# X positions. The manual weight is kept intentionally small (default 0.15) so the
1410# auto-detection result dominates while the manual input provides a gentle stabilising
1411# nudge. The perspective angle offset (far_x - near_x) of the detected boundary is
1412# preserved through the blend.
1413#
1414# Returns detected unchanged if manual is empty or the lists differ in length.
1415#
1416# @param detected List of detected boundary positions. Each entry is either
1417# a float (uniform) or a (near_x, far_x) tuple.
1418# @param manual List of manually specified boundary X positions (floats).
1419# @param manual_weight Blend weight for the manual values (0.0–1.0, default 0.15).
1420# @return Blended boundary list as (near_x, far_x) tuples.
1421def blend_boundaries(detected: list[float], manual: list[float], manual_weight: float = 0.15) -> list[float]:
1422 if not manual or len(detected) != len(manual):
1423 return detected
1424 result = []
1425 for d, m in zip(detected, manual):
1426 dn = d[0] if isinstance(d, tuple) else d
1427 df = d[1] if isinstance(d, tuple) else d
1428 mn = m[0] if isinstance(m, tuple) else m
1429
1430 angle_offset = df - dn
1431 blended_near = dn * (1 - manual_weight) + mn * manual_weight
1432 result.append((blended_near, blended_near + angle_offset))
1433 return result
1434
1435
1450 """
1451 Separate OpenCV window with trackbars for live CALIB_PARAMS tuning.
1452 Opens when C is pressed, closes when C is pressed again.
1453 Automatically regenerates ROIs on any slider change.
1454 """
1455 WIN = 'Calibration'
1456
1457 def __init__(self):
1458 self.open = False
1459 self.active_cam = 'Right' # tracks which cam's params are shown
1460 self._last_vals = {}
1461 self._pending_update = False
1462 self.suppress_auto = False
1463
1464 def _cam_idx(self):
1465 return CAM_ORDER.index(self.active_cam)
1466
1467 def toggle(self, cam_name: str):
1468 self.active_cam = cam_name
1469 if self.open:
1470 self.close()
1471 else:
1472 self._build()
1473
1474 def _build(self):
1475 p = CALIB_PARAMS[self.active_cam]
1476 cv2.namedWindow(self.WIN, cv2.WINDOW_NORMAL)
1477 cv2.resizeWindow(self.WIN, 500, 280)
1478
1479 # Each trackbar: (name, min, max, initial)
1480 # Multiply floats by 100 to use int trackbars
1481 cv2.createTrackbar('VP X', self.WIN, p['vanishing_point'][0], DISP_W, lambda v: self._on_change())
1482 cv2.createTrackbar('VP Y', self.WIN, p['vanishing_point'][1], DISP_H, lambda v: self._on_change())
1483 cv2.createTrackbar('Near Y', self.WIN, p['near_y'], DISP_H, lambda v: self._on_change())
1484 cv2.createTrackbar('Far Y', self.WIN, p['far_y'], DISP_H, lambda v: self._on_change())
1485 cv2.createTrackbar('Left X', self.WIN, p['left_x'], DISP_W, lambda v: self._on_change())
1486 cv2.createTrackbar('Right X', self.WIN, p['right_x'], DISP_W, lambda v: self._on_change())
1487 cv2.createTrackbar('Spots', self.WIN, p['n_spots'], 20, lambda v: self._on_change())
1488 cv2.createTrackbar('Rows', self.WIN, p['n_rows'], 6, lambda v: self._on_change())
1489 # fisheye_distortion: range -0.30 to +0.30, stored as int -30..30 (divide by 100)
1490 fisheye_int = int(p['fisheye_distortion'] * 100) + 30 # shift so 0 maps to 30
1491 cv2.createTrackbar('Fisheye x100', self.WIN, fisheye_int, 60, lambda v: self._on_change())
1492 strength_int = int(p.get('perspective_strength', 1.0) * 100)
1493 cv2.createTrackbar('Persp x100', self.WIN, strength_int, 100, lambda v: self._on_change())
1494 cv2.createTrackbar('Min Height', self.WIN, p.get('min_roi_height', 60), DISP_H, lambda v: self._on_change())
1495 self.suppress_auto = True
1496 self.open = True
1497 print(f"[Calib] Window open for {self.active_cam}. Drag sliders to tune, C to close.")
1498
1499 def _read(self) -> dict:
1500 fisheye_int = cv2.getTrackbarPos('Fisheye x100', self.WIN)
1501 return {
1502 'vanishing_point': (
1503 cv2.getTrackbarPos('VP X', self.WIN),
1504 cv2.getTrackbarPos('VP Y', self.WIN),
1505 ),
1506 'near_y': cv2.getTrackbarPos('Near Y', self.WIN),
1507 'far_y': cv2.getTrackbarPos('Far Y', self.WIN),
1508 'left_x': cv2.getTrackbarPos('Left X', self.WIN),
1509 'right_x': cv2.getTrackbarPos('Right X', self.WIN),
1510 'n_spots': max(1, cv2.getTrackbarPos('Spots', self.WIN)),
1511 'n_rows': max(1, cv2.getTrackbarPos('Rows', self.WIN)),
1512 'fisheye_distortion': (fisheye_int - 30) / 100.0,
1513 'perspective_strength': cv2.getTrackbarPos('Persp x100', self.WIN) / 100.0,
1514 'min_roi_height': max(20, cv2.getTrackbarPos('Min Height', self.WIN)),
1515 }
1516
1517 def _on_change(self):
1518 if self.open:
1519 self._pending_update = True
1520
    def tick(self):
        """
        Call once per display loop iteration to keep the window alive.

        When a slider has moved since the last call, re-reads all trackbars,
        sanity-checks the geometry, commits the params to CALIB_PARAMS, and
        regenerates the active camera's spot ROIs. Errors are printed rather
        than raised so the display loop keeps running.
        """
        if not self.open:
            return

        # Nothing changed since the last tick: skip the expensive rebuild.
        if not self._pending_update:
            return
        self._pending_update = False

        try:
            params = self._read()
            near_y = params['near_y']
            far_y = params['far_y']
            vpy = params['vanishing_point'][1]

            # Reject degenerate geometry: zero-height band, or a vanishing
            # point at/below the near line.
            if near_y == far_y or vpy >= near_y:
                return

            CALIB_PARAMS[self.active_cam] = params

            # Snapshot the latest raw frame under its lock, then resize to
            # display resolution for boundary detection.
            cam_idx = CAM_ORDER.index(self.active_cam)
            with RAW_LOCKS[cam_idx]:
                snap = RAW_BUFS[cam_idx].copy()
            snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
            with store_lock:
                boxes = _last_boxes.get(self.active_cam, [])
            boundaries, detected_n = detect_spot_boundaries(
                snap_disp, near_y, far_y,
                params['left_x'], params['right_x'], params['n_spots'],
                detections=boxes, cam_name=self.active_cam
            )
            # Forward everything except min_roi_height (n_spots is passed
            # separately as detected_n below).
            roi_params = {k: v for k, v in params.items() if k not in ('min_roi_height',)}
            new_rois = generate_rois(
                cam_id=self.active_cam,
                n_spots=detected_n,
                spot_boundaries=boundaries,
                **{k: v for k, v in roi_params.items() if k != 'n_spots'},
            )
            if new_rois:
                rebuild_spot_masks(self.active_cam, new_rois)
            # Remember the uniform manual boundary grid for this cam so later
            # auto-calibration can blend toward it.
            n = params['n_spots']
            manual = [params['left_x'] + (params['right_x'] - params['left_x']) * i / n for i in range(n + 1)]
            _manual_bounds[self.active_cam] = manual
        except Exception as e:
            print(f"[Calib] tick error: {e}")
1566
1567 def close(self):
1568 if self.open:
1569 try:
1570 cv2.destroyWindow(self.WIN)
1571 except Exception:
1572 pass
1573 self.open = False
1574 self.suppress_auto = False
1575 print(f"[Calib] Window closed. Final params for {self.active_cam}:")
1576 print(f" CALIB_PARAMS['{self.active_cam}'] = {CALIB_PARAMS[self.active_cam]}")
1577
1578
# Module-level singleton used by display_loop (C key) and auto_calibrate_loop.
calib_window = CalibWindow()
1580
1581
def record_loop(stop_event, use_anns=False):
    """
    Write camera frames to per-camera .mp4 files under recordings/.

    Runs until stop_event is set, then releases all writers.

    Parameters:
        stop_event: Event that terminates the loop and flushes the writers.
        use_anns:   When True, record the annotated (DISP-sized) buffers;
                    otherwise record the raw (CAP-sized) capture buffers.
    """
    try:
        # Best-effort CPU pinning; shares core 5 with the display thread.
        os.sched_setaffinity(0, {5})
    except Exception:
        pass

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    os.makedirs('recordings', exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writers = []
    suffix = 'Annotated' if use_anns else 'Raw'
    # BUG FIX: annotated buffers are DISP_W x DISP_H while raw buffers are
    # CAP_W x CAP_H. VideoWriter silently drops frames whose size does not
    # match, so the writer size must follow the selected source.
    frame_size = (DISP_W, DISP_H) if use_anns else (CAP_W, CAP_H)
    for cam_name in CAM_ORDER:
        # BUG FIX: `suffix` was computed but never used; include it in the
        # filename so annotated and raw recordings are distinguishable.
        path = f"recordings/{timestamp}_{cam_name}_{suffix}.mp4"
        writers.append(cv2.VideoWriter(path, fourcc, CAPTURE_FPS, frame_size))
        print(f"[Record] {cam_name} -> {path}")

    frame_time = 1.0 / CAPTURE_FPS
    last_time = 0.0

    if use_anns:
        print("\033[1;93mWarning: Recording annotated version\033[0m")
    while not stop_event.is_set():
        now = time.time()
        if now - last_time < frame_time:
            time.sleep(0.003)
            continue

        for i, writer in enumerate(writers):
            if use_anns:
                with ANN_LOCKS[i]:
                    writer.write(ANN_BUFS[i])
            else:
                with RAW_LOCKS[i]:
                    writer.write(RAW_BUFS[i])
        last_time = now

    for w in writers:
        w.release()
    print('[record] files saved')
1639
1640
def auto_calibrate_loop(stop_event):
    """
    Background thread: periodically re-run auto ROI calibration per spot cam.

    Sleeps AUTO_CALIBRATE_INTERVAL between passes, skips the camera currently
    being tuned in the CalibWindow, and skips frames with too many detections.
    """
    try:
        # Best-effort pin to core 3.
        os.sched_setaffinity(0, {3})
    except Exception:
        pass

    print('[Auto Calibrate] Background Calibration thread started')
    while not stop_event.is_set():
        time.sleep(AUTO_CALIBRATE_INTERVAL)

        if stop_event.is_set():
            break

        for idx, cam_name in enumerate(CAM_ORDER):
            if cam_name not in SPOT_CAMS:
                continue
            # Don't fight the user while they tune this cam manually.
            if calib_window.suppress_auto and calib_window.active_cam == cam_name:
                continue
            try:
                with RAW_LOCKS[idx]:
                    frame = RAW_BUFS[idx].copy()

                frame_disp = cv2.resize(frame, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)

                with store_lock:
                    dets = _last_boxes.get(cam_name, [])

                if len(dets) > 3:
                    print(f'[Auto Calibrate] {cam_name}: skipping --- too many detections ({len(dets)})')
                    continue

                rois = auto_caliberate_rois(cam_name, frame_disp, CALIB_PARAMS[cam_name], dets)
                if rois:
                    rebuild_spot_masks(cam_name, rois)
                    print(f'[Auto Calibrate] {cam_name}: updated')
            except Exception as e:
                print(f'[Auto Calibrate] {cam_name} error: {e}')
1693
1694
def merge_narrow_spots(boundaries: list[float], min_width: float = 80.0) -> list[float]:
    """
    Drop interior boundaries that would create a spot narrower than min_width.

    The outermost boundaries are always preserved: if the final candidate was
    skipped, the last kept value is overwritten with the original right edge.
    The very first interior boundary always survives (see the len guard).
    """
    if len(boundaries) < 2:
        return boundaries

    kept = [boundaries[0]]
    for candidate in boundaries[1:]:
        too_narrow = (candidate - kept[-1]) < min_width
        # len(kept) > 1: never merge away the first interior boundary.
        if too_narrow and len(kept) > 1:
            continue
        kept.append(candidate)

    # Ensure the right edge is preserved even when the last candidate was skipped.
    if kept[-1] != boundaries[-1]:
        kept[-1] = boundaries[-1]

    return kept
1720
1721
def detect_spot_boundaries(frame: np.ndarray, near_y: int, far_y: int, left_x: int, right_x: int, n_spots: int,
                           detections: list = None, cam_name: str = None) -> tuple[list[float], int]:
    """
    Locate parking-spot divider lines between near_y and far_y.

    Thresholds bright (painted-line) pixels, runs Canny + probabilistic Hough,
    keeps near-vertical segments, projects each onto the near/far rows,
    clusters nearby hits, and pads with left_x/right_x. Falls back to
    inferring boundaries from car detections (or a uniform grid) when line
    detection fails.

    @param frame      BGR frame (display resolution expected by callers).
    @param near_y     Row (px) of the near edge of the parking band.
    @param far_y      Row (px) of the far edge of the parking band.
    @param left_x     Left limit of the band.
    @param right_x    Right limit of the band.
    @param n_spots    Expected number of spots.
    @param detections Optional car boxes used as a fallback signal.
    @param cam_name   Used to stash a debug frame when CALIB_DEBUG is set.
    @return (boundaries, n): boundaries is a list of (near_x, far_x) tuples
            (the _try_from_cars fallback may return plain floats), n is the
            resulting spot count.
    """
    uniform = [left_x + (right_x - left_x) * i / n_spots for i in range(n_spots + 1)]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    _, white_mask = cv2.threshold(gray, 160, 255, cv2.THRESH_BINARY)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    white_mask = cv2.morphologyEx(white_mask, cv2.MORPH_CLOSE, kernel)
    edges = cv2.Canny(white_mask, 50, 150)

    # Restrict edges to the configured parking band (with a small margin).
    roi_mask = np.zeros_like(edges)
    band_top = max(0, far_y - 10)
    roi_mask[band_top:near_y + 10, left_x:right_x] = 255
    edges = cv2.bitwise_and(edges, roi_mask)

    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=20, minLineLength=25, maxLineGap=30)
    divider_xs = []
    # BUG FIX: HoughLinesP returns None when no lines are found; the loop
    # previously iterated it unconditionally and raised TypeError before the
    # `lines is None` check further down could run.
    if lines is not None:
        for l in lines:
            x1, y1, x2, y2 = l[0]
            angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
            if 45 < angle < 135:
                # Project the segment onto the near and far rows.
                if y2 != y1:
                    t = (near_y - y1) / (y2 - y1)
                    x_at_near = x1 + t * (x2 - x1)
                    t_far = (far_y - y1) / (y2 - y1)
                    x_far = x1 + t_far * (x2 - x1)
                else:
                    x_at_near = x_far = (x1 + x2) / 2
                if left_x <= x_at_near <= right_x:
                    divider_xs.append((x_at_near, x_far))
    # Optional visual debug: candidate lines (blue), accepted dividers (green).
    if os.environ.get('CALIB_DEBUG'):
        dbg = frame.copy()
        if lines is not None:
            for l in lines:
                x1, y1, x2, y2 = l[0]
                angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
                if 45 < angle < 135:
                    cv2.line(dbg, (x1, y1), (x2, y2), (255, 0, 0), 1)
        for xn, xf in divider_xs:
            cv2.line(dbg, (int(xn), near_y), (int(xf), far_y), (0, 255, 0), 2)
        cv2.line(dbg, (left_x, near_y), (right_x, near_y), (0, 200, 255), 1)
        cv2.line(dbg, (left_x, far_y), (right_x, far_y), (0, 200, 255), 1)
        if cam_name:
            _debug_frames[cam_name] = dbg

    if lines is None or len(divider_xs) < 2:
        return _try_from_cars(detections, near_y, left_x, right_x, n_spots, uniform)

    # Cluster dividers closer than 20 px (duplicate Hough hits on one line).
    divider_xs.sort(key=lambda d: d[0])
    clustered = []
    group = [divider_xs[0]]
    for x in divider_xs[1:]:
        if x[0] - group[0][0] < 20:
            group.append(x)
        else:
            clustered.append((np.mean([g[0] for g in group]), np.mean([g[1] for g in group])))
            group = [x]
    clustered.append((np.mean([g[0] for g in group]), np.mean([g[1] for g in group])))

    xs_near = [left_x] + [c[0] for c in clustered] + [right_x]
    xs_far = [left_x] + [c[1] for c in clustered] + [right_x]
    all_bounds = sorted(zip(xs_near, xs_far), key=lambda p: p[0])
    xs_near = [p[0] for p in all_bounds]
    xs_far = [p[1] for p in all_bounds]

    # Merge spots narrower than 80 px (the narrow boundary is replaced).
    paired = list(zip(xs_near, xs_far))
    merged_pairs = [paired[0]]
    for i in range(1, len(paired)):
        width = paired[i][0] - merged_pairs[-1][0]
        if width >= 80.0 or len(merged_pairs) == 1:
            merged_pairs.append(paired[i])
        else:
            merged_pairs[-1] = paired[i]
    xs_far = [p[1] for p in merged_pairs]
    xs_near = [p[0] for p in merged_pairs]
    if len(xs_near) == n_spots + 1:
        return list(zip(xs_near, xs_far)), len(xs_near) - 1

    # Too many dividers: keep the one nearest each expected uniform position.
    if len(xs_near) > n_spots + 1:
        expected = (right_x - left_x) / n_spots
        best_near, best_far = [left_x], [left_x]
        interior = list(zip(xs_near[1:-1], xs_far[1:-1]))
        for i in range(1, n_spots):
            target = left_x + expected * i
            closest = min(interior, key=lambda p: abs(p[0] - target))
            best_near.append(closest[0])
            best_far.append(closest[1])
        best_near.append(right_x)
        best_far.append(right_x)
        return list(zip(best_near, best_far)), len(best_near) - 1

    # Too few dividers: fall back to car-derived / uniform boundaries.
    return _try_from_cars(detections, near_y, left_x, right_x, n_spots, uniform)
1841
1842
def _try_from_cars(detections, near_y, left_x, right_x, n_spots, fallback):
    """
    Derive spot boundaries from midpoints between adjacent parked cars.

    Cars whose bottom edge sits within 80 px of near_y contribute their
    horizontal centers; a boundary is placed halfway between each neighbouring
    pair. Whenever this does not yield exactly n_spots spots, the supplied
    fallback grid (narrowed via merge_narrow_spots) is returned instead.

    NOTE(review): the success branches return plain floats while the final
    fallback returns (x, x) tuples — downstream appears to accept both; confirm.
    """
    if not detections:
        grid = merge_narrow_spots(fallback)
        return grid, len(grid) - 1

    centers = []
    for bx1, by1, bx2, by2 in detections:
        if abs(by2 - near_y) < 80:
            centers.append((bx1 + bx2) / 2)

    centers = sorted(c for c in centers if left_x < c < right_x)

    if len(centers) < 2:
        grid = merge_narrow_spots(fallback)
        return grid, len(grid) - 1

    bounds = [left_x]
    bounds.extend((a + b) / 2 for a, b in zip(centers, centers[1:]))
    bounds.append(right_x)

    if len(bounds) == n_spots + 1:
        bounds = merge_narrow_spots(bounds)
        return bounds, len(bounds) - 1

    grid = merge_narrow_spots(fallback)
    return [(x, x) for x in grid], len(grid) - 1
1883
1884
@atexit.register
def shutdown():
    """
    atexit hook: stop workers, reap child processes, free shared memory,
    and restore the Jetson clock configuration.
    """
    # Child processes inherit this hook; only the main process owns the
    # shared resources, so everyone else exits immediately.
    if os.getpid() != _MAIN_PID:
        return

    STOP_EVENT.set()
    print("Shutting down...")

    for proc in PROCESSES:
        try:
            proc.join(timeout=3)
        except Exception:
            pass

    for segment in RAW_SHM_OBJS + ANN_SHM_OBJS:
        try:
            segment.close()
            segment.unlink()
        except Exception:
            pass
    set_jetson_clocks(False)
    print('Done')
1914
1915
1916if __name__ == "__main__":
1917 args = parse_args()
1918 _MAIN_PID = os.getpid()
1919 frame_ready_event = Event()
1920 set_jetson_clocks(True)
1921 if args.annotate and not args.record:
1922 print("\033[1;91mWarning: -a/-A/--annotate has no effect without -r/-R/--record\033[0m")
1923 need_annotation = args.test or args.record
1924 os.system('v4l2-ctl --list-devices > ./utils/camInfo.txt')
1925 with open('./utils/camInfo.txt') as f:
1926 usb_1 = 2.1
1927 usb_2 = 2.2
1928 while True:
1929 line = f.readline()
1930 if not line:
1931 break
1932
1933 if 'Arducam USB Camera' in line:
1934 port = float(re.search(r'\d+\.\d+', line).group())
1935 line = f.readline()
1936 idx = int(re.search(r'\d+', line).group())
1937 if port == usb_1:
1938 SOURCES[0] = idx
1939 elif port == usb_2:
1940 SOURCES[1] = idx
1941
1942 if len(set(SOURCES)) != len(SOURCES):
1943 raise RuntimeError(f"Camera detection failed - possible duplicate sources: {SOURCES}")
1944
1945 for src in SOURCES:
1946 probe = cv2.VideoCapture(src)
1947 ok, _ = probe.read()
1948 probe.release()
1949 if not ok:
1950 raise RuntimeError(f'Cannot read from camera source {src}')
1951
1952 for _ in range(len(SOURCES)):
1953 raw = SharedMemory(create=True, size=CAP_H * CAP_W * 3)
1954 ann = SharedMemory(create=True, size=DISP_H * DISP_W * 3)
1955 RAW_SHM_OBJS.append(raw)
1956 ANN_SHM_OBJS.append(ann)
1957 RAW_BUFS.append(shm_ndarray(raw, CAP_SHAPE))
1958 ANN_BUFS.append(shm_ndarray(ann, DISP_SHAPE))
1959
1960 # initialize ann buffs to 0 so we dont have garbage pixels in display
1961 for buf in ANN_BUFS:
1962 buf[:] = 0
1963
1964 RAW_LOCKS = [Lock() for _ in range(len(SOURCES))]
1965 ANN_LOCKS = [Lock() for _ in range(len(SOURCES))]
1966 RAW_FRAME_ID = [Value('i', 0) for _ in range(len(SOURCES))]
1967
1968 for i, src in enumerate(SOURCES):
1969 p = Process(
1970 target=capture_worker,
1971 args=(i, src, RAW_SHM_OBJS[i].name, RAW_LOCKS[i], RAW_FRAME_ID[i], frame_ready_event, STOP_EVENT),
1972 daemon=False,
1973 )
1974 p.start()
1975 try:
1976 os.sched_setaffinity(p.pid, {i + 1})
1977 except Exception:
1978 pass
1979 PROCESSES.append(p)
1980
1981 print("Waiting for cameras...")
1982 deadline = time.time() + 15
1983 for i in range(len(SOURCES)):
1984 while RAW_FRAME_ID[i].value == 0:
1985 if not PROCESSES[i].is_alive():
1986 raise RuntimeError(
1987 f"Camera {i} ({CAM_ORDER[i]}) process died before sending a frame."
1988 )
1989 if time.time() > deadline:
1990 raise RuntimeError(
1991 f"Camera {i} ({CAM_ORDER[i]}) timed out waiting for the first frame"
1992 )
1993 time.sleep(0.1)
1994 print(f'Camera {i} ({CAM_ORDER[i]}) ready')
1995 auto_calibrate_t = Thread(
1996 target=auto_calibrate_loop,
1997 args=(STOP_EVENT,),
1998 daemon=True,
1999 )
2000 auto_calibrate_t.start()
2001 inf_t = Thread(
2002 target=inference_loop,
2003 args=(need_annotation, frame_ready_event, STOP_EVENT),
2004 daemon=True
2005 )
2006 inf_t.start()
2007 Thread(
2008 target=lambda: app.run(host='localhost', port=5001, debug=False, use_reloader=False, threaded=True),
2009 daemon=True,
2010 ).start()
2011 print('Flask Api -> http:/localhost/detections')
2012
2013 if args.record:
2014 rec_t = Thread(target=record_loop, args=(STOP_EVENT, args.annotate), daemon=True)
2015 rec_t.start()
2016
2017 if args.test:
2018 display_loop(STOP_EVENT)
2019 else:
2020 print("Headless mode. Ctrl + C to stop")
2021 try:
2022 while not STOP_EVENT.is_set():
2023 time.sleep(1)
2024 except KeyboardInterrupt:
2025 STOP_EVENT.set()
Live calibration window with OpenCV trackbars for tuning CALIB_PARAMS.
Interactive polygon editor for defining parking spot ROIs at runtime.
bool handle_key(self, int key)
mouse_cb(self, event, x, y, flags, param)
str _next_id(self, str cam)