48from ultralytics
import YOLO
49from flask
import Flask, jsonify
50from flask_cors
import CORS
51from multiprocessing
import Process, Lock, Event, Value
52from multiprocessing.shared_memory
import SharedMemory
55from threading
import Thread
60from datetime
import datetime
# --- Runtime environment / GPU tuning (extraction-garbled lines; original numbering fused) ---
# '0' keeps CUDA kernel launches asynchronous (blocking launches are debug-only).
66os.environ[
'CUDA_LAUNCH_BLOCKING'] =
'0'
# De-prioritise OpenCV's GStreamer capture backend.
67os.environ[
'OPENCV_VIDEOIO_PRIORITY_GSTREAMER'] =
'0'
# Let cuDNN benchmark conv algorithms for the fixed input size.
68torch.backends.cudnn.benchmark =
True
69torch.backends.cudnn.enabled =
True
# Single CPU thread for torch — heavy lifting is on the GPU/TensorRT engine.
70torch.set_num_threads(1)
81def set_jetson_clocks(enable: bool):
# Lock the Jetson to maximum clocks (enable=True) or restore the saved
# profile (enable=False) via the `jetson_clocks` utility; requires sudo.
# NOTE(review): original lines 82-83 (presumably `try:` / `if enable:`) and
# line 87 (presumably `else:`) are missing from this extract.
# Store the current clock profile so it can be restored later...
84 subprocess.run([
'sudo',
'jetson_clocks',
'--store', JETSON_CLOCKS_CONF], check=
True)
# ...then lock all clocks to max.
85 subprocess.run([
'sudo',
'jetson_clocks'], check=
True)
86 print(
"\033[1;91mWarning: [SYSTEM] JETSON CLOCKS LOCKED\033[0m")
# Restore the previously stored clock profile.
88 subprocess.run([
'sudo',
'jetson_clocks',
'--restore', JETSON_CLOCKS_CONF], check=
True)
89 print(
"\033[1;91mWarning: [SYSTEM] JETSON CLOCKS RESTORED\033[0m")
# Best-effort: clock tuning is non-critical, so failures are only logged.
90 except Exception
as e:
91 print(f
"\033[1;91mWarning: [SYSTEM] JETSON CLOCKS FAILED: {e}\033[0m")
# CLI argument parsing (the enclosing `def` line, orig ~101, is not visible here).
# Each flag registers both upper- and lower-case short forms.
102 p = argparse.ArgumentParser(description=
"Parking Finder")
103 p.add_argument(
"-t",
"-T",
"--test", action=
"store_true", help=
"Enable display")
104 p.add_argument(
"-R",
"-r",
"--record", action=
"store_true", help=
"Record")
# -a only has an effect together with -r (see record_loop's use_anns).
105 p.add_argument(
"-A",
"-a",
"--annotate", action=
"store_true", help=
"Records the annotated version (requires -r)")
106 return p.parse_args()
# --- Module-level configuration constants ---
# Backup path used by set_jetson_clocks --store/--restore.
110JETSON_CLOCKS_CONF =
'/tmp/jetson_clocks_backup.conf'
# Frame buffer shapes (H, W, BGR); CAP_* / DISP_* are defined elsewhere in the file.
126CAP_SHAPE = (CAP_H, CAP_W, 3)
129DISP_SHAPE = (DISP_H, DISP_W, 3)
# Video sources fed to capture_worker.
# NOTE(review): 'right_side_ccam.mp4' looks like a typo for 'cam' — confirm filename on disk.
135SOURCES = [
'./recordings/left_side_cam.mp4',
'./recordings/right_side_ccam.mp4']
# TensorRT engine (inference_loop skips ultralytics export when path ends in .engine).
150MODEL_PATH =
"./models/yolo11n.engine"
# Detection class-id filter passed to model.predict — presumably COCO
# vehicle classes (car/motorcycle/bus/truck); confirm against model.names.
159CLASSES = [2, 3, 5, 7]
162CAM_ORDER = [
'Left',
'Right']
165DISPLAY_ORDER = [0, 1]
# Cameras that have parking-spot ROIs / overlays.
171SPOT_CAMS = {
'Left',
'Right'}
# Minimum bbox height (display px) for a detection to count, per camera.
175MIN_BOX_H = {
'Left': 30,
'Right': 30}
# Fraction of a spot mask a car box must cover to claim the spot (see check_parking_spots).
179INTERSECT_ALLOWANCE = 0.15
# Seconds between background recalibrations (0 as configured here).
182AUTO_CALIBRATE_INTERVAL = 0
# Per-camera boolean masks for each spot polygon; filled from ROIS below.
195SPOT_MASK: dict[str, list[np.ndarray]] = {}
# Per-camera ROI-generation parameters (interior of the CALIB_PARAMS dict;
# the dict's opening line, orig ~210, is not visible in this extract).
211 'Left': {
'vanishing_point': (381, 214),
'near_y': 406,
'far_y': 274,
212 'left_x': 0,
'right_x': 640,
'n_spots': 6,
'n_rows': 1,
'fisheye_distortion': 0.0,
213 'perspective_strength': 0.16,
'min_roi_height': 63},
214 'Right': {
'vanishing_point': (277, 223),
'near_y': 451,
'far_y': 337,
215 'left_x': 0,
'right_x': 640,
'n_spots': 5,
'n_rows': 1,
'fisheye_distortion': 0.03,
216 'perspective_strength': 0.32,
'min_roi_height': 70},
# Rasterise every ROI polygon into a boolean display-sized mask.
# NOTE(review): original lines 220-221 (likely `SPOT_MASK[cam] = []` and the
# inner `for spot in spots:`) are missing from this extract.
219for cam, spots
in ROIS.items():
222 m = np.zeros((DISP_H, DISP_W), dtype=np.uint8)
223 cv2.fillPoly(m, [np.array(spot[
'poly'], dtype=np.int32)], 1)
224 SPOT_MASK[cam].append(m.astype(bool))
# Overlay colours, BGR.
227OCCUPIED_COLOR = (0, 0, 220)
230EMPTY_COLOR = (0, 220, 80)
# Guards cross-thread access to shared state in this process.
238store_lock = threading.Lock()
# Current 'Empty'/'Car' state per spot, per camera; served by the Flask route.
243spot_states: dict[str, list[str]] = {
244 cam: [
'Empty'] * len(spots)
for cam, spots
in ROIS.items()
# SharedMemory handles and their numpy views for raw / annotated frames,
# one per camera (populated elsewhere in the file).
249RAW_SHM_OBJS: list[SharedMemory] = []
252ANN_SHM_OBJS: list[SharedMemory] = []
255RAW_BUFS: list[np.ndarray] = []
258ANN_BUFS: list[np.ndarray] = []
# Multiprocessing Value counters: id of the latest raw frame per camera.
268RAW_FRAME_ID: list = []
# Latest detection boxes (display coords) per camera, for calibration.
283_last_boxes: dict[str, list] = {cam: []
for cam
in CAM_ORDER}
# Latest (box, score, name) triples per camera, for on-screen annotation.
287_last_ann_boxes: dict[str, list] = {cam: []
for cam
in CAM_ORDER}
# Consecutive-empty frame counters used to debounce spot release.
292_empty_streak: dict[str, list[int]] = {cam: []
for cam
in CAM_ORDER}
# Manually-tuned spot boundaries blended into auto-detected ones.
296_manual_bounds: dict[str, list[float]] = {cam: []
for cam
in CAM_ORDER}
# Debug panels shown when CALIB_DEBUG is set.
300_debug_frames: dict[str, np.ndarray] = {}
# Frames a spot must stay empty before flipping back to 'Empty'.
304EMPTY_CONFIRM_FRAMES = 3
def shm_ndarray(shm: SharedMemory, shape: tuple) -> np.ndarray:
    """Wrap *shm*'s buffer in a uint8 numpy view of the given shape (zero-copy)."""
    view = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
    return view
# GET /detections — JSON snapshot of current spot states with a wall-clock
# timestamp. The handler's `def` line (orig ~340-342) is missing from this extract.
339@app.route('/detections', methods=['GET'])
343 "timestamp": datetime.now().strftime(
'%H:%M:%S'),
# Copy each state list so the response isn't mutated mid-serialisation.
344 'spots': {cam: list(states)
for cam, states
in spot_states.items()},
362def check_parking_spots(cam_name: str, disp_boxes: list):
# Updates spot_states[cam_name] from the latest detection boxes (display coords).
# NOTE(review): the docstring says 10%, but the threshold actually used below
# is INTERSECT_ALLOWANCE = 0.15 — confirm and reconcile.
364 Spot is occupied if bounding box overlaps 10% of its ROI
367 masks = SPOT_MASK.get(cam_name, [])
368 min_h = MIN_BOX_H.get(cam_name, 0)
# Resize the debounce counters if the ROI count changed (e.g. recalibration).
372 if len(_empty_streak[cam_name]) != len(masks):
373 _empty_streak[cam_name] = [0] * len(masks)
# Build a boolean pixel mask for each qualifying detection box.
378 for box
in disp_boxes:
# Skip boxes shorter than the per-camera minimum height.
381 if (y2 - y1) < min_h:
# Clamp the box to display bounds before rasterising.
384 x1, y1 = max(0, int(x1)), max(0, int(y1))
385 x2, y2 = min(DISP_W, int(x2)), min(DISP_H, int(y2))
386 if x2 > x1
and y2 > y1:
387 m = np.zeros((DISP_H, DISP_W), dtype=bool)
388 m[y1:y2, x1:x2] =
True
# Assign each car mask to the spot it overlaps most; a spot keeps only its
# best-overlapping claimant. (Several original lines, e.g. the best_spot
# initialisation around 389-394, are missing from this extract.)
392 for car_mask
in car_pixels:
395 for idx, mask
in enumerate(masks):
396 overlap = np.count_nonzero(car_mask & mask)
397 if overlap > best_overlap:
398 best_overlap = overlap
# Claim only if the overlap clears the fraction-of-spot-area threshold.
400 if best_spot >= 0
and best_overlap >= masks[best_spot].sum() * INTERSECT_ALLOWANCE:
401 if best_spot
not in spot_claimed_by
or best_overlap > spot_claimed_by[best_spot]:
402 spot_claimed_by[best_spot] = best_overlap
405 current_states = list(spot_states.get(cam_name, [
'Empty'] * len(masks)))
406 new_states = list(current_states)
# Claimed spots flip to 'Car' immediately; unclaimed spots only flip back to
# 'Empty' after EMPTY_CONFIRM_FRAMES consecutive empty frames (debounce).
408 for idx
in range(len(masks)):
409 if idx
in spot_claimed_by:
410 new_states[idx] =
'Car'
411 _empty_streak[cam_name][idx] = 0
413 _empty_streak[cam_name][idx] += 1
414 if _empty_streak[cam_name][idx] >= EMPTY_CONFIRM_FRAMES:
415 new_states[idx] =
'Empty'
416 spot_states[cam_name] = new_states
430def annotate_frame(frame: np.ndarray, disp_boxes: list, scores: list, names: list, cam_name: str):
# Draws detection boxes + "name score" labels directly onto *frame* (in place).
432 Draw annotation directly on frames, no copy less memory overhead
434 min_h = MIN_BOX_H.get(cam_name, 0)
435 for box, score, name
in zip(disp_boxes, scores, names):
# Skip boxes shorter than the per-camera minimum (same filter as spot logic).
437 if (y2 - y1) < min_h:
439 x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
441 color = (0, 150, 150)
442 cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# Filled label background sized to the rendered text, then dark text on top.
444 label = f
'{name} {score:.2f}'
445 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX, 0.5, 1)
446 cv2.rectangle(frame, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
447 cv2.putText(frame, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_COMPLEX, 0.5, (15, 15, 15), 1, cv2.LINE_AA)
460def draw_spot_overlays(frame: np.ndarray, cam_name: str) -> np.ndarray:
# Returns a new frame with translucent spot polygons + "id: state" labels blended in.
462 Blend ROI polygons into the frame
465 spots = ROIS.get(cam_name, [])
466 states = spot_states.get(cam_name, [
'Empty'] * len(spots))
# Draw onto a copy, then alpha-blend back so the fill is translucent.
467 overlay = frame.copy()
469 for spot, state
in zip(spots, states):
470 pts = np.array(spot[
'poly'], dtype=np.int32)
# Red-ish fill/border for occupied, green-ish for empty.
471 color = OCCUPIED_COLOR
if state ==
'Car' else EMPTY_COLOR
472 border = (255, 80, 80)
if state ==
'Car' else (80, 255, 130)
474 cv2.fillPoly(overlay, [pts], color)
475 cv2.polylines(overlay, [pts], isClosed=
True, color=border, thickness=2)
# Label anchored at the polygon centroid.
477 cx = int(np.mean([p[0]
for p
in spot[
'poly']]))
478 cy = int(np.mean([p[1]
for p
in spot[
'poly']]))
479 label = f
"{spot['id']}: {state}"
480 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1)
482 cv2.rectangle(overlay,
483 (cx - tw // 2 - 3, cy - th - 6),
484 (cx + tw // 2 + 3, cy + 4),
487 cv2.putText(overlay, label, (cx - tw // 2, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.45, border, 1, cv2.LINE_AA)
# SPOT_ALPHA is defined elsewhere in the file (not visible in this extract).
489 return cv2.addWeighted(overlay, SPOT_ALPHA, frame, 1 - SPOT_ALPHA, 0)
# Fragment of generate_rois' parameter list (the `def` line and several
# parameters, orig ~527/529-534/537/539, are missing from this extract).
528 vanishing_point: tuple,
# 0.0 disables the fisheye correction; see fisheye() below.
535 fisheye_distortion: float = 0.0,
# 1.0 = full convergence toward the vanishing point, 0.0 = rectangular grid.
536 perspective_strength: float = 1.0,
# Optional precomputed divider x-positions (n_spots + 1 entries).
538 spot_boundaries: list =
None,
# generate_rois preamble: unpack the vanishing point and locate the image
# centre used by the fisheye model.
vpx, vpy = vanishing_point
# Bug fix: width must come from DISP_W and height from DISP_H — the original
# had them swapped (`W, H = DISP_H, DISP_W`), which puts the fisheye centre
# (cx, cy) off the true image centre for non-square displays.
W, H = DISP_W, DISP_H
cx, cy = W / 2, H / 2
# Spot-id prefix: explicit name, else first letter of the camera id.
prefix = name or cam_id[0].upper()
# Left/right extent of the parking band at scan-line y, interpolated toward
# the vanishing point and damped by perspective_strength.
# NOTE(review): several original lines (547, 549, 551, 557-559, etc.) are
# missing from this extract.
546 def horizontal_bounds(y):
548 denominator = vpy - near_y
550 denominator = far_y - near_y
# Degenerate geometry: fall back to the untransformed bounds.
552 return left_x, right_x
553 t = (y - near_y) / denominator
554 t = max(0.0, min(t, 1.0))
555 xl_perspective = left_x + (vpx - left_x) * t
556 xr_perspective = right_x + (vpx - right_x) * t
# Blend rectangular and perspective bounds by perspective_strength.
560 xl = xl_rect + (xl_perspective - xl_rect) * perspective_strength
561 xr = xr_rect + (xr_perspective - xr_rect) * perspective_strength
# Project an x measured at near_y onto scan-line y along the line to the VP.
565 def scale_x_to_y(x_at_near, y):
566 denominator = vpy - near_y
568 denominator = far_y - near_y
571 t = (y - near_y) / denominator
572 t = max(0.0, min(t, 1.0))
573 return x_at_near + (vpx - x_at_near) * t * perspective_strength
# Simple radial (r^2) fisheye displacement about the image centre.
577 if fisheye_distortion == 0.0:
579 nx, ny = (x - cx) / cx, (y - cy) / cy
580 r2 = nx * nx + ny * ny
581 f = 1 + fisheye_distortion * r2
582 return cx + nx * f * cx, cy + ny * f * cy
# One row of spots per iteration, sliced between near_y and far_y.
585 for row
in range(n_rows):
586 t_near = row / n_rows
587 t_far = (row + 1) / n_rows
588 row_near_y = near_y + (far_y - near_y) * t_near
589 row_far_y = near_y + (far_y - near_y) * t_far
591 for s
in range(n_spots):
# Prefer caller-supplied divider positions when they match n_spots + 1.
592 if spot_boundaries
and len(spot_boundaries) == n_spots + 1:
# Tuples carry separate near/far x; scalars are projected per row.
593 if isinstance(spot_boundaries[0], tuple):
594 x0_near, x0_far = spot_boundaries[s]
595 x1_near, x1_far = spot_boundaries[s + 1]
597 x0_near = scale_x_to_y(spot_boundaries[s], row_near_y)
598 x1_near = scale_x_to_y(spot_boundaries[s + 1], row_near_y)
599 x0_far = scale_x_to_y(spot_boundaries[s], row_far_y)
600 x1_far = scale_x_to_y(spot_boundaries[s + 1], row_far_y)
# Fallback: divide the band uniformly into n_spots columns.
602 near_xl, near_xr = horizontal_bounds(row_near_y)
603 far_xl, far_xr = horizontal_bounds(row_far_y)
604 t0, t1 = s / n_spots, (s + 1) / n_spots
605 x0_near = near_xl + (near_xr - near_xl) * t0
606 x1_near = near_xl + (near_xr - near_xl) * t1
607 x0_far = far_xl + (far_xr - far_xl) * t0
608 x1_far = far_xl + (far_xr - far_xl) * t1
# Quad corners, passed through the fisheye model and rounded to pixels.
611 (x0_near, row_near_y),
612 (x1_near, row_near_y),
618 (int(round(fx)), int(round(fy)))
620 for fx, fy
in [fisheye(x, y)]
# Spot ids are 1-based, numbered row-major: prefix + (row*n_spots + s + 1).
622 rois.append({
'id': f
'{prefix}{row * n_spots + s + 1}',
'poly': poly})
# Re-estimates ROI geometry (far_y, vanishing point, spot boundaries) from the
# current frame via edge/line detection; returns a fresh ROI list.
# NOTE(review): many original lines are missing from this extract (e.g. the
# early-return bodies after each warning, the h_lines/vanish_lines setup).
649def auto_caliberate_rois(cam_name: str, frame: np.ndarray, current_params: dict, detections: list =
None) -> list[dict]:
650 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
651 blur = cv2.GaussianBlur(gray, (5, 5), 0)
652 edges = cv2.Canny(blur, 30, 100)
# Only the lower two-thirds of the frame contributes edges.
654 roi_mask = np.zeros_like(edges)
655 roi_mask[frame.shape[0] // 3:, :] = 255
656 edges = cv2.bitwise_and(edges, roi_mask)
657 lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=60, minLineLength=80, maxLineGap=20)
658 left_x = current_params[
'left_x']
659 right_x = current_params[
'right_x']
660 near_y = current_params[
'near_y']
661 far_y_ref = current_params.get(
'far_y', 0)
# Too few lines -> keep the existing calibration.
662 if lines
is None or len(lines) < 4:
663 print(f
"\033[1;91mWarning: [Auto Caliberation] Not enough lines detected, keeping current ROIS\033[0m")
# Build y-bands around detected cars' top/bottom edges so car outlines don't
# get mistaken for lot markings.
666 car_y_exclusions = []
668 for x1, y1, x2, y2
in detections:
669 car_cx = (x1 + x2) / 2
670 car_cy = (y1 + y2) / 2
# Only cars inside the calibration band are excluded.
671 if not (left_x <= car_cx <= right_x
and far_y_ref <= car_cy <= near_y):
674 car_y_exclusions.append((int(y1) - pad, int(y1) + pad))
675 car_y_exclusions.append((int(y2) - pad, int(y2) + pad))
677 def is_near_car_edge(y):
678 for low, hi
in car_y_exclusions:
# Keep near-horizontal lines (angle < 20° or > 160°) away from car edges.
685 x1, y1, x2, y2 = l[0]
686 angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
687 if angle < 20
or angle > 160:
688 mid_y = (y1 + y2) / 2
689 if not is_near_car_edge(mid_y):
690 h_lines.append((x1, y1, x2, y2))
693 print(f
"\033[1;91mWarning: [Auto Caliberation] Not enough horizontal lines, keeping current ROIS\033[0m")
# Cluster the horizontal-line midpoints by y (gaps > 20 px split clusters).
696 ys = np.array([(y1 + y2) / 2
for x1, y1, x2, y2
in h_lines])
697 ys_sorted = np.sort(ys)
699 gaps = np.diff(ys_sorted)
700 big_gaps = np.where(gaps > 20)[0]
705 cluster_centers.append(np.mean(ys_sorted[prev:g + 1]))
707 cluster_centers.append(np.mean(ys_sorted[prev:]))
709 if len(cluster_centers) < 2:
710 print(f
"\033[1;91mWarning: [Auto Caliberation] Could not find row boundaries\033[0m")
# far_y candidates must lie within 250 px above near_y; pick the closest one.
713 min_far_y = near_y - 250
714 valid_clusters = [c
for c
in cluster_centers
if min_far_y < c < near_y]
715 if not valid_clusters:
716 print(f
"\033[1;91mWarning: [Auto Caliberation] no valid far_y found\033[0m")
718 far_y = int(min(valid_clusters, key=
lambda c: near_y - c))
720 n_spots = current_params.get(
'n_spots', 5)
721 n_rows = current_params.get(
'n_rows', 1)
722 distortion = current_params.get(
'fisheye_distortion', 0.0)
723 strength = current_params.get(
'perspective_strength', 1.0)
725 MIN_ROI_HEIGHT = current_params.get(
'min_roi_height', 60)
# Enforce a minimum band height, then clamp drift relative to the original
# calibration (max_drift is defined on a line missing from this extract).
727 if near_y - far_y < MIN_ROI_HEIGHT:
728 far_y = near_y - MIN_ROI_HEIGHT
729 print(f
"\033[1;93mWarning: [Auto Caliberation] far_y too close to near_y, clamped to {far_y}\033[0m")
731 far_y = max(0, far_y)
732 original_far_y = CALIB_PARAMS[cam_name].get(
'far_y', far_y)
734 far_y = max(far_y, original_far_y - max_drift)
735 far_y = min(far_y, original_far_y + max_drift)
# Oblique lines (10-80° or 100-170°) are candidates for vanishing-point fitting.
738 x1, y1, x2, y2 = l[0]
739 angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
740 if 10 < angle < 80
or 100 < angle < 170:
741 vanish_lines.append((x1, y1, x2, y2))
743 vanishing_point = current_params[
'vanishing_point']
# Intersect up to 8 oblique line pairs; the median intersection becomes the VP.
745 if len(vanish_lines) >= 2:
747 for i
in range(min(len(vanish_lines), 8)):
748 for j
in range(i + 1, min(len(vanish_lines), 8)):
749 x1, y1, x2, y2 = vanish_lines[i]
750 x3, y3, x4, y4 = vanish_lines[j]
751 d = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
754 t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / d
755 ix = x1 + t * (x2 - x1)
756 iy = y1 + t * (y2 - y1)
# Accept intersections inside the frame and above the parking band.
757 if 0 < ix < DISP_W
and 0 < iy < far_y + 20:
758 intersections.append((ix, iy))
762 int(np.median([p[0]
for p
in intersections])),
763 int(np.median([p[1]
for p
in intersections])))
# Persist the new estimates so subsequent calls drift-clamp against them.
765 CALIB_PARAMS[cam_name][
'far_y'] = far_y
766 CALIB_PARAMS[cam_name][
'vanishing_point'] = vanishing_point
# Keep the calibration window's sliders in sync if it is open on this camera.
768 if calib_window.open
and calib_window.active_cam == cam_name:
769 cv2.setTrackbarPos(
'Far Y', CalibWindow.WIN, far_y)
770 cv2.setTrackbarPos(
'VP X', CalibWindow.WIN, vanishing_point[0])
771 cv2.setTrackbarPos(
'VP Y', CalibWindow.WIN, vanishing_point[1])
# Only detections whose centre lies inside the calibration band feed the
# boundary detector.
774 box
for box
in (detections
or [])
775 if left_x <= (box[0] + box[2]) / 2 <= right_x
776 and far_y <= (box[1] + box[3]) / 2 <= near_y
778 boundaries, detected_n_spots = detect_spot_boundaries(frame, near_y, far_y, left_x, right_x, n_spots,
779 detections=roi_detections, cam_name=cam_name)
# Blend in manually-tuned boundaries at 15% weight when present.
780 manual = _manual_bounds.get(cam_name, [])
782 boundaries = blend_boundaries(boundaries, manual, manual_weight=0.15)
784 return generate_rois(cam_id=cam_name, vanishing_point=vanishing_point, near_y=near_y, far_y=far_y, left_x=left_x,
785 right_x=right_x, n_spots=detected_n_spots, n_rows=n_rows, fisheye_distortion=distortion,
786 perspective_strength=strength, spot_boundaries=boundaries)
799def rebuild_spot_masks(cam_name: str, new_rois: list[dict], reset_states: bool =
False):
# Swaps in a new ROI list for a camera and re-rasterises its spot masks.
800 global ROIS, SPOT_MASK, spot_states
803 for spot
in new_rois:
804 m = np.zeros((DISP_H, DISP_W), dtype=np.uint8)
805 cv2.fillPoly(m, [np.array(spot[
'poly'], dtype=np.int32)], 1)
806 new_masks.append(m.astype(bool))
808 ROIS[cam_name] = new_rois
809 SPOT_MASK[cam_name] = new_masks
# States reset when asked, or when the spot count changed (indices shifted).
811 if reset_states
or len(new_rois) != len(spot_states.get(cam_name, [])):
812 spot_states[cam_name] = [
'Empty'] * len(new_rois)
813 _empty_streak[cam_name] = [0] * len(new_rois)
816 print(f
"\033[1;91mWarning: [Auto Caliberation] {cam_name}: {len(new_rois)} spots rebuilt\033[0m")
# Interior of the interactive ROI-editor class (the `class` line is not
# visible in this extract; many original lines are missing throughout).
834 Interactive polygon editor. Lives entirely in the display thread.
836 Controls (active when editor_mode=True):
837 Left click - add point
838 Right click - undo last point
839 Enter - finish polygon, print to terminal
840 Backspace - clear current polygon
841 Tab - cycle active camera
842 I - toggle editor on/off
# Drawing palette (BGR) used by draw_overlay below.
845 POINT_COLOR = (0, 255, 255)
846 LINE_COLOR = (0, 200, 255)
847 CLOSE_COLOR = (180, 0, 255)
848 FILL_COLOR = (0, 180, 255)
849 CURSOR_COLOR = (200, 200, 200)
850 TEXT_COLOR = (255, 255, 255)
851 SHADOW_COLOR = (0, 0, 0)
# In-progress polygon points, in panel-local coordinates.
855 self.
points: list[tuple[int, int]] = []
# Next spot number per camera, seeded from the existing ROI counts.
860 self.
_spot_counters: dict[str, int] = {c: len(ROIS.get(c, []))
for c
in CAM_ORDER}
865 def active_cam(self) -> str:
# Builds the next spot id: first letter of the camera + running counter.
868 def _next_id(self, cam: str) -> str:
870 prefix = cam[0].upper()
871 return f
"{prefix}{self._spot_counters[cam]}"
876 """OpenCV mouse callback (runs in display thread)."""
# The mosaic canvas is panels side by side; convert canvas x to panel-local x.
885 col_end = col_start + panel_w
890 if event == cv2.EVENT_LBUTTONDOWN:
891 if col_start <= x < col_end:
894 self.
points.append((lx, y))
# Right click undoes the last point (body on lines missing from this extract).
896 elif event == cv2.EVENT_RBUTTONDOWN:
905 Returns True if the key was consumed (caller should not process further).
906 Returns False if the key should be handled by the normal display loop.
# I toggles the editor and prints a usage banner.
908 if key == ord(
'i')
or key == ord(
'I'):
911 print(f
"\n[ROI Editor] ON — camera: {self.active_cam}")
912 print(
" Left-click to add points | Right-click to undo")
913 print(
" Enter = finish polygon | Backspace = clear")
914 print(
" Tab = switch camera | I = exit editor\n")
916 print(
"[ROI Editor] OFF")
931 print(
"[ROI Editor] Points cleared")
938 print(f
"[ROI Editor] Active camera: {self.active_cam} (points cleared)")
# Validates and emits the finished polygon as a copy-pasteable ROI dict.
943 def _finish_polygon(self):
945 print(f
"[ROI Editor] Need at least 3 points (have {len(self.points)})")
952 print(f
"\n# ── ROI output ──────────────────────────────────────────")
953 print(f
"# Camera : {cam} ID : {sid}")
954 print(f
"{{'id': '{sid}', 'poly': {poly}}},")
955 print(f
"# ─────────────────────────────────────────────────────────\n")
# Renders editor state (tint, crosshair, polygon, HUD) onto the mosaic canvas.
959 def draw_overlay(self, canvas: np.ndarray):
963 num_panels = len(DISPLAY_ORDER)
# Light tint over the active panel so the user knows where clicks land.
968 overlay = canvas.copy()
969 cv2.rectangle(overlay, (col_start, 0), (col_start + panel_w, DISP_H),
971 cv2.addWeighted(overlay, 0.15, canvas, 0.85, 0, canvas)
# Full-canvas crosshair at the mouse position.
974 cv2.line(canvas, (mx, 0), (mx, DISP_H), self.
CURSOR_COLOR, 1, cv2.LINE_AA)
975 cv2.line(canvas, (0, my), (canvas.shape[1], my), self.
CURSOR_COLOR, 1, cv2.LINE_AA)
# Convert panel-local points back to canvas coordinates for drawing.
977 canvas_pts = [(col_start + px, py)
for (px, py)
in self.
points]
# Translucent fill once the polygon has at least 3 vertices.
979 if len(canvas_pts) >= 3:
980 fill_overlay = canvas.copy()
981 cv2.fillPoly(fill_overlay,
982 [np.array(canvas_pts, dtype=np.int32)],
984 cv2.addWeighted(fill_overlay, 0.25, canvas, 0.75, 0, canvas)
# Solid edges between consecutive points.
986 for i
in range(1, len(canvas_pts)):
987 cv2.line(canvas, canvas_pts[i - 1], canvas_pts[i],
# Dashed closing edge from the last point back to the first.
990 if len(canvas_pts) >= 3:
991 p0, pn = canvas_pts[0], canvas_pts[-1]
995 dist = max(1, int((dx ** 2 + dy ** 2) ** 0.5))
996 segs = max(4, dist // 10)
997 for s
in range(segs):
1001 pt0 = (int(pn[0] + dx * t0), int(pn[1] + dy * t0))
1002 pt1 = (int(pn[0] + dx * t1), int(pn[1] + dy * t1))
1003 cv2.line(canvas, pt0, pt1, self.
CLOSE_COLOR, 2, cv2.LINE_AA)
# Numbered vertex markers with a dark outline and shadowed label.
1005 for idx, (cx, cy)
in enumerate(canvas_pts):
1006 cv2.circle(canvas, (cx, cy), 5, self.
POINT_COLOR, -1, cv2.LINE_AA)
1007 cv2.circle(canvas, (cx, cy), 5, (0, 0, 0), 1, cv2.LINE_AA)
1009 (tw, th), _ = cv2.getTextSize(lbl, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
1010 cv2.putText(canvas, lbl, (cx + 7, cy + 4),
1011 cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.
SHADOW_COLOR, 2, cv2.LINE_AA)
1012 cv2.putText(canvas, lbl, (cx + 7, cy + 4),
1013 cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.
POINT_COLOR, 1, cv2.LINE_AA)
# HUD: status line + controls, bottom-left of the active panel.
1016 f
"ROI EDITOR | cam: {self.active_cam} | pts: {len(self.points)}",
1017 "LClick=add RClick=undo Enter=done Bksp=clear Tab=cam I=exit",
1019 for li, line
in enumerate(hud_lines):
1020 yy = DISP_H - 12 - (len(hud_lines) - 1 - li) * 18
1021 (tw, th), _ = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.45, 1)
1022 cv2.rectangle(canvas,
1023 (col_start + 4, yy - th - 4),
1024 (col_start + tw + 10, yy + 4),
1026 cv2.putText(canvas, line,
1027 (col_start + 7, yy),
1028 cv2.FONT_HERSHEY_SIMPLEX, 0.45, self.
TEXT_COLOR, 1, cv2.LINE_AA)
# Cursor tooltip showing panel-local coordinates, kept inside the canvas.
1030 if col_start <= mx < col_start + panel_w:
1032 tip = f
"({lx}, {my})"
1033 (tw, th), _ = cv2.getTextSize(tip, cv2.FONT_HERSHEY_SIMPLEX, 0.4, 1)
1034 tx = min(mx + 10, canvas.shape[1] - tw - 6)
1035 ty = max(my - 10, th + 4)
1036 cv2.rectangle(canvas, (tx - 2, ty - th - 2), (tx + tw + 2, ty + 2),
1038 cv2.putText(canvas, tip, (tx, ty),
1039 cv2.FONT_HERSHEY_SIMPLEX, 0.4, self.
TEXT_COLOR, 1, cv2.LINE_AA)
# Child-process entry point: reads frames from one source at a paced rate and
# publishes them into this camera's shared-memory buffer.
1065def capture_worker(cam_id: int, src: int, raw_shm_name: str, raw_lock, raw_frame_id, frame_ready_event, stop_event):
# Parent handles Ctrl-C; workers exit via stop_event instead.
1066 signal.signal(signal.SIGINT, signal.SIG_IGN)
1067 os.environ[
'OPENBLAS_NUM_THREADS'] =
'2'
1068 os.environ[
'MALLOC_TRIM_THRESHOLD_'] =
'100000'
# Retry opening the source up to 5 times (retry body lines missing here).
1071 for attempt
in range(5):
1072 cap = cv2.VideoCapture(src)
1077 if not cap.isOpened():
1078 print(f
"[CAM {cam_id}] Failed to open source {src}")
# Integer sources are live devices; strings are video files.
1081 is_live = isinstance(src, int)
1084 cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*
'YUYV'))
1085 cap.set(cv2.CAP_PROP_FPS, CAPTURE_FPS)
1086 cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
1088 cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAP_W)
1089 cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAP_H)
# Pace reads to the source's reported FPS, falling back to CAPTURE_FPS.
1091 actual_fps = cap.get(cv2.CAP_PROP_FPS)
1093 actual_fps = CAPTURE_FPS
1095 frame_delay = 1.0 / actual_fps
1096 print(f
"[cam {cam_id}] Video FPS detected: {actual_fps}, frame_delay: {frame_delay:.4f}s")
1098 print(f
'[cam {cam_id}] {int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x'
1099 f
'{int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))} '
1100 f
'@ {cap.get(cv2.CAP_PROP_FPS):.0f} FPS')
# Attach to the parent-created shared memory; buf is a zero-copy view.
1102 shm = SharedMemory(name=raw_shm_name)
1103 buf = shm_ndarray(shm, CAP_SHAPE)
1106 last_frame_time = time.perf_counter()
1108 while not stop_event.is_set():
# Sleep off the remainder of this frame's time slot.
1110 now = time.perf_counter()
1111 elapsed = now - last_frame_time
1112 sleep_time = frame_delay - elapsed
1113 if sleep_time > 0.001:
1114 time.sleep(sleep_time)
1115 last_frame_time = time.perf_counter()
1117 ok, frame = cap.read()
# On read failure, video files rewind to frame 0 (loop playback).
1120 cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
1121 last_frame_time = time.perf_counter()
1123 print(f
"[cam {cam_id}] read failed")
1126 if frame.shape[:2] != (CAP_H, CAP_W):
1127 frame = cv2.resize(frame, (CAP_W, CAP_H), interpolation=cv2.INTER_NEAREST)
# Publish: copy into shared memory, bump the frame id, wake consumers.
1130 np.copyto(buf, frame)
1133 raw_frame_id.value = local_id
1135 frame_ready_event.set()
# If we fell badly behind, reset the pacing clock instead of bursting.
1138 drift = time.perf_counter() - last_frame_time
1139 if drift > frame_delay * 2:
1140 last_frame_time = time.perf_counter()
# Batched YOLO inference thread: pulls fresh frames from shared memory, runs
# the model, updates spot states, and optionally writes annotated frames back.
1167def inference_loop(need_annotations: bool, frame_ready_event, stop_event):
# Pin this thread to CPU core 4 (Linux-only API).
1169 os.sched_setaffinity(0, {4})
1173 model = YOLO(MODEL_PATH, task=
'detect')
# Non-engine weights would need exporting (body lines missing from extract).
1174 if not MODEL_PATH.endswith(
'.engine'):
# Warm-up pass with a zero batch so the first real frame isn't slow.
1177 dummy = np.zeros((IMGSZ, IMGSZ, 3), dtype=np.uint8)
1180 model.predict([dummy] * MAX_BATCH, imgsz=IMGSZ, conf=CONF, device=
'cuda', verbose=
False, half=
True)
1183 print(
'[Inference] warm-up done')
1189 frame_time = 1.0 / INFERENCEFPS
1192 while not stop_event.is_set():
# Throttle to INFERENCEFPS; wait briefly for a new-frame signal.
1195 if now - last_time < frame_time:
1196 frame_ready_event.wait(timeout=0.05)
1197 frame_ready_event.clear()
# Skip cameras with no frame yet (id 0) or no new frame since last pass.
1205 fid = RAW_FRAME_ID[i].value
1207 if fid == 0
or fid == last_seen[i]:
# Copy out of shared memory while holding the lock (lock lines missing here).
1211 raw_full = RAW_BUFS[i].copy()
1213 raw_full_list.append(raw_full)
1215 frames.append(cv2.resize(raw_full, (IMGSZ, IMGSZ), interpolation=cv2.INTER_NEAREST))
1216 valid_indices.append(i)
# Pad the batch to MAX_BATCH with zero frames — TensorRT engines are
# typically built for a fixed batch size.
1223 padded = list(frames)
1224 while len(padded) < MAX_BATCH:
1225 padded.append(np.zeros((IMGSZ, IMGSZ, 3), dtype=np.uint8))
1228 results = model.predict(padded, imgsz=IMGSZ, conf=CONF, device=
'cuda', classes=CLASSES, verbose=
False,
# Scale boxes from model (IMGSZ) space to display space via sx/sy.
1234 for result, i, raw_full
in zip(results, valid_indices, raw_full_list):
1235 cam_name = CAM_ORDER[i]
1238 [x1 * sx, y1 * sy, x2 * sx, y2 * sy]
1239 for x1, y1, x2, y2
in result.boxes.xyxy.tolist()
1242 if cam_name
in SPOT_CAMS:
1243 check_parking_spots(cam_name, disp_boxes)
# Cache results for the display loop and calibration routines.
1245 _last_boxes[cam_name] = list(disp_boxes)
1247 scores = result.boxes.conf.tolist()
1248 names = [model.names[int(c)]
for c
in result.boxes.cls]
1250 _last_ann_boxes[cam_name] = list(zip(disp_boxes, scores, names))
# Render and publish the annotated frame only when recording needs it.
1252 if need_annotations:
1254 disp = raw_full.copy()
1255 annotate_frame(disp, disp_boxes, scores, names, cam_name)
1257 if cam_name
in SPOT_CAMS:
1258 disp = draw_spot_overlays(disp, cam_name)
1261 np.copyto(ANN_BUFS[i], disp)
1263 last_seen[i] = RAW_FRAME_ID[i].value
# Keep the loop alive on per-iteration errors; just log the traceback.
1266 traceback.print_exc()
1290def display_loop(stop_event):
1292 Displays what the cams see.
1293 Press I to toggle the interactive ROI editor.
# Pin the display thread to CPU core 5 (Linux-only API).
1297 os.sched_setaffinity(0, {5})
1301 win =
'Parking Finder'
1302 cv2.namedWindow(win, cv2.WINDOW_NORMAL)
1303 cv2.resizeWindow(win, DISP_W * len(DISPLAY_ORDER), DISP_H)
1304 cv2.setMouseCallback(win, roi_editor.mouse_cb)
# Cap rendering at 30 FPS.
1306 frame_time = 1.0 / 30
1309 while not stop_event.is_set():
1311 if now - last_time < frame_time:
# Build one panel per camera from the raw shared-memory frame.
1316 for i
in DISPLAY_ORDER:
1317 cam_name = CAM_ORDER[i]
1319 panel = RAW_BUFS[i].copy()
1321 if panel.shape[:2] != (DISP_H, DISP_W):
1322 panel = cv2.resize(panel, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
1324 if cam_name
in SPOT_CAMS:
1325 panel = draw_spot_overlays(panel, cam_name)
# Re-draw the cached detections (same drawing as annotate_frame).
1328 ann_boxes = list(_last_ann_boxes.get(cam_name, []))
1330 min_h = MIN_BOX_H.get(cam_name, 0)
1331 for box, score, name
in ann_boxes:
1332 x1, y1, x2, y2 = box
1333 if (y2 - y1) < min_h:
1335 x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
1336 color = (0, 150, 150)
1337 cv2.rectangle(panel, (x1, y1), (x2, y2), color, 2)
1338 label = f
'{name} {score:.2f}'
1339 (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_COMPLEX, 0.5, 1)
1340 cv2.rectangle(panel, (x1, y1 - th - 8), (x1 + tw + 6, y1), color, -1)
1341 cv2.putText(panel, label, (x1 + 3, y1 - 4), cv2.FONT_HERSHEY_COMPLEX, 0.5, (15, 15, 15), 1, cv2.LINE_AA)
# Camera name + occupancy summary in the panel corner.
1343 cv2.putText(panel, cam_name, (10, 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 230, 200), 2, cv2.LINE_AA)
1344 if cam_name
in SPOT_CAMS:
1346 states = spot_states.get(cam_name, [])
1347 occ = sum(1
for s
in states
if s ==
'Car')
1348 cv2.putText(panel, f
'{occ}/{len(states)} occupied', (10, 56), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
1349 (0, 230, 200), 1, cv2.LINE_AA)
1351 panels.append(panel)
# Side-by-side mosaic, with the ROI editor overlay on top.
1354 canvas = np.hstack(panels)
1356 roi_editor.draw_overlay(canvas)
1358 cv2.imshow(win, canvas)
# Optional second window with calibration debug panels (CALIB_DEBUG env var).
1360 if os.environ.get(
'CALIB_DEBUG')
and _debug_frames:
1362 for cam
in CAM_ORDER:
1363 p = _debug_frames.get(cam)
1365 p = np.zeros((DISP_H, DISP_W, 3), dtype=np.uint8)
1366 if p.shape[:2] != (DISP_H, DISP_W):
1367 p = cv2.resize(p, (DISP_W, DISP_H))
1368 cv2.putText(p, f
'DEBUG: {cam}', (10, 25),
1369 cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
1370 debug_panels.append(p)
1371 cv2.imshow(
'boundary_debug', np.hstack(debug_panels))
1373 key = cv2.waitKey(1) & 0xFF
# Editor gets first shot at the key; it reports whether it consumed it.
1375 if roi_editor.handle_key(key):
# U: run one-shot auto-calibration for every camera from a fresh snapshot.
1378 if key == ord(
'u')
or key == ord(
'U'):
1379 for active_idx
in range(len(SOURCES)):
1380 cam_name = CAM_ORDER[active_idx]
1381 with RAW_LOCKS[active_idx]:
1382 snap = RAW_BUFS[active_idx].copy()
1383 snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
1385 boxes = _last_boxes.get(cam_name, [])
1387 print(f
"\033[1;93mWarning: [Auto Caliberation] {cam_name} has {len(boxes)} detections")
1388 new_rois = auto_caliberate_rois(cam_name, snap_disp, CALIB_PARAMS[cam_name], boxes)
1390 rebuild_spot_masks(cam_name, new_rois)
# C: toggle the slider-based calibration window for the active camera.
1393 if key == ord(
'c')
or key == ord(
'C'):
1394 calib_window.toggle(roi_editor.active_cam)
# Cleanup on exit.
1403 calib_window.close()
1404 cv2.destroyAllWindows()
# Mixes auto-detected divider positions with manual ones at *manual_weight*;
# each element may be a scalar x or a (near_x, far_x) tuple.
1421def blend_boundaries(detected: list[float], manual: list[float], manual_weight: float = 0.15) -> list[float]:
# Length mismatch or no manual data -> nothing to blend (early-return body
# on original lines 1423-1424 is missing from this extract).
1422 if not manual
or len(detected) != len(manual):
1425 for d, m
in zip(detected, manual):
# Normalise to near/far components; scalars use the same value for both.
1426 dn = d[0]
if isinstance(d, tuple)
else d
1427 df = d[1]
if isinstance(d, tuple)
else d
1428 mn = m[0]
if isinstance(m, tuple)
else m
# Blend only the near-edge x; the detected near->far offset (slant) is kept.
1430 angle_offset = df - dn
1431 blended_near = dn * (1 - manual_weight) + mn * manual_weight
1432 result.append((blended_near, blended_near + angle_offset))
# Interior of the CalibWindow class (the `class` line is not visible; many
# original lines are missing throughout this extract).
1451 Separate OpenCV window with trackbars for live CALIB_PARAMS tuning.
1452 Opens when C is pressed, closes when C is pressed again.
1453 Automatically regenerates ROIs on any slider change.
# Opens the window for *cam_name* (or closes it) and builds one trackbar per
# calibration parameter, seeded from the current CALIB_PARAMS entry `p`.
1467 def toggle(self, cam_name: str):
1476 cv2.namedWindow(self.
WIN, cv2.WINDOW_NORMAL)
1477 cv2.resizeWindow(self.
WIN, 500, 280)
1481 cv2.createTrackbar(
'VP X', self.
WIN, p[
'vanishing_point'][0], DISP_W,
lambda v: self.
_on_change())
1482 cv2.createTrackbar(
'VP Y', self.
WIN, p[
'vanishing_point'][1], DISP_H,
lambda v: self.
_on_change())
1483 cv2.createTrackbar(
'Near Y', self.
WIN, p[
'near_y'], DISP_H,
lambda v: self.
_on_change())
1484 cv2.createTrackbar(
'Far Y', self.
WIN, p[
'far_y'], DISP_H,
lambda v: self.
_on_change())
1485 cv2.createTrackbar(
'Left X', self.
WIN, p[
'left_x'], DISP_W,
lambda v: self.
_on_change())
1486 cv2.createTrackbar(
'Right X', self.
WIN, p[
'right_x'], DISP_W,
lambda v: self.
_on_change())
1487 cv2.createTrackbar(
'Spots', self.
WIN, p[
'n_spots'], 20,
lambda v: self.
_on_change())
1488 cv2.createTrackbar(
'Rows', self.
WIN, p[
'n_rows'], 6,
lambda v: self.
_on_change())
# Trackbars are non-negative ints, so fisheye in [-0.30, +0.30] is stored
# as value*100 + 30 offset; decoded in _read().
1490 fisheye_int = int(p[
'fisheye_distortion'] * 100) + 30
1491 cv2.createTrackbar(
'Fisheye x100', self.
WIN, fisheye_int, 60,
lambda v: self.
_on_change())
1492 strength_int = int(p.get(
'perspective_strength', 1.0) * 100)
1493 cv2.createTrackbar(
'Persp x100', self.
WIN, strength_int, 100,
lambda v: self.
_on_change())
1494 cv2.createTrackbar(
'Min Height', self.
WIN, p.get(
'min_roi_height', 60), DISP_H,
lambda v: self.
_on_change())
1497 print(f
"[Calib] Window open for {self.active_cam}. Drag sliders to tune, C to close.")
# Reads all trackbar positions back into a CALIB_PARAMS-shaped dict,
# undoing the x100 / +30 integer encodings.
1499 def _read(self) -> dict:
1500 fisheye_int = cv2.getTrackbarPos(
'Fisheye x100', self.
WIN)
1502 'vanishing_point': (
1503 cv2.getTrackbarPos(
'VP X', self.
WIN),
1504 cv2.getTrackbarPos(
'VP Y', self.
WIN),
1506 'near_y': cv2.getTrackbarPos(
'Near Y', self.
WIN),
1507 'far_y': cv2.getTrackbarPos(
'Far Y', self.
WIN),
1508 'left_x': cv2.getTrackbarPos(
'Left X', self.
WIN),
1509 'right_x': cv2.getTrackbarPos(
'Right X', self.
WIN),
1510 'n_spots': max(1, cv2.getTrackbarPos(
'Spots', self.
WIN)),
1511 'n_rows': max(1, cv2.getTrackbarPos(
'Rows', self.
WIN)),
1512 'fisheye_distortion': (fisheye_int - 30) / 100.0,
1513 'perspective_strength': cv2.getTrackbarPos(
'Persp x100', self.
WIN) / 100.0,
1514 'min_roi_height': max(20, cv2.getTrackbarPos(
'Min Height', self.
WIN)),
# Trackbar callback — marks the window dirty (body lines missing here).
1517 def _on_change(self):
1522 """Call once per display loop iteration to keep the window alive."""
1531 params = self.
_read()
1532 near_y = params[
'near_y']
1533 far_y = params[
'far_y']
1534 vpy = params[
'vanishing_point'][1]
# Reject degenerate geometry (zero-height band or VP below the near line).
1536 if near_y == far_y
or vpy >= near_y:
# Snapshot the camera frame and regenerate ROIs from the slider values.
1542 with RAW_LOCKS[cam_idx]:
1543 snap = RAW_BUFS[cam_idx].copy()
1544 snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
1547 boundaries, detected_n = detect_spot_boundaries(
1548 snap_disp, near_y, far_y,
1549 params[
'left_x'], params[
'right_x'], params[
'n_spots'],
# min_roi_height is UI-only; strip it before forwarding to generate_rois.
1552 roi_params = {k: v
for k, v
in params.items()
if k
not in (
'min_roi_height',)}
1553 new_rois = generate_rois(
1556 spot_boundaries=boundaries,
1557 **{k: v
for k, v
in roi_params.items()
if k !=
'n_spots'},
1560 rebuild_spot_masks(self.
active_cam, new_rois)
# Record uniformly spaced manual boundaries for later blending.
1561 n = params[
'n_spots']
1562 manual = [params[
'left_x'] + (params[
'right_x'] - params[
'left_x']) * i / n
for i
in range(n + 1)]
# Keep the UI alive on errors; just log them.
1564 except Exception
as e:
1565 print(f
"[Calib] tick error: {e}")
# Closes the window and echoes the final parameters for copy-paste.
1570 cv2.destroyWindow(self.
WIN)
1575 print(f
"[Calib] Window closed. Final params for {self.active_cam}:")
1576 print(f
" CALIB_PARAMS['{self.active_cam}'] = {CALIB_PARAMS[self.active_cam]}")
1595def record_loop(stop_event, use_anns=False):
1597 Writes raw cam frames to the output file (Can be edited to write annotated frames)
# Pin to CPU core 5 (Linux-only API; shared with display per config).
1600 os.sched_setaffinity(0, {5})
# One mp4 writer per camera, named by a shared launch timestamp.
1604 timestamp = datetime.now().strftime(
'%Y%m%d_%H%M%S')
1605 os.makedirs(
'recordings', exist_ok=
True)
1606 fourcc = cv2.VideoWriter_fourcc(*
'mp4v')
# NOTE(review): `suffix` is computed but does not appear in the visible
# path format string — confirm it is used on a missing line.
1608 suffix =
'Annotated' if use_anns
else 'Raw'
1609 for cam_name
in CAM_ORDER:
1610 path = f
"recordings/{timestamp}_{cam_name}.mp4"
1611 writers.append(cv2.VideoWriter(path, fourcc, CAPTURE_FPS, (CAP_W, CAP_H)))
1612 print(f
"[Record] {cam_name} -> {path}")
1614 frame_time = 1.0 / CAPTURE_FPS
1618 print(
"\033[1;93mWarning: Recording annotated version\033[0m")
# Paced write loop: annotated buffers when use_anns, raw otherwise.
1619 while not stop_event.is_set():
1621 if now - last_time < frame_time:
1625 for i, writer
in enumerate(writers):
1630 writer.write(ANN_BUFS[i])
1633 writer.write(RAW_BUFS[i])
1638 print(
'[record] files saved')
# Background thread: periodically re-runs ROI auto-calibration per camera.
1653def auto_calibrate_loop(stop_event):
# Pin to CPU core 3 (Linux-only API).
1655 os.sched_setaffinity(0, {3})
1659 print(
'[Auto Calibrate] Background Calibration thread started')
1660 while not stop_event.is_set():
1661 time.sleep(AUTO_CALIBRATE_INTERVAL)
1663 if stop_event.is_set():
1666 for idx, cam_name
in enumerate(CAM_ORDER):
1667 if cam_name
not in SPOT_CAMS:
# Skip while the user is hand-tuning this camera in the calib window.
1670 if calib_window.suppress_auto
and calib_window.active_cam == cam_name:
# Snapshot under the lock, then work on the copy.
1673 with RAW_LOCKS[idx]:
1674 snap = RAW_BUFS[idx].copy()
1676 snap_disp = cv2.resize(snap, (DISP_W, DISP_H), interpolation=cv2.INTER_NEAREST)
# A crowded frame makes line detection unreliable — skip it.
1679 boxes = _last_boxes.get(cam_name, [])
1682 print(f
'[Auto Calibrate] {cam_name}: skipping --- too many detections ({len(boxes)})')
1685 new_rois = auto_caliberate_rois(cam_name, snap_disp, CALIB_PARAMS[cam_name], boxes)
1688 rebuild_spot_masks(cam_name, new_rois)
1689 print(f
'[Auto Calibrate] {cam_name}: updated')
# Per-camera errors are logged and must not kill the thread.
1691 except Exception
as e:
1692 print(f
'[Auto Calibrate] {cam_name} error: {e}')
# Collapses divider positions that would create spots narrower than
# *min_width* pixels. (Original lines 1707-1708, 1713 and the final return
# are missing from this extract.)
1705def merge_narrow_spots(boundaries: list[float], min_width: float = 80.0) -> list[float]:
1706 if len(boundaries) < 2:
1709 merged = [boundaries[0]]
1710 for i
in range(1, len(boundaries)):
1711 width = boundaries[i] - merged[-1]
# A too-narrow gap merges into the previous spot (never drops the first edge).
1712 if width < min_width
and len(merged) > 1:
1714 merged.append(boundaries[i])
# Always keep the original right-most boundary as the last edge.
1716 if merged[-1] != boundaries[-1]:
1717 merged[-1] = boundaries[-1]
def detect_spot_boundaries(frame: np.ndarray, near_y: int, far_y: int, left_x: int, right_x: int, n_spots: int,
                           detections: list = None, cam_name: str = None) -> tuple[list[float], int]:
    """Detect parking-spot divider lines in *frame* between far_y and near_y.

    Pipeline: threshold for white paint (>160) -> morphological close ->
    Canny edges restricted to the [far_y-10, near_y+10] x [left_x, right_x]
    band -> probabilistic Hough lines -> keep near-vertical segments
    (45..135 degrees), project each onto the near and far rows, cluster
    projections within 20 px, then merge boundaries closer than 80 px.
    Falls back to _try_from_cars() (car-center midpoints or a uniform split)
    when line evidence is insufficient.

    Returns ``(boundaries, spot_count)``; in the success paths ``boundaries``
    is a list of (x_near, x_far) pairs. NOTE(review): some fallback paths via
    _try_from_cars return bare floats instead of pairs — confirm callers
    handle both shapes.

    NOTE(review): several original lines are missing from this excerpt
    (``divider_xs``/``clustered`` initialisations, ``for l in lines:``
    headers, two ``else:`` branches, and the debug-frame setup); gaps are
    marked inline.
    """
    # Evenly spaced fallback boundaries (uniform split of the lot width).
    uniform = [left_x + (right_x - left_x) * i / n_spots for i in range(n_spots + 1)]
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    # Bright paint only: pixels above 160 count as line candidates.
    _, white_mask = cv2.threshold(gray, 160, 255, cv2.THRESH_BINARY)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    white_mask = cv2.morphologyEx(white_mask, cv2.MORPH_CLOSE, kernel)
    edges = cv2.Canny(white_mask, 50, 150)
    # Restrict edge pixels to a band around the spot row, with 10 px slack.
    roi_mask = np.zeros_like(edges)
    band_top = max(0, far_y - 10)
    roi_mask[band_top:near_y + 10, left_x:right_x] = 255
    edges = cv2.bitwise_and(edges, roi_mask)
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, threshold=20, minLineLength=25, maxLineGap=30)
    # (missing lines: presumably ``divider_xs = []`` and the
    #  ``if lines is not None: for l in lines:`` header)
    x1, y1, x2, y2 = l[0]
    angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
    if 45 < angle < 135:  # keep only roughly vertical segments
        # (missing line: presumably ``if y2 != y1:`` guarding the division)
        # Project the segment onto the near and far rows.
        t = (near_y - y1) / (y2 - y1)
        x_at_near = x1 + t * (x2 - x1)
        t_far = (far_y - y1) / (y2 - y1)
        x_far = x1 + t_far * (x2 - x1)
        # (missing line: presumably ``else:`` — degenerate horizontal case)
        x_at_near = x_far = (x1 + x2) / 2
        if left_x <= x_at_near <= right_x:
            divider_xs.append((x_at_near, x_far))
    # Optional debug overlay, enabled via the CALIB_DEBUG env var.
    if os.environ.get('CALIB_DEBUG'):
        # (missing line: presumably ``dbg = frame.copy()``)
        if lines is not None:
            # (missing line: presumably ``for l in lines:``)
            x1, y1, x2, y2 = l[0]
            angle = abs(np.degrees(np.arctan2(y2 - y1, x2 - x1)))
            if 45 < angle < 135:
                cv2.line(dbg, (x1, y1), (x2, y2), (255, 0, 0), 1)
        for xn, xf in divider_xs:
            cv2.line(dbg, (int(xn), near_y), (int(xf), far_y), (0, 255, 0), 2)
        cv2.line(dbg, (left_x, near_y), (right_x, near_y), (0, 200, 255), 1)
        cv2.line(dbg, (left_x, far_y), (right_x, far_y), (0, 200, 255), 1)
        _debug_frames[cam_name] = dbg
    # Not enough line evidence: fall back to inferring from parked cars.
    if lines is None or len(divider_xs) < 2:
        return _try_from_cars(detections, near_y, left_x, right_x, n_spots, uniform)
    # Cluster divider candidates whose near-row x is within 20 px.
    divider_xs.sort(key=lambda d: d[0])
    # (missing line: presumably ``clustered = []``)
    group = [divider_xs[0]]
    for x in divider_xs[1:]:
        if x[0] - group[0][0] < 20:
            # (missing body: presumably ``group.append(x)`` then an ``else:``
            #  that flushes the group — the flush below suggests that shape)
        clustered.append((np.mean([g[0] for g in group]), np.mean([g[1] for g in group])))
    # Flush the final group.
    clustered.append((np.mean([g[0] for g in group]), np.mean([g[1] for g in group])))
    # Add the lot edges, then sort boundaries by near-row x.
    xs_near = [left_x] + [c[0] for c in clustered] + [right_x]
    xs_far = [left_x] + [c[1] for c in clustered] + [right_x]
    all_bounds = sorted(zip(xs_near, xs_far), key=lambda p: p[0])
    xs_near = [p[0] for p in all_bounds]
    xs_far = [p[1] for p in all_bounds]
    # Merge boundaries that would create a spot narrower than 80 px
    # (parallels merge_narrow_spots, but keeps near/far x paired).
    paired = list(zip(xs_near, xs_far))
    merged_pairs = [paired[0]]
    for i in range(1, len(paired)):
        width = paired[i][0] - merged_pairs[-1][0]
        if width >= 80.0 or len(merged_pairs) == 1:
            merged_pairs.append(paired[i])
        # (missing line: presumably ``else:``)
            merged_pairs[-1] = paired[i]
    xs_far = [p[1] for p in merged_pairs]
    xs_near = [p[0] for p in merged_pairs]
    # Exact match: one boundary per spot edge.
    if len(xs_near) == n_spots + 1:
        return list(zip(xs_near, xs_far)), len(xs_near) - 1
    # Too many boundaries: keep the interior ones closest to a uniform grid.
    if len(xs_near) > n_spots + 1:
        expected = (right_x - left_x) / n_spots
        best_near, best_far = [left_x], [left_x]
        interior = list(zip(xs_near[1:-1], xs_far[1:-1]))
        for i in range(1, n_spots):
            target = left_x + expected * i
            closest = min(interior, key=lambda p: abs(p[0] - target))
            best_near.append(closest[0])
            best_far.append(closest[1])
        best_near.append(right_x)
        best_far.append(right_x)
        return list(zip(best_near, best_far)), len(best_near) - 1
    # Too few boundaries: fall back to car-based inference.
    return _try_from_cars(detections, near_y, left_x, right_x, n_spots, uniform)
def _try_from_cars(detections, near_y, left_x, right_x, n_spots, fallback):
    """Infer spot boundaries from parked-car detections, else use *fallback*.

    Takes the horizontal centers of detections whose bottom edge lies within
    80 px of the near row, and places a boundary at the midpoint between each
    adjacent pair of centers. The result is only trusted when it yields
    exactly ``n_spots + 1`` boundaries; otherwise the (narrow-merged) uniform
    fallback is returned.

    NOTE(review): two original lines are missing from this excerpt — the
    guard before the first return (presumably ``if not detections:``) and the
    ``car_centers = []`` initialisation; marked inline. Also the return shape
    is inconsistent: the final path returns (x, x) pairs while the other
    paths return bare floats — confirm how the caller consumes this.
    """
    # (missing line: presumably ``if not detections:`` guarding this return)
    fallback = merge_narrow_spots(fallback)
    return fallback, len(fallback) - 1
    # (missing line: presumably ``car_centers = []``)
    for x1, y1, x2, y2 in detections:
        # Only cars whose bottom edge sits near the spot row count.
        if abs(y2 - near_y) < 80:
            car_centers.append((x1 + x2) / 2)
    # Keep centers inside the lot span, sorted left-to-right.
    car_centers = sorted(c for c in car_centers if left_x < c < right_x)
    # Need at least two cars to place a midpoint boundary.
    if len(car_centers) < 2:
        fallback = merge_narrow_spots(fallback)
        return fallback, len(fallback) - 1
    # Boundaries at the midpoints between adjacent car centers.
    boundaries = [left_x]
    for i in range(len(car_centers) - 1):
        boundaries.append((car_centers[i] + car_centers[i + 1]) / 2)
    boundaries.append(right_x)
    if len(boundaries) == n_spots + 1:
        boundaries = merge_narrow_spots(boundaries)
        return boundaries, len(boundaries) - 1
    # Car evidence didn't match the expected spot count: use the fallback,
    # duplicated into (x_near, x_far) pairs.
    fallback = merge_narrow_spots(fallback)
    return [(x, x) for x in fallback], len(fallback) - 1
1894 if os.getpid() != _MAIN_PID:
1898 print(
"Shutting down...")
1906 for shm
in RAW_SHM_OBJS + ANN_SHM_OBJS:
1912 set_jetson_clocks(
False)
1916if __name__ ==
"__main__":
1918 _MAIN_PID = os.getpid()
1919 frame_ready_event = Event()
1920 set_jetson_clocks(
True)
1921 if args.annotate
and not args.record:
1922 print(
"\033[1;91mWarning: -a/-A/--annotate has no effect without -r/-R/--record\033[0m")
1923 need_annotation = args.test
or args.record
1924 os.system(
'v4l2-ctl --list-devices > ./utils/camInfo.txt')
1925 with open(
'./utils/camInfo.txt')
as f:
1933 if 'Arducam USB Camera' in line:
1934 port = float(re.search(
r'\d+\.\d+', line).group())
1936 idx = int(re.search(
r'\d+', line).group())
1942 if len(set(SOURCES)) != len(SOURCES):
1943 raise RuntimeError(f
"Camera detection failed - possible duplicate sources: {SOURCES}")
1946 probe = cv2.VideoCapture(src)
1947 ok, _ = probe.read()
1950 raise RuntimeError(f
'Cannot read from camera source {src}')
1952 for _
in range(len(SOURCES)):
1953 raw = SharedMemory(create=
True, size=CAP_H * CAP_W * 3)
1954 ann = SharedMemory(create=
True, size=DISP_H * DISP_W * 3)
1955 RAW_SHM_OBJS.append(raw)
1956 ANN_SHM_OBJS.append(ann)
1957 RAW_BUFS.append(shm_ndarray(raw, CAP_SHAPE))
1958 ANN_BUFS.append(shm_ndarray(ann, DISP_SHAPE))
1961 for buf
in ANN_BUFS:
1964 RAW_LOCKS = [Lock()
for _
in range(len(SOURCES))]
1965 ANN_LOCKS = [Lock()
for _
in range(len(SOURCES))]
1966 RAW_FRAME_ID = [Value(
'i', 0)
for _
in range(len(SOURCES))]
1968 for i, src
in enumerate(SOURCES):
1970 target=capture_worker,
1971 args=(i, src, RAW_SHM_OBJS[i].name, RAW_LOCKS[i], RAW_FRAME_ID[i], frame_ready_event, STOP_EVENT),
1976 os.sched_setaffinity(p.pid, {i + 1})
1981 print(
"Waiting for cameras...")
1982 deadline = time.time() + 15
1983 for i
in range(len(SOURCES)):
1984 while RAW_FRAME_ID[i].value == 0:
1985 if not PROCESSES[i].is_alive():
1987 f
"Camera {i} ({CAM_ORDER[i]}) process died before sending a frame."
1989 if time.time() > deadline:
1991 f
"Camera {i} ({CAM_ORDER[i]}) timed out waiting for the first frame"
1994 print(f
'Camera {i} ({CAM_ORDER[i]}) ready')
1995 auto_calibrate_t = Thread(
1996 target=auto_calibrate_loop,
2000 auto_calibrate_t.start()
2002 target=inference_loop,
2003 args=(need_annotation, frame_ready_event, STOP_EVENT),
2008 target=
lambda: app.run(host=
'localhost', port=5001, debug=
False, use_reloader=
False, threaded=
True),
2011 print(
'Flask Api -> http:/localhost/detections')
2014 rec_t = Thread(target=record_loop, args=(STOP_EVENT, args.annotate), daemon=
True)
2018 display_loop(STOP_EVENT)
2020 print(
"Headless mode. Ctrl + C to stop")
2022 while not STOP_EVENT.is_set():
2024 except KeyboardInterrupt:
Live calibration window with OpenCV trackbars for tuning CALIB_PARAMS.
Interactive polygon editor for defining parking spot ROIs at runtime.
bool handle_key(self, int key)
mouse_cb(self, event, x, y, flags, param)
str _next_id(self, str cam)